当前位置: 首页 > news >正文

RabbitMQ应用(1)

1.7种工作模式介绍

1.1 Simple(简单模式)

P: ⽣产者, 也就是要发送消息的程序

C: 消费者,消息的接收者

Queue: 消息队列, 图中⻩⾊背景部分. 类似⼀个邮箱, 可以缓存消息; ⽣产者向其中投递消息, 消费者从其中取出消息.

特点: ⼀个⽣产者P,⼀个消费者C, 消息只能被消费⼀次. 也称为点对点(Point-to-Point)模式.

适⽤场景: 消息只能被单个消费者处理

1.2 Work Queue(工作队列)

⼀个⽣产者P,多个消费者C1,C2. 在多个消息的情况下, Work Queue 会将消息分派给不同的消费者, 每个消费者都会接收到不同的消息.

特点: 消息不会重复, 分配给不同的消费者.

适⽤场景: 集群环境中做异步处理

⽐如12306 短信通知服务, 订票成功后, 订单消息会发送到RabbitMQ, 短信服务从RabbitMQ中获取订单信息, 并发送通知信息(在短信服务之间进⾏任务分配)

1.3 Publish/Subscribe(发布/订阅)

图中X表⽰交换机, 在订阅模型中,多了⼀个Exchange⻆⾊, 过程略有变化

概念介绍

Exchange: 交换机 (X).

作⽤: ⽣产者将消息发送到Exchange, 由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产者将消息投递到队列中, 实际上这个在RabbitMQ中不会发⽣. )

RabbitMQ交换机有四种类型: fanout,direct, topic, headers, 不同类型有着不同的路由策略. AMQP协议⾥还有另外两种类型, System和⾃定义, 此处不再描述.

1. Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)

2. Direct:定向,把消息交给符合指定routing key的队列(Routing模式)

3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)

4. headers类型的交换器不依赖于路由键的匹配规则来路由消息, ⽽是根据发送的消息内容中的headers属性进⾏匹配. headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.

Exchange(交换机)只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失

RoutingKey: 路由键.⽣产者将消息发给交换器时, 指定的⼀个字符串, ⽤来告诉交换机应该如何处理这个消息.

Binding Key:绑定. RabbitMQ中通过Binding(绑定)将交换器与队列关联起来, 在绑定的时候⼀般会指定⼀个Binding Key, 这样RabbitMQ就知道如何正确地将消息路由到队列了。

⽐如下图: 如果在发送消息时, 设置了RoutingKey 为orange, 消息就会路由到Q1

当消息的Routing key与队列绑定的Bindingkey相匹配时,消息才会被路由到这个队列.

BindingKey其实也属于路由键中的⼀种, 官⽅解释为:the routingkey to use for the binding.

可以翻译为:在绑定的时候使⽤的路由键. ⼤多数时候,包括官⽅⽂档和RabbitMQJava API 中都把BindingKey和RoutingKey看作RoutingKey, 为了避免混淆,可以这么理解:

1. 在使⽤绑定的时候,需要的路由键是BindingKey.

2. 在发送消息的时候,需要的路由键是RoutingKey.

后续也可能把两者合称为Routing Key, ⼤家根据使⽤场景来区分

Publish/Subscribe模式

⼀个⽣产者P, 多个消费者C1, C2, X代表交换机消息复制多份,每个消费者接收相同的消息

⽣产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者

适合场景: 消息需要被多个消费者同时接收的场景.

比如: 实时通知或者⼴播消息⽐如中国⽓象局发布"天⽓预报"的消息送⼊交换机, 新浪,百度, 搜狐, ⽹易等⻔⼾⽹站接⼊消息, 通过队列绑定到该交换机, ⾃动获取⽓象局推送的⽓象数据

1.4 Routing(路由模式)

路由模式是发布订阅模式的变种, 在发布订阅基础上, 增加路由key

发布订阅模式是⽆条件的将所有消息分发给所有消费者, 路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列

适合场景: 需要根据特定规则分发消息的场景.

⽐如系统打印⽇志, ⽇志等级分为error, warning, info,debug, 就可以通过这种模式,把不同的⽇志发送到不同的队列, 最终输出到不同的⽂件

1.5 Topics(通配符模式)

路由模式的升级版, 在routingKey的基础上,增加了通配符的功能, 使之更加灵活.

Topics和Routing的基本原理相同,即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列. 类似于正则表达式的⽅式来定义Routingkey的模式.

不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.

适合场景: 需要灵活匹配和过滤消息的场景

1.6 RPC(RPC通信)

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程.

1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, ⽤于接收服务端的响应.

2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列

