当前位置: 首页 > news >正文

Spring Cloud Bus 事件广播机制

文章目录

  • ☁️ Spring Cloud Bus 事件广播机制
    • → 消息驱动配置刷新实现
    • 📋 目录
    • 🎯 一、Bus 核心设计目标与组件
      • 💡 设计目标
      • 🏗️ 核心架构组件
    • 🔄 二、消息传播机制(RabbitMQ/Kafka)
      • 🐇 RabbitMQ 集成实现
      • 📊 Kafka 集成实现
    • ⚡ 三、配置刷新链路分析
      • 🔄 配置刷新完整流程
      • 🔧 配置刷新核心实现
    • 🔧 四、/bus/refresh 端点原理
      • 🎯 刷新端点实现机制
    • 🛡️ 五、消息防重复与延迟处理
      • 🔄 消息去重机制
      • ⏱️ 消息延迟处理
    • 🤝 六、与 Cloud Stream 协同机制
      • 🔄 基于 Stream 的消息集成
    • 💡 七、生产环境最佳实践
      • 🔧 高可用配置
      • 🚀 性能优化建议

☁️ Spring Cloud Bus 事件广播机制

→ 消息驱动配置刷新实现

📋 目录

  • 🎯 一、Bus 核心设计目标与组件
  • 🔄 二、消息传播机制(RabbitMQ/Kafka)
  • ⚡ 三、配置刷新链路分析
  • 🔧 四、/bus/refresh 端点原理
  • 🛡️ 五、消息防重复与延迟处理
  • 🤝 六、与 Cloud Stream 协同机制
  • 💡 七、生产环境最佳实践

🎯 一、Bus 核心设计目标与组件

💡 设计目标

Spring Cloud Bus 的设计目标是通过轻量级消息代理连接分布式系统的各个节点,用于广播状态更改(如配置更改)或其他管理指令。它建立在Spring事件模型之上,通过消息中间件实现分布式事件的传播。

🏗️ 核心架构组件

Bus 核心组件关系图

graph TBA[应用程序实例1] --> B[消息代理 RabbitMQ/Kafka]A --> C[Spring Cloud Bus]C --> D[RemoteApplicationEvent]E[应用程序实例2] --> BE --> F[Spring Cloud Bus]F --> G[事件监听器]H[配置服务器] --> I[/bus/refresh端点]I --> Bstyle C fill:#bbdefb,stroke:#333style F fill:#bbdefb,stroke:#333style I fill:#c8e6c9,stroke:#333

核心组件源码分析

/*** Bus 自动配置类* 负责初始化Bus相关组件*/
@Configuration
@ConditionalOnBean(annotation = EnableBus.class)
@EnableConfigurationProperties(BusProperties.class)
@Slf4j
public class BusAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic BusEventRegistry busEventRegistry() {return new InMemoryBusEventRegistry();}@Bean@ConditionalOnMissingBeanpublic ApplicationEventPublisher applicationEventPublisher() {return new SimpleApplicationEventPublisher();}/*** 消息通道绑定*/@Bean@ConditionalOnMissingBeanpublic BindingService bindingService() {return new DefaultBindingService();}
}/*** 远程应用事件 - Bus的核心消息载体*/
public abstract class RemoteApplicationEvent extends ApplicationEvent {private final String originService;private final String destinationService;private final String id;public RemoteApplicationEvent(Object source, String originService, String destinationService) {super(source);this.originService = originService;this.destinationService = destinationService;this.id = UUID.randomUUID().toString();}// 获取事件起源服务public String getOriginService() {return originService;}// 获取事件目标服务public String getDestinationService() {return destinationService;}// 获取事件唯一IDpublic String getId() {return id;}
}/*** 环境变更事件 - 用于配置刷新*/
public class EnvironmentChangeRemoteApplicationEvent extends RemoteApplicationEvent {private final Map<String, String> values;public EnvironmentChangeRemoteApplicationEvent(Map<String, String> values, String originService, String destinationService) {super(values, originService, destinationService);this.values = values;}public Map<String, String> getValues() {return values;}
}

🔄 二、消息传播机制(RabbitMQ/Kafka)

🐇 RabbitMQ 集成实现

RabbitMQ 消息传播配置

