当前位置: 首页 > news >正文

Redis(⑤-线程池隔离)

代码

import redis
import time
from concurrent.futures import ThreadPoolExecutor
import threading# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)# 1. 定义两个隔离的线程池(不同业务用不同线程池)
# 业务A:商品查询(分配5个线程)
query_pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="query_")
# 业务B:下单(分配3个线程)
order_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="order_")# 2. 业务函数:查询商品(模拟正常操作)
def query_product(product_id):# 用Redis记录当前线程和任务状态thread_name = threading.current_thread().namer.hset(f"task:status:{product_id}", mapping={"thread": thread_name,"status": "processing","time": time.time()})# 模拟查询操作(访问Redis获取商品信息)product_info = r.get(f"product:{product_id}") or f"Product {product_id} info"time.sleep(0.1)  # 模拟耗时# 更新任务状态为完成r.hset(f"task:status:{product_id}", "status", "completed")return f"Query {product_id}: {product_info}"# 3. 业务函数:下单(可能出现阻塞)
def create_order(order_id):thread_name = threading.current_thread().namer.hset(f"task:status:order_{order_id}", mapping={"thread": thread_name,"status": "processing","time": time.time()})# 模拟下单操作(可能因Redis超时阻塞)try:# 模拟Redis操作(这里故意加随机阻塞,模拟故障)if order_id % 5 == 0:time.sleep(5)  # 每5个订单模拟一次超时r.set(f"order:{order_id}", f"Order {order_id} created")except Exception as e:r.hset(f"task:status:order_{order_id}", "status", f"failed: {str(e)}")return f"Order {order_id} failed: {str(e)}"r.hset(f"task:status:order_{order_id}", "status", "completed")return f"Order {order_id} created successfully"# 4. 模拟高并发请求
def simulate_traffic():# 初始化一些商品数据到Redisfor i in range(10):r.set(f"product:{i}", f"Product {i} - Price ${i*10}")# 提交大量商品查询任务(用query_pool)for i in range(20):query_pool.submit(lambda pid: print(query_product(pid)), i)# 提交一批下单任务(用order_pool,包含可能阻塞的任务)for i in range(10):order_pool.submit(lambda oid: print(create_order(oid)), i)# 等待所有任务完成query_pool.shutdown(wait=True)order_pool.shutdown(wait=True)print("所有任务处理完毕")if __name__ == "__main__":simulate_traffic()# 可以在Redis中查看任务状态:HGETALL task:status:xxx

核心隔离逻辑(对应代码思路)

操作含义对应代码中的线程池隔离思想
用 query:* 和 order:* 区分键不同业务用独立的 Redis 键空间不同业务用独立的线程池资源
业务 B 阻塞时业务 A 正常运行键空间隔离,互不干扰线程池隔离,一个业务阻塞不影响其他业务
HSET xxx thread "xxx"记录任务所属 “线程”线程池记录任务由哪个线程处理

两个业务(“商品库存查询” 和 “商品库存扣减”)操作同一个 Redis 键stock:1001,代表商品 1001 的库存),但使用独立线程池,即使其中一个业务线程拥堵,另一个仍能正常运行。

共享的 Redis 键:stock:1001(值为 100,代表商品 1001 的库存)。
业务 A(查询库存):用线程池 A(5 个线程),每次操作GET stock:1001,耗时 0.1 秒(快速)。
业务 B(扣减库存):用线程池 B(3 个线程),每次操作DECR stock:1001,但每 10 次操作会随机阻塞 5 秒(模拟故障)。

两个业务操作同一个 Redis 键,但因为用了独立的线程池,线程资源互不共享:
业务 B 的线程拥堵(线程池 B 耗尽),只会影响自身的任务处理。
业务 A 的线程池 A 仍有空闲线程,能正常处理查询任务。

极简的线程池隔离示例,用两个独立线程池处理不同任务

