当前位置: 首页 > news >正文

[Python学习日记-89] 并发编程之多进程 —— 共享数据、信号量、事件、进程池

[Python学习日记-89] 并发编程之多进程 —— 共享数据、信号量、事件、进程池

简介

共享数据

一、Value 和 Array

二、Manager

信号量

事件

进程池

一、进程池的介绍

二、进程池的使用

三、基于进程池的 Socket 应用

四、回调函数

简介

        在 Python 编程中,并发编程能够显著提升程序的执行效率。多进程作为并发编程的重要方式之一,能够充分利用多核 CPU 的优势,实现真正的并行计算。在前面多篇博客的介绍多进程的主要内容介绍得差不多了:

  • [Python学习日记-84] 并发编程之多进程 —— 进程理论
  • [Python学习日记-85] 并发编程之多进程 —— multiprocessing 模块、Process 类、join 方法、僵尸进程与孤儿进程
  • [Python学习日记-86] 并发编程之多进程 —— 守护进程
  • [Python学习日记-87] 并发编程之多进程 —— 互斥锁
  • [Python学习日记-88] 并发编程之多进程 —— 队列与生产者消费者模型

        但是还有共享数据、信号量、事件、进程池这些尾巴,其中信号量、事件只需要了解一下就可以了,下面我们一起来看看这些知识吧。

共享数据

        目前基于消息传递的并发编程是必然趋势,即使使用线程,也更趋向于将系统构建为松耦合的线程池体系。多进程中各进程之间的数据还是独立的,可以借助基于消息传递的队列或管道进行数据交换,这样不仅能显著降低对锁及其他同步机制的依赖,还可通过消息中间件的分布式特性无缝扩展至分布式环境。但是由于数据安全的问题,进程间通信应尽量避免采用共享数据的方式来进行通信。

        虽然进程间的数据是独立的,但是在 multiprocessing 中提供了两种方式来实现数据共享,一种是通过 Value 和 Array;另一种是通过 Manager 来实现,这两种方法都需要使用锁来保证数据操作的安全性

一、Value 和 Array

        multiprocessing 模块中的 Value 和 Array 可以在进程间共享简单的数据类型和数组,代码如下所示

Value

import multiprocessing


def increment_shared_value(shared_value):
    with shared_value.get_lock():
        shared_value.value += 1
        print(f"当前共享值为: {shared_value.value}")


if __name__ == '__main__':
    shared_num = multiprocessing.Value('i', 0)
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=increment_shared_value, args=(shared_num,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"共享值最终为: {shared_num.value}")

输出结果为:

Array

import multiprocessing


def increment_shared_value(shared_array, n):
    with shared_array.get_lock():
        for i in range(n):
            shared_array[i] += 1
        print(f"当前共享值为:{[shared_array[i] for i in range(n)]}")


if __name__ == '__main__':
    shared_num = multiprocessing.Array('i', [0, 0, 0, 0, 0, 0, 0, 0, 0])
    n = len(shared_num)
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=increment_shared_value, args=(shared_num, n,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"共享值最终为:{[shared_num[i] for i in range(n)]}")

输出结果为:

二、Manager

        multiprocessing 模块中的 Manager 提供了更灵活的方式来共享复杂的数据结构,Manager() 返回的管理器对象使用了一个服务器进程,该进程保存 Python 对象,并允许其他进程通过代理来操作它们;该管理器支持多种复杂的数据结构,分别有:列表(list)、字典(dict)、命名空间(Namespace)、锁(Lock)、RLock、信号量(Semaphore)、有界信号量(BoundedSemaphore)、条件(Condition)、事件(Event)、Barrier、队列(Queue)、值(Value)和数组(Array)等。代码如下所示

import multiprocessing


def append_to_shared_list(shared_list, lock):
    with lock:    # 不加锁而操作共享的数据,肯定会出现数据错乱
        shared_list.append(multiprocessing.current_process().pid)
        print(f"当前共享列表:{shared_list}")



