当前位置: 首页 > news >正文

[Python学习日记-88] 并发编程之多进程 —— 队列与生产者消费者模型

[Python学习日记-88] 并发编程之多进程 —— 队列与生产者消费者模型

简介

队列

一、队列的介绍 

二、队列的使用

生产者消费者模型

一、为什么要使用生产者消费者模型

二、什么是生产者消费者模型

三、生产者消费者模型的优势

四、生产者消费者模型的实现

JoinableQueue

管道(不推荐)

简介

        在多进程编程当中允许程序创建多个进程,每个进程拥有独立的地址空间、内存、数据栈等资源,能够并行执行,互不干扰。在一个程序或系统中通常会让多个进程协同工作,那就会发生数据交换和信息共享,此时就需要一种有效的进程间通信(Inter - Process Communication, IPC)机制,而队列就是其中一种。本篇将会介绍队列的使用和生产者消费者模型。

队列

一、队列的介绍 

        mutiprocessing 模块支持使用消息传递的队列来实现进程间通信,可以使用 Queue 类,其语法如下:

Queue([maxsize])        # 创建共享的进程队列

        该类提供了一个进程安全的队列,可用于在不同进程之间传递数据。它的底层通过管道和锁定机制实现,确保了多进程环境下数据传递的准确性与安全性。 

类的参数介绍

  • maxsize:该参数用来限制队列中允许的最大项数,省略该参数则表示无大小限制

注意:对于该参数有两点需要明确

        1、队列中适合存放精简的小数据,即消息,而非大数据

        2、队列实际上占用的是内存空间,即便 maxsize 无大小限制也受限于系统内存大小

队列的基本操作方法

  • put(obj, blocked=True, timeout=None):该方法将数据 obj 放入队列中。若 blocked 为 True(默认值)且 timeout 为正值,当队列已满时,该方法会阻塞 timeout 指定的时间,直至队列有剩余空间;若超时,将抛出 Queue.Full 异常。若 blocked 为 False,且队列已满,会立即抛出 Queue.Full 异常
  • get(blocked=True, timeout=None):该方法可以从队列读取并删除一个元素。当 blocked 为 True(默认值)且 timeout 为正值时,若在等待时间内未取到任何元素,会抛出 Queue.Empty 异常。若 blocked 为 False,若队列有可用值则立即返回,否则,若队列为空,会立即抛出 Queue.Empty 异常
  • put_nowait():与 put(obj, blocked=False) 效果相同
  • get_nowait():与 get(blocked=False) 效果相同
  • qsize():返回队列中当前元素的数量。由于多进程环境的复杂性,该结果可能不可靠,例如在获取数量的瞬间,其他进程可能已对队列进行了修改
  • empty():判断队列是否为空。该结果也可能不可靠,因为在返回 True 的过程中,队列可能又加入了项目
  • full():判断队列是否已满。该结果也可能存在不可靠性

其他方法

  • cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止 join_thread() 阻塞
  • close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果队列被垃圾回收机制回收,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常,也就是说使用者在使用 get() 被阻塞,并不会因为队列关闭而收到结束信号或异常
  • join_thread():连接队列的后台线程。此方法用于在调用 close() 之后,等待所有队列项被消耗。通常使用队列的创建人,及你,不会去调用该方法,可以通过调用 cancel_join_thread() 来防止调用

二、队列的使用

指定队列大小: 

from multiprocessing import Queue


q = Queue(3)    # 队列长度设置为3

q.put('hello')
q.put({'a':1})
q.put([3,3,3])
print("队列是否满了:",q.full())

# q.put(4)  # 当队列满了的时候再放数据进去就会卡住,直到有人取数据

print("第一次取:",q.get())
print("第二次取:",q.get())
print("第三次取:",q.get())
print("队列是否空了:",q.empty())

print("第四次取:",q.get())  # 当队列是空的时候也会卡住,直到有人放数据进来

输出结果:

        当队列满时再放数据进去就会卡住,知道有人取数据(不注释 q.put(4) 时)

         当按照队列长度输入得刚刚好时(注释 q.put(4) 时)

不指定队列大小:  

