当前位置: 首页 > news >正文

RabbitMQ ③-Spring使用RabbitMQ

在这里插入图片描述

Spring使用RabbitMQ

创建 Spring 项目后,引入依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件 application.yml

spring:application:name: spring-rabbitmq-demorabbitmq:
#    host: 47.94.9.33
#    port: 5672
#    username: admin
#    password: admin
#    virtual-host: /addresses: amqp://admin:admin@47.94.9.33:5672/

Work-Queue(工作队列模式)

声明队列

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 工作队列模式@Beanpublic Queue workQueue() {return QueueBuilder.durable(Constants.WORK_QUEUE).build();}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work() {for (int i = 0; i < 10; i++) {String msg = "hello work queue mode~ " + i;// ? 当使用默认交换机时,routingKey 和队列名称保持一致rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);}log.info("消息发送成功");return "消息发送成功";}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class WorkListener {private static final Logger log = LoggerFactory.getLogger(WorkListener.class);@RabbitListener(queues = Constants.WORK_QUEUE)public void process1(Message message, Channel channel) {log.info("[process1]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);}@RabbitListener(queues = Constants.WORK_QUEUE)public void process2(String message) {log.info("[process2]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);}
}

Publish/Subscribe(发布/订阅模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 发布订阅模式@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();}@Bean("fanoutQueue1")public Queue fanoutQueue1 () {return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2 () {return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("bindingFanout1")public Binding bindingFanout1(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}@Bean("bindingFanout2")public Binding bindingFanout2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;
package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/fanout")public String fanout() {for (int i = 0; i < 10; i++) {String msg = "hello publish fanout mode~ " + i;// ? 当使用默认交换机时,routingKey 和队列名称保持一致rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);}log.info("消息发送成功");return "消息发送成功";}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutListener {private static final Logger log = LoggerFactory.getLogger(FanoutListener.class);@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void process1(String message) {log.info("[process1]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE1, message);}@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void process2(String message) {log.info("[process2]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE2, message);}
}

Routing(路由模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 路由模式@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}@Bean("directQueue1")public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("bindingDirect1")public Binding bindingDirect1(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("orange");}@Bean("bindingDirect2")public Binding bindingDirect2(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("orange");}@Bean("bindingDirect3")public Binding bindingDirect3(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("black");}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello routing mode~;routingKey is " + routingKey);log.info("消息发送成功:{}", routingKey);return "消息发送成功:" + routingKey;}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DirectListener {private static final Logger log = LoggerFactory.getLogger(DirectListener.class);@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void process1(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE1, message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void process2(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE2, message);}
}

Topics(通配符模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 通配符模式@Bean("topicExchange")public TopicExchange topicExchange() {return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueue1")public Queue topicQueue1() {return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2() {return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("bindingTopic1")public Binding bindingTopic1(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.orange.*");}@Bean("bindingTopic2")public Binding bindingTopic2(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit");}@Bean("bindingTopic3")public Binding bindingTopic3(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("lazy.#");}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello topic mode~;routingKey is " + routingKey);log.info("消息发送成功:{}", routingKey);return "消息发送成功:" + routingKey;}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class TopicListener {private static final Logger log = LoggerFactory.getLogger(TopicListener.class);@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void process1(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE1, message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void process2(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE2, message);}
}

相关文章:

  • 段错误(Segmentation Fault)总结
  • Java MVC
  • 【HarmonyOS Next之旅】DevEco Studio使用指南(二十二)
  • Java使用POI+反射灵活的控制字段导出Excel
  • 18.three官方示例+编辑器+AI快速学习webgl_buffergeometry_points_interleaved
  • 神经网络初步学习——感知机
  • 《步进电机最小转速终极指南:从理论到实战,突破低速极限的5大秘技》
  • 了解神经网络声音定制,实现多情绪、多语言演绎
  • 推理加速新范式:火山引擎高性能分布式 KVCache (EIC)核心技术解读
  • 搜索二维矩阵 II 算法讲解
  • 矩阵置零算法讲解
  • 使用 AddressSanitizer 检测栈内存越界错误
  • 什么是数据集市(Data Mart)?
  • 如何查看电脑处理器配置 电脑处理器查看方法
  • Koa知识框架
  • 菊厂0510面试手撕题目解答
  • 一、HAL库的设计理念详解:从架构到实践
  • 简述DNS域名服务器
  • 前端面试每日三题 - Day 32
  • Browserless 快速上手
  • 张笑宇:物质极大丰富之后,我们该怎么办?
  • 观众走入剧院空间,人艺之友一起“再造时光”
  • 全球医药股普跌,A股创新药板块下挫
  • 第一集丨《亲爱的仇敌》和《姜颂》,都有耐人寻味的“她”
  • 普京提议恢复直接谈判,泽连斯基:望俄明日停火,乌愿谈判
  • 巴西总统卢拉昨晚抵达北京