当前位置: 首页 > news >正文

消息发送接收如何传递TraceId

文章目录

      • 以下是基于常见MQ(RabbitMQ/Kafka)的具体实现方案:
        • 一、通用准备:MDC与traceId基础
        • 二、RabbitMQ实现方案
          • 1. 发送消息时插入traceId
          • 2. 消费消息时提取traceId并写入MDC
        • 三、Kafka实现方案
          • 1. 发送消息时插入traceId
          • 2. 消费消息时提取traceId并写入MDC
        • 四、关键注意事项

在Java中通过MQ实现链路追踪(传递 traceId并在消费端写入MDC)的核心思路是: 发送消息时将当前线程的traceId存入消息元数据(如headers),消费消息时从元数据中提取traceId并设置到MDC,处理完成后清除MDC避免线程复用污染

以下是基于常见MQ(RabbitMQ/Kafka)的具体实现方案:

一、通用准备:MDC与traceId基础

MDC(Mapped Diagnostic Context)是日志框架(如Logback、Log4j)提供的线程本地存储工具,用于存储链路追踪的traceId。需先确保项目中已集成日志框架,并通过MDC.put("traceId", traceId)MDC.remove("traceId")操作上下文。

二、RabbitMQ实现方案

以Spring AMQP(Spring集成RabbitMQ)为例:

1. 发送消息时插入traceId

通过MessagePostProcessor拦截消息发送过程,将当前MDC中的traceId存入消息headers。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;
import java.util.UUID;@Component
public class RabbitMqSender {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息时自动添加traceId到headerspublic void send(String exchange, String routingKey, Object message) {// 1. 获取当前MDC中的traceId(若不存在则生成)String traceId = MDC.get("traceId");if (traceId == null) {traceId = generateTraceId(); // 生成新的traceId(如UUID)}// 2. 通过MessagePostProcessor将traceId添加到消息headersMessagePostProcessor traceIdProcessor = msg -> {msg.getMessageProperties().setHeader("traceId", traceId);return msg;};// 3. 发送消息(携带traceId处理器)rabbitTemplate.convertAndSend(exchange, routingKey, message, traceIdProcessor);}// 生成traceId(可自定义规则,如UUID)private String generateTraceId() {return UUID.randomUUID().toString().replace("-", "");}
}
2. 消费消息时提取traceId并写入MDC

通过AOP拦截@RabbitListener标注的消费方法,从消息headers中提取traceId并设置到MDC,处理完成后清除。

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;@Aspect
@Component
public class RabbitMdcAspect {// 切入点:拦截所有@RabbitListener标注的方法@Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")public void rabbitListenerPointcut() {}@Around("rabbitListenerPointcut()")public Object aroundConsumer(ProceedingJoinPoint joinPoint) throws Throwable {// 1. 从方法参数中获取Message对象(消息原始信息)Message message = getMessageFromArgs(joinPoint.getArgs());if (message != null) {// 2. 从消息headers中提取traceIdString traceId = message.getMessageProperties().getHeader("traceId");if (traceId != null) {MDC.put("traceId", traceId); // 写入MDC}}try {// 3. 执行消费逻辑return joinPoint.proceed();} finally {// 4. 清除MDC(关键:避免线程池复用导致的traceId残留)MDC.remove("traceId");}}// 从方法参数中提取Message对象(兼容多种参数类型)private Message getMessageFromArgs(Object[] args) {for (Object arg : args) {if (arg instanceof Message) {return (Message) arg;}}return null;}
}

注意:消费方法的参数需包含Message类型(或通过@Payload+@Header单独提取),确保能获取到原始消息headers。

三、Kafka实现方案

以Spring Kafka为例:

1. 发送消息时插入traceId

通过ProducerRecord的headers携带traceId

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
import java.util.UUID;@Component
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public void send(String topic, Object message) {// 1. 获取或生成traceIdString traceId = MDC.get("traceId");if (traceId == null) {traceId = generateTraceId();}// 2. 创建ProducerRecord并添加traceId到headersProducerRecord<String, Object> record = new ProducerRecord<>(topic, message);record.headers().add("traceId", traceId.getBytes(StandardCharsets.UTF_8));// 3. 发送消息kafkaTemplate.send(record);}private String generateTraceId() {return UUID.randomUUID().toString().replace("-", "");}
}
2. 消费消息时提取traceId并写入MDC

通过AOP拦截@KafkaListener方法,从ConsumerRecord的headers中提取traceId

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;@Aspect
@Component
public class KafkaMdcAspect {// 切入点:拦截所有@KafkaListener标注的方法@Pointcut("@annotation(org.springframework.kafka.annotation.KafkaListener)")public void kafkaListenerPointcut() {}@Around("kafkaListenerPointcut()")public Object aroundConsumer(ProceedingJoinPoint joinPoint) throws Throwable {// 1. 从参数中获取ConsumerRecordConsumerRecord<?, ?> record = getConsumerRecordFromArgs(joinPoint.getArgs());if (record != null) {// 2. 从headers提取traceIdbyte[] traceIdBytes = record.headers().lastHeader("traceId")?.value();if (traceIdBytes != null) {String traceId = new String(traceIdBytes, StandardCharsets.UTF_8);MDC.put("traceId", traceId); // 写入MDC}}try {// 3. 执行消费逻辑return joinPoint.proceed();} finally {// 4. 清除MDCMDC.remove("traceId");}}// 从参数中提取ConsumerRecordprivate ConsumerRecord<?, ?> getConsumerRecordFromArgs(Object[] args) {for (Object arg : args) {if (arg instanceof ConsumerRecord) {return (ConsumerRecord<?, ?>) arg;}}return null;}
}
四、关键注意事项
  1. traceId生成策略:若当前线程无traceId(如定时任务触发的消息发送),需生成全局唯一ID(推荐UUID或雪花算法)。
  2. MDC清除:必须在finally块中清除traceId,否则线程池复用会导致后续任务继承错误的traceId
  3. 兼容性:消息序列化/反序列化需保留headers(如RabbitMQ的SimpleMessageConverter、Kafka的StringJsonMessageConverter默认支持headers传递)。
  4. 与链路工具集成:若使用Spring Cloud Sleuth、SkyWalking等工具,它们会自动处理traceId传递,无需手动实现(但可通过自定义补充)。

通过以上方案,即可在MQ消息的发送和消费环节打通链路追踪,实现日志的全链路关联。

http://www.dtcms.com/a/521571.html

相关文章:

