消息发送接收如何传递TraceId
文章目录
- 以下是基于常见MQ(RabbitMQ/Kafka)的具体实现方案:
- 一、通用准备:MDC与traceId基础
- 二、RabbitMQ实现方案
- 1. 发送消息时插入traceId
- 2. 消费消息时提取traceId并写入MDC
- 三、Kafka实现方案
- 1. 发送消息时插入traceId
- 2. 消费消息时提取traceId并写入MDC
- 四、关键注意事项
在Java中通过MQ实现链路追踪(传递
traceId并在消费端写入MDC)的核心思路是:
发送消息时将当前线程的traceId存入消息元数据(如headers),消费消息时从元数据中提取traceId并设置到MDC,处理完成后清除MDC避免线程复用污染。
以下是基于常见MQ(RabbitMQ/Kafka)的具体实现方案:
一、通用准备:MDC与traceId基础
MDC(Mapped Diagnostic Context)是日志框架(如Logback、Log4j)提供的线程本地存储工具,用于存储链路追踪的traceId。需先确保项目中已集成日志框架,并通过MDC.put("traceId", traceId)和MDC.remove("traceId")操作上下文。
二、RabbitMQ实现方案
以Spring AMQP(Spring集成RabbitMQ)为例:
1. 发送消息时插入traceId
通过MessagePostProcessor拦截消息发送过程,将当前MDC中的traceId存入消息headers。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;
import java.util.UUID;@Component
public class RabbitMqSender {@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息时自动添加traceId到headerspublic void send(String exchange, String routingKey, Object message) {// 1. 获取当前MDC中的traceId(若不存在则生成)String traceId = MDC.get("traceId");if (traceId == null) {traceId = generateTraceId(); // 生成新的traceId(如UUID)}// 2. 通过MessagePostProcessor将traceId添加到消息headersMessagePostProcessor traceIdProcessor = msg -> {msg.getMessageProperties().setHeader("traceId", traceId);return msg;};// 3. 发送消息(携带traceId处理器)rabbitTemplate.convertAndSend(exchange, routingKey, message, traceIdProcessor);}// 生成traceId(可自定义规则,如UUID)private String generateTraceId() {return UUID.randomUUID().toString().replace("-", "");}
}
2. 消费消息时提取traceId并写入MDC
通过AOP拦截@RabbitListener标注的消费方法,从消息headers中提取traceId并设置到MDC,处理完成后清除。
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;@Aspect
@Component
public class RabbitMdcAspect {// 切入点:拦截所有@RabbitListener标注的方法@Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")public void rabbitListenerPointcut() {}@Around("rabbitListenerPointcut()")public Object aroundConsumer(ProceedingJoinPoint joinPoint) throws Throwable {// 1. 从方法参数中获取Message对象(消息原始信息)Message message = getMessageFromArgs(joinPoint.getArgs());if (message != null) {// 2. 从消息headers中提取traceIdString traceId = message.getMessageProperties().getHeader("traceId");if (traceId != null) {MDC.put("traceId", traceId); // 写入MDC}}try {// 3. 执行消费逻辑return joinPoint.proceed();} finally {// 4. 清除MDC(关键:避免线程池复用导致的traceId残留)MDC.remove("traceId");}}// 从方法参数中提取Message对象(兼容多种参数类型)private Message getMessageFromArgs(Object[] args) {for (Object arg : args) {if (arg instanceof Message) {return (Message) arg;}}return null;}
}
注意:消费方法的参数需包含
Message类型(或通过@Payload+@Header单独提取),确保能获取到原始消息headers。
三、Kafka实现方案
以Spring Kafka为例:
1. 发送消息时插入traceId
通过ProducerRecord的headers携带traceId。
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
import java.util.UUID;@Component
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public void send(String topic, Object message) {// 1. 获取或生成traceIdString traceId = MDC.get("traceId");if (traceId == null) {traceId = generateTraceId();}// 2. 创建ProducerRecord并添加traceId到headersProducerRecord<String, Object> record = new ProducerRecord<>(topic, message);record.headers().add("traceId", traceId.getBytes(StandardCharsets.UTF_8));// 3. 发送消息kafkaTemplate.send(record);}private String generateTraceId() {return UUID.randomUUID().toString().replace("-", "");}
}
2. 消费消息时提取traceId并写入MDC
通过AOP拦截@KafkaListener方法,从ConsumerRecord的headers中提取traceId。
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.slf4j.MDC;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;@Aspect
@Component
public class KafkaMdcAspect {// 切入点:拦截所有@KafkaListener标注的方法@Pointcut("@annotation(org.springframework.kafka.annotation.KafkaListener)")public void kafkaListenerPointcut() {}@Around("kafkaListenerPointcut()")public Object aroundConsumer(ProceedingJoinPoint joinPoint) throws Throwable {// 1. 从参数中获取ConsumerRecordConsumerRecord<?, ?> record = getConsumerRecordFromArgs(joinPoint.getArgs());if (record != null) {// 2. 从headers提取traceIdbyte[] traceIdBytes = record.headers().lastHeader("traceId")?.value();if (traceIdBytes != null) {String traceId = new String(traceIdBytes, StandardCharsets.UTF_8);MDC.put("traceId", traceId); // 写入MDC}}try {// 3. 执行消费逻辑return joinPoint.proceed();} finally {// 4. 清除MDCMDC.remove("traceId");}}// 从参数中提取ConsumerRecordprivate ConsumerRecord<?, ?> getConsumerRecordFromArgs(Object[] args) {for (Object arg : args) {if (arg instanceof ConsumerRecord) {return (ConsumerRecord<?, ?>) arg;}}return null;}
}
四、关键注意事项
- traceId生成策略:若当前线程无
traceId(如定时任务触发的消息发送),需生成全局唯一ID(推荐UUID或雪花算法)。 - MDC清除:必须在
finally块中清除traceId,否则线程池复用会导致后续任务继承错误的traceId。 - 兼容性:消息序列化/反序列化需保留headers(如RabbitMQ的
SimpleMessageConverter、Kafka的StringJsonMessageConverter默认支持headers传递)。 - 与链路工具集成:若使用Spring Cloud Sleuth、SkyWalking等工具,它们会自动处理
traceId传递,无需手动实现(但可通过自定义补充)。
通过以上方案,即可在MQ消息的发送和消费环节打通链路追踪,实现日志的全链路关联。
