python爬虫关于多进程,多线程,协程的使用
简介:
python其实没有真正意义的多线程,因为有GIL锁存在,但是python3.13去掉GIL锁,有两个版本,python3.13t和python3.13,python3.13去掉GIL锁相当于python底层大规模改变,肯定会影响一些库的使用,并且可能导致单线程速度变慢。原来的python多线程是单线程利用io等待去完成其它事务,造成多线程假象,其实并没有对CPU资源造成影响。
本文介绍关于python爬虫种的多线程,多进程,协程爬虫。本文的案例仅供学习,切勿去压测别人网站。若因个人不当行为引发问题,责任与本人无关。
1.多线程
举个例子,python单线程
def func():time.time(5)if __name__ == '__main__':t = time.time()time.sleep(5)func()print(time.time() - t)
一般来说这个时间耗时十秒
如果用多线程
from threading import Thread
import timedef func():time.sleep(5) # 线程睡眠5秒if __name__ == '__main__':start_time = time.time() # 记录程序开始时间f = Thread(target=func) # 创建线程f.start() # 启动线程time.sleep(5) # 主线程再睡眠5秒end_time = time.time() # 记录程序结束时间print(end_time - start_time)
速度提高差不多一倍
没有真正意思的多线程,主要利用等待时间去执行别的线程
from threading import Thread
def func():a = input('请输入你要输入的值\n')print('程序结束')
def func2():print('线程2开始\n')if __name__ == '__main__':f = Thread(target=func)#线程1f2 = Thread(target=func2) #线程2f.start()f2.start()print(11111111)print(11111111)print(11111111)print('等待子线程完成')f.join()print('主线程完成')
一般来说,单线程遇到io操作或者等待,也就是读写操作时会等待才对,但是这个程序并不会等待,主要原理就算多线程利用io等待时间运行别的事务。f.join()作用是等待子线程完成才会运行主线程也就是 print('主线程完成')
当io结束时,程序正式结束
开启线程传参是这样表示的
threading.Thread(target=get_movie_info, args=(page,))
args必须为元组
案例:豆瓣电影 Top 250豆瓣
注意:豆瓣是会封ip的,不要频繁请求
import requests
import threading
from lxml import etree
import time
url = 'https://movie.douban.com/top250?start={}&filter='headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ""AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}def get_movie_info(page):response = requests.get(url.format(page * 25), headers=headers).texttree = etree.HTML(response)result = tree.xpath("//div[@class='hd']/a/span[1]/text()")print(result)if __name__ == '__main__':t = time.time()thread_obj_list = [threading.Thread(target=get_movie_info, args=(page,)) for page in range(10)]# 先启动所有线程for thread_obj in thread_obj_list:thread_obj.start()# 再等待所有线程完成for thread_obj in thread_obj_list:thread_obj.join()print(time.time() - t)
原理,先用循环为每一页的请求创建一个线程,再用循环进行线程的开始,注意,如果要判断时间,不要start就join一个,这样跟单线程一样,甚至比单线程还要慢。
耗时:
单线程自己去写我直接给出时间 、
单线程就慢很多。
2.线程池
python还提供了线程池功能. 可以一次性的创建多个线程, 并且, 不需要我们程序员手动去维护. 一切都交给线程池来自动管理.
# 线程池
def fn(name):for i in range(1000):print(name, i)if __name__ == '__main__':with ThreadPoolExecutor(10) as t:for i in range(100):t.submit(fn, name=f"线程{i}")
输出的值特别乱
如果用来做计算
# 线程池
from concurrent.futures import ThreadPoolExecutor, as_completed
a = 1
def fn(name):for i in range(1000):print(a+i)if __name__ == '__main__':with ThreadPoolExecutor(10) as t:for i in range(100):t.submit(fn, name=f"线程{i}")
也会很乱
所以最好不要用多线程进行计算。
如果要有返回值
方案一:future对象获取返回值
# 线程池
from concurrent.futures import ThreadPoolExecutor, as_completedimport time
def func(name):return nameif __name__ == '__main__':with ThreadPoolExecutor(10) as t:names = [5, 2, 3]futures = [t.submit(func, page) for page in range(10)]for future in futures:print(future.result())
缺点:future对象获取返回值会造成主线程堵塞
方案二:as_completed会立即返回处理完成的结果
# 线程池
from concurrent.futures import ThreadPoolExecutor, as_completedimport time
def func(name):return nameif __name__ == '__main__':with ThreadPoolExecutor(10) as t:names = [5, 2, 3]futures = [t.submit(func, page) for page in range(10)]# as_completed会立即返回处理完成的结果而不会堵塞主线程for future in as_completed(futures):print(future.result())
缺点:返回结果顺序乱
方案三:直接用map进行任务分发
# 线程池
from concurrent.futures import ThreadPoolExecutor, as_completedimport time
def func(name):return nameif __name__ == '__main__':with ThreadPoolExecutor(10) as t:futures = t.map(func, list(range(10)))for r in futures:print("result", r)
注意map第二个参数为列表
缺点:跟方案1一样
方案四:添加回调
# 线程池
from concurrent.futures import ThreadPoolExecutor, as_completedimport time
def func(name):return name
def do_callback(res):print(res.result())
if __name__ == '__main__':with ThreadPoolExecutor(10) as t:futures = [t.submit(func,page).add_done_callback(do_callback) for page in range(10)]
缺点:跟方案2一样,并且维护难,灵活性低 。
线程池案例:2024 中国票房 | 中国票房 | 中国电影票房排行榜
存入csv文件
第一步,封装请求方法
def get_page_source(url):resp = requests.get(url)resp.encoding = 'utf-8'return resp.text
第二步,封装提取方法
def parse_html(html):try:tree = etree.HTML(html)trs = tree.xpath("//table/tbody/tr")[1:]result = []for tr in trs:year = tr.xpath("./td[2]//text()")year = year[0] if year else ""name = tr.xpath("./td[3]//text()")name = name[0] if name else ""money = tr.xpath("./td[4]//text()")money = money[0] if money else ""d = (year, name, money)if any(d):result.append(d)return resultexcept Exception as e:print(e) # 调bug专用
第三步,封装存储csv方法,方法一和方法二在里面
def download_one(url, f):page_source = get_page_source(url)data = parse_html(page_source)for item in data:f.write(",".join(item))f.write("\n")
第四步,封装主函数,线程池
def main():f = open("movie.csv", mode="w", encoding='utf-8')lst = [str(i) for i in range(1994, 2022)]with ThreadPoolExecutor(10) as t:# 方案一# for year in lst:# url = f"http://www.boxofficecn.com/boxoffice{year}"# # download_one(url, f)# t.submit(download_one, url, f)# 方案二t.map(download_one, (f"http://www.boxofficecn.com/boxoffice{year}" for year in lst), (f for i in range(len(lst))))
注意,先打开文件
最后一步,启动主函数
完整步骤如下:
import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutordef get_page_source(url):resp = requests.get(url)resp.encoding = 'utf-8'return resp.textdef parse_html(html):try:tree = etree.HTML(html)trs = tree.xpath("//table/tbody/tr")[1:]result = []for tr in trs:year = tr.xpath("./td[2]//text()")year = year[0] if year else ""name = tr.xpath("./td[3]//text()")name = name[0] if name else ""money = tr.xpath("./td[4]//text()")money = money[0] if money else ""d = (year, name, money)if any(d):result.append(d)return resultexcept Exception as e:print(e) # 调bug专用def download_one(url, f):page_source = get_page_source(url)data = parse_html(page_source)for item in data:f.write(",".join(item))f.write("\n")def main():f = open("movie.csv", mode="w", encoding='utf-8')lst = [str(i) for i in range(1994, 2022)]with ThreadPoolExecutor(10) as t:# 方案一# for year in lst:# url = f"http://www.boxofficecn.com/boxoffice{year}"# # download_one(url, f)# t.submit(download_one, url, f)# 方案二t.map(download_one, (f"http://www.boxofficecn.com/boxoffice{year}" for year in lst), (f for i in range(len(lst))))if __name__ == '__main__':main()
结果如下:
3.多进程
因为在Python
中存在GIL
锁,无法充分利用多核优势。所以为了能够提高程序运行效率我们也会采用进程的方式来完成代码需求。多进程和多线程区别:多进程相当于多个程序. 多线程相当于在一个程序里多条任务同时执行.
基本使用
from multiprocessing import Process
import timedef func():print('1111')# 创建进程对象
p = Process(target=func)# 启动进程
if __name__ == '__main__': p.start()# 等待子进程完成p.join()
进程必须在 if __name__ == '__main__':在运行
多进程在爬虫中的应用
如果遇到图片抓取的时候, 我们知道图片在一般都在网页的img标签中src属性存放的是图片的下载地址. 此时我们可以采用多进程的方案来实现, 一个负责疯狂扫图片下载地址. 另一个进程只负责下载图片.
综上, 多个任务需要并行执行, 但是任务之间相对独立(不一定完全独立). 可以考虑用多进程.
4.进程池
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
def func():print('1111')# 启动进程
if __name__ == '__main__':max_workers = cpu_count() # 使用 CPU 核心数作为最大并发数# max_workers = 4 # 进程数量默认为 4with ProcessPoolExecutor(max_workers=max_workers) as executor:# 提交任务到进程池futures = [executor.submit(func,) for i in (range(10))]# 等待所有任务完成for future in futures:future.result()
进程不是开越多越好,线程也一样,进程一般以 max_workers = cpu_count() # 使用 CPU 核心数作为最大并发数。一般多线程搭配队列Queue使用,在一个脚本里两个进程必须通过队列进行传输。比如一个脚本为一个进程,一般在scrapy运行多个脚本用多进程
scrapy运行案例:
from multiprocessing import Pool, cpu_count
from concurrent.futures import ProcessPoolExecutor
from scrapy import cmdline
def run_spider(name):cmdline.execute(f"scrapy crawl {name}".split())
if __name__ == '__main__':spider_names = ['spider1','spider2','spider3','spider4']max_workers = 4with ProcessPoolExecutor(max_workers=max_workers) as executor:# 提交任务到进程池futures = [executor.submit(run_spider, spider_name) for spider_name in spider_names]# 等待所有任务完成for future in futures:future.result()
终极案例:进程结合线程使用
免费4K高清壁纸-电脑背景图片-Mac壁纸网站「哲风壁纸」
这是个加解密网站,包括多进程,多线程。
先说怎么设计,只解释进程线程,不解释加解密
第一步肯定要导入队列,因为多进程用队列进行分享数据
from multiprocessing import Process,Queue
第二步,开启两个进程
if __name__ == '__main__':q = Queue() # 两个进程必须使用同一个队列. 否则数据传输不了p1 = Process(target=get_img_src, args=(q,))#发送请求并获取获取图片链接p2 = Process(target=download_img, args=(q,))#下载链接p1.start()p2.start()
第三步设计p1进程多线程:
def get_img_src(q):"""进程1: 负责提取页面中所有的img的下载地址将图片的下载地址通过队列. 传输给另一个进程进行下载进程1开启多线程"""with ThreadPoolExecutor(10) as t:futures = [t.submit(get_req, i, q) for i in range(1, 11)]for future in as_completed(futures):future.result() # 等待任务完成并获取结果q.put(None)
第四步线程设计,q.put(None)作用让程序结束条件。为什么不能判断q.empty(),因为队列有好几次为空状态
def get_req(page,q):"""网站解密请求"""url = "https://haowallpaper.com/link/pc/wallpaper/getWallpaperList"js = execjs.compile(js_code)data = {"page": page, "sortType": 3, "isSel": "true", "rows": 9, "isFavorites": False, "wpType": 1}params = {"data": js.call('_', data)}response = requests.get(url, headers=headers, params=params)text = js.call('get_data', response.json()['data'])for img_id in text['list']:list_img = 'https://haowallpaper.com/link/common/file/getCroppingImg/' + img_id['fileId']q.put(list_img)response.close()
ok,第一条进程设计完毕
p2进程
第五步设计多线程,程序从这里跳出去,然后结束。
def download_img(q):"""进程2: 将图片的下载地址从队列中提取出来. 进行下载.进程2:开启多线程"""with ThreadPoolExecutor(10) as t:while 1:s = q.get()if s == None:breakt.submit(donwload_one, s)
第六步设计下载方法
def donwload_one(s):# 单纯的下载功能resp = requests.get(s, headers=headers)file_name = s.split("/")[-1]+'.jpg'# 请提前创建好img文件夹with open(f"img/{file_name}", mode="wb") as f:f.write(resp.content)print("一张图片下载完毕", file_name)resp.close()
设计完毕
完整代码
from multiprocessing import Process,Queue
from concurrent.futures import ThreadPoolExecutor, as_completed
import requestsimport subprocess
from functools import partial
subprocess.Popen = partial(subprocess.Popen, encoding="utf-8")
import execjs
import requests
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
}
js_code = '''
var CryptoJS = require('crypto-js')
function _(W) {W = JSON.stringify(W)const me = CryptoJS.enc.Utf8.parse("68zhehao2O776519"), Ee = CryptoJS.enc.Utf8.parse("aa176b7519e84710"), Ye = CryptoJS.AES.encrypt(W, me, {iv: Ee,padding: CryptoJS.pad.Pkcs7}).ciphertext.toString();return CryptoJS.enc.Base64.stringify(CryptoJS.enc.Hex.parse(Ye))}function get_data(W) {Ee = CryptoJS.enc.Base64.parse(W).toString(CryptoJS.enc.Hex), je = CryptoJS.enc.Utf8.parse("68zhehao2O776519"), Ye = CryptoJS.enc.Utf8.parse("aa176b7519e84710"), Ct = CryptoJS.lib.CipherParams.create({ciphertext: CryptoJS.enc.Hex.parse(Ee)}), Lt = CryptoJS.AES.decrypt(Ct, je, {iv: Ye,padding: CryptoJS.pad.Pkcs7});me = CryptoJS.enc.Utf8.stringify(Lt).replace(/\0.*$/g, "")return JSON.parse(me)
}
'''
def get_req(page,q):"""网站解密请求"""url = "https://haowallpaper.com/link/pc/wallpaper/getWallpaperList"js = execjs.compile(js_code)data = {"page": page, "sortType": 3, "isSel": "true", "rows": 9, "isFavorites": False, "wpType": 1}params = {"data": js.call('_', data)}response = requests.get(url, headers=headers, params=params)text = js.call('get_data', response.json()['data'])for img_id in text['list']:list_img = 'https://haowallpaper.com/link/common/file/getCroppingImg/' + img_id['fileId']q.put(list_img)response.close()
def get_img_src(q):"""进程1: 负责提取页面中所有的img的下载地址将图片的下载地址通过队列. 传输给另一个进程进行下载进程1开启多线程"""with ThreadPoolExecutor(10) as t:futures = [t.submit(get_req, i, q) for i in range(1, 11)]for future in as_completed(futures):future.result() # 等待任务完成并获取结果q.put(None)
def download_img(q):"""进程2: 将图片的下载地址从队列中提取出来. 进行下载.进程2:开启多线程"""with ThreadPoolExecutor(10) as t:while 1:s = q.get()if s == None:breakt.submit(donwload_one, s)
def donwload_one(s):# 单纯的下载功能resp = requests.get(s, headers=headers)file_name = s.split("/")[-1]+'.jpg'# 请提前创建好img文件夹with open(f"img/{file_name}", mode="wb") as f:f.write(resp.content)print("一张图片下载完毕", file_name)resp.close()if __name__ == '__main__':q = Queue() # 两个进程必须使用同一个队列. 否则数据传输不了p1 = Process(target=get_img_src, args=(q,))#发送请求并获取获取图片链接p2 = Process(target=download_img, args=(q,))#下载链接p1.start()p2.start()
结果如下:
非常快 。
总的来说,两个进程,一个负责请求获取图片链接。一个负责下载图片。在请求或者文件写入时也就是下载等于io等待,这时就可以用多进程了。
5.协程
终于写到协程了,累死我了。
协程和线程区别?
多线程由操作系统调度,线程切换涉及系统调用,开销较大。协程由用户态调度器调度,切换开销小,由开发者控制,多线程受限于系统资源和 GIL,在 Python 中并发能力有限;协程可在一个线程中创建大量协程,适合高并发。
简单总结,协程比线程快(线程切换),比线程开销小。
基本语法
async def func():print("我是协程")if __name__ == '__main__':# print(func()) # 注意, 此时拿到的是一个协程对象, 和生成器差不多.该函数默认是不会这样执行的coroutine = func()asyncio.run(coroutine) # 用asyncio的run来执行协程.# lop = asyncio.get_event_loop()# lop.run_until_complete(coroutine) # 这两句顶上面一句
明显效果
import time
import asyncio# await: 当该任务被挂起后, CPU会自动切换到其他任务中
async def func1():print("func1, start")await asyncio.sleep(3)print("func1, end")async def func2():print("func2, start")await asyncio.sleep(4)print("func2, end")async def func3():print("func3, start")await asyncio.sleep(2)print("func3, end")async def run():start = time.time()tasks = [ # 协程任务列表asyncio.ensure_future(func1()), # create_task创建协程任务asyncio.ensure_future(func2()),asyncio.ensure_future(func3()),]await asyncio.wait(tasks) # 等待所有任务执行结束print(time.time() - start)if __name__ == '__main__':asyncio.run(run())
效果如下:
asyncio.ensure_future()
用于将协程封装成任务对象并排定在事件循环中执行,适用于在事件循环中并发运行多个任务。而 asyncio.run()
是运行异步程序的顶层入口点,用于启动整个异步应用程序并阻塞当前线程直到完成。在实际应用中,通常在程序的入口处使用 asyncio.run()
启动主协程,然后在主协程中使用 asyncio.ensure_future()
或 asyncio.create_task()
来创建和管理其他任务。
协程返回值
import asyncioasync def faker1():print("任务1开始")await asyncio.sleep(1)print("任务1完成")return "任务1结束"async def faker2():print("任务2开始")await asyncio.sleep(2)print("任务2完成")return "任务2结束"async def faker3():print("任务3开始")await asyncio.sleep(3)print("任务3完成")return "任务3结束"async def main():tasks = [asyncio.create_task(faker3()),asyncio.create_task(faker1()),asyncio.create_task(faker2()),]# 方案一, 用wait, 返回的结果在result中result, pending = await asyncio.wait(tasks)for r in result:print(r.result())# 方案二, 用gather, 返回的结果在result中, 结果会按照任务添加的顺序来返回数据# return_exceptions如果任务在执行过程中报错了. 返回错误信息. # result = await asyncio.gather(*tasks, return_exceptions=True)# for r in result:# print(r)if __name__ == '__main__':asyncio.run(main())
asyncio.ensure_future() 和 asyncio.create_task() 都可以将协程封装成一个 Task 对象并排定在事件循环中执行,作用基本一致。
result, pending = await asyncio.wait(tasks)
result
:包含所有已完成的 Task
对象。这些任务已经执行完毕,可以通过调用它们的.result()
方法来获取任务的返回值。
pending
:包含尚未完成的 Task
对象。这些任务可能仍在执行中或者尚未开始执行。
当你调用 await asyncio.wait(tasks)
时,当前协程会将控制权交还给事件循环,事件循环会继续执行其他可以运行的任务。
asyncio.run():
是运行异步程序的顶层入口点,通常用于启动整个异步应用程序。它会创建一个新的事件循环,并在该循环中运行指定的协程。
aiohttp模块基本使用
requests
是python
中的同步网络爬虫库,并不能直接使用asyncio
运行。所以我们使用asyncio
中的run_in_executor
方法创建线程池完成并发。用aiohttp请求
案例 明朝那些事儿-明朝那些事儿全集在线阅读
如何设计:
第一步,导入三个异步库
import asyncio
import aiohttp
import aiofiles
第二步,请求得到链接和标题
def get_chapter_info(url):resp = requests.get(url)resp.encoding = 'utf-8'page_source = resp.textresp.close()result = []# 解析page_sorucetree = etree.HTML(page_source)mulus = tree.xpath("//div[@class='main']/div[@class='bg']/div[@class='mulu']")for mulu in mulus:trs = mulu.xpath("./center/table/tr")title = trs[0].xpath(".//text()")chapter_name = "".join(title).strip()chapter_hrefs = []for tr in trs[1:]: # 循环内容hrefs = tr.xpath("./td/a/@href")chapter_hrefs.extend(hrefs)result.append({"chapter_name": chapter_name, "chapter_hrefs": chapter_hrefs})return result
这一步是最先执行的,不需要异步
第二步,创建异步下载方法
async def download_one(name, href):async with aiohttp.ClientSession() as session:async with session.get(href) as resp:hm = await resp.text(encoding="utf-8", errors="ignore")# 处理hmtree = etree.HTML(hm)title = tree.xpath("//div[@class='main']/h1/text()")[0].strip()content_list = tree.xpath("//div[@class='main']/div[@class='content']/p/text()")content = "\n".join(content_list).strip()async with aiofiles.open(f"{name}/{title}.txt", mode="w", encoding="utf-8") as f:await f.write(content)print(title)
第三步创建事件循环
async def download_all(chapter_info):tasks = []for chapter in chapter_info:name = f"./小说/{chapter['chapter_name']}"if not os.path.exists(name):os.makedirs(name)for url in chapter['chapter_hrefs']:task = asyncio.create_task(download_one(name, url))tasks.append(task)await asyncio.wait(tasks)
第四步,创建main方法,执行函数 ,运行异步程序的顶层入口点,通常用于启动整个异步应用程序。
def main():url = "http://www.mingchaonaxieshier.com/"# 获取每一篇文章的名称和url地址chapter_info = get_chapter_info(url)# 可以分开写. 也可以合起来写.# 方案一,分开写:# for chapter in chapter_info:# asyncio.run(download_chapter(chapter))# 方案e,合起来下载:asyncio.run(download_all(chapter_info))
完整步骤:
import asyncio
import aiohttp
import aiofiles
import requests
from lxml import etree
import osdef get_chapter_info(url):resp = requests.get(url)resp.encoding = 'utf-8'page_source = resp.textresp.close()result = []# 解析page_sorucetree = etree.HTML(page_source)mulus = tree.xpath("//div[@class='main']/div[@class='bg']/div[@class='mulu']")for mulu in mulus:trs = mulu.xpath("./center/table/tr")title = trs[0].xpath(".//text()")chapter_name = "".join(title).strip()chapter_hrefs = []for tr in trs[1:]: # 循环内容hrefs = tr.xpath("./td/a/@href")chapter_hrefs.extend(hrefs)result.append({"chapter_name": chapter_name, "chapter_hrefs": chapter_hrefs})return resultasync def download_one(name, href):async with aiohttp.ClientSession() as session:async with session.get(href) as resp:hm = await resp.text(encoding="utf-8", errors="ignore")# 处理hmtree = etree.HTML(hm)title = tree.xpath("//div[@class='main']/h1/text()")[0].strip()content_list = tree.xpath("//div[@class='main']/div[@class='content']/p/text()")content = "\n".join(content_list).strip()async with aiofiles.open(f"{name}/{title}.txt", mode="w", encoding="utf-8") as f:await f.write(content)print(title)# 方案一
# async def download_chapter(chapter):
# chapter_name = chapter['chapter_name']
#
# if not os.path.exists(chapter_name):
# os.makedirs(chapter_name)
# tasks = []
# for href in chapter['chapter_hrefs']:
# tasks.append(asyncio.create_task(download_one(chapter_name, href)))
# await asyncio.wait(tasks)# 方案二
async def download_all(chapter_info):tasks = []for chapter in chapter_info:name = f"./小说/{chapter['chapter_name']}"if not os.path.exists(name):os.makedirs(name)for url in chapter['chapter_hrefs']:task = asyncio.create_task(download_one(name, url))tasks.append(task)await asyncio.wait(tasks)def main():url = "http://www.mingchaonaxieshier.com/"# 获取每一篇文章的名称和url地址chapter_info = get_chapter_info(url)# 可以分开写. 也可以合起来写.# 方案一,分开写:# for chapter in chapter_info:# asyncio.run(download_chapter(chapter))# 方案e,合起来下载:asyncio.run(download_all(chapter_info))if __name__ == '__main__':main()
效果如下:
6.总结:
在 Python 爬虫开发中,协程、多线程和多进程是三种常用的并发技术,用于提高爬虫的效率和性能。它们各自有不同的适用场景和优缺点。以下总结时是AI给的。
1. 协程
-
定义:协程是通过
async
和await
实现的异步编程模型,属于用户态的并发机制。 -
优点:
-
高效利用 CPU:在 I/O 操作(如网络请求)时,不会阻塞整个线程,而是切换到其他协程继续执行,充分利用 CPU 时间。
-
高并发能力:可以在单个线程中创建大量协程,适合处理大量 I/O 密集型任务。
-
低资源消耗:协程的上下文切换开销较小,占用内存少。
-
缺点:
-
单线程限制:尽管可以在单线程中并发执行协程,但整体受制于单个线程,不适合 CPU 密集型任务。
-
实现复杂:需要使用异步编程模型,代码可读性稍差,调试难度较高。
-
应用场景:主要用于爬取大量网页时的网络请求,尤其是在 I/O 等待时间较长的情况下。
2. 多线程
-
定义:多线程通过
threading
模块实现,是操作系统支持的一种并发机制。 -
优点:
-
简单易用:编程模型相对直观,代码易于理解和维护。
-
适合 I/O 密集型任务:在 I/O 操作时,线程会阻塞,但其他线程仍然可以运行,适合处理网络请求等任务。
-
缺点:
-
GIL 的限制:在 CPython 中,全局解释器锁(GIL)会限制同一时刻只有一个线程执行 Python 字节码,导致多线程在 CPU 密集型任务中效率低下。
-
资源消耗大:每个线程都有独立的栈空间,占用较多内存资源。
-
应用场景:适用于爬取少量网页时的网络请求任务,尤其是当爬取的网站数量不多时。
3. 多进程
-
定义:多进程通过
multiprocessing
模块实现,每个进程可以独立运行一个 Python 解释器实例。 -
优点:
-
绕过 GIL:多个进程可以在多核 CPU 上并行运行,充分利用多核 CPU 的计算能力。
-
高并发能力:可以创建多个进程,每个进程独立运行,适合处理 CPU 密集型任务。
-
缺点:
-
资源消耗大:每个进程都有独立的内存空间,占用较大的系统资源。
-
进程间通信复杂:进程间通信需要通过队列、管道等机制实现,编程复杂度较高。
-
应用场景:适合处理计算密集型的爬虫任务,如解析大量数据、运行复杂的算法等。
4. 总结与建议
-
I/O 密集型任务:
-
首选协程:如果爬虫主要涉及大量的网络请求,协程是最佳选择,因为它可以在单个线程内高效地处理大量并发任务。
-
其次多线程:如果任务数量较少,且对并发要求不高,多线程也是一个不错的选择。
-
CPU 密集型任务:
-
首选多进程:如果爬虫需要处理大量的数据解析或计算任务,多进程可以充分利用多核 CPU 的优势。
-
混合场景:
-
线程 + 协程:在某些场景下,可以结合使用多线程和协程,例如在每个线程中运行多个协程,以充分利用线程和协程的优点。
-
多进程 + 协程:对于复杂的爬虫任务,可以使用多进程来处理 CPU 密集型任务,同时在每个进程中运行协程来处理 I/O 密集型任务