Lesson 11: Advanced WebSocket Communication Patterns

Message Protocol Design

Custom Message Formats (JSON Schema/Protobuf)

JSON Schema Design Example:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "WebSocketMessage",
  "type": "object",
  "properties": {
    "messageId": {
      "type": "string",
      "format": "uuid"
    },
    "timestamp": {
      "type": "integer",
      "minimum": 0
    },
    "type": {
      "type": "string",
      "enum": ["text", "image", "file", "command"]
    },
    "sender": {
      "type": "string",
      "minLength": 1
    },
    "payload": {
      "type": "object",
      "oneOf": [
        {
          "$ref": "#/definitions/textPayload"
        },
        {
          "$ref": "#/definitions/imagePayload"
        },
        {
          "$ref": "#/definitions/filePayload"
        },
        {
          "$ref": "#/definitions/commandPayload"
        }
      ]
    },
    "metadata": {
      "type": "object",
      "additionalProperties": true
    }
  },
  "required": ["messageId", "timestamp", "type", "sender"],
  "definitions": {
    "textPayload": {
      "type": "object",
      "properties": {
        "content": {
          "type": "string",
          "maxLength": 4096
        }
      },
      "required": ["content"]
    },
    "imagePayload": {
      "type": "object",
      "properties": {
        "url": {
          "type": "string",
          "format": "uri"
        },
        "width": {
          "type": "integer",
          "minimum": 1
        },
        "height": {
          "type": "integer",
          "minimum": 1
        }
      },
      "required": ["url", "width", "height"]
    },
    "filePayload": {
      "type": "object",
      "properties": {
        "url": {
          "type": "string",
          "format": "uri"
        },
        "size": {
          "type": "integer",
          "minimum": 1
        },
        "mimeType": {
          "type": "string"
        }
      },
      "required": ["url", "size", "mimeType"]
    },
    "commandPayload": {
      "type": "object",
      "properties": {
        "action": {
          "type": "string",
          "enum": ["join", "leave", "mute", "kick"]
        },
        "target": {
          "type": ["string", "null"]
        }
      },
      "required": ["action"]
    }
  }
}

Protobuf Design Example:

syntax = "proto3";

package websocket;

import "google/protobuf/timestamp.proto";

// Envelope for a single WebSocket message: identification and routing fields,
// at most one typed payload, and free-form string metadata.
message WebSocketMessage {
  string message_id = 1;  // UUID format
  google.protobuf.Timestamp timestamp = 2;  // Well-known Timestamp type (imported above)
  MessageType type = 3;  // Declared kind of the message; see MessageType
  string sender = 4;  // Sender of the message
  // At most one of the payload fields below is set at a time.
  // NOTE(review): `type` and the populated oneof case are independent fields,
  // so nothing in this schema forces them to agree — confirm where that is checked.
  oneof payload {
    TextPayload text = 5;
    ImagePayload image = 6;
    FilePayload file = 7;
    CommandPayload command = 8;
  }
  map<string, string> metadata = 9;  // String-to-string map; values cannot be nested objects
}

// Kind of message carried by WebSocketMessage.
// In proto3 the zero value is the default, so a message whose `type` was never
// set reads back as TEXT.
enum MessageType {
  TEXT = 0;
  IMAGE = 1;
  FILE = 2;
  COMMAND = 3;
}

// Payload of a text message.
message TextPayload {
  // The length limit is a convention only; the schema itself does not enforce it.
  string content = 1;  // Max 4096 characters
}

// Payload of an image message: where the image lives and its dimensions.
message ImagePayload {
  string url = 1;      // Image URL
  int32 width = 2;     // Width (pixels)
  int32 height = 3;    // Height (pixels)
}

// Payload of a file message: location, size and content type of the file.
message FilePayload {
  string url = 1;      // File URL
  int64 size = 2;      // File size (bytes)
  string mime_type = 3;// MIME type
}

// Payload of a command message: the action to perform and what it applies to.
message CommandPayload {
  CommandAction action = 1;  // Requested action; see CommandAction
  // A plain proto3 string defaults to "", so "no target" and "empty target"
  // look the same on the wire.
  string target = 2;   // Optional target
}

// Actions a command message can request.
// In proto3 the zero value is the default, so an unset `action` reads back as JOIN.
enum CommandAction {
  JOIN = 0;
  LEAVE = 1;
  MUTE = 2;
  KICK = 3;
}

Message Types and Routing Design

Message Type Classification:

  1. Control Messages: Connection management, room operations, etc.
  2. Real-Time Messages: Chat messages, status updates, etc.
  3. System Messages: Notifications, warnings, etc.
  4. Data Messages: File transfers, large data, etc.

Routing Strategy:

/**
 * Dispatches messages to handler functions keyed by `message.type`.
 *
 * A handler registered for the exact type wins; otherwise the default handler
 * (if any) is used; otherwise routing fails with an Error.
 */
class MessageRouter {
  constructor() {
    // messageType -> handler function
    this.routes = new Map();
    // Fallback used when no route matches; null means "no fallback".
    this.defaultHandler = null;
  }

  /**
   * Registers (or replaces) the handler for one message type.
   * @param {string} messageType - Value compared against `message.type`.
   * @param {Function} handler - Called with the message when the type matches.
   */
  addRoute(messageType, handler) {
    this.routes.set(messageType, handler);
  }

  /**
   * Sets the handler used for messages whose type has no registered route.
   * @param {Function} handler - Fallback handler.
   */
  setDefaultHandler(handler) {
    this.defaultHandler = handler;
  }

  /**
   * Sends a message to the matching handler and returns whatever it returns.
   * @param {{type: string}} message - Message to dispatch.
   * @returns {*} The handler's return value.
   * @throws {Error} When neither a route nor a default handler is available.
   */
  route(message) {
    const registered = this.routes.get(message.type);
    const target = registered || this.defaultHandler;

    if (!target) {
      throw new Error(`No handler for message type: ${message.type}`);
    }

    return target(message);
  }
}

// Usage example
const router = new MessageRouter();

// Register route handlers
// NOTE(review): the handle*Message functions are not defined in this snippet
// and are assumed to be provided elsewhere.
router.addRoute('text', handleTextMessage);
router.addRoute('image', handleImageMessage);
router.addRoute('command', handleCommandMessage);

// Set default handler
// Any type without a route above (for example 'file') ends up here.
router.setDefaultHandler(handleUnknownMessage);

// Route message
/**
 * Validates a raw incoming message and dispatches it through the router.
 * Errors thrown by validation or by the selected handler are logged and
 * forwarded to sendErrorResponse instead of propagating to the caller.
 * @param {*} message - Raw message as delivered by the WebSocket layer.
 */
function onWebSocketMessage(message) {
  try {
    const parsedMessage = validateAndParseMessage(message);
    router.route(parsedMessage);
  } catch (error) {
    console.error('Message processing error:', error);
    sendErrorResponse(error);
  }
}

Message Versioning and Compatibility

Versioning Strategy:

  1. Protocol Version Number: Include version number in message header
  2. Backward Compatibility: New server versions support old client versions
  3. Forward Compatibility: Old servers ignore unrecognized fields

Versioned Message Design:

{
  "version": "1.2",
  "messageId": "a1b2c3d4-...",
  "type": "text",
  "sender": "user123",
  "payload": {
    "content": "Hello",
    "mentions": ["user456"]  // New field in newer version
  }
}

Version Compatibility Handling:

/**
 * Dispatches a message to the handler matching its protocol version.
 *
 * Supported versions are 1.0 through 1.2 inclusive. Text messages at version
 * 1.1 or later go to the handler that understands the `mentions` field; older
 * text messages go to the 1.0 handler. Non-text messages are accepted but not
 * dispatched by this function.
 *
 * @param {{version: string, type: string}} message - Parsed incoming message.
 * @throws {Error} "Unsupported protocol version: …" when the version is
 *   missing, not a string, newer than 1.2, or older than 1.0.
 */
function handleVersionedMessage(message) {
  const currentVersion = '1.2';
  const minimumVersion = '1.0';
  const messageVersion = message ? message.version : undefined;

  // Reject a missing or non-string version up front; otherwise compareVersions
  // would fail on `.split` with an unrelated TypeError.
  if (typeof messageVersion !== 'string') {
    throw new Error(`Unsupported protocol version: ${messageVersion}`);
  }

  // Version check: anything outside [minimumVersion, currentVersion] is refused.
  const isTooNew = compareVersions(messageVersion, currentVersion) > 0;
  const isTooOld = compareVersions(messageVersion, minimumVersion) < 0;
  if (isTooNew || isTooOld) {
    throw new Error(`Unsupported protocol version: ${messageVersion}`);
  }

  if (message.type !== 'text') {
    return;
  }

  if (compareVersions(messageVersion, '1.1') >= 0) {
    // Handle messages version 1.1+ (includes mentions field)
    handleTextMessageV1_1(message);
  } else {
    // Handle version 1.0 messages (ignore mentions field)
    handleTextMessageV1_0(message);
  }
}

/**
 * Compares two dot-separated numeric version strings segment by segment.
 *
 * Missing segments count as 0 (so "1" equals "1.0"), and a segment that does
 * not parse as a number is also treated as 0.
 *
 * @param {string} v1 - Left-hand version, e.g. "1.2".
 * @param {string} v2 - Right-hand version.
 * @returns {number} 1 if v1 is greater, -1 if v1 is smaller, 0 if equal.
 */
function compareVersions(v1, v2) {
  const toSegments = (version) => version.split('.').map((segment) => Number(segment));
  const left = toSegments(v1);
  const right = toSegments(v2);
  const segmentCount = Math.max(left.length, right.length);

  let index = 0;
  while (index < segmentCount) {
    // `|| 0` covers both a missing segment (undefined) and an unparsable one (NaN).
    const a = left[index] || 0;
    const b = right[index] || 0;
    if (a !== b) {
      return a > b ? 1 : -1;
    }
    index += 1;
  }

  return 0;
}

Message Acknowledgment and Retransmission Mechanism

Reliable Message Transmission Design:

/**
 * Sends messages and resends them until an ACK arrives or the retry budget is
 * exhausted.
 *
 * Each outgoing message gets a fresh `messageId` and a pending entry that
 * remembers the message, the transport callback used to send it, and the
 * timer that triggers the next retry.
 */
class ReliableMessageManager {
  constructor() {
    // messageId -> {message, timestamp, retries, sendCallback, timer}
    this.pendingAcks = new Map();
    this.ackTimeout = 5000; // 5-second ACK timeout
    this.maxRetries = 3;
  }

  /**
   * Sends a message and starts waiting for its ACK.
   * @param {Object} message - Message body; a generated `messageId` is added
   *   (overriding any `messageId` already present).
   * @param {Function} sendCallback - Called with the message (including
   *   `messageId`) for the first send and for every retry.
   * @returns {boolean} false when 1000 messages are already awaiting an ACK
   *   and the new message was discarded; true otherwise.
   */
  send(message, sendCallback) {
    if (this.pendingAcks.size >= 1000) {
      console.warn('Too many pending ACKs, discarding new message');
      return false;
    }

    const messageId = generateUUID();
    const messageWithId = {
      ...message,
      messageId
    };

    this.pendingAcks.set(messageId, {
      message: messageWithId,
      timestamp: Date.now(),
      retries: 0,
      // Kept on the entry so retries can resend through the same transport.
      sendCallback,
      timer: null
    });

    // Send message
    sendCallback(messageWithId);

    // Set ACK timeout
    this.startAckTimeout(messageId);

    return true;
  }

  /**
   * Marks a message as acknowledged and cancels its pending retry.
   * Unknown ids are ignored.
   * @param {string} messageId - Id echoed back by the peer.
   */
  handleAck(messageId) {
    const entry = this.pendingAcks.get(messageId);
    if (!entry) {
      return;
    }
    clearTimeout(entry.timer);
    this.pendingAcks.delete(messageId);
  }

