当前位置: 首页 > news >正文

RabbitMQ知识点

1.为什么需要消息队列?

 

 

 

 

 

 

 

 RabbitMQ体系结构

操作001:RabbitMQ安装

二、安装

# 拉取镜像
docker pull rabbitmq:3.13-management
​
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management

二、验证

访问后台管理界面:http://192.168.200.100:15672

使用上面创建Docker容器时指定的默认用户名、密码登录:

三、可能的问题

1、问题现象

在使用Docker拉取RabbitMQ镜像的时候,如果遇到提示:missing signature key,那就说明Docker版本太低了,需要升级

比如我目前的Docker版本如下图所示:

2、解决办法

基于CentOS7

①卸载当前Docker

更好的办法是安装Docker前曾经给服务器拍摄了快照,此时恢复快照;

如果不曾拍摄快照,那只能执行卸载操作了

yum erase -y docker \
    docker-client \
    docker-client-latest \
    docker-common \
    docker-latest \
    docker-latest-logrotate \
    docker-logrotate \
    docker-selinux \
    docker-engine-selinux \
    docker-engine \
    docker-ce

②升级yum库

yum update -y

③安装Docker最新版

yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

如果这一步看到提示:没有可用软件包 docker-ce,那就添加Docker的yum源:

yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

④设置Docker服务

systemctl start docker
systemctl enable docker

3、验证

上述操作执行完成后,再次查看Docker版本:

操作002:HelloWorld

一、目标

生产者发送消息,消费者接收消息,用最简单的方式实现

官网说明参见下面超链接:

RabbitMQ tutorial - "Hello World!" — RabbitMQ

二、具体操作

1、创建Java工程

①消息发送端(生产者)

②消息接收端(消费者)

③添加依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.20.0</version>
    </dependency>
</dependencies>

2、发送消息

①Java代码

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

package com.atguigu.rabbitmq.simple;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
        // 创建连接工厂  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
  
        // 设置主机地址  
        connectionFactory.setHost("192.168.200.100");  
  
        // 设置连接端口号:默认为 5672
        connectionFactory.setPort(5672);
  
        // 虚拟主机名称:默认为 /
        connectionFactory.setVirtualHost("/");
  
        // 设置连接用户名;默认为guest  
        connectionFactory.setUsername("guest");
  
        // 设置连接密码;默认为guest  
        connectionFactory.setPassword("123456");
  
        // 创建连接  
        Connection connection = connectionFactory.newConnection();  
  
        // 创建频道  
        Channel channel = connection.createChannel();  
  
        // 声明(创建)队列  
        // queue      参数1:队列名称  
        // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  
        // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  
        // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  
        // arguments  参数5:队列其它参数  
        channel.queueDeclare("simple_queue", true, false, false, null);  
  
        // 要发送的信息  
        String message = "你好;小兔子!";  
  
        // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  
        // 参数2:路由key,简单模式可以传递队列名称  
        // 参数3:配置信息  
        // 参数4:消息内容  
        channel.basicPublish("", "simple_queue", null, message.getBytes());  
  
        System.out.println("已发送消息:" + message);  
  
        // 关闭资源  
        channel.close();  
        connection.close();  
  
    }  
  
}

②查看效果

3、接收消息

①Java代码

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

package com.atguigu.rabbitmq.simple;  
  
import com.rabbitmq.client.*;  
  
import java.io.IOException;  
  
public class Consumer {  
  
    public static void main(String[] args) throws Exception {  
  
        // 1.创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
  
        // 2. 设置参数  
        factory.setHost("192.168.200.100");  
        factory.setPort(5672);  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");
        factory.setPassword("123456");  
  
        // 3. 创建连接 Connection        
        Connection connection = factory.newConnection();  
  
        // 4. 创建Channel  
        Channel channel = connection.createChannel();  
  
        // 5. 创建队列  
        // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  
        // 参数1. queue:队列名称  
        // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  
        // 参数3. exclusive:是否独占。  
        // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  
        // 参数5. arguments:其它参数。  
        channel.queueDeclare("simple_queue",true,false,false,null);  
  
        // 接收消息  
        DefaultConsumer consumer = new DefaultConsumer(channel){  
  
            // 回调方法,当收到消息后,会自动执行该方法  
            // 参数1. consumerTag:标识  
            // 参数2. envelope:获取一些信息,交换机,路由key...  
            // 参数3. properties:配置信息  
            // 参数4. body:数据  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("consumerTag:"+consumerTag);  
                System.out.println("Exchange:"+envelope.getExchange());  
                System.out.println("RoutingKey:"+envelope.getRoutingKey());  
                System.out.println("properties:"+properties);  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        // 参数1. queue:队列名称  
        // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  
        // 参数3. callback:回调对象  
        // 消费者类似一个监听程序,主要是用来监听消息  
        channel.basicConsume("simple_queue",true,consumer);  
  
    }  
  
}

②控制台打印

consumerTag:amq.ctag-8EB87GaZFP52LKSMcj98UA Exchange: RoutingKey:simple_queue properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) body:你好;小兔子!

