Spring Boot消息队列与事件驱动详解
文章目录
- 1. 消息队列概述
- 1.1 消息队列优势
- 1.2 消息队列模式
- 1.3 核心依赖
- 2. RabbitMQ消息队列
- 2.1 RabbitMQ配置
- 2.2 消息生产者
- 2.3 消息消费者
- 2.4 消息配置
- 3. Kafka消息队列
- 3.1 Kafka配置
- 3.2 Kafka生产者
- 3.3 Kafka消费者
- 4. 事件驱动架构
- 4.1 事件定义
- 4.2 事件发布器
- 4.3 事件处理器
- 5. WebSocket实时通信
- 5.1 WebSocket配置
- 5.2 WebSocket控制器
- 5.3 WebSocket服务
- 6. 消息队列监控
- 6.1 消息监控
- 6.2 消息统计
- 7. 消息队列最佳实践
- 7.1 消息设计
- 7.2 消息处理策略
- 8. 总结
1. 消息队列概述
消息队列是分布式系统中重要的通信机制，通过异步消息传递实现系统解耦，并提高系统的可靠性和扩展性。Spring Boot 为 RabbitMQ、Kafka 等消息中间件提供了完整的集成方案。下面直接通过代码示例逐一说明。
1.1 消息队列优势
- 系统解耦:服务间通过消息通信,降低耦合度
- 异步处理:提高系统响应速度和吞吐量
- 削峰填谷:处理突发流量,保护下游系统
- 可靠性:消息持久化,确保消息不丢失
- 扩展性:支持水平扩展和负载均衡
1.2 消息队列模式
- 点对点模式:一对一消息传递
- 发布订阅模式:一对多消息广播
- 请求响应模式:同步消息处理
- 事件驱动模式:基于事件的异步处理
1.3 核心依赖
<dependencies>
    <!-- Spring Boot AMQP (RabbitMQ) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Spring Boot Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- Spring Boot WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Spring Boot Actuator: provides Micrometer (MeterRegistry, Counter),
         which the message statistics example in section 6.2 imports -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>
2. RabbitMQ消息队列
2.1 RabbitMQ配置
# application.yml
spring:
  rabbitmq:
    # Broker connection
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000
    # Publisher-side reliability: correlated confirms plus returns
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        # Manual mode: a listener must ack/nack each delivery itself
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
        prefetch: 1
2.2 消息生产者
package com.example.demo.messaging;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MessageConverter messageConverter;// 发送简单消息public void sendMessage(String message) {rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message);}// 发送对象消息public void sendObjectMessage(Object message) {rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message);}// 发送带确认的消息public void sendMessageWithConfirm(String message) {rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message, msg -> {msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());return msg;});}// 发送延迟消息public void sendDelayedMessage(String message, long delay) {rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {msg.getMessageProperties().setDelay((int) delay);return msg;});}// 发送批量消息public void sendBatchMessages(List<String> messages) {for (String message : messages) {rabbitTemplate.convertAndSend("direct.exchange", "routing.key", message);}}
}
2.3 消息消费者
package com.example.demo.messaging;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

/**
 * RabbitMQ listeners for the demo queues.
 *
 * <p>The accompanying configuration sets the listener container to MANUAL acknowledgement.
 * Listeners that do not receive a {@link Channel} cannot acknowledge, so they override the
 * mode to AUTO; otherwise every delivery would stay unacknowledged.
 */
@Component
public class MessageConsumer {

    /**
     * Consumes plain text messages from {@code simple.queue}.
     *
     * @param message the received text
     */
    @RabbitListener(queues = "simple.queue", ackMode = "AUTO")
    public void handleSimpleMessage(String message) {
        System.out.println("收到消息: " + message);
    }

    /**
     * Consumes converted object messages from {@code object.queue}.
     *
     * @param message the converted payload
     */
    @RabbitListener(queues = "object.queue", ackMode = "AUTO")
    public void handleObjectMessage(@Payload Object message) {
        System.out.println("收到对象消息: " + message);
    }