3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应.

1.7 Publisher Confirms(发布确认)

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.

2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失的问题.

适⽤场景: 对数据安全性要求较⾼的场景. ⽐如⾦融交易, 订单处理.

2. 工作模式的使用案例

2.1 简单模式

快速⼊⻔程序就是简单模式. 此处省略.

2.2 Work Queues(工作队列)

简单模式的增强版, 和简单模式的区别就是: 简单模式有⼀个消费者, ⼯作队列模式⽀持多个消费者接收消息, 消费者之间是竞争关系, 每个消息只能被⼀个消费者接收

步骤:

1. 引⼊依赖

2. 编写⽣产者代码

3. 编写消费者代码

引⼊依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amap-client</artifactId><version>5.7.3</version>
</dependency>

编写生产者代码

⼯作队列模式和简单模式区别是有多个消费者, 所以⽣产者消费者代码差异不⼤

相⽐简单模式, ⽣产者的代码基本⼀样, 为了能看到多个消费者竞争的关系, 我们⼀次发送10条消息

我们把发送消息的地⽅, 改为⼀次发送10条消息

for (int i = 0; i < 10; i++) {String msg = "Hello World" + i;channel.basicPublish("", QUEUE_NAME,null,msg.getBytes());
}

整体代码:

public class Constants {public static final String HOST = "110.41.51.65";public static final Integer PORT = 15673;public static final String VIRTUAL_HOST = "bite";public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String WORK_QUEUE_NAME = "work_queues";
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;public class WorkRabbitProducer {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME);//用户名,默认guestfactory.setPassword(Constants.PASSWORD);//密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 声明队列//如果没有一个这样的一个队列,会自动创建,如果有,则不创建channel.queueDeclare(Constants.WORK_QUEUE_NAME, true, false, false, null);//3. 发送消息for (int i = 0; i < 10; i++) {String msg = "Hello World" + i;channel.basicPublish("",Constants.WORK_QUEUE_NAME,null,msg.getBytes());}//4. 释放资源channel.close();connection.close();}
}

编写消费者代码

消费者代码和简单模式⼀样, 只是复制两份. 两个消费者代码可以是⼀样的

import com.rabbitmq.client.*;
import constant.Constants;import java.io.IOException;public class WorkRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME);//用户名,默认guestfactory.setPassword(Constants.PASSWORD);//密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 声明队列//如果没有一个这样的一个队列,会自动创建,如果有,则不创建channel.queueDeclare(Constants.WORK_QUEUE_NAME, true, false, false, null);//3. 接收消息,并消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE_NAME, true, consumer);}
}

运⾏程序, 观察结果

先启动两个消费者运⾏, 再启动⽣产者

如果先启动⽣产者, 在启动消费者, 由于消息较少, 处理较快, 那么第⼀个启动的消费者就会瞬间把10条消息消费掉, 所以我们先启动两个消费者, 再启动⽣产者

1. 启动2个消费者

2. 启动⽣产者

可以看到两个消费者都打印了消费信息

WorkQueuesConsumer1 打印

1 body:Hello World0

2 body:Hello World2

3 body:Hello World4

4 body:Hello World6

5 body:Hello World8

WorkQueuesConsumer2 打印

1 body:Hello World1

2 body:Hello World3

3 body:Hello World5

4 body:Hello World7

5 body:Hello World9

可以看到管理界⾯上显⽰两个消费者

2.3 Publish/Subscribe(发布/订阅)

在发布/订阅模型中,多了⼀个Exchange⻆⾊.

Exchange 常⻅有三种类型, 分别代表不同的路由规则

  1. Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
  2. Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
  3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)

也就分别对应不同的⼯作模式

我们来看看Publish/Subscribe 模式

步骤:

1. 引⼊依赖

2. 编写⽣产者代码

3. 编写消费者代码

引入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

编写生产者代码

和前⾯两个的区别是:

需要创建交换机, 并且绑定队列和交换机

创建交换机
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autobelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
* DIRECT("direct"), 定向, 直连, routing
* FANOUT("fanout"), 扇形(广播), 每个队列都能收到消息
* TOPIC("topic"), 通配符
* HEADERS("headers") 参数匹配(工作用的较少)
3. durable: 是否持久化。true-持久化, false非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
4. autoDelete: 自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。而不是这种理解:当与此交换器连接的客户端断开时,RabbitMQ会自动删除本交换器。
5. internal: 内部使用,一般false。如果没有true,表示内部使用。客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
6. arguments: 参数
*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE_NAME,BuiltinExchangeType.FANOUT, true, false, false, null);
声明两个队列

