關於一些特性的小記
处理实时数据流,进行复杂的数据转换、过滤和聚合操作,同时保持完整的类型安全和错误处理能力。
✅ 類型安全 ✅ 錯誤處理 ✅ 重試機制 ✅ 超時控制 ✅ 緩存支持 ✅ 攔截器模式 ✅ 可擴展性
// Advanced type definitions

/** A source of items: either a synchronous or an asynchronous iterable. */
type DataStream<T> = Iterable<T> | AsyncIterable<T>;

/** One pipeline stage; it may hand back its result directly or as a promise. */
type Processor<T, R> = (data: T) => R | Promise<R>;

/** Boolean test applied to a single item. */
type Predicate<T> = (data: T) => boolean;

/** Folds the current item into the accumulator and returns the new accumulator. */
type Reducer<T, R> = (accumulator: R, current: T) => R;

// Error-handling class
/**
 * Error raised by the data-processing pipeline.
 *
 * Besides the message it can carry the underlying error that triggered it and
 * the piece of data that was being handled at the time.
 */
class DataProcessingError extends Error {
  /** The underlying error, when this one wraps another failure. */
  public readonly originalError?: unknown;
  /** The data that was being handled when the failure occurred. */
  public readonly data?: any;

  /**
   * @param message Human-readable description of the failure.
   * @param originalError Optional underlying error.
   * @param data Optional data associated with the failure.
   */
  constructor(message: string, originalError?: unknown, data?: any) {
    super(message);
    this.originalError = originalError;
    this.data = data;
    this.name = 'DataProcessingError';
  }
}

// Data validation decorator
/**
 * Builds a method decorator (legacy `(target, key, descriptor)` form) that
 * checks the value the decorated method resolves to.
 *
 * The decorated method is replaced by an async wrapper: it awaits the original
 * method, runs `predicate` on the result and throws a `DataProcessingError`
 * carrying `errorMessage` and the rejected result when the predicate fails.
 *
 * @param predicate Test the resolved result has to pass.
 * @param errorMessage Message of the error thrown when the test fails.
 * @returns The decorator function.
 */
function validate<T>(predicate: Predicate<T>, errorMessage: string) {
  return (target: any, propertyKey: string, descriptor: PropertyDescriptor): void => {
    const wrapped = descriptor.value;

    descriptor.value = async function (this: unknown, ...args: any[]) {
      // Forward `this` and all arguments untouched to the wrapped method.
      const outcome = await wrapped.apply(this, args);
      if (predicate(outcome)) {
        return outcome;
      }
      throw new DataProcessingError(errorMessage, undefined, outcome);
    };
  };
}

// Main data-processing class
/**
 * Fluent pipeline that pushes every item of a (sync or async) data stream
 * through a chain of processing stages and collects the results.
 *
 * Stages are added with `pipe`, error listeners with `catch`, and the whole
 * pipeline is executed with `process`.
 *
 * @typeParam T Item type of the source stream.
 * @typeParam R Item type produced by the last stage.
 */
class ComplexDataProcessor<T, R> {
  /**
   * Predicates of the stages created by `filter`, keyed by the stage function.
   * The pipeline uses this to recognise a filter stage and drop a rejected
   * item instead of handing `null` to the next stage.
   */
  private static readonly filterPredicates = new WeakMap<Processor<any, any>, Predicate<any>>();

  private processors: Array<Processor<any, any>> = [];
  private errorHandlers: Array<(error: DataProcessingError) => void> = [];

  /** @param initialData Source stream whose items are fed through the pipeline. */
  constructor(private initialData: DataStream<T>) {}

  /**
   * Appends a processing stage.
   *
   * @param processor Stage that receives the output of the previous stage.
   * @returns This instance, re-typed so that `process` yields the new output type.
   */
  pipe<U>(processor: Processor<any, U>): ComplexDataProcessor<T, U> {
    this.processors.push(processor);
    // Same object; only the static result type changes.
    return this as unknown as ComplexDataProcessor<T, U>;
  }

  /**
   * Registers a listener that is called with every `DataProcessingError`
   * raised while items are being processed. The error is still rethrown by
   * `process` afterwards.
   */
  catch(handler: (error: DataProcessingError) => void): this {
    this.errorHandlers.push(handler);
    return this;
  }

