文章目录
正文
1. Node.js 高级主题概览
Node.js 高级主题涵盖了深入的技术概念和实践,包括事件循环机制、内存管理、性能优化、微服务架构、实时通信等核心领域。掌握这些高级主题对于构建高性能、可扩展的企业级应用至关重要。
1.1 高级主题架构图
2. 事件循环与异步编程深度解析
2.1 事件循环机制详解
事件循环阶段详解
// event-loop-demo.js
// Demo script: schedules one callback through each scheduling mechanism
// (process.nextTick, promise microtask, setImmediate, setTimeout, fs I/O)
// and prints a numbered line from each so the execution order can be observed.
const fs = require('fs');
console.log('=== 事件循环演示 ===');
// 1. Synchronous code
console.log('1. 同步代码执行');
// 2. process.nextTick (runs ahead of promise microtasks, before the event loop continues)
process.nextTick(() => {
console.log('2. process.nextTick 回调');
});
// 3. Promise (microtask)
Promise.resolve().then(() => {
console.log('3. Promise.then 回调');
});
// 4. setImmediate (check phase)
setImmediate(() => {
console.log('4. setImmediate 回调');
});
// 5. setTimeout (timers phase)
setTimeout(() => {
console.log('5. setTimeout 回调');
}, 0);
// 6. I/O operation (callback runs in the poll phase)
fs.readFile(__filename, () => {
console.log('6. fs.readFile 回调');
// Inside an I/O callback the check phase comes next, so this setImmediate
// always runs before the 0 ms setTimeout scheduled below.
setImmediate(() => {
console.log('7. setImmediate 在 I/O 回调中');
});
setTimeout(() => {
console.log('8. setTimeout 在 I/O 回调中');
}, 0);
});
console.log('9. 同步代码结束');
/*
Typical output order (the lines below quote the program's own output):
1. 同步代码执行
9. 同步代码结束
2. process.nextTick 回调
3. Promise.then 回调
5. setTimeout 回调
4. setImmediate 回调
6. fs.readFile 回调
7. setImmediate 在 I/O 回调中
8. setTimeout 在 I/O 回调中

NOTE: only part of this order is guaranteed. Lines 1, 9, 2, 3 always come first
and in that order, and line 7 always precedes line 8. Per the Node.js event-loop
guide, the relative order of a 0 ms setTimeout and a setImmediate scheduled from
the main module (lines 5 and 4) depends on process timing and may be swapped.
Line 6 normally appears after both because the file read has to complete first.
*/
2.2 异步编程模式演进
高级异步模式实现
// advanced-async-patterns.js
// 1. Async iterator
/**
 * Exposes an array as an async iterable that hands out one element roughly
 * every 100 ms.
 *
 * The read cursor (`index`) lives on the instance, so iteration resumes where
 * the previous one stopped; an exhausted processor yields nothing further.
 */
class AsyncDataProcessor {
  /**
   * @param {Array} data - Items to hand out, in order.
   */
  constructor(data) {
    this.data = data;
    this.index = 0;
  }

  /**
   * Yields the remaining items, pausing 100 ms before each one.
   */
  async *[Symbol.asyncIterator]() {
    // Simulated asynchronous work performed before every item.
    const pause = () => new Promise((done) => setTimeout(done, 100));

    while (this.index < this.data.length) {
      await pause();
      const item = this.data[this.index];
      // Advance the cursor before suspending at the yield.
      this.index += 1;
      yield item;
    }
  }
}
// Using the async iterator
/**
 * Walks an AsyncDataProcessor over the numbers 1..5 and logs each item as it
 * arrives.
 *
 * @returns {Promise<void>} Settles once every item has been logged.
 */
async function processDataAsync() {
  const sampleValues = [1, 2, 3, 4, 5];
  const source = new AsyncDataProcessor(sampleValues);

  for await (const entry of source) {
    console.log('处理项目:', entry);
  }
}
// 2. Async generator with flow control
/**
 * Async generator that produces the integers 0 through 9, waiting 500 ms
 * before each value. The consumer controls the pace: the next wait only
 * starts when the next value is requested.
 */
async function* asyncGenerator() {
  for (let value = 0; value < 10; value += 1) {
    await new Promise((wake) => setTimeout(wake, 500));
    yield value;
  }
}
// 3. Advanced promise pattern
/**
 * Runs promise-returning tasks with a cap on how many are in flight at once.
 * Tasks beyond the cap wait in a FIFO queue.
 */
class PromisePool {
  /**
   * @param {number} [concurrency=3] - Maximum number of tasks running at once.
   */
  constructor(concurrency = 3) {
    this.concurrency = concurrency;
    this.running = [];
    this.queue = [];
  }

  /**
   * Queues a task and returns a promise for its outcome.
   *
   * @param {Function} promiseFunction - Called with no arguments when a slot
   *   is free. May return a promise or a plain value, or throw.
   * @returns {Promise<*>} Fulfils with the task's result or rejects with its
   *   error (including an error thrown synchronously by the task).
   */
  add(promiseFunction) {
    return new Promise((resolve, reject) => {
      this.queue.push({ promiseFunction, resolve, reject });
      this.process();
    });
  }

  /**
   * Starts the next queued task if a slot is free; otherwise does nothing.
   * Called again automatically whenever a running task settles.
   *
   * @returns {Promise<void>}
   */
  async process() {
    if (this.running.length >= this.concurrency || this.queue.length === 0) {
      return;
    }

    const { promiseFunction, resolve, reject } = this.queue.shift();

    // Invoking the task inside a Promise executor turns a synchronous throw
    // into a rejection and lets plain return values through. Previously either
    // case left the caller's promise pending forever.
    const promise = new Promise((settle) => settle(promiseFunction()))
      .then(resolve, reject)
      .finally(() => {
        const slot = this.running.indexOf(promise);
        if (slot !== -1) {
          this.running.splice(slot, 1);
        }
        this.process();
      });

    this.running.push(promise);
  }
}
// 4. Cancellable promise
/**
 * Thenable wrapper around a promise that can be rejected from the outside.
 *
 * Calling `cancel()` sets `isCancelled` and rejects the wrapped promise with
 * an Error('Promise was cancelled'). Once cancelled, later resolve/reject
 * calls made by the executor are ignored.
 */
class CancellablePromise {
  /**
   * @param {Function} executor - Receives `(resolve, reject)` like a regular
   *   Promise executor.
   */
  constructor(executor) {
    this.isCancelled = false;
    this.promise = new Promise((fulfil, fail) => {
      // Wraps a settle function so that it becomes a no-op after cancellation.
      const unlessCancelled = (settle) => (payload) => {
        if (this.isCancelled) {
          return;
        }
        settle(payload);
      };

      this.cancel = () => {
        this.isCancelled = true;
        fail(new Error('Promise was cancelled'));
      };

      executor(unlessCancelled(fulfil), unlessCancelled(fail));
    });
  }

  /** Delegates to the wrapped promise's `then`. */
  then(onFulfilled, onRejected) {
    const inner = this.promise;
    return inner.then(onFulfilled, onRejected);
  }

  /** Delegates to the wrapped promise's `catch`. */
  catch(onRejected) {
    const inner = this.promise;
    return inner.catch(onRejected);
  }
}
// 5. Async retry helper
/**
 * Calls `fn` until it succeeds or the retry budget is used up, waiting with
 * exponential backoff (`delay`, `2 * delay`, `4 * delay`, ...) between attempts.
 *
 * @param {Function} fn - Async operation to attempt; called with no arguments.
 * @param {number} [maxRetries=3] - Retries after the first attempt, so `fn`
 *   runs at most `maxRetries + 1` times.
 * @param {number} [delay=1000] - Base wait in milliseconds.
 * @returns {Promise<*>} The first successful result of `fn`.
 * @throws The error from the final attempt when every attempt fails.
 */
async function retryAsync(fn, maxRetries = 3, delay = 1000) {
  for (let attempt = 0; attempt <= maxRetries; attempt += 1) {
    try {
      return await fn();
    } catch (failure) {
      const outOfRetries = attempt === maxRetries;
      if (outOfRetries) {
        throw failure;
      }
      // Exponential backoff: double the wait after every failed attempt.
      const backoffMs = delay * 2 ** attempt;
      await new Promise((wake) => setTimeout(wake, backoffMs));
    }
  }
}
// Public API of advanced-async-patterns.js: the async-iteration helpers,
// the promise utilities and the retry helper defined above.
module.exports = {
AsyncDataProcessor,
PromisePool,
CancellablePromise,
retryAsync,
processDataAsync,
asyncGenerator
};
3. 内存管理与性能优化
3.1 V8 内存管理机制
内存监控与分析工具
// memory-profiler.js
const v8 = require('v8');
const fs = require('fs');
/**
 * Samples process and V8 heap statistics, writes heap snapshots to disk and
 * flags sustained heap growth as a possible memory leak.
 */
class MemoryProfiler {
  constructor() {
    // Rolling window of samples collected by startMemoryLeakDetection().
    this.snapshots = [];
    this.startTime = Date.now();
  }

  /**
   * Reads current memory figures from `process.memoryUsage()` and
   * `v8.getHeapStatistics()`.
   *
   * @returns {object} Timestamped sample combining both sources.
   */
  getMemoryUsage() {
    const usage = process.memoryUsage();
    const heapStats = v8.getHeapStatistics();
    return {
      timestamp: Date.now(),
      rss: usage.rss,
      heapTotal: usage.heapTotal,
      heapUsed: usage.heapUsed,
      external: usage.external,
      arrayBuffers: usage.arrayBuffers,
      heapSizeLimit: heapStats.heap_size_limit,
      totalHeapSize: heapStats.total_heap_size,
      usedHeapSize: heapStats.used_heap_size,
      mallocedMemory: heapStats.malloced_memory,
      peakMallocedMemory: heapStats.peak_malloced_memory
    };
  }

  /**
   * Streams a V8 heap snapshot into a file.
   *
   * @param {string} [filename] - Target path; defaults to
   *   `heap-<timestamp>.heapsnapshot` in the working directory.
   * @returns {Promise<string>} Resolves with the path actually written
   *   (previously `undefined` when the default name was used); rejects if
   *   either the snapshot stream or the file stream fails.
   */
  takeHeapSnapshot(filename) {
    // Resolve the default once so the log line and the resolved value name the
    // file that was really written.
    const target = filename || `heap-${Date.now()}.heapsnapshot`;
    const snapshotStream = v8.getHeapSnapshot();
    const fileStream = fs.createWriteStream(target);

    return new Promise((resolve, reject) => {
      snapshotStream.on('error', (error) => {
        // pipe() does not forward source errors; close the file ourselves.
        fileStream.destroy();
        reject(error);
      });
      fileStream.on('error', reject);
      fileStream.on('finish', () => {
        console.log(`堆快照已保存: ${target}`);
        resolve(target);
      });
      snapshotStream.pipe(fileStream);
    });
  }

  /**
   * Starts sampling memory every 5 seconds and warns when the last 10 samples
   * show heap growth above 1 MB per sampling interval.
   *
   * @returns {Function} Call it to stop the sampling timer.
   */
  startMemoryLeakDetection() {
    const interval = setInterval(() => {
      const usage = this.getMemoryUsage();
      this.snapshots.push(usage);

      // Keep only the 100 most recent samples.
      if (this.snapshots.length > 100) {
        this.snapshots.shift();
      }

      // Leak heuristic over the last 10 samples.
      if (this.snapshots.length >= 10) {
        const recent = this.snapshots.slice(-10);
        const trend = this.calculateMemoryTrend(recent);
        if (trend.isIncreasing && trend.rate > 1024 * 1024) { // 1 MB per interval
          console.warn('检测到可能的内存泄漏:', {
            trend: trend.rate,
            currentUsage: usage.heapUsed
          });
        }
      }
    }, 5000);

    return () => clearInterval(interval);
  }

  /**
   * Average change in `heapUsed` per sampling interval across the samples.
   *
   * @param {Array<{heapUsed: number}>} snapshots - Samples, oldest first.
   * @returns {{isIncreasing: boolean, rate: number}} `rate` is in bytes per
   *   interval; fewer than two samples give `{ isIncreasing: false, rate: 0 }`.
   */
  calculateMemoryTrend(snapshots) {
    if (snapshots.length < 2) {
      return { isIncreasing: false, rate: 0 };
    }
    const first = snapshots[0].heapUsed;
    const last = snapshots[snapshots.length - 1].heapUsed;
    // N samples span N - 1 intervals; dividing by N understated the rate.
    const rate = (last - first) / (snapshots.length - 1);
    return {
      isIncreasing: rate > 0,
      rate
    };
  }

  /**
   * Forces a garbage collection and logs how much heap it freed.
   * Requires the process to be started with `--expose-gc`; otherwise only a
   * warning is printed.
   */
  forceGC() {
    if (global.gc) {
      const before = this.getMemoryUsage();
      global.gc();
      const after = this.getMemoryUsage();
      console.log('垃圾回收效果:', {
        before: before.heapUsed,
        after: after.heapUsed,
        freed: before.heapUsed - after.heapUsed
      });
    } else {
      console.warn('垃圾回收不可用,请使用 --expose-gc 标志启动');
    }
  }

  /**
   * Summarises the profiler state.
   *
   * @returns {object} Runtime in ms, a fresh memory sample, the number of
   *   stored samples and their mean `heapUsed` (0 when nothing was sampled,
   *   instead of NaN).
   */
  generateReport() {
    const current = this.getMemoryUsage();
    const runtime = Date.now() - this.startTime;
    const sampleCount = this.snapshots.length;
    const totalHeapUsed = this.snapshots.reduce((sum, s) => sum + s.heapUsed, 0);
    return {
      runtime,
      currentMemory: current,
      snapshots: sampleCount,
      averageHeapUsed: sampleCount === 0 ? 0 : totalHeapUsed / sampleCount
    };
  }
}
// memory-profiler.js exports the profiler class itself.
module.exports = MemoryProfiler;
3.2 性能优化策略
性能分析工具实现
// performance-analyzer.js
const { performance, PerformanceObserver } = require('perf_hooks');
/**
 * Collects timing metrics through perf_hooks observers and offers helpers for
 * timing individual functions, sampling CPU usage and summarising the data.
 */
