Kafka如何配置生产者拦截器和消费者拦截器
Kafka 的生产者拦截器和消费者拦截器允许你在消息发送前后以及消息消费前后嵌入自定义逻辑,用于实现监控、审计、消息修改等功能。本文我们就用一个最常见的传递TraceId的案例来说明下这两类拦截器如何来使用。
生产者发送拦截器
生产者拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor
接口。在这个拦截器中,我们把保存到ThreadLocal中的traceId设置到消息的header中。
步骤 1:实现拦截器类
创建一个类,实现 ProducerInterceptor
接口。该接口有两个核心方法:
-
onSend(ProducerRecord record): 在消息被序列化和计算分区之前调用。你可以修改或记录消息。
-
onAcknowledgement(RecordMetadata metadata, Exception exception): 在消息被服务器确认(成功或失败)之后调用。这会在生产者回调触发之前调用。注意:该方法不要在 ProducerInterceptor 中实现耗时逻辑,因为它会阻塞生产者。
public class SendTraceIdInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {// 把TheadLocal中traceId设置到header中producerRecord.headers().add(RequestContext.TRACE_ID, RequestContext.getTraceId().getBytes());return producerRecord;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){log.info("send successfully");} else {log.error("send error : {}", e);}}@Overridepublic void close() {}// 这里可以拿到所有的producer的配置信息@Overridepublic void configure(Map<String, ?> map) {log.info("configure:{}", map);}
}
步骤 2:在生产者配置中指定拦截器
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服务器地址producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.SendTraceIdInterceptor
消费者接收拦截器
消费者拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor
接口。在这个拦截器中我们读取消息中的header并重新设置到ThreadLocal中。
步骤 1:实现拦截器类
创建一个类,实现 ConsumerInterceptor
接口。该接口也有两个核心方法:
- onConsume(ConsumerRecords records): 在消息被反序列化之后、传递给消费者poll()方法返回之前调用。你可以修改或过滤消息。
- onCommit(Map offsets): 在消费者提交偏移量之后调用。
public class ReceiveTraceIdInterceptor implements ConsumerInterceptor<String, String> {private static Logger log = LoggerFactory.getLogger(ReceiveTraceIdInterceptor.class);@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {for(Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator(); recordIterator.hasNext();){ConsumerRecord<String, String> record = recordIterator.next();Headers headers = record.headers();if(headers == null){continue;}for(Iterator<Header> headerIterator = headers.iterator(); headerIterator.hasNext();){Header header = headerIterator.next();// 从header中获取traceId, 并保存到ThreadLocal if(Objects.equals(header.key(), RequestContext.TRACE_ID)){RequestContext.setTraceId(new String(header.value()));}}}return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {}// 这里可以拿到所有的消费者的配置@Overridepublic void configure(Map<String, ?> configs) {log.info("consumer configure:{}", configs);}
}
步骤 2:在消费者配置中指定拦截器
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服务器地址consumer:group-id: my-group # 默认的消费者组IDauto-offset-reset: earliest # 如果没有初始偏移量或偏移量已失效,从最早的消息开始读取key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.ReceiveTraceIdInterceptor
总结
位置
- 生产者拦截器:在消息序列化和分区之前(onSend)以及确认之后(onAcknowledgement)调用。
- 消费者拦截器:在消息反序列化之后、返回给用户之前(onConsume)以及提交偏移量之后(onCommit)调用。
配置
使用 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
和 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG
属性进行配置。
值是该拦截器类的全限定名,多个拦截器用逗号分隔,它们会按照配置的顺序执行。
用途
- 监控和审计:记录消息发送/接收的成功失败、延迟等。
- 消息修改:在发送前给消息添加统一前缀或头信息。
- 自定义指标:与监控系统(如 Prometheus)集成,收集特定指标。
- 过滤:消费者端可以尝试过滤消息,比如:本地local开发环境和测试服务器的test环境可能使用的是同一套kafka服务,我们可以在消息头中传递环境标识,在消费者端去过滤只属于自己这个环境的消息,从而防止引起混乱。