Lesson 12-WebSocket Applications in IoT

WebSocket, as a full-duplex communication protocol, has unique advantages in the Internet of Things (IoT) domain:

  1. Real-time Bidirectional Communication: Devices and servers/clients can instantly exchange data.
  2. Low Latency: Significantly reduces communication latency compared to HTTP polling.
  3. Persistent Connection: Avoids the overhead of repeatedly establishing connections.
  4. Lightweight: Small header overhead, suitable for resource-constrained IoT devices.
  5. Protocol Extensibility: Can carry IoT protocols such as MQTT and CoAP.

Typical Application Scenarios

Remote Device Monitoring and Control

Architecture Design:

[IoT Device] ↔ [WebSocket Gateway] ↔ [Monitoring Platform/Mobile App]

Implementation Key Points:

  • Real-time device status reporting (temperature, humidity, battery level, etc.)
  • Remote control command issuance (switch, parameter adjustment, etc.)
  • Instant alert notifications

Code Example (Device Side – Node.js Simulation):

const WebSocket = require('ws');
const ws = new WebSocket('ws://iot-gateway.example.com:8080');

// Device Authentication
ws.on('open', () => {
  ws.send(JSON.stringify({
    type: 'auth',
    deviceId: 'sensor-001',
    token: 'device-secret-token'
  }));
});

// Receive Control Commands
ws.on('message', (message) => {
  const cmd = JSON.parse(message);
  if (cmd.type === 'control') {
    handleControlCommand(cmd);
  }
});

function handleControlCommand(cmd) {
  console.log('Received control command:', cmd);
  // Execute device control logic...
}

// Periodic Status Reporting
setInterval(() => {
  const sensorData = readSensorData(); // Read sensor data
  ws.send(JSON.stringify({
    type: 'status',
    deviceId: 'sensor-001',
    timestamp: Date.now(),
    data: sensorData
  }));
}, 5000); // Report every 5 seconds

Industrial IoT (IIoT) Data Collection

Typical Architecture:

[PLC/Industrial Device] ↔ [OPC UA/Modbus Gateway] ↔ [WebSocket Server] ↔ [MES/ERP System]

Key Features:

  • High-frequency data collection (millisecond level)
  • Time-series data processing
  • Edge computing preprocessing
  • Protocol conversion (OPC UA → WebSocket)

Data Format Example:

{
  "deviceId": "plc-101",
  "timestamp": 1634567890123,
  "values": [
    {"tag": "temperature", "value": 42.5, "unit": "°C"},
    {"tag": "pressure", "value": 101.3, "unit": "kPa"},
    {"tag": "vibration", "value": 0.15, "unit": "mm/s"}
  ],
  "quality": "good"
}

Smart Home Control

System Architecture:

[Smart Device] ↔ [Home Gateway] ↔ [WebSocket Server] ↔ [Mobile App/Voice Assistant]

Functional Features:

  • Device status synchronization (lights, curtains, air conditioning, etc.)
  • Scene linkage (away mode, sleep mode)
  • Real-time voice command response
  • Remote access control

Security Considerations:

  • Device authentication and encryption
  • User permission management
  • Operation audit logging

Technical Implementation Solutions

Connection Management Optimization

Device Connection Pool Design:

class IoTConnectionPool {
  constructor(maxConnections) {
    this.maxConnections = maxConnections;
    this.activeConnections = new Map(); // deviceId -> WebSocket
    this.pendingQueue = [];
  }

  addConnection(deviceId, ws) {
    if (this.activeConnections.size >= this.maxConnections) {
      this.pendingQueue.push({deviceId, ws});
      return false;
    }

    this.activeConnections.set(deviceId, ws);
    setupConnectionHandlers(ws, deviceId);
    return true;
  }

  removeConnection(deviceId) {
    this.activeConnections.delete(deviceId);

    if (this.pendingQueue.length > 0) {
      const next = this.pendingQueue.shift();
      this.addConnection(next.deviceId, next.ws);
    }
  }

  getConnection(deviceId) {
    return this.activeConnections.get(deviceId);
  }

  broadcast(message, filterFn = () => true) {
    for (const [deviceId, ws] of this.activeConnections) {
      if (filterFn(deviceId) && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      }
    }
  }
}

function setupConnectionHandlers(ws, deviceId) {
  ws.on('message', (data) => {
    // Handle device messages
  });

  ws.on('close', () => {
    connectionPool.removeConnection(deviceId);
  });

  ws.on('error', (err) => {
    console.error(`Device ${deviceId} connection error:`, err);
    connectionPool.removeConnection(deviceId);
  });
}

Message Protocol Design