class PerformanceAnalyzer {
  constructor() {
    // Metric type -> list of recorded entries.
    this.metrics = new Map();
    this.observers = [];
    this.setupObservers();
  }

  /**
   * Registers the observers that feed `metrics`: HTTP entries under 'http',
   * and timerified-function plus user-timing measures under 'function'.
   */
  setupObservers() {
    this.observeEntries('http', ['http']);
    this.observeEntries('function', ['function', 'measure']);
  }

  /**
   * Creates one PerformanceObserver that records every matching entry under
   * `metricType`.
   *
   * @param {string} metricType - Key used in `metrics`.
   * @param {string[]} entryTypes - perf_hooks entry types to observe.
   */
  observeEntries(metricType, entryTypes) {
    // Drop entry types this runtime does not know about, and skip the observer
    // entirely if none are left.
    const supported = PerformanceObserver.supportedEntryTypes;
    const usable = Array.isArray(supported)
      ? entryTypes.filter((type) => supported.includes(type))
      : entryTypes;
    if (usable.length === 0) {
      return;
    }

    const observer = new PerformanceObserver((list) => {
      for (const entry of list.getEntries()) {
        this.recordMetric(metricType, {
          name: entry.name,
          duration: entry.duration,
          startTime: entry.startTime
        });
      }
    });
    observer.observe({ entryTypes: usable });
    this.observers.push(observer);
  }

  /**
   * Appends a timestamped data point to the list for `type`.
   *
   * @param {string} type - Metric category.
   * @param {object} data - Fields to store; `duration` is used by reports.
   */
  recordMetric(type, data) {
    if (!this.metrics.has(type)) {
      this.metrics.set(type, []);
    }
    this.metrics.get(type).push({
      ...data,
      timestamp: Date.now()
    });
  }

  /**
   * Legacy-style method decorator `(target, propertyKey, descriptor)` that
   * wraps the method so each call emits a `<propertyKey>-duration` measure
   * and logs the elapsed time. Does not rely on `this`, so it can be passed
   * around unbound.
   *
   * @returns {object} The same descriptor with `value` replaced by an async
   *   wrapper.
   */
  performanceDecorator(target, propertyKey, descriptor) {
    const originalMethod = descriptor.value;

    descriptor.value = async function (...args) {
      // Give every call its own mark names. With shared names, overlapping
      // calls measured from each other's marks and marks piled up forever.
      PerformanceAnalyzer.callSequence = (PerformanceAnalyzer.callSequence || 0) + 1;
      const callId = PerformanceAnalyzer.callSequence;
      const markStart = `${propertyKey}-start-${callId}`;
      const markEnd = `${propertyKey}-end-${callId}`;
      const measureName = `${propertyKey}-duration`;

      const start = performance.now();
      performance.mark(markStart);
      try {
        return await originalMethod.apply(this, args);
      } finally {
        performance.mark(markEnd);
        performance.measure(measureName, markStart, markEnd);
        // The marks have served their purpose once the measure exists.
        // NOTE(review): measure entries are left in the timeline for consumers.
        performance.clearMarks(markStart);
        performance.clearMarks(markEnd);
        const end = performance.now();
        console.log(`${propertyKey} 执行时间: ${(end - start).toFixed(2)}ms`);
      }
    };

    return descriptor;
  }

  /**
   * Runs `fn` sequentially `iterations` times and reports timing statistics
   * in milliseconds.
   *
   * @param {Function} fn - Function to time; awaited on every run.
   * @param {number} [iterations=1000] - Number of runs.
   * @returns {Promise<object>} `{ iterations, min, max, average, median, p95,
   *   p99 }`; all statistics are 0 when no run was performed.
   */
  async measureFunction(fn, iterations = 1000) {
    const samples = [];
    for (let i = 0; i < iterations; i++) {
      const start = performance.now();
      await fn();
      samples.push(performance.now() - start);
    }

    if (samples.length === 0) {
      return { iterations, min: 0, max: 0, average: 0, median: 0, p95: 0, p99: 0 };
    }

    const total = samples.reduce((a, b) => a + b, 0);
    // Sort once; min/max come from the ends of the sorted list, which avoids
    // spreading a large array into Math.min/Math.max (a call-stack overflow).
    const sorted = [...samples].sort((a, b) => a - b);
    const at = (fraction) => sorted[Math.floor(sorted.length * fraction)];

    return {
      iterations,
      min: sorted[0],
      max: sorted[sorted.length - 1],
      average: total / sorted.length,
      median: at(0.5),
      p95: at(0.95),
      p99: at(0.99)
    };
  }

  /**
   * Starts a CPU usage measurement.
   *
   * @returns {Function} Call it to get `{ user, system, total, percentage }`
   *   for the time since `monitorCPU()` was called. CPU times are in
   *   microseconds; `percentage` is CPU time relative to wall-clock time.
   */
  monitorCPU() {
    const startUsage = process.cpuUsage();
    const startTime = process.hrtime();

    return () => {
      const endUsage = process.cpuUsage(startUsage);
      const endTime = process.hrtime(startTime);
      // hrtime gives [seconds, nanoseconds]; convert to microseconds.
      const totalTime = endTime[0] * 1000000 + endTime[1] / 1000;
      const cpuPercent = (endUsage.user + endUsage.system) / totalTime * 100;
      return {
        user: endUsage.user,
        system: endUsage.system,
        total: endUsage.user + endUsage.system,
        percentage: cpuPercent
      };
    };
  }

  /**
   * Summarises the recorded metrics per type.
   *
   * @returns {object} `{ timestamp, metrics }`, where each metric type with at
   *   least one duration has count, average/min/max duration and its 10 most
   *   recent entries.
   */
  generateReport() {
    const report = {
      timestamp: new Date().toISOString(),
      metrics: {}
    };

    for (const [type, data] of this.metrics) {
      const durations = data.map((d) => d.duration).filter((d) => d !== undefined);
      if (durations.length === 0) {
        continue;
      }

      // Single pass instead of Math.min(...durations): the metric lists are
      // unbounded and spreading a huge array overflows the call stack.
      let minDuration = Infinity;
      let maxDuration = -Infinity;
      let totalDuration = 0;
      for (const duration of durations) {
        if (duration < minDuration) minDuration = duration;
        if (duration > maxDuration) maxDuration = duration;
        totalDuration += duration;
      }

      report.metrics[type] = {
        count: data.length,
        averageDuration: totalDuration / durations.length,
        minDuration,
        maxDuration,
        recentEntries: data.slice(-10)
      };
    }

    return report;
  }

  /** Disconnects all observers and drops every recorded metric. */
  cleanup() {
    this.observers.forEach((observer) => observer.disconnect());
    this.metrics.clear();
  }
}
// Usage example
const analyzer = new PerformanceAnalyzer();

// Decorator usage example.
// `@decorator` syntax needs a transpiler (Babel/TypeScript legacy decorators)
// and is a syntax error in plain Node.js, so the decorator is applied
// explicitly with the same (target, key, descriptor) arguments.
class DatabaseService {
  async queryUsers() {
    // Simulated database query.
    await new Promise((resolve) => setTimeout(resolve, 100));
    return [];
  }
}

Object.defineProperty(
  DatabaseService.prototype,
  'queryUsers',
  analyzer.performanceDecorator(
    DatabaseService.prototype,
    'queryUsers',
    Object.getOwnPropertyDescriptor(DatabaseService.prototype, 'queryUsers')
  )
);
// performance-analyzer.js exports the analyzer class; the `analyzer` instance
// and DatabaseService above are usage examples only and are not exported.
module.exports = PerformanceAnalyzer;
4. 微服务架构与设计模式
4.1 微服务架构模式
微服务基础框架实现
// microservice-framework.js
const express = require('express');
const { EventEmitter } = require('events');
const axios = require('axios');
/**
 * Small Express-based microservice skeleton.
 *
 * Provides request tracing, `/health`, `/info` and `/metrics` endpoints, a
 * registry of downstream services called through a circuit breaker, and
 * start/stop lifecycle helpers.
 *
 * Events emitted: 'request', 'service-call-error', 'started', 'stopped'.
 */
class MicroService extends EventEmitter {
  /**
   * @param {object} [config] - Overrides for `name`, `port` and `version`;
   *   any extra keys are kept on `this.config`.
   */
  constructor(config) {
    super();
    this.config = {
      name: 'unnamed-service',
      port: 3000,
      version: '1.0.0',
      ...config
    };
    this.app = express();
    this.services = new Map();
    this.middlewares = [];
    this.routes = new Map();
    this.healthChecks = [];
    this.setupDefaultMiddleware();
    this.setupDefaultRoutes();
  }

  /** Installs body parsers and the request-tracing middleware. */
  setupDefaultMiddleware() {
    this.app.use(express.json());
    this.app.use(express.urlencoded({ extended: true }));

    // Request tracing: tag each request and emit a 'request' event with its
    // duration once the response has been sent.
    this.app.use((req, res, next) => {
      req.traceId = this.generateTraceId();
      req.startTime = Date.now();
      res.on('finish', () => {
        const duration = Date.now() - req.startTime;
        this.emit('request', {
          traceId: req.traceId,
          method: req.method,
          path: req.path,
          statusCode: res.statusCode,
          duration
        });
      });
      next();
    });
  }

  /** Registers the built-in `/health`, `/info` and `/metrics` routes. */
  setupDefaultRoutes() {
    // Health check: 200 when every check passes, 503 otherwise.
    this.app.get('/health', async (req, res) => {
      const health = await this.performHealthCheck();
      const statusCode = health.status === 'healthy' ? 200 : 503;
      res.status(statusCode).json(health);
    });

    // Service information.
    this.app.get('/info', (req, res) => {
      res.json({
        name: this.config.name,
        version: this.config.version,
        uptime: process.uptime(),
        memory: process.memoryUsage(),
        pid: process.pid
      });
    });

    // Metrics endpoint.
    this.app.get('/metrics', (req, res) => {
      res.json(this.getMetrics());
    });
  }

  /**
   * Registers a downstream service that can later be called by name.
   *
   * @param {string} name - Lookup key for callService().
   * @param {object} config - `{ url, timeout?, retries?, circuitBreaker? }`.
   *   `timeout` defaults to 5000 and `retries` to 3; an explicit 0 is kept.
   *   NOTE(review): `retries` is stored but callService() does not use it.
   */
  registerService(name, config) {
    this.services.set(name, {
      name,
      url: config.url,
      timeout: config.timeout ?? 5000,
      retries: config.retries ?? 3,
      circuitBreaker: new CircuitBreaker(config.circuitBreaker)
    });
  }

  /**
   * Calls a registered service through its circuit breaker.
   *
   * @param {string} serviceName - Name used in registerService().
   * @param {string} path - Appended to the service's base URL.
   * @param {object} [options] - `traceId` and `headers` are handled here; all
   *   other keys are passed through to axios (and may override `timeout`).
   * @returns {Promise<*>} The axios response.
   * @throws {Error} If the service is unknown; otherwise rethrows the call's
   *   error after emitting 'service-call-error'.
   */
  async callService(serviceName, path, options = {}) {
    const service = this.services.get(serviceName);
    if (!service) {
      throw new Error(`Service ${serviceName} not registered`);
    }

    const url = `${service.url}${path}`;
    // Separate our own options from what goes to axios. Spreading `options`
    // last used to replace the merged headers with the caller's headers
    // (dropping the tracing headers) and leaked `traceId` into the config.
    const { traceId, headers, ...requestOptions } = options;
    const config = {
      timeout: service.timeout,
      ...requestOptions,
      headers: {
        'X-Trace-ID': traceId || this.generateTraceId(),
        'X-Service-Name': this.config.name,
        ...headers
      }
    };

    try {
      return await service.circuitBreaker.execute(() => axios(url, config));
    } catch (error) {
      this.emit('service-call-error', {
        service: serviceName,
        url,
        error: error.message
      });
      throw error;
    }
  }

  /** Adds a middleware to the Express app and remembers it. */
  use(middleware) {
    this.middlewares.push(middleware);
    this.app.use(middleware);
  }

  /**
   * Adds a route.
   *
   * @param {string} method - HTTP method name, any casing.
   * @param {string} path - Route path.
   * @param {Function} handler - Express handler.
   */
  route(method, path, handler) {
    const routeKey = `${method.toUpperCase()} ${path}`;
    this.routes.set(routeKey, handler);
    this.app[method.toLowerCase()](path, handler);
  }

  /**
   * Adds a named health check.
   *
   * @param {string} name - Reported in the health response.
   * @param {Function} checkFunction - May be async; a throw or rejection marks
   *   the check unhealthy, and a returned object is merged into its result.
   */
  addHealthCheck(name, checkFunction) {
    this.healthChecks.push({ name, check: checkFunction });
  }

  /**
   * Runs all health checks in parallel.
   *
   * @returns {Promise<object>} `{ status, timestamp, checks }`; `status` is
   *   'healthy' only when every check is.
   */
  async performHealthCheck() {
    // Each check catches its own failure, so Promise.all never rejects here.
    const checks = await Promise.all(
      this.healthChecks.map(async ({ name, check }) => {
        try {
          const result = await check();
          return { name, status: 'healthy', ...result };
        } catch (error) {
          return { name, status: 'unhealthy', error: error.message };
        }
      })
    );
    const allHealthy = checks.every((check) => check.status === 'healthy');

    return {
      status: allHealthy ? 'healthy' : 'unhealthy',
      timestamp: new Date().toISOString(),
      checks
    };
  }

