Spring Boot 与 RabbitMQ 的深度集成实践(二)
集成步骤详解
配置 RabbitMQ 连接信息
在 Spring Boot 项目中,通常在application.properties或application.yml文件中配置 RabbitMQ 的连接信息。以application.yml为例,配置如下:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual - host: /
上述配置中:
- host指定了 RabbitMQ 服务器的地址,这里假设 RabbitMQ 运行在本地。
- port指定了 RabbitMQ 服务的端口,默认端口是 5672。
- username和password是连接 RabbitMQ 服务器所需的用户名和密码,默认的用户名和密码都是guest。不过在实际生产环境中,应使用更安全的用户名和密码组合。
- virtual - host表示虚拟主机,它用于逻辑隔离不同的应用或业务模块之间的消息队列。这里使用根虚拟主机/,在实际应用中,可以根据业务需求创建不同的虚拟主机。例如,对于不同的业务线,可以分别使用/business1、/business2等虚拟主机,每个虚拟主机有独立的队列、交换机和权限控制 。
创建消息队列和交换机
通过配置类可以方便地创建消息队列、交换机以及它们之间的绑定关系。在 Spring Boot 中,使用@Configuration注解来定义配置类,并使用@Bean注解创建所需的 Bean。
下面分别展示direct、topic、fanout类型交换机的配置示例:
Direct Exchange(直连交换机)配置示例:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
// 定义队列
@Bean
public Queue directQueue() {
return new Queue("direct.queue", true);
}
// 定义直连交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange", true, false);
}
// 绑定队列和交换机,设置路由键
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");
}
}
在这个配置中:
- directQueue方法创建了一个名为direct.queue的队列,true表示该队列是持久化的,即 RabbitMQ 服务器重启后队列依然存在。
- directExchange方法创建了一个名为direct.exchange的直连交换机,同样设置为持久化。
- directBinding方法通过BindingBuilder将队列和交换机进行绑定,并指定了路由键direct.routing.key。直连交换机的特点是根据路由键进行精确匹配,只有当消息的路由键与绑定的路由键完全一致时,消息才会被路由到对应的队列。
Topic Exchange(主题交换机)配置示例:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
// 定义第一个队列
@Bean
public Queue topicQueue1() {
return new Queue("topic.queue1", true);
}
// 定义第二个队列
@Bean
public Queue topicQueue2() {
return new Queue("topic.queue2", true);
}
// 定义主题交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange", true, false);
}
// 绑定第一个队列和交换机,设置路由键模式为topic.#
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");
}
// 绑定第二个队列和交换机,设置路由键模式为topic.man
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.man");
}
}
这里:
- topicQueue1和topicQueue2分别创建了两个持久化队列。
- topicExchange创建了一个名为topic.exchange的主题交换机。
- topicBinding1将topicQueue1与topicExchange绑定,并使用topic.#作为路由键模式,其中#是通配符,表示匹配零个或多个单词。这意味着只要消息的路由键以topic.开头,就会被路由到topicQueue1。
- topicBinding2将topicQueue2与topicExchange绑定,路由键模式为topic.man,只有当消息的路由键为topic.man时,消息才会被路由到topicQueue2。主题交换机通过通配符模式实现了更灵活的消息路由,适用于需要根据不同规则进行消息分发的场景 。
Fanout Exchange(扇形交换机)配置示例:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
// 定义第一个队列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1", true);
}
// 定义第二个队列
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2", true);
}
// 定义扇形交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange", true, false);
}
// 绑定第一个队列和交换机
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
// 绑定第二个队列和交换机
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
在这个配置类中:
- fanoutQueue1和fanoutQueue2创建了两个持久化队列。
- fanoutExchange创建了一个名为fanout.exchange的扇形交换机。
- fanoutBinding1和fanoutBinding2分别将两个队列与扇形交换机进行绑定。扇形交换机的特点是将消息广播到所有与之绑定的队列,而不考虑路由键,只要有队列绑定到该交换机,消息就会被发送到这些队列,适用于需要将消息同时发送给多个消费者的场景 。
编写消息生产者
创建一个消息生产者类,通过注入RabbitTemplate来发送消息。RabbitTemplate是 Spring AMQP 提供的用于发送消息的核心类,它封装了与 RabbitMQ 交互的细节,提供了便捷的消息发送方法。
以下是一个消息生产者类的示例,包含普通消息发送和带自定义属性消息发送的示例:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class RabbitMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送普通消息
public void sendMessage(String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
System.out.println("Sent message: " + message);
}
// 发送带自定义属性的消息
public void sendMessageWithProperties(String exchange, String routingKey, String message) {
Map<String, Object> headers = new HashMap<>();
headers.put("customHeader", "customValue");
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setHeaders(headers);
return messagePostProcessor;
});
System.out.println("Sent message with properties: " + message);
}
}
在这个生产者类中:
- sendMessage方法接收交换机名称、路由键和消息内容作为参数,通过rabbitTemplate.convertAndSend方法将消息发送到指定的交换机和队列。该方法会根据路由键将消息路由到对应的队列,如果路由键匹配成功,消息就会被存储在队列中等待消费者获取。
- sendMessageWithProperties方法展示了如何发送带有自定义属性的消息。首先创建一个Map来存储自定义属性,这里添加了一个名为customHeader,值为customValue的属性。然后使用rabbitTemplate.convertAndSend的另一个重载方法,该方法接收一个MessagePostProcessor作为参数。在MessagePostProcessor的回调函数中,将自定义属性设置到消息的属性中,从而实现发送带有自定义属性的消息 。这种方式在实际应用中非常有用,例如可以通过自定义属性来传递一些额外的元数据,如消息的优先级、创建时间等,以便消费者在处理消息时根据这些属性进行不同的处理逻辑。
编写消息消费者
创建消息消费者类,使用@RabbitListener注解来监听指定的队列,并处理接收到的消息。@RabbitListener是 Spring AMQP 提供的注解,用于声明一个方法作为消息监听器,当队列中有新消息时,该方法会被自动调用。
以下是一个消息消费者类的示例,涵盖消息处理逻辑和异常处理:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQConsumer {
@RabbitListener(queues = "direct.queue")
public void receiveMessage(String message) {
try {
// 消息处理逻辑
System.out.println("Received message: " + message);
// 模拟业务处理
Thread.sleep(1000);
System.out.println("Message processed successfully.");
} catch (Exception e) {
// 异常处理
System.out.println("Error processing message: " + e.getMessage());
// 可以根据具体业务需求进行重试、记录日志或其他操作
}
}
}
在这个消费者类中:
- @RabbitListener(queues = "direct.queue")注解表示该方法监听名为direct.queue的队列。当该队列中有消息时,receiveMessage方法会被触发。
- 在receiveMessage方法中,首先打印接收到的消息,然后通过Thread.sleep(1000)模拟业务处理过程,这里让线程睡眠 1 秒来模拟一些耗时操作。如果在处理过程中发生异常,会捕获异常并打印错误信息。在实际应用中,可以根据业务需求进行更复杂的异常处理,例如将消息放入死信队列进行后续处理,或者进行消息重试。如果是因为网络波动等临时原因导致消息处理失败,可以设置重试机制,让消息重新被消费;如果是因为消息内容本身错误等不可恢复的原因,可以将消息放入死信队列,以便后续进行人工处理或分析 。通过合理的异常处理,可以提高系统的稳定性和可靠性,确保消息能够被正确处理。