  /** Type guard: true when `obj` exposes a `Symbol.asyncIterator` method. */
  private static isAsyncIterable<T>(obj: any): obj is AsyncIterable<T> {
    return obj != null && typeof obj[Symbol.asyncIterator] === 'function';
  }

  /**
   * Runs every source item through all stages, in order, one item at a time.
   *
   * Items rejected by a `filter` stage are left out of the result.
   *
   * The non-empty check is performed inline rather than through the legacy
   * `validate` decorator, so the class compiles without the
   * `experimentalDecorators` compiler option; the error raised is the same.
   *
   * @returns The outputs of the last stage, in source order.
   * @throws DataProcessingError when a stage fails (after the registered
   *   handlers have been notified) or when the result would be empty.
   */
  async process(): Promise<R[]> {
    const results: R[] = [];
    const collect = async (item: T): Promise<void> => {
      const outcome = await this.processData(item);
      if (outcome.kept) {
        results.push(outcome.value);
      }
    };

    try {
      if (ComplexDataProcessor.isAsyncIterable(this.initialData)) {
        for await (const item of this.initialData) {
          await collect(item);
        }
      } else {
        for (const item of this.initialData) {
          await collect(item);
        }
      }
    } catch (error) {
      this.handleError(error);
    }

    if (results.length === 0) {
      throw new DataProcessingError('处理结果不能为空数组', undefined, results);
    }
    return results;
  }

  /**
   * Sends one item through every stage.
   *
   * @returns `{ kept: true, value }` with the final value, or `{ kept: false }`
   *   when a filter stage rejected the item.
   * @throws DataProcessingError wrapping whatever a stage threw, together with
   *   the value that stage was given.
   */
  private async processData(data: T): Promise<{ kept: true; value: R } | { kept: false }> {
    let current: any = data;
    for (const stage of this.processors) {
      try {
        const predicate = ComplexDataProcessor.filterPredicates.get(stage);
        if (predicate !== undefined) {
          // Filter stage: evaluate the predicate directly so that a rejected
          // item is dropped and an accepted one continues unchanged.
          if (!predicate(current)) {
            return { kept: false };
          }
          continue;
        }
        current = await stage(current);
      } catch (error) {
        throw new DataProcessingError('数据处理失败', error, current);
      }
    }
    return { kept: true, value: current };
  }

  /** Notifies the registered handlers (for `DataProcessingError` only) and rethrows. */
  private handleError(error: unknown): never {
    if (error instanceof DataProcessingError) {
      for (const handler of this.errorHandlers) {
        handler(error);
      }
    }
    throw error;
  }

  // Static helper methods

  /**
   * Creates a filter stage. Called on its own, the stage returns the item when
   * `predicate` accepts it and `null` otherwise; when piped directly into a
   * `ComplexDataProcessor`, rejected items are removed from the output.
   * A filter stage wrapped inside `compose`/`conditional` is not recognised by
   * the pipeline and keeps its plain `null` behaviour.
   */
  static filter<T>(predicate: Predicate<T>): Processor<T, T | null> {
    const stage: Processor<T, T | null> = (data: T) => (predicate(data) ? data : null);
    ComplexDataProcessor.filterPredicates.set(stage, predicate);
    return stage;
  }

  /** Creates a stage that transforms each item with `mapper`. */
  static map<T, R>(mapper: (data: T) => R): Processor<T, R> {
    return mapper;
  }

  /**
   * Creates a stage that folds an array into a single value. The stage expects
   * each incoming item to be an array, so the stream must yield arrays.
   */
  static reduce<T, R>(reducer: Reducer<T, R>, initialValue: R): Processor<T[], R> {
    return (data: T[]) => data.reduce(reducer, initialValue);
  }

  // Advanced combinators

  /** Chains two stages into one: the output of `f1` becomes the input of `f2`. */
  static compose<A, B, C>(f1: Processor<A, B>, f2: Processor<B, C>): Processor<A, C> {
    return async (data: A) => {
      const result1 = await f1(data);
      return await f2(result1);
    };
  }

  // Conditional processing

  /** Creates a stage that runs `trueProcessor` when `condition` holds, otherwise `falseProcessor`. */
  static conditional<T, R>(
    condition: Predicate<T>,
    trueProcessor: Processor<T, R>,
    falseProcessor: Processor<T, R>,
  ): Processor<T, R> {
    return async (data: T) => {
      return condition(data) ? await trueProcessor(data) : await falseProcessor(data);
    };
  }
}

