当前位置: 首页 > news >正文

分布式之RabbitMQ的使用(3)QueueBuilder

文章目录

  • 前言
  • 一、概述
  • 二、功能特点
  • 三、准备工作
    • 创建CreateRabbitMQConfig
    • 创建listening
    • 创建QueueBuilderController
  • 四、常用属性及方法介绍
    • (一)创建持久化队列
      • 语法
      • 代码编写
        • CreateRabbitMQConfig
        • QueueBuilderController
        • ListenRabbitMQConfig
        • 测试
    • (二)创建非持久化队列
    • (三)创建排他队列
    • (四) 创建自动删除队列
  • (五)设置队列长度限制
  • 五、使用步骤
    • (一)引入相关依赖
    • (二)创建队列对象
    • (三)后续操作

前言

本篇帖子是上一篇分布式之RabbitMQ的使用(3)的续写。

一、概述

QueueBuilder 是在与消息队列系统(如 RabbitMQ)集成时,用于以编程方式构建队列(Queue)的工具类或构建器模式的实现。它提供了一种便捷的方法来配置队列的各种属性,使得在应用程序中能够根据具体需求灵活创建不同类型和特性的队列。

二、功能特点

  1. 属性配置灵活性:通过一系列方法,可以轻松设置队列的关键属性,如是否持久化、是否排他、是否自动删除等,以满足不同的业务场景和消息处理要求。
  2. 与消息队列系统紧密集成:通常是特定消息队列客户端库(如 Spring AMQP 用于 Spring Boot 与RabbitMQ 集成)的一部分,能够无缝对接相应的消息队列服务,确保创建的队列在系统中正确生效。

三、准备工作

创建CreateRabbitMQConfig

新建config文件夹,并在其文件夹下创建类CreateRabbitMQConfig

package com.hsh.config;import org.springframework.context.annotation.Configuration;@Configuration
public class CreateRabbitMQConfig {
}

创建listening

新建listen文件夹,并在其文件夹下创建类ListenRabbitMQConfig

package com.hsh.listen;
import org.springframework.stereotype.Component;@Component
public class ListenRabbitMQConfig {
}

创建QueueBuilderController

在controller文件夹下创建类QueueBuilderController

package com.hsh.controller;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {@Autowiredprivate RabbitTemplate rabbitTemplate;
}

四、常用属性及方法介绍

(一)创建持久化队列

语法

方法: QueueBuilder.durable(String queueName)
说明:用于创建一个持久化队列。持久化队列在消息队列服务器重启或发生意外故障后,其队列定义(包括队列名称、属性等)以及队列中尚未被消费的消息不会丢失,会在服务器恢复正常后继续存在,可以继续被消费者消费。
语法如下示例:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();

在上述示例中,创建了一个名为 myDurableQueue 的持久化队列。

代码编写

我们以创建简单队列的持久化队列为例。

CreateRabbitMQConfig

在上面新建的CreateRabbitMQConfig中编写,用于在项目加载时创建队列

package com.hsh.config;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class CreateRabbitMQConfig {@Beanpublic Queue myDurableQueue () {Queue durableQueue = QueueBuilder.durable("myDurableQueue").build();return durableQueue;}
}
QueueBuilderController

编写QueueBuilderController,用于用户发起请求,也就是生产者。

package com.hsh.controller;import com.hsh.pojo.Goods;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public String index(){Goods goods =new Goods();for (int i = 0; i < 10; i++){goods.setGoodsId(i);rabbitTemplate.convertAndSend("myDurableQueue", goods);}return "发送成功";}
}
ListenRabbitMQConfig

编写ListenRabbitMQConfig用于监听队列

package com.hsh.listen;import com.hsh.pojo.Goods;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.stereotype.Component;@Component
public class ListenRabbitMQConfig {// 不在这里创建队列了 @RabbitListener(queuesToDeclare = @Queue("work"))// 我们这个myDurableQueue在上面的 CreateRabbitMQConfig创建,这样更加灵活,也是为什么使用QueueBuilder的原因。@RabbitListener(queues = "myDurableQueue")public void receive(Message  message){Goods goods = (Goods) SerializationUtils.deserialize(message.getBody());System.out.println("消费者===========:" + goods);}
}
测试

