Lesson 22 — Deno Concurrency and Parallelism

Deno’s concurrency and parallelism model is built on modern JavaScript’s asynchronous features and the efficient scheduling capabilities of the Rust runtime. Understanding Deno’s concurrency mechanisms is crucial for building high-performance applications. This tutorial dives deep into Deno’s concurrency and parallelism programming patterns, demonstrating applications from basic to advanced through code examples.

Concurrency Fundamentals

Concurrency vs. Parallelism

Concurrency
Refers to a system’s ability to handle multiple tasks simultaneously, with tasks interleaving their execution. On a single-core CPU, this is achieved through time-slicing to create the effect of “simultaneous” execution.

Parallelism
Refers to a system’s ability to execute multiple tasks truly simultaneously, requiring multi-core CPU hardware support.

Manifestation in Deno

  • Concurrency: Achieved through asynchronous I/O operations (e.g., network requests, file reads/writes).
  • Parallelism: Implemented via Web Workers or multi-threading.

Deno’s Concurrency Model

Promise-Based Asynchronous Model
All asynchronous operations in Deno are based on Promises, following the event loop mechanism:

// Kick off two independent async operations and await them together.
async function concurrentOperations() {
  // Start both operations before awaiting either, so they overlap in time.
  const responsePromise = fetch("https://api.example.com/data1");
  const filePromise = Deno.readTextFile("file.txt");

  const [data1, fileContent] = await Promise.all([responsePromise, filePromise]);
  console.log(data1, fileContent);
}

Event Loop Mechanism
Deno’s event loop is driven by its Rust runtime (built on Tokio), which schedules asynchronous I/O; V8 executes the JavaScript itself and drains the microtask (Promise) queue between tasks.

Concurrency Programming Patterns

Promise Concurrency Control

Basic Usage of Promise.all

// Fetch every URL at once; resolves when all JSON bodies have arrived.
async function fetchMultipleUrls(urls: string[]) {
  return Promise.all(
    urls.map((url) => fetch(url).then((res) => res.json()))
  );
}

// Usage example
const results = await fetchMultipleUrls([
  "https://api.example.com/users",
  "https://api.example.com/posts"
]);

Application of Promise.allSettled

// Handle potentially failing tasks without losing the successful ones.
// Returns both the successfully parsed payloads and the rejection reasons.
async function fetchWithFallback(urls: string[]) {
  const results = await Promise.allSettled(
    urls.map(url => fetch(url).then(res => res.json()))
  );

  // Checking the discriminant inside flatMap narrows each result properly,
  // so the `as PromiseFulfilledResult<any>` / `as PromiseRejectedResult`
  // type assertions the original needed are gone.
  const successfulData = results.flatMap(result =>
    result.status === "fulfilled" ? [result.value] : []
  );

  const errors = results.flatMap(result =>
    result.status === "rejected" ? [result.reason] : []
  );

  return { successfulData, errors };
}

Concurrency Limitation Patterns

Custom Concurrent Pool Implementation

/**
 * Bounded-concurrency promise pool: at most `maxConcurrent` tasks run at
 * once; the rest wait in a FIFO queue.
 */
class PromisePool {
  // Tasks waiting for a free slot (already wrapped with their settle logic).
  private queue: Array<() => Promise<void>> = [];
  // Number of tasks currently in flight.
  private activeCount = 0;

  constructor(private maxConcurrent: number) {}

  /**
   * Schedules `task`; the returned promise settles with the task's result.
   * Generic (was `Promise<any>`) so callers keep the task's result type.
   */
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const wrappedTask = async () => {
        // The increment runs synchronously before the first await, so
        // back-to-back next() calls see an up-to-date in-flight count.
        this.activeCount++;
        try {
          resolve(await task());
        } catch (error) {
          reject(error);
        } finally {
          this.activeCount--;
          this.next(); // a slot freed up: start the next queued task
        }
      };

      this.queue.push(wrappedTask);
      this.next();
    });
  }

  // Starts one queued task when a slot is free.
  private next() {
    if (this.activeCount < this.maxConcurrent && this.queue.length > 0) {
      const task = this.queue.shift()!;
      void task();
    }
  }
}

// Usage example: ten identical requests, never more than three in flight.
const pool = new PromisePool(3);
const urls = new Array(10).fill("https://api.example.com/data");

const queued = urls.map((url) =>
  pool.add(() => fetch(url).then((res) => res.json()))
);
await Promise.all(queued);

