当前位置: 首页 > news >正文

Python线程池知多少

目录

目标

Python版本

官方文档

概述

线程池

实战

创建线程池的基本语法

批量提交任务

生产者&消费者模型


目标

        掌握线程池的基本概念和使用方法。


Python版本

        Python 3.9.18


官方文档

concurrent.futures — Launching parallel taskshttps://docs.python.org/3.9/library/concurrent.futures.html


概述

线程池

        线程的创建和销毁是需要消耗资源的,比如:CPU资源、内存、上下文切换等。线程池使得线程被创建后不停地被复用,大概过程是:

  1. 程序启动后线程池会预先创建一些线程,并将这些线程放入线程池中,这些线程是闲置着的,随时准备执行任务。
  2. 当需要任务执行时,程序会向线程池提交任务,线程池会取出一个空闲的线程来执行任务。
  3. 任务执行完成后线程不会被销毁,而是再次放回到线程池中,等待下一个任务。

我们创建线程池可以指定最大线程数量,也可以不指定。官方文档对于默认线程数量做了描述:

if max_workers is None: 
# ThreadPoolExecutor is often used to: 
# * CPU bound task which releases GIL 
# * I/O bound task (which releases GIL, of course) 

# We use cpu_count + 4 for both types of tasks. 
# But we limit it to 32 to avoid consuming surprisingly large resource 
# on many core machine. max_workers = min(32, (os.cpu_count() or 1) + 4)

即默认设置的最大线程数量是:

  1. 操作系统核心数量+4,
  2. 如果获取不到操作系统核心数量则最大线程数量是1个。
  3. 默认不超过32个线程数量。

实战

创建线程池的基本语法

import random
import time
from concurrent.futures import ThreadPoolExecutor

def fun(url):
    time.sleep(random.randint(1,3))
    # 获取当前时间戳
    current_time = time.localtime()
    # 格式化时间为yyyy-MM-dd HH:mm:ss
    formatted_time = time.strftime('%Y-%m-%d %H:%M:%S', current_time)
    return f"请求到了{url}的网络数据:{formatted_time}"

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=4)
    task_1=executor.submit(fun, "www.baidu.com")
    task_2=executor.submit(fun, "www.bilibili.com")
    task_3=executor.submit(fun, "www.jd.com")
    task_4=executor.submit(fun, "www.taobao.com")
    task_5 = executor.submit(fun, "www.tianmao.com")

    task_1.done()
    task_2.done()
    task_3.done()
    task_4.done()
    task_5.done()

    result_1=task_1.result()
    result_2=task_2.result()
    result_3=task_3.result()
    result_4=task_4.result()
    result_5=task_5.result()

    print(result_1)
    print(result_2)
    print(result_3)
    print(result_4)
    print(result_5)

    print("主线程结束")

批量提交任务

需求

        创建一个线程池,用来批量执行不同页面的请求任务。

import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_page_html(page_num):
    time.sleep(random.randint(1,5))
    # 获取当前时间戳
    current_time = time.localtime()
    # 格式化时间为yyyy-MM-dd HH:mm:ss
    formatted_time = time.strftime('%Y-%m-%d %H:%M:%S', current_time)
    return f"请求到第{page_num}页数据:{formatted_time}"

if __name__ == '__main__':
    executor=ThreadPoolExecutor()
    #请求第一页至第一百页的数据。
    page_num_list=[page_num for page_num in range(1,101)]
    task_list = [
        executor.submit(get_page_html,page_num) for page_num in page_num_list
    ]
    #按照task_list元素的顺序返回结果,即使后面的页面数据提前请求到了数据,也会靠后返回。
    for task in task_list:
        print("按照task_list中的元素顺序返回数据:",task.result())

    #按照先执行完,先返回结果。
    for future in as_completed(task_list):
        print("按照先请求到数据就先返回数据:",future.result())
        
    print("主线程结束")

生产者&消费者模型

需求

        每次生产出一只烤鸭,就会被等待的消费者消费,最多生产100000只烤鸭。要求用线程池实现。

import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# 最多生产烤鸭数量
max_duck_count = 100000
# 当前生产的烤鸭数量
duck_count = 0
# 把生产的烤鸭放入队列中
duck_queue = queue.Queue()


def produce_duck(duck_queue):
    global duck_count
    # 生产者不停地生产烤鸭
    while True:
        if duck_count >= max_duck_count:
            break
        duck_count += 1
        duck_queue.put(duck_count)
        print(f"===========生产者{threading.current_thread().name}生产了{duck_count}只烤鸭。")

    # 所有生产者完成后放入退出信号
    duck_queue.put(None)


def consume_duck(duck_queue):
    # 消费者不停地消费烤鸭
    while True:
        duck = duck_queue.get()
        if duck is None:
            # 收到退出信号后,消费者退出
            duck_queue.put(None)  # 传递退出信号给其他消费者
            break
        print(f"——————————消费者{threading.current_thread().name}消费了{duck}只烤鸭。")


# 初始化线程,设置名称
def init_thread():
    thread_id = threading.get_ident()
    threading.current_thread().name = f"线程_{thread_id}"


if __name__ == '__main__':
    thread_pool = ThreadPoolExecutor(max_workers=10,initializer=init_thread)

    # 启动多个生产者线程
    for i in range(5):
        thread_pool.submit(produce_duck, duck_queue)

    # 启动多个消费者线程
    for i in range(8):
        thread_pool.submit(consume_duck, duck_queue)

    # 等待线程池中的任务执行完毕
    thread_pool.shutdown(wait=True)

    print("主线程结束。")

相关文章:

  • 【Qt】ffmpeg照片提取、视频播放▲
  • 【Java 基础(人话版)】Java SE vs Java EE
  • 请解释 Node.js 中的网络模块(http、https),如何创建 HTTP服务器?
  • ESP32+Mixly+温湿度传感器DHT11
  • LangChain项目实战1——基于公司制度RAG回答机器人
  • PHP的学习
  • 如何通过 LlamaIndex 将数据导入 Elasticsearch
  • DAY09 Map接口、斗地主案例(有序版本)、冒泡排序
  • Mysql100道高频面试题
  • DeepSeek开源周-汇总
  • CSS浮动详解
  • 稀疏数组学习
  • Vue3项目如何使用TailWind CSS保姆级教程
  • 使用Python开发以太坊智能合约:轻松入门与深度探索
  • Linux与UDP应用1:翻译软件
  • C++Primer学习(4.8位运算符)
  • Qt中如果槽函数运行时间久,避免阻塞主线程的做法
  • 250301-OpenWebUI配置DeepSeek-火山方舟+硅基流动+联网搜索+推理显示
  • 【数据结构】链表与顺序表的比较
  • Python精进系列:divmod 函数
  • 做平面的网站/网站分析报告
  • 北京国贸网站建设/厦门网络推广培训
  • 手机网站全屏/我对网络营销的理解
  • 论坛网站怎么做/宁波seo网站服务
  • 广州营销型网站建设/电商平台发展现状与趋势
  • 张家港营销型网站建设/百度快速收录权限域名