【Node.js】高级主题
个人主页:Guiat
归属专栏:node.js
文章目录
- 1. Node.js 高级主题概览
- 1.1 高级主题架构图
- 2. 事件循环与异步编程深度解析
- 2.1 事件循环机制详解
- 事件循环阶段详解
- 2.2 异步编程模式演进
- 高级异步模式实现
- 3. 内存管理与性能优化
- 3.1 V8 内存管理机制
- 内存监控与分析工具
- 3.2 性能优化策略
- 性能分析工具实现
- 4. 微服务架构与设计模式
- 4.1 微服务架构模式
- 微服务基础框架实现
- 4.2 设计模式在 Node.js 中的应用
- 高级设计模式实现
- 5. 实时通信与 WebSocket
- 5.1 实时通信架构
- WebSocket 服务器实现
- 5.2 实时数据同步系统
- 实时协作编辑器实现
- 6. 安全与认证系统
- 6.1 认证与授权架构
- 高级认证系统实现
- 7. 数据处理与分析
- 7.1 大数据处理架构
- 流数据处理系统
- 7.2 机器学习集成
- 机器学习服务实现
正文
1. Node.js 高级主题概览
Node.js 高级主题涵盖了深入的技术概念和实践,包括事件循环机制、内存管理、性能优化、微服务架构、实时通信等核心领域。掌握这些高级主题对于构建高性能、可扩展的企业级应用至关重要。
1.1 高级主题架构图
2. 事件循环与异步编程深度解析
2.1 事件循环机制详解
事件循环阶段详解
// event-loop-demo.js
const fs = require('fs');

console.log('=== 事件循环演示 ===');

// 1. Synchronous code runs to completion before any callback below.
console.log('1. 同步代码执行');

// 2. process.nextTick: its queue is drained as soon as the current operation
//    finishes, ahead of the promise microtask queue (CommonJS entry point).
process.nextTick(() => {
  console.log('2. process.nextTick 回调');
});

// 3. Promise reaction (microtask).
Promise.resolve().then(() => {
  console.log('3. Promise.then 回调');
});

// 4. setImmediate (check phase).
setImmediate(() => {
  console.log('4. setImmediate 回调');
});

// 5. setTimeout (timers phase).
setTimeout(() => {
  console.log('5. setTimeout 回调');
}, 0);

// 6. I/O operation (its callback is delivered from the poll phase).
fs.readFile(__filename, () => {
  console.log('6. fs.readFile 回调');

  // Scheduled from inside an I/O callback, setImmediate always fires before
  // a 0 ms timer, because the check phase comes right after the poll phase.
  setImmediate(() => {
    console.log('7. setImmediate 在 I/O 回调中');
  });
  setTimeout(() => {
    console.log('8. setTimeout 在 I/O 回调中');
  }, 0);
});

console.log('9. 同步代码结束');

/*
Typical output order:
1. 同步代码执行
9. 同步代码结束
2. process.nextTick 回调
3. Promise.then 回调
5. setTimeout 回调
4. setImmediate 回调
6. fs.readFile 回调
7. setImmediate 在 I/O 回调中
8. setTimeout 在 I/O 回调中

Caveats:
- Lines 5 and 4 are scheduled from the main module, so their relative order
  is NOT guaranteed; it depends on how long process start-up took.
- Line 6 normally arrives after 5 and 4 because reading a file takes several
  event-loop turns, but its exact position is timing dependent.
- Line 7 always precedes line 8 (both are scheduled inside an I/O callback).
*/
2.2 异步编程模式演进
高级异步模式实现
// advanced-async-patterns.js// 1. 异步迭代器
/**
 * Async-iterable wrapper around an array that emits one item at a time,
 * pausing before each item to simulate asynchronous work.
 *
 * The cursor (`index`) lives on the instance, so a second `for await` loop
 * continues where the previous one stopped instead of starting over.
 */
class AsyncDataProcessor {
  /**
   * @param {Array} data - Items to emit, in order.
   * @param {number} [delayMs=100] - Simulated processing delay per item, in milliseconds.
   */
  constructor(data, delayMs = 100) {
    this.data = data;
    this.index = 0;
    this.delayMs = delayMs;
  }

  /**
   * Yields the remaining items of `data`, one per `delayMs` pause.
   * @returns {AsyncGenerator} Generator over the remaining items.
   */
  async *[Symbol.asyncIterator]() {
    while (this.index < this.data.length) {
      // Simulate asynchronous processing before handing out the next item.
      await new Promise((resolve) => setTimeout(resolve, this.delayMs));
      yield this.data[this.index++];
    }
  }
}
// Using the async iterator
/**
 * Demo driver: walks an AsyncDataProcessor over the numbers 1..5 and logs
 * every item as it arrives.
 * @returns {Promise<void>} Settles once every item has been logged.
 */
async function processDataAsync() {
  const source = new AsyncDataProcessor([1, 2, 3, 4, 5]);
  const cursor = source[Symbol.asyncIterator]();

  let step = await cursor.next();
  while (!step.done) {
    console.log('处理项目:', step.value);
    step = await cursor.next();
  }
}
// 2. Async generators and flow control
/**
 * Async generator that produces the integers 0 through 9, waiting 500 ms
 * before each value is yielded.
 * @returns {AsyncGenerator<number>} Generator over 0..9.
 */
async function* asyncGenerator() {
  const pause = () => new Promise((done) => setTimeout(done, 500));

  for (let value = 0; value < 10; value += 1) {
    await pause();
    yield value;
  }
}
// 3. Advanced Promise patterns
/**
 * Runs promise-returning tasks with a cap on how many are in flight at once.
 * Tasks beyond the cap wait in a FIFO queue.
 */
class PromisePool {
  /**
   * @param {number} [concurrency=3] - Maximum number of tasks running at the same time.
   */
  constructor(concurrency = 3) {
    this.concurrency = concurrency;
    this.running = [];
    this.queue = [];
  }

  /**
   * Queues a task and starts it as soon as a slot is free.
   * @param {Function} promiseFunction - Zero-argument function that starts the work.
   * @returns {Promise<*>} Settles with the task's own result or failure.
   */
  async add(promiseFunction) {
    return new Promise((resolve, reject) => {
      this.queue.push({ promiseFunction, resolve, reject });
      this.process();
    });
  }

  /**
   * Starts the next queued task when the pool is below its concurrency limit.
   * @returns {Promise<void>}
   */
  async process() {
    if (this.running.length >= this.concurrency || this.queue.length === 0) {
      return;
    }

    const { promiseFunction, resolve, reject } = this.queue.shift();

    // Calling the task inside a Promise executor turns a synchronous throw
    // into a rejection of the caller's promise. Without this the error would
    // escape as an unhandled rejection and the caller would wait forever.
    const tracked = new Promise((settle) => settle(promiseFunction()))
      .then(resolve, reject)
      .finally(() => {
        this.running.splice(this.running.indexOf(tracked), 1);
        this.process();
      });

    this.running.push(tracked);
  }
}
// 4. Cancellable Promise
/**
 * Thenable wrapper around a Promise that exposes `cancel()`.
 * Cancelling rejects the wrapped promise with "Promise was cancelled" and
 * makes later resolve/reject calls from the executor no-ops.
 */
class CancellablePromise {
  /**
   * @param {Function} executor - Receives `(resolve, reject)` like a Promise executor.
   */
  constructor(executor) {
    this.isCancelled = false;

    // Wraps a settle function so it is ignored once cancel() has been called.
    const unlessCancelled = (settle) => (payload) => {
      if (this.isCancelled) {
        return;
      }
      settle(payload);
    };

    this.promise = new Promise((fulfil, fail) => {
      this.cancel = () => {
        this.isCancelled = true;
        fail(new Error('Promise was cancelled'));
      };

      executor(unlessCancelled(fulfil), unlessCancelled(fail));
    });
  }

  /** Delegates to the wrapped promise's `then`. */
  then(onFulfilled, onRejected) {
    return this.promise.then(onFulfilled, onRejected);
  }