Priority-Based Concurrency Control

/**
 * Promise pool with two priority levels: whenever a slot frees up, queued
 * high-priority tasks always start before low-priority ones.
 */
class PriorityPromisePool {
  private highPriorityQueue: Array<() => Promise<void>> = [];
  private lowPriorityQueue: Array<() => Promise<void>> = [];
  private activeCount = 0;

  constructor(private maxConcurrent: number) {}

  addHighPriority<T>(task: () => Promise<T>): Promise<T> {
    return this.addTask(task, this.highPriorityQueue);
  }

  addLowPriority<T>(task: () => Promise<T>): Promise<T> {
    return this.addTask(task, this.lowPriorityQueue);
  }

  /**
   * Wraps `task` so its outcome settles the returned promise, then places
   * it on the given queue. Generic (was `Promise<any>`) so callers keep
   * the task's result type.
   */
  private addTask<T>(
    task: () => Promise<T>,
    queue: Array<() => Promise<void>>
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const wrappedTask = async () => {
        this.activeCount++;
        try {
          resolve(await task());
        } catch (error) {
          reject(error);
        } finally {
          this.activeCount--;
          this.next();
        }
      };

      queue.push(wrappedTask);
      this.next();
    });
  }

  // Start one task if a slot is free, draining the high-priority queue first.
  private next() {
    if (this.activeCount >= this.maxConcurrent) return;
    const task =
      this.highPriorityQueue.shift() ?? this.lowPriorityQueue.shift();
    if (task) void task();
  }
}

Asynchronous Iteration and Concurrency

Concurrent Processing of Async Iterators

// Yields 0..9, pausing 100 ms before each value to mimic async data arrival.
async function* asyncDataGenerator() {
  let i = 0;
  while (i < 10) {
    await new Promise(resolve => setTimeout(resolve, 100));
    yield i;
    i++;
  }
}

// Fan out: several workers pull from ONE shared async iterator, so each
// item is handed to exactly one worker while processing overlaps.
async function processConcurrently() {
  const concurrency = 3;
  const iterator = asyncDataGenerator();

  const runWorker = async () => {
    for await (const item of iterator) {
      console.log(`Processing item: ${item}`);
      // Simulate processing
      await new Promise(resolve => setTimeout(resolve, 200));
    }
  };

  await Promise.all(Array.from({ length: concurrency }, () => runWorker()));
}

processConcurrently();

Batch Concurrent Processing

// Process `items` in sequential batches of `batchSize`; items inside a
// batch run concurrently, and results keep the input order.
async function batchProcess<T, R>(
  items: T[],
  processor: (item: T) => Promise<R>,
  batchSize: number
): Promise<R[]> {
  const results: R[] = [];

  // Each slice waits for the previous one to finish before starting.
  for (let start = 0; start < items.length; start += batchSize) {
    const settled = await Promise.all(
      items.slice(start, start + batchSize).map((item) => processor(item))
    );
    results.push(...settled);
  }

  return results;
}

// Usage example: double 100 numbers, five at a time.
const items = Array.from({ length: 100 }, (_, i) => i);
const results = await batchProcess(
  items,
  async (item) => {
    // Simulate async processing
    await new Promise(resolve => setTimeout(resolve, 100));
    return item * 2;
  },
  5 // Process 5 items per batch
);

Parallel Programming Patterns

Web Workers Basics

Creating and Using Web Workers

// main.ts
// NOTE: since Deno 1.22 the `Deno` namespace is available inside workers by
// default, and the old non-standard `deno: { namespace: true }` option has
// been removed — current Deno versions no longer accept it.
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
  type: "module",
});

// Send message to Worker
worker.postMessage({ type: "calculate", data: [1, 2, 3, 4, 5] });

// Receive message from Worker
worker.onmessage = (e) => {
  console.log("Worker result:", e.data);
};

// Handle Worker errors
worker.onerror = (e) => {
  console.error("Worker error:", e.message);
};

// worker.ts: sums the numbers it receives and posts the total back.
self.onmessage = async (e) => {
  if (e.data.type !== "calculate") {
    return; // ignore unknown message types
  }
  // Simulate time-consuming computation
  await new Promise(resolve => setTimeout(resolve, 1000));
  const numbers: number[] = e.data.data;
  let result = 0;
  for (const num of numbers) {
    result += num;
  }
  self.postMessage({ result });
};

Shared Memory and Atomics

// main.ts
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);

