当前位置: 首页 > news >正文

RabbitMQ工作模式(下)

路由模式

在这里插入图片描述

交换机通过不同的 routingkey 绑定队列,生产者通过 routingkey 来向不同的队列发送消息

生产者代码演示:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();// 开启信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("DIRECT_EXCHANGE", BuiltinExchangeType.DIRECT, true, false, null);//声明队列channel.queueDeclare("direct1", true, false, false, null);channel.queueDeclare("direct2", true, false, false, null);//绑定队列和交换机channel.queueBind("direct1","DIRECT_EXCHANGE","a");channel.queueBind("direct2","DIRECT_EXCHANGE","a");channel.queueBind("direct2","DIRECT_EXCHANGE","b");channel.queueBind("direct2","DIRECT_EXCHANGE","c");//发送消息for (int i = 0; i < 10; i++) {channel.basicPublish("DIRECT_EXCHANGE", "a", null, ("hello" + i).getBytes());channel.basicPublish("DIRECT_EXCHANGE", "b", null, ("hello" + i).getBytes());channel.basicPublish("DIRECT_EXCHANGE", "c", null, ("hello" + i).getBytes());}//关闭资源channel.close();connection.close();}
}

前面的建立连接的代码大家可以抽离出来,这里不抽离是方便大家了解整个代码的编写过程

在上面我们建立的队列和交换机的绑定关系图如下:
在这里插入图片描述
这里可以看到同样的 routingkey 为 a 绑定了两个队列,如果生产者使用 a 这个 routingkey 发送消息,那么两个队列都会接收到
在这里插入图片描述

消费者代码演示:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("direct1", true, false, false, null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("direct1", true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("direct2", true, false, false, null);//进行消费Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("direct2", true, consumer);}
}

通配符模式

在这里插入图片描述

这里有两种符号需要认识,首先 * 表示匹配一个单词,# 表示匹配 0 - n 个单词,只要符合上面的路由规则交换机就会将消息发送到对应的队列上。

生产者代码演示:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);//声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//绑定队列和交换机channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.orange.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.rabbit");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "lazy.#");//发送消息for (int i = 0; i < 10; i++) {channel.basicPublish(Constants.TOPIC_EXCHANGE, "quick.orange.rabbit", null, ("quick.orange.rabbit hello" + i).getBytes());channel.basicPublish(Constants.TOPIC_EXCHANGE, "lazy", null, ("lazy hello" + i).getBytes());}//关闭资源channel.close();connection.close();}
}

消费者代码演示:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);//进行消费Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列 避免没有队列发生异常channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//从队列中获取信息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);}
}

RPC

在这里插入图片描述

PRC 模式一般很少使用,当我们的生产者需要消费者的响应的时候,我们才会使用这个模式。

这里有两个重要的参数,correlation_id 是消息的标识符,主要用于区分消息,由于Client需要得到 Server 的响应所以这里的 correlation_id 需要区分这是哪条消息

reply_to 用于Client指定对应的队列去路由消息

Client 代码演示:

public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);// 设置消息String msg = "hello rpc";//设置请求的唯一标识String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, msg.getBytes());//接收响应//使用阻塞队列存储响应信息final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//消费者逻辑Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String responseMsg = new String(body);System.out.println("接收到回调信息:" + responseMsg);if (correlationId.equals(properties.getAppId())) {response.offer(responseMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true , consumer);//取出消息进行消费String res = response.take();System.out.println("最终结果:" + res);}
}

代码简单介绍:
String correlationId = UUID.randomUUID().toString(); 用于标识消息

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationId)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
通过设置 属性,将我们的 correlationId 和 replyTo(指定对应的队列接发消息)设置进去

Server 代码演示:

public class PpcServer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);//由于使用的是默认的交换机,所以绑定队列省略//消费者最多获取到一个未确认的消息channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body, "UTF-8");System.out.println("接收到的请求为:" + request);String response = "针对 resquest 的响应为:" + request + "响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}

代码简单介绍
channel.basicQos(1); 这是rabbitmq 的高级特性,用于消费者最多接收多少个未确认的消息,可以调节消费者的吞吐量

