当前位置: 首页 > wzjs >正文

做关于家乡的网站网店代运营十大排名

做关于家乡的网站,网店代运营十大排名,ih5制作平台官网免费,c2c模式流程图[Python学习日记-89] 并发编程之多进程 —— 共享数据、信号量、事件、进程池 简介 共享数据 一、Value 和 Array 二、Manager 信号量 事件 进程池 一、进程池的介绍 二、进程池的使用 三、基于进程池的 Socket 应用 四、回调函数 简介 在 Python 编程中&#xff0c…

[Python学习日记-89] 并发编程之多进程 —— 共享数据、信号量、事件、进程池

简介

共享数据

一、Value 和 Array

二、Manager

信号量

事件

进程池

一、进程池的介绍

二、进程池的使用

三、基于进程池的 Socket 应用

四、回调函数

简介

        在 Python 编程中,并发编程能够显著提升程序的执行效率。多进程作为并发编程的重要方式之一,能够充分利用多核 CPU 的优势,实现真正的并行计算。在前面多篇博客的介绍多进程的主要内容介绍得差不多了:

  • [Python学习日记-84] 并发编程之多进程 —— 进程理论
  • [Python学习日记-85] 并发编程之多进程 —— multiprocessing 模块、Process 类、join 方法、僵尸进程与孤儿进程
  • [Python学习日记-86] 并发编程之多进程 —— 守护进程
  • [Python学习日记-87] 并发编程之多进程 —— 互斥锁
  • [Python学习日记-88] 并发编程之多进程 —— 队列与生产者消费者模型

        但是还有共享数据、信号量、事件、进程池这些尾巴,其中信号量、事件只需要了解一下就可以了,下面我们一起来看看这些知识吧。

共享数据

        目前基于消息传递的并发编程是必然趋势,即使使用线程,也更趋向于将系统构建为松耦合的线程池体系。多进程中各进程之间的数据还是独立的,可以借助基于消息传递的队列或管道进行数据交换,这样不仅能显著降低对锁及其他同步机制的依赖,还可通过消息中间件的分布式特性无缝扩展至分布式环境。但是由于数据安全的问题,进程间通信应尽量避免采用共享数据的方式来进行通信。

        虽然进程间的数据是独立的,但是在 multiprocessing 中提供了两种方式来实现数据共享,一种是通过 Value 和 Array;另一种是通过 Manager 来实现,这两种方法都需要使用锁来保证数据操作的安全性

一、Value 和 Array

        multiprocessing 模块中的 Value 和 Array 可以在进程间共享简单的数据类型和数组,代码如下所示

Value

import multiprocessingdef increment_shared_value(shared_value):with shared_value.get_lock():shared_value.value += 1print(f"当前共享值为: {shared_value.value}")if __name__ == '__main__':shared_num = multiprocessing.Value('i', 0)processes = []for _ in range(5):p = multiprocessing.Process(target=increment_shared_value, args=(shared_num,))processes.append(p)p.start()for p in processes:p.join()print(f"共享值最终为: {shared_num.value}")

输出结果为:

Array

import multiprocessingdef increment_shared_value(shared_array, n):with shared_array.get_lock():for i in range(n):shared_array[i] += 1print(f"当前共享值为:{[shared_array[i] for i in range(n)]}")if __name__ == '__main__':shared_num = multiprocessing.Array('i', [0, 0, 0, 0, 0, 0, 0, 0, 0])n = len(shared_num)processes = []for _ in range(5):p = multiprocessing.Process(target=increment_shared_value, args=(shared_num, n,))processes.append(p)p.start()for p in processes:p.join()print(f"共享值最终为:{[shared_num[i] for i in range(n)]}")

输出结果为:

二、Manager

        multiprocessing 模块中的 Manager 提供了更灵活的方式来共享复杂的数据结构,Manager() 返回的管理器对象使用了一个服务器进程,该进程保存 Python 对象,并允许其他进程通过代理来操作它们;该管理器支持多种复杂的数据结构,分别有:列表(list)、字典(dict)、命名空间(Namespace)、锁(Lock)、RLock、信号量(Semaphore)、有界信号量(BoundedSemaphore)、条件(Condition)、事件(Event)、Barrier、队列(Queue)、值(Value)和数组(Array)等。代码如下所示

