当前位置: 首页 > news >正文

Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统

Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统

一、Spring AMQP 是什么?

Spring AMQP(Application Messaging Protocol)是 Spring 官方提供的对 AMQP 协议的封装,其核心模块有两个:

  • spring-amqp: 提供 AMQP 抽象封装
  • spring-rabbit: RabbitMQ 的具体实现

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

常见的场景包括:

  • 微服务之间的异步通信
  • 秒杀系统削峰
  • 用户注册发送邮件/短信通知
  • 分布式事务的最终一致性方案

二、Spring Boot 集成 RabbitMQ

2.1. 引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

它会自动引入 spring-rabbit 和 spring-amqp 模块。

2.2. 配置 RabbitMQ

spring:rabbitmq:host: 192.168.184.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

三、快速构建消息系统

  • 一个消息队列
  • 一个消息发送者
  • 一个消息监听者(消费者)

构建示例项目:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者
    在这里插入图片描述
    引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

3.1.消息发送

publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

3.2.消息接收

consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

四、WorkQueues模型

4.1. 介绍

Work Queues(工作队列)又叫 任务队列(Task Queues),主要用于将一个任务分发给多个消费者(工作线程)处理,每个任务只会被一个消费者处理。

核心思想是:生产者只管发送任务,多个消费者竞争获取任务并处理,达到并发消费、分担压力的目的
在这里插入图片描述

  • Producer(生产者):发送任务消息。
  • Queue(队列):缓存任务。
  • Consumer(消费者):从队列中获取任务并处理。

每个任务只会被一个消费者处理,多个消费者之间互不干扰。

4.2. 消息发送

publisher服务中的SpringAmqpTest类中添加一个测试方法实现循环发送:

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

4.3. 消息接收

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

多个消费者监听同一个队列,消息将被平均分配(默认轮询方式)。

4.4. 公平分发 vs 轮询分发

🔁 默认行为:轮询分发
RabbitMQ 默认采用 Round-Robin(轮询) 分发方式,消费者不论是否处理完当前消息,下一条消息仍然会发给它。

这可能导致:处理慢的消费者积压任务,处理快的消费者反而闲着

✅ 公平分发(prefetch)
设置每个消费者的最大未确认消息数,让 RabbitMQ 只向空闲的消费者发送消息。

spring:rabbitmq:listener:simple:prefetch: 1  # 每个消费者同一时间只能处理1条消息
http://www.dtcms.com/a/321458.html

相关文章:

  • Android12 Framework Sim卡pin与puk码解锁
  • 用LaTeX优化FPGA开发:结合符号计算与Vivado工具链(二)
  • Nature论文-预测和捕捉人类认知的基础模型-用大模型模拟人类认知
  • 麦芽:寻常食材的中医智慧 多炮制方式各显养生价值
  • 动态规划进阶:转移方程优化技巧全解
  • 安卓应用内WebView页面调试技巧
  • WPF 双击行为实现详解:DoubleClickBehavior 源码分析与实战指南
  • 政治社会时间线
  • Java 之 多态
  • UE5太空射击游戏入门(一):项目创建与飞船控制
  • HEVC视频扩展免费下载
  • ISL9V3040D3ST-F085C一款安森美 ON生产的汽车点火IGBT模块,绝缘栅双极型晶体管ISL9V3040D3ST汽车点火电路中的线圈驱动器
  • Redis对象编码
  • 分布式系统性能优化实战:从瓶颈定位到架构升级
  • J2000与WGS84坐标系
  • Docker--docker的学习
  • Visual Studio 2019 + Qt + MySQL 开发调试全过程问题详解
  • 装配式建筑4.0:建筑工业化的智慧飞跃
  • 训练模型时梯度出现NAN或者inf
  • WiFi 核心概念与实战用例全解
  • git环境配置_笔记
  • [Linux]学习笔记系列 -- [arm[kernel]
  • modem上报SIM卡状态为unknown问题分析
  • 6_基于深度学习的火灾检测识别系统(yolo11、yolov8、yolov5+UI界面+Python项目源码+模型+标注好的数据集)
  • 控制建模matlab练习13:线性状态反馈控制器-②系统的能控性
  • #C语言——刷题攻略:牛客编程入门训练(六):运算(三)-- 涉及 辗转相除法求最大公约数
  • JAVA,Maven聚合
  • 【记录】yumdownloader 和 yum install --downloadonly
  • Linux线程概念
  • 一洽客服系统:APP路由等级与路由条件设置