    /**
     * Consumes from {@code confirm.queue} with explicit acknowledgement: acks after successful
     * processing, otherwise nacks with requeue.
     *
     * @param message the received text
     * @param deliveryTag the delivery tag used to ack/nack this delivery
     * @param channel the channel the delivery arrived on
     * @throws IOException if the ack or nack cannot be sent
     */
    @RabbitListener(queues = "confirm.queue", ackMode = "MANUAL")
    public void handleConfirmMessage(@Payload String message,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                                     Channel channel) throws IOException {
        try {
            System.out.println("处理消息: " + message);
            processMessage(message);
        } catch (Exception e) {
            System.err.println("消息处理失败: " + e.getMessage());
            // NOTE(review): requeue=true is unconditional, so a message that always fails
            // is redelivered again and again; consider a retry limit or dead-lettering.
            channel.basicNack(deliveryTag, false, true);
            return;
        }
        // Ack outside the try block so an ack failure is not answered with a nack
        // for the same delivery tag.
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes messages from {@code delayed.queue}.
     *
     * @param message the received text
     */
    @RabbitListener(queues = "delayed.queue", ackMode = "AUTO")
    public void handleDelayedMessage(String message) {
        System.out.println("收到延迟消息: " + message);
    }

    /** Placeholder for the business logic applied to a message. */
    private void processMessage(String message) {
        // Message handling logic goes here.
    }
}
2.4 消息配置
package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 交换机配置@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct.exchange");}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic.exchange");}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange");}// 队列配置@Beanpublic Queue simpleQueue() {return QueueBuilder.durable("simple.queue").build();}@Beanpublic Queue objectQueue() {return QueueBuilder.durable("object.queue").build();}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable("confirm.queue").build();}@Beanpublic Queue delayedQueue() {return QueueBuilder.durable("delayed.queue").withArgument("x-message-ttl", 60000).build();}// 绑定配置@Beanpublic Binding simpleBinding() {return BindingBuilder.bind(simpleQueue()).to(directExchange()).with("routing.key");}@Beanpublic Binding objectBinding() {return BindingBuilder.bind(objectQueue()).to(directExchange()).with("object.routing.key");}@Beanpublic Binding confirmBinding() {return BindingBuilder.bind(confirmQueue()).to(directExchange()).with("confirm.routing.key");}// 消息转换器@Beanpublic Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// RabbitTemplate配置@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(messageConverter());template.setMandatory(true);template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println("消息发送失败: " + replyText);});template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功");} else {System.err.println("消息发送失败: " + cause);}});return template;}// 监听器容器配置@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new 
SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setConcurrentConsumers(5);factory.setMaxConcurrentConsumers(10);factory.setPrefetchCount(1);return factory;}
}
3. Kafka消息队列
3.1 Kafka配置
# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Offsets are committed by the listener container, not by the Kafka client.
      # NOTE(review): the consumer example has a listener that takes an Acknowledgment
      # argument; confirm that listener's container runs in a manual ack mode.
      enable-auto-commit: false
      max-poll-records: 500
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      properties:
        # linger.ms has no dedicated spring.kafka.producer key; pass it as a raw client property.
        "[linger.ms]": 5
3.2 Kafka生产者
package com.example.demo.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.List;
import java.util.UUID;

/**
 * Publishes messages to Kafka topics through {@link KafkaTemplate}.
 */
@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends a text message without a key.
     *
     * @param topic the destination topic
     * @param message the text payload
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    /**
     * Sends a text message with a record key.
     *
     * @param topic the destination topic
     * @param key the record key
     * @param message the text payload
     */
    public void sendMessageWithKey(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }

    /**
     * Sends an arbitrary object; serialization is left to the configured value serializer.
     *
     * @param topic the destination topic
     * @param message the payload object
     */
    public void sendObjectMessage(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }

    /**
     * Sends a text message and prints the resulting offset on success or the error on failure.
     *
     * @param topic the destination topic
     * @param message the text payload
     */
    public void sendMessageWithCallback(String topic, String message) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
        future.addCallback(
                result -> System.out.println("消息发送成功: " + result.getRecordMetadata().offset()),
                ex -> System.err.println("消息发送失败: " + ex.getMessage()));
    }

    /**
     * Sends every message of the list individually, in list order.
     *
     * @param topic the destination topic
     * @param messages the payloads to send; {@code null} is treated as an empty list
     */
    public void sendBatchMessages(String topic, List<String> messages) {
        if (messages == null) {
            return;
        }
        for (String message : messages) {
            kafkaTemplate.send(topic, message);
        }
    }
}
3.3 Kafka消费者
package com.example.demo.kafka;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.List;