  /** Delegates to the wrapped promise's `catch`. */
  catch(onRejected) {
    return this.promise.catch(onRejected);
  }
}
// 5. Async retry mechanism
async function retryAsync(fn, maxRetries = 3, delay = 1000) {let lastError;for (let i = 0; i <= maxRetries; i++) {try {return await fn();} catch (error) {lastError = error;if (i === maxRetries) {throw lastError;}// 指数退避const waitTime = delay * Math.pow(2, i);await new Promise(resolve => setTimeout(resolve, waitTime));}}
}module.exports = {AsyncDataProcessor,PromisePool,CancellablePromise,retryAsync,processDataAsync,asyncGenerator
};
3. 内存管理与性能优化
3.1 V8 内存管理机制
内存监控与分析工具
// memory-profiler.js
const v8 = require('v8');
const fs = require('fs');

/**
 * Collects process/V8 memory statistics, writes heap snapshots and watches
 * for steadily growing heap usage.
 */
class MemoryProfiler {
  constructor() {
    this.snapshots = [];
    this.startTime = Date.now();
  }

  /**
   * Reads the current memory figures from `process.memoryUsage()` and
   * `v8.getHeapStatistics()`.
   * @returns {object} Flat record of the sampled values plus a `timestamp`.
   */
  getMemoryUsage() {
    const usage = process.memoryUsage();
    const heapStats = v8.getHeapStatistics();

    return {
      timestamp: Date.now(),
      rss: usage.rss,
      heapTotal: usage.heapTotal,
      heapUsed: usage.heapUsed,
      external: usage.external,
      arrayBuffers: usage.arrayBuffers,
      heapSizeLimit: heapStats.heap_size_limit,
      totalHeapSize: heapStats.total_heap_size,
      usedHeapSize: heapStats.used_heap_size,
      mallocedMemory: heapStats.malloced_memory,
      peakMallocedMemory: heapStats.peak_malloced_memory
    };
  }

  /**
   * Streams a V8 heap snapshot to disk.
   * @param {string} [filename] - Target path; defaults to `heap-<timestamp>.heapsnapshot`.
   * @returns {Promise<string>} Resolves with the path that was written.
   */
  takeHeapSnapshot(filename) {
    // Resolve the default name once so the log line and the resolved value
    // report the file that was really written (previously `undefined`).
    const target = filename || `heap-${Date.now()}.heapsnapshot`;
    const snapshotStream = v8.getHeapSnapshot();
    const fileStream = fs.createWriteStream(target);

    return new Promise((resolve, reject) => {
      fileStream.on('finish', () => {
        console.log(`堆快照已保存: ${target}`);
        resolve(target);
      });
      fileStream.on('error', reject);
      snapshotStream.on('error', reject);
      snapshotStream.pipe(fileStream);
    });
  }

  /**
   * Samples memory every 5 seconds, keeps the latest 100 samples and warns
   * when the last 10 samples grow by more than 1 MB per sample on average.
   * @returns {Function} Call it to stop the monitoring.
   */
  startMemoryLeakDetection() {
    const interval = setInterval(() => {
      const usage = this.getMemoryUsage();
      this.snapshots.push(usage);

      // Keep only the 100 most recent samples.
      if (this.snapshots.length > 100) {
        this.snapshots.shift();
      }

      // Leak heuristic over the last 10 samples.
      if (this.snapshots.length >= 10) {
        const recent = this.snapshots.slice(-10);
        const trend = this.calculateMemoryTrend(recent);

        if (trend.isIncreasing && trend.rate > 1024 * 1024) { // 1MB/snapshot
          console.warn('检测到可能的内存泄漏:', {
            trend: trend.rate,
            currentUsage: usage.heapUsed
          });
        }
      }
    }, 5000);

    return () => clearInterval(interval);
  }

  /**
   * Average `heapUsed` growth across a window of samples.
   * @param {object[]} snapshots - Samples in chronological order.
   * @returns {{isIncreasing: boolean, rate: number}} `rate` is (last - first) / sample count.
   */
  calculateMemoryTrend(snapshots) {
    if (snapshots.length < 2) return { isIncreasing: false, rate: 0 };

    const first = snapshots[0].heapUsed;
    const last = snapshots[snapshots.length - 1].heapUsed;
    const rate = (last - first) / snapshots.length;

    return {
      isIncreasing: rate > 0,
      rate: rate
    };
  }

  /**
   * Forces a garbage collection and logs how much heap was released.
   * Only works when `global.gc` is available (node --expose-gc).
   */
  forceGC() {
    if (global.gc) {
      const before = this.getMemoryUsage();
      global.gc();
      const after = this.getMemoryUsage();

      console.log('垃圾回收效果:', {
        before: before.heapUsed,
        after: after.heapUsed,
        freed: before.heapUsed - after.heapUsed
      });
    } else {
      console.warn('垃圾回收不可用,请使用 --expose-gc 标志启动');
    }
  }

  /**
   * Summarises the profiler state.
   * @returns {object} Runtime in ms, current memory, sample count and mean `heapUsed`.
   */
  generateReport() {
    const current = this.getMemoryUsage();
    const runtime = Date.now() - this.startTime;
    const sampleCount = this.snapshots.length;
    const totalHeapUsed = this.snapshots.reduce((sum, s) => sum + s.heapUsed, 0);

    return {
      runtime: runtime,
      currentMemory: current,
      snapshots: sampleCount,
      // Avoid NaN (0 / 0) when no sample has been collected yet.
      averageHeapUsed: sampleCount === 0 ? 0 : totalHeapUsed / sampleCount
    };
  }
}

module.exports = MemoryProfiler;
3.2 性能优化策略
性能分析工具实现
// performance-analyzer.js
const { performance, PerformanceObserver } = require('perf_hooks');

/**
 * Gathers timing data from perf_hooks observers and offers helpers for
 * timing functions and sampling CPU usage.
 */
class PerformanceAnalyzer {
  constructor() {
    this.metrics = new Map();
    this.observers = [];
    this.setupObservers();
  }

  /**
   * Registers one observer for 'http' entries and one for 'function' and
   * 'measure' entries; every observed entry is stored through recordMetric().
   */
  setupObservers() {
    // HTTP request timings.
    const httpObserver = new PerformanceObserver((list) => {
      for (const entry of list.getEntries()) {
        this.recordMetric('http', {
          name: entry.name,
          duration: entry.duration,
          startTime: entry.startTime
        });
      }
    });
    httpObserver.observe({ entryTypes: ['http'] });
    this.observers.push(httpObserver);

    // Function and user-defined measure timings.
    const functionObserver = new PerformanceObserver((list) => {
      for (const entry of list.getEntries()) {
        this.recordMetric('function', {
          name: entry.name,
          duration: entry.duration,
          startTime: entry.startTime
        });
      }
    });
    functionObserver.observe({ entryTypes: ['function', 'measure'] });
    this.observers.push(functionObserver);
  }

  /**
   * Appends a data point, stamped with the current time, to a metric series.
   * @param {string} type - Metric series name.
   * @param {object} data - Fields to store.
   */
  recordMetric(type, data) {
    if (!this.metrics.has(type)) {
      this.metrics.set(type, []);
    }

    this.metrics.get(type).push({
      ...data,
      timestamp: Date.now()
    });
  }

  /**
   * Method decorator: wraps `descriptor.value` so each call is bracketed by
   * performance marks, recorded as a measure and logged with its duration.
   * @param {object} target - Prototype owning the method (unused).
   * @param {string} propertyKey - Method name, used to label marks and logs.
   * @param {PropertyDescriptor} descriptor - Descriptor whose `value` is wrapped.
   * @returns {PropertyDescriptor} The same descriptor, now holding the wrapper.
   */
  performanceDecorator(target, propertyKey, descriptor) {
    const originalMethod = descriptor.value;

    descriptor.value = async function(...args) {
      const start = performance.now();
      const markStart = `${propertyKey}-start`;
      const markEnd = `${propertyKey}-end`;
      const measureName = `${propertyKey}-duration`;

      performance.mark(markStart);

      try {
        const result = await originalMethod.apply(this, args);
        return result;
      } finally {
        // Record the timing whether the method succeeded or threw.
        performance.mark(markEnd);
        performance.measure(measureName, markStart, markEnd);

        const end = performance.now();
        console.log(`${propertyKey} 执行时间: ${(end - start).toFixed(2)}ms`);
      }
    };

    return descriptor;
  }

  /**
   * Runs `fn` repeatedly and reports timing statistics in milliseconds.
   * @param {Function} fn - Function to time; its result is awaited.
   * @param {number} [iterations=1000] - Number of timed runs.
   * @returns {Promise<object>} iterations, min, max, average, median, p95, p99.
   */
  async measureFunction(fn, iterations = 1000) {
    const results = [];

    for (let i = 0; i < iterations; i++) {
      const start = performance.now();
      await fn();
      const end = performance.now();
      results.push(end - start);
    }

    // Sort one copy and read every order statistic from it. This avoids
    // re-sorting per percentile and spreading a huge array into Math.min/max.
    const sorted = [...results].sort((a, b) => a - b);
    const percentile = (fraction) => sorted[Math.floor(sorted.length * fraction)];

    return {
      iterations,
      min: sorted.length > 0 ? sorted[0] : Infinity,
      max: sorted.length > 0 ? sorted[sorted.length - 1] : -Infinity,
      average: results.reduce((a, b) => a + b, 0) / results.length,
      median: percentile(0.5),
      p95: percentile(0.95),
      p99: percentile(0.99)
    };
  }

  /**
   * Starts a CPU usage measurement.
   * @returns {Function} Call it to get user/system/total CPU time (µs) and the
   *   percentage of elapsed wall-clock time since monitorCPU() was called.
   */
  monitorCPU() {
    const startUsage = process.cpuUsage();
    const startTime = process.hrtime();

    return () => {
      const endUsage = process.cpuUsage(startUsage);
      const endTime = process.hrtime(startTime);

      const totalTime = endTime[0] * 1000000 + endTime[1] / 1000; // microseconds
      const cpuPercent = (endUsage.user + endUsage.system) / totalTime * 100;

      return {
        user: endUsage.user,
        system: endUsage.system,
        total: endUsage.user + endUsage.system,
        percentage: cpuPercent
      };
    };
  }

  /**
   * Builds a summary per metric series that has at least one duration.
   * @returns {object} Timestamped report keyed by metric type.
   */
  generateReport() {
    const report = {
      timestamp: new Date().toISOString(),
      metrics: {}
    };

    for (const [type, data] of this.metrics) {
      const durations = data.map(d => d.duration).filter(d => d !== undefined);

      if (durations.length > 0) {
        report.metrics[type] = {
          count: data.length,
          averageDuration: durations.reduce((a, b) => a + b, 0) / durations.length,
          minDuration: Math.min(...durations),
          maxDuration: Math.max(...durations),
          recentEntries: data.slice(-10)
        };
      }
    }

    return report;
  }

  /** Disconnects every observer and drops all recorded metrics. */
  cleanup() {
    this.observers.forEach(observer => observer.disconnect());
    this.metrics.clear();
  }
}
// Usage example
const analyzer = new PerformanceAnalyzer();

/**
 * Example service whose `queryUsers` method is timed by the analyzer.
 */
class DatabaseService {
  /**
   * Simulated database query.
   * @returns {Promise<Array>} Always an empty list, after roughly 100 ms.
   */
  async queryUsers() {
    // Simulate a database round trip.
    await new Promise(resolve => setTimeout(resolve, 100));
    return [];
  }
}

// Plain Node.js cannot parse `@decorator` syntax, so the decorator is applied
// by hand: hand it the method's descriptor and install the descriptor it returns.
Object.defineProperty(
  DatabaseService.prototype,
  'queryUsers',
  analyzer.performanceDecorator(
    DatabaseService.prototype,
    'queryUsers',
    Object.getOwnPropertyDescriptor(DatabaseService.prototype, 'queryUsers')
  )
);

module.exports = PerformanceAnalyzer;
4. 微服务架构与设计模式
4.1 微服务架构模式
微服务基础框架实现
// microservice-framework.js
const express = require('express');
const { EventEmitter } = require('events');
const axios = require('axios');

/**
 * Small Express-based microservice shell with request tracing events,
 * /health, /info and /metrics endpoints, and circuit-breaker protected calls
 * to other registered services.
 *
 * Events emitted: 'request', 'service-call-error', 'started', 'stopped'.
 */
class MicroService extends EventEmitter {
  /**
   * @param {object} [config] - Overrides for `name`, `port` and `version`.
   */
  constructor(config) {
    super();
    this.config = {
      name: 'unnamed-service',
      port: 3000,
      version: '1.0.0',
      ...config
    };

    this.app = express();
    this.services = new Map();
    this.middlewares = [];
    this.routes = new Map();
    this.healthChecks = [];

    this.setupDefaultMiddleware();
    this.setupDefaultRoutes();
  }

  /** Installs body parsers and the request-tracing middleware. */
  setupDefaultMiddleware() {
    this.app.use(express.json());
    this.app.use(express.urlencoded({ extended: true }));

    // Request tracing: tag each request and emit a 'request' event when the
    // response has been sent.
    this.app.use((req, res, next) => {
      req.traceId = this.generateTraceId();
      req.startTime = Date.now();

      res.on('finish', () => {
        const duration = Date.now() - req.startTime;
        this.emit('request', {
          traceId: req.traceId,
          method: req.method,
          path: req.path,
          statusCode: res.statusCode,
          duration
        });
      });

      next();
    });
  }

  /** Registers the built-in /health, /info and /metrics routes. */
  setupDefaultRoutes() {
    // Health check: 200 when every check passes, 503 otherwise.
    this.app.get('/health', async (req, res) => {
      const health = await this.performHealthCheck();
      const statusCode = health.status === 'healthy' ? 200 : 503;
      res.status(statusCode).json(health);
    });

    // Service information.
    this.app.get('/info', (req, res) => {
      res.json({
        name: this.config.name,
        version: this.config.version,
        uptime: process.uptime(),
        memory: process.memoryUsage(),
        pid: process.pid
      });
    });

    // Metrics endpoint.
    this.app.get('/metrics', (req, res) => {
      res.json(this.getMetrics());
    });
  }

  /**
   * Registers a downstream service so it can be reached through callService().
   * @param {string} name - Lookup key for the service.
   * @param {object} config - `url`, optional `timeout` (ms), `retries`, `circuitBreaker` options.
   *   NOTE(review): `retries` is stored but nothing in this class reads it.
   */
  registerService(name, config) {
    this.services.set(name, {
      name,
      url: config.url,
      timeout: config.timeout || 5000,
      retries: config.retries || 3,
      circuitBreaker: new CircuitBreaker(config.circuitBreaker)
    });
  }

  /**
   * Calls a registered service through its circuit breaker.
   * @param {string} serviceName - Name used in registerService().
   * @param {string} path - Path appended to the service URL.
   * @param {object} [options] - Request options; `traceId` and `headers` are
   *   merged into the outgoing headers, everything else is passed to axios.
   * @returns {Promise<*>} Whatever the axios call resolves with.
   * @throws {Error} When the service is unknown, the breaker is open or the request fails.
   */
  async callService(serviceName, path, options = {}) {
    const service = this.services.get(serviceName);
    if (!service) {
      throw new Error(`Service ${serviceName} not registered`);
    }

    const url = `${service.url}${path}`;

    // Separate the header-related options first. Spreading the whole
    // `options` object last used to replace the merged headers with
    // `options.headers` (dropping the tracing headers) and to pass the
    // non-request `traceId` field on to axios.
    const { traceId, headers, ...requestOptions } = options;
    const config = {
      timeout: service.timeout,
      ...requestOptions,
      headers: {
        'X-Trace-ID': traceId || this.generateTraceId(),
        'X-Service-Name': this.config.name,
        ...headers
      }
    };

    try {
      return await service.circuitBreaker.execute(() => axios(url, config));
    } catch (error) {
      this.emit('service-call-error', {
        service: serviceName,
        url,
        error: error.message
      });
      throw error;
    }
  }

  /**
   * Adds an Express middleware and remembers it.
   * @param {Function} middleware - Express middleware function.
   */
  use(middleware) {
    this.middlewares.push(middleware);
    this.app.use(middleware);
  }

  /**
   * Adds a route and records it for the metrics listing.
   * @param {string} method - HTTP method name (any case).
   * @param {string} path - Route path.
   * @param {Function} handler - Express request handler.
   */
  route(method, path, handler) {
    const routeKey = `${method.toUpperCase()} ${path}`;
    this.routes.set(routeKey, handler);
    this.app[method.toLowerCase()](path, handler);
  }

  /**
   * Adds a named health check.
   * @param {string} name - Label reported in the /health output.
   * @param {Function} checkFunction - Throws/rejects when unhealthy; a returned
   *   object is merged into the check's result.
   */
  addHealthCheck(name, checkFunction) {
    this.healthChecks.push({ name, check: checkFunction });
  }

  /**
   * Runs every registered health check concurrently.
   * @returns {Promise<object>} Overall `status`, ISO `timestamp` and per-check results.
   */
  async performHealthCheck() {
    const results = await Promise.allSettled(
      this.healthChecks.map(async ({ name, check }) => {
        try {
          const result = await check();
          return { name, status: 'healthy', ...result };
        } catch (error) {
          return { name, status: 'unhealthy', error: error.message };
        }
      })
    );

    const checks = results.map(result => result.value || result.reason);
    const allHealthy = checks.every(check => check.status === 'healthy');

    return {
      status: allHealthy ? 'healthy' : 'unhealthy',
      timestamp: new Date().toISOString(),
      checks
    };
  }

  /**
   * Snapshot of service and process metrics.
   * @returns {object} Name, version, uptime, memory, cpu, route keys and service names.
   */
  getMetrics() {
    return {
      service: this.config.name,
      version: this.config.version,
      uptime: process.uptime(),
      memory: process.memoryUsage(),
      cpu: process.cpuUsage(),
      routes: Array.from(this.routes.keys()),
      services: Array.from(this.services.keys())
    };
  }

  /**
   * Generates a short random base-36 id. Not cryptographically secure.
   * @returns {string} Up to 9 characters.
   */
  generateTraceId() {
    return Math.random().toString(36).slice(2, 11);
  }

  /**
   * Starts listening on `config.port`.
   * @returns {Promise<void>} Resolves once listening; rejects when binding fails.
   */
  async start() {
    return new Promise((resolve, reject) => {
      this.server = this.app.listen(this.config.port, (error) => {
        if (error) {
          reject(error);
        } else {
          console.log(`${this.config.name} started on port ${this.config.port}`);
          this.emit('started');
          resolve();
        }
      });

      // Bind failures such as EADDRINUSE are reported through the server's
      // 'error' event, and not every Express version forwards them to the
      // listen callback. Without this listener the promise could stay pending.
      this.server.once('error', reject);
    });
  }

  /**
   * Stops the HTTP server if it was started.
   * @returns {Promise<void>} Resolves after the server has closed.
   */
  async stop() {
    return new Promise((resolve) => {
      if (this.server) {
        this.server.close(() => {
          console.log(`${this.config.name} stopped`);
          this.emit('stopped');
          resolve();
        });
      } else {
        resolve();
      }
    });
  }
}
// Circuit breaker implementation
/**
 * Circuit breaker with the states CLOSED, OPEN and HALF_OPEN.
 *
 * - CLOSED: calls pass through; `failureThreshold` consecutive failures open the circuit.
 * - OPEN: calls fail immediately until `timeout` ms have passed since the last failure.
 * - HALF_OPEN: trial calls are allowed; 3 successes close the circuit, any
 *   failure opens it again.
 */
class CircuitBreaker {
  /**
   * @param {object} [options]
   * @param {number} [options.failureThreshold=5] - Consecutive failures that open the circuit.
   * @param {number} [options.timeout=60000] - Milliseconds to stay OPEN before trial calls.
   * @param {number} [options.monitoringPeriod=10000] - Stored only; nothing in this class reads it.
   */
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5;
    this.timeout = options.timeout || 60000;
    this.monitoringPeriod = options.monitoringPeriod || 10000;

    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.lastFailureTime = null;
    this.successCount = 0;
  }

  /**
   * Runs `operation` under the breaker.
   * @param {Function} operation - Function to call; its result is awaited.
   * @returns {Promise<*>} The operation's result.
   * @throws {Error} 'Circuit breaker is OPEN' while open, otherwise the operation's own error.
   */
  async execute(operation) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime >= this.timeout) {
        this.state = 'HALF_OPEN';
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Records a success: resets the failure streak and may close a half-open circuit. */
  onSuccess() {
    this.failureCount = 0;

    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= 3) {
        this.state = 'CLOSED';
      }
    }
  }

  /** Records a failure and opens the circuit when warranted. */
  onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();

    // A failed trial call in HALF_OPEN must re-open the circuit right away.
    // An earlier trial success resets failureCount, so the threshold alone
    // would leave the breaker half-open and keep sending traffic.
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }
}