核心隔离点:
创建了两个完全独立的线程池 fast_pool(3 线程)和 slow_pool(2 线程),分别处理不同任务。
运行现象:
慢速任务中,任务 0 会阻塞 5 秒,占用 slow_pool 的 1 个线程
此时 slow_pool 只剩 1 个空闲线程,新的慢速任务需要排队
但 fast_pool 的 3 个线程不受影响,5 个快速任务会快速执行完毕
隔离效果:
即使慢速任务线程池因阻塞被占满,快速任务线程池仍能正常工作,两者互不干扰。

import time
from concurrent.futures import ThreadPoolExecutor# 1. 创建两个隔离的线程池
# 线程池A:处理快速任务(3个线程)
fast_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="fast_")
# 线程池B:处理可能阻塞的任务(2个线程)
slow_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="slow_")# 2. 快速任务(模拟正常业务)
def fast_task(task_id):print(f"快速任务{task_id} 开始(线程:{threading.current_thread().name})")time.sleep(0.5)  # 快速处理print(f"快速任务{task_id} 完成")return task_id# 3. 慢速任务(模拟可能阻塞的业务)
def slow_task(task_id):print(f"慢速任务{task_id} 开始(线程:{threading.current_thread().name})")# 每3个任务模拟一次阻塞if task_id % 3 == 0:time.sleep(5)  # 长时间阻塞else:time.sleep(1)print(f"慢速任务{task_id} 完成")return task_id# 4. 模拟并发请求
if __name__ == "__main__":import threading  # 局部导入,避免命名冲突# 提交5个快速任务到fast_poolfor i in range(5):fast_pool.submit(fast_task, i)# 提交4个慢速任务到slow_pool(其中任务0和3会阻塞5秒)for i in range(4):slow_pool.submit(slow_task, i)# 等待所有任务完成fast_pool.shutdown(wait=True)slow_pool.shutdown(wait=True)print("所有任务处理完毕")

运行后可以看到,快速任务会全部快速完成,而慢速任务会因为阻塞出现排队,但不会影响快速任务的执行节奏 —— 这就是线程池隔离的核心作用。

http://www.dtcms.com/a/321808.html

相关文章:

  • javaSE(基础):5.抽象类和接口
  • C+++——内存管理
  • 大语言模型提示工程与应用:提示工程入门指南
  • 前端后端之争?JavaScript和Java的特性与应用场景解析
  • 大型语言模型幻觉检测与缓解技术研究综述
  • 将Django项目部署到Vercel平台的完整指南
  • Spring Boot 常用注解及其功能详解
  • Numpy科学计算与数据分析:Numpy高效数据处理与优化
  • 第七章:数据持久化 —— `chrome.storage` 的记忆魔法
  • bytearray和bytes
  • 解决flex元素内部文本溢出的问题min-width: 0
  • Pytest项目_day08(setup、teardown前置后置操作)
  • 树和二叉树和算法复杂度
  • 这款MEMS组合导航系统如何实现高性价比?
  • SVM实战:从线性可分到高维映射再到实战演练
  • 智能对讲机是什么?原理、优势、应用场景、发展趋势详解
  • 前端老项目依赖安全漏洞解决
  • 【LLM实战|langchain、qwen_agent】RAG高级
  • 888. 公平的糖果交换
  • YOLO-Count:用于文本到图像生成的可微分目标计数
  • 智慧公厕自动清洁空气环境,节省门店运营成本
  • 什么是SSL证书颁发机构?
  • 北斗变形监测技术应用与案例分析
  • SVM算法实战应用
  • 【开源工具】网络交换机批量配置生成工具开发全解:从原理到实战(附完整Python源码)
  • C++ 标准库容器常用成员函数
  • 04--模板初阶(了解)
  • 【Linux】从零开始:RPM 打包全流程实战万字指南(含目录结构、spec 编写、分步调试)
  • 【探展WAIC】从“眼见为虚”到“AI识真”:如何用大模型筑造多模态鉴伪盾牌
  • 惯量时间常数 H 与转动惯量 J 的关系解析