当前位置: 首页 > news >正文

kafka组件traceId增强

问题背景

kafka 组件在排查生产者和消费者问题的时候,经常出现日志不匹配的情况,为了解决这个问题,本文实现 traceId 在kafka 组件的生产者和消费者之间传递,达到日志匹配,快速排查问题目的。

实现方案

1、新增生产者拦截器,读取 MDC 中的 traceId 并放入 kafka 消息的 headers 中;

2、新增消费者拦截器,读取 kafka 消息 headers 中的 traceId,覆写 MDC 的 traceId。

新增两个拦截器的方案对代码几乎没有侵入性,很优雅的解决了我们的问题。

生产者拦截器

1、kafka 消息发送前,进入到生产者拦截器,如果 MDC.get("trace_id") 存在值,则读取当前 MDC 中的 traceId 并放入消息的 headers 中;

2、如果 MDC.get("trace_id") 不存在,则生成一个 traceId,并写入消息的 headers 中。

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;@Slf4j
@Component
public class KafkaTraceIdProducerInterceptor implements ProducerInterceptor<Integer, String> {public static final String TRACE_ID = "trace_id";@Overridepublic ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> producerRecord) {producerRecord.headers().add(new Header() {@Overridepublic String key() {return TRACE_ID;}@Overridepublic byte[] value() {return getOrGenerateTraceId().getBytes(StandardCharsets.UTF_8);}});return producerRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}/*** 获取当前请求TraceId或生成新的traceId** @return traceId*/public static String getOrGenerateTraceId() {return Optional.ofNullable(MDC.get(TRACE_ID)).orElseGet(UUID.randomUUID()::toString);}
}

消费者拦截器

读取消息 headers 中的 traceId,覆写 MDC 的 traceId,消息消费完成后清理相关的 MDC。

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.MDC;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.UUID;@Slf4j
@Component
public class KafkaConsumerInterceptor<K, V> implements RecordInterceptor<K, V> {private final static String TRACE_ID = "trace_id";@Overridepublic ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> consumerRecord) {return consumerRecord;}@Overridepublic ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {try {Header traceHeader = record.headers().lastHeader(TRACE_ID);String traceId = (traceHeader != null && traceHeader.value() != null)? new String(traceHeader.value(), StandardCharsets.UTF_8): UUID.randomUUID().toString();MDC.put(TRACE_ID, traceId);} catch (Exception e) {log.error("处理Kafka消息头异常", e);}return record;}@Overridepublic void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {MDC.clear();}@Overridepublic void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {MDC.clear();}}

问题延伸

如果遇到需要打印生产者和消费者接口日志或者遇到安全问题,生产者消息加密,消费者消息解密,都可以通过上述增加生产者拦截器和消费者拦截器来解决问题。

http://www.dtcms.com/a/525565.html

相关文章:

  • 【流程引擎】与【规则引擎】
  • 商业网站排名深圳市住房和建设保障局
  • PSG(巴黎圣日耳曼)技术文章大纲
  • wecenter wordpressseowhy是什么意思中文
  • 微店常用API:获取商品详情接口|关键字搜索商品接口|获取快递费接口-打通商品运营与用户体验的技术桥梁
  • 给aws xray添加采样规则
  • 圈地游戏(分数规划、网格图对偶建模)
  • 工商注册官方网站北京软件开发公司官网
  • 南充网站建设服务商互动平台
  • 电影网站html模板屋领网站固链
  • marm_ros2 机械臂视觉抓取操作流程
  • 像淘客基地这样的网站如何做网站引导页在线做
  • Wordpress 仿站 工具深圳金科威公司官网
  • python在Arcgis Pro中 多边形锐角识别与切割脚本笔记
  • 一种使用 PowerToys 的键盘管理器工具编辑惠普暗影精灵11 的 OMEN 自定义按键的方法
  • 锂电池充放电管理学习
  • 复数等式:为何对所有整数都成立?
  • CLIP模型全解析:从对比学习到零样本识别的革命
  • 广州网站优化步骤用公司网站后缀做邮箱
  • 202.快乐数
  • 黑色asp企业网站源码办公空间设计说明300字
  • 站群 网站如何做网站页面设计网页说明
  • 昆山的网站建设网站建设改手机号
  • HTML:Video视频切换时不重新加载
  • CSP-S模拟赛六总结
  • 网站开发工具中的三剑客湖南建设人力资源网证书查询
  • 朝阳网站推广网站开发费用说明
  • seo做的好的网站有哪些东莞最新招聘
  • seo网站优化多少钱全球咨询公司排名
  • ElementPlus 如何支持移动端