  /**
   * Arms (or re-arms) the retry timer for a pending message.
   * When the timer fires without an ACK, the message is resent until
   * `maxRetries` is reached, after which it is dropped.
   * @param {string} messageId - Id of a message present in `pendingAcks`.
   */
  startAckTimeout(messageId) {
    const entry = this.pendingAcks.get(messageId);
    if (!entry) {
      return;
    }

    clearTimeout(entry.timer);
    entry.timer = setTimeout(() => {
      const pending = this.pendingAcks.get(messageId);
      if (!pending) {
        return; // Acknowledged or cleaned up in the meantime
      }

      if (pending.retries < this.maxRetries) {
        // Retry sending
        console.log(`Message ${messageId} not acknowledged, retrying (attempt ${pending.retries + 1})`);
        pending.retries++;
        pending.timestamp = Date.now();

        // Resend message; a failing transport must not escape the timer
        // callback, so the error is logged and the next retry still scheduled.
        try {
          pending.sendCallback(pending.message);
        } catch (error) {
          console.error(`Message ${messageId} resend failed:`, error);
        }

        // Reset timeout
        this.startAckTimeout(messageId);
      } else {
        // Exceeded max retries
        console.error(`Message ${messageId} exceeded retry limit, giving up`);
        this.pendingAcks.delete(messageId);
        // Can notify application layer of failure
      }
    }, this.ackTimeout);
  }

  /**
   * Drops pending entries whose last send is older than twice the ACK
   * timeout, cancelling their timers. Intended to be called periodically.
   */
  cleanupExpiredMessages() {
    const now = Date.now();
    for (const [messageId, entry] of this.pendingAcks) {
      if (now - entry.timestamp > this.ackTimeout * 2) {
        clearTimeout(entry.timer);
        this.pendingAcks.delete(messageId);
      }
    }
  }
}

// Usage example
const reliableManager = new ReliableMessageManager();

/**
 * Sends a message over the WebSocket with ACK tracking.
 * NOTE(review): `websocket` is not defined in this snippet; it is assumed to
 * be an open WebSocket connection provided elsewhere.
 * @param {Object} message - Message body to send.
 * @returns {boolean} Result of ReliableMessageManager#send.
 */
function sendMessage(message) {
  return reliableManager.send(message, (messageWithId) => {
    websocket.send(JSON.stringify(messageWithId));
  });
}

// Handle ACK messages
// JSON.parse throws on malformed input; this handler does not catch that.
websocket.onmessage = (event) => {
  const response = JSON.parse(event.data);
  if (response.type === 'ack') {
    reliableManager.handleAck(response.messageId);
  } else {
    // Handle other messages...
  }
};

Message Queue and Buffering Strategy

Message Queue Implementation:

/**
 * Collects messages and hands them to an async processor in batches.
 *
 * A batch is started as soon as `batchSize` messages are waiting, or when
 * `batchTimeout` milliseconds have passed since the first waiting message.
 * Only one batch is processed at a time.
 */
class MessageQueue {
  /**
   * @param {Function} processor - Async function receiving an array of messages.
   * @param {Object} [options]
   * @param {number} [options.batchSize=10] - Maximum messages per batch.
   * @param {number} [options.batchTimeout=50] - Max wait in ms before a partial batch is flushed.
   */
  constructor(processor, options = {}) {
    const { batchSize, batchTimeout } = options;

    this.queue = [];
    this.processor = processor;
    this.batchSize = batchSize || 10;
    this.batchTimeout = batchTimeout || 50; // ms
    this.timer = null;
    this.isProcessing = false;
  }

  /**
   * Adds a message and triggers or schedules batch processing.
   * @param {*} message - Message to queue.
   */
  enqueue(message) {
    this.queue.push(message);

    const batchIsFull = this.queue.length >= this.batchSize;
    if (batchIsFull) {
      this.processBatch();
      return;
    }

    // A flush is already scheduled for the messages that are waiting.
    if (this.timer) {
      return;
    }

    this.timer = setTimeout(() => this.processBatch(), this.batchTimeout);
  }

  /**
   * Takes up to `batchSize` messages off the queue and processes them.
   * Does nothing while another batch is in flight or when the queue is empty.
   * Processor errors are logged, not rethrown.
   * @returns {Promise<void>}
   */
  async processBatch() {
    const nothingQueued = this.queue.length === 0;
    if (this.isProcessing || nothingQueued) {
      return;
    }

    this.isProcessing = true;
    clearTimeout(this.timer);
    this.timer = null;

    // Remove the batch from the queue before awaiting, so messages enqueued
    // during processing land in the next batch.
    const currentBatch = this.queue.splice(0, this.batchSize);

    try {
      await this.processor(currentBatch);
    } catch (error) {
      console.error('Batch processing error:', error);
      // Can add retry logic
    } finally {
      this.isProcessing = false;

      // Drain whatever accumulated while this batch was being processed.
      const hasBacklog = this.queue.length > 0;
      if (hasBacklog) {
        this.processBatch();
      }
    }
  }
}

// Usage example
// Batches of up to 5 messages, flushed after at most 100 ms.
const messageQueue = new MessageQueue(async (batch) => {
  console.log('Processing batch of messages:', batch.length);
  // Simulate async processing
  await new Promise(resolve => setTimeout(resolve, 10));
}, { batchSize: 5, batchTimeout: 100 });

// WebSocket message handling
// NOTE(review): `websocket` is not defined in this snippet. Assigning
// `onmessage` replaces any handler previously stored in that property.
// JSON.parse throws on malformed input; this handler does not catch that.
websocket.onmessage = (event) => {
  const message = JSON.parse(event.data);
  messageQueue.enqueue(message);
};

Buffering Strategies:

  1. Memory Buffering: Suitable for temporary message storage
  2. Disk Buffering: Suitable for persisting important messages
  3. Distributed Buffering: Suitable for clustered environments

Smart Buffering Example:

/**
 * Chooses where a message goes based on the most recent network measurement:
 * send right away, hold in memory, or hand to the disk buffer.
 */
class SmartMessageBuffer {
  /**
   * @param {Object} [options]
   * @param {number} [options.memoryLimit=1000] - Max messages held in memory.
   * @param {*} [options.diskConfig] - Passed through to the DiskBuffer constructor.
   */
  constructor(options = {}) {
    this.memoryBuffer = [];
    this.memoryLimit = options.memoryLimit || 1000;
    this.diskBuffer = new DiskBuffer(options.diskConfig);
    this.networkStatus = 'good'; // 'good', 'slow', 'bad'
    // 0 guarantees the first addMessage call performs a measurement.
    this.lastNetworkCheck = 0;
  }

  /**
   * Routes one message according to the current network status.
   * @param {*} message - Message to send or buffer.
   */
  addMessage(message) {
    // Refresh the status first (rate-limited inside checkNetworkStatus).
    this.checkNetworkStatus();

    const status = this.networkStatus;

    if (status === 'good') {
      // Good network, send immediately
      this.sendImmediately(message);
      return;
    }

    if (status === 'slow') {
      // Slow network: prefer memory, spill to disk once memory is full.
      const memoryHasRoom = this.memoryBuffer.length < this.memoryLimit;
      if (memoryHasRoom) {
        this.memoryBuffer.push(message);
      } else {
        this.diskBuffer.add(message);
      }
      return;
    }

    if (status === 'bad') {
      // Poor network, store to disk directly
      this.diskBuffer.add(message);
    }
  }

  /**
   * Placeholder for the immediate-send path; intentionally empty here.
   * @param {*} message - Message to send.
   */
  sendImmediately(message) {
    // Implement immediate sending logic
    // If sending fails, decide buffering strategy based on current network status
  }

  /**
   * Re-measures latency at most once every 5 seconds and maps it to a status:
   * under 100 -> 'good', under 500 -> 'slow', otherwise 'bad'.
   */
  checkNetworkStatus() {
    const now = Date.now();
    const dueForCheck = now - this.lastNetworkCheck > 5000; // Check every 5 seconds
    if (!dueForCheck) {
      return;
    }

    this.lastNetworkCheck = now;

    // Simulate network status check
    const latency = measureNetworkLatency();
    let status = 'bad';
    if (latency < 100) {
      status = 'good';
    } else if (latency < 500) {
      status = 'slow';
    }
    this.networkStatus = status;
  }
}

// Usage example
// Hold at most 500 messages in memory; `diskConfig` is handed to DiskBuffer as-is.
const buffer = new SmartMessageBuffer({
  memoryLimit: 500,
  diskConfig: { path: '/tmp/message_buffer' }
});

// NOTE(review): `websocket` is not defined in this snippet. Assigning
// `onmessage` replaces any handler previously stored in that property.
// JSON.parse throws on malformed input; this handler does not catch that.
websocket.onmessage = (event) => {
  const message = JSON.parse(event.data);
  buffer.addMessage(message);
};

Broadcasting and Multicasting

Broadcast Message Implementation (Server Iterates Clients)

Basic Broadcast Implementation:

/**
 * Keeps the set of connected clients and sends JSON messages to all of them
 * or to a filtered subset.
 */
class BroadcastManager {
  constructor() {
    this.clients = new Set();
  }

  /**
   * Starts tracking a client.
   * @param {Object} client - Connection exposing `readyState` and `send`.
   */
  addClient(client) {
    this.clients.add(client);
  }

  /**
   * Stops tracking a client; unknown clients are ignored.
   * @param {Object} client - Connection to forget.
   */
  removeClient(client) {
    this.clients.delete(client);
  }

  /**
   * Sends the message to every open client and drops clients that are not open.
   * The message is serialized once and the same string is reused.
   * @param {*} message - JSON-serializable message.
   */
  broadcast(message) {
    const serialized = JSON.stringify(message);

    this.clients.forEach((client) => {
      if (client.readyState !== WebSocket.OPEN) {
        // Client connection closed, remove it
        this.removeClient(client);
        return;
      }
      client.send(serialized);
    });
  }

  /**
   * Sends the message to open clients accepted by `predicate`.
   * Clients that are not open are skipped but stay registered.
   * @param {Function} predicate - Called with each open client; truthy means "send".
   * @param {*} message - JSON-serializable message.
   */
  broadcastToSubset(predicate, message) {
    const serialized = JSON.stringify(message);

    for (const client of this.clients) {
      const isOpen = client.readyState === WebSocket.OPEN;
      if (!isOpen) {
        continue;
      }
      if (!predicate(client)) {
        continue;
      }
      client.send(serialized);
    }
  }
}

// Usage example
const broadcastManager = new BroadcastManager();

// Add client
// NOTE(review): `wss` is not defined in this snippet; it is assumed to be a
// WebSocket server emitting 'connection' events.
wss.on('connection', (ws) => {
  broadcastManager.addClient(ws);

  // Stop tracking the socket as soon as it closes.
  ws.on('close', () => {
    broadcastManager.removeClient(ws);
  });
});

// Broadcast message
/**
 * Sends a 'system' message with the given content and the current time to
 * every tracked client.
 * @param {*} content - Notification content.
 */
function sendSystemNotification(content) {
  broadcastManager.broadcast({
    type: 'system',
    content,
    timestamp: Date.now()
  });
}

// Broadcast to specific user group
/**
 * Sends a message to open clients whose `groupId` property equals `groupId`.
 * NOTE(review): nothing in this snippet sets `groupId` on the sockets; it is
 * assumed to be assigned elsewhere.
 * @param {*} groupId - Group to target.
 * @param {*} message - JSON-serializable message.
 */
function sendToGroup(groupId, message) {
  broadcastManager.broadcastToSubset(
    (client) => client.groupId === groupId,
    message
  );
}

Optimizing Broadcast Performance:

/**
 * Broadcast manager that indexes clients by group so group broadcasts do not
 * have to scan every connection.
 *
 * A client may be added to several groups (one `addClient` call per group).
 * Clients found not to be open during a broadcast are removed from every
 * group, and groups that become empty are dropped.
 */
class OptimizedBroadcastManager {
  constructor() {
    this.clientGroups = new Map(); // groupId -> Set<client>
    this.allClients = new Set();
  }

  /**
   * Registers a client and adds it to a group.
   * @param {Object} client - Connection exposing `readyState` and `send`.
   * @param {string} [groupId='default'] - Group to add the client to.
   */
  addClient(client, groupId = 'default') {
    this.allClients.add(client);

    let group = this.clientGroups.get(groupId);
    if (!group) {
      group = new Set();
      this.clientGroups.set(groupId, group);
    }
    group.add(client);
  }

  /**
   * Removes a client from the global set and from every group, dropping
   * groups that end up empty so the group map does not grow without bound.
   * @param {Object} client - Connection to forget.
   */
  removeClient(client) {
    this.allClients.delete(client);

    for (const [groupId, group] of this.clientGroups) {
      group.delete(client);
      if (group.size === 0) {
        this.clientGroups.delete(groupId);
      }
    }
  }

