当前位置: 首页 > news >正文

RabbitMQ的其中工作模式介绍以及Java的实现

文章目录

  • 前文
  • 一、模式介绍
    • 1. 简单模式
    • 2. 工作队列模式
    • 3. 广播模式
    • 4. 路由模式
    • 5. 通配符模式
    • 6. RPC模式
    • 7. 发布确认模式
  • 二、代码实现
    • 1、简单模式
    • 2、工作队列模式
      • 生产者
      • 消费者
        • 消费者 1
        • 消费者 2
    • 3、广播模式 (Fanout Mode)
      • 生产者
      • 消费者
    • 4、路由模式 (Direct Mode)
      • 生产者
      • 消费者
    • 5、通配符模式
    • 6、RPC模式 (Remote Procedure Call Mode)
      • 服务器 (Server)
      • 客户端 (Client)
    • 7、发布确认模式 (Publisher Confirms)
      • 1. 单独确认 (Publishing Messages Individually)
      • 2. 批量确认 (Publishing Messages in Batches)
      • 3. 异步确认
      • 对比总结

前文

为了更好的理解RabbitMQ中的工作模式,最好先了解RabbitMQ的几种常见交换机的类型

  1. Fanout(扇出交换机)
    它会忽视路由键,把消息发送给所有绑定了该交换机的所有队列

  2. Direct(直接交换机)
    根据生产者发送消息时设置的routingKey和交换机与不同队列绑定的bindingKey进行匹配,如果匹配把消息发送给对应的队列

  3. Topic(通配符交换机)
    可以认为是Direct的升级版。Direct中bindingKey必须是一个常量字符串,在Topic中bindingKey可以是一个通配符,类似于正则表达式。只要routingKey符合bindingKey的字符串模式,那么就可以把消息发送给指定队列

RabbitMQ中用 .来分割每一个单词。*表示匹配一个任意单词,可以是单个字母。#表示可以匹配0个或者多个单词,比*宽松。
例如,# 可以匹配 a、a.b、a.b.c 等,而 . 只能匹配正好两个单词的路由键(如 a.b)

  1. Header
    这种交换机不依赖于routingKey和bindingKey。它会根据消息中的headers属性进行匹配。但是由于其性能低下,因此很少用。

此外,代码实现部分博客使用的RabbitMQ自带的依赖包。Spring也支持RabbitMQ。
两者在RabbitMQ官网都有说明。


一、模式介绍

1. 简单模式

七个模式中最简单的模式,特点是一个生产者p、一个消费者c,消息只能被消费一次。适用于消息只能被单个消费者消费的场景。
在这里插入图片描述

2. 工作队列模式

概述: ⼀个⽣产者P,多个消费者C1,C2. 在多个消息的情况下, Work Queue 会将消息分派给不同的消费者, 每个消费者都会接收到不同的消息

特点: 消息不会重复, 分配给不同的消费者

适⽤场景: 集群环境中做异步处理。例如12306候补成功的短信服务,其中每个短信服务功能是一样的,消息给到那个消费者都可以,类似于集群:在这里插入图片描述

在这里插入图片描述

3. 广播模式

概述: 图中x是exchange,exchange会根据消息中的routingkey与Q1、Q2绑定的bindkey进行匹配,如果匹配成功,把消息转发给指定的队列。
特点: 一个生产者发送给exchange的消息,会被exchange复制多分,分别发送给绑定了这个exchange的queue。每个消费者获得的消息都是一样的
应用场景: 比如1001就老喜欢这种东西了,想给自己的客户推销广告,用广播模式,就可以把消息发送给所有的用户。
在这里插入图片描述

4. 路由模式

概述: 这个模式相当于是广播模式的一个约束,它会根据消息中的routingKey和与其他队列绑定的bindingKey进行匹配,如果匹配才会把消息发送给指定队列。

💡routingKey 和bindgKey必须完全一直才能匹配成功

在这里插入图片描述

5. 通配符模式

概述: 相当于路由模式的升级版,只要消息中的routingKey与指定队列的通配符匹配进行发送消息。
在这里插入图片描述

根据上图示例:

  1. ff.a.j与*.a.*匹配,该消息就会发送到Q1
  2. 消息:c.jojo.hyy 与c.#匹配,该消息就会发送到Q2

