【第37章】Spring Cloud之Spring Cloud Stream分布式消息队列
文章目录
- 前言
- 一、准备
- 1. 专业术语
- 2. 版本关系
- 3. 引入依赖
- 二、核心服务
- 1. 生产者
- 1.1 生产者配置
- 1.2 核心代码
- 2. 消费者
- 2.1 消费者配置
- 2.2 核心代码
- 三、单元测试
- 1. 请求
- 2. 结果
- 3. 控制台
- 总结
前言
Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。
入门篇请参考java中使用rabbitmq
接下来我们模拟场景:中秋节,商家给消费者发放大礼包,消费者凭收到信息提示核对中秋大礼包。
一、准备
1. 专业术语
-
目的地绑定器(Binders):负责提供与外部消息系统集成的组件。
-
目标绑定(Bindings):外部消息传递系统和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
-
消息(Message):生产者和消费者用于与目的地绑定器(以及通过外部消息系统与其他应用程序)通信的规范数据结构。
2. 版本关系
| Spring Cloud Stream | Spring Cloud | Spring Boot |
|---|---|---|
| 4.0.x | 2022.0.x aka Kilburn | 3.0.x |
| 3.2.x | 2021.0.x aka Jubilee | 2.6.x, 2.7.x (Starting with 2021.0.3 of Spring Cloud) |
| 3.1.x | 2020.0.x aka Ilford | 2.4.x, 2.5.x (Starting with 2020.0.3 of Spring Cloud) |
3. 引入依赖
我们这里直接使用spring-cloud的starter
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
二、核心服务
后端服务我们分为生产者(商家)和消费者(客户)
1. 生产者
1.1 生产者配置
spring:application:name: provider-servicerabbitmq:host: 192.168.145.128port: 5672username: guestpassword: guestvirtualHost: cloudcloud:stream:binders:rabbit1:type: rabbitenvironment:spring:rabbitmq:host: 192.168.145.128port: 5672username: guestpassword: guestvhost: cloudbindings:send-out-0:destination: zhongqiubinder: rabbit1
1.2 核心代码
package org.example.nacos.provider.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.atomic.AtomicInteger;/*** Create by zjg on 2024/9/17*/
@RestController
@RequestMapping("/producer")
public class ProducerController {private final Logger logger = LoggerFactory.getLogger(TestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/send")public String send(@RequestParam("number") int number,@RequestParam("message") String message){logger.debug("本次发放"+number+"中秋大礼包");AtomicInteger count = new AtomicInteger(0);for (int i = 0; i < number; i++) {streamBridge.send("send-out-0",count.incrementAndGet()+" "+message);}return "SUCCESS";}
}
2. 消费者
2.1 消费者配置
spring:application:name: consumer-servicerabbitmq:host: 192.168.145.128port: 5672username: guestpassword: guestvirtualHost: cloudcloud:stream:binders:rabbit1:type: rabbitenvironment:spring:rabbitmq:host: 192.168.145.128port: 5672username: guestpassword: guestvhost: cloudbindings:receive-in-0:destination: zhongqiubinder: rabbit1group: customfunction:definition: receive;
2.2 核心代码
package org.example.nacos.consumer.amqp;import org.example.nacos.consumer.controller.TestController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;/*** Create by zjg on 2024/9/17*/
@Configuration
public class RabbitMQConsumer {private final Logger logger = LoggerFactory.getLogger(TestController.class);@Value("${server.port}")private int port;@Beanpublic Consumer<String> receive() {return s -> logger.info(port+":Data received..." + s);}
}
三、单元测试
1. 请求

2. 结果
生产者
2024-09-17 17:43:34.762 [TID: N/A] DEBUG 11124 [provider-service] [nio-9001-exec-5] o.e.n.p.controller.TestController :23 : 本次发放10中秋大礼包
消费者
2024-09-17 17:49:53.160 [TID:N/A] INFO 12756 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9003:Data received...2 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.257 [TID:N/A] INFO 12756 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9003:Data received...4 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.260 [TID:N/A] INFO 12756 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9003:Data received...6 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.262 [TID:N/A] INFO 12756 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9003:Data received...9 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。2024-09-17 17:49:53.159 [TID: N/A] INFO 12436 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9004:Data received...1 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.161 [TID: N/A] INFO 12436 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9004:Data received...3 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.258 [TID: N/A] INFO 12436 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9004:Data received...5 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.260 [TID: N/A] INFO 12436 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9004:Data received...7 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.262 [TID: N/A] INFO 12436 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9004:Data received...8 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
2024-09-17 17:49:53.264 [TID: N/A] INFO 12436 [consumer-service] [ongqiu.custom-1] o.e.n.c.controller.TestController :23 : 9004:Data received...10 花好月圆,阖家欢乐,恭喜您获得中秋月饼大礼包一份。
3. 控制台



总结
回到顶部
官方网站
官方文档
项目源码
官方案例
在这里祝大家,中秋快乐,阖家幸福,万事如意!
写了有一年了,我记得发布出去了,不晓得为啥在草稿箱,补发。