// The removed, non-standard `deno: { namespace: true }` worker option is
// gone: modern Deno exposes the Deno namespace in workers by default.
const worker = new Worker(new URL("./worker_shared.ts", import.meta.url).href, {
  type: "module",
});

// SharedArrayBuffer is transferred by reference: both sides see the same memory.
worker.postMessage({ sharedBuffer });

// worker_shared.ts: bump the first slot of the shared buffer atomically.
self.onmessage = (e) => {
  const view = new Int32Array(e.data.sharedBuffer);

  // Atomics.add is atomic across threads; a plain `view[0]++` could race.
  Atomics.add(view, 0, 1);
  console.log("Worker updated shared value:", Atomics.load(view, 0));
};

Parallel Computing Patterns

Parallel Data Processing Example

// parallel_processor.ts
// One unit of work shipped to a worker: an id plus the numbers to reduce.
interface Task {
  id: number;       // identifies the task in results/logs
  data: number[];   // values aggregated by processTask (sum of squares)
}

// Worker entry: run every received task and post the results back as one array.
self.onmessage = (e) => {
  const tasks: Task[] = e.data.tasks;

  // Tasks are CPU-bound, so within this worker they run sequentially;
  // parallelism comes from spawning several workers.
  const results: number[] = tasks.map((task) => processTask(task));

  self.postMessage(results);
};

// Sum of squares over the task's data — stands in for CPU-heavy work.
function processTask(task: Task): number {
  let total = 0;
  for (const num of task.data) {
    total += num * num;
  }
  return total;
}

// main.ts
const tasks: Task[] = Array(4).fill(0).map((_, i) => ({
  id: i,
  data: Array(1000000).fill(0).map(() => Math.random())
}));

// One worker per task. Changes from the original: the unused `workerCount`
// constant is gone, the removed `deno: { namespace: true }` option is
// dropped, and each worker is terminated once its result arrives so the
// threads don't linger.
const workers = tasks.map((task) => {
  const worker = new Worker(new URL("./parallel_processor.ts", import.meta.url).href, {
    type: "module",
  });

  return new Promise<number[]>((resolve) => {
    worker.onmessage = (e) => {
      worker.terminate(); // free the thread as soon as its result is in
      resolve(e.data);
    };
    worker.postMessage({ tasks: [task] });
  });
});

const allResults = await Promise.all(workers);
console.log("All results:", allResults.flat());

Parallel File Processing

Parallel File Reading and Processing

// parallel_file_processor.ts
/**
 * Reads one file and returns a short summary line.
 *
 * The original also opened a second handle via Deno.open that was never
 * used for reading and leaked if readTextFile threw before file.close();
 * Deno.readTextFile manages its own handle, so that open/close pair is gone.
 */
async function processFile(filePath: string) {
  const content = await Deno.readTextFile(filePath);
  // Simulate file processing
  await new Promise(resolve => setTimeout(resolve, 100));
  // NOTE(review): content.length counts UTF-16 code units, not bytes —
  // kept as-is to preserve the original output format.
  return `Processed: ${filePath} (${content.length} bytes)`;
}

// main.ts
const files = Array.from({ length: 10 }, (_, i) => `file_${i}.txt`);

// Create test files
await Promise.all(
  files.map((file, i) => Deno.writeTextFile(file, `Content of file ${i}`))
);

// Process files in parallel, at most three at a time
const concurrency = 3;
const pool = new PromisePool(concurrency);

const results = await Promise.all(
  files.map((file) => pool.add(() => processFile(file)))
);

console.log("All files processed:", results);

// Clean up test files
await Promise.all(files.map((file) => Deno.remove(file)));

Advanced Concurrency Patterns

Publish-Subscribe Pattern

Promise-Based Publish-Subscribe Implementation

/**
 * Minimal async pub/sub hub: subscribers are async callbacks, and publish()
 * resolves once every subscriber for that event has finished.
 */
class AsyncEventEmitter {
  private events = new Map<string, Array<(data: any) => Promise<void>>>();

  // Register a callback for `event`, creating the list on first use.
  subscribe(event: string, callback: (data: any) => Promise<void>) {
    const existing = this.events.get(event);
    if (existing) {
      existing.push(callback);
    } else {
      this.events.set(event, [callback]);
    }
  }

  // Invoke every subscriber concurrently; unknown events are a no-op.
  async publish(event: string, data: any) {
    const callbacks = this.events.get(event) ?? [];
    await Promise.all(callbacks.map((callback) => callback(data)));
  }
}

