做关于家乡的网站网店代运营十大排名
[Python学习日记-89] 并发编程之多进程 —— 共享数据、信号量、事件、进程池
简介
共享数据
一、Value 和 Array
二、Manager
信号量
事件
进程池
一、进程池的介绍
二、进程池的使用
三、基于进程池的 Socket 应用
四、回调函数
简介
在 Python 编程中,并发编程能够显著提升程序的执行效率。多进程作为并发编程的重要方式之一,能够充分利用多核 CPU 的优势,实现真正的并行计算。在前面多篇博客的介绍多进程的主要内容介绍得差不多了:
- [Python学习日记-84] 并发编程之多进程 —— 进程理论
- [Python学习日记-85] 并发编程之多进程 —— multiprocessing 模块、Process 类、join 方法、僵尸进程与孤儿进程
- [Python学习日记-86] 并发编程之多进程 —— 守护进程
- [Python学习日记-87] 并发编程之多进程 —— 互斥锁
- [Python学习日记-88] 并发编程之多进程 —— 队列与生产者消费者模型
但是还有共享数据、信号量、事件、进程池这些尾巴,其中信号量、事件只需要了解一下就可以了,下面我们一起来看看这些知识吧。
共享数据
目前基于消息传递的并发编程是必然趋势,即使使用线程,也更趋向于将系统构建为松耦合的线程池体系。多进程中各进程之间的数据还是独立的,可以借助基于消息传递的队列或管道进行数据交换,这样不仅能显著降低对锁及其他同步机制的依赖,还可通过消息中间件的分布式特性无缝扩展至分布式环境。但是由于数据安全的问题,进程间通信应尽量避免采用共享数据的方式来进行通信。
虽然进程间的数据是独立的,但是在 multiprocessing 中提供了两种方式来实现数据共享,一种是通过 Value 和 Array;另一种是通过 Manager 来实现,这两种方法都需要使用锁来保证数据操作的安全性
一、Value 和 Array
multiprocessing 模块中的 Value 和 Array 可以在进程间共享简单的数据类型和数组,代码如下所示
Value:
import multiprocessingdef increment_shared_value(shared_value):with shared_value.get_lock():shared_value.value += 1print(f"当前共享值为: {shared_value.value}")if __name__ == '__main__':shared_num = multiprocessing.Value('i', 0)processes = []for _ in range(5):p = multiprocessing.Process(target=increment_shared_value, args=(shared_num,))processes.append(p)p.start()for p in processes:p.join()print(f"共享值最终为: {shared_num.value}")
输出结果为:
Array:
import multiprocessingdef increment_shared_value(shared_array, n):with shared_array.get_lock():for i in range(n):shared_array[i] += 1print(f"当前共享值为:{[shared_array[i] for i in range(n)]}")if __name__ == '__main__':shared_num = multiprocessing.Array('i', [0, 0, 0, 0, 0, 0, 0, 0, 0])n = len(shared_num)processes = []for _ in range(5):p = multiprocessing.Process(target=increment_shared_value, args=(shared_num, n,))processes.append(p)p.start()for p in processes:p.join()print(f"共享值最终为:{[shared_num[i] for i in range(n)]}")
输出结果为:
二、Manager
multiprocessing 模块中的 Manager 提供了更灵活的方式来共享复杂的数据结构,Manager() 返回的管理器对象使用了一个服务器进程,该进程保存 Python 对象,并允许其他进程通过代理来操作它们;该管理器支持多种复杂的数据结构,分别有:列表(list)、字典(dict)、命名空间(Namespace)、锁(Lock)、RLock、信号量(Semaphore)、有界信号量(BoundedSemaphore)、条件(Condition)、事件(Event)、Barrier、队列(Queue)、值(Value)和数组(Array)等。代码如下所示
import multiprocessingdef append_to_shared_list(shared_list, lock):with lock: # 不加锁而操作共享的数据,肯定会出现数据错乱shared_list.append(multiprocessing.current_process().pid)print(f"当前共享列表:{shared_list}")if __name__ == '__main__':lock = multiprocessing.Lock()manager = multiprocessing.Manager()shared_list = manager.list()processes = []for _ in range(5):p = multiprocessing.Process(target=append_to_shared_list, args=(shared_list,lock,))processes.append(p)p.start()for p in processes:p.join()print(f"共享列表: {shared_list}")
输出结果为:
信号量
信号量(Semaphore)是一种用于控制并发访问数量的同步原语。信号量与互斥锁进行对比,互斥锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的进程更改数据,例如,像超市结算排队,一共有3条队(指定信号量为3),那就是说最多只允许同时3个人一起结账(来一个人就会分配一把锁并计数加1),后面来结账的人只能等前面的人完成后才能进行结账。信号量与进程池的概念很像,最主要的区别在于信号量涉及到加锁的概念。
我们来看一个示例,假设有一个数据库连接池,最多允许3个进程同时使用连接,代码如下所示
import multiprocessing
import timedef use_database_connection(semaphore):with semaphore:print(f"进程 {multiprocessing.current_process().pid} 获得数据库连接")time.sleep(2)print(f"-----进程 {multiprocessing.current_process().pid} 释放数据库连接-----")if __name__ == '__main__':semaphore = multiprocessing.Semaphore(3)processes = []for _ in range(5):p = multiprocessing.Process(target=use_database_connection, args=(semaphore,))processes.append(p)p.start()for p in processes:p.join()
输出结果为:
事件
事件(Event)是一种简单的同步机制,用于线程或进程之间的通信和同步。一个进程可以通过设置事件来通知其他进程某个条件已经满足,例如,主进程可以通过事件来控制其他进程的执行。
事件处理的机制:事件在全局定义了一个 Flag,其为布尔值,该变量的值会影响 wait()、set()、clear() 方法的执行
事件主要提供三个方法:
- wait():如果值为 False 时,执行 wait() 方法会阻塞,为 True 则不会
- set():将 Flag 设为 True
- clear():将 Flag 设为 False
我们来看一个红绿灯的示例,一共5条车道, Flag 值为 False 为红灯,值为 True 为绿灯,代码如下所示
import multiprocessing
import time,randomdef police_car(e,n):while True:time.sleep(random.randint(3, 6)) # 不同警车停下的时间随机if not e.is_set():print('\033[31m红灯亮%s\033[0m,car%s等着' % (e.is_set(), n))e.wait() # 等待变绿灯print('\033[92m绿灯亮%s\033[0m,警车走了,car %s' %(e.is_set(),n))def traffic_lights(e,inverval):while True:time.sleep(inverval) # 红绿灯等待时间if e.is_set(): # 判断是否变绿灯print("\033[31m现在是红灯\033[0m")e.clear() # e.is_set() ---->Falseelse:print("\033[92m现在是绿灯\033[0m")e.set() # e.is_set() ---->Trueif __name__ == '__main__':e=multiprocessing.Event()for i in range(5):p = multiprocessing.Process(target=police_car, args=(e, i,))p.start()t = multiprocessing.Process(target=traffic_lights,args=(e,10))t.start()print('=====开始通行=====')
输出结果为:
进程池
官方文档:https://docs.python.org/dev/library/concurrent.futures.html
一、进程池的介绍
进程池(Process Pool)是一种用于管理和复用进程资源的机制。在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录或者远程控制多台主机,使用并行操作可以节约大量的时间,而之前介绍的多进程就是实现并发的手段之一,在进行并发时会面临一些问题:
- 通常需要并发执行的任务数会远远大于 CPU 的核心数,而一个操作系统并不会无限开启进程,而是根据核心数来开启进程(即有几个核心就开几个进程,开启多于核心数的进程无法做到并行)
- 当进程开启过多会占用大量的系统资源,导致运行效率将会不升反降
例如,当操作的对象量不是很大时,可以直接利用 multiprocessing 模块中的 Process 动态成生多个进程,但数量到达成百上千时,去手动限制进程数量会及繁琐又低效,这时就应该使用进程池了。我们可以通过创建一个进程池,可以预先创建一定数量的进程,并将任务分配给这些进程执行,从而避免了频繁创建和销毁进程的开销,并通过维护进程池来控制进程的数量,例如,httpd 的进程模式,就可以规定最小进程数和最大进程数。
对于远程过程调用的高级应用程序而言,应该使用进程池,进程池可以提供指定数量的进程供用户调用,当有新的请求提交到进程池中时,如果池还没有满就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,当池中有进程结束时就会重用进程池中结束的进程。
创建进程池可以使用 multiprocessing 模块中的 Pool 类,其语法如下:
Pool([numprocess [,initializer [, initargs]]])
类的参数介绍:
- numprocess:要创建的进程数,例如,指定 numprocess 为3,则进程池会从无到有创建3个进程,然后自始至终使用这3个进程去执行所有任务,不会开启其他进程,如果省略,将默认使用 cpu_count() 的值
- initializer:是每个工作进程启动时要执行的可调用对象,默认为 None
- initargs:是要传给 initializer 的参数组
主要方法介绍:
- apply(func, args=(), kwds={}):在一个池工作进程中执行 func(*args,**kwargs),然后返回结果。需要强调的是,此操作并不会在所有池工作进程中并执行 func 方法。如果要通过不同参数并发地执行 func 方法,必须从不同线程调用 apply() 方法或者使用 apply_async() 方法
- map(func, iterable, chunksize=None):与 apply() 一样,但 map() 是对一个可迭代对象中的每个元素应用同一个 func。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
- apply_async(func, args=(), kwds={}, callback=None, error_callback=None):在一个池工作进程中异步地执行 func(*args,**kwargs),然后返回结果。此方法的结果是 AsyncResult 类的实例,回调函数(callback)是可调用对象,接收输入参数。当 func 的结果变为可用时,将立即传递给 callback。callback 禁止执行任何阻塞操作,否则将接收其他异步操作中的结果
- map_async(func, iterable, chunksize=None, callback=None, error_callback=None):与 apply_async() 一样都是异步执行 func,但 map_async() 是对一个可迭代对象中的每个元素应用同一个 func。例如,一个包含多个数字的列表,要对列表里的每个数字执行相同的计算操作。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
- close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
- jion():等待所有工作进程退出。此方法只能在 close() 或 teminate() 之后调用
其他方法介绍:
方法 apply_async() 和 map_async() 的返回值都是 AsyncResult 的实例(obj),其具有以下方法。
- obj.get(timeout=None):返回结果,如果有必要则等待结果到达。timeout 是可选的,如果未指定该参数或者将其设为 None,则会一直等待,直到执行完成并返回结果。如果在指定时间内还未完成,将抛出 TimeoutError 异常。如果远程操作中引发了异常,它将在调用此方法时再次抛出 Exception 异常
- obj.ready():如果调用完成,返回True
- obj.successful():如果调用完成且没有引发异常,返回True,如果在任务已经完成之前调用此方法(即 obj.ready() 返回 True),会引发 AssertionError 异常
- obj.wait(timeout=None):让当前程序等待异步任务完成
- obj.terminate():立即终止所有工作进程,将不执行任何清理或结束任何挂起工作。如果进程池被垃圾回收机制回收,将自动调用此函数
二、进程池的使用
同步调用:
from multiprocessing import Process,Pool
import timedef func(msg):print( "msg:", msg)time.sleep(0.1)return msgif __name__ == "__main__":pool = Pool(processes = 3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply(func, (msg, )) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限;维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个print("==============================>")pool.close()pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束print(res_l) #看到的就是最终的结果组成的列表for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法print(i)
输出结果:
异步调用:
from multiprocessing import Process,Pool
import timedef func(msg):print( "msg:", msg)time.sleep(1)return msgif __name__ == "__main__":pool = Pool(processes = 3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply_async(func, (msg, )) # 同步运行,阻塞,直到本次任务执行完毕拿到res;并维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去res_l.append(res)print("==============================>") # 没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了pool.close() # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束print(res_l) # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果for i in res_l:print(i.get()) # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
输出结果:
三、基于进程池的 Socket 应用
Pool 内的进程数默认是 CPU 核心数,假设为4(查看方法 os.cpu_count()),开启6个客户端,其中会有2个客户端处于等待状态,而在每个进程内查看 pid,只使用了4个 pid,即多个客户端公用4个进程,代码如下所示
服务器端:
from socket import *
from multiprocessing import Pool
import osserver=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)def talk(conn,client_addr):print('用户 %s 进程pid: %s' % (client_addr, os.getpid()))while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool()while True:conn,client_addr=server.accept()p.apply_async(talk,args=(conn,client_addr))# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
客户端:
from socket import *client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))
四、回调函数
回调函数(Callback Function)扮演着重要的角色,尤其是在处理异步任务时。当我们使用进程池或异步启动进程执行任务时,任务的执行是异步的,这意味着主进程不会等待任务完成就会继续执行后续代码。而回调函数提供了一种机制,让我们可以在任务完成后自动执行特定的操作。
具体使用回调函数的场景为,进程池中任何一个任务一旦处理完了,就立即告知主进程,然后主进程则调用一个函数去处理该结果,该函数即回调函数。例如,可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了 I/O 的过程,直接拿到的执行完成后的结果。代码如下所示
from multiprocessing import Pool
import requests
import osdef get_page(url):print('<进程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<进程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) # 拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了
输出结果:
在爬虫方面也可以用回调函数来进行数据处理,代码如下所示
from multiprocessing import Pool
import time,random
import requests
import redef get_page(url, pattern):response = requests.get(url) # 目前猫眼存在验证中心,先跳过,直接使用复制的html# 用于替代抓取结果的html代码,测试时自行去url抓取text = r"""<!DOCTYPE html><!-- saved from url=(0030)https://www.maoyan.com/board/7 --><html><!--<![endif]--><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><title>热映口碑榜 - 猫眼电影 - 一网打尽好电影</title>...</style><div id="mount" style="display: block;"></div></template></div></html>"""if response.status_code == 200:return text, patterndef parse_page(info):page_content, pattern = info # page_content为html代码,pattern为正则表达式res = re.findall(pattern, page_content) # 正则表达式对html代码进行过滤for item in res:dic={'index':item[0],'title':item[1],'actor':item[2].strip()[3:],'time':item[3][5:],'score':item[4]+item[5]}print(dic)if __name__ == '__main__':pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)url_dic={'https://www.maoyan.com/board/7':pattern1,}p=Pool()res_l=[]for url,pattern in url_dic.items():res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) # 使用回调函数来处理抓到的html代码res_l.append(res)for res in res_l:res.get()# 爬虫实现的关键代码,但无法越过验证中心# res=requests.get('https://www.maoyan.com/board/7')# print(re.findall(pattern1,res.text))
输出结果:
注意:如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数