当前位置: 首页 > news >正文

RabbitMQ-topic exchange使用方法

RabbitMQ-默认读、写方式介绍

RabbitMQ-发布/订阅模式

RabbitMQ-直连交换机(direct)使用方法

目录

1、概述

2、topic交换机使用方法

2.1 适用场景

2.2 解决方案

3、代码实现

3.1 源代码实现

3.2 运行记录

4、小结


1、概述

topic 交换机是比直连交换机功能更加强大的交换方式,通过不同的路由规则,可以实现fanout、direct两种交换机的功能。

2、topic交换机使用方法

2.1 适用场景

假设我们要对动物做一个描述,根据速度、颜色、种类等特征对其进行分别入到不同的mq队列中,routing key的格式为:"<speed>.<colour>.<species>",比如说,所有黄色动物入队列1,跑的速度慢的,还有小兔子入队列2,哪该如何实现该需求呢?

2.2 解决方案

结合2.1描述的需求,我们可以画出如下框图:

知识点解释:

* (star) :和正则的功能类似,可以代表一整个单词。

# (hash) :代表0个或者多个单词。

如果一条消息的routing key为「quick.orange.rabbit」,将会被同时路由到Q1和Q2,「lazy.orange.elephant」的routing key同样也将会被同时路由到Q1和Q2,「quick.orange.fox」的消息只会被路由Q1,【lazy.brown.fox】只会被路由到Q2,【lazy.pink.rabbit】只会被路由到Q2一次,虽然匹配了两个binding,【quick.brown.fox】没有匹配到任何的绑定,那么消息将会被丢弃。

如果一个队列绑定的是【#】,那么他将会接收到所有的消息,会忽略调binding key,实现类似扇形交换机的功能。

如果一个routing key中没有设计【#】和【*】,那么他会实现类似直连交换机的功能。

3、代码实现

3.1 源代码实现

生产者:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare(
		"logs_topic", // name
		"topic",      // type
		true,         // durable
		false,        // auto-deleted
		false,        // internal
		false,        // no-wait
		nil,          // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare an exchange,err:", err)
		return
	}
	body := "Hello World by topic exchange"
	err = ch.Publish(
		"logs_topic",       // exchange
		"quick.orange.fox", // routing key
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	if err != nil {
		fmt.Println("Failed to publish a message")
		return
	}
}

代码示例中routing key为【quick.orange.fox】,这条消息将会被路由到2.2中的Q1队列中。

消费侧代码:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare("logs", "direct", true, false, false, false, nil)
	if err != nil {
		fmt.Println("Failed to declare an exchange")
		return
	}
	q, err := ch.QueueDeclare(
		"logs_topic", // name
		true,         // durable
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	err = ch.QueueBind(
		q.Name,       // queue name
		"*.orange.*", // routing key(binding key)
		"logs_topic", // exchange
		false,
		nil,
	)

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		true,   // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	var forever chan struct{}

	go func() {
		for d := range msgs {
			fmt.Printf(" [x] %s\n", d.Body)
		}
	}()

	fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

3.2 运行记录

发送消息:

接收消息:

4、小结

学到这里发现,topic交换机完全具备fanout、direct两种交换机的全部功能,日常开发完全可以使用topic交换机,根据不同routing key即可以实现扇形和直连交换机的功能。

比如第3章节中消费者,routing key设置为【#】,则这个队列可以接收所有消息,类似扇形交换机功能。

相关文章:

  • Python学习笔记7:入门知识(七)
  • 常见排序算法——插入排序(直接插入排序 希尔排序)
  • MediaRecorder + Camera2 视频录制最佳实践
  • 【通信协议-RTCM】GPS-RTK可观测消息 ---- 对应RTCM十六进制 编码ID(3E9 3EA 3EB 3EC)
  • HTML (总结黑马的)
  • 自学网络安全的三个必经阶段(含路线图)
  • AI大模型在健康睡眠监测中的深度融合与实践案例
  • 深入解析JVM的GC过程
  • 导出 Whisper 模型到 ONNX
  • 前端之npm运行时配置文件.npmrc(可用于配置npm淘宝源)
  • PCA降维算法
  • 一文讲清:生产报工系统的功能、报价以及如何选择
  • 【写时复制】内存不一致
  • ARM-V9 RME(Realm Management Extension)系统架构之系统安全能力的侧信道抵御
  • 狒狒吃香蕉(二分查找)
  • CompletableFuture使用
  • 【git基本使用】
  • HCIA12 NAT网络地址转换实验
  • Python搭建自己的VPN
  • CAP理论
  • 退休11年后,71岁四川厅官杨家卷被查
  • 马上评|从一个细节看今年五一档电影
  • 体坛联播|米兰逆转热那亚豪取3连胜,阿诺德官宣离开利物浦
  • 晋城一男子实名举报村支书打伤其67岁父亲,镇政府:案件正在侦办中
  • 酒店民宿一房难求,湖北宣恩文旅局工作人员腾出家中空房给游客救急
  • 中国驻日本大使吴江浩就日本民用飞机侵闯我钓鱼岛领空向日方提出严正交涉