Lesson 36-Deno Inter-Process Communication

Deno provides a variety of inter-process communication (IPC) mechanisms, supporting everything from simple subprocess management to complex cross-process data exchange. This article explores Deno’s IPC implementation methods, application scenarios, and best practices in depth.

Basic IPC Mechanisms

Subprocess Management

Creating a Subprocess:

// Basic subprocess example
const child = Deno.run({
  cmd: ["echo", "Hello from child process"],
  stdout: "piped", // Capture standard output
  stderr: "piped"  // Capture error output
});

// Wait for process completion and retrieve output
const { code } = await child.status();
const output = new TextDecoder().decode(await child.output());
const error = new TextDecoder().decode(await child.stderrOutput());

console.log(`Exit code: ${code}`);
console.log(`Output: ${output}`);
if (error) console.error(`Error: ${error}`);

child.close(); // Release resources

Inter-Process Communication Pipe:

// Parent-child process pipe communication
const parentChild = Deno.run({
  cmd: ["deno", "run", "--allow-read", "child.ts"],
  stdin: "piped",  // Parent writes to child stdin
  stdout: "piped"  // Parent reads from child stdout
});

// Send data to child process
const encoder = new TextEncoder();
await parentChild.stdin.write(encoder.encode("message from parent\n"));

// Read response from child process
const response = new TextDecoder().decode(await parentChild.stdout.read(1024));
console.log(`Child response: ${response}`);

parentChild.stdin.close(); // Close pipe
await parentChild.status(); // Wait for process completion

Cross-Platform Process Management

Handling Windows and Unix Compatibility:

// Cross-platform command execution
function getPlatformCommand(command: string): string[] {
  if (Deno.build.os === "windows") {
    return ["cmd", "/C", command];
  } else {
    return ["sh", "-c", command];
  }
}

const cmd = getPlatformCommand("echo $HOME || echo %USERPROFILE%");
const process = Deno.run({ cmd });
await process.status();

Environment Variable Passing:

// Subprocess with environment variables
const envProcess = Deno.run({
  cmd: ["printenv"],
  env: {
    CUSTOM_VAR: "deno_ipc_test",
    PATH: Deno.env.get("PATH") || "" // Preserve system PATH
  },
  stdout: "piped"
});

const envOutput = new TextDecoder().decode(await envProcess.stdout.read(1024));
console.log(envOutput);

Advanced IPC Patterns

File Descriptor-Based IPC

Unix Domain Socket Communication:

// Unix domain socket server (Unix systems only)
if (Deno.build.os !== "windows") {
  const socketPath = "/tmp/deno_ipc.sock";

  // Clean up old socket file
  try { await Deno.remove(socketPath); } catch {}

  const server = Deno.listen({ path: socketPath, transport: "unix" });
  console.log(`Unix domain socket server listening: ${socketPath}`);

  for await (const conn of server) {
    const messages = [];
    const reader = new BufReader(conn);
    let line: string;

    while ((line = await reader.readLine()) !== null) {
      messages.push(line.toString());
      if (line.includes("EOF")) break;
    }

    console.log("Received messages:", messages);
    conn.close();
  }
}

// Unix domain socket client
if (Deno.build.os !== "windows") {
  const socketPath = "/tmp/deno_ipc.sock";
  const conn = await Deno.connect({ path: socketPath, transport: "unix" });

  const messages = ["Hello", "from", "client", "EOF"];
  for (const msg of messages) {
    await conn.write(new TextEncoder().encode(msg + "\n"));
  }

  conn.close();
}

Shared Memory-Based IPC

Shared Memory Using File Mapping:

// Shared memory communication (simplified example)
const sharedFile = await Deno.open("shared_memory.bin", {
  create: true,
  write: true,
  read: true
});

// Parent process writes data
const encoder = new TextEncoder();
await sharedFile.write(encoder.encode("Shared data from parent"));

// Child process reads data (executed in another process)
/*
const sharedFile = await Deno.open("shared_memory.bin", { read: true });
const buf = new Uint8Array(1024);
await sharedFile.read(buf);
console.log("Child received:", new TextDecoder().decode(buf));
*/

sharedFile.close();

Message Channel-Based IPC

Message Passing with Worker Threads:

// Main thread
const worker = new Worker(new URL("worker.ts", import.meta.url).href, {
  type: "module",
  deno: { namespace: true }
});

// Send message to Worker
worker.postMessage({ type: "greet", name: "Deno" });

// Receive message from Worker
worker.onmessage = (e) => {
  console.log("Received from worker:", e.data);
};