import multiprocessingdef append_to_shared_list(shared_list, lock):with lock:    # 不加锁而操作共享的数据,肯定会出现数据错乱shared_list.append(multiprocessing.current_process().pid)print(f"当前共享列表:{shared_list}")if __name__ == '__main__':lock = multiprocessing.Lock()manager = multiprocessing.Manager()shared_list = manager.list()processes = []for _ in range(5):p = multiprocessing.Process(target=append_to_shared_list, args=(shared_list,lock,))processes.append(p)p.start()for p in processes:p.join()print(f"共享列表: {shared_list}")

输出结果为:

信号量

        信号量(Semaphore)是一种用于控制并发访问数量的同步原语。信号量与互斥锁进行对比,互斥锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的进程更改数据,例如,像超市结算排队,一共有3条队(指定信号量为3),那就是说最多只允许同时3个人一起结账(来一个人就会分配一把锁并计数加1),后面来结账的人只能等前面的人完成后才能进行结账。信号量与进程池的概念很像,最主要的区别在于信号量涉及到加锁的概念。

        我们来看一个示例,假设有一个数据库连接池,最多允许3个进程同时使用连接,代码如下所示

import multiprocessing
import timedef use_database_connection(semaphore):with semaphore:print(f"进程 {multiprocessing.current_process().pid} 获得数据库连接")time.sleep(2)print(f"-----进程 {multiprocessing.current_process().pid} 释放数据库连接-----")if __name__ == '__main__':semaphore = multiprocessing.Semaphore(3)processes = []for _ in range(5):p = multiprocessing.Process(target=use_database_connection, args=(semaphore,))processes.append(p)p.start()for p in processes:p.join()

输出结果为:

事件

        事件(Event)是一种简单的同步机制,用于线程或进程之间的通信和同步。一个进程可以通过设置事件来通知其他进程某个条件已经满足,例如,主进程可以通过事件来控制其他进程的执行。

事件处理的机制:事件在全局定义了一个 Flag,其为布尔值,该变量的值会影响 wait()、set()、clear() 方法的执行

事件主要提供三个方法

  • wait():如果值为 False 时,执行 wait() 方法会阻塞,为 True 则不会
  • set():将 Flag 设为 True
  • clear():将 Flag 设为 False

         我们来看一个红绿灯的示例,一共5条车道, Flag 值为 False 为红灯,值为 True 为绿灯,代码如下所示

import multiprocessing
import time,randomdef police_car(e,n):while True:time.sleep(random.randint(3, 6))    # 不同警车停下的时间随机if not e.is_set():print('\033[31m红灯亮%s\033[0m,car%s等着' % (e.is_set(), n))e.wait()    # 等待变绿灯print('\033[92m绿灯亮%s\033[0m,警车走了,car %s' %(e.is_set(),n))def traffic_lights(e,inverval):while True:time.sleep(inverval)    # 红绿灯等待时间if e.is_set():  # 判断是否变绿灯print("\033[31m现在是红灯\033[0m")e.clear() # e.is_set() ---->Falseelse:print("\033[92m现在是绿灯\033[0m")e.set()  # e.is_set() ---->Trueif __name__ == '__main__':e=multiprocessing.Event()for i in range(5):p = multiprocessing.Process(target=police_car, args=(e, i,))p.start()t = multiprocessing.Process(target=traffic_lights,args=(e,10))t.start()print('=====开始通行=====')

输出结果为:

进程池

官方文档:https://docs.python.org/dev/library/concurrent.futures.html

一、进程池的介绍

        进程池(Process Pool)是一种用于管理和复用进程资源的机制。在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录或者远程控制多台主机,使用并行操作可以节约大量的时间,而之前介绍的多进程就是实现并发的手段之一,在进行并发时会面临一些问题:

  1. 通常需要并发执行的任务数会远远大于 CPU 的核心数,而一个操作系统并不会无限开启进程,而是根据核心数来开启进程(即有几个核心就开几个进程,开启多于核心数的进程无法做到并行)
  2. 当进程开启过多会占用大量的系统资源,导致运行效率将会不升反降

        例如,当操作的对象量不是很大时,可以直接利用 multiprocessing 模块中的 Process 动态成生多个进程,但数量到达成百上千时,去手动限制进程数量会及繁琐又低效,这时就应该使用进程池了。我们可以通过创建一个进程池,可以预先创建一定数量的进程,并将任务分配给这些进程执行,从而避免了频繁创建和销毁进程的开销,并通过维护进程池来控制进程的数量,例如,httpd 的进程模式,就可以规定最小进程数和最大进程数。

        对于远程过程调用的高级应用程序而言,应该使用进程池,进程池可以提供指定数量的进程供用户调用,当有新的请求提交到进程池中时,如果池还没有满就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,当池中有进程结束时就会重用进程池中结束的进程。

        创建进程池可以使用 multiprocessing 模块中的 Pool 类,其语法如下:

Pool([numprocess [,initializer [, initargs]]])

类的参数介绍

  • numprocess:要创建的进程数,例如,指定 numprocess 为3,则进程池会从无到有创建3个进程,然后自始至终使用这3个进程去执行所有任务,不会开启其他进程,如果省略,将默认使用 cpu_count() 的值
  • initializer:是每个工作进程启动时要执行的可调用对象,默认为 None
  • initargs:是要传给 initializer 的参数组

主要方法介绍: 

  • apply(func, args=(), kwds={}):在一个池工作进程中执行 func(*args,**kwargs),然后返回结果。需要强调的是,此操作并不会在所有池工作进程中并执行 func 方法。如果要通过不同参数并发地执行 func 方法,必须从不同线程调用 apply() 方法或者使用 apply_async() 方法
  • map(func, iterable, chunksize=None):与 apply() 一样,但 map() 是对一个可迭代对象中的每个元素应用同一个 func。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
  • apply_async(func, args=(), kwds={}, callback=None, error_callback=None):在一个池工作进程中异步地执行 func(*args,**kwargs),然后返回结果。此方法的结果是 AsyncResult 类的实例,回调函数(callback)是可调用对象,接收输入参数。当 func 的结果变为可用时,将立即传递给 callback。callback 禁止执行任何阻塞操作,否则将接收其他异步操作中的结果
  • map_async(func, iterable, chunksize=None, callback=None, error_callback=None):与 apply_async() 一样都是异步执行 func,但 map_async() 是对一个可迭代对象中的每个元素应用同一个 func。例如,一个包含多个数字的列表,要对列表里的每个数字执行相同的计算操作。返回的是一个可迭代对象,包含了 func 应用到每个元素后的结果
  • close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
  • jion():等待所有工作进程退出。此方法只能在 close() 或 teminate() 之后调用

其他方法介绍

        方法 apply_async() 和 map_async() 的返回值都是 AsyncResult 的实例(obj),其具有以下方法。

  • obj.get(timeout=None):返回结果,如果有必要则等待结果到达。timeout 是可选的,如果未指定该参数或者将其设为 None,则会一直等待,直到执行完成并返回结果。如果在指定时间内还未完成,将抛出 TimeoutError 异常。如果远程操作中引发了异常,它将在调用此方法时再次抛出 Exception 异常
  • obj.ready():如果调用完成,返回True
  • obj.successful():如果调用完成且没有引发异常,返回True,如果在任务已经完成之前调用此方法(即 obj.ready() 返回 True),会引发 AssertionError 异常
  • obj.wait(timeout=None):让当前程序等待异步任务完成
  • obj.terminate():立即终止所有工作进程,将不执行任何清理或结束任何挂起工作。如果进程池被垃圾回收机制回收,将自动调用此函数

二、进程池的使用

同步调用

from multiprocessing import Process,Pool
import timedef func(msg):print( "msg:", msg)time.sleep(0.1)return msgif __name__ == "__main__":pool = Pool(processes = 3)  # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply(func, (msg, ))   # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限;维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个print("==============================>")pool.close()pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束print(res_l) #看到的就是最终的结果组成的列表for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法print(i)

输出结果:

异步调用

from multiprocessing import Process,Pool
import timedef func(msg):print( "msg:", msg)time.sleep(1)return msgif __name__ == "__main__":pool = Pool(processes = 3)  # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply_async(func, (msg, ))   # 同步运行,阻塞,直到本次任务执行完毕拿到res;并维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去res_l.append(res)print("==============================>") # 没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了pool.close()    # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束print(res_l)    # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果for i in res_l:print(i.get())  # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

输出结果:

三、基于进程池的 Socket 应用

        Pool 内的进程数默认是 CPU 核心数,假设为4(查看方法 os.cpu_count()),开启6个客户端,其中会有2个客户端处于等待状态,而在每个进程内查看 pid,只使用了4个 pid,即多个客户端公用4个进程,代码如下所示

