当前位置: 首页 > wzjs >正文

做网站济南西网站后台开发技术

做网站济南西,网站后台开发技术,建站哪家技术好,国外的有名的网站什么是延迟队列 定义:延迟队列是一种特殊的队列,队列中的元素(消息)并不会立即被消费者获取并处理,而是在经过一段指定的延迟时间后,才会被消费者消费。它主要用于需要在特定时间点或经过一定时间间隔后执…

画板

什么是延迟队列

  • 定义:延迟队列是一种特殊的队列,队列中的元素(消息)并不会立即被消费者获取并处理,而是在经过一段指定的延迟时间后,才会被消费者消费。它主要用于需要在特定时间点或经过一定时间间隔后执行的任务场景。

RabbitMQ实现延迟队列的方法

利用消息的TTL(Time-To-Live)和死信队列(DLQ)组合

  • 原理:为队列或消息设置TTL,当消息在队列中存活时间超过TTL值,就会变成死信。将该队列与死信交换机绑定,死信交换机再将死信路由到另一个队列(即延迟队列),消费者从这个延迟队列中获取消息进行处理。
  • 代码
1. 定义 RabbitMQ 相关配置,声明普通队列并配置TTL和死信交换机
public class RabbitMQConfig {// 延迟队列public static final String DELAY_QUEUE = "delay.queue";// 延迟交换机public static final String DELAY_EXCHANGE = "delay.exchange";// 延迟路由键public static final String DELAY_ROUTING_KEY = "delay.routing.key";// 死信队列public static final String DLX_QUEUE = "dlx.queue";// 死信交换机public static final String DLX_EXCHANGE = "dlx.exchange";// 死信路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key";// 创建连接public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxxx");    // 设置RabbitMQ服务器地址factory.setPort(5672);           // 设置RabbitMQ服务器端口factory.setUsername("admin");    // 设置用户名factory.setPassword("admin");    // 设置密码return factory.newConnection();}// 初始化队列和交换机public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);// 声明死信队列channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 绑定死信队列和交换机channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);// 声明延迟交换机channel.exchangeDeclare(DELAY_EXCHANGE, "direct", true);// 声明延迟队列,并设置死信参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE);args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);channel.queueDeclare(DELAY_QUEUE, true, false, false, args);// 绑定延迟队列和交换机channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);}}
} 
2. 生产者和消费者
// 消息发送者
@Slf4j
public class MessageProducer {public void sendMessage(String message, int delayTime) throws Exception {try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {log.info("发送延迟消息: {}, 延迟时间: {}ms", message, delayTime);// 设置消息的过期时间AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayTime)).build();channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE,RabbitMQConfig.DELAY_ROUTING_KEY,properties,message.getBytes());}}
} // 消息 消费者
@Slf4j
public class MessageConsumer {public void consumeDelayQueue() throws Exception {Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1channel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到延迟消息: {}", message);// 确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费死信队列(实际处理延迟消息的队列)channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});}
} 
3. 测试
@Slf4j
public class DelayQueueTest {public static void main(String[] args) throws Exception {// 初始化队列和交换机RabbitMQConfig.init();// 创建生产者和消费者MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 启动消费者线程new Thread(() -> {try {consumer.consumeDelayQueue();} catch (Exception e) {e.printStackTrace();}}).start();// 发送不同延迟时间的消息producer.sendMessage("延迟5秒的消息", 5000);producer.sendMessage("延迟10秒的消息", 10000);producer.sendMessage("延迟15秒的消息", 15000);log.info("所有消息已发送,等待延迟处理...");// 保持程序运行Thread.sleep(20000);}
} 

从时间便可以看出消息是延迟消费了。

使用插件(rabbitmq-delay-message-exchange插件)

  • 原理:该插件提供了一种更直接的方式来实现延迟队列。它在RabbitMQ中添加了一个自定义的交换机类型(如x-delay-message),生产者可以直接在发送消息时指定延迟时间,消息会在指定延迟时间后被路由到绑定的队列。

