当前位置: 首页 > news >正文

Kafka如何配置生产者拦截器和消费者拦截器

Kafka 的生产者拦截器和消费者拦截器允许你在消息发送前后以及消息消费前后嵌入自定义逻辑,用于实现监控、审计、消息修改等功能。本文我们就用一个最常见的传递TraceId的案例来说明下这两类拦截器如何来使用。

生产者发送拦截器

生产者拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。在这个拦截器中,我们把保存到ThreadLocal中的traceId设置到消息的header中。

步骤 1:实现拦截器类

创建一个类,实现 ProducerInterceptor 接口。该接口有两个核心方法:

  • onSend(ProducerRecord record): 在消息被序列化和计算分区之前调用。你可以修改或记录消息。

  • onAcknowledgement(RecordMetadata metadata, Exception exception): 在消息被服务器确认(成功或失败)之后调用。这会在生产者回调触发之前调用。注意:该方法不要在 ProducerInterceptor 中实现耗时逻辑,因为它会阻塞生产者。

public class SendTraceIdInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {// 把TheadLocal中traceId设置到header中producerRecord.headers().add(RequestContext.TRACE_ID, RequestContext.getTraceId().getBytes());return producerRecord;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){log.info("send successfully");} else {log.error("send error : {}", e);}}@Overridepublic void close() {}// 这里可以拿到所有的producer的配置信息@Overridepublic void configure(Map<String, ?> map) {log.info("configure:{}", map);}
}
步骤 2:在生产者配置中指定拦截器
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.SendTraceIdInterceptor

消费者接收拦截器

消费者拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口。在这个拦截器中我们读取消息中的header并重新设置到ThreadLocal中。

步骤 1:实现拦截器类

创建一个类,实现 ConsumerInterceptor 接口。该接口也有两个核心方法:

  • onConsume(ConsumerRecords records): 在消息被反序列化之后、传递给消费者poll()方法返回之前调用。你可以修改或过滤消息。
  • onCommit(Map offsets): 在消费者提交偏移量之后调用。
public class ReceiveTraceIdInterceptor implements ConsumerInterceptor<String, String> {private static Logger log = LoggerFactory.getLogger(ReceiveTraceIdInterceptor.class);@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {for(Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator(); recordIterator.hasNext();){ConsumerRecord<String, String> record = recordIterator.next();Headers headers = record.headers();if(headers == null){continue;}for(Iterator<Header> headerIterator = headers.iterator(); headerIterator.hasNext();){Header header = headerIterator.next();// 从header中获取traceId, 并保存到ThreadLocal          if(Objects.equals(header.key(), RequestContext.TRACE_ID)){RequestContext.setTraceId(new String(header.value()));}}}return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {}// 这里可以拿到所有的消费者的配置@Overridepublic void configure(Map<String, ?> configs) {log.info("consumer configure:{}", configs);}
}
步骤 2:在消费者配置中指定拦截器
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址consumer:group-id: my-group  # 默认的消费者组IDauto-offset-reset: earliest  # 如果没有初始偏移量或偏移量已失效,从最早的消息开始读取key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.ReceiveTraceIdInterceptor    

总结

位置
  • 生产者拦截器:在消息序列化和分区之前(onSend)以及确认之后(onAcknowledgement)调用。
  • 消费者拦截器:在消息反序列化之后、返回给用户之前(onConsume)以及提交偏移量之后(onCommit)调用。
配置

使用 ProducerConfig.INTERCEPTOR_CLASSES_CONFIGConsumerConfig.INTERCEPTOR_CLASSES_CONFIG 属性进行配置。
值是该拦截器类的全限定名,多个拦截器用逗号分隔,它们会按照配置的顺序执行。

用途
  • 监控和审计:记录消息发送/接收的成功失败、延迟等。
  • 消息修改:在发送前给消息添加统一前缀或头信息。
  • 自定义指标:与监控系统(如 Prometheus)集成,收集特定指标。
  • 过滤:消费者端可以尝试过滤消息,比如:本地local开发环境和测试服务器的test环境可能使用的是同一套kafka服务,我们可以在消息头中传递环境标识,在消费者端去过滤只属于自己这个环境的消息,从而防止引起混乱。

文章转载自:

http://kJ1XhWiR.hxLch.cn
http://hAEUQL9Y.hxLch.cn
http://e0SkLOS7.hxLch.cn
http://Y6lNaMTY.hxLch.cn
http://KZSt6C2T.hxLch.cn
http://twGi9JfR.hxLch.cn
http://gSbMBwZW.hxLch.cn
http://q8ib7NMB.hxLch.cn
http://XmbY6kf7.hxLch.cn
http://MpCucS1T.hxLch.cn
http://ry5J3SyG.hxLch.cn
http://fA15HzOF.hxLch.cn
http://8jAyRXS3.hxLch.cn
http://Cleiezx2.hxLch.cn
http://NBPIfiaI.hxLch.cn
http://hterBzhI.hxLch.cn
http://HWnNYbEd.hxLch.cn
http://9SNPwG4Y.hxLch.cn
http://DeDKunrG.hxLch.cn
http://vCcqTpxB.hxLch.cn
http://MndRhLbV.hxLch.cn
http://uKlRgd2X.hxLch.cn
http://1TdWiFnj.hxLch.cn
http://0Qg4F1qs.hxLch.cn
http://Gn4RwC4o.hxLch.cn
http://mhpo4F19.hxLch.cn
http://q6FbGJZy.hxLch.cn
http://w325QwOB.hxLch.cn
http://4Bsbk1jd.hxLch.cn
http://agqmOLY3.hxLch.cn
http://www.dtcms.com/a/381517.html

相关文章:

  • uniapp:根据目的地经纬度,名称,唤起高德/百度地图来导航,兼容App,H5,小程序
  • 欧拉函数 | 定义 / 性质 / 应用
  • 【更新至2024年】1996-2024年各省农业总产值数据(无缺失)
  • 财报季观察|消费“分野”,燕之屋(1497.HK)们向上生长
  • 机械制造专属ERP:降本增效与数字转型的关键
  • 基于node.js+vue的医院陪诊系统的设计与实现(源码+论文+部署+安装)
  • 【大语言模型 59】监控与日志系统:训练过程全面监控
  • HIS架构智能化升级编程路径:从底层原理到临床实践的深度解析(下)
  • Node.js中package.json详解
  • 当AI遇上数据库:Text2Sql.Net如何让“说人话查数据“成为现实
  • 数据结构8——双向链表
  • 问卷系统自动化测试报告
  • Python 的函数柯里化(Currying)
  • 渗透测试信息收集详解
  • 【连载3】C# MVC 异常日志进阶:结构化日志与性能优化技巧
  • 冯诺依曼体系:现代计算机的基石与未来展望
  • 关于在阿里云DMS误操作后如何恢复数据的记录
  • 贪心算法应用:神经网络剪枝详解
  • 灵活学习PyTorch算法:从动态计算图到领域最佳实践
  • [code-review] 部署配置 | Docker+PM2 | AWS Lambda | Vercel+边缘函数
  • 递归,搜索与回溯算法
  • 31.网络基础概念(一)
  • 贪心算法应用:信用卡还款优化问题详解
  • Linux的多线程
  • 《链式二叉树常用操作全解析》
  • ——贪心算法——
  • IDEA使用Maven和MyBatis简化数据库连接(配置篇)
  • MLLM学习~M3-Agent如何处理视频:视频clip提取、音频提取、抽帧提取和人脸提取
  • video视频标签 响应式写法 pc 手机调用不同视频 亲测
  • CMD简单用法