  /**
   * Internal helper: sends to an open client, or evicts a client that is not
   * open from all bookkeeping.
   * @param {Object} client - Target connection.
   * @param {string} messageStr - Already-serialized message.
   * @returns {boolean} true if the message was sent.
   */
  sendOrEvict(client, messageStr) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(messageStr);
      return true;
    }
    this.removeClient(client);
    return false;
  }

  /**
   * Sends the message to every registered client.
   * @param {*} message - JSON-serializable message.
   */
  broadcastToAll(message) {
    const messageStr = JSON.stringify(message);
    for (const client of this.allClients) {
      this.sendOrEvict(client, messageStr);
    }
  }

  /**
   * Sends the message to the members of one group; unknown groups are ignored.
   * @param {string} groupId - Group to target.
   * @param {*} message - JSON-serializable message.
   */
  broadcastToGroup(groupId, message) {
    const groupClients = this.clientGroups.get(groupId);
    if (!groupClients) {
      return;
    }

    const messageStr = JSON.stringify(message);
    for (const client of groupClients) {
      this.sendOrEvict(client, messageStr);
    }
  }

  /**
   * Sends the message to the union of several groups. A client that belongs
   * to more than one of the listed groups receives the message once.
   * @param {Iterable<string>} groupIds - Groups to target; unknown ids are skipped.
   * @param {*} message - JSON-serializable message.
   */
  broadcastToGroups(groupIds, message) {
    const messageStr = JSON.stringify(message);
    // Clients already handled in this call, to avoid duplicate delivery.
    const handled = new Set();

    for (const groupId of groupIds) {
      const groupClients = this.clientGroups.get(groupId);
      if (!groupClients) {
        continue;
      }

      for (const client of groupClients) {
        if (handled.has(client)) {
          continue;
        }
        handled.add(client);
        this.sendOrEvict(client, messageStr);
      }
    }
  }
}

// Usage example
// NOTE(review): `broadcastManager` is also declared with `const` in the basic
// broadcast example; the two snippets cannot share one scope as written.
const broadcastManager = new OptimizedBroadcastManager();

// Add client to specific group
// NOTE(review): `wss` is not defined in this snippet; it is assumed to be a
// WebSocket server emitting 'connection' events.
wss.on('connection', (ws) => {
  const groupId = determineUserGroup(ws); // Implement group logic
  broadcastManager.addClient(ws, groupId);

  // Stop tracking the socket as soon as it closes.
  ws.on('close', () => {
    broadcastManager.removeClient(ws);
  });
});

// Broadcast to all clients
broadcastManager.broadcastToAll({
  type: 'system',
  content: 'Server maintenance notice',
  timestamp: Date.now()
});

// Broadcast to specific group
broadcastManager.broadcastToGroup('premium_users', {
  type: 'promotion',
  content: 'Exclusive offer for VIP users',
  timestamp: Date.now()
});

Multicast Message Group Management

Grouping Strategies:

  1. Static Grouping: Predefined based on user attributes
  2. Dynamic Grouping: Created dynamically based on real-time behavior
  3. Hybrid Grouping: Combination of static and dynamic

Group Management Implementation:

/**
 * Maintains groups and their members with a two-way index:
 * group -> members and user -> groups. Both sides are updated together so
 * lookups in either direction stay consistent.
 */
class GroupManager {
  constructor() {
    this.groups = new Map(); // groupId -> {id, name, members, metadata}
    this.userGroups = new Map(); // userId -> Set<groupId>
  }

  /**
   * Creates an empty group.
   * @param {string} groupId - Unique group id.
   * @param {string} name - Display name.
   * @param {Object} [metadata={}] - Arbitrary data stored with the group.
   * @throws {Error} If the group id is already in use.
   */
  createGroup(groupId, name, metadata = {}) {
    if (this.groups.has(groupId)) {
      throw new Error(`Group ${groupId} already exists`);
    }

    this.groups.set(groupId, {
      id: groupId,
      name,
      members: new Set(),
      metadata
    });
  }

  /**
   * Deletes a group and removes it from each member's group list. Users left
   * without any group are dropped from the user index, matching what
   * removeUserFromGroup does.
   * @param {string} groupId - Group to delete.
   * @throws {Error} If the group does not exist.
   */
  deleteGroup(groupId) {
    const group = this.groups.get(groupId);
    if (!group) {
      throw new Error(`Group ${groupId} does not exist`);
    }

    // Only the group's own members can reference it, so there is no need to
    // scan every user.
    for (const userId of group.members) {
      const groupIds = this.userGroups.get(userId);
      if (!groupIds) {
        continue;
      }
      groupIds.delete(groupId);
      if (groupIds.size === 0) {
        this.userGroups.delete(userId);
      }
    }

    this.groups.delete(groupId);
  }

  /**
   * Adds a user to a group; adding an existing member is a no-op.
   * @param {string} userId - User to add.
   * @param {string} groupId - Target group.
   * @throws {Error} If the group does not exist.
   */
  addUserToGroup(userId, groupId) {
    if (!this.groups.has(groupId)) {
      throw new Error(`Group ${groupId} does not exist`);
    }

    // Add to group's member list
    this.groups.get(groupId).members.add(userId);

    // Add to user's group list
    if (!this.userGroups.has(userId)) {
      this.userGroups.set(userId, new Set());
    }
    this.userGroups.get(userId).add(groupId);
  }

  /**
   * Removes a user from a group; removing a non-member is a no-op.
   * @param {string} userId - User to remove.
   * @param {string} groupId - Group to remove the user from.
   * @throws {Error} If the group does not exist.
   */
  removeUserFromGroup(userId, groupId) {
    if (!this.groups.has(groupId)) {
      throw new Error(`Group ${groupId} does not exist`);
    }

    // Remove from group's member list
    this.groups.get(groupId).members.delete(userId);

    // Remove from user's group list
    if (this.userGroups.has(userId)) {
      this.userGroups.get(userId).delete(groupId);

      // If user has no other groups, remove from map
      if (this.userGroups.get(userId).size === 0) {
        this.userGroups.delete(userId);
      }
    }
  }

  /**
   * Lists the members of a group.
   * @param {string} groupId - Group to inspect.
   * @returns {Array} A new array of user ids.
   * @throws {Error} If the group does not exist.
   */
  getGroupMembers(groupId) {
    if (!this.groups.has(groupId)) {
      throw new Error(`Group ${groupId} does not exist`);
    }

    return Array.from(this.groups.get(groupId).members);
  }

  /**
   * Lists the groups a user belongs to.
   * @param {string} userId - User to inspect.
   * @returns {Array} A new array of group ids; empty for unknown users.
   */
  getUserGroups(userId) {
    if (!this.userGroups.has(userId)) {
      return [];
    }

    return Array.from(this.userGroups.get(userId));
  }
}

// Usage example
const groupManager = new GroupManager();

// Create groups
// Arguments are (groupId, display name); metadata is left at its default.
groupManager.createGroup('premium_users', 'VIP Users');
groupManager.createGroup('new_users', 'Newly Registered Users');

// Add users to groups
// The group must already exist, otherwise addUserToGroup throws.
groupManager.addUserToGroup('user123', 'premium_users');
groupManager.addUserToGroup('user456', 'new_users');

// Get group members
// Returns a fresh array of user ids.
const premiumUsers = groupManager.getGroupMembers('premium_users');

Room and Channel Design (Chat Rooms, Game Rooms)

Room Management Implementation:

/**
 * Creates and deletes rooms and tracks which users are in which rooms.
 * Membership is indexed both ways: room -> users (inside Room) and
 * user -> rooms (in `userRooms`).
 */
class RoomManager {
  constructor() {
    this.rooms = new Map(); // roomId -> Room
    this.userRooms = new Map(); // userId -> Set<roomId>
  }

  /**
   * Creates a room.
   * @param {string} roomId - Unique room id.
   * @param {Object} [options={}] - Passed to the Room constructor.
   * @returns {Room} The new room.
   * @throws {Error} If the room id is already in use.
   */
  createRoom(roomId, options = {}) {
    if (this.rooms.has(roomId)) {
      throw new Error(`Room ${roomId} already exists`);
    }

    const room = new Room(roomId, options);
    this.rooms.set(roomId, room);

    return room;
  }

  /**
   * Deletes a room after removing every user from it.
   * @param {string} roomId - Room to delete.
   * @throws {Error} If the room does not exist.
   */
  deleteRoom(roomId) {
    if (!this.rooms.has(roomId)) {
      throw new Error(`Room ${roomId} does not exist`);
    }

    // Remove room from all users' room lists. leaveRoom updates both sides of
    // the index; getUsers() returns a copy, so removing while looping is safe.
    const room = this.rooms.get(roomId);
    for (const userId of room.getUsers()) {
      this.leaveRoom(userId, roomId);
    }

    this.rooms.delete(roomId);
  }

  /**
   * Adds a user to a room. If the room rejects the user (its addUser throws),
   * the user index is left untouched.
   * @param {string} userId - User joining.
   * @param {string} roomId - Room to join.
   * @throws {Error} If the room does not exist, or whatever Room#addUser throws.
   */
  joinRoom(userId, roomId) {
    if (!this.rooms.has(roomId)) {
      throw new Error(`Room ${roomId} does not exist`);
    }

    // Add to room's user list
    this.rooms.get(roomId).addUser(userId);

    // Add to user's room list
    if (!this.userRooms.has(userId)) {
      this.userRooms.set(userId, new Set());
    }
    this.userRooms.get(userId).add(roomId);
  }

  /**
   * Removes a user from a room; a user who is not in the room is a no-op.
   * @param {string} userId - User leaving.
   * @param {string} roomId - Room to leave.
   * @throws {Error} If the room does not exist.
   */
  leaveRoom(userId, roomId) {
    if (!this.rooms.has(roomId)) {
      throw new Error(`Room ${roomId} does not exist`);
    }

    // Remove from room's user list
    this.rooms.get(roomId).removeUser(userId);

    // Remove from user's room list
    if (this.userRooms.has(userId)) {
      this.userRooms.get(userId).delete(roomId);

      // If user has no other rooms, remove from map
      if (this.userRooms.get(userId).size === 0) {
        this.userRooms.delete(userId);
      }
    }
  }

  /**
   * Lists the users in a room.
   * @param {string} roomId - Room to inspect.
   * @returns {Array} Whatever the room's getUsers() returns.
   * @throws {Error} If the room does not exist.
   */
  getRoomUsers(roomId) {
    if (!this.rooms.has(roomId)) {
      throw new Error(`Room ${roomId} does not exist`);
    }

    return this.rooms.get(roomId).getUsers();
  }

  /**
   * Lists the rooms a user is in.
   * @param {string} userId - User to inspect.
   * @returns {Array} A new array of room ids; empty for unknown users.
   */
  getUserRooms(userId) {
    if (!this.userRooms.has(userId)) {
      return [];
    }

    return Array.from(this.userRooms.get(userId));
  }
}

/**
 * A single room: an id, a display name, a capacity, the set of users
 * currently inside, and free-form metadata.
 */
class Room {
  /**
   * @param {string} roomId - Room id.
   * @param {Object} [options]
   * @param {string} [options.name] - Display name; defaults to `Room <id>`.
   * @param {number} [options.maxUsers=100] - Capacity.
   * @param {Object} [options.metadata] - Initial metadata; defaults to `{}`.
   */
  constructor(roomId, options = {}) {
    const { name, maxUsers, metadata } = options;

    this.id = roomId;
    this.name = name || `Room ${roomId}`;
    this.maxUsers = maxUsers || 100;
    this.users = new Set();
    this.metadata = metadata || {};
  }

  /**
   * Adds a user to the room.
   * @param {string} userId - User to add.
   * @throws {Error} If the room is at capacity, or the user is already inside.
   */
  addUser(userId) {
    // Capacity is checked before membership, so a full room reports "full"
    // even for a user who is already in it.
    const isFull = this.users.size >= this.maxUsers;
    if (isFull) {
      throw new Error(`Room ${this.id} is full`);
    }

    const alreadyInside = this.users.has(userId);
    if (alreadyInside) {
      throw new Error(`User ${userId} is already in room ${this.id}`);
    }

    this.users.add(userId);
  }

  /**
   * Removes a user; a user who is not inside is ignored.
   * @param {string} userId - User to remove.
   */
  removeUser(userId) {
    this.users.delete(userId);
  }

  /**
   * @returns {Array} A new array with the ids of the users inside.
   */
  getUsers() {
    return [...this.users];
  }

  /**
   * @returns {number} Number of users inside.
   */
  getUserCount() {
    return this.users.size;
  }

  /**
   * Stores a metadata value under a key.
   * @param {string} key - Metadata key.
   * @param {*} value - Value to store.
   */
  setMetadata(key, value) {
    this.metadata[key] = value;
  }

