spring-kafka's message interceptor: RecordInterceptor
The Kafka client library already provides ConsumerInterceptor, which can intercept messages before they are consumed. Spring-Kafka enhances consumer-side interception with a similar mechanism of its own, RecordInterceptor. As before, we will use propagating a TraceId as the example to walk through how it is used.
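The RequestContext used throughout the examples is the author's own helper and is not shown in this section. A minimal sketch, assuming a simple ThreadLocal-based holder and a "traceId" header key (both are assumptions, not taken from the original), might look like this:

public class RequestContext {

    // Assumed header key used to carry the trace id in Kafka record headers
    public static final String TRACE_ID = "traceId";

    private static final ThreadLocal<String> TRACE_ID_HOLDER = new ThreadLocal<>();

    public static void setTraceId(String traceId) {
        TRACE_ID_HOLDER.set(traceId);
    }

    public static String getTraceId() {
        return TRACE_ID_HOLDER.get();
    }

    public static void clear() {
        TRACE_ID_HOLDER.remove();
    }
}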
Setting the TraceId with a ProducerInterceptor before sending
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SendTraceIdInterceptor implements ProducerInterceptor<String, String> {

    private static final Logger log = LoggerFactory.getLogger(SendTraceIdInterceptor.class);

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // Put the current TraceId into the record headers before the record is sent
        producerRecord.headers().add(RequestContext.TRACE_ID, RequestContext.getTraceId().getBytes());
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            log.info("send successfully");
        } else {
            log.error("send error", e);
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
        log.info("configure:{}", map);
    }
}
Don't forget to configure it in the yml:
kafka:
  bootstrap-servers: localhost:9092 # Kafka server address
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      interceptor.classes: com.github.xjs.kafka.interceptor.SendTraceIdInterceptor
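As an alternative to the yml entry above, the interceptor can also be registered programmatically on the producer factory. This is only a sketch of an equivalent wiring; the bean names and values here mirror the yml and are illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Equivalent of the interceptor.classes entry in the yml above
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SendTraceIdInterceptor.class.getName());
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}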
Restoring the TraceId with a RecordInterceptor before the message is consumed
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.listener.RecordInterceptor;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReceiveTraceIdRecordInterceptor implements RecordInterceptor<Object, Object> {

    @Override
    public ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record, Consumer<Object, Object> consumer) {
        // Restore the TraceId from the record header before the listener method is invoked
        Header header = record.headers().lastHeader(RequestContext.TRACE_ID);
        if (header != null) {
            log.info("ReceiveTraceIdRecordInterceptor setting TraceId");
            RequestContext.setTraceId(new String(header.value()));
        }
        return record;
    }
}
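Because the TraceId lives in a ThreadLocal and listener container threads are reused, it is worth clearing it once the record has been handled. Assuming spring-kafka 2.8+ (where RecordInterceptor also exposes an afterRecord callback) and the hypothetical RequestContext.clear() from the sketch above, the interceptor could additionally override:

@Override
public void afterRecord(ConsumerRecord<Object, Object> record, Consumer<Object, Object> consumer) {
    // Runs after the listener has processed the record: clear the ThreadLocal so the
    // trace id does not leak into the next record handled by the same thread.
    RequestContext.clear();
}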
It needs to be registered as a Spring bean:
@Slf4j
@Configuration
public class KafkaConfiguration {

    // Expose the RecordInterceptor as a Spring bean so it gets applied to the
    // listener container factory.
    @Bean
    public ReceiveTraceIdRecordInterceptor receiveTraceIdInterceptor() {
        return new ReceiveTraceIdRecordInterceptor();
    }
}
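Recent Spring Boot versions apply a RecordInterceptor bean like this to the auto-configured listener container factory. If you build the factory yourself, or want to be explicit, you can set it directly. A sketch, assuming a consumerFactory bean is available (the bean names here are illustrative):

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> consumerFactory,
        ReceiveTraceIdRecordInterceptor receiveTraceIdInterceptor) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // The interceptor is invoked for every record before the @KafkaListener method runs
    factory.setRecordInterceptor(receiveTraceIdInterceptor);
    return factory;
}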
Sending a test message
The message sender:
@GetMapping("send/{msg}")
public String send(@PathVariable("msg") String msg) {
    CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-topic", msg);
    try {
        // Block until the send completes so the log line reflects the actual result
        future.get();
        log.info("message sent successfully, msg:{} traceId:{}", msg, RequestContext.getTraceId());
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "OK";
}
The message receiver:
@KafkaListener(topics = {"test-topic"}, filter = "traceIdFilter")
public void onMessage(ConsumerRecord<Object, Object> record) {
    Object value = record.value();
    log.info("received ConsumerRecord message:{}, traceId:{}", value, RequestContext.getTraceId());
}
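The listener above references a filter bean named traceIdFilter, which is not shown in this section. A hypothetical RecordFilterStrategy that restores the TraceId and lets every record through could look like this (returning false means the record is not discarded):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component("traceIdFilter")
public class TraceIdFilter implements RecordFilterStrategy<Object, Object> {

    @Override
    public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
        Header header = consumerRecord.headers().lastHeader(RequestContext.TRACE_ID);
        if (header != null) {
            log.info("TraceIdFilter setting TraceId");
            RequestContext.setTraceId(new String(header.value()));
        }
        // false = do not filter the record out, let it reach the listener
        return false;
    }
}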
Multiple RecordInterceptors
Multiple RecordInterceptors can be wrapped in a CompositeRecordInterceptor, which executes them one after another in order:
@Bean
public CompositeRecordInterceptor<Object, Object> compositeRecordInterceptor() {
    ReceiveTraceIdRecordInterceptor interceptor1 = new ReceiveTraceIdRecordInterceptor();
    RecordInterceptor<Object, Object> interceptor2 = (record, consumer) -> {
        log.info("interceptor2.intercept");
        return record;
    };
    // Delegates run in the order they are passed to the constructor:
    // here interceptor2 runs before interceptor1.
    return new CompositeRecordInterceptor<>(interceptor2, interceptor1);
}
Execution order of the various interceptors
In this scenario, RecordInterceptor, ConsumerInterceptor, and RecordFilterStrategy can all do the job. Let's add all of them and look at the order in which they are executed:
2025-09-16T15:50:37.065+08:00 INFO 6452 --- [ntainer#0-0-C-1] c.g.x.k.i.ReceiveTraceIdInterceptor : ReceiveTraceIdInterceptor setting TraceId
2025-09-16T15:50:37.068+08:00 INFO 6452 --- [ntainer#0-0-C-1] .g.x.k.i.ReceiveTraceIdRecordInterceptor : ReceiveTraceIdRecordInterceptor setting TraceId
2025-09-16T15:50:37.069+08:00 INFO 6452 --- [ntainer#0-0-C-1] c.github.xjs.kafka.filter.TraceIdFilter : TraceIdFilter setting TraceId
From the log it is clear that the execution order is: ConsumerInterceptor > RecordInterceptor > TraceIdFilter (the RecordFilterStrategy).
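For reference, the consumer-side ConsumerInterceptor that produced the first log line (ReceiveTraceIdInterceptor) is not shown in this section. A sketch of what it might look like, assuming it restores the TraceId from each record's header and is registered via the consumer's interceptor.classes property (mirroring the producer side):

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReceiveTraceIdInterceptor implements ConsumerInterceptor<String, String> {

    private static final Logger log = LoggerFactory.getLogger(ReceiveTraceIdInterceptor.class);

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Called with the whole polled batch, before the records reach the listener container
        for (ConsumerRecord<String, String> record : records) {
            Header header = record.headers().lastHeader(RequestContext.TRACE_ID);
            if (header != null) {
                log.info("ReceiveTraceIdInterceptor setting TraceId");
                RequestContext.setTraceId(new String(header.value()));
            }
        }
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}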