// Usage example: two async subscribers on the same topic.
const emitter = new AsyncEventEmitter();

// Build a subscriber that logs, then simulates `ms` of async work.
const logAfter = (label: string, ms: number) => async (data: any) => {
  console.log(label, data);
  await new Promise(resolve => setTimeout(resolve, ms));
};

emitter.subscribe("data", logAfter("Subscriber 1 processing:", 100));
emitter.subscribe("data", logAfter("Subscriber 2 processing:", 200));

// Publish event (execute all subscribers concurrently)
await emitter.publish("data", { message: "Hello World" });

Work Queue Pattern

Task Queue Implementation

// Fixed-size pool of worker loops that poll a shared task queue.
// NOTE(review): the workers never exit (`while (true)`) and poll every
// 100 ms when idle, so a TaskQueue keeps the event loop alive forever and
// adds up to 100 ms of latency before an enqueued task starts. That is fine
// for a demo; a production version would wake workers from addTask() and
// expose a stop/drain method.
class TaskQueue {
  // Tasks waiting to be picked up by a worker.
  private queue: Array<() => Promise<any>> = [];
  // One long-lived promise per worker loop (they never settle).
  private workers: Array<Promise<void>> = [];
  private concurrency: number;

  constructor(concurrency: number) {
    this.concurrency = concurrency;
    this.startWorkers();
  }

  // Enqueue a task; a polling worker will run it within ~100 ms.
  addTask(task: () => Promise<any>) {
    this.queue.push(task);
  }

  // Spawn `concurrency` independent worker loops.
  private startWorkers() {
    for (let i = 0; i < this.concurrency; i++) {
      this.workers.push(this.worker());
    }
  }

  // Worker loop: sleep briefly when the queue is empty, otherwise run the
  // next task; a failed task is logged and the loop keeps going.
  private async worker() {
    while (true) {
      if (this.queue.length === 0) {
        await new Promise(resolve => setTimeout(resolve, 100));
        continue;
      }

      const task = this.queue.shift()!;
      try {
        await task();
      } catch (error) {
        console.error("Task failed:", error);
      }
    }
  }
}

// Usage example: three workers drain ten one-second tasks.
const queue = new TaskQueue(3); // 3 worker threads

// Build a task that logs its start and completion around 1 s of "work".
const makeTask = (id: number) => async () => {
  console.log(`Task ${id} started`);
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log(`Task ${id} completed`);
};

for (let i = 0; i < 10; i++) {
  queue.addTask(makeTask(i));
}

Advanced Concurrency Control Patterns

Leaky Bucket Algorithm Implementation

/**
 * Leaky-bucket rate limiter.
 *
 * Fixes over the original implementation:
 *  - Tasks are actually drained at `leakRate` per second by the interval
 *    timer. Previously queued tasks ran back-to-back as fast as they
 *    completed, and the timer only trimmed the queue — no rate limiting.
 *  - When the bucket is full, add() rejects immediately. Previously the
 *    overflow tasks were spliced away and their promises never settled,
 *    leaving callers awaiting forever.
 *  - destroy() rejects anything still queued for the same reason.
 */
class LeakyBucket {
  // Each entry keeps its own reject so overflow/destroy can fail it cleanly.
  private queue: Array<{
    run: () => Promise<void>;
    reject: (reason?: unknown) => void;
  }> = [];
  private intervalId: ReturnType<typeof setInterval>;

  constructor(
    private capacity: number,
    private leakRate: number // Tasks per second
  ) {
    // Exactly one task "leaks" out per tick — this enforces the rate.
    this.intervalId = setInterval(() => this.leak(), 1000 / this.leakRate);
  }

  /** Queue `task`; rejects immediately if the bucket is at capacity. */
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      if (this.queue.length >= this.capacity) {
        reject(new Error("LeakyBucket: capacity exceeded"));
        return;
      }
      this.queue.push({
        run: async () => {
          try {
            resolve(await task());
          } catch (error) {
            reject(error);
          }
        },
        reject,
      });
    });
  }

  // Drain exactly one queued task per interval tick.
  private leak() {
    const entry = this.queue.shift();
    if (entry) {
      void entry.run();
    }
  }

  /** Stop the drain timer and fail whatever is still queued. */
  destroy() {
    clearInterval(this.intervalId);
    for (const entry of this.queue.splice(0)) {
      entry.reject(new Error("LeakyBucket: destroyed"));
    }
  }
}

