当前位置: 首页 > news >正文

Python多线程知多少

目录

目标

Python版本

官方文档

概述

线程

守护线程

线程同步

事件对象(Event Object)

实战

创建线程的基本语法

阻塞线程

守护线程

线程同步的方法

互斥锁(排他锁)

信号量(Semaphore)

事件对象(Event Object)

基本语法

生产者和消费者(不推荐)

队列(queue)


目标

        掌握多线程的基本概念和使用方法,包括:创建线程、线程同步、线程通信、守护线程、事件对象、队列。


Python版本

        Python 3.9.18


官方文档

基于线程的并行https://docs.python.org/3.9/library/threading.html

线程池https://docs.python.org/3.9/library/concurrent.futures.html


概述

线程

定义

        线程(Thread)是操作系统调度的最小单位,是进程(Process)内部的执行单元。线程与进程的主要区别在于,进程是资源分配的基本单位,而线程是CPU调度的基本单位

        多线程是指在同一个进程中并发执行多个线程,每个线程可以独立执行任务。Python使用threading模块提供多线程支持。

生活中的单线程案例

        餐厅中如果只有一个厨师,则该厨师需要依次完成以下工作:接收订单->准备食材->烹饪->装盘。如果餐厅生意火爆,则客人必然会等待很久。

生活中的多线程案例

        餐厅有多个厨师,则每个厨师只需要负责制作他拿手的菜,如果餐厅生意火爆,每个厨师都会忙碌起来,但客人并不会等待太久。

官方文档

CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

        从官方文档了解到,由于全局解释器锁(GIL)的限制,Python的多线程通常并不适用于CPU密集型的任务,但对于IO密集型任务(如文件操作、网络请求等)仍然是非常有效的。这其实说明了Python的多线程往往不是并行调度。


守护线程