/**
 * Kafka listeners for the demo topics; every listener uses consumer group {@code my-group}.
 */
@Component
public class KafkaConsumer {

    /**
     * Consumes plain text messages from {@code simple-topic}.
     *
     * @param message the record value
     */
    @KafkaListener(topics = "simple-topic", groupId = "my-group")
    public void handleSimpleMessage(String message) {
        System.out.println("收到消息: " + message);
    }

    /**
     * Consumes messages from {@code key-topic} together with their record key.
     *
     * @param message the record value
     * @param key the record key
     */
    @KafkaListener(topics = "key-topic", groupId = "my-group")
    public void handleMessageWithKey(@Payload String message,
                                     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("收到消息 - Key: " + key + ", Value: " + message);
    }

    /**
     * Consumes object messages from {@code object-topic}.
     *
     * @param message the record value
     */
    @KafkaListener(topics = "object-topic", groupId = "my-group")
    public void handleObjectMessage(@Payload Object message) {
        System.out.println("收到对象消息: " + message);
    }

    /**
     * Consumes from {@code manual-topic} and acknowledges only after successful processing.
     *
     * <p>A processing failure is printed and rethrown so the listener container sees it.
     * Swallowing it would leave the record unacknowledged while a later acknowledgement
     * could commit past it.
     *
     * @param message the record value
     * @param ack handle used to acknowledge the record
     */
    @KafkaListener(topics = "manual-topic", groupId = "my-group")
    public void handleManualMessage(@Payload String message, Acknowledgment ack) {
        try {
            System.out.println("处理消息: " + message);
            processMessage(message);
            ack.acknowledge();
        } catch (RuntimeException e) {
            System.err.println("消息处理失败: " + e.getMessage());
            throw e;
        }
    }

    /**
     * Consumes a list of messages from {@code batch-topic}.
     *
     * <p>NOTE(review): nothing visible here enables batch delivery for this listener;
     * confirm its container is configured as a batch listener.
     *
     * @param messages the received values
     */
    @KafkaListener(topics = "batch-topic", groupId = "my-group")
    public void handleBatchMessages(@Payload List<String> messages) {
        System.out.println("收到批量消息,数量: " + messages.size());
        for (String message : messages) {
            System.out.println("处理消息: " + message);
        }
    }

    /** Placeholder for the business logic applied to a message. */
    private void processMessage(String message) {
        // Message handling logic goes here.
    }
}
4. 事件驱动架构
4.1 事件定义
package com.example.demo.event;import java.time.LocalDateTime;
import java.util.UUID;

/**
 * Base class for domain events. Each instance captures, at construction time, a random
 * UUID string as its id, the current local date-time, and the simple name of its
 * concrete class as the event type.
 */
public abstract class DomainEvent {

    // Initializers run in declaration order when an instance is created.
    private final String eventId = UUID.randomUUID().toString();
    private final LocalDateTime occurredOn = LocalDateTime.now();
    private final String eventType = getClass().getSimpleName();

    public DomainEvent() {
    }

    /** @return the random UUID string assigned to this event */
    public String getEventId() {
        return eventId;
    }

    /** @return the local date-time at which this event object was created */
    public LocalDateTime getOccurredOn() {
        return occurredOn;
    }

    /** @return the simple class name of the concrete event */
    public String getEventType() {
        return eventType;
    }
}

// User-created event
/**
 * Event raised when a user has been created; carries the user's id, username and email.
 */
public class UserCreatedEvent extends DomainEvent {

    private final Long userId;
    private final String username;
    private final String email;

    /**
     * @param userId id of the created user
     * @param username username of the created user
     * @param email email address of the created user
     */
    public UserCreatedEvent(Long userId, String username, String email) {
        // The superclass constructor is invoked implicitly.
        this.userId = userId;
        this.username = username;
        this.email = email;
    }

    public Long getUserId() {
        return userId;
    }

    public String getUsername() {
        return username;
    }

    public String getEmail() {
        return email;
    }
}

