-
Notifications
You must be signed in to change notification settings - Fork 1.1k
View worker status on Dashboard #296
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ure/worker-status
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements worker status tracking and visualization in the AgentLightning dashboard. The feature allows users to monitor runner activity, including their current status (idle/busy/unknown), heartbeat metrics, and assignment to rollouts and attempts.
Key Changes:
- Added
Workertype and three new store methods (query_workers,get_worker_by_id,update_worker) to track runner telemetry - Implemented automatic worker status transitions based on rollout lifecycle events (dequeue, attempt updates)
- Created background heartbeat mechanism that periodically reports system metrics (CPU, memory, disk, GPU) to the store
- Built new "Runners" page in the dashboard with filtering, sorting, and detail views
Reviewed Changes
Copilot reviewed 37 out of 38 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| uv.lock | Added gpustat, blessed, and nvidia-ml-py dependencies for system monitoring |
| pyproject.toml | Added gpustat to project dependencies |
| agentlightning/types/core.py | Defined Worker model with status tracking and telemetry fields |
| agentlightning/utils/system_snapshot.py | New utility to collect CPU, memory, disk, network, and GPU metrics |
| agentlightning/store/base.py | Extended LightningStore interface with worker management methods |
| agentlightning/store/memory.py | Implemented worker tracking with automatic status transitions |
| agentlightning/store/client_server.py | Added REST endpoints for worker queries and updates |
| agentlightning/store/threading.py | Delegated worker methods to underlying store |
| agentlightning/runner/agent.py | Added configurable heartbeat loop to report worker status |
| dashboard/src/types.ts | Defined Worker TypeScript type matching Python model |
| dashboard/src/features/workers/* | Created Redux slice, selectors, and API integration for workers |
| dashboard/src/features/rollouts/api.ts | Added getWorkers query endpoint |
| dashboard/src/pages/Workers.page.tsx | New page for viewing and searching workers |
| dashboard/src/components/WorkersTable.component.tsx | Table component with status badges and telemetry columns |
| dashboard/src/components/AppDrawer.component.tsx | Added worker detail drawer support |
| dashboard/src/layouts/AppLayout.tsx | Added "Runners" navigation item |
| dashboard/src/Router.tsx | Added /runners route |
| dashboard/src/utils/mock.ts | Added worker filtering, sorting, and mock handlers |
| docs/deep-dive/store.md | Documented worker telemetry behavior |
| tests/utils/test_system_snapshot.py | Tests for system metrics collection |
| tests/runner/test_agent_runner.py | Tests for heartbeat functionality |
| tests/store/*.py | Tests for worker store methods and status transitions |
| dashboard/test-utils/python-server.py | Added mock worker data for development |
| dashboard/src/utils/mock.test.ts | Tests for worker mock utilities |
| dashboard/src/features/workers/workers.test.ts | Integration tests for worker feature |
| dashboard/src/pages/Workers.page.story.tsx | Storybook stories for Workers page |
| dashboard/src/components/WorkersTable.story.tsx | Storybook stories for WorkersTable |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| class UpdateWorkerRequest(BaseModel): | ||
| heartbeat_stats: Optional[Dict[str, Any]] = None |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The UpdateWorkerRequest model allows heartbeat_stats: Optional[Dict[str, Any]] = None, which permits explicit None values. However, the test at line 1064 in test_restful.py expects a 400 error when None is passed. The validation logic in _get_mandatory_field_or_unset (lines 670-680) converts explicit None to raise an error, but this behavior isn't documented in the UpdateWorkerRequest model. Consider adding a Pydantic validator to reject None values explicitly, or document why None is forbidden while being typed as Optional.
docs/deep-dive/store.md
Outdated
| - [`update_attempt(..., worker_id=...)`][agentlightning.LightningStore.update_attempt] drives the worker status machine. Assigning an attempt marks the worker **busy** and stamps `last_busy_time`; finishing with `status in {"succeeded","failed"}` switches to **idle**, while watchdog transitions such as `timeout`/`unresponsive` make the worker **unknown** and clear `current_rollout_id` / `current_attempt_id`. | ||
| - [`update_worker(...)`][agentlightning.LightningStore.update_worker] is reserved for heartbeats. It snapshots optional `heartbeat_stats` and always updates `last_heartbeat_time`. | ||
|
|
||
| Because every transition flows through these APIs, there is no longer a manual way to flip worker status out of band; telemetry stays consistent with rollout execution. |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation states that worker status is "now derived automatically" and that "there is no longer a manual way to flip worker status out of band" (line 83). However, this claim isn't entirely accurate - the update_worker API can be called with any worker_id and will create a new worker with status "idle" if one doesn't exist (see _get_or_create_worker at line 397 in memory.py). This means external callers could create arbitrary worker records. Clarify whether this is intentional or if update_worker should only modify existing workers.
| Because every transition flows through these APIs, there is no longer a manual way to flip worker status out of band; telemetry stays consistent with rollout execution. | |
| Because every transition flows through these APIs, worker status is derived automatically from rollout execution and heartbeats. Note, however, that calling `update_worker` with a new `worker_id` will create a new worker record with status "idle" if one does not exist. Thus, while manual status changes are not allowed, new worker records can be created externally via heartbeats. |
agentlightning/runner/agent.py
Outdated
| while not stop_evt.is_set(): | ||
| asyncio.run(self._emit_heartbeat(store)) |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In thread mode, the heartbeat loop uses asyncio.run(self._emit_heartbeat(store)) which creates a new event loop for each heartbeat. This is inefficient and could lead to issues if the store client maintains persistent connections or state. Consider using asyncio.get_event_loop().run_until_complete() with proper event loop management, or document that thread mode is not recommended for production use.
| while not stop_evt.is_set(): | |
| asyncio.run(self._emit_heartbeat(store)) | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| while not stop_evt.is_set(): | |
| loop.run_until_complete(self._emit_heartbeat(store)) |
| if worker.status != "unknown": | ||
| worker.last_idle_time = now |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The worker status transitions could result in an incorrect "unknown" state. When an attempt status is set to "timeout" or "unresponsive", the code sets last_idle_time and changes status to "unknown". However, the docstring and comments indicate that "unknown" means the worker has never been busy or idle. Setting last_idle_time for a worker that's "unknown" is semantically inconsistent. Consider either not setting last_idle_time for unknown workers, or documenting that "unknown" can also represent workers that have timed out.
| if worker.status != "unknown": | |
| worker.last_idle_time = now |
| for rollout_id in self._running_rollout_ids: | ||
| rollout = self._rollouts.get(rollout_id) | ||
| if rollout is not None and rollout.status in ["preparing", "running"]: |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _running_rollout_ids set is added as an optimization for the healthcheck (line 1080-1082), but it's not always kept in sync with the actual rollout states. For example, if a rollout transitions from "running" to "succeeded" via update_rollout, line 967 removes it from the set. However, if a rollout is updated through other paths or if there are errors during state transitions, the set could become stale. Consider adding assertions or periodic reconciliation to ensure the cache stays consistent with _rollouts.
| accessor: 'sequenceId', | ||
| title: 'Seq.', | ||
| sortable: true, | ||
| render: ({ sequenceId }) => <Text size='sm'>{sequenceId}</Text>, | ||
| }, | ||
| { |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The addition of the sequenceId column to the TracesTable appears unrelated to the worker status feature described in this PR. If this change is intentional, it should be mentioned in the PR description or committed separately to maintain focused changes. If unintentional, it should be reverted.
| accessor: 'sequenceId', | |
| title: 'Seq.', | |
| sortable: true, | |
| render: ({ sequenceId }) => <Text size='sm'>{sequenceId}</Text>, | |
| }, | |
| { |
| heartbeat_stats: Optional[Dict[str, Any]] = None | ||
| """Statistics about the worker's heartbeat.""" |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The heartbeat_stats field accepts arbitrary Dict[str, Any] without size limits or validation. Malicious or buggy clients could send extremely large payloads (e.g., megabytes of nested data) in heartbeat updates, potentially causing memory exhaustion or performance degradation. Consider adding validation for the maximum size/depth of the heartbeat_stats dictionary, or document expected usage patterns and limits.
| with suppress(Exception): | ||
| for g in GPUStatCollection.new_query().gpus: # type: ignore | ||
| g = cast(GPUStat, g) | ||
| gpus.append( | ||
| { | ||
| "gpu": g.name, # type: ignore | ||
| "util_pct": g.utilization, | ||
| "mem_used_mb": g.memory_used, | ||
| "mem_total_mb": g.memory_total, | ||
| "temp_c": g.temperature, | ||
| } | ||
| ) |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GPU collection uses a broad except Exception clause (line 42) that silently suppresses all errors. While this prevents crashes when GPU libraries aren't available, it also hides genuine errors like network timeouts, permission issues, or bugs in gpustat. Consider logging suppressed exceptions at debug level, or catching more specific exception types (e.g., ImportError, RuntimeError) to allow unexpected errors to surface during development.
| """Create or update a worker entry.""" | ||
| async with self._lock: | ||
| worker = self._get_or_create_worker(worker_id) | ||
| if not isinstance(heartbeat_stats, Unset): |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The update_worker method accepts heartbeat_stats that can be either Dict[str, Any] or Unset, but when it's not Unset, the code converts it to a dict with dict(heartbeat_stats). This raises a TypeError if heartbeat_stats=None is explicitly passed (as opposed to being UNSET). The API should either reject None values at the FastAPI layer (currently it doesn't based on line 116 allowing Optional[Dict[str, Any]]), or handle None gracefully in the implementation. The test at line 489 expects a TypeError, which suggests validation happens somewhere, but the validation message would be clearer if it explicitly checked for None before the dict() conversion.
| if not isinstance(heartbeat_stats, Unset): | |
| if not isinstance(heartbeat_stats, Unset): | |
| if heartbeat_stats is None: | |
| raise TypeError("heartbeat_stats cannot be None; pass UNSET or a valid dict.") |
| def _sync_worker_with_attempt(self, attempt: Attempt) -> None: | ||
| worker_id = attempt.worker_id | ||
| if not worker_id: | ||
| return | ||
|
|
||
| worker = self._get_or_create_worker(worker_id) | ||
| now = time.time() | ||
|
|
||
| if attempt.status in ("succeeded", "failed"): | ||
| if worker.status != "idle": | ||
| worker.last_idle_time = now | ||
| worker.status = "idle" | ||
| worker.current_rollout_id = None | ||
| worker.current_attempt_id = None | ||
| elif attempt.status in ("timeout", "unresponsive"): | ||
| if worker.status != "unknown": | ||
| worker.last_idle_time = now | ||
| worker.status = "unknown" | ||
| worker.current_rollout_id = None | ||
| worker.current_attempt_id = None | ||
| else: | ||
| transitioned = worker.status != "busy" or worker.current_attempt_id != attempt.attempt_id | ||
| if transitioned: | ||
| worker.last_busy_time = now | ||
| worker.status = "busy" | ||
| worker.current_rollout_id = attempt.rollout_id | ||
| worker.current_attempt_id = attempt.attempt_id |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _sync_worker_with_attempt method updates worker state based on attempt status, but there's no guarantee about the order of these updates when multiple attempts update concurrently. For example, if attempt A transitions to "succeeded" (setting worker to "idle") while attempt B (for a different rollout on the same worker) is simultaneously being updated to "running", the final worker state depends on which _sync_worker_with_attempt call happens last. Consider adding sequence numbers or timestamps to ensure worker state reflects the most recent attempt update, or document that workers should only handle one attempt at a time.
|
/ci |
|
🚀 CI Watcher for correlation id-3521026115-mhvtejv6 triggered by comment 3521026115
✅ All runs completed. |
|
/ci |
|
🚀 CI Watcher for correlation id-3521145669-mhvufcn7 triggered by comment 3521145669
✅ All runs completed. |
This will be the last missing piece before we disable access log in the store.