直接访问http://localhost:8080/queueBuilderController/send
控制台如下
在这里插入图片描述

(二)创建非持久化队列

方法: QueueBuilder.nonDurable(String queueName)
说明:创建一个非持久化队列。非持久化队列在某些情况下(如连接关闭、相关消费者进程结束等),队列及其可能剩余的消息通常会被自动删除,不具备在服务器重启等情况下的数据保留能力。
示例:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();

这里创建了一个名为 myNonDurableQueue 的非持久化队列。

和上面创建持久化队列差不多就是把CreateRabbitMQConfig的内容换一下不再演示。

(三)创建排他队列

  • 方法: QueueBuilder.exclusive(String queueName)
  • 说明:排他队列是连接专属的,只有创建它的连接可以使用它,并且当连接关闭时,该队列会被自动删除。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue exclusiveQueue = QueueBuilder.exclusive("myExclusiveQueue")
    .build();
    

此示例创建了一个名为 myExclusiveQueue 的排他队列。

(四) 创建自动删除队列

  • 方法: QueueBuilder.autoDelete(String queueName)
  • 说明:自动删除队列在所有消费者都取消订阅或者队列中的消息都被消费完后(满足其中一个条件即可),队列会会被自动删除。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue autoDeleteQueue = QueueBuilder.autoDelete("myAutoDeleteQueue")
    .build();
    

创建了一个名为 myAutoDeleteQueue 的自动删除队列。

(五)设置队列长度限制

  • 方法: QueueBuilder.durable(String queueName).maxLength(int maxLength) (以在持久化队列上设置为例,非持久化等其他类型队列设置方式类似)
  • 说明:用于设置队列的最大长度限制,即队列中最多能容纳的消息数量。当队列中的消息数量达到该限制时,后续发送到该队列的消息可能会根据消息队列系统的策略进行相应处理(如被丢弃等)。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue queueWithLengthLimit = QueueBuilder.durable("myQueueWithLimit")
    .maxLength(1000)
    .build();
    

在上述示例中,创建了一个名为 myQueueWithLimit 的持久化队列,并设置其最大长度为 1000 条消息。

五、使用步骤

(一)引入相关依赖

在使用 QueueBuilder 之前,需要确保项目中引入了与消息队列系统对应的客户端库。例如,在 SpringBoot 中集成 RabbitMQ 并使用 QueueBuilder 时,需要在项目的 pom.xml(Maven 项目)或build.gradle(Gradle 项目)中引入 Spring AMQP 依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(二)创建队列对象

根据业务需求,选择合适的 QueueBuilder 方法来创建队列对象。也就是上面的CreateRabbitMQConfig中编写如下代码例如:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
// 创建一个持久化队列
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();
// 创建一个非持久化队列
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();

(三)后续操作

创建好队列对象后,可以根据具体的消息队列集成场景进行后续操作,比如:

将队列绑定到交换机(在使用交换机的消息队列架构中):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {// 创建一个持久化队列@Beanpublic Queue myQueue() {return QueueBuilder.durable("myQueue").build();}// 创建一个持久化的直接交换机@Beanpublic DirectExchange myDirectExchange() {return ExchangeBuilder.directExchange("myDirectExchange").durable(true).build();}// 创建队列与交换机之间的绑定关系/**/@Beanpublic Binding binding(DirectExchange myDirectExchange, Queue myQueue) {return BindingBuilder.bind(myQueue).to(myDirectExchange).with("routingKey");}
}
  • myQueue 方法使用 QueueBuilder 创建了一个名为 myQueue 的持久化队列,并通过 @Bean 注解将其注册为 Spring 容器中的一个 Bean。
  • myDirectExchange 方法使用 ExchangeBuilder 创建了一个名为myDirectExchange 的持久化直接交换机,同样通过 @Bean 注解将其注册为 Spring 容器中的一个 Bean。
  • binding 方法通过 @Bean 注解创建了一个 Binding 对象,用于将前面创建的 myQueue 队列和myDirectExchange 交换机进行绑定,并且指定了路由键为 routingKey 。Spring 会自动将已经创建好的 myDirectExchangemyQueue 这两个 Bean 作为参数传递给 binding 方法,无需手动干预。