  /** Returns a snapshot of service, process and registry information. */
  getMetrics() {
    return {
      service: this.config.name,
      version: this.config.version,
      uptime: process.uptime(),
      memory: process.memoryUsage(),
      cpu: process.cpuUsage(),
      routes: Array.from(this.routes.keys()),
      services: Array.from(this.services.keys())
    };
  }

  /**
   * Generates a short random trace ID (up to 9 base-36 characters).
   * Not cryptographically secure; intended for log correlation only.
   */
  generateTraceId() {
    return Math.random().toString(36).slice(2, 11);
  }

  /**
   * Starts listening on the configured port.
   *
   * @returns {Promise<void>} Resolves once listening; rejects if the server
   *   fails to start (for example because the port is in use).
   */
  start() {
    return new Promise((resolve, reject) => {
      this.server = this.app.listen(this.config.port, (error) => {
        // Express 5 passes listen errors to this callback.
        if (error) {
          reject(error);
          return;
        }
        this.server.off('error', reject);
        console.log(`${this.config.name} started on port ${this.config.port}`);
        this.emit('started');
        resolve();
      });
      // Express 4 reports listen failures only through the server's 'error'
      // event; without this listener start() would never settle.
      this.server.once('error', reject);
    });
  }

  /**
   * Stops the HTTP server if it was started.
   *
   * @returns {Promise<void>} Resolves after the server has closed.
   */
  stop() {
    return new Promise((resolve) => {
      if (!this.server) {
        resolve();
        return;
      }
      this.server.close(() => {
        console.log(`${this.config.name} stopped`);
        this.emit('stopped');
        resolve();
      });
    });
  }
}
// Circuit breaker implementation
/**
 * Circuit breaker with three states:
 *  - CLOSED: calls pass through; consecutive failures are counted.
 *  - OPEN: calls fail immediately until `timeout` ms have passed since the
 *    last failure.
 *  - HALF_OPEN: trial calls pass through; 3 successes close the circuit and
 *    any failure opens it again.
 */
class CircuitBreaker {
  /**
   * @param {object} [options]
   * @param {number} [options.failureThreshold=5] - Consecutive failures that
   *   open the circuit.
   * @param {number} [options.timeout=60000] - Milliseconds to stay OPEN before
   *   allowing trial calls.
   * @param {number} [options.monitoringPeriod=10000] - Stored only; not used
   *   by this class.
   */
  constructor(options = {}) {
    // `??` rather than `||` so an explicit 0 is respected.
    this.failureThreshold = options.failureThreshold ?? 5;
    this.timeout = options.timeout ?? 60000;
    this.monitoringPeriod = options.monitoringPeriod ?? 10000;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.lastFailureTime = null;
    this.successCount = 0;
  }

  /**
   * Runs `operation` under circuit-breaker protection.
   *
   * @param {Function} operation - Async function to run.
   * @returns {Promise<*>} The operation's result.
   * @throws {Error} 'Circuit breaker is OPEN' while the circuit is open;
   *   otherwise whatever the operation throws.
   */
  async execute(operation) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime >= this.timeout) {
        this.state = 'HALF_OPEN';
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Records a success; closes the circuit after 3 successes in HALF_OPEN. */
  onSuccess() {
    this.failureCount = 0;
    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= 3) {
        this.state = 'CLOSED';
      }
    }
  }

  /** Records a failure and opens the circuit when warranted. */
  onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    // A failed trial call re-opens the circuit at once. Before, a success in
    // HALF_OPEN reset the counter, so the circuit stayed half-open until
    // `failureThreshold` further failures had passed through.
    if (this.state === 'HALF_OPEN' || this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }
}
// microservice-framework.js exports the service skeleton and its circuit breaker.
module.exports = { MicroService, CircuitBreaker };
4.2 设计模式在 Node.js 中的应用
高级设计模式实现
// design-patterns.js
// 1. Singleton pattern - database connection management
/**
 * Process-wide connection registry. `new DatabaseManager()` always returns
 * the same instance, which caches one connection per host/port/database.
 */
class DatabaseManager {
  constructor() {
    if (DatabaseManager.instance) {
      return DatabaseManager.instance;
    }
    // Established connections, keyed by "host:port/database".
    this.connections = new Map();
    // Connection attempts still in flight, keyed the same way.
    this.pending = new Map();
    DatabaseManager.instance = this;
  }

  /**
   * Returns the cached connection for `config`, creating it on first use.
   *
   * Concurrent calls for the same target share one creation attempt; before,
   * each concurrent caller created its own connection. A failed attempt is
   * not cached, so a later call retries.
   *
   * @param {{host: string, port: number, database: string}} config
   * @returns {Promise<object>} The connection object.
   */
  async getConnection(config) {
    const key = `${config.host}:${config.port}/${config.database}`;
    if (this.connections.has(key)) {
      return this.connections.get(key);
    }

    let attempt = this.pending.get(key);
    if (!attempt) {
      attempt = this.createConnection(config)
        .then((connection) => {
          this.connections.set(key, connection);
          return connection;
        })
        .finally(() => {
          this.pending.delete(key);
        });
      this.pending.set(key, attempt);
    }
    return attempt;
  }

  /**
   * Creates a connection. This implementation only simulates one.
   *
   * @param {{host: string, port: number, database: string}} config
   * @returns {Promise<object>} `{ host, port, database, connected: true }`.
   */
  async createConnection(config) {
    // Simulated database connection.
    return {
      host: config.host,
      port: config.port,
      database: config.database,
      connected: true
    };
  }
}
// 2. Factory pattern - logger factory
/**
 * Creates logger instances by type name.
 * NOTE(review): FileLogger and DatabaseLogger are not defined in the visible
 * part of this file; confirm they exist where this module is used.
 */
class LoggerFactory {
  /**
   * @param {string} type - 'console', 'file' or 'database'.
   * @param {object} config - Passed to the logger's constructor.
   * @returns {object} A new logger of the requested type.
   * @throws {Error} `Unknown logger type: <type>` for any other type.
   */
  static createLogger(type, config) {
    // Builders stay lazy so only the requested logger class is looked up.
    const builders = new Map([
      ['console', () => new ConsoleLogger(config)],
      ['file', () => new FileLogger(config)],
      ['database', () => new DatabaseLogger(config)]
    ]);

    const build = builders.get(type);
    if (build === undefined) {
      throw new Error(`Unknown logger type: ${type}`);
    }
    return build();
  }
}
/**
 * Logger that writes to the console, dropping messages below a minimum level.
 * Levels, lowest to highest: debug, info, warn, error.
 */
class ConsoleLogger {
  /**
   * @param {object} [config] - Optional settings; omitting it no longer
   *   throws.
   * @param {string} [config.level='info'] - Minimum level to print.
   */
  constructor(config) {
    this.level = config?.level || 'info';
  }

  /**
   * Prints `[LEVEL] message` when `level` is at or above the minimum level.
   *
   * @param {string} level - Severity of this message.
   * @param {string} message - Text to print.
   */
  log(level, message) {
    if (this.shouldLog(level)) {
      console.log(`[${level.toUpperCase()}] ${message}`);
    }
  }

  /**
   * @param {string} level - Severity to test.
   * @returns {boolean} True when `level` ranks at or above the configured
   *   level. Levels are compared by position in the list, so an unrecognised
   *   level ranks below every known one.
   */
  shouldLog(level) {
    const levels = ['debug', 'info', 'warn', 'error'];
    return levels.indexOf(level) >= levels.indexOf(this.level);
  }
}
// 3. Decorator pattern - caching decorator
/**
 * Builds a legacy-style method decorator that memoises an async method's
 * results for `ttl` milliseconds.
 *
 * Entries are keyed by method name plus the JSON-serialised arguments and live
 * in one Map per `cacheDecorator()` call, shared by every method and instance
 * that decorator is applied to. Rejected calls are not cached.
 *
 * @param {number} [ttl=300000] - Entry lifetime in milliseconds.
 * @returns {Function} Decorator `(target, propertyKey, descriptor)` that
 *   returns the modified descriptor.
 */
function cacheDecorator(ttl = 300000) {
  const store = new Map();

  return function (target, propertyKey, descriptor) {
    const wrapped = descriptor.value;

    descriptor.value = async function (...args) {
      const entryKey = `${propertyKey}:${JSON.stringify(args)}`;
      const hit = store.get(entryKey);
      const isFresh = hit && Date.now() - hit.timestamp < ttl;
      if (isFresh) {
        return hit.value;
      }

      // Miss or expired: call through and remember the outcome.
      const value = await wrapped.apply(this, args);
      store.set(entryKey, { value, timestamp: Date.now() });
      return value;
    };

    return descriptor;
  };
}
// 4. Strategy pattern - payment processing strategies
/**
 * Dispatches payments to a strategy registered under the payment method's
 * name.
 */
class PaymentProcessor {
  constructor() {
    // Payment method name -> strategy object exposing `process`.
    this.strategies = new Map();
  }

  /**
   * Registers (or replaces) the strategy for a payment method.
   *
   * @param {string} name - Payment method name.
   * @param {{process: Function}} strategy - Handler for that method.
   */
  addStrategy(name, strategy) {
    this.strategies.set(name, strategy);
  }

  /**
   * Processes a payment with the strategy registered for `method`.
   *
   * @param {string} method - Registered payment method name.
   * @param {number} amount - Amount to charge.
   * @param {object} details - Strategy-specific payment details.
   * @returns {Promise<*>} Whatever the strategy's `process` resolves to.
   * @throws {Error} `Payment method <method> not supported` if none is
   *   registered.
   */
  async processPayment(method, amount, details) {
    const handler = this.strategies.get(method);
    if (handler) {
      const receipt = await handler.process(amount, details);
      return receipt;
    }
    throw new Error(`Payment method ${method} not supported`);
  }
}
/**
 * Payment strategy for credit cards. This implementation only simulates a
 * successful charge.
 */
class CreditCardStrategy {
  /**
   * @param {number} amount - Amount to charge.
   * @param {object} details - Card details (unused by the simulation).
   * @returns {Promise<object>} `{ success, transactionId, amount, method }`
   *   with a `cc_`-prefixed, timestamp-based transaction ID.
   */
  async process(amount, details) {
    // Credit-card payment logic would go here.
    const transactionId = `cc_${Date.now()}`;
    const method = 'credit_card';
    return { success: true, transactionId, amount, method };
  }
}
/**
 * Payment strategy for PayPal. This implementation only simulates a
 * successful payment.
 */
class PayPalStrategy {
  /**
   * @param {number} amount - Amount to charge.
   * @param {object} details - PayPal details (unused by the simulation).
   * @returns {Promise<object>} `{ success, transactionId, amount, method }`
   *   with a `pp_`-prefixed, timestamp-based transaction ID.
   */
  async process(amount, details) {
    // PayPal payment logic would go here.
    const transactionId = `pp_${Date.now()}`;
    const method = 'paypal';
    return { success: true, transactionId, amount, method };
  }
}
// 5. Observer pattern - event system
/**
 * Minimal publish/subscribe hub whose `publish` waits for all (possibly
 * async) subscribers to finish.
 */
class EventManager {
  constructor() {
    // Event name -> list of subscriber callbacks.
    this.listeners = new Map();
  }

  /**
   * Subscribes `callback` to `event`.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Called with the published data.
   * @returns {Function} Call it to remove this subscription.
   */
  subscribe(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);

    // Unsubscribe function.
    return () => {
      const callbacks = this.listeners.get(event);
      const index = callbacks.indexOf(callback);
      if (index > -1) {
        callbacks.splice(index, 1);
      }
    };
  }

  /**
   * Delivers `data` to every subscriber of `event` and waits for all of them.
   * One subscriber failing, whether by throwing or rejecting, does not stop
   * the others and does not reject the returned promise.
   *
   * @param {string} event - Event name.
   * @param {*} data - Payload passed to each subscriber.
   * @returns {Promise<void>}
   */
  async publish(event, data) {
    // Iterate over a copy so a subscriber that unsubscribes while being
    // notified cannot cause a later subscriber to be skipped.
    const callbacks = [...(this.listeners.get(event) || [])];

    // The async wrapper turns a synchronous throw into a rejection. Before, a
    // throwing subscriber aborted the loop and rejected publish().
    await Promise.allSettled(
      callbacks.map(async (callback) => callback(data))
    );
  }
}
// 6. Command pattern - task queue
/**
 * A reversible action: a pair of functions, one that performs it and one that
 * reverts it.
 */
class Command {
  /**
   * @param {Function} execute - Performs the action.
   * @param {Function} undo - Reverts the action.
   */
  constructor(execute, undo) {
    Object.assign(this, { execute, undo });
  }
}
/**
 * Command history with undo/redo. `currentIndex` points at the most recently
 * executed command, or is -1 when nothing is applied.
 */
class TaskQueue {
  constructor() {
    this.commands = [];
    this.currentIndex = -1;
  }

  /**
   * Runs a command and makes it the newest history entry, discarding any
   * commands that had been undone.
   *
   * @param {{execute: Function, undo: Function}} command
   * @returns {*} The result of `command.execute()`.
   */
  execute(command) {
    // Keep the history up to the current position, then append.
    const history = this.commands.slice(0, this.currentIndex + 1);
    history.push(command);
    this.commands = history;
    this.currentIndex += 1;
    return command.execute();
  }

  /**
   * Reverts the most recently applied command.
   *
   * @returns {*} The result of its `undo()`, or undefined when there is
   *   nothing to undo.
   */
  undo() {
    if (this.currentIndex < 0) {
      return undefined;
    }
    const latest = this.commands[this.currentIndex];
    this.currentIndex -= 1;
    return latest.undo();
  }