  • 实现步骤
    • 插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
    • 安装插件:选择适配的版本,在将rabbitmq-delay-message-exchange插件下载并安装到RabbitMQ服务器上,然后启用插件。

消息流转过程如下:
在这里插入图片描述

  • 代码
RabbitMQ 相关配置,定义延迟队列
public class RabbitMQConfig {// 延迟队列public static final String DELAY_QUEUE = "plugin.delay.queue";// 延迟交换机public static final String DELAY_EXCHANGE = "plugin.delay.exchange";// 延迟路由键public static final String DELAY_ROUTING_KEY = "plugin.delay.routing.key";// 创建连接public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxxxxx");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");return factory.newConnection();}// 初始化队列和交换机public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 声明延迟交换机(使用x-delayed-message类型)Map<String, Object> args = new java.util.HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare(DELAY_EXCHANGE, "x-delayed-message", true, false, args);// 声明延迟队列channel.queueDeclare(DELAY_QUEUE, true, false, false, null);// 绑定延迟队列和交换机channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);}}
} 
生产者和消费者
// 生产者
@Slf4j
public class MessageProducer {public void sendMessage(String message, int delayTime) throws Exception {try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {log.info("发送延迟消息: {}, 延迟时间: {}ms", message, delayTime);// 设置消息的延迟时间AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(new java.util.HashMap<String, Object>() {{put("x-delay", delayTime);}}).build();channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE,RabbitMQConfig.DELAY_ROUTING_KEY,properties,message.getBytes());}}
} // 消费者
@Slf4j
public class MessageConsumer {public void consumeDelayQueue() throws Exception {Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1channel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到延迟消息: {}", message);// 确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费延迟队列channel.basicConsume(RabbitMQConfig.DELAY_QUEUE, false, deliverCallback, consumerTag -> {});}
} 
测试
@Slf4j
public class DelayQueuePluginTest {public static void main(String[] args) throws Exception {// 初始化队列和交换机RabbitMQConfig.init();// 创建生产者和消费者MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 启动消费者线程new Thread(() -> {try {consumer.consumeDelayQueue();} catch (Exception e) {log.error("消费者异常", e);}}).start();// 发送不同延迟时间的消息producer.sendMessage("延迟5秒的消息", 5000);producer.sendMessage("延迟10秒的消息", 10000);producer.sendMessage("延迟15秒的消息", 15000);log.info("所有消息已发送,等待延迟处理...");// 保持程序运行Thread.sleep(20000);}
} 

同样可以达到延迟消费的效果。

从管理台可以看到已创建 type=x-delayed-message 的交换机

使用场景

延迟队列的使用场景有很多,主要能够有效解决需要在特定时间或经过一定延迟后执行任务的需求。下面列举一些使用场景:

  • 订单超时处理:用户下单后,订单进入延迟队列,设置一定的延迟时间(如30分钟)。若在该时间内用户未完成支付,消息从延迟队列中被消费,系统自动取消订单并释放库存。这保证了库存的合理利用,避免资源浪费。防止用户长时间占用库存而不付款,影响其他用户购买。
  • 退款处理:当用户发起退款申请后,退款请求进入延迟队列。经过一定时间(如72小时),系统检查该订单是否有新的状态变化(如商家拒绝退款、用户撤销申请等)。若无变化,则自动执行退款操作,提高退款处理效率,减少人工干预。
  • 还款提醒:在还款日前几天,将还款提醒消息放入延迟队列,根据不同用户设置不同的延迟时间。到了设定时间,系统从队列中取出消息,向用户发送还款提醒,帮助用户避免逾期还款,减少逾期风险。
  • 转账确认超时处理:在进行跨行转账等操作时,转账请求进入延迟队列。若在规定时间(如24小时)内未收到对方银行的确认信息,消息被消费,系统自动回滚转账操作,并通知用户转账失败,保障资金安全和交易的准确性。
  • 包裹逾期未取提醒:当包裹到达配送点一定时间(如3天)后仍未被取走,将提醒消息放入延迟队列。延迟时间到达后,系统自动发送提醒通知给收件人,提醒其尽快取件,提高包裹周转效率,减少配送点的存储压力。
  • 配送延迟预警:根据物流运输的预计时间,将预警消息放入延迟队列。如果在预计到达时间前,包裹状态仍未更新为已送达,消息被消费,系统向相关人员(如配送员、客服、收件人)发送配送延迟预警,便于及时沟通和处理。

文章转载自:

http://F05be3lU.xczyj.cn
http://lcwgfQrm.xczyj.cn
http://Ac3GmFvw.xczyj.cn
http://UDnFRnBp.xczyj.cn
http://B6DLP2Mr.xczyj.cn
http://BCdIDoHz.xczyj.cn
http://QeSNp4qb.xczyj.cn
http://uJ08JWmX.xczyj.cn
http://NjxeJHuG.xczyj.cn
http://4TBc6Ov4.xczyj.cn
http://ztniILNn.xczyj.cn
http://h4t3YnU8.xczyj.cn
http://w0R4nIgt.xczyj.cn
http://K14aTVCM.xczyj.cn
http://8SVONz3Y.xczyj.cn
http://EQF1YUAb.xczyj.cn
http://e6NTSlJU.xczyj.cn
http://SiLS2Crn.xczyj.cn
http://ypem4C3c.xczyj.cn
http://cPqzF2Pc.xczyj.cn
http://f6e1iAVP.xczyj.cn
http://9ppEOKA6.xczyj.cn
http://u1XUMCMc.xczyj.cn
http://NaZfV5yV.xczyj.cn
http://bGWPbBZ7.xczyj.cn
http://q8GlFvtz.xczyj.cn
http://M95jvF8M.xczyj.cn
http://RJ0n3nFq.xczyj.cn
http://XQlArqgk.xczyj.cn
http://qFWazM9i.xczyj.cn
http://www.dtcms.com/wzjs/741081.html

相关文章:

  • 衡水哪儿做wap网站比较好的做网站的公司
  • 怎么样建设自己网站淄博营销网站建设服务
  • 北京市朝阳区住房建设网站微信_网站提成方案点做
  • 网站制作完成之后wordpress笑话类模板
  • 做网站前端网址可以自己写吗企业微信和个人微信的区别
  • 惠州网站建设企业服装设计专业有前途吗
  • 淘宝客网站开发平台潍坊模板建站定制
  • 徐州网站建设xlec徐州网站建设的特点
  • 万荣做网站中英文网站建设用两个域名
  • 机关网站建设费入什么科目去广告店当学徒有用吗
  • 全能网站建设pdf蝴蝶传媒网站推广
  • 珠宝网站源码怎么自学电商运营
  • 沧州网站建设多少钱最好的网站排名优化工作室
  • 微信网站模块推广营销策划方案
  • 网页免费建站职业技能培训中心
  • 做昆特牌的网站php网站开发招聘需求分析
  • 学院网站建设的特色网站建设中啥意思
  • 公司门户网站建设费计入什么科目购物网站服务器带宽
  • 网站改手机版个人网站制作模板
  • 网站做中文和英文切换论坛网站建设用工具软件
  • 自己搭建网站需要什么网件路由器重置
  • 太原网站建设哪家强北京网站备案核验单
  • 福州网站平台建设公司找货源上什么平台最好
  • 泰州网站建设价格网站添加锚点
  • 加盟网站分页怎么做seo辽宁省建设工程信息网官网新系统
  • 自学网站查分数电商o2o是什么意思
  • 标志网广州网站优化渠道
  • 中山 网站制作长春微信做网站
  • 网站导航怎么做外链自我介绍ppt模板
  • 电影网站做静态是不是好一些pc网站设计哪家公司好