当前位置: 首页 > news >正文

Python Day24 多线程编程:核心机制、同步方法与实践案例

一、线程事件对象(threading.Event

threading.Event 用于实现线程间的通信,可让一个线程通知其他线程终止任务,核心是通过 “事件触发” 机制协调线程行为。

核心方法:
  1. 创建事件对象event = threading.Event()
  2. 触发事件event.set()(将事件状态设为 “已触发”)
  3. 判断事件状态event.is_set()(返回 True 表示事件已触发)
案例:动物赛跑(乌龟 vs 兔子)

通过 Event 实现 “一方到达终点则所有线程停止” 的逻辑:

import threading
from threading import Thread, Event, current_thread
from abc import ABC, abstractmethod
import timeclass Animal(Thread, ABC):max_distance = 2000  # 比赛总距离def __init__(self, speed, name, time1=0, state=False, stop_event: Event = None):super().__init__(name=name)self.speed = speed  # 速度self.time1 = time1  # 已跑时间self.state = state  # 是否到达终点self.stop_event = stop_event or Event()  # 共享的停止事件@abstractmethoddef distance(self):# 抽象方法:计算当前跑过的距离passdef run(self):# 循环条件:未收到停止信号while not self.stop_event.is_set():self.time1 += 0.1  # 每次循环增加0.1秒length = round(self.distance())  # 计算当前距离print(f"{self.name}在{self.time1:.1f}秒跑了{length}mm")# 检查是否到达终点if length >= self.max_distance:self.state = Trueprint(f"\n{self.name}到达终点!")self.stop_event.set()  # 通知其他线程停止breaktime.sleep(0.1)  # 模拟时间流逝class Turtle(Animal):def distance(self):# 乌龟匀速前进return self.time1 * self.speedclass Rabbit(Animal):def distance(self):# 兔子逻辑:前5秒快跑,休息180秒,之后继续跑if self.time1 < 5:return self.time1 * self.speed  # 前5秒正常跑elif self.time1 < 5 + 180:return 5 * self.speed  # 5-185秒休息(保持5秒时的距离)else:rest_end_time = 5 + 180  # 休息结束时间extra_time = self.time1 - rest_end_time  # 休息后额外跑的时间return 5 * self.speed + extra_time * self.speed  # 总距离if __name__ == '__main__':race_stop_event = Event()  # 共享的停止事件rabbit = Rabbit(speed=90, name="兔子", stop_event=race_stop_event)turtle = Turtle(speed=10, name="乌龟", stop_event=race_stop_event)print("比赛开始!")rabbit.start()turtle.start()rabbit.join()turtle.join()# 判断结果if rabbit.state:print("兔子赢了!")elif turtle.state:print("乌龟赢了!")

二、线程安全与队列(queue.Queue

多线程共享数据时,列表等结构线程不安全(可能引发并发问题),而 queue.Queue 是线程安全的,常用于多线程通信,遵循 FIFO(先进先出)原则。

常见队列类型:
  • Queue:FIFO 队列
  • PriorityQueue:基于优先级的队列
  • LifoQueue:LIFO(先进后出)队列
队列核心方法:
方法说明
empty()判断队列是否为空
full()判断队列是否已满(仅对有长度限制的队列有效)
qsize()获取队列中数据个数
put(item, timeout)存入数据,队列满时阻塞,可设置超时时间
put_nowait(item)存入数据,队列满时不阻塞(直接报错)
get(timeout)获取数据,队列空时阻塞,可设置超时时间
get_nowait()获取数据,队列空时不阻塞(直接报错)

三、条件对象(threading.Condition

Condition 用于复杂的线程间同步,通过 “等待 - 唤醒” 机制协调线程行为,自带锁(默认 RLock),需在锁块中使用 wait() 和 notify()

核心方法:
  • wait(timeout=None):阻塞当前线程,直到超时或被其他线程唤醒
  • notify(n=1):唤醒 1 个等待的线程(默认)
  • notify_all():唤醒所有等待的线程
案例:生产者 - 消费者(摘苹果与吃苹果)

生产者摘苹果存入队列,队列满时通知消费者;消费者吃苹果,队列空时通知生产者:

from threading import Thread, Condition, current_thread
from queue import Queue
import timeclass Apple:__number_code__ = 0  # 苹果编号计数器def __init__(self):self.__class__.__number_code__ += 1self.number = self.__class__.__number_code__def __repr__(self):return f"<Apple : {self.number}>"def product_apple(queue: Queue, condition: Condition):while True:with condition:  # 锁块:确保操作原子性while queue.full():condition.wait()  # 队列满时等待apple = Apple()print(f"{current_thread().name}正在摘取{apple}")queue.put(apple)time.sleep(0.2)  # 模拟摘苹果耗时if queue.full():condition.notify_all()  # 队列满时唤醒消费者def eat_apple(queue: Queue, condition: Condition):while True:with condition:while queue.empty():condition.wait()  # 队列空时等待apple = queue.get()print(f"{current_thread().name}正在吃{apple}~~")time.sleep(0.5)  # 模拟吃苹果耗时if queue.empty():condition.notify_all()  # 队列空时唤醒生产者if __name__ == '__main__':apple_queue = Queue(maxsize=50)  # 队列最大容量50condition = Condition()# 启动生产者和消费者productor = Thread(target=product_apple, args=(apple_queue, condition), name='生产者01')productor.start()for x in range(2):consumer = Thread(target=eat_apple, args=(apple_queue, condition), name=f'消费者{x}')consumer.start()

四、线程状态

线程生命周期包含以下状态及转换:

  1. 新建状态:创建线程对象后,调用 start() 之前
  2. 就绪状态:调用 start() 后,等待 CPU 调度
  3. 运行状态:获取 CPU 时间片,执行 run() 方法
  4. 阻塞状态:因 sleep()join()input() 等操作暂停,阻塞结束后回到就绪状态
  5. 死亡状态run() 执行完毕或异常终止

五、线程池(concurrent.futures.ThreadPoolExecutor

线程池可复用线程,减少线程创建 / 销毁的开销,适用于多任务场景。

核心用法:
  • 创建线程池:executor = ThreadPoolExecutor(max_workers)max_workers 为线程数)
  • 提交任务:executor.submit(func, *args)(返回 Future 对象,用于获取结果)
  • 获取结果:future.result()(阻塞等待任务完成并返回结果)
案例 1:线程池售票

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from queue import Queue
import timedef execute_task(queue: Queue):while True:time.sleep(0.5)try:ticket = queue.get(timeout=1)  # 超时1秒未获取到票则退出print(f"{current_thread().name}正在售票、票号是{ticket}、剩余{queue.qsize()}")except:print(f"{current_thread().name}:票已售罄")breakif __name__ == '__main__':executor = ThreadPoolExecutor(5)  # 5个线程的线程池queue = Queue(maxsize=100)for x in range(1, 101):queue.put(f"NO.{x:>08}")  # 初始化100张票for _ in range(5):executor.submit(execute_task, queue)
案例 2:多线程爬虫(抓取小说章节)

主线程获取小说章节列表,线程池并发抓取章节内容,最终汇总保存:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from queue import Queue
import requests
import re
import time
from traceback import print_excdef parse_url(url, count=0):"""解析网址,最多重试3次"""headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0","cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8","referer": "https://www.bqgui.cc/"}try:if count < 3:response = requests.get(url, headers=headers)assert response.status_code == 200, f'网页抓取失败,状态码{response.status_code}'return response.textreturn Noneexcept:print_exc()return parse_url(url, count + 1)  # 重试def parse_caption(queue: Queue):"""抓取单章节内容"""try:time.sleep(0.5)  # 降低频率,避免被封index, href, title = queue.get(timeout=1)print(f"{current_thread().name}正在抓取{title}~~~")text = parse_url(f"https://www.c3719.lol{href}")print(f"{current_thread().name}抓取{title}成功!!!")if text is None:return index, title, ""# 提取章节正文regex = r'<div\s+id="chaptercontent".*?>(.*?)请收藏本站'match = re.search(regex, text)if not match:return index, title, ""caption = match.group(1)# 清洗内容(去除标签和多余空格)caption = re.sub(r"<br\s*/?>|\s+", "", caption)return index, title, captionexcept:print(f"{current_thread().name}所有章节抓取完成")if __name__ == '__main__':# 主线程获取章节列表url = "https://www.c3719.lol/book/61808/"noval_name = '斗罗大陆V重生唐三'text = parse_url(url)# 提取所有章节链接和标题regex = r'<dd><a\s+href\s*="(.*?)">(.*?)</a></dd>'all_captions = re.findall(regex, text)# 章节信息存入队列queue = Queue()for index, caption in enumerate(all_captions):queue.put((index, *caption))# 线程池并发抓取executor = ThreadPoolExecutor(50)futures = [executor.submit(parse_caption, queue) for _ in all_captions]# 获取结果并排序result = [f.result() for f in futures]result.sort(key=lambda x: x[0])  # 按章节顺序排序# 保存到文件with open(f'{noval_name}.txt', 'w', encoding='utf-8') as f:for index, title, text in result:f.write(title + "\n")f.write(text + "\n")f.flush()print(f"{noval_name}抓取完成")

作业一:线程通信(生产者 - 消费者模型)

需求
a) 定义 1 个生产者线程,负责摘苹果并存储到队列中;当队列已满时,通知消费者吃苹果并停止摘苹果。
b) 定义多个消费者线程,负责吃苹果;当队列为空时,通知生产者继续摘苹果。

实现代码

from queue import Queue
from threading import Thread, Condition, current_thread
import time# 苹果类:记录苹果编号
class Apple:__number_code__ = 0  # 类变量,用于计数苹果编号def __init__(self):self.__class__.__number_code__ += 1  # 每次创建苹果时编号自增self.number = self.__class__.__number_code__def __repr__(self):return f"<Apple> : {self.number}"  # 自定义苹果的打印格式# 生产者函数:摘苹果并放入队列
def produce_apple(queue: Queue, condition: Condition):while True:with condition:  # 借助Condition的锁确保操作原子性# 队列满时等待(释放锁,等待消费者唤醒)while queue.full():condition.wait()# 摘苹果并放入队列apple = Apple()queue.put(apple)print(f"{current_thread().name}摘下: <{apple}>")time.sleep(0.2)  # 模拟摘苹果耗时# 队列满时通知所有消费者if queue.full():condition.notify_all()# 消费者函数:从队列取苹果并"吃"
def consume_apple(queue: Queue, condition: Condition):while True:with condition:  # 借助Condition的锁确保操作原子性# 队列空时等待(释放锁,等待生产者唤醒)while queue.empty():condition.wait()# 从队列取苹果并"吃"apple = queue.get()print(f"{current_thread().name}正在吃: <{apple}>")time.sleep(0.2)  # 模拟吃苹果耗时# 队列空时通知生产者if queue.empty():condition.notify()  # 唤醒1个生产者(此处只有1个生产者,notify_all()也可)# 主程序:启动生产者和消费者
if __name__ == '__main__':# 创建队列(最大容量20)和条件对象apple_queue = Queue(maxsize=20)condition = Condition()# 启动1个生产者线程producer = Thread(target=produce_apple, args=(apple_queue, condition), name="生产者")producer.start()# 启动3个消费者线程for i in range(3):consumer = Thread(target=consume_apple, args=(apple_queue, condition), name=f"消费者<{i}>")consumer.start()
核心逻辑解析
  1. Apple 类:通过类变量__number_code__记录苹果编号,每次创建苹果时自动生成唯一编号,便于跟踪。
  2. Condition 协调
    • 生产者通过condition.wait()在队列满时阻塞,等待消费者唤醒;队列满时通过notify_all()通知所有消费者。
    • 消费者通过condition.wait()在队列空时阻塞,等待生产者唤醒;队列空时通过notify()通知生产者。
  3. 线程安全with condition确保生产 / 消费操作在锁保护下执行,避免并发冲突。

作业二:多线程小说抓取与下载

需求:使用多线程(线程池)抓取笔趣阁类网站(如https://www.biqu03.cc/)某小说的所有章节内容,并保存为本地文件。

实现代码

import re
import requests
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from queue import Queue# 解析小说章节列表(获取所有章节的链接和标题)
def parse_url(url):headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0","cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8","referer": "https://www.bqgui.cc/"}# 发送请求获取小说目录页内容response = requests.get(url, headers=headers)assert response.status_code == 200, "目录页请求失败"  # 确保请求成功text = response.text# 正则提取章节链接和标题(<dd><a href="链接">标题</a></dd>)regex = r'<dd><a\s+href\s+="(.*)">(.*)</a></dd>'all_title = re.findall(regex, text)  # 返回列表:[(链接1, 标题1), (链接2, 标题2), ...]return all_title# 解析单章节内容(从队列获取章节信息,抓取正文并返回)
def parse_caption(queue: Queue):try:time.sleep(0.5)  # 降低请求频率,避免被网站反爬headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0","cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8","referer": "https://www.bqgui.cc/"}# 从队列获取章节信息(索引、链接、标题)index, href, title = queue.get(timeout=1)print(f"{current_thread().name}正在抓取{title}~~~")# 构建章节详情页URL并请求new_url = f"https://www.c3719.lol{href}"  # 注意替换为实际网站域名response = requests.get(new_url, headers=headers)assert response.status_code == 200, f"{title}详情页请求失败"text = response.textprint(f"{current_thread().name}抓取{title}成功!!!")# 正则提取章节正文(匹配id为chaptercontent的div内容)regex = r'<div\s+id="chaptercontent".*?>(.*?)请收藏本站'match = re.search(regex, text, re.DOTALL)  # re.DOTALL让.匹配换行符if not match:return index, title, ""  # 无正文时返回空# 清洗正文(去除<br>标签和多余空格)caption = match.group(1)caption = re.sub(r"<br\s*/?>|\s+", "", caption)return index, title, captionexcept:print(f"{current_thread().name}所有章节抓取完成")# 主程序:协调抓取流程并保存结果
if __name__ == '__main__':# 小说目录页URL(替换为目标小说目录页)url = "https://www.c3719.lol/book/61808/"all_tit = parse_url(url)  # 获取所有章节信息# 章节信息存入队列(用于线程池消费)queue = Queue()for index, x in enumerate(all_tit):queue.put((index, *x))# 小说名称(用于保存文件)noval_name = '斗罗大陆V重生唐三'# 线程池并发抓取所有章节executor = ThreadPoolExecutor(50)  # 50个线程并发futures = [executor.submit(parse_caption, queue) for _ in all_tit]# 收集并排序结果(按章节索引排序)result = [f.result() for f in futures if f.result() is not None]result.sort(key=lambda d: d[0])  # 按索引排序,确保章节顺序正确# 保存到本地文件with open(f'{noval_name}.txt', 'w', encoding='utf-8') as f:for index, title, text in result:f.write(title + "\n")  # 写入标题f.write(text + "\n")   # 写入正文f.flush()  # 实时刷新缓冲区print(f"{noval_name}抓取完成,已保存为本地文件!")

核心逻辑解析
  1. 章节列表解析:通过parse_url函数请求小说目录页,用正则提取所有章节的链接和标题,为后续抓取做准备。
  2. 多线程并发抓取:使用ThreadPoolExecutor创建线程池,多个线程从队列获取章节信息,并发请求详情页,提高效率。
  3. 正文提取与清洗:通过正则匹配章节正文所在的 HTML 标签,去除冗余标签(如<br>)和空格,得到干净的文本。
  4. 结果排序与保存:通过章节索引排序结果,确保章节顺序正确,最终写入本地 TXT 文件。
http://www.dtcms.com/a/317970.html

相关文章:

  • Lesson 33 Out of the darkness
  • 开疆智能ModbusTCP转Profinet网关连接EPSON机器人配置案例
  • c# winform 调用 海康威视工业相机(又全又细又简洁)
  • 字典树trie
  • 技术博客:从HTML提取到PDF生成的完整解决方案
  • 奔图P2500NW打印机手机无线连接方法
  • 强化应急通信生命线:遨游三防平板、卫星电话破局极端灾害救援
  • 2.6 sync
  • 2024年测绘程序设计比赛--空间探索性分析(数据为2025年第三次模拟数据)
  • 第二十六天(数据结构:树(补充版程序请看下一篇))
  • 【数据结构与算法】刷题篇——环形链表的约瑟夫问题
  • tmux.conf配置-简易版
  • Java技术栈/面试题合集(15)-RabbitMQ篇
  • 202506 电子学会青少年等级考试机器人四级实际操作真题
  • vue3 vite 使用vitest 单元测试 组件测试
  • Python数据可视化:从基础到高级实战指南
  • 【代码随想录day 12】 力扣 144.145.94.前序遍历中序遍历后序遍历
  • 【数据可视化-82】中国城市幸福指数可视化分析:Python + PyEcharts 打造炫酷城市幸福指数可视化大屏
  • 架构层防护在高并发场景下的实践
  • Linux系统之Docker命令与镜像、容器管理
  • Spring Cloud系列—Eureka服务注册/发现
  • ElasticSearch~DSL常用语法
  • Unity 调节 Rigidbody2D 响应速度的解决方案【资料】
  • CS课程项目设计8:基于Canvas支持AI人机对战的五子棋游戏
  • Lesson 35 Stop thief!
  • MATLAB实现的改进遗传算法用于有约束优化问题
  • Java 工具类的“活化石”:Apache Commons 核心用法、性能陷阱与现代替代方案
  • 03-mysql/redis/apache安装记录
  • 从《中国开源年度报告》看中国开源力量的十年变迁中,Apache SeaTunnel 的跃迁
  • SmartMediaKit 模块化音视频框架实战指南:场景链路 + 能力矩阵全解析