官方文档

Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.

        线程可以被标记为守护线程。它的特点是:如果Python程序剩下守护线程和主线程,则主线程会立刻退出,所以守护线程的生命周期依赖主线程。因此守护线程适合做一些辅助工作,如:日志、心跳、监控等。


    线程同步

    1. 线程同步的目的是确保共享资源在同一时刻只能被一个线程访问,从而避免出现数据冲突或不一致的情况。
    2. 线程同步是实现线程安全的其中一种方法。
    3. 线程同步通常用锁来实现,其中互斥锁是最基本的同步机制。

    事件对象(Event Object)

    官方文档

    This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.

    An event object manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true.

            这是线程之间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号。Event对象维护一个内部标识,通过调用各种方法实现线程通信:

    • event.set()将事件状态设为 True,所有 wait() 的线程立即执行。
    • event.clear()将事件状态设为 False,使 wait() 进入阻塞状态。
    • event.wait()如果事件状态是 False,阻塞线程,直到 set() 被调用。可以设置timeout(超时时间)参数,但是是秒。

            这里要分清楚clear()和wait()的具体作用:wait()的作用是检查Event是否为True,为True则线程继续运行,为False则线程阻塞。由此看来,Event对象调用wait()不一定会阻塞线程。只有Event对象调用clear()后,再调用wait()才可以阻塞线程


    实战

    创建线程的基本语法

    import threading
    import time
    
    def fun_1():
        for i in range(5):
            print(f"fun_1方法开始执行。{i}")
            time.sleep(2)
    def fun_2():
        for i in range(6):
            print(f"fun_2方法开始执行。{i}")
            time.sleep(1)
    t1=threading.Thread(target=fun_1,name="线程1");
    t2=threading.Thread(target=fun_2,name="线程2");
    t1.start()
    t2.start()
    print("主线程开始继续执行。")

    输出结果 

    fun_1方法开始执行。0
    fun_2方法开始执行。0
    主线程开始继续执行。
    fun_2方法开始执行。1
    fun_1方法开始执行。1
    fun_2方法开始执行。2
    fun_2方法开始执行。3
    fun_1方法开始执行。2
    fun_2方法开始执行。4
    fun_2方法开始执行。5
    fun_1方法开始执行。3
    fun_1方法开始执行。4


    阻塞线程

            在上述案例中,我们可以看到子线程还未执行完任务,主线程就已经继续向下执行了。能否让主线程等待子线程执行完了才向下运行。我们可以使用join()方法实现这种功能,该方法有timeout这个超时参数,如果超过了这个时间子线程还未完成任务,主线程将不再等待,继续向下运行。

    import threading
    import time
    
    def fun_1():
        for i in range(5):
            print(f"fun_1方法开始执行。{i}")
            time.sleep(2)
    def fun_2():
        for i in range(6):
            print(f"fun_2方法开始执行。{i}")
            time.sleep(1)
    t1=threading.Thread(target=fun_1,name="线程1");
    t2=threading.Thread(target=fun_2,name="线程2");
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("主线程开始继续执行。")

    输出结果 

    fun_1方法开始执行。0
    fun_2方法开始执行。0
    fun_2方法开始执行。1
    fun_1方法开始执行。1
    fun_2方法开始执行。2
    fun_2方法开始执行。3
    fun_1方法开始执行。2
    fun_2方法开始执行。4
    fun_2方法开始执行。5
    fun_1方法开始执行。3
    fun_1方法开始执行。4
    主线程开始继续执行。


    守护线程

    import threading
    import time
    
    def fun_1():
        for i in range(5):
            print(f"fun_1方法开始执行。{i}")
            time.sleep(2)
    
    t1=threading.Thread(target=fun_1,name="线程1");
    #设置线程为守护线程。
    t1.setDaemon(True)
    t1.start()
    
    print("主线程开始继续执行。")

     输出结果 

    fun_1方法开始执行。0
    主线程开始继续执行。


    线程同步的方法

    互斥锁(排他锁)

            同一时间只有一个线程获取锁,其他线程会被阻塞。

    import threading
    
    count = 0
    #互斥锁(排他锁)
    lock = threading.Lock()
    
    def increment():
        global count
        for _ in range(1000000):
            with lock:
                count += 1
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=increment)
        t2 = threading.Thread(target=increment)
    
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
        #正确结果是2000000
        print(count)

    注意

            with lock是语法糖,相当于下面的代码:

    def increment():
        global count
        for _ in range(1000000):
            lock.acquire()  # 手动获取锁
            try:
                count += 1
            finally:
                lock.release()  # 确保无论如何都释放锁,防止死锁

    信号量(Semaphore)

            多信号量不能保证线程同步,多信号量只能保证并发量。但是单个信号量的效果等同于互斥锁,可以作为线程同步的方法。

    import threading
    
    count = 0
    semaphore=threading.Semaphore(1)
    
    def increment():
        global count
        for _ in range(1000000):
            with semaphore:
                count += 1
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=increment)
        t2 = threading.Thread(target=increment)
    
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
        #正确结果是2000000
        print(count)

    注意

            with semaphore是语法糖,相当于下面的代码:

    def increment():
        global count
        for _ in range(1000000):
            semaphore.acquire()
            try:
                count += 1
            finally:
                semaphore.release()

    事件对象(Event Object)

    基本语法

    import threading
    import time
    
    event = threading.Event()
    def fun():
        print("fun方法开始执行……")
        #如果设置了超时时间,则wait会在超时后放行。但事件状态仍然为False
        #event.wait(1)
        print("事件状态:", event.is_set())
        event.wait()
        print("事件状态:",event.is_set())
    
    t=threading.Thread(target=fun)
    t.start()
    time.sleep(3)
    print("事件状态:",event.is_set())
    print("事件对象设置标识为True。")
    event.set()
    print("事件状态:",event.is_set())
    t.join()
    print("主线程结束。")

    生产者和消费者(不推荐)

            事件对象是线程间一种简单的通信机制,现在我们就用事件对象实现生产者&消费者模型,具体需求:每次生产出一只烤鸭,就会被等待的消费者消费。

    分析

            虽然下面的代码可以实现每生产一只烤鸭就会立刻消费一只烤鸭。但如果把代码中生产烤鸭的睡眠时间注释掉,烤鸭生产速度就会很快,最终导致烤鸭消费不完。对应实际业务,生产者产生的消息太快,消费者往往来不及消费,导致消息丢失因为Event只能使用标识来阻塞或唤醒线程,而不能存储多个事件。这时我们需要用队列(queue)

    import random
    import threading
    import time
    
    def produce_duck(event):
        count=0
        while True:
            #随机睡眠一段时间
            time.sleep(random.uniform(1, 3))
            count +=1
            print(f"成功生产出了第{count}只烤鸭。")
            #生产消息后立刻发出信号。
            event.set()
    
    def consume_duck(event):
        count=0
        while True:
            event.wait()
            count+=1
            print(f"消费了第{count}只烤鸭。")
            event.clear()
    
    if __name__ == '__main__':
        event=threading.Event()
        produce_duck_thread=threading.Thread(target=produce_duck, args=(event,));
        consume_duck_thread=threading.Thread(target=consume_duck, args=(event,));
    
        produce_duck_thread.start()
        consume_duck_thread.start()
    
        consume_duck_thread.join()
        consume_duck_thread.join()

    队列(queue)

    使用队列(queue)来实现生产者&消费者模型,具体需求:每次生产出一只烤鸭,就会被等待的消费者消费。技术要求:

    • 为了体现出队列先进先出和线程安全的特点,所以我们使用队列来存储烤鸭。
    • 烤鸭总数量作为所有生产者线程的共享变量,需要加锁。
    • 用多线程来实现这个需求。
    import queue
    import random
    import threading
    import time
    
    #烤鸭总数量(共享变量)
    duck_count=0
    #为了保证duck_count线程安全,需要加锁。
    duck_count_lock=threading.Lock()
    #把生产的烤鸭放入队列
    duck_queue=queue.Queue()
    
    def produce_duck(duck_queue,produce_id,max_duck_count):
        """
        :param duck_queue: 方烤鸭的队列
        :param produce_id: 生产者id
        :param max_duck_count: 所有生产者最多生产多少烤鸭。
        :return:
        """
        global duck_count
        #生产者总是不停地生产烤鸭。
        while True:
            with duck_count_lock:
                if duck_count>=max_duck_count:
                    #如果不在队列中加入None,则主线程不会停止,因为消费者无法退出。
                    duck_queue.put(None)
                    break
                duck_count+=1
                duck_num=duck_count
                duck_queue.put(duck_num)
                print(f"生产者{produce_id}生产了{duck_num}只烤鸭。")
    
    def consume_duck(duck_queue,consume_id):
        #消费者不停地消费
        while True:
            duck_num=duck_queue.get()
            if duck_num is None:
                break
            print(f"消费者{consume_id}消费了第{duck_num}只烤鸭。")
            #标记任务完成
            duck_queue.task_done()
    
    
    if __name__ == '__main__':
        produce_duck_list=[
            threading.Thread(target=produce_duck,args=(duck_queue,i,10000))
            for i in range(1,5)
        ]
        consume_duck_list = [
            threading.Thread(target=consume_duck, args=(duck_queue, i))
            for i in range(1, 3)
        ]
    
        for produce_duck_thread in produce_duck_list:
            produce_duck_thread.start()
        for consume_duck_thread in consume_duck_list:
            consume_duck_thread.start()
        for consume_duck_thread in consume_duck_list:
            consume_duck_thread.join()
        for produce_duck_thread in produce_duck_list:
            produce_duck_thread.join()
        print("主线程结束。")

    相关文章:

  1. 手机打电话时如何识别对方按下的DTMF按键的字符-安卓AI电话机器人
  2. cpp单调栈模板
  3. DeepSeek如何辅助学术量化研究
  4. 基于coze+微信小程序实现图片上传并利用大模型解析
  5. TP-LINK路由器如何设置网段、网关和DHCP服务
  6. 左值引用与右值引用详解
  7. 第二十八:5.5.【storeToRefs】5.6.【getters】
  8. Ultralytics导出的Engine模型直接加载报错
  9. DeepSeek 损失函数、奖励函数、训练过程
  10. Docker01 - docker快速入门
  11. MFC获取所有硬件厂商和序列号
  12. JSX 实现列表渲染
  13. 为AI聊天工具添加一个知识系统 之125 详细设计之66 智能语义网络
  14. DeepSeek赋能智慧港口:点亮全球航运的智慧灯塔,开启智能航运新纪元
  15. Vue框架的使用 搭建打包 Vue的安全问题(Xss,源码泄露)
  16. Vue3状态管理新选择:Pinia使用完全指南
  17. 和鲸科技携手四川气象,以 AI 的力量赋能四川气象一体化平台建设
  18. 课程2. 用PyTorch训练神经网络与梯度下降
  19. NAT 技术:网络中的 “地址魔术师”
  20. a_init: Unable to get log name. Retval:[-4]是什么故障
  21. 第十一届世界雷达展开幕,尖端装备、“大国重器”集中亮相
  22. 哈马斯与以色列在多哈举行新一轮加沙停火谈判
  23. 2025年“新时代网络文明公益广告”征集展示活动在沪启动
  24. 中国物流集团等10家央企11名领导人员职务任免
  25. 上海率先推进生物制品分段生产试点,这款国产1类创新药获批上市
  26. 缅甸内观冥想的历史漂流:从“人民鸦片”到东方灵修