  /**
   * Re-applies the most recently undone command.
   *
   * @returns {*} The result of its `execute()`, or undefined when there is
   *   nothing to redo.
   */
  redo() {
    const lastIndex = this.commands.length - 1;
    if (this.currentIndex >= lastIndex) {
      return undefined;
    }
    this.currentIndex += 1;
    return this.commands[this.currentIndex].execute();
  }
}
// 7. Mediator pattern - chat room
/**
 * Mediator that routes messages between users so that users never reference
 * each other directly.
 *
 * NOTE(review): nothing in this class adds entries to `rooms`; broadcast()
 * only delivers if a `{ members }` entry has been put there by other code.
 */
class ChatRoom {
  constructor() {
    // User id -> user object.
    this.users = new Map();
    // Room id -> object with a `members` collection of user ids.
    this.rooms = new Map();
  }

  /**
   * Registers a user and hands this chat room to it.
   *
   * @param {{id: *, setChatRoom: Function}} user
   */
  addUser(user) {
    this.users.set(user.id, user);
    user.setChatRoom(this);
  }

  /** Removes a user from the registry by id. */
  removeUser(userId) {
    this.users.delete(userId);
  }

  /**
   * Delivers a direct message; silently does nothing if the recipient is not
   * registered.
   */
  sendMessage(fromUserId, toUserId, message) {
    const recipient = this.users.get(toUserId);
    if (!recipient) {
      return;
    }
    recipient.receiveMessage(fromUserId, message);
  }

  /**
   * Sends a message to every member of a room except the sender; silently
   * does nothing if the room is unknown.
   */
  broadcast(fromUserId, message, roomId) {
    const room = this.rooms.get(roomId);
    if (!room) {
      return;
    }
    room.members.forEach((memberId) => {
      const isSender = memberId === fromUserId;
      if (!isSender) {
        this.sendMessage(fromUserId, memberId, message);
      }
    });
  }
}
/**
 * Chat participant that talks to other users only through its chat room
 * (the mediator).
 */
class User {
  /**
   * @param {*} id - Identifier used for message routing.
   * @param {string} name - Display name used when logging received messages.
   */
  constructor(id, name) {
    Object.assign(this, { id, name, chatRoom: null });
  }

  /** Attaches the mediator this user sends through. */
  setChatRoom(chatRoom) {
    this.chatRoom = chatRoom;
  }

  /**
   * Sends a direct message via the chat room; does nothing when the user has
   * not been added to one.
   */
  sendMessage(toUserId, message) {
    const mediator = this.chatRoom;
    if (!mediator) {
      return;
    }
    mediator.sendMessage(this.id, toUserId, message);
  }

  /** Logs an incoming message. */
  receiveMessage(fromUserId, message) {
    const line = `${this.name} received message from ${fromUserId}: ${message}`;
    console.log(line);
  }
}
// Public API of design-patterns.js. ConsoleLogger is defined above but is not
// exported directly; it is reached through LoggerFactory.
module.exports = {
DatabaseManager,
LoggerFactory,
cacheDecorator,
PaymentProcessor,
CreditCardStrategy,
PayPalStrategy,
EventManager,
Command,
TaskQueue,
ChatRoom,
User
};
5. 实时通信与 WebSocket
5.1 实时通信架构
WebSocket 服务器实现
// websocket-server.js
const WebSocket = require('ws');
const { EventEmitter } = require('events');
const jwt = require('jsonwebtoken');
/**
 * Room-based WebSocket server built on the `ws` package.
 *
 * Clients authenticate with a JWT passed as the `token` query parameter
 * (unless a custom `verifyClient` is supplied) and exchange JSON messages of
 * the form `{ type, ... }`. Supported types: join_room, leave_room,
 * room_message, private_message, ping.
 *
 * Events emitted: 'connection', 'disconnection', and 'error' (only when an
 * 'error' listener is registered).
 */
class WebSocketServer extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] - Port to listen on.
   * @param {Function|null} [options.verifyClient=null] - Replaces the default
   *   JWT check when provided.
   */
  constructor(options = {}) {
    super();
    this.options = {
      port: 8080,
      verifyClient: null,
      ...options
    };
    // Client id -> client record created in handleConnection().
    this.clients = new Map();
    // Room id -> { id, clients: Set<clientId>, metadata }.
    this.rooms = new Map();
    // Message type -> handler(client, message).
    this.messageHandlers = new Map();
    this.healthCheckTimer = null;
    this.setupServer();
    this.setupMessageHandlers();
  }

  /** Creates the underlying `ws` server and wires up connection handling. */
  setupServer() {
    this.wss = new WebSocket.Server({
      port: this.options.port,
      verifyClient: this.verifyClient.bind(this)
    });
    this.wss.on('connection', this.handleConnection.bind(this));
    console.log(`WebSocket server started on port ${this.options.port}`);
  }

  /**
   * Decides whether a connection attempt is accepted.
   *
   * @param {object} info - `ws` verifyClient info; `info.req` is the upgrade
   *   request. On success the decoded token is stored on `info.req.user`.
   * @returns {boolean} True to accept the connection.
   */
  verifyClient(info) {
    if (this.options.verifyClient) {
      return this.options.verifyClient(info);
    }

    // Default check: a valid JWT in the `token` query parameter.
    try {
      // Token extraction is inside the try because a malformed URL or Host
      // header makes `new URL` throw, which used to escape this method.
      const token = this.extractToken(info.req);
      if (!token) {
        return false;
      }
      info.req.user = jwt.verify(token, process.env.JWT_SECRET);
      return true;
    } catch (error) {
      return false;
    }
  }

  /**
   * Reads the `token` query parameter from the upgrade request URL.
   *
   * @returns {string|null} The token, or null when absent.
   */
  extractToken(req) {
    const url = new URL(req.url, `http://${req.headers.host}`);
    return url.searchParams.get('token');
  }

  /**
   * Registers a newly connected socket, attaches its event handlers and sends
   * it a 'connection' confirmation containing its client id.
   */
  handleConnection(ws, req) {
    const clientId = this.generateClientId();
    const client = {
      id: clientId,
      ws,
      user: req.user,
      rooms: new Set(),
      lastPing: Date.now(),
      metadata: {}
    };
    this.clients.set(clientId, client);

    ws.on('message', (data) => this.handleMessage(client, data));
    ws.on('close', () => this.handleDisconnection(client));
    ws.on('error', (error) => this.handleError(client, error));
    ws.on('pong', () => this.handlePong(client));

    // Connection confirmation.
    this.sendToClient(client, {
      type: 'connection',
      clientId,
      timestamp: Date.now()
    });
    this.emit('connection', client);
  }

  /**
   * Parses an incoming frame and dispatches it to the handler for its type.
   * Malformed input, unknown types and handler failures are each reported to
   * the client with their own error message.
   */
  handleMessage(client, data) {
    let message;
    try {
      message = JSON.parse(data);
    } catch (error) {
      this.sendError(client, 'Invalid message format');
      return;
    }

    // Valid JSON is not necessarily a message object (e.g. `null`, `42`).
    if (message === null || typeof message !== 'object' || typeof message.type !== 'string') {
      this.sendError(client, 'Invalid message format');
      return;
    }

    const handler = this.messageHandlers.get(message.type);
    if (!handler) {
      this.sendError(client, `Unknown message type: ${message.type}`);
      return;
    }

    try {
      handler(client, message);
    } catch (error) {
      // A handler bug is not a client formatting error; report it as such.
      console.error(`Handler for "${message.type}" failed for client ${client.id}:`, error);
      this.sendError(client, `Failed to process message: ${message.type}`);
    }
  }

  /**
   * Room ids come from untrusted JSON. Only non-empty strings and finite
   * numbers are accepted, because objects or arrays would create a new Map
   * key (and thus a new room) on every message.
   */
  isValidRoomId(roomId) {
    return (typeof roomId === 'string' && roomId.length > 0) || Number.isFinite(roomId);
  }

  /** Registers the built-in message handlers. */
  setupMessageHandlers() {
    // Join a room.
    this.messageHandlers.set('join_room', (client, message) => {
      const { roomId } = message;
      if (!this.isValidRoomId(roomId)) {
        this.sendError(client, 'Invalid room id');
        return;
      }
      this.joinRoom(client, roomId);
    });

    // Leave a room.
    this.messageHandlers.set('leave_room', (client, message) => {
      const { roomId } = message;
      if (!this.isValidRoomId(roomId)) {
        this.sendError(client, 'Invalid room id');
        return;
      }
      this.leaveRoom(client, roomId);
    });

    // Send a message to a room.
    this.messageHandlers.set('room_message', (client, message) => {
      const { roomId, content } = message;
      // Only members may post; otherwise any client could message any room.
      if (!client.rooms.has(roomId)) {
        this.sendError(client, `Not a member of room: ${roomId}`);
        return;
      }
      this.sendToRoom(roomId, {
        type: 'room_message',
        from: client.id,
        content,
        timestamp: Date.now()
      }, client.id);
    });

    // Private message.
    this.messageHandlers.set('private_message', (client, message) => {
      const { targetId, content } = message;
      const targetClient = this.clients.get(targetId);
      if (!targetClient) {
        this.sendError(client, 'Target client not found');
        return;
      }
      this.sendToClient(targetClient, {
        type: 'private_message',
        from: client.id,
        content,
        timestamp: Date.now()
      });
    });

    // Application-level heartbeat.
    this.messageHandlers.set('ping', (client) => {
      client.lastPing = Date.now();
      this.sendToClient(client, { type: 'pong' });
    });
  }

  /**
   * Adds a client to a room (creating the room if needed), notifies the other
   * members and sends the member list to the joining client.
   */
  joinRoom(client, roomId) {
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, {
        id: roomId,
        clients: new Set(),
        metadata: {}
      });
    }
    const room = this.rooms.get(roomId);
    room.clients.add(client.id);
    client.rooms.add(roomId);

    // Notify the other members.
    this.sendToRoom(roomId, {
      type: 'user_joined',
      userId: client.id,
      roomId,
      timestamp: Date.now()
    }, client.id);

    // Send room information to the new member.
    this.sendToClient(client, {
      type: 'room_joined',
      roomId,
      users: Array.from(room.clients),
      timestamp: Date.now()
    });
  }

  /**
   * Removes a client from a room. Deletes the room when it becomes empty;
   * otherwise notifies the remaining members. Does nothing if the room is
   * unknown or the client was not a member.
   */
  leaveRoom(client, roomId) {
    const room = this.rooms.get(roomId);
    if (!room) {
      return;
    }
    const wasMember = room.clients.delete(client.id);
    client.rooms.delete(roomId);
    if (!wasMember) {
      // Avoid announcing the departure of someone who never joined.
      return;
    }

    if (room.clients.size === 0) {
      this.rooms.delete(roomId);
      return;
    }
    this.sendToRoom(roomId, {
      type: 'user_left',
      userId: client.id,
      roomId,
      timestamp: Date.now()
    });
  }

  /** Sends a JSON message to one client if its socket is open. */
  sendToClient(client, message) {
    if (client.ws.readyState === WebSocket.OPEN) {
      client.ws.send(JSON.stringify(message));
    }
  }

  /** Sends a message to every member of a room, optionally skipping one. */
  sendToRoom(roomId, message, excludeClientId = null) {
    const room = this.rooms.get(roomId);
    if (!room) {
      return;
    }
    room.clients.forEach((clientId) => {
      if (clientId === excludeClientId) {
        return;
      }
      const client = this.clients.get(clientId);
      if (client) {
        this.sendToClient(client, message);
      }
    });
  }

  /** Sends a message to every connected client, optionally skipping one. */
  broadcast(message, excludeClientId = null) {
    this.clients.forEach((client, clientId) => {
      if (clientId !== excludeClientId) {
        this.sendToClient(client, message);
      }
    });
  }

  /** Sends an `{ type: 'error', message }` frame to a client. */
  sendError(client, error) {
    this.sendToClient(client, {
      type: 'error',
      message: error,
      timestamp: Date.now()
    });
  }

  /**
   * Removes a client from all rooms and from the registry, then emits
   * 'disconnection'. Safe to call more than once for the same client: the
   * health check calls it directly and the socket's 'close' event follows.
   */
  handleDisconnection(client) {
    if (this.clients.get(client.id) !== client) {
      return;
    }
    // Remove the client from every room it joined.
    client.rooms.forEach((roomId) => {
      this.leaveRoom(client, roomId);
    });
    this.clients.delete(client.id);
    this.emit('disconnection', client);
  }

  /**
   * Logs a socket error and forwards it as an 'error' event when someone is
   * listening. EventEmitter throws on an unhandled 'error' event, so emitting
   * unconditionally let one bad socket crash the process.
   */
  handleError(client, error) {
    console.error(`WebSocket error for client ${client.id}:`, error);
    if (this.listenerCount('error') > 0) {
      this.emit('error', { client, error });
    }
  }

  /** Records that the client answered a protocol-level ping. */
  handlePong(client) {
    client.lastPing = Date.now();
  }

  /**
   * Generates a short random client id (up to 9 base-36 characters).
   * Not cryptographically secure.
   */
  generateClientId() {
    return Math.random().toString(36).slice(2, 11);
  }

  /**
   * Health check: every 10 seconds, terminates clients silent for more than
   * 30 seconds and pings the others so that live peers refresh `lastPing`
   * through the 'pong' handler.
   *
   * @returns {Function} Call it to stop the health check.
   */
  startHealthCheck() {
    const timeout = 30000; // 30-second silence limit
    const checkInterval = 10000; // check every 10 seconds

    // Restarting replaces the previous timer instead of stacking another one.
    if (this.healthCheckTimer) {
      clearInterval(this.healthCheckTimer);
    }

    this.healthCheckTimer = setInterval(() => {
      const now = Date.now();
      this.clients.forEach((client, clientId) => {
        if (now - client.lastPing > timeout) {
          console.log(`Client ${clientId} timed out`);
          client.ws.terminate();
          this.handleDisconnection(client);
        } else if (client.ws.readyState === WebSocket.OPEN) {
          client.ws.ping();
        }
      });
    }, checkInterval);

    return () => {
      clearInterval(this.healthCheckTimer);
      this.healthCheckTimer = null;
    };
  }

  /** Returns client and room counts plus per-room member counts. */
  getStats() {
    return {
      totalClients: this.clients.size,
      totalRooms: this.rooms.size,
      roomDetails: Array.from(this.rooms.values()).map((room) => ({
        id: room.id,
        clientCount: room.clients.size
      }))
    };
  }
}
// websocket-server.js exports the server class itself.
module.exports = WebSocketServer;
5.2 实时数据同步系统
实时协作编辑器实现
// collaborative-editor.js
const { EventEmitter } = require('events');
// Operational transformation (OT) implementation
/**
 * Simplified operational transformation for text operations.
 *
 * Operations are plain objects: `{ type: 'insert', position, content }`,
 * `{ type: 'delete', position, length }` or `{ type: 'noop' }`.
 */
