当前位置: 首页 > wzjs >正文

合肥网站建设公司 千鸟装修房子的效果图 三室二厅二卫

合肥网站建设公司 千鸟,装修房子的效果图 三室二厅二卫,网站上怎么做图片变换动图,wordpress文章支持多形式目录 1. springboot配置 2. 开始写RabbitMq代码 3. 队列优化 4. 插件实现延迟队列 5. 总结 前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加…

目录

1. springboot配置

2. 开始写RabbitMq代码

 3. 队列优化

4. 插件实现延迟队列

5. 总结


前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

1. springboot配置

  1. 创建一个 Maven 工程或者 Spring Boot工程

  2. 添加依赖坐标,这里的 Spring Boot 是3.4.3 版本

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.51</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Knife4j API文档生产工具 --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId><version>4.4.0</version></dependency><!-- swagger注解支持:Knife4j依赖本依赖 --><dependency><groupId>io.swagger</groupId><artifactId>swagger-annotations</artifactId><version>1.5.22</version></dependency></dependencies>

        3.创建 application.yml 文件

server:port: 8888
spring:rabbitmq:host: 你的服务器ipport: 5672username: adminpassword: 123456

这里是 8808 端口,可根据需求决定端口

        4. 配置swgger

@Configuration
public class Knife4jConfig {@Beanpublic OpenAPI springShopOpenAPI() {return new OpenAPI()// 接口文档标题.info(new Info().title("接口文档")// 接口文档简介.description("RabbitMq测试文档")// 接口文档版本.version("v1.0")// 开发者联系方式.contact(new Contact().name("luckily").email("3298244978@qq.com")));}}

        5. 新建主启动类

@Log4j2
@SpringBootApplication
public class RabbitMqDemoApplication {public static void main(String[] args) {SpringApplication app = new SpringApplication(RabbitMqDemoApplication.class);Environment env = app.run(args).getEnvironment();app.setBannerMode(Banner.Mode.CONSOLE);logApplicationStartup(env);}private static void logApplicationStartup(Environment env) {String protocol = "http";if (env.getProperty("server.ssl.key-store") != null) {protocol = "https";}String serverPort = env.getProperty("server.port");String contextPath = env.getProperty("server.servlet.context-path");if (StringUtils.isBlank(contextPath)) {contextPath = "/doc.html";} else {contextPath = contextPath + "/doc.html";}String hostAddress = "localhost";try {hostAddress = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {log.warn("The host name could not be determined, using `localhost` as fallback");}log.info("""----------------------------------------------------------\t应用程序“{}”正在运行中......\t接口文档访问 URL:\t本地: \t\t{}://localhost:{}{}\t外部: \t{}://{}:{}{}\t配置文件: \t{}----------------------------------------------------------""",env.getProperty("spring.application.name"),protocol,serverPort,contextPath,protocol,hostAddress,serverPort,contextPath,env.getActiveProfiles());}}

2. 开始写RabbitMq代码

代码架构

创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wugMpbMe-1630999921193)(D:\学习资料\图片\image-20210902150742461.png)]

代码实现

配置类代码

/*
* TTL队列 配置文件类代码
*
* */
@Configuration
public class TtlQueueConfig {//普通交换机的名称public static final String  X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

生产者代码

 /** 发送延迟消息* */@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}
}

消费者

/** 队列TTL 消费者* */
@Slf4j
@Component
public class DeadLetterQueueConsumer {//接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

效果:

 3. 队列优化

问题:

第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求

代码架构图

增加一个队列QC实现动态延时

实现

配置文件类

 /** TTL队列 配置文件类代码** */@Configurationpublic class TtlQueueConfig {//普通交换机的名称public static final String  X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明QC队列@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>();//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable().withArguments(arguments).build();}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

生产者

/** 发送延迟消息* */
@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}//开始发消息@GetMapping("sendExpirationMsg/{message}/{ttlTime}")@Operation(summary = "Rabbitmq发送消息与延迟时间")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg->{//发送消息的时候 延迟时长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}

消费者

消费者代码不改变

效果:本来消息2是延迟2秒,消息1延迟20秒,消息2似乎要比消息1更早接收,但因为RabbitMQ智慧检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这是队列特性

怎么弥补这个缺陷,需要用到Rabbitmq的插件实现延迟队列

4. 插件实现延迟队列

我们之前延迟消息是在队列进行延迟,安装插件之后是在交换机进行延迟

  • 下载延迟插件https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
  • 将延迟插件放到RabbitMQ的插件目录下:由于我通过docker容器安装的rabbitmq,所以我将安装包先通过xftp发送给主机,在通过docker命令给容器,进入容器安装

复制给容器

进入容器

启动插件

重启容器

docker restart 7af

网页端出现以下就表示成功安装插件

实战

配置文件类

 @Configurationpublic class DelayedQueueConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);};//声明交换机@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

 /** 发送延迟消息* */@Slf4j@RestController@RequestMapping("/ttl")public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息 基于插件的 消息 及 延迟的时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {// 发送消息的时候 延迟时长 单位msmsg.getMessageProperties().setDelay(delayTime);return msg;});}}

消费者

 // 消费者代码 基于插件的延迟消息@Slf4j@Componentpublic class DelayQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void recieveDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}
}

效果

成功!

5. 总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用
RabbitMQ.的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis.的zsset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景

http://www.dtcms.com/wzjs/544665.html

相关文章:

  • 网站建设首选公司seo外包公司排名
  • 电子商城网站开发流程网站建设设计报告
  • 微网站开发合同wordpress表格显示图片
  • 免费建站网站seo怎么样购买网站空间
  • 做网站需要注意的点赶集网网站建设分析
  • 如何免费制作企业网站国内消息最新新闻
  • 教育视频培训网站建设网络科技公司取名
  • 网站建设项目进度计划书银川专业做网站的公司
  • 北京市住房建设官网站合肥市建设工程信息网官网
  • 雏鸟app网站推广网页微博版
  • 二级医院做网站wordpress编辑模板
  • 嘉兴网站建设多少钱国企网站建设合同
  • 桐城市做网站怎么申请自媒体平台账号
  • 娱乐网站的特点怎么进行网站诊断
  • 互联网行业招聘网站青浦网站建设su35
  • 建站视频网站开发维护
  • 网站制作完工验收单网站时间显示
  • .网站链接策略怎么建设自己产品网站
  • 字体设计类网站电子商城网站开发与设计
  • 网站建设模块化实现莱芜免费发布信息网
  • 网站免费建站ppa如何提高网站的访问速度
  • 网站和网业的关系怎么在网上卖产品
  • php 关闭网站深圳龙华网站公司
  • 餐饮网站模板网站开发人员职责
  • jquery win8风格企业网站模板扫黄除恶网站构造结构怎么做
  • 建网站的目的长沙建网站公司
  • 网站页脚内容个人如何做百度推广
  • 免费行情软件app网站排行温州做外贸网站
  • 网站特色怎么写公司logo标志设计免费
  • 四川住房城乡和城乡建设厅网站如何在网站后台备份数据库