当前位置: 首页 > news >正文

RabbitMQ四种交换器类型详解及示例

交换器类型路由规则路由键作用代码示例典型用例
Direct精确匹配 (Routing Key == Binding Key)关键,必须精确匹配queueBind(queue, exchange, “exact.routing.key”)点对点任务分发,RPC
Fanout广播,忽略路由键无关queueBind(queue, exchange, “”) (忽略 routingKey)发布订阅,广播通知
Topic模式匹配 (*, #通配符)关键,支持灵活模式queueBind(queue, exchange, “pattern.*.key”)基于多维度属性的动态路由
Headers根据消息头属性匹配 (x-match=all/any)无关,使用 Headers 属性queueBind(queue, exchange, “”, headersArgs)基于复杂属性的路由(不常用)

1. Direct(直连交换机)

交换器会将消息的 路由键 (Routing Key)​ 与队列绑定时使用的 绑定键 (Binding Key)​ 进行精确比较。如果两者完全匹配,消息就会被投递到该队列。

代码示例:

创建了一个直连交换机,生产者向不同 routingKey(“error”、“info”)发送消息。
启动两个消费者,分别绑定特定的 routingKey。
运行结果:三个消费者都会收到生产者发送的每一条消息

public class DirectExchangeDemo {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {// 启动消费者new Thread(() -> startConsumer("error")).start();new Thread(() -> startConsumer("info")).start();Thread.sleep(1000);// 生产者发送消息startProducer();}static void startProducer() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 发送到不同 routingKey 的消息String[] routingKeys = {"error", "info", "error", "info"};String[] messages = {"数据库错误", "用户登录", "文件读取失败", "系统启动"};for (int i = 0; i < messages.length; i++) {channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, messages[i].getBytes());System.out.println(" [x] Sent '" + routingKeys[i] + "':'" + messages[i] + "'");}}}static void startConsumer(String routingKey) {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = channel.queueDeclare().getQueue();// 队列绑定特定的 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, routingKey);System.out.println(" [*] " + routingKey + " consumer waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [" + routingKey + " Consumer] Received: '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });} catch (Exception e) {e.printStackTrace();}}
}

2. Fanout(广播交换机)

会将发送到该交换器的所有消息无条件地路由到所有与它绑定的队列。忽略路由键。
就像广播或发布-订阅模式。一条消息会被发送给所有“订阅者”(所有绑定的队列)。

代码示例:

创建了一个广播交换机,生产者发送的消息会被广播到所有绑定队列。
启动三个消费者,都绑定到同一个扇出交换机。

运行结果:三个消费者都会收到生产者发送的每一条消息

public class FanoutExchangeDemo {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {// 启动多个消费者new Thread(() -> startConsumer("Consumer1")).start();new Thread(() -> startConsumer("Consumer2")).start();new Thread(() -> startConsumer("Consumer3")).start();Thread.sleep(1000);// 生产者发送消息startProducer();}static void startProducer() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);for (int i = 1; i <= 3; i++) {String message = "广播消息 #" + i;// Fanout 交换机会忽略 routingKeychannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent: '" + message + "'");}}}static void startConsumer(String consumerName) {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String queueName = channel.queueDeclare().getQueue();// 绑定到 Fanout 交换机,不需要 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] " + consumerName + " waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [" + consumerName + "] Received: '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });} catch (Exception e) {e.printStackTrace();}}
}

3. Topic(主题交换机)

使用通配符匹配 routingKey
*:匹配一个单词
#:匹配零个或多个单词

代码示例:

创建了一个主题交换机,使用通配符进行路由。
启动三个消费者,分别使用不同的绑定模式:
“news.sports.":匹配所有体育新闻
"news.tech.
”:匹配所有科技新闻
“news.#”:匹配所有新闻。

运行结果:消费者根据绑定模式选择性接收匹配的消息

public class TopicExchangeDemo {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {// 启动不同模式的消费者new Thread(() -> startConsumer("news.sports.*", "体育新闻消费者")).start();new Thread(() -> startConsumer("news.tech.*", "科技新闻消费者")).start();new Thread(() -> startConsumer("news.#", "所有新闻消费者")).start();Thread.sleep(1000);// 生产者发送消息startProducer();}static void startProducer() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 发送到不同主题的消息String[][] messages = {{"news.sports.basketball", "篮球"},{"news.sports.football", "足球"},{"news.tech.ai", "AI"},{"news.tech.blockchain", "区块链应用"},{"news.weather", "天气预报"}};for (String[] msg : messages) {String routingKey = msg[0];String message = msg[1];channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");}}}static void startConsumer(String bindingKey, String consumerName) {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String queueName = channel.queueDeclare().getQueue();// 使用通配符绑定channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);System.out.println(" [*] " + consumerName + " [" + bindingKey + "] waiting...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");String routingKey = delivery.getEnvelope().getRoutingKey();System.out.println(" [" + consumerName + "] Received '" + routingKey + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });} catch (Exception e) {e.printStackTrace();}}
}

4.Headers(头交换机)

忽略 routingKey,根据消息的 headers 属性进行匹配
x-match: all:必须匹配所有指定的 headers(AND 逻辑)
x-match: any:只需匹配任意一个指定的 headers(OR 逻辑)

代码示例:

创建一个头交换机,基于消息的 headers 属性进行路由。
启动两个消费者,使用不同的匹配策略:
"all"模式:必须匹配所有指定的 headers
"any"模式:只需匹配任意一个指定的 headers

运行结果:完全忽略 routingKey,消费者根据 headers 匹配条件选择性接收消息

public class HeadersExchangeDemo {private static final String EXCHANGE_NAME = "headers_exchange";public static void main(String[] args) throws Exception {// 启动不同匹配条件的消费者new Thread(() -> startConsumer("all", createArgs("all", "alert", "high"))).start();new Thread(() -> startConsumer("any", createArgs("any", "type", "alert"))).start();Thread.sleep(1000);// 生产者发送消息startProducer();}static void startProducer() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);// 发送带有不同 headers 的消息Map<String, Object> headers1 = new HashMap<>();headers1.put("type", "alert");headers1.put("priority", "high");Map<String, Object> headers2 = new HashMap<>();headers2.put("type", "info");headers2.put("priority", "low");Map<String, Object> headers3 = new HashMap<>();headers3.put("type", "alert");headers3.put("priority", "medium");AMQP.BasicProperties props1 = new AMQP.BasicProperties.Builder().headers(headers1).build();AMQP.BasicProperties props2 = new AMQP.BasicProperties.Builder().headers(headers2).build();AMQP.BasicProperties props3 = new AMQP.BasicProperties.Builder().headers(headers3).build();// Headers 交换机会忽略 routingKeychannel.basicPublish(EXCHANGE_NAME, "", props1, "高危警报".getBytes());channel.basicPublish(EXCHANGE_NAME, "", props2, "普通信息".getBytes());channel.basicPublish(EXCHANGE_NAME, "", props3, "中等警报".getBytes());System.out.println(" [x] Sent 3 messages with different headers");}}static void startConsumer(String consumerName, Map<String, Object> bindingArgs) {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);String queueName = channel.queueDeclare().getQueue();// 使用 headers 进行绑定channel.queueBind(queueName, EXCHANGE_NAME, "", bindingArgs);System.out.println(" [*] " + consumerName + " consumer waiting...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");Map<String, Object> headers = delivery.getProperties().getHeaders();System.out.println(" [" + consumerName + " Consumer] Received headers " + headers + ": '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });} catch (Exception e) {e.printStackTrace();}}static Map<String, Object> createArgs(String matchType, String key, String value) {Map<String, Object> args = new HashMap<>();args.put("x-match", matchType);  // "all" 或 "any"args.put(key, value);return args;}
}

5.maven引用

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>
</dependencies>

6.参考资料

《RabbitMQ实战指南》

http://www.dtcms.com/a/618698.html

相关文章:

  • 网站配图尺寸信息流是sem还是seo
  • 安微凤阳县建设局网站网页设计html模板下载
  • 精品网站设计商标logo图片
  • RK3588平台部署MNN和OPENCL
  • 基于Spring Cloud的电商系统设计与实现——用户与商品模块的研究(下)
  • 网站触屏版建站软件排行榜
  • docker-study
  • 为什么建设厅的网站不好打开龙华网站开发公司电话
  • 网站建设方案书是啥外贸网站建设深圳
  • 基础算法精讲 13 | 二叉树的层序遍历
  • 网站设计包含哪些技术wordpress数据库更改账号密码
  • 如何通过reactor实现流式响应接口
  • vue-leaflet使用教程(一)
  • 江苏省徐州市建设银行网站技术培训网站
  • 如何取外贸网站域名建设网站平台费
  • python 贪心-dfs-dp
  • Android Studio - 使用 BuildConfig
  • 在ec2上部署Qwen2.5omini和Qwen3omini模型
  • 设备通信的艺术:从协议选型、性能调优到自定义实现的全维度技术实践
  • 过滤器模式、责任链模式
  • 做货源的网站郑州企业免费建站
  • HCIP笔记5--OSPF域间路由、虚链路、认证
  • Java 黑马程序员学习笔记(进阶篇27)
  • 海南网站推广建设温州市城市基础设施建设网站
  • CentOS 7 安装 unzip-6.0-21.el7.x86_64.rpm 步骤详解(附安装包)
  • 审计局网站建设管理创意设计广告
  • Goer-Docker系列-1-使用kubectl命令部署Web应用
  • php网站超市响应式网站建设方案
  • 家用路由器挑选指南:考量无线协议与Wi-Fi 7新技术参数
  • 站长工具网站提交沈阳网站优化怎么做