Handle SSE event stream reconnection #17523
Conversation
Greptile Summary

This PR improves SSE event stream reconnection handling by making the system resilient to pod restarts, server restarts, and Redis data loss. It prevents self-inflicted DDoS during mass reconnects by spacing client reconnection attempts randomly over a 2-minute window.

The changes are well-architected and handle the edge cases described in the PR. The random backoff strategy is a cost-effective way to prevent thundering-herd problems during cluster-wide restarts.

Confidence Score: 4.5/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Frontend Client
    participant SSE as SSE Client (useSseClient)
    participant Server as GraphQL Server
    participant Redis as Redis Cache
    participant Resolver as WorkspaceEventEmitterResolver
    participant Service as EventStreamService

    Note over Client,Service: Initial Connection Flow
    Client->>SSE: Initialize SSE connection
    SSE->>Server: Connect with Bearer token
    Server->>Resolver: onEventSubscription(eventStreamId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    Service->>Redis: Check if stream exists
    alt Stream does not exist
        Service->>Redis: Create new event stream with TTL
        Service->>Redis: Add to activeStreams set
    else Stream already exists
        Service-->>Resolver: Return (no-op)
    end
    Resolver->>Service: subscribeToEventStream()
    Service-->>Resolver: Return iterator
    Resolver-->>SSE: Stream established
    SSE->>SSE: Call handleSSEClientConnected()
    SSE->>SSE: Clear activeQueryListenersState

    Note over Client,Service: Pod/Server Restart Scenario
    Server->>Server: Pod crashes/restarts
    Server--xSSE: Connection lost
    SSE->>SSE: Detect connection failure
    SSE->>SSE: Generate random wait (0-2min)
    SSE->>SSE: sleep(randomWaitTime)
    SSE->>Server: Retry connection
    Server->>Resolver: onEventSubscription(eventStreamId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    Service->>Redis: Check if stream exists
    alt Stream missing (Redis cleared)
        Service->>Redis: Recreate event stream
    else Stream still exists
        Service-->>Resolver: Return (no-op)
    end
    Resolver->>Service: subscribeToEventStream()
    Service-->>Resolver: Return new iterator
    Resolver-->>SSE: Reconnected
    SSE->>SSE: Call handleSSEClientConnected()
    SSE->>SSE: Clear activeQueryListenersState

    Note over Client,Service: Add/Remove Query Operations
    Client->>Resolver: addQueryToEventStream(queryId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    alt Stream missing
        Service->>Redis: Recreate stream
    end
    Resolver->>Service: isAuthorized()
    Service->>Redis: Verify auth context
    Service-->>Resolver: Authorization result
    Resolver->>Service: addQuery()
    Service->>Redis: Update stream with new query
    Service-->>Resolver: Success
    Resolver-->>Client: true
    Client->>Resolver: removeQueryFromEventStream(queryId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    alt Stream missing
        Service->>Redis: Recreate stream
    end
    Resolver->>Service: isAuthorized()
    Resolver->>Service: removeQuery()
    Service->>Redis: Remove query from stream
    Service-->>Resolver: Success
    Resolver-->>Client: true
```
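The idempotent creation step that appears throughout the diagram can be sketched as follows. This is a simplified illustration, not the actual twenty-server code: a `Map` stands in for Redis, and the boolean return value is an assumed shape for the API, not the one in the PR.

```typescript
// Hypothetical sketch of idempotent event stream creation.
// A Map stands in for Redis here; the real service goes through a cache layer
// and also sets a TTL and tracks an activeStreams set.
type EventStream = { workspaceId: string; queries: string[] };

const streams = new Map<string, EventStream>();

// Returns true when a new stream was created, false when it already existed
// (e.g. a client reconnected while the Redis key was still alive).
function createEventStreamIfItDoesNotExist(
  eventStreamId: string,
  workspaceId: string,
): boolean {
  if (streams.has(eventStreamId)) {
    return false; // no-op: creation is safe to call on every (re)connect
  }
  streams.set(eventStreamId, { workspaceId, queries: [] });
  return true;
}
```

Because the call is a no-op when the stream exists, both the initial connection and every reconnect can run the same path without a "stream already exists" error.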
3 files reviewed, 1 comment
Review threads (resolved):
- packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts (outdated)
- packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts (outdated)
No issues found across 4 files
🚀 Preview Environment Ready! Your preview environment is available at http://bore.pub:6803. This environment will automatically shut down when the PR is closed or after 5 hours.
Pull request overview
Improves resilience of the SSE-based event stream by enabling reconnection handling across pod/dev server restarts and Redis key loss, with client-side retry jitter to reduce reconnect spikes.
Changes:
- Server: make event stream creation idempotent and ensure streams can be recreated when missing during subscription/query mutation flows.
- Server: proactively create missing event streams when adding/removing queries (to recover after Redis loss).
- Front: add infinite retry with randomized delay and reset local “active query listeners” on SSE reconnect to force resubscription.
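The client-side behaviour summarized above (infinite retry attempts with a randomized delay) can be sketched roughly like this. Names and signatures are illustrative, not the actual `useSseClient.util.ts` implementation; the injectable `delayMs` parameter is added here purely for testability.

```typescript
// Mirrors the constant introduced by the PR: a 2-minute jitter window.
const SSE_CONNECTION_RETRY_MAX_WAIT_TIME_IN_MS = 2 * 60 * 1000;

// Uniformly random delay in [0, 2 min): spreads n clients' reconnects over
// the window instead of letting them all hit the server at once.
const computeRetryDelayMs = (): number =>
  Math.random() * SSE_CONNECTION_RETRY_MAX_WAIT_TIME_IN_MS;

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Infinite retry: on each failure, wait a random delay before reconnecting.
async function connectWithRetry(
  connect: () => Promise<void>,
  delayMs: () => number = computeRetryDelayMs,
): Promise<void> {
  for (;;) {
    try {
      await connect();
      return;
    } catch {
      await sleep(delayMs());
    }
  }
}
```

A client that wants to reconnect immediately bypasses the jitter by refreshing the page, as noted in the PR description.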
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts | Switches to idempotent event stream creation and creates streams during add/remove query mutations. |
| packages/twenty-server/src/engine/subscriptions/event-stream.service.ts | Replaces “throw if exists” creation with createEventStreamIfItDoesNotExist. |
| packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts | Adds reconnect handling: clears active listeners on connect + randomized retry delay with infinite attempts. |
| packages/twenty-front/src/modules/sse-db-event/constants/SseConnectionRetryMaxWaitTimeInMs.ts | Introduces constant for max reconnect jitter window (2 minutes). |
Review threads (resolved):
- packages/twenty-server/src/engine/subscriptions/event-stream.service.ts (outdated)
- packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts (outdated)
```diff
  ) {
    const eventStreamChannelId = eventStreamIdToChannelId(eventStreamId);

-   await this.eventStreamService.createEventStream({
+   await this.eventStreamService.createEventStreamIfItDoesNotExist({
      workspaceId: workspace.id,
      eventStreamChannelId,
      authContext: {
```
Copilot AI · Jan 28, 2026
Because createEventStreamIfItDoesNotExist can no-op when the stream already exists, downstream error handling should avoid treating the stream as “owned/created” by this resolver invocation. In particular, the catch block later in this method unconditionally calls destroyEventStream; consider tracking whether the stream was created in this call (e.g., have the service return a boolean) and only destroying when appropriate to avoid deleting a pre-existing/in-use stream on transient subscribe errors.
Review threads (resolved):
- packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts (outdated)
- packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts (outdated)
- packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts
- packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts (outdated)
Force-pushed from 8ec3689 to 25bfa20.
Force-pushed from 25bfa20 to 245847c.
This PR handles SSE event stream edge cases. It avoids error messages in the front end caused by TTL expiry or pod crashes by reconnecting silently and resiliently.

To avoid DDoSing our own servers when pods crash or the whole cluster restarts, we spread client reconnection attempts over time: instead of n clients reconnecting at the same moment, each client waits a random time between 0 and a constant max wait time (currently set to 2 minutes). This is the cheapest and most effective solution; clients who want to force an immediate reconnect can refresh or navigate to another page.
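As a rough illustration of why the jitter window helps, assuming reconnect times are uniformly distributed over the window (the numbers below are made up for the example):

```typescript
// With n clients reconnecting uniformly at random over a 2-minute window,
// the expected reconnect load on the server is n / 120 requests per second,
// instead of n simultaneous requests at the instant the cluster comes back.
const windowSeconds = 120; // the 2-minute jitter window from this PR

const expectedReconnectsPerSecond = (clientCount: number): number =>
  clientCount / windowSeconds;
```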
Fixes twentyhq/core-team-issues#2045