// Usage example
/** Shape of one reading yielded by the simulated sensor stream. */
interface SensorData {
  id: string;
  value: number;
  timestamp: Date;
  status: 'active' | 'inactive';
}

/** Shape of a reading after the example pipeline has processed it. */
interface ProcessedData {
  id: string;
  normalizedValue: number;
  isValid: boolean;
  processedAt: Date;
}

// Simulated data stream
/**
 * Simulated sensor feed: yields `count` readings, pausing `intervalMs`
 * milliseconds after each one.
 *
 * Reading `i` has the id `sensor-${i}`, a random value in [0, 100), the
 * current time as timestamp, and is marked 'inactive' when `i` is a multiple
 * of 10 (otherwise 'active').
 *
 * @param count Number of readings to produce (default 100).
 * @param intervalMs Pause after each reading, in milliseconds (default 100).
 */
async function* generateSensorData(count = 100, intervalMs = 100): AsyncIterable<SensorData> {
  for (let i = 0; i < count; i++) {
    yield {
      id: `sensor-${i}`,
      value: Math.random() * 100,
      timestamp: new Date(),
      status: i % 10 === 0 ? 'inactive' : 'active',
    };
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Complex data-processing example
/**
 * End-to-end example.
 *
 * 1. Streams simulated sensor readings through a pipeline that selects the
 *    'active' ones, scales each value by 0.95 and flags readings whose raw
 *    value is above 10 as valid. Valid readings get their value rounded to two
 *    decimals; the others are zeroed and marked invalid.
 * 2. Computes the average normalized value over all processed readings.
 *
 * Processing errors are logged by the registered handler and then propagate
 * to the caller.
 */
async function main(): Promise<void> {
  // Intermediate shape between the map stage and the conditional stage:
  // everything of ProcessedData except the processing timestamp.
  type NormalizedReading = Pick<ProcessedData, 'id' | 'normalizedValue' | 'isValid'>;

  const processor = new ComplexDataProcessor(generateSensorData())
    .pipe(ComplexDataProcessor.filter<SensorData>((data) => data.status === 'active'))
    .pipe(
      ComplexDataProcessor.map<SensorData, NormalizedReading>((data) => ({
        id: data.id,
        normalizedValue: data.value * 0.95,
        isValid: data.value > 10,
      })),
    )
    .pipe(
      ComplexDataProcessor.conditional<NormalizedReading, ProcessedData>(
        (data) => data.isValid,
        async (data) => ({
          ...data,
          processedAt: new Date(),
          normalizedValue: Math.round(data.normalizedValue * 100) / 100,
        }),
        async (data) => ({
          ...data,
          processedAt: new Date(),
          normalizedValue: 0,
          isValid: false,
        }),
      ),
    )
    .catch((error) => {
      console.error('处理错误:', error.message, '数据:', error.data);
    });

  const results = await processor.process();
  console.log('处理结果:', results);

  // Statistical analysis with reduce.
  // The reduce stage folds an array, and the processor feeds stages one stream
  // item at a time, so the whole result list is supplied as a single item.
  const statsProcessor = new ComplexDataProcessor([results])
    .pipe(
      ComplexDataProcessor.reduce<ProcessedData, { sum: number; count: number }>(
        (acc, curr) => ({
          sum: acc.sum + curr.normalizedValue,
          count: acc.count + 1,
        }),
        { sum: 0, count: 0 },
      ),
    )
    .pipe((data: { sum: number; count: number }) => ({
      average: data.sum / data.count,
      total: data.count,
    }));

  const stats = await statsProcessor.process();
  console.log('统计结果:', stats);
}

// Run the example
main().catch(console.error);
這段代碼是一個類型安全的數據流處理系統,可以用來處理實時數據流(如傳感器數據、日誌流、用戶行為數據等)。
- 物聯網傳感器數據處理
// Process temperature sensor data.
// Readings outside the open range (-50, 100) are filtered out; the remaining
// ones gain a Fahrenheit value and have their timestamp parsed into a Date.
// The map stage's output type is inferred from the returned object, because
// that object carries `normalizedTemp` and does not match `ProcessedData`.
const temperatureProcessor = new ComplexDataProcessor(sensorDataStream)
  .pipe(
    ComplexDataProcessor.filter<SensorData>(
      (data) => data.temperature > -50 && data.temperature < 100,
    ),
  )
  .pipe(
    ComplexDataProcessor.map((data: SensorData) => ({
      ...data,
      normalizedTemp: (data.temperature * 9/5) + 32, // convert to Fahrenheit
      timestamp: new Date(data.timestamp),
    })),
  )
  .catch((error) => {
    console.error('溫度數據處理錯誤:', error);
  });

const results = await temperatureProcessor.process();
- 用戶行為日誌分析
// Analyse the user click stream.
// Step 1 works per event: select events that have a user id and project them
// to the analytics shape.
const userEventProcessor = new ComplexDataProcessor(clickStream)
  .pipe(ComplexDataProcessor.filter<ClickEvent>((event) => event.userId !== undefined))
  .pipe(
    ComplexDataProcessor.map<ClickEvent, UserAnalytics>((event) => ({
      userId: event.userId,
      action: event.actionType,
      duration: event.durationMs,
      converted: event.value > 0,
    })),
  );

// Step 2 aggregates. `ComplexDataProcessor.reduce` folds an array, while the
// processor feeds stages one stream item at a time, so the collected events
// are supplied as a single array item.
// NOTE(review): `totalUsers` and `totalValue` both count converted events —
// confirm that this is the intended meaning of the two fields.
const userBehaviorProcessor = new ComplexDataProcessor([await userEventProcessor.process()]).pipe(
  ComplexDataProcessor.reduce<UserAnalytics, ConversionStats>(
    (acc, curr) => ({
      totalUsers: acc.totalUsers + (curr.converted ? 1 : 0),
      totalActions: acc.totalActions + 1,
      totalValue: acc.totalValue + (curr.converted ? 1 : 0),
    }),
    { totalUsers: 0, totalActions: 0, totalValue: 0 },
  ),
);
- 金融交易處理
// Real-time transaction risk control.
// Transactions above 10000 go through the large-transaction check, all others
// through the normal check; results with a risk level above 0.7 are selected
// and converted into HIGH_RISK alerts.
const riskManagementProcessor = new ComplexDataProcessor(transactionStream)
  .pipe(
    ComplexDataProcessor.conditional<Transaction, RiskResult>(
      (tx) => tx.amount > 10000,
      (largeTx) => checkLargeTransaction(largeTx),
      (normalTx) => checkNormalTransaction(normalTx),
    ),
  )
  .pipe(ComplexDataProcessor.filter<RiskResult>((risk) => risk.riskLevel > 0.7))
  .pipe(
    ComplexDataProcessor.map<RiskResult, Alert>((risk) => ({
      transactionId: risk.transactionId,
      riskScore: risk.riskLevel,
      alertType: 'HIGH_RISK',
      timestamp: new Date(),
    })),
  );
高級HTTP客戶端
// Advanced HTTP client wrapper

/**
 * Small `fetch` wrapper with a base URL, default headers, a per-request
 * timeout and retries with exponential backoff.
 */
class ApiClient {
  private baseURL: string;
  private defaultHeaders: HeadersInit;
  private retryAttempts: number;
  private timeout: number;

  /**
   * @param config.baseURL Prefix prepended to every request URL.
   * @param config.headers Headers sent with every request (default: none).
   * @param config.retryAttempts Maximum number of attempts per request
   *   (default 3). Values of 0 or 1 mean a single attempt without retries.
   * @param config.timeout Milliseconds before a request is aborted
   *   (default 10000).
   */
  constructor(config: {
    baseURL: string;
    headers?: HeadersInit;
    retryAttempts?: number;
    timeout?: number;
  }) {
    this.baseURL = config.baseURL;
    this.defaultHeaders = config.headers ?? {};
    // `??` so that an explicit 0 ("do not retry") is respected.
    this.retryAttempts = config.retryAttempts ?? 3;
    // A timeout of 0 would abort every request immediately, so falsy values
    // fall back to the default.
    this.timeout = config.timeout || 10000;
  }

  /**
   * Merges header sets; later sources override earlier ones. Works for every
   * `HeadersInit` form (plain object, `Headers` instance, array of pairs),
   * which object spread does not.
   */
  private static mergeHeaders(...sources: Array<HeadersInit | undefined>): Headers {
    const merged = new Headers();
    for (const source of sources) {
      if (source === undefined) continue;
      new Headers(source).forEach((value, name) => merged.set(name, value));
    }
    return merged;
  }

  /**
   * Request with retry.
   *
   * Sends the request, aborting it after `timeout` ms. A non-2xx response is
   * turned into an `ApiError`. Any failure is retried until `retryAttempts`
   * attempts have been made, waiting 2^attempt seconds between attempts.
   * NOTE(review): every failure is retried, including 4xx responses — confirm
   * that this is intended.
   *
   * @param url Path appended to the base URL.
   * @param options Extra `fetch` options; `headers` are merged over the defaults.
   * @param attempt 1-based number of the current attempt.
   * @throws The last error once the attempts are exhausted.
   */
  private async fetchWithRetry(
    url: string,
    options: RequestInit = {},
    attempt = 1,
  ): Promise<Response> {
    try {
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), this.timeout);

      let response: Response;
      try {
        response = await fetch(`${this.baseURL}${url}`, {
          ...options,
          headers: ApiClient.mergeHeaders(this.defaultHeaders, options.headers),
          signal: controller.signal,
        });
      } finally {
        // Always release the timer, also when fetch rejects.
        clearTimeout(timeoutId);
      }

      if (!response.ok) {
        throw new ApiError(
          `HTTP ${response.status}: ${response.statusText}`,
          response.status,
          await response.text(),
        );
      }
      return response;
    } catch (error) {
      if (attempt >= this.retryAttempts) throw error;

      // Exponential backoff before the next attempt.
      const delay = 2 ** attempt * 1000;
      await new Promise((resolve) => setTimeout(resolve, delay));
      return this.fetchWithRetry(url, options, attempt + 1);
    }
  }

  /**
   * Type-safe GET request.
   *
   * @returns The parsed response body (see `parseResponse`).
   */
  async get<T>(url: string, options?: RequestInit): Promise<T> {
    const response = await this.fetchWithRetry(url, {
      ...options,
      method: 'GET',
    });
    return this.parseResponse<T>(response);
  }

  /**
   * Type-safe POST request. `data` is sent as JSON; `null`/`undefined` send no
   * body, while other falsy values such as `0`, `false` or `''` are sent.
   * A `Content-Type: application/json` header is added unless the caller
   * supplies its own.
   *
   * @returns The parsed response body (see `parseResponse`).
   */
  async post<T>(url: string, data?: any, options?: RequestInit): Promise<T> {
    const response = await this.fetchWithRetry(url, {
      ...options,
      method: 'POST',
      body: data == null ? undefined : JSON.stringify(data),
      headers: ApiClient.mergeHeaders({ 'Content-Type': 'application/json' }, options?.headers),
    });
    return this.parseResponse<T>(response);
  }

  /**
   * Response parsing and validation.
   *
   * JSON responses (by `content-type`) are parsed and, when `validateResponse`
   * is set, passed through it; anything else is returned as text.
   */
  private async parseResponse<T>(response: Response): Promise<T> {
    const contentType = response.headers.get('content-type');

    if (contentType?.includes('application/json')) {
      const data = await response.json();
      // Optional data validation.
      if (this.validateResponse) {
        return this.validateResponse<T>(data);
      }
      return data as T;
    }

    return (await response.text()) as unknown as T;
  }

  /** Optional validator applied to every parsed JSON response. */
  validateResponse?: <T>(data: any) => T;
}