class OperationalTransform {
  /**
   * Transforms two concurrent operations made against the same document
   * state.
   *
   * @param {object} op1 - First operation.
   * @param {object} op2 - Second operation.
   * @returns {object[]} `[op1', op2']`, where op1' is op1 adjusted to apply
   *   after op2 and op2' is op2 adjusted to apply after op1. Unsupported type
   *   combinations are returned unchanged.
   *
   * NOTE(review): this is a simplified scheme. Inserts at the same position
   *   always favour `op1`, and an insert that lands inside a concurrent
   *   delete is moved to the delete's start while the delete itself is left
   *   unchanged.
   */
  static transform(op1, op2) {
    if (op1.type === 'insert' && op2.type === 'insert') {
      if (op1.position <= op2.position) {
        return [op1, { ...op2, position: op2.position + op1.content.length }];
      }
      return [{ ...op1, position: op1.position + op2.content.length }, op2];
    }

    if (op1.type === 'delete' && op2.type === 'delete') {
      if (op1.position + op1.length <= op2.position) {
        return [op1, { ...op2, position: op2.position - op1.length }];
      }
      if (op2.position + op2.length <= op1.position) {
        return [{ ...op1, position: op1.position - op2.length }, op2];
      }
      return this.handleOverlappingDeletes(op1, op2);
    }

    if (op1.type === 'insert' && op2.type === 'delete') {
      if (op1.position <= op2.position) {
        return [op1, { ...op2, position: op2.position + op1.content.length }];
      }
      if (op1.position >= op2.position + op2.length) {
        return [{ ...op1, position: op1.position - op2.length }, op2];
      }
      // The insert point falls inside the deleted range.
      return [{ ...op1, position: op2.position }, op2];
    }

    if (op1.type === 'delete' && op2.type === 'insert') {
      // Reuse the insert/delete case with swapped roles.
      const [transformedOp2, transformedOp1] = this.transform(op2, op1);
      return [transformedOp1, transformedOp2];
    }

    return [op1, op2];
  }

  /**
   * Transforms two deletes whose ranges overlap.
   *
   * Each result deletes only the characters the other operation has not
   * already removed; an operation with nothing left becomes a 'noop'. The
   * previous version returned the union of both ranges for op1, which removed
   * text neither user had deleted and made replicas diverge.
   *
   * @param {object} op1 - First delete.
   * @param {object} op2 - Second delete.
   * @returns {object[]} `[op1', op2']`; other fields of the inputs (such as
   *   `id`) are carried over.
   */
  static handleOverlappingDeletes(op1, op2) {
    const start1 = op1.position;
    const end1 = op1.position + op1.length;
    const start2 = op2.position;
    const end2 = op2.position + op2.length;

    const overlap = Math.max(0, Math.min(end1, end2) - Math.max(start1, start2));
    // Once the other delete has been applied, whatever remains of a range
    // starts at the earlier of the two start positions.
    const position = Math.min(start1, start2);

    const remainder = (op) => {
      const remaining = op.length - overlap;
      return remaining > 0
        ? { ...op, position, length: remaining }
        : { ...op, type: 'noop', position, length: 0 };
    };

    return [remainder(op1), remainder(op2)];
  }
}
// Document state management
/**
 * Holds a document's text, its version counter and the history of applied
 * operations.
 */
class DocumentState {
  /**
   * @param {string} [initialContent=''] - Starting text.
   */
  constructor(initialContent = '') {
    this.content = initialContent;
    this.version = 0;
    this.operations = [];
  }

  /**
   * Applies an insert, delete or replace to the text, bumps the version and
   * records the operation (tagged with the new version) in the history.
   * Other operation types leave the text untouched but are still versioned
   * and recorded.
   *
   * @param {object} operation - `{ type, position, content?, length? }`.
   */
  applyOperation(operation) {
    const { type, position } = operation;
    const text = this.content;
    const head = text.slice(0, position);

    if (type === 'insert') {
      this.content = head + operation.content + text.slice(position);
    } else if (type === 'delete') {
      this.content = head + text.slice(position + operation.length);
    } else if (type === 'replace') {
      this.content = head + operation.content + text.slice(position + operation.length);
    }

    this.version += 1;
    this.operations.push({ ...operation, version: this.version });
  }

  /**
   * @param {number} version - Version the caller already has.
   * @returns {object[]} Recorded operations with a higher version, oldest
   *   first.
   */
  getOperationsSince(version) {
    return this.operations.filter((entry) => entry.version > version);
  }
}
// Collaborative editor server
/**
 * Server-side coordinator for collaborative editing: tracks documents, the
 * clients attached to each one, and rebases incoming operations onto the
 * latest document version before applying and broadcasting them.
 */
class CollaborativeEditor extends EventEmitter {
  constructor() {
    super();
    // Document id -> DocumentState.
    this.documents = new Map();
    // Client id -> connection object exposing `send(string)`.
    this.clients = new Map();
    // Document id -> Set of client ids currently editing it.
    this.clientDocuments = new Map();
  }

  /**
   * Returns the document with this id, creating it with `initialContent` if
   * it does not exist yet.
   */
  createDocument(documentId, initialContent = '') {
    const alreadyExists = this.documents.has(documentId);
    if (!alreadyExists) {
      this.documents.set(documentId, new DocumentState(initialContent));
    }
    return this.documents.get(documentId);
  }

  /**
   * Attaches a client to a document, sends it the current content and
   * version, and tells the other editors that it joined.
   */
  joinDocument(clientId, documentId) {
    const doc = this.createDocument(documentId);

    if (!this.clientDocuments.has(documentId)) {
      this.clientDocuments.set(documentId, new Set());
    }
    const editors = this.clientDocuments.get(documentId);
    editors.add(clientId);

    // Current document state for the joining client.
    const snapshot = {
      type: 'document_state',
      documentId,
      content: doc.content,
      version: doc.version
    };
    this.sendToClient(clientId, snapshot);

    // Tell everyone else about the new editor.
    const announcement = { type: 'user_joined', clientId, documentId };
    this.broadcastToDocument(documentId, announcement, clientId);
  }

  /**
   * Detaches a client from a document. Drops the editor set when it becomes
   * empty; otherwise notifies the remaining editors.
   */
  leaveDocument(clientId, documentId) {
    const editors = this.clientDocuments.get(documentId);
    if (!editors) {
      return;
    }

    editors.delete(clientId);
    if (editors.size === 0) {
      this.clientDocuments.delete(documentId);
      return;
    }
    this.broadcastToDocument(documentId, {
      type: 'user_left',
      clientId,
      documentId
    });
  }

  /**
   * Rebases a client's operation over everything applied since its
   * `baseVersion`, applies it, broadcasts the result to the other editors and
   * acknowledges it to the sender. Ignored when the document is unknown.
   */
  handleOperation(clientId, documentId, operation) {
    const doc = this.documents.get(documentId);
    if (!doc) {
      return;
    }

    // Operations the sender had not seen when it produced this one.
    const unseen = doc.getOperationsSince(operation.baseVersion);

    // Transform against each of them in order.
    let rebased = operation;
    for (const applied of unseen) {
      [rebased] = OperationalTransform.transform(rebased, applied);
    }

    doc.applyOperation(rebased);

    // Broadcast to the other editors first...
    this.broadcastToDocument(documentId, {
      type: 'operation',
      operation: { ...rebased, version: doc.version },
      documentId
    }, clientId);

    // ...then acknowledge to the sender.
    this.sendToClient(clientId, {
      type: 'operation_ack',
      operationId: operation.id,
      version: doc.version
    });
  }

  /**
   * Sends a JSON message to a client; silently skipped when the client is
   * unknown or has no `send`.
   */
  sendToClient(clientId, message) {
    const connection = this.clients.get(clientId);
    const canSend = connection && connection.send;
    if (canSend) {
      connection.send(JSON.stringify(message));
    }
  }

  /** Sends a message to every editor of a document, optionally skipping one. */
  broadcastToDocument(documentId, message, excludeClientId = null) {
    const editors = this.clientDocuments.get(documentId);
    if (!editors) {
      return;
    }
    editors.forEach((editorId) => {
      if (editorId === excludeClientId) {
        return;
      }
      this.sendToClient(editorId, message);
    });
  }

  /** Registers a client's connection. */
  addClient(clientId, connection) {
    this.clients.set(clientId, connection);
  }

  /** Detaches a client from every document it edits and forgets it. */
  removeClient(clientId) {
    this.clientDocuments.forEach((editors, documentId) => {
      if (editors.has(clientId)) {
        this.leaveDocument(clientId, documentId);
      }
    });
    this.clients.delete(clientId);
  }

  /**
   * Returns basic statistics for a document; all counters are 0 when the
   * document or its editor set is unknown.
   */
  getDocumentStats(documentId) {
    const doc = this.documents.get(documentId);
    const editors = this.clientDocuments.get(documentId);
    return {
      documentId,
      version: doc?.version || 0,
      contentLength: doc?.content?.length || 0,
      activeClients: editors?.size || 0,
      operationCount: doc?.operations?.length || 0
    };
  }
}
// Client-side operation queue
/**
 * Tracks operations a client has sent but the server has not yet
 * acknowledged, and keeps them rebased over operations arriving from the
 * server.
 */
class ClientOperationQueue {
  /**
   * @param {Function} sendOperation - Called with each operation to transmit
   *   it to the server.
   */
  constructor(sendOperation) {
    this.sendOperation = sendOperation;
    this.pendingOperations = [];
    // Highest server version this client is known to have seen.
    this.acknowledgedVersion = 0;
    this.localVersion = 0;
  }

  /**
   * Stamps an operation with a fresh `id` and the current `baseVersion`,
   * queues it as pending and sends it. The passed object is modified in
   * place.
   *
   * @param {object} operation - Operation to send.
   */
  addOperation(operation) {
    operation.id = this.generateOperationId();
    operation.baseVersion = this.acknowledgedVersion;
    this.pendingOperations.push(operation);
    this.sendOperation(operation);
  }

  /**
   * Handles the server's acknowledgment of one of our operations: drops it
   * and everything queued before it, and records the server version.
   * Acknowledgments for unknown ids are ignored.
   */
  handleAcknowledgment(operationId, serverVersion) {
    const index = this.pendingOperations.findIndex((op) => op.id === operationId);
    if (index === -1) {
      return;
    }
    this.pendingOperations.splice(0, index + 1);
    this.acknowledgedVersion = serverVersion;
  }

  /**
   * Handles an operation from another client: rebases every pending operation
   * over it and advances the known server version.
   *
   * @param {object} serverOperation - Operation as broadcast by the server;
   *   its `version` field, when present, is the document version after it was
   *   applied.
   */
  handleServerOperation(serverOperation) {
    // Rebase the pending operations.
    this.pendingOperations = this.pendingOperations.map((pendingOp) => {
      const [transformed] = OperationalTransform.transform(pendingOp, serverOperation);
      return transformed;
    });

    // The client has now seen this server version. Without recording it,
    // later operations kept an outdated baseVersion and the server
    // transformed them again over operations the client had already applied.
    const { version } = serverOperation;
    if (Number.isFinite(version) && version > this.acknowledgedVersion) {
      this.acknowledgedVersion = version;
    }
  }