// worker.ts
self.onmessage = (e) => {
  if (e.data.type === "greet") {
    console.log(`Worker received: ${e.data.name}`);
    self.postMessage({ reply: `Hello, ${e.data.name} from Worker` });
  }
};

Complex IPC Scenario Implementations

Process Pool Management

Process Pool Implementation:

class ProcessPool {
  private pool: Deno.Process[] = [];
  private maxWorkers: number;
  private taskQueue: Array<{task: string; resolve: (value: string) => void; reject: (reason?: any) => void}> = [];

  constructor(maxWorkers: number = 4) {
    this.maxWorkers = maxWorkers;
  }

  async execute(task: string): Promise<string> {
    if (this.pool.length < this.maxWorkers) {
      return this.createWorker(task);
    } else {
      return new Promise((resolve, reject) => {
        this.taskQueue.push({ task, resolve, reject });
      });
    }
  }

  private async createWorker(task: string): Promise<string> {
    const process = Deno.run({
      cmd: ["deno", "run", "--allow-read", "task_worker.ts", task],
      stdout: "piped",
      stderr: "piped"
    });

    const output = new TextDecoder().decode(await process.output());
    const error = new TextDecoder().decode(await process.stderrOutput());
    await process.status();
    process.close();

    if (error) throw new Error(error);
    return output;
  }

  // Actual implementation requires more complex queue management
}

Distributed Process Communication

TCP-Based Cross-Machine IPC:

// TCP server (main process)
const server = Deno.listen({ port: 8080 });
console.log("IPC Server listening on :8080");

for await (const conn of server) {
  const messages = [];
  const reader = new BufReader(conn);
  let line: string;

  while ((line = await reader.readLine()) !== null) {
    messages.push(line.toString());
    if (line.includes("EOF")) break;

    // Process message and respond
    const response = `Server processed: ${messages.join(" ")}`;
    await conn.write(new TextEncoder().encode(response + "\n"));
  }

  conn.close();
}

// TCP client (subprocess)
const conn = await Deno.connect({ port: 8080 });
await conn.write(new TextEncoder().encode("task1\n"));
await conn.write(new TextEncoder().encode("task2\n"));
await conn.write(new TextEncoder().encode("EOF\n"));

const response = new TextDecoder().decode(await conn.read(1024));
console.log("Server response:", response);
conn.close();

Secure IPC Channels

Permission-Isolated Process Communication:

// Secure IPC server
const secureServer = Deno.listen({ port: 8081 });
console.log("Secure IPC Server listening on :8081");

for await (const conn of secureServer) {
  // Validate client permissions (simplified example)
  const auth = await conn.read(1024);
  if (!isValidAuth(auth)) {
    conn.close();
    continue;
  }

  // Handle secure communication
  const messages = [];
  const reader = new BufReader(conn);
  let line: string;

  while ((line = await reader.readLine()) !== null) {
    messages.push(line.toString());
    if (line.includes("EOF")) break;
  }

  const response = processSecureMessage(messages);
  await conn.write(new TextEncoder().encode(response + "\n"));
  conn.close();
}

function isValidAuth(auth: Uint8Array): boolean {
  // Actual implementation requires stricter authentication logic
  return true;
}

Performance Optimization Strategies

Communication Protocol Optimization

Binary Protocol Design:

// Binary message format
interface BinaryMessage {
  type: number;    // Message type (1 byte)
  length: number;  // Data length (4 bytes)
  data: Uint8Array;// Actual data
}

// Encode message
function encodeMessage(type: number, data: Uint8Array): Uint8Array {
  const length = data.length;
  const buf = new Uint8Array(1 + 4 + length);
  buf[0] = type;
  new DataView(buf.buffer).setUint32(1, length, true);
  buf.set(data, 5);
  return buf;
}

// Decode message
function decodeMessage(buf: Uint8Array): BinaryMessage {
  return {
    type: buf[0],
    length: new DataView(buf.buffer).getUint32(1, true),
    data: buf.slice(5)
  };
}

Batching and Compression

Message Batching Implementation:

// Batch message collector
class MessageBatcher {
  private batch: Uint8Array[] = [];
  private maxSize: number;
  private maxWait: number;
  private timer: number | null = null;

  constructor(maxSize: number = 1024 * 1024, maxWait: number = 100) {
    this.maxSize = maxSize;
    this.maxWait = maxWait;
  }

  addMessage(message: Uint8Array): void {
    this.batch.push(message);
    if (this.batch.reduce((sum, m) => sum + m.length, 0) >= this.maxSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = window.setTimeout(() => this.flush(), this.maxWait) as unknown as number;
    }
  }