6. RPC模式

概述: RCP模式下 没有Producter和Consumer的概念,取而代之的是Client和Server的结构。Client发送消息给Server并且希望Server能发送一个期望的响应给Client,可以使用RPC模式.

特点: Client发送消息会设定两个字段relyTo、correlationId。replyTo用于指定Server使用哪一个回调队列(图中使用的Reply)发送响应给到Client。Client会等待回调队列发送reply给到自己,根据correlationId确保是Cilen需要的响应。
在这里插入图片描述


7. 发布确认模式

概述: 发布确认机制是RabbitMQ用于保证消息可靠性的其中一个方式。

  1. producter把对应的channel设置成confirm模式(通过channel.confirmSelect方法实现),并且设定一个消息唯一ID,把消息与唯一ID关联起来
  2. exchange接收到消息后会发送一个ACK响应给到producter(响应中含有唯一ID),表明消息已经送达。
    这种方法可以尽可能的避免在消息发送过程中的丢失问题。

二、代码实现

代码实现,主要有两种,一个是RabbitMQ官方提供的依赖包,另一个是Spring官方AMQP对RabbitMQ的封装实现,两者都会演示

RabbitMQ中央仓库
找到合适的版本导入即可,本博客使用的5.20.0版本。

1、简单模式

生产者:

public class Producer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);//4、发布消息channel.basicPublish("", Constants.SIMPLE_QUEUE, null, "呵呵".getBytes());System.out.println("执行了发布");//5、关闭连接channel.close();connection.close();}
}

💡

  1. Channel、Connection的相关包都来自于com.rabbitmq.client不要导错了
  2. 步骤四中参数 “” 的意思是使用默认交换机(Direct类型),bindingKey就是已经绑定的队列名字。
    消费者:
public class Consumer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);//5、定义consumer逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//根据实际场景消费消息System.out.println("消息已经被消费,获取的消息内容:" + new String(body));}};//6、消费内容channel.basicConsume(Constants.SIMPLE_QUEUE, true, consumer);//7、关闭连接channel.close();connection.close();}
}

💡
consumerTag: 标识不同消费者的唯一标签
envelope: 描述了消息传递的细节,如该消息是由那个交换机发送的,消息指定的routingKey是什么,消息的唯一标识deliveryTag。
properties: 用于设定RabbitMQ的高级属性
body: 消息的本体,以二进制方式存储

2、工作队列模式

工作队列模式(Work Queue Mode)是一种任务分发的模式,允许多个消费者从同一个队列中获取消息并处理,从而实现任务的负载均衡。消息会被轮询(Round-Robin)分发到不同的消费者,适合处理耗时任务的场景。

生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Product {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、发送20条消息for (int i = 0; i < 20; i++) {channel.basicPublish("", Constants.WORK_QUEUE, null, ("工作队列的消息" + i).getBytes());}// 5、关闭连接channel.close();connection.close();}
}

说明:

  • 队列声明:使用 channel.queueDeclare 声明一个持久化的队列(durable=true),确保队列在 RabbitMQ 重启后依然存在。
  • 消息发送:通过 basicPublish 方法向默认交换机("")发送消息,路由键为队列名称(Constants.WORK_QUEUE)。
  • 消息内容:循环发送 20 条消息,每条消息为 "工作队列的消息" + i
  • 连接关闭:发送完成后关闭信道和连接。

💡 注意

  • 参数 "" 表示使用默认交换机(Direct 类型),路由键直接绑定到队列名称。

消费者

消费者从工作队列中获取消息并处理。以下是两个消费者的实现,分别命名为 Consumer1Consumer2,它们共享同一队列的消息,每个消费者拿到不同的消息。

消费者 1
public class Consumer1 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("第一个消费者消费消息:" + new String(body));}};// 5、消费channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// 6、保持连接(注释掉关闭连接的代码)// channel.close();// connection.close();}
}
消费者 2

public class Consumer2 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("第二个消费者消费消息:" + new String(body));}};// 5、消费channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// 6、保持连接(注释掉关闭连接的代码)// channel.close();// connection.close();}
}