③查看后台管理界面

因为消息被消费掉了,所以RabbitMQ服务器上没有了:

操作003:工作队列模式

Work Queues 本质上我们刚刚写的HelloWorld程序就是这种模式,只是简化到了最简单的情况:• 生产者只有一个 • 发送一个消息 • 消费者也只有一个,消息也只能被这个消费者消费所以HelloWorld也称为简单模式。 现在我们还原一下常规情况: • 生产者发送多个消息 • 由多个消费者来竞争 • 谁抢到算谁的

结论: • 多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。• Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。

一、生产者代码

1、封装工具类

package com.atguigu.rabbitmq.util;  
  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class ConnectionUtil {  
  
    public static final String HOST_ADDRESS = "192.168.200.100";  
  
    public static Connection getConnection() throws Exception {  
  
        // 定义连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
  
        // 设置服务地址  
        factory.setHost(HOST_ADDRESS);  
  
        // 端口  
        factory.setPort(5672);  
  
        //设置账号信息,用户名、密码、vhost  
        factory.setVirtualHost("/");  
        factory.setUsername("guest");  
        factory.setPassword("123456");  
  
        // 通过工程获取连接  
        Connection connection = factory.newConnection();  
  
        return connection;  
    }  
  
  
  
    public static void main(String[] args) throws Exception {  
  
        Connection con = ConnectionUtil.getConnection();  
  
        // amqp://guest@192.168.200.100:5672/  
        System.out.println(con);  
  
        con.close();  
  
    }  
  
}

2、编写代码

package com.atguigu.rabbitmq.work;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
  
public class Producer {  
  
    public static final String QUEUE_NAME = "work_queue";  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        for (int i = 1; i <= 10; i++) {  
  
            String body = i+"hello rabbitmq~~~";  
  
            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());  
  
        }  
  
        channel.close();  
  
        connection.close();  
  
    }  
  
}

3、发送消息效果

img

二、消费者代码

1、编写代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。

package com.atguigu.rabbitmq.work;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
  
import java.io.IOException;  
  
public class Consumer1 {  
  
    static final String QUEUE_NAME = "work_queue";  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("Consumer1 body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序。 如果已经运行过生产者程序,则手动把work_queue队列删掉。

2、运行效果

最终两个消费端程序竞争结果如下:

image-20231103103841644

操作004:发布订阅模式

生产者不是把消息直接发送到队列,而是发送到交换机

• 交换机接收消息,而如何处理消息取决于交换机的类型• 交换机有如下3种常见类型

• Fanout:广播,将消息发送给所有绑定到交换机的队列

• Direct:定向,把消息交给符合指定routing key的队列

• Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

• 注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失! 

组件之间关系:

        • 生产者把消息发送到交换机

        • 队列直接和交换机绑定

• 工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列

• 理解概念:

        • Publish:发布,这里就是把消息发送到交换机上

        • Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系

一、生产者代码

package com.atguigu.rabbitmq.fanout;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
      // 1、获取连接  
        Connection connection = ConnectionUtil.getConnection();  
  
      // 2、创建频道  
        Channel channel = connection.createChannel();  
  
        // 参数1. exchange:交换机名称  
        // 参数2. type:交换机类型  
        //     DIRECT("direct"):定向  
        //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  
        //     TOPIC("topic"):通配符的方式  
        //     HEADERS("headers"):参数匹配  
        // 参数3. durable:是否持久化  
        // 参数4. autoDelete:自动删除  
        // 参数5. internal:内部使用。一般false  
        // 参数6. arguments:其它参数  
        String exchangeName = "test_fanout";  
  
        // 3、创建交换机  
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  
  
        // 4、创建队列  
        String queue1Name = "test_fanout_queue1";  
        String queue2Name = "test_fanout_queue2";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        // 5、绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //     如果交换机的类型为fanout,routingKey设置为""  
        channel.queueBind(queue1Name,exchangeName,"");  
        channel.queueBind(queue2Name,exchangeName,"");  
  
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";  
  
        // 6、发送消息  
        channel.basicPublish(exchangeName,"",null,body.getBytes());  
  
        // 7、释放资源  
        channel.close();  
        connection.close();  
  
    }  
  
}

