当前位置: 首页 > news >正文

微服务--消息队列mq

1. mq简介

        消息队列是分布式系统中的异步通信中间件,采用"生产者-消费者"模型实现服务间解耦通信

核心作用

  • 服务解耦
  • 异步处理
  • 流量削峰
  • 数据同步
  • 最终一致性

消息队列模式

  • 发布/订阅模式:一对多广播
  • 工作队列模式:竞争消费
  • 死信队列:处理失败消息
  • 延迟队列:定时任务处理
  • 消息回溯:Kafka按offset重新消费

2. mq入门

        使用SpringAMQP实现HelloWorld中的基础消息队列功能,一个生产者,一个队列,一个消费者

2.1 启动mq

        打开mq下载目录,输入命令(rabbitmq-server start)启动

网址localhost:15672访问,账号密码均为guest

2.2 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gaohe</groupId><artifactId>clouddemo</artifactId><packaging>pom</packaging><version>0.0.1-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><name>clouddemo</name><description>clouddemo</description><properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>3.3.3</spring-boot.version><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target></properties><dependencies><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.gaohe.clouddemo.ClouddemoApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>

2.3 在yml配置文件中配置连接信息

spring:rabbitmq:host: localhost # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码

2.4 在publisher中利用RabbitTemplate发送信息到simple.queue队列

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;//    发送消息
@Test
public void test1(){
//        1.发送的队列String queueName1 ="hello.queue";
//        2.发送的消息String msg = "你好我哟一个帽衫";
//        3.发送rabbitTemplate.convertAndSend(queueName1,msg);
}}

2.5 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class HelloLisenner {@RabbitListener(queues = "hello.queue")public void helloQueueLisenner(String msg){System.out.println("helloQueueLisenner"+msg);}@RabbitListener(queues = "hello.queue")public void helloQueueLisenner2(String msg){System.out.println("helloQueueLisenner2"+msg);}}

3.交换机

        Exchange是消息队列系统中的消息路由中枢,负责接收生产者发送的消息并根据特定规则将消息路由到一个或多个队列中。

常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

3.1 路由交换机(FanoutExchange)

  • 在consumer服务创建一个类,添加注解,声明交换机,队列以及绑定关系对象
package com.gaohe.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class FanoutConfig {//    交换机@Beanpublic FanoutExchange fanout1(){return new FanoutExchange("itgaohe.fanout");}//    定义队列@Beanpublic Queue queue1(){return new Queue("fanout.queue1");}//    队列绑定交换机@Beanpublic Binding binding1(FanoutExchange fanout1){return BindingBuilder.bind(queue1()).to(fanout1);}//    定义队列@Beanpublic Queue queue2(){return new Queue("fanout.queue2");}//    队列绑定交换机@Beanpublic Binding binding2(FanoutExchange fanout1){return BindingBuilder.bind(queue2()).to(fanout1);}
}

 

  • 在consumer服务中的监听类中添加方法进行监听
package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutLisenner {@RabbitListener(queues = "fanout.queue1")public void fanoutQueueLisenner(String msg){System.out.println("fanoutQueueLisenner:"+msg);}@RabbitListener(queues = "fanout.queue2")public void fanoutQueueLisenner2(String msg){System.out.println("fanoutQueueLisenner2:"+msg);}
}

 

  • 在publisher服务创建测试类进行测试

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
//        1.发送的队列String exName ="itgaohe.fanout";
//        2.发送的消息String msg = "你好";
//        3.发送rabbitTemplate.convertAndSend(exName,"",msg);}}

3.2 路由交换机(DirectExchange)

        交换机,队列不仅可以单独配置,也可以在监听类使用注解进行配置

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DirectLisenner {@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),key = {"blue","red"}))public void directQueueLisenner(String msg){System.out.println("directQueueLisenner"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),key = {"yellow","red"}))public void directQueueLisenner2(String msg){System.out.println("directQueueLisenner2"+msg);}}

         publisher测试类进行测试

package com.gaohe.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
//        1.发送的队列String exName ="itgaohe.direct";
//        2.发送的消息String msg = "I LOVE YOU ";
//        3.发送rabbitTemplate.convertAndSend(exName,"yellow",msg);}}

3.3 广播交换机(TopicExchange)

  • 监听类
package com.gaohe.consumer.lisenner;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class TopicLisenner {@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue1"),exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),key = {"china.#","#.weather"}))public void directQueueLisenner(String msg){System.out.println("directQueueLisenner"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),key = {"us.#","#.weather"}))public void directQueueLisenner2(String msg){System.out.println("directQueueLisenner2"+msg);}}
  • 测试类
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test4(){
//        1.发送的String exName ="itgaohe.topic";
//        2.发送的消息String msg = "hello world6666";
//        3.发送rabbitTemplate.convertAndSend(exName,"aa.weather",msg);}
}

        用的最多的是路由交换机和广播交换机

4. mq消息转换器

        消息转换器是消息中间件中的数据格式转换层,负责在消息生产/消费过程中实现:

  • Java对象 ↔ 消息体序列化/反序列化

  • 消息属性(headers/properties)的自动处理

  • 不同数据格式间的相互转换

配置消息转换器

  • 父工程导入依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
  • 给提供者和消费者配置消息转换器Bean对象
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class mqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
  • 定义消费者,监听队列并消费消息

  • 测试

 

相关文章:

  • 一个小错误:Content-Type ‘text/plain;charset=UTF-8‘ is not supported 的粗心
  • list使用及模拟
  • Vue3+Element Plus动态表格列宽设置
  • ShardingSphere实现分库分表
  • 比特币---第1关:矿工任务及所需硬件
  • 如何存储和和使用比特币---第1关:比特币的存储
  • 升级openssl后无法使用cmake和curl的解决方法
  • 【C/C++ 为什么 unique_ptr 不支持拷贝构造、赋值构造等操作】
  • 大模型_Ubuntu24.04安装RagFlow_使用hyper-v虚拟机_超级详细--人工智能工作笔记0251
  • ubuntu24.04.2安装docker自动化脚本
  • 强化学习 A2C算法
  • java 将多张图片合成gif动态图
  • 微服务--nacos+feign
  • NY197NY205美光闪存固态NY218NY226
  • 两个矩阵的卷积运算
  • 算法导论第五章:概率分析与随机算法的艺术
  • 篇章六 系统性能优化——资源优化——CPU优化(3)
  • 当空间与数据联动,会展中心如何打造智慧运营新范式?
  • 利用 Python 爬虫按关键字搜索 1688 商品
  • 学生端前端用户操作手册
  • wordpress 哪个好用/seo技巧是什么
  • 网站空间商那个好/自助建站的优势
  • 广州外贸网站效果/绍兴seo推广
  • 佛山微网站开发哪家好/东莞网站推广营销网站设计
  • 最大的网站/5118关键词工具
  • 黄石网站建设教程/做个小程序需要花多少钱