module.exports = { MicroService, CircuitBreaker };
4.2 设计模式在 Node.js 中的应用
高级设计模式实现
// design-patterns.js// 1. 单例模式 - 数据库连接管理
/**
 * Singleton that hands out one shared connection per host/port/database.
 * `new DatabaseManager()` always returns the same instance.
 */
class DatabaseManager {
  constructor() {
    if (DatabaseManager.instance) {
      return DatabaseManager.instance;
    }

    this.connections = new Map();
    // Connections still being created, keyed like `connections`.
    this.pending = new Map();
    DatabaseManager.instance = this;
  }

  /**
   * Returns the connection for `config`, creating it on first use.
   * Concurrent calls for the same target share one creation attempt instead
   * of each opening their own connection.
   * @param {{host: string, port: number, database: string}} config - Connection target.
   * @returns {Promise<object>} The shared connection object.
   */
  async getConnection(config) {
    const key = `${config.host}:${config.port}/${config.database}`;

    if (this.connections.has(key)) {
      return this.connections.get(key);
    }

    if (!this.pending.has(key)) {
      const attempt = (async () => this.createConnection(config))()
        .then((connection) => {
          this.connections.set(key, connection);
          return connection;
        })
        .finally(() => {
          // Clear the marker on success and on failure so a failed attempt can be retried.
          this.pending.delete(key);
        });
      this.pending.set(key, attempt);
    }

    return this.pending.get(key);
  }

  /**
   * Creates a connection. This implementation only simulates one.
   * @param {{host: string, port: number, database: string}} config - Connection target.
   * @returns {Promise<object>} Connection record with `connected: true`.
   */
  async createConnection(config) {
    // Simulated database connection.
    return {
      host: config.host,
      port: config.port,
      database: config.database,
      connected: true
    };
  }
}
// 2. Factory pattern - logger factory
/**
 * Factory that builds a logger for a given type name.
 */
class LoggerFactory {
  /**
   * @param {string} type - 'console', 'file' or 'database'.
   * @param {object} [config] - Passed to the logger constructor.
   * @returns {object} Logger instance.
   * @throws {Error} When `type` is not one of the known types.
   *   NOTE(review): FileLogger and DatabaseLogger are not defined in the
   *   visible part of this file; confirm they exist before using those types.
   */
  static createLogger(type, config) {
    switch (type) {
      case 'console':
        return new ConsoleLogger(config);
      case 'file':
        return new FileLogger(config);
      case 'database':
        return new DatabaseLogger(config);
      default:
        throw new Error(`Unknown logger type: ${type}`);
    }
  }
}

