当前位置: 首页 > news >正文

Python实现生产者消费者模型-多进程与多线程处理

一、背景

生产者消费者模型是一个很有意思的、高效率的模型,在任何多步骤、大规模(数据)需要处理的场景都可以使用。

想想我们要实现一个小型工厂,工人们是cpu、GPU等,每个工人要干几种不同的活儿。工人之间形成流水线,工人自己的工作也是有先后顺序。根据一定的条件,半成品在工人之间转移以加工,工人也要先忙完一个活儿才能释放自己的精力去再做另一个活儿;不过工人不用线性去完成手里的工作,如果有某个活儿一时半会儿做不了,就先去做其他的活儿,这样到最后剩下的就是还没有做的活儿,如果时间久了还没有做完,可能就是有问题的活儿,可以采取其他措施,比如强行不做了等。

我要怎么做才能实现呢?很有意思的技术。

中间“废话”比较多,直接到最后看代码。

二、设计思路:概念和行动

设计流程,遵循 概念集合 + 行动集合 的理念。

2.1 概念集合

  1. 车间:某种类型工人的组织。
  2. 半成品仓库:要交给工人去进一步加工的物品集合。
  3. 成品库:由工人加工完成后的物品集合。
  4. 仓库管理员:需要不停检查半成品仓库当前库存的人,并检测是否仓库物品已经全部收到,然后告知所有人后边不会有新的物品。
  5. 工人:实现半成品到成品的执行任务。
  6. 流水线:送进来半成品并运走成品。
  7. 终止信号:来自上游的终止信号。

2.2 行动集合

  1. 干活:不同车间的工人,有不同的任务执行方法。
  2. 取半成品:从流水线上取半成品。
  3. 放成品:把做完的成品放到流水线上。
  4. 发终止信号:车间内所有工人的工作完成后,并收到了上游的终止信号后,给下游车间发送终止信号。
  5. 停止干活:收到停止信号且当前工作都完成了,就不干了。
  6. 丢弃当前任务:当前任务可能由于一些原因,耗费了很久还没有完成,那就丢弃,继续接其他单子。
  7. 检查上游流水线:看看是否还有东西在上游流水线,若有就得继续干活。

2.3 每个概念与概念、概念与行动的关系

2.3.1 车间

  1. 检查上游流水线
  2. 分配工人工作
  3. 让工人把成品送到下游流水线
  4. 检查是否收到终止信号
  5. 等工人完成所有上游流水线后,并在接收到终止信号后,发送下游终止信号。
  6. 过程中:就是等终止信号。

2.3.2 工人

  1. 从上游流水线获取半成品
  2. 加工半成品
  3. 把成品放入到下游流水线
  4. 检查是否收到终止信号
  5. 检查是否上游流水线是否还有东西
  6. 停止工作

2.4 工厂图解

  1. 车间工人干活
半成品库流水线
车间1-工人1
车间1-工人2
车间1-工人...
成品库流水线

对于更复杂的流程,基本是在这个框架上叠加更多层的 半成品->工人->成品 这个框架,以及多个半成品库组合成一个汇总的半成品库给到组装工人去组装罢了。

  1. 车间收到终止信号
上游终止信号
等待所有worker停工
发送下游终止信号
  1. 工人收到终止信号
上游终止信号
半成品流水线为空
半成品流水线非空
停止工作
完成当前工作

3. Python工具

我准备用python代码来抽象并实现这样一个概念-行动集合体。为了方便复用,可以通过以下几个方式来使用代码:

  1. 继承:继承我写的基类,根据实际业务需要,重写必要的函数来实现具体的功能。
  2. 实例化:通过输入一些处理函数,实例化具体的类。用户只要按照要求写好处理函数,经过组装即可。

3.1 进程和线程基础知识

这里简单讲一下进程和线程的概念和区别。
在操作系统中,进程是资源分配的基本单位,而线程是CPU调度的基本单位。同一个进程内的多个线程,共享该进程的地址空间、包括进程的代码段、数据段、堆内存等资源。因此,进程中的成员变量对于该进程内的所有线程都是可见的,线程可以直接访问和修改这些成员变量,从而实现线程间的通信。