// Order-created event
/**
 * Event raised when an order has been created; carries the order id, the owning user's id
 * and the order number.
 */
public class OrderCreatedEvent extends DomainEvent {

    private final Long orderId;
    private final Long userId;
    private final String orderNumber;

    /**
     * @param orderId id of the created order
     * @param userId id of the user the order belongs to
     * @param orderNumber order number of the created order
     */
    public OrderCreatedEvent(Long orderId, Long userId, String orderNumber) {
        // The superclass constructor is invoked implicitly.
        this.orderId = orderId;
        this.userId = userId;
        this.orderNumber = orderNumber;
    }

    public Long getOrderId() {
        return orderId;
    }

    public Long getUserId() {
        return userId;
    }

    public String getOrderNumber() {
        return orderNumber;
    }
}
4.2 事件发布器
package com.example.demo.event;import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/**
 * Thin wrapper around {@link ApplicationEventPublisher} for publishing domain events.
 */
@Component
public class DomainEventPublisher {

    private final ApplicationEventPublisher eventPublisher;

    public DomainEventPublisher(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    /**
     * Publishes the given event through the application event publisher.
     *
     * @param event the event to publish
     */
    @Transactional
    public void publishEvent(DomainEvent event) {
        eventPublisher.publishEvent(event);
    }

    /**
     * Creates a {@link UserCreatedEvent} from the arguments and publishes it.
     *
     * @param userId id of the created user
     * @param username username of the created user
     * @param email email address of the created user
     */
    @Transactional
    public void publishUserCreated(Long userId, String username, String email) {
        publishEvent(new UserCreatedEvent(userId, username, email));
    }

    /**
     * Creates an {@link OrderCreatedEvent} from the arguments and publishes it.
     *
     * @param orderId id of the created order
     * @param userId id of the user the order belongs to
     * @param orderNumber order number of the created order
     */
    @Transactional
    public void publishOrderCreated(Long orderId, Long userId, String orderNumber) {
        publishEvent(new OrderCreatedEvent(orderId, userId, orderNumber));
    }
}
4.3 事件处理器
package com.example.demo.event;import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;

/**
 * Listeners for the demo's domain events. Both handlers are annotated {@code @Async} and
 * return an already-completed future once their steps have run.
 */
@Component
public class DomainEventHandler {

    /**
     * Reacts to a user being created: sends the welcome email, then initializes the
     * user's settings.
     *
     * @param event the user-created event
     * @return a completed future carrying {@code null}
     */
    @Async
    @EventListener
    public CompletableFuture<Void> handleUserCreated(UserCreatedEvent event) {
        System.out.println("处理用户创建事件: " + event.getUserId());
        sendWelcomeEmail(event.getEmail());
        initializeUserSettings(event.getUserId());
        return finished();
    }

    /**
     * Reacts to an order being created: sends the order confirmation, then updates
     * inventory.
     *
     * @param event the order-created event
     * @return a completed future carrying {@code null}
     */
    @Async
    @EventListener
    public CompletableFuture<Void> handleOrderCreated(OrderCreatedEvent event) {
        System.out.println("处理订单创建事件: " + event.getOrderId());
        sendOrderConfirmation(event.getUserId(), event.getOrderNumber());
        updateInventory(event.getOrderId());
        return finished();
    }

    /** Returns a future that is already completed with {@code null}. */
    private static CompletableFuture<Void> finished() {
        return CompletableFuture.completedFuture(null);
    }

    private void sendWelcomeEmail(String email) {
        // Welcome email sending goes here.
    }

    private void initializeUserSettings(Long userId) {
        // User settings initialization goes here.
    }

    private void sendOrderConfirmation(Long userId, String orderNumber) {
        // Order confirmation email sending goes here.
    }