// Custom API error class
/**
 * Error describing a failed API call: carries the HTTP status code and the
 * raw response body alongside the message.
 */
class ApiError extends Error {
  /** HTTP status code associated with the failure. */
  public readonly statusCode: number;
  /** Raw body text associated with the failure. */
  public readonly responseBody: string;

  /**
   * @param message Human-readable description.
   * @param statusCode HTTP status code.
   * @param responseBody Raw response body.
   */
  constructor(message: string, statusCode: number, responseBody: string) {
    super(message);
    this.statusCode = statusCode;
    this.responseBody = responseBody;
    this.name = 'ApiError';
  }
}
創建API客戶端實例
// Shared client instance: bearer-token auth, JSON responses, up to 3 attempts
// per request and a 15 s timeout.
const apiClient = new ApiClient({
  baseURL: 'https://api.example.com',
  headers: {
    Authorization: 'Bearer your-token-here',
    Accept: 'application/json',
  },
  retryAttempts: 3,
  timeout: 15000,
});

// Optional: add response validation.
// Accepts any non-null object (arrays included) and rejects everything else
// with an ApiError.
apiClient.validateResponse = <T>(data: any): T => {
  const isObject = typeof data === 'object' && data !== null;
  if (!isObject) {
    throw new ApiError('Invalid response format', 500, 'Invalid data');
  }
  return data as T;
};
封裝式API數據流生成器
// API data stream configuration interface