在 RPC 中我们也要设置correlationId,这里要求correlationId和生产者的correlationId要保持一致,这样生产者才能识别出来这是哪一条消息

AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish(“”, Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());

Publisher Confirms(发布确认)

这个也可以当作是生产者将消息成功发送到指定的broker的可靠保证的方式

发布确认一共有三种方式:单独确认,批量确认,异步确认

下面是建立连接的方法:

    //建立连接private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);return factory.newConnection();}

Publishing Messages Individually(单独确认)

单独确认就是生成者每发送一条消息,都要等待broker 回复 ack,只有等到ack,才会发送下一则消息,效率不高

开启发布确认模式需要进行下面的设置:

channel.confirmSelect();

等到ack 回复可以使用下面的方法:

channel.waitForConfirmsOrDie(5000);

    /*** 单独确认模式*/public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = createConnection();//开启信道Channel channel = connection.createChannel();//设置为发布确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);//发布消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "publishingMessagesIndividually:" + i;channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());//最多等待消息确认的时间 5schannel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.println("单独确认消息总耗时:" + (end - start));}

Publishing Messages in Batches(批量确认)

批量确认顾名思义就是等待消息发送到一定数量的时候才进行确认,这里有个问题就是在发生故障时我们无法确切知道哪里出了问题,因此我们可能需要将整个批次保存在内存中,以记录一些有意义的信息或重新发布消息。此外,该解决方案仍然是同步的,因此会阻塞消息的发布。下面的官方的解释:

在这里插入图片描述

对比我们的单独确认,批量确认确实能够提高我们的吞吐量。

    /*** 批量确认模式*/public static void publishingMessagesBatches() throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = createConnection();//开启信道Channel channel = connection.createChannel();//设置为发布确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);//发布消息long start = System.currentTimeMillis();//计数int count = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "publishingMessagesBatches:" + i;channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());count++;if(count == 100) {//最大等待时间channel.waitForConfirmsOrDie(5000);count = 0;}}//最后一次确认,确保剩余的不足100条消息全部确认掉if(count > 0) {channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.println("批量确认消息总耗时:" + (end - start));}

Handling Publisher Confirms Asynchronously(异步确认)

异步确认是指发送消息和接收消息的 ack 这两个动作是异步的,也就意味着在高吞吐量的情况下,我们可以更好地应对,比起前两种方式,这种异步确认的方式确实效率更高

    /*** 异步确认模式*/public static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = createConnection();//开启信道Channel channel = connection.createChannel();//设置为发布确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);//发布消息long start = System.currentTimeMillis();//存放消息序号的容器SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());//添加监听器channel.addConfirmListener(new ConfirmListener() {//收到确认信息@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple) {//将当前消息以及当前消息前面所有的消息删除confirmSet.headSet(deliveryTag + 1).clear();} else {//只删除当前消息confirmSet.remove(deliveryTag);}}//收到nack 的消息@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSet.headSet(deliveryTag+1).clear();}else {confirmSet.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//发布消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "handlingPublisherConfirmsAsynchronously: " + i;//将消息的序号放入到容器中//channel.getNextPublishSeqNo() 在确认模式下,返回要发布的下一条消息的序列号。confirmSet.add(channel.getNextPublishSeqNo());channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());}while(!confirmSet.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.println("异步确认消息总耗时:" + (end - start));}

代码解读:
异步确认最重要的就是设置监听器:我们要设置两个方法,一个是接收到 ack 的处理,另一个则是接收到 nack 的处理,这里有一个参数 multiple 【表示是否批量确认,如果是批量确认的话,意味着接受到序列号为deliveryTag的消息的时候,小于等于deliveryTag的消息一同都要被确认掉,如果不是批量确认,那就单独确认,需要我们一条一条消息进行确认】

        //添加监听器channel.addConfirmListener(new ConfirmListener() {//收到确认信息@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple) {//将当前消息以及当前消息前面所有的消息删除confirmSet.headSet(deliveryTag + 1).clear();} else {//只删除当前消息confirmSet.remove(deliveryTag);}}//收到nack 的消息@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSet.headSet(deliveryTag+1).clear();}else {confirmSet.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});