if __name__ == '__main__':
    lock = multiprocessing.Lock()
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=append_to_shared_list, args=(shared_list,lock,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"共享列表: {shared_list}")

输出结果为:

信号量

        信号量(Semaphore)是一种用于控制并发访问数量的同步原语。信号量与互斥锁进行对比,互斥锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的进程更改数据,例如,像超市结算排队,一共有3条队(指定信号量为3),那就是说最多只允许同时3个人一起结账(来一个人就会分配一把锁并计数加1),后面来结账的人只能等前面的人完成后才能进行结账。信号量与进程池的概念很像,最主要的区别在于信号量涉及到加锁的概念。

        我们来看一个示例,假设有一个数据库连接池,最多允许3个进程同时使用连接,代码如下所示

import multiprocessing
import time


def use_database_connection(semaphore):
    with semaphore:
        print(f"进程 {multiprocessing.current_process().pid} 获得数据库连接")
        time.sleep(2)
        print(f"-----进程 {multiprocessing.current_process().pid} 释放数据库连接-----")


if __name__ == '__main__':
    semaphore = multiprocessing.Semaphore(3)
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=use_database_connection, args=(semaphore,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

输出结果为:

事件

        事件(Event)是一种简单的同步机制,用于线程或进程之间的通信和同步。一个进程可以通过设置事件来通知其他进程某个条件已经满足,例如,主进程可以通过事件来控制其他进程的执行。

事件处理的机制:事件在全局定义了一个 Flag,其为布尔值,该变量的值会影响 wait()、set()、clear() 方法的执行

事件主要提供三个方法

  • wait():如果值为 False 时,执行 wait() 方法会阻塞,为 True 则不会
  • set():将 Flag 设为 True
  • clear():将 Flag 设为 False

         我们来看一个红绿灯的示例,一共5条车道, Flag 值为 False 为红灯,值为 True 为绿灯,代码如下所示

import multiprocessing
import time,random


def police_car(e,n):
    while True:
        time.sleep(random.randint(3, 6))    # 不同警车停下的时间随机
        if not e.is_set():
            print('\033[31m红灯亮%s\033[0m,car%s等着' % (e.is_set(), n))
            e.wait()    # 等待变绿灯
            print('\033[92m绿灯亮%s\033[0m,警车走了,car %s' %(e.is_set(),n))

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)    # 红绿灯等待时间
        if e.is_set():  # 判断是否变绿灯
            print("\033[31m现在是红灯\033[0m")
            e.clear() # e.is_set() ---->False
        else:
            print("\033[92m现在是绿灯\033[0m")
            e.set()  # e.is_set() ---->True

if __name__ == '__main__':
    e=multiprocessing.Event()

    for i in range(5):
        p = multiprocessing.Process(target=police_car, args=(e, i,))
        p.start()
    t = multiprocessing.Process(target=traffic_lights,args=(e,10))
    t.start()

    print('=====开始通行=====')

输出结果为:

进程池

官方文档:https://docs.python.org/dev/library/concurrent.futures.html

一、进程池的介绍

        进程池(Process Pool)是一种用于管理和复用进程资源的机制。在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录或者远程控制多台主机,使用并行操作可以节约大量的时间,而之前介绍的多进程就是实现并发的手段之一,在进行并发时会面临一些问题:

  1. 通常需要并发执行的任务数会远远大于 CPU 的核心数,而一个操作系统并不会无限开启进程,而是根据核心数来开启进程(即有几个核心就开几个进程,开启多于核心数的进程无法做到并行)
  2. 当进程开启过多会占用大量的系统资源,导致运行效率将会不升反降

        例如,当操作的对象量不是很大时,可以直接利用 multiprocessing 模块中的 Process 动态成生多个进程,但数量到达成百上千时,去手动限制进程数量会及繁琐又低效,这时就应该使用进程池了。我们可以通过创建一个进程池,可以预先创建一定数量的进程,并将任务分配给这些进程执行,从而避免了频繁创建和销毁进程的开销,并通过维护进程池来控制进程的数量,例如,httpd 的进程模式,就可以规定最小进程数和最大进程数。

        对于远程过程调用的高级应用程序而言,应该使用进程池,进程池可以提供指定数量的进程供用户调用,当有新的请求提交到进程池中时,如果池还没有满就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,当池中有进程结束时就会重用进程池中结束的进程。

        创建进程池可以使用 multiprocessing 模块中的 Pool 类,其语法如下:

Pool([numprocess [,initializer [, initargs]]])

类的参数介绍

  • numprocess:要创建的进程数,例如,指定 numprocess 为3,则进程池会从无到有创建3个进程,然后自始至终使用这3个进程去执行所有任务,不会开启其他进程,如果省略,将默认使用 cpu_count() 的值
  • initializer:是每个工作进程启动时要执行的可调用对象,默认为 None
  • initargs:是要传给 initializer 的参数组

主要方法介绍: 

  • apply(func, args=(), kwds={}):在一个池工作进程中执行 func(*args,**kwargs),然后返回结果。需要强调的是,此操作并不会在所有池工作进程中并执行 func 方法。如果要通过不同参数并发地执行 func 方法,必须从不同线程调用 apply() 方法或者使用 apply_async() 方法
  • map(func, iterable, chunksize=None):与 apply() 一样,但 map() 是对一个可迭代对象中的每个元素应用同一个 func。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
  • apply_async(func, args=(), kwds={}, callback=None, error_callback=None):在一个池工作进程中异步地执行 func(*args,**kwargs),然后返回结果。此方法的结果是 AsyncResult 类的实例,回调函数(callback)是可调用对象,接收输入参数。当 func 的结果变为可用时,将立即传递给 callback。callback 禁止执行任何阻塞操作,否则将接收其他异步操作中的结果
  • map_async(func, iterable, chunksize=None, callback=None, error_callback=None):与 apply_async() 一样都是异步执行 func,但 map_async() 是对一个可迭代对象中的每个元素应用同一个 func。例如,一个包含多个数字的列表,要对列表里的每个数字执行相同的计算操作。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
  • close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
  • jion():等待所有工作进程退出。此方法只能在 close() 或 teminate() 之后调用

其他方法介绍

        方法 apply_async() 和 map_async() 的返回值都是 AsyncResult 的实例(obj),其具有以下方法。

  • obj.get(timeout=None):返回结果,如果有必要则等待结果到达。timeout 是可选的,如果未指定该参数或者将其设为 None,则会一直等待,直到执行完成并返回结果。如果在指定时间内还未完成,将抛出 TimeoutError 异常。如果远程操作中引发了异常,它将在调用此方法时再次抛出 Exception 异常
  • obj.ready():如果调用完成,返回True
  • obj.successful():如果调用完成且没有引发异常,返回True,如果在任务已经完成之前调用此方法(即 obj.ready() 返回 True),会引发 AssertionError 异常
  • obj.wait(timeout=None):让当前程序等待异步任务完成
  • obj.terminate():立即终止所有工作进程,将不执行任何清理或结束任何挂起工作。如果进程池被垃圾回收机制回收,将自动调用此函数

二、进程池的使用

同步调用

from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)  # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限;维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
    print("==============================>")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    print(res_l) #看到的就是最终的结果组成的列表
    for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法
        print(i)

输出结果:

异步调用

from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)  # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   # 同步运行,阻塞,直到本次任务执行完毕拿到res;并维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res)
    print("==============================>") # 没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了

    # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    pool.close()    # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    print(res_l)    # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
    for i in res_l:
        print(i.get())  # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

输出结果:

三、基于进程池的 Socket 应用

        Pool 内的进程数默认是 CPU 核心数,假设为4(查看方法 os.cpu_count()),开启6个客户端,其中会有2个客户端处于等待状态,而在每个进程内查看 pid,只使用了4个 pid,即多个客户端公用4个进程,代码如下所示

服务器端

from socket import *
from multiprocessing import Pool
import os


server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    print('用户 %s 进程pid: %s' % (client_addr, os.getpid()))
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool()
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

客户端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

四、回调函数

        回调函数(Callback Function)扮演着重要的角色,尤其是在处理异步任务时。当我们使用进程池或异步启动进程执行任务时,任务的执行是异步的,这意味着主进程不会等待任务完成就会继续执行后续代码。而回调函数提供了一种机制,让我们可以在任务完成后自动执行特定的操作。

        具体使用回调函数的场景为,进程池中任何一个任务一旦处理完了,就立即告知主进程,然后主进程则调用一个函数去处理该结果,该函数即回调函数。例如,可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了 I/O 的过程,直接拿到的执行完成后的结果。代码如下所示

from multiprocessing import Pool
import requests
import os


def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l])    # 拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

输出结果:

        在爬虫方面也可以用回调函数来进行数据处理,代码如下所示

from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url, pattern):
    response = requests.get(url)    # 目前猫眼存在验证中心,先跳过,直接使用复制的html
    # 用于替代抓取结果的html代码,测试时自行去url抓取
    text = r"""
    <!DOCTYPE html>
    <!-- saved from url=(0030)https://www.maoyan.com/board/7 -->
    <html><!--<![endif]--><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
      <title>热映口碑榜 - 猫眼电影 - 一网打尽好电影</title>
    ...
    </style><div id="mount" style="display: block;"></div></template></div></html>"""
    if response.status_code == 200:
        return text, pattern

def parse_page(info):
    page_content, pattern = info    # page_content为html代码,pattern为正则表达式
    res = re.findall(pattern, page_content) # 正则表达式对html代码进行过滤
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]
        }
        print(dic)


if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'https://www.maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)  # 使用回调函数来处理抓到的html代码
        res_l.append(res)

    for res in res_l:
        res.get()

    # 爬虫实现的关键代码,但无法越过验证中心
    # res=requests.get('https://www.maoyan.com/board/7')
    # print(re.findall(pattern1,res.text))

输出结果:

注意:如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数 

http://www.dtcms.com/a/111275.html

相关文章:

  • 【nginx】Nginx的功能特性及常用功能
  • PostgreSQL:表分区与继承
  • OBS录制的一些日志
  • python如何把列表中所有字符变成小写
  • GATT(Generic Attribute Profile)是蓝牙低功耗(Bluetooth Low Energy,简称BLE)协议栈中的一个核心协议
  • 【蓝桥杯】算法笔记3
  • 函数栈帧的创建与销毁
  • 言语理解与表达
  • 实战交易策略 篇十四:江南神鹰捕捉热点和熊市生存交易策略
  • 专为 零基础初学者 设计的最简前端学习路线,聚焦核心内容,避免过度扩展,帮你快速入门并建立信心!
  • 第15周:注意力汇聚:Nadaraya-Watson 核回归
  • 生成 SSH Key 并配置 GitHub/GitLab 详细教程
  • 深入理解AOP:面向切面编程的核心概念与实战应用
  • ctfshow VIP题目限免 robots后台泄露
  • 规则引擎Drools
  • 【KMP】P10915 [蓝桥杯 2024 国 B] 最长回文前后缀|普及+
  • RHCSA Linux 系统 文件的查看、复制、移动、重命名
  • 阿里巴巴langengine二次开发大模型平台
  • 压测工具开发实战篇(二)——构建侧边栏以及设置图标字体
  • Linux(十二)信号
  • SQL注入重新学习
  • OpenEuler/CentOS一键部署OpenGauss数据库教程(脚本+视频)
  • openmv用了4个了,烧了2个,质量堪忧啊
  • 基于FPGA的特定序列检测器verilog实现,包含testbench和开发板硬件测试
  • 鸿蒙 ——选择相册图片保存到应用
  • 第11/100节:三点估算
  • Muduo网络库实现 [十五] - HttpContext模块
  • 传统开发者视角:智能合约与区块链数据库探秘
  • 实操(进程状态,R/S/D/T/t/X/Z)Linux
  • im即时通讯支持红包收发分销功能,带内嵌web页面,已经测试完美运行