当前位置: 首页 > news >正文

Spring Boot集合RabbitMQ

RabbitMQ在SpringBoot中的使用

Spring官网中如何在Spring中配置RabbitMQ:AMQP :: Spring Boot

在创建项目的时候导入其依赖

添加配置项

工作队列中消息的发送

创建一个队列

@Configuration
public class RabbitMQConfig {//    当spring启动的时候,自动创建,并且注册一个RabbitMQ队列@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable(Constants.WORK_QUEUE123).build();}
}

将队列名称定义为一个常量

    public static final String WORK_QUEUE123="work.queue";

进行消息的发送

@RestController
@RequestMapping("/rabbit")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work() {for (int i = 0; i < 10; i++) {System.out.println("消息发送成功");rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE123, "hello rabbitMQ Spring Boot");}return "消息发送成功";}
}

消息的接收

@Component
public class WorkListener {@RabbitListener(queues = Constants.WORK_QUEUE123)public void queueListener1(Message message ,Channel channel){System.out.println("监听的队列"+Constants.WORK_QUEUE123+"收到的消息"+message+"channel"+channel);}@RabbitListener(queues = Constants.WORK_QUEUE123)public void queueListener2(String message){System.out.println("监听的队列"+Constants.WORK_QUEUE123+"收到的消息"+message);}}

发布订阅模式(Publish/Subscribe)

在发布订阅模式中多了一个新的角色 Exchange(交换机)

创建两个队列,一个交换机,在创建队列的时候,将队列定义为持久化

    @Bean("fanoutQueue1")public Queue fanoutQueue1(){return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2(){return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("fanoutExchange")public FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();}

将队列和交换机进行绑定

@Bean("fanoutBinding1")public Binding fanoutBinding1(@Qualifier("fanoutExchange") FanoutExchange exchange,@Qualifier("fanoutQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange);}@Bean("fanoutBinding2")public Binding fanoutBinding2(@Qualifier("fanoutExchange") FanoutExchange exchange,@Qualifier("fanoutQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange);}

将其名字定义为常量

    public static final String FANOUT_QUEUE1="fanout.queue1";public static final String FANOUT_QUEUE2="fanout.queue2";public static final String FANOUT_EXCHANGE="fanout.exchange";

发送消息

@RequestMapping("/fanout")public String fanout() {for (int i = 0; i < 10; i++) {System.out.println("消息发送成功");rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello spring amqp:fanout...");}return "发送成功";}

接收消息

@Component
public class FanoutListener {@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void fanoutQueueListener1(String message){System.out.println("监听的队列"+Constants.FANOUT_QUEUE1+"收到的消息"+message);}@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void fanoutQueueListener2(String message){System.out.println("监听的队列"+Constants.FANOUT_QUEUE2+"收到的消息"+message);}
}

路由模式(Routing)

定义队列和交换机

@Bean("directQueue1")public Queue directQueue1(){return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2(){return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}

定义队列和交换机的名字为常量

    public static final String DIRECT_QUEUE1="direct.queue1";public static final String DIRECT_QUEUE2="direct.queue2";public static final String DIRECT_EXCHANGE="direct.exchange";

将队列和交换机进行绑定,并且定义rountingKey

    @Bean("directBlinding1")public Binding directBlinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("orange");}@Bean("directBlinding2")public Binding directBlinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("black");}@Bean("directBlinding3")public Binding directBlinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("orange");}

发送消息

@RequestMapping("/directByOrange")public String directByOrange() {for (int i = 0; i < 4; i++) {System.out.println("消息发送成功");rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, "orange", "<orange>hello spring amqp:direct...");}return "发送成功";}@RequestMapping("/directByBlack")public String directByBlack() {for (int i = 0; i < 4; i++) {System.out.println("消息发送成功");rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, "black", "<black>hello spring amqp:direct...");}return "发送成功";}

接收消息

@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void directQueueListener1(String message){System.out.println("监听的队列"+Constants.DIRECT_QUEUE1+"收到的消息"+message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void directQueueListener2(String message){System.out.println("监听的队列"+Constants.DIRECT_QUEUE2+"收到的消息"+message);}
}

通过rountingKey:orange发送消息

通过rountingKey:black发送消息

通配符模式(Topics)

创建队列和交换机

@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}

定义队列和交换机的名字为常量

    public static final String TOPIC_QUEUE1="topic.queue1";public static final String TOPIC_QUEUE2="topic.queue2";public static final String TOPIC_EXCHANGE="topic.exchange";

将队列和交换机进行绑定

定义rountingKey的通配符规则

@Bean("topicBlinding1")public Binding topicBlinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.*.mm");}@Bean("topicBlinding2")public Binding topicBlinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.nn.*");}@Bean("topicBlinding3")public Binding topicBlinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("pp.#");}