其次就是要创建一个容器用于保存消息的序列号

        //存放消息序号的容器SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());

channel.getNextPublishSeqNo() 这个方法是在确认模式下,返回要发布的下一条消息的序列号deliveryTag

channel.getNextPublishSeqNo()

文章转载自:

http://EtdASwxH.hdqqr.cn
http://en8qYTOf.hdqqr.cn
http://H4gezFMT.hdqqr.cn
http://9iv0dBqv.hdqqr.cn
http://WG0xTY0Y.hdqqr.cn
http://bvOHVO6G.hdqqr.cn
http://T7ouM9Mk.hdqqr.cn
http://SrmuUUzI.hdqqr.cn
http://0jUNj45b.hdqqr.cn
http://KqiihD69.hdqqr.cn
http://OI7lsXtY.hdqqr.cn
http://zYWsakuw.hdqqr.cn
http://dpupJTcx.hdqqr.cn
http://LjmASbSn.hdqqr.cn
http://ITMQFp1g.hdqqr.cn
http://Rw2u1BkB.hdqqr.cn
http://EcOiDxNf.hdqqr.cn
http://QkFgxUbH.hdqqr.cn
http://EcMWIVTd.hdqqr.cn
http://ROsuA7ft.hdqqr.cn
http://4OoNyd24.hdqqr.cn
http://ziqKKb0H.hdqqr.cn
http://PP1rMHgy.hdqqr.cn
http://68EWj6od.hdqqr.cn
http://DpKCzdpr.hdqqr.cn
http://AAEhWLtT.hdqqr.cn
http://amTU4Zfc.hdqqr.cn
http://ZWVIw1e1.hdqqr.cn
http://KU5VIx9l.hdqqr.cn
http://2VOfziri.hdqqr.cn
http://www.dtcms.com/a/369993.html

相关文章:

  • Custom SRP - Complex Maps
  • tp报错解决
  • MySQL MHA 高可用集群搭建
  • 《AI大模型应知应会100篇》第68篇:移动应用中的大模型功能开发 —— 用 React Native 打造你的语音笔记摘要 App
  • Mac Intel 芯片 Docker 一键部署 Neo4j 最新版本教程
  • 正态分布 - 正态分布的经验法则(68-95-99.7 法则)
  • 【操作系统-Day 25】死锁 (Deadlock):揭秘多线程编程的“终极杀手”
  • (二).net面试(static)
  • 为什么服务器有主备BMC?
  • Dotnet 项目手动部署到AWS 和Github action CICD 流程总结
  • (2)桌面云、并行计算、分布式、网格计算
  • Java中的死锁
  • SQL 进阶指南:视图的创建与使用(视图语法 / 作用 / 权限控制)
  • SQL 实战指南:电商订单数据分析(订单 / 用户 / 商品表关联 + 统计需求)
  • 附050.Kubernetes Karmada Helm部署联邦及使用
  • 【PCIe EP 设备入门学习专栏 -- 8 PCIe EP 架构详细介绍】
  • STM32HAL 快速入门(十九):UART 编程(二)—— 中断方式实现收发及局限分析
  • 【星闪】Hi2821 | PWM脉宽调制模块 + 呼吸灯例程
  • 具身智能模拟器:解决机器人实机训练场景局限与成本问题的创新方案
  • 【嵌入式】【科普】AUTOSAR学习路径
  • 大麦APP抢票-核心
  • Linux笔记---TCP套接字编程
  • SQL面试题及详细答案150道(81-100) --- 子查询篇
  • CentOS系统停服,系统迁移Ubuntu LTS
  • 基于Spring Boot的幼儿园管理系统
  • 《sklearn机器学习——聚类性能指标》Fowlkes-Mallows 得分
  • STAR-CCM+|雷诺数回顾
  • 设计整体 的 序分(三“释”)、正宗分(双“门”)和流通分(统一的通行表达式) 之3 “自明性”(腾讯元宝 之2)
  • MySQL集群高可用架构之组复制 (MGR)
  • GPT-5发布:统一智能体时代的开启——从“工具”到“协作者”的范式跃迁