Redis(⑤-线程池隔离)
代码
import redis
import time
from concurrent.futures import ThreadPoolExecutor
import threading# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)# 1. 定义两个隔离的线程池(不同业务用不同线程池)
# 业务A:商品查询(分配5个线程)
query_pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="query_")
# 业务B:下单(分配3个线程)
order_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="order_")# 2. 业务函数:查询商品(模拟正常操作)
def query_product(product_id):# 用Redis记录当前线程和任务状态thread_name = threading.current_thread().namer.hset(f"task:status:{product_id}", mapping={"thread": thread_name,"status": "processing","time": time.time()})# 模拟查询操作(访问Redis获取商品信息)product_info = r.get(f"product:{product_id}") or f"Product {product_id} info"time.sleep(0.1) # 模拟耗时# 更新任务状态为完成r.hset(f"task:status:{product_id}", "status", "completed")return f"Query {product_id}: {product_info}"# 3. 业务函数:下单(可能出现阻塞)
def create_order(order_id):thread_name = threading.current_thread().namer.hset(f"task:status:order_{order_id}", mapping={"thread": thread_name,"status": "processing","time": time.time()})# 模拟下单操作(可能因Redis超时阻塞)try:# 模拟Redis操作(这里故意加随机阻塞,模拟故障)if order_id % 5 == 0:time.sleep(5) # 每5个订单模拟一次超时r.set(f"order:{order_id}", f"Order {order_id} created")except Exception as e:r.hset(f"task:status:order_{order_id}", "status", f"failed: {str(e)}")return f"Order {order_id} failed: {str(e)}"r.hset(f"task:status:order_{order_id}", "status", "completed")return f"Order {order_id} created successfully"# 4. 模拟高并发请求
def simulate_traffic():# 初始化一些商品数据到Redisfor i in range(10):r.set(f"product:{i}", f"Product {i} - Price ${i*10}")# 提交大量商品查询任务(用query_pool)for i in range(20):query_pool.submit(lambda pid: print(query_product(pid)), i)# 提交一批下单任务(用order_pool,包含可能阻塞的任务)for i in range(10):order_pool.submit(lambda oid: print(create_order(oid)), i)# 等待所有任务完成query_pool.shutdown(wait=True)order_pool.shutdown(wait=True)print("所有任务处理完毕")if __name__ == "__main__":simulate_traffic()# 可以在Redis中查看任务状态:HGETALL task:status:xxx
核心隔离逻辑(对应代码思路)
操作 | 含义 | 对应代码中的线程池隔离思想 |
---|---|---|
用 query:* 和 order:* 区分键 | 不同业务用独立的 Redis 键空间 | 不同业务用独立的线程池资源 |
业务 B 阻塞时业务 A 正常运行 | 键空间隔离,互不干扰 | 线程池隔离,一个业务阻塞不影响其他业务 |
HSET xxx thread "xxx" | 记录任务所属 “线程” | 线程池记录任务由哪个线程处理 |
两个业务(“商品库存查询” 和 “商品库存扣减”)操作同一个 Redis 键(stock:1001
,代表商品 1001 的库存),但使用独立线程池,即使其中一个业务线程拥堵,另一个仍能正常运行。
共享的 Redis 键:stock:1001(值为 100,代表商品 1001 的库存)。
业务 A(查询库存):用线程池 A(5 个线程),每次操作GET stock:1001,耗时 0.1 秒(快速)。
业务 B(扣减库存):用线程池 B(3 个线程),每次操作DECR stock:1001,但每 10 次操作会随机阻塞 5 秒(模拟故障)。
两个业务操作同一个 Redis 键,但因为用了独立的线程池,线程资源互不共享:
业务 B 的线程拥堵(线程池 B 耗尽),只会影响自身的任务处理。
业务 A 的线程池 A 仍有空闲线程,能正常处理查询任务。
极简的线程池隔离示例,用两个独立线程池处理不同任务
核心隔离点:
创建了两个完全独立的线程池 fast_pool(3 线程)和 slow_pool(2 线程),分别处理不同任务。
运行现象:
慢速任务中,任务 0 会阻塞 5 秒,占用 slow_pool 的 1 个线程
此时 slow_pool 只剩 1 个空闲线程,新的慢速任务需要排队
但 fast_pool 的 3 个线程不受影响,5 个快速任务会快速执行完毕
隔离效果:
即使慢速任务线程池因阻塞被占满,快速任务线程池仍能正常工作,两者互不干扰。
import time
from concurrent.futures import ThreadPoolExecutor# 1. 创建两个隔离的线程池
# 线程池A:处理快速任务(3个线程)
fast_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="fast_")
# 线程池B:处理可能阻塞的任务(2个线程)
slow_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="slow_")# 2. 快速任务(模拟正常业务)
def fast_task(task_id):print(f"快速任务{task_id} 开始(线程:{threading.current_thread().name})")time.sleep(0.5) # 快速处理print(f"快速任务{task_id} 完成")return task_id# 3. 慢速任务(模拟可能阻塞的业务)
def slow_task(task_id):print(f"慢速任务{task_id} 开始(线程:{threading.current_thread().name})")# 每3个任务模拟一次阻塞if task_id % 3 == 0:time.sleep(5) # 长时间阻塞else:time.sleep(1)print(f"慢速任务{task_id} 完成")return task_id# 4. 模拟并发请求
if __name__ == "__main__":import threading # 局部导入,避免命名冲突# 提交5个快速任务到fast_poolfor i in range(5):fast_pool.submit(fast_task, i)# 提交4个慢速任务到slow_pool(其中任务0和3会阻塞5秒)for i in range(4):slow_pool.submit(slow_task, i)# 等待所有任务完成fast_pool.shutdown(wait=True)slow_pool.shutdown(wait=True)print("所有任务处理完毕")
运行后可以看到,快速任务会全部快速完成,而慢速任务会因为阻塞出现排队,但不会影响快速任务的执行节奏 —— 这就是线程池隔离的核心作用。