/**
 * Logger that writes to the console and drops messages below its level.
 */
class ConsoleLogger {
  /**
   * @param {object} [config] - Optional settings; omitting it no longer throws.
   * @param {string} [config.level='info'] - Minimum level: debug, info, warn or error.
   */
  constructor(config = {}) {
    this.level = config.level || 'info';
  }

  /**
   * Prints `[LEVEL] message` when `level` passes the threshold.
   * @param {string} level - Severity of this message.
   * @param {string} message - Text to print.
   */
  log(level, message) {
    if (this.shouldLog(level)) {
      console.log(`[${level.toUpperCase()}] ${message}`);
    }
  }

  /**
   * @param {string} level - Severity to test.
   * @returns {boolean} True when `level` ranks at or above the configured level.
   */
  shouldLog(level) {
    const levels = ['debug', 'info', 'warn', 'error'];
    return levels.indexOf(level) >= levels.indexOf(this.level);
  }
}
// 3. Decorator pattern - cache decorator
/**
 * Builds a method decorator that memoises an async method's results.
 *
 * Results are keyed by the method name plus the JSON form of the arguments
 * and are reused until they are `ttl` milliseconds old. One cache is created
 * per cacheDecorator() call.
 *
 * @param {number} [ttl=300000] - Lifetime of a cached result in milliseconds.
 * @returns {Function} Decorator `(target, propertyKey, descriptor) => descriptor`.
 */
function cacheDecorator(ttl = 300000) {
  const store = new Map();
  const isFresh = (entry) => entry && Date.now() - entry.timestamp < ttl;

  return function(target, propertyKey, descriptor) {
    const wrapped = descriptor.value;

    descriptor.value = async function(...args) {
      const key = `${propertyKey}:${JSON.stringify(args)}`;
      const hit = store.get(key);

      if (isFresh(hit)) {
        return hit.value;
      }

      // Miss or stale entry: run the real method and remember its result.
      const value = await wrapped.apply(this, args);
      store.set(key, { value, timestamp: Date.now() });
      return value;
    };

    return descriptor;
  };
}
// 4. Strategy pattern - payment processing strategies
/**
 * Strategy registry that routes a payment to the handler registered for
 * the chosen payment method.
 */
class PaymentProcessor {
  constructor() {
    this.strategies = new Map();
  }

  /**
   * Registers (or replaces) the strategy for a payment method.
   * @param {string} name - Payment method name.
   * @param {{process: Function}} strategy - Object exposing `process(amount, details)`.
   */
  addStrategy(name, strategy) {
    this.strategies.set(name, strategy);
  }

  /**
   * Processes a payment with the strategy registered under `method`.
   * @param {string} method - Payment method name.
   * @param {number} amount - Amount to charge.
   * @param {object} details - Method-specific details, passed through unchanged.
   * @returns {Promise<object>} The strategy's result.
   * @throws {Error} When no strategy is registered for `method`.
   */
  async processPayment(method, amount, details) {
    const strategy = this.strategies.get(method);
    if (!strategy) {
      throw new Error(`Payment method ${method} not supported`);
    }

    return await strategy.process(amount, details);
  }
}

/** Credit card strategy (simulated: always succeeds). */
class CreditCardStrategy {
  /**
   * @param {number} amount - Amount to charge.
   * @param {object} details - Card details (unused by this simulation).
   * @returns {Promise<object>} Success record with a `cc_<timestamp>` transaction id.
   */
  async process(amount, details) {
    // Credit card payment logic goes here.
    return {
      success: true,
      transactionId: `cc_${Date.now()}`,
      amount,
      method: 'credit_card'
    };
  }
}

/** PayPal strategy (simulated: always succeeds). */
class PayPalStrategy {
  /**
   * @param {number} amount - Amount to charge.
   * @param {object} details - PayPal details (unused by this simulation).
   * @returns {Promise<object>} Success record with a `pp_<timestamp>` transaction id.
   */
  async process(amount, details) {
    // PayPal payment logic goes here.
    return {
      success: true,
      transactionId: `pp_${Date.now()}`,
      amount,
      method: 'paypal'
    };
  }
}
// 5. Observer pattern - event system
/**
 * Minimal publish/subscribe hub whose subscribers may be sync or async.
 */
class EventManager {
  constructor() {
    this.listeners = new Map();
  }

  /**
   * Subscribes `callback` to `event`.
   * @param {string} event - Event name.
   * @param {Function} callback - Called with the published data.
   * @returns {Function} Unsubscribe function; calling it more than once is harmless.
   */
  subscribe(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }

    this.listeners.get(event).push(callback);

    // Return the unsubscribe function.
    return () => {
      const callbacks = this.listeners.get(event);
      if (!callbacks) {
        return;
      }
      const index = callbacks.indexOf(callback);
      if (index > -1) {
        callbacks.splice(index, 1);
      }
    };
  }

  /**
   * Publishes `data` to every subscriber of `event` and waits until all of
   * them have settled. Subscriber failures are swallowed on purpose so one
   * bad subscriber cannot affect the others or the publisher.
   * @param {string} event - Event name.
   * @param {*} data - Payload handed to each subscriber.
   * @returns {Promise<void>}
   */
  async publish(event, data) {
    // Copy the list so a subscriber that unsubscribes while running does not
    // shift the entries still waiting to be called.
    const callbacks = [...(this.listeners.get(event) || [])];

    // The async wrapper turns a synchronous throw into a rejected promise.
    // Otherwise the throw would abort the map() and skip later subscribers.
    await Promise.allSettled(
      callbacks.map(async (callback) => callback(data))
    );
  }
}
// 6. Command pattern - task queue
/**
 * A reversible action: `execute` performs it, `undo` reverts it.
 */
class Command {
  /**
   * @param {Function} execute - Performs the action.
   * @param {Function} undo - Reverts the action.
   */
  constructor(execute, undo) {
    this.execute = execute;
    this.undo = undo;
  }
}

/**
 * Command history with undo/redo. `currentIndex` points at the most recently
 * applied command (-1 when nothing is applied).
 */
class TaskQueue {
  constructor() {
    this.commands = [];
    this.currentIndex = -1;
  }

  /**
   * Runs a new command and makes it the latest history entry.
   * @param {Command} command - Command to run.
   * @returns {*} Whatever `command.execute()` returns.
   */
  execute(command) {
    // Drop the redo tail: commands after the current position are no longer reachable.
    this.commands = this.commands.slice(0, this.currentIndex + 1);

    // Append the new command.
    this.commands.push(command);
    this.currentIndex++;

    // Run it.
    return command.execute();
  }

  /**
   * Reverts the most recently applied command, if any.
   * @returns {*} Result of the command's `undo()`, or undefined when there is nothing to undo.
   */
  undo() {
    if (this.currentIndex >= 0) {
      const command = this.commands[this.currentIndex];
      this.currentIndex--;
      return command.undo();
    }
  }

  /**
   * Re-applies the next undone command, if any.
   * @returns {*} Result of the command's `execute()`, or undefined when there is nothing to redo.
   */
  redo() {
    if (this.currentIndex < this.commands.length - 1) {
      this.currentIndex++;
      const command = this.commands[this.currentIndex];
      return command.execute();
    }
  }
}
// 7. Mediator pattern - chat room
class ChatRoom {constructor() {this.users = new Map();this.rooms = new Map();}addUser(user) {this.users.set(user.id, user);user.setChatRoom(this);}removeUser(userId) {this.users.delete(userId);}sendMessage(fromUserId, toUserId, message) {const toUser = this.users.get(toUserId);if (toUser) {toUser.receiveMessage(fromUserId, message);}}broadcast(fromUserId, message, roomId) {const room = this.rooms.get(roomId);if (room) {room.members.forEach(userId => {if (userId !== fromUserId) {this.sendMessage(fromUserId, userId, message);}});}}
}class User {constructor(id, name) {this.id = id;this.name = name;this.chatRoom = null;}setChatRoom(chatRoom) {this.chatRoom = chatRoom;}sendMessage(toUserId, message) {if (this.chatRoom) {this.chatRoom.sendMessage(this.id, toUserId, message);}}receiveMessage(fromUserId, message) {console.log(`${this.name} received message from ${fromUserId}: ${message}`);}
}module.exports = {DatabaseManager,LoggerFactory,cacheDecorator,PaymentProcessor,CreditCardStrategy,PayPalStrategy,EventManager,Command,TaskQueue,ChatRoom,User
};
5. 实时通信与 WebSocket
5.1 实时通信架构
WebSocket 服务器实现
// websocket-server.js
const WebSocket = require('ws');
const { EventEmitter } = require('events');
const jwt = require('jsonwebtoken');

/**
 * WebSocket server with JWT-based client verification, rooms, private
 * messages and heartbeat tracking.
 *
 * Events emitted: 'connection', 'disconnection', and 'error' (the latter
 * only when an 'error' listener is registered).
 */