  /**
   * @param {string} key - Metadata key.
   * @returns {*} The stored value, or undefined if the key is not set.
   */
  getMetadata(key) {
    return this.metadata[key];
  }
}

// Usage example
const roomManager = new RoomManager();

// Create chat room
// The options object is forwarded to the Room constructor.
const chatRoom = roomManager.createRoom('chat_123', {
  name: 'Game Chat Room',
  maxUsers: 50,
  metadata: { type: 'chat' }
});

// User joins room
// Throws if the room does not exist, is full, or already contains the user.
roomManager.joinRoom('user123', 'chat_123');

// Get room users
const usersInChat = roomManager.getRoomUsers('chat_123');

Client Subscription and Unsubscription

Subscription Management Implementation:

/**
 * In-process publish/subscribe registry.
 *
 * Each subscription gets a numeric id; a second index maps every topic to the
 * ids subscribed to it so publishing only touches interested callbacks.
 */
class SubscriptionManager {
  constructor() {
    this.subscriptions = new Map(); // subscriptionId -> {topic, callback}
    this.topicSubscriptions = new Map(); // topic -> Set<subscriptionId>
    this.nextSubscriptionId = 1;
  }

  /**
   * Registers a callback for a topic.
   * @param {string} topic - Topic to listen to.
   * @param {Function} callback - Called with each message published on the topic.
   * @returns {number} Subscription id to pass to unsubscribe.
   */
  subscribe(topic, callback) {
    const id = this.nextSubscriptionId;
    this.nextSubscriptionId += 1;

    // Record the subscription itself.
    this.subscriptions.set(id, { topic, callback });

    // Index it under its topic, creating the topic bucket on first use.
    if (!this.topicSubscriptions.has(topic)) {
      this.topicSubscriptions.set(topic, new Set());
    }
    this.topicSubscriptions.get(topic).add(id);

    return id;
  }

  /**
   * Cancels a subscription and drops its topic bucket if it becomes empty.
   * @param {number} subscriptionId - Id returned by subscribe.
   * @throws {Error} If the id is unknown (including already-cancelled ids).
   */
  unsubscribe(subscriptionId) {
    if (!this.subscriptions.has(subscriptionId)) {
      throw new Error(`Subscription ${subscriptionId} does not exist`);
    }

    const topic = this.subscriptions.get(subscriptionId).topic;
    const subscribers = this.topicSubscriptions.get(topic);

    if (subscribers !== undefined) {
      subscribers.delete(subscriptionId);

      // Last subscriber gone: forget the topic entirely.
      if (subscribers.size === 0) {
        this.topicSubscriptions.delete(topic);
      }
    }

    this.subscriptions.delete(subscriptionId);
  }

  /**
   * Delivers a message to every callback subscribed to the topic.
   * An error thrown by one callback is logged and does not stop the others.
   * @param {string} topic - Topic to publish on.
   * @param {*} message - Passed to each callback unchanged.
   */
  publish(topic, message) {
    const subscribers = this.topicSubscriptions.get(topic);
    if (subscribers === undefined) {
      return; // No subscribers
    }

    for (const subscriptionId of subscribers) {
      const { callback } = this.subscriptions.get(subscriptionId);
      try {
        callback(message);
      } catch (error) {
        console.error(`Subscription ${subscriptionId} callback execution error:`, error);
      }
    }
  }
}

// Usage example
const subscriptionManager = new SubscriptionManager();

// Client subscribes to topic
// subscribe() returns a numeric id that is needed later to unsubscribe.
const subscriptionId1 = subscriptionManager.subscribe('news', (message) => {
  console.log('Received news message:', message);
});

const subscriptionId2 = subscriptionManager.subscribe('sports', (message) => {
  console.log('Received sports message:', message);
});

// Publish messages
// Each call invokes only the callbacks registered for that topic.
subscriptionManager.publish('news', { title: 'Latest News', content: '...' });
subscriptionManager.publish('sports', { title: 'Match Results', content: '...' });

// Unsubscribe
// Unsubscribing the same id a second time would throw.
subscriptionManager.unsubscribe(subscriptionId1);

Distributed Broadcasting (Multi-Server Instance Synchronization)

Distributed Broadcast Architecture:

[Client A] ↔ [WebSocket Server 1] ↔ [Message Bus] ↔ [WebSocket Server 2]
       ↔ [WebSocket Server 3] ↔ [Message Bus] ↔ [WebSocket Server N]

Redis-Based Distributed Broadcast Implementation:

const Redis = require('ioredis');
// NOTE(review): this client is not referenced anywhere in the visible code;
// DistributedBroadcast creates its own connections. Confirm it is needed.
const redis = new Redis();

/**
 * Publish/subscribe across server instances using Redis pub/sub.
 *
 * Two connections are used: `redis` for publishing and `pubSub` for
 * subscribing. All delivery, including to subscribers on the publishing
 * instance, goes through Redis, so every subscriber receives each message
 * exactly once and in the same JSON-round-tripped form.
 */
class DistributedBroadcast {
  /**
   * @param {*} redisConfig - Passed unchanged to both Redis constructors.
   */
  constructor(redisConfig) {
    this.redis = new Redis(redisConfig);
    this.pubSub = new Redis(redisConfig);
    this.localSubscriptions = new Map(); // channel -> Set<callback>

    this.setupPubSub();
  }

  /**
   * Wires the subscriber connection's 'message' event to local callbacks.
   * Payloads that are not valid JSON are logged and dropped; an error thrown
   * by one callback is logged and does not stop the others.
   */
  setupPubSub() {
    this.pubSub.on('message', (channel, message) => {
      const callbacks = this.localSubscriptions.get(channel);
      if (!callbacks) {
        return;
      }

      let parsedMessage;
      try {
        parsedMessage = JSON.parse(message);
      } catch (error) {
        // A malformed payload must not throw out of the event handler.
        console.error(`Channel ${channel} received malformed message:`, error);
        return;
      }

      for (const callback of callbacks) {
        try {
          callback(parsedMessage);
        } catch (error) {
          console.error(`Channel ${channel} callback execution error:`, error);
        }
      }
    });
  }

  /**
   * Registers a callback for a channel, subscribing on Redis when it is the
   * first local callback for that channel.
   * @param {string} channel - Channel name.
   * @param {Function} callback - Called with each parsed message.
   * @returns {Function} Call it to remove this subscription.
   */
  subscribe(channel, callback) {
    if (!this.localSubscriptions.has(channel)) {
      this.localSubscriptions.set(channel, new Set());
      this.logRejection(this.pubSub.subscribe(channel), `subscribe to ${channel}`);
    }

    this.localSubscriptions.get(channel).add(callback);

    // Return unsubscribe function
    return () => {
      this.unsubscribe(channel, callback);
    };
  }

  /**
   * Removes a callback, unsubscribing on Redis when the channel has no local
   * callbacks left. Unknown channels or callbacks are ignored.
   * @param {string} channel - Channel name.
   * @param {Function} callback - Callback previously passed to subscribe.
   */
  unsubscribe(channel, callback) {
    const callbacks = this.localSubscriptions.get(channel);
    if (!callbacks) {
      return;
    }

    callbacks.delete(callback);

    if (callbacks.size === 0) {
      this.localSubscriptions.delete(channel);
      this.logRejection(this.pubSub.unsubscribe(channel), `unsubscribe from ${channel}`);
    }
  }

  /**
   * Publishes a message on a channel through Redis.
   *
   * Local callbacks are deliberately not invoked here: this instance's own
   * subscriber connection receives the message back from Redis, and calling
   * the callbacks directly as well would deliver it twice.
   * @param {string} channel - Channel name.
   * @param {*} message - JSON-serializable message.
   * @returns {*} Whatever the Redis client's publish returns.
   */
  publish(channel, message) {
    return this.redis.publish(channel, JSON.stringify(message));
  }

  /**
   * Internal helper: logs a rejected promise from a Redis command so it does
   * not surface as an unhandled rejection. Non-promise values are ignored.
   * @param {*} result - Value returned by a Redis client call.
   * @param {string} action - Short description used in the log line.
   */
  logRejection(result, action) {
    if (result && typeof result.catch === 'function') {
      result.catch((error) => {
        console.error(`Failed to ${action}:`, error);
      });
    }
  }
}

// Usage example
// The same config object is used for both the publishing and the subscribing
// connection created inside DistributedBroadcast.
const broadcast = new DistributedBroadcast({
  host: 'redis-host',
  port: 6379
});

// Subscribe to channel
// subscribe() returns a function that removes this subscription.
const unsubscribe = broadcast.subscribe('chat_room_1', (message) => {
  console.log('Received message:', message);
});

// Publish message
broadcast.publish('chat_room_1', {
  type: 'text',
  sender: 'user123',
  content: 'Hello, distributed world!'
});

// Unsubscribe
unsubscribe();

Message Queue-Based Distributed Broadcast:

const { Kafka } = require('kafkajs');

/**
 * Publish/subscribe across server instances using Kafka (kafkajs).
 *
 * Holds one producer and one consumer, plus a local registry of callbacks per
 * topic. Messages are serialized as JSON on publish and parsed on receipt.
 */
class KafkaBroadcast {
  /**
   * @param {Object} kafkaConfig - Passed unchanged to the Kafka constructor.
   */
  constructor(kafkaConfig) {
    this.kafka = new Kafka(kafkaConfig);
    this.producer = this.kafka.producer();
    // NOTE(review): the group id is fixed. Kafka consumers sharing a group id
    // presumably split the topic's partitions between them, so with several
    // server instances each message would reach only one of them — confirm
    // whether a per-instance group id is needed for true broadcast.
    this.consumer = this.kafka.consumer({ groupId: 'websocket-broadcast-group' });
    this.subscriptions = new Map(); // topic -> Set<callback>

    // setupConsumer is async and its promise is neither awaited nor given a
    // rejection handler here; it also starts before connect() can have been
    // called. NOTE(review): confirm kafkajs tolerates subscribe/run before
    // the consumer is connected.
    this.setupConsumer();
  }

  /**
   * Connects the producer, then the consumer.
   * @returns {Promise<void>}
   */
  async connect() {
    await this.producer.connect();
    await this.consumer.connect();
  }

  /**
   * Disconnects the producer, then the consumer.
   * @returns {Promise<void>}
   */
  async disconnect() {
    await this.producer.disconnect();
    await this.consumer.disconnect();
  }

  /**
   * Subscribes the consumer and starts the message loop, dispatching each
   * received message to the callbacks registered for its topic. An error
   * thrown by one callback is logged and does not stop the others.
   * @returns {Promise<void>}
   */
  async setupConsumer() {
    // NOTE(review): '*' looks intended as "all topics"; kafkajs documents
    // regular expressions for pattern subscriptions, so a literal '*' is
    // presumably treated as a topic name — confirm.
    await this.consumer.subscribe({ topic: '*', fromBeginning: false });

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        if (this.subscriptions.has(topic)) {
          const callbacks = this.subscriptions.get(topic);
          // JSON.parse is not guarded: a non-JSON payload throws out of eachMessage.
          const parsedMessage = JSON.parse(message.value.toString());

          for (const callback of callbacks) {
            try {
              callback(parsedMessage);
            } catch (error) {
              console.error(`Topic ${topic} callback execution error:`, error);
            }
          }
        }
      }
    });
  }

  /**
   * Registers a local callback for a topic. This only updates the local
   * registry; it does not change what the Kafka consumer is subscribed to.
   * @param {string} topic - Topic name.
   * @param {Function} callback - Called with each message for the topic.
   * @returns {Function} Call it to remove this subscription.
   */
  subscribe(topic, callback) {
    if (!this.subscriptions.has(topic)) {
      this.subscriptions.set(topic, new Set());
    }

    this.subscriptions.get(topic).add(callback);

    // Return unsubscribe function
    return () => {
      this.unsubscribe(topic, callback);
    };
  }

  /**
   * Removes a local callback and forgets the topic once it has none left.
   * Unknown topics or callbacks are ignored.
   * @param {string} topic - Topic name.
   * @param {Function} callback - Callback previously passed to subscribe.
   */
  unsubscribe(topic, callback) {
    if (this.subscriptions.has(topic)) {
      const callbacks = this.subscriptions.get(topic);
      callbacks.delete(callback);

      if (callbacks.size === 0) {
        this.subscriptions.delete(topic);
      }
    }
  }

  /**
   * Delivers a message to local callbacks immediately (with the original
   * object, not a parsed copy), then sends it to Kafka as JSON.
   * NOTE(review): if this instance's consumer also receives the message back
   * from Kafka, local callbacks would run a second time — confirm.
   * @param {string} topic - Topic name.
   * @param {*} message - JSON-serializable message.
   * @returns {Promise<void>} Resolves once the producer send has completed.
   */
  async publish(topic, message) {
    // Local publish
    if (this.subscriptions.has(topic)) {
      const callbacks = this.subscriptions.get(topic);
      // messageStr is computed but not used below.
      const messageStr = JSON.stringify(message);

      for (const callback of callbacks) {
        try {
          callback(message);
        } catch (error) {
          console.error(`Topic ${topic} callback execution error:`, error);
        }
      }
    }

    // Distributed publish
    await this.producer.send({
      topic,
      messages: [{ value: JSON.stringify(message) }]
    });
  }
}