/*** RabbitMQ Bus 实现*/
@Configuration
@ConditionalOnClass(RabbitTemplate.class)
@ConditionalOnProperty(name = "spring.cloud.bus.rabbit.enabled", havingValue = "true")
@Slf4j
public class RabbitBusAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setChannelTransacted(true);template.setMessageConverter(new Jackson2JsonMessageConverter());return template;}@Beanpublic Queue busQueue() {return new Queue("springCloudBus", true, false, false);}@Beanpublic Exchange busExchange() {return new TopicExchange("springCloudBus", true, false);}@Beanpublic Binding busBinding() {return BindingBuilder.bind(busQueue()).to(busExchange()).with("#").noargs();}/*** RabbitMQ 事件发送器*/@Component@Slf4jpublic class RabbitBusEventSender implements ApplicationEventPublisherAware {private final RabbitTemplate rabbitTemplate;private ApplicationEventPublisher applicationEventPublisher;@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.applicationEventPublisher = applicationEventPublisher;}@EventListenerpublic void acceptLocal(ApplicationEvent event) {if (event instanceof RemoteApplicationEvent) {RemoteApplicationEvent remoteEvent = (RemoteApplicationEvent) event;// 序列化事件并发送到RabbitMQMessage message = createMessage(remoteEvent);rabbitTemplate.convertAndSend("springCloudBus", "", message);log.debug("事件已发送到RabbitMQ: {}", remoteEvent.getId());}}private Message createMessage(RemoteApplicationEvent event) {try {ObjectMapper mapper = new ObjectMapper();byte[] bytes = mapper.writeValueAsBytes(event);return MessageBuilder.withBody(bytes).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();} catch (Exception e) {throw new RuntimeException("事件序列化失败", e);}}}/*** RabbitMQ 事件接收器*/@Component@Slf4jpublic class RabbitBusEventReceiver {private final ApplicationEventPublisher applicationEventPublisher;@RabbitListener(queues = "springCloudBus")public void receive(Message message) {try {RemoteApplicationEvent event = convertToEvent(message);if (event != null && !isEventFromSelf(event)) {applicationEventPublisher.publishEvent(event);log.debug("事件已接收并发布: {}", event.getId());}} catch (Exception e) {log.error("事件处理失败", e);}}private RemoteApplicationEvent convertToEvent(Message message) {try {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(message.getBody(), RemoteApplicationEvent.class);} catch (Exception e) {log.error("事件反序列化失败", e);return null;}}private boolean isEventFromSelf(RemoteApplicationEvent event) {// 避免处理自己发出的事件return event.getOriginService().equals(getCurrentServiceId());}}
}

📊 Kafka 集成实现

Kafka 消息传播配置

/*** Kafka Bus 实现*/
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@ConditionalOnProperty(name = "spring.cloud.bus.kafka.enabled", havingValue = "true")
@Slf4j
public class KafkaBusAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic ProducerFactory<String, Object> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return new DefaultKafkaProducerFactory<>(props);}@Bean@ConditionalOnMissingBeanpublic KafkaTemplate<String, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/*** Kafka 事件发送器*/@Component@Slf4jpublic class KafkaBusEventSender {private final KafkaTemplate<String, Object> kafkaTemplate;@EventListenerpublic void acceptLocal(ApplicationEvent event) {if (event instanceof RemoteApplicationEvent) {RemoteApplicationEvent remoteEvent = (RemoteApplicationEvent) event;// 发送到Kafka主题kafkaTemplate.send("springCloudBus", remoteEvent);log.debug("事件已发送到Kafka: {}", remoteEvent.getId());}}}/*** Kafka 事件接收器*/@Component@Slf4jpublic class KafkaBusEventReceiver {private final ApplicationEventPublisher applicationEventPublisher;@KafkaListener(topics = "springCloudBus")public void receive(RemoteApplicationEvent event) {if (!isEventFromSelf(event)) {applicationEventPublisher.publishEvent(event);log.debug("事件已接收并发布: {}", event.getId());}}}
}

⚡ 三、配置刷新链路分析

🔄 配置刷新完整流程

配置刷新序列图

配置服务器/bus/refresh端点消息代理服务实例1服务实例2服务实例NPOST /bus/refresh发布RefreshRemoteApplicationEvent广播事件广播事件广播事件处理刷新事件重新获取配置刷新@RefreshScope Beans处理刷新事件重新获取配置刷新@RefreshScope Beans处理刷新事件重新获取配置刷新@RefreshScope Beans配置服务器/bus/refresh端点消息代理服务实例1服务实例2服务实例N

🔧 配置刷新核心实现

配置刷新事件处理

/*** 配置刷新事件处理器*/
@Component
@Slf4j
public class ConfigurationRefreshListener {private final ContextRefresher contextRefresher;private final EnvironmentManager environmentManager;@EventListenerpublic void handleRefreshEvent(RefreshRemoteApplicationEvent event) {log.info("接收到配置刷新事件: {}", event.getId());try {// 1. 刷新环境配置Set<String> changedKeys = contextRefresher.refresh();// 2. 记录刷新结果if (!changedKeys.isEmpty()) {log.info("配置刷新完成,变更的键: {}", changedKeys);// 3. 发布环境变更事件publishEnvironmentChangeEvent(changedKeys);} else {log.info("配置未发生变化");}} catch (Exception e) {log.error("配置刷新失败", e);throw new RuntimeException("配置刷新处理异常", e);}}/*** 环境刷新器实现*/@Component@Slf4jpublic class ContextRefresher {private final ConfigServicePropertySourceLocator propertySourceLocator;private final AtomicBoolean refreshInProgress = new AtomicBoolean(false);public synchronized Set<String> refresh() {if (!refreshInProgress.compareAndSet(false, true)) {log.warn("刷新操作正在进行中,跳过此次请求");return Collections.emptySet();}try {// 获取当前环境配置Map<String, Object> before = extractCurrentProperties();// 刷新环境refreshEnvironment();// 获取刷新后的配置Map<String, Object> after = extractCurrentProperties();// 计算变化的配置键return findChangedKeys(before, after);} finally {refreshInProgress.set(false);}}private void refreshEnvironment() {// 清空配置缓存propertySourceLocator.clearCache();// 重新加载配置propertySourceLocator.locate(environment);log.debug("环境配置刷新完成");}}
}

🔧 四、/bus/refresh 端点原理

🎯 刷新端点实现机制

/bus/refresh 端点源码分析

/*** Bus刷新端点实现* 提供HTTP接口触发配置刷新*/
@RestController
@Endpoint(id = "bus-refresh")
@Slf4j
public class BusRefreshEndpoint {private final ApplicationEventPublisher publisher;private final ServiceMatcher serviceMatcher;@WriteOperationpublic void busRefresh(@Selector String destination) {log.info("接收到总线刷新请求,目标: {}", destination);// 1. 创建刷新事件RefreshRemoteApplicationEvent event = new RefreshRemoteApplicationEvent(this, getCurrentServiceId(), destination);// 2. 发布事件到本地上下文publisher.publishEvent(event);log.info("刷新事件已发布: {}", event.getId());}@WriteOperationpublic void busRefresh() {// 全量刷新,目标服务为所有实例busRefresh("*");}/*** 刷新远程应用事件*/public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {public RefreshRemoteApplicationEvent(Object source, String originService, String destinationService) {super(source, originService, destinationService);}}
}/*** 刷新事件监听器*/
@Component
@Slf4j
public class RefreshEventListener {private final ContextRefresher contextRefresher;@EventListenerpublic void handleRefreshEvent(RefreshRemoteApplicationEvent event) {// 检查是否应该处理此事件if (shouldHandleEvent(event)) {log.info("开始处理刷新事件: {}", event.getId());try {// 执行配置刷新Set<String> changedKeys = contextRefresher.refresh();if (!changedKeys.isEmpty()) {log.info("配置刷新成功,变更配置项: {}", changedKeys);} else {log.info("配置未发生变化");}} catch (Exception e) {log.error("配置刷新失败", e);throw new RuntimeException("刷新处理异常", e);}}}private boolean shouldHandleEvent(RefreshRemoteApplicationEvent event) {// 检查事件目标是否匹配当前服务String destination = event.getDestinationService();return "*".equals(destination) || getCurrentServiceId().equals(destination);}
}

🛡️ 五、消息防重复与延迟处理

🔄 消息去重机制

消息重复消费防护

/*** 消息去重处理器* 防止同一事件被重复处理*/
@Component
@Slf4j
public class MessageDeduplicationHandler {private final Set<String> processedEventIds = Collections.synchronizedSet(new HashSet<>());private final long retentionTime = 300000; // 5分钟/*** 检查事件是否已处理*/public boolean isEventProcessed(String eventId) {cleanExpiredEvents();return processedEventIds.contains(eventId);}/*** 标记事件为已处理*/public void markEventAsProcessed(String eventId) {processedEventIds.add(eventId);// 定时清理过期事件Timer timer = new Timer();timer.schedule(new TimerTask() {@Overridepublic void run() {processedEventIds.remove(eventId);}}, retentionTime);}/*** 清理过期事件*/private void cleanExpiredEvents() {// 可实现基于时间的清理逻辑if (processedEventIds.size() > 10000) {processedEventIds.clear();}}
}/*** 带去重功能的事件监听器*/
@Component
@Slf4j
public class DeduplicationEventListener {private final MessageDeduplicationHandler deduplicationHandler;@EventListenerpublic void handleEvent(RemoteApplicationEvent event) {String eventId = event.getId();// 检查是否已处理if (deduplicationHandler.isEventProcessed(eventId)) {log.debug("事件已处理,跳过: {}", eventId);return;}// 处理事件try {processEvent(event);deduplicationHandler.markEventAsProcessed(eventId);} catch (Exception e) {log.error("事件处理失败: {}", eventId, e);}}
}

⏱️ 消息延迟处理

延迟消息处理机制

/*** 延迟消息处理器* 处理网络延迟导致的消息乱序问题*/
@Component
@Slf4j
public class DelayedMessageHandler {private final PriorityQueue<DelayedEvent> eventQueue = new PriorityQueue<>(Comparator.comparing(DelayedEvent::getTimestamp));private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();@PostConstructpublic void init() {// 启动延迟消息处理线程scheduler.scheduleAtFixedRate(this::processDelayedEvents, 1, 1, TimeUnit.SECONDS);}/*** 延迟处理事件*/public void delayEvent(RemoteApplicationEvent event, long delayMs) {DelayedEvent delayedEvent = new DelayedEvent(event, System.currentTimeMillis() + delayMs);synchronized (eventQueue) {eventQueue.offer(delayedEvent);}log.debug("事件延迟处理: {},延迟: {}ms", event.getId(), delayMs);}/*** 处理到期事件*/private void processDelayedEvents() {long currentTime = System.currentTimeMillis();List<DelayedEvent> readyEvents = new ArrayList<>();synchronized (eventQueue) {while (!eventQueue.isEmpty() && eventQueue.peek().getTimestamp() <= currentTime) {readyEvents.add(eventQueue.poll());}}for (DelayedEvent delayedEvent : readyEvents) {try {applicationEventPublisher.publishEvent(delayedEvent.getEvent());log.debug("延迟事件已发布: {}", delayedEvent.getEvent().getId());} catch (Exception e) {log.error("延迟事件处理失败", e);}}}@Data@AllArgsConstructorprivate static class DelayedEvent {private final RemoteApplicationEvent event;private final long timestamp;}
}

🤝 六、与 Cloud Stream 协同机制

🔄 基于 Stream 的消息集成

Cloud Stream 集成配置

# application.yml
spring:cloud:stream:# 绑定配置bindings:# 输入通道springCloudBusInput:destination: springCloudBusgroup: ${spring.application.name}content-type: application/json# 输出通道  springCloudBusOutput:destination: springCloudBuscontent-type: application/json# Kafka配置kafka:binder:brokers: ${KAFKA_BROKERS:localhost:9092}auto-create-topics: truebindings:springCloudBusInput:consumer:autoCommitOffset: truespringCloudBusOutput:producer:sync: true# RabbitMQ配置rabbit:bindings:springCloudBusInput:consumer:autoBindDlq: truerepublishToDlq: truespringCloudBusOutput:producer:autoBindDlq: true

Stream 事件通道实现

/*** 基于Cloud Stream的事件通道*/
@Component
@Slf4j
@EnableBinding(BusEventChannels.class)
public class StreamBusEventChannel {/*** 事件通道定义*/public interface BusEventChannels {String INPUT = "springCloudBusInput";String OUTPUT = "springCloudBusOutput";@Input(INPUT)SubscribableChannel input();@Output(OUTPUT)MessageChannel output();}/*** 事件发送服务*/@Service@Slf4jpublic class StreamBusEventSender {private final BusEventChannels channels;public void sendEvent(RemoteApplicationEvent event) {try {Message<?> message = MessageBuilder.withPayload(event).setHeader("type", event.getClass().getName()).setHeader("timestamp", System.currentTimeMillis()).build();channels.output().send(message);log.debug("事件已通过Stream发送: {}", event.getId());} catch (Exception e) {log.error("Stream事件发送失败", e);}}}/*** 事件接收服务*/@Service@Slf4jpublic class StreamBusEventReceiver {@StreamListener(BusEventChannels.INPUT)public void receive(Message<?> message) {try {RemoteApplicationEvent event = (RemoteApplicationEvent) message.getPayload();// 检查事件来源,避免循环if (!isEventFromSelf(event)) {applicationEventPublisher.publishEvent(event);log.debug("事件已通过Stream接收: {}", event.getId());}} catch (Exception e) {log.error("Stream事件接收失败", e);}}}
}

💡 七、生产环境最佳实践

🔧 高可用配置

生产级 Bus 配置

# 高可用配置
spring:cloud:bus:# 总线配置enabled: true# 追踪配置trace:enabled: true# 刷新配置refresh:enabled: true# 消息代理选择rabbit:enabled: truekafka:enabled: false# RabbitMQ 高可用配置  rabbitmq:host: ${RABBITMQ_HOST:localhost}port: ${RABBITMQ_PORT:5672}username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASSWORD:guest}virtual-host: /# 连接池配置connection-timeout: 10000# 高可用配置addresses: ${RABBITMQ_CLUSTER:localhost:5672,localhost:5673,localhost:5674}# 镜像队列publisher-confirms: truepublisher-returns: true# 健康检查配置
management:health:rabbit:enabled: trueendpoints:web:exposure:include: health,info,bus-refreshendpoint:health:show-details: always# 监控配置
metrics:export:rabbitmq:enabled: true

🚀 性能优化建议

性能调优配置

/*** Bus 性能优化配置*/
@Configuration
@Slf4j
public class BusPerformanceConfig {/*** 连接池配置*/@Bean@Primarypublic CachingConnectionFactory connectionFactory() {CachingConnectionFactory factory = new CachingConnectionFactory();factory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);factory.setChannelCacheSize(25);factory.setChannelCheckoutTimeout(1000);return factory;}/*** 消息确认配置*/@Beanpublic RabbitTemplate.ConfirmCallback confirmCallback() {return (correlationData, ack, cause) -> {if (!ack) {log.warn("消息发送失败: {}", cause);}};}/*** 批量消息处理*/@Beanpublic BatchingStrategy batchingStrategy() {return new SimpleBatchingStrategy(10, 10000, 1000);}/*** 异步事件处理*/@Beanpublic ApplicationEventMulticaster applicationEventMulticaster() {SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();multicaster.setTaskExecutor(taskExecutor());return multicaster;}@Beanpublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(1000);executor.setThreadNamePrefix("bus-event-");return executor;}
}

架构师洞察:Spring Cloud Bus 是微服务配置动态刷新的核心基础设施。合理的消息去重、延迟处理和故障恢复机制,是保证配置刷新可靠性的关键。理解事件传播机制和性能优化点,才能在生产环境中构建稳定高效的配置更新体系。


如果觉得本文对你有帮助,请点击 👍 点赞 + ⭐ 收藏 + 💬 留言支持!

讨论话题

