Skip to content

Conversation

@lucasbordeau
Copy link
Contributor

@lucasbordeau lucasbordeau commented Jan 28, 2026

This PR handles SSE event stream edge cases.

  • When a pod restarts, the front clients have to reconnect to SSE
  • When the dev server restarts or is hot reloaded, the front client has to reconnect to SSE
  • When redis server restarts or the redis key is cleared for any reason, the server has to recreate the event stream in redis, this can happen when navigating for example.
  • Log in / log out flow

With this PR we avoid error messages in the front end due to TTL or pod crash, we implement a resilient way of reconnecting silently.

To avoid DDoSing our servers if pods crash or a full restart of the cluster is made, we evenly space retry attempts to reconnect from all the clients, to avoid n clients reconnection at the same time, we use a random wait time between 0 and a constant max wait time (set to 2 mins for now). This is the cheapest and most effective solution, clients who want to force reconnect have to refresh or navigate to another page.

Fixes twentyhq/core-team-issues#2045

Copilot AI review requested due to automatic review settings January 28, 2026 14:48
@lucasbordeau lucasbordeau requested a review from thomtrp January 28, 2026 14:49
@lucasbordeau lucasbordeau changed the title Handle SSE event stream reconnexion Jan 28, 2026
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Jan 28, 2026

Greptile Overview

Greptile Summary

This PR improves SSE event stream reconnection handling by making the system resilient to pod restarts, server restarts, and Redis data loss. The implementation prevents DDoS scenarios by spacing client reconnection attempts randomly over 2 minutes.

Key Changes:

  • Frontend: Added infinite retry with random backoff (0-2 minutes) to space reconnection attempts across clients, preventing simultaneous reconnections after cluster restarts
  • Frontend: Added connection handler to clear activeQueryListenersState on reconnection
  • Backend: Renamed createEventStream to createEventStreamIfItDoesNotExist and made it idempotent - now returns early if stream exists instead of throwing error
  • Backend: Added proactive stream recreation in addQueryToEventStream and removeQueryFromEventStream to handle Redis key loss scenarios

Issues Found:

  • Minor naming convention violation: variable uses "rando" abbreviation instead of "random"

The changes are well-architected and handle the edge cases described in the PR. The random backoff strategy is a cost-effective solution to prevent thundering herd problems during cluster-wide restarts.

Confidence Score: 4.5/5

  • This PR is safe to merge with minimal risk - addresses critical reconnection edge cases with sound architecture
  • The implementation correctly handles SSE reconnection edge cases with an idempotent backend approach and intelligent client-side backoff. The only issue is a minor naming convention violation that doesn't affect functionality. The random backoff strategy effectively prevents thundering herd problems.
  • No files require special attention - all changes are well-implemented

Important Files Changed

Filename Overview
packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts implemented infinite retry with random backoff and connection handler to clear state, minor naming issue with "rando"
packages/twenty-server/src/engine/subscriptions/event-stream.service.ts renamed method to createEventStreamIfItDoesNotExist and made it idempotent by returning early if stream exists
packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts added proactive stream recreation in addQueryToEventStream and removeQueryFromEventStream to handle Redis data loss

Sequence Diagram