    private void updateInventory(Long orderId) {
        // Inventory update goes here.
    }
}
5. WebSocket实时通信
5.1 WebSocket配置
package com.example.demo.websocket;import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {config.enableSimpleBroker("/topic", "/queue");config.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();}
}
5.2 WebSocket控制器
package com.example.demo.websocket;import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.beans.factory.annotation.Autowired;
import java.security.Principal;@Controller
public class WebSocketController {@Autowiredprivate SimpMessagingTemplate messagingTemplate;// 广播消息@MessageMapping("/broadcast")@SendTo("/topic/messages")public String broadcastMessage(String message) {return "广播消息: " + message;}// 点对点消息@MessageMapping("/private")public void sendPrivateMessage(String message, Principal principal) {String username = principal.getName();messagingTemplate.convertAndSendToUser(username, "/queue/private", "私有消息: " + message);}// 群组消息@MessageMapping("/group")public void sendGroupMessage(String message, String groupId) {messagingTemplate.convertAndSend("/topic/group/" + groupId, "群组消息: " + message);}
}
5.3 WebSocket服务
package com.example.demo.websocket;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Tracks which users are online (username to session id) and pushes presence changes and
 * notifications over the messaging template.
 */
@Service
public class WebSocketService {

    private static final String PRESENCE_TOPIC = "/topic/online";

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    private final Map<String, String> userSessions = new ConcurrentHashMap<>();

    /**
     * Records the user's session and announces on {@code /topic/online} that the user
     * came online.
     *
     * @param username the user's name
     * @param sessionId the session id stored for that user
     */
    public void userOnline(String username, String sessionId) {
        userSessions.put(username, sessionId);
        announcePresence(username, " 上线了");
    }

    /**
     * Forgets the user's session and announces on {@code /topic/online} that the user
     * went offline.
     *
     * @param username the user's name
     */
    public void userOffline(String username) {
        userSessions.remove(username);
        announcePresence(username, " 下线了");
    }

    /**
     * Sends a notification to {@code /topic/notification}.
     *
     * @param message the notification text
     */
    public void sendSystemNotification(String message) {
        messagingTemplate.convertAndSend("/topic/notification", message);
    }

    /**
     * Sends a notification to one user's {@code /queue/notification} destination.
     *
     * @param username the recipient
     * @param message the notification text
     */
    public void sendUserNotification(String username, String message) {
        messagingTemplate.convertAndSendToUser(username, "/queue/notification", message);
    }

    /** @return the number of users currently recorded as online */
    public int getOnlineUserCount() {
        return userSessions.size();
    }

    /** Publishes a presence line for the user to the presence topic. */
    private void announcePresence(String username, String suffix) {
        messagingTemplate.convertAndSend(PRESENCE_TOPIC, "用户 " + username + suffix);
    }
}
6. 消息队列监控
6.1 消息监控
package com.example.demo.monitoring;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.HashMap;

/**
 * Reports message and consumer counts for RabbitMQ queues, queried through the template's
 * channel.
 */
@Service
public class MessageQueueMonitor {

    private static final String[] MONITORED_QUEUES = {"simple.queue", "object.queue", "confirm.queue"};

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Collects the message count and consumer count of one queue.
     *
     * @param queueName the queue to inspect
     * @return a map with {@code queueName} and {@code status}; on success also
     *     {@code messageCount} and {@code consumerCount}, on failure {@code error}
     */
    public Map<String, Object> getQueueInfo(String queueName) {
        Map<String, Object> report = new HashMap<>();
        report.put("queueName", queueName);
        try {
            // Both lookups complete before anything is recorded, so a failure in either
            // leaves no partial counts in the report.
            long pending = rabbitTemplate.execute(ch -> ch.messageCount(queueName));
            long consumers = rabbitTemplate.execute(ch -> ch.consumerCount(queueName));
            report.put("messageCount", pending);
            report.put("consumerCount", consumers);
            report.put("status", "healthy");
        } catch (Exception e) {
            report.put("status", "error");
            report.put("error", e.getMessage());
        }
        return report;
    }

    /**
     * Collects {@link #getQueueInfo(String)} for the fixed set of monitored queues.
     *
     * @return a map from queue name to that queue's report
     */
    public Map<String, Object> getAllQueuesInfo() {
        Map<String, Object> reports = new HashMap<>();
        for (String name : MONITORED_QUEUES) {
            reports.put(name, getQueueInfo(name));
        }
        return reports;
    }
}
6.2 消息统计
package com.example.demo.monitoring;import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Micrometer metrics for messaging: counters for sent, received and failed messages, and a
 * gauge holding the most recently recorded processing time.
 */