后⾯验证是否两个队列都能收到消息

//如果没有一个这样的一个队列,会自动创建,如果有,则不创建
channel.queueDeclare(Constants.FANOUT_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE_NAME2, true, false, false, null);
绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue: 队列名称
2. exchange: 交换机名称
3. routingKey: 路由key, 路由规则如果交换机类型为fanout,routingkey设置为"",表示每个消费者都可以收到全部信息
*/
channel.queueBind(Constants.FANOUT_QUEUE_NAME1, Constants.FANOUT_EXCHANGE_NAME, "");
channel.queueBind(Constants.FANOUT_QUEUE_NAME2, Constants.FANOUT_EXCHANGE_NAME, "");
发送消息
/**
* basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
* 参数说明:
* Exchange: 交换机名称
* routingKey: 如果交换机类型为fanout,routingkey设置为"",表示每个消费者都可以收到全部信息
*/ 
String msg = "hello fanout";
channel.basicPublish(Constants.FANOUT_EXCHANGE_NAME,"",null,msg.getBytes());
完整代码
public static String FANOUT_EXCHANGE_NAME = "test_fanout";
public static String FANOUT_QUEUE_NAME1 = "fanout_queue1";
public static String FANOUT_QUEUE_NAME2 = "fanout_queue2";import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;public class FanoutRabbitProducer {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME);//用户名,默认guestfactory.setPassword(Constants.PASSWORD);//密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 创建交换机/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autobelete, boolean internal, Map<String, Object> arguments)参数:1. exchange:交换机名称2. type:交换机类型* DIRECT("direct"); 定向,直连,routing* FANOUT("fanout"),扇形(广播),每个队列都能收到消息* TOPIC("topic"),通配符* HEADERS("headers") 参数匹配(工作用的较少)3. durable: 是否持久化4. autoDelete: 自动删除5. internal: 内部使用,一般falsee6. arguments: 参数*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);//3. 声明队列//如果没有一个这样的一个队列,会自动创建,如果有,则不创建channel.queueDeclare(Constants.FANOUT_QUEUE_NAME1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE_NAME2, true, false, false, null);//4. 绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)参数:1. queue: 队列名称2. exchange: 交换机名称3. routingKey: 路由key, 路由规则如果交换机类型为fanout, routingkey设置为"",表示每个消费者都可以收到全部信息*/channel.queueBind(Constants.FANOUT_QUEUE_NAME1, Constants.FANOUT_EXCHANGE_NAME, "");channel.queueBind(Constants.FANOUT_QUEUE_NAME2, Constants.FANOUT_EXCHANGE_NAME, "");//5. 发送消息/*** basicPublish(String exchange, String routingKey, AWQP.BasicProperties props, byte[] body)* 参数说明:* Exchange: 交换机名称* routingKey: 如果交换机类型为fanout, routingkey设置为"",表示每个消费者都可以收到全部信息*/String msg = "hello fanout";channel.basicPublish(Constants.FANOUT_EXCHANGE_NAME,"", null,msg.getBytes());//6.释放资源channel.close();connection.close();}
}

编写消费者代码

交换机和队列的绑定关系及声明已经在⽣产⽅写完, 所以消费者不需要再写了

去掉声明队列的代码就可以了

1. 创建Channel

2. 接收消息, 并处理

完整代码

消费者1

import com.rabbitmq.client.*;
import constant.Constants;import java.io.IOException;public class FanoutRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME);//用户名,默认guestfactory.setPassword(Constants.PASSWORD);//密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 接收消息,并消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE_NAME1, true, consumer);}
}

消费者2 把队列名称改⼀下就可以了. 此处省略

运⾏程序, 观察结果

1. 运⾏⽣产者

a) 可以看到两个队列分别有了⼀条消息

b) Exchange多了队列绑定关系

2. 运⾏消费者

消费者1

接收到消息: hello fanout

消费者2

接收到消息: hello fanout

2.4 Routing(路由模式)

队列和交换机的绑定, 不能是任意的绑定了, ⽽是要指定⼀个BindingKey(RoutingKey的⼀种)

消息的发送⽅在向Exchange发送消息时, 也需要指定消息的RoutingKey

Exchange也不再把消息交给每⼀个绑定的key, ⽽是根据消息的RoutingKey进⾏判断, 只有队列绑定时的BindingKey和发送消息的RoutingKey 完全⼀致, 才会接收到消息

接下来我们看看Routing模式的实现

步骤:

1. 引⼊依赖

2. 编写⽣产者代码

3. 编写消费者代码

引⼊依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

编写⽣产者代码

