RabbitMQ消息传输中Protostuff序列化数据异常的深度解析与解决方案
目录
问题背景
环境配置
使用的依赖
测试对象
初始代码(有问题的版本)
问题分析
1. 初步排查
2. 关键发现
3. RabbitTemplate的默认行为分析
4. SimpleMessageConverter的处理机制
深入理解消息转换
消息转换器的层次结构:
而直接发送 Message:
解决方案
方案1:直接使用Message对象(推荐)
方案2:配置自定义MessageConverter
问题根因总结
经验教训
结论
最后最后附上序列化工具:
问题背景
在日常开发中,我们经常使用RabbitMQ作为消息中间件进行系统间的通信。最近,我在使用Protostuff进行对象序列化,并通过RabbitMQ传输时遇到了一个棘手的问题:反序列化失败!
-
发送端:序列化后的数据长度为8 bytes
-
接收端:接收到的数据长度变为14 bytes
-
错误信息:
Caused by: io.protostuff.ProtobufException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.at io.protostuff.ProtobufException.truncatedMessage(ProtobufException.java:76)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 69 out of bounds for length 14
环境配置
使用的依赖
<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.8.0</version>
</dependency>
<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.8.0</version>
</dependency>
测试对象
@Data
public class TestVO {private String name;private Integer age;public TestVO() {}
}
初始代码(有问题的版本)
发送端:
TestVO test = new TestVO();
test.setName("test");
test.setAge(1);
byte[] data = ProtostuffUtils.serialize(test);
System.out.println("发送消息大小: " + data.length + " bytes"); // 输出:8 bytes
rabbitTemplate.convertAndSend(rabbitMQConfigProperties.getExchange().getFanout().getPSet(), "", data);
接收端:
@RabbitListener(queues = "${my-rabbitmq-config.queue.p-set}")
@RabbitHandler
public void pSetListener(Message message) {byte[] body = message.getBody();System.out.println("接收消息大小: " + body.length + " bytes"); // 输出:14 bytes!TestVO result = ProtostuffUtils.deserialize(body, TestVO.class); // 这里报错!
}
问题分析
1. 初步排查
首先,我排除了Protostuff本身的问题:
-
在本地直接序列化后立即反序列化:✓ 正常工作
-
序列化数据在不同JVM间传输:✓ 正常工作
这说明Protostuff序列化机制本身没有问题。
2. 关键发现
通过对比发送和接收的数据长度:
-
发送:8 bytes
-
接收:14 bytes
数据在传输过程中被修改了!这指向了RabbitMQ的消息处理机制。
3. RabbitTemplate的默认行为分析
深入研究RabbitTemplate的源码后发现,当使用convertAndSend()
方法时,会经过以下流程:
// RabbitTemplate内部逻辑
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);}protected Message convertMessageIfNecessary(Object object) {if (object instanceof Message msg) {return msg;} else {return this.getRequiredMessageConverter().toMessage(object, new MessageProperties());}}
convertMessageIfNecessary()
方法会使用配置的MessageConverter对消息进行转换。默认使用的是SimpleMessageConverter
。
4. SimpleMessageConverter的处理机制
对于byte[]
类型的数据,SimpleMessageConverter
可能会进行以下处理:
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {if (object instanceof byte[] bytes) {messageProperties.setContentType("application/octet-stream");} else if (object instanceof String) {try {bytes = ((String)object).getBytes(this.defaultCharset);} catch (UnsupportedEncodingException e) {throw new MessageConversionException("failed to convert to Message content", e);}messageProperties.setContentType("text/plain");messageProperties.setContentEncoding(this.defaultCharset);} else if (object instanceof Serializable) {try {bytes = SerializationUtils.serialize(object);} catch (IllegalArgumentException e) {throw new MessageConversionException("failed to convert to serialized Message content", e);}messageProperties.setContentType("application/x-java-serialized-object");}if (bytes != null) {messageProperties.setContentLength((long)bytes.length);return new Message(bytes, messageProperties);} else {String var10002 = this.getClass().getSimpleName();throw new IllegalArgumentException(var10002 + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
深入理解消息转换
消息转换器的层次结构:
RabbitTemplate.convertAndSend()↓
MessageConverter.convertToMessage()↓
SimpleMessageConverter.toMessage() // 这里进行了数据包装!↓
创建最终的 Message 对象
而直接发送 Message:
RabbitTemplate.send()↓
直接使用提供的 Message 对象 // 跳过转换步骤!↓
发送到 RabbitMQ
解决方案
方案1:直接使用Message对象(推荐)
修改后的发送端代码:
@Component
public class FixedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendProtostuffMessage(Object object, String exchange, String routingKey) {// 序列化对象byte[] data = ProtostuffUtils.serialize(object);System.out.println("发送消息大小: " + data.length + " bytes");// 创建Message对象,明确指定内容类型MessageProperties properties = new MessageProperties();properties.setContentType("application/x-protostuff");properties.setContentLength(data.length);Message message = new Message(data, properties);// 直接发送Message对象,跳过自动转换rabbitTemplate.send(exchange, routingKey, message);}
}
方案2:配置自定义MessageConverter
@Configuration
public class RabbitMQConfig {/*** 配置二进制消息转换器,避免对byte[]进行额外处理*/@Beanpublic MessageConverter binaryMessageConverter() {return new MessageConverter() {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) {if (object instanceof byte[]) {// 对于byte[],直接包装,不做任何处理return new Message((byte[]) object, messageProperties);}// 其他类型使用默认处理return new SimpleMessageConverter().toMessage(object, messageProperties);}@Overridepublic Object fromMessage(Message message) {// 直接返回消息体,不进行任何解码return message.getBody();}};}
}
在接收端也保持一致性:
@RabbitListener(queues = "queue_name")
public void handleBinaryMessage(Message message) {byte[] body = message.getBody(); // 直接获取原始数据// 进行反序列化MyObject obj = ProtostuffUtils.deserialize(body, MyObject.class);
}
问题根因总结
-
直接原因:RabbitTemplate的
convertAndSend()
方法内部的SimpleMessageConverter
对byte[]数据进行了额外处理 -
根本原因:消息转换器对二进制数据的默认处理策略与Protostuff的原始格式不兼容
-
数据变化:8 bytes → 14 bytes 的具体原因可能是:
-
Base64编码或其他编码转换
-
添加消息头信息
-
数据包装和格式化
-
经验教训
-
不要假设:不要假设RabbitTemplate会原样传输byte[]数据
-
明确指定:对于二进制数据,总是明确指定内容类型
-
跳过转换:使用Message对象直接发送,跳过自动转换步骤
-
添加调试:在关键位置添加数据长度和内容的调试信息
-
版本兼容:确保发送端和接收端使用相同的序列化版本
结论
通过这个问题的解决,我们深刻认识到:在使用消息中间件传输二进制数据时,必须了解其内部的消息转换机制。直接使用Message
对象而不是依赖自动转换,可以确保数据的完整性和一致性。
这个经验不仅适用于Protostuff,同样适用于Protocol Buffers、Avro等其他二进制序列化框架在RabbitMQ中的使用。
最后最后附上序列化工具:
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** Protostuff序列化工具类* 基于Protostuff实现高效的对象序列化与反序列化** @author wp* @Description:* @date 2025-10-15 15:53*/
public class ProtostuffSerializer {private static final Logger log = LoggerFactory.getLogger(ProtostuffSerializer.class);/*** 缓存Schema,避免重复创建提高性能*/private static final Map<Class<?>, Schema<?>> SCHEMA_CACHE = new ConcurrentHashMap<>();/*** 默认缓冲区大小*/private static final int DEFAULT_BUFFER_SIZE = 512;/*** 获取类的Schema,如果缓存中没有则创建并缓存** @param clazz 类对象* @param <T> 泛型类型* @return 类对应的Schema*/@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> clazz) {Schema<T> schema = (Schema<T>) SCHEMA_CACHE.get(clazz);if (schema == null) {schema = RuntimeSchema.getSchema(clazz);if (schema != null) {SCHEMA_CACHE.put(clazz, schema);} else {throw new IllegalArgumentException("无法为类 " + clazz.getName() + " 创建Schema,请检查该类是否有默认无参构造函数");}}return schema;}/*** 将对象序列化为字节数组** @param obj 要序列化的对象* @param <T> 对象类型* @return 序列化后的字节数组,若对象为null则返回null*/public static <T> byte[] serialize(T obj) {if (obj == null) {log.warn("序列化对象为null");return null;}Class<T> clazz = (Class<T>) obj.getClass();LinkedBuffer buffer = null;byte[] data = null;try {Schema<T> schema = getSchema(clazz);buffer = LinkedBuffer.allocate(DEFAULT_BUFFER_SIZE);data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);log.debug("对象[{}]序列化成功,长度: {}字节", clazz.getName(), data.length);} catch (Exception e) {log.error("对象序列化失败", e);throw new RuntimeException("对象序列化失败: " + e.getMessage(), e);} finally {// 释放缓冲区资源if (buffer != null) {buffer.clear();}}return data;}/*** 将字节数组反序列化为指定类型的对象** @param data 序列化后的字节数组* @param clazz 目标对象类型* @param <T> 泛型类型* @return 反序列化后的对象,若字节数组为null或空则返回null*/public static <T> T deserialize(byte[] data, Class<T> clazz) {if (data == null || data.length == 0) {log.warn("反序列化字节数组为null或空");return null;}T obj = null;try {obj = clazz.getDeclaredConstructor().newInstance();Schema<T> schema = getSchema(clazz);ProtostuffIOUtil.mergeFrom(data, obj, schema);log.debug("字节数组反序列化为[{}]成功", clazz.getName());} catch (Exception e) {log.error("字节数组反序列化失败", e);throw new RuntimeException("字节数组反序列化失败: " + e.getMessage(), e);}return obj;}
}