分布式之RabbitMQ的使用(3)QueueBuilder
文章目录
- 前言
- 一、概述
- 二、功能特点
- 三、准备工作
- 创建CreateRabbitMQConfig
- 创建listening
- 创建QueueBuilderController
- 四、常用属性及方法介绍
- (一)创建持久化队列
- 语法
- 代码编写
- CreateRabbitMQConfig
- QueueBuilderController
- ListenRabbitMQConfig
- 测试
- (二)创建非持久化队列
- (三)创建排他队列
- (四) 创建自动删除队列
- (五)设置队列长度限制
- 五、使用步骤
- (一)引入相关依赖
- (二)创建队列对象
- (三)后续操作
前言
本篇帖子是上一篇分布式之RabbitMQ的使用(3)的续写。
一、概述
QueueBuilder 是在与消息队列系统(如 RabbitMQ)集成时,用于以编程方式构建队列(Queue)的工具类或构建器模式的实现。它提供了一种便捷的方法来配置队列的各种属性,使得在应用程序中能够根据具体需求灵活创建不同类型和特性的队列。
二、功能特点
- 属性配置灵活性:通过一系列方法,可以轻松设置队列的关键属性,如是否持久化、是否排他、是否自动删除等,以满足不同的业务场景和消息处理要求。
- 与消息队列系统紧密集成:通常是特定消息队列客户端库(如 Spring AMQP 用于 Spring Boot 与RabbitMQ 集成)的一部分,能够无缝对接相应的消息队列服务,确保创建的队列在系统中正确生效。
三、准备工作
创建CreateRabbitMQConfig
新建config文件夹,并在其文件夹下创建类CreateRabbitMQConfig
package com.hsh.config;import org.springframework.context.annotation.Configuration;@Configuration
public class CreateRabbitMQConfig {
}
创建listening
新建listen文件夹,并在其文件夹下创建类ListenRabbitMQConfig
package com.hsh.listen;
import org.springframework.stereotype.Component;@Component
public class ListenRabbitMQConfig {
}
创建QueueBuilderController
在controller文件夹下创建类QueueBuilderController
package com.hsh.controller;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {@Autowiredprivate RabbitTemplate rabbitTemplate;
}
四、常用属性及方法介绍
(一)创建持久化队列
语法
方法: QueueBuilder.durable(String queueName)
说明:用于创建一个持久化队列。持久化队列在消息队列服务器重启或发生意外故障后,其队列定义(包括队列名称、属性等)以及队列中尚未被消费的消息不会丢失,会在服务器恢复正常后继续存在,可以继续被消费者消费。
语法如下示例:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();
在上述示例中,创建了一个名为 myDurableQueue 的持久化队列。
代码编写
我们以创建简单队列的持久化队列为例。
CreateRabbitMQConfig
在上面新建的CreateRabbitMQConfig中编写,用于在项目加载时创建队列
package com.hsh.config;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class CreateRabbitMQConfig {@Beanpublic Queue myDurableQueue () {Queue durableQueue = QueueBuilder.durable("myDurableQueue").build();return durableQueue;}
}
QueueBuilderController
编写QueueBuilderController,用于用户发起请求,也就是生产者。
package com.hsh.controller;import com.hsh.pojo.Goods;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public String index(){Goods goods =new Goods();for (int i = 0; i < 10; i++){goods.setGoodsId(i);rabbitTemplate.convertAndSend("myDurableQueue", goods);}return "发送成功";}
}
ListenRabbitMQConfig
编写ListenRabbitMQConfig用于监听队列
package com.hsh.listen;import com.hsh.pojo.Goods;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.stereotype.Component;@Component
public class ListenRabbitMQConfig {// 不在这里创建队列了 @RabbitListener(queuesToDeclare = @Queue("work"))// 我们这个myDurableQueue在上面的 CreateRabbitMQConfig创建,这样更加灵活,也是为什么使用QueueBuilder的原因。@RabbitListener(queues = "myDurableQueue")public void receive(Message message){Goods goods = (Goods) SerializationUtils.deserialize(message.getBody());System.out.println("消费者===========:" + goods);}
}
测试
直接访问http://localhost:8080/queueBuilderController/send
控制台如下
(二)创建非持久化队列
方法: QueueBuilder.nonDurable(String queueName)
说明:创建一个非持久化队列。非持久化队列在某些情况下(如连接关闭、相关消费者进程结束等),队列及其可能剩余的消息通常会被自动删除,不具备在服务器重启等情况下的数据保留能力。
示例:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();
这里创建了一个名为 myNonDurableQueue 的非持久化队列。
和上面创建持久化队列差不多就是把CreateRabbitMQConfig的内容换一下不再演示。
(三)创建排他队列
- 方法: QueueBuilder.exclusive(String queueName)
- 说明:排他队列是连接专属的,只有创建它的连接可以使用它,并且当连接关闭时,该队列会被自动删除。
- 示例:
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; Queue exclusiveQueue = QueueBuilder.exclusive("myExclusiveQueue") .build();
此示例创建了一个名为 myExclusiveQueue 的排他队列。
(四) 创建自动删除队列
- 方法: QueueBuilder.autoDelete(String queueName)
- 说明:自动删除队列在所有消费者都取消订阅或者队列中的消息都被消费完后(满足其中一个条件即可),队列会会被自动删除。
- 示例:
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; Queue autoDeleteQueue = QueueBuilder.autoDelete("myAutoDeleteQueue") .build();
创建了一个名为 myAutoDeleteQueue 的自动删除队列。
(五)设置队列长度限制
- 方法: QueueBuilder.durable(String queueName).maxLength(int maxLength) (以在持久化队列上设置为例,非持久化等其他类型队列设置方式类似)
- 说明:用于设置队列的最大长度限制,即队列中最多能容纳的消息数量。当队列中的消息数量达到该限制时,后续发送到该队列的消息可能会根据消息队列系统的策略进行相应处理(如被丢弃等)。
- 示例:
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; Queue queueWithLengthLimit = QueueBuilder.durable("myQueueWithLimit") .maxLength(1000) .build();
在上述示例中,创建了一个名为 myQueueWithLimit 的持久化队列,并设置其最大长度为 1000 条消息。
五、使用步骤
(一)引入相关依赖
在使用 QueueBuilder 之前,需要确保项目中引入了与消息队列系统对应的客户端库。例如,在 SpringBoot 中集成 RabbitMQ 并使用 QueueBuilder 时,需要在项目的 pom.xml(Maven 项目)或build.gradle(Gradle 项目)中引入 Spring AMQP 依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(二)创建队列对象
根据业务需求,选择合适的 QueueBuilder 方法来创建队列对象。也就是上面的CreateRabbitMQConfig
中编写如下代码例如:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
// 创建一个持久化队列
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();
// 创建一个非持久化队列
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();
(三)后续操作
创建好队列对象后,可以根据具体的消息队列集成场景进行后续操作,比如:
将队列绑定到交换机(在使用交换机的消息队列架构中):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {// 创建一个持久化队列@Beanpublic Queue myQueue() {return QueueBuilder.durable("myQueue").build();}// 创建一个持久化的直接交换机@Beanpublic DirectExchange myDirectExchange() {return ExchangeBuilder.directExchange("myDirectExchange").durable(true).build();}// 创建队列与交换机之间的绑定关系/**/@Beanpublic Binding binding(DirectExchange myDirectExchange, Queue myQueue) {return BindingBuilder.bind(myQueue).to(myDirectExchange).with("routingKey");}
}
myQueue
方法使用QueueBuilder
创建了一个名为myQueue
的持久化队列,并通过@Bean
注解将其注册为 Spring 容器中的一个 Bean。myDirectExchange
方法使用ExchangeBuilder
创建了一个名为myDirectExchange
的持久化直接交换机,同样通过 @Bean 注解将其注册为 Spring 容器中的一个 Bean。binding
方法通过@Bean
注解创建了一个Binding
对象,用于将前面创建的myQueue
队列和myDirectExchange
交换机进行绑定,并且指定了路由键为routingKey
。Spring 会自动将已经创建好的myDirectExchange
和myQueue
这两个 Bean 作为参数传递给binding
方法,无需手动干预。
binding方法:
- 调用时机:Spring 在创建
Binding
这个 Bean 时会自动调用binding
方法。因为这个方法也被标注了@Bean
注解,所以 Spring 会识别它并按照创建 Bean 的流程来处理。 - 参数传递:
binding
方法的参数是DirectExchange myDirectExchange
和Queue myQueue
。Spring 会通过方法参数的类型来自动匹配已经在 Spring 容器中创建好的相应 Bean 实例并作为参数传递进来。也就是说,Spring 会找到之前通过myDirectExchange
方法创建并注册的直接交换机 Bean 和通过myQueue
方法创建并注册的队列 Bean,然后将它们分别传递给binding
方法,以便在方法内部使用BindingBuilder
将队列和交换机进行绑定操作。
将队列提供给消息生产者和消费者使用:
在消息生产者端,可以将消息发送到创建好的队列中。例如,在 Spring AMQP 中:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate Queue myQueue;public void sendMessage(String message) {rabbitTemplate.convertAndSend(myQueue.getName(), message);}
}
在上述代码中,通过 RabbitTemplate
将消息发送到了名为 myQueue
的队列中(这里假设 myQueue
是之前通过 QueueBuilder 创建的队列)。
在消息消费者端,可以从创建好的队列中获取消息进行处理。例如:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {// 处理接收到的消息System.out.println("Received message: " " + message);}
}
这里通过 RabbitListener
注解监听名为 myQueue
的队列,当队列中有消息时,会调用receiveMessage
方法进行处理。