Node.js 事件循环(Event Loop)
一、事件循环的作用与重要性
1.1 什么是事件循环
事件循环是Node实现非阻塞I/O操作的核心机制,它可以允许Node.js在单线程环境下处理大量并发操作
1.2 核心作用
非阻塞I/O处理:让单线程的JavaScript能够处理高并发请求
事件驱动架构:基于回调函数响应各种事件
资源高效利用:避免为每个连接创建新线程的开销
二、事件循环的阶段详解
2.1 事件循环的六个阶段
// 事件循环阶段示意图┌───────────────────────────┐
┌─>│ timers │ // 执行setTimeout和setInterval回调
│ └─────────────┬─────────────┘
│ ┌─────────────┴─────────────┐
│ │ pending callbacks │ // 执行系统操作的回调(如TCP错误)
│ └─────────────┬─────────────┘
│ ┌─────────────┴─────────────┐
│ │ idle, prepare │ // 内部使用
│ └─────────────┬─────────────┘
│ ┌─────────────┴─────────────┐
│ │ poll │ // 检索新的I/O事件,执行相关回调
│ └─────────────┬─────────────┘
│ ┌─────────────┴─────────────┐
│ │ check │ // 执行setImmediate回调
│ └─────────────┬─────────────┘
│ ┌─────────────┴─────────────┐
└──┤ close callbacks │ // 执行关闭事件的回调(如socket.close)└───────────────────────────┘
2.2 代码示例 -> 各阶段详细说明
const fs = require('fs');// 1. timers阶段
console.log('开始');setTimeout(() => {console.log('setTimeout - timers阶段');
}, 0);// 2. poll阶段
fs.readFile(__filename, () => {console.log('fs.readFile - poll阶段');// 3. check阶段setImmediate(() => {console.log('setImmediate - check阶段');});
});// 4. nextTick队列(微任务)
process.nextTick(() => {console.log('process.nextTick - 微任务');
});console.log('结束');
三、实际应用场景
3.1 高性能Web服务器
const http = require('http');const server = http.createServer((req, res) => {// 模拟异步数据库查询setImmediate(() => {// 在check阶段执行,避免阻塞其他请求const userData = { id: 1, name: '张三' };process.nextTick(() => {// 确保在发送响应前完成所有同步操作res.writeHead(200, { 'Content-Type': 'application/json' });res.end(JSON.stringify(userData));});});
});server.listen(3000, () => {console.log('服务器运行在端口3000');
});
3.2 批量数据处理
class BatchProcessor {constructor(batchSize = 100) {this.batchSize = batchSize;this.queue = [];this.processing = false;}add(data) {this.queue.push(data);if (!this.processing) {this.processing = true;// 使用setImmediate而不是setTimeout,更高效setImmediate(() => this.processBatch());}}processBatch() {const batch = this.queue.splice(0, this.batchSize);if (batch.length > 0) {// 处理当前批次this.processItems(batch);// 使用nextTick确保递归调用不会导致栈溢出process.nextTick(() => this.processBatch());} else {this.processing = false;}}processItems(items) {// 模拟数据处理items.forEach(item => {console.log('处理项目:', item);});}
}// 使用示例
const processor = new BatchProcessor();
for (let i = 0; i < 1000; i++) {processor.add(`数据-${i}`);
}
四、实战技巧与最佳实践
4.1 避免阻塞事件循环
// ❌ 错误示范:CPU密集型任务会阻塞事件循环
app.get('/slow', (req, res) => {let result = 0;for (let i = 0; i < 1000000000; i++) {result += i; // 这会阻塞事件循环}res.json({ result });
});// ✅ 正确做法:分解任务或使用工作线程
app.get('/fast', (req, res) => {// 使用setImmediate分解任务let result = 0;let i = 0;function calculateChunk() {for (let j = 0; j < 1000000; j++) {if (i >= 1000000000) {res.json({ result });return;}result += i;i++;}// 让出事件循环,处理其他任务setImmediate(calculateChunk);}calculateChunk();
});
4.2 合理的任务调度
class TaskScheduler {constructor() {this.tasks = [];this.isRunning = false;}addTask(task, priority = 'normal') {this.tasks.push({ task, priority });this.schedule();}schedule() {if (this.isRunning) return;this.isRunning = true;const execute = () => {if (this.tasks.length === 0) {this.isRunning = false;return;}// 按优先级排序this.tasks.sort((a, b) => {const priorityOrder = { high: 0, normal: 1, low: 2 };return priorityOrder[a.priority] - priorityOrder[b.priority];});const currentTask = this.tasks.shift().task;// 使用nextTick确保同步代码执行完毕process.nextTick(() => {try {currentTask();} catch (error) {console.error('任务执行错误:', error);}// 使用setImmediate让出控制权setImmediate(execute);});};execute();}
}// 使用示例
const scheduler = new TaskScheduler();
scheduler.addTask(() => console.log('高优先级任务'), 'high');
scheduler.addTask(() => console.log('普通任务'), 'normal');
4.3 错误处理与监控
// 监控事件循环延迟
class EventLoopMonitor {constructor() {this.lastTime = process.hrtime();this.delays = [];this.startMonitoring();}startMonitoring() {setInterval(() => {const diff = process.hrtime(this.lastTime);const nanosec = diff[0] * 1e9 + diff[1];const delay = nanosec - 1e9; // 期望1秒间隔if (delay > 0) {this.delays.push(delay);if (this.delays.length > 100) this.delays.shift();const avgDelay = this.delays.reduce((a, b) => a + b) / this.delays.length;if (avgDelay > 1e7) { // 10ms平均延迟console.warn('事件循环延迟警告:', Math.round(avgDelay / 1e6) + 'ms');}}this.lastTime = process.hrtime();}, 1000);}
}// 全局错误处理
process.on('uncaughtException', (error) => {console.error('未捕获的异常:', error);// 优雅关闭process.exit(1);
});process.on('unhandledRejection', (reason, promise) => {console.error('未处理的Promise拒绝:', reason);
});
五、高级应用场景
5.1 实时数据处理管道
class DataProcessingPipeline {constructor() {this.stages = [];this.buffer = [];this.processing = false;}addStage(stage) {this.stages.push(stage);}push(data) {this.buffer.push(data);this.process();}async process() {if (this.processing) return;this.processing = true;while (this.buffer.length > 0) {const data = this.buffer.shift();try {let result = data;for (const stage of this.stages) {// 每个阶段使用setImmediate避免阻塞await new Promise((resolve) => {setImmediate(async () => {result = await stage(result);resolve();});});}console.log('处理结果:', result);} catch (error) {console.error('处理错误:', error);}}this.processing = false;}
}// 使用示例
const pipeline = new DataProcessingPipeline();pipeline.addStage(async (data) => {// 数据验证阶段return { ...data, validated: true };
});pipeline.addStage(async (data) => {// 数据转换阶段return { ...data, transformed: true };
});// 推送数据
for (let i = 0; i < 10; i++) {pipeline.push({ id: i, value: Math.random() });
}
5.2 资源池管理
class ConnectionPool {constructor(maxConnections = 10) {this.maxConnections = maxConnections;this.activeConnections = 0;this.waitingQueue = [];}async getConnection() {return new Promise((resolve) => {const tryAcquire = () => {if (this.activeConnections < this.maxConnections) {this.activeConnections++;resolve({release: () => {this.activeConnections--;this.processQueue();}});} else {this.waitingQueue.push(tryAcquire);}};// 使用nextTick避免同步递归process.nextTick(tryAcquire);});}processQueue() {if (this.waitingQueue.length > 0 && this.activeConnections < this.maxConnections) {const nextRequest = this.waitingQueue.shift();setImmediate(nextRequest);}}
}// 使用示例
const pool = new ConnectionPool(3);async function useConnection(id) {const connection = await pool.getConnection();console.log(`连接 ${id} 获取成功`);// 模拟使用连接setTimeout(() => {connection.release();console.log(`连接 ${id} 已释放`);}, 1000);
}// 模拟并发请求
for (let i = 0; i < 10; i++) {useConnection(i);
}
六、性能优化技巧
6.1 选择合适的定时器
// 根据需求选择合适的调度方法
function scheduleTask(task, options = {}) {const { type = 'immediate', delay = 0 } = options;switch (type) {case 'nextTick':// 最高优先级,在当前操作完成后立即执行process.nextTick(task);break;case 'immediate':// 在check阶段执行,适合I/O操作后setImmediate(task);break;case 'timeout':// 在timers阶段执行,适合需要延迟的任务setTimeout(task, delay);break;default:setImmediate(task);}
}
6.2 批量操作优化
// 批量处理I/O操作
async function batchIOOperations(operations) {const results = [];const batchSize = 10;for (let i = 0; i < operations.length; i += batchSize) {const batch = operations.slice(i, i + batchSize);// 使用Promise.all并行处理,但通过setImmediate控制并发const batchResults = await new Promise((resolve) => {setImmediate(async () => {const promises = batch.map(op => op());resolve(await Promise.all(promises));});});results.push(...batchResults);// 让出事件循环,避免阻塞if (i + batchSize < operations.length) {await new Promise(resolve => setImmediate(resolve));}}return results;
}