from multiprocessing import Queue


q = Queue()  # 不指定大小可以无限往里放,但限制于你的实际内存大小

q.put('hello')
q.put({'a':1})
q.put([3,3,3])
print("队列是否满了:",q.full())

print("第一次取:",q.get())
print("第二次取:",q.get())
print("第三次取:",q.get())
print("队列是否空了:",q.empty())

输出结果:

生产者消费者模型

一、为什么要使用生产者消费者模型

        生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者消费者模型。

二、什么是生产者消费者模型

        生产者消费者模型是一种经典的设计模式,它巧妙的通过一个容器解决了生产者和消费者的强耦合问题。在该模型中,生产者和消费者彼此之间不直接通讯,而通过缓冲区(通常是一个队列)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接放入缓冲区即可继续生产,而消费者不需要直接找到生产者拿数据,而试直接从缓冲区里取,这样生产者和消费者可以以不同的速度运行,极大地提高了系统的整体效率和灵活性。

三、生产者消费者模型的优势

  • 解耦:生产者和消费者无需直接交互,它们仅与队列进行数据传递,从而显著降低了模块之间的耦合度。这意味着在系统的后续维护和扩展过程中,对生产者或消费者模块的修改不会对另一方产生直接影响,提高了系统的可维护性
  • 提高效率:生产者和消费者能够并行工作,充分利用系统资源。例如,在一个数据处理系统中,生产者可以持续不断地从数据源获取数据并放入队列,而消费者则可以同时对队列中的数据进行处理,避免了因一方等待另一方而造成的资源浪费,提高了整体吞吐量
  • 缓冲:队列作为缓冲区,能够有效平衡生产者和消费者的速度差异。当生产者生成数据的速度快于消费者处理数据的速度时,队列可以暂时存储多余的数据,防止数据丢失;反之,当消费者处理数据的速度快于生产者生成数据的速度时,队列中的数据可以保证消费者不会因等待数据而空闲,维持系统的稳定运行

四、生产者消费者模型的实现

        我们以一个包子店为例,那么这个包子店的生产者消费者模型分析如下:

程序中有两类角色:
1、负责生产数据(生产者)
2、负责处理数据(消费者)
        
引入生产者消费者模型为了解决的问题是:让生产者和消费者进行解耦,平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
        
实现模型:生产者 <——> 队列 <——> 消费者

基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time


def producer(name,q,product):
    for i in range(3):
        res = '%s%s' % (product,i)
        time.sleep(0.5)
        print('生产者%s生产了%s' % (name,res))
        q.put(res)


def consumer(name,q):
    while True:
        res = q.get()
        if res is None:break
        time.sleep(1)
        print('消费者%s吃了%s' % (name,res))


