Skip to content

Conversation

@prd-hoang-doan
Copy link

Ticket:

Flowise Roadmap

Overview:

This pull request implements real-time collaborative editing functionality for the Flowise canvas, enabling multiple users to work on the same chatflow simultaneously. The implementation includes WebSocket infrastructure on both frontend and backend, with features like cursor tracking, node presence indicators, user avatars, and live state synchronization.

Changes:

  • Adds WebSocket server infrastructure with Redis-based pub/sub for horizontal scaling.
  • Implements real-time collaboration features including cursor tracking, node presence, and live editing.
  • Adds comprehensive health monitoring, rate limiting, and connection pooling for WebSocket connections.
  • Implemented CanvasWithPresence component for node presence synchronization.
  • Added ConnectionStatusNotification and ConnectionStatusIndicator for real-time connection status.
  • Introduced CursorOverlay to display user cursors in collaborative mode.
  • Created NodePresenceOverlay to indicate users hovering or editing nodes.
  • Developed WebSocketStatusBanner to show server health and connection issues.
  • Enhanced Canvas component to manage collaborative state and user presence.
  • Integrated throttling for cursor movement based on server health.
  • Updated Vite configuration to support WebSocket connections.

Collaborative mode includes three layers:

Layer 1: Active users' presence

user-presence

Layer 2: Live cursors

live cursor

Layer 3: Node-level presence

node presence

Demo Recording

collaborative.mode.demo.mp4

Security

  • Authentication: Bearer Token from Cookie.
  • Authorization: Verifies the user has permission to access a specific chatflow, same organization.
  • Data Sanitization: Username, User colors, Message.
  • Rate Limiting

Deployment

  • Environment
WS_PORT=3000
ENABLE_WEBSOCKET=true
  • Standalone mode:
MODE=ws-only
  • Run the standalone mode server instance
pnpm start-websocket  
- Adds WebSocket server infrastructure with Redis-based pub/sub for horizontal scaling.
- Implements real-time collaboration features including cursor tracking, node presence, and live editing.
- Adds comprehensive health monitoring, rate limiting, and connection pooling for WebSocket connections.
- Implemented CanvasWithNodePresence component for node presence synchronization.
- Added ConnectionStatusNotification and ConnectionStatusIndicator for real-time connection status.
- Introduced CursorOverlay to display user cursors in collaborative mode.
- Created NodePresenceOverlay to indicate users hovering or editing nodes.
- Developed WebSocketStatusBanner to show server health and connection issues.
- Enhanced Canvas component to manage collaborative state and user presence.
- Integrated throttling for cursor movement based on server health.
- Updated Vite configuration to support WebSocket connections.
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @prd-hoang-doan, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request delivers a significant new feature: real-time collaborative editing for the Flowise canvas. It establishes a dedicated WebSocket backend, leveraging Redis for scalable message broadcasting, and integrates a suite of frontend components to provide a rich, interactive collaborative environment. Users can now see each other's cursors, active presence on nodes, and live updates to the chatflow, all while ensuring secure and performant communication.

Highlights

  • Real-time Collaborative Editing: Introduces a new feature allowing multiple users to simultaneously edit the Flowise canvas, with changes synchronized in real-time.
  • WebSocket Infrastructure: Implements a robust WebSocket server with Redis-based pub/sub for horizontal scaling, supporting real-time communication between clients and the server.
  • User Presence and Interaction Visualizations: Adds visual indicators for active users, live cursor tracking, and node-level presence (hovering/editing) to enhance the collaborative experience.
  • Connection Management and Health Monitoring: Includes comprehensive features for WebSocket connection management, rate limiting, connection pooling, and health monitoring, with UI components to display connection status and server load.
  • Security and Authorization: Incorporates Bearer Token authentication, chatflow-specific authorization, and data sanitization for WebSocket events to ensure secure collaboration.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a major feature: real-time collaborative editing for the Flowise canvas. The implementation is extensive, covering both backend WebSocket infrastructure with Redis for scaling and a rich frontend experience with presence indicators, cursor tracking, and live state synchronization. The architecture is well-designed, with clear separation of concerns for connection management, room management, presence, and state synchronization. The code demonstrates good practices for security and scalability, such as rate limiting, connection pooling, and using Redis for pub/sub.

My review focuses on potential race conditions, performance bottlenecks, and security considerations. I've identified a few areas for improvement:

  • A synchronous file operation that could block the event loop under load.
  • A potential for silent data loss if a chatflow update lacks user information.
  • A bug in handling duplicate WebSocket sessions that could lead to inconsistent client state.
  • A client-side health check logic that might be too permissive.

