
Spring Boot Message Queues and Event-Driven Architecture in Detail

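Contents

    • 1. Message Queue Overview
      • 1.1 Advantages of Message Queues
      • 1.2 Messaging Patterns
      • 1.3 Core Dependencies
    • 2. RabbitMQ
      • 2.1 RabbitMQ Configuration
      • 2.2 Message Producer
      • 2.3 Message Consumer
      • 2.4 Messaging Configuration
    • 3. Kafka
      • 3.1 Kafka Configuration
      • 3.2 Kafka Producer
      • 3.3 Kafka Consumer
    • 4. Event-Driven Architecture
      • 4.1 Event Definitions
      • 4.2 Event Publisher
      • 4.3 Event Handler
    • 5. Real-Time Communication with WebSocket
      • 5.1 WebSocket Configuration
      • 5.2 WebSocket Controller
      • 5.3 WebSocket Service
    • 6. Message Queue Monitoring
      • 6.1 Queue Monitoring
      • 6.2 Message Statistics
    • 7. Message Queue Best Practices
      • 7.1 Message Design
      • 7.2 Message Processing Strategies
    • 8. Summary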

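1. Message Queue Overview

A message queue is a core communication mechanism in distributed systems. By passing messages asynchronously, it decouples services and improves reliability and scalability. Spring Boot integrates with RabbitMQ through spring-boot-starter-amqp and with Kafka through spring-kafka, and the rest of this article goes straight to the code.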

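1.1 Advantages of Message Queues

  • Decoupling: services communicate through messages rather than direct calls, which lowers coupling
  • Asynchronous processing: the caller does not wait for downstream work, which improves response time and throughput
  • Peak shaving: the queue buffers traffic bursts and protects downstream systems
  • Reliability: messages can be persisted so that they survive a broker restart
  • Scalability: consumers can be scaled horizontally and share the load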
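Peak shaving is easiest to see with a bounded buffer. The following is a minimal in-memory sketch, not a broker: the class name PeakShavingSketch, the buffer capacity of 100 and the burst size of 250 are all assumptions chosen for illustration. A burst fills the buffer, the excess is rejected, and the consumer drains at its own pace.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PeakShavingSketch {

    // Tries to enqueue every request of a burst; returns how many the buffer accepted.
    static int submitBurst(BlockingQueue<String> buffer, int burstSize) {
        int accepted = 0;
        for (int i = 0; i < burstSize; i++) {
            // offer() does not block: it returns false when the buffer is full.
            if (buffer.offer("request-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Drains up to batchSize messages, modelling a consumer that works at its own pace.
    static int drain(BlockingQueue<String> buffer, int batchSize) {
        int processed = 0;
        while (processed < batchSize && buffer.poll() != null) {
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        // Hypothetical capacity; a real broker would buffer on disk or in memory.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(100);
        int accepted = submitBurst(buffer, 250);
        System.out.println("accepted=" + accepted + ", rejected=" + (250 - accepted));
        System.out.println("processed in first batch=" + drain(buffer, 10));
        System.out.println("still buffered=" + buffer.size());
    }
}
```

A real broker usually applies back-pressure or persists the overflow instead of rejecting it; the sketch only shows why the downstream consumer never sees the full burst at once.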

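1.2 Messaging Patterns

  • Point-to-point: each message is delivered to exactly one consumer
  • Publish-subscribe: each message is broadcast to every subscriber
  • Request-response: the sender waits for a reply, so processing is synchronous from its point of view
  • Event-driven: components react asynchronously to events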
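The difference between the first two patterns can be sketched in plain Java without any broker. The class and type names below (MessagingPatternsSketch, PointToPointQueue, Topic) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class MessagingPatternsSketch {

    // Point-to-point: messages wait in one queue and each is taken by exactly one receiver.
    static final class PointToPointQueue {
        private final Queue<String> queue = new ArrayDeque<>();

        void send(String message) {
            queue.add(message);
        }

        // Returns the next message, or null when the queue is empty.
        String receive() {
            return queue.poll();
        }
    }

    // Publish-subscribe: every registered subscriber gets its own copy of each message.
    static final class Topic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        PointToPointQueue queue = new PointToPointQueue();
        queue.send("order-1");
        System.out.println("worker A got: " + queue.receive()); // order-1
        System.out.println("worker B got: " + queue.receive()); // null, already consumed

        Topic topic = new Topic();
        topic.subscribe(m -> System.out.println("email service got: " + m));
        topic.subscribe(m -> System.out.println("audit service got: " + m));
        topic.publish("user-created");
    }
}
```

In RabbitMQ terms, the first roughly corresponds to several consumers sharing one queue and the second to a fanout exchange with one queue per subscriber.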

1.3 Core Dependencies

<dependencies>
    <!-- Spring Boot AMQP (RabbitMQ) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Spring Boot WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

2. RabbitMQ

2.1 RabbitMQ Configuration

# application.yml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
        prefetch: 1

2.2 Message Producer

package com.example.demo.messaging;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.UUID;

@Service
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Send a plain text message
    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message);
    }

    // Send an object; the configured MessageConverter serializes it
    public void sendObjectMessage(Object message) {
        rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message);
    }

    // Send a message whose publisher confirm can be correlated by ID
    public void sendMessageWithConfirm(String message) {
        String id = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message, msg -> {
            msg.getMessageProperties().setMessageId(id);
            return msg;
        }, new CorrelationData(id)); // passed back to the ConfirmCallback
    }

    // Send a delayed message (delay in milliseconds).
    // The x-delay header only takes effect on an exchange declared as delayed,
    // which requires the rabbitmq_delayed_message_exchange plugin on the broker.
    public void sendDelayedMessage(String message, long delay) {
        rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {
            msg.getMessageProperties().setDelay((int) delay);
            return msg;
        });
    }

    // Send several messages one by one
    public void sendBatchMessages(List<String> messages) {
        for (String message : messages) {
            rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message);
        }
    }
}

2.3 Message Consumer

package com.example.demo.messaging;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

@Component
public class MessageConsumer {

    // Simple message consumption.
    // The container default is manual ack (see 2.1), so listeners that never call
    // basicAck override it with ackMode = "AUTO"; otherwise messages stay unacknowledged.
    @RabbitListener(queues = "simple.queue", ackMode = "AUTO")
    public void handleSimpleMessage(String message) {
        System.out.println("Received message: " + message);
    }

    // Object message consumption
    @RabbitListener(queues = "object.queue", ackMode = "AUTO")
    public void handleObjectMessage(@Payload Object message) {
        System.out.println("Received object message: " + message);
    }

    // Consumption with manual acknowledgement
    @RabbitListener(queues = "confirm.queue")
    public void handleConfirmMessage(@Payload String message,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                                     Channel channel) throws IOException {
        try {
            System.out.println("Processing message: " + message);
            // Business logic
            processMessage(message);
            // Acknowledge this single delivery
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            // Reject and requeue. A message that always fails will be redelivered
            // indefinitely, so production code should bound the retries.
            channel.basicNack(deliveryTag, false, true);
        }
    }

    // Delayed message consumption
    @RabbitListener(queues = "delayed.queue", ackMode = "AUTO")
    public void handleDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }

    private void processMessage(String message) {
        // Message handling logic goes here
    }
}
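Rejecting with requeue on every failure can redeliver a poison message forever. One common remedy is to cap the number of attempts per message and park the message once the cap is reached. The sketch below shows that idea in plain Java; BoundedRetrySketch, its in-memory deadLetters list and the maxAttempts value are assumptions for illustration, not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BoundedRetrySketch {

    private final int maxAttempts;
    // Stands in for a dead-letter queue; a real system would publish to a broker queue.
    private final List<String> deadLetters = new ArrayList<>();

    BoundedRetrySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Runs the handler up to maxAttempts times; returns true if any attempt succeeded.
    // The attempt counter is local, so each message gets its own retry budget.
    boolean process(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                System.err.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        deadLetters.add(message);
        return false;
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        BoundedRetrySketch retry = new BoundedRetrySketch(3);
        boolean ok = retry.process("bad-payload", m -> {
            throw new IllegalStateException("cannot parse " + m);
        });
        System.out.println("processed=" + ok + ", deadLetters=" + retry.deadLetters());
    }
}
```

With a broker, the same effect is usually achieved by rejecting without requeue and letting a dead-letter exchange collect the message, optionally with a delay between attempts.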

2.4 Messaging Configuration

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // Exchanges
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct.exchange");
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    // Queues
    @Bean
    public Queue simpleQueue() {
        return QueueBuilder.durable("simple.queue").build();
    }

    @Bean
    public Queue objectQueue() {
        return QueueBuilder.durable("object.queue").build();
    }

    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable("confirm.queue").build();
    }

    // x-message-ttl expires messages after 60 s; it does not delay delivery.
    // Note that "delayed.exchange" used by the producer in 2.2 is not declared here.
    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable("delayed.queue")
                .withArgument("x-message-ttl", 60000)
                .build();
    }

    // Bindings
    @Bean
    public Binding simpleBinding() {
        return BindingBuilder.bind(simpleQueue()).to(directExchange()).with("routing.key");
    }

    @Bean
    public Binding objectBinding() {
        return BindingBuilder.bind(objectQueue()).to(directExchange()).with("object.routing.key");
    }

    @Bean
    public Binding confirmBinding() {
        return BindingBuilder.bind(confirmQueue()).to(directExchange()).with("confirm.routing.key");
    }

    // Message converter (JSON)
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter());
        // Return unroutable messages to the producer instead of dropping them
        template.setMandatory(true);
        // Deprecated since Spring AMQP 2.3 in favor of setReturnsCallback
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.err.println("Message could not be routed: " + replyText);
        });
        // Called when the broker confirms (ack) or refuses (nack) a published message
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed by broker");
            } else {
                System.err.println("Message not confirmed: " + cause);
            }
        });
        return template;
    }

    // Listener container factory
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(1);
        return factory;
    }
}

3. Kafka

3.1 Kafka Configuration

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      max-poll-records: 500
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      # linger.ms has no dedicated Spring Boot property, so it goes under "properties"
      properties:
        linger.ms: 5
    listener:
      # Required for the Acknowledgment parameter used by the consumer in 3.3
      ack-mode: manual

3.2 Kafka Producer

package com.example.demo.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Send a message without a key
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    // Send a message with a key; records with the same key go to the same partition
    public void sendMessageWithKey(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }

    // Send an object. With the StringSerializer configured in 3.1 only String values
    // can be serialized; other types need a matching value serializer (for example JSON).
    public void sendObjectMessage(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }

    // Send with a callback.
    // This is the Spring Kafka 2.x API; in 3.x send() returns a CompletableFuture.
    public void sendMessageWithCallback(String topic, String message) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("Message sent, offset: " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Message send failed: " + ex.getMessage());
            }
        });
    }

    // Send several messages; the producer batches them internally
    public void sendBatchMessages(String topic, List<String> messages) {
        for (String message : messages) {
            kafkaTemplate.send(topic, message);
        }
    }
}
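Sending with a key matters because Kafka picks the partition from a hash of the key, so all records for one key stay in order on one partition. The sketch below illustrates only the principle. Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this example uses String.hashCode() as a stand-in, so the partition numbers will not match a real cluster. KeyPartitioningSketch and partitionFor are hypothetical names.

```java
class KeyPartitioningSketch {

    // Maps a key to a partition index in the range [0, partitionCount).
    // Assumption: String.hashCode() replaces Kafka's murmur2 hash for illustration.
    static int partitionFor(String key, int partitionCount) {
        // Math.floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 6; // hypothetical partition count of the topic
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Both occurrences of user-1 print the same partition, which is what preserves per-key ordering. Changing the partition count changes the mapping, so ordering guarantees only hold while the count stays fixed.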

3.3 Kafka Consumer

package com.example.demo.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class KafkaConsumer {

    // Simple message consumption
    @KafkaListener(topics = "simple-topic", groupId = "my-group")
    public void handleSimpleMessage(String message) {
        System.out.println("Received message: " + message);
    }

    // Consumption with the record key.
    // RECEIVED_MESSAGE_KEY is the Spring Kafka 2.x name; 3.x renamed it to RECEIVED_KEY.
    @KafkaListener(topics = "key-topic", groupId = "my-group")
    public void handleMessageWithKey(@Payload String message,
                                     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("Received message - Key: " + key + ", Value: " + message);
    }

    // Object message consumption
    @KafkaListener(topics = "object-topic", groupId = "my-group")
    public void handleObjectMessage(@Payload Object message) {
        System.out.println("Received object message: " + message);
    }

    // Manual acknowledgement; requires a manual ack mode on the listener container
    @KafkaListener(topics = "manual-topic", groupId = "my-group")
    public void handleManualMessage(@Payload String message, Acknowledgment ack) {
        try {
            System.out.println("Processing message: " + message);
            // Business logic
            processMessage(message);
            // Commit the offset only after processing succeeded
            ack.acknowledge();
        } catch (Exception e) {
            // No acknowledgement here, so the offset of this record is not committed
            System.err.println("Message processing failed: " + e.getMessage());
        }
    }

    // Batch consumption; requires a container factory with batch listening enabled
    @KafkaListener(topics = "batch-topic", groupId = "my-group")
    public void handleBatchMessages(@Payload List<String> messages) {
        System.out.println("Received batch, size: " + messages.size());
        for (String message : messages) {
            System.out.println("Processing message: " + message);
        }
    }

    private void processMessage(String message) {
        // Message handling logic goes here
    }
}
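Committing the offset only after successful processing gives at-least-once delivery: a record whose processing failed is read again. The following in-memory sketch models that behaviour with a list standing in for one partition. ManualCommitSketch and its methods are hypothetical and do not use the Kafka client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ManualCommitSketch {

    private final List<String> log;   // stands in for the records of one partition
    private int committedOffset = 0;  // next offset to read after a restart or re-poll

    ManualCommitSketch(List<String> log) {
        this.log = log;
    }

    // Processes records starting at the committed offset and stops at the first failure.
    // Returns the number of records processed successfully in this call.
    int pollAndProcess(Consumer<String> handler) {
        int processed = 0;
        for (int offset = committedOffset; offset < log.size(); offset++) {
            try {
                handler.accept(log.get(offset));
            } catch (RuntimeException e) {
                // Offset is not committed, so this record is read again on the next poll.
                break;
            }
            committedOffset = offset + 1; // commit only after successful processing
            processed++;
        }
        return processed;
    }

    int committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        ManualCommitSketch consumer = new ManualCommitSketch(Arrays.asList("a", "b", "c"));
        boolean[] failedOnce = {false};
        Consumer<String> flaky = record -> {
            if (record.equals("b") && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new IllegalStateException("transient failure on " + record);
            }
            System.out.println("processed " + record);
        };
        System.out.println("first poll processed " + consumer.pollAndProcess(flaky));
        System.out.println("second poll processed " + consumer.pollAndProcess(flaky));
        System.out.println("committed offset " + consumer.committedOffset());
    }
}
```

Because a record can be seen more than once under this scheme, the handler should be idempotent.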

4. Event-Driven Architecture

4.1 Event Definitions

// DomainEvent.java
package com.example.demo.event;

import java.time.LocalDateTime;
import java.util.UUID;

public abstract class DomainEvent {

    private final String eventId;
    private final LocalDateTime occurredOn;
    private final String eventType;

    public DomainEvent() {
        this.eventId = UUID.randomUUID().toString();
        this.occurredOn = LocalDateTime.now();
        this.eventType = this.getClass().getSimpleName();
    }

    public String getEventId() { return eventId; }
    public LocalDateTime getOccurredOn() { return occurredOn; }
    public String getEventType() { return eventType; }
}

// UserCreatedEvent.java (each public class lives in its own file, same package)
// User-created event
public class UserCreatedEvent extends DomainEvent {

    private final Long userId;
    private final String username;
    private final String email;

    public UserCreatedEvent(Long userId, String username, String email) {
        super();
        this.userId = userId;
        this.username = username;
        this.email = email;
    }

    public Long getUserId() { return userId; }
    public String getUsername() { return username; }
    public String getEmail() { return email; }
}

// OrderCreatedEvent.java
// Order-created event
public class OrderCreatedEvent extends DomainEvent {

    private final Long orderId;
    private final Long userId;
    private final String orderNumber;

    public OrderCreatedEvent(Long orderId, Long userId, String orderNumber) {
        super();
        this.orderId = orderId;
        this.userId = userId;
        this.orderNumber = orderNumber;
    }

    public Long getOrderId() { return orderId; }
    public Long getUserId() { return userId; }
    public String getOrderNumber() { return orderNumber; }
}

4.2 Event Publisher

package com.example.demo.event;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class DomainEventPublisher {

    private final ApplicationEventPublisher eventPublisher;

    public DomainEventPublisher(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    // publishEvent dispatches to listeners immediately; @Transactional here does not
    // postpone delivery until commit. Listeners that must run after commit can use
    // @TransactionalEventListener instead of @EventListener.
    @Transactional
    public void publishEvent(DomainEvent event) {
        eventPublisher.publishEvent(event);
    }

    @Transactional
    public void publishUserCreated(Long userId, String username, String email) {
        UserCreatedEvent event = new UserCreatedEvent(userId, username, email);
        publishEvent(event);
    }

    @Transactional
    public void publishOrderCreated(Long orderId, Long userId, String orderNumber) {
        OrderCreatedEvent event = new OrderCreatedEvent(orderId, userId, orderNumber);
        publishEvent(event);
    }
}

4.3 Event Handler

package com.example.demo.event;

import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;

// @Async only takes effect when @EnableAsync is present on a configuration class;
// without it the listeners run synchronously on the publishing thread.
@Component
public class DomainEventHandler {

    // Handle the user-created event
    @EventListener
    @Async
    public CompletableFuture<Void> handleUserCreated(UserCreatedEvent event) {
        System.out.println("Handling user-created event: " + event.getUserId());
        // Send the welcome email
        sendWelcomeEmail(event.getEmail());
        // Initialize user settings
        initializeUserSettings(event.getUserId());
        return CompletableFuture.completedFuture(null);
    }

    // Handle the order-created event
    @EventListener
    @Async
    public CompletableFuture<Void> handleOrderCreated(OrderCreatedEvent event) {
        System.out.println("Handling order-created event: " + event.getOrderId());
        // Send the order confirmation email
        sendOrderConfirmation(event.getUserId(), event.getOrderNumber());
        // Update inventory
        updateInventory(event.getOrderId());
        return CompletableFuture.completedFuture(null);
    }

    private void sendWelcomeEmail(String email) {
        // Welcome email logic goes here
    }

    private void initializeUserSettings(Long userId) {
        // User settings initialization goes here
    }

    private void sendOrderConfirmation(Long userId, String orderNumber) {
        // Order confirmation email logic goes here
    }

    private void updateInventory(Long orderId) {
        // Inventory update logic goes here
    }
}
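The publish and handle flow above boils down to a registry of handlers keyed by event type. The sketch below is a deliberately simplified, synchronous model of that idea and not how Spring implements it: Spring also matches supertypes and supports asynchronous and transactional listeners, while this example matches the exact class only. SimpleEventBusSketch and UserRegistered are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class SimpleEventBusSketch {

    private final Map<Class<?>, List<Consumer<Object>>> handlers = new HashMap<>();

    // Registers a handler for one concrete event type.
    <T> void register(Class<T> eventType, Consumer<T> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>())
                .add(event -> handler.accept(eventType.cast(event)));
    }

    // Delivers the event synchronously to the handlers registered for its exact class.
    void publish(Object event) {
        List<Consumer<Object>> matching =
                handlers.getOrDefault(event.getClass(), Collections.emptyList());
        for (Consumer<Object> handler : matching) {
            handler.accept(event);
        }
    }

    // Hypothetical event type used only by this demo.
    static final class UserRegistered {
        final String username;

        UserRegistered(String username) {
            this.username = username;
        }
    }

    public static void main(String[] args) {
        SimpleEventBusSketch bus = new SimpleEventBusSketch();
        bus.register(UserRegistered.class, e -> System.out.println("send welcome email to " + e.username));
        bus.register(UserRegistered.class, e -> System.out.println("create default settings for " + e.username));
        bus.publish(new UserRegistered("alice"));
    }
}
```

The publisher knows nothing about the handlers, which is the decoupling that makes it cheap to add a new reaction to an existing event.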

5. Real-Time Communication with WebSocket

5.1 WebSocket Configuration

package com.example.demo.websocket;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // In-memory broker for destinations that clients subscribe to
        config.enableSimpleBroker("/topic", "/queue");
        // Prefix for messages routed to @MessageMapping methods
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // "*" accepts every origin; restrict this in production
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}

5.2 WebSocket Controller

package com.example.demo.websocket;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.beans.factory.annotation.Autowired;
import java.security.Principal;

@Controller
public class WebSocketController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    // Broadcast: clients send to /app/broadcast, subscribers of /topic/messages receive
    @MessageMapping("/broadcast")
    @SendTo("/topic/messages")
    public String broadcastMessage(String message) {
        return "Broadcast: " + message;
    }

    // Point-to-point: reply only to the authenticated sender
    @MessageMapping("/private")
    public void sendPrivateMessage(String message, Principal principal) {
        String username = principal.getName();
        messagingTemplate.convertAndSendToUser(username, "/queue/private", "Private message: " + message);
    }

    // Group message: the group ID is taken from the destination, e.g. /app/group/42
    @MessageMapping("/group/{groupId}")
    public void sendGroupMessage(String message, @DestinationVariable String groupId) {
        messagingTemplate.convertAndSend("/topic/group/" + groupId, "Group message: " + message);
    }
}

5.3 WebSocket Service

package com.example.demo.websocket;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class WebSocketService {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    // username -> session ID of online users
    private final Map<String, String> userSessions = new ConcurrentHashMap<>();

    // User comes online
    public void userOnline(String username, String sessionId) {
        userSessions.put(username, sessionId);
        messagingTemplate.convertAndSend("/topic/online", "User " + username + " is online");
    }

    // User goes offline
    public void userOffline(String username) {
        userSessions.remove(username);
        messagingTemplate.convertAndSend("/topic/online", "User " + username + " is offline");
    }

    // Send a system-wide notification
    public void sendSystemNotification(String message) {
        messagingTemplate.convertAndSend("/topic/notification", message);
    }

    // Send a notification to one user
    public void sendUserNotification(String username, String message) {
        messagingTemplate.convertAndSendToUser(username, "/queue/notification", message);
    }

    // Number of users currently online
    public int getOnlineUserCount() {
        return userSessions.size();
    }
}

6. Message Queue Monitoring

6.1 Queue Monitoring

package com.example.demo.monitoring;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.HashMap;

@Service
public class MessageQueueMonitor {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Collect information about one queue
    public Map<String, Object> getQueueInfo(String queueName) {
        Map<String, Object> info = new HashMap<>();
        try {
            // Number of messages ready for delivery
            long messageCount = rabbitTemplate.execute(channel -> {
                return channel.messageCount(queueName);
            });
            // Number of consumers attached to the queue
            long consumerCount = rabbitTemplate.execute(channel -> {
                return channel.consumerCount(queueName);
            });
            info.put("queueName", queueName);
            info.put("messageCount", messageCount);
            info.put("consumerCount", consumerCount);
            info.put("status", "healthy");
        } catch (Exception e) {
            info.put("queueName", queueName);
            info.put("status", "error");
            info.put("error", e.getMessage());
        }
        return info;
    }

    // Collect information about all known queues
    public Map<String, Object> getAllQueuesInfo() {
        Map<String, Object> allInfo = new HashMap<>();
        String[] queues = {"simple.queue", "object.queue", "confirm.queue"};
        for (String queue : queues) {
            allInfo.put(queue, getQueueInfo(queue));
        }
        return allInfo;
    }
}

6.2 Message Statistics

package com.example.demo.monitoring;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class MessageStatistics {

    private final Counter messageSentCounter;
    private final Counter messageReceivedCounter;
    private final Counter messageErrorCounter;
    // Gauge backing value: holds only the most recent processing time
    private final AtomicLong messageProcessingTime;

    @Autowired
    public MessageStatistics(MeterRegistry meterRegistry) {
        this.messageSentCounter = Counter.builder("messages.sent")
                .description("Number of messages sent")
                .register(meterRegistry);
        this.messageReceivedCounter = Counter.builder("messages.received")
                .description("Number of messages received")
                .register(meterRegistry);
        this.messageErrorCounter = Counter.builder("messages.errors")
                .description("Number of message processing errors")
                .register(meterRegistry);
        this.messageProcessingTime = meterRegistry.gauge("messages.processing.time", new AtomicLong(0));
    }

    public void incrementSentMessages() {
        messageSentCounter.increment();
    }

    public void incrementReceivedMessages() {
        messageReceivedCounter.increment();
    }

    public void incrementErrorMessages() {
        messageErrorCounter.increment();
    }

    public void recordProcessingTime(long time) {
        messageProcessingTime.set(time);
    }
}

7. Message Queue Best Practices

7.1 Message Design

package com.example.demo.bestpractice;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;

public class MessageTemplate {

    @JsonProperty("messageId")
    private String messageId;

    @JsonProperty("messageType")
    private String messageType;

    @JsonProperty("timestamp")
    private LocalDateTime timestamp;

    @JsonProperty("payload")
    private Object payload;

    @JsonProperty("headers")
    private Map<String, Object> headers;

    public MessageTemplate() {
        // A unique ID lets consumers detect duplicates (see 7.2)
        this.messageId = UUID.randomUUID().toString();
        this.timestamp = LocalDateTime.now();
        this.headers = new HashMap<>();
    }

    public MessageTemplate(String messageType, Object payload) {
        this();
        this.messageType = messageType;
        this.payload = payload;
    }

    // Add a header; returns this for chaining
    public MessageTemplate addHeader(String key, Object value) {
        this.headers.put(key, value);
        return this;
    }

    // Read a header
    public Object getHeader(String key) {
        return this.headers.get(key);
    }

    // Getters and setters
    public String getMessageId() { return messageId; }
    public void setMessageId(String messageId) { this.messageId = messageId; }
    public String getMessageType() { return messageType; }
    public void setMessageType(String messageType) { this.messageType = messageType; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
    public Object getPayload() { return payload; }
    public void setPayload(Object payload) { this.payload = payload; }
    public Map<String, Object> getHeaders() { return headers; }
    public void setHeaders(Map<String, Object> headers) { this.headers = headers; }
}

7.2 Message Processing Strategies

package com.example.demo.bestpractice;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class MessageProcessingStrategy {

    // Note: this counter is shared by all messages on the listener. A per-message
    // count (for example in a message header) is needed for accurate retry limits.
    private final AtomicInteger retryCount = new AtomicInteger(0);
    private final int maxRetries = 3;

    // Retry strategy
    @RabbitListener(queues = "retry.queue")
    public void handleMessageWithRetry(String message) {
        try {
            processMessage(message);
            retryCount.set(0); // reset the retry counter
        } catch (Exception e) {
            int currentRetry = retryCount.incrementAndGet();
            if (currentRetry <= maxRetries) {
                System.out.println("Processing failed, retry " + currentRetry + ": " + e.getMessage());
                // Send the message back to the retry queue
                resendMessage(message);
            } else {
                System.err.println("Processing failed, max retries reached: " + e.getMessage());
                // Move the message to the dead-letter queue
                sendToDeadLetterQueue(message, e);
            }
        }
    }

    // Idempotent processing
    @RabbitListener(queues = "idempotent.queue")
    public void handleIdempotentMessage(String message) {
        String messageId = extractMessageId(message);
        if (isMessageProcessed(messageId)) {
            System.out.println("Message already processed, skipping: " + messageId);
            return;
        }
        try {
            processMessage(message);
            markMessageAsProcessed(messageId);
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
        }
    }

    // Batch processing; "batchListenerContainerFactory" must be defined as a bean
    // with batch listening enabled (it is not part of the configuration in 2.4)
    @RabbitListener(queues = "batch.queue", containerFactory = "batchListenerContainerFactory")
    public void handleBatchMessages(List<String> messages) {
        System.out.println("Processing batch, size: " + messages.size());
        for (String message : messages) {
            try {
                processMessage(message);
            } catch (Exception e) {
                System.err.println("Batch message processing failed: " + e.getMessage());
            }
        }
    }

    private void processMessage(String message) {
        // Message handling logic goes here
    }

    private void resendMessage(String message) {
        // Resend logic goes here
    }

    private void sendToDeadLetterQueue(String message, Exception e) {
        // Dead-letter publishing goes here
    }

    private String extractMessageId(String message) {
        // Message ID extraction goes here
        return null;
    }

    private boolean isMessageProcessed(String messageId) {
        // Processed-state lookup goes here
        return false;
    }

    private void markMessageAsProcessed(String messageId) {
        // Processed-state marking goes here
    }
}
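The idempotency helpers above are left as stubs. One way to fill them in, sketched here under the assumption that a single JVM and an in-memory set are enough, is to record processed IDs in a concurrent set and use the return value of add() as an atomic check-and-mark. IdempotentConsumerSketch and handleOnce are hypothetical names; a multi-instance deployment would typically need a shared store such as Redis or a database unique constraint instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class IdempotentConsumerSketch {

    // IDs of messages that were processed (or are being processed right now).
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    // Runs the handler only the first time a messageId is seen; returns whether it ran.
    boolean handleOnce(String messageId, String payload, Consumer<String> handler) {
        // add() returns false if the ID is already present, so check and mark are one atomic step.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a redelivery of the same message can try again.
            processedIds.remove(messageId);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        Consumer<String> handler = p -> System.out.println("charging order: " + p);
        System.out.println("ran=" + consumer.handleOnce("msg-1", "order-1", handler));
        System.out.println("ran=" + consumer.handleOnce("msg-1", "order-1", handler)); // duplicate, skipped
    }
}
```

The set grows without bound in this form, so a real implementation would also expire old IDs.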

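8. Summary

Message queues and event-driven design in Spring Boot cover the main needs of asynchronous communication:

  1. Message queues: message brokers such as RabbitMQ and Kafka
  2. Event-driven design: domain events, event publishing, and event handling
  3. Real-time communication: WebSocket and the STOMP protocol
  4. Monitoring and operations: queue monitoring, performance statistics, and error handling
  5. Best practices: message design, processing strategies, idempotency, and batch processing

Used appropriately, these techniques help build highly available, high-performance distributed systems.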

