RabbitMQ延时队列的两种实现方式
目录
一、延时插件实现
1、版本要求
2、为运行新容器时安装
3、为已运行的容器安装
4、验证安装
5、代码编写
1. 配置类
2. 生产者
3. 消费者
二、死信队列实现
1、代码编写
1. 配置类
2. 生产者
3. 消费者
三、踩坑记录
1、发送消息失败
2、消息过期后未能转发到死信队列
3、消费者消费报错
一、延时插件实现
1、版本要求
RabbitMQ 3.5.7以上
2、为运行新容器时安装
# 1. 拉取带管理界面的镜像
docker pull rabbitmq:3.11-management
# 2. 启动容器并启用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"
3、为已运行的容器安装
# 1. 进入正在运行的容器
docker exec -it rabbitmq /bin/bash
# 2. 在容器内执行插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 3. 退出容器
exit
# 4. 重启容器使插件生效
docker restart rabbitmq
4、验证安装
# 方法1:检查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
# 方法2:登录管理界面
# 访问 http://localhost:15672 (使用设置的账号密码登录)
# 在 "Exchanges" 标签页创建交换机时,Type 下拉框会出现 "x-delayed-message" 选项
5、代码编写
1. 配置类
@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交换机类型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定类型true,false,args);}
@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}
2. 生产者
public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后处理器:设置延时和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}
3. 消费者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
}
二、死信队列实现
1、代码编写
1. 配置类
@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信队列(延时队列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
// 死信交换机@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
// 绑定死信队列到死信交换机@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
// 普通队列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
// 绑定普通队列到普通交换机@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}
2. 生产者
public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入库略,uuid为主键MessageProperties properties = new MessageProperties();// 设置TTL,单位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}
3. 消费者
@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
}
三、踩坑记录
1、发送消息失败
原因:RabbitTemplate
配置了消息抵达确认,消息ID没有传值。
RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵达队列成功:{}", data);} else {log.error("消息未能发送成功,消息ID:{}", data.getId(), cause);}
});
生产者实际发送消息未传消息ID:
错误格式
rabbitTemplate.convertAndSend(exchange, routingKey, data);
正确格式
String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));
2、消息过期后未能转发到死信队列
原因:正常消息未绑定死信队列,消息过期自动删除,而不会转发到死信队列中。
错误格式
@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}
正确格式
@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交换机.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由键.build();
}
3、消费者消费报错
原因:发送的消息由于自定义的 MessageProperties
,其中缺失了 contentType
参数,需要使用转化器进行转换,而不是直接发送消息。
错误格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));
正确格式
MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));