All original comments have been retained as they do not contradict or align with the provided rules in a way that requires modification or dropping. Addressing these points will further enhance its robustness and reliability.

Comment on lines +264 to +267
if (!user) {
this.savingChatflows.delete(chatflowId)
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If a dirty chatflow snapshot doesn't have an updatedByUser, its changes will be silently dropped and never persisted to the database. This could lead to data loss. While applyEvent is expected to set the user, this check acts as a safeguard. It would be more robust to log a warning when this condition is met to make potential issues visible for debugging.

Suggested change
if (!user) {
this.savingChatflows.delete(chatflowId)
continue
}
if (!user) {
logger.warn(`[ChatFlowStateService] Skipping save for dirty chatflow ${chatflowId} because updatedByUser is not set.`)
this.savingChatflows.delete(chatflowId)
continue
}
/**
* Register a new WebSocket connection
*/
public async addConnection(socket: AuthenticatedWebSocket): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The addConnection method currently returns void. If a duplicate session is detected, it logs a warning and returns, but the calling code in wsServer.ts is unaware of this rejection. This leads to the server sending a connection-established message to a client that has been effectively rejected, causing inconsistent state. The method should return a boolean indicating whether the connection was successfully added. The caller can then handle the rejection appropriately, for instance, by closing the socket with a specific reason.

Suggested change
public async addConnection(socket: AuthenticatedWebSocket): Promise<void> {
public async addConnection(socket: AuthenticatedWebSocket): Promise<boolean> {
Comment on lines +286 to +295
await this.poolManager.addConnection(socket)

// Send connection confirmation with sessionId
socket.send(
JSON.stringify({
type: 'connection-established',
sessionId: socket.sessionId,
userId: user.id
})
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

After addConnection is called, the server immediately sends a connection-established message. However, addConnection might internally reject the connection (e.g., for a duplicate session) without signaling failure. This can lead to a client believing it's connected when the server has not fully registered it. The addConnection method should return a status, and this code should check that status before proceeding. If the connection is not added successfully, the socket should be closed with an appropriate message and code.

            const isAdded = await this.poolManager.addConnection(socket)

            if (!isAdded) {
                socket.send(
                    JSON.stringify({
                        type: 'connection-error',
                        message: 'Duplicate session detected. Another tab with the same chatflow might be open.'
                    })
                )
                socket.close(4009, 'Duplicate session')
                return
            }

            // Send connection confirmation with sessionId
            socket.send(
                JSON.stringify({
                    type: 'connection-established',
                    sessionId: socket.sessionId,
                    userId: user.id
                })
            )
}, [currentUser]) // Re-run when authentication state changes

const sendMessage = (type, payload) => {
const ableToSend = (healthStatus.status !== 'critical' || healthStatus.utilization < 80) && !rateLimitError && !connectionBlocked
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic to determine if a message can be sent seems too permissive. The condition (healthStatus.status !== 'critical' || healthStatus.utilization < 80) allows messages to be sent even when the server health is 'critical', as long as the utilization is below 80%. This could exacerbate server load issues during a critical state. It would be safer to block all messages when the status is 'critical', regardless of the utilization percentage.

Suggested change
const ableToSend = (healthStatus.status !== 'critical' || healthStatus.utilization < 80) && !rateLimitError && !connectionBlocked
const ableToSend = healthStatus.status !== 'critical' && !rateLimitError && !connectionBlocked
Comment on lines +14 to +23
async saveEvent(event: any) {
const dir = path.join(__dirname, '../../../../logs/chat-flow-events')
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
const filePath = path.join(dir, `events.jsonl`)
fs.appendFileSync(filePath, JSON.stringify(event, null, 2) + '\n')

return event
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using fs.appendFileSync is a synchronous operation that blocks the Node.js event loop. In a high-throughput WebSocket server, this can become a performance bottleneck, as it will prevent the server from handling other requests while the file is being written to. It's recommended to use the asynchronous version fs.promises.appendFile to avoid blocking.

    async saveEvent(event: any) {
        const dir = path.join(__dirname, '../../../../logs/chat-flow-events')
        if (!fs.existsSync(dir)) {
            // mkdirSync is acceptable here as it's a one-time setup operation.
            fs.mkdirSync(dir, { recursive: true })
        }
        const filePath = path.join(dir, `events.jsonl`)
        try {
            await fs.promises.appendFile(filePath, JSON.stringify(event, null, 2) + '\n')
        } catch (error) {
            // It's better to log the error than to let it crash the process
            console.error('Failed to save event to log file:', error);
        }

        return event
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

1 participant