发送消息

进行路径的匹配

@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey){System.out.println("消息发送成功");rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello spring amqp:topic, my routing key is "+routingKey);return "发送成功";}

接受消息

@Component
public class TopicListener {@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void topicQueueListener1(String message){System.out.println("监听的队列"+Constants.TOPIC_QUEUE1+"收到的消息"+message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void topicQueueListener2(String message){System.out.println("监听的队列"+Constants.TOPIC_QUEUE2+"收到的消息"+message);}
}

由此可见,发送的消息已经全部被消费

基于SpringBoot+RabbitMQ完成通信

实订单项目和物流项目之间的通信

在创建好项目之后,在项目中配置好RabbitMQ的信息

首先进行队列的申明

@Configuration
public class RabbitConfig {@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable("queue.build").build();}

在订单系统下单成功后,进行订单消息的发送

@RestController
@RequestMapping("/order")
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/createOrder")public String createOrder(){String orderId= UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","queue.build","下单成功"+orderId);return "下单成功";}

在物流系统中进行消息的接收


@Component
public class OrderListener {@RabbitListener(queues = "queue.build")public void listenerQueue(String message){System.out.println("接收到的消息"+message);}@RabbitHandler@RabbitListener(queues = "queue.build")public void ListenerQueue(UserInfo userinfo){System.out.println("接收到的消息"+userinfo);}
}

消息已经被消费

发送消息格式为对象

创建一个对象

@Data
public class UserInfo {private String messageId;private String messageName;
}

使用Json转换器,并将Json转换器绑定到自定义的RabbitTemplate,进行JSON 序列化,跨语言、可读、安全

//    定义一个Json消息转换器@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter){RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());return rabbitTemplate;}

进行对象消息的发送

@RequestMapping("/createOrder1")public String createOrder1(){String orderId= UUID.randomUUID().toString();UserInfo userInfo=new UserInfo();userInfo.setMessageId(orderId);userInfo.setMessageId("发送消息成功");rabbitTemplate.convertAndSend("","queue.build",userInfo);return "下单成功";}

先在物流系统中配置JSON 消息转换器

@Configuration
public class RabbitConfig {@Beanpublic Jackson2JsonMessageConverter converter() {return new Jackson2JsonMessageConverter();}

进行消息的接收

    @RabbitHandler@RabbitListener(queues = "queue.build")public void ListenerQueue(UserInfo userinfo){System.out.println("接收到的消息"+userinfo);}

http://www.dtcms.com/a/524132.html

相关文章:

  • 傻瓜式大型网站开发工具金融 网站 源码
  • 精准与安全并重!NHVOC-1 (C) 型便携式 VOCs 分析仪(PID + 催化氧化 - NDIR)深度解析
  • WPF ComboBox 样式
  • paddlenlp 3.x 版本使用uie-m-base报错找不到 static/inference.pdmodel
  • 郑州市有做网站的吗wordpress如何设置点击直接下载
  • 深度学习打卡第TR5周:Transformer实战:文本分类
  • 一个强大的开源OCR工具,基于DeepSeek OCR
  • 【AI工具】Lyra超级元提示词原文分享:颠覆AI交互逻辑的「提问式」优化工具
  • 企业级表单与文件上传统一管理方案
  • 报错解决:IEEE latex模版中thanks不显示 隶属关系 / 邮箱不显示
  • 第四章:向量数据库:解锁Embeddings价值的钥匙
  • 微信的微网站模板下载wordpress 后台502
  • 基于JavaWeb技术的在线考试系统设计与实现
  • Function Calling VS MCP
  • 找公司网站建设销售网页
  • C++仿muduo库高并发服务器项目:Channel模块
  • 网站开发前端php 后端python张家界seo
  • [特殊字符]兰亭妙微审美积累|总结三个情感化设计细节✨
  • 【数列求和】
  • 第一章-第二节-Cursor IDE与MCP集成.md
  • 做网站的的人收入多少钱wordpress 4.8.4 漏洞
  • 网站开发的英文书有什么如何做网站好看
  • 前端如何判断用户是否离开了当前页面?
  • Flutter项目搭建最佳实践
  • # AI高精度提示词生成项目——3D-VR 课件—— 最终仓库级 AI 提示词:生成《EduVR Studio》—— 专业级 3D-VR 课件创作平台
  • 巡检机器人落地攻略:RK3576驱动12路低延迟视觉
  • 网站开发 文件上传慢wordpress 上线到centos
  • 嘉兴网站建设多少钱广州装修公司口碑最好的是哪家
  • Docker Swarm 的负载均衡和平滑切换原理
  • RabbitMQ 发送方确认机制详解