feat(a2a): add an A2A Agent component to call remote agents#13859
feat(a2a): add an A2A Agent component to call remote agents#13859ogabrielluiz wants to merge 58 commits into
Conversation
Add an explicit flow_type enum (workflow|agent), a2a_enabled, and a2a_card_overrides to the Flow model via an additive migration following the access_type precedent. Surface the fields in the flow create/read/update schemas, add a flow_type filter to the list endpoint, and add a suggest_flow_type auto-detect helper (UI suggestion only, never the stored source of truth). This is F1, the foundation of the A2A protocol support epic.
cz/hitl-v2 regressed the release-1.11.0 fix: get_type_dict called the async get_all_types_dict synchronously, so the sync _build_dict received an un-awaited coroutine and the first Graph.from_payload raised "'coroutine' object is not a mapping" — breaking json_schema_from_flow and MCP tool-schema generation. Restore the sync build_custom_components path (identical to release-1.11.0) and its regression test.
Public GET /api/v1/a2a/{flow_id}/.well-known/agent-card.json serving a
spec-valid A2A agent card for agent-typed, a2a_enabled flows. Gated by
LANGFLOW_A2A_ENABLED (default off) via a per-request guard; the router is
mounted unconditionally so the flag can flip at runtime. Reflects folder
apikey auth onto securitySchemes (reflect-only; F6 enforces). The card
degrades to an empty input schema on unbuildable flows and ignores
malformed a2a_card_overrides instead of 500ing the public endpoint.
Adds a2a-sdk as a direct langflow-base dependency.
Adds POST /api/v1/a2a/{flow_id}/jsonrpc serving the A2A JSON-RPC surface:
message/send runs the flow through the v2 sync surface and returns a
terminal Task, and tasks/get reads it back from an in-memory store. Gated
behind LANGFLOW_A2A_ENABLED and the per-flow agent type + a2a_enabled flag,
like the card route.
The protocol-pure executor (a2a_executor.py) holds no langflow imports and
reaches the flow via an injected run_flow callable, so the protocol layer
stays extractable to lfx. Unexpected build/run failures are logged
server-side and surface a generic message to the unauthenticated caller;
multi-output flows emit each text channel instead of dropping them.
Replaces the in-memory module-singleton task store with DurableTaskStore, a DB-backed a2a-sdk TaskStore that persists each protobuf Task as a JSON blob in a new a2a_tasks table (composite PK (id, owner), JSONB on postgres). Tasks now survive a restart and are visible across workers; save() uses a writing session, get() a read-only one. Adds the A2ATask model + alembic migration, and tests covering save/get round-trip, owner scoping, and a real_services test that runs the migration and the store on both sqlite and postgres.
Advertises the streaming capability on both the handler card (the SDK's @Validate gate) and the discovery card, which mounts message/stream and tasks/resubscribe through the existing dispatcher. message/stream replays the run lifecycle (submitted -> working -> artifact -> completed) as SSE frames by re-emitting the same executor events; tasks/resubscribe re-attaches to a live run and returns a spec error for terminal/cross-worker tasks (tasks/get covers terminal reads). Per-token deltas and durable cross-worker re-attach are follow-ups. Tests cover the advertised capability, the SSE lifecycle, and that resubscribe to a finished task ends with an error frame rather than hanging.
When a flow's folder requires apikey auth, the JSON-RPC route now requires a valid langflow API key in the x-api-key header whose owner is the flow owner, before any dispatch (covers message/send, message/stream, tasks/get, tasks/resubscribe). Flows in none folders stay public (the A2A public-agent model); the discovery card stays public by spec. Uses check_key directly rather than api_key_security, which under AUTO_LOGIN returns the superuser for a missing key and would bypass the gate. The key is scoped to flow.user_id since the flow runs as its owner. The auth_type read is extracted into a shared folder_auth_type helper so card-advertise and route-enforce can't drift.
message/send threads the request's contextId into the flow run as its session_id, so calls sharing a contextId share chat memory (one conversation, one session). The SDK mints a contextId when the client omits one, so each conversation stays isolated. An over-long contextId falls back to the per-flow default session rather than reaching the indexed session_id column.
When a flow pauses at a HumanInput node, message/send now returns a Task in the input-required state carrying the prompt, and a follow-up message/send on the same taskId supplies the human decision and resumes the run to completion or the next pause. Suspend/resume rides on the lfx checkpoint primitives (GraphCheckpoint, resume_from_checkpoint, human_input_decisions) rather than the langflow job machinery, so it stays portable to lfx serve. The paused graph checkpoint persists in a new a2a_checkpoints table keyed by the task id; the v2 sync runner gains an opt-in checkpoint store that returns a suspended response instead of failing on a pause (off by default, so non-HITL callers are unchanged). A reply that matches no offered action re-parks the task so the human can answer again instead of burning it. A checkpoint is loaded only for its own flow, so a task parked under one flow cannot be resumed via another flow's endpoint.
Flips on the agent card's push_notifications capability and wires the SDK push-config store + sender into the shared handler, so a client can register a webhook (tasks/pushNotificationConfig/set or the message/send configuration) and receive task lifecycle updates. The webhook URL is caller-controlled on a public endpoint, so it's validated at registration: only http/https to a host that resolves entirely to public addresses is allowed; loopback, private, link-local (including the cloud metadata IP), reserved, multicast, and unspecified targets are rejected. LANGFLOW_A2A_ALLOW_PRIVATE_WEBHOOKS opts out for a trusted internal network. Config storage is in-memory per worker for now.
Langflow can serve A2A agents; this is the other direction. The A2A Agent component takes a remote agent's URL and a message, resolves the agent card, sends a message/send via the a2a-sdk client, and returns the reply text. An optional API key is sent as x-api-key for auth-gated agents. The transport logic is factored into a call_a2a_agent helper so it can be driven with an in-process client; the test exercises a real card-resolve + message/send round-trip against Langflow's own A2A server (an echo agent flow), with no mocks.
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. 🗂️ Base branches to auto review (1)
Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
✅ Test Coverage AdvisorNo source changes detected without accompanying tests. Thanks for keeping coverage up! 🎉
|
|
Caution Review failedAn error occurred during the review process. Please try again later. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
# Conflicts: # src/backend/base/langflow/api/v1/a2a.py
# Conflicts: # src/backend/base/langflow/api/v1/a2a_utils.py
# Conflicts: # src/backend/tests/unit/api/v1/test_a2a.py
|
Hey @erichare, thanks. I routed both the card fetch and message/send through |
…nd SSRF-validate instead of reject The request hook rejected any off-origin hop, which also rejects spec-compliant agents whose card advertises an RPC url on a different origin than the configured agent_url. Instead, strip the x-api-key on off-origin requests (the key never leaves the configured agent) and SSRF-validate the off-origin target in the hook (the transport does not pin that host), keeping internal/metadata IPs blocked. Same-origin requests are unchanged.
|
Hey @erichare, I changed the off-origin handling here after your approval, so worth another look. Instead of rejecting any off-origin hop, it now strips the x-api-key on off-origin requests and SSRF-validates the target in the hook (the transport doesn't pin off-origin hosts, so the hook is the only check on that path). The reject version broke spec-compliant agents whose card declares an RPC url on a different origin. Both your asks still hold: every hop is SSRF-validated, and the key never leaves the configured agent origin. It doesn't fully close one thing: off-origin hops aren't DNS-pinned, so a TTL=0 rebind could still reach an internal IP. The key is already stripped though, so that's blind SSRF, not exfil. Tests cover the strip and the internal-block. Commit is d66b3d7. |
erichare
left a comment
There was a problem hiding this comment.
@ogabrielluiz can you check these two things:
Findings
-
[P1] Off-origin A2A RPC URLs are revalidated but not DNS-pinned.
Insrc/lfx/src/lfx/components/models_and_agents/a2a_agent.py, the request hook callsvalidate_and_resolve_url()for off-origin URLs but discards the returned validated IPs, while the client transport only pins the originalagent_urlhost. A malicious agent card can pass validation with a public DNS answer, then rebind before httpx makes the normal unpinned connection. Keep off-origin RPC URLs rejected, or add per-request/per-host DNS pinning for the validated off-origin target before allowing it.
langflow/src/lfx/src/lfx/components/models_and_agents/a2a_agent.py
Lines 56 to 74 in 3d057f2
-
[P1] OAuth-protected folders become public A2A agents.
_enforce_a2a_auth()only enforces"apikey"folders and returns public access for every other auth type, while the card code explicitly advertises no security for"oauth". That means an A2A-enabled flow in an OAuth-protected folder can be run anonymously as the flow owner. Until OAuth enforcement is implemented for A2A, unsupported protected folder auth types should fail closed instead of being treated as public.
langflow/src/backend/base/langflow/api/v1/a2a.py
Lines 95 to 102 in 3d057f2
langflow/src/backend/base/langflow/api/v1/a2a_utils.py
Lines 160 to 162 in 3d057f2
Adds an A2A Agent component. Builds on #13858.
Langflow can serve flows as A2A agents (the A2A routes); this is the other direction: a component that lets a flow call out to any spec-compliant A2A agent.
<url>/.well-known/agent-card.json, sends onemessage/sendthrough the a2a-sdk client, and returns the reply text (joined from the response artifacts, falling back to the task's status message).x-api-keyfor auth-gated agents. The same client fetches the card and posts the message, so the key reaches both.The transport is factored into a
call_a2a_agent(agent_url, message, *, httpx_client)helper so the round-trip can be driven with an in-process client. The test points it at Langflow's own A2A server (an echo agent flow) and asserts the reply comes back, so the whole card-resolve + send path runs end to end with no mocks. That also doubles as a conformance check that the server we built speaks the standard client's protocol.