Python 线程 类比c++【python】
Python thread
import threading
import time
import random
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import sys# 1. 线程创建方式1:继承Thread类并重写run方法
class MyThread(threading.Thread):def __init__(self, name, delay):super().__init__(name=name) # 设置线程名称self.delay = delaydef run(self):"""线程执行的核心逻辑"""print(f"[{self.name}] 开始执行 (继承Thread类)")time.sleep(self.delay)print(f"[{self.name}] 执行结束")# 2. 线程创建方式2:通过target函数创建
def thread_func(name, delay):"""作为线程目标的函数"""print(f"[{name}] 开始执行 (target函数)")try:time.sleep(delay)# 模拟可能发生的异常if random.random() < 0.3:raise ValueError(f"[{name}] 发生随机错误")except Exception as e:print(f"[{name}] 捕获异常: {e}", file=sys.stderr)finally:print(f"[{name}] 函数执行完毕")# 3. 线程同步:使用Lock处理共享资源竞争
class SharedResource:def __init__(self):self.count = 0self.lock = threading.Lock() # 互斥锁# self.lock = threading.RLock() # 可重入锁(适合递归场景)def increment(self, thread_name):"""安全地递增计数器(演示锁的使用)"""with self.lock: # 自动获取和释放锁,避免死锁风险print(f"[{thread_name}] 准备修改计数器,当前值: {self.count}")current = self.counttime.sleep(0.1) # 模拟耗时操作,放大竞争问题self.count = current + 1print(f"[{thread_name}] 修改后计数器: {self.count}")# 4. 线程通信:使用Event实现简单信号传递
def event_waiter(event, name):"""等待事件触发的线程"""print(f"[{name}] 等待事件触发...")event.wait() # 阻塞等待事件被设置print(f"[{name}] 事件已触发,继续执行")# 5. 线程通信:使用Condition实现复杂条件等待
def condition_worker(cond, name, threshold):"""基于Condition等待特定条件的线程"""with cond:print(f"[{name}] 等待条件满足...")cond.wait_for(lambda: shared_counter >= threshold) # 等待条件成立print(f"[{name}] 条件满足 (shared_counter={shared_counter}),开始工作")time.sleep(0.5)# 6. 线程通信:使用Queue实现安全的数据传递
def queue_producer(q, name):"""向队列生产数据的线程"""for i in range(3):data = f"{name}_data_{i}"q.put(data) # 放入数据,队列满时会阻塞print(f"[{name}] 生产数据: {data},队列大小: {q.qsize()}")time.sleep(random.uniform(0.1, 0.5))q.put(None) # 发送结束信号print(f"[{name}] 生产结束")def queue_consumer(q, name):"""从队列消费数据的线程"""while True:data = q.get() # 获取数据,队列为空时会阻塞if data is None: # 收到结束信号q.put(None) # 转发结束信号给其他消费者breakprint(f"[{name}] 消费数据: {data},剩余: {q.qsize()}")time.sleep(random.uniform(0.2, 0.6))q.task_done() # 标记任务完成print(f"[{name}] 消费结束")# 7. 线程本地存储:每个线程的私有数据
thread_local = threading.local() # 线程本地存储对象def local_data_worker(name):"""使用线程本地存储的线程"""thread_local.value = name # 为当前线程设置私有值time.sleep(random.uniform(0.1, 0.3))# 每个线程只能访问自己的valueprint(f"[{name}] 线程本地值: {thread_local.value}")# 8. 线程池:使用ThreadPoolExecutor管理线程
def pool_task(task_id):"""线程池中的任务函数"""start = time.time()time.sleep(random.uniform(0.2, 0.8))end = time.time()return f"任务{task_id}完成,耗时: {end - start:.2f}s,线程: {threading.current_thread().name}"def main():global shared_countershared_counter = 0print("===== 1. 基本线程创建与启动 =====")# 创建线程实例thread1 = MyThread("继承线程", 1)thread2 = threading.Thread(target=thread_func,args=("目标线程", 1.5),daemon=False # 非守护线程(默认):主线程会等待其完成)# 启动线程thread1.start()thread2.start()# 等待线程完成(join的使用)thread1.join()thread2.join()print("基本线程演示结束\n")print("===== 2. 线程同步(锁机制) =====")resource = SharedResource()sync_threads = []for i in range(5):t = threading.Thread(target=resource.increment,args=(f"同步线程{i}",))sync_threads.append(t)t.start()# 等待所有同步线程完成for t in sync_threads:t.join()print(f"最终计数器值: {resource.count}")print("线程同步演示结束\n")print("===== 3. 线程通信(Event) =====")event = threading.Event()waiter1 = threading.Thread(target=event_waiter, args=(event, "等待线程1"))waiter2 = threading.Thread(target=event_waiter, args=(event, "等待线程2"))waiter1.start()waiter2.start()time.sleep(2) # 主线程延迟后触发事件print("主线程触发事件")event.set() # 设置事件,唤醒所有等待线程waiter1.join()waiter2.join()print("Event通信演示结束\n")print("===== 4. 线程通信(Condition) =====")cond = threading.Condition()condition_threads = [threading.Thread(target=condition_worker, args=(cond, f"条件线程{i}", i + 1))for i in range(3)]for t in condition_threads:t.start()# 主线程修改共享变量并通知等待线程time.sleep(1)with cond:shared_counter = 2print(f"主线程更新shared_counter={shared_counter},通知所有等待线程")cond.notify_all() # 唤醒所有等待线程for t in condition_threads:t.join()print("Condition通信演示结束\n")print("===== 5. 线程通信(Queue) =====")q = Queue(maxsize=2) # 最大容量为2的队列producer = threading.Thread(target=queue_producer, args=(q, "生产者"))consumer1 = threading.Thread(target=queue_consumer, args=(q, "消费者1"))consumer2 = threading.Thread(target=queue_consumer, args=(q, "消费者2"))producer.start()consumer1.start()consumer2.start()producer.join()consumer1.join()consumer2.join()print("Queue通信演示结束\n")print("===== 6. 线程本地存储 =====")local_threads = [threading.Thread(target=local_data_worker, args=(f"本地线程{i}",))for i in range(3)]for t in local_threads:t.start()for t in local_threads:t.join()print("线程本地存储演示结束\n")print("===== 7. 线程池(ThreadPoolExecutor) =====")with ThreadPoolExecutor(max_workers=3, thread_name_prefix="池线程") as executor:# 提交单个任务future = executor.submit(pool_task, 0)print(future.result()) # 获取单个任务结果# 批量提交任务tasks = list(range(1, 5))results = executor.map(pool_task, tasks) # 按顺序返回结果for res in results:print(res)print("线程池演示结束\n")print("===== 8. 守护线程演示 =====")def daemon_worker():while True:print("守护线程运行中...")time.sleep(0.5)daemon_thread = threading.Thread(target=daemon_worker, daemon=True)daemon_thread.start()print("主线程休眠2秒后结束(守护线程会被强制终止)")time.sleep(2)print("主线程结束,守护线程将随之终止")if __name__ == "__main__":main()
- 线程创建方式
- 继承
threading.Thread
类并重写run()
方法 - 直接传入目标函数
target
创建线程 - 线程命名与参数传递
- 继承
- 线程生命周期管理
start()
:启动线程join()
:等待线程完成- 守护线程(
daemon=True
):随主线程结束而终止
- 线程同步(解决资源竞争)
Lock
:基础互斥锁,确保共享资源原子操作RLock
:可重入锁(适合递归场景)with
语句自动管理锁的获取与释放
- 线程间通信
Event
:简单信号传递(触发 / 等待机制)Condition
:复杂条件等待(基于谓词的唤醒)Queue
:线程安全的消息队列(生产者 - 消费者模型)
- 线程私有数据
threading.local()
:存储线程独有数据,避免共享冲突
- 线程池
ThreadPoolExecutor
:高效管理线程资源submit()
:提交单个任务map()
:批量处理任务并获取结果
- 异常处理
- 线程内部的异常捕获与处理
- 错误信息的正确输出
- 线程属性
- 线程名称(
name
) - 线程标识(
ident
) - 当前线程获取(
threading.current_thread()
)
- 线程名称(
重写 run
1. 重写run
方法的作用
Thread
类是 Python 线程的基类,它内部有一个run
方法,这个方法是线程的 “入口点”—— 当线程启动时,本质上就是执行run
方法里的代码。
但Thread
类自带的run
方法是一个空实现(没有实际逻辑),因此需要通过继承Thread
类并重写run
方法,把我们需要线程执行的任务(比如循环、计算、IO 操作等)写在重写的run
方法里。
例如代码中的MyThread
类:
class MyThread(threading.Thread):def __init__(self, name, delay):super().__init__(name=name)self.delay = delay# 重写run方法,定义线程要执行的任务def run(self):print(f"[{self.name}] 开始执行 (继承Thread类)")time.sleep(self.delay) # 模拟任务执行print(f"[{self.name}] 执行结束")
这里重写的run
方法明确了线程的任务:打印开始信息 → 休眠指定时间 → 打印结束信息。
2. run
方法会被自动调用(无需手动调用)
重写后的run
方法不需要我们手动调用,而是通过线程的start()
方法间接触发:
# 创建线程实例
thread1 = MyThread("继承线程", 1)
# 启动线程:调用start(),会自动执行重写的run()方法
thread1.start()
- 当调用
start()
时,Python 会创建一个新的线程,并在新线程中自动执行run
方法里的逻辑。 - 注意:如果直接调用
run()
(如thread1.run()
),不会创建新线程,只是普通的函数调用(在当前线程中执行),这就失去了 “多线程” 的意义。
总结
- 重写
run
方法:目的是给线程 “分配任务”,定义线程要执行的具体逻辑。 - 调用方式:通过
start()
启动线程,start()
会自动在新线程中调用重写后的run
方法(无需手动调用run
)
类比c++ thread
在 Python 中 “继承 Thread
类并重写 run
方法”,对应 C++11 std::thread
的 “函数对象(Functor)” 用法(也叫 “仿函数” 或 “可调用对象”)。以下是详细对比和解释:
- 核心逻辑:“类封装执行逻辑”
Python 中重写 run
的本质是将线程的执行逻辑封装在类的成员函数中;C++11 中 “函数对象” 的本质是将线程的执行逻辑封装在类的 operator()
重载方法中。
- 代码结构对比
Python 侧(继承 Thread
并重写 run
)
import threading
class MyThread(threading.Thread):def __init__(self, name, delay):super().__init__(name=name)self.delay = delaydef run(self): # 重写 run 方法,定义线程任务print(f"[{self.name}] 执行任务...")time.sleep(self.delay)# 使用:创建线程并启动
t = MyThread("线程A", 1)
t.start() # 自动调用重写的 run 方法
C++11 侧(函数对象 / Functor)
#include <thread>
#include <iostream>
using namespace std;class MyFunctor {
public:MyFunctor(const string& name, int delay) : name_(name), delay_(delay) {}void operator()() { // 重载 operator(),定义线程任务cout << "[" << name_ << "] 执行任务..." << endl;this_thread::sleep_for(chrono::seconds(delay_));}
private:string name_;int delay_;
};// 使用:创建函数对象并传入 std::thread
int main() {MyFunctor func("线程A", 1);thread t(func); // 将函数对象传入 thread,自动调用 operator()t.join();return 0;
}
- 行为与设计意图的一致性
- 封装性:两者都通过 “类” 封装线程的状态(如 Python 的
self.delay
、C++ 的delay_
)**和**执行逻辑(run
或operator()
),适合复杂场景(如需要维护线程私有的成员变量)。 - 自动调用:Python 中
start()
会自动调用run
;C++ 中std::thread
会自动调用函数对象的operator()
,无需手动调用。
- 其他 C++11 线程用法对比
C++11 std::thread
还支持函数指针、Lambda、成员函数等方式,但这些与 Python “重写 run
” 的设计思路不同:
- 函数指针 / Lambda:逻辑是 “无状态” 的(或依赖外部变量捕获),无法像类一样自然封装线程私有的状态。
- 成员函数:需额外传递对象指针(如
thread(&Class::func, &obj)
),更适合 “复用已有类的方法”,而非 “为线程定制逻辑”