当前位置: 首页 > wzjs >正文

常宁市城市建设规划管理局网站经典品牌推广文案

常宁市城市建设规划管理局网站,经典品牌推广文案,网站程序复制,做网站要什么知识目录 1. springboot配置 2. 开始写RabbitMq代码 3. 队列优化 4. 插件实现延迟队列 5. 总结 前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加…

目录

1. springboot配置

2. 开始写RabbitMq代码

 3. 队列优化

4. 插件实现延迟队列

5. 总结


前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

1. springboot配置

  1. 创建一个 Maven 工程或者 Spring Boot工程

  2. 添加依赖坐标,这里的 Spring Boot 是3.4.3 版本

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.51</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Knife4j API文档生产工具 --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId><version>4.4.0</version></dependency><!-- swagger注解支持:Knife4j依赖本依赖 --><dependency><groupId>io.swagger</groupId><artifactId>swagger-annotations</artifactId><version>1.5.22</version></dependency></dependencies>

        3.创建 application.yml 文件

server:port: 8888
spring:rabbitmq:host: 你的服务器ipport: 5672username: adminpassword: 123456

这里是 8808 端口,可根据需求决定端口

        4. 配置swgger

@Configuration
public class Knife4jConfig {@Beanpublic OpenAPI springShopOpenAPI() {return new OpenAPI()// 接口文档标题.info(new Info().title("接口文档")// 接口文档简介.description("RabbitMq测试文档")// 接口文档版本.version("v1.0")// 开发者联系方式.contact(new Contact().name("luckily").email("3298244978@qq.com")));}}

        5. 新建主启动类

@Log4j2
@SpringBootApplication
public class RabbitMqDemoApplication {public static void main(String[] args) {SpringApplication app = new SpringApplication(RabbitMqDemoApplication.class);Environment env = app.run(args).getEnvironment();app.setBannerMode(Banner.Mode.CONSOLE);logApplicationStartup(env);}private static void logApplicationStartup(Environment env) {String protocol = "http";if (env.getProperty("server.ssl.key-store") != null) {protocol = "https";}String serverPort = env.getProperty("server.port");String contextPath = env.getProperty("server.servlet.context-path");if (StringUtils.isBlank(contextPath)) {contextPath = "/doc.html";} else {contextPath = contextPath + "/doc.html";}String hostAddress = "localhost";try {hostAddress = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {log.warn("The host name could not be determined, using `localhost` as fallback");}log.info("""----------------------------------------------------------\t应用程序“{}”正在运行中......\t接口文档访问 URL:\t本地: \t\t{}://localhost:{}{}\t外部: \t{}://{}:{}{}\t配置文件: \t{}----------------------------------------------------------""",env.getProperty("spring.application.name"),protocol,serverPort,contextPath,protocol,hostAddress,serverPort,contextPath,env.getActiveProfiles());}}

2. 开始写RabbitMq代码

代码架构

创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wugMpbMe-1630999921193)(D:\学习资料\图片\image-20210902150742461.png)]

代码实现

配置类代码

/*
* TTL队列 配置文件类代码
*
* */
@Configuration
public class TtlQueueConfig {//普通交换机的名称public static final String  X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

生产者代码

 /** 发送延迟消息* */@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}
}

消费者

/** 队列TTL 消费者* */
@Slf4j
@Component
public class DeadLetterQueueConsumer {//接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

效果:

 3. 队列优化

问题:

第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求

代码架构图

增加一个队列QC实现动态延时

实现

配置文件类

 /** TTL队列 配置文件类代码** */@Configurationpublic class TtlQueueConfig {//普通交换机的名称public static final String  X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列的名称public static final String DEAD_LATTER_QUEUE = "QD";//声明QC队列@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>();//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable().withArguments(arguments).build();}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列 TTL为40s@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信Routing-keyarguments.put("x-dead-letter-routing-key","YD");//设置TTL 单位是msarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();}//绑定@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定@Beanpublic Binding queueDBindingX(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

生产者

/** 发送延迟消息* */
@Slf4j
@Tag(name = "Rabbitmq相关 API", description = "ttl API")
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/sendMsg/{message}")@Operation(summary = "Rabbitmq发送消息")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列:" + message);rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列:" + message);}//开始发消息@GetMapping("sendExpirationMsg/{message}/{ttlTime}")@Operation(summary = "Rabbitmq发送消息与延迟时间")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg->{//发送消息的时候 延迟时长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}

消费者

消费者代码不改变

效果:本来消息2是延迟2秒,消息1延迟20秒,消息2似乎要比消息1更早接收,但因为RabbitMQ智慧检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,这是队列特性

怎么弥补这个缺陷,需要用到Rabbitmq的插件实现延迟队列

4. 插件实现延迟队列

我们之前延迟消息是在队列进行延迟,安装插件之后是在交换机进行延迟

  • 下载延迟插件https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
  • 将延迟插件放到RabbitMQ的插件目录下:由于我通过docker容器安装的rabbitmq,所以我将安装包先通过xftp发送给主机,在通过docker命令给容器,进入容器安装

复制给容器

进入容器

启动插件

重启容器

docker restart 7af

网页端出现以下就表示成功安装插件

实战

配置文件类

 @Configurationpublic class DelayedQueueConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);};//声明交换机@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

 /** 发送延迟消息* */@Slf4j@RestController@RequestMapping("/ttl")public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息 基于插件的 消息 及 延迟的时间@GetMapping("/sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {// 发送消息的时候 延迟时长 单位msmsg.getMessageProperties().setDelay(delayTime);return msg;});}}

消费者

 // 消费者代码 基于插件的延迟消息@Slf4j@Componentpublic class DelayQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void recieveDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}
}

效果

成功!

5. 总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用
RabbitMQ.的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis.的zsset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景

http://www.dtcms.com/wzjs/449135.html

相关文章:

  • 网站建设 云计算广告软文案例
  • 临沂网站设计建设广州线下教学
  • 娄底建设局官方网站aso榜单优化
  • 网站做毕业设计可靠吗关键词林俊杰mp3
  • 做网站前景怎么样石家庄网络关键词排名
  • 嘉兴网站建设技术托管网络推广公司排名
  • 低价网站制作汕头seo不错
  • 玩网页游戏的网站品牌营销公司
  • 策划书怎么写优化防控措施
  • 如何创建一个新网站谷歌排名算法
  • 政府网站建设年度报告高质量关键词搜索排名
  • 江阴便宜做网站引流人脉推广软件
  • 网站后台卸载cmsdede2023最新15件重大新闻
  • 深圳做宣传网站的公司百度投放广告流程
  • 我想做个网站怎么做建立网站的流程
  • 昆明网站建设首选外贸网站推广的方法
  • 东莞长安网站开发公司nba最新排行
  • 做商城网站的流程介绍企业网站建设平台
  • 网站建设深圳公司哪家好搜索seo神器
  • 成都专业手机网站建设服务什么广告推广最有效果
  • 网络营销导向企业网站建设的一般原则免费的舆情网站app
  • html5 电商网站模板北京seo
  • 做高考题的网站搜索引擎有哪几个网站
  • 石家庄站内换乘图解百度手机版下载
  • 禄劝彝族苗族网站建设福州seo结算
  • 专业的聊城网站建设青岛做网站的公司哪家好
  • 吉首做网站百度seo关键词排名
  • 做网站的专业叫啥今日国内新闻10则
  • 沃尔玛的网站建设泉州seo优化
  • 网站seo优化要怎么做知乎关键词优化软件