Redis 延迟队列
延时队列的实现可以利用Redis的有序集合(zset),其中消息被序列化为字符串并存储为zset的成员(member)。消息的到期时间被设置为zset的分数(score)。通过多个线程轮询zset以检索并处理到期的任务,保证了系统的高可用性,即使某个线程失败,其他线程也能接管处理。然而,由于存在多个线程,必须确保并发访问时任务不会被多次执行,这可能需要实现适当的并发控制机制。
一个简单的延迟队列示例:
import redis
import time
import uuid
import threading
import random# 链接 redis
r = redis.Redis(host='localhost', port=6379, db=0)def handle_task(task_id):print('handle task', task_id)def process_delay_queue():
# 开启线程后,循环去查询是否有任务存在,有则取出执行,这里有个特殊的点,就是zrem操作while True:task_list = r.zrangebyscore('delay_queue', 0, time.time())if not task_list:# 没有任务print('cost 1s')time.sleep(1)continue# 队列中有任务时,取出首个任务,然后这里用redis里的zrem# 如果将其删除后则返回 true, 若删除失败(也就是已经被删除过了,其他线程在来删除的会肯定会失败),则返回false# 这里能够保证一个任务只能被执行一次task_id = task_list[0]task = r.zrem('delay_queue', task_id) # 删除任务if task: # 防止多线程同时执行任务handle_task(task_id)if __name__ == '__main__':t = threading.Thread(target=process_delay_queue)t.start() # 开启线程for i in range(10):task_id = str(uuid.uuid4()) # 创建任务delay_time = random.randint(10,20) # 延时时间r.zadd('delay_queue', {task_id: time.time() + delay_time}) # 将任务添加到队列中
Redis的
ZREM
方法在多线程或多进程环境中处理任务争抢时扮演着关键角色。它的返回值表明当前实例是否成功抢到任务。由于process_delay_queue
方法可能由多个线程和进程调用,同一任务可能被多个进程或线程尝试获取。通过使用ZREM
,可以确保每个任务只有一个唯一的处理者。此外,确保对handle_task
方法进行异常捕获是至关重要的,以防止因个别任务处理异常导致整个循环异常退出,从而维护系统的稳定性和可靠性。