使用Spring Cloud Stream 模拟生产者消费者group destination的介绍(整合rabbitMQ)
文章目录
- 前言
- 整合
- 设置不同的group的结果
- 一个生产者 两个消费者 相同destination 不同的group 自动确认
- 一个生产者 两个消费者 相同destination 相同的group 自动确认
- 疑问?为什么group2 也有1条消息呢?
- 一个生产者 两个消费者 相同destination 没有手动显式设置group 自动确认
- group destination的抽象概念介绍
- 如何统一开发模型(跨 MQ 实现一致代码)
- 如何手动ack
- 总结
前言
楼主每次都是直接使用的公司封装好的调用mq的代码 发送消息 消费消息但是还没有好好的去认真了解下在微服务下是如何整合不同的mq,简单查看公司的实现方案下发现是使用的Spring Cloud Stream。所以今天就来简单的尝试下整合Spring Cloud Stream研究清楚发送消息和消费消息的过程。
本文是一篇对Spring Cloud Stream的简单整合使用,让你初步体验到为什么在微服务下使用Spring Cloud Stream的优点
1、 统一开发模型(跨 MQ 实现一致代码)
你只需写一次 destination + group + Consumer/Function,无需关心底层用的是 Kafka、Rabbit 还是 RocketMQ
2、声明式编程(配置驱动)
3、自动管理队列/绑定
本文会介绍
1、group destination的抽象概念
2、如何统一开发模型(跨 MQ 实现一致代码)
整合
pom:导入相关的包
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>
配置:
# 注意:存在group 和destination的概念 后文会详细介绍这两个概念对应mq中的实际样子是怎么样的
spring:cloud:stream:function:definition: receiver1Consumer;receiver2Consumer # 必须与@Bean名称一致bindings:sender-out-0:destination: broadcast-topicreceiver1Consumer-in-0: # 注意格式:beanName-in-0destination: broadcast-topicgroup: group1receiver2Consumer-in-0: # 注意格式:beanName-in-0destination: broadcast-topicgroup: group2rabbit:bindings:receiver1Consumer-in-0:consumer:acknowledge-mode: auto # 注意:设置为auto会自动ackreceiver2Consumer-in-0:consumer:acknowledge-mode: auto
生产者发送消息:
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;@Service
public class SenderService {private final StreamBridge streamBridge;public SenderService(StreamBridge streamBridge) {this.streamBridge = streamBridge;}public void sendMessage(String msg) {streamBridge.send("sender-out-0", msg);}
}
两个消费者消费消息:
import com.rabbitmq.client.Channel;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.function.Consumer;@Component
public class Receiver1 {@Beanpublic Consumer<Message<String>> receiver1Consumer() {return message -> {String msg = message.getPayload();System.out.println("Receiver1 收到消息: " + msg);// 获取AMQP相关信息Channel channel = (Channel) message.getHeaders().get("amqp_channel");Long deliveryTag = (Long) message.getHeaders().get("amqp_deliveryTag");// 以下代码在yml配置为auto的时候是不需要的,以下代码是在manual模式下的手动确认代码,auto模式不需要手动ack
// if (channel != null && deliveryTag != null) {
// try {
// // 你的业务逻辑
// processMessage(msg);
//
// // 确认消息
// channel.basicAck(deliveryTag, false);
// System.out.println("消息处理成功并确认");
//
// } catch (Exception e) {
// System.err.println("处理失败: " + e.getMessage());
// try {
// // 拒绝消息,重新入队
// channel.basicNack(deliveryTag, false, true);
// } catch (IOException ex) {
// throw new RuntimeException("拒绝消息失败", ex);
// }
// }
// }};}private void processMessage(String message) {// 你的业务逻辑// 如果这里抛出异常,消息会被nack}
}
import com.rabbitmq.client.Channel;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.function.Consumer;@Component
public class Receiver2 {@Beanpublic Consumer<Message<String>> receiver2Consumer() {return message -> {String msg = message.getPayload();System.out.println("Receiver2 收到消息: " + msg);// 获取AMQP相关信息Channel channel = (Channel) message.getHeaders().get("amqp_channel");Long deliveryTag = (Long) message.getHeaders().get("amqp_deliveryTag");// 以下代码在yml配置为auto的时候是不需要的,以下代码是在manual模式下的手动确认代码,auto模式不需要手动ack
// if (channel != null && deliveryTag != null) {
// try {
// // 你的业务逻辑
// processMessage(msg);
//
// // 确认消息
// channel.basicAck(deliveryTag, false);
// System.out.println("消息处理成功并确认");
//
// } catch (Exception e) {
// System.err.println("处理失败: " + e.getMessage());
// try {
// // 拒绝消息,重新入队
// channel.basicNack(deliveryTag, false, true);
// } catch (IOException ex) {
// throw new RuntimeException("拒绝消息失败", ex);
// }
// }
// }};}private void processMessage(String message) {// 你的业务逻辑// 如果这里抛出异常,消息会被nack}
}
设置不同的group的结果
一个生产者 两个消费者 相同destination 不同的group 自动确认
spring:cloud:stream:function:definition: receiver1Consumer;receiver2Consumer # 必须与@Bean名称一致bindings:sender-out-0:destination: broadcast-topicreceiver1Consumer-in-0: destination: broadcast-topicgroup: group1 # 不同的groupreceiver2Consumer-in-0: # 注意格式:beanName-in-0destination: broadcast-topicgroup: group2 # 不同的grouprabbit:bindings:receiver1Consumer-in-0:consumer:acknowledge-mode: auto # 自动确认receiver2Consumer-in-0:consumer:acknowledge-mode: auto# 自动确认
一个生产者 两个消费者 相同destination 相同的group 自动确认
spring:cloud:stream:function:definition: receiver1Consumer;receiver2Consumer # 必须与@Bean名称一致bindings:sender-out-0:destination: broadcast-topicreceiver1Consumer-in-0: # 注意格式:beanName-in-0destination: broadcast-topicgroup: group1 #相同的group1receiver2Consumer-in-0: # 注意格式:beanName-in-0destination: broadcast-topicgroup: group1
疑问?为什么group2 也有1条消息呢?
因为当前配置就是创建了一个exchange和两个queue 然后采用# 通配符模式去匹配的queue
所以发送一条消息
-> exchange
->推送给绑定到这个exchange的两个queue
->group1的queue被消费者1消费
最终就在group2剩下了一条消息
一个生产者 两个消费者 相同destination 没有手动显式设置group 自动确认
会比较不一样
当不设置 group 时,Spring Cloud Stream 会默认为每个消费者创建一个匿名 Queue。
所有 Queue 都会绑定到相同的 Exchange。
每条消息都会被复制投递到每个队列中。
spring:cloud:stream:function:definition: receiver1Consumer;receiver2Consumer # 必须与@Bean名称一致bindings:sender-out-0:destination: broadcast-topicreceiver1Consumer-in-0: # 注意格式:beanName-in-0destination: broadcast-topicreceiver2Consumer-in-0: # 注意格式:beanName-in-0destination: broadcast-topic
group destination的抽象概念介绍
destination 表示“消息的目标地址”,相当于一个逻辑主题或主题名称,不关心底层是 RabbitMQ 的 Exchange、Kafka 的 Topic 还是 RocketMQ 的 Topic。
对应 Kafka → Topic
对应 RabbitMQ → Exchange
对应 RocketMQ → Topic
group 表示消费者分组,是用于实现“消息队列负载均衡”的抽象。
同一个 destination + 相同的 group → 多个实例共享同一个 Queue,负载均衡消费
同一个 destination + 不同的 group → 每个 group 拥有独立的 Queue,广播消费
对应底层的表现:
Kafka 会作为 Consumer Group,只有一个实例消费 RabbitMQ 会创建一个 destination.group 的队列
RocketMQ 映射为 Consumer Group
用上文的代码去形象理解:
Spring Cloud Stream 概念 | RabbitMQ 中的映射 |
---|---|
destination = broadcast-topic | 创建一个 topic 类型的 Exchange:broadcast-topic (如果不存在) |
group = group3 | 创建一个 Queue:broadcast-topic.group3 |
自动 Binding | 将 Queue broadcast-topic.group3 绑定到 Exchange broadcast-topic ,RoutingKey 默认为 # |
消费行为 | 该 Queue 中的消息会由 group3 中的消费者组中的某个实例消费 |
ack 方式 | 默认是 AUTO,也可以配置为 MANUAL |
如何统一开发模型(跨 MQ 实现一致代码)
其实也就是只关注 destination + group + Consumer/Function,无需关心底层用的是 Kafka、Rabbit 还是 RocketMQ。
相比于Rabbit原生API需要手动创建 exchange/queue/binding、声明 channel、ack等。
Spring Cloud Stream 统一后:这些都配置 + 自动完成
如何手动ack
yml需要调整为manual:
acknowledge-mode: manual
代码需要手动确认逻辑:
@Beanpublic Consumer<Message<String>> receiver1Consumer() {return message -> {String msg = message.getPayload();System.out.println("Receiver1 收到消息: " + msg);// 获取AMQP相关信息Channel channel = (Channel) message.getHeaders().get("amqp_channel");Long deliveryTag = (Long) message.getHeaders().get("amqp_deliveryTag");if (channel != null && deliveryTag != null) {try {// 你的业务逻辑processMessage(msg);// 确认消息channel.basicAck(deliveryTag, false);System.out.println("消息处理成功并确认");} catch (Exception e) {System.err.println("处理失败: " + e.getMessage());try {// 拒绝消息,重新入队channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {throw new RuntimeException("拒绝消息失败", ex);}}}};}
总结
这是一篇对Spring Cloud Stream的简单整合尝试 具体如何实现不同的消息队列的切换 后续再聊 关注【代码丰】码字不易 请点赞收藏 谢谢