/** Options accepted by `ApiDataStream`. */
interface ApiStreamConfig<T> {
  /** Path handed to `ApiClient.get` for every poll. */
  endpoint: string;
  /** Delay between polls in milliseconds; `ApiDataStream` defaults it to 1000. */
  pollingInterval?: number;
  /** Upper bound compared against the stream's request counter; `ApiDataStream` defaults it to Infinity. */
  maxRequests?: number;
  /** Converts the raw response into `T`; when omitted the raw response is used as `T` unchanged. */
  transform?: (data: any) => T;
  /** Called with the error when fetching or transforming a poll result throws. */
  onError?: (error: Error) => void;
}

// Advanced API data stream generator
/**
 * Async-iterable data source that polls one API endpoint and yields each
 * (optionally transformed) response.
 */
class ApiDataStream<T> implements AsyncIterable<T> {
  private apiClient: ApiClient;
  private config: ApiStreamConfig<T>;
  private requestCount = 0;

  /**
   * @param apiClient Client used for every request.
   * @param config Stream options; `pollingInterval` defaults to 1000 ms and
   *   `maxRequests` to Infinity, also when they are passed as `undefined`.
   */
  constructor(apiClient: ApiClient, config: ApiStreamConfig<T>) {
    this.apiClient = apiClient;
    this.config = {
      ...config,
      pollingInterval: config.pollingInterval ?? 1000,
      maxRequests: config.maxRequests ?? Infinity,
    };
  }

