高级特性实战:死信队列、延迟队列与优先级队列(二)
三、延迟队列:实现任务定时执行
3.1 延迟队列概念解析
延迟队列(Delay Queue),是一种特殊的队列,它的独特之处在于队列中的元素(消息)并不会立即被处理,而是会在指定的延迟时间过后,才会被消费者取出并进行处理。简单来说,延迟队列就像是一个时间控制的消息容器,能够精确地按照设定的时间规则来释放消息,实现任务的定时执行。
延迟队列中的每个元素都关联了一个延迟时间,这个时间决定了元素在队列中的等待时长。在延迟时间未到达之前,元素会一直处于队列中等待,不会被消费。只有当延迟时间到期,元素才会被视为 “就绪” 状态,被投递到消费者进行处理 。这种特性使得延迟队列在很多对时间有精确要求的场景中发挥着重要作用,比如电商平台中的限时优惠活动、物流系统中的货物配送时间管理等。
3.2 延迟队列应用场景
延迟队列在实际业务中有广泛的应用场景,以下是一些常见的例子:
- 定时任务执行:在很多系统中,都需要执行一些定时任务,如每天凌晨执行数据备份、每周一发送周报邮件等。使用延迟队列,可以将这些定时任务封装成消息,设置好延迟时间,放入延迟队列中。当延迟时间到达时,任务消息就会被取出执行,实现了定时任务的自动化处理,避免了使用复杂的定时任务框架。
- 电商订单未支付超时处理:在电商购物流程中,当用户下单后,如果在规定的时间内(比如 30 分钟)未完成支付,订单需要被自动取消。利用延迟队列,在用户下单后,将订单信息作为消息放入延迟队列,并设置延迟时间为 30 分钟。30 分钟后,消息从延迟队列中被取出,系统可以检查订单状态,如果仍未支付,则自动取消订单,释放库存,保证了库存的有效管理和订单的正常流转。
- 用户注册后未激活提醒:当用户注册新账号后,可能需要在一定时间内进行激活操作。如果用户在规定时间(如 24 小时)内未激活,系统可以通过延迟队列发送提醒邮件或短信。将用户注册信息和激活提醒任务封装成消息,设置 24 小时的延迟时间放入延迟队列。24 小时后,消息被取出,系统自动发送提醒通知,提高了用户激活率。
3.3 延迟队列实现方式
3.3.1 使用 RabbitMQ 的 TTL 和死信队列实现延迟队列
RabbitMQ 本身并没有直接提供延迟队列的功能,但我们可以巧妙地利用它的两个特性:消息的过期时间(Time-To-Live,TTL)和死信队列(Dead Letter Queue)来实现延迟队列。
其实现原理如下:首先,我们创建一个普通队列,并为该队列设置一个死信交换机(Dead Letter Exchange)和死信路由键(Dead Letter Routing Key)。同时,我们可以为发送到该队列的消息设置 TTL,或者直接为队列设置 TTL。当消息在队列中停留的时间超过了 TTL 值时,消息就会成为死信。由于我们之前配置了死信交换机和路由键,这些死信会被发送到指定的死信队列中。而消费者只需要监听死信队列,就可以获取到这些延迟处理的消息,从而实现了延迟队列的功能。
下面是一个使用 RabbitMQ 的 Java 客户端实现延迟队列的示例代码:
首先,引入 RabbitMQ 的 Java 客户端依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
然后,编写配置类,声明队列、交换机和绑定关系:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE = "delay_exchange";
public static final String DELAY_QUEUE = "delay_queue";
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
public static final String ROUTING_KEY = "routing_key";
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", ROUTING_KEY);
// 设置队列的TTL为10秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(DELAY_QUEUE).withArguments(args).build();
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTING_KEY);
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);
}
}
接着,编写生产者代码,向延迟队列发送消息:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, RabbitMQConfig.ROUTING_KEY, message);
System.out.println("Sent message: " + message);
}
}
最后,编写消费者代码,从死信队列中接收延迟处理的消息:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
public void receive(String message) {
System.out.println("Received message: " + message);
}
}
在上述代码中,我们通过配置delayQueue的x-message-ttl参数设置了队列的 TTL 为 10 秒。生产者将消息发送到delay_exchange,经过 10 秒延迟后,消息进入dead_letter_queue,消费者从dead_letter_queue中接收并处理消息,实现了延迟队列的功能。
3.3.2 基于 JDK 的 DelayQueue 实现延迟队列
JDK 提供了DelayQueue类,它是一个无界的阻塞队列,用于存放实现了Delayed接口的对象。DelayQueue中的元素只有在其延迟时间到期时才能从队列中取出,非常适合用于实现延迟队列。
Delayed接口继承自Comparable接口,实现Delayed接口需要实现getDelay和compareTo方法。getDelay方法用于获取元素的剩余延迟时间,compareTo方法用于比较元素的延迟时间,以确定队列中元素的顺序。
以下是一个基于DelayQueue实现延迟队列的简单示例:
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExample {
public static void main(String[] args) {
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 添加延迟任务,延迟3秒执行
delayQueue.offer(new DelayedTask(3000, "Task 1"));
// 添加延迟任务,延迟5秒执行
delayQueue.offer(new DelayedTask(5000, "Task 2"));
new Thread(() -> {
while (true) {
try {
DelayedTask task = delayQueue.take();
System.out.println("Executing task: " + task.getMessage() + " at " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
static class DelayedTask implements Delayed {
private final long delayTime;
private final String message;
private final long expire;
public DelayedTask(long delayTime, String message) {
this.delayTime = delayTime;
this.message = message;
this.expire = System.currentTimeMillis() + delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
long diff = this.expire - ((DelayedTask) other).expire;
if (diff == 0) {
return 0;
} else if (diff < 0) {
return -1;
} else {
return 1;
}
}
public String getMessage() {
return message;
}
}
}
在这个示例中,DelayedTask类实现了Delayed接口,并重写了getDelay和compareTo方法。DelayQueueExample类中创建了一个DelayQueue,并向其中添加了两个延迟任务。主线程启动一个新线程,不断从DelayQueue中取出到期的任务并执行,实现了简单的延迟队列功能。
3.3.3 利用 Redis 的有序集合 ZSet 实现延迟队列
Redis 的有序集合(Sorted Set,简称 ZSet)是一种非常适合实现延迟队列的数据结构。在 ZSet 中,每个元素都关联一个分数(score),集合会根据分数对元素进行排序。我们可以利用这一特性,将消息的执行时间作为分数,消息内容作为元素,实现延迟队列。
实现原理如下:生产者将消息及其延迟时间(转换为时间戳作为分数)添加到 ZSet 中。消费者通过不断轮询 ZSet,获取当前时间戳之前的元素(即延迟时间已到的消息),并进行处理。处理完成后,将该元素从 ZSet 中删除。
以下是一个使用 Redis 的 Java 客户端 Jedis 实现延迟队列的简单示例代码:
首先,引入 Jedis 依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>
然后,编写生产者代码,向 ZSet 中添加延迟消息:
import redis.clients.jedis.Jedis;
public class RedisProducer {
private static final String DELAY_QUEUE_KEY = "delay_queue";
public static void main(String[] args) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
// 发送延迟消息,延迟5秒
long delayTime = System.currentTimeMillis() + 5000;
jedis.zadd(DELAY_QUEUE_KEY, delayTime, "Message 1");
System.out.println("Sent message to Redis delay queue.");
}
}
}
接着,编写消费者代码,从 ZSet 中获取并处理延迟消息:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Set;
public class RedisConsumer {
private static final String DELAY_QUEUE_KEY = "delay_queue";
public static void main(String[] args) {
new Thread(() -> {
try (Jedis jedis = new Jedis("localhost", 6379)) {
while (true) {
long currentTime = System.currentTimeMillis();
// 获取当前时间之前的消息
Set<Tuple> messages = jedis.zrangeByScoreWithScores(DELAY_QUEUE_KEY, 0, currentTime);
for (Tuple message : messages) {
System.out.println("Received message: " + message.getElement());
// 处理完消息后,从ZSet中删除
jedis.zrem(DELAY_QUEUE_KEY, message.getElement());
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
在上述代码中,RedisProducer将带有延迟时间的消息添加到 Redis 的 ZSet 中,RedisConsumer通过不断轮询 ZSet,获取并处理延迟时间已到的消息,实现了基于 Redis ZSet 的延迟队列功能。