// Usage example
const bucket = new LeakyBucket(5, 2); // Capacity 5, process 2 tasks per second

// add() returns a promise that rejects if the task fails (or the bucket
// drops it), so attach a catch — otherwise every such failure becomes an
// unhandled promise rejection.
for (let i = 0; i < 20; i++) {
  bucket
    .add(async () => {
      console.log(`Task ${i} executed at ${new Date().toISOString()}`);
      await new Promise(resolve => setTimeout(resolve, 500));
    })
    .catch((error) => console.error(`Task ${i} rejected:`, error));
}

Performance Optimization and Debugging

Concurrency Performance Analysis

Performance Measurement Tools

// Performance measurement decorator
/**
 * Wraps an async function so each call logs its wall-clock duration (and
 * logs + rethrows on failure). The wrapped function's signature is preserved.
 *
 * The original also computed a "concurrency efficiency" as
 * duration / (now - concurrencyStart), but both timestamps were captured at
 * the same instant, so the ratio was always ~1 and meaningless; it has been
 * removed.
 */
function measureConcurrency<T extends (...args: any[]) => Promise<any>>(
  fn: T,
  name: string
): (...args: Parameters<T>) => ReturnType<T> {
  return async function (...args: Parameters<T>): Promise<ReturnType<T>> {
    const start = performance.now();

    try {
      const result = await fn(...args);
      const duration = performance.now() - start;
      console.log(`${name} executed in ${duration.toFixed(2)}ms`);
      return result;
    } catch (error) {
      console.error(`${name} failed:`, error);
      throw error;
    }
  };
}

// Usage example: time a JSON fetch.
const measuredFetch = measureConcurrency(
  async (url: string) => (await fetch(url)).json(),
  "fetchData"
);

await measuredFetch("https://api.example.com/data");

Concurrency Debugging Techniques

Diagnosing Concurrency Issues

// Concurrency debugging tool
class ConcurrencyDebugger {
  // In-flight tasks keyed by name, with start time and creation stack.
  private activeTasks: Map<string, { start: number; stack: string }> = new Map();

  /**
   * Runs `task`, logging its duration and keeping it visible in
   * printActiveTasks() while in flight.
   *
   * The start time is captured in a local instead of being re-read from the
   * map on completion: the original `this.activeTasks.get(taskName)!.start`
   * threw if two concurrent tracks shared a name, because the first one to
   * finish deleted the entry out from under the second.
   */
  track(taskName: string, task: () => Promise<any>): Promise<any> {
    const stack = new Error().stack || "";
    const start = performance.now();

    this.activeTasks.set(taskName, { start, stack });

    return task().finally(() => {
      const duration = performance.now() - start;
      console.log(`Task ${taskName} completed in ${duration.toFixed(2)}ms`);
      this.activeTasks.delete(taskName);
    });
  }

  // Dump every task still running, with elapsed seconds and origin stack.
  printActiveTasks() {
    console.log("Active tasks:");
    this.activeTasks.forEach((value, key) => {
      console.log(`- ${key}: ${((performance.now() - value.start) / 1000).toFixed(2)}s`);
      console.log(value.stack);
    });
  }
}

// Usage example
const debuggerInstance = new ConcurrencyDebugger();

// Nested tracked tasks: the outer one stays "active" while the inner runs.
async function problematicTask() {
  await debuggerInstance.track("problematic", async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    await debuggerInstance.track("nested", async () => {
      await new Promise(resolve => setTimeout(resolve, 500));
    });
  });
}

// Kick it off, then snapshot whatever is still in flight after 1.5 s.
problematicTask().catch(console.error);
setTimeout(() => debuggerInstance.printActiveTasks(), 1500);

Practical Case Studies

High-Performance Web Crawler

// advanced_crawler.ts
/**
 * Crawler with bounded fetch concurrency and a depth limit.
 *
 * Fix over the original: crawl() used to `await this.pool.add(...)` inside
 * the while loop, which waited for each page to finish before dequeuing the
 * next — the pool never held more than one task, so nothing actually ran
 * concurrently. Each wave of queued URLs is now submitted to the pool
 * together and awaited as a batch.
 */
class ConcurrentCrawler {
  private visited = new Set<string>();
  private queue: Array<{ url: string; depth: number }> = [];
  private pool: PromisePool;
  private maxDepth: number;