@Component
public class MessageStatistics {

    private final Counter messageSentCounter;
    private final Counter messageReceivedCounter;
    private final Counter messageErrorCounter;
    private final AtomicLong messageProcessingTime;

    /**
     * Registers all meters with the given registry.
     *
     * @param meterRegistry the registry the meters are registered with
     */
    @Autowired
    public MessageStatistics(MeterRegistry meterRegistry) {
        this.messageSentCounter =
                registerCounter(meterRegistry, "messages.sent", "Number of messages sent");
        this.messageReceivedCounter =
                registerCounter(meterRegistry, "messages.received", "Number of messages received");
        this.messageErrorCounter =
                registerCounter(meterRegistry, "messages.errors", "Number of message processing errors");
        // The gauge reads this AtomicLong; the field keeps it reachable and updatable.
        this.messageProcessingTime = meterRegistry.gauge("messages.processing.time", new AtomicLong(0));
    }

    /** Builds and registers a counter with the given name and description. */
    private static Counter registerCounter(MeterRegistry registry, String name, String description) {
        return Counter.builder(name).description(description).register(registry);
    }

    /** Adds one to the sent-messages counter. */
    public void incrementSentMessages() {
        messageSentCounter.increment();
    }

    /** Adds one to the received-messages counter. */
    public void incrementReceivedMessages() {
        messageReceivedCounter.increment();
    }

    /** Adds one to the error counter. */
    public void incrementErrorMessages() {
        messageErrorCounter.increment();
    }

    /**
     * Overwrites the processing-time gauge value.
     *
     * @param time the value to store
     */
    public void recordProcessingTime(long time) {
        messageProcessingTime.set(time);
    }
}
7. 消息队列最佳实践
7.1 消息设计
package com.example.demo.bestpractice;import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;

/**
 * Generic message envelope: id, type, creation timestamp, payload and free-form headers.
 *
 * <p>A new instance gets a random UUID as its message id, so consumers always have an id
 * to work with; the setter (and therefore deserialization) can still override it.
 */
public class MessageTemplate {

    @JsonProperty("messageId")
    private String messageId;

    @JsonProperty("messageType")
    private String messageType;

    @JsonProperty("timestamp")
    private LocalDateTime timestamp;

    @JsonProperty("payload")
    private Object payload;

    @JsonProperty("headers")
    private Map<String, Object> headers;

    /** Creates an envelope with a fresh id, the current timestamp and no headers. */
    public MessageTemplate() {
        this.messageId = UUID.randomUUID().toString();
        this.timestamp = LocalDateTime.now();
        this.headers = new HashMap<>();
    }

    /**
     * Creates an envelope for the given type and payload.
     *
     * @param messageType the message type
     * @param payload the message body
     */
    public MessageTemplate(String messageType, Object payload) {
        this();
        this.messageType = messageType;
        this.payload = payload;
    }

    /**
     * Adds or replaces a header.
     *
     * @param key the header name
     * @param value the header value
     * @return this instance, for chaining
     */
    public MessageTemplate addHeader(String key, Object value) {
        this.headers.put(key, value);
        return this;
    }

    /**
     * Looks up a header.
     *
     * @param key the header name
     * @return the header value, or {@code null} if absent
     */
    public Object getHeader(String key) {
        return this.headers.get(key);
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getMessageType() {
        return messageType;
    }

    public void setMessageType(String messageType) {
        this.messageType = messageType;
    }

    public LocalDateTime getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(LocalDateTime timestamp) {
        this.timestamp = timestamp;
    }

    public Object getPayload() {
        return payload;
    }

    public void setPayload(Object payload) {
        this.payload = payload;
    }

    public Map<String, Object> getHeaders() {
        return headers;
    }

    /**
     * Replaces the header map.
     *
     * @param headers the new headers; {@code null} resets to an empty map so that
     *     {@link #addHeader} and {@link #getHeader} keep working
     */
    public void setHeaders(Map<String, Object> headers) {
        this.headers = (headers != null) ? headers : new HashMap<>();
    }
}
7.2 消息处理策略
package com.example.demo.bestpractice;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demonstrates three consumer-side strategies: bounded retry with dead-lettering,
 * idempotent handling, and batch handling.
 */
@Component
public class MessageProcessingStrategy {