// Usage example
// Wrapped in an async IIFE so `await` can be used. The promise returned by
// the IIFE is not handled, so a failure in any awaited step is an unhandled
// rejection.
(async () => {
  const broadcast = new KafkaBroadcast({
    clientId: 'websocket-broadcast',
    brokers: ['kafka1:9092', 'kafka2:9092']
  });

  await broadcast.connect();

  // Subscribe to topic
  // Registers a local callback and returns a function that removes it.
  const unsubscribe = broadcast.subscribe('chat_room_1', (message) => {
    console.log('Received message:', message);
  });

  // Publish message
  await broadcast.publish('chat_room_1', {
    type: 'text',
    sender: 'user123',
    content: 'Hello, Kafka world!'
  });

  // Unsubscribe
  unsubscribe();

  await broadcast.disconnect();
})();

Connection Management and Scaling

Client Connection State Monitoring

Connection State Monitoring Implementation:

/**
 * Tracks per-connection state and traffic counters for WebSocket connections
 * and periodically reports connections that have gone quiet.
 *
 * Only inbound traffic is counted here; `messagesSent` / `bytesSent` start at
 * zero and are left for the sending code to update.
 */
class ConnectionMonitor {
  constructor() {
    this.connections = new Map(); // connectionId -> {ws, state, stats}
    this.statsInterval = 5000; // Collect stats every 5 seconds
    this.monitorInterval = null;
  }

  /**
   * Start tracking a connection and hook its message/close/error events.
   *
   * @param {string} connectionId - Unique id for the connection.
   * @param {object} ws - Socket exposing `on(event, handler)`.
   */
  addConnection(connectionId, ws) {
    const now = Date.now();

    this.connections.set(connectionId, {
      ws,
      state: 'connected',
      stats: {
        messagesReceived: 0,
        messagesSent: 0,
        bytesReceived: 0,
        bytesSent: 0,
        connectedAt: now, // needed to report how long the connection lived
        lastActivity: now
      }
    });

    // Set up WebSocket event handlers
    ws.on('message', (message) => {
      const connection = this.connections.get(connectionId);
      if (connection) {
        connection.stats.messagesReceived++;
        connection.stats.bytesReceived += this.measureBytes(message);
        connection.stats.lastActivity = Date.now();
      }
    });

    ws.on('close', () => {
      this.removeConnection(connectionId);
    });

    ws.on('error', () => {
      this.updateConnectionState(connectionId, 'error');
    });

    // Start monitoring if not already started
    if (!this.monitorInterval) {
      this.startMonitoring();
    }
  }

  /**
   * Size of an incoming payload in bytes.
   *
   * Strings are measured as UTF-8; binary payloads use `byteLength`; an array
   * of fragments is summed. Unknown shapes count as 0.
   *
   * @param {*} message - Payload handed to the 'message' handler.
   * @returns {number} Byte count.
   */
  measureBytes(message) {
    if (typeof message === 'string') {
      return Buffer.byteLength(message);
    }
    if (Array.isArray(message)) {
      return message.reduce((total, part) => total + this.measureBytes(part), 0);
    }
    if (message && typeof message.byteLength === 'number') {
      return message.byteLength;
    }
    if (message && typeof message.length === 'number') {
      return message.length;
    }
    return 0;
  }

  /**
   * Stop tracking a connection, log its final statistics, and stop the
   * monitor timer once no connections remain. Unknown ids are ignored.
   *
   * @param {string} connectionId - Id passed to addConnection.
   */
  removeConnection(connectionId) {
    if (this.connections.has(connectionId)) {
      const connection = this.connections.get(connectionId);
      connection.state = 'closed';

      // Log connection stats
      this.logConnectionStats(connectionId, connection.stats);

      this.connections.delete(connectionId);

      // Stop monitoring if no more connections
      if (this.connections.size === 0) {
        this.stopMonitoring();
      }
    }
  }

  /**
   * Set the state label of a tracked connection; unknown ids are ignored.
   *
   * @param {string} connectionId - Id passed to addConnection.
   * @param {string} newState - New state label (e.g. 'error').
   */
  updateConnectionState(connectionId, newState) {
    if (this.connections.has(connectionId)) {
      this.connections.get(connectionId).state = newState;
    }
  }

  /**
   * Start the periodic check that reports connections with no inbound
   * activity for more than 30 seconds.
   */
  startMonitoring() {
    this.monitorInterval = setInterval(() => {
      const now = Date.now();
      const inactiveConnections = [];

      // Check all connection states
      for (const [connectionId, connection] of this.connections) {
        // Check for inactive connections
        if (now - connection.stats.lastActivity > 30000) { // 30 seconds inactive
          inactiveConnections.push(connectionId);
        }

        // Collect additional metrics here
      }

      // Handle inactive connections
      for (const connectionId of inactiveConnections) {
        console.log(`Connection ${connectionId} inactive, possibly disconnected`);
        // Can try sending ping or close connection
      }
    }, this.statsInterval);
  }

  /** Stop the periodic inactivity check if it is running. */
  stopMonitoring() {
    if (this.monitorInterval) {
      clearInterval(this.monitorInterval);
      this.monitorInterval = null;
    }
  }

  /**
   * Log the counters of a connection together with how long it has existed.
   *
   * The duration runs from `stats.connectedAt` to now. When the stats object
   * carries no `connectedAt`, `lastActivity` is used as the starting point.
   *
   * @param {string} connectionId - Id used in the log line.
   * @param {object} stats - Stats object as created by addConnection.
   */
  logConnectionStats(connectionId, stats) {
    const startedAt = typeof stats.connectedAt === 'number'
      ? stats.connectedAt
      : stats.lastActivity;

    // Log stats to monitoring system
    console.log(`Connection ${connectionId} statistics:`, {
      messagesReceived: stats.messagesReceived,
      messagesSent: stats.messagesSent,
      bytesReceived: stats.bytesReceived,
      bytesSent: stats.bytesSent,
      duration: `${(Date.now() - startedAt) / 1000} seconds`
    });

    // Can integrate with Prometheus, Datadog, etc.
  }

  /**
   * Snapshot of one connection's state and counters.
   *
   * @param {string} connectionId - Id passed to addConnection.
   * @returns {object|null} Copy of the stats plus `state`, or null if unknown.
   */
  getConnectionStats(connectionId) {
    if (this.connections.has(connectionId)) {
      const connection = this.connections.get(connectionId);
      return {
        state: connection.state,
        ...connection.stats
      };
    }
    return null;
  }

  /**
   * Snapshot of every tracked connection.
   *
   * @returns {object[]} One entry per connection with id, state and counters.
   */
  getAllConnectionStats() {
    const stats = [];
    for (const [connectionId, connection] of this.connections) {
      stats.push({
        connectionId,
        state: connection.state,
        ...connection.stats
      });
    }
    return stats;
  }
}

// Usage example
const monitor = new ConnectionMonitor();

// Expose a WebSocket server and hand every accepted socket to the monitor
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function onConnection(socket) {
  const id = generateUUID(); // Implement UUID generation
  monitor.addConnection(id, socket);

  // Keep the id on the socket so other handlers can look the connection up
  socket.connectionId = id;

  socket.on('close', function onClose() {
    monitor.removeConnection(id);
  });
});

// Dump a snapshot of every tracked connection once a minute
setInterval(function printStats() {
  const snapshot = monitor.getAllConnectionStats();
  console.log('Current connection stats:', snapshot);
}, 60 * 1000);

Connection Limits and Load Balancing

Connection Limit Implementation:

/**
 * Counting limiter for concurrent connections.
 *
 * Callers beyond the limit wait in FIFO order until a slot is released.
 */
class ConnectionLimiter {
  /**
   * @param {number} maxConnections - Maximum number of slots held at once.
   */
  constructor(maxConnections) {
    this.maxConnections = maxConnections;
    this.currentConnections = 0;
    this.waitingQueue = [];
  }

  /**
   * Take a slot, waiting if none is free.
   *
   * @returns {Promise<boolean>} Resolves to true once a slot is held.
   */
  async acquireConnection() {
    if (this.currentConnections < this.maxConnections) {
      this.currentConnections++;
      return true;
    }

    // Resolved by releaseConnection when a slot is handed over
    return new Promise((resolve) => {
      this.waitingQueue.push(resolve);
    });
  }

  /**
   * Give a slot back.
   *
   * A release with no slot held is ignored: letting the counter go negative
   * would silently raise the effective limit.
   */
  releaseConnection() {
    if (this.currentConnections <= 0) {
      return;
    }

    // Hand the slot straight to the oldest waiter; the count is unchanged
    if (this.waitingQueue.length > 0) {
      const nextResolve = this.waitingQueue.shift();
      nextResolve(true);
      return;
    }

    this.currentConnections--;
  }

  /**
   * @returns {number} Number of slots currently held.
   */
  getCurrentConnections() {
    return this.currentConnections;
  }
}

// WebSocket server integration
const limiter = new ConnectionLimiter(1000); // Max 1000 connections

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', async (ws) => {
  try {
    await limiter.acquireConnection();

    // 'error' and 'close' can both fire for one socket; give the slot back once
    let released = false;
    const releaseOnce = () => {
      if (released) {
        return;
      }
      released = true;
      limiter.releaseConnection();
    };

    ws.on('close', releaseOnce);
    ws.on('error', releaseOnce);

    // The client may have gone away while waiting for a slot; its 'close'
    // event fired before the handlers above existed, so release here instead
    if (ws.readyState !== WebSocket.OPEN) {
      releaseOnce();
      return;
    }

    // Handle WebSocket messages...
  } catch (error) {
    console.error('Connection acquisition failed:', error);
    ws.close(1013, 'Server busy, please try again later'); // 1013 is Try Again Later
  }
});

// Load balancer strategy
/**
 * Least-connections selector over a fixed list of servers.
 *
 * Keeps a connection counter per server; callers adjust the counters as
 * connections open and close.
 */
class LoadBalancer {
  /**
   * @param {Iterable<string>} servers - Servers to balance across; every
   *   counter starts at zero.
   */
  constructor(servers) {
    this.servers = servers; // List of servers
    // server -> current connections
    this.serverLoads = new Map(Array.from(this.servers, (server) => [server, 0]));
  }

  /**
   * @returns {string|null} Server with the smallest counter (the earliest one
   *   wins ties), or null when no server is registered.
   */
  getLeastLoadedServer() {
    let candidate = null;
    let lowest = Infinity;

    this.serverLoads.forEach((load, server) => {
      if (load < lowest) {
        lowest = load;
        candidate = server;
      }
    });

    return candidate;
  }

  /**
   * Add one to a known server's counter; unknown servers are ignored.
   *
   * @param {string} server - Server to adjust.
   */
  incrementServerLoad(server) {
    if (!this.serverLoads.has(server)) {
      return;
    }
    const current = this.serverLoads.get(server);
    this.serverLoads.set(server, current + 1);
  }

  /**
   * Subtract one from a known server's counter; unknown servers are ignored.
   *
   * @param {string} server - Server to adjust.
   */
  decrementServerLoad(server) {
    if (!this.serverLoads.has(server)) {
      return;
    }
    const current = this.serverLoads.get(server);
    this.serverLoads.set(server, current - 1);
  }

  /**
   * @param {string} server - Server to look up.
   * @returns {number} The stored counter, or 0 when it is missing or zero.
   */
  getConnectionCount(server) {
    const count = this.serverLoads.get(server);
    return count || 0;
  }
}

// Usage example
const servers = ['ws://server1:8080', 'ws://server2:8080', 'ws://server3:8080'];
const loadBalancer = new LoadBalancer(servers);

/**
 * Open a WebSocket to the currently least-loaded server.
 *
 * The server's counter is incremented for the lifetime of the socket and
 * decremented exactly once when the socket errors or closes.
 *
 * @returns {WebSocket} The newly created socket.
 * @throws {Error} When no server is registered, or when the socket cannot be
 *   constructed (the counter is rolled back first).
 */