  1. 你在生产环境中如何保证配置刷新的可靠性?
  2. 在微服务架构中,如何设计消息去重和延迟处理机制?
  3. 面对网络分区情况,Bus 应该如何保证消息的最终一致性?

相关资源推荐

  • 📚 https://spring.io/projects/spring-cloud-bus
  • 🔧 https://github.com/example/spring-cloud-bus-demo
  • 💻 https://www.rabbitmq.com/ha.html
http://www.dtcms.com/a/596281.html

相关文章:

  • 广州巨腾建网站公司郑州网站app开发
  • 银河麒麟服务器安装图形化界面
  • 【源码+文档+调试讲解】基于Spring Boot的考务管理系统设计与实现 085
  • LeetCode 421 - 数组中两个数的最大异或值
  • 【笔记】xFormers版本与PyTorch、CUDA对应关系及正确安装方法详解
  • 【GitHub每日速递 20251111】PyTorch:GPU加速、动态网络,深度学习平台的不二之选!
  • 多产品的网站怎么做seo做音乐网站之前的准备
  • 网站如何做h5动态页面设计万网备案初审过了后网站能访问吗
  • centos运维常用命令
  • 在CentOS 7.6系统中找回或重置 root 密码
  • 濮阳团购网站建设手机网站模板psd
  • 基于Spring Boot的电子犬证管理系统设计与实现
  • Spring Boot 中的定时任务:从基础调度到高可用实践
  • 家装设计师网站wordpress小清新模板
  • 用WordPress制作单页相城seo网站优化软件
  • wordpress主题wpmee江门网站优化排名
  • 淮安设计网站苏州网站建设相关技术
  • 公司的网站开发费计入什么科目济南传承网络李聪
  • 营销类型的公司网站物联网平台功能
  • 做网站设计都需要什么杭州建设信息网
  • 惠州网站设计哪家好网站内的搜索怎么做的
  • 网站域名使用费用上海十大猎头公司排名
  • 网站建站程序wordpress salient
  • 舞蹈网站模板权威做网站的公司
  • 互联网 创新创业大赛seo推广培训中心
  • 广西网站建设-好发信息网建设银行网站e动终端
  • 建站网哪个好微信公众号调用WordPress
  • 广州网站建设比较好的公司主营网站建设会计记账
  • 招生网站建设板块网站建设的针对对象
  • 成都访问公司网站吉安工商注册官方网站