  /** Resolves after `ms` milliseconds. */
  private static delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Polls the endpoint until `maxRequests` requests have been made.
   *
   * Every request counts toward `maxRequests`, whether it succeeds or fails,
   * so a finite limit always terminates. A failure is passed to `onError`
   * (or logged when no handler is configured) and followed by a pause of twice
   * the polling interval; a success is yielded and followed by one polling
   * interval. No pause is taken after the final request.
   *
   * The request counter lives on the instance, so a second iteration of the
   * same stream continues counting where the first one stopped.
   */
  async *[Symbol.asyncIterator](): AsyncIterator<T> {
    const { endpoint, transform, onError } = this.config;
    const pollingInterval = this.config.pollingInterval ?? 1000;
    const maxRequests = this.config.maxRequests ?? Infinity;

    while (this.requestCount < maxRequests) {
      this.requestCount++;
      const isLastRequest = this.requestCount >= maxRequests;

      let item: T;
      try {
        const rawData = await this.apiClient.get<any>(endpoint);
        item = transform ? transform(rawData) : (rawData as T);
      } catch (error) {
        const failure = error instanceof Error ? error : new Error(String(error));
        if (onError) {
          onError(failure);
        } else {
          // Never swallow a failure silently.
          console.error('數據流請求失敗:', failure);
        }
        // Wait longer than usual before trying again after an error.
        if (!isLastRequest) {
          await ApiDataStream.delay(pollingInterval * 2);
        }
        continue;
      }

      yield item;

      // Wait for the next poll.
      if (!isLastRequest) {
        await ApiDataStream.delay(pollingInterval);
      }
    }
  }

  /**
   * Creates a paginated data stream.
   *
   * Requests `endpoint` page by page (`page` starts at 1, `limit` is the page
   * size) and yields each page's `data` array until the response reports
   * `pagination.hasMore === false`. A failed request is logged and ends the
   * stream.
   *
   * @param apiClient Client used for the requests.
   * @param endpoint Path to query; it may already contain a query string.
   * @param pageSize Items per page (default 100).
   * @param pageDelayMs Pause between two page requests in ms (default 500).
   */
  static createPaginatedStream<T>(
    apiClient: ApiClient,
    endpoint: string,
    pageSize: number = 100,
    pageDelayMs: number = 500,
  ): AsyncIterable<T[]> {
    return {
      [Symbol.asyncIterator]: async function* () {
        // Append with '&' when the endpoint already carries query parameters.
        const separator = endpoint.includes('?') ? '&' : '?';
        let page = 1;
        let hasMore = true;

        while (hasMore) {
          try {
            const response = await apiClient.get<{
              data: T[];
              pagination: { hasMore: boolean; total: number };
            }>(`${endpoint}${separator}page=${page}&limit=${pageSize}`);

            yield response.data;

            hasMore = response.pagination.hasMore;
            page++;

            // Only pause when another page will actually be requested.
            if (hasMore) {
              await ApiDataStream.delay(pageDelayMs);
            }
          } catch (error) {
            console.error('分頁請求失敗:', error);
            break;
          }
        }
      },
    };
  }
}
創建高級數據流
/**
 * Shape of a sensor reading as used by the API-stream examples.
 *
 * NOTE(review): an interface with the same name but different fields
 * (`value`, `status`, `timestamp: Date`) is declared earlier in this file. If
 * both declarations end up in one module they merge and the two `timestamp`
 * types conflict — confirm the snippets are meant to live separately.
 */