  constructor(
    startUrl: string,
    maxConcurrency: number,
    maxDepth: number = 3
  ) {
    this.queue.push({ url: startUrl, depth: 0 });
    this.pool = new PromisePool(maxConcurrency);
    this.maxDepth = maxDepth;
  }

  async crawl() {
    while (this.queue.length > 0) {
      // Take the current wave; links it discovers land back in this.queue
      // and are picked up on the next pass.
      const wave = this.queue.splice(0);
      await Promise.all(
        wave
          .filter(({ url, depth }) =>
            depth <= this.maxDepth && !this.visited.has(url)
          )
          .map(({ url, depth }) => {
            this.visited.add(url);
            return this.pool.add(() => this.crawlPage(url, depth));
          })
      );
    }
  }

  // Fetch one page, queue its outgoing links; failures are logged, not fatal.
  private async crawlPage(url: string, depth: number) {
    try {
      console.log("Crawling:", url);
      const response = await fetch(url);
      const html = await response.text();

      // Parse links (simplified example)
      const links = this.extractLinks(html);
      for (const link of links) {
        if (!this.visited.has(link)) {
          this.queue.push({ url: link, depth: depth + 1 });
        }
      }

      // Simulate data processing
      await new Promise(resolve => setTimeout(resolve, 100));
    } catch (error) {
      console.error("Failed to crawl:", url, error);
    }
  }

  private extractLinks(html: string): string[] {
    // Actual implementation requires an HTML parser
    return [];
  }
}

// Usage example: start at example.com with 5 concurrent fetches, depth limit 2.
const crawler = new ConcurrentCrawler("https://example.com", 5, 2);
await crawler.crawl();

Real-Time Data Processing Pipeline

// data_pipeline.ts
/**
 * Fans chunks of a dataset out to a fixed set of workers and gathers the
 * processed results.
 *
 * Fixes over the original:
 *  - process() used to replace each worker's onmessage with a handler that
 *    only resolved and discarded e.data, overwriting the result-collecting
 *    handler installed in initWorkers — so process() always returned a
 *    stale (initially empty) outputQueue. Results are now captured per
 *    chunk and returned directly, so they also no longer accumulate
 *    across calls.
 *  - The unused inputQueue field is gone.
 *  - The `deno: { namespace: true }` worker option is dropped; it was
 *    removed from Deno (the namespace is available in workers by default).
 */
class ParallelDataPipeline {
  private workers: Array<Worker> = [];
  private workerCount: number;

  constructor(
    workerScript: string,
    workerCount: number = navigator.hardwareConcurrency || 4
  ) {
    this.workerCount = workerCount;
    this.initWorkers(workerScript);
  }

  // Spawn the worker pool; message handlers are attached per process() call.
  private initWorkers(workerScript: string) {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(new URL(workerScript, import.meta.url).href, {
        type: "module",
      });

      worker.onerror = (e) => {
        console.error("Worker error:", e.message);
      };

      this.workers.push(worker);
    }
  }

  /** Split `data` into one contiguous chunk per worker, process in parallel,
   *  and return the concatenated results. */
  async process(data: any[]) {
    const chunkSize = Math.ceil(data.length / this.workerCount);
    const chunks = Array.from(
      { length: this.workerCount },
      (_, i) => data.slice(i * chunkSize, (i + 1) * chunkSize)
    );

    // Each worker replies once per chunk; capture that reply as the result.
    const promises = chunks.map((chunk, i) => {
      return new Promise<any[]>((resolve) => {
        this.workers[i].onmessage = (e) => resolve(e.data);
        this.workers[i].postMessage(chunk);
      });
    });

    const resultsPerWorker = await Promise.all(promises);
    return resultsPerWorker.flat();
  }

  terminate() {
    this.workers.forEach(worker => worker.terminate());
  }
}

// Usage example
// data_processor.ts: double each item's value, echoing its id.
self.onmessage = async (e) => {
  const items: Array<{ id: number; value: number }> = e.data;
  self.postMessage(
    items.map((item) => ({
      id: item.id,
      processedValue: item.value * 2,
    }))
  );
};

// main.ts: push 1000 random records through four workers.
const pipeline = new ParallelDataPipeline("./data_processor.ts", 4);
const data = Array.from({ length: 1000 }, (_, i) => ({ id: i, value: Math.random() }));
const results = await pipeline.process(data);
console.log("Processing complete:", results.length);
pipeline.terminate();
Share your love