当前位置: 首页 > news >正文

【RabbitMQ】死信队列

1.概述

死信,顾名思义就是无法被消费的消息,也就是没有被传到消费者的消息,或者即使传到了也没有被消费。当然有死信就有死信队列。死信队列就是用来存储死信的。

它的应用场景就是保证订单业务的消息数据不丢失,当消息消费发 生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付 时自动失效。

死信的来源有3种:

1.消息TTL过期(TTL就是过期时间)

2.队列达到最大长度(也就是队列装满了消息)

3.消息被拒绝((basic.reject 或 basic.nack))并且requeue=false

解释一下这里的requeue=false

在调用basic.rejectbasic.nack 方法时,都有一个参数用于决定是否将被拒绝的消息重新放回原队列 。当requeue设置为false 时,被拒绝的消息不会再回到原队列 。这种情况下,若队列配置了死信交换机(通过x-dead-letter-exchange参数设置 )等相关死信处理机制,消息就会成为死信消息,被转发到死信交换机,再由死信交换机根据路由键转发到对应的死信队列 。如果没有配置死信相关机制,消息就会被直接丢弃 

2.代码实现

在编写代码之前,先看看整个流程图

生产者生产消息发送到普通交换机中,交换机根据routing key将消息转发给相应的普通队列。当普通队列中的消息由于某些原因变成了死信消息,会把死信消息转发到相应的死信交换机中,死信交换机同样会根据routing key转发给相应的死信队列,然后可以安排专门的消费者去消费死信队列中的死信消息

下面分3种情况来讲,就是按照上述3种变成死信消息的情况

2.1消费超时

生产者

public class DeadProducer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("exchange-normal", "direct");
        //模拟消息超时,超过10秒钟,消息进入死信
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("exchange-normal", "zhangsan", properties, ("message" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者(处理正常消息)

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadConsumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        Channel channel = connection.createChannel();
        //声明死信和普通交换机 类型都为direct
        channel.exchangeDeclare("exchange-normal", "direct");
        channel.exchangeDeclare("exchange-dead", "direct");
        //声明普通队列
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", "exchange-dead");
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //生成死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //绑定普通队列和交换机
        channel.queueBind(normalQueue, "exchange-normal", "zhangsan");
        //消费消息
        channel.basicConsume(normalQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息是:" + new String(body));
            }
        });
    }
}

消费者(处理死信消息)

public class DeadConsumer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("exchange-dead", "direct");
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //消费消息
        channel.basicConsume(deadQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息是:" + new String(body));
            }
        });
    }
}

2.2达到队列最大长度

生产者

public class DeadProducer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("exchange-normal", "direct");
        //模拟消息超时,超过10秒钟,消息进入死信
        //AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("exchange-normal", "zhangsan", null, ("message" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者(消费正常消息)

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadConsumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        Channel channel = connection.createChannel();
        //声明死信和普通交换机 类型都为direct
        channel.exchangeDeclare("exchange-normal", "direct");
        channel.exchangeDeclare("exchange-dead", "direct");
        //声明普通队列
        Map<String, Object> params = new HashMap<>();
        //设置正常队列长度限制 key是固定值
        params.put("x-max-length", 6);
        params.put("x-dead-letter-exchange", "exchange-dead");
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //生成死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //绑定普通队列和交换机
        channel.queueBind(normalQueue, "exchange-normal", "zhangsan");
        //消费消息
        channel.basicConsume(normalQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息是:" + new String(body));
            }
        });
    }
}

消费死信消息的就不重复写了,跟之前的一样。

2.3消息消费被拒绝

生产者和消费死信消息的消费者都是一样的,只需要改一下消费正常消息的消费者的代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadConsumer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        //声明channel
        final Channel channel = connection.createChannel();
        //声明死信和普通交换机 类型都为direct
        channel.exchangeDeclare("exchange-normal", "direct");
        channel.exchangeDeclare("exchange-dead", "direct");
        //声明普通队列
        Map<String, Object> params = new HashMap<>();
        //设置正常队列长度限制 key是固定值
        //params.put("x-max-length", 6);
        params.put("x-dead-letter-exchange", "exchange-dead");
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        //生成死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //绑定死信
        channel.queueBind(deadQueue, "exchange-dead", "lisi");
        //绑定普通队列和交换机
        channel.queueBind(normalQueue, "exchange-normal", "zhangsan");
        //消费消息
        channel.basicConsume(normalQueue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                if (message.equals("message5")) {
                    System.out.println("Consumer01接收到消息" + message + "并拒绝签收该消息");
                    //拒绝消费该消息
                    channel.basicReject(envelope.getDeliveryTag(), false);
                } else {
                    System.out.println("消费的消息是:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }
}

用equals进行匹配,如果是指定的消息,就拒绝消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.dtcms.com/a/120688.html

相关文章:

  • 红宝书第三十四讲:零基础学会单元测试框架:Jest、Mocha、QUnit
  • 解决 IntelliJ IDEA 中 Maven 项目左侧项目视图未显示顶层目录问题的详细步骤说明
  • [leetcode]查询区间内的所有素数
  • libev实现Io复用及定时器事件服务器
  • linux提权进阶 环境变量劫持提权 nfs提权
  • spark架构和RDD相关概念
  • 蓝桥杯-小明的背包(动态规划-Java)
  • #无类域间路由(快速复习版)
  • 宝塔面板面试内容整理-性能监控
  • 建筑工程管理系统功能模块概览
  • 【MySQL 数据库】增删查改操作CRUD(上)
  • Elasticsearch 系列专题 - 第一篇:Elasticsearch 入门
  • Ansible的使用3
  • 大数据技术之SPARK
  • 潮玩宇宙大逃杀游戏源码搭建部署二开,可切换单多杀boss模式
  • 怎么构造思维链数据?思维链提示工程的五大原则
  • 区块链是怎么存储块怎么找到前一个块
  • 如何向ESL阿联酋航运发送EDI CODECO报文?
  • DeepSeek 助力 Vue3 开发:打造丝滑的日历(Calendar)
  • 基于STM32、HAL库的IP6525S快充协议芯片简介及驱动程序设计
  • java基础语法(3)数组
  • CTF web入门之信息收集
  • Maven和MyBatis学习总结
  • Windows10系统更改盘符
  • 【力扣hot100题】(080)爬楼梯
  • 【JavaScript】异步编程
  • 图解AUTOSAR_SWS_FunctionInhibitionManager
  • 重新定义PPT创作!ChatPPT发布全球首个AI PPT专用MCP Server
  • 函数作为返回值输出
  • OSI七层模型的封装及解包分用的过程