    private static final int MAX_RETRIES = 3;

    /**
     * Failure count per message, keyed by message text. A single shared counter would mix
     * the failures of unrelated messages and of concurrent consumers.
     * NOTE(review): counts live in this instance's memory only; confirm that is acceptable
     * when several application instances consume the same queue.
     */
    private final Map<String, Integer> retryCounts = new ConcurrentHashMap<>();

    /**
     * Processes a message from {@code retry.queue}. On failure the message is resent until it
     * has failed more than {@value #MAX_RETRIES} times, after which it goes to the dead letter
     * queue.
     *
     * <p>{@code ackMode = "AUTO"} is set because this method has no channel to acknowledge on.
     *
     * @param message the received text
     */
    @RabbitListener(queues = "retry.queue", ackMode = "AUTO")
    public void handleMessageWithRetry(String message) {
        // ConcurrentHashMap rejects null keys; map a null payload to the text "null".
        String retryKey = String.valueOf(message);
        try {
            processMessage(message);
            retryCounts.remove(retryKey); // success: forget earlier failures of this message
        } catch (Exception e) {
            int currentRetry = retryCounts.merge(retryKey, 1, Integer::sum);
            if (currentRetry <= MAX_RETRIES) {
                System.out.println("消息处理失败,重试第 " + currentRetry + " 次: " + e.getMessage());
                resendMessage(message);
            } else {
                retryCounts.remove(retryKey); // retries exhausted: stop tracking
                System.err.println("消息处理失败,已达到最大重试次数: " + e.getMessage());
                sendToDeadLetterQueue(message, e);
            }
        }
    }

    /**
     * Processes a message from {@code idempotent.queue} at most once per message id: messages
     * whose id is already marked as processed are skipped.
     *
     * <p>NOTE(review): a processing failure is only printed; with automatic acknowledgement
     * the message is then not redelivered. Confirm that is intended.
     *
     * @param message the received text
     */
    @RabbitListener(queues = "idempotent.queue", ackMode = "AUTO")
    public void handleIdempotentMessage(String message) {
        String messageId = extractMessageId(message);
        if (isMessageProcessed(messageId)) {
            System.out.println("消息已处理,跳过: " + messageId);
            return;
        }
        try {
            processMessage(message);
            markMessageAsProcessed(messageId);
        } catch (Exception e) {
            System.err.println("消息处理失败: " + e.getMessage());
        }
    }

    /**
     * Processes a batch from {@code batch.queue}; a failure of one message is printed and does
     * not stop the rest of the batch.
     *
     * <p>NOTE(review): the {@code batchListenerContainerFactory} bean is referenced by name
     * and must be defined elsewhere.
     *
     * @param messages the received batch
     */
    @RabbitListener(queues = "batch.queue", containerFactory = "batchListenerContainerFactory")
    public void handleBatchMessages(List<String> messages) {
        System.out.println("批量处理消息,数量: " + messages.size());
        for (String message : messages) {
            try {
                processMessage(message);
            } catch (Exception e) {
                System.err.println("批量消息处理失败: " + e.getMessage());
            }
        }
    }

    private void processMessage(String message) {
        // Message handling logic goes here.
    }

    private void resendMessage(String message) {
        // Re-publishing to the retry queue goes here.
    }

    private void sendToDeadLetterQueue(String message, Exception e) {
        // Publishing to the dead letter queue goes here.
    }

    private String extractMessageId(String message) {
        // Message id extraction goes here.
        return null;
    }

    private boolean isMessageProcessed(String messageId) {
        // Processed-state lookup goes here.
        return false;
    }

    private void markMessageAsProcessed(String messageId) {
        // Processed-state recording goes here.
    }
}
8. 总结
Spring Boot消息队列与事件驱动提供了完整的异步通信解决方案:
- 消息队列:RabbitMQ、Kafka等消息中间件
- 事件驱动:领域事件、事件发布、事件处理
- 实时通信:WebSocket、STOMP协议
- 监控运维:消息监控、性能统计、错误处理
- 最佳实践:消息设计、处理策略、幂等性、批量处理
通过合理使用这些技术,可以构建出高可用、高性能的分布式系统。