-
-
Notifications
You must be signed in to change notification settings - Fork 23.7k
Feature: Teams Collaborative Mode (Cloud/Enterprise) #5694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feature: Teams Collaborative Mode (Cloud/Enterprise) #5694
Conversation
- Adds WebSocket server infrastructure with Redis-based pub/sub for horizontal scaling. - Implements real-time collaboration features including cursor tracking, node presence, and live editing. - Adds comprehensive health monitoring, rate limiting, and connection pooling for WebSocket connections. - Implemented CanvasWithNodePresence component for node presence synchronization. - Added ConnectionStatusNotification and ConnectionStatusIndicator for real-time connection status. - Introduced CursorOverlay to display user cursors in collaborative mode. - Created NodePresenceOverlay to indicate users hovering or editing nodes. - Developed WebSocketStatusBanner to show server health and connection issues. - Enhanced Canvas component to manage collaborative state and user presence. - Integrated throttling for cursor movement based on server health. - Updated Vite configuration to support WebSocket connections.
Summary of ChangesHello @prd-hoang-doan, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request delivers a significant new feature: real-time collaborative editing for the Flowise canvas. It establishes a dedicated WebSocket backend, leveraging Redis for scalable message broadcasting, and integrates a suite of frontend components to provide a rich, interactive collaborative environment. Users can now see each other's cursors, active presence on nodes, and live updates to the chatflow, all while ensuring secure and performant communication. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a major feature: real-time collaborative editing for the Flowise canvas. The implementation is extensive, covering both backend WebSocket infrastructure with Redis for scaling and a rich frontend experience with presence indicators, cursor tracking, and live state synchronization. The architecture is well-designed, with clear separation of concerns for connection management, room management, presence, and state synchronization. The code demonstrates good practices for security and scalability, such as rate limiting, connection pooling, and using Redis for pub/sub.
My review focuses on potential race conditions, performance bottlenecks, and security considerations. I've identified a few areas for improvement:
- A synchronous file operation that could block the event loop under load.
- A potential for silent data loss if a chatflow update lacks user information.
- A bug in handling duplicate WebSocket sessions that could lead to inconsistent client state.
- A client-side health check logic that might be too permissive.
All original comments have been retained as they do not contradict or align with the provided rules in a way that requires modification or dropping. Addressing these points will further enhance its robustness and reliability.
| if (!user) { | ||
| this.savingChatflows.delete(chatflowId) | ||
| continue | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a dirty chatflow snapshot doesn't have an updatedByUser, its changes will be silently dropped and never persisted to the database. This could lead to data loss. While applyEvent is expected to set the user, this check acts as a safeguard. It would be more robust to log a warning when this condition is met to make potential issues visible for debugging.
| if (!user) { | |
| this.savingChatflows.delete(chatflowId) | |
| continue | |
| } | |
| if (!user) { | |
| logger.warn(`[ChatFlowStateService] Skipping save for dirty chatflow ${chatflowId} because updatedByUser is not set.`) | |
| this.savingChatflows.delete(chatflowId) | |
| continue | |
| } |
| /** | ||
| * Register a new WebSocket connection | ||
| */ | ||
| public async addConnection(socket: AuthenticatedWebSocket): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The addConnection method currently returns void. If a duplicate session is detected, it logs a warning and returns, but the calling code in wsServer.ts is unaware of this rejection. This leads to the server sending a connection-established message to a client that has been effectively rejected, causing inconsistent state. The method should return a boolean indicating whether the connection was successfully added. The caller can then handle the rejection appropriately, for instance, by closing the socket with a specific reason.
| public async addConnection(socket: AuthenticatedWebSocket): Promise<void> { | |
| public async addConnection(socket: AuthenticatedWebSocket): Promise<boolean> { |
| await this.poolManager.addConnection(socket) | ||
|
|
||
| // Send connection confirmation with sessionId | ||
| socket.send( | ||
| JSON.stringify({ | ||
| type: 'connection-established', | ||
| sessionId: socket.sessionId, | ||
| userId: user.id | ||
| }) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After addConnection is called, the server immediately sends a connection-established message. However, addConnection might internally reject the connection (e.g., for a duplicate session) without signaling failure. This can lead to a client believing it's connected when the server has not fully registered it. The addConnection method should return a status, and this code should check that status before proceeding. If the connection is not added successfully, the socket should be closed with an appropriate message and code.
const isAdded = await this.poolManager.addConnection(socket)
if (!isAdded) {
socket.send(
JSON.stringify({
type: 'connection-error',
message: 'Duplicate session detected. Another tab with the same chatflow might be open.'
})
)
socket.close(4009, 'Duplicate session')
return
}
// Send connection confirmation with sessionId
socket.send(
JSON.stringify({
type: 'connection-established',
sessionId: socket.sessionId,
userId: user.id
})
)| }, [currentUser]) // Re-run when authentication state changes | ||
|
|
||
| const sendMessage = (type, payload) => { | ||
| const ableToSend = (healthStatus.status !== 'critical' || healthStatus.utilization < 80) && !rateLimitError && !connectionBlocked |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic to determine if a message can be sent seems too permissive. The condition (healthStatus.status !== 'critical' || healthStatus.utilization < 80) allows messages to be sent even when the server health is 'critical', as long as the utilization is below 80%. This could exacerbate server load issues during a critical state. It would be safer to block all messages when the status is 'critical', regardless of the utilization percentage.
| const ableToSend = (healthStatus.status !== 'critical' || healthStatus.utilization < 80) && !rateLimitError && !connectionBlocked | |
| const ableToSend = healthStatus.status !== 'critical' && !rateLimitError && !connectionBlocked |
| async saveEvent(event: any) { | ||
| const dir = path.join(__dirname, '../../../../logs/chat-flow-events') | ||
| if (!fs.existsSync(dir)) { | ||
| fs.mkdirSync(dir, { recursive: true }) | ||
| } | ||
| const filePath = path.join(dir, `events.jsonl`) | ||
| fs.appendFileSync(filePath, JSON.stringify(event, null, 2) + '\n') | ||
|
|
||
| return event | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using fs.appendFileSync is a synchronous operation that blocks the Node.js event loop. In a high-throughput WebSocket server, this can become a performance bottleneck, as it will prevent the server from handling other requests while the file is being written to. It's recommended to use the asynchronous version fs.promises.appendFile to avoid blocking.
async saveEvent(event: any) {
const dir = path.join(__dirname, '../../../../logs/chat-flow-events')
if (!fs.existsSync(dir)) {
// mkdirSync is acceptable here as it's a one-time setup operation.
fs.mkdirSync(dir, { recursive: true })
}
const filePath = path.join(dir, `events.jsonl`)
try {
await fs.promises.appendFile(filePath, JSON.stringify(event, null, 2) + '\n')
} catch (error) {
// It's better to log the error than to let it crash the process
console.error('Failed to save event to log file:', error);
}
return event
}
Ticket:
Flowise Roadmap
Overview:
This pull request implements real-time collaborative editing functionality for the Flowise canvas, enabling multiple users to work on the same chatflow simultaneously. The implementation includes WebSocket infrastructure on both frontend and backend, with features like cursor tracking, node presence indicators, user avatars, and live state synchronization.
Changes:
Collaborative mode includes three layers:
Layer 1: Active users' presence
Layer 2: Live cursors
Layer 3: Node-level presence
Demo Recording
collaborative.mode.demo.mp4
Security
Deployment