说明:

  • 队列声明:与生产者一致,消费者也声明相同的队列,确保队列存在。
  • 消费逻辑:通过继承 DefaultConsumer 并重写 handleDelivery 方法,定义消息处理逻辑。Consumer1Consumer2 分别打印接收到的消息,标识为“第一个消费者”和“第二个消费者”。
  • 消息消费:使用 channel.basicConsume 订阅队列,autoAck=true 表示自动确认消息(消费者接收消息后自动通知 RabbitMQ,把消息从队列中删除)。

💡 注意

  • consumerTag:标识消费者的唯一标签,用于区分不同的消费者。
  • envelope:包含消息的元数据,如路由键、交换机和 deliveryTag(消息的唯一标识)。
  • properties:消息的附加属性,可用于高级配置。
  • body:消息的实际内容,以字节数组形式存储。

3、广播模式 (Fanout Mode)

广播模式通过 Fanout 交换机将消息分发到所有绑定的队列,忽略路由键,适合发布/订阅场景。以下基于提供的代码续写。

生产者

关键点:

  • 声明 Fanout 交换机 (exchangeDeclare)。
  • 声明并绑定多个队列到交换机 (queueDeclare, queueBind)。
  • 发布消息到交换机,路由键为空 (basicPublish)。

消费者

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}

关键点:

  • 声明队列并监听消息 (queueDeclare, basicConsume)。
  • 每个消费者独立消费绑定队列的消息。
  • 自动确认消息 (autoAck=true)。

💡 注意:

  • Fanout 模式下,路由键被忽略,消息广播到所有绑定队列。
  • 确保交换机和队列正确绑定,避免消息丢失。

4、路由模式 (Direct Mode)

路由模式通过 Direct 交换机根据路由键精确分发消息到匹配的队列,适合需要条件路由的场景。

生产者

public class Producer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5、交换机绑定队列q1 q2channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"q1");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"q2");//6、生产者发送消息channel.basicPublish(Constants.DIRECT_EXCHANGE,"q1",null,("q1需要接收到这个消息").getBytes());channel.basicPublish(Constants.DIRECT_EXCHANGE,"q2",null,("q2需要接收到这个消息").getBytes());//7、关闭资源channel.close();connection.close();}
}

关键点:

  • 声明 Direct 交换机 (exchangeDeclare)。
  • 声明队列并绑定到交换机,指定路由键 (queueBind)。
  • 发布消息时指定路由键 (basicPublish)。

消费者

public class Consumer1 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//5、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+new String(body));}};//5、消费channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);//6、关闭连接channel.close();connection.close();}
}
public class Consumer2 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+new String(body));}};//5、消费channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);//6、关闭连接channel.close();connection.close();}
}

关键点:

  • 声明队列并监听消息 (queueDeclare, basicConsume)。
  • 根据队列绑定的路由键接收对应消息。
  • 自动确认消息 (autoAck=true)。

💡 注意:

  • 路由键必须精确匹配,消息才会分发到对应队列。
  • 队列可以绑定多个路由键,增加灵活性。
  • 未绑定路由键的队列不会收到消息。

5、通配符模式

通配符模式和路由模式实现的不同点就是交换机使用TOPIC类型,交换机和队列绑定使用通配符,其他代码几乎一致,这里就不演示了。

6、RPC模式 (Remote Procedure Call Mode)

RPC模式通过RabbitMQ实现客户端与服务器的双向通信,客户端发送请求到服务器并等待响应,适合需要同步响应的场景。

服务器 (Server)

public class Server {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列并且设定对多处理消息数channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.basicQos(1);//5、定义consumer逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//根据实际场景消费消息System.out.println("客服端发送了这个消息:" + new String(body));//获取消息中的correID将其发送会客户端AMQP.BasicProperties proper = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();//给客户端发送响应,指定使用replayTochannel.basicPublish("", properties.getReplyTo(), proper, ("收到来自客户端的请求").getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}};//6、消费内容channel.basicConsume(Constants.RPC_REQUEST_QUEUE, true, consumer);}
}

关键点:

  • 声明请求和响应队列 (queueDeclare)。
  • 设置消息处理限制 (basicQos),确保按序处理。
  • 消费请求队列消息,发送响应到客户端指定的 replyTo 队列。
  • 使用 correlationId 关联请求和响应。

