Skip to content

feat(a2a): add an A2A Agent component to call remote agents#13859

Open
ogabrielluiz wants to merge 58 commits into
feat/a2afrom
feat/a2a-client
Open

feat(a2a): add an A2A Agent component to call remote agents#13859
ogabrielluiz wants to merge 58 commits into
feat/a2afrom
feat/a2a-client

Conversation

@ogabrielluiz

Copy link
Copy Markdown
Contributor

Adds an A2A Agent component. Builds on #13858.

Langflow can serve flows as A2A agents (the A2A routes); this is the other direction: a component that lets a flow call out to any spec-compliant A2A agent.

  • The A2A Agent component takes a remote agent's URL and a message. It resolves the agent card from <url>/.well-known/agent-card.json, sends one message/send through the a2a-sdk client, and returns the reply text (joined from the response artifacts, falling back to the task's status message).
  • An optional API key is sent as x-api-key for auth-gated agents. The same client fetches the card and posts the message, so the key reaches both.

The transport is factored into a call_a2a_agent(agent_url, message, *, httpx_client) helper so the round-trip can be driven with an in-process client. The test points it at Langflow's own A2A server (an echo agent flow) and asserts the reply comes back, so the whole card-resolve + send path runs end to end with no mocks. That also doubles as a conformance check that the server we built speaks the standard client's protocol.

Add an explicit flow_type enum (workflow|agent), a2a_enabled, and
a2a_card_overrides to the Flow model via an additive migration following
the access_type precedent. Surface the fields in the flow create/read/update
schemas, add a flow_type filter to the list endpoint, and add a
suggest_flow_type auto-detect helper (UI suggestion only, never the stored
source of truth).

This is F1, the foundation of the A2A protocol support epic.
cz/hitl-v2 regressed the release-1.11.0 fix: get_type_dict called the
async get_all_types_dict synchronously, so the sync _build_dict received
an un-awaited coroutine and the first Graph.from_payload raised
"'coroutine' object is not a mapping" — breaking json_schema_from_flow
and MCP tool-schema generation. Restore the sync build_custom_components
path (identical to release-1.11.0) and its regression test.
Public GET /api/v1/a2a/{flow_id}/.well-known/agent-card.json serving a
spec-valid A2A agent card for agent-typed, a2a_enabled flows. Gated by
LANGFLOW_A2A_ENABLED (default off) via a per-request guard; the router is
mounted unconditionally so the flag can flip at runtime. Reflects folder
apikey auth onto securitySchemes (reflect-only; F6 enforces). The card
degrades to an empty input schema on unbuildable flows and ignores
malformed a2a_card_overrides instead of 500ing the public endpoint.

Adds a2a-sdk as a direct langflow-base dependency.
Adds POST /api/v1/a2a/{flow_id}/jsonrpc serving the A2A JSON-RPC surface:
message/send runs the flow through the v2 sync surface and returns a
terminal Task, and tasks/get reads it back from an in-memory store. Gated
behind LANGFLOW_A2A_ENABLED and the per-flow agent type + a2a_enabled flag,
like the card route.

The protocol-pure executor (a2a_executor.py) holds no langflow imports and
reaches the flow via an injected run_flow callable, so the protocol layer
stays extractable to lfx. Unexpected build/run failures are logged
server-side and surface a generic message to the unauthenticated caller;
multi-output flows emit each text channel instead of dropping them.
Replaces the in-memory module-singleton task store with DurableTaskStore, a
DB-backed a2a-sdk TaskStore that persists each protobuf Task as a JSON blob in a
new a2a_tasks table (composite PK (id, owner), JSONB on postgres). Tasks now
survive a restart and are visible across workers; save() uses a writing session,
get() a read-only one.

Adds the A2ATask model + alembic migration, and tests covering save/get
round-trip, owner scoping, and a real_services test that runs the migration and
the store on both sqlite and postgres.
Advertises the streaming capability on both the handler card (the SDK's @Validate
gate) and the discovery card, which mounts message/stream and tasks/resubscribe
through the existing dispatcher. message/stream replays the run lifecycle
(submitted -> working -> artifact -> completed) as SSE frames by re-emitting the
same executor events; tasks/resubscribe re-attaches to a live run and returns a
spec error for terminal/cross-worker tasks (tasks/get covers terminal reads).