而进程之间的空间是独立,所以往往进程间通信比较麻烦,需要用到Multiprocessing库里的类,这个类是基于操作系统底层来实现共享的。

3.2 多进程的python实现

每个进程用来模拟一个工人。

# 常用的库是Multiprocessing.Process类,常见的做法是继承该类,并改写run()函数。运行时使用 start()函数会自动调用run函数;默认子进程启动和运行不会阻塞主进程;如要等子进程完成后再运行主进程,可以使用join()函数。
# 对于共享数据,需要在类初始化时,将共享数据作为参数进行输入,作为成员变量。

Import multiprocessing as mp

shared_queue = mp.Queue()

Class SampleProcess(mp.Process):
    def __init__(self, shared_queue):
        super(SampleProcess).__init__()
        self.shared_queue = shared_queue
        
    def run(self, **args):
        pass

# 启动子进程,这默认是异步的,即子进程的结果不会阻塞主进程,主进程的代码会继续运行。
child_process = SampleProcess(shared_queue)
child_process.start()

# 若要等待子进程或者子线程完成后再进行主进程的代码,则用join()方法。

for worker in workers:
    worker.join()
    
print("all worker has done")

# 获取子进程的结果,除了使用共享数据的方法之外,还可以手工来获取每个子进程的结果,用类似的多线程的进程池的方法。

import concurrent.futures as cf

def task(x):
    return x ** 2

with cf.ProcessPoolExecutor() as executor:
    data = [1, 2, 4]
    futures = [executor.submit(task, i) for i in data]
    for future in cf.as_complted(futures):
        try:
            result = future.result(timeout=10)
            print("work done, the result is %s" % result)
        except cf.TimeoutError as e:
            print("超时了")
        except Exceptions as e:
            print("进程遇到其他错误: %s" % e)


3.3 多线程的python实现

每个线程用来模拟每个工人要做的工作。通常,每个工人要去查看半成品仓库的信息,有活儿就干,没有就等着。所以至少可以分成两类任务,一种是具体干活的任务,一种是检查状态的任务,两者有一定的先后关系。

此外,工人可以同时干多个相同或者不同的、无先后顺序之分的活儿,谁先干完都行。

# 我比较了一下,建议使用concurrent.futures这个包更方便。它用来管理线程池子,不同的线程可以都放到这个池子里进行管理,不用手动的去管理每个线程的开始、运行和销毁。并且也更容易获取每个线程的运行结果。

import concurrent.futures
import threading

# 锁和共享变量要在同一个“层级”,即它们的作用域要能够覆盖到所有需要访问到该变量的线程。比如,可以同时定义在一个Process子类中。
# 如果可能要跨进程跨线程来做,那么就要定义在更大的层级内。
shared_variable = 0
lock = threading.Lock()

def task():
    global shared_variable
    with lock:
        shared_variable += 1
    return shared_variable
    