  private flush(): void {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    if (this.batch.length === 0) return;

    const combined = this.combineMessages(this.batch);
    this.sendBatch(combined);
    this.batch = [];
  }

  private combineMessages(messages: Uint8Array[]): Uint8Array {
    // Implement message combination logic
    return new Uint8Array(); // Simplified example
  }

  private sendBatch(batch: Uint8Array): void {
    // Implement batch sending logic
  }
}

Connection Reuse

Persistent Connection Pool:

class ConnectionPool {
  private pool: Map<string, Deno.Conn> = new Map();
  private maxPoolSize: number;

  constructor(maxPoolSize: number = 10) {
    this.maxPoolSize = maxPoolSize;
  }

  async getConnection(key: string): Promise<Deno.Conn> {
    if (this.pool.has(key)) {
      return this.pool.get(key)!;
    }

    if (this.pool.size >= this.maxPoolSize) {
      await this.evictOldestConnection();
    }

    const conn = await this.createNewConnection(key);
    this.pool.set(key, conn);
    return conn;
  }

  private async createNewConnection(key: string): Promise<Deno.Conn> {
    // Implement connection creation logic
    return Deno.connect({ port: 8080 }); // Simplified example
  }

  private async evictOldestConnection(): Promise<void> {
    // Implement connection eviction logic
    const firstKey = this.pool.keys().next().value;
    if (firstKey) {
      const conn = this.pool.get(firstKey)!;
      conn.close();
      this.pool.delete(firstKey);
    }
  }
}

Debugging and Issue Diagnosis

IPC Debugging Tools

Communication Logging Middleware:

// IPC wrapper with logging
async function loggedCommunication(
  conn: Deno.Conn,
  message: Uint8Array
): Promise<Uint8Array> {
  console.log("Sending message:", new TextDecoder().decode(message));
  const startTime = performance.now();

  await conn.write(message);
  const response = new Uint8Array(1024);
  const n = await conn.read(response);
  const duration = performance.now() - startTime;

  console.log(`Received ${n} bytes in ${duration.toFixed(2)}ms`);
  console.log("Response:", new TextDecoder().decode(response.subarray(0, n)));

  return response.subarray(0, n);
}

Performance Analysis Tools

IPC Performance Metrics Collection:

class IpcMetrics {
  private metrics: {
    totalTime: number;
    messageCount: number;
    errorCount: number;
  } = { totalTime: 0, messageCount: 0, errorCount: 0 };

  recordMessage(duration: number, isError: boolean = false): void {
    this.metrics.totalTime += duration;
    this.metrics.messageCount++;
    if (isError) this.metrics.errorCount++;
  }

  getStats(): {
    avgLatency: number;
    errorRate: number;
  } {
    return {
      avgLatency: this.metrics.messageCount > 0 
        ? this.metrics.totalTime / this.metrics.messageCount 
        : 0,
      errorRate: this.metrics.messageCount > 0
        ? this.metrics.errorCount / this.metrics.messageCount
        : 0
    };
  }
}

Security Best Practices

Communication Security Hardening

Encrypted Communication Implementation:

// TLS-encrypted communication
const conn = await Deno.connectTls({
  hostname: "example.com",
  port: 443,
  certFile: "./cert.pem", // CA certificate
});

await conn.write(new TextEncoder().encode("secure message"));
const response = new TextDecoder().decode(await conn.read(1024));
conn.close();

Permission Control Strategies

Least Privilege Principle Application:

// Restricted subprocess example
const restrictedChild = Deno.run({
  cmd: ["deno", "run", "--allow-net=example.com", "child.ts"],
  // Allow access to specific domain only
});

// Restricted file system access
const fileWorker = Deno.run({
  cmd: ["deno", "run", "--allow-read=/data/input,/data/output", "file_processor.ts"],
  // Allow read/write to specific directories only
});

Input Validation

Secure Message Handling:

// Secure message parser
function parseSafeMessage(raw: Uint8Array): any {
  try {
    const text = new TextDecoder().decode(raw);
    const data = JSON.parse(text);

    // Validate message structure
    if (typeof data !== "object" || 
        !("type" in data) || 
        !("payload" in data)) {
      throw new Error("Invalid message format");
    }

    // Validate payload type
    if (typeof data.payload !== "string") {
      throw new Error("Payload must be string");
    }

    return data;
  } catch (err) {
    console.error("Message validation failed:", err);
    throw err;
  }
}
Share your love