当前位置: 首页 > news >正文

spring-kafka的消息拦截器RecordInterceptor

KafkaClients提供了ConsumerInterceptor可以在消费消息之前对消息进行拦截,Spring-Kafka对消费拦截器做了增强,新提供了一个与之类似的RecordInterceptor,我们还是以传递TraceId来看下具体的使用方法。

发送消息之前使用ProducerInterceptor设置TraceId

public class SendTraceIdInterceptor implements ProducerInterceptor<String, String> {private static Logger log = LoggerFactory.getLogger(SendTraceIdInterceptor.class);@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {producerRecord.headers().add(RequestContext.TRACE_ID, RequestContext.getTraceId().getBytes());return producerRecord;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){log.info("send successfully");} else {log.error("send error : {}", e);}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {log.info("configure:{}", map);}
}

不要忘记把它配置到yml中:

  kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.SendTraceIdInterceptor

收到消息之前使用RecordInterceptor重新设置TraceId

@Slf4j
public class ReceiveTraceIdRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record, Consumer<Object, Object> consumer) {Header header = record.headers().lastHeader(RequestContext.TRACE_ID);if(header != null){log.info("ReceiveTraceIdRecordInterceptor 设置 TraceId");RequestContext.setTraceId(new String(header.value()));}return record;}
}

需要把它配置成spring的一个bean:

@Slf4j
@Configuration
public class KafkaConfiguration  {@Beanpublic ReceiveTraceIdRecordInterceptor receiveTraceIdInterceptor(){return new ReceiveTraceIdRecordInterceptor();}
}

发消息测试

消息发送者:

@GetMapping("send/{msg}")
public String send(@PathVariable("msg")String msg){CompletableFuture future = kafkaTemplate.send("test-topic", msg);try{future.get();log.info("消息发送成功,msg:{} traceId:{}",msg, RequestContext.getTraceId());}catch(Exception e){e.printStackTrace();}return "OK";
}

消息接收者:

@KafkaListener(topics = {"test-topic"}, filter = "traceIdFilter")
public void onMessage(ConsumerRecord record){Object value = record.value();log.info("收到ConsumerRecord消息:{}, traceId:{}", value, RequestContext.getTraceId());
}

多个RecordInterceptor

可以使用CompositeRecordInterceptor来包装多个RecordInterceptor, 多个RecordInterceptor按照顺序依次执行:

@Bean
public CompositeRecordInterceptor<Object, Object> compositeRecordInterceptor(){ReceiveTraceIdRecordInterceptor interceptor1 = new ReceiveTraceIdRecordInterceptor();RecordInterceptor<Object, Object> interceptor2 = (record, consumer) -> {log.info("interceptor2.intercept");return record;};return new CompositeRecordInterceptor<>(interceptor2, interceptor1);
}

各种Interceptor的执行顺寻问题

在我们这个场景中,RecordInterceptorConsumerInterceptorRecordFilterStrategy都可以完成这个功能,我们都添加上,看一下他们之间的执行顺序:

2025-09-16T15:50:37.065+08:00  INFO 6452 --- [ntainer#0-0-C-1] c.g.x.k.i.ReceiveTraceIdInterceptor      : ReceiveTraceIdInterceptor 设置 TraceId
2025-09-16T15:50:37.068+08:00  INFO 6452 --- [ntainer#0-0-C-1] .g.x.k.i.ReceiveTraceIdRecordInterceptor : ReceiveTraceIdRecordInterceptor 设置 TraceId
2025-09-16T15:50:37.069+08:00  INFO 6452 --- [ntainer#0-0-C-1] c.github.xjs.kafka.filter.TraceIdFilter  : TraceIdFilter 设置 TraceId

从日志可以很明显的看出来,按照先后顺序依次是:ConsumerInterceptor> RecordInterceptor > TraceIdFilter


文章转载自:

http://vU5PDpnt.qmbtn.cn
http://q41Tst60.qmbtn.cn
http://KRBRssxN.qmbtn.cn
http://CTU4TbQB.qmbtn.cn
http://mSBw30P7.qmbtn.cn
http://MwkUz3js.qmbtn.cn
http://49fkA21M.qmbtn.cn
http://I69WYu9t.qmbtn.cn
http://iEz7TVnU.qmbtn.cn
http://4VfNlstJ.qmbtn.cn
http://hRILrHOw.qmbtn.cn
http://0Q8lFl4z.qmbtn.cn
http://Jx5yuRxw.qmbtn.cn
http://JBz09nW6.qmbtn.cn
http://cf0qkyFj.qmbtn.cn
http://AdYdHW7g.qmbtn.cn
http://bjdv283U.qmbtn.cn
http://tNBqTyNF.qmbtn.cn
http://jH5GfxtB.qmbtn.cn
http://tDsOu5Fm.qmbtn.cn
http://JqNAEI35.qmbtn.cn
http://5z1T426n.qmbtn.cn
http://FQrT6XVl.qmbtn.cn
http://Rzn2boXQ.qmbtn.cn
http://pyJApxah.qmbtn.cn
http://6wzvha0b.qmbtn.cn
http://daFApVTe.qmbtn.cn
http://vvzk6iwO.qmbtn.cn
http://CvnrFyhl.qmbtn.cn
http://SClnsmb4.qmbtn.cn
http://www.dtcms.com/a/386191.html

相关文章:

  • VSCode + Python 开发踩坑:虚拟环境不在项目根目录导致包无法识别该怎么办
  • 【MCP】【FastMCP】[特殊字符] 使用 UV 创建 FastMCP 服务完整示例
  • 蓝绿部署(Blue-Green Deployment)介绍(一种用于降低软件发布风险的部署策略)流量切换(金丝雀发布)
  • 羽毛球地板:从专业运动场景到全民健身市场的技术跃迁与产业重构
  • 【实战】预警算法--噪声添加机制
  • Three.js 中如何给 3D 模型添加文字标签?
  • 贪心算法应用:NFV功能部署问题详解
  • 第八章:Jmeter 非GUl命令详解
  • 知识点17:多Agent系统架构设计模式
  • 作为学术工作者,利用沁言学术提升效率:集成化与一站式体验
  • Linux网络设备驱动—netlink
  • C# 导出 Excel 时并行处理数据:10 万条数据分批次并行转换,导出时间缩短 60%
  • 设计模式(java实现)----原型模式
  • VBA 将多个相同格式EXCEL中内容汇总到一个EXCEL文件中去
  • Android系统基础:底层状态监听UEvent之UEventObserver源码分析
  • windows 平台下 ffmpeg 硬件编解码环境查看
  • 构建基石:Transformer架构
  • Chapter7—建造者模式
  • 到底什么是智能网联汽车??第二期——决策与控制
  • 将普通Wpf项目改成Prism项目
  • 微硕WINSOK高性能N沟道场效应管WSD3040DN56,助力汽车中控散热风扇静音长寿命
  • nextjs+shadcn+tailwindcss实现博客中的overview
  • cursor-关于自定义指令的问题处理
  • Vision Transformer (ViT) :Transformer在computer vision领域的应用(四)
  • 【开题答辩全过程】以 “今天吃什么”微信小程序为例,包含答辩的问题和答案
  • iOS App 内存泄漏与性能调优实战 如何排查内存问题、优化CPU与GPU性能、降低耗电并提升流畅度(uni-app iOS开发优化指南)
  • 从 Token 拦截器到 Web 配置
  • Next.js 的原理和它的使用场景
  • SPAR模型优化思路
  • pycharm+miniconda cursor+miniconda配置