和发布订阅模式的区别是: 交换机类型不同, 绑定队列的BindingKey不同

创建交换机, 定义交换机类型为BuiltinExchangeType.DIRECT
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT, true, false, false, null);
声明队列
channel.queueDeclare(Constants.DIRECT_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE_NAME2, true, false, false, null);
绑定交换机和队列
//队列1绑定orange
channel.queueBind(Constants.DIRECT_QUEUE_NAME1, Constants.DIRECT_EXCHANGE_NAME, "orange");
//队列2绑定black, green
channel.queueBind(Constants.DIRECT_QUEUE_NAME2, Constants.DIRECT_EXCHANGE_NAME, "black");
channel.queueBind(Constants.DIRECT_QUEUE_NAME2, Constants.DIRECT_EXCHANGE_NAME, "green");
发送消息
//发送消息时, 指定RoutingKey
String msg = "hello direct, I am orange";
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"orange",null,msg.getBytes());String msg_black = "hello direct,I am black";
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"black",null,msg_black.getBytes());String msg_green= "hello direct, I am green";
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"green",null,msg_green.getBytes());
完整代码
public static String DIRECT_EXCHANGE_NAME = "test_direct";
public static String DIRECT_QUEUE_NAME1 = "direct_queue1";
public static String DIRECT_QUEUE_NAME2 = "direct_queue2";import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;public class DirectRabbitProducer {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认factory.setUsername(Constants.USER_NAME);//用户名,默认 guestfactory.setPassword(Constants.PASSWORD);//密码,默认 guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 创建交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE_NAME,BuiltinExchangeType.FANOUT, true, false, false, null);//3. 声明队列//如果没有一个这样的一个队列,会自动创建,如果有,则不创建channel.queueDeclare(Constants.DIRECT_QUEUE_NAME1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE_NAME2, true, false, false, null);//4. 绑定队列和交换机//队列1绑定orangechannel.queueBind(Constants.DIRECT_QUEUE_NAME1,Constants.DIRECT_EXCHANGE_NAME, "orange");//队列2绑定black, greenchannel.queueBind(Constants.DIRECT_QUEUE_NAME2,Constants.DIRECT_EXCHANGE_NAME, "black");channel.queueBind(Constants.DIRECT_QUEUE_NAME2,Constants.DIRECT_EXCHANGE_NAME, "green");//5. 发送消息String msg = "hello direct, I am orange";channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"orange",null,msg.getBytes());String msg_black = "hello direct,I am black";channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"black",null,msg_black.getBytes());String msg_green= "hello direct, I am green";channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"green",null,msg_green.getBytes());//6.释放资源channel.close();connection.close();}
}

编写消费者代码

Routing模式的消费者代码和Publish/Subscribe 代码⼀样, 同样复制出来两份

消费者1:DirectRabbitmqConsumer1

消费者2: DirectRabbitmqConsumer2

修改消费的队列名称就可以

完整代码:

import com.rabbitmq.client.*;
import constant.Constants;import java.io.IOException;public class DirectRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME);//用户名,默认guestfactory.setPassword(Constants.PASSWORD);//密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 接收消息,并消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE_NAME1, true, consumer);}
}

运⾏程序, 观察结果

1. 运⾏⽣产者

a) 可以看到direct_queue1队列中, 路由了⼀条消息. direct_queue2队列中, 路由了⼀条消息

b) exchange下队列和Routing Key的绑定关系

2. 运⾏消费者

DirectRabbitmqConsumer1 :

接收到消息: hello direct, I am orange

DirectRabbitmqConsumer2:

接收到消息: hello direct,I am black

接收到消息: hello direct, I am green

2.5 Topics(通配符模式)

Topics 和Routing模式的区别是:

1. topics 模式使⽤的交换机类型为topic(Routing模式使⽤的交换机类型为direct)

2. topic 类型的交换机在匹配规则上进⾏了扩展, Binding Key⽀持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配).

在topic类型的交换机在匹配规则上, 有些要求:

1. RoutingKey 是⼀系列由点( . )分隔的单词, ⽐如 " stock.usd.nyse ", " nyse.vmw "," quick.orange.rabbit "

2. BindingKey 和RoutingKey⼀样, 也是点( . )分割的字符串.

3. Binding Key中可以存在两种特殊字符串, ⽤于模糊匹配

        ◦ * 表⽰⼀个单词

        ◦ # 表⽰多个单词(0-N个)

⽐如:

• Binding Key 为"d.a.b" 会同时路由到Q1 和Q2

• Binding Key 为"d.a.f" 会路由到Q1

• Binding Key 为"c.e.f" 会路由到Q2

