当前位置: 首页 > news >正文

详解RabbitMQ工作模式之通配符模式

目录

通配符模式

概述

应用场景

优势

代码案例

引入依赖

常量类

编写生产者代码

编写消费者1代码

编写消费者2代码

运行代码


通配符模式

概述

通配符模式是一种灵活的消息传递模式,可以根据消息的路由键(routing key)和绑定(binding)模式来实现精确的消息过滤和匹配。在RabbitMQ中,路由键由生产者定义,用于标识消息的目的地;而绑定则由消费者定义,用于指定消息的接收规则。

路由模式的升级版, 在routingKey的基础上,增加了通配符的功能, 使之更加灵活.

Topics和Routing的基本原理相同,即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列. 类似于正则表达式的⽅式来定义Routingkey的模式.

不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.

应用场景

RabbitMQ的通配符模式在需要根据消息的特定属性进行路由和过滤的场景中非常有用。例如,在一个日志系统中,可以使用通配符模式来将不同级别的日志消息路由到不同的队列中,以便进行不同的处理和分析。

优势

通配符模式的优势在于它可以灵活地匹配消息,使得消息可以根据不同的条件进行过滤和选择。通过合理地定义绑定和路由键,可以实现复杂的消息过滤和路由策略,提高系统的灵活性和性能。 

代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量类
public class Constants {public static final String HOST = "47.98.109.138";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "aaa";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic_queue1";public static final String TOPIC_QUEUE2 = "topic_queue2";
}
编写生产者代码
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 通配符模式生产者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//4. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6. 发送消息String msg = "hello topic, my routingkey is ae.a.f....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes());  //转发到Q1String msg_b = "hello topic, my routingkey is ef.a.b....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2String msg_c = "hello topic, my routingkey is c.ef.d....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}
编写消费者1代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);}
}
编写消费者2代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);}
}
运行代码

观察管理界面可以看到两个队列都各自收到了2条消息,与预期符合。

两个消费者都各自从两个不同的队列中取出并消费了2条消息,与预期符合。

相关文章:

  • laravel 中使用的pdf 扩展包 laravel-snappy(已解决中文乱码)
  • 从零搭建AI工作站:Gemma3大模型本地部署+WebUI配置全套方案
  • 用 openssl 测试 tls 连接
  • C++23 views::slide (P2442R1) 深入解析
  • 奇次谐波和偶次谐波【EMC】
  • 扩展:React 项目执行 yarn eject 后的 scripts 目录结构详解
  • 数据结构与算法学习-JavaScript的Array.prototype.reduce()方法
  • 【K8S学习之探针】详细了解就绪探针 readinessProbe 和存活探针 livenessProbe 的配置
  • 【K8S学习之生命周期钩子】详细了解 postStart 和 preStop 生命周期钩子
  • JAVA EE_网络原理_数据链路层
  • 【网工第6版】第10章 网络规划和设计①
  • 【android bluetooth 框架分析 02】【Module详解 13】【CounterMetrics 模块介绍】
  • 【数据结构】双链表
  • 数据结构(六)——树和二叉树
  • 【漫话机器学习系列】255.独立同分布(Independent and Identically Distributed,简称 IID)
  • 【001】renPy android端启动流程分析
  • 致远OA人事标准模块功能简介【附应用包百度网盘下载地址,官方售价4W】
  • thinkphp模板文件缺失没有报错/thinkphp无法正常访问控制器
  • 最大子数组和
  • 智能家居“心脏“升级战:GD25Q127CSIG国产芯片如何重构家庭物联生态
  • 书法需从字外看,书法家、学者吴本清辞世
  • 央行等印发《关于金融支持广州南沙深化面向世界的粤港澳全面合作的意见》
  • 上海护师邢红获第50届南丁格尔奖,她为何能摘得护理界最高荣誉
  • 上海与世界|环城生态公园带是上海绿色发展新名片
  • 汇源果汁发文:经营情况一切正常
  • 演员发文抵制代拍获粉丝支持,媒体:追星“正确姿势”不妨多来点