服务器端

from socket import *
from multiprocessing import Pool
import osserver=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)def talk(conn,client_addr):print('用户 %s 进程pid: %s' % (client_addr, os.getpid()))while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool()while True:conn,client_addr=server.accept()p.apply_async(talk,args=(conn,client_addr))# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

客户端

from socket import *client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))

四、回调函数

        回调函数(Callback Function)扮演着重要的角色,尤其是在处理异步任务时。当我们使用进程池或异步启动进程执行任务时,任务的执行是异步的,这意味着主进程不会等待任务完成就会继续执行后续代码。而回调函数提供了一种机制,让我们可以在任务完成后自动执行特定的操作。

        具体使用回调函数的场景为,进程池中任何一个任务一旦处理完了,就立即告知主进程,然后主进程则调用一个函数去处理该结果,该函数即回调函数。例如,可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了 I/O 的过程,直接拿到的执行完成后的结果。代码如下所示

from multiprocessing import Pool
import requests
import osdef get_page(url):print('<进程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<进程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l])    # 拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

输出结果:

        在爬虫方面也可以用回调函数来进行数据处理,代码如下所示

from multiprocessing import Pool
import time,random
import requests
import redef get_page(url, pattern):response = requests.get(url)    # 目前猫眼存在验证中心,先跳过,直接使用复制的html# 用于替代抓取结果的html代码,测试时自行去url抓取text = r"""<!DOCTYPE html><!-- saved from url=(0030)https://www.maoyan.com/board/7 --><html><!--<![endif]--><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><title>热映口碑榜 - 猫眼电影 - 一网打尽好电影</title>...</style><div id="mount" style="display: block;"></div></template></div></html>"""if response.status_code == 200:return text, patterndef parse_page(info):page_content, pattern = info    # page_content为html代码,pattern为正则表达式res = re.findall(pattern, page_content) # 正则表达式对html代码进行过滤for item in res:dic={'index':item[0],'title':item[1],'actor':item[2].strip()[3:],'time':item[3][5:],'score':item[4]+item[5]}print(dic)if __name__ == '__main__':pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)url_dic={'https://www.maoyan.com/board/7':pattern1,}p=Pool()res_l=[]for url,pattern in url_dic.items():res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)  # 使用回调函数来处理抓到的html代码res_l.append(res)for res in res_l:res.get()# 爬虫实现的关键代码,但无法越过验证中心# res=requests.get('https://www.maoyan.com/board/7')# print(re.findall(pattern1,res.text))

输出结果:

注意:如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数 

http://www.dtcms.com/wzjs/153582.html

相关文章:

  • 凡科网可以免费做网站吗每日新闻简报
  • 佛山外贸网站手机最新产品新闻
  • wordpress文章页广告插件湖南seo推广软件
  • 企业网站建设哪家公司好百度账号购买1元40个
  • 自建站服务品牌推广方案包括哪些
  • 货代去什么网站开发客户资源
  • 西安烽盈网站建设推广网店seo关键词
  • 怎么做网站卖车如何在百度上做广告
  • 怎么做引流网站百度优化师
  • 做婚宴的网站有哪些武汉大学人民医院东院
  • 室内设计案例去什么网站百度网站登录
  • 苏州新区建网站外包
  • 杨浦集团网站建设新闻 今天
  • 做门头上那个网站申报互联网推广怎么找客户
  • 张家明做网站seo搜索引擎优化5
  • 《网站开发技术》模板乐天seo培训中心
  • 深圳做网站优化工资多少搜索引擎营销广告
  • 官方网站下载打印机驱动搜索软件排行榜前十名
  • 合肥 网站建设中央网站seo
  • 有空间与域名 怎么做网站网络服务主要包括什么
  • 营销网站怎么做福州百度seo
  • 深圳营销型网站建设哪家好百度 营销怎么收费
  • dw网站制作怎么做滑动的图片建立网站一般要多少钱
  • 自己免费网站建设青岛seo关键词优化公司
  • 南昌市公司网站建设北京网站seo费用
  • 怎么做能打不开漫画网站站外推广免费网站
  • 网站建设金手指什么推广平台比较好
  • 门户网站建设创新搜索引擎下载安装
  • 石家庄有哪些公司可以做网站模板建站难吗
  • 网站个人备案域名解析在线查询