当前位置: 首页 > news >正文

RabbitMQ ③-Spring使用RabbitMQ

在这里插入图片描述

Spring使用RabbitMQ

创建 Spring 项目后,引入依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件 application.yml

spring:
  application:
    name: spring-rabbitmq-demo
  rabbitmq:
#    host: 47.94.9.33
#    port: 5672
#    username: admin
#    password: admin
#    virtual-host: /
    addresses: amqp://admin:admin@47.94.9.33:5672/

Work-Queue(工作队列模式)

声明队列

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 工作队列模式@Beanpublic Queue workQueue() {return QueueBuilder.durable(Constants.WORK_QUEUE).build();}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work() {for (int i = 0; i < 10; i++) {String msg = "hello work queue mode~ " + i;// ? 当使用默认交换机时,routingKey 和队列名称保持一致rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);}log.info("消息发送成功");return "消息发送成功";}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
// Consumers for the work-queue example: both listener methods subscribe to
// the same queue, Constants.WORK_QUEUE.
public class WorkListener {

    private static final Logger log = LoggerFactory.getLogger(WorkListener.class);

    /**
     * Logs a message received from {@code Constants.WORK_QUEUE}.
     *
     * @param message the raw AMQP message (logged as-is)
     * @param channel the channel the message arrived on; not used here
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void process1(Message message, Channel channel) {
        log.info("[process1]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);
    }

    /**
     * Logs a message received from {@code Constants.WORK_QUEUE}.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void process2(String message) {
        log.info("[process2]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);
    }
}

Publish/Subscribe(发布/订阅模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 发布订阅模式@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();}@Bean("fanoutQueue1")public Queue fanoutQueue1 () {return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2 () {return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("bindingFanout1")public Binding bindingFanout1(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}@Bean("bindingFanout2")public Binding bindingFanout2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;

import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/fanout")public String fanout() {for (int i = 0; i < 10; i++) {String msg = "hello publish fanout mode~ " + i;// ? 当使用默认交换机时,routingKey 和队列名称保持一致rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);}log.info("消息发送成功");return "消息发送成功";}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
// Consumers for the publish/subscribe example: one listener per fanout queue.
public class FanoutListener {

    private static final Logger log = LoggerFactory.getLogger(FanoutListener.class);

    /**
     * Logs a message received from {@code Constants.FANOUT_QUEUE1}.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void process1(String message) {
        log.info("[process1]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE1, message);
    }

    /**
     * Logs a message received from {@code Constants.FANOUT_QUEUE2}.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void process2(String message) {
        log.info("[process2]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE2, message);
    }
}

Routing(路由模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 路由模式@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}@Bean("directQueue1")public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("bindingDirect1")public Binding bindingDirect1(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("orange");}@Bean("bindingDirect2")public Binding bindingDirect2(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("orange");}@Bean("bindingDirect3")public Binding bindingDirect3(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("black");}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello routing mode~;routingKey is " + routingKey);log.info("消息发送成功:{}", routingKey);return "消息发送成功:" + routingKey;}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
// Consumers for the routing example: one listener per direct queue.
public class DirectListener {

    private static final Logger log = LoggerFactory.getLogger(DirectListener.class);

    /**
     * Logs a message received from {@code Constants.DIRECT_QUEUE1}.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void process1(String message) {
        log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE1, message);
    }

    /**
     * Logs a message received from {@code Constants.DIRECT_QUEUE2}.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void process2(String message) {
        log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE2, message);
    }
}

Topics(通配符模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 通配符模式@Bean("topicExchange")public TopicExchange topicExchange() {return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueue1")public Queue topicQueue1() {return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2() {return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("bindingTopic1")public Binding bindingTopic1(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.orange.*");}@Bean("bindingTopic2")public Binding bindingTopic2(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit");}@Bean("bindingTopic3")public Binding bindingTopic3(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("lazy.#");}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello topic mode~;routingKey is " + routingKey);log.info("消息发送成功:{}", routingKey);return "消息发送成功:" + routingKey;}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
// Consumers for the topics example: one listener per topic queue.
public class TopicListener {

    private static final Logger log = LoggerFactory.getLogger(TopicListener.class);

    /**
     * Logs a message received from {@code Constants.TOPIC_QUEUE1}.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void process1(String message) {
        log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE1, message);
    }

    /**
     * Logs a message received from {@code Constants.TOPIC_QUEUE2}.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void process2(String message) {
        log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE2, message);
    }
}
http://www.dtcms.com/a/185512.html

相关文章:

  • 段错误(Segmentation Fault)总结
  • Java MVC
  • 【HarmonyOS Next之旅】DevEco Studio使用指南(二十二)
  • Java使用POI+反射灵活的控制字段导出Excel
  • 18.three官方示例+编辑器+AI快速学习webgl_buffergeometry_points_interleaved
  • 神经网络初步学习——感知机
  • 《步进电机最小转速终极指南:从理论到实战,突破低速极限的5大秘技》
  • 了解神经网络声音定制,实现多情绪、多语言演绎
  • 推理加速新范式:火山引擎高性能分布式 KVCache (EIC)核心技术解读
  • 搜索二维矩阵 II 算法讲解
  • 矩阵置零算法讲解
  • 使用 AddressSanitizer 检测栈内存越界错误
  • 什么是数据集市(Data Mart)?
  • 如何查看电脑处理器配置 电脑处理器查看方法
  • Koa知识框架
  • 菊厂0510面试手撕题目解答
  • 一、HAL库的设计理念详解:从架构到实践
  • 简述DNS域名服务器
  • 前端面试每日三题 - Day 32
  • Browserless 快速上手
  • 全栈工程师实战手册:LuatOS日志系统开发指南!
  • C 语言_可变参数宏详解
  • temu自养号采购如何解决多账号防关联问题
  • (done) 补充:xv6 的一个用户程序 init 是怎么启动的 ?它如何启动第一个 bash ?
  • ARM64内核内存空间布局
  • The 2024 Sichuan Provincial Collegiate Programming Contest部分题解(L,H,E,B,I)
  • Ethereum Pectra 的升级
  • TWASandGWAS中GBS filtering and GWAS(1)
  • 《Flutter社交应用暗黑奥秘:模式适配与色彩的艺术》
  • 使用PhpStudy搭建Web测试服务器