当前位置: 首页 > news >正文

Python进程知多少

目录

目标

Python版本

官方文档

概述

进程(Process)的基本概念

进程之间的通信方法

进程同步

进程间共享状态

实战

创建进程的基本语法

创建进程并传递复杂的参数

进程同步&进程通信

共享内存

基于服务器进程实现共享

基于队列实现进程安全

生产者&消费者模型(基于队列)

生产者&消费者模型(基于管道) 


目标

        掌握进程的基本概念和使用方法,包括:创建进程、进程同步、进程间共享状态、进程通信。


Python版本

        Python 3.9.18


官方文档

multiprocessing — Process-based parallelismhttps://docs.python.org/3.9/library/multiprocessing.html


概述

进程(Process)的基本概念

        进程是操作系统分配资源的最小单位。线程则是操作系统的最小调度单位。它们之间的关系是:线程是进程的一个执行单元,多个线程可以共享进程的资源。Python官方对于进程做了如下描述:

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

        从官方文档我们得到以下几点信息:

  • multiprocessing模块提供了进程相关的API,风格与线程的API类似。
  • 进程绕过了全局解释器锁(GIL),实现了真正的并行。因此能充分利用多核CPU,特别适合CPU密集型任务(如数据处理、图像处理、机器学习等)。
  • 支持本地和远程并发操作。

进程之间的通信方法

        最常见的通信方法有三种:队列(Queue)、管道(Pipe)、外部存储(如:多个进程读写同一个文件、数据库、各种中间件等)。官方文档的multiprocessing模块介绍了队列和管道:

Queues are thread and process safe.

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). 
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). 

意思是:

  • 队列在线程和进程上是安全的。
  • 管道的Pipe()方法返回的是一对连接对象,代表管道的两端。且两端的对象都可以收发数据。
  • 管道适合1对1的进程通信。

进程同步

官方文档

multiprocessing contains equivalents of all the synchronization primitives from threading.

        multiprocessing模块提供了与 threading 模块相同的同步原语,用于管理进程间的同步和共享资源的访问。同步原语的意思是,用于协调多线程的工具(如:锁、事件、信号量等)在multiprocessing模块中也都存在。


进程间共享状态

官方文档

        

As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.

However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

Shared memory

Data can be stored in a shared memory map using Value or Array. 

Server process

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

官方不推荐进程间共享状态,但是仍然提供了两种进程间共享状态的方法:共享内存、通过服务器进程共享

共享内存:使用multiprocessing.Value或multiprocessing.Array让多个进程访问同一块共享内存。适用于小规模数据共享,性能较高。

通过服务器进程共享:multiprocessing.Manager() 创建一个服务进程,所有进程可以通过代理访问该进程中的对象。支持很多复杂数据类型


实战

创建进程的基本语法

import os
from multiprocessing import Process,current_process
import time

def fun(name):
    print(f"hello {name}")
    print(f"进程ID:{os.getpid()};进程名称:{current_process().name}")
    time.sleep(1)

if __name__ == '__main__':
    process=Process(target=fun,name="MyProcess",args=("张三",))
    process.start()
    process.join()
    print("主线程结束。")

创建进程并传递复杂的参数

import os
from multiprocessing import Process, current_process
import time

def fun(*args, **kwargs):
    name, age, sex = args
    print(f"姓名:{name},年龄:{age},性别:{sex}")
    scores = kwargs.get("scores", {})
    teachers = kwargs.get("teachers", {})
    print("分数如下:")
    for subject in scores:
        print(subject, scores[subject], end="\n")
    print("各科目的老师如下:")
    for subject in teachers:
        print(subject, teachers[subject], end="\n")

if __name__ == '__main__':
    student_info = ("张三", 18, "男")
    scores = {
        "语文": 90, "数学": 100, "英语": 99
    }
    teachers = {
        "语文": "李四", "数学": "王五", "英语": "赵六"
    }
    process = Process(
        target=fun,
        name="MyProcess",
        args=student_info,
        kwargs={"scores":scores, "teachers":teachers}
    )
    process.start()
    process.join()
    print("主线程结束。")

输出

姓名:张三,年龄:18,性别:男
分数如下:
语文 90
数学 100
英语 99
各科目的老师如下:
语文 李四
数学 王五
英语 赵六
主线程结束。


进程同步&进程通信

共享内存

思考:通过以下案例可以发现一个问题为什么count作为全局变量,在函数中修改count值不需要提前用global修饰?

答:通过multiprocessing.Value创建的变量是共享内存的,它在多进程中是共享的。而global只针对当前进程中的多个进程实现共享全局变量。

import multiprocessing

"""
注意,这里的'i'表示整数类型,不是存储count值的key。以下是我列出来的各种类型:
    'b'	布尔类型 (bool)
    'i'	整型 (int)
    'l'	长整型 (long)
    'f'	单精度浮点型 (float)
    'd'	双精度浮点型 (double)
    'c'	字符串(char)
    'u'	Unicode 字符串
"""
count = multiprocessing.Value('i', 0)


def increment(count):
    for _ in range(100000):
        with count.get_lock():
            count.value += 1

if __name__ == '__main__':
    process = multiprocessing.Process(target=increment, args=(count,))
    process2 = multiprocessing.Process(target=increment, args=(count,))

    process.start()
    process2.start()

    process.join()
    process2.join()

    print(count.value)

基于服务器进程实现共享

import multiprocessing

def increment(count, lock):
    for _ in range(10000):
        with lock:
            count.value += 1

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    count = manager.Value('i', 0)
    lock = manager.Lock()

    process = multiprocessing.Process(target=increment, args=(count,lock))
    process2 = multiprocessing.Process(target=increment, args=(count,lock))

    process.start()
    process2.start()

    process.join()
    process2.join()
    #输出:20000
    print(count.value)

基于队列实现进程安全

import multiprocessing
import os


def increment(queue):
    count = 0
    for _ in range(100000):
        count += 1
        print(f"当前进程是:{multiprocessing.current_process().name},count={count}")
    queue.put(count)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=increment, name="process_1", args=(queue,))
    process2 = multiprocessing.Process(target=increment, name="process_2", args=(queue,))

    process.start()
    process2.start()

    process.join()
    process2.join()

    total_count = 0
    while not queue.empty():
        total_count += queue.get()
    #输出:200000
    print(total_count)

生产者&消费者模型(基于队列)

        如果用进程实现生产者消费者模型,光用内存共享或者服务器进程是不够的,因为内存共享和基于服务器进程只能实现进程通信,而不能实现任务存储,所以需要使用队列。大家可以看看我用多线程实现的生产者消费者模型,通过类比也能发现两者之间的相似之处。

需求

        每次生产出一只烤鸭,就会被等待的消费者消费。生产烤鸭的总量是100000只。

import multiprocessing
#最大烤鸭数量
max_duck_count=100000
# 烤鸭总量设置为内存共享
duck_count = multiprocessing.Value('i', 0)
def produce_duck(duck_queue,duck_count):
    #生产者不停地生产烤鸭。
    while True:
        with duck_count.get_lock():
            if duck_count.value>=max_duck_count:
                duck_queue.put(None)
                break
            duck_count.value += 1
            duck_queue.put(duck_count.value)
            print(f"{multiprocessing.current_process().name}生产了第{duck_count.value}只烤鸭。")

def consume_duck(duck_queue):
    #消费者不停地消费
    while True:
        duck=duck_queue.get()
        if duck is None:
            break
        print(f"{multiprocessing.current_process().name}消费了第{duck}只烤鸭。")

if __name__ == '__main__':
    queue=multiprocessing.Queue()

    produce_duck_process_list = [
        multiprocessing.Process(target=produce_duck,name=f"生产者进程{_}",args=(queue,duck_count))
        for _ in range(5)
    ]
    consume_duck_process_list = [
        multiprocessing.Process(target=consume_duck,name=f"消费者进程{_}",args=(queue,))
        for _ in range(3)
    ]
    for produce_duck_process in produce_duck_process_list:
        produce_duck_process.start()
    for consume_duck_process in consume_duck_process_list:
        consume_duck_process.start()

    for produce_duck_process in produce_duck_process_list:
        produce_duck_process.join()
    for consume_duck_process in consume_duck_process_list:
        consume_duck_process.join()

    print("主线程结束。")

生产者&消费者模型(基于管道) 

需求

        每次生产出一只烤鸭,就会被等待的消费者消费。生产烤鸭的总量是100000只。

import multiprocessing
#最大烤鸭数量
max_duck_count=100000
# 烤鸭总量设置为内存共享
duck_count = multiprocessing.Value('i', 0)
def produce_duck(pipe,duck_count):
    #生产者不停地生产烤鸭。
    while True:
        with duck_count.get_lock():
            if duck_count.value>=max_duck_count:
                pipe.send(None)
                break
            duck_count.value += 1
            pipe.send(duck_count.value)
            print(f"{multiprocessing.current_process().name}生产了第{duck_count.value}只烤鸭。")

def consume_duck(pipe):
    #消费者不停地消费
    while True:
        duck=pipe.recv()
        if duck is None:
            break
        print(f"{multiprocessing.current_process().name}消费了第{duck}只烤鸭。")

if __name__ == '__main__':
    produce_duck_pipe,consume_duck_pipe=multiprocessing.Pipe()


    produce_duck_process_list = [
        multiprocessing.Process(target=produce_duck,name=f"生产者进程{_}",args=(produce_duck_pipe,duck_count))
        for _ in range(5)
    ]
    consume_duck_process_list = [
        multiprocessing.Process(target=consume_duck,name=f"消费者进程{_}",args=(consume_duck_pipe,))
        for _ in range(3)
    ]
    for produce_duck_process in produce_duck_process_list:
        produce_duck_process.start()
    for consume_duck_process in consume_duck_process_list:
        consume_duck_process.start()

    for produce_duck_process in produce_duck_process_list:
        produce_duck_process.join()
    for consume_duck_process in consume_duck_process_list:
        consume_duck_process.join()

    print("主线程结束。")

相关文章:

  • 【MySQL】在CentOS7环境下----手把手教你安装MySQL详细教程(附带图例详解!!)
  • Hbase伪分布安装教程,详细版
  • DeepSeek + 数据分析:让数据洞察更智能、更高效
  • 学习路程十一 langchain核心组件 Memory
  • Python Scrapy爬虫面试题及参考答案
  • 0x03 http协议和分层架构
  • 【子网掩码计算器:Python + Tkinter 实现】
  • Python中字符串的常用操作
  • Java注释/JDK开发工具生成API/关键字、标识符规范
  • 软件设计师-计算机系统知识:1. 计算机系统基础知识
  • (视频教程)Compass代谢分析详细流程及python版-R语言版下游分析和可视化
  • SpringBoot五:JSR303校验
  • 《几何原本》公理
  • Android15 am命令 APP安装流程
  • Python爬虫
  • flowable中用户相关api
  • 可观测之Tracing-bpftrace
  • C#委托(delegate)的常用方式
  • 【Qt-信号与槽】connect函数的用法
  • 单细胞分析(19)—— 单细胞转录组基因集评分方法
  • 河南省建设厅厅长/重庆白云seo整站优化
  • 企业网站制作开发/精品成品网站源码
  • 汽车o2o网站建设/推广网络营销案例
  • 独立网站模板下载/怎么优化网站关键词的方法
  • 做网站如何添加表单/网页设计模板图片
  • 公司网站英文/国外网站推广