Skip to content

Conversation

@ultmaster
Copy link
Contributor

@ultmaster ultmaster commented Nov 12, 2025

This will be the last missing piece before we disable access log in the store.

Copilot AI review requested due to automatic review settings November 12, 2025 08:55
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements worker status tracking and visualization in the AgentLightning dashboard. The feature allows users to monitor runner activity, including their current status (idle/busy/unknown), heartbeat metrics, and assignment to rollouts and attempts.

Key Changes:

  • Added Worker type and three new store methods (query_workers, get_worker_by_id, update_worker) to track runner telemetry
  • Implemented automatic worker status transitions based on rollout lifecycle events (dequeue, attempt updates)
  • Created background heartbeat mechanism that periodically reports system metrics (CPU, memory, disk, GPU) to the store
  • Built new "Runners" page in the dashboard with filtering, sorting, and detail views

Reviewed Changes

Copilot reviewed 37 out of 38 changed files in this pull request and generated 13 comments.

Show a summary per file
File Description
uv.lock Added gpustat, blessed, and nvidia-ml-py dependencies for system monitoring
pyproject.toml Added gpustat to project dependencies
agentlightning/types/core.py Defined Worker model with status tracking and telemetry fields
agentlightning/utils/system_snapshot.py New utility to collect CPU, memory, disk, network, and GPU metrics
agentlightning/store/base.py Extended LightningStore interface with worker management methods
agentlightning/store/memory.py Implemented worker tracking with automatic status transitions
agentlightning/store/client_server.py Added REST endpoints for worker queries and updates
agentlightning/store/threading.py Delegated worker methods to underlying store
agentlightning/runner/agent.py Added configurable heartbeat loop to report worker status
dashboard/src/types.ts Defined Worker TypeScript type matching Python model
dashboard/src/features/workers/* Created Redux slice, selectors, and API integration for workers
dashboard/src/features/rollouts/api.ts Added getWorkers query endpoint
dashboard/src/pages/Workers.page.tsx New page for viewing and searching workers
dashboard/src/components/WorkersTable.component.tsx Table component with status badges and telemetry columns
dashboard/src/components/AppDrawer.component.tsx Added worker detail drawer support
dashboard/src/layouts/AppLayout.tsx Added "Runners" navigation item
dashboard/src/Router.tsx Added /runners route
dashboard/src/utils/mock.ts Added worker filtering, sorting, and mock handlers
docs/deep-dive/store.md Documented worker telemetry behavior
tests/utils/test_system_snapshot.py Tests for system metrics collection
tests/runner/test_agent_runner.py Tests for heartbeat functionality
tests/store/*.py Tests for worker store methods and status transitions
dashboard/test-utils/python-server.py Added mock worker data for development
dashboard/src/utils/mock.test.ts Tests for worker mock utilities
dashboard/src/features/workers/workers.test.ts Integration tests for worker feature
dashboard/src/pages/Workers.page.story.tsx Storybook stories for Workers page
dashboard/src/components/WorkersTable.story.tsx Storybook stories for WorkersTable

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +115 to +116
class UpdateWorkerRequest(BaseModel):
heartbeat_stats: Optional[Dict[str, Any]] = None
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UpdateWorkerRequest model allows heartbeat_stats: Optional[Dict[str, Any]] = None, which permits explicit None values. However, the test at line 1064 in test_restful.py expects a 400 error when None is passed. The validation logic in _get_mandatory_field_or_unset (lines 670-680) converts explicit None to raise an error, but this behavior isn't documented in the UpdateWorkerRequest model. Consider adding a Pydantic validator to reject None values explicitly, or document why None is forbidden while being typed as Optional.

Copilot uses AI. Check for mistakes.
- [`update_attempt(..., worker_id=...)`][agentlightning.LightningStore.update_attempt] drives the worker status machine. Assigning an attempt marks the worker **busy** and stamps `last_busy_time`; finishing with `status in {"succeeded","failed"}` switches to **idle**, while watchdog transitions such as `timeout`/`unresponsive` make the worker **unknown** and clear `current_rollout_id` / `current_attempt_id`.
- [`update_worker(...)`][agentlightning.LightningStore.update_worker] is reserved for heartbeats. It snapshots optional `heartbeat_stats` and always updates `last_heartbeat_time`.

Because every transition flows through these APIs, there is no longer a manual way to flip worker status out of band; telemetry stays consistent with rollout execution.
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation states that worker status is "now derived automatically" and that "there is no longer a manual way to flip worker status out of band" (line 83). However, this claim isn't entirely accurate - the update_worker API can be called with any worker_id and will create a new worker with status "idle" if one doesn't exist (see _get_or_create_worker at line 397 in memory.py). This means external callers could create arbitrary worker records. Clarify whether this is intentional or if update_worker should only modify existing workers.

Suggested change
Because every transition flows through these APIs, there is no longer a manual way to flip worker status out of band; telemetry stays consistent with rollout execution.
Because every transition flows through these APIs, worker status is derived automatically from rollout execution and heartbeats. Note, however, that calling `update_worker` with a new `worker_id` will create a new worker record with status "idle" if one does not exist. Thus, while manual status changes are not allowed, new worker records can be created externally via heartbeats.
Copilot uses AI. Check for mistakes.
Comment on lines 376 to 377
while not stop_evt.is_set():
asyncio.run(self._emit_heartbeat(store))
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In thread mode, the heartbeat loop uses asyncio.run(self._emit_heartbeat(store)) which creates a new event loop for each heartbeat. This is inefficient and could lead to issues if the store client maintains persistent connections or state. Consider using asyncio.get_event_loop().run_until_complete() with proper event loop management, or document that thread mode is not recommended for production use.

Suggested change
while not stop_evt.is_set():
asyncio.run(self._emit_heartbeat(store))
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while not stop_evt.is_set():
loop.run_until_complete(self._emit_heartbeat(store))
Copilot uses AI. Check for mistakes.
Comment on lines +274 to +275
if worker.status != "unknown":
worker.last_idle_time = now
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The worker status transitions could result in an incorrect "unknown" state. When an attempt status is set to "timeout" or "unresponsive", the code sets last_idle_time and changes status to "unknown". However, the docstring and comments indicate that "unknown" means the worker has never been busy or idle. Setting last_idle_time for a worker that's "unknown" is semantically inconsistent. Consider either not setting last_idle_time for unknown workers, or documenting that "unknown" can also represent workers that have timed out.

Suggested change
if worker.status != "unknown":
worker.last_idle_time = now
Copilot uses AI. Check for mistakes.
Comment on lines +1080 to +1082
for rollout_id in self._running_rollout_ids:
rollout = self._rollouts.get(rollout_id)
if rollout is not None and rollout.status in ["preparing", "running"]:
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _running_rollout_ids set is added as an optimization for the healthcheck (line 1080-1082), but it's not always kept in sync with the actual rollout states. For example, if a rollout transitions from "running" to "succeeded" via update_rollout, line 967 removes it from the set. However, if a rollout is updated through other paths or if there are errors during state transitions, the set could become stale. Consider adding assertions or periodic reconciliation to ensure the cache stays consistent with _rollouts.

Copilot uses AI. Check for mistakes.
Comment on lines +91 to 96
accessor: 'sequenceId',
title: 'Seq.',
sortable: true,
render: ({ sequenceId }) => <Text size='sm'>{sequenceId}</Text>,
},
{
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The addition of the sequenceId column to the TracesTable appears unrelated to the worker status feature described in this PR. If this change is intentional, it should be mentioned in the PR description or committed separately to maintain focused changes. If unintentional, it should be reverted.

Suggested change
accessor: 'sequenceId',
title: 'Seq.',
sortable: true,
render: ({ sequenceId }) => <Text size='sm'>{sequenceId}</Text>,
},
{
Copilot uses AI. Check for mistakes.
Comment on lines +215 to +216
heartbeat_stats: Optional[Dict[str, Any]] = None
"""Statistics about the worker's heartbeat."""
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heartbeat_stats field accepts arbitrary Dict[str, Any] without size limits or validation. Malicious or buggy clients could send extremely large payloads (e.g., megabytes of nested data) in heartbeat updates, potentially causing memory exhaustion or performance degradation. Consider adding validation for the maximum size/depth of the heartbeat_stats dictionary, or document expected usage patterns and limits.

Copilot uses AI. Check for mistakes.
Comment on lines +42 to +53
with suppress(Exception):
for g in GPUStatCollection.new_query().gpus: # type: ignore
g = cast(GPUStat, g)
gpus.append(
{
"gpu": g.name, # type: ignore
"util_pct": g.utilization,
"mem_used_mb": g.memory_used,
"mem_total_mb": g.memory_total,
"temp_c": g.temperature,
}
)
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GPU collection uses a broad except Exception clause (line 42) that silently suppresses all errors. While this prevents crashes when GPU libraries aren't available, it also hides genuine errors like network timeouts, permission issues, or bugs in gpustat. Consider logging suppressed exceptions at debug level, or catching more specific exception types (e.g., ImportError, RuntimeError) to allow unexpected errors to surface during development.

Copilot uses AI. Check for mistakes.
"""Create or update a worker entry."""
async with self._lock:
worker = self._get_or_create_worker(worker_id)
if not isinstance(heartbeat_stats, Unset):
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The update_worker method accepts heartbeat_stats that can be either Dict[str, Any] or Unset, but when it's not Unset, the code converts it to a dict with dict(heartbeat_stats). This raises a TypeError if heartbeat_stats=None is explicitly passed (as opposed to being UNSET). The API should either reject None values at the FastAPI layer (currently it doesn't based on line 116 allowing Optional[Dict[str, Any]]), or handle None gracefully in the implementation. The test at line 489 expects a TypeError, which suggests validation happens somewhere, but the validation message would be clearer if it explicitly checked for None before the dict() conversion.

Suggested change
if not isinstance(heartbeat_stats, Unset):
if not isinstance(heartbeat_stats, Unset):
if heartbeat_stats is None:
raise TypeError("heartbeat_stats cannot be None; pass UNSET or a valid dict.")
Copilot uses AI. Check for mistakes.
Comment on lines +259 to +285
def _sync_worker_with_attempt(self, attempt: Attempt) -> None:
worker_id = attempt.worker_id
if not worker_id:
return

worker = self._get_or_create_worker(worker_id)
now = time.time()

if attempt.status in ("succeeded", "failed"):
if worker.status != "idle":
worker.last_idle_time = now
worker.status = "idle"
worker.current_rollout_id = None
worker.current_attempt_id = None
elif attempt.status in ("timeout", "unresponsive"):
if worker.status != "unknown":
worker.last_idle_time = now
worker.status = "unknown"
worker.current_rollout_id = None
worker.current_attempt_id = None
else:
transitioned = worker.status != "busy" or worker.current_attempt_id != attempt.attempt_id
if transitioned:
worker.last_busy_time = now
worker.status = "busy"
worker.current_rollout_id = attempt.rollout_id
worker.current_attempt_id = attempt.attempt_id
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _sync_worker_with_attempt method updates worker state based on attempt status, but there's no guarantee about the order of these updates when multiple attempts update concurrently. For example, if attempt A transitions to "succeeded" (setting worker to "idle") while attempt B (for a different rollout on the same worker) is simultaneously being updated to "running", the final worker state depends on which _sync_worker_with_attempt call happens last. Consider adding sequence numbers or timestamps to ensure worker state reflects the most recent attempt update, or document that workers should only handle one attempt at a time.

Copilot uses AI. Check for mistakes.
@ultmaster
Copy link
Contributor Author

/ci

@github-actions
Copy link

github-actions bot commented Nov 12, 2025

🚀 CI Watcher for correlation id-3521026115-mhvtejv6 triggered by comment 3521026115
🏃‍♀️ Tracking 6 workflow run(s):

✅ All runs completed.

@ultmaster
Copy link
Contributor Author

/ci

@github-actions
Copy link

github-actions bot commented Nov 12, 2025

🚀 CI Watcher for correlation id-3521145669-mhvufcn7 triggered by comment 3521145669
🏃‍♀️ Tracking 6 workflow run(s):

✅ All runs completed.

@ultmaster ultmaster merged commit 790ed3e into main Nov 12, 2025
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

2 participants