二、消费者代码

1、消费者1号

package com.atguigu.rabbitmq.fanout;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue1Name = "test_fanout_queue1";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue1Name,true,consumer);  
  
    }  
  
}

2、消费者2号

package com.atguigu.rabbitmq.fanout;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue2Name = "test_fanout_queue2";  
  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue2Name,true,consumer);  
  
    }  
  
}

三、运行效果

还是先启动消费者,然后再运行生产者程序发送消息:

img

img

四、小结

交换机和队列的绑定关系如下图所示:

images

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机

  • 发布订阅模式绑定指定交换机

  • 监听同一个队列的消费端程序彼此之间是竞争关系

  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

操作006-路由模式

 通过『路由绑定』的方式,把交换机和队列关联起来

• 交换机和队列通过路由键进行绑定

• 生产者发送消息时不仅要指定交换机,还要指定路由键

• 交换机接收到消息会发送到路由键绑定的队列

• 在编码上与 Publish/Subscribe发布与订阅模式的区别:

        • 交换机的类型为:Direct

        • 队列绑定交换机的时候需要指定routing key。

一、生产者代码

package com.atguigu.rabbitmq.routing;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
      Connection connection = ConnectionUtil.getConnection();  
  
      Channel channel = connection.createChannel();  
  
      String exchangeName = "test_direct";  
  
      // 创建交换机  
      channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  
  
      // 创建队列  
      String queue1Name = "test_direct_queue1";  
      String queue2Name = "test_direct_queue2";  
  
      // 声明(创建)队列  
      channel.queueDeclare(queue1Name,true,false,false,null);  
      channel.queueDeclare(queue2Name,true,false,false,null);  
  
      // 队列绑定交换机  
      // 队列1绑定error  
      channel.queueBind(queue1Name,exchangeName,"error");  
  
      // 队列2绑定info error warning  
      channel.queueBind(queue2Name,exchangeName,"info");  
      channel.queueBind(queue2Name,exchangeName,"error");  
      channel.queueBind(queue2Name,exchangeName,"warning");  
  
        String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";  
  
        // 发送消息  
        channel.basicPublish(exchangeName,"warning",null,message.getBytes());  
        System.out.println(message);  
  
      // 释放资源  
        channel.close();  
        connection.close();  
  
    }  
  
}

二、消费者代码

1、消费者1号

package com.atguigu.rabbitmq.routing;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue1Name = "test_direct_queue1";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("Consumer1 将日志信息打印到控制台.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue1Name,true,consumer);  
  
    }  
  
}

2、消费者2号

package com.atguigu.rabbitmq.routing;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String queue2Name = "test_direct_queue2";  
  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
                System.out.println("Consumer2 将日志信息存储到数据库.....");  
  
            }  
  
        };  
  
        channel.basicConsume(queue2Name,true,consumer);  
  
    }  
  
}

三、运行结果

1、绑定关系

img

2、消费消息

操作006:主题模式

 • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符

• Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert

• 通配符规则:

        • #:匹配零个或多个词

        • *:匹配一个词

一、生产者代码

package com.atguigu.rabbitmq.topic;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String exchangeName = "test_topic";  
  
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);  
  
        String queue1Name = "test_topic_queue1";  
        String queue2Name = "test_topic_queue2";  
  
        channel.queueDeclare(queue1Name,true,false,false,null);  
        channel.queueDeclare(queue2Name,true,false,false,null);  
  
        // 绑定队列和交换机  
      // 参数1. queue:队列名称  
      // 参数2. exchange:交换机名称  
      // 参数3. routingKey:路由键,绑定规则  
      //      如果交换机的类型为fanout ,routingKey设置为""  
        // routing key 常用格式:系统的名称.日志的级别。  
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  
        channel.queueBind(queue1Name,exchangeName,"#.error");  
        channel.queueBind(queue1Name,exchangeName,"order.*");  
        channel.queueBind(queue2Name,exchangeName,"*.*");  
  
        // 分别发送消息到队列:order.info、goods.info、goods.error  
        String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";  
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());  
  
        body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";  
        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());  
  
        body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";  
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());  
  
        channel.close();  
        connection.close();  
  
    }  
  
}

二、消费者代码

1、消费者1号

消费者1监听队列1:

package com.atguigu.rabbitmq.topic;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer1 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String QUEUE_NAME = "test_topic_queue1";  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

2、消费者2号

消费者2监听队列2:

package com.atguigu.rabbitmq.topic;  
  
import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  
  
public class Consumer2 {  
  
    public static void main(String[] args) throws Exception {  
  
        Connection connection = ConnectionUtil.getConnection();  
  
        Channel channel = connection.createChannel();  
  
        String QUEUE_NAME = "test_topic_queue2";  
  
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  
        Consumer consumer = new DefaultConsumer(channel){  
  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  
                System.out.println("body:"+new String(body));  
  
            }  
  
        };  
  
        channel.basicConsume(QUEUE_NAME,true,consumer);  
  
    }  
  
}

三、运行效果

队列1:

img

队列2:

操作007:整合SpringBoot

搭建环境

• 基础设定:交换机名称、队列名称、绑定关系

• 发送消息:使用RabbitTemplate

• 接收消息:使用@RabbitListener注解

1、消费者工程

①创建module

②配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
​
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

③YAML

增加日志打印的配置:

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
logging:
  level:
    com.atguigu.mq.listener.MyMessageListener: info

④主启动类

仿照生产者工程的主启动类,改一下类名即可

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQConsumerMainType {
​
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQConsumerMainType.class, args);
    }
​
}

⑤监听器

package com.atguigu.mq.listener;
​
import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
@Slf4j
public class MyMessageListener {
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";  
    public static final String QUEUE_NAME  = "queue.order";  
  
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT),
            key = {ROUTING_KEY}
    ))
    public void processMessage(String dateString,
                               Message message,
                               Channel channel) {
        log.info(dateString);
    }
  
}

2、@RabbitListener注解属性对比

①bindings属性

  • 表面作用:

    • 指定交换机和队列之间的绑定关系

    • 指定当前方法要监听的队列

  • 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

②queues属性

@RabbitListener(queues = {QUEUE_ATGUIGU})
  • 作用:指定当前方法要监听的队列

  • 注意:此时框架不会创建相关交换机和队列,必须提前创建好

3、生产者工程

①创建module

images

②配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
​
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

③YAML

spring: 
  rabbitmq: 
    host: 192.168.200.100
    port: 5672 
    username: guest 
    password: 123456 
    virtual-host: /

④主启动类

package com.atguigu.mq;  
  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
  
@SpringBootApplication
public class RabbitMQProducerMainType {
​
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);  
    }
​
}

⑤测试程序

package com.atguigu.mq.test;
  
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest  
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";  
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test  
    public void testSendMessage() {  
        rabbitTemplate.convertAndSend(  
                EXCHANGE_DIRECT,   
                ROUTING_KEY,   
                "Hello atguigu");  
    }  
  
}

消息可靠性投递

下单操作的正常流程

 故障情况1

消息没有发送到消息队列上

后果:消费者拿不到消息,业务功能缺失,数据错误

 故障情况2

消息成功存入消息队列,但是消息队列服务器宕机了

原本保存在内存中的消息也丢失了

即使服务器重新启动,消息也找不回来了

后果:消费者拿不到消息,业务功能缺失,数据错误

故障情况3 

消息成功存入消息队列,但是消费端出现问题,

例如:宕机、抛异常等等后果:业务功能缺失,数据错误

 对症下药

• 故障情况1:消息没有发送到消息队列

        • 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送

        • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机

• 故障情况2:消息队列服务器宕机导致内存中消息丢失

        • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失

• 故障情况3:消费端宕机或抛异常导致消息没有成功被消费

        • 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息

        • 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性

操作008-01-A:生产者端消息确认机制

一、创建module

二、搭建环境

1、配置POM

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
​
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

2、主启动类

没有特殊设定:

package com.atguigu.mq;  
  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
  
@SpringBootApplication
public class RabbitMQProducerMainType {
​
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);  
    }
​
}

3、YAML

注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认
logging:
  level:
    com.atguigu.mq.config.MQProducerAckConfig: info

三、创建配置类

1、目标

在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate

然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。

原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:

设置组件调用的方法所需对象类型
setConfirmCallback()ConfirmCallback接口类型
setReturnCallback()ReturnCallback接口类型

2、API说明

①ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

    /**
     * A callback for publisher confirmations.
     *
     */
    @FunctionalInterface
    public interface ConfirmCallback {
​
        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
​
    }

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机

  • ack参数值为false:表示消息没有发送到交换机

②ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

    /**
     * A callback for returned messages.
     *
     * @since 2.3
     */
    @FunctionalInterface
    public interface ReturnsCallback {
​
        /**
         * Returned message callback.
         * @param returned the returned message and metadata.
         */
        void returnedMessage(ReturnedMessage returned);
​
    }

注意:接口中的returnedMessage()方法仅在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名类型含义
messageorg.springframework.amqp.core.Message消息以及消息相关数据
replyCodeint应答码,类似于HTTP响应状态码
replyTextString应答码说明
exchangeString交换机名称
routingKeyString路由键名称

3、配置类代码

①要点1

加@Component注解,加入IOC容器

②要点2

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。

操作封装到了一个专门的void init()方法中。

为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。

关于@PostConstruct注解大家可以参照以下说明:

@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。

使用@PostConstruct注解的方法必须满足以下条件:

  1. 方法不能有任何参数。

  2. 方法必须是非静态的。

  3. 方法不能返回任何值。

当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。

③代码

有了以上说明,下面我们就可以展示配置类的整体代码:

package com.atguigu.mq.config;
​
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
​
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送到交换机成功!数据:" + correlationData);
        } else {
            log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
        }
    }
​
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        //发送到交换机失败调用的方法
        log.info("消息主体: " + new String(returned.getMessage().getBody()));
        log.info("应答码: " + returned.getReplyCode());
        log.info("描述:" + returned.getReplyText());
        log.info("消息使用的交换器 exchange : " + returned.getExchange());
        log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
    }
}

四、发送消息

package com.atguigu.mq.test;
  
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest  
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test  
    public void testSendMessage() {  
        rabbitTemplate.convertAndSend(  
                EXCHANGE_DIRECT,   
                ROUTING_KEY,   
                "Hello atguigu");  
    }  
  
}

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确

  • 交换机正确、路由键不正确,无法发送到队列

  • 交换机不正确,无法发送到交换机

操作008-01-B:备份交换机

一、创建备份交换机

1、创建备份交换机

注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键

image-20231203231926247

image-20231202183701454

2、创建备份交换机要绑定的队列

①创建队列

image-20231202183906676

image-20231202183949674

②绑定交换机

注意:这里是要和备份交换机绑定

image-20231203232801504

3、针对备份队列创建消费端监听器

    public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
    public static final String QUEUE_NAME_BACKUP  = "queue.order.backup";
​
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),
            key = {""}
    ))
    public void processMessageBackup(String dateString,
                                     Message message,
                                     Channel channel) {
        log.info("BackUp: " + dateString);
    }

二、设定备份关系

1、原交换机删除

·

image-20231202184840124

2、重新创建原交换机

image-20231202185211633

image-20231202185342087

3、原交换机重新绑定原队列

image-20231202190111581

image-20231202185955138

image-20231202190036520

三、测试

  • 启动消费者端

  • 发送消息,但是路由键不对,于是转入备份交换机

相关文章:

  • 二叉树-验证二叉搜索树
  • 计算机网络开发(2)TCP\UDP区别、TCP通信框架、服务端客户端通信实例
  • RV1126采集VI视频数据流
  • 【QWEN】机器人控制器的控制周期越短精度越高吗
  • Kotlin D1
  • 模块15.常用API
  • Java接口(3)与图书管理系统
  • Android 多用户相关
  • tcp/ip协议详细介绍,tcpip协议详细介绍
  • DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)示例5: 搜索和过滤
  • DeepSeek精品课分享 清北
  • 《几何原本》命题I.15
  • YOLOv12改进之A2(区域注意力)
  • LeetCode热题100JS(44/100)第八天|二叉树的直径|二叉树的层序遍历|将有序数组转换为二叉搜索树|验证二叉树搜索树|二叉搜索树中第K小的元素
  • Kafka - 高吞吐量的七项核心设计解析
  • 全面复习回顾——C++语法篇2
  • Docker部署开源运维工具MyIP结合内网穿透远程在线网络诊断和监控
  • 【Unity Shader编程】之光照模型
  • 【1Panel】平替宝塔面板!1Panel面板香橙派部署结合内网穿透远程管理
  • 传统架构与集群架构搭建LAMP环境并部署WordPress服务
  • 网络课程系统网站建设费用/优化搜狗排名
  • 网站建设比较好/网页设计代码案例
  • 公司网站制作平台/友情链接模板
  • 做网站开发多少钱/广告推广 精准引流
  • 网站制作可以/推广之家
  • 鞍山创网站怎么创/淘宝引流推广平台