Per-token deltas and durable cross-worker re-attach are follow-ups. Tests cover
the advertised capability, the SSE lifecycle, and that resubscribe to a finished
task ends with an error frame rather than hanging.
When a flow's folder requires apikey auth, the JSON-RPC route now requires a valid
langflow API key in the x-api-key header whose owner is the flow owner, before any
dispatch (covers message/send, message/stream, tasks/get, tasks/resubscribe). Flows
in none folders stay public (the A2A public-agent model); the discovery card stays
public by spec.

Uses check_key directly rather than api_key_security, which under AUTO_LOGIN returns
the superuser for a missing key and would bypass the gate. The key is scoped to
flow.user_id since the flow runs as its owner. The auth_type read is extracted into a
shared folder_auth_type helper so card-advertise and route-enforce can't drift.
message/send threads the request's contextId into the flow run as its session_id, so calls sharing a contextId share chat memory (one conversation, one session). The SDK mints a contextId when the client omits one, so each conversation stays isolated. An over-long contextId falls back to the per-flow default session rather than reaching the indexed session_id column.
When a flow pauses at a HumanInput node, message/send now returns a Task in the input-required state carrying the prompt, and a follow-up message/send on the same taskId supplies the human decision and resumes the run to completion or the next pause.

Suspend/resume rides on the lfx checkpoint primitives (GraphCheckpoint, resume_from_checkpoint, human_input_decisions) rather than the langflow job machinery, so it stays portable to lfx serve. The paused graph checkpoint persists in a new a2a_checkpoints table keyed by the task id; the v2 sync runner gains an opt-in checkpoint store that returns a suspended response instead of failing on a pause (off by default, so non-HITL callers are unchanged).

A reply that matches no offered action re-parks the task so the human can answer again instead of burning it. A checkpoint is loaded only for its own flow, so a task parked under one flow cannot be resumed via another flow's endpoint.
Flips on the agent card's push_notifications capability and wires the SDK push-config store + sender into the shared handler, so a client can register a webhook (tasks/pushNotificationConfig/set or the message/send configuration) and receive task lifecycle updates.

The webhook URL is caller-controlled on a public endpoint, so it's validated at registration: only http/https to a host that resolves entirely to public addresses is allowed; loopback, private, link-local (including the cloud metadata IP), reserved, multicast, and unspecified targets are rejected. LANGFLOW_A2A_ALLOW_PRIVATE_WEBHOOKS opts out for a trusted internal network. Config storage is in-memory per worker for now.
Langflow can serve A2A agents; this is the other direction. The A2A Agent component takes a remote agent's URL and a message, resolves the agent card, sends a message/send via the a2a-sdk client, and returns the reply text. An optional API key is sent as x-api-key for auth-gated agents.

The transport logic is factored into a call_a2a_agent helper so it can be driven with an in-process client; the test exercises a real card-resolve + message/send round-trip against Langflow's own A2A server (an echo agent flow), with no mocks.
@coderabbitai

coderabbitai Bot commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

🗂️ Base branches to auto review (1)
  • release-.*

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: cfd54c14-f9ea-4a57-8e3e-4a82066cec59

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/a2a-client

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@github-actions github-actions Bot added the enhancement New feature or request label Jun 26, 2026
@github-actions

Copy link
Copy Markdown
Contributor

✅ Test Coverage Advisor

No source changes detected without accompanying tests. Thanks for keeping coverage up! 🎉

Advisory check only — never blocks merge.

@github-actions

github-actions Bot commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

Frontend Unit Test Coverage Report

Coverage Summary

Lines Statements Branches Functions
Coverage: 43%
43.9% (59508/135525) 69.7% (8220/11793) 42.48% (1353/3185)

Unit Test Results

Tests Skipped Failures Errors Time
5031 0 💤 0 ❌ 0 🔥 10m 15s ⏱️
@coderabbitai

coderabbitai Bot commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Caution

Review failed

An error occurred during the review process. Please try again later.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/a2a-client

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@github-actions github-actions Bot removed the enhancement New feature or request label Jun 29, 2026
@github-actions github-actions Bot added enhancement New feature or request and removed enhancement New feature or request labels Jun 29, 2026
@github-actions github-actions Bot added enhancement New feature or request and removed enhancement New feature or request labels Jun 29, 2026
@ogabrielluiz

Copy link
Copy Markdown
Contributor Author

Hey @erichare, thanks. I routed both the card fetch and message/send through lfx.utils.ssrf_protection so agent_url is validated, and pinned the api key to the configured agent origin so it can't be sent to whatever host the card declares (off-origin hops are rejected, redirects off). It's in d6bb6c5. Mind re-reviewing?