• Binding Key 为"d.b.f" 会被丢弃, 或者返回给⽣产者(需要设置mandatory参数)

接下来我们看看Routing模式的实现

步骤:

1. 引⼊依赖

2. 编写⽣产者代码

3. 编写消费者代码

引⼊依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

编写⽣产者代码

和路由模式, 发布订阅模式的区别是: 交换机类型不同, 绑定队列的RoutingKey不同

创建交换机

定义交换机类型为BuiltinExchangeType.TOPIC

channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, false, null);
声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, null);
绑定交换机和队列
//队列1绑定error,仅接收error信息
channel.queueBind(Constants.TOPIC_QUEUE_NAME1, Constants.TOPIC_EXCHANGE_NAME, "*.error");
//队列2绑定info, error: error, info信息都接收
channel.queueBind(Constants.TOPIC_QUEUE_NAME2, Constants.TOPIC_EXCHANGE_NAME, "#.info");
channel.queueBind(Constants.TOPIC_QUEUE_NAME2, Constants.TOPIC_EXCHANGE_NAME, "*.error");
发送消息
String msg = "hello topic, I'm order.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error", null,msg.getBytes());String msg_black = "hello topic, I'm order.pay.info";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_black.getBytes());String msg_green= "hello topic, I'm pay.error";
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.getBytes());
完整代码
public static String TOPIC_EXCHANGE_NAME = "test_topic";
public static String TOPIC_QUEUE_NAME1 = "topic_queue1";
public static String TOPIC_QUEUE_NAME2 = "topic_queue2";import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;public class TopicRabbitProducer {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME);//用户名,默认guestfactory.setPassword(Constants.PASSWORD);//密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 创建交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,BuiltinExchangeType.TOPIC, true, false, false, null);//3. 声明队列//如果没有一个这样的一个队列,会自动创建,如果有,则不创建channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false,null);channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false,null);//4. 绑定队列和交换机//队列绑定error,仅接收error信息channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,"*.error");//队列2绑定info, error: error, info信息都接收channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,"#.info");channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,"*.error");//5. 发送消息String msg = "hello topic, I'm order.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBytes());String msg_black = "hello topic, I'm order.pay.info";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_black.getBytes());String msg_green= "hello topic, I'm pay.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.getBytes());//6.释放资源channel.close();connection.close();}
}

编写消费者代码

Routing模式的消费者代码和Routing模式代码⼀样, 修改消费的队列名称即可.

同样复制出来两份

消费者1:TopicRabbitmqConsumer1

消费者2: TopicRabbitmqConsumer2

完整代码:

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class TopicRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 创建channel通道​ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhost​factory.setPort(Constants.PORT); //默认值5672​factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /​factory.setUsername(Constants.USER_NAME);//用户名,默认guest​factory.setPassword(Constants.PASSWORD);//密码, 默认guest​Connection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 接收消息, 并消费​DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);}
}

运⾏程序, 观察结果

1. 运⾏⽣产者, 可以看到队列的消息数

2. 运⾏消费者:

TopicRabbitmqConsumer1 :

接收到消息: hello topic, I'm order.error

接收到消息: hello topic, I'm pay.error

TopicRabbitmqConsumer2:

接收到消息: hello topic, I'm order.error

接收到消息: hello topic, I'm order.pay.info

接收到消息: hello topic, I'm pay.error

2.6 RPC(RPC通信)

RPC(Remote Procedure Call), 即远程过程调⽤. 它是⼀种通过⽹络从远程计算机上请求服务, ⽽不需要了解底层⽹络的技术. 类似于Http远程调⽤.

RabbitMQ实现RPC通信的过程, ⼤概是通过两个队列实现⼀个可回调的过程.

⼤概流程如下:

1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, 服务端处理后, 会把响应结果发送到这个队列.

2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列

3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应.

接下来我们看看RPC模式的实现

步骤:

1. 引⼊依赖

2. 编写客⼾端

3. 编写服务端

引⼊依赖

<dependency><groupId>com.rabbitmag</groupId><artifactId>amap-client</artifactId><version>5.7.3</version>
</dependency>

编写客⼾端

客⼾端代码主要流程如下:

1. 声明两个队列, 包含回调队列replyQueueName, 声明本次请求的唯⼀标志corrId

2. 将replyQueueName和corrId配置到要发送的消息队列中

3. 使⽤阻塞队列来阻塞当前进程, 监听回调队列中的消息, 把请求放到阻塞队列中

4. 阻塞队列有消息后, 主线程被唤醒,打印返回内容

