Deno’s concurrency and parallelism model is built on modern JavaScript’s asynchronous features and the efficient scheduling capabilities of the Rust runtime. Understanding Deno’s concurrency mechanisms is crucial for building high-performance applications. This tutorial dives deep into Deno’s concurrency and parallelism programming patterns, demonstrating applications from basic to advanced through code examples.
Concurrency Fundamentals
Concurrency vs. Parallelism
Concurrency
Refers to a system’s ability to make progress on multiple tasks during the same period of time by interleaving their execution. On a single-core CPU, this is achieved through time-slicing, which creates the effect of “simultaneous” execution even though only one task runs at any instant.
Parallelism
Refers to a system’s ability to execute multiple tasks truly simultaneously, requiring multi-core CPU hardware support.
Manifestation in Deno
- Concurrency: Achieved through asynchronous I/O operations (e.g., network requests, file reads/writes).
- Parallelism: Implemented via Web Workers, each of which runs on its own thread.
Deno’s Concurrency Model
Promise-Based Asynchronous Model
All asynchronous operations in Deno are based on Promises, following the event loop mechanism:
// Typical concurrent operation example
/**
 * Starts an HTTP request and a file read at the same time, waits for both,
 * and logs the parsed response body together with the file content.
 *
 * Rejects if either operation fails (Promise.all is fail-fast).
 */
async function concurrentOperations() {
  // Both operations are started before anything is awaited, so they overlap.
  // The body is parsed as part of the chain: logging the bare Response would
  // show only metadata and leave the response body unconsumed.
  const promise1 = fetch("https://api.example.com/data1").then((res) => res.json());
  const promise2 = Deno.readTextFile("file.txt");
  // Execute concurrently, wait for all to complete
  const [data1, fileContent] = await Promise.all([promise1, promise2]);
  console.log(data1, fileContent);
}
Event Loop Mechanism
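Deno’s event loop is provided by its Rust runtime (built on Tokio) rather than by V8 itself: V8 executes the JavaScript and drains the microtask queue, while the runtime polls pending asynchronous operations (timers, file and network I/O) and resolves their Promises as they complete.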
Concurrency Programming Patterns
Promise Concurrency Control
Basic Usage of Promise.all
// Execute multiple async tasks concurrently
/**
 * Requests every URL at once and resolves with the parsed JSON bodies, in the
 * same order as `urls`. Rejects as soon as any request or JSON parse fails.
 */
async function fetchMultipleUrls(urls: string[]) {
  const pending: Array<Promise<any>> = [];
  for (const target of urls) {
    // Each request is started immediately; nothing is awaited inside the loop.
    pending.push(
      (async () => {
        const response = await fetch(target);
        return response.json();
      })(),
    );
  }
  return Promise.all(pending);
}
// Usage example
// The two endpoints are requested concurrently; `results` holds their parsed
// JSON bodies in the same order.
const endpointUrls = [
  "https://api.example.com/users",
  "https://api.example.com/posts",
];
const results = await fetchMultipleUrls(endpointUrls);
Application of Promise.allSettled
// Handle potentially failing tasks
// Fetches every URL concurrently and partitions the outcomes into the parsed
// JSON bodies of the requests that succeeded and the rejection reasons of the
// ones that failed. Promise.allSettled never rejects, so a single failing URL
// does not discard the results of the others.
// Returns { successfulData, errors }; each array keeps the relative order of `urls`.
async function fetchWithFallback(urls: string[]) {
// Start all requests up front; each chain settles independently.
const results = await Promise.allSettled(
urls.map(url => fetch(url).then(res => res.json()))
);
// Fulfilled outcomes carry the parsed body in `.value`.
const successfulData = results
.filter(result => result.status === "fulfilled")
.map(result => (result as PromiseFulfilledResult<any>).value);
// Rejected outcomes carry the thrown error in `.reason`.
const errors = results
.filter(result => result.status === "rejected")
.map(result => (result as PromiseRejectedResult).reason);
return { successfulData, errors };
}Concurrency Limitation Patterns
Custom Concurrent Pool Implementation
/**
 * Runs asynchronous tasks with an upper bound on how many are in flight.
 *
 * Tasks are started in the order they were added. Each `add` call returns a
 * promise that settles with the outcome of its own task, so one failing task
 * does not affect the others.
 */
class PromisePool {
  private queue: Array<() => Promise<void>> = [];
  private activeCount = 0;

  /**
   * @param maxConcurrent Maximum number of tasks running at once (>= 1).
   * @throws RangeError if `maxConcurrent` is below 1 or NaN; such a pool
   *   could never start a task and every `add` would hang forever.
   */
  constructor(private maxConcurrent: number) {
    if (!(maxConcurrent >= 1)) {
      throw new RangeError(`maxConcurrent must be at least 1, got ${maxConcurrent}`);
    }
  }

  /**
   * Queues `task` and starts it as soon as a slot is free.
   *
   * @param task Factory that starts the work when invoked.
   * @returns A promise that resolves or rejects with the task's own outcome.
   */
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const wrappedTask = async (): Promise<void> => {
        // Runs synchronously when the wrapper is invoked, so the slot is
        // claimed before any other task can be dequeued.
        this.activeCount++;
        try {
          resolve(await task());
        } catch (error) {
          reject(error);
        } finally {
          // Free the slot and hand it to the next queued task, if any.
          this.activeCount--;
          this.next();
        }
      };
      this.queue.push(wrappedTask);
      this.next();
    });
  }

  /** Starts the oldest queued task if the concurrency limit allows it. */
  private next(): void {
    if (this.activeCount < this.maxConcurrent && this.queue.length > 0) {
      const task = this.queue.shift()!;
      // The wrapper never rejects: it forwards failures to the caller's promise.
      void task();
    }
  }
}
// Usage example
// Ten requests are queued at once, but the pool lets only three run at a time.
const pool = new PromisePool(3); // Max concurrency of 3
const urls = Array.from({ length: 10 }, () => "https://api.example.com/data");
await Promise.all(
  urls.map((url) =>
    pool.add(async () => {
      const res = await fetch(url);
      return res.json();
    })
  ),
);
Priority-Based Concurrency Control
// Concurrency-limited task pool with two priority levels.
// At most `maxConcurrent` tasks run at once. Whenever a slot is free, the
// oldest high-priority task is started; a low-priority task is started only
// when the high-priority queue is empty. Tasks that are already running are
// never interrupted.
// NOTE(review): with maxConcurrent < 1 the condition in next() is never true,
// so no task would ever start — confirm callers always pass a value >= 1.
class PriorityPromisePool {
private highPriorityQueue: Array<() => Promise<any>> = [];
private lowPriorityQueue: Array<() => Promise<any>> = [];
// Number of tasks currently running.
private activeCount = 0;
constructor(private maxConcurrent: number) {}
// Queues a task that is started ahead of all waiting low-priority tasks.
// Returns a promise that settles with the task's own outcome.
addHighPriority(task: () => Promise<any>) {
return this.addTask(task, this.highPriorityQueue);
}
// Queues a task that is started only when no high-priority task is waiting.
// Returns a promise that settles with the task's own outcome.
addLowPriority(task: () => Promise<any>) {
return this.addTask(task, this.lowPriorityQueue);
}
// Wraps `task` so that it updates the active count, forwards its outcome to
// the returned promise, and triggers the next dispatch when it finishes.
private addTask(task: () => Promise<any>, queue: Array<() => Promise<any>>) {
return new Promise((resolve, reject) => {
const wrappedTask = async () => {
try {
// Executed synchronously when the wrapper is invoked from next().
this.activeCount++;
const result = await task();
resolve(result);
} catch (error) {
reject(error);
} finally {
// Release the slot and try to start another queued task.
this.activeCount--;
this.next();
}
};
queue.push(wrappedTask);
this.next();
});
}
// Starts at most one queued task per call, preferring the high-priority queue.
private next() {
if (this.activeCount < this.maxConcurrent) {
if (this.highPriorityQueue.length > 0) {
const task = this.highPriorityQueue.shift()!;
task();
} else if (this.lowPriorityQueue.length > 0) {
const task = this.lowPriorityQueue.shift()!;
task();
}
}
}
}Asynchronous Iteration and Concurrency
Concurrent Processing of Async Iterators
/** Yields the integers 0 through 9, pausing 100 ms before each value. */
async function* asyncDataGenerator() {
  let value = 0;
  while (value < 10) {
    // Simulate async data fetching
    await new Promise((done) => setTimeout(done, 100));
    yield value;
    value += 1;
  }
}
/**
 * Consumes one shared async iterator with three concurrent consumer loops.
 * Every item is delivered to exactly one consumer, so the 200 ms processing
 * steps of different items can overlap.
 */
async function processConcurrently() {
  const concurrency = 3;
  const iterator = asyncDataGenerator();

  // One consumer loop; all consumers pull from the same iterator.
  const consume = async () => {
    for await (const item of iterator) {
      console.log(`Processing item: ${item}`);
      // Simulate processing
      await new Promise((done) => setTimeout(done, 200));
    }
  };

  const workers = Array.from({ length: concurrency }, () => consume());
  await Promise.all(workers);
}
processConcurrently();
Batch Concurrent Processing
/**
 * Processes `items` in consecutive batches: all items of a batch run
 * concurrently, and the next batch starts only after the previous one has
 * completed.
 *
 * @param items Values to process.
 * @param processor Async function applied to each item.
 * @param batchSize Maximum number of items processed at once (>= 1).
 * @returns The processor results in the same order as `items`.
 * @throws RangeError (as a rejection) if `batchSize` is below 1 or NaN; such
 *   a value would otherwise make the loop spin forever or skip every item.
 */
async function batchProcess<T, R>(
  items: T[],
  processor: (item: T) => Promise<R>,
  batchSize: number
): Promise<R[]> {
  if (!(batchSize >= 1)) {
    throw new RangeError(`batchSize must be at least 1, got ${batchSize}`);
  }
  const results: R[] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    const batchResults = await Promise.all(batch.map((item) => processor(item)));
    // Append one by one: spreading a very large batch into push() can exceed
    // the engine's argument limit.
    for (const result of batchResults) {
      results.push(result);
    }
  }
  return results;
}
// Usage example
// Doubles the numbers 0..99, five at a time.
const items = Array.from({ length: 100 }, (_, index) => index);
const doubleAfterDelay = async (item: number) => {
  // Simulate async processing
  await new Promise((done) => setTimeout(done, 100));
  return item * 2;
};
const results = await batchProcess(
  items,
  doubleAfterDelay,
  5, // Process 5 items per batch
);
Parallel Programming Patterns
Web Workers Basics
Creating and Using Web Workers
// main.ts
// Spawns a module worker, sends it an array of numbers and logs the reply.
// The former `deno: { namespace: true }` option is omitted: current Deno
// exposes the `Deno` namespace inside workers by default and no longer
// accepts that option.
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
  type: "module",
});
// Attach the handlers before posting so that no reply or error can be missed.
// Receive message from Worker
worker.onmessage = (e) => {
  console.log("Worker result:", e.data);
};
// Handle Worker errors
worker.onerror = (e) => {
  console.error("Worker error:", e.message);
};
// Send message to Worker
worker.postMessage({ type: "calculate", data: [1, 2, 3, 4, 5] });
// worker.ts
// Replies to a "calculate" message with the sum of the numbers it carries;
// any other message type is ignored.
self.onmessage = async (e) => {
  if (e.data.type !== "calculate") {
    return;
  }
  // Simulate time-consuming computation
  await new Promise((done) => setTimeout(done, 1000));
  const numbers: number[] = e.data.data;
  const result = numbers.reduce((sum: number, num: number) => sum + num, 0);
  self.postMessage({ result });
};
Shared Memory and Atomics
// main.ts
// Allocates 1024 bytes of shared memory and hands the buffer to a worker.
// A SharedArrayBuffer is shared rather than copied when posted, so writes
// made by the worker are visible through `sharedArray` on this side.
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);
// The former `deno: { namespace: true }` option is omitted: current Deno
// exposes the `Deno` namespace inside workers by default.
const worker = new Worker(new URL("./worker_shared.ts", import.meta.url).href, {
  type: "module",
});
worker.postMessage({ sharedBuffer });
// worker_shared.ts
// Atomically increments the first 32-bit slot of the received shared buffer
// and logs the value read back from it.
self.onmessage = ({ data }) => {
  const counters = new Int32Array(data.sharedBuffer);
  // Perform thread-safe operations with Atomics
  Atomics.add(counters, 0, 1);
  const current = Atomics.load(counters, 0);
  console.log("Worker updated shared value:", current);
};
Parallel Computing Patterns
Parallel Data Processing Example
// parallel_processor.ts
/** Unit of work sent to this worker: an id plus the numbers to crunch. */
interface Task {
  id: number;
  data: number[];
}

// Computes one result per received task and posts the results back in task
// order. Within a single worker the tasks run one after another; parallelism
// comes from running several workers side by side.
self.onmessage = (e) => {
  const tasks: Task[] = e.data.tasks;
  const results: number[] = tasks.map((task) => processTask(task));
  self.postMessage(results);
};
/** Returns the sum of the squares of `task.data` (0 for an empty array). */
function processTask(task: Task): number {
  // Simulate CPU-intensive computation
  let total = 0;
  task.data.forEach((num) => {
    total += num * num;
  });
  return total;
}
// main.ts
// Builds one CPU-heavy task per worker, runs each task in its own worker and
// collects the results in task order.
const workerCount = 4;
const tasks: Task[] = Array.from({ length: workerCount }, (_, i) => ({
  id: i,
  data: Array.from({ length: 1000000 }, () => Math.random()),
}));
const workers = tasks.map((task) => {
  // The former `deno: { namespace: true }` option is omitted: current Deno
  // exposes the `Deno` namespace inside workers by default.
  const worker = new Worker(new URL("./parallel_processor.ts", import.meta.url).href, {
    type: "module",
  });
  return new Promise<number[]>((resolve, reject) => {
    worker.onmessage = (e) => {
      // Each worker handles exactly one message here; terminating it lets
      // the program exit once all results are in.
      worker.terminate();
      resolve(e.data);
    };
    worker.onerror = (e) => {
      // Without this handler a failing worker would leave the promise pending
      // forever. preventDefault() marks the error as handled on this side.
      e.preventDefault();
      worker.terminate();
      reject(new Error(e.message));
    };
    worker.postMessage({ tasks: [task] });
  });
});
const allResults = await Promise.all(workers);
console.log("All results:", allResults.flat());
Parallel File Processing
Parallel File Reading and Processing
// parallel_file_processor.ts
/**
 * Reads a text file, simulates 100 ms of processing and returns a summary
 * line for it.
 *
 * `Deno.readTextFile` opens and closes the file itself, so no separate handle
 * is opened here; an extra `Deno.open` handle would leak whenever the read
 * throws before the explicit close is reached.
 *
 * @param filePath Path of the file to read.
 * @returns `Processed: <path> (<n> bytes)`.
 * @throws Whatever `Deno.readTextFile` throws (e.g. when the file is missing).
 */
async function processFile(filePath: string) {
  const content = await Deno.readTextFile(filePath);
  // Simulate file processing
  await new Promise((resolve) => setTimeout(resolve, 100));
  // NOTE(review): `content.length` counts UTF-16 code units; it equals the
  // byte size only for ASCII content such as the generated test files.
  return `Processed: ${filePath} (${content.length} bytes)`;
}
// main.ts
// Creates ten small test files, processes them with at most three in flight,
// and always removes the files again — even when creation or processing fails.
const files = Array.from({ length: 10 }, (_, i) => `file_${i}.txt`);
const concurrency = 3;
const pool = new PromisePool(concurrency);
try {
  // Create test files
  await Promise.all(
    files.map((file, i) => Deno.writeTextFile(file, `Content of file ${i}`))
  );
  // Process files in parallel
  const results = await Promise.all(
    files.map((file) => pool.add(() => processFile(file)))
  );
  console.log("All files processed:", results);
} finally {
  // Clean up test files. allSettled is used so that a file that was never
  // created cannot mask the original error with a removal failure.
  await Promise.allSettled(files.map((file) => Deno.remove(file)));
}
Advanced Concurrency Patterns
Publish-Subscribe Pattern
Promise-Based Publish-Subscribe Implementation
/**
 * Minimal publish/subscribe hub whose subscribers are async functions.
 * Publishing invokes every subscriber of the event at once and waits for all
 * of them to finish.
 */
class AsyncEventEmitter {
  private events: Map<string, Array<(data: any) => Promise<void>>> = new Map();

  /** Registers `callback` to be invoked for every future publish of `event`. */
  subscribe(event: string, callback: (data: any) => Promise<void>) {
    let listeners = this.events.get(event);
    if (listeners === undefined) {
      listeners = [];
      this.events.set(event, listeners);
    }
    listeners.push(callback);
  }

  /**
   * Invokes all subscribers of `event` with `data` and resolves when every one
   * of them has finished. Rejects with the first subscriber failure. Publishing
   * an event without subscribers is a no-op.
   */
  async publish(event: string, data: any) {
    const listeners = this.events.get(event);
    if (!listeners) {
      return;
    }
    // Execute all subscribers concurrently
    const running = listeners.map((listener) => listener(data));
    await Promise.all(running);
  }
}
// Usage example
// Two subscribers with different processing times listen to the same event.
const emitter = new AsyncEventEmitter();
emitter.subscribe("data", async (payload) => {
  console.log("Subscriber 1 processing:", payload);
  await new Promise((done) => setTimeout(done, 100));
});
emitter.subscribe("data", async (payload) => {
  console.log("Subscriber 2 processing:", payload);
  await new Promise((done) => setTimeout(done, 200));
});
// Publish event (execute all subscribers concurrently)
const helloEvent = { message: "Hello World" };
await emitter.publish("data", helloEvent);
Work Queue Pattern
Task Queue Implementation
/**
 * FIFO work queue that runs at most `concurrency` tasks at a time.
 *
 * Tasks are dispatched on demand: adding a task or finishing one starts the
 * next queued task immediately. No timer is used, so an idle queue neither
 * delays new tasks nor keeps the process alive. A failing task is logged and
 * does not stop the queue.
 */
class TaskQueue {
  private queue: Array<() => Promise<any>> = [];
  private concurrency: number;
  // Number of tasks currently running.
  private activeCount = 0;
  // Resolvers of pending onIdle() promises.
  private idleWaiters: Array<() => void> = [];

  /**
   * @param concurrency Maximum number of tasks running at once (>= 1).
   * @throws RangeError if `concurrency` is below 1 or NaN, because such a
   *   queue could never run a task.
   */
  constructor(concurrency: number) {
    if (!(concurrency >= 1)) {
      throw new RangeError(`concurrency must be at least 1, got ${concurrency}`);
    }
    this.concurrency = concurrency;
  }

  /** Queues `task`; it starts as soon as a slot is free. */
  addTask(task: () => Promise<any>) {
    this.queue.push(task);
    this.dispatch();
  }

  /** Resolves once the queue is empty and no task is running. */
  onIdle(): Promise<void> {
    if (this.activeCount === 0 && this.queue.length === 0) {
      return Promise.resolve();
    }
    return new Promise((resolve) => this.idleWaiters.push(resolve));
  }

  /** Starts queued tasks while slots are free and wakes idle waiters. */
  private dispatch() {
    while (this.activeCount < this.concurrency && this.queue.length > 0) {
      const task = this.queue.shift()!;
      this.activeCount++;
      void this.run(task);
    }
    if (this.activeCount === 0 && this.queue.length === 0) {
      for (const wake of this.idleWaiters.splice(0)) {
        wake();
      }
    }
  }

  /** Runs one task, logs its failure if any, then frees its slot. */
  private async run(task: () => Promise<any>) {
    try {
      await task();
    } catch (error) {
      console.error("Task failed:", error);
    } finally {
      this.activeCount--;
      this.dispatch();
    }
  }
}
// Usage example
// Up to 3 tasks run concurrently; they are async functions sharing one
// thread, not OS threads.
const queue = new TaskQueue(3);
// Add tasks
for (let taskNumber = 0; taskNumber < 10; taskNumber += 1) {
  const job = async () => {
    console.log(`Task ${taskNumber} started`);
    await new Promise((done) => setTimeout(done, 1000));
    console.log(`Task ${taskNumber} completed`);
  };
  queue.addTask(job);
}
Advanced Concurrency Control Patterns
Leaky Bucket Algorithm Implementation
/**
 * Leaky-bucket rate limiter for async tasks.
 *
 * Accepted tasks wait in a bounded FIFO queue (the bucket) and are released
 * one at a time at a fixed rate of `leakRate` tasks per second. A task added
 * while the bucket already holds `capacity` waiting tasks overflows: it is
 * dropped and its promise rejects.
 *
 * The timer runs only while tasks are waiting, so an empty bucket does not
 * keep the process alive.
 */
class LeakyBucket {
  // Waiting tasks: `run` starts the task, `drop` rejects the caller's promise.
  private queue: Array<{ run: () => void; drop: (reason: Error) => void }> = [];
  private intervalId: ReturnType<typeof setInterval> | undefined;
  private destroyed = false;

  /**
   * @param capacity Maximum number of tasks allowed to wait (>= 1).
   * @param leakRate Tasks released per second (positive, finite).
   * @throws RangeError if either argument is out of range.
   */
  constructor(
    private capacity: number,
    private leakRate: number // Tasks per second
  ) {
    if (!(capacity >= 1)) {
      throw new RangeError(`capacity must be at least 1, got ${capacity}`);
    }
    if (!(leakRate > 0) || !Number.isFinite(leakRate)) {
      throw new RangeError(`leakRate must be a positive finite number, got ${leakRate}`);
    }
  }

  /**
   * Places `task` in the bucket.
   *
   * @returns A promise that settles with the task's outcome, or rejects with
   *   an Error if the task overflowed the bucket or the bucket was destroyed.
   *   The rejection is pre-marked as handled so that fire-and-forget callers
   *   are not terminated by an unhandled rejection; awaiting callers still
   *   observe it.
   */
  add<T>(task: () => Promise<T>): Promise<T> {
    const outcome = new Promise<T>((resolve, reject) => {
      if (this.destroyed) {
        reject(new Error("LeakyBucket has been destroyed"));
        return;
      }
      if (this.queue.length >= this.capacity) {
        reject(new Error("LeakyBucket overflow: task dropped"));
        return;
      }
      this.queue.push({
        // The async wrapper turns a synchronous throw in `task` into a rejection.
        run: () => {
          (async () => task())().then(resolve, reject);
        },
        drop: reject,
      });
      this.startLeaking();
    });
    outcome.catch(() => {});
    return outcome;
  }

  /** Starts the release timer if it is not already running. */
  private startLeaking() {
    if (this.intervalId !== undefined) return;
    this.intervalId = setInterval(() => this.leak(), 1000 / this.leakRate);
  }

  /** Stops the release timer if it is running. */
  private stopLeaking() {
    if (this.intervalId === undefined) return;
    clearInterval(this.intervalId);
    this.intervalId = undefined;
  }

  /** Releases the oldest waiting task; stops the timer once the bucket is empty. */
  private leak() {
    const entry = this.queue.shift();
    if (entry) {
      entry.run();
    }
    if (this.queue.length === 0) {
      this.stopLeaking();
    }
  }

  /** Stops the timer and rejects every task that is still waiting. */
  destroy() {
    this.destroyed = true;
    this.stopLeaking();
    const reason = new Error("LeakyBucket has been destroyed");
    for (const entry of this.queue.splice(0)) {
      entry.drop(reason);
    }
  }
}
// Usage example
const bucket = new LeakyBucket(5, 2); // Capacity 5, process 2 tasks per second
// Add tasks
for (let taskNumber = 0; taskNumber < 20; taskNumber += 1) {
  const job = async () => {
    const startedAt = new Date().toISOString();
    console.log(`Task ${taskNumber} executed at ${startedAt}`);
    await new Promise((done) => setTimeout(done, 500));
  };
  bucket.add(job);
}
Performance Optimization and Debugging
Concurrency Performance Analysis
Performance Measurement Tools
// Performance measurement wrapper
/**
 * Wraps an async function so that every call logs its wall-clock duration.
 *
 * No "concurrency efficiency" figure is reported: a single call has only one
 * start and one end timestamp, so a ratio derived from them is always 1 and
 * carries no information.
 *
 * @param fn Async function to measure.
 * @param name Label used in the log output.
 * @returns A function with the same parameters and result as `fn`. Failures
 *   are logged and rethrown unchanged.
 */
function measureConcurrency<T extends (...args: any[]) => Promise<any>>(
  fn: T,
  name: string
): (...args: Parameters<T>) => ReturnType<T> {
  const measured = async (...args: Parameters<T>) => {
    const start = performance.now();
    try {
      const result = await fn(...args);
      const duration = performance.now() - start;
      console.log(`${name} executed in ${duration.toFixed(2)}ms`);
      return result;
    } catch (error) {
      console.error(`${name} failed:`, error);
      throw error;
    }
  };
  // The wrapper resolves to exactly what `fn` resolves to, so exposing it
  // under `fn`'s own return type is accurate.
  return measured as (...args: Parameters<T>) => ReturnType<T>;
}
// Usage example
// Wraps a JSON fetch helper so that each call reports how long it took.
const fetchJson = async (url: string) => {
  const response = await fetch(url);
  return response.json();
};
const measuredFetch = measureConcurrency(fetchJson, "fetchData");
await measuredFetch("https://api.example.com/data");
Concurrency Debugging Techniques
Diagnosing Concurrency Issues
// Concurrency debugging tool
/**
 * Tracks in-flight async tasks so they can be inspected while running.
 *
 * Every `track` call gets its own entry, so several concurrent tasks may
 * share a name without overwriting or deleting each other's bookkeeping.
 */
class ConcurrencyDebugger {
  private activeTasks: Map<number, { name: string; start: number; stack: string }> = new Map();
  // Source of unique entry keys.
  private nextId = 0;

  /**
   * Runs `task`, records it as active until it settles, and logs how long it
   * took.
   *
   * @param taskName Label shown in the log output.
   * @param task Factory that starts the work.
   * @returns A promise with the task's own outcome. A synchronous throw in
   *   `task` is reported as a rejection.
   */
  track<T>(taskName: string, task: () => Promise<T>): Promise<T> {
    const id = this.nextId++;
    const start = performance.now();
    // Captured here so the stack shows where the task was started.
    const stack = new Error().stack || "";
    this.activeTasks.set(id, { name: taskName, start, stack });
    return (async () => task())().finally(() => {
      // `start` comes from the closure, not from the map, so this cannot fail
      // even if the map was modified in the meantime.
      const duration = performance.now() - start;
      console.log(`Task ${taskName} completed in ${duration.toFixed(2)}ms`);
      this.activeTasks.delete(id);
    });
  }

  /** Logs every task that has not settled yet, with its age and start stack. */
  printActiveTasks() {
    console.log("Active tasks:");
    this.activeTasks.forEach((value) => {
      console.log(`- ${value.name}: ${((performance.now() - value.start) / 1000).toFixed(2)}s`);
      console.log(value.stack);
    });
  }
}
// Usage example
const debuggerInstance = new ConcurrencyDebugger();
/** Runs a tracked 1 s task that then starts a tracked 0.5 s nested task. */
async function problematicTask() {
  const nestedWork = async () => {
    // Nested task
    await new Promise((done) => setTimeout(done, 500));
  };
  const outerWork = async () => {
    // Simulate problematic task
    await new Promise((done) => setTimeout(done, 1000));
    await debuggerInstance.track("nested", nestedWork);
  };
  await debuggerInstance.track("problematic", outerWork);
}
// Run and check concurrency status
problematicTask().catch(console.error);
setTimeout(() => {
  debuggerInstance.printActiveTasks();
}, 1500);
Practical Case Studies
High-Performance Web Crawler
// advanced_crawler.ts
/**
 * Breadth-first crawler that fetches up to `maxConcurrency` pages at a time
 * and follows links down to `maxDepth`.
 */
class ConcurrentCrawler {
  private visited = new Set<string>();
  private queue: Array<{ url: string; depth: number }> = [];
  private pool: PromisePool;
  private maxDepth: number;

  /**
   * @param startUrl First page to crawl (depth 0).
   * @param maxConcurrency Maximum number of pages fetched at once.
   * @param maxDepth Deepest link level that is still crawled.
   */
  constructor(
    startUrl: string,
    maxConcurrency: number,
    maxDepth: number = 3
  ) {
    this.queue.push({ url: startUrl, depth: 0 });
    this.pool = new PromisePool(maxConcurrency);
    this.maxDepth = maxDepth;
  }

  /**
   * Crawls until no page is queued and no fetch is in flight.
   *
   * Pages are handed to the pool without awaiting each one in turn; awaiting
   * inside the loop would run the pages strictly one after another and never
   * use more than one pool slot.
   */
  async crawl() {
    const inFlight = new Set<Promise<unknown>>();
    while (this.queue.length > 0 || inFlight.size > 0) {
      // Launch everything that is currently queued; the pool caps how many
      // of these actually run at once.
      while (this.queue.length > 0) {
        const { url, depth } = this.queue.shift()!;
        if (depth > this.maxDepth || this.visited.has(url)) continue;
        this.visited.add(url);
        const job: Promise<unknown> = this.pool
          .add(() => this.visit(url, depth))
          .finally(() => inFlight.delete(job));
        inFlight.add(job);
      }
      // Wait for any page to finish; it may have queued new links.
      if (inFlight.size > 0) {
        await Promise.race(inFlight);
      }
    }
  }

  /** Fetches one page and queues its unvisited links; failures are logged. */
  private async visit(url: string, depth: number): Promise<void> {
    try {
      console.log("Crawling:", url);
      const response = await fetch(url);
      const html = await response.text();
      // Parse links (simplified example)
      const links = this.extractLinks(html);
      for (const link of links) {
        if (!this.visited.has(link)) {
          this.queue.push({ url: link, depth: depth + 1 });
        }
      }
      // Simulate data processing
      await new Promise((resolve) => setTimeout(resolve, 100));
    } catch (error) {
      console.error("Failed to crawl:", url, error);
    }
  }

  /** Placeholder: always returns no links. */
  private extractLinks(html: string): string[] {
    // Actual implementation requires an HTML parser
    return [];
  }
}
// Usage example
// Crawl example.com with up to 5 concurrent fetches, following links 2 levels deep.
const crawlStartUrl = "https://example.com";
const crawler = new ConcurrentCrawler(crawlStartUrl, 5, 2);
await crawler.crawl();
Real-Time Data Processing Pipeline
// data_pipeline.ts
/**
 * Splits an array into one chunk per worker, processes the chunks in
 * parallel and returns the combined results in input order.
 *
 * A pipeline handles one `process` call at a time: each call installs its own
 * message handlers on the workers, so await one call before starting the next.
 */
class ParallelDataPipeline {
  private workers: Array<Worker> = [];
  private workerCount: number;

  /**
   * @param workerScript Worker module path, resolved relative to this module.
   * @param workerCount Number of workers to spawn (positive integer).
   * @throws RangeError if `workerCount` is not a positive integer.
   */
  constructor(
    workerScript: string,
    workerCount: number = navigator.hardwareConcurrency || 4
  ) {
    if (!Number.isInteger(workerCount) || workerCount < 1) {
      throw new RangeError(`workerCount must be a positive integer, got ${workerCount}`);
    }
    this.workerCount = workerCount;
    this.initWorkers(workerScript);
  }

  /** Spawns the workers; errors outside a `process` call are only logged. */
  private initWorkers(workerScript: string) {
    for (let i = 0; i < this.workerCount; i++) {
      // The former `deno: { namespace: true }` option is omitted: current
      // Deno exposes the `Deno` namespace inside workers by default.
      const worker = new Worker(new URL(workerScript, import.meta.url).href, {
        type: "module",
      });
      worker.onerror = (e) => {
        console.error("Worker error:", e.message);
      };
      this.workers.push(worker);
    }
  }

  /**
   * Distributes `data` across the workers and collects their replies.
   *
   * Each reply resolves its own chunk's promise. A shared handler that only
   * signals completion would discard the reply data and leave the result
   * empty.
   *
   * @param data Items to process.
   * @returns The workers' replies concatenated in chunk order; a reply that
   *   is an array is flattened by one level.
   * @throws Error (as a rejection) if any worker reports an error.
   */
  async process(data: any[]) {
    // Distribute data to worker threads
    const chunkSize = Math.ceil(data.length / this.workerCount);
    // Process in parallel
    const jobs = this.workers.map((worker, i) => {
      const chunk = data.slice(i * chunkSize, (i + 1) * chunkSize);
      if (chunk.length === 0) {
        // Fewer items than workers: nothing to send to this one.
        return Promise.resolve([] as any[]);
      }
      return new Promise<any>((resolve, reject) => {
        worker.onmessage = (e) => resolve(e.data);
        worker.onerror = (e) => {
          // preventDefault() marks the error as handled on this side.
          e.preventDefault();
          console.error("Worker error:", e.message);
          reject(new Error(e.message));
        };
        worker.postMessage(chunk);
      });
    });
    // Collect results
    const perWorker = await Promise.all(jobs);
    return perWorker.flat();
  }

  /** Stops all workers; the pipeline cannot be used afterwards. */
  terminate() {
    this.workers.forEach((worker) => worker.terminate());
  }
}
// Usage example
// data_processor.ts
// Receives an array of { id, value } items and posts back an array of
// { id, processedValue } items, where processedValue is twice the value.
self.onmessage = async (e) => {
  const incoming = e.data;
  const processed = incoming.map(({ id, value }: { id: number; value: number }) => {
    return { id, processedValue: value * 2 };
  });
  self.postMessage(processed);
};
// main.ts
// Processes 1000 random items on 4 workers. The workers are terminated in a
// `finally` block so they are shut down even if processing fails; otherwise
// the still-running workers would keep the program alive.
const pipeline = new ParallelDataPipeline("./data_processor.ts", 4);
const data = Array(1000).fill(0).map((_, i) => ({ id: i, value: Math.random() }));
try {
  const results = await pipeline.process(data);
  console.log("Processing complete:", results.length);
} finally {
  pipeline.terminate();
}