# 创建 excutor这个线程管理器
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # 提交一个线程任务并记录到队列中
    futures = [executor.submit(task, i) for i in range(5)]
    # as_completed会返回一个迭代器,把先完成的任务返回,未完成的,则会阻塞直至完成。
    for future in concurrent.futures.as_completed(futures):
        # 线程执行过程中可能会出错,需要有回收机制。
        try:
            # 有时候我们不希望一个线程无限期执行而影响全局代码无法继续,可以设置一个超时时间。
            result = future.result(timeout=3)
            print(f"Task result: {result}"
        # 超时错误的情况
        except concurrent.futures.TimeoutError:
            print("任务执行超时,返回None")
            result = None
        # 其他错误的情况
        except Exception as e:
            print(f"An error occured: {e}"")


3.4 共享数据

共享的队列、数据、数列、事件等,用来模拟各种仓库、指令等,用于信息传递。

import multiprocessing as mp

# 共享队列,相当于list,可以存放各种类型的数据,不过为了处理方便,最好是同一个类型的数据。队列是先进先出的。栈是后进先出的。

# 创建一个可共享的队列并设置最大不超过20个元素,否则可能会阻塞或者报异常
data_queue = mp.Queue(maxsize=20)
# 往队列里添加元素item;block为True时,意味着队列满了就会阻塞等待,如果设置为False,就会抛出Full异常,默认为True;timeout仅在block为True时有效,如果设置了timeout,则当队列满了的时候,阻塞超过了timeout时间就会抛出mp.queues.Full异常。
data_queue.put(item=XX, block=True, timeout=XX秒)

# 提取数据,block参数如果设置为True,则当队列为空时,会阻塞,False则直接抛出Empty异常,默认为True;Timeout同样是Block为True时,阻塞时长超过了timeout时,就抛出mp.queues.Empty异常。
result = data_queue.get(timeout=XX秒)

# 共享单个数值: 共享值的数据类型,int缩写为i,double缩写为d,float应该缩写为f;*args表示用于初始化共享值的参数,取决于具体的数据类型;lock则默认为True,表示自动为共享值创建一个锁,以确保线程安全;如果设置为False,则不会创建锁,需要手动管理同步。
# data_value = mp.Value(typecode_or_type, *args, lock=True)
data_value = mp.Value('int', 2)

# 修改共享值
data_value.value = 10

# 多进程访问时要上锁
def increment(shared_value):
    # 获取锁
    lock = shared_value.get_lock()
    with lock:
        # 修改共享值
        shared_value.value += 1
    print(f"当前值: {shared_value.value}")
    

# 如果要手动上锁
# 创建一个共享的整数变量,初始值为 0,不自动创建锁
shared_int = multiprocessing.Value('i', 0, lock=False)
# 手动创建一个锁
lock = multiprocessing.Lock()

def increment(shared_value, lock):
    # 获取锁
    with lock:
        # 修改共享值
        shared_value.value += 1
    print(f"当前值: {shared_value.value}")
    
    
# 使用mp.Evaent来设置共享开关。它有两种状态,set和clear。初始状态下,默认是 clear状态。
shared_event = mp.Event()
# 将状态改为set
shared_event.set()
# 将状态改为clear
shared_event.clear()
# 检查状态是否为set
if shared_event.is_set():
    print("it's set")
else:
    print("it's clear")
# 等待开关被set;如果被set,则返回True;如果没有被set,会被阻塞;若设定有timeout,则在阻塞指定时间内被set,就返回True,并不再阻塞;否则,超出时间后返回False。
shared_event.wait(timeout=None)


4. 代码示例

4.1 原型代码

我们创建了基类用于继承或者定制实例化来实现工厂模型。代码可以直接拷贝到py文件中运行。

import multiprocessing as mp
from queue import Empty
import concurrent.futures as cf
import time

# 定义一个工人,应该继承自一个进程,输入半成品队列,输出成品队列
# 这个Worker是个基类,应该被不同工种的工人来继承,重写process_task方法,处理任务。
class Worker(mp.Process):
    def __init__(self, task_queue: mp.Queue, result_queue: mp.Queue, end_signal: mp.Event):
        mp.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.end_signal = end_signal
        self.task_seq = 0   # 记录当前完成的是第几个任务,用于演示多线程和锁。# 这里要用multiprocessing的lock,而不能是threading的lock,因为无法被子进程序列化。
        self.thread_lock = mp.Lock()

    def run(self):
        # 重写父类的run方法
        # 工人停止工作的条件是以下两点同时满足:
        # 1. 收到停止信号;
        # 2. 没有任务了。
        while not (self.end_signal.is_set() and self.task_queue.empty()):
            # 如果有任务,就取出来,处理,然后放入结果队列
            try:
                thread_tasks = []

                with cf.ThreadPoolExecutor(max_workers=12) as executor:
                    for i in range(12):
                        # 每12个任务建成一个batch
                        try:
                            task = self.get_task(timeout=1)
                            thread_tasks.append(executor.submit(self.process_task, task))
                        except Empty:
                            print("1s内暂时没接到更多活了,就先处理手头的工作")
                            break
                    if thread_tasks:
                        for future in cf.as_completed(thread_tasks):
                            try:
                                # 这里要加上timeout,从底层worker的任务开始,限制完成时间,一旦没有完成就不做了,确保整体工程不会被阻塞。
                                result = future.result(timeout=3)
                                with self.thread_lock:
                                    self.task_seq += 1
                                    print("I am working on task: %d" % self.task_seq)
                                self.result_queue.put(result)
                            except cf.TimeoutError:
                                print("线程超时了,这个任务不做了: %s" % future)
                                continue
                    else:
                        continue

                # 干完一组活儿就歇息0.1秒,避免太累
                print("I am resting for 0.1 seconds")
                time.sleep(0.1)
            except Empty:
                print("暂时没活了,歇两秒再看看")
                time.sleep(2)
            except Exception as e:
                # 处理出错了,那么这个任务就不做了,进行下一个
                print("处理出错了: %s" % e)
                # raise ValueError("Worker process error")
                continue

    def get_task(self, timeout=2):
        # 从任务队列中获取任务
        # TODO:当面对创建进程很费时,但是完成1个任务很快的情况时,且可以并行完成多个任务时,可以使用线程池来优化,同时取多个任务。
        return self.task_queue.get(timeout=timeout)

    def put_result(self, result):
        # 将结果放入结果队列
        self.result_queue.put(result)

    def process_task(self, task):
        # TODO:需要重写
        # 处理任务
        return task


# 定义一个车间,含有很多工人。车间负责整体领取任务并汇总产出;分配任务给工人,并决定什么时候终止。
# 可以有不同的车间,包含不同类型的工人,用于干不同的活。车间之间用task_queue和result_queue进行通信。
# 这个不用继承;对于不同的车间,定义不同的worker_type、task_queue、result_queue、pre_end_signal、next_end_signal等参数即可。
class Workshop(mp.Process):
    def __init__(self, num_workers: int, worker_type, task_queue: mp.Queue, result_queue: mp.Queue, pre_end_signal: mp.Event, next_end_signal: mp.Event):
        mp.Process.__init__(self)
        self.num_workers = num_workers
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.pre_end_signal = pre_end_signal    # 上一个车间的结束信号
        self.next_end_signal = next_end_signal  # 发送给下一个车间的结束信号
        self.workers = []
        self.worker_type = worker_type  # 工人的类型,是继承自Worker的子类

    def run(self):
        # 1. 安排工作并启动所有工人
        for i in range(self.num_workers):
            # 工人需要直到任务队列,结果队列,以及停止通知
            # 当车间收到上一个车间停止通知时,要求上个车间的所有工作都已经处理完毕,同时当前车间的所有工人也同时收到通知。
            worker = self.worker_type(self.task_queue, self.result_queue, self.pre_end_signal)
            worker.start()
            print("worker %d is working" % i)
            self.workers.append(worker)

        # 2. 循环获取任务,直到收到停止信号;
        # 收到停止信号后,让工人们把已经放到队列的任务都处理完,就停止工作。
        while True:
            if self.pre_end_signal.is_set():
                break
            else:
                # 如果没有接到停止信号,就一直做工作;如果没有数据,就等待。
                time.sleep(2)

        # 3. 工人们完成所有任务队列的工作后,停止工作。
        for worker in self.workers:
            worker.join()   # 等待所有工人完成工作

        # 4. 确保等工人们都完成了当前任务队列的工作后,给下一个车间发送终止信号。
        self.next_end_signal.set()


# 定义工作流,提供原始数据和最终结果的队列,以及工人的数量;创建车间,并启动
# 我们这里定义的是完全线性的工作流,每个车间都是串行的,即一个车间处理完一个任务后,再处理下一个任务。我们把这样的工厂及其工作流抽象为一个类。
# 对于其他非完全线性的工作流,要么用多个线性流去组配;要么自己专门定制一个主函数。
# 既然可以多个线性流组配,那么它也是继承于Process类,可以独立去运行。
# 1. 创建原始数据队列和最终结果队列
# 2. 创建车间,并启动
# 3. 车间启动后,会自动启动工人。
# 4. 等待车间停止后,结束整个工作流。
class Workflow(mp.Process):
    # 初始化工厂时,先不启动,等到run时再启动。
    def __init__(self, input_data, output_queue: mp.Queue, output_event: mp.Event, worker_types: list, num_workers: list):
        mp.Process.__init__(self)
        self.input_data = input_data
        self.num_workers = num_workers
        self.worker_types = worker_types

        # 1. 挨个创建停止信号,数量要比车间数量多一个;最有一个event也是主进程传递进来的,用于和主进程通信。
        self.events = []
        for i in range(len(self.worker_types)):
            self.events.append(mp.Event())
        self.events.append(output_event)

        # 2. 挨个创建输入输出流,数量要比车间数量多一个;不过最后一个是主进程传过来的,用于主进程接收。
        self.product_queues = []
        for i in range(len(self.worker_types)):
            self.product_queues.append(mp.Queue())
        self.product_queues.append(output_queue)

    def process_data(self):
        # 处理输入数据
        # TODO:根据实际情况重写
        for i in self.input_data:
            self.product_queues[0].put(i)
            print("input data: %d" % i)

    def run(self):
        # 1. 处理输入数据,放到第一个product_queue里供后续车间消费;发送第一个停止信号。
        self.process_data()
        self.events[0].set()

        # 3. 挨个创建车间,根据worker_types的顺序,以及工作流顺序去创建。
        workshops = []
        for i in range(len(self.worker_types)):
            # 创建一个车间,并启动
            workshop = Workshop(self.num_workers[i], self.worker_types[i], self.product_queues[i], self.product_queues[i + 1],
                                self.events[i], self.events[i + 1])
            workshop.start()
            workshops.append(workshop)

        # 3. 等待所有车间工作都完成
        for workshop in workshops:
            workshop.join()

        # 4. 无法return来返回数据,通过最后一个queue来和主进程通信。
        self.events[-1].set()


def main():
    # 我们创建一个简单的工作,输入数据是一个数字序列,返回它们的平方
    input_data = [i for i in range(1, 10001)]
    worker_types = [Worker]
    num_workers = [4]
    output_queue = mp.Queue()
    output_event = mp.Event()
    workflow = Workflow(input_data, output_queue, output_event, worker_types, num_workers)

    # 启动工厂
    workflow.start()

    # 同步处理工厂输出的数据
    while not (output_event.is_set() and output_queue.empty()):
        try:
            data = output_queue.get(timeout=1)
            print("output data: %d" % data)
        except Empty:
            print("Empty output queue, waiting 2 seconds..")
            time.sleep(2)

    # 等待工厂结束
    workflow.join()


if __name__ == "__main__":
    main()

相关文章:

  • 基于Redis分布锁+事务补偿解决数据不一致性问题
  • 大数据E10:基于Spark和Scala编程解决一些基本的数据处理和统计分析,去重、排序等
  • 论文阅读:2023 EMNLP SeqXGPT: Sentence-level AI-generated text detection
  • 盛铂科技国产SLMF315超低相位噪声频率综合器介绍
  • SpringBoot有几种获取Request对象的方法
  • 龙虎榜——20250321
  • 第五章 起航18 管理会议信息同步
  • 计算机操作系统(三) 操作系统的特性、运行环境与核心功能(附带图谱更好对比理解))
  • 游戏引擎学习第173天
  • JAVA————十五万字汇总
  • QPrintDialog弹出慢的问题
  • 图表的黄金比例
  • clamav服务器杀毒(Linux服务器断网状态下如何进行clamav安装、查杀)
  • Office 2024 专业版系统安装
  • 黑马程序员-微服务开发-MybatisPlus的使用
  • 【LLM学习】论文学习-Qlora: QLoRA: Efficient Finetuning of Quantized LLMs
  • docker compose部署minio报错
  • 到底爱不爱我
  • 【数据挖掘】数据预处理——以鸢尾花数据集为例
  • 网络空间安全(41)权限维持
  • 一海南救护车在西藏无任务拉警笛开道,墨脱警方:已处罚教育
  • 印称印巴军事行动总指挥同意将局势降级
  • 中国恒大:清盘人向香港高等法院申请撤回股份转让
  • 夜读丨取稿费的乐趣
  • “春申阡陌”漆画展:将传统漆艺融入现代创作
  • 《致1999年的自己》:千禧之年的你在哪里?