  /**
   * Generates an operation id from the current time plus a random base-36
   * suffix. Not cryptographically secure.
   */
  generateOperationId() {
    return `${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
// Public API of collaborative-editor.js: the transform algorithm, the
// server-side document/editor classes and the client-side queue.
module.exports = {
OperationalTransform,
DocumentState,
CollaborativeEditor,
ClientOperationQueue
};
6. 安全与认证系统
6.1 认证与授权架构
高级认证系统实现
// advanced-auth-system.js
const crypto = require('crypto');
const jwt = require('jsonwebtoken');
const bcrypt = require('bcrypt');
const speakeasy = require('speakeasy');
const QRCode = require('qrcode');
/**
 * In-memory authentication and authorization service.
 *
 * Covers registration, password login with optional TOTP two-factor
 * authentication, JWT access tokens with rotating refresh tokens, account
 * lockout after repeated failures, and Express-style role/permission
 * middleware. All state lives in Maps on the instance.
 */
class AdvancedAuthSystem {
  /**
   * @param {object} [config] - Overrides merged over the defaults below.
   */
  constructor(config) {
    this.config = {
      jwtSecret: process.env.JWT_SECRET,
      jwtExpiry: '15m',
      refreshTokenExpiry: '7d',
      bcryptRounds: 12,
      maxLoginAttempts: 5,
      lockoutDuration: 15 * 60 * 1000, // 15 minutes
      ...config
    };
    this.users = new Map();
    this.refreshTokens = new Map();
    this.loginAttempts = new Map();
    this.sessions = new Map();
  }

  /**
   * Registers a new user.
   *
   * @param {{username: string, email: string, password: string, phone?: string}} userData
   * @returns {Promise<{id: string, username: string, email: string, message: string}>}
   * @throws {Error} If the email/username is taken or the password is too weak.
   */
  async register(userData) {
    const { username, email, password, phone } = userData;

    const isTaken = () =>
      Boolean(this.findUserByEmail(email) || this.findUserByUsername(username));

    if (isTaken()) {
      throw new Error('User already exists');
    }

    this.validatePasswordStrength(password);

    const hashedPassword = await bcrypt.hash(password, this.config.bcryptRounds);

    // Hashing is asynchronous: a concurrent register() for the same identity
    // may have completed meanwhile, so check again before inserting.
    if (isTaken()) {
      throw new Error('User already exists');
    }

    const userId = this.generateUserId();
    const user = {
      id: userId,
      username,
      email,
      phone,
      password: hashedPassword,
      roles: ['user'],
      permissions: [],
      isActive: true,
      emailVerified: false,
      phoneVerified: false,
      twoFactorEnabled: false,
      twoFactorSecret: null,
      createdAt: new Date(),
      lastLogin: null,
      loginAttempts: 0,
      lockedUntil: null
    };
    this.users.set(userId, user);

    await this.sendVerificationEmail(user);

    return {
      id: user.id,
      username: user.username,
      email: user.email,
      message: 'Registration successful. Please verify your email.'
    };
  }

  /**
   * Authenticates a user by email and password, plus a TOTP code when
   * two-factor authentication is enabled.
   *
   * @param {{email: string, password: string, twoFactorCode?: string}} credentials
   * @returns {Promise<{user: object, tokens: object, sessionId: string}>}
   * @throws {Error} On unknown user, locked or inactive account, wrong
   *   password, or a missing/invalid two-factor code.
   */
  async login(credentials) {
    const { email, password, twoFactorCode } = credentials;

    const user = this.findUserByEmail(email);
    if (!user) {
      throw new Error('Invalid credentials');
    }

    if (this.isAccountLocked(user)) {
      throw new Error('Account is locked due to too many failed attempts');
    }

    const isPasswordValid = await bcrypt.compare(password, user.password);
    if (!isPasswordValid) {
      await this.handleFailedLogin(user);
      throw new Error('Invalid credentials');
    }

    // Same rule refreshToken() applies: deactivated accounts get no tokens.
    // Checked after the password so account state is not revealed to callers
    // who do not know it.
    if (!user.isActive) {
      throw new Error('User not found or inactive');
    }

    if (user.twoFactorEnabled) {
      if (!twoFactorCode) {
        throw new Error('Two-factor authentication code required');
      }
      const isValidCode = speakeasy.totp.verify({
        secret: user.twoFactorSecret,
        encoding: 'base32',
        token: twoFactorCode,
        window: 2
      });
      if (!isValidCode) {
        // A wrong code counts toward the lockout; otherwise the code could be
        // guessed without limit once the password is known.
        await this.handleFailedLogin(user);
        throw new Error('Invalid two-factor authentication code');
      }
    }

    user.loginAttempts = 0;
    user.lockedUntil = null;
    user.lastLogin = new Date();

    const tokens = await this.generateTokens(user);
    const sessionId = this.createSession(user, tokens);

    return {
      user: this.sanitizeUser(user),
      tokens,
      sessionId
    };
  }

  /**
   * Issues a signed access token and a random refresh token for the user and
   * stores the refresh token with its expiry.
   *
   * @param {object} user
   * @returns {Promise<{accessToken: string, refreshToken: string, expiresIn: number}>}
   *   `expiresIn` is the access-token lifetime in milliseconds.
   */
  async generateTokens(user) {
    const payload = {
      userId: user.id,
      username: user.username,
      email: user.email,
      roles: user.roles,
      permissions: user.permissions
    };

    const accessToken = jwt.sign(payload, this.config.jwtSecret, {
      expiresIn: this.config.jwtExpiry,
      issuer: 'auth-service',
      audience: 'api-service'
    });

    const refreshToken = this.generateRefreshToken();
    const refreshLifetime = this.parseExpiry(this.config.refreshTokenExpiry);
    this.refreshTokens.set(refreshToken, {
      userId: user.id,
      createdAt: new Date(),
      expiresAt: new Date(Date.now() + refreshLifetime)
    });

    return {
      accessToken,
      refreshToken,
      expiresIn: this.parseExpiry(this.config.jwtExpiry)
    };
  }

  /**
   * Exchanges a refresh token for a new token pair. The presented token is
   * single-use: it is removed before the new pair is issued.
   *
   * @param {string} refreshToken
   * @returns {Promise<{accessToken: string, refreshToken: string, expiresIn: number}>}
   * @throws {Error} If the token is unknown/expired or the user is
   *   missing/inactive.
   */
  async refreshToken(refreshToken) {
    const tokenData = this.refreshTokens.get(refreshToken);
    if (!tokenData) {
      throw new Error('Invalid or expired refresh token');
    }
    if (tokenData.expiresAt < new Date()) {
      // Purge it so expired tokens do not accumulate in the store.
      this.refreshTokens.delete(refreshToken);
      throw new Error('Invalid or expired refresh token');
    }

    const user = this.users.get(tokenData.userId);
    if (!user || !user.isActive) {
      throw new Error('User not found or inactive');
    }

    this.refreshTokens.delete(refreshToken);
    return this.generateTokens(user);
  }

  /**
   * Starts two-factor enrollment: generates a TOTP secret, stores it on the
   * user and returns it with a QR-code data URL. Two-factor is only switched
   * on by verifyTwoFactorSetup().
   *
   * @param {string} userId
   * @returns {Promise<{secret: string, qrCode: string, backupCodes: string[]}>}
   * @throws {Error} If the user does not exist.
   */
  async enableTwoFactor(userId) {
    const user = this.users.get(userId);
    if (!user) {
      throw new Error('User not found');
    }

    const secret = speakeasy.generateSecret({
      name: `MyApp (${user.email})`,
      issuer: 'MyApp'
    });
    user.twoFactorSecret = secret.base32;

    const qrCodeUrl = await QRCode.toDataURL(secret.otpauth_url);

    // NOTE(review): the backup codes are returned but never persisted, so
    // nothing in this class can redeem them later.
    return {
      secret: secret.base32,
      qrCode: qrCodeUrl,
      backupCodes: this.generateBackupCodes()
    };
  }

  /**
   * Completes two-factor enrollment by checking a code against the stored
   * secret.
   *
   * @param {string} userId
   * @param {string} token - TOTP code from the authenticator.
   * @returns {Promise<{success: boolean, message: string}>}
   * @throws {Error} If enrollment was not started or the code is wrong.
   */
  async verifyTwoFactorSetup(userId, token) {
    const user = this.users.get(userId);
    if (!user || !user.twoFactorSecret) {
      throw new Error('Two-factor setup not initiated');
    }

    const isValid = speakeasy.totp.verify({
      secret: user.twoFactorSecret,
      encoding: 'base32',
      token,
      window: 2
    });
    if (!isValid) {
      throw new Error('Invalid verification code');
    }

    user.twoFactorEnabled = true;
    return { success: true, message: 'Two-factor authentication enabled' };
  }

  /**
   * Builds middleware that answers 401 without `req.user` and 403 when the
   * user lacks the permission.
   *
   * @param {string} permission
   * @returns {Function} `(req, res, next)` middleware.
   */
  requirePermission(permission) {
    return (req, res, next) => {
      const user = req.user;
      if (!user) {
        return res.status(401).json({ error: 'Authentication required' });
      }
      if (this.hasPermission(user, permission)) {
        next();
      } else {
        res.status(403).json({ error: 'Insufficient permissions' });
      }
    };
  }

  /**
   * Builds middleware that answers 401 without `req.user` and 403 unless the
   * user has the role or is an admin.
   *
   * @param {string} role
   * @returns {Function} `(req, res, next)` middleware.
   */
  requireRole(role) {
    return (req, res, next) => {
      const user = req.user;
      if (!user) {
        return res.status(401).json({ error: 'Authentication required' });
      }
      if (user.roles.includes(role) || user.roles.includes('admin')) {
        next();
      } else {
        res.status(403).json({ error: 'Insufficient role' });
      }
    };
  }

  /**
   * @param {{roles: string[], permissions: string[]}} user
   * @param {string} permission
   * @returns {boolean} True for admins, for a direct grant, or when one of
   *   the user's roles carries the permission.
   */
  hasPermission(user, permission) {
    if (user.roles.includes('admin')) {
      return true;
    }
    if (user.permissions.includes(permission)) {
      return true;
    }
    return user.roles.some((role) => this.getRolePermissions(role).includes(permission));
  }

  /**
   * @param {string} role
   * @returns {string[]} Permissions of the role; empty for unknown roles.
   */
  getRolePermissions(role) {
    const rolePermissions = {
      user: ['read:profile', 'update:profile'],
      moderator: ['read:profile', 'update:profile', 'moderate:content'],
      admin: ['*'] // every permission
    };
    // Own-property check: a role named "toString" or "constructor" must not
    // resolve to something inherited from Object.prototype.
    if (!Object.prototype.hasOwnProperty.call(rolePermissions, role)) {
      return [];
    }
    return rolePermissions[role];
  }

  /**
   * Enforces the password policy: at least 8 characters containing upper
   * case, lower case, a digit and a special character.
   *
   * @param {string} password
   * @throws {Error} If the password violates the policy.
   */
  validatePasswordStrength(password) {
    const minLength = 8;
    if (typeof password !== 'string' || password.length < minLength) {
      throw new Error('Password must be at least 8 characters long');
    }

    const hasUpperCase = /[A-Z]/.test(password);
    const hasLowerCase = /[a-z]/.test(password);
    const hasNumbers = /\d/.test(password);
    const hasSpecialChar = /[!@#$%^&*(),.?":{}|<>]/.test(password);
    if (!hasUpperCase || !hasLowerCase || !hasNumbers || !hasSpecialChar) {
      throw new Error('Password must contain uppercase, lowercase, numbers, and special characters');
    }
  }

  /**
   * Records a failed attempt and locks the account once the configured
   * maximum is reached.
   *
   * @param {object} user
   * @returns {Promise<void>}
   */
  async handleFailedLogin(user) {
    // A lockout that has already expired starts a fresh count; otherwise the
    // very next failure would lock the account again.
    if (user.lockedUntil && user.lockedUntil <= new Date()) {
      user.loginAttempts = 0;
      user.lockedUntil = null;
    }

    user.loginAttempts = (user.loginAttempts || 0) + 1;
    if (user.loginAttempts >= this.config.maxLoginAttempts) {
      user.lockedUntil = new Date(Date.now() + this.config.lockoutDuration);
    }
  }

  /**
   * @param {object} user
   * @returns {boolean} True while `lockedUntil` lies in the future.
   */
  isAccountLocked(user) {
    return Boolean(user.lockedUntil && user.lockedUntil > new Date());
  }

  /**
   * Stores a session record for the user.
   *
   * @param {object} user
   * @param {object} tokens
   * @returns {string} The new session id.
   */
  createSession(user, tokens) {
    const sessionId = this.generateSessionId();
    this.sessions.set(sessionId, {
      userId: user.id,
      tokens,
      createdAt: new Date(),
      lastActivity: new Date(),
      ipAddress: null,
      userAgent: null
    });
    return sessionId;
  }

  /** @returns {string} 32 hex characters. */
  generateUserId() {
    return crypto.randomBytes(16).toString('hex');
  }

  /** @returns {string} 64 hex characters. */
  generateRefreshToken() {
    return crypto.randomBytes(32).toString('hex');
  }

  /** @returns {string} 48 hex characters. */
  generateSessionId() {
    return crypto.randomBytes(24).toString('hex');
  }

  /** @returns {string[]} Ten upper-case codes of 8 hex characters each. */
  generateBackupCodes() {
    return Array.from({ length: 10 }, () =>
      crypto.randomBytes(4).toString('hex').toUpperCase()
    );
  }

  /**
   * @param {string} email
   * @returns {object|undefined}
   */
  findUserByEmail(email) {
    for (const user of this.users.values()) {
      if (user.email === email) {
        return user;
      }
    }
    return undefined;
  }

  /**
   * @param {string} username
   * @returns {object|undefined}
   */
  findUserByUsername(username) {
    for (const user of this.users.values()) {
      if (user.username === username) {
        return user;
      }
    }
    return undefined;
  }

  /**
   * @param {object} user
   * @returns {object} Copy of the user without the password hash and the
   *   two-factor secret.
   */
  sanitizeUser(user) {
    const { password, twoFactorSecret, ...sanitized } = user;
    return sanitized;
  }

  /**
   * Converts an expiry setting to milliseconds.
   *
   * @param {string|number} expiry - `<digits><s|m|h|d>` such as '15m', or a
   *   number of seconds (the numeric form the JWT library accepts).
   * @returns {number} Milliseconds; 900000 (15 minutes) when unparseable.
   */
  parseExpiry(expiry) {
    const fallback = 900000; // 15 minutes
    if (typeof expiry === 'number' && Number.isFinite(expiry)) {
      return expiry * 1000;
    }
    if (typeof expiry !== 'string') {
      return fallback;
    }

    const units = { s: 1000, m: 60000, h: 3600000, d: 86400000 };
    const match = expiry.match(/^(\d+)([smhd])$/);
    if (!match) {
      return fallback;
    }
    return Number.parseInt(match[1], 10) * units[match[2]];
  }

  /**
   * Placeholder for mail delivery; it currently only logs.
   *
   * @param {object} user
   * @returns {Promise<void>}
   */
  async sendVerificationEmail(user) {
    console.log(`Verification email sent to ${user.email}`);
  }
}
// CommonJS export: the class itself is the module's export.
module.exports = AdvancedAuthSystem;
7. 数据处理与分析
7.1 大数据处理架构
流数据处理系统
// stream-processor.js
const { Transform, Writable, pipeline } = require('stream');
const { EventEmitter } = require('events');
/**
 * Factory for object-mode stream stages (transform, batch, filter, windowed
 * aggregate, sink) plus a promise wrapper around `stream.pipeline`.
 *
 * Events: 'result' for each analyzer output, 'complete' when a pipeline
 * finishes, and 'error' when a pipeline fails (only if a listener exists).
 */
class StreamProcessor extends EventEmitter {
  /**
   * @param {object} [options] - Overrides merged over the defaults below.
   */
  constructor(options = {}) {
    super();
    this.options = {
      batchSize: 1000,
      flushInterval: 5000,
      maxMemory: 100 * 1024 * 1024, // 100 MB
      ...options
    };
    this.processors = new Map();
    this.metrics = {
      processed: 0,
      errors: 0,
      startTime: Date.now()
    };
  }

  /**
   * Creates a mapping stage and registers it under `name`.
   *
   * @param {string} name - Key in the `processors` map.
   * @param {Function} transformFn - Maps a chunk to its replacement; a
   *   null/undefined result drops the chunk.
   * @returns {Transform}
   */
  createTransformStream(name, transformFn) {
    const transform = new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        try {
          const result = transformFn(chunk);
          if (result !== null && result !== undefined) {
            this.push(result);
          }
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
    this.processors.set(name, transform);
    return transform;
  }

  /**
   * Creates a stage that groups chunks into arrays of `batchSize`; a final
   * partial batch is emitted when the input ends.
   *
   * @param {number} [batchSize=this.options.batchSize]
   * @returns {Transform}
   */
  createBatchStream(batchSize = this.options.batchSize) {
    let batch = [];
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        batch.push(chunk);
        if (batch.length >= batchSize) {
          this.push(batch);
          batch = [];
        }
        callback();
      },
      flush(callback) {
        if (batch.length > 0) {
          this.push(batch);
          batch = [];
        }
        callback();
      }
    });
  }

  /**
   * Creates a stage that forwards only chunks for which `filterFn` returns a
   * truthy value.
   *
   * @param {Function} filterFn
   * @returns {Transform}
   */
  createFilterStream(filterFn) {
    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        try {
          if (filterFn(chunk)) {
            this.push(chunk);
          }
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  /**
   * Creates a tumbling-window aggregation stage keyed by `keyFn`. Windows are
   * aligned to multiples of `windowSize` on the arrival clock (`Date.now()`).
   * A window is emitted once a later chunk arrives after it has ended, and
   * every window still open is emitted when the input ends.
   *
   * @param {Function} keyFn - Maps a chunk to its grouping key.
   * @param {Function} aggregateFn - Reduces a window's chunk array to a result.
   * @param {number} [windowSize=60000] - Window length in milliseconds.
   * @returns {Transform} Emits `{key, windowStart, windowEnd, result}`.
   */
  createAggregateStream(keyFn, aggregateFn, windowSize = 60000) {
    const windows = new Map();

    // Emits and forgets every window accepted by `isDue`.
    const emitWindows = (stream, isDue) => {
      for (const [windowKey, entry] of windows) {
        if (!isDue(entry)) {
          continue;
        }
        windows.delete(windowKey);
        stream.push({
          key: entry.key,
          windowStart: entry.windowStart,
          windowEnd: entry.windowEnd,
          result: aggregateFn(entry.data)
        });
      }
    };

    return new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        try {
          const key = keyFn(chunk);
          const now = Date.now();
          const windowStart = Math.floor(now / windowSize) * windowSize;
          const windowKey = `${key}:${windowStart}`;

          let entry = windows.get(windowKey);
          if (!entry) {
            entry = {
              key,
              windowStart,
              windowEnd: windowStart + windowSize,
              data: []
            };
            windows.set(windowKey, entry);
          }
          entry.data.push(chunk);

          emitWindows(this, (candidate) => candidate.windowEnd <= now);
          callback();
        } catch (error) {
          // Route keyFn/aggregateFn failures through the stream like the
          // sibling stages do, instead of throwing out of write().
          callback(error);
        }
      },
      flush(callback) {
        try {
          // End of input: without this, data in open windows would be lost.
          emitWindows(this, () => true);
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  /**
   * Creates a sink that hands every chunk to `outputFn`.
   *
   * @param {Function} outputFn
   * @returns {Writable}
   */
  createOutputSink(outputFn) {
    return new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        try {
          outputFn(chunk);
          callback();
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  /**
   * Connects the streams with `stream.pipeline`.
   *
   * @param {Array} streams - Source, intermediate stages and sink, in order.
   * @returns {Promise<void>} Resolves on completion, rejects with the
   *   pipeline error.
   */
  createPipeline(streams) {
    return new Promise((resolve, reject) => {
      pipeline(...streams, (error) => {
        if (error) {
          this.metrics.errors++;
          // Settle first so a throwing listener cannot leave the promise
          // pending.
          reject(error);
          // emit('error') throws when nobody listens, which would turn a
          // stream failure into an uncaught exception.
          if (this.listenerCount('error') > 0) {
            this.emit('error', error);
          }
          return;
        }
        resolve();
        this.emit('complete');
      });
    });
  }

  /**
   * Builds the stages of a sample analysis pipeline: cleaning, validation,
   * per-category aggregation and an output sink. A source stream has to be
   * placed in front before passing the array to createPipeline().
   *
   * @returns {Array} `[cleaning, validation, aggregation, output]`.
   */
  createRealTimeAnalyzer() {
    // Cleaning: drop non-objects, default the timestamp, mark as processed.
    const cleaningStream = this.createTransformStream('cleaning', (data) => {
      if (!data || typeof data !== 'object') {
        return null;
      }
      return {
        ...data,
        timestamp: data.timestamp || Date.now(),
        processed: true
      };
    });

    // Validation: needs a timestamp and a value that coerces to a number.
    const validationStream = this.createFilterStream((data) => {
      return data.timestamp &&
        data.value !== undefined &&
        !isNaN(data.value);
    });

    // Aggregation: one pass per window. Values are coerced so numeric
    // strings add up instead of concatenating, and no array is spread into
    // Math.min/Math.max (which can overflow the stack on large windows).
    const aggregationStream = this.createAggregateStream(
      (data) => data.category || 'default',
      (dataArray) => {
        const count = dataArray.length;
        let sum = 0;
        let min = Infinity;
        let max = -Infinity;
        for (const item of dataArray) {
          const value = Number(item.value) || 0;
          sum += value;
          min = Math.min(min, value);
          max = Math.max(max, value);
        }
        return { count, sum, avg: sum / count, min, max };
      }
    );

    const outputStream = this.createOutputSink((result) => {
      this.metrics.processed++;
      this.emit('result', result);
      console.log('Analysis result:', result);
    });

    return [cleaningStream, validationStream, aggregationStream, outputStream];
  }

  /**
   * @returns {{processed: number, errors: number, startTime: number,
   *   runtime: number, throughput: number}} `runtime` is in milliseconds and
   *   `throughput` in results per second (0 before any time has elapsed).
   */
  getMetrics() {
    const runtime = Date.now() - this.metrics.startTime;
    const throughput = runtime > 0 ? this.metrics.processed / (runtime / 1000) : 0;
    return {
      ...this.metrics,
      runtime,
      throughput
    };
  }
}
/**
 * Time-series analyzer.
 *
 * Keeps timestamp-ordered points per named series in memory and offers a
 * moving average, standard-deviation based anomaly detection and a linear
 * trend. Points older than the retention period are pruned once a minute.
 */
class TimeSeriesAnalyzer {
  /**
   * @param {object} [options]
   * @param {number} [options.windowSize=60000] - Stored as `windowSize`.
   * @param {number} [options.retentionPeriod=86400000] - Maximum point age in
   *   milliseconds.
   */
  constructor(options = {}) {
    this.windowSize = options.windowSize || 60000; // 1 minute
    this.retentionPeriod = options.retentionPeriod || 24 * 60 * 60 * 1000; // 24 hours
    this.data = new Map();
    this.cleanupTimer = null;
    this.startCleanup();
  }

  /**
   * Adds a point to a series, keeping the series ordered by timestamp.
   *
   * @param {*} series - Series key.
   * @param {number} value
   * @param {number} [timestamp=Date.now()]
   */
  addDataPoint(series, value, timestamp = Date.now()) {
    let seriesData = this.data.get(series);
    if (!seriesData) {
      seriesData = [];
      this.data.set(series, seriesData);
    }

    // The array is already ordered, so walk back from the end to the
    // insertion slot instead of re-sorting everything on each insert. Equal
    // timestamps keep arrival order, as a stable sort would.
    let insertAt = seriesData.length;
    while (insertAt > 0 && seriesData[insertAt - 1].timestamp > timestamp) {
      insertAt--;
    }
    seriesData.splice(insertAt, 0, { value, timestamp });
  }

  /**
   * @param {*} series
   * @param {number} [windowCount=10] - Number of most recent points.
   * @returns {number|null} Their mean, or null when `windowCount` is not
   *   positive or the series has fewer points.
   */
  getMovingAverage(series, windowCount = 10) {
    const seriesData = this.data.get(series);
    // windowCount <= 0 is rejected because slice(-0) is slice(0): it would
    // silently average the whole series.
    if (!seriesData || !(windowCount > 0) || seriesData.length < windowCount) {
      return null;
    }
    const recentData = seriesData.slice(-windowCount);
    const sum = recentData.reduce((acc, point) => acc + point.value, 0);
    return sum / recentData.length;
  }

  /**
   * @param {*} series
   * @param {number} [threshold=2] - Distance from the mean, in standard
   *   deviations, above which a point is reported.
   * @returns {Array<{value: number, timestamp: number}>} Outlying points;
   *   empty when the series has fewer than 10 points.
   */
  detectAnomalies(series, threshold = 2) {
    const seriesData = this.data.get(series);
    if (!seriesData || seriesData.length < 10) {
      return [];
    }
    const values = seriesData.map((point) => point.value);
    const mean = values.reduce((acc, val) => acc + val, 0) / values.length;
    const variance = values.reduce((acc, val) => acc + (val - mean) ** 2, 0) / values.length;
    const stdDev = Math.sqrt(variance);
    return seriesData.filter((point) => Math.abs(point.value - mean) > threshold * stdDev);
  }

  /**
   * Least-squares line through the last `periods` points, using the point
   * index (0, 1, 2, ...) as x.
   *
   * @param {*} series
   * @param {number} [periods=10]
   * @returns {{slope: number, intercept: number, trend: string}|null} Null
   *   when fewer than `periods` points exist or `periods` is below 2 (a line
   *   needs two points; one point would divide by zero).
   */
  getTrend(series, periods = 10) {
    const seriesData = this.data.get(series);
    if (!seriesData || !(periods >= 2) || seriesData.length < periods) {
      return null;
    }
    const recentData = seriesData.slice(-periods);
    const n = recentData.length;
    let sumX = 0;
    let sumY = 0;
    let sumXY = 0;
    let sumXX = 0;
    recentData.forEach((point, index) => {
      sumX += index;
      sumY += point.value;
      sumXY += index * point.value;
      sumXX += index * index;
    });
    const slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
    const intercept = (sumY - slope * sumX) / n;
    let trend = 'stable';
    if (slope > 0) {
      trend = 'increasing';
    } else if (slope < 0) {
      trend = 'decreasing';
    }
    return { slope, intercept, trend };
  }

  /**
   * Starts (or restarts) the once-a-minute pruning of expired points. The
   * timer is unref'd so it does not keep the process alive on its own.
   */
  startCleanup() {
    this.stop();
    this.cleanupTimer = setInterval(() => {
      const cutoff = Date.now() - this.retentionPeriod;
      this.data.forEach((seriesData, series) => {
        const kept = seriesData.filter((point) => point.timestamp > cutoff);
        if (kept.length === 0) {
          // Drop empty series so the map does not grow without bound.
          this.data.delete(series);
        } else {
          this.data.set(series, kept);
        }
      });
    }, 60000);
    this.cleanupTimer.unref?.();
  }

  /**
   * Stops the pruning timer. Safe to call more than once.
   */
  stop() {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
  }
}
// CommonJS exports: the stream-pipeline factory and the time-series analyzer.
module.exports = {
StreamProcessor,
TimeSeriesAnalyzer
};
7.2 机器学习集成
机器学习服务实现
// ml-service.js
const tf = require('@tensorflow/tfjs-node');
const fs = require('fs').promises;
const path = require('path');
/**
 * Registry and helper layer around TensorFlow.js layers models: loading,
 * building, training, prediction, evaluation, saving and disposal.
 */
class MLService {
  constructor() {
    this.models = new Map();
    this.preprocessors = new Map();
    this.metrics = new Map();
  }

  /**
   * Loads a layers model from disk and registers it under `name`.
   *
   * @param {string} name
   * @param {string} modelPath - Filesystem path, prefixed with `file://`.
   * @returns {Promise<void>}
   * @throws Rethrows the loader's error after logging it.
   */
  async loadModel(name, modelPath) {
    try {
      const model = await tf.loadLayersModel(`file://${modelPath}`);
      this.registerModel(name, model);
      console.log(`Model ${name} loaded successfully`);
    } catch (error) {
      console.error(`Failed to load model ${name}:`, error);
      throw error;
    }
  }

  /**
   * Registers an existing model (for example one returned by the create*
   * helpers) so it can be trained, evaluated and used for prediction by name.
   *
   * @param {string} name
   * @param {object} model
   */
  registerModel(name, model) {
    this.models.set(name, {
      model,
      loadedAt: new Date(),
      predictions: 0
    });
  }

  /**
   * Builds and compiles a dense network with one linear output unit
   * (64 -> dropout 0.2 -> 32 -> 1), Adam optimizer at 0.001, MSE loss and
   * the MAE metric. The model is returned, not registered.
   *
   * @param {number} inputShape - Number of input features.
   * @returns {object} Compiled model.
   */
  createLinearRegressionModel(inputShape) {
    const model = tf.sequential({
      layers: [
        tf.layers.dense({ inputShape: [inputShape], units: 64, activation: 'relu' }),
        tf.layers.dropout({ rate: 0.2 }),
        tf.layers.dense({ units: 32, activation: 'relu' }),
        tf.layers.dense({ units: 1, activation: 'linear' })
      ]
    });
    model.compile({
      optimizer: tf.train.adam(0.001),
      loss: 'meanSquaredError',
      metrics: ['mae']
    });
    return model;
  }

  /**
   * Builds and compiles a dense softmax classifier
   * (128 -> dropout 0.3 -> 64 -> dropout 0.2 -> numClasses), Adam optimizer
   * at 0.001, categorical cross-entropy loss and the accuracy metric. The
   * model is returned, not registered.
   *
   * @param {number} inputShape - Number of input features.
   * @param {number} numClasses
   * @returns {object} Compiled model.
   */
  createClassificationModel(inputShape, numClasses) {
    const model = tf.sequential({
      layers: [
        tf.layers.dense({ inputShape: [inputShape], units: 128, activation: 'relu' }),
        tf.layers.dropout({ rate: 0.3 }),
        tf.layers.dense({ units: 64, activation: 'relu' }),
        tf.layers.dropout({ rate: 0.2 }),
        tf.layers.dense({ units: numClasses, activation: 'softmax' })
      ]
    });
    model.compile({
      optimizer: tf.train.adam(0.001),
      loss: 'categoricalCrossentropy',
      metrics: ['accuracy']
    });
    return model;
  }

  /**
   * Trains a registered model and stores its training history in `metrics`.
   *
   * @param {string} name
   * @param {{xs: *, ys: *}} trainData
   * @param {{xs: *, ys: *}} [validationData] - When omitted,
   *   `options.validationSplit` of the training data is held out instead.
   * @param {object} [options]
   * @param {number} [options.epochs=100]
   * @param {number} [options.batchSize=32]
   * @param {number} [options.validationSplit=0.2]
   * @param {Array} [options.callbacks=[]] - Appended after the built-in ones.
   * @returns {Promise<object>} The object returned by `model.fit`.
   * @throws {Error} If no model is registered under `name`.
   */
  async trainModel(name, trainData, validationData, options = {}) {
    const model = this.models.get(name)?.model;
    if (!model) {
      throw new Error(`Model ${name} not found`);
    }

    const {
      epochs = 100,
      batchSize = 32,
      validationSplit = 0.2,
      callbacks = []
    } = options;

    // Early stopping on validation loss. `restoreBestWeights` is omitted on
    // purpose: TensorFlow.js rejects `true` as not implemented, which made
    // every training run fail before it started.
    const trainingCallbacks = [
      tf.callbacks.earlyStopping({
        monitor: 'val_loss',
        patience: 10
      })
    ];

    // TensorFlow.js ships no reduceLROnPlateau callback; use it only if the
    // loaded build provides one.
    if (typeof tf.callbacks.reduceLROnPlateau === 'function') {
      trainingCallbacks.push(
        tf.callbacks.reduceLROnPlateau({
          monitor: 'val_loss',
          factor: 0.5,
          patience: 5,
          minLr: 0.0001
        })
      );
    }
    trainingCallbacks.push(...callbacks);

    const fitConfig = {
      epochs,
      batchSize,
      callbacks: trainingCallbacks,
      verbose: 1
    };
    if (validationData) {
      fitConfig.validationData = [validationData.xs, validationData.ys];
    } else {
      fitConfig.validationSplit = validationSplit;
    }

    const history = await model.fit(trainData.xs, trainData.ys, fitConfig);

    this.metrics.set(name, {
      history: history.history,
      trainedAt: new Date()
    });
    return history;
  }

  /**
   * Runs one prediction. Input and output tensors are disposed even when the
   * prediction fails.
   *
   * @param {string} modelName
   * @param {*} inputData - Passed through preprocessData().
   * @returns {Promise<number[]>} Flattened prediction values.
   * @throws {Error} If no model is registered under `modelName`.
   */
  async predict(modelName, inputData) {
    const modelInfo = this.models.get(modelName);
    if (!modelInfo) {
      throw new Error(`Model ${modelName} not found`);
    }
    const { model } = modelInfo;

    const preprocessedData = await this.preprocessData(modelName, inputData);
    let prediction;
    try {
      prediction = model.predict(preprocessedData);
      modelInfo.predictions++;
      const values = await prediction.data();
      return Array.from(values);
    } finally {
      // Tensor memory is not garbage collected; release it on every path.
      prediction?.dispose();
      preprocessedData.dispose();
    }
  }

  /**
   * Predicts each input in turn.
   *
   * @param {string} modelName
   * @param {Array} inputDataArray
   * @returns {Promise<number[][]>} One result array per input, in order.
   */
  async batchPredict(modelName, inputDataArray) {
    const results = [];
    for (const inputData of inputDataArray) {
      results.push(await this.predict(modelName, inputData));
    }
    return results;
  }

  /**
   * Converts raw input to a tensor. A preprocessor registered for the model
   * takes precedence; otherwise arrays become one row, non-null objects
   * become one row of their values, and anything else a 1x1 tensor.
   *
   * @param {string} modelName
   * @param {*} data
   * @returns {Promise<object>} The input tensor.
   */
  async preprocessData(modelName, data) {
    const preprocessor = this.preprocessors.get(modelName);
    if (preprocessor) {
      return preprocessor(data);
    }
    if (Array.isArray(data)) {
      return tf.tensor2d([data]);
    }
    // typeof null is 'object'; Object.values(null) would throw.
    if (data !== null && typeof data === 'object') {
      return tf.tensor2d([Object.values(data)]);
    }
    return tf.tensor2d([[data]]);
  }

  /**
   * @param {string} modelName
   * @param {Function} preprocessorFn - Maps raw input to a tensor.
   */
  registerPreprocessor(modelName, preprocessorFn) {
    this.preprocessors.set(modelName, preprocessorFn);
  }

  /**
   * Evaluates a registered model on test data.
   *
   * @param {string} modelName
   * @param {{xs: *, ys: *}} testData
   * @returns {Promise<object>} Map from each name in `model.metricsNames` to
   *   the typed array read from the matching evaluation tensor.
   * @throws {Error} If no model is registered under `modelName`.
   */
  async evaluateModel(modelName, testData) {
    const modelInfo = this.models.get(modelName);
    if (!modelInfo) {
      throw new Error(`Model ${modelName} not found`);
    }
    const { model } = modelInfo;

    const evaluation = await model.evaluate(testData.xs, testData.ys);
    // evaluate() yields a single scalar when the model has no extra metrics
    // and an array otherwise; handle both.
    const scalars = Array.isArray(evaluation) ? evaluation : [evaluation];
    const metricNames = model.metricsNames;
    const metrics = {};
    try {
      const count = Math.min(metricNames.length, scalars.length);
      for (let i = 0; i < count; i++) {
        metrics[metricNames[i]] = await scalars[i].data();
      }
    } finally {
      scalars.forEach((tensor) => tensor.dispose());
    }
    return metrics;
  }

  /**
   * Saves a registered model under a `file://` path.
   *
   * @param {string} modelName
   * @param {string} savePath
   * @returns {Promise<void>}
   * @throws {Error} If no model is registered under `modelName`.
   */
  async saveModel(modelName, savePath) {
    const modelInfo = this.models.get(modelName);
    if (!modelInfo) {
      throw new Error(`Model ${modelName} not found`);
    }
    await modelInfo.model.save(`file://${savePath}`);
    console.log(`Model ${modelName} saved to ${savePath}`);
  }

  /**
   * @param {string} modelName
   * @returns {object|null} Summary of the model, or null when unknown.
   *   `trainableParams` holds the value of `model.countParams()`.
   */
  getModelInfo(modelName) {
    const modelInfo = this.models.get(modelName);
    if (!modelInfo) {
      return null;
    }
    const { model, loadedAt, predictions } = modelInfo;
    return {
      name: modelName,
      loadedAt,
      predictions,
      inputShape: model.inputs[0].shape,
      outputShape: model.outputs[0].shape,
      trainableParams: model.countParams(),
      layers: model.layers.length
    };
  }

  /**
   * @returns {object} Map from model name to its getModelInfo() summary.
   */
  getAllModelsStats() {
    const stats = {};
    for (const name of this.models.keys()) {
      stats[name] = this.getModelInfo(name);
    }
    return stats;
  }

  /**
   * Disposes a model and removes its preprocessor and metrics. Unknown names
   * are ignored.
   *
   * @param {string} modelName
   */
  disposeModel(modelName) {
    const modelInfo = this.models.get(modelName);
    if (!modelInfo) {
      return;
    }
    modelInfo.model.dispose();
    this.models.delete(modelName);
    this.preprocessors.delete(modelName);
    this.metrics.delete(modelName);
    console.log(`Model ${modelName} disposed`);
  }

  /**
   * Disposes every registered model.
   */
  disposeAllModels() {
    // Snapshot the keys because disposeModel() mutates the map.
    for (const name of [...this.models.keys()]) {
      this.disposeModel(name);
    }
  }
}
/**
 * Feature-engineering helpers.
 *
 * The tensor-based helpers run inside `tf.tidy` so every intermediate tensor
 * is released; they return plain JavaScript arrays.
 */
class FeatureEngineering {
  /**
   * Standardizes along axis 0: `(x - mean) / (std + 1e-7)`.
   *
   * @param {Array} data - Nested numeric array accepted by `tf.tensor`.
   * @returns {Array} Standardized data with the same nesting.
   */
  static normalize(data) {
    return tf.tidy(() => {
      const tensor = tf.tensor(data);
      const { mean, variance } = tf.moments(tensor, 0);
      // The epsilon keeps constant columns from dividing by zero.
      const std = tf.add(tf.sqrt(variance), 1e-7);
      return tf.div(tf.sub(tensor, mean), std).arraySync();
    });
  }