if __name__ == '__main__':
    # 容器
    q = Queue()
    # 生产者们
    p1 = Process(target=producer,args=('马国明',q,'叉烧包'))
    p2 = Process(target=producer,args=('刘德华',q,'生肉包'))
    p3 = Process(target=producer,args=('黎明',q,'豆沙包'))
    # 消费者们
    c1 = Process(target=consumer,args=('陆小凤',q))
    c2 = Process(target=consumer,args=('AngleBaby',q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)    # 有多少个消费者就需要多少个
    q.put(None)
    print('主')

输出结果:

         在代码中 q.put(None) 的作用是解决主进程永远不会结束的问题,这一问题的原因是生产者在生产完后就结束了,但是消费者在取空了队列 q 之后,则一直处于死循环中并卡在 q.get() 这一步,那我们只需要在生产者生产完毕后往队列中再发一个结束信号 None,然后消费者接收到结束信号 None 时就可以 break 跳出死循环。

        有的小伙伴可能就会问了,在什么时候 q.put(None) 才合适呢?我们有几个选择:一是,在 producer() 中的循环结束后加;二是,在主进程当中加。最终我们选择的是在主进程当中加,这是因为生产者肯定不止一个,消费者也不会只有一个,这个时候如果是前者的情况下,有的生产者速度快提前结束了生产任务,那么就会在队列当中直接放入结束信号 None,那么某一个消费者取队列中的数据时就会取到结束信号 None,但是这个消费者还想继续吃包子啊,那么就会造成一个 bug,在消费者还没吃饱的时候就把人家赶走了,而后者则在 join() 后才加入结束信号 None,及确保了生产者的进程结束了,也不会有某一个消费者在吃包子的过程中突然就取到了结束信号 None。

        其实我们的思路无非是发送结束信号而已,但是目前这种加结束信号的方式是不太高级的,我们可以使用另外一种队列(JoinableQueue),其提供了这种机制。

JoinableQueue

        JoinableQueue 是 multiprocessing.Queue 的子类,它在普通队列的基础上提供了额外的功能,用于更精细地管理任务的完成情况。与普通队列不同,JoinableQueue 允许消费者通知生产者队列中的数据已经被成功处理,而通知进程是使用共享的信号和条件变量来实现的,其语法如下:

 JoinableQueue([maxsize])

参数介绍

  • maxsize:该参数用来限制队列中允许的最大项数,省略该参数则表示无大小限制

方法介绍

        JoinableQueue 除了与 Queue 相同的方法外还具有以下方法

  • task_done():消费者使用此方法发出信号,表示 get() 的返回数据已经被处理。如果调用此方法的次数大于从队列中数据的数量,将抛出 ValueError 异常
  • join():生产者调用此方法进行阻塞,直到队列中所有的数据均被处理。阻塞将持续到队列中的每个数据均调用 task_done() 方法为止

 对上面实现的生产者消费者模型进行改造

from multiprocessing import Process,JoinableQueue
import time


def producer(name,q,product):
    for i in range(3):
        res = '%s%s' % (product,i)
        time.sleep(0.5)
        print('生产者%s生产了%s' % (name,res))
        q.put(res)
    q.join()    # 会等待队列中的所有项目都返回 task_done()


def consumer(name,q):
    while True:
        res = q.get()
        if res is None:break
        time.sleep(1)
        print('消费者%s吃了%s' % (name,res))
        q.task_done()    # 该项目处理完成后向生产者发送通知


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()
    # 生产者们
    p1 = Process(target=producer,args=('马国明',q,'叉烧包'))
    p2 = Process(target=producer,args=('刘德华',q,'生肉包'))
    p3 = Process(target=producer,args=('黎明',q,'豆沙包'))
    # 消费者们
    c1 = Process(target=consumer,args=('陆小凤',q))
    c2 = Process(target=consumer,args=('AngleBaby',q,))
    # 把消费者们设置为守护进程
    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    print('主')

输出结果:

        在代码中不同进程之间的等待关系如下

主进程 —— 等待(p1.join()、p2.join()、p3.join())——> p1、p2、p3 —— 等待(q.join())——> c1、c2

        由于 q.join() 的存在,p1、p2、p3 结束了就证明 c1、c2 肯定都把队列中的数据处理完了,那么 c1、c2 也没有存在的价值了,它们应该随着主进程的结束而结束,所以把 c1、c2 设置成守护进程。

管道(不推荐)

        mutiprocessing 模块实现进程间通信的第二种形式就是使用消息传递的管道,但并部推荐使用,因为。。。。管道的语法如下:

Pipe([duplex])

        该类会在进程之间创建一条管道,并返回元组 (conn1,conn2),其中 conn1 和 conn2 分别表示管道两端的连接对象,需要注意的是管道的产生必须在产生 Process 对象之前。

类参数的介绍

  • duplex:默认管道是全双工的,如果将 duplex 设成 False,conn1 只能用于接收,conn2 只能用于发送

主要方法

  • conn1.recv():接收 conn2.send(obj) 发送的对象。如果没有消息可接收,recv() 会一直阻塞;如果连接的另外一端已经关闭,那么 recv() 方法会抛出 EOFError 异常
  • conn1.send(obj):通过连接发送对象。obj 是与序列化兼容的任意对象

其他方法

  • conn1.close():关闭连接,如果 conn1 被垃圾回收机制回收,将自动调用此方法
  • conn1.fileno():返回连接使用的整数文件描述符
  • conn1.poll([timeout]):如果连接上的数据可用,返回 True。timeout 指定等待的最长时限。如果省略此参数,方法将立即返回结果;如果将 timeout 射成 None,操作将无限期地等待数据到达
  • conn1.recv_bytes([maxlength]):接收 conn2.send_bytes() 方法发送的一条完整的字节消息。maxlength 指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将抛出 IOError 异常,并且在连接上无法进行进一步读取;如果连接的另外一端已经关闭,再也不存在任何数据,将抛出 EOFError 异常
  • conn2.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer 是支持缓冲区接口的任意对象,offset 是缓冲区中的字节偏移量,而 size 是要发送字节数。结果数据以单条消息的形式发出,然后调用 conn1.recv_bytes() 函数进行接收    
  • conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在 buffer 对象中,该对象支持可写入的缓冲区接口(即 bytearray 对象或类似的对象);offset 指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发 BufferTooShort 异常

基于管道实现生产者消费者模型:与队列的方式是类似的,因为队列就是管道加锁实现的

from multiprocessing import Process, Pipe
import time

# 管道的流向是从左到右的,放入时关闭右侧,获取时关闭左侧
def consumer(p, name):
    left, right = p
    left.close()    # 接收方关闭左侧,从右侧取出
    while True:
        try:
            baozi = right.recv()    # 如果包子还没放入则会进行等待
            print('%s 收到包子:%s' % (name, baozi))
        except EOFError:
            right.close()
            break


def producer(seq, p, name):
    left, right = p
    right.close()    # 发送方关闭右侧,从左侧放入
    for i in seq:
        left.send(i)
        print('%s 放入包子:%s' % (name, i))
        time.sleep(1)
    else:
        left.close()


if __name__ == '__main__':
    # 创建管道
    left, right = Pipe()

    # 消费者
    c1 = Process(target=consumer, args=((left, right), 'c1'))
    c1.start()

    # 生产者
    seq = (i for i in range(10))
    producer(seq, (left, right), 'p1')

    # 执行完毕关闭隧道
    right.close()
    left.close()

    c1.join()
    print('主进程')

输出结果:

注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的 recv() 操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产 EOFError 异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点

        管道也可以用于双向通信,把客户端的请求视为管道的左侧,服务器的响应视为右侧,即请求与响应模型或远程过程调用,就可以使用管道编写与进程交互的程序。

http://www.dtcms.com/a/98527.html

相关文章:

  • 赛博威智慧导购平台,融合AI激活一线导购效能,破局增长瓶颈
  • 目标识别与双目测距(1)环境搭建:Ubuntu+yolov5+pcl库
  • JAVA学习*工厂模式
  • Python容器详解:从字符串到字典的完整指南
  • CPT204 Advanced Obejct-Oriented Programming 高级面向对象编程 Pt.4 泛型
  • 4.训练篇2-毕设篇
  • Share01-WinCC文件越用越大?
  • 【学习笔记】文件上传漏洞--js验证、mime验证、.user.ini、短标签、过滤、文件头
  • 大数据Spark(五十五):Spark框架及特点
  • Leetcode 两数相除
  • 海量数据处理
  • 下载和初步上手Vue3路由
  • NOIP2007提高组.矩阵取数游戏
  • 思维链技术(Chain-of-Thought, CoT)
  • 双卡 RTX 5090 深度体验:AI 推理框架选择与性能限制实测
  • HCIP(RSTP+MSTP)
  • 【STL】list
  • React程序打包与部署
  • JAVASE-day14
  • 蓝桥杯备考:多米诺骨牌
  • 【Linux】GDB调试指南
  • 基于YALMIP+CPLEX的带储能微电网调度问题最优求解matlab仿真
  • 提示词工程(Prompt Engineering):释放AI潜能的“语言编程”
  • #CX# UVM中的virtual sequence 和 virtual sequencer 的用途
  • 【Kafka】从理论到实践的深度解析
  • pytorch中dataloader自定义数据集
  • Java/Scala是什么
  • 解决由于中文路径无法安装软件的问题--例如postersql
  • 数据化管理(一)---什么是数据化管理
  • 模拟集成电路设计与仿真 : Memory