class WebSocketServer extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] - Port to listen on.
   * @param {Function|null} [options.verifyClient=null] - Custom verification
   *   that replaces the default JWT check.
   */
  constructor(options = {}) {
    super();
    this.options = {
      port: 8080,
      verifyClient: null,
      ...options
    };

    this.clients = new Map();
    this.rooms = new Map();
    this.messageHandlers = new Map();
    this.healthCheckTimer = null;

    this.setupServer();
    this.setupMessageHandlers();
  }

  /** Creates the underlying ws server and wires the connection handler. */
  setupServer() {
    this.wss = new WebSocket.Server({
      port: this.options.port,
      verifyClient: this.verifyClient.bind(this)
    });

    this.wss.on('connection', this.handleConnection.bind(this));

    console.log(`WebSocket server started on port ${this.options.port}`);
  }

  /**
   * Decides whether a connection attempt is accepted.
   * @param {object} info - Handshake info supplied by ws; `info.req` is the HTTP request.
   * @returns {boolean} True to accept the connection.
   */
  verifyClient(info) {
    if (this.options.verifyClient) {
      return this.options.verifyClient(info);
    }

    // Default verification: require a valid JWT in the `token` query parameter.
    const token = this.extractToken(info.req);
    if (!token) {
      return false;
    }

    try {
      const decoded = jwt.verify(token, process.env.JWT_SECRET);
      info.req.user = decoded;
      return true;
    } catch (error) {
      // Any verification failure rejects the connection.
      return false;
    }
  }

  /**
   * Reads the `token` query parameter from the handshake request URL.
   * @param {object} req - HTTP upgrade request.
   * @returns {string|null} The token, or null when absent.
   */
  extractToken(req) {
    const url = new URL(req.url, `http://${req.headers.host}`);
    return url.searchParams.get('token');
  }

  /**
   * Registers a newly connected socket as a client and confirms the connection.
   * @param {object} ws - The WebSocket connection.
   * @param {object} req - HTTP upgrade request; `req.user` is set by the default verification.
   */
  handleConnection(ws, req) {
    const clientId = this.generateClientId();
    const user = req.user;

    const client = {
      id: clientId,
      ws,
      user,
      rooms: new Set(),
      lastPing: Date.now(),
      metadata: {}
    };

    this.clients.set(clientId, client);

    ws.on('message', (data) => this.handleMessage(client, data));
    ws.on('close', () => this.handleDisconnection(client));
    ws.on('error', (error) => this.handleError(client, error));
    ws.on('pong', () => this.handlePong(client));

    // Send the connection confirmation.
    this.sendToClient(client, {
      type: 'connection',
      clientId,
      timestamp: Date.now()
    });

    this.emit('connection', client);
  }

  /**
   * Parses an incoming frame and dispatches it to the handler for its `type`.
   * @param {object} client - Sending client record.
   * @param {*} data - Raw message payload (expected to be JSON text).
   */
  handleMessage(client, data) {
    let message;
    try {
      message = JSON.parse(data);
    } catch (error) {
      this.sendError(client, 'Invalid message format');
      return;
    }

    if (message === null || typeof message !== 'object') {
      this.sendError(client, 'Invalid message format');
      return;
    }

    const handler = this.messageHandlers.get(message.type);
    if (!handler) {
      this.sendError(client, `Unknown message type: ${message.type}`);
      return;
    }

    // Handler failures are reported separately from parse failures; they used
    // to be sent back as "Invalid message format", which hid real bugs.
    try {
      handler(client, message);
    } catch (error) {
      console.error(`Message handler '${message.type}' failed for client ${client.id}:`, error);
      this.sendError(client, `Failed to process message type: ${message.type}`);
    }
  }

  /** Registers the built-in handlers: join_room, leave_room, room_message, private_message, ping. */
  setupMessageHandlers() {
    // Join a room.
    this.messageHandlers.set('join_room', (client, message) => {
      const { roomId } = message;
      this.joinRoom(client, roomId);
    });

    // Leave a room.
    this.messageHandlers.set('leave_room', (client, message) => {
      const { roomId } = message;
      this.leaveRoom(client, roomId);
    });

    // Send a message to everyone else in a room.
    this.messageHandlers.set('room_message', (client, message) => {
      const { roomId, content } = message;
      this.sendToRoom(roomId, {
        type: 'room_message',
        from: client.id,
        content,
        timestamp: Date.now()
      }, client.id);
    });

    // Private message to one client.
    this.messageHandlers.set('private_message', (client, message) => {
      const { targetId, content } = message;
      const targetClient = this.clients.get(targetId);

      if (targetClient) {
        this.sendToClient(targetClient, {
          type: 'private_message',
          from: client.id,
          content,
          timestamp: Date.now()
        });
      } else {
        this.sendError(client, 'Target client not found');
      }
    });

    // Application-level heartbeat.
    this.messageHandlers.set('ping', (client, message) => {
      client.lastPing = Date.now();
      this.sendToClient(client, { type: 'pong' });
    });
  }

  /**
   * Adds a client to a room, creating the room on first use.
   * @param {object} client - Client record.
   * @param {*} roomId - Room identifier.
   */
  joinRoom(client, roomId) {
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, {
        id: roomId,
        clients: new Set(),
        metadata: {}
      });
    }

    const room = this.rooms.get(roomId);
    room.clients.add(client.id);
    client.rooms.add(roomId);

    // Tell the other members that someone joined.
    this.sendToRoom(roomId, {
      type: 'user_joined',
      userId: client.id,
      roomId,
      timestamp: Date.now()
    }, client.id);

    // Send the room roster to the new member.
    this.sendToClient(client, {
      type: 'room_joined',
      roomId,
      users: Array.from(room.clients),
      timestamp: Date.now()
    });
  }

  /**
   * Removes a client from a room and deletes the room once it is empty.
   * @param {object} client - Client record.
   * @param {*} roomId - Room identifier.
   */
  leaveRoom(client, roomId) {
    const room = this.rooms.get(roomId);
    if (room) {
      room.clients.delete(client.id);
      client.rooms.delete(roomId);

      // Delete the room when nobody is left, otherwise notify the remaining members.
      if (room.clients.size === 0) {
        this.rooms.delete(roomId);
      } else {
        this.sendToRoom(roomId, {
          type: 'user_left',
          userId: client.id,
          roomId,
          timestamp: Date.now()
        });
      }
    }
  }

  /**
   * Sends a JSON-serialised message to one client if its socket is open.
   * @param {object} client - Client record.
   * @param {object} message - Payload to serialise.
   */
  sendToClient(client, message) {
    if (client.ws.readyState === WebSocket.OPEN) {
      client.ws.send(JSON.stringify(message));
    }
  }

  /**
   * Sends a message to every member of a room.
   * @param {*} roomId - Room identifier.
   * @param {object} message - Payload to send.
   * @param {string|null} [excludeClientId=null] - Member to skip (usually the sender).
   */
  sendToRoom(roomId, message, excludeClientId = null) {
    const room = this.rooms.get(roomId);
    if (room) {
      room.clients.forEach(clientId => {
        if (clientId !== excludeClientId) {
          const client = this.clients.get(clientId);
          if (client) {
            this.sendToClient(client, message);
          }
        }
      });
    }
  }

  /**
   * Sends a message to every connected client.
   * @param {object} message - Payload to send.
   * @param {string|null} [excludeClientId=null] - Client to skip.
   */
  broadcast(message, excludeClientId = null) {
    this.clients.forEach((client, clientId) => {
      if (clientId !== excludeClientId) {
        this.sendToClient(client, message);
      }
    });
  }

  /**
   * Sends an error message to a client.
   * @param {object} client - Client record.
   * @param {string} error - Error text.
   */
  sendError(client, error) {
    this.sendToClient(client, {
      type: 'error',
      message: error,
      timestamp: Date.now()
    });
  }

  /**
   * Cleans up after a client: leaves all rooms, forgets the client and emits
   * 'disconnection'. Safe to call more than once for the same client.
   * @param {object} client - Client record.
   */
  handleDisconnection(client) {
    // The health check disconnects timed-out clients itself and the socket's
    // own 'close' event follows; only the first call may do the work.
    if (!this.clients.has(client.id)) {
      return;
    }

    // Remove the client from every room it joined.
    client.rooms.forEach(roomId => {
      this.leaveRoom(client, roomId);
    });

    this.clients.delete(client.id);
    this.emit('disconnection', client);
  }

  /**
   * Logs a socket error and forwards it to 'error' listeners.
   * @param {object} client - Client record.
   * @param {Error} error - The socket error.
   */
  handleError(client, error) {
    console.error(`WebSocket error for client ${client.id}:`, error);

    // An EventEmitter throws when 'error' is emitted without a listener, so
    // only emit when someone is listening; one bad socket must not crash the server.
    if (this.listenerCount('error') > 0) {
      this.emit('error', { client, error });
    }
  }

  /** Marks the client as alive when a pong frame arrives. */
  handlePong(client) {
    client.lastPing = Date.now();
  }

  /**
   * Generates a short random base-36 id. Not cryptographically secure.
   * @returns {string} Up to 9 characters.
   */
  generateClientId() {
    return Math.random().toString(36).slice(2, 11);
  }

  /**
   * Starts the liveness check: every 10 seconds, clients silent for more
   * than 30 seconds are terminated and all others are pinged. Starting it
   * again replaces the previous timer.
   * @returns {Function} Call it to stop the health check.
   */
  startHealthCheck() {
    if (this.healthCheckTimer) {
      clearInterval(this.healthCheckTimer);
    }

    this.healthCheckTimer = setInterval(() => {
      const now = Date.now();
      const timeout = 30000; // 30 second timeout

      this.clients.forEach((client, clientId) => {
        if (now - client.lastPing > timeout) {
          console.log(`Client ${clientId} timed out`);
          client.ws.terminate();
          this.handleDisconnection(client);
        } else if (client.ws.readyState === WebSocket.OPEN) {
          // Ask for a pong so idle but healthy clients refresh lastPing;
          // without a ping the 'pong' handler could never fire.
          client.ws.ping();
        }
      });
    }, 10000); // check every 10 seconds

    return () => {
      clearInterval(this.healthCheckTimer);
      this.healthCheckTimer = null;
    };
  }

  /**
   * Connection statistics.
   * @returns {object} Client count, room count and member count per room.
   */
  getStats() {
    return {
      totalClients: this.clients.size,
      totalRooms: this.rooms.size,
      roomDetails: Array.from(this.rooms.values()).map(room => ({
        id: room.id,
        clientCount: room.clients.size
      }))
    };
  }
}

module.exports = WebSocketServer;
5.2 实时数据同步系统
实时协作编辑器实现
// collaborative-editor.js
const { EventEmitter } = require('events');// 操作转换算法实现
/**
 * Simplified operational transformation for text edits.
 *
 * Operations are plain objects: `{type: 'insert', position, content}`,
 * `{type: 'delete', position, length}` or `{type: 'noop'}`.
 */
class OperationalTransform {
  /**
   * Transforms two concurrent operations made against the same document state.
   *
   * @param {object} op1 - First operation.
   * @param {object} op2 - Second operation.
   * @returns {object[]} `[op1', op2']`: op1 adjusted to apply after op2, and
   *   op2 adjusted to apply after op1. Other type combinations are returned unchanged.
   *
   * Known limitation (unchanged): when an insert falls strictly inside a
   * concurrently deleted range, the delete is returned unmodified rather than
   * being split around the inserted text.
   */
  static transform(op1, op2) {
    // Simplified transformation rules.
    if (op1.type === 'insert' && op2.type === 'insert') {
      if (op1.position <= op2.position) {
        return [op1, { ...op2, position: op2.position + op1.content.length }];
      } else {
        return [{ ...op1, position: op1.position + op2.content.length }, op2];
      }
    }

    if (op1.type === 'delete' && op2.type === 'delete') {
      if (op1.position + op1.length <= op2.position) {
        return [op1, { ...op2, position: op2.position - op1.length }];
      } else if (op2.position + op2.length <= op1.position) {
        return [{ ...op1, position: op1.position - op2.length }, op2];
      } else {
        // The ranges overlap and need dedicated handling.
        return this.handleOverlappingDeletes(op1, op2);
      }
    }

    if (op1.type === 'insert' && op2.type === 'delete') {
      if (op1.position <= op2.position) {
        return [op1, { ...op2, position: op2.position + op1.content.length }];
      } else if (op1.position >= op2.position + op2.length) {
        return [{ ...op1, position: op1.position - op2.length }, op2];
      } else {
        return [{ ...op1, position: op2.position }, op2];
      }
    }

    if (op1.type === 'delete' && op2.type === 'insert') {
      const [transformedOp2, transformedOp1] = this.transform(op2, op1);
      return [transformedOp1, transformedOp2];
    }

    return [op1, op2];
  }

