当前位置: 首页 > news >正文

RabbitMQ消息传输中Protostuff序列化数据异常的深度解析与解决方案

目录

问题背景

环境配置

使用的依赖

测试对象

初始代码(有问题的版本)

问题分析

1. 初步排查

2. 关键发现

3. RabbitTemplate的默认行为分析

4. SimpleMessageConverter的处理机制

深入理解消息转换

消息转换器的层次结构:

而直接发送 Message:

解决方案

方案1:直接使用Message对象(推荐)

方案2:配置自定义MessageConverter

问题根因总结

经验教训

结论

最后最后附上序列化工具:


问题背景

在日常开发中,我们经常使用RabbitMQ作为消息中间件进行系统间的通信。最近,我在使用Protostuff进行对象序列化,并通过RabbitMQ传输时遇到了一个棘手的问题:反序列化失败!

  • 发送端:序列化后的数据长度为8 bytes

  • 接收端:接收到的数据长度变为14 bytes

  • 错误信息

Caused by: io.protostuff.ProtobufException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.at io.protostuff.ProtobufException.truncatedMessage(ProtobufException.java:76)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 69 out of bounds for length 14

环境配置

使用的依赖

<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.8.0</version>
</dependency>
<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.8.0</version>
</dependency>

测试对象

@Data
public class TestVO {private String name;private Integer age;public TestVO() {}
}

初始代码(有问题的版本)

发送端:

TestVO test = new TestVO();
test.setName("test");
test.setAge(1);
byte[] data = ProtostuffUtils.serialize(test);
System.out.println("发送消息大小: " + data.length + " bytes"); // 输出:8 bytes
rabbitTemplate.convertAndSend(rabbitMQConfigProperties.getExchange().getFanout().getPSet(), "", data);

接收端:

@RabbitListener(queues = "${my-rabbitmq-config.queue.p-set}")
@RabbitHandler
public void pSetListener(Message message) {byte[] body = message.getBody();System.out.println("接收消息大小: " + body.length + " bytes"); // 输出:14 bytes!TestVO result = ProtostuffUtils.deserialize(body, TestVO.class); // 这里报错!
}

问题分析

1. 初步排查

首先,我排除了Protostuff本身的问题:

  • 在本地直接序列化后立即反序列化:✓ 正常工作

  • 序列化数据在不同JVM间传输:✓ 正常工作

这说明Protostuff序列化机制本身没有问题。

2. 关键发现

通过对比发送和接收的数据长度:

  • 发送:8 bytes

  • 接收:14 bytes

数据在传输过程中被修改了!这指向了RabbitMQ的消息处理机制。

3. RabbitTemplate的默认行为分析

深入研究RabbitTemplate的源码后发现,当使用convertAndSend()方法时,会经过以下流程:

// RabbitTemplate内部逻辑
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);}protected Message convertMessageIfNecessary(Object object) {if (object instanceof Message msg) {return msg;} else {return this.getRequiredMessageConverter().toMessage(object, new MessageProperties());}}

convertMessageIfNecessary()方法会使用配置的MessageConverter对消息进行转换。默认使用的是SimpleMessageConverter

4. SimpleMessageConverter的处理机制

对于byte[]类型的数据,SimpleMessageConverter可能会进行以下处理:

protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {if (object instanceof byte[] bytes) {messageProperties.setContentType("application/octet-stream");} else if (object instanceof String) {try {bytes = ((String)object).getBytes(this.defaultCharset);} catch (UnsupportedEncodingException e) {throw new MessageConversionException("failed to convert to Message content", e);}messageProperties.setContentType("text/plain");messageProperties.setContentEncoding(this.defaultCharset);} else if (object instanceof Serializable) {try {bytes = SerializationUtils.serialize(object);} catch (IllegalArgumentException e) {throw new MessageConversionException("failed to convert to serialized Message content", e);}messageProperties.setContentType("application/x-java-serialized-object");}if (bytes != null) {messageProperties.setContentLength((long)bytes.length);return new Message(bytes, messageProperties);} else {String var10002 = this.getClass().getSimpleName();throw new IllegalArgumentException(var10002 + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());

深入理解消息转换

消息转换器的层次结构:

RabbitTemplate.convertAndSend()↓
MessageConverter.convertToMessage()↓
SimpleMessageConverter.toMessage()  // 这里进行了数据包装!↓
创建最终的 Message 对象

而直接发送 Message:

RabbitTemplate.send()↓
直接使用提供的 Message 对象  // 跳过转换步骤!↓
发送到 RabbitMQ

解决方案

方案1:直接使用Message对象(推荐)

修改后的发送端代码:

@Component
public class FixedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendProtostuffMessage(Object object, String exchange, String routingKey) {// 序列化对象byte[] data = ProtostuffUtils.serialize(object);System.out.println("发送消息大小: " + data.length + " bytes");// 创建Message对象,明确指定内容类型MessageProperties properties = new MessageProperties();properties.setContentType("application/x-protostuff");properties.setContentLength(data.length);Message message = new Message(data, properties);// 直接发送Message对象,跳过自动转换rabbitTemplate.send(exchange, routingKey, message);}
}

方案2:配置自定义MessageConverter

@Configuration
public class RabbitMQConfig {/*** 配置二进制消息转换器,避免对byte[]进行额外处理*/@Beanpublic MessageConverter binaryMessageConverter() {return new MessageConverter() {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) {if (object instanceof byte[]) {// 对于byte[],直接包装,不做任何处理return new Message((byte[]) object, messageProperties);}// 其他类型使用默认处理return new SimpleMessageConverter().toMessage(object, messageProperties);}@Overridepublic Object fromMessage(Message message) {// 直接返回消息体,不进行任何解码return message.getBody();}};}
}

在接收端也保持一致性:

@RabbitListener(queues = "queue_name")
public void handleBinaryMessage(Message message) {byte[] body = message.getBody();  // 直接获取原始数据// 进行反序列化MyObject obj = ProtostuffUtils.deserialize(body, MyObject.class);
}

问题根因总结

  1. 直接原因:RabbitTemplate的convertAndSend()方法内部的SimpleMessageConverter对byte[]数据进行了额外处理

  2. 根本原因:消息转换器对二进制数据的默认处理策略与Protostuff的原始格式不兼容

  3. 数据变化:8 bytes → 14 bytes 的具体原因可能是:

    • Base64编码或其他编码转换

    • 添加消息头信息

    • 数据包装和格式化

经验教训

  1. 不要假设:不要假设RabbitTemplate会原样传输byte[]数据

  2. 明确指定:对于二进制数据,总是明确指定内容类型

  3. 跳过转换:使用Message对象直接发送,跳过自动转换步骤

  4. 添加调试:在关键位置添加数据长度和内容的调试信息

  5. 版本兼容:确保发送端和接收端使用相同的序列化版本

结论

通过这个问题的解决,我们深刻认识到:在使用消息中间件传输二进制数据时,必须了解其内部的消息转换机制。直接使用Message对象而不是依赖自动转换,可以确保数据的完整性和一致性。

这个经验不仅适用于Protostuff,同样适用于Protocol Buffers、Avro等其他二进制序列化框架在RabbitMQ中的使用。

最后最后附上序列化工具:
 

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** Protostuff序列化工具类* 基于Protostuff实现高效的对象序列化与反序列化** @author wp* @Description:* @date 2025-10-15 15:53*/
public class ProtostuffSerializer {private static final Logger log = LoggerFactory.getLogger(ProtostuffSerializer.class);/*** 缓存Schema,避免重复创建提高性能*/private static final Map<Class<?>, Schema<?>> SCHEMA_CACHE = new ConcurrentHashMap<>();/*** 默认缓冲区大小*/private static final int DEFAULT_BUFFER_SIZE = 512;/*** 获取类的Schema,如果缓存中没有则创建并缓存** @param clazz 类对象* @param <T>   泛型类型* @return 类对应的Schema*/@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> clazz) {Schema<T> schema = (Schema<T>) SCHEMA_CACHE.get(clazz);if (schema == null) {schema = RuntimeSchema.getSchema(clazz);if (schema != null) {SCHEMA_CACHE.put(clazz, schema);} else {throw new IllegalArgumentException("无法为类 " + clazz.getName() + " 创建Schema,请检查该类是否有默认无参构造函数");}}return schema;}/*** 将对象序列化为字节数组** @param obj 要序列化的对象* @param <T> 对象类型* @return 序列化后的字节数组,若对象为null则返回null*/public static <T> byte[] serialize(T obj) {if (obj == null) {log.warn("序列化对象为null");return null;}Class<T> clazz = (Class<T>) obj.getClass();LinkedBuffer buffer = null;byte[] data = null;try {Schema<T> schema = getSchema(clazz);buffer = LinkedBuffer.allocate(DEFAULT_BUFFER_SIZE);data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);log.debug("对象[{}]序列化成功,长度: {}字节", clazz.getName(), data.length);} catch (Exception e) {log.error("对象序列化失败", e);throw new RuntimeException("对象序列化失败: " + e.getMessage(), e);} finally {// 释放缓冲区资源if (buffer != null) {buffer.clear();}}return data;}/*** 将字节数组反序列化为指定类型的对象** @param data  序列化后的字节数组* @param clazz 目标对象类型* @param <T>   泛型类型* @return 反序列化后的对象,若字节数组为null或空则返回null*/public static <T> T deserialize(byte[] data, Class<T> clazz) {if (data == null || data.length == 0) {log.warn("反序列化字节数组为null或空");return null;}T obj = null;try {obj = clazz.getDeclaredConstructor().newInstance();Schema<T> schema = getSchema(clazz);ProtostuffIOUtil.mergeFrom(data, obj, schema);log.debug("字节数组反序列化为[{}]成功", clazz.getName());} catch (Exception e) {log.error("字节数组反序列化失败", e);throw new RuntimeException("字节数组反序列化失败: " + e.getMessage(), e);}return obj;}
}

http://www.dtcms.com/a/490815.html

相关文章:

  • SSH连接服务器超时?可能原因与解决方案
  • iOS 代上架实战指南,从账号管理到使用 开心上架 上传IPA的完整流程
  • Visual Studio下的内存安全检测:CRT 内存泄漏 AddressSanitizer
  • iOS混淆与IPA文件加固深度解析,从反编译风险到苹果应用安全工程实践
  • 眉山建设中等职业技术学校 网站公司网页制作费用大概要多少钱?
  • 张店网站制作首选专家计算机大专生的出路
  • 万网的网站建设广州互联网公司集中在哪个区
  • 数据安全系列7:常用的非对称算法浅析
  • uniapp微信小程序+vue3基础内容介绍~(含标签、组件生命周期、页面生命周期、条件编译(一码多用)、分包))
  • 微信小程序报错 ubepected character `的style换行问题
  • H5封装打包小程序助手抖音快手微信小程序看广告流量主开源
  • 金华建设局网站做爰片在线看网站
  • 如何做二维码链接网站虚拟空间的网站赚钱吗
  • 营业部绩效考核方案与管理方法
  • 光刻刻蚀工艺控制要点及材料技术进展与限制
  • VPS SSH密钥登录配置指南:告别密码,拥抱安全
  • 注入“侨动力” 锻造“湘非链”
  • 做网站自己申请域名还是建站公司菏泽最好的网站建设公司
  • 网站建设方面书籍温州网站建设案例
  • 【Linux】Linux 零拷贝技术全景解读:从内核到硬件的性能优化之道
  • 微软ML.NET技术详解:从数据科学到生产部署的全栈解决方案
  • 镇江网站搜索引擎优化做外贸雨伞到什么网站
  • 网站收录一般多久沈阳建设学院
  • C++ AI 编程助手
  • 编程之python基础
  • 【系统分析师】写作框架:软件设计模式及其应用
  • leetcode 2598 执行操作后最大MEX
  • GPTBots Multi-Agent架构解析:如何通过多Agent协同实现业务智能化升级
  • 深圳网站建设智能小程序礼品网站如何做
  • 预约洗车小程序