@ogabrielluiz ogabrielluiz requested a review from erichare June 29, 2026 23:48

@erichare erichare left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions github-actions Bot added the lgtm This PR has been approved by a maintainer label Jun 30, 2026
…nd SSRF-validate instead of reject

The request hook rejected any off-origin hop, which also rejects spec-compliant agents
whose card advertises an RPC url on a different origin than the configured agent_url.
Instead, strip the x-api-key on off-origin requests (the key never leaves the configured
agent) and SSRF-validate the off-origin target in the hook (the transport does not pin
that host), keeping internal/metadata IPs blocked. Same-origin requests are unchanged.
@github-actions github-actions Bot added enhancement New feature or request and removed enhancement New feature or request labels Jun 30, 2026
@ogabrielluiz ogabrielluiz requested a review from erichare June 30, 2026 00:49
@github-actions github-actions Bot added enhancement New feature or request and removed enhancement New feature or request labels Jun 30, 2026
@ogabrielluiz

Copy link
Copy Markdown
Contributor Author

Hey @erichare, I changed the off-origin handling here after your approval, so worth another look. Instead of rejecting any off-origin hop, it now strips the x-api-key on off-origin requests and SSRF-validates the target in the hook (the transport doesn't pin off-origin hosts, so the hook is the only check on that path). The reject version broke spec-compliant agents whose card declares an RPC url on a different origin. Both your asks still hold: every hop is SSRF-validated, and the key never leaves the configured agent origin. It doesn't fully close one thing: off-origin hops aren't DNS-pinned, so a TTL=0 rebind could still reach an internal IP. The key is already stripped though, so that's blind SSRF, not exfil. Tests cover the strip and the internal-block. Commit is d66b3d7.

Base automatically changed from feat/a2a-push to feat/a2a June 30, 2026 14:50
@github-actions github-actions Bot added enhancement New feature or request and removed enhancement New feature or request labels Jun 30, 2026
@erichare erichare self-requested a review June 30, 2026 15:10

@erichare erichare left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ogabrielluiz can you check these two things:

Findings

  • [P1] Off-origin A2A RPC URLs are revalidated but not DNS-pinned.
    In src/lfx/src/lfx/components/models_and_agents/a2a_agent.py, the request hook calls validate_and_resolve_url() for off-origin URLs but discards the returned validated IPs, while the client transport only pins the original agent_url host. A malicious agent card can pass validation with a public DNS answer, then rebind before httpx makes the normal unpinned connection. Keep off-origin RPC URLs rejected, or add per-request/per-host DNS pinning for the validated off-origin target before allowing it.

    async def _guard_request(request: httpx.Request) -> None:
    if _origin(request.url) == agent_origin:
    return
    # Off-origin hop: never leak the configured api key to a card-declared foreign host.
    request.headers.pop("x-api-key", None)
    # The transport doesn't pin this host, so SSRF-validate it here (resolves the hostname
    # and raises SSRFProtectionError if any resolved IP is blocked) before the connection
    # opens. to_thread keeps the blocking DNS resolution off the event loop.
    await asyncio.to_thread(validate_and_resolve_url, str(request.url))
    client_kwargs = {
    "timeout": timeout,
    "headers": {"x-api-key": api_key} if api_key else None,
    "follow_redirects": False,
    "event_hooks": {"request": [_guard_request]},
    }
    hostname = httpx.URL(agent_url).host
    if validated_ips and hostname:
    return create_ssrf_protected_client(hostname=hostname, validated_ips=validated_ips, **client_kwargs)

  • [P1] OAuth-protected folders become public A2A agents.
    _enforce_a2a_auth() only enforces "apikey" folders and returns public access for every other auth type, while the card code explicitly advertises no security for "oauth". That means an A2A-enabled flow in an OAuth-protected folder can be run anonymously as the flow owner. Until OAuth enforcement is implemented for A2A, unsupported protected folder auth types should fail closed instead of being treated as public.

    ``"none"`` / missing / no-folder stay public; ``"oauth"`` stays public this slice
    (the card advertises no scheme for it, so a discovery client sends no key).
    """
    # Short writable session (check_key flushes usage counters), closed before
    # dispatch so no lock is held across the up-to-300s run.
    async with session_scope() as session:
    if await folder_auth_type(flow, session) != "apikey":
    return # public agent

    # "none" / missing / "oauth" -> advertise no security. oauth has no advertised
    # scheme yet, so the route leaves it public until that scheme lands.
    return None, None

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request lgtm This PR has been approved by a maintainer

2 participants