function connectToWebSocket() {
  const server = loadBalancer.getLeastLoadedServer();
  if (server === null) {
    throw new Error('No WebSocket server available');
  }
  loadBalancer.incrementServerLoad(server);

  let ws;
  try {
    ws = new WebSocket(server);
  } catch (error) {
    // Construction failed, so no socket event will ever undo the increment
    loadBalancer.decrementServerLoad(server);
    throw error;
  }

  // 'error' and 'close' can both fire for one socket; count it only once
  let released = false;
  const releaseOnce = () => {
    if (released) {
      return;
    }
    released = true;
    loadBalancer.decrementServerLoad(server);
  };

  ws.on('close', releaseOnce);
  ws.on('error', releaseOnce);

  return ws;
}

Server-Side Horizontal Scaling (WebSocket Cluster)

WebSocket Cluster Architecture:

[Client] ↔ [Load Balancer (Nginx/HAProxy)] ↔ [WebSocket Server 1]
                                           ↔ [WebSocket Server 2]
                                           ↔ [WebSocket Server N]

Redis-Based WebSocket Cluster Implementation:

const Redis = require('ioredis');
const redis = new Redis();

/**
 * Routes WebSocket traffic across a cluster of nodes through Redis pub/sub.
 *
 * Each node keeps only its own sockets in `localConnections`. Targeted
 * messages travel on `websocket:cluster:messages` as `{connectionId, data}`;
 * broadcasts travel on `websocket:cluster:broadcast` as `{origin, data}`,
 * where `origin` is the sending node's id so that node can skip its own
 * publication (it has already delivered the broadcast locally).
 */
class ClusterWebSocketManager {
  constructor() {
    this.localConnections = new Map(); // connectionId -> ws
    // Identifies this node in broadcast envelopes
    this.nodeId = `${process.pid}-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}`;
    this.redisSub = new Redis();
    this.setupRedisPubSub();
  }

  /**
   * Subscribe to both cluster channels and dispatch incoming publications.
   */
  setupRedisPubSub() {
    const subscription = this.redisSub.subscribe(
      'websocket:cluster:messages',
      'websocket:cluster:broadcast'
    );
    if (subscription && typeof subscription.catch === 'function') {
      subscription.catch((error) => {
        console.error('Cluster channel subscription failed:', error);
      });
    }

    this.redisSub.on('message', (channel, message) => {
      if (channel === 'websocket:cluster:messages') {
        this.handleClusterMessage(message);
      } else if (channel === 'websocket:cluster:broadcast') {
        this.handleClusterBroadcast(message);
      }
    });
  }

  /**
   * Deliver a targeted cluster message if its connection lives on this node.
   *
   * @param {string} rawMessage - JSON text of `{connectionId, data}`.
   */
  handleClusterMessage(rawMessage) {
    const envelope = this.parseEnvelope(rawMessage);
    if (!envelope) {
      return;
    }

    const { connectionId, data } = envelope;

    // Not ours: another node owns this connection, just record it
    if (!this.localConnections.has(connectionId)) {
      this.logCrossNodeMessage(connectionId);
      return;
    }

    this.sendIfOpen(this.localConnections.get(connectionId), data);
  }

  /**
   * Deliver a broadcast published by another node to every local socket.
   *
   * @param {string} rawMessage - JSON text of `{origin, data}`.
   */
  handleClusterBroadcast(rawMessage) {
    const envelope = this.parseEnvelope(rawMessage);
    if (!envelope || envelope.origin === this.nodeId) {
      return;
    }

    for (const ws of this.localConnections.values()) {
      this.sendIfOpen(ws, envelope.data);
    }
  }

  /**
   * Parse a cluster publication without letting bad input throw out of the
   * subscriber callback.
   *
   * @param {string} rawMessage - Text received from Redis.
   * @returns {object|null} Parsed envelope, or null when it is not a JSON object.
   */
  parseEnvelope(rawMessage) {
    try {
      const parsed = JSON.parse(rawMessage);
      return parsed !== null && typeof parsed === 'object' ? parsed : null;
    } catch (error) {
      console.error('Discarding malformed cluster message:', error);
      return null;
    }
  }

  /**
   * Send on a socket only while it is open.
   *
   * @param {object} ws - Local socket.
   * @param {*} data - Payload to send.
   */
  sendIfOpen(ws, data) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(data);
    }
  }

  /**
   * Publish on a cluster channel and log (rather than leave unhandled) any
   * rejection from the publisher.
   *
   * @param {string} channel - Redis channel name.
   * @param {string} payload - Text to publish.
   */
  publishToCluster(channel, payload) {
    Promise.resolve(redis.publish(channel, payload)).catch((error) => {
      console.error(`Redis publish to ${channel} failed:`, error);
    });
  }

  /**
   * Register a local socket and wire up its message/close/error handlers.
   *
   * @param {string} connectionId - Unique id for the connection.
   * @param {object} ws - Socket exposing `on`, `send` and `readyState`.
   */
  addConnection(connectionId, ws) {
    this.localConnections.set(connectionId, ws);

    // Set up WebSocket event handlers
    ws.on('message', (message) => {
      this.handleLocalMessage(connectionId, message);
    });

    ws.on('close', () => {
      this.removeConnection(connectionId);
    });

    ws.on('error', () => {
      this.removeConnection(connectionId);
    });
  }

  /**
   * Forget a local connection; unknown ids are ignored.
   *
   * @param {string} connectionId - Id passed to addConnection.
   */
  removeConnection(connectionId) {
    this.localConnections.delete(connectionId);
  }

  /**
   * Send to one connection, wherever in the cluster it lives.
   *
   * @param {string} connectionId - Target connection id.
   * @param {*} message - Payload to deliver.
   */
  sendToConnection(connectionId, message) {
    if (this.localConnections.has(connectionId)) {
      // Local connection, send directly
      this.sendIfOpen(this.localConnections.get(connectionId), message);
      return;
    }

    // Cross-node connection, publish via Redis
    this.publishToCluster('websocket:cluster:messages', JSON.stringify({
      connectionId,
      data: message
    }));

    this.logCrossNodeMessage(connectionId);
  }

  /**
   * Send to every connection in the cluster.
   *
   * Local sockets are written immediately; other nodes receive the message
   * through the broadcast channel.
   *
   * @param {*} message - Payload to deliver.
   */
  broadcast(message) {
    // Local broadcast
    for (const ws of this.localConnections.values()) {
      this.sendIfOpen(ws, message);
    }

    // Cluster broadcast
    this.publishToCluster('websocket:cluster:broadcast', JSON.stringify({
      origin: this.nodeId,
      data: message
    }));
  }

  /**
   * Handle a message received from a local socket, forwarding it to the
   * cluster when isMessageForOtherNodes says so.
   *
   * @param {string} connectionId - Id of the sending connection.
   * @param {*} message - Payload as delivered by the socket.
   */
  handleLocalMessage(connectionId, message) {
    // Handle local message logic...
    if (this.isMessageForOtherNodes(message)) {
      this.publishToCluster('websocket:cluster:messages', JSON.stringify({
        connectionId: this.extractTargetConnectionId(message),
        data: message
      }));
    }
  }

  /**
   * @param {string} connectionId - Connection the cross-node message targets.
   */
  logCrossNodeMessage(connectionId) {
    // Log cross-node message stats
    console.log(`Cross-node message: ${connectionId}`);
  }

  /**
   * Placeholder: decide whether a local message must reach other nodes.
   *
   * @param {*} message - Payload received from a local socket.
   * @returns {boolean} Always false in this example.
   */
  isMessageForOtherNodes(message) {
    // Determine if message needs to be sent to other nodes
    return false;
  }

  /**
   * Placeholder: extract the target connection id from a message.
   *
   * @param {*} message - Payload received from a local socket.
   * @returns {string|null} Always null in this example.
   */
  extractTargetConnectionId(message) {
    // Extract target connection ID from message
    return null;
  }
}

// Usage example
const clusterManager = new ClusterWebSocketManager();

// Accept sockets on this node and register each one with the cluster manager
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function onConnection(socket) {
  const id = generateUUID();
  clusterManager.addConnection(id, socket);

  socket.on('close', function onClose() {
    clusterManager.removeConnection(id);
  });
});

Nginx-Based WebSocket Load Balancer Configuration:

# Proxies WebSocket traffic arriving at /ws to a pool of backend servers.
http {
    # Pool of backend WebSocket servers that /ws requests are proxied to
    upstream websocket_servers {
        ip_hash;  # Use ip_hash for sticky sessions
        server ws1.example.com:8080;
        server ws2.example.com:8080;
        server ws3.example.com:8080;
    }

    server {
        listen 80;
        server_name websocket.example.com;

        location /ws {
            proxy_pass http://websocket_servers;
            # The WebSocket handshake is an HTTP/1.1 Upgrade request, so the
            # proxied connection uses HTTP/1.1 and passes the upgrade headers on
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            # Pass the original Host header to the backend
            proxy_set_header Host $host;

            # Timeout settings
            # Long timeouts keep idle WebSocket connections from being cut off
            proxy_read_timeout 86400s; # 24 hours
            proxy_send_timeout 86400s;
        }
    }
}

Distributed Session Management (Redis/ZooKeeper)

Redis-Based Session Management:

const Redis = require('ioredis');
const redis = new Redis();

/**
 * Stores WebSocket session records in Redis.
 *
 * Layout:
 *   session:<sessionId>            hash with userId, connectionId, createdAt,
 *                                  expiresAt and metadata (JSON text)
 *   user:<userId>:session          string -> sessionId (latest session wins)
 *   connection:<connId>:session    string -> sessionId
 *
 * NOTE(review): the two index keys are written without a TTL and `expiresAt`
 * is not rewritten on refresh; only the session hash itself expires.
 */
class RedisSessionManager {
  constructor() {
    this.sessionTTL = 3600; // Session TTL in seconds
  }

  /**
   * Create a session and return its id immediately.
   *
   * The Redis writes are queued as one MULTI and complete in the background;
   * a failure is logged instead of surfacing as an unhandled rejection.
   *
   * @param {string} userId - Owner of the session.
   * @param {string} connectionId - Connection the session belongs to.
   * @param {object} [metadata={}] - Extra data; stored as JSON text because a
   *   hash field can only hold a flat string.
   * @returns {string} The new session id.
   */
  createSession(userId, connectionId, metadata = {}) {
    const sessionId = generateUUID();
    const now = Math.floor(Date.now() / 1000);
    const sessionKey = `session:${sessionId}`;

    const sessionData = {
      userId,
      connectionId,
      createdAt: now,
      expiresAt: now + this.sessionTTL,
      metadata: JSON.stringify(metadata)
    };

    redis.multi()
      .hmset(sessionKey, sessionData)
      .expire(sessionKey, this.sessionTTL)
      .set(`user:${userId}:session`, sessionId)
      .set(`connection:${connectionId}:session`, sessionId)
      .exec()
      .catch((error) => {
        console.error(`Failed to store session ${sessionId}:`, error);
      });

    return sessionId;
  }

  /**
   * Load a session record.
   *
   * @param {string} sessionId - Session to load.
   * @returns {Promise<object|null>} The stored fields (all strings) with
   *   `metadata` parsed back into an object when it is valid JSON, or null
   *   when the session does not exist (HGETALL yields an empty object then).
   */
  async getSession(sessionId) {
    const raw = await redis.hgetall(`session:${sessionId}`);
    if (!raw || Object.keys(raw).length === 0) {
      return null;
    }

    const session = { ...raw };
    if (typeof raw.metadata === 'string') {
      try {
        session.metadata = JSON.parse(raw.metadata);
      } catch (error) {
        // Records written before metadata was serialized are kept as-is
        session.metadata = raw.metadata;
      }
    }
    return session;
  }

  /**
   * Resolve an index key to its session.
   *
   * @param {string} indexKey - `user:...:session` or `connection:...:session`.
   * @returns {Promise<object|null>} The session, or null when the index or
   *   the session is missing or Redis fails (the failure is logged).
   */
  async lookupSession(indexKey) {
    try {
      const sessionId = await redis.get(indexKey);
      if (!sessionId) {
        return null;
      }
      return await this.getSession(sessionId);
    } catch (error) {
      console.error(`Session lookup via ${indexKey} failed:`, error);
      return null;
    }
  }

  /**
   * @param {string} userId - User to look up.
   * @returns {Promise<object|null>} The session currently indexed for the user.
   */
  getUserSessions(userId) {
    return this.lookupSession(`user:${userId}:session`);
  }