声明队列
//2. 声明队列,发送消息
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, false, null);
定义回调队列
// 定义临时队列,并返回生成的队列名称
String replyQueueName = channel.queueDeclare().getQueue();
使用内置交换机发送消息
// 本次请求唯一标志
String corrId = UUID.randomUUID().toString();
// 生成发送消息的属性
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId) // 唯一标志本次请求
.replyTo(replyQueueName) // 设置回调队列
.build();
// 通过内置交换机,发送消息
String message = "hello rpc...";
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props, message.getBytes());
使用阻塞队列,来存储回调结果
// 阻塞队列,用于存储回调结果
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
//接收服务端的响应
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息:"+ new String(body));//如果唯一标识正确,放到阻塞队列中if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}
};
channel.basicConsume(replyQueueName, true, consumer);
获取回调结果
// 获取回调的结果
String result = response.take();
System.out.println(" [RPCClient] Result:" + result);
完整代码
public static String RPC_REQUEST_QUEUE_NAME = "rpc_request_queue";import com.rabbitmq.client.*;
import constant.Constants;import java.io.IOException;
import java.util.UIJID;
import java.util.concurrent.ArrayListQueue;
import java.util.concurrent.BlockingQueue;public class RPCClient {public static void main(String[] args) throws Exception {//1. 创建Channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME);//用户名,默认guestfactory.setPassword(Constants.PASSWORD);//密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, false, null);// 唯一标志本次请求String corrId = UIJID.randomUIJID().toString();// 定义临时队列,并返回生成的队列名称String replyQueueName = channel.queueDeclare().getQueue();// 生成发送消息的属性AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId) // 唯一标志本次请求.replyTo(replyQueueName) // 设置回调队列.build();// 通过内置交换机,发送消息String message = "hello rpc...";channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props, message.getBytes());// 阻塞队列,用于存储回调结果final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//接收服务端的响应DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到回调消息:"+ new String(body));if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body, "UTF-8"));}}};channel.basicConsume(replyQueueName, true, consumer);// 获取回调的结果String result = response.take();System.out.println(" [RPCClient] Result:" + result);//释放资源channel.close();connection.close();}
}

编写服务端

服务端代码主要流程如下:

1. 接收消息

2. 根据消息内容进⾏响应处理, 把应答结果返回到回调队列中

声明队列
//2. 声明队列,发送消息
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, false, null);
设置同时最多只能获取一个消息

如果不设置basicQos, RabbitMQ 会使⽤默认的QoS设置, 其prefetchCount默认值为0. 当

prefetchCount为0时,RabbitMQ 会根据内部实现和当前的⽹络状况等因素,可能会同时发送多条消息给消费者. 这意味着在默认情况下,消费者可能会同时接收到多条消息, 但具体数量不是严格保证的, 可能会有所波动(后⾯会讲)

在RPC模式下,通常期望的是⼀对⼀的消息处理, 即⼀个请求对应⼀个响应. 消费者在处理完⼀个消息并确认之后,才会接收到下⼀条消息.

// 设置同时最多只能获取一个消息
channel.basicQos(1);
System.out.println("Awaiting RPC request");
接收消息,并做出相应处理

RabbitMQ 消息确定机制

在RabbitMQ中,basicConsume⽅法的autoAck参数⽤于指定消费者是否应该⾃动向消息队列确认消息

⾃动确认(autoAck=true): 消息队列在将消息发送给消费者后, 会⽴即从内存中删除该消息. 这意味着, 如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费