IoT-Specific Message Format:

{
  "ver": "1.0",
  "msgid": "a1b2c3d4",
  "timestamp": 1634567890123,
  "from": "device-001",
  "to": "server",
  "type": "status|command|event|ack",
  "payload": {
    // Protocol-specific data
  },
  "qos": 1,  // Message quality level
  "retain": false
}

QoS Level Implementation:

  1. QoS 0: At most once (suitable for non-critical data)
  2. QoS 1: At least once (with confirmation mechanism)
  3. QoS 2: Exactly once (for transactional operations)

Confirmation Mechanism Example:

// Device-side QoS 1 Implementation
function sendWithAck(ws, message) {
  return new Promise((resolve, reject) => {
    const msgId = generateUUID();
    const timedMessage = {
      ...message,
      msgid: msgId
    };

    const timeout = setTimeout(() => {
      reject(new Error('ACK timeout'));
    }, 5000); // 5-second timeout

    const ackHandler = (response) => {
      if (response.msgid === msgId) {
        clearTimeout(timeout);
        ws.off('message', ackHandler);
        resolve(response);
      }
    };

    ws.on('message', ackHandler);
    ws.send(JSON.stringify(timedMessage));
  });
}

Data Compression and Optimization

Binary Protocol Design:

// Define message format using Protocol Buffers
syntax = "proto3";

message IoTData {
  string device_id = 1;
  int64 timestamp = 2;
  repeated SensorReading readings = 3;
}

message SensorReading {
  string sensor_id = 1;
  float value = 2;
  string unit = 3;
}

// Device-side encoding example
const protobuf = require('protobufjs');
const root = protobuf.loadSync('iot_data.proto');
const IoTData = root.lookupType('IoTData');

function encodeSensorData(deviceId, readings) {
  const payload = {
    device_id: deviceId,
    timestamp: Date.now(),
    readings: readings.map(r => ({
      sensor_id: r.sensorId,
      value: r.value,
      unit: r.unit
    }))
  };

  const errMsg = IoTData.verify(payload);
  if (errMsg) throw Error(errMsg);

  const message = IoTData.create(payload);
  return IoTData.encode(message).finish();
}

// Send binary data
ws.send(encodeSensorData('sensor-001', [
  {sensorId: 'temp', value: 23.5, unit: 'C'},
  {sensorId: 'humid', value: 45.2, unit: '%'}
]));

Advanced Architecture Patterns

Edge Computing Integration

Edge-Cloud Collaborative Architecture:

[IoT Device] ↔ [Edge Gateway] ↔ [WebSocket Server] ↔ [Cloud Platform]

Edge Processing Logic:

  1. Data preprocessing (filtering, aggregation)
  2. Local decision-making (anomaly detection)
  3. Caching and batch reporting
  4. Offline operation support

Edge Gateway Example:

class EdgeGateway {
  constructor() {
    this.localDevices = new Map();
    this.cloudConnection = null;
    this.dataBuffer = [];
    this.bufferSize = 100;
    this.flushInterval = 5000; // 5 seconds
  }

  addDevice(device) {
    this.localDevices.set(device.id, device);
    device.on('data', (data) => this.handleDeviceData(device.id, data));
  }

  handleDeviceData(deviceId, data) {
    // Local processing (e.g., simple filtering)
    if (this.shouldProcessLocally(data)) {
      this.processLocally(deviceId, data);
    } else {
      // Buffer to cloud
      this.dataBuffer.push({
        deviceId,
        data,
        timestamp: Date.now()
      });

      if (this.dataBuffer.length >= this.bufferSize) {
        this.flushToCloud();
      }
    }
  }

  async flushToCloud() {
    if (this.dataBuffer.length === 0 || !this.cloudConnection) return;

    const batch = this.dataBuffer.splice(0, this.dataBuffer.length);
    try {
      await this.sendToCloud(batch);
    } catch (error) {
      console.error('Cloud sending failed, re-buffering data');
      this.dataBuffer.unshift(...batch);
    }
  }

  start() {
    // Periodically flush buffer
    setInterval(() => this.flushToCloud(), this.flushInterval);

    // Establish cloud connection
    this.connectToCloud();
  }

  connectToCloud() {
    this.cloudConnection = new WebSocket('ws://cloud.example.com:8080');

    this.cloudConnection.on('open', () => {
      console.log('Cloud connection established');
      this.flushToCloud(); // Send buffered data upon connection
    });

    this.cloudConnection.on('message', (message) => {
      this.handleCloudMessage(message);
    });

    this.cloudConnection.on('close', () => {
      console.log('Cloud connection closed');
      this.cloudConnection = null;
      // Implement reconnection logic...
    });
  }
}

