当前位置: 首页 > news >正文

RabbitMQ 4.1.1初体验-队列和交换机

接上一篇博文

启动RabbitMQ

注意:根据版本,要提前安装Erlang的环境
这里用的版本是 RabbitMQ 4.0.9 Erlang 28.0.1

# 启动rabbitmq-server
D:\rabbitmq-server-windows-4.0.9\rabbitmq_server-4.0.9>sbin\rabbitmq-server.bat

在这里插入图片描述

通过web页面新建虚拟主机

新建虚拟主机my-virtual-host(tags:administrator),参见下图
在这里插入图片描述
建完后如下
在这里插入图片描述

Default Queue Type:

  • classic:经典队列,RabbitMQ默认的队列类型是“classic”,也就是直接的、传统的队列。在这种类型的队列中,消息被按照入队的顺序处理,每个消息都会被分发给一个消费者。
  • quorum:仲裁队列,是RabbitMQ从3.8.0版本,引入的一个新的队列类型,整个3.8.X版本,也都是在围绕仲裁队列进行完善和优化。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列。Quorum队列更适合于长期存在的队列,并且在对容错、数据安全方面有更严格要求的场景。相对于追求低延迟、非持久化等高级队列,Quorum队列提供了更可靠的数据复制机制,以满足对数据一致性和高可用性的要求。
  • Stream流式队列: Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。

交换机 (Exchange)

  • 交换器 (Exchange)类型

    • 1、Fanout Exchange(扇形)
    • 2、Direct Exchange(直连)
    • 3、Topic Exchange(主题)
    • 4、Headers Exchange(头部)
  • Fanout Exchange
    Fanout 扇形的,散开的; 扇形交换机
    投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;
    在这里插入图片描述

  • 关键代码
    application.yml