  /**
   * Rescales along axis 0 into `[min, max]`. Constant columns map to `min`
   * instead of producing NaN.
   *
   * @param {Array} data - Nested numeric array accepted by `tf.tensor`.
   * @param {number} [min=0]
   * @param {number} [max=1]
   * @returns {Array} Rescaled data with the same nesting.
   */
  static minMaxScale(data, min = 0, max = 1) {
    return tf.tidy(() => {
      const tensor = tf.tensor(data);
      const minVal = tf.min(tensor, 0);
      const range = tf.sub(tf.max(tensor, 0), minVal);
      // Where max equals min, divide by 1 rather than 0.
      const safeRange = tf.where(tf.equal(range, 0), tf.onesLike(range), range);
      const unit = tf.div(tf.sub(tensor, minVal), safeRange);
      return tf.add(tf.mul(unit, max - min), min).arraySync();
    });
  }

  /**
   * One-hot encodes integer labels.
   *
   * @param {number[]} labels
   * @param {number} numClasses - Width of each encoded row.
   * @returns {number[][]} One row per label.
   */
  static oneHotEncode(labels, numClasses) {
    return tf.tidy(() => {
      const tensor = tf.tensor1d(labels, 'int32');
      return tf.oneHot(tensor, numClasses).arraySync();
    });
  }