  /**
   * Transforms two deletes whose ranges overlap.
   *
   * Once the other delete has been applied, the shared characters are already
   * gone, so each operation only has to remove what is left of its own range.
   * That remainder starts at the smaller of the two start positions.
   * (Returning the union of both ranges, as before, removed text that neither
   * user deleted.)
   *
   * @param {object} op1 - First delete.
   * @param {object} op2 - Second delete, overlapping op1.
   * @returns {object[]} `[op1', op2']`; an operation with nothing left to
   *   delete becomes `{type: 'noop'}`.
   */
  static handleOverlappingDeletes(op1, op2) {
    const start1 = op1.position;
    const end1 = op1.position + op1.length;
    const start2 = op2.position;
    const end2 = op2.position + op2.length;

    const overlap = Math.min(end1, end2) - Math.max(start1, start2);
    const position = Math.min(start1, start2);

    const remainderOf = (op) => {
      const length = op.length - overlap;
      return length > 0 ? { ...op, position, length } : { type: 'noop' };
    };

    return [remainderOf(op1), remainderOf(op2)];
  }
}
// Document state management
/**
 * Text content of one document together with its version counter and the
 * log of every operation applied to it.
 */
class DocumentState {
  /**
   * @param {string} [initialContent=''] - Starting text.
   */
  constructor(initialContent = '') {
    this.content = initialContent;
    this.version = 0;
    this.operations = [];
  }

  /**
   * Applies an insert, delete or replace to the content, then bumps the
   * version and logs the operation. Any other type leaves the content
   * untouched but is still versioned and logged.
   * @param {object} operation - `{type, position, content?, length?}`.
   */
  applyOperation(operation) {
    const { type, position } = operation;
    const head = () => this.content.slice(0, position);
    const tailFrom = (offset) => this.content.slice(offset);

    if (type === 'insert') {
      this.content = head() + operation.content + tailFrom(position);
    } else if (type === 'delete') {
      this.content = head() + tailFrom(position + operation.length);
    } else if (type === 'replace') {
      this.content = head() + operation.content + tailFrom(position + operation.length);
    }

    this.version += 1;
    this.operations.push({ ...operation, version: this.version });
  }

  /**
   * @param {number} version - Version already known to the caller.
   * @returns {object[]} Logged operations newer than `version`, oldest first.
   */
  getOperationsSince(version) {
    return this.operations.filter((entry) => entry.version > version);
  }
}
// Collaborative editor server
/**
 * Server-side coordinator for collaborative editing: tracks documents, the
 * clients attached to each document, and rebases incoming operations onto
 * the server history before applying and broadcasting them.
 */
class CollaborativeEditor extends EventEmitter {
  constructor() {
    super();
    this.documents = new Map();       // documentId -> DocumentState
    this.clients = new Map();         // clientId -> connection object
    this.clientDocuments = new Map(); // documentId -> Set of clientIds
  }

  /**
   * Returns the document with this id, creating it when it does not exist yet.
   * @param {string} documentId - Document identifier.
   * @param {string} [initialContent=''] - Content used only on creation.
   * @returns {DocumentState} The existing or newly created document.
   */
  createDocument(documentId, initialContent = '') {
    if (!this.documents.has(documentId)) {
      this.documents.set(documentId, new DocumentState(initialContent));
    }
    return this.documents.get(documentId);
  }

  /**
   * Attaches a client to a document (creating the document if needed), sends
   * the client the current state and tells the other clients about it.
   * @param {string} clientId - Joining client.
   * @param {string} documentId - Document to join.
   */
  joinDocument(clientId, documentId) {
    const document = this.createDocument(documentId);

    if (!this.clientDocuments.has(documentId)) {
      this.clientDocuments.set(documentId, new Set());
    }

    this.clientDocuments.get(documentId).add(clientId);

    // Send the current document state to the joining client.
    this.sendToClient(clientId, {
      type: 'document_state',
      documentId,
      content: document.content,
      version: document.version
    });

    // Tell the other clients that someone joined.
    this.broadcastToDocument(documentId, {
      type: 'user_joined',
      clientId,
      documentId
    }, clientId);
  }

  /**
   * Detaches a client from a document and notifies the remaining clients.
   * The document itself is kept.
   * @param {string} clientId - Leaving client.
   * @param {string} documentId - Document to leave.
   */
  leaveDocument(clientId, documentId) {
    const clients = this.clientDocuments.get(documentId);
    if (clients) {
      clients.delete(clientId);

      if (clients.size === 0) {
        this.clientDocuments.delete(documentId);
      } else {
        this.broadcastToDocument(documentId, {
          type: 'user_left',
          clientId,
          documentId
        });
      }
    }
  }

  /**
   * Applies a client operation: rebases it over every server operation newer
   * than `operation.baseVersion`, applies it, broadcasts the result to the
   * other clients and acknowledges it to the sender. Operations for unknown
   * documents are ignored.
   * @param {string} clientId - Sender.
   * @param {string} documentId - Target document.
   * @param {object} operation - Operation carrying `id` and `baseVersion`.
   */
  handleOperation(clientId, documentId, operation) {
    const document = this.documents.get(documentId);
    if (!document) {
      return;
    }

    // Every operation the server applied after the client's base version.
    const serverOps = document.getOperationsSince(operation.baseVersion);

    // Rebase the incoming operation over each of them in order.
    let transformedOp = operation;
    for (const serverOp of serverOps) {
      [transformedOp] = OperationalTransform.transform(transformedOp, serverOp);
    }

    // Apply the rebased operation.
    document.applyOperation(transformedOp);

    // Broadcast it to the other clients.
    this.broadcastToDocument(documentId, {
      type: 'operation',
      operation: { ...transformedOp, version: document.version },
      documentId
    }, clientId);

    // Acknowledge to the sender.
    this.sendToClient(clientId, {
      type: 'operation_ack',
      operationId: operation.id,
      version: document.version
    });
  }

  /**
   * Sends a JSON-serialised message to a client; unknown clients and
   * connections without `send` are skipped.
   * @param {string} clientId - Recipient.
   * @param {object} message - Payload to serialise.
   */
  sendToClient(clientId, message) {
    const client = this.clients.get(clientId);
    if (client && client.send) {
      client.send(JSON.stringify(message));
    }
  }

  /**
   * Sends a message to every client attached to a document.
   * @param {string} documentId - Target document.
   * @param {object} message - Payload to send.
   * @param {string|null} [excludeClientId=null] - Client to skip (usually the sender).
   */
  broadcastToDocument(documentId, message, excludeClientId = null) {
    const clients = this.clientDocuments.get(documentId);
    if (clients) {
      clients.forEach(clientId => {
        if (clientId !== excludeClientId) {
          this.sendToClient(clientId, message);
        }
      });
    }
  }

  /**
   * Registers a client connection.
   * @param {string} clientId - Client identifier.
   * @param {object} connection - Object exposing `send(text)`.
   */
  addClient(clientId, connection) {
    this.clients.set(clientId, connection);
  }

  /**
   * Detaches a client from every document and forgets its connection.
   * @param {string} clientId - Client to remove.
   */
  removeClient(clientId) {
    // Remove the client from every document it joined.
    this.clientDocuments.forEach((clients, documentId) => {
      if (clients.has(clientId)) {
        this.leaveDocument(clientId, documentId);
      }
    });

    this.clients.delete(clientId);
  }