  /**
   * @param {string} connectionId - Connection to look up.
   * @returns {Promise<object|null>} The session indexed for the connection.
   */
  getConnectionSession(connectionId) {
    return this.lookupSession(`connection:${connectionId}:session`);
  }

  /**
   * Merge fields into a session hash and refresh its TTL.
   *
   * @param {string} sessionId - Session to update.
   * @param {object} updates - Fields to write; a non-string `metadata` is
   *   serialized to JSON text first.
   * @returns {Promise<*>} Settles when Redis has answered; failures are logged.
   */
  updateSession(sessionId, updates) {
    const sessionKey = `session:${sessionId}`;
    const fields = { ...updates };
    if (fields.metadata !== undefined && typeof fields.metadata !== 'string') {
      fields.metadata = JSON.stringify(fields.metadata);
    }

    // HMSET rejects an empty field list, so an empty update only refreshes
    if (Object.keys(fields).length === 0) {
      return this.refreshSession(sessionId);
    }

    return redis.multi()
      .hmset(sessionKey, fields)
      .expire(sessionKey, this.sessionTTL)
      .exec()
      .catch((error) => {
        console.error(`Failed to update session ${sessionId}:`, error);
      });
  }

  /**
   * Delete a session and the index keys that still point at it.
   *
   * An index key is left alone when it already references a different
   * (newer) session, e.g. after the same user reconnected.
   *
   * @param {string} sessionId - Session to delete.
   * @returns {Promise<boolean>} true when the session existed and was deleted;
   *   false when it was missing or Redis failed (the failure is logged).
   */
  async deleteSession(sessionId) {
    const sessionKey = `session:${sessionId}`;

    try {
      const session = await redis.hgetall(sessionKey);
      if (!session || Object.keys(session).length === 0) {
        return false;
      }

      const userKey = `user:${session.userId}:session`;
      const connectionKey = `connection:${session.connectionId}:session`;
      const [userIndex, connectionIndex] = await Promise.all([
        redis.get(userKey),
        redis.get(connectionKey)
      ]);

      const multi = redis.multi();
      multi.del(sessionKey);
      if (userIndex === sessionId) {
        multi.del(userKey);
      }
      if (connectionIndex === sessionId) {
        multi.del(connectionKey);
      }
      await multi.exec();
      return true;
    } catch (error) {
      console.error(`Failed to delete session ${sessionId}:`, error);
      return false;
    }
  }

  /**
   * Reset the TTL of a session hash.
   *
   * @param {string} sessionId - Session to keep alive.
   * @returns {Promise<*>} Settles when Redis has answered; failures are logged.
   */
  refreshSession(sessionId) {
    return redis.expire(`session:${sessionId}`, this.sessionTTL).catch((error) => {
      console.error(`Failed to refresh session ${sessionId}:`, error);
    });
  }
}

// Usage example
const sessionManager = new RedisSessionManager();

// Create session on WebSocket connection.
// The second argument is the HTTP upgrade request; it carries the peer
// address and the request headers, so no private socket field is needed.
wss.on('connection', (ws, req) => {
  const userId = authenticateUser(ws); // Implement user authentication
  const connectionId = generateUUID();

  const sessionId = sessionManager.createSession(userId, connectionId, {
    ip: req.socket.remoteAddress,
    userAgent: req.headers['user-agent']
  });

  ws.sessionId = sessionId;
  ws.connectionId = connectionId;

  // Drop the session as soon as its connection goes away
  ws.on('close', () => {
    sessionManager.deleteSession(sessionId);
  });
});

// Fetch user session info
/**
 * Look up the session stored for a user through the shared session manager.
 *
 * @param {string} userId - User whose session is wanted.
 * @returns {*} Whatever sessionManager.getUserSessions returns for the user.
 */
function getUserSessionInfo(userId) {
  const lookup = sessionManager.getUserSessions(userId);
  return lookup;
}

Connection Migration and Recovery Mechanisms

Connection Migration Design:

/**
 * Moves a connection from this server to a target server in five steps:
 * notify target -> pause sending -> transfer state -> tell the client to
 * reconnect -> confirm. A failed run is retried from the first step.
 *
 * The step methods below only simulate work (timer plus random failure) and
 * are meant to be replaced with real implementations.
 */
class ConnectionMigrator {
  constructor() {
    this.pendingMigrations = new Map(); // connectionId -> {targetServer, retryCount}
    // NOTE(review): declared but not enforced anywhere in this class
    this.migrationTimeout = 10000; // 10-second migration timeout
    this.maxRetries = 3; // Total attempts allowed per migration
    this.retryDelay = 2000; // Milliseconds to wait before a retry
  }

  /**
   * Register a migration and start it in the background.
   *
   * @param {string} connectionId - Connection to move.
   * @param {string} targetServer - Server the connection should move to.
   * @throws {Error} When the connection already has a migration in progress.
   */
  initiateMigration(connectionId, targetServer) {
    if (this.pendingMigrations.has(connectionId)) {
      throw new Error(`Connection ${connectionId} already has an ongoing migration`);
    }

    this.pendingMigrations.set(connectionId, {
      targetServer,
      retryCount: 0
    });

    // Not awaited on purpose: the run reports failures through
    // handleMigrationError and never rejects
    this.startMigrationProcess(connectionId);
  }

  /**
   * Run the migration steps in order for a pending migration.
   *
   * The first failing step aborts the run and is passed to
   * handleMigrationError; later steps are not executed.
   *
   * @param {string} connectionId - Connection with a pending migration.
   * @returns {Promise<void>} Settles when the run has finished or failed.
   */
  async startMigrationProcess(connectionId) {
    const migration = this.pendingMigrations.get(connectionId);
    if (!migration) {
      return;
    }

    const { targetServer } = migration;

    try {
      await this.notifyTargetServer(targetServer, connectionId);
      await this.pauseMessageSending(connectionId);
      await this.transferConnectionState(connectionId, targetServer);
      await this.notifyClientToReconnect(connectionId, targetServer);
      await this.confirmMigrationComplete(connectionId);
      this.pendingMigrations.delete(connectionId);
    } catch (error) {
      this.handleMigrationError(connectionId, error);
    }
  }

  /**
   * Step 1 (simulated): ask the target server to prepare for the connection.
   *
   * @param {string} targetServer - Server to notify.
   * @param {string} connectionId - Connection being moved.
   * @returns {Promise<void>}
   */
  notifyTargetServer(targetServer, connectionId) {
    return new Promise((resolve, reject) => {
      // Implement notification to target server
      console.log(`Notify target server ${targetServer} to prepare for connection ${connectionId}`);

      // Simulate async operation
      setTimeout(() => {
        if (Math.random() > 0.1) { // 90% success rate
          resolve();
        } else {
          reject(new Error('Target server preparation failed'));
        }
      }, 500);
    });
  }

  /**
   * Step 2 (simulated): stop sending messages on the connection.
   *
   * @param {string} connectionId - Connection being moved.
   * @returns {Promise<void>}
   */
  pauseMessageSending(connectionId) {
    return new Promise((resolve, reject) => {
      // Implement message sending pause
      console.log(`Pausing message sending for connection ${connectionId}`);

      // Simulate async operation
      setTimeout(() => {
        if (Math.random() > 0.05) { // 95% success rate
          resolve();
        } else {
          reject(new Error('Pausing message sending failed'));
        }
      }, 300);
    });
  }

  /**
   * Step 3 (simulated): copy the connection's state to the target server.
   *
   * @param {string} connectionId - Connection being moved.
   * @param {string} targetServer - Server receiving the state.
   * @returns {Promise<void>}
   */
  transferConnectionState(connectionId, targetServer) {
    return new Promise((resolve, reject) => {
      // Implement connection state transfer
      console.log(`Transferring state for connection ${connectionId} to server ${targetServer}`);

      // Simulate async operation
      setTimeout(() => {
        if (Math.random() > 0.1) { // 90% success rate
          resolve();
        } else {
          reject(new Error('State transfer failed'));
        }
      }, 1000);
    });
  }

  /**
   * Step 4 (simulated): tell the client to reconnect to the target server.
   *
   * @param {string} connectionId - Connection being moved.
   * @param {string} targetServer - Server the client should reconnect to.
   * @returns {Promise<void>}
   */
  notifyClientToReconnect(connectionId, targetServer) {
    return new Promise((resolve, reject) => {
      // Implement client reconnect notification
      console.log(`Notifying client of connection ${connectionId} to reconnect to ${targetServer}`);

      // Simulate async operation
      setTimeout(() => {
        if (Math.random() > 0.2) { // 80% success rate
          resolve();
        } else {
          reject(new Error('Client reconnect notification failed'));
        }
      }, 800);
    });
  }

  /**
   * Step 5 (simulated): confirm that the migration finished.
   *
   * @param {string} connectionId - Connection being moved.
   * @returns {Promise<void>}
   */
  confirmMigrationComplete(connectionId) {
    return new Promise((resolve, reject) => {
      // Implement migration completion confirmation
      console.log(`Confirming migration completion for ${connectionId}`);

      // Simulate async operation
      setTimeout(() => {
        if (Math.random() > 0.05) { // 95% success rate
          resolve();
        } else {
          reject(new Error('Migration confirmation failed'));
        }
      }, 400);
    });
  }

  /**
   * Count a failed run and either schedule a retry or give up.
   *
   * @param {string} connectionId - Connection whose migration failed.
   * @param {Error} error - Failure reported by the aborted step.
   */
  handleMigrationError(connectionId, error) {
    const migration = this.pendingMigrations.get(connectionId);
    if (!migration) {
      return;
    }

    migration.retryCount++;

    if (migration.retryCount < this.maxRetries) {
      console.log(`Migration failed for ${connectionId}, retrying ${migration.retryCount}/${this.maxRetries}`);
      setTimeout(() => {
        this.startMigrationProcess(connectionId);
      }, this.retryDelay);
    } else {
      console.error(`Migration failed for ${connectionId}, exceeded max retries:`, error);
      this.pendingMigrations.delete(connectionId);

      // Can notify monitoring system or take recovery actions
    }
  }

  /**
   * Connection recovery hook for a dropped connection.
   *
   * A drop during a pending migration is only logged, since the client may
   * simply have moved to the target server already.
   *
   * @param {string} connectionId - Connection that dropped.
   */
  handleConnectionDrop(connectionId) {
    if (this.pendingMigrations.has(connectionId)) {
      console.log(`Connection ${connectionId} dropped, ongoing migration may have completed`);
      return;
    }

    // Implement connection recovery logic
    console.log(`Connection ${connectionId} dropped, initiating recovery process`);

    // Can integrate with reconnect strategy here
  }
}

// Usage example
// One migrator instance tracks every pending migration by connection id
const migrator = new ConnectionMigrator();

// Initiate migration of connection 'conn123' to the new server; this throws
// if a migration for the same connection is already pending
migrator.initiateMigration('conn123', 'ws://new-server:8080');

Client Connection Recovery Implementation:

/**
 * WebSocket client that reconnects with exponential backoff, queues messages
 * while disconnected, and can follow a server-initiated migration.
 *
 * Exactly one socket is live at a time: connect() always detaches and closes
 * the previous socket before creating a new one.
 */
class ResilientWebSocketClient {
  /**
   * @param {string} url - Server URL to connect to.
   * @param {object} [options]
   * @param {number} [options.maxReconnectAttempts=5] - Attempts before giving up.
   * @param {number} [options.initialDelay=1000] - First backoff delay in ms.
   * @param {number} [options.maxDelay=30000] - Upper bound for the delay in ms.
   * @param {Function} [options.onmaxreconnect] - Called when attempts run out.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = {
      maxReconnectAttempts: 5,
      initialDelay: 1000,
      maxDelay: 30000,
      ...options
    };

    this.reconnectAttempts = 0;
    this.reconnectTimer = null;
    this.ws = null;
    this.pendingMessages = [];
    this.connectionId = null;
    this.sessionId = null;
    this.connecting = false; // true between connect() and open/failure
    this.closedByUser = false; // set by close() to suppress reconnects

    this.connect();
  }

  /**
   * Open a fresh socket to `this.url`, replacing any existing one.
   */
  connect() {
    this.closedByUser = false;
    this.cancelReconnectTimer();
    this.discardSocket();

    const ws = new WebSocket(this.url);
    this.ws = ws;
    this.connecting = true;

    // 'error' and 'close' can both fire for one failure; react only once,
    // and never for a socket that has since been replaced
    let disconnectHandled = false;
    const onDisconnect = () => {
      if (disconnectHandled || this.ws !== ws) {
        return;
      }
      disconnectHandled = true;
      this.connecting = false;
      this.scheduleReconnect();
    };

    ws.onopen = () => {
      console.log('WebSocket connection established');
      this.connecting = false;
      this.reconnectAttempts = 0; // Reset reconnect counter

      // Send all pending messages
      this.flushPendingMessages();

      // Restore session if available
      if (this.sessionId) {
        this.restoreSession();
      }
    };

    ws.onmessage = (event) => {
      try {
        const message = JSON.parse(event.data);

        // Handle session restoration response
        if (message.type === 'session_restored') {
          this.sessionId = message.sessionId;
          this.connectionId = message.connectionId;
          console.log('Session restored successfully:', this.sessionId);
          return;
        }

        // Handle other messages...
        this.handleMessage(message);
      } catch (error) {
        console.error('Message parsing error:', error);
      }
    };

    ws.onclose = () => {
      console.log('WebSocket connection closed');
      onDisconnect();
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      onDisconnect();
    };
  }