binding方法:

  • 调用时机:Spring 在创建 Binding 这个 Bean 时会自动调用 binding 方法。因为这个方法也被标注了 @Bean 注解,所以 Spring 会识别它并按照创建 Bean 的流程来处理。
  • 参数传递: binding 方法的参数是 DirectExchange myDirectExchangeQueue myQueue 。Spring 会通过方法参数的类型来自动匹配已经在 Spring 容器中创建好的相应 Bean 实例并作为参数传递进来。也就是说,Spring 会找到之前通过 myDirectExchange 方法创建并注册的直接交换机 Bean 和通过 myQueue 方法创建并注册的队列 Bean,然后将它们分别传递给 binding 方法,以便在方法内部使用 BindingBuilder 将队列和交换机进行绑定操作。

将队列提供给消息生产者和消费者使用:
在消息生产者端,可以将消息发送到创建好的队列中。例如,在 Spring AMQP 中:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate Queue myQueue;public void sendMessage(String message) {rabbitTemplate.convertAndSend(myQueue.getName(), message);}
}

在上述代码中,通过 RabbitTemplate 将消息发送到了名为 myQueue 的队列中(这里假设 myQueue 是之前通过 QueueBuilder 创建的队列)。

在消息消费者端,可以从创建好的队列中获取消息进行处理。例如:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {// 处理接收到的消息System.out.println("Received message: " " + message);}
}

这里通过 RabbitListener 注解监听名为 myQueue 的队列,当队列中有消息时,会调用receiveMessage 方法进行处理。

http://www.dtcms.com/a/481667.html

相关文章:

  • 建立自己网站的好处抖音代运营可以相信吗
  • Flink 状态和 CheckPoint 的区别和联系(附源码)
  • QML学习笔记(三十六)QML的ComboBox
  • 媒介宣发的技术革命:Infoseek如何用AI重构企业传播全链路
  • uniapp开发小程序
  • 浦江县建设局网站国家企业信息信用信息公示网址
  • 2025年燃气从业人员考试真题分享
  • SuperMap iServer 数据更新指南
  • C++基础:(十三)list类的模拟实现
  • 【网络编程】从数据链路层帧头到代理服务器:解析路由表、MTU/MSS、ARP、NAT 等网络核心技术
  • 北京网站seowyhseo网站模板但没有后台如何做网站
  • 对接世界职业院校技能大赛标准,唯众打造高质量云计算实训室
  • 利用人工智能、数字孪生、AR/VR 进行军用飞机维护
  • [特殊字符] Maven 编译报错「未与 -source 8 一起设置引导类路径」完美解决方案(以芋道项目为例)
  • 【CV】泊松图像融合
  • 云智融合:人工智能与云计算融合实践指南
  • Maven创建Java项目实战全流程
  • 泉州市住房与城乡建设网站wordpress弹出搜索
  • [创业之路-691]:历史与现实的镜鉴:从三国纷争到华为铁三角的系统性启示
  • 时序数据库选型革命:深入解析Apache IoTDB的架构智慧与实战指南
  • 南通网站制作建设手机网页设计软件下载
  • OpenAI推出即时支付功能,ChatGPT将整合电商能力|技术解析与行业影响
  • 小杰深度学习(seventeen)——视觉-经典神经网络——MObileNetV3
  • 线性代数 | 要义 / 本质 (下篇)
  • C# 预处理指令 (# 指令) 详解
  • 有趣的机器学习-利用神经网络来模拟“古龙”写作风格的输出器
  • AI破解数学界遗忘谜题:GPT-5重新发现尘封二十年的埃尔德什问题解法
  • ui网站推荐如何建网站不花钱
  • Java版自助共享空间系统,打造高效无人值守智慧实体门店
  • 《超越单链表的局限:双链表“哨兵位”设计模式,如何让边界处理代码既优雅又健壮?》