MQTT over WebSocket

Protocol Conversion Architecture:

[MQTT Device] ↔ [MQTT Broker] ↔ [WebSocket Gateway] ↔ [Web Client]

Implementation Solution:

  1. Use Eclipse Paho or Mosquitto as MQTT Broker
  2. WebSocket gateway implements MQTT protocol conversion
  3. Web client subscribes to MQTT topics via WebSocket

Node.js Implementation Example:

const WebSocket = require('ws');
const mqtt = require('mqtt');

// Create MQTT client
const mqttClient = mqtt.connect('mqtt://broker.example.com');

// Create WebSocket server
const wss = new WebSocket.Server({ port: 8080 });

// Client mapping
const clients = new Map(); // ws -> {mqttClient, subscriptions}

wss.on('connection', (ws) => {
  const clientInfo = {
    mqttClient: null,
    subscriptions: new Set()
  };

  clients.set(ws, clientInfo);

  // Create MQTT client (one per WebSocket connection)
  clientInfo.mqttClient = mqtt.connect('mqtt://broker.example.com');

  // Forward MQTT messages to WebSocket
  clientInfo.mqttClient.on('message', (topic, message) => {
    if (clientInfo.subscriptions.has(topic)) {
      ws.send(JSON.stringify({
        type: 'mqtt',
        topic,
        payload: message.toString()
      }));
    }
  });

  // Handle WebSocket messages
  ws.on('message', (message) => {
    try {
      const msg = JSON.parse(message);

      if (msg.type === 'subscribe') {
        clientInfo.mqttClient.subscribe(msg.topic);
        clientInfo.subscriptions.add(msg.topic);
      } else if (msg.type === 'publish') {
        clientInfo.mqttClient.publish(msg.topic, msg.payload);
      }
    } catch (error) {
      console.error('Message processing error:', error);
    }
  });

  ws.on('close', () => {
    // Clean up resources
    if (clientInfo.mqttClient) {
      clientInfo.subscriptions.forEach(topic => {
        clientInfo.mqttClient.unsubscribe(topic);
      });
      clientInfo.mqttClient.end();
    }
    clients.delete(ws);
  });
});

Security and Reliability

Security Measures

  1. Transport Security:
    • Enforce WSS (WebSocket Secure)
    • Regular certificate rotation
    • TLS 1.2+ configuration
  2. Authentication and Authorization:
    • Device-specific credentials (certificate/JWT)
    • Role-based access control (RBAC)
    • Device group permission management
  3. Data Security:
    • Sensitive data encryption
    • Integrity verification (MAC)
    • Replay attack prevention (timestamp/Nonce)

Authentication Example:

// Authentication process during device connection
ws.on('message', async (message) => {
  try {
    const authMsg = JSON.parse(message);

    if (authMsg.type === 'auth') {
      // Verify device credentials
      const isValid = await verifyDeviceCredentials(
        authMsg.deviceId,
        authMsg.token
      );

      if (isValid) {
        // Authentication successful, add to connection pool
        connectionPool.addConnection(authMsg.deviceId, ws);

        // Send authentication success response
        ws.send(JSON.stringify({
          type: 'auth_response',
          status: 'success',
          timestamp: Date.now()
        }));
      } else {
        // Authentication failed, close connection
        ws.close(4403, 'Authentication failed');
      }
    }
  } catch (error) {
    console.error('Authentication processing error:', error);
    ws.close(4400, 'Protocol error');
  }
});

Reliability Assurance

  1. Connection Reliability:
    • Heartbeat mechanism (Keepalive)
    • Automatic reconnection strategy
    • Connection status monitoring
  2. Message Reliability:
    • Message confirmation (ACK) mechanism
    • Retransmission queue
    • Message deduplication
  3. Fault Tolerance:
    • Multi-server load balancing
    • Connection migration
    • Data persistence

Heartbeat Implementation Example:

// Server-side heartbeat detection
class ConnectionManager {
  constructor() {
    this.connections = new Map(); // deviceId -> {ws, lastActivity}
    this.heartbeatInterval = 30000; // 30 seconds
    this.timeoutThreshold = 90000; // 90 seconds without activity considered timeout
  }