sequenceDiagram
    participant Client as Frontend Client
    participant SSE as SSE Client (useSseClient)
    participant Server as GraphQL Server
    participant Redis as Redis Cache
    participant Resolver as WorkspaceEventEmitterResolver
    participant Service as EventStreamService

    Note over Client,Service: Initial Connection Flow
    Client->>SSE: Initialize SSE connection
    SSE->>Server: Connect with Bearer token
    Server->>Resolver: onEventSubscription(eventStreamId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    Service->>Redis: Check if stream exists
    alt Stream does not exist
        Service->>Redis: Create new event stream with TTL
        Service->>Redis: Add to activeStreams set
    else Stream already exists
        Service-->>Resolver: Return (no-op)
    end
    Resolver->>Service: subscribeToEventStream()
    Service-->>Resolver: Return iterator
    Resolver-->>SSE: Stream established
    SSE->>SSE: Call handleSSEClientConnected()
    SSE->>SSE: Clear activeQueryListenersState

    Note over Client,Service: Pod/Server Restart Scenario
    Server->>Server: Pod crashes/restarts
    Server--xSSE: Connection lost
    SSE->>SSE: Detect connection failure
    SSE->>SSE: Generate random wait (0-2min)
    SSE->>SSE: sleep(randomWaitTime)
    SSE->>Server: Retry connection
    Server->>Resolver: onEventSubscription(eventStreamId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    Service->>Redis: Check if stream exists
    alt Stream missing (Redis cleared)
        Service->>Redis: Recreate event stream
    else Stream still exists
        Service-->>Resolver: Return (no-op)
    end
    Resolver->>Service: subscribeToEventStream()
    Service-->>Resolver: Return new iterator
    Resolver-->>SSE: Reconnected
    SSE->>SSE: Call handleSSEClientConnected()
    SSE->>SSE: Clear activeQueryListenersState

    Note over Client,Service: Add/Remove Query Operations
    Client->>Resolver: addQueryToEventStream(queryId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    alt Stream missing
        Service->>Redis: Recreate stream
    end
    Resolver->>Service: isAuthorized()
    Service->>Redis: Verify auth context
    Service-->>Resolver: Authorization result
    Resolver->>Service: addQuery()
    Service->>Redis: Update stream with new query
    Service-->>Resolver: Success
    Resolver-->>Client: true

    Client->>Resolver: removeQueryFromEventStream(queryId)
    Resolver->>Service: createEventStreamIfItDoesNotExist()
    alt Stream missing
        Service->>Redis: Recreate stream
    end
    Resolver->>Service: isAuthorized()
    Resolver->>Service: removeQuery()
    Service->>Redis: Remove query from stream
    Service-->>Resolver: Success
    Resolver-->>Client: true
Loading
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 4 files

@github-actions
Copy link
Contributor

github-actions bot commented Jan 28, 2026

🚀 Preview Environment Ready!

Your preview environment is available at: http://bore.pub:6803

This environment will automatically shut down when the PR is closed or after 5 hours.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Improves resilience of the SSE-based event stream by enabling reconnection handling across pod/dev server restarts and Redis key loss, with client-side retry jitter to reduce reconnect spikes.

Changes:

  • Server: make event stream creation idempotent and ensure streams can be recreated when missing during subscription/query mutation flows.
  • Server: proactively create missing event streams when adding/removing queries (to recover after Redis loss).
  • Front: add infinite retry with randomized delay and reset local “active query listeners” on SSE reconnect to force resubscription.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

File Description
packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.resolver.ts Switches to idempotent event stream creation and creates streams during add/remove query mutations.
packages/twenty-server/src/engine/subscriptions/event-stream.service.ts Replaces “throw if exists” creation with createEventStreamIfItDoesNotExist.
packages/twenty-front/src/modules/sse-db-event/hooks/useSseClient.util.ts Adds reconnect handling: clears active listeners on connect + randomized retry delay with infinite attempts.
packages/twenty-front/src/modules/sse-db-event/constants/SseConnectionRetryMaxWaitTimeInMs.ts Introduces constant for max reconnect jitter window (2 minutes).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 99 to 122
) {
const eventStreamChannelId = eventStreamIdToChannelId(eventStreamId);

await this.eventStreamService.createEventStream({
await this.eventStreamService.createEventStreamIfItDoesNotExist({
workspaceId: workspace.id,
eventStreamChannelId,
authContext: {
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because createEventStreamIfItDoesNotExist can no-op when the stream already exists, downstream error handling should avoid treating the stream as “owned/created” by this resolver invocation. In particular, the catch block later in this method unconditionally calls destroyEventStream; consider tracking whether the stream was created in this call (e.g., have the service return a boolean) and only destroying when appropriate to avoid deleting a pre-existing/in-use stream on transient subscribe errors.

Copilot uses AI. Check for mistakes.
@lucasbordeau lucasbordeau force-pushed the feat/sse-handle-event-stream-edge-cases branch from 8ec3689 to 25bfa20 Compare January 29, 2026 10:34
@lucasbordeau lucasbordeau force-pushed the feat/sse-handle-event-stream-edge-cases branch from 25bfa20 to 245847c Compare January 30, 2026 11:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment