当前位置: 首页 > news >正文

多任务——进程

1.进程的介绍

1. 进程的基本概念

在 Python 中,进程是操作系统分配资源和调度的基本单位,代表一个独立的程序执行实例。Python 的 multiprocessing 模块支持多进程编程,允许在单个程序中并行运行多个进程。每个进程拥有:

  • 独立的 Python 解释器实例

  • 独立的内存空间

  • 不受全局解释器锁(GIL)限制,适合 CPU 密集型任务

2. 使用进程的优点

✅ 并行计算

  • 多进程可充分利用多核 CPU,每个进程运行在单独的核心上,实现真正的并行计算。

✅ 内存隔离

  • 进程间不共享内存,避免数据竞争和同步问题,提高稳定性。

3. 需要注意的问题

⚠ 更高的资源开销

  • 进程的创建和销毁比线程更消耗资源(内存、CPU)。

⚠ 进程间通信(IPC)较复杂

  • 由于内存隔离,进程间通信需要特殊机制,如:

    • Queue(线程安全的队列)

    • Pipe(管道,适用于两个进程通信)

    • Value / Array(共享内存,需同步控制

4.进程的创建与使用:

以下是如何使用multiprocessing模块创建和启动两个进程的简单实例:

from multiprocessing import Processdef print_numbers():for i in range(5):print(i)def print_letters():for letter in 'abcde':print(letter)# 创建进程
process1=Process(target=print_numbers)
process2=Process(target=print_letters)# 启动进程
process1.start()
process2.start()# 等待进程完成
process1.join()
process2.join()

5.结论:

Python 的 多进程编程 提供了一种高效的并发执行方案,尤其适用于:
✅ CPU 密集型任务(如数值计算、数据处理)
✅ 需要隔离执行环境(避免数据竞争,提高稳定性)

对比线程的优势与代价

✔ 优势

  • 绕过 GIL 限制,真正并行执行

  • 充分利用多核 CPU,提升计算效率

  • 内存隔离,避免线程间的同步问题

⚠ 代价

  • 进程创建和切换开销更大(相比线程)

  • 进程间通信(IPC)更复杂(需使用 QueuePipe、共享内存等机制)

适用场景建议
  • 优先使用多进程:计算密集型任务(如机器学习、科学计算)

  • 优先使用多线程:I/O 密集型任务(如网络请求、文件读写)

正确使用多进程能显著提升 Python 程序的性能,但需权衡资源开销和开发复杂度。

2.进程的使用

1.进程的概述

程序 vs. 进程
程序进程
静态的代码文件(如 14.py动态执行的实例(程序运行时的状态)
存储在磁盘上加载到内存中执行
无执行状态包含代码、数据、内存、CPU 资源等
关键概念
  • 进程是操作系统分配资源的基本单位(CPU、内存、文件句柄等)。

  • 进程 = 程序代码 + 运行时资源(数据、内存、寄存器状态等)。

  • 每个进程独立运行,拥有自己的地址空间,互不干扰。

示例
  • 双击 14.py → Python 解释器运行它 → 创建一个进程(分配内存、CPU 时间等)。

  • 同时运行两个 14.py → 两个独立进程(即使代码相同,内存和状态各自独立)。

2.进程的创建与简单使用

multiprocessing 模块就是跨平台版本的多进程模块,提供了一个 Process 类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。

from multiprocessing import Processdef work():print('子进程任务启动...')if __name__ =="__main__":# 创建进程对象p = Process(target=work)# 启动进程p.start()

并行执行两个循环任务

import time
from multiprocessing import Processdef proc():while True:print("当前任务被子进行运行...")time.sleep(1)if __name__=='__main__':p = Process(target=proc)p.start()while True:print("当前任务被主进程运行...")time.sleep(1)

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动

获取进程的pid

import os
from multiprocessing import Processdef proc():print(f'子进程运行中,pid={os.getpid()}')print('子进行即将退出...')if __name__ == '__main__':print(f'父进程pid:{os.getpid()}')p=Process(target=proc)p.start()

 3.进程的内置参数与内置方法

multiprocessing.Process内置参数

参数名称参数作用
target如果传递了函数的引用,可以任务这个子进程就执行这里的代码
args给target指定的函数传递的参数,以元组的方式传递
kwargs给 target 指定的函数传递命名参数
name给进程设定名称,也可以不设定
group指定进程组,大多数情况下用不到

multiprocessing.Process内置方法

方法名称方法作用
start()启动子进程实例(创建子进程)
is_alive()判断子进程是否存活
join(<timeout>)是否等待子进行执行结束或等待多少秒
terminate()不管任务是否完成都立即终止子进程

 给子进程指定的函数传递参数

import os
import time
from multiprocessing import Processdef proc_1(name,age,**kwargs):for i in range(10):print(f'子进程运行中:{name},{age},{os.getpid()}')print(f'传递的不定长kw参数为:{kwargs}')time.sleep(0.5)if __name__ == '__main__':p=Process(target=proc_1,args=('admin',22),kwargs={'address':'克州'})p.start()time.sleep(1) # 主程序休眠一秒p.terminate() # 不管任务是否完成都立即终止子进程# 使用join方法确保了子进程已经被操作系统清理,如果没有调用join方法则子进程会成为僵尸进程p.join()print(p.is_alive()) # 判断子进程对象是否存活

进程间不共享全局变量

import os
import time
from multiprocessing import Processnums=[11,22]def work_1():print('子进程-1的pid为:%d,初始列表=%s'%(os.getpid(),nums))for i in range(1,4):nums.append(i)time.sleep(1)print('子进程-1的pid为:%d,操作后的列表=%s'%(os.getpid(),nums))def work_2():print('子进程-2的pid为:%d,全局列表=%s'%(os.getpid(),nums))if __name__ == '__main__':p1=Process(target=work_1)p1.start()p1.join()p2=Process(target=work_2)p2.start()

 3.进程与线程的对比

1. 功能区别

进程线程
功能完成多任务(如同时运行多个QQ)完成多任务(如一个QQ中的多个聊天窗口)
独立性独立运行,资源隔离共享进程资源,依赖进程存在

2. 定义对比

进程线程
资源分配系统分配资源的基本单位进程的一个执行单元
调度单位操作系统调度CPU调度和分派的基本单位
资源拥有独立内存空间和系统资源仅拥有必要资源(程序计数器、寄存器、栈),共享进程资源

3. 关键区别

对比维度进程线程
数量关系一个程序至少一个进程一个进程至少一个线程
资源占用独立内存单元,资源占用多共享内存,资源占用少
执行方式可独立执行必须依赖进程存在
并发性并发性较低(切换开销大)并发性高(切换开销小)
类比工厂中的一条完整流水线流水线上的工人

4. 优缺点分析

优点缺点
进程资源隔离,安全性高,稳定性强创建/切换开销大,通信复杂
线程创建/切换快,通信简单,高效并发资源共享导致同步问题,不稳定

5. 使用建议

  • 选择进程:需要安全隔离、CPU密集型任务

  • 选择线程:需要高效并发、I/O密集型任务

 4.进程通信

1. 概述

  • 必要性:进程间需要数据交换(如生产者-消费者模型)

  • 挑战:进程拥有独立内存空间,无法直接访问彼此数据

  • 解决方案:操作系统提供的IPC机制

2. 主要通信方式

机制实现方式特点Python实现适用场景
管道(Pipe)单向/双向字节流简单高效;仅适用于父子进程multiprocessing.Pipe()少量数据交换
队列(Queue)先进先出(FIFO)的消息队列线程安全;支持多生产者/消费者multiprocessing.Queue()多进程任务分发
共享内存映射同一块物理内存最快IPC方式;需要同步机制multiprocessing.Value/Array高性能数据共享
信号(Signal)系统预定义事件通知异步通信;信息量有限signal模块进程控制
套接字(Socket)网络通信接口支持跨网络通信;开销较大socket模块分布式系统
文件磁盘文件读写简单但效率低;需要处理竞态条件普通文件操作临时数据交换

3. Python多进程通信最佳实践

  1. 简单通信 → 使用Queue

  2. 高性能需求 → 共享内存+同步锁

  3. 跨机器通信 → Socket

  4. 父子进程 → Pipe

4. 注意事项

  • 避免死锁:获取多个锁时按固定顺序

  • 注意序列化:传输对象需可pickle

  • 资源释放:显式关闭通信通道

5.Queue的使用

可以使用multiprocessing模块的Queue实现多进程之间的数据传递。

Queue本身是一个消息列队程序,首先用一个实列来演示一下Queue的工作原理:

from multiprocessing import Queuequeue=Queue(3) # 初始化一个Queue对象,最多可接收三条put消息queue.put('消息1')
queue.put('消息2')
print(queue.full()) # 判断当前队列是否已满:Falsequeue.put('消息3')
print(queue.full())# 如果队列已满put_nowait会立即抛出异常,put等待两秒会抛出异常
# queue.put('消息4',True,2)
# queue.put_nowait()# 推荐的方式,先判断消息队列是否已满,再写入
if not queue.full():queue.put_nowait('消息4')# 读取消息时,先判断消息队列是否为空,再读取
if not queue.empty():for i in range(queue.qsize()):print(queue.get_nowait())

6.进程队列对象内置方法概述

方法名称

方法作用

Queue.qsize()

返回当前队列包含的消息数量

Queue.empty()

如果队列为空,返回True,反之False

Queue.full()

如果队列已满,返回True,反之False

Queue.get()

获取队列中的一条消息,然后将其从列队中移除

Queue.get_nowait()

如果队列为空则直接抛出异常,相当于Queue.get(False)

Queue.put()

item消息写入队列

Queue.put_nowait()

如果队列已满则直接抛出异常,相当于Queue.put(item, False)

7.进程队列代码示例

在父进程中创建两个子进程,一个往队列里写数据;一个从队列里读数据

import time,randomfrom multiprocessing import Process,Queue# 写数据进程执行的代码:
def write(queue):for item in ['A','B','C']:print(f'向队列上传的值为:{item}')queue.put(item)time.sleep(random.random())# 读数据进程执行的代码
def read(queue):while True:item=queue.get()if item is not None:print(f'向队列获取的值为:{item}')time.sleep(random.random())else:breakif __name__ == '__main__':q=Queue()p1=Process(target=write,args=(q,)) # 写入p2=Process(target=read,args=(q,)) # 读取p1.start()p2.start()# 等待写入任务完成后主进程继续向下执行p1.join()# 使用主进程向队列传递哨兵q.put(None)# 等待读取任务完成后主进程继续向下执行p2.join()print('主程序即将退出...')

5.进程池

1.使用Pool创建进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态 生成多个进程,但是如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

工作流程如下图:

代码示例

import os
import time
import random
from multiprocessing import Pooldef worker(proc_name,*args,**kwargs):p_start=time.time()print(f'{proc_name}开始执行,进程号为:{os.getpid()}')print('任务接收到的args不定长参数为:',args)print('任务接收到的kw不定长参数为:',kwargs)# random.random():随机生成0~1之间的浮点数time.sleep(random.random()*2)p_stop=time.time()print(f'{proc_name}执行完毕,子进程耗时为:{p_stop - p_start:.2f}')if __name__ == '__main__':# 主程序启动计时main_start=time.time()# 定义一个进程池,最大进程数3pool=Pool(3)for item in range(1,11):# apply_async(要调用的目标,(传递给目标的参数,))# 每次循环将会用空闲出来的子进程去调用目标pool.apply_async(worker,(item,'测试参数-1','测试参数-2'),kwds={'proc_kw':'proc_attr'})# 同步执行 执行该方法会导致主进程堵塞# pool.apply(worker,(item,))# 关闭进程池,关闭后pool不再接收新的请求pool.close()# 等待pool中所有子进程执行完成,必须放在close语句之后pool.join()main_stop=time.time()print(f'主程序耗时:{main_stop - main_start:.2f}')

multiprocessing.Pool常用函数解析:

方法名称

方法作用

apply_async()

使用非阻塞方式调用func

close()

关闭Pool,使其不再接受新的任务

terminate()

不管任务是否完成,立即终止任务

join()

主进程阻塞并等待子进程的退出, 必须在closeterminate之后使用

2.进程池中的Queue

如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager()中的Queue(),而不是 multiprocessing.Queue()。否则可能会得到一条如下的错误信息: RuntimeError: Queue objects should only be shared between processes through inheritance.

以下实列演示了进程池中的进程如何通信:

import os
import time
from random import randint
from multiprocessing import Manager,Pooldef reader(queue):print(f'读取任务启动,进程的pid为:{os.getpid()}')while True:message=queue.get()if message is not None:print(f'读取任务从队列中获取到消息为:{message}')else:breakdef writer(queue):print(f'写入任务启动,进程的pid为:{os.getpid()}')url_list=['https://www.baidu.com','https://www.goole.com','https://www.facebook.com','https://www.yahoo.com','https://www.linkedin.com','https://www.github.com','https://www.bing.com']for item in url_list:queue.put(item)time.sleep(randint(1,2)) # 模拟IO延迟queue.put(None) # 任务退出前传递哨兵信号if __name__ == '__main__':print('主程序启动...')# 无法使用进程中的Queue队列# q=Queue()# 使用Manager中的Queueq=Manager().Queue()pool=Pool()pool.apply_async(writer,(q,))pool.apply_async(reader,(q,))pool.close()pool.join()print('主程序即将退出...')

3.获取进程池返回值

同步/并发 获取返回值

import time
from random import random
from multiprocessing import Pool# 返回值从进程池获取,父子进程没有返回值
def proc_func(num):# 模拟随机延迟time.sleep(random())# 任务函数使用return将计算结果返回return num * numif __name__ == '__main__':pool = Pool(5)# 存放future对象result_list = list()for item in range(1, 11):result = pool.apply_async(proc_func, args=(item,))# 将future对象添加到列表中result_list.append(result)for item in result_list:# 使用进程池结果对象中的get方法获取结果print(item.get())if __name__ == '__main__':pool = Pool(5)

 利用map()方法获取返回值

import time
from random import random
from multiprocessing import Pooldef proc_func(num):time.sleep(random())return num*numif __name__ == '__main__':pool=Pool(5)result=pool.map(proc_func,range(1,11))print(result)

注意点:

  • map()是一次性获取所有子进程的返回值,自带close、join
  • apply_async是分批返回

4.使用 concurrent 模块创建进程池

concurrent 模块创建进程池的使用方式与线程池保持一致,以下为代码示例:

import os
import time
from random import randint
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutordef reader(queue):print(f'读取任务启动,进程的pid为:{os.getpid()}')while True:message=queue.get()if message is not None:print(f'读取任务从队列中获取到消息为:{message}')else:print('读取任务结束.')breakdef writer(queue):print(f'写入任务启动,进程的pid为:{os.getpid()}')url_list=['https://www.baidu.com','https://www.google.com','https://www.facebook.com','https://www.yahoo.com','https://www.linkedin.com','https://www.github.com','https://www.bing.com']for item in url_list:queue.put(item)time.sleep(randint(1,2)) # 模拟IO延迟queue.put(None) # 任务退出前传递哨兵信号def main():print('主程序启动...')with Manager() as manager:q=manager.Queue()with ProcessPoolExecutor() as executor:executor.submit(writer,q)executor.submit(reader,q)print('主程序即将退出...')if __name__ == '__main__':main()

6.进程中的生产者与消费者模式

在进程中使用生产者与消费者模式和线程稍有不同,以下为代码示例:

哨兵模式

import time
import queue
from multiprocessing import Process,Queue# 定义生产者函数
def producer(name,queue):for i in range(1,6):item=f'产品{i}'queue.put(item) # 将项放入队列中# 定义消费者函数
def consumer(name,queue):while True:item=queue.get() # 从队列中获取项if item is None:queue.put(item) # 为了让其他消费者也能停止,重新放入哨兵对象。break # 如果为None则停止循环print(f'消费者{name}消费了{item}')time.sleep(2) # 假设消费需要更多时间if __name__ == '__main__':# 创建队列对象q= Queue()# 创建和启动生产者进程producer_process=Process(target=producer,args=('小明',q))producer_process.start()# 创建和启动消费者线程consumer_process=Process(target=consumer,args=('小红',q))consumer_process.start()# 等待生产者进行执行完毕producer_process.join()# 使用主程序往队列中添加哨兵信号:Noneq.put(None)# 等待消费者进程完成consumer_process.join()# 主程序退出print('程序结束')

 task_done

在进程环境中的Queue不支持task_done()方法,需要替换成JoinableQueue

 

import time
from multiprocessing import Process,JoinableQueue as Queue# 定义生产者函数
def producer(name,queue):for i in range(1,6):item=f'产品{i}'queue.put(item) # 将项放入队列中print(f'生产者{name}生产了{item}')time.sleep(1) # 假设生产需要一些时间# 定义消费者函数
def consumer(name,queue):while True:item = queue.get() # 从队列中获取项print(f'消费者{name}消费了{item}')time.sleep(2) # 假设消费需要更多时间queue.task_done() # 指示队列中的任务已经处理完成if __name__ == '__main__':q= Queue()# 创建和启动生产者进程producer_process = Process(target=producer, args=('小明', q))producer_process.start()# 创建和启动消费者线程consumer_process = Process(target=consumer, args=('小红', q))# 将消费者设置为守护进程后启动consumer_process.daemon=Trueconsumer_process.start()# 等待生产者任务完成producer_process.join()# 等待队列任务完成,如果队列完成则允许主程序退出q.join()print('主程序结束...')

相关文章:

  • 1857. 有向图中最大颜色值
  • 前端面经 响应式布局
  • GitHub 趋势日报 (2025年05月31日)
  • Git 入门学习教程
  • day 1 任务以及具体安排:第一章 数组part01
  • 学习日记-day20-6.1
  • CppCon 2014 学习: C++ Test-driven Development
  • AI 医疗影像诊断:技术实现、临床应用与未来趋势 —— 以肺部 CT 早期肺癌检测为例
  • 通过内核寄存器排除HardFault
  • 头歌数据库课程实验(角色管理)
  • 正则表达式笔记
  • #14 【Kaggle】 Drawing with LLMs 金牌方案赏析
  • InternVL2.5-多模态大模型评估专业图片
  • IDEA PyCharm 等工具如何同时打开多个窗口
  • VeriFree:无需Verifier的通用RL框架
  • TensorFlow深度学习实战(19)——受限玻尔兹曼机
  • Git企业级项目管理实战
  • 3、禁止树莓派屏幕休眠,设置树莓派屏幕常亮
  • 动态规划第二弹:路径类问题(不同路径,珠宝的最高价值,地下城游戏)
  • 【ISP算法精粹】动手实战:用 Python 实现 Bayer 图像的黑电平校正
  • 网站架构设计图/互联网推广中心
  • 网站院长信箱怎么做/品牌推广方式
  • 浏阳疫情最新公告/手机系统优化软件
  • 怎样做 网站做seo/qq排名优化网站
  • 手机咋建网站/seo托管
  • 滦南县建设局网站/谷歌搜索引擎在线