Netty异步日志架构
当Netty处理大量日志时，需要从多个层面进行优化，避免日志本身成为系统瓶颈。
1. 异步日志架构
1.1 异步日志处理器
java
@Component
public class AsyncLogHandler extends SimpleChannelInboundHandler<LogMessage> {// 高性能无锁队列private final Disruptor<LogEvent> disruptor;private final RingBuffer<LogEvent> ringBuffer;// 批量处理配置private static final int BATCH_SIZE = 1000;private static final int BUFFER_SIZE = 65536;private final List<LogMessage> batchBuffer = new ArrayList<>(BATCH_SIZE);private final ScheduledExecutorService flushScheduler = Executors.newScheduledThreadPool(1);@PostConstructpublic void init() {// 初始化Disruptordisruptor = new Disruptor<>(LogEvent::new,BUFFER_SIZE,Executors.defaultThreadFactory(),ProducerType.MULTI,new BlockingWaitStrategy());disruptor.handleEventsWith(new LogEventHandler());disruptor.start();ringBuffer = disruptor.getRingBuffer();// 启动定时刷新flushScheduler.scheduleAtFixedRate(this::flushBatch, 100, 100, TimeUnit.MILLISECONDS);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LogMessage logMessage) {// 异步写入队列,不阻塞IO线程long sequence = ringBuffer.next();try {LogEvent event = ringBuffer.get(sequence);event.setLogMessage(logMessage);event.setReceiveTime(System.currentTimeMillis());} finally {ringBuffer.publish(sequence);}}// Disruptor事件处理器private class LogEventHandler implements EventHandler<LogEvent> {@Overridepublic void onEvent(LogEvent event, long sequence, boolean endOfBatch) {processLogEvent(event);// 批量处理完成时刷新if (endOfBatch) {flushBatch();}}}private void processLogEvent(LogEvent event) {batchBuffer.add(event.getLogMessage());if (batchBuffer.size() >= BATCH_SIZE) {flushBatch();}}private void flushBatch() {if (batchBuffer.isEmpty()) {return;}List<LogMessage> logsToProcess = new ArrayList<>(batchBuffer);batchBuffer.clear();// 异步处理批量日志CompletableFuture.runAsync(() -> processBatchLogs(logsToProcess)).exceptionally(ex -> {log.error("批量处理日志异常", ex);return null;});}
}
1.2 日志分级处理策略
java
@Component
public class LogLevelProcessor {// 不同级别日志采用不同处理策略private final Map<LogLevel, LogStrategy> strategyMap = new EnumMap<>(LogLevel.class);@PostConstructpublic void init() {// ERROR级别 - 实时处理strategyMap.put(LogLevel.ERROR, new RealTimeLogStrategy());// WARN级别 - 快速处理 strategyMap.put(LogLevel.WARN, new FastLogStrategy());// INFO级别 - 批量处理strategyMap.put(LogLevel.INFO, new BatchLogStrategy());// DEBUG级别 - 采样处理strategyMap.put(LogLevel.DEBUG, new SamplingLogStrategy(0.1)); // 10%采样}public void processLog(LogMessage log) 