Listener method could not be invoked with the incoming message
问题描述
生产者方代码:
private void rollbackOrder(long orderId, CorrelationData correlationData) {rabbitTemplate.convertAndSend("order-rollback-exchange","rollback.order",new QuotaRollbackTO(orderId,null,null),correlationData);}
消费者方代码:
@RabbitListener(queues = "rollback.order.queue")public void handleRollback(QuotaRollbackTO quotaRollbackTO, Message message, Channel channel) throws Exception {try {//回滚订单的逻辑orderMainMapper.deleteById(quotaRollbackTO.getOrderId());orderProductMapper.deleteById(quotaRollbackTO.getOrderId());// 手动ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {System.err.println("消息处理失败:" + e.getMessage());// 拒绝消息并转入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}
Rabbit MQ发送消息报错:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void org.ragdollcat.order.mqlistener.OrderMQListener.handleRollback(org.ragdollcat.order.to.QuotaRollbackTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception]
原因分析:
由于未配置 RabbitMQ 使用 JSON 消息转换器,Spring Boot 默认采用了 SimpleMessageConverter
,导致无法将 JSON 数据反序列化为 QuotaRollbackTO 对象,从而造成监听方法调用失败。在错误信息中,可能会发现以下提示:
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.ragdollcat.order.to.QuotaRollbackTO] for GenericMessage [payload=byte[234], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=rollback.order, spring_listener_return_correlation=4e708ed8-bfde-4f43-9282-4701c1b5ef19, amqp_receivedExchange=order-rollback-exchange, spring_returned_message_correlation=4591a034-2ee5-4d4c-8a3c-4c95a3b43572, amqp_deliveryTag=1, amqp_consumerQueue=rollback.order.queue, amqp_redelivered=false, id=58a03a74-a8d4-329c-7d26-c229977f4c15, amqp_consumerTag=amq.ctag-0deI5Ts_m6_K32RdEYyDZA, contentType=application/x-java-serialized-object, timestamp=1747560067493}]
证实了发送端把对象以JDK默认方式(ObjectOutputStream)序列化成 byte[],接收端尝试用 JSON 的方式将这个 byte[] 解析成 QuotaRollbackTO,失败。
解决方案:
在配置类中,显式使用 JSON 消息转换器,为 RabbitTemplate 设置 Jackson2JsonMessageConverter
(或放在Spring Boot的引导类中也可以,引导类本身也是作为一个配置类),这样后续注入的就是自定义的RabbitTemplate
:
@Configuration
public class RabbitTemplateConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,MessageConverter jacksonMessageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 开启强制投递,必须配置这个 ReturnCallback 才会生效!rabbitTemplate.setMandatory(true);// 设置 ConfirmCallbackrabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println(" 交换机已收到消息:" + correlationData);} else {System.err.println(" 交换机未收到消息:" + cause + ",相关数据:" + correlationData);}});// 设置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println(" 消息未成功路由到队列:");System.err.println(" 原始消息:" + new String(message.getBody()));System.err.println(" 应答码:" + replyCode);System.err.println(" 原因:" + replyText);System.err.println(" 交换机:" + exchange);System.err.println(" 路由键:" + routingKey);});//为 RabbitTemplate 设置 `Jackson2JsonMessageConverter`rabbitTemplate.setMessageConverter(jacksonMessageConverter);return rabbitTemplate;}@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,MessageConverter jacksonMessageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(jacksonMessageConverter);return factory;}}
同样地在发送消息时,不应采用对象转换为JSON字符串的写法:
rabbitTemplate.convertAndSend("rollback.order", JSON.toJSONString(rollbackTO));
验证方式:
临时打印发送内容 headers:
rabbitTemplate.setReturnsCallback(returned -> {Message returnedMessage = returned.getMessage();System.out.println("发送内容类型: " + returnedMessage.getMessageProperties().getContentType());
});
看到的 contentType 应该是:
“application/json”