BUS-消息总线
BUS-消息总线
一、Spring Cloud Bus 概念
:::color5
在微服务架构中,消息总线是一条“公用消息主题”,微服务可订阅/消费该主题的消息,从而实现广播通知(如配置全局刷新、服务状态通知等)。
Spring Cloud Bus 是实现消息总线的组件,支持 RabbitMQ 和 Kafka 作为消息中间件。它常与 Spring Cloud Config 结合,实现**配置更新的全局广播**(即修改 Git 配置后,所有微服务自动刷新配置)。
:::
二、集成 Kafka 实现 Config 全局刷新
以“Config 配置全局刷新”为例,演示 Spring Cloud Bus 的核心用法。
前期准备:安装并启动中间件
- 启动 ZooKeeper:Kafka 依赖 ZooKeeper 管理元数据,启动命令(以本地安装为例):
zkServer.sh start
- 启动 Kafka:
kafka-server-start.sh config/server.properties
- 启动 Eureka Server:参考前文“Config 集成 Eureka”的 Eureka 服务端搭建流程。
步骤1:改造 Config Server,集成 Bus 和 Kafka
<!-- pom.xml(Config Server) -->
<dependencies><!-- 原有 Config Server 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-server</artifactId></dependency><!-- Eureka 客户端依赖(注册到 Eureka) --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!-- Bus 集成 Kafka 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency><!-- Actuator 依赖(用于暴露 bus-refresh 端点) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
</dependencies>
# application.properties(Config Server)
server.port=9000
spring.application.name=config-server# Git 仓库配置
spring.cloud.config.server.git.uri=https://gitee.com/xxx/spring-cloud-config-repo.git
spring.cloud.config.server.git.search-paths=config-repo# Eureka 注册配置
eureka.client.service-url.defaultZone=http://localhost:8000/eureka/# Actuator 暴露所有端点
management.endpoints.web.exposure.include=*
// ConfigServerApplication.java
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@EnableConfigServer
public class ConfigServerApplication {public static void main(String[] args) {SpringApplication.run(ConfigServerApplication.class, args);}
}
步骤2:改造 Config Client,集成 Bus 和 Kafka
<!-- pom.xml(Config Client) -->
<dependencies><!-- 原有 Config Client 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-config</artifactId></dependency><!-- Eureka 客户端依赖(从 Eureka 发现 Config Server) --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!-- Bus 集成 Kafka 依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency><!-- Actuator 依赖 + @RefreshScope 支持配置刷新 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
# bootstrap.properties(Config Client)
spring.application.name=order
spring.cloud.config.profile=dev
spring.cloud.config.label=config-muse
eureka.client.service-url.defaultZone=http://localhost:8000/eureka/
spring.cloud.config.discovery.enabled=true
spring.cloud.config.discovery.service-id=config-server# Actuator 暴露所有端点
management.endpoints.web.exposure.include=*
// ConfigClientApplication.java
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@SpringBootApplication
public class ConfigClientApplication {public static void main(String[] args) {SpringApplication.run(ConfigClientApplication.class, args);}@RestController@RefreshScope // 标记该类支持配置热刷新class ConfigController {@Value("${from}")private String from;@GetMapping("/from")public String getFrom() {return "配置值 from: " + from;}}
}
步骤3:启动服务并验证全局刷新流程
- 启动服务:依次启动 Eureka Server、Config Server、多个 Config Client(如端口 8081、8082)。
- 初始配置验证:访问
http://localhost:8081/from
和http://localhost:8082/from
,均返回 Git 中配置的from=git-dev-1.0
。 - 修改 Git 配置:将 Git 仓库中
order-dev.properties
的from
改为git-dev-2.0
并提交。 - 触发全局刷新:向 Config Server 发送 POST 请求
http://localhost:9000/actuator/bus-refresh
。 - 验证刷新结果:再次访问
http://localhost:8081/from
和http://localhost:8082/from
,均返回from=git-dev-2.0
,说明所有 Client 已全局刷新配置。
原理说明:消息总线如何实现全局刷新?
- Config Server 接收到
<font style="color:#DF2A3F;">bus-refresh</font>
请求后,会向 Kafka 的springCloudBus
主题发送刷新事件(RefreshRemoteApplicationEvent)。 - 所有订阅了
springCloudBus
主题的 Config Client 会消费该事件,触发自身配置刷新(结合@RefreshScope
实现热更新)。 - 若需验证 Kafka 消息,可通过 Kafka 控制台消费
springCloudBus
主题:
kafka-console-consumer.sh --topic springCloudBus --bootstrap-server localhost:9092
触发 bus-refresh
后,控制台会打印出刷新事件的 JSON 消息。
三、原理
Spring Cloud Bus 的底层基于 Spring 事件驱动模型,包含三个核心概念:
概念 | 作用 |
---|---|
事件(ApplicationEvent) | 表示“发生的事情”,如 RefreshRemoteApplicationEvent (配置刷新事件)。 |
事件监听器(ApplicationListener) | 监听并处理特定事件,如 RefreshListener 处理配置刷新逻辑。 |
事件发布者(ApplicationEventPublisher) | 发布事件,触发监听器执行,如 ApplicationEventMulticaster 负责广播事件。 |
1. 事件(ApplicationEvent)
Spring 中所有事件都继承自 ApplicationEvent
,Bus 中核心事件有:
RefreshRemoteApplicationEvent
:触发配置刷新的事件。AckRemoteApplicationEvent
:确认事件已接收的响应事件。
这些事件包含 type
(事件类型)、timestamp
(时间戳)、originService
(来源服务)等属性,可用于追踪和控制事件范围(如只刷新特定服务)。
2. 事件监听器(ApplicationListener)
需实现 ApplicationListener<E>
接口,其中 E
是要监听的事件类型。例如 RefreshListener
监听 RefreshRemoteApplicationEvent
,并执行配置刷新逻辑。
3. 事件发布者(ApplicationEventPublisher)
通过 ApplicationEventPublisher#publishEvent()
方法发布事件,ApplicationEventMulticaster
会将事件广播给所有匹配的监听器。
四、工作流原理
1. 触发刷新请求
当 Git 仓库的配置被修改后,我们向 Config Server 发送 <font style="color:rgba(0, 0, 0, 0.85) !important;">POST /actuator/bus-refresh</font>
请求(也可结合 Git Hook 自动触发)。
2. 消息广播(Config Server 发消息)
Config Server 接收到请求后,会向消息总线(Kafka/RabbitMQ)****的 **<font style="color:#DF2A3F;">springCloudBus</font>**
****主题广播一条 “配置刷新事件”(**<font style="color:rgb(0, 0, 0);">RefreshRemoteApplicationEvent</font>**
)。
3. 客户端接收消息(Config Client 消费消息)
所有集成了 Spring Cloud Bus 的 Config Client 都会订阅 <font style="color:rgba(0, 0, 0, 0.85) !important;">springCloudBus</font>
主题,当收到 “配置刷新事件” 后,会感知到 “需要刷新配置”。
4. 配置刷新(Client 执行刷新逻辑)
Config Client 内部通过 <font style="color:#DF2A3F;">@RefreshScope</font>
注解和 Actuator 机制,重新从 Config Server 拉取最新配置,并更新自身的配置信息,从而实现 “全局配置热刷新”。
简单来说,Bus 就像一个 “喇叭”:Config Server 对着喇叭喊 “配置更新了,大家快刷新”,所有订阅了这个 “喇叭频道” 的 Config Client 都会听到并执行刷新操作。
总结
Spring Cloud Bus 是微服务架构的“消息中枢”,通过集成 Kafka/RabbitMQ 实现配置全局刷新、服务广播通知等能力。其底层依赖 Spring 事件驱动模型,保证了事件的可靠发布与消费。
与“单个服务触发刷新”相比,Bus 让 Config Server 成为“刷新入口”,避免了服务单点故障对刷新流程的影响,是生产级微服务配置管理的关键组件。