  /**
   * Derives calendar features from timestamps, in the local time zone.
   *
   * @param {Array<number|string|Date>} timestamps - Values accepted by
   *   `new Date()`.
   * @returns {Array<{hour: number, dayOfWeek: number, dayOfMonth: number,
   *   month: number, quarter: number, isWeekend: number}>} `month` and
   *   `quarter` are zero-based; `isWeekend` is 1 for Saturday and Sunday.
   */
  static createTimeFeatures(timestamps) {
    return timestamps.map((timestamp) => {
      const date = new Date(timestamp);
      const dayOfWeek = date.getDay();
      const month = date.getMonth();
      return {
        hour: date.getHours(),
        dayOfWeek,
        dayOfMonth: date.getDate(),
        month,
        quarter: Math.floor(month / 3),
        isWeekend: dayOfWeek === 0 || dayOfWeek === 6 ? 1 : 0
      };
    });
  }

  /**
   * Expands each row with the powers 2..degree of every feature, followed by
   * all pairwise products when `degree >= 2`.
   *
   * @param {number[][]} data
   * @param {number} [degree=2]
   * @returns {number[][]} Expanded rows; the original features come first.
   */
  static polynomialFeatures(data, degree = 2) {
    return data.map((row) => {
      const polyRow = [...row];

      // Pure powers, grouped by exponent.
      for (let power = 2; power <= degree; power++) {
        for (const feature of row) {
          polyRow.push(feature ** power);
        }
      }

      // Pairwise interaction terms.
      if (degree >= 2) {
        for (let j = 0; j < row.length; j++) {
          for (let k = j + 1; k < row.length; k++) {
            polyRow.push(row[j] * row[k]);
          }
        }
      }
      return polyRow;
    });
  }
}
// CommonJS exports: the model service and the feature-engineering helpers.
module.exports = {
MLService,
FeatureEngineering
};
结语
感谢您的阅读!期待您的一键三连!欢迎指正!