  • GPU芯片内存泄漏测试方法
  • 深兰科技法务大模型亮相,推动律所文书处理智能化
  • wordpress文章数据包昆明网站优化
  • 建一个资源网站赚钱吗php网站制作实例教程
  • 百度上做优化一年多少钱网站优化 月付费
  • 成都网站建设 平易云网站建设模板哪里下载
  • Docker Swarm之Java 应用部署与平滑更新
  • 网站图片标签群晖 做网站服务器
  • 网站开发项目实训报告企业融资的主要方式
  • 对遗传学进行机器学习的现状与展望!
  • 做旅游海报的软件或是网站wordpress 4.7.2 提权
  • 电子商务网站设计分析怎么做宿州建设银行网站
  • 基于GMapping和蚁群算法的导航方案
  • 阳朔县建设规划局网站怒江州建设局网站企业备案网站
  • O2OA(v9.5)开发平台更新说明(三):聚焦安全与系统维护的全面升级
  • 工信部网站备案查询步骤iis做网站视
  • win7本机做网站自己创建公司网站
  • 【Day 81】虚拟化-虚拟磁盘管理
  • 使用python开发任天堂gameboy模拟器|pyboy开发实践
  • 平顶山网站建设公司视频软件制作app
  • 手机网站模板网网站开发实训报告总结2021
  • mwf攻防。
  • 购物网站开发 书籍wordpress+去掉阅读
  • CICD实战(9) - 使用Arbess+GitLab实现Python项目自动化部署
  • 贵州两学一做教育网站怎么做点击图片进入网站
  • 阮一峰《TypeScript 教程》学习笔记——类型系统
  • 网站如何做微信支付宝支付湖南网站排名优化公司
  • 房产网站推广做自己移动端网站
  • 江阴网站建设公司vs用户登录注册网站建设代码
  • 锁和原子变量的基础介绍