  /**
   * Detach the handlers of the current socket and close it.
   *
   * Handlers are removed first so the close does not trigger a reconnect.
   */
  discardSocket() {
    const previous = this.ws;
    if (!previous) {
      return;
    }

    this.ws = null;
    this.connecting = false;
    previous.onopen = null;
    previous.onmessage = null;
    previous.onclose = null;
    previous.onerror = null;

    try {
      previous.close();
    } catch (error) {
      console.error('Closing previous WebSocket failed:', error);
    }
  }

  /** Cancel a scheduled reconnect, if any. */
  cancelReconnectTimer() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Close the connection for good: no reconnect is attempted until connect()
   * is called again. Queued messages are kept.
   */
  close() {
    this.closedByUser = true;
    this.cancelReconnectTimer();
    this.discardSocket();
  }

  /**
   * Send a message now, or queue it until the socket is open.
   *
   * A message flagged `important` triggers an immediate reconnect when no
   * connection attempt is already in flight.
   *
   * @param {object} message - JSON-serializable message.
   */
  sendMessage(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
      return;
    }

    // Connection not open, cache message
    this.pendingMessages.push(message);

    // For important messages, try reconnecting immediately
    if (message.important && !this.connecting && !this.closedByUser) {
      this.connect();
    }
  }

  /** Send queued messages in order for as long as the socket stays open. */
  flushPendingMessages() {
    while (
      this.pendingMessages.length > 0 &&
      this.ws &&
      this.ws.readyState === WebSocket.OPEN
    ) {
      const message = this.pendingMessages.shift();
      this.ws.send(JSON.stringify(message));
    }
  }

  /**
   * Schedule the next reconnect with exponential backoff, or report that the
   * attempts are exhausted through `options.onmaxreconnect`.
   */
  scheduleReconnect() {
    this.cancelReconnectTimer();

    if (this.closedByUser) {
      return;
    }

    if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
      console.log('Max reconnect attempts reached');
      if (this.options.onmaxreconnect) {
        this.options.onmaxreconnect();
      }
      return;
    }

    // Calculate exponential backoff delay
    const delay = Math.min(
      this.options.initialDelay * Math.pow(2, this.reconnectAttempts),
      this.options.maxDelay
    );

    this.reconnectAttempts++;
    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.options.maxReconnectAttempts})`);

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, delay);
  }

  /**
   * Ask the server to restore the known session on the current connection.
   */
  restoreSession() {
    if (!this.sessionId || !this.connectionId) {
      console.error('Cannot restore session: missing sessionId or connectionId');
      return;
    }

    this.sendMessage({
      type: 'restore_session',
      sessionId: this.sessionId,
      connectionId: this.connectionId
    });
  }

  /**
   * Extension point for application messages.
   *
   * 'session_restored' never reaches this method; it is consumed by the
   * socket's message handler before dispatch.
   *
   * @param {object} message - Parsed incoming message.
   */
  handleMessage(message) {
    switch (message.type) {
      // Handle other message types...
      default:
        break;
    }
  }

  /**
   * Switch to a new server on request, keeping queued messages and session
   * identifiers.
   *
   * @param {string} newServerUrl - URL of the server to move to.
   */
  handleMigrationRequest(newServerUrl) {
    console.log('Received migration request to new server:', newServerUrl);

    // Save current state
    const currentState = this.saveState();

    // Update URL
    this.url = newServerUrl;

    // Restore state
    this.restoreState(currentState);

    // Reconnect; connect() closes the old socket with its handlers detached,
    // so the close does not schedule a second, competing reconnect
    this.connect();
  }

  /**
   * @returns {object} Queued messages and session identifiers.
   */
  saveState() {
    return {
      pendingMessages: this.pendingMessages,
      sessionId: this.sessionId,
      connectionId: this.connectionId
    };
  }

  /**
   * @param {object} state - Object previously returned by saveState.
   */
  restoreState(state) {
    this.pendingMessages = state.pendingMessages || [];
    this.sessionId = state.sessionId;
    this.connectionId = state.connectionId;
  }
}

// Usage example
const client = new ResilientWebSocketClient('ws://example.com/socket', {
  maxReconnectAttempts: 10,
  initialDelay: 1000,
  maxDelay: 60000,
  // Invoked once the client has used up all of its reconnect attempts
  onmaxreconnect() {
    console.log('Cannot connect to server, please check network or try again later');
  }
});

// Ordinary message: sent right away when open, queued otherwise
client.sendMessage({ type: 'chat', content: 'Hello' });

// Flagged message: additionally asks the client to reconnect immediately
// when the socket is not open
client.sendMessage({ type: 'important_command', action: 'update_settings', important: true });

Comprehensive Application Case Study

Real-Time Collaborative Editing System

Architecture Design:

[Client A] ↔ [WebSocket Server 1] ↔ [Redis Pub/Sub] ↔ [WebSocket Server 2]
                           ↗ [Redis] ↘ [Database]
                           ↗ WebSocket ↘
[WebSocket Server 3]

Implementation Key Points:

  1. Operational Transformation (OT): Resolve conflicting edits
  2. Real-Time Synchronization: Broadcast edit operations via WebSocket
  3. State Persistence: Periodically save to database
  4. Connection Recovery: Restore editing state after disconnection

Core Code Example:

/**
 * Per-document hub that relays edit operations and cursor updates between
 * the clients editing one document.
 *
 * Operation transformation, persistence and document-state retrieval are
 * placeholders in this example.
 */
class CollaborativeEditor {
  /**
   * @param {string} documentId - Document this editor instance serves.
   */
  constructor(documentId) {
    this.documentId = documentId;
    this.operations = []; // Operation history
    this.clients = new Set(); // Connected clients
    this.sockets = new Map(); // clientId -> ws, needed to reach other clients
    this.pendingOperations = new Map(); // clientId -> [operations]
  }

  /**
   * Register a client, send it the current document, and start handling its
   * messages.
   *
   * @param {string} clientId - Unique id for the client.
   * @param {object} ws - Socket exposing `on`, `send` and `readyState`.
   */
  addClient(clientId, ws) {
    this.clients.add(clientId);
    this.sockets.set(clientId, ws);
    this.pendingOperations.set(clientId, []);

    // Send current document state to new client
    this.sendDocumentState(clientId, ws);

    // Set up message handler
    ws.on('message', (message) => {
      this.handleClientMessage(clientId, message);
    });

    ws.on('close', () => {
      this.removeClient(clientId);
    });
  }

  /**
   * Forget a client; unknown ids are ignored.
   *
   * @param {string} clientId - Id passed to addClient.
   */
  removeClient(clientId) {
    this.clients.delete(clientId);
    this.sockets.delete(clientId);
    this.pendingOperations.delete(clientId);
  }

  /**
   * Process one raw message from a client.
   *
   * 'operation' messages are transformed, recorded, relayed and saved;
   * 'cursor' messages are relayed. Anything else is ignored, and malformed
   * input is logged rather than thrown.
   *
   * @param {string} clientId - Sender of the message.
   * @param {string|Buffer} message - JSON text from the socket.
   */
  handleClientMessage(clientId, message) {
    try {
      const msg = JSON.parse(message);

      if (msg.type === 'operation') {
        // Handle edit operation
        const operation = this.transformOperation(msg.operation);
        this.operations.push(operation);

        // Broadcast to other clients
        this.broadcastOperation(clientId, operation);

        // Save operation to persistent storage
        this.saveOperation(operation);
      } else if (msg.type === 'cursor') {
        // Handle cursor position update
        this.broadcastCursor(clientId, msg.cursor);
      }
    } catch (error) {
      console.error('Client message processing error:', error);
    }
  }

  /**
   * Placeholder for operational transformation.
   *
   * @param {*} operation - Operation as sent by the client.
   * @returns {*} The operation, unchanged in this example.
   */
  transformOperation(operation) {
    // Implement operation transformation
    return operation; // Simplified example
  }

  /**
   * Send a serialized message to every open client except the sender.
   *
   * @param {string} senderId - Client to skip.
   * @param {string} message - Text to send.
   */
  sendToOthers(senderId, message) {
    for (const clientId of this.clients) {
      if (clientId !== senderId) {
        const ws = this.getClientWebSocket(clientId);
        if (ws && ws.readyState === WebSocket.OPEN) {
          ws.send(message);
        }
      }
    }
  }

  /**
   * Relay an edit operation to every client except its author.
   *
   * @param {string} senderId - Author of the operation.
   * @param {*} operation - Operation to relay.
   */
  broadcastOperation(senderId, operation) {
    this.sendToOthers(senderId, JSON.stringify({
      type: 'operation',
      operation
    }));
  }

  /**
   * Relay a cursor position to every client except its owner.
   *
   * @param {string} senderId - Client whose cursor moved.
   * @param {*} cursor - Cursor position as sent by the client.
   */
  broadcastCursor(senderId, cursor) {
    this.sendToOthers(senderId, JSON.stringify({
      type: 'cursor',
      clientId: senderId,
      cursor
    }));
  }

  /**
   * Send the current document state to one client as an 'init' message.
   *
   * @param {string} clientId - Receiving client.
   * @param {object} ws - The client's socket.
   */
  sendDocumentState(clientId, ws) {
    // Get current document state
    const documentState = this.getDocumentState();

    const message = JSON.stringify({
      type: 'init',
      document: documentState
    });

    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(message);
    }
  }

  /**
   * Placeholder for persisting an operation.
   *
   * @param {*} operation - Operation to persist.
   */
  saveOperation(operation) {
    // Implement persistent operation saving
    console.log('Saving operation:', operation);
  }

  /**
   * Placeholder for retrieving the document state.
   *
   * @returns {object} Placeholder document in this example.
   */
  getDocumentState() {
    // Implement document state retrieval
    return { content: '...' }; // Placeholder
  }

  /**
   * @param {string} clientId - Client to look up.
   * @returns {object|null} The client's socket, or null when unknown.
   */
  getClientWebSocket(clientId) {
    return this.sockets.get(clientId) || null;
  }
}

// WebSocket server integration
const wss = new WebSocket.Server({ port: 8080 });
const editors = new Map(); // documentId -> CollaborativeEditor

wss.on('connection', (ws) => {
  // Set once the socket has joined a document: { documentId, clientId }
  let membership = null;

  ws.on('message', (message) => {
    try {
      const msg = JSON.parse(message);

      if (msg.type !== 'join') {
        return;
      }

      // A second join would register this socket as another client and
      // attach duplicate handlers, so only the first one is honored
      if (membership) {
        console.error('Ignoring repeated join from client:', membership.clientId);
        return;
      }

      const documentId = msg.documentId;
      if (documentId === undefined || documentId === null) {
        console.error('Ignoring join without documentId');
        return;
      }

      const clientId = generateUUID();

      if (!editors.has(documentId)) {
        editors.set(documentId, new CollaborativeEditor(documentId));
      }

      const editor = editors.get(documentId);
      editor.addClient(clientId, ws);
      membership = { documentId, clientId };

      // Send client ID to ws
      ws.send(JSON.stringify({
        type: 'joined',
        clientId
      }));
    } catch (error) {
      console.error('Connection message processing error:', error);
    }
  });

  ws.on('close', () => {
    if (!membership) {
      return;
    }

    // Explicit cleanup; removing an already-removed client is a no-op.
    // NOTE(review): editors stay in the map after their last client leaves,
    // since they hold the in-memory operation history — decide on eviction.
    const editor = editors.get(membership.documentId);
    if (editor) {
      editor.removeClient(membership.clientId);
    }
    membership = null;
  });
});
Share your love