客户端 (Client)

public class Client {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4、生成唯一ID用于区分当前消息String corrID= UUID.randomUUID().toString();//5、配置请求相关属性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();//4、发布消息channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, "呵呵".getBytes());System.out.println("执行了发布");//5、等待响应final BlockingQueue<String> bq= new LinkedBlockingQueue<>(1);DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String response=new String(body);if(properties.getCorrelationId().equals(corrID)){System.out.println("接收到回调消息:"+response);bq.offer(response);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);bq.take();}
}

关键点:

  • 声明请求和响应队列 (queueDeclare)。
  • 生成唯一 correlationId 标识请求。
  • 发送请求到 RPC_REQUEST_QUEUE,指定 replyTo 为响应队列。
  • 监听 RPC_RESPONSE_QUEUE,验证 correlationId 匹配后处理响应。

7、发布确认模式 (Publisher Confirms)

发布确认模式确保生产者发送的消息被RabbitMQ正确接收,提供可靠性保证。确认方式有以下三种:

1. 单独确认 (Publishing Messages Individually)

  private static void individually() throws Exception {try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4. 发送消息, 并等待确认long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}

关键点:

  • 启用确认模式 (confirmSelect)。
  • 每发送一条消息,同步等待确认 (waitForConfirmsOrDie)。
  • 适合小规模消息发送,因为性能较低。

2. 批量确认 (Publishing Messages in Batches)

 private static void publishingMessagesInBatches() throws Exception{try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}

关键点:

  • 启用确认模式 (confirmSelect)。
  • 每发送一批消息 (如100条),同步等待确认 (waitForConfirmsOrDie)。
  • 平衡了性能与可靠性。
  • 但在一些消息容易遗失的场景,我们不清楚具体是那个消息出现问题,需要批量重发消息,性能可能不增返降。

3. 异步确认

    private static void asynchronously() throws Exception{try (Connection connection = createConnection()){//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4. 监听confirm//集合中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//5. 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}

关键点:

  • 启用确认模式 (confirmSelect)。
  • 使用 channel.ConfirmListener() 开启监听,异步处理确认 (handleAck, handleNack)。
  • 通过 SortedSet 跟踪未确认消息。
  • 最高吞吐量,适合大规模消息发送。

对比总结

策略优点缺点适用场景
单独确认简单,高可靠性延迟高,吞吐量低小规模、可靠性优先
批量确认平衡性能与可靠性仍需同步等待,部分延迟中等规模、可靠性与性能兼顾
异步确认高吞吐量,低延迟实现复杂,需处理失败重发大规模、高性能需求

相关文章:

  • 【Qt】:设置hover属性,没有适应到子控件中
  • 【Qt】QImage实战
  • 【HTML-5】HTML 实体:完整指南与最佳实践
  • Qt+线段拖曳示例代码
  • Qt功能区:Ribbon控件
  • 在 Qt 中实现动态切换主题(明亮和暗黑)
  • Dify的大语言模型(LLM) AI 应用开发平台-本地部署
  • 基于Qt的app开发第十天
  • 微软 Build 2025:开启 AI 智能体时代的产业革命
  • QT中信号和事件的区别
  • 精益制造数字化转型智能工厂三年规划建设方案
  • 《算法笔记》11.8小节——动态规划专题->总结 问题 G: 点菜问题
  • 工具环境与系统部署
  • React中使用 Ant Design Charts 图表
  • 【人工智能发展史】从黎明到曙光01
  • 精益数据分析(75/126):用户反馈的科学解读与试验驱动迭代——Rally的双向验证方法论
  • react中运行 npm run dev 报错,提示vite.config.js出现错误 @esbuild/win32-x64
  • PHP伪随机数
  • Java Collection(集合) 接口
  • windows powershell 判断 进程号是否存在
  • 万网域名抢注/seo关键词排名公司
  • 织梦教育网站模板/百度置顶广告多少钱
  • 做非法网站判什么邢/广告资源发布平台
  • 承德北京网站建设/新品牌推广方案
  • 易语言编程软件做网站/百度网盘官网网页版
  • 杭州疫情最新数据消息/百度seo优化排名软件