当前位置: 首页 > news >正文

BUS-消息总线

BUS-消息总线

一、Spring Cloud Bus 概念

:::color5
在微服务架构中,消息总线是一条“公用消息主题”,微服务可订阅/消费该主题的消息,从而实现广播通知(如配置全局刷新、服务状态通知等)。

Spring Cloud Bus 是实现消息总线的组件,支持 RabbitMQKafka 作为消息中间件。它常与 Spring Cloud Config 结合,实现**配置更新的全局广播**(即修改 Git 配置后,所有微服务自动刷新配置)。

:::

二、集成 Kafka 实现 Config 全局刷新

以“Config 配置全局刷新”为例,演示 Spring Cloud Bus 的核心用法。

前期准备:安装并启动中间件
  1. 启动 ZooKeeper:Kafka 依赖 ZooKeeper 管理元数据,启动命令(以本地安装为例):
zkServer.sh start
  1. 启动 Kafka
kafka-server-start.sh config/server.properties
  1. 启动 Eureka Server:参考前文“Config 集成 Eureka”的 Eureka 服务端搭建流程。
步骤1:改造 Config Server,集成 Bus 和 Kafka
<!-- pom.xml(Config Server) -->
<dependencies><!-- 原有 Config Server 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-server</artifactId></dependency><!-- Eureka 客户端依赖(注册到 Eureka) --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!-- Bus 集成 Kafka 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency><!-- Actuator 依赖(用于暴露 bus-refresh 端点) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
</dependencies>
# application.properties(Config Server)
server.port=9000
spring.application.name=config-server# Git 仓库配置
spring.cloud.config.server.git.uri=https://gitee.com/xxx/spring-cloud-config-repo.git
spring.cloud.config.server.git.search-paths=config-repo# Eureka 注册配置
eureka.client.service-url.defaultZone=http://localhost:8000/eureka/# Actuator 暴露所有端点
management.endpoints.web.exposure.include=*
// ConfigServerApplication.java
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@EnableConfigServer
public class ConfigServerApplication {public static void main(String[] args) {SpringApplication.run(ConfigServerApplication.class, args);}
}
步骤2:改造 Config Client,集成 Bus 和 Kafka
<!-- pom.xml(Config Client) -->
<dependencies><!-- 原有 Config Client 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-config</artifactId></dependency><!-- Eureka 客户端依赖(从 Eureka 发现 Config Server) --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!-- Bus 集成 Kafka 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency><!-- Actuator 依赖 + @RefreshScope 支持配置刷新 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
# bootstrap.properties(Config Client)
spring.application.name=order
spring.cloud.config.profile=dev
spring.cloud.config.label=config-muse
eureka.client.service-url.defaultZone=http://localhost:8000/eureka/
spring.cloud.config.discovery.enabled=true
spring.cloud.config.discovery.service-id=config-server# Actuator 暴露所有端点
management.endpoints.web.exposure.include=*
// ConfigClientApplication.java
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@SpringBootApplication
public class ConfigClientApplication {public static void main(String[] args) {SpringApplication.run(ConfigClientApplication.class, args);}@RestController@RefreshScope // 标记该类支持配置热刷新class ConfigController {@Value("${from}")private String from;@GetMapping("/from")public String getFrom() {return "配置值 from: " + from;}}
}
步骤3:启动服务并验证全局刷新流程
  1. 启动服务:依次启动 Eureka Server、Config Server、多个 Config Client(如端口 8081、8082)。
  2. 初始配置验证:访问 http://localhost:8081/fromhttp://localhost:8082/from,均返回 Git 中配置的 from=git-dev-1.0
  3. 修改 Git 配置:将 Git 仓库中 order-dev.propertiesfrom 改为 git-dev-2.0 并提交。
  4. 触发全局刷新:向 Config Server 发送 POST 请求 http://localhost:9000/actuator/bus-refresh
  5. 验证刷新结果:再次访问 http://localhost:8081/fromhttp://localhost:8082/from,均返回 from=git-dev-2.0,说明所有 Client 已全局刷新配置。
原理说明:消息总线如何实现全局刷新?
  • Config Server 接收到 <font style="color:#DF2A3F;">bus-refresh</font> 请求后,会向 Kafka 的 springCloudBus 主题发送刷新事件(RefreshRemoteApplicationEvent)
  • 所有订阅了 springCloudBus 主题的 Config Client 会消费该事件,触发自身配置刷新(结合 @RefreshScope 实现热更新)。
  • 若需验证 Kafka 消息,可通过 Kafka 控制台消费 springCloudBus 主题:
kafka-console-consumer.sh --topic springCloudBus --bootstrap-server localhost:9092

触发 bus-refresh 后,控制台会打印出刷新事件的 JSON 消息。

三、原理

Spring Cloud Bus 的底层基于 Spring 事件驱动模型,包含三个核心概念:

概念作用
事件(ApplicationEvent)表示“发生的事情”,如 RefreshRemoteApplicationEvent(配置刷新事件)。
事件监听器(ApplicationListener)监听并处理特定事件,如 RefreshListener 处理配置刷新逻辑。
事件发布者(ApplicationEventPublisher)发布事件,触发监听器执行,如 ApplicationEventMulticaster 负责广播事件。
1. 事件(ApplicationEvent)

Spring 中所有事件都继承自 ApplicationEvent,Bus 中核心事件有:

  • RefreshRemoteApplicationEvent:触发配置刷新的事件。
  • AckRemoteApplicationEvent:确认事件已接收的响应事件。

这些事件包含 type(事件类型)、timestamp(时间戳)、originService(来源服务)等属性,可用于追踪和控制事件范围(如只刷新特定服务)。

2. 事件监听器(ApplicationListener)

需实现 ApplicationListener<E> 接口,其中 E 是要监听的事件类型。例如 RefreshListener 监听 RefreshRemoteApplicationEvent,并执行配置刷新逻辑。

3. 事件发布者(ApplicationEventPublisher)

通过 ApplicationEventPublisher#publishEvent() 方法发布事件,ApplicationEventMulticaster 会将事件广播给所有匹配的监听器。

四、工作流原理

1. 触发刷新请求

当 Git 仓库的配置被修改后,我们向 Config Server 发送 <font style="color:rgba(0, 0, 0, 0.85) !important;">POST /actuator/bus-refresh</font> 请求(也可结合 Git Hook 自动触发)。

2. 消息广播(Config Server 发消息)

Config Server 接收到请求后,会向消息总线(Kafka/RabbitMQ)****的 **<font style="color:#DF2A3F;">springCloudBus</font>** ****主题广播一条 “配置刷新事件”(**<font style="color:rgb(0, 0, 0);">RefreshRemoteApplicationEvent</font>**

3. 客户端接收消息(Config Client 消费消息)

所有集成了 Spring Cloud Bus 的 Config Client 都会订阅 <font style="color:rgba(0, 0, 0, 0.85) !important;">springCloudBus</font> 主题,当收到 “配置刷新事件” 后,会感知到 “需要刷新配置”。

4. 配置刷新(Client 执行刷新逻辑)

Config Client 内部通过 <font style="color:#DF2A3F;">@RefreshScope</font> 注解和 Actuator 机制,重新从 Config Server 拉取最新配置,并更新自身的配置信息,从而实现 “全局配置热刷新”。

简单来说,Bus 就像一个 “喇叭”:Config Server 对着喇叭喊 “配置更新了,大家快刷新”,所有订阅了这个 “喇叭频道” 的 Config Client 都会听到并执行刷新操作。

总结

Spring Cloud Bus 是微服务架构的“消息中枢”,通过集成 Kafka/RabbitMQ 实现配置全局刷新、服务广播通知等能力。其底层依赖 Spring 事件驱动模型,保证了事件的可靠发布与消费。

与“单个服务触发刷新”相比,Bus 让 Config Server 成为“刷新入口”,避免了服务单点故障对刷新流程的影响,是生产级微服务配置管理的关键组件。

http://www.dtcms.com/a/394081.html

相关文章:

  • 23种设计模式之【单例模式模式】-核心原理与 Java实践
  • 精度至上,杜绝失真,机器视觉检测中为何常用BMP格式?
  • 关于wireshark流量分析软件brim(Zui)安装方法
  • springboot3.4.1集成pulsar
  • 信息量、熵、KL散度和交叉熵
  • 使用Python一站式提取Word、Excel、PDF 和PPT文档内容v1.0
  • 线性代数 | REF / RREF
  • TLCP的一些内容
  • dock容器网络存储相关练习
  • 鸿蒙Next ArkTS卡片提供方开发指南:从入门到实战
  • Netty LengthFieldBasedFrameDecoder
  • 后端_HTTP 接口签名防篡改实战指南
  • 区块链论文速读 CCF A--WWW 2025(5)
  • 机器学习周报十四
  • 如何解决stun服务无法打洞建立p2p连接的问题
  • 解决项目实践中 java.lang.NoSuchMethodError:的问题
  • JavaSE-多线程(5.2)- ReentrantLock (源码解析,公平模式)
  • 2025华为杯A题B题C题D题E题F题选题建议思路数学建模研研究生数学建模思路代码文章成品
  • 【记录】Docker|Docker中git克隆私有库的安全方法
  • Web之防XSS(跨站脚本攻击)
  • 使用 AI 对 QT应用程序进行翻译
  • Windows下游戏闪退?软件崩溃?游戏环境缺失?软件运行缺少依赖?这个免费工具一键帮您自动修复(DLL文件/DirectX/运行库等问题一键搞定)
  • 【从入门到精通Spring Cloud】统一服务入口Spring Cloud Gateway
  • setfacl 命令
  • Photoshop - Photoshop 分享作品和设计
  • 【Agent 设计模式与工程化】如何做出好一个可持续发展的agent需要考虑的架构
  • 【Camera开发】疑难杂症记录
  • 如何提高自己的Java并发编程能力?
  • Polkadot - ELVES Protocol详解
  • springBoot图片本地存储