  addConnection(deviceId, ws) {
    this.connections.set(deviceId, {
      ws,
      lastActivity: Date.now()
    });

    // Set heartbeat timer
    const heartbeatTimer = setInterval(() => {
      const connection = this.connections.get(deviceId);
      if (!connection) {
        clearInterval(heartbeatTimer);
        return;
      }

      // Check for timeout
      if (Date.now() - connection.lastActivity > this.timeoutThreshold) {
        console.log(`Device ${deviceId} heartbeat timeout, closing connection`);
        ws.close(4001, 'Heartbeat timeout');
        this.connections.delete(deviceId);
        clearInterval(heartbeatTimer);
        return;
      }

      // Send heartbeat ping
      if (ws.readyState === WebSocket.OPEN) {
        ws.ping();
      }
    }, this.heartbeatInterval);

    // Store timer reference for cleanup
    this.connections.get(deviceId).heartbeatTimer = heartbeatTimer;
  }

  updateActivity(deviceId) {
    if (this.connections.has(deviceId)) {
      this.connections.get(deviceId).lastActivity = Date.now();
    }
  }

  handlePong(deviceId) {
    this.updateActivity(deviceId);
  }
}

// Client-side heartbeat handling
const heartbeatInterval = 25000; // 25 seconds (slightly less than server interval)
let heartbeatTimer;

ws.on('open', () => {
  // Start heartbeat timer
  heartbeatTimer = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }, heartbeatInterval);
});

ws.on('pong', () => {
  // Received server pong, update activity time
  connectionManager.updateActivity(deviceId);
});

ws.on('close', () => {
  clearInterval(heartbeatTimer);
});

Performance Optimization

Connection Scaling Strategies

  1. Horizontal Scaling:
    • WebSocket server cluster
    • Sharding strategy based on device ID
    • Stateless design
  2. Load Balancing:
    • LVS/Nginx TCP load balancing
    • Session persistence (sticky session)
    • Health checks
  3. Resource Optimization:
    • Connection limit
    • Memory management
    • CPU affinity

Nginx Configuration Example:

upstream websocket_servers {
    # Use consistent hashing based on device ID
    hash $http_device_id consistent;

    server ws1.example.com:8080;
    server ws2.example.com:8080;
    server ws3.example.com:8080;
}

server {
    listen 80;
    server_name iot.example.com;

    location /ws {
        proxy_pass http://websocket_servers;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header Device-ID $http_device_id;

        # Timeout settings
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}

Data Flow Optimization

  1. Batch Reporting:
    • Data aggregation
    • Time-window batch sending
  2. Differential Updates:
    • Send only changed data
    • Incremental status reporting
  3. Priority Queue:
    • Prioritize critical data transmission
    • Delay non-critical data

Batch Reporting Implementation:

class DataBatcher {
  constructor(flushInterval, maxBatchSize) {
    this.batch = [];
    this.flushInterval = flushInterval;
    this.maxBatchSize = maxBatchSize;
    this.timer = null;
  }

  addData(data) {
    this.batch.push(data);

    if (this.batch.length >= this.maxBatchSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    if (this.batch.length > 0) {
      const batchToSend = this.batch.splice(0, this.batch.length);
      ws.send(JSON.stringify({
        type: 'batch',
        data: batchToSend
      }));
    }
  }
}

// Usage example
const batcher = new DataBatcher(5000, 100); // Flush every 5 seconds or 100 data points

// Device sensor data collection
setInterval(() => {
  const sensorData = readSensors();
  batcher.addData({
    deviceId: 'sensor-001',
    timestamp: Date.now(),
    data: sensorData
  });
}, 1000); // Collect every 1 second

Typical Application Cases

Smart Agriculture Monitoring System

Architecture Features:

  • Thousands of sensor nodes
  • Regional gateway aggregation
  • Real-time environmental monitoring
  • Automated control

Communication Model:

[Soil Sensor] → [Regional Gateway] → [WebSocket Server] → [Monitoring Platform]
[Weather Station] → [Regional Gateway] → [WebSocket Server] → [Mobile App]
[Irrigation Device] ← [WebSocket Server] ← [Control Center]

Smart Grid System

Key Functions:

  • Real-time power data collection
  • Load forecasting and scheduling
  • Fault detection and isolation
  • Distributed energy management

Technical Challenges:

  • High-frequency data (second-level)
  • Large-scale device connections
  • Strict real-time requirements
  • Data security and privacy

Industry 4.0 Smart Manufacturing

Application Scenarios:

  • Production equipment status monitoring
  • Predictive maintenance
  • Quality control
  • Supply chain collaboration

System Architecture:

[CNC Machine] ↔ [Edge Computing Node] ↔ [WebSocket Gateway] ↔ [MES System]
[AGV Cart] ↔ [Wireless AP] ↔ [WebSocket Server] ↔ [Logistics Management System]
[Quality Inspection Device] ↔ [Industrial Camera] ↔ [WebSocket Gateway] ↔ [Quality Analysis Platform]
Share your love