  /**
   * Statistics for a document; every figure is 0 for an unknown document.
   * @param {string} documentId - Document identifier.
   * @returns {object} documentId, version, contentLength, activeClients, operationCount.
   */
  getDocumentStats(documentId) {
    const document = this.documents.get(documentId);
    const clients = this.clientDocuments.get(documentId);

    return {
      documentId,
      version: document?.version || 0,
      contentLength: document?.content?.length || 0,
      activeClients: clients?.size || 0,
      operationCount: document?.operations?.length || 0
    };
  }
}
// Client-side operation queue
class ClientOperationQueue {constructor(sendOperation) {this.sendOperation = sendOperation;this.pendingOperations = [];this.acknowledgedVersion = 0;this.localVersion = 0;}addOperation(operation) {operation.id = this.generateOperationId();operation.baseVersion = this.acknowledgedVersion;this.pendingOperations.push(operation);this.sendOperation(operation);}handleAcknowledgment(operationId, serverVersion) {const index = this.pendingOperations.findIndex(op => op.id === operationId);if (index !== -1) {this.pendingOperations.splice(0, index + 1);this.acknowledgedVersion = serverVersion;}}handleServerOperation(serverOperation) {// 转换待处理的操作this.pendingOperations = this.pendingOperations.map(pendingOp => {const [transformed] = OperationalTransform.transform(pendingOp, serverOperation);return transformed;});}generateOperationId() {return `${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;}
}module.exports = {OperationalTransform,DocumentState,CollaborativeEditor,ClientOperationQueue
};
6. 安全与认证系统
6.1 认证与授权架构
高级认证系统实现
// advanced-auth-system.js
const crypto = require('crypto');
const jwt = require('jsonwebtoken');
const bcrypt = require('bcrypt');
const speakeasy = require('speakeasy');
const QRCode = require('qrcode');class AdvancedAuthSystem {constructor(config) {this.config = {jwtSecret: process.env.JWT_SECRET,jwtExpiry: '15m',refreshTokenExpiry: '7d',bcryptRounds: 12,maxLoginAttempts: 5,lockoutDuration: 15 * 60 * 1000, // 15分钟...config};this.users = new Map();this.refreshTokens = new Map();this.loginAttempts = new Map();this.sessions = new Map();}// 用户注册async register(userData) {const { username, email, password, phone } = userData;// 验证用户是否已存在if (this.findUserByEmail(email) || this.findUserByUsername(username)) {throw new Error('User already exists');}// 密码强度验证this.validatePasswordStrength(password);// 加密密码const hashedPassword = await bcrypt.hash(password, this.config.bcryptRounds);// 生成用户IDconst userId = this.generateUserId();// 创建用户const user = {id: userId,username,email,phone,password: hashedPassword,roles: ['user'],permissions: [],isActive: true,emailVerified: false,phoneVerified: false,twoFactorEnabled: false,twoFactorSecret: null,createdAt: new Date(),lastLogin: null,loginAttempts: 0,lockedUntil: null};this.users.set(userId, user);// 发送验证邮件await this.sendVerificationEmail(user);return {id: user.id,username: user.username,email: user.email,message: 'Registration successful. 
Please verify your email.'};}// 用户登录async login(credentials) {const { email, password, twoFactorCode } = credentials;const user = this.findUserByEmail(email);if (!user) {throw new Error('Invalid credentials');}// 检查账户锁定if (this.isAccountLocked(user)) {throw new Error('Account is locked due to too many failed attempts');}// 验证密码const isPasswordValid = await bcrypt.compare(password, user.password);if (!isPasswordValid) {await this.handleFailedLogin(user);throw new Error('Invalid credentials');}// 验证两因素认证if (user.twoFactorEnabled) {if (!twoFactorCode) {throw new Error('Two-factor authentication code required');}const isValidCode = speakeasy.totp.verify({secret: user.twoFactorSecret,encoding: 'base32',token: twoFactorCode,window: 2});if (!isValidCode) {throw new Error('Invalid two-factor authentication code');}}// 重置登录尝试user.loginAttempts = 0;user.lockedUntil = null;user.lastLogin = new Date();// 生成令牌const tokens = await this.generateTokens(user);// 创建会话const sessionId = this.createSession(user, tokens);return {user: this.sanitizeUser(user),tokens,sessionId};}// 生成令牌async generateTokens(user) {const payload = {userId: user.id,username: user.username,email: user.email,roles: user.roles,permissions: user.permissions};const accessToken = jwt.sign(payload, this.config.jwtSecret, {expiresIn: this.config.jwtExpiry,issuer: 'auth-service',audience: 'api-service'});const refreshToken = this.generateRefreshToken();// 存储刷新令牌this.refreshTokens.set(refreshToken, {userId: user.id,createdAt: new Date(),expiresAt: new Date(Date.now() + this.parseExpiry(this.config.refreshTokenExpiry))});return {accessToken,refreshToken,expiresIn: this.parseExpiry(this.config.jwtExpiry)};}// 刷新令牌async refreshToken(refreshToken) {const tokenData = this.refreshTokens.get(refreshToken);if (!tokenData || tokenData.expiresAt < new Date()) {throw new Error('Invalid or expired refresh token');}const user = this.users.get(tokenData.userId);if (!user || !user.isActive) {throw new Error('User not found or 
inactive');}// 删除旧的刷新令牌this.refreshTokens.delete(refreshToken);// 生成新令牌return await this.generateTokens(user);}// 启用两因素认证async enableTwoFactor(userId) {const user = this.users.get(userId);if (!user) {throw new Error('User not found');}const secret = speakeasy.generateSecret({name: `MyApp (${user.email})`,issuer: 'MyApp'});user.twoFactorSecret = secret.base32;const qrCodeUrl = await QRCode.toDataURL(secret.otpauth_url);return {secret: secret.base32,qrCode: qrCodeUrl,backupCodes: this.generateBackupCodes()};}// 验证两因素认证设置async verifyTwoFactorSetup(userId, token) {const user = this.users.get(userId);if (!user || !user.twoFactorSecret) {throw new Error('Two-factor setup not initiated');}const isValid = speakeasy.totp.verify({secret: user.twoFactorSecret,encoding: 'base32',token,window: 2});if (isValid) {user.twoFactorEnabled = true;return { success: true, message: 'Two-factor authentication enabled' };} else {throw new Error('Invalid verification code');}}// 权限检查中间件requirePermission(permission) {return (req, res, next) => {const user = req.user;if (!user) {return res.status(401).json({ error: 'Authentication required' });}if (this.hasPermission(user, permission)) {next();} else {res.status(403).json({ error: 'Insufficient permissions' });}};}// 角色检查中间件requireRole(role) {return (req, res, next) => {const user = req.user;if (!user) {return res.status(401).json({ error: 'Authentication required' });}if (user.roles.includes(role) || user.roles.includes('admin')) {next();} else {res.status(403).json({ error: 'Insufficient role' });}};}// 检查用户权限hasPermission(user, permission) {// 管理员拥有所有权限if (user.roles.includes('admin')) {return true;}// 检查直接权限if (user.permissions.includes(permission)) {return true;}// 检查角色权限return user.roles.some(role => {const rolePermissions = this.getRolePermissions(role);return rolePermissions.includes(permission);});}// 获取角色权限getRolePermissions(role) {const rolePermissions = {user: ['read:profile', 'update:profile'],moderator: ['read:profile', 
'update:profile', 'moderate:content'],admin: ['*'] // 所有权限};return rolePermissions[role] || [];}// 密码强度验证validatePasswordStrength(password) {const minLength = 8;const hasUpperCase = /[A-Z]/.test(password);const hasLowerCase = /[a-z]/.test(password);const hasNumbers = /\d/.test(password);const hasSpecialChar = /[!@#$%^&*(),.?":{}|<>]/.test(password);if (password.length < minLength) {throw new Error('Password must be at least 8 characters long');}if (!hasUpperCase || !hasLowerCase || !hasNumbers || !hasSpecialChar) {throw new Error('Password must contain uppercase, lowercase, numbers, and special characters');}}// 处理登录失败async handleFailedLogin(user) {user.loginAttempts = (user.loginAttempts || 0) + 1;if (user.loginAttempts >= this.config.maxLoginAttempts) {user.lockedUntil = new Date(Date.now() + this.config.lockoutDuration);}}// 检查账户是否锁定isAccountLocked(user) {return user.lockedUntil && user.lockedUntil > new Date();}// 创建会话createSession(user, tokens) {const sessionId = this.generateSessionId();this.sessions.set(sessionId, {userId: user.id,tokens,createdAt: new Date(),lastActivity: new Date(),ipAddress: null,userAgent: null});return sessionId;}// 工具方法generateUserId() {return crypto.randomBytes(16).toString('hex');}generateRefreshToken() {return crypto.randomBytes(32).toString('hex');}generateSessionId() {return crypto.randomBytes(24).toString('hex');}generateBackupCodes() {return Array.from({ length: 10 }, () => crypto.randomBytes(4).toString('hex').toUpperCase());}findUserByEmail(email) {return Array.from(this.users.values()).find(user => user.email === email);}findUserByUsername(username) {return Array.from(this.users.values()).find(user => user.username === username);}sanitizeUser(user) {const { password, twoFactorSecret, ...sanitized } = user;return sanitized;}parseExpiry(expiry) {const units = { s: 1000, m: 60000, h: 3600000, d: 86400000 };const match = expiry.match(/^(\d+)([smhd])$/);if (match) {return parseInt(match[1]) * units[match[2]];}return 900000; // 
默认15分钟}async sendVerificationEmail(user) {// 实现邮件发送逻辑console.log(`Verification email sent to ${user.email}`);}
}module.exports = AdvancedAuthSystem;
7. 数据处理与分析
7.1 大数据处理架构
流数据处理系统
// stream-processor.js
const { Transform, Writable, pipeline } = require('stream');
const { EventEmitter } = require('events');class StreamProcessor extends EventEmitter {constructor(options = {}) {super();this.options = {batchSize: 1000,flushInterval: 5000,maxMemory: 100 * 1024 * 1024, // 100MB...options};this.processors = new Map();this.metrics = {processed: 0,errors: 0,startTime: Date.now()};}// 创建数据转换流createTransformStream(name, transformFn) {const transform = new Transform({objectMode: true,transform(chunk, encoding, callback) {try {const result = transformFn(chunk);if (result !== null && result !== undefined) {this.push(result);}callback();} catch (error) {callback(error);}}});this.processors.set(name, transform);return transform;}// 创建批处理流createBatchStream(batchSize = this.options.batchSize) {let batch = [];return new Transform({objectMode: true,transform(chunk, encoding, callback) {batch.push(chunk);if (batch.length >= batchSize) {this.push([...batch]);batch = [];}callback();},flush(callback) {if (batch.length > 0) {this.push(batch);}callback();}});}// 创建过滤流createFilterStream(filterFn) {return new Transform({objectMode: true,transform(chunk, encoding, callback) {try {if (filterFn(chunk)) {this.push(chunk);}callback();} catch (error) {callback(error);}}});}// 创建聚合流createAggregateStream(keyFn, aggregateFn, windowSize = 60000) {const windows = new Map();return new Transform({objectMode: true,transform(chunk, encoding, callback) {const key = keyFn(chunk);const now = Date.now();const windowStart = Math.floor(now / windowSize) * windowSize;const windowKey = `${key}:${windowStart}`;if (!windows.has(windowKey)) {windows.set(windowKey, {key,windowStart,windowEnd: windowStart + windowSize,data: []});}const window = windows.get(windowKey);window.data.push(chunk);// 检查是否需要输出完成的窗口const completedWindows = Array.from(windows.entries()).filter(([_, window]) => window.windowEnd <= now).map(([windowKey, window]) => {windows.delete(windowKey);return {key: window.key,windowStart: window.windowStart,windowEnd: window.windowEnd,result: 
aggregateFn(window.data)};});completedWindows.forEach(result => this.push(result));callback();}});}// 创建输出流createOutputSink(outputFn) {return new Writable({objectMode: true,write(chunk, encoding, callback) {try {outputFn(chunk);callback();} catch (error) {callback(error);}}});}// 创建处理管道createPipeline(streams) {return new Promise((resolve, reject) => {pipeline(...streams, (error) => {if (error) {this.metrics.errors++;this.emit('error', error);reject(error);} else {this.emit('complete');resolve();}});});}// 实时数据分析示例createRealTimeAnalyzer() {// 数据清洗流const cleaningStream = this.createTransformStream('cleaning', (data) => {if (!data || typeof data !== 'object') return null;// 数据清洗逻辑return {...data,timestamp: data.timestamp || Date.now(),processed: true};});// 数据验证流const validationStream = this.createFilterStream((data) => {return data.timestamp && data.value !== undefined && !isNaN(data.value);});// 数据聚合流const aggregationStream = this.createAggregateStream((data) => data.category || 'default',(dataArray) => ({count: dataArray.length,sum: dataArray.reduce((sum, item) => sum + (item.value || 0), 0),avg: dataArray.reduce((sum, item) => sum + (item.value || 0), 0) / dataArray.length,min: Math.min(...dataArray.map(item => item.value || 0)),max: Math.max(...dataArray.map(item => item.value || 0))}));// 输出流const outputStream = this.createOutputSink((result) => {this.metrics.processed++;this.emit('result', result);console.log('Analysis result:', result);});return [cleaningStream, validationStream, aggregationStream, outputStream];}// 获取处理指标getMetrics() {const runtime = Date.now() - this.metrics.startTime;return {...this.metrics,runtime,throughput: this.metrics.processed / (runtime / 1000)};}
}// 时序数据分析器
class TimeSeriesAnalyzer {constructor(options = {}) {this.windowSize = options.windowSize || 60000; // 1分钟this.retentionPeriod = options.retentionPeriod || 24 * 60 * 60 * 1000; // 24小时this.data = new Map();this.startCleanup();}addDataPoint(series, value, timestamp = Date.now()) {if (!this.data.has(series)) {this.data.set(series, []);}const seriesData = this.data.get(series);seriesData.push({ value, timestamp });// 保持数据按时间排序seriesData.sort((a, b) => a.timestamp - b.timestamp);}getMovingAverage(series, windowCount = 10) {const seriesData = this.data.get(series);if (!seriesData || seriesData.length < windowCount) {return null;}const recentData = seriesData.slice(-windowCount);const sum = recentData.reduce((sum, point) => sum + point.value, 0);return sum / recentData.length;}detectAnomalies(series, threshold = 2) {const seriesData = this.data.get(series);if (!seriesData || seriesData.length < 10) {return [];}const values = seriesData.map(point => point.value);const mean = values.reduce((sum, val) => sum + val, 0) / values.length;const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;const stdDev = Math.sqrt(variance);return seriesData.filter(point => Math.abs(point.value - mean) > threshold * stdDev);}getTrend(series, periods = 10) {const seriesData = this.data.get(series);if (!seriesData || seriesData.length < periods) {return null;}const recentData = seriesData.slice(-periods);const n = recentData.length;let sumX = 0, sumY = 0, sumXY = 0, sumXX = 0;recentData.forEach((point, index) => {sumX += index;sumY += point.value;sumXY += index * point.value;sumXX += index * index;});const slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);const intercept = (sumY - slope * sumX) / n;return { slope, intercept, trend: slope > 0 ? 'increasing' : slope < 0 ? 
'decreasing' : 'stable' };}startCleanup() {setInterval(() => {const cutoff = Date.now() - this.retentionPeriod;this.data.forEach((seriesData, series) => {const filteredData = seriesData.filter(point => point.timestamp > cutoff);this.data.set(series, filteredData);});}, 60000); // 每分钟清理一次}
}module.exports = {StreamProcessor,TimeSeriesAnalyzer
};
7.2 机器学习集成
机器学习服务实现
// ml-service.js
const tf = require('@tensorflow/tfjs-node');
const fs = require('fs').promises;
const path = require('path');class MLService {constructor() {this.models = new Map();this.preprocessors = new Map();this.metrics = new Map();}// 加载预训练模型async loadModel(name, modelPath) {try {const model = await tf.loadLayersModel(`file://${modelPath}`);this.models.set(name, {model,loadedAt: new Date(),predictions: 0});console.log(`Model ${name} loaded successfully`);} catch (error) {console.error(`Failed to load model ${name}:`, error);throw error;}}// 创建简单的线性回归模型createLinearRegressionModel(inputShape) {const model = tf.sequential({layers: [tf.layers.dense({inputShape: [inputShape],units: 64,activation: 'relu'}),tf.layers.dropout({ rate: 0.2 }),tf.layers.dense({units: 32,activation: 'relu'}),tf.layers.dense({units: 1,activation: 'linear'})]});model.compile({optimizer: tf.train.adam(0.001),loss: 'meanSquaredError',metrics: ['mae']});return model;}// 创建分类模型createClassificationModel(inputShape, numClasses) {const model = tf.sequential({layers: [tf.layers.dense({inputShape: [inputShape],units: 128,activation: 'relu'}),tf.layers.dropout({ rate: 0.3 }),tf.layers.dense({units: 64,activation: 'relu'}),tf.layers.dropout({ rate: 0.2 }),tf.layers.dense({units: numClasses,activation: 'softmax'})]});model.compile({optimizer: tf.train.adam(0.001),loss: 'categoricalCrossentropy',metrics: ['accuracy']});return model;}// 训练模型async trainModel(name, trainData, validationData, options = {}) {const model = this.models.get(name)?.model;if (!model) {throw new Error(`Model ${name} not found`);}const {epochs = 100,batchSize = 32,validationSplit = 0.2,callbacks = []} = options;// 添加早停回调const earlyStopping = tf.callbacks.earlyStopping({monitor: 'val_loss',patience: 10,restoreBestWeights: true});// 添加学习率调度const reduceLROnPlateau = tf.callbacks.reduceLROnPlateau({monitor: 'val_loss',factor: 0.5,patience: 5,minLr: 0.0001});const allCallbacks = [earlyStopping, reduceLROnPlateau, ...callbacks];const history = await model.fit(trainData.xs, trainData.ys, {epochs,batchSize,validationData: 
validationData ? [validationData.xs, validationData.ys] : undefined,validationSplit: validationData ? undefined : validationSplit,callbacks: allCallbacks,verbose: 1});// 保存训练历史this.metrics.set(name, {history: history.history,trainedAt: new Date()});return history;}// 进行预测async predict(modelName, inputData) {const modelInfo = this.models.get(modelName);if (!modelInfo) {throw new Error(`Model ${modelName} not found`);}const { model } = modelInfo;// 预处理输入数据const preprocessedData = await this.preprocessData(modelName, inputData);// 进行预测const prediction = model.predict(preprocessedData);// 更新预测计数modelInfo.predictions++;// 转换为JavaScript数组const result = await prediction.data();// 清理内存prediction.dispose();preprocessedData.dispose();return Array.from(result);}// 批量预测async batchPredict(modelName, inputDataArray) {const results = [];for (const inputData of inputDataArray) {const result = await this.predict(modelName, inputData);results.push(result);}return results;}// 数据预处理async preprocessData(modelName, data) {const preprocessor = this.preprocessors.get(modelName);if (preprocessor) {return preprocessor(data);}// 默认预处理:转换为张量if (Array.isArray(data)) {return tf.tensor2d([data]);} else if (typeof data === 'object') {// 假设是特征对象const features = Object.values(data);return tf.tensor2d([features]);}return tf.tensor2d([[data]]);}// 注册预处理器registerPreprocessor(modelName, preprocessorFn) {this.preprocessors.set(modelName, preprocessorFn);}// 模型评估async evaluateModel(modelName, testData) {const modelInfo = this.models.get(modelName);if (!modelInfo) {throw new Error(`Model ${modelName} not found`);}const { model } = modelInfo;const evaluation = await model.evaluate(testData.xs, testData.ys);const metrics = {};const metricNames = model.metricsNames;for (let i = 0; i < metricNames.length; i++) {metrics[metricNames[i]] = await evaluation[i].data();}// 清理内存evaluation.forEach(tensor => tensor.dispose());return metrics;}// 保存模型async saveModel(modelName, savePath) {const modelInfo = 
this.models.get(modelName);if (!modelInfo) {throw new Error(`Model ${modelName} not found`);}const { model } = modelInfo;await model.save(`file://${savePath}`);console.log(`Model ${modelName} saved to ${savePath}`);}// 获取模型信息getModelInfo(modelName) {const modelInfo = this.models.get(modelName);if (!modelInfo) {return null;}const { model, loadedAt, predictions } = modelInfo;return {name: modelName,loadedAt,predictions,inputShape: model.inputs[0].shape,outputShape: model.outputs[0].shape,trainableParams: model.countParams(),layers: model.layers.length};}// 获取所有模型统计getAllModelsStats() {const stats = {};this.models.forEach((modelInfo, name) => {stats[name] = this.getModelInfo(name);});return stats;}// 清理模型内存disposeModel(modelName) {const modelInfo = this.models.get(modelName);if (modelInfo) {modelInfo.model.dispose();this.models.delete(modelName);this.preprocessors.delete(modelName);this.metrics.delete(modelName);console.log(`Model ${modelName} disposed`);}}// 清理所有模型disposeAllModels() {this.models.forEach((_, name) => {this.disposeModel(name);});}
}// 特征工程工具
class FeatureEngineering {// 数据标准化static normalize(data) {const tensor = tf.tensor(data);const normalized = tf.div(tf.sub(tensor, tf.mean(tensor, 0)),tf.add(tf.moments(tensor, 0).variance.sqrt(), 1e-7));const result = normalized.arraySync();tensor.dispose();normalized.dispose();return result;}// 最小-最大缩放static minMaxScale(data, min = 0, max = 1) {const tensor = tf.tensor(data);const minVal = tf.min(tensor, 0);const maxVal = tf.max(tensor, 0);const scaled = tf.add(tf.mul(tf.div(tf.sub(tensor, minVal), tf.sub(maxVal, minVal)),max - min),min);const result = scaled.arraySync();tensor.dispose();minVal.dispose();maxVal.dispose();scaled.dispose();return result;}// 独热编码static oneHotEncode(labels, numClasses) {const tensor = tf.tensor1d(labels, 'int32');const oneHot = tf.oneHot(tensor, numClasses);const result = oneHot.arraySync();tensor.dispose();oneHot.dispose();return result;}// 创建时间特征static createTimeFeatures(timestamps) {return timestamps.map(timestamp => {const date = new Date(timestamp);return {hour: date.getHours(),dayOfWeek: date.getDay(),dayOfMonth: date.getDate(),month: date.getMonth(),quarter: Math.floor(date.getMonth() / 3),isWeekend: date.getDay() === 0 || date.getDay() === 6 ? 1 : 0};});}// 多项式特征static polynomialFeatures(data, degree = 2) {const features = [];for (let i = 0; i < data.length; i++) {const row = data[i];const polyRow = [...row];// 添加多项式特征for (let d = 2; d <= degree; d++) {for (let j = 0; j < row.length; j++) {polyRow.push(Math.pow(row[j], d));}}// 添加交互特征if (degree >= 2) {for (let j = 0; j < row.length; j++) {for (let k = j + 1; k < row.length; k++) {polyRow.push(row[j] * row[k]);}}}features.push(polyRow);}return features;}
}module.exports = {MLService,FeatureEngineering
};
结语
感谢您的阅读!期待您的一键三连!欢迎指正!