interface SensorData {
  id: string;
  temperature: number;
  humidity: number;
  timestamp: string;
}

// Option 1: basic data stream
// Polls the real-time endpoint every 2 s and maps the raw payload
// (sensor_id / temp_c / humidity_pct / timestamp) onto SensorData.
const sensorStream = new ApiDataStream<SensorData>(apiClient, {
  endpoint: '/sensors/real-time',
  pollingInterval: 2000,
  transform: (rawData: any) => ({
    id: rawData.sensor_id,
    temperature: rawData.temp_c,
    humidity: rawData.humidity_pct,
    timestamp: new Date(rawData.timestamp).toISOString(),
  }),
  onError: (error) => {
    console.error('傳感器數據獲取失敗:', error);
    // The error could also be forwarded to a monitoring system here.
  },
});

// Option 2: paginated data stream (50 items per page)
const historicalDataStream = ApiDataStream.createPaginatedStream<SensorData>(
  apiClient,
  '/sensors/history',
  50,
);
在數據處理器中使用
// Feed the API stream into the data processor: select readings above -50,
// then add a Fahrenheit value and a WARNING/NORMAL status (WARNING above 30).
// The map stage's output type is inferred from the returned object, because
// that object carries `normalizedTemp`/`status` and does not match
// `ProcessedData`.
const processor = new ComplexDataProcessor(sensorStream)
  .pipe(ComplexDataProcessor.filter<SensorData>((data) => data.temperature > -50))
  .pipe(
    ComplexDataProcessor.map((data: SensorData) => ({
      ...data,
      normalizedTemp: data.temperature * 1.8 + 32,
      status: data.temperature > 30 ? 'WARNING' : 'NORMAL',
    })),
  );

// Process the data
const results = await processor.process();
請求攔截器
// Request logging interceptor.
// `fetchWithRetry` is declared `private` on ApiClient, so dot access from
// outside the class does not type-check. Bracket notation is TypeScript's
// sanctioned escape hatch for this kind of instrumentation.
apiClient['fetchWithRetry'] = new Proxy(apiClient['fetchWithRetry'], {
  apply: async (target, thisArg, args) => {
    console.log('發起請求:', args[0]);
    const startTime = Date.now();

    try {
      // The proxy hands arguments over as an untyped list; restore the
      // method's own parameter tuple before forwarding them.
      const result = await target.apply(thisArg, args as Parameters<typeof target>);
      console.log(`請求成功: ${Date.now() - startTime}ms`);
      return result;
    } catch (error) {
      console.error(`請求失敗: ${Date.now() - startTime}ms`, error);
      throw error;
    }
  },
});
緩存機制
// In-memory cache for GET requests.
// A response is reused for 30 s, keyed by the request URL only.
// NOTE(review): request options (e.g. headers) are not part of the key, and
// entries are only replaced when the same URL is fetched again — confirm both
// are acceptable.
const CACHE_TTL_MS = 30000;
const responseCache = new Map<string, { data: any; timestamp: number }>();

apiClient.get = new Proxy(apiClient.get, {
  apply: async (target, thisArg, args) => {
    const cacheKey = args[0];
    const entry = responseCache.get(cacheKey);
    const isFresh = Boolean(entry) && Date.now() - entry!.timestamp < CACHE_TTL_MS;

    if (isFresh) {
      return entry!.data;
    }

    const fetched = await target.apply(thisArg, args as Parameters<typeof target>);
    responseCache.set(cacheKey, { data: fetched, timestamp: Date.now() });
    return fetched;
  },
});