#定义要使用的交换机和队列名称
exchange:name: exchange.fanoutqueueA: queue.a
queueB: queue.bspring:application:name: fanout-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic FanoutExchange fanoutExchange(){//参数: 交换机名称return new FanoutExchange(exchangeName);}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){//构造方法return new Queue(queueA);}@Beanpublic Queue queueB(){//构造方法return new Queue(queueB);}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(FanoutExchange fanoutExchange,Queue queueA){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueA).to(fanoutExchange);}@Beanpublic Binding bindingB(FanoutExchange fanoutExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(fanoutExchange);}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用扇形交换机FanoutExchange";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//FanoutExchange不需要routingKeyrabbitTemplate.send(exchangeName,"",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果:
在这里插入图片描述

在这里插入图片描述

  • MQ的消息包含两部分
    • 消息体body
    • 消息属性MessageProperties

Direct Exchange
根据路由键精确匹配(一模一样)进行路由消息队列;
在这里插入图片描述

实操如下
application.yml

#定义要使用的交换机和队列名称
exchange:name: exchange.directqueueA: queue.direct.a
queueB: queue.direct.bspring:application:name: fanout-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic DirectExchange directExchange(){//参数: 交换机名称   构建器模式(buidler)return ExchangeBuilder.directExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){//绑定队列queueA和交换机directExchange, 需要routingkey( info )return BindingBuilder.bind(queueA).to(directExchange).with("info");}@Beanpublic Binding bindingB(DirectExchange directExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(directExchange).with("error");}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用直连交换机DirectExchange====";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//DirectExchange需要routingKeyrabbitTemplate.send(exchangeName,"info",message);rabbitTemplate.send(exchangeName,"error",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • Topic Exchange
    通配符匹配,相当于模糊匹配;
    • # 匹配多个单词,用来表示任意数量(零个或多个)单词
    • * 匹配一个单词(必须有一个,而且只有一个),用.隔开的为一个单词:
      beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx,beijing.queue.a.b
      beijing.* == beijing.queue, beijing.xyz

发送时指定的路由键:lazy.orange.rabbit
在这里插入图片描述
示例代码
application.yml

#定义要使用的交换机和队列名称
exchange:name: exchange.topic
queueA: queue.topic.a
queueB: queue.topic.b
spring:application:name: topic-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic TopicExchange topicExchange(){//参数: 交换机名称   构建器模式(buidler)return ExchangeBuilder.topicExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(TopicExchange topicExchange,Queue queueA){//绑定队列queueA和交换机TopicExchange, 需要routingkey( info )return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");}@Beanpublic Binding bindingB(TopicExchange topicExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用主题交换机TopicExchange====";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//DirectExchange需要routingKeyrabbitTemplate.send(exchangeName,"lazy.orange.rabbit",message);rabbitTemplate.send(exchangeName,"lazy.orange.rabbit",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • Headers Exchange(用的比较少)
    基于每条消息属性中的headers属性进行匹配;
    在这里插入图片描述
    示例代码
#定义要使用的交换机和队列名称
exchange:name: exchange.headerqueueA: queue.header.a
queueB: queue.header.bspring:application:name: header-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic HeadersExchange headersExchange(){//参数: 交换机名称   构建器模式(buidler)return ExchangeBuilder.headersExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(HeadersExchange headersExchange,Queue queueA){//匹配条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",1);headerValues.put("status","m");//绑定队列queueA和交换机HeadersExchangereturn BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();}@Beanpublic Binding bindingB(HeadersExchange headersExchange,Queue queueB){//匹配条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",2);headerValues.put("status","f");//绑定队列queueA和交换机HeadersExchangereturn BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//指定匹配条件,需要使用 Message的属性, 在header中增加条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",1);headerValues.put("status","m");MessageProperties prop  = new MessageProperties();prop.setHeaders(headerValues);//使用 MessageBuilder创建消息//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用Header交换机HeaderExchange====";Message message = MessageBuilder.withBody(text.getBytes()).andProperties(prop).build();//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//HeaderExchange 不需要routingKeyrabbitTemplate.send(exchangeName,"",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果
在这里插入图片描述
在这里插入图片描述

默认交换机

  • 创建虚拟主机的时候就会创建默认交换机,默认交换机的名字是空字符串,默认交换机是直连交换机(Direct)。
  • 每新建一个队列,都会自动和默认交换机绑定,绑定的路由key是该队列的名字
  • 向默认交换机发送消息,交换机名指定为空字符串,路由key为队列的名字
http://www.dtcms.com/a/267190.html

相关文章:

  • 【AI论文】WorldVLA:迈向自回归动作世界模型
  • 第二章 简单程序设计
  • 盘式制动器的设计+说明书和CAD)【6张】+绛重
  • 一种结合双阶段注意力循环神经网络(DA-RNN)和卷积块注意力模块(CBAM)的滚动轴承故障诊断方法
  • Rust实用案例解析
  • 后端树形结构
  • Qt处理USB摄像头开发说明与QtMultimedia与V4L2融合应用
  • 【爬虫】逆向爬虫初体验之爬取音乐
  • 408第三季part2 - 计算机网络 - 物理层
  • 由coalesce(1)OOM引发的coalesce和repartition理解
  • 3dmax一键烘焙很多张贴图合并成一张贴图插件支持fbx/obj/blender多材质模型合并为一张贴图
  • OneCode自主UI设计体系:架构解析与核心实现
  • web前端面试-- MVC、MVP、MVVM 架构模式对比
  • Vue.js TDD开发深度指南:工具链配置与精细化测试策略
  • 爬虫工程师Chrome开发者工具简单介绍
  • Kafka消息积压的多维度解决方案:超越简单扩容的完整策略
  • 牛客刷题 — 【排序】[NOIP2010] 导弹拦截(排序枚举)
  • 光伏发电园区管理系统 - Three.js + Django 实现方案
  • React Hooks全面解析:从基础到高级的实用指南
  • 【论文解读】Referring Camouflaged Object Detection
  • SqueezeBERT:计算机视觉能为自然语言处理在高效神经网络方面带来哪些启示?
  • 7月5日星期六今日早报简报微语报早读
  • 在服务器上配置MQ注意的问题
  • Gartner《Stream Processing: 新一代数据处理范式》学习报告
  • Flink-状态恢复-isRestore分析
  • 使用影刀RPA实现每日消防巡检提醒
  • 常见高危端口风险分析与防护指南
  • PostgreSQL表操作
  • Python Fabric库【系统管理工具】全面讲解
  • MQTT与HTTP在物联网中的比较:为什么MQTT是更好的选择