Message Protocol Design
Custom Message Formats (JSON Schema/Protobuf)
JSON Schema Design Example:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "WebSocketMessage",
"type": "object",
"properties": {
"messageId": {
"type": "string",
"format": "uuid"
},
"timestamp": {
"type": "integer",
"minimum": 0
},
"type": {
"type": "string",
"enum": ["text", "image", "file", "command"]
},
"sender": {
"type": "string",
"minLength": 1
},
"payload": {
"type": "object",
"oneOf": [
{
"$ref": "#/definitions/textPayload"
},
{
"$ref": "#/definitions/imagePayload"
},
{
"$ref": "#/definitions/filePayload"
},
{
"$ref": "#/definitions/commandPayload"
}
]
},
"metadata": {
"type": "object",
"additionalProperties": true
}
},
"required": ["messageId", "timestamp", "type", "sender"],
"definitions": {
"textPayload": {
"type": "object",
"properties": {
"content": {
"type": "string",
"maxLength": 4096
}
},
"required": ["content"]
},
"imagePayload": {
"type": "object",
"properties": {
"url": {
"type": "string",
"format": "uri"
},
"width": {
"type": "integer",
"minimum": 1
},
"height": {
"type": "integer",
"minimum": 1
}
},
"required": ["url", "width", "height"]
},
"filePayload": {
"type": "object",
"properties": {
"url": {
"type": "string",
"format": "uri"
},
"size": {
"type": "integer",
"minimum": 1
},
"mimeType": {
"type": "string"
}
},
"required": ["url", "size", "mimeType"]
},
"commandPayload": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["join", "leave", "mute", "kick"]
},
"target": {
"type": ["string", "null"]
}
},
"required": ["action"]
}
}
}Protobuf Design Example:
syntax = "proto3";
package websocket;
import "google/protobuf/timestamp.proto";
message WebSocketMessage {
string message_id = 1; // UUID format
google.protobuf.Timestamp timestamp = 2;
MessageType type = 3;
string sender = 4;
oneof payload {
TextPayload text = 5;
ImagePayload image = 6;
FilePayload file = 7;
CommandPayload command = 8;
}
map<string, string> metadata = 9;
}
enum MessageType {
TEXT = 0;
IMAGE = 1;
FILE = 2;
COMMAND = 3;
}
message TextPayload {
string content = 1; // Max 4096 characters
}
message ImagePayload {
string url = 1; // Image URL
int32 width = 2; // Width (pixels)
int32 height = 3; // Height (pixels)
}
message FilePayload {
string url = 1; // File URL
int64 size = 2; // File size (bytes)
string mime_type = 3;// MIME type
}
message CommandPayload {
CommandAction action = 1;
string target = 2; // Optional target
}
enum CommandAction {
JOIN = 0;
LEAVE = 1;
MUTE = 2;
KICK = 3;
}Message Types and Routing Design
Message Type Classification:
- Control Messages: Connection management, room operations, etc.
- Real-Time Messages: Chat messages, status updates, etc.
- System Messages: Notifications, warnings, etc.
- Data Messages: File transfers, large data, etc.
Routing Strategy:
class MessageRouter {
constructor() {
this.routes = new Map();
this.defaultHandler = null;
}
addRoute(messageType, handler) {
this.routes.set(messageType, handler);
}
setDefaultHandler(handler) {
this.defaultHandler = handler;
}
route(message) {
const handler = this.routes.get(message.type) || this.defaultHandler;
if (handler) {
return handler(message);
}
throw new Error(`No handler for message type: ${message.type}`);
}
}
// Usage example
const router = new MessageRouter();
// Register route handlers
router.addRoute('text', handleTextMessage);
router.addRoute('image', handleImageMessage);
router.addRoute('command', handleCommandMessage);
// Set default handler
router.setDefaultHandler(handleUnknownMessage);
// Route message
function onWebSocketMessage(message) {
try {
const parsedMessage = validateAndParseMessage(message);
router.route(parsedMessage);
} catch (error) {
console.error('Message processing error:', error);
sendErrorResponse(error);
}
}Message Versioning and Compatibility
Versioning Strategy:
- Protocol Version Number: Include version number in message header
- Backward Compatibility: New server versions support old client versions
- Forward Compatibility: Old servers ignore unrecognized fields
Versioned Message Design:
{
"version": "1.2",
"messageId": "a1b2c3d4-...",
"type": "text",
"sender": "user123",
"payload": {
"content": "Hello",
"mentions": ["user456"] // New field in newer version
}
}Version Compatibility Handling:
function handleVersionedMessage(message) {
const currentVersion = '1.2';
const messageVersion = message.version;
// Version check
if (compareVersions(messageVersion, currentVersion) > 0) {
throw new Error(`Unsupported protocol version: ${messageVersion}`);
}
// Route to different handlers based on version
if (compareVersions(messageVersion, '1.0') >= 0) {
if (message.type === 'text') {
if (compareVersions(messageVersion, '1.1') >= 0) {
// Handle messages version 1.1+ (includes mentions field)
handleTextMessageV1_1(message);
} else {
// Handle version 1.0 messages (ignore mentions field)
handleTextMessageV1_0(message);
}
}
} else {
throw new Error(`Unsupported protocol version: ${messageVersion}`);
}
}
function compareVersions(v1, v2) {
const parts1 = v1.split('.').map(Number);
const parts2 = v2.split('.').map(Number);
for (let i = 0; i < Math.max(parts1.length, parts2.length); i++) {
const p1 = parts1[i] || 0;
const p2 = parts2[i] || 0;
if (p1 > p2) return 1;
if (p1 < p2) return -1;
}
return 0;
}Message Acknowledgment and Retransmission Mechanism
Reliable Message Transmission Design:
class ReliableMessageManager {
constructor() {
this.pendingAcks = new Map(); // messageId -> {message, timestamp, retries}
this.ackTimeout = 5000; // 5-second ACK timeout
this.maxRetries = 3;
}
send(message, sendCallback) {
const messageId = generateUUID();
const messageWithId = {
...message,
messageId
};
if (this.pendingAcks.size >= 1000) {
console.warn('Too many pending ACKs, discarding new message');
return false;
}
this.pendingAcks.set(messageId, {
message: messageWithId,
timestamp: Date.now(),
retries: 0
});
// Send message
sendCallback(messageWithId);
// Set ACK timeout
this.startAckTimeout(messageId);
return true;
}
handleAck(messageId) {
if (this.pendingAcks.has(messageId)) {
this.pendingAcks.delete(messageId);
}
}
startAckTimeout(messageId) {
setTimeout(() => {
if (this.pendingAcks.has(messageId)) {
const entry = this.pendingAcks.get(messageId);
if (entry.retries < this.maxRetries) {
// Retry sending
console.log(`Message ${messageId} not acknowledged, retrying (attempt ${entry.retries + 1})`);
entry.retries++;
entry.timestamp = Date.now();
// Resend message
sendCallback(entry.message);
// Reset timeout
this.startAckTimeout(messageId);
} else {
// Exceeded max retries
console.error(`Message ${messageId} exceeded retry limit, giving up`);
this.pendingAcks.delete(messageId);
// Can notify application layer of failure
}
}
}, this.ackTimeout);
}
// Periodically clean up expired pending ACKs
cleanupExpiredMessages() {
const now = Date.now();
for (const [messageId, entry] of this.pendingAcks) {
if (now - entry.timestamp > this.ackTimeout * 2) {
this.pendingAcks.delete(messageId);
}
}
}
}
// Usage example
const reliableManager = new ReliableMessageManager();
function sendMessage(message) {
return reliableManager.send(message, (messageWithId) => {
websocket.send(JSON.stringify(messageWithId));
});
}
// Handle ACK messages
websocket.onmessage = (event) => {
const response = JSON.parse(event.data);
if (response.type === 'ack') {
reliableManager.handleAck(response.messageId);
} else {
// Handle other messages...
}
};Message Queue and Buffering Strategy
Message Queue Implementation:
class MessageQueue {
constructor(processor, options = {}) {
this.queue = [];
this.processor = processor;
this.batchSize = options.batchSize || 10;
this.batchTimeout = options.batchTimeout || 50; // ms
this.timer = null;
this.isProcessing = false;
}
enqueue(message) {
this.queue.push(message);
if (this.queue.length >= this.batchSize) {
this.processBatch();
} else if (!this.timer) {
this.timer = setTimeout(() => this.processBatch(), this.batchTimeout);
}
}
async processBatch() {
if (this.isProcessing || this.queue.length === 0) {
return;
}
this.isProcessing = true;
clearTimeout(this.timer);
this.timer = null;
// Extract batch of messages
const batch = this.queue.splice(0, this.batchSize);
try {
// Process batch of messages
await this.processor(batch);
} catch (error) {
console.error('Batch processing error:', error);
// Can add retry logic
} finally {
this.isProcessing = false;
// If queue has more messages, continue processing
if (this.queue.length > 0) {
this.processBatch();
}
}
}
}
// Usage example
const messageQueue = new MessageQueue(async (batch) => {
console.log('Processing batch of messages:', batch.length);
// Simulate async processing
await new Promise(resolve => setTimeout(resolve, 10));
}, { batchSize: 5, batchTimeout: 100 });
// WebSocket message handling
websocket.onmessage = (event) => {
const message = JSON.parse(event.data);
messageQueue.enqueue(message);
};Buffering Strategies:
- Memory Buffering: Suitable for temporary message storage
- Disk Buffering: Suitable for persisting important messages
- Distributed Buffering: Suitable for clustered environments
Smart Buffering Example:
class SmartMessageBuffer {
constructor(options = {}) {
this.memoryBuffer = [];
this.memoryLimit = options.memoryLimit || 1000;
this.diskBuffer = new DiskBuffer(options.diskConfig);
this.networkStatus = 'good'; // 'good', 'slow', 'bad'
this.lastNetworkCheck = 0;
}
addMessage(message) {
// Periodically check network status
this.checkNetworkStatus();
// Decide buffering strategy based on network status
switch (this.networkStatus) {
case 'good':
// Good network, send immediately
this.sendImmediately(message);
break;
case 'slow':
// Slow network, store in memory buffer first
if (this.memoryBuffer.length < this.memoryLimit) {
this.memoryBuffer.push(message);
} else {
// Memory buffer full, store to disk
this.diskBuffer.add(message);
}
break;
case 'bad':
// Poor network, store to disk directly
this.diskBuffer.add(message);
break;
}
}
sendImmediately(message) {
// Implement immediate sending logic
// If sending fails, decide buffering strategy based on current network status
}
checkNetworkStatus() {
const now = Date.now();
if (now - this.lastNetworkCheck > 5000) { // Check every 5 seconds
this.lastNetworkCheck = now;
// Simulate network status check
const latency = measureNetworkLatency();
if (latency < 100) {
this.networkStatus = 'good';
} else if (latency < 500) {
this.networkStatus = 'slow';
} else {
this.networkStatus = 'bad';
}
}
}
}
// Usage example
const buffer = new SmartMessageBuffer({
memoryLimit: 500,
diskConfig: { path: '/tmp/message_buffer' }
});
websocket.onmessage = (event) => {
const message = JSON.parse(event.data);
buffer.addMessage(message);
};Broadcasting and Multicasting
Broadcast Message Implementation (Server Iterates Clients)
Basic Broadcast Implementation:
class BroadcastManager {
constructor() {
this.clients = new Set();
}
addClient(client) {
this.clients.add(client);
}
removeClient(client) {
this.clients.delete(client);
}
broadcast(message) {
const messageStr = JSON.stringify(message);
for (const client of this.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(messageStr);
} else {
// Client connection closed, remove it
this.removeClient(client);
}
}
}
broadcastToSubset(predicate, message) {
const messageStr = JSON.stringify(message);
for (const client of this.clients) {
if (client.readyState === WebSocket.OPEN && predicate(client)) {
client.send(messageStr);
}
}
}
}
// Usage example
const broadcastManager = new BroadcastManager();
// Add client
wss.on('connection', (ws) => {
broadcastManager.addClient(ws);
ws.on('close', () => {
broadcastManager.removeClient(ws);
});
});
// Broadcast message
function sendSystemNotification(content) {
broadcastManager.broadcast({
type: 'system',
content,
timestamp: Date.now()
});
}
// Broadcast to specific user group
function sendToGroup(groupId, message) {
broadcastManager.broadcastToSubset(
(client) => client.groupId === groupId,
message
);
}Optimizing Broadcast Performance:
class OptimizedBroadcastManager {
constructor() {
this.clientGroups = new Map(); // groupId -> Set<client>
this.allClients = new Set();
}
addClient(client, groupId = 'default') {
this.allClients.add(client);
if (!this.clientGroups.has(groupId)) {
this.clientGroups.set(groupId, new Set());
}
this.clientGroups.get(groupId).add(client);
}
removeClient(client) {
this.allClients.delete(client);
for (const group of this.clientGroups.values()) {
group.delete(client);
}
}
broadcastToAll(message) {
const messageStr = JSON.stringify(message);
for (const client of this.allClients) {
if (client.readyState === WebSocket.OPEN) {
client.send(messageStr);
} else {
this.removeClient(client);
}
}
}
broadcastToGroup(groupId, message) {
if (!this.clientGroups.has(groupId)) {
return;
}
const messageStr = JSON.stringify(message);
const groupClients = this.clientGroups.get(groupId);
for (const client of groupClients) {
if (client.readyState === WebSocket.OPEN) {
client.send(messageStr);
} else {
groupClients.delete(client);
this.allClients.delete(client);
}
}
}
broadcastToGroups(groupIds, message) {
const messageStr = JSON.stringify(message);
for (const groupId of groupIds) {
if (this.clientGroups.has(groupId)) {
const groupClients = this.clientGroups.get(groupId);
for (const client of groupClients) {
if (client.readyState === WebSocket.OPEN) {
client.send(messageStr);
} else {
groupClients.delete(client);
this.allClients.delete(client);
}
}
}
}
}
}
// Usage example
const broadcastManager = new OptimizedBroadcastManager();
// Add client to specific group
wss.on('connection', (ws) => {
const groupId = determineUserGroup(ws); // Implement group logic
broadcastManager.addClient(ws, groupId);
ws.on('close', () => {
broadcastManager.removeClient(ws);
});
});
// Broadcast to all clients
broadcastManager.broadcastToAll({
type: 'system',
content: 'Server maintenance notice',
timestamp: Date.now()
});
// Broadcast to specific group
broadcastManager.broadcastToGroup('premium_users', {
type: 'promotion',
content: 'Exclusive offer for VIP users',
timestamp: Date.now()
});Multicast Message Group Management
Grouping Strategies:
- Static Grouping: Predefined based on user attributes
- Dynamic Grouping: Created dynamically based on real-time behavior
- Hybrid Grouping: Combination of static and dynamic
Group Management Implementation:
class GroupManager {
constructor() {
this.groups = new Map(); // groupId -> {name, members, metadata}
this.userGroups = new Map(); // userId -> Set<groupId>
}
createGroup(groupId, name, metadata = {}) {
if (this.groups.has(groupId)) {
throw new Error(`Group ${groupId} already exists`);
}
this.groups.set(groupId, {
id: groupId,
name,
members: new Set(),
metadata
});
}
deleteGroup(groupId) {
if (!this.groups.has(groupId)) {
throw new Error(`Group ${groupId} does not exist`);
}
// Remove group from all users' group lists
for (const [userId, groupIds] of this.userGroups) {
if (groupIds.has(groupId)) {
groupIds.delete(groupId);
}
}
this.groups.delete(groupId);
}
addUserToGroup(userId, groupId) {
if (!this.groups.has(groupId)) {
throw new Error(`Group ${groupId} does not exist`);
}
// Add to group's member list
this.groups.get(groupId).members.add(userId);
// Add to user's group list
if (!this.userGroups.has(userId)) {
this.userGroups.set(userId, new Set());
}
this.userGroups.get(userId).add(groupId);
}
removeUserFromGroup(userId, groupId) {
if (!this.groups.has(groupId)) {
throw new Error(`Group ${groupId} does not exist`);
}
// Remove from group's member list
this.groups.get(groupId).members.delete(userId);
// Remove from user's group list
if (this.userGroups.has(userId)) {
this.userGroups.get(userId).delete(groupId);
// If user has no other groups, remove from map
if (this.userGroups.get(userId).size === 0) {
this.userGroups.delete(userId);
}
}
}
getGroupMembers(groupId) {
if (!this.groups.has(groupId)) {
throw new Error(`Group ${groupId} does not exist`);
}
return Array.from(this.groups.get(groupId).members);
}
getUserGroups(userId) {
if (!this.userGroups.has(userId)) {
return [];
}
return Array.from(this.userGroups.get(userId));
}
}
// Usage example
const groupManager = new GroupManager();
// Create groups
groupManager.createGroup('premium_users', 'VIP Users');
groupManager.createGroup('new_users', 'Newly Registered Users');
// Add users to groups
groupManager.addUserToGroup('user123', 'premium_users');
groupManager.addUserToGroup('user456', 'new_users');
// Get group members
const premiumUsers = groupManager.getGroupMembers('premium_users');Room and Channel Design (Chat Rooms, Game Rooms)
Room Management Implementation:
class RoomManager {
constructor() {
this.rooms = new Map(); // roomId -> Room
this.userRooms = new Map(); // userId -> Set<roomId>
}
createRoom(roomId, options = {}) {
if (this.rooms.has(roomId)) {
throw new Error(`Room ${roomId} already exists`);
}
const room = new Room(roomId, options);
this.rooms.set(roomId, room);
return room;
}
deleteRoom(roomId) {
if (!this.rooms.has(roomId)) {
throw new Error(`Room ${roomId} does not exist`);
}
// Remove room from all users' room lists
const room = this.rooms.get(roomId);
for (const userId of room.getUsers()) {
this.removeUserFromRoom(userId, roomId);
}
this.rooms.delete(roomId);
}
joinRoom(userId, roomId) {
if (!this.rooms.has(roomId)) {
throw new Error(`Room ${roomId} does not exist`);
}
// Add to room's user list
this.rooms.get(roomId).addUser(userId);
// Add to user's room list
if (!this.userRooms.has(userId)) {
this.userRooms.set(userId, new Set());
}
this.userRooms.get(userId).add(roomId);
}
leaveRoom(userId, roomId) {
if (!this.rooms.has(roomId)) {
throw new Error(`Room ${roomId} does not exist`);
}
// Remove from room's user list
this.rooms.get(roomId).removeUser(userId);
// Remove from user's room list
if (this.userRooms.has(userId)) {
this.userRooms.get(userId).delete(roomId);
// If user has no other rooms, remove from map
if (this.userRooms.get(userId).size === 0) {
this.userRooms.delete(userId);
}
}
}
getRoomUsers(roomId) {
if (!this.rooms.has(roomId)) {
throw new Error(`Room ${roomId} does not exist`);
}
return this.rooms.get(roomId).getUsers();
}
getUserRooms(userId) {
if (!this.userRooms.has(userId)) {
return [];
}
return Array.from(this.userRooms.get(userId));
}
}
class Room {
constructor(roomId, options = {}) {
this.id = roomId;
this.name = options.name || `Room ${roomId}`;
this.maxUsers = options.maxUsers || 100;
this.users = new Set();
this.metadata = options.metadata || {};
}
addUser(userId) {
if (this.users.size >= this.maxUsers) {
throw new Error(`Room ${this.id} is full`);
}
if (this.users.has(userId)) {
throw new Error(`User ${userId} is already in room ${this.id}`);
}
this.users.add(userId);
}
removeUser(userId) {
this.users.delete(userId);
}
getUsers() {
return Array.from(this.users);
}
getUserCount() {
return this.users.size;
}
setMetadata(key, value) {
this.metadata[key] = value;
}
getMetadata(key) {
return this.metadata[key];
}
}
// Usage example
const roomManager = new RoomManager();
// Create chat room
const chatRoom = roomManager.createRoom('chat_123', {
name: 'Game Chat Room',
maxUsers: 50,
metadata: { type: 'chat' }
});
// User joins room
roomManager.joinRoom('user123', 'chat_123');
// Get room users
const usersInChat = roomManager.getRoomUsers('chat_123');Client Subscription and Unsubscription
Subscription Management Implementation:
class SubscriptionManager {
constructor() {
this.subscriptions = new Map(); // subscriptionId -> {topic, callback}
this.topicSubscriptions = new Map(); // topic -> Set<subscriptionId>
this.nextSubscriptionId = 1;
}
subscribe(topic, callback) {
const subscriptionId = this.nextSubscriptionId++;
// Add to subscription map
this.subscriptions.set(subscriptionId, {
topic,
callback
});
// Add to topic subscription map
if (!this.topicSubscriptions.has(topic)) {
this.topicSubscriptions.set(topic, new Set());
}
this.topicSubscriptions.get(topic).add(subscriptionId);
return subscriptionId;
}
unsubscribe(subscriptionId) {
if (!this.subscriptions.has(subscriptionId)) {
throw new Error(`Subscription ${subscriptionId} does not exist`);
}
const { topic } = this.subscriptions.get(subscriptionId);
// Remove from topic subscription map
if (this.topicSubscriptions.has(topic)) {
this.topicSubscriptions.get(topic).delete(subscriptionId);
// If topic has no subscribers, remove from map
if (this.topicSubscriptions.get(topic).size === 0) {
this.topicSubscriptions.delete(topic);
}
}
// Remove from subscription map
this.subscriptions.delete(subscriptionId);
}
publish(topic, message) {
if (!this.topicSubscriptions.has(topic)) {
return; // No subscribers
}
const subscriptionIds = this.topicSubscriptions.get(topic);
for (const subscriptionId of subscriptionIds) {
const { callback } = this.subscriptions.get(subscriptionId);
try {
callback(message);
} catch (error) {
console.error(`Subscription ${subscriptionId} callback execution error:`, error);
}
}
}
}
// Usage example
const subscriptionManager = new SubscriptionManager();
// Client subscribes to topic
const subscriptionId1 = subscriptionManager.subscribe('news', (message) => {
console.log('Received news message:', message);
});
const subscriptionId2 = subscriptionManager.subscribe('sports', (message) => {
console.log('Received sports message:', message);
});
// Publish messages
subscriptionManager.publish('news', { title: 'Latest News', content: '...' });
subscriptionManager.publish('sports', { title: 'Match Results', content: '...' });
// Unsubscribe
subscriptionManager.unsubscribe(subscriptionId1);Distributed Broadcasting (Multi-Server Instance Synchronization)
Distributed Broadcast Architecture:
[Client A] ↔ [WebSocket Server 1] ↔ [Message Bus] ↔ [WebSocket Server 2]
↔ [WebSocket Server 3] ↔ [Message Bus] ↔ [WebSocket Server N]Redis-Based Distributed Broadcast Implementation:
const Redis = require('ioredis');
const redis = new Redis();
class DistributedBroadcast {
constructor(redisConfig) {
this.redis = new Redis(redisConfig);
this.pubSub = new Redis(redisConfig);
this.localSubscriptions = new Map(); // channel -> Set<callback>
this.setupPubSub();
}
setupPubSub() {
this.pubSub.on('message', (channel, message) => {
if (this.localSubscriptions.has(channel)) {
const callbacks = this.localSubscriptions.get(channel);
const parsedMessage = JSON.parse(message);
for (const callback of callbacks) {
try {
callback(parsedMessage);
} catch (error) {
console.error(`Channel ${channel} callback execution error:`, error);
}
}
}
});
}
subscribe(channel, callback) {
if (!this.localSubscriptions.has(channel)) {
this.localSubscriptions.set(channel, new Set());
this.pubSub.subscribe(channel);
}
this.localSubscriptions.get(channel).add(callback);
// Return unsubscribe function
return () => {
this.unsubscribe(channel, callback);
};
}
unsubscribe(channel, callback) {
if (this.localSubscriptions.has(channel)) {
const callbacks = this.localSubscriptions.get(channel);
callbacks.delete(callback);
if (callbacks.size === 0) {
this.localSubscriptions.delete(channel);
this.pubSub.unsubscribe(channel);
}
}
}
publish(channel, message) {
// Local publish
if (this.localSubscriptions.has(channel)) {
const callbacks = this.localSubscriptions.get(channel);
const messageStr = JSON.stringify(message);
for (const callback of callbacks) {
try {
callback(message);
} catch (error) {
console.error(`Channel ${channel} callback execution error:`, error);
}
}
}
// Distributed publish
this.redis.publish(channel, JSON.stringify(message));
}
}
// Usage example
const broadcast = new DistributedBroadcast({
host: 'redis-host',
port: 6379
});
// Subscribe to channel
const unsubscribe = broadcast.subscribe('chat_room_1', (message) => {
console.log('Received message:', message);
});
// Publish message
broadcast.publish('chat_room_1', {
type: 'text',
sender: 'user123',
content: 'Hello, distributed world!'
});
// Unsubscribe
unsubscribe();Message Queue-Based Distributed Broadcast:
const { Kafka } = require('kafkajs');
class KafkaBroadcast {
constructor(kafkaConfig) {
this.kafka = new Kafka(kafkaConfig);
this.producer = this.kafka.producer();
this.consumer = this.kafka.consumer({ groupId: 'websocket-broadcast-group' });
this.subscriptions = new Map(); // topic -> Set<callback>
this.setupConsumer();
}
async connect() {
await this.producer.connect();
await this.consumer.connect();
}
async disconnect() {
await this.producer.disconnect();
await this.consumer.disconnect();
}
async setupConsumer() {
await this.consumer.subscribe({ topic: '*', fromBeginning: false });
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (this.subscriptions.has(topic)) {
const callbacks = this.subscriptions.get(topic);
const parsedMessage = JSON.parse(message.value.toString());
for (const callback of callbacks) {
try {
callback(parsedMessage);
} catch (error) {
console.error(`Topic ${topic} callback execution error:`, error);
}
}
}
}
});
}
subscribe(topic, callback) {
if (!this.subscriptions.has(topic)) {
this.subscriptions.set(topic, new Set());
}
this.subscriptions.get(topic).add(callback);
// Return unsubscribe function
return () => {
this.unsubscribe(topic, callback);
};
}
unsubscribe(topic, callback) {
if (this.subscriptions.has(topic)) {
const callbacks = this.subscriptions.get(topic);
callbacks.delete(callback);
if (callbacks.size === 0) {
this.subscriptions.delete(topic);
}
}
}
async publish(topic, message) {
// Local publish
if (this.subscriptions.has(topic)) {
const callbacks = this.subscriptions.get(topic);
const messageStr = JSON.stringify(message);
for (const callback of callbacks) {
try {
callback(message);
} catch (error) {
console.error(`Topic ${topic} callback execution error:`, error);
}
}
}
// Distributed publish
await this.producer.send({
topic,
messages: [{ value: JSON.stringify(message) }]
});
}
}
// Usage example
(async () => {
const broadcast = new KafkaBroadcast({
clientId: 'websocket-broadcast',
brokers: ['kafka1:9092', 'kafka2:9092']
});
await broadcast.connect();
// Subscribe to topic
const unsubscribe = broadcast.subscribe('chat_room_1', (message) => {
console.log('Received message:', message);
});
// Publish message
await broadcast.publish('chat_room_1', {
type: 'text',
sender: 'user123',
content: 'Hello, Kafka world!'
});
// Unsubscribe
unsubscribe();
await broadcast.disconnect();
})();Connection Management and Scaling
Client Connection State Monitoring
Connection State Monitoring Implementation:
class ConnectionMonitor {
constructor() {
this.connections = new Map(); // connectionId -> {ws, state, stats}
this.statsInterval = 5000; // Collect stats every 5 seconds
this.monitorInterval = null;
}
addConnection(connectionId, ws) {
this.connections.set(connectionId, {
ws,
state: 'connected',
stats: {
messagesReceived: 0,
messagesSent: 0,
bytesReceived: 0,
bytesSent: 0,
lastActivity: Date.now()
}
});
// Set up WebSocket event handlers
ws.on('message', (message) => {
const connection = this.connections.get(connectionId);
if (connection) {
connection.stats.messagesReceived++;
connection.stats.bytesReceived += message.length;
connection.stats.lastActivity = Date.now();
}
});
ws.on('close', () => {
this.removeConnection(connectionId);
});
ws.on('error', () => {
this.updateConnectionState(connectionId, 'error');
});
// Start monitoring if not already started
if (!this.monitorInterval) {
this.startMonitoring();
}
}
removeConnection(connectionId) {
if (this.connections.has(connectionId)) {
const connection = this.connections.get(connectionId);
connection.state = 'closed';
// Log connection stats
this.logConnectionStats(connectionId, connection.stats);
this.connections.delete(connectionId);
// Stop monitoring if no more connections
if (this.connections.size === 0) {
this.stopMonitoring();
}
}
}
updateConnectionState(connectionId, newState) {
if (this.connections.has(connectionId)) {
this.connections.get(connectionId).state = newState;
}
}
startMonitoring() {
this.monitorInterval = setInterval(() => {
const now = Date.now();
const inactiveConnections = [];
// Check all connection states
for (const [connectionId, connection] of this.connections) {
// Check for inactive connections
if (now - connection.stats.lastActivity > 30000) { // 30 seconds inactive
inactiveConnections.push(connectionId);
}
// Collect additional metrics here
}
// Handle inactive connections
for (const connectionId of inactiveConnections) {
console.log(`Connection ${connectionId} inactive, possibly disconnected`);
// Can try sending ping or close connection
}
}, this.statsInterval);
}
stopMonitoring() {
if (this.monitorInterval) {
clearInterval(this.monitorInterval);
this.monitorInterval = null;
}
}
logConnectionStats(connectionId, stats) {
// Log stats to monitoring system
console.log(`Connection ${connectionId} statistics:`, {
messagesReceived: stats.messagesReceived,
messagesSent: stats.messagesSent,
bytesReceived: stats.bytesReceived,
bytesSent: stats.bytesSent,
duration: (stats.lastActivity - this.connections.get(connectionId).stats.lastActivity) / 1000 + ' seconds'
});
// Can integrate with Prometheus, Datadog, etc.
}
getConnectionStats(connectionId) {
if (this.connections.has(connectionId)) {
const connection = this.connections.get(connectionId);
return {
state: connection.state,
...connection.stats
};
}
return null;
}
getAllConnectionStats() {
const stats = [];
for (const [connectionId, connection] of this.connections) {
stats.push({
connectionId,
state: connection.state,
...connection.stats
});
}
return stats;
}
}
// Usage example
const monitor = new ConnectionMonitor();
// WebSocket server integration
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
const connectionId = generateUUID(); // Implement UUID generation
monitor.addConnection(connectionId, ws);
ws.connectionId = connectionId;
ws.on('close', () => {
monitor.removeConnection(connectionId);
});
});
// Periodically print all connection stats
setInterval(() => {
const stats = monitor.getAllConnectionStats();
console.log('Current connection stats:', stats);
}, 60000); // Print every minuteConnection Limits and Load Balancing
Connection Limit Implementation:
class ConnectionLimiter {
constructor(maxConnections) {
this.maxConnections = maxConnections;
this.currentConnections = 0;
this.waitingQueue = [];
}
async acquireConnection() {
if (this.currentConnections < this.maxConnections) {
this.currentConnections++;
return true;
} else {
// Return a Promise resolved when connection is available
return new Promise((resolve) => {
this.waitingQueue.push(resolve);
});
}
}
releaseConnection() {
this.currentConnections--;
// Release one waiting connection if any
if (this.waitingQueue.length > 0) {
const nextResolve = this.waitingQueue.shift();
this.currentConnections++;
nextResolve(true);
}
}
getCurrentConnections() {
return this.currentConnections;
}
}
// WebSocket server integration
const limiter = new ConnectionLimiter(1000); // Max 1000 connections
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', async (ws) => {
try {
await limiter.acquireConnection();
ws.on('close', () => {
limiter.releaseConnection();
});
ws.on('error', () => {
limiter.releaseConnection();
});
// Handle WebSocket messages...
} catch (error) {
console.error('Connection acquisition failed:', error);
ws.close(1013, 'Server busy, please try again later'); // 1013 is Try Again Later
}
});
// Load balancer strategy
class LoadBalancer {
constructor(servers) {
this.servers = servers; // List of servers
this.serverLoads = new Map(); // server -> current connections
// Initialize server loads
for (const server of this.servers) {
this.serverLoads.set(server, 0);
}
}
getLeastLoadedServer() {
let minLoad = Infinity;
let selectedServer = null;
for (const [server, load] of this.serverLoads) {
if (load < minLoad) {
minLoad = load;
selectedServer = server;
}
}
return selectedServer;
}
incrementServerLoad(server) {
if (this.serverLoads.has(server)) {
this.serverLoads.set(server, this.serverLoads.get(server) + 1);
}
}
decrementServerLoad(server) {
if (this.serverLoads.has(server)) {
this.serverLoads.set(server, this.serverLoads.get(server) - 1);
}
}
getConnectionCount(server) {
return this.serverLoads.get(server) || 0;
}
}
// Usage example
const servers = ['ws://server1:8080', 'ws://server2:8080', 'ws://server3:8080'];
const loadBalancer = new LoadBalancer(servers);
// Select least-loaded server for client connection
function connectToWebSocket() {
const server = loadBalancer.getLeastLoadedServer();
loadBalancer.incrementServerLoad(server);
const ws = new WebSocket(server);
ws.on('close', () => {
loadBalancer.decrementServerLoad(server);
});
ws.on('error', () => {
loadBalancer.decrementServerLoad(server);
});
return ws;
}Server-Side Horizontal Scaling (WebSocket Cluster)
WebSocket Cluster Architecture:
[Client] ↔ [Load Balancer (Nginx/HAProxy)] ↔ [WebSocket Server 1]
↔ [WebSocket Server 2]
↔ [WebSocket Server N]Redis-Based WebSocket Cluster Implementation:
const Redis = require('ioredis');
const redis = new Redis();
class ClusterWebSocketManager {
constructor() {
this.localConnections = new Map(); // connectionId -> ws
this.redisSub = new Redis();
this.setupRedisPubSub();
}
setupRedisPubSub() {
// Subscribe to cluster message channel
this.redisSub.subscribe('websocket:cluster:messages');
this.redisSub.on('message', (channel, message) => {
if (channel === 'websocket:cluster:messages') {
const { connectionId, data } = JSON.parse(message);
// If message is not for local connection, forward
if (!this.localConnections.has(connectionId)) {
this.logCrossNodeMessage(connectionId);
return;
}
const ws = this.localConnections.get(connectionId);
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
}
});
}
addConnection(connectionId, ws) {
this.localConnections.set(connectionId, ws);
// Set up WebSocket event handlers
ws.on('message', (message) => {
this.handleLocalMessage(connectionId, message);
});
ws.on('close', () => {
this.removeConnection(connectionId);
});
ws.on('error', () => {
this.removeConnection(connectionId);
});
}
removeConnection(connectionId) {
this.localConnections.delete(connectionId);
}
sendToConnection(connectionId, message) {
if (this.localConnections.has(connectionId)) {
// Local connection, send directly
const ws = this.localConnections.get(connectionId);
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
} else {
// Cross-node connection, publish via Redis
redis.publish('websocket:cluster:messages', JSON.stringify({
connectionId,
data: message
}));
this.logCrossNodeMessage(connectionId);
}
}
broadcast(message) {
// Local broadcast
for (const [connectionId, ws] of this.localConnections) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
}
// Cluster broadcast
redis.publish('websocket:cluster:broadcast', message);
}
handleLocalMessage(connectionId, message) {
// Handle local message logic...
if (this.isMessageForOtherNodes(message)) {
redis.publish('websocket:cluster:messages', JSON.stringify({
connectionId: this.extractTargetConnectionId(message),
data: message
}));
}
}
logCrossNodeMessage(connectionId) {
// Log cross-node message stats
console.log(`Cross-node message: ${connectionId}`);
}
isMessageForOtherNodes(message) {
// Determine if message needs to be sent to other nodes
return false;
}
extractTargetConnectionId(message) {
// Extract target connection ID from message
return null;
}
}
// Usage example
const clusterManager = new ClusterWebSocketManager();
// WebSocket server integration
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
const connectionId = generateUUID();
clusterManager.addConnection(connectionId, ws);
ws.on('close', () => {
clusterManager.removeConnection(connectionId);
});
});Nginx-Based WebSocket Load Balancer Configuration:
http {
upstream websocket_servers {
ip_hash; # Use ip_hash for sticky sessions
server ws1.example.com:8080;
server ws2.example.com:8080;
server ws3.example.com:8080;
}
server {
listen 80;
server_name websocket.example.com;
location /ws {
proxy_pass http://websocket_servers;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
# Timeout settings
proxy_read_timeout 86400s; # 24 hours
proxy_send_timeout 86400s;
}
}
}Distributed Session Management (Redis/ZooKeeper)
Redis-Based Session Management:
const Redis = require('ioredis');
const redis = new Redis();
class RedisSessionManager {
constructor() {
this.sessionTTL = 3600; // Session TTL in seconds
}
createSession(userId, connectionId, metadata = {}) {
const sessionId = generateUUID();
const now = Math.floor(Date.now() / 1000);
const sessionData = {
userId,
connectionId,
createdAt: now,
expiresAt: now + this.sessionTTL,
metadata
};
// Store session data
redis.hmset(`session:${sessionId}`, sessionData);
// Set expiration
redis.expire(`session:${sessionId}`, this.sessionTTL);
// Map user to session
redis.set(`user:${userId}:session`, sessionId);
// Map connection ID to session
redis.set(`connection:${connectionId}:session`, sessionId);
return sessionId;
}
getSession(sessionId) {
return redis.hgetall(`session:${sessionId}`);
}
getUserSessions(userId) {
return new Promise((resolve) => {
redis.get(`user:${userId}:session`, (err, sessionId) => {
if (err || !sessionId) {
resolve(null);
return;
}
this.getSession(sessionId).then((session) => {
resolve(session || null);
});
});
});
}
getConnectionSession(connectionId) {
return new Promise((resolve) => {
redis.get(`connection:${connectionId}:session`, (err, sessionId) => {
if (err || !sessionId) {
resolve(null);
return;
}
this.getSession(sessionId).then((session) => {
resolve(session || null);
});
});
});
}
updateSession(sessionId, updates) {
// Update session data
redis.hmset(`session:${sessionId}`, updates);
// Refresh expiration
redis.expire(`session:${sessionId}`, this.sessionTTL);
}
deleteSession(sessionId) {
return new Promise((resolve) => {
redis.hgetall(`session:${sessionId}`, (err, session) => {
if (err || !session) {
resolve(false);
return;
}
// Delete all related keys
const multi = redis.multi();
multi.del(`session:${sessionId}`);
multi.del(`user:${session.userId}:session`);
multi.del(`connection:${session.connectionId}:session`);
multi.exec(() => {
resolve(true);
});
});
});
}
refreshSession(sessionId) {
// Refresh session TTL
redis.expire(`session:${sessionId}`, this.sessionTTL);
}
}
// Usage example
const sessionManager = new RedisSessionManager();
// Create session on WebSocket connection
wss.on('connection', (ws) => {
const userId = authenticateUser(ws); // Implement user authentication
const connectionId = generateUUID();
const sessionId = sessionManager.createSession(userId, connectionId, {
ip: ws._socket.remoteAddress,
userAgent: ws._socket.remoteFamily
});
ws.sessionId = sessionId;
ws.connectionId = connectionId;
ws.on('close', () => {
sessionManager.deleteSession(sessionId);
});
});
// Fetch user session info
function getUserSessionInfo(userId) {
return sessionManager.getUserSessions(userId);
}Connection Migration and Recovery Mechanisms
Connection Migration Design:
class ConnectionMigrator {
constructor() {
this.pendingMigrations = new Map(); // connectionId -> {targetServer, retryCount}
this.migrationTimeout = 10000; // 10-second migration timeout
}
initiateMigration(connectionId, targetServer) {
if (this.pendingMigrations.has(connectionId)) {
throw new Error(`Connection ${connectionId} already has an ongoing migration`);
}
this.pendingMigrations.set(connectionId, {
targetServer,
retryCount: 0
});
this.startMigrationProcess(connectionId);
}
startMigrationProcess(connectionId) {
const migration = this.pendingMigrations.get(connectionId);
if (!migration) {
return;
}
this.notifyTargetServer(migration.targetServer, connectionId)
.then(() => {
this.pauseMessageSending(connectionId)
.then(() => {
this.transferConnectionState(connectionId, migration.targetServer)
NotifyTargetServer(targetServer, connectionId);
.then(() => {
this.notifyClientToReconnect(connectionId, migration.targetServer)
.then(() => {
this.confirmMigrationComplete(connectionId)
.then(() => {
this.pendingMigrations.delete(connectionId);
})
.catch(error => {
this.handleMigrationError(connectionId, error);
});
})
.catch(error => {
this.handleMigrationError(connectionId, error);
});
})
.catch(error => {
this.handleMigrationError(connectionId, error);
});
})
.catch(error => {
this.handleMigrationError(connectionId, error);
});
})
.catch(error => {
this.handleMigrationError(connectionId, error);
});
}
notifyTargetServer(targetServer, connectionId) {
return new Promise((resolve, reject) => {
// Implement notification to target server
console.log(`Notify target server ${targetServer} to prepare for connection ${connectionId}`);
// Simulate async operation
setTimeout(() => {
if (Math.random() > 0.1) { // 90% success rate
resolve();
} else {
reject(new Error('Target server preparation failed'));
}
}, 500);
});
}
pauseMessageSending(connectionId) {
return new Promise((resolve, reject) => {
// Implement message sending pause
console.log(`Pausing message sending for connection ${connectionId}`);
// Simulate async operation
setTimeout(() => {
if (Math.random() > 0.05) { // 95% success rate
resolve();
} else {
reject(new Error('Pausing message sending failed'));
}
}, 300);
});
}
transferConnectionState(connectionId, targetServer) {
return new Promise((resolve, reject) => {
// Implement connection state transfer
console.log(`Transferring state for connection ${connectionId} to server ${targetServer}`);
// Simulate async operation
setTimeout(() => {
if (Math.random() > 0.1) { // 90% success rate
resolve();
} else {
reject(new Error('State transfer failed'));
}
}, 1000);
});
}
notifyClientToReconnect(connectionId, targetServer) {
return new Promise((resolve, reject) => {
// Implement client reconnect notification
console.log(`Notifying client of connection ${connectionId} to reconnect to ${targetServer}`);
// Simulate async operation
setTimeout(() => {
if (Math.random() > 0.2) { // 80% success rate
resolve();
} else {
reject(new Error('Client reconnect notification failed'));
}
}, 800);
});
}
confirmMigrationComplete(connectionId) {
return new Promise((resolve, reject) => {
// Implement migration completion confirmation
console.log(`Confirming migration completion for ${connectionId}`);
// Simulate async operation
setTimeout(() => {
if (Math.random() > 0.05) { // 95% success rate
resolve();
} else {
reject(new Error('Migration confirmation failed')));
}
}, 400);
});
}
handleMigrationError(connectionId, error) {
const migration = this.pendingMigrations.get(connectionId);
if (!migration) {
return;
}
migration.retryCount++;
if (migration.retryCount < 3) {
console.log(`Migration failed for ${connectionId}, retrying ${migration.retryCount}/3`);
setTimeout(() => {
this.startMigrationProcess(connectionId);
}, 2000); // Retry after 2 seconds
} else {
console.error(`Migration failed for ${connectionId}, exceeded max retries:`, error);
this.pendingMigrations.delete(connectionId);
// Can notify monitoring system or take recovery actions
}
}
// Connection recovery mechanism
handleConnectionDrop(connectionId) {
if (this.pendingMigrations.has(connectionId)) {
console.log(`Connection ${connectionId} dropped, ongoing migration may have completed`);
return;
}
// Implement connection recovery logic
console.log(`Connection ${connectionId} dropped, initiating recovery process`);
// Can integrate with reconnect strategy here
}
}
// Usage example
const migrator = new ConnectionMigrator();
// Initiate migration
migrator.initiateMigration('conn123', 'ws://new-server:8080');Client Connection Recovery Implementation:
class ResilientWebSocketClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxReconnectAttempts: 5,
initialDelay: 1000,
maxDelay: 30000,
...options
};
this.reconnectAttempts = 0;
this.reconnectTimer = null;
this.ws = null;
this.pendingMessages = [];
this.connectionId = null;
this.sessionId = null;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket connection established');
this.reconnectAttempts = 0; // Reset reconnect counter
// Send all pending messages
this.flushPendingMessages();
// Restore session if available
if (this.sessionId) {
this.restoreSession();
}
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
// Handle session restoration response
if (message.type === 'session_restored') {
this.sessionId = message.sessionId;
this.connectionId = message.connectionId;
console.log('Session restored successfully:', this.sessionId);
return;
}
// Handle other messages...
this.handleMessage(message);
} catch (error) {
console.error('Message parsing error:', error);
}
};
this.ws.onclose = () => {
console.log('WebSocket connection closed');
this.scheduleReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
this.scheduleReconnect();
};
}
sendMessage(message) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
// Connection not open, cache message
this.pendingMessages.push(message);
// For important messages, try reconnecting immediately
if (message.important) {
this.connect();
}
}
}
flushPendingMessages() {
while (this.pendingMessages.length > 0 && this.ws.readyState === WebSocket.OPEN) {
const message = this.pendingMessages.shift();
this.ws.send(JSON.stringify(message));
}
}
scheduleReconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
console.log('Max reconnect attempts reached');
if (this.options.onmaxreconnect) {
this.options.onmaxreconnect();
}
return;
}
// Calculate exponential backoff delay
const delay = Math.min(
this.options.initialDelay * Math.pow(2, this.reconnectAttempts),
this.options.maxDelay
);
this.reconnectAttempts++;
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);
// Clear previous timer
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
// Set new timer
this.reconnectTimer = setTimeout(() => {
this.connect();
}, delay);
}
restoreSession() {
if (!this.sessionId || !this.connectionId) {
console.error('Cannot restore session: missing sessionId or connectionId');
return;
}
this.sendMessage({
type: 'restore_session',
sessionId: this.sessionId,
connectionId: this.connectionId
});
}
handleMessage(message) {
switch (message.type) {
case 'session_restored':
this.sessionId = message.sessionId;
this.connectionId = message.connectionId;
console.log('Session restored successfully:', this.sessionId);
break;
// Handle other message types...
}
}
// Handle server migration request
handleMigrationRequest(newServerUrl) {
console.log('Received migration request to new server:', newServerUrl);
// Save current state
const currentState = this.saveState();
// Close current connection
if (this.ws) {
this.ws.close();
}
// Update URL and reconnect
this.url = newServerUrl;
// Restore state
this.restoreState(currentState);
// Reconnect
this.connect();
}
saveState() {
return {
pendingMessages: this.pendingMessages,
sessionId: this.sessionId,
connectionId: this.connectionId
};
}
restoreState(state) {
this.pendingMessages = state.pendingMessages || [];
this.sessionId = state.sessionId;
this.connectionId = state.connectionId;
}
}
// Usage example
const client = new ResilientWebSocketClient('ws://example.com/socket', {
maxReconnectAttempts: 10,
initialDelay: 1000,
maxDelay: 60000,
onmaxreconnect: () => {
console.log('Cannot connect to server, please check network or try again later');
}
});
// Send message
client.sendMessage({ type: 'chat', content: 'Hello' });
// Important message
client.sendMessage({
type: 'important_command',
action: 'update_settings',
important: true
});Comprehensive Application Case Study
Real-Time Collaborative Editing System
Architecture Design:
[Client A] ↔ [WebSocket Server 1] ↔ [Redis Pub/Sub] ↔ [WebSocket Server 2]
↗ [Redis] ↘ [Database]
↗ WebSocket ↘
[WebSocket Server 3]Implementation Key Points:
- Operational Transformation (OT): Resolve conflicting edits
- Real-Time Synchronization: Broadcast edit operations via WebSocket
- State Persistence: Periodically save to database
- Connection Recovery: Restore editing state after disconnection
Core Code Example:
class CollaborativeEditor {
constructor(documentId) {
this.documentId = documentId;
this.operations = []; // Operation history
this.clients = new Set(); // Connected clients
this.pendingOperations = new Map(); // clientId -> [operations]
}
addClient(clientId, ws) {
this.clients.add(clientId);
this.pendingOperations.set(clientId, []);
// Send current document state to new client
this.sendDocumentState(clientId, ws);
// Set up message handler
ws.on('message', (message) => {
this.handleClientMessage(clientId, message);
});
ws.on('close', () => {
this.removeClient(clientId);
});
}
removeClient(clientId) {
this.clients.delete(clientId);
this.pendingOperations.delete(clientId);
}
handleClientMessage(clientId, message) {
try {
const msg = JSON.parse(message);
if (msg.type === 'operation') {
// Handle edit operation
const operation = this.transformOperation(msg.operation);
this.operations.push(operation);
// Broadcast to other clients
this.broadcastOperation(clientId, operation);
// Save operation to persistent storage
this.saveOperation(operation);
} else if (msg.type === 'cursor') {
// Handle cursor position update
this.broadcastCursor(clientId, msg.cursor);
}
} catch (error) {
console.error('Client message processing error:', error);
}
}
transformOperation(operation) {
// Implement operation transformation
return operation; // Simplified example
}
broadcastOperation(senderId, operation) {
const message = JSON.stringify({
type: 'operation',
operation
});
for (const clientId of this.clients) {
if (clientId !== senderId) {
const ws = this.getClientWebSocket(clientId);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
}
}
}
broadcastCursor(senderId, cursor) {
const message = JSON.stringify({
type: 'cursor',
clientId: senderId,
cursor
});
for (const clientId of this.clients) {
if (clientId !== senderId) {
const ws = this.getClientWebSocket(clientId);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
}
}
}
sendDocumentState(clientId, ws) {
// Get current document state
const documentState = this.getDocumentState();
const message = JSON.stringify({
type: 'init',
document: documentState
});
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
}
saveOperation(operation) {
// Implement persistent operation saving
console.log('Saving operation:', operation);
}
getDocumentState() {
// Implement document state retrieval
return { content: '...' }; // Placeholder
}
getClientWebSocket(clientId) {
// Implement WebSocket retrieval
return null;
}
}
// WebSocket server integration
const wss = new WebSocket.Server({ port: 8080 });
const editors = new Map(); // documentId -> CollaborativeEditor
wss.on('connection', (ws) => {
ws.on('message', (message) => {
try {
const msg = JSON.parse(message);
if (msg.type === 'join') {
const documentId = msg.documentId;
const clientId = generateUUID();
if (!editors.has(documentId)) {
editors.set(documentId, new CollaborativeEditor(documentId));
}
const editor = editors.get(documentId);
editor.addClient(clientId, ws);
// Send client ID to ws
ws.send(JSON.stringify({
type: 'joined',
clientId
}));
}
} catch (error) {
console.error('Connection message processing error:', error);
}
});
ws.on('close', () => {
// Implement client disconnection cleanup
// Need to track clientId to ws mapping
});
});



