[Python学习日记-89] 并发编程之多进程 —— 共享数据、信号量、事件、进程池
[Python学习日记-89] 并发编程之多进程 —— 共享数据、信号量、事件、进程池
简介
共享数据
一、Value 和 Array
二、Manager
信号量
事件
进程池
一、进程池的介绍
二、进程池的使用
三、基于进程池的 Socket 应用
四、回调函数
简介
在 Python 编程中,并发编程能够显著提升程序的执行效率。多进程作为并发编程的重要方式之一,能够充分利用多核 CPU 的优势,实现真正的并行计算。在前面多篇博客的介绍多进程的主要内容介绍得差不多了:
- [Python学习日记-84] 并发编程之多进程 —— 进程理论
- [Python学习日记-85] 并发编程之多进程 —— multiprocessing 模块、Process 类、join 方法、僵尸进程与孤儿进程
- [Python学习日记-86] 并发编程之多进程 —— 守护进程
- [Python学习日记-87] 并发编程之多进程 —— 互斥锁
- [Python学习日记-88] 并发编程之多进程 —— 队列与生产者消费者模型
但是还有共享数据、信号量、事件、进程池这些尾巴,其中信号量、事件只需要了解一下就可以了,下面我们一起来看看这些知识吧。
共享数据
目前基于消息传递的并发编程是必然趋势,即使使用线程,也更趋向于将系统构建为松耦合的线程池体系。多进程中各进程之间的数据还是独立的,可以借助基于消息传递的队列或管道进行数据交换,这样不仅能显著降低对锁及其他同步机制的依赖,还可通过消息中间件的分布式特性无缝扩展至分布式环境。但是由于数据安全的问题,进程间通信应尽量避免采用共享数据的方式来进行通信。
虽然进程间的数据是独立的,但是在 multiprocessing 中提供了两种方式来实现数据共享,一种是通过 Value 和 Array;另一种是通过 Manager 来实现,这两种方法都需要使用锁来保证数据操作的安全性
一、Value 和 Array
multiprocessing 模块中的 Value 和 Array 可以在进程间共享简单的数据类型和数组,代码如下所示
Value:
import multiprocessing
def increment_shared_value(shared_value):
with shared_value.get_lock():
shared_value.value += 1
print(f"当前共享值为: {shared_value.value}")
if __name__ == '__main__':
shared_num = multiprocessing.Value('i', 0)
processes = []
for _ in range(5):
p = multiprocessing.Process(target=increment_shared_value, args=(shared_num,))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"共享值最终为: {shared_num.value}")
输出结果为:
Array:
import multiprocessing
def increment_shared_value(shared_array, n):
with shared_array.get_lock():
for i in range(n):
shared_array[i] += 1
print(f"当前共享值为:{[shared_array[i] for i in range(n)]}")
if __name__ == '__main__':
shared_num = multiprocessing.Array('i', [0, 0, 0, 0, 0, 0, 0, 0, 0])
n = len(shared_num)
processes = []
for _ in range(5):
p = multiprocessing.Process(target=increment_shared_value, args=(shared_num, n,))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"共享值最终为:{[shared_num[i] for i in range(n)]}")
输出结果为:
二、Manager
multiprocessing 模块中的 Manager 提供了更灵活的方式来共享复杂的数据结构,Manager() 返回的管理器对象使用了一个服务器进程,该进程保存 Python 对象,并允许其他进程通过代理来操作它们;该管理器支持多种复杂的数据结构,分别有:列表(list)、字典(dict)、命名空间(Namespace)、锁(Lock)、RLock、信号量(Semaphore)、有界信号量(BoundedSemaphore)、条件(Condition)、事件(Event)、Barrier、队列(Queue)、值(Value)和数组(Array)等。代码如下所示
import multiprocessing
def append_to_shared_list(shared_list, lock):
with lock: # 不加锁而操作共享的数据,肯定会出现数据错乱
shared_list.append(multiprocessing.current_process().pid)
print(f"当前共享列表:{shared_list}")
if __name__ == '__main__':
lock = multiprocessing.Lock()
manager = multiprocessing.Manager()
shared_list = manager.list()
processes = []
for _ in range(5):
p = multiprocessing.Process(target=append_to_shared_list, args=(shared_list,lock,))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"共享列表: {shared_list}")
输出结果为:
信号量
信号量(Semaphore)是一种用于控制并发访问数量的同步原语。信号量与互斥锁进行对比,互斥锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的进程更改数据,例如,像超市结算排队,一共有3条队(指定信号量为3),那就是说最多只允许同时3个人一起结账(来一个人就会分配一把锁并计数加1),后面来结账的人只能等前面的人完成后才能进行结账。信号量与进程池的概念很像,最主要的区别在于信号量涉及到加锁的概念。
我们来看一个示例,假设有一个数据库连接池,最多允许3个进程同时使用连接,代码如下所示
import multiprocessing
import time
def use_database_connection(semaphore):
with semaphore:
print(f"进程 {multiprocessing.current_process().pid} 获得数据库连接")
time.sleep(2)
print(f"-----进程 {multiprocessing.current_process().pid} 释放数据库连接-----")
if __name__ == '__main__':
semaphore = multiprocessing.Semaphore(3)
processes = []
for _ in range(5):
p = multiprocessing.Process(target=use_database_connection, args=(semaphore,))
processes.append(p)
p.start()
for p in processes:
p.join()
输出结果为:
事件
事件(Event)是一种简单的同步机制,用于线程或进程之间的通信和同步。一个进程可以通过设置事件来通知其他进程某个条件已经满足,例如,主进程可以通过事件来控制其他进程的执行。
事件处理的机制:事件在全局定义了一个 Flag,其为布尔值,该变量的值会影响 wait()、set()、clear() 方法的执行
事件主要提供三个方法:
- wait():如果值为 False 时,执行 wait() 方法会阻塞,为 True 则不会
- set():将 Flag 设为 True
- clear():将 Flag 设为 False
我们来看一个红绿灯的示例,一共5条车道, Flag 值为 False 为红灯,值为 True 为绿灯,代码如下所示
import multiprocessing
import time,random
def police_car(e,n):
while True:
time.sleep(random.randint(3, 6)) # 不同警车停下的时间随机
if not e.is_set():
print('\033[31m红灯亮%s\033[0m,car%s等着' % (e.is_set(), n))
e.wait() # 等待变绿灯
print('\033[92m绿灯亮%s\033[0m,警车走了,car %s' %(e.is_set(),n))
def traffic_lights(e,inverval):
while True:
time.sleep(inverval) # 红绿灯等待时间
if e.is_set(): # 判断是否变绿灯
print("\033[31m现在是红灯\033[0m")
e.clear() # e.is_set() ---->False
else:
print("\033[92m现在是绿灯\033[0m")
e.set() # e.is_set() ---->True
if __name__ == '__main__':
e=multiprocessing.Event()
for i in range(5):
p = multiprocessing.Process(target=police_car, args=(e, i,))
p.start()
t = multiprocessing.Process(target=traffic_lights,args=(e,10))
t.start()
print('=====开始通行=====')
输出结果为:
进程池
官方文档:https://docs.python.org/dev/library/concurrent.futures.html
一、进程池的介绍
进程池(Process Pool)是一种用于管理和复用进程资源的机制。在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录或者远程控制多台主机,使用并行操作可以节约大量的时间,而之前介绍的多进程就是实现并发的手段之一,在进行并发时会面临一些问题:
- 通常需要并发执行的任务数会远远大于 CPU 的核心数,而一个操作系统并不会无限开启进程,而是根据核心数来开启进程(即有几个核心就开几个进程,开启多于核心数的进程无法做到并行)
- 当进程开启过多会占用大量的系统资源,导致运行效率将会不升反降
例如,当操作的对象量不是很大时,可以直接利用 multiprocessing 模块中的 Process 动态成生多个进程,但数量到达成百上千时,去手动限制进程数量会及繁琐又低效,这时就应该使用进程池了。我们可以通过创建一个进程池,可以预先创建一定数量的进程,并将任务分配给这些进程执行,从而避免了频繁创建和销毁进程的开销,并通过维护进程池来控制进程的数量,例如,httpd 的进程模式,就可以规定最小进程数和最大进程数。
对于远程过程调用的高级应用程序而言,应该使用进程池,进程池可以提供指定数量的进程供用户调用,当有新的请求提交到进程池中时,如果池还没有满就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,当池中有进程结束时就会重用进程池中结束的进程。
创建进程池可以使用 multiprocessing 模块中的 Pool 类,其语法如下:
Pool([numprocess [,initializer [, initargs]]])
类的参数介绍:
- numprocess:要创建的进程数,例如,指定 numprocess 为3,则进程池会从无到有创建3个进程,然后自始至终使用这3个进程去执行所有任务,不会开启其他进程,如果省略,将默认使用 cpu_count() 的值
- initializer:是每个工作进程启动时要执行的可调用对象,默认为 None
- initargs:是要传给 initializer 的参数组
主要方法介绍:
- apply(func, args=(), kwds={}):在一个池工作进程中执行 func(*args,**kwargs),然后返回结果。需要强调的是,此操作并不会在所有池工作进程中并执行 func 方法。如果要通过不同参数并发地执行 func 方法,必须从不同线程调用 apply() 方法或者使用 apply_async() 方法
- map(func, iterable, chunksize=None):与 apply() 一样,但 map() 是对一个可迭代对象中的每个元素应用同一个 func。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
- apply_async(func, args=(), kwds={}, callback=None, error_callback=None):在一个池工作进程中异步地执行 func(*args,**kwargs),然后返回结果。此方法的结果是 AsyncResult 类的实例,回调函数(callback)是可调用对象,接收输入参数。当 func 的结果变为可用时,将立即传递给 callback。callback 禁止执行任何阻塞操作,否则将接收其他异步操作中的结果
- map_async(func, iterable, chunksize=None, callback=None, error_callback=None):与 apply_async() 一样都是异步执行 func,但 map_async() 是对一个可迭代对象中的每个元素应用同一个 func。例如,一个包含多个数字的列表,要对列表里的每个数字执行相同的计算操作。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
- close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
- jion():等待所有工作进程退出。此方法只能在 close() 或 teminate() 之后调用
其他方法介绍:
方法 apply_async() 和 map_async() 的返回值都是 AsyncResult 的实例(obj),其具有以下方法。
- obj.get(timeout=None):返回结果,如果有必要则等待结果到达。timeout 是可选的,如果未指定该参数或者将其设为 None,则会一直等待,直到执行完成并返回结果。如果在指定时间内还未完成,将抛出 TimeoutError 异常。如果远程操作中引发了异常,它将在调用此方法时再次抛出 Exception 异常
- obj.ready():如果调用完成,返回True
- obj.successful():如果调用完成且没有引发异常,返回True,如果在任务已经完成之前调用此方法(即 obj.ready() 返回 True),会引发 AssertionError 异常
- obj.wait(timeout=None):让当前程序等待异步任务完成
- obj.terminate():立即终止所有工作进程,将不执行任何清理或结束任何挂起工作。如果进程池被垃圾回收机制回收,将自动调用此函数
二、进程池的使用
同步调用:
from multiprocessing import Process,Pool
import time
def func(msg):
print( "msg:", msg)
time.sleep(0.1)
return msg
if __name__ == "__main__":
pool = Pool(processes = 3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
msg = "hello %d" %(i)
res=pool.apply(func, (msg, )) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限;维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
print("==============================>")
pool.close()
pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print(res_l) #看到的就是最终的结果组成的列表
for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法
print(i)
输出结果:
异步调用:
from multiprocessing import Process,Pool
import time
def func(msg):
print( "msg:", msg)
time.sleep(1)
return msg
if __name__ == "__main__":
pool = Pool(processes = 3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
msg = "hello %d" %(i)
res=pool.apply_async(func, (msg, )) # 同步运行,阻塞,直到本次任务执行完毕拿到res;并维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res)
print("==============================>") # 没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了
# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
pool.close() # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print(res_l) # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
for i in res_l:
print(i.get()) # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
输出结果:
三、基于进程池的 Socket 应用
Pool 内的进程数默认是 CPU 核心数,假设为4(查看方法 os.cpu_count()),开启6个客户端,其中会有2个客户端处于等待状态,而在每个进程内查看 pid,只使用了4个 pid,即多个客户端公用4个进程,代码如下所示
服务器端:
from socket import *
from multiprocessing import Pool
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn,client_addr):
print('用户 %s 进程pid: %s' % (client_addr, os.getpid()))
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p=Pool()
while True:
conn,client_addr=server.accept()
p.apply_async(talk,args=(conn,client_addr))
# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
客户端:
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
四、回调函数
回调函数(Callback Function)扮演着重要的角色,尤其是在处理异步任务时。当我们使用进程池或异步启动进程执行任务时,任务的执行是异步的,这意味着主进程不会等待任务完成就会继续执行后续代码。而回调函数提供了一种机制,让我们可以在任务完成后自动执行特定的操作。
具体使用回调函数的场景为,进程池中任何一个任务一旦处理完了,就立即告知主进程,然后主进程则调用一个函数去处理该结果,该函数即回调函数。例如,可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了 I/O 的过程,直接拿到的执行完成后的结果。代码如下所示
from multiprocessing import Pool
import requests
import os
def get_page(url):
print('<进程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def pasrse_page(res):
print('<进程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
p=Pool(3)
res_l=[]
for url in urls:
res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
res_l.append(res)
p.close()
p.join()
print([res.get() for res in res_l]) # 拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了
输出结果:
在爬虫方面也可以用回调函数来进行数据处理,代码如下所示
from multiprocessing import Pool
import time,random
import requests
import re
def get_page(url, pattern):
response = requests.get(url) # 目前猫眼存在验证中心,先跳过,直接使用复制的html
# 用于替代抓取结果的html代码,测试时自行去url抓取
text = r"""
<!DOCTYPE html>
<!-- saved from url=(0030)https://www.maoyan.com/board/7 -->
<html><!--<![endif]--><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>热映口碑榜 - 猫眼电影 - 一网打尽好电影</title>
...
</style><div id="mount" style="display: block;"></div></template></div></html>"""
if response.status_code == 200:
return text, pattern
def parse_page(info):
page_content, pattern = info # page_content为html代码,pattern为正则表达式
res = re.findall(pattern, page_content) # 正则表达式对html代码进行过滤
for item in res:
dic={
'index':item[0],
'title':item[1],
'actor':item[2].strip()[3:],
'time':item[3][5:],
'score':item[4]+item[5]
}
print(dic)
if __name__ == '__main__':
pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)
url_dic={
'https://www.maoyan.com/board/7':pattern1,
}
p=Pool()
res_l=[]
for url,pattern in url_dic.items():
res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) # 使用回调函数来处理抓到的html代码
res_l.append(res)
for res in res_l:
res.get()
# 爬虫实现的关键代码,但无法越过验证中心
# res=requests.get('https://www.maoyan.com/board/7')
# print(re.findall(pattern1,res.text))
输出结果:
注意:如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数