⼿动确认(autoAck=false): 消息队列在将消息发送给消费者后,需要消费者显式地调⽤basicAck⽅法来确认消息. ⼿动确认提供了更⾼的可靠性, 确保消息不会被意外丢失, 适⽤于消息处理重要且需要确保每个消息都被正确处理的场景.

Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();// 生成返回String message = new String(body);String response = "request:"+ message + ", response: 处理成功";// 回复消息,通知已经收到请求channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}
}
);
channel.basicConsume(Constants.RPC_REQUEST_QUEUE_NAME, false, consumer);
完整代码
import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {public static void main(String[] args) throws IOException,TimeoutException {//1. 创建Channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // ip 默认值localhostfactory.setPort(Constants.PORT); // 默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME); // 用户名, 默认 guestfactory.setPassword(Constants.PASSWORD); // 密码,默认 guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, false, null);// 接收消息,并消费// 设置同时最多只能获取一个消息channel.basicQos(1);System.out.println("Awaiting RPC request");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();// 生成返回String message = new String(body);String response = "request:"+ message + ", response: 处理成功";// 回复消息,通知已经收到请求channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes());// 对消息进行应答channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE_NAME, false, consumer);}
}

运⾏程序, 观察结果

运⾏客⼾端:

amq.gen-iijHJbGabr7E2aK1KQV3Nw 就是回调队列

运⾏服务端, 观察客⼾端⽇志

接收到回调消息:request:hello rpc..., response: 处理成功

[RPCClient] Result:request:hello rpc..., response: 处理成功

2.7 Publisher Confirms(发布确认)

作为消息中间件, 都会⾯临消息丢失的问题.

消息丢失⼤概分为三种情况:

1. ⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息.

2. 消息中间件⾃⾝问题. ⽣产者成功发送给了Broker, 但是Broker没有把消息保存好, 导致消息丢失.

3. 消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费失败的消息从队列中删除了.

RabbitMQ也对上述问题给出了相应的解决⽅案. 问题2可以通过持久化机制. 问题3可以采⽤消息应答机制. (后⾯详细讲)

针对问题1, 可以采⽤发布确认(Publisher Confirms)机制实现.

发布确认 属于RabbitMQ的七⼤⼯作模式之⼀.

⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.

发送⽅确认机制最⼤的好处在于它是异步的, ⽣产者可以同时发布消息和等待信道返回确认消息.

1. 当消息最终得到确认之后, ⽣产者可以通过回调⽅法来处理该确认消息.

2. 如果RabbitMQ因为⾃⾝内部错误导致消息丢失, 就会发送⼀条nack(Basic.Nack)命令, ⽣产者同样可以在回调⽅法中处理该nack命令.

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式.

发布确认是 AMQP 0.9.1 协议的扩展, 默认情况下它不会被启⽤. ⽣产者通过channel.confirmSelect()将信道设置为confirm模式.

Channel channel = connection.createChannel();channel.confirmSelect();

发布确认有3种策略, 接下来我们来学习这三种策略.

Publishing Messages Individually(单独确认)

代码⽰例:

static void publishMessagesIndividually() throws Exception {  try (Connection connection = createConnection()) {  //创建channelChannel ch = connection.createChannel();//开启信道确认模式ch.confirmSelect();//声明队列ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME1, true, false, true,null);long start = System.currentTimeMillis();//循环发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String body = "消息"+ i;//发布消息ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME1, null,body.getBytes());//等待确认消息.只要消息被确认,这个方法就会被返回//如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失),waitForConfirmsOrDie将抛出IOException。ch.waitForConfirmsOrDie(5_000);}long end = System.currentTimeMillis();System.out.format("Published %d messages individually in %d ms",MESSAGE_COUNT, end - start);}
}

观察运⾏结果:

Published 200 messages individually in 6265 ms

可以发现, 发送200条消息, 耗时很⻓.

观察上⾯代码, 会发现这种策略是每发送⼀条消息后就调⽤channel.waitForConfirmsOrDie⽅法,之后等待服务端的确认, 这实际上是⼀种串⾏同步等待的⽅式. 尤其对于持久化的消息来说, 需要等待消息确认存储在磁盘之后才会返回(调⽤Linux内核的fsync⽅法).

但是发布确认机制是⽀持异步的. 可以⼀边发送消息, ⼀边等待消息确认.

由此进⾏了改进, 接下来看另外两种策略:

  • Publishing Messages in Batches(批量确认) : 每发送⼀批消息后,调⽤channel.waitForConfirms⽅法, 等待服务器的确认返回.
  • Handling Publisher Confirms Asynchronously(异步确认): 提供⼀个回调⽅法,服务端确认了⼀条或者多条消息后客⼾端会回这个⽅法进⾏处理

Publishing Messages in Batches(批量确认)

代码⽰例:

static void publishMessagesInBatch() throws Exception {try (Connection connection = createConnection()) {//创建信道Channel ch = connection.createChannel();//信道设置为confirm模式ch.confirmSelect();//声明队列ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME2, true, false, true, null);int batchSize = 100;int outstandingMessageCount = 0;long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = "消息"+ i;//发送消息ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME2, null, body.getBytes());outstandingMessageCount++;//批量确认消息if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}//消息发送完,还有未确认的消息,进行确认if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}long end = System.currentTimeMillis();System.out.format("Published %d messages in batch in %d ms", MESSAGE_COUNT, end - start);}
}

观察运⾏结果:

Published 200 messages in batch in 128 ms

可以观察到, 性能提⾼了很多.

相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量.当消息经常丢失时,批量确认的性能应该是不升反降的.

Handling Publisher Confirms Asynchronously(异步确认)

异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法可以添加ConfirmListener 回调接⼝.

ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理RabbitMQ发送给⽣产者的ack和nack.

deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认.

我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.

  1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号deliveryTag的消息都收到了, 则清除对应集合
  2. 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作.

代码⽰例:

static void handlePublishConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()) {Channel ch = connection.createChannel();ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME3, false, false, true,null);ch.confirmSelect();//有序集合,元素按照自然顺序进行排序,存储未confirm消息序号SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet>());ch.addConfirmListener(new ConfirmListener() {@Override public void handleAck(long deliveryTag, boolean multiple) throws IOException {//System.out.println("ack, SeqNo: " + deliveryTag +",multiple:" + multiple);//multiple 批量//confirmSet.headSet(n)方法返回当前集合中小于n的集合if (multiple) {//批量确认:将集合中小于等于当前序号deliveryTag元素的集合清除,表示这批序号的消息都已经被ack了confirmSet.headSet(deliveryTag+1).clear();} else {//单条确认:将当前的deliveryTag从集合中移除confirmSet.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.format("deliveryTag: %d, multiple: %b%n",deliveryTag, multiple);if (multiple) {//批量确认:将集合中小于等于当前序号deliveryTag元素的集合清除,表示这批序号的消息都已经被ack了confirmSet.headSet(deliveryTag+1).clear();} else {//单条确认:将当前的deliveryTag从集合中移除confirmSet.remove(deliveryTag);}//如果处理失败,这里需要添加处理消息重发的场景。此处代码省略}});//循环发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;//得到下次发送消息的序号,从I开始long nextPublishSeqNo = ch.getNextPublishSeqNo();//System.out.println("消息序号:" + nextPublishSeqNo);ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME3, null,message.getBytes());//将序号存入集合中confirmSet.add(nextPublishSeqNo);}//消息确认完毕while (!confirmSet.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.format("Published %d messages and handled confirms asynchronously in %d ms%n", MESSAGE_COUNT, end - start);}
}

观察运⾏结果:

Published 200 messages and handled confirms asynchronously in 92 ms

三种策略对⽐

消息数越多, 异步确认的优势越明显

200条消息的结果对⽐

Published 200 messages individually in 6931 ms

Published 200 messages in batch in 137 ms

Published 200 messages and handled confirms asynchronously in 73 ms

500条消息结果对⽐

Published 500 messages individually in 15805 ms

Published 500 messages in batch in 246 ms

Published 500 messages and handled confirms asynchronously in 107 ms

http://www.dtcms.com/a/600585.html

相关文章:

  • .NET驾驭Excel之力:Excel应用程序的创建与管理
  • Unity2.5D视角肉鸽项目架构
  • JAVA和C#的语法对比
  • WPS Excel 图表
  • 电商网站开发需要掌握哪些知识技能品牌设计和vi设计有什么区别
  • Spring 框架整合 JUnit 单元测试——包含完整执行流程
  • .NET驾驭Excel之力:自动化数据处理 - 开篇概述与环境准备
  • 多站点网站群的建设与管理识图搜索在线 照片识别
  • C++ builder xe 用imageen组件ImageEnView1合并多个图片导出一个pdf
  • 深度拆解汽车制造系统设计:用 Java + 设计模式打造高扩展性品牌 - 车型动态生成架构
  • 客户端VS前端VS后端
  • 西安企业网站建设哪家好hs网站推广
  • 【宝塔面板】监控、日志、任务与安全设置
  • RPA财务机器人落地指南:治理架构、流程优化与风险防控
  • GitHub Agent HQ正式发布,构建开放智能体生态
  • XML节点SelectSingleNode(“msbuild:DebugType“ 为什么要加msbuild
  • 【GitHub热门项目】(2025-11-12)
  • 【RAG评测方案汇总】GitHub开源工具全览
  • 数据集月度精选 | 高质量具身智能数据集:打开机器人“感知-决策-动作”闭环的钥匙
  • 深圳网站制作易捷网络湘乡网站seo
  • Java Maven Log4j 项目日志打印
  • 面试:Spring中单例模式用的是哪种?
  • 长芯微LPS5820完全P2P替代NCP51820,LPS5820 是一款高速半桥驱动器,可用来驱动半 桥功率拓扑的 GaN 功率管。
  • Python 第三方库:PyTorch(动态计算图的深度学习框架)
  • 如果网站打开非常缓慢国内全屋定制十大名牌
  • 【操作系统】详解 分页与分段系统存储管理
  • flex:1
  • 【LeetCode经典题解】递归破解对称二叉树之谜
  • 电脑已连接网络无线自动重启
  • 创建Vue2和Vue3项目区别对比和对应示例演示