当前位置: 首页 > news >正文

Spring Boot 与 RabbitMQ 的深度集成实践(二)

集成步骤详解

配置 RabbitMQ 连接信息

在 Spring Boot 项目中,通常在application.properties或application.yml文件中配置 RabbitMQ 的连接信息。以application.yml为例,配置如下:

 

spring:

rabbitmq:

host: localhost

port: 5672

username: guest

password: guest

virtual - host: /

上述配置中:

  • host指定了 RabbitMQ 服务器的地址,这里假设 RabbitMQ 运行在本地。
  • port指定了 RabbitMQ 服务的端口,默认端口是 5672。
  • username和password是连接 RabbitMQ 服务器所需的用户名和密码,默认的用户名和密码都是guest。不过在实际生产环境中,应使用更安全的用户名和密码组合。
  • virtual - host表示虚拟主机,它用于逻辑隔离不同的应用或业务模块之间的消息队列。这里使用根虚拟主机/,在实际应用中,可以根据业务需求创建不同的虚拟主机。例如,对于不同的业务线,可以分别使用/business1、/business2等虚拟主机,每个虚拟主机有独立的队列、交换机和权限控制 。

创建消息队列和交换机

通过配置类可以方便地创建消息队列、交换机以及它们之间的绑定关系。在 Spring Boot 中,使用@Configuration注解来定义配置类,并使用@Bean注解创建所需的 Bean。

下面分别展示direct、topic、fanout类型交换机的配置示例:

Direct Exchange(直连交换机)配置示例

 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class DirectRabbitConfig {

// 定义队列

@Bean

public Queue directQueue() {

return new Queue("direct.queue", true);

}

// 定义直连交换机

@Bean

public DirectExchange directExchange() {

return new DirectExchange("direct.exchange", true, false);

}

// 绑定队列和交换机,设置路由键

@Bean

public Binding directBinding() {

return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");

}

}

在这个配置中:

  • directQueue方法创建了一个名为direct.queue的队列,true表示该队列是持久化的,即 RabbitMQ 服务器重启后队列依然存在。
  • directExchange方法创建了一个名为direct.exchange的直连交换机,同样设置为持久化。
  • directBinding方法通过BindingBuilder将队列和交换机进行绑定,并指定了路由键direct.routing.key。直连交换机的特点是根据路由键进行精确匹配,只有当消息的路由键与绑定的路由键完全一致时,消息才会被路由到对应的队列。

Topic Exchange(主题交换机)配置示例

 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class TopicRabbitConfig {

// 定义第一个队列

@Bean

public Queue topicQueue1() {

return new Queue("topic.queue1", true);

}

// 定义第二个队列

@Bean

public Queue topicQueue2() {

return new Queue("topic.queue2", true);

}

// 定义主题交换机

@Bean

public TopicExchange topicExchange() {

return new TopicExchange("topic.exchange", true, false);

}

// 绑定第一个队列和交换机,设置路由键模式为topic.#

@Bean

public Binding topicBinding1() {

return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");

}

// 绑定第二个队列和交换机,设置路由键模式为topic.man

@Bean

public Binding topicBinding2() {

return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.man");

}

}

这里:

  • topicQueue1和topicQueue2分别创建了两个持久化队列。
  • topicExchange创建了一个名为topic.exchange的主题交换机。
  • topicBinding1将topicQueue1与topicExchange绑定,并使用topic.#作为路由键模式,其中#是通配符,表示匹配零个或多个单词。这意味着只要消息的路由键以topic.开头,就会被路由到topicQueue1。
  • topicBinding2将topicQueue2与topicExchange绑定,路由键模式为topic.man,只有当消息的路由键为topic.man时,消息才会被路由到topicQueue2。主题交换机通过通配符模式实现了更灵活的消息路由,适用于需要根据不同规则进行消息分发的场景 。

Fanout Exchange(扇形交换机)配置示例

 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class FanoutRabbitConfig {

// 定义第一个队列

@Bean

public Queue fanoutQueue1() {

return new Queue("fanout.queue1", true);

}

// 定义第二个队列

@Bean

public Queue fanoutQueue2() {

return new Queue("fanout.queue2", true);

}

// 定义扇形交换机

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("fanout.exchange", true, false);

}

// 绑定第一个队列和交换机

@Bean

public Binding fanoutBinding1() {

return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());

}

// 绑定第二个队列和交换机

@Bean

public Binding fanoutBinding2() {

return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());

}

}

在这个配置类中:

  • fanoutQueue1和fanoutQueue2创建了两个持久化队列。
  • fanoutExchange创建了一个名为fanout.exchange的扇形交换机。
  • fanoutBinding1和fanoutBinding2分别将两个队列与扇形交换机进行绑定。扇形交换机的特点是将消息广播到所有与之绑定的队列,而不考虑路由键,只要有队列绑定到该交换机,消息就会被发送到这些队列,适用于需要将消息同时发送给多个消费者的场景 。

编写消息生产者

创建一个消息生产者类,通过注入RabbitTemplate来发送消息。RabbitTemplate是 Spring AMQP 提供的用于发送消息的核心类,它封装了与 RabbitMQ 交互的细节,提供了便捷的消息发送方法。

以下是一个消息生产者类的示例,包含普通消息发送和带自定义属性消息发送的示例:

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.util.HashMap;

import java.util.Map;

@Service

public class RabbitMQProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

// 发送普通消息

public void sendMessage(String exchange, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchange, routingKey, message);

System.out.println("Sent message: " + message);

}

// 发送带自定义属性的消息

public void sendMessageWithProperties(String exchange, String routingKey, String message) {

Map<String, Object> headers = new HashMap<>();

headers.put("customHeader", "customValue");

rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor -> {

messagePostProcessor.getMessageProperties().setHeaders(headers);

return messagePostProcessor;

});

System.out.println("Sent message with properties: " + message);

}

}

在这个生产者类中:

  • sendMessage方法接收交换机名称、路由键和消息内容作为参数,通过rabbitTemplate.convertAndSend方法将消息发送到指定的交换机和队列。该方法会根据路由键将消息路由到对应的队列,如果路由键匹配成功,消息就会被存储在队列中等待消费者获取。
  • sendMessageWithProperties方法展示了如何发送带有自定义属性的消息。首先创建一个Map来存储自定义属性,这里添加了一个名为customHeader,值为customValue的属性。然后使用rabbitTemplate.convertAndSend的另一个重载方法,该方法接收一个MessagePostProcessor作为参数。在MessagePostProcessor的回调函数中,将自定义属性设置到消息的属性中,从而实现发送带有自定义属性的消息 。这种方式在实际应用中非常有用,例如可以通过自定义属性来传递一些额外的元数据,如消息的优先级、创建时间等,以便消费者在处理消息时根据这些属性进行不同的处理逻辑。

编写消息消费者

创建消息消费者类,使用@RabbitListener注解来监听指定的队列,并处理接收到的消息。@RabbitListener是 Spring AMQP 提供的注解,用于声明一个方法作为消息监听器,当队列中有新消息时,该方法会被自动调用。

以下是一个消息消费者类的示例,涵盖消息处理逻辑和异常处理:

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class RabbitMQConsumer {

@RabbitListener(queues = "direct.queue")

public void receiveMessage(String message) {

try {

// 消息处理逻辑

System.out.println("Received message: " + message);

// 模拟业务处理

Thread.sleep(1000);

System.out.println("Message processed successfully.");

} catch (Exception e) {

// 异常处理

System.out.println("Error processing message: " + e.getMessage());

// 可以根据具体业务需求进行重试、记录日志或其他操作

}

}

}

在这个消费者类中:

  • @RabbitListener(queues = "direct.queue")注解表示该方法监听名为direct.queue的队列。当该队列中有消息时,receiveMessage方法会被触发。
  • 在receiveMessage方法中,首先打印接收到的消息,然后通过Thread.sleep(1000)模拟业务处理过程,这里让线程睡眠 1 秒来模拟一些耗时操作。如果在处理过程中发生异常,会捕获异常并打印错误信息。在实际应用中,可以根据业务需求进行更复杂的异常处理,例如将消息放入死信队列进行后续处理,或者进行消息重试。如果是因为网络波动等临时原因导致消息处理失败,可以设置重试机制,让消息重新被消费;如果是因为消息内容本身错误等不可恢复的原因,可以将消息放入死信队列,以便后续进行人工处理或分析 。通过合理的异常处理,可以提高系统的稳定性和可靠性,确保消息能够被正确处理。

相关文章:

  • FloodFill算法:洪水般的图像处理艺术
  • 网络安全利器:蜜罐技术详解
  • 【Java ee初阶】jvm(1)
  • 【IPMV】图像处理与机器视觉:Lec10 Edges and Lines
  • Linux STM32 电脑 之间的关系 为何选择Linux
  • NetApp FAS存储系统的加密Encrytpion解决方案介绍
  • 实时时钟项目设计
  • 实习记录小程序|基于SSM+Vue的实习记录小程序设计与实现(源码+数据库+文档)
  • 【微信小程序 + 高德地图API 】键入关键字搜索地址,获取经纬度等
  • 【从基础到模型网络】深度学习-语义分割-基础
  • 【深度学习新浪潮】大模型在哪些垂域已经有比较好的落地?
  • OpenCV-去噪效果和评估指标方法
  • C++多线程数据错乱
  • 常见的请求头(Request Header)参数
  • SpringMVC-拦截器
  • 虚幻引擎5-Unreal Engine笔记之`GameMode`、`关卡(Level)` 和 `关卡蓝图(Level Blueprint)`的关系
  • 从0到1吃透卷积神经网络(CNN):原理与实战全解析
  • Linux安全第三章-系统安全及应用
  • vscode优化使用体验篇(快捷键)
  • 【Leetcode】取余/2的幂次方
  • 浙江一家长称小学老师打孩子还威胁要从3楼扔下,当地警方已立案
  • 从近200件文物文献里,回望光华大学建校百年
  • 终于,俄罗斯和乌克兰谈上了
  • 北方将现今年首场大范围高温天气,山西河南山东陕西局地可超40℃
  • 国家统计局向多省份反馈统计督察意见
  • 哪种“网红减肥法”比较靠谱?医学专家和运动专家共同解答