当前位置: 首页 > news >正文

Python多线程、锁、多进程、异步编程

多线程

import threading
import timedef task(name, delay):print(f"线程 {name} 开始执行")time.sleep(delay)print(f"线程 {name} 执行完成")# 创建线程
thread1 = threading.Thread(target=task, args=("A", 2))
thread2 = threading.Thread(target=task, args=("B", 1))# 启动线程
thread1.start()
thread2.start()# 等待线程完成
thread1.join()
thread2.join()print("所有线程执行完毕")
#线程池
from concurrent.futures import ThreadPoolExecutor
import timedef task(name):print(f"任务 {name} 开始")time.sleep(1)return f"任务 {name} 完成"with ThreadPoolExecutor(max_workers=3) as executor:# 提交多个任务futures = [executor.submit(task, i) for i in range(5)]# 获取结果for future in futures:print(future.result())

锁机制

import threadingcounter = 0
lock = threading.Lock()def increment():global counterfor _ in range(100000):with lock:  # 自动获取和释放锁counter += 1threads = []
for _ in range(5):t = threading.Thread(target=increment)threads.append(t)t.start()for t in threads:t.join()print(f"最终计数器值: {counter}")  # 应该是500000

生产消费者模型

import threading
import queue
import random
import time# 共享队列
q = queue.Queue(maxsize=5)
condition = threading.Condition()def producer():while True:with condition:if q.full():print("队列已满,生产者等待")condition.wait()item = random.randint(1, 100)q.put(item)print(f"生产者生产了: {item}")condition.notify()time.sleep(random.random())def consumer():while True:with condition:if q.empty():print("队列为空,消费者等待")condition.wait()item = q.get()print(f"消费者消费了: {item}")condition.notify()time.sleep(random.random())# 启动生产者和消费者
prod = threading.Thread(target=producer)
cons = threading.Thread(target=consumer)prod.start()
cons.start()

多进程编程

from multiprocessing import Process
import osdef task(name):print(f"子进程 {name} (PID: {os.getpid()}) 执行")time.sleep(2)if __name__ == '__main__':processes = []for i in range(3):p = Process(target=task, args=(i,))processes.append(p)p.start()for p in processes:p.join()print("所有子进程执行完毕")
#进程池
from multiprocessing import Pool
import timedef square(x):print(f"计算 {x} 的平方")time.sleep(1)return x * xif __name__ == '__main__':with Pool(processes=4) as pool:# 同步执行result = pool.map(square, range(10))print(result)# 异步执行async_result = pool.map_async(square, range(10))print(async_result.get())
#进程间通信
from multiprocessing import Process, Pipedef sender(conn):conn.send("Hello from sender")conn.close()def receiver(conn):msg = conn.recv()print(f"接收到的消息: {msg}")conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p1 = Process(target=sender, args=(child_conn,))p2 = Process(target=receiver, args=(parent_conn,))p1.start()p2.start()p1.join()p2.join()

异步

协程(Coroutine): 异步执行的任务单元

事件循环(Event Loop): 调度和执行协程的核心

Future: 表示异步操作的最终结果

Task: Future的子类,用于包装和管理协程的执行

import asyncioasync def main(): #携程函数定义print('Hello')await asyncio.sleep(1) #异步等待print('World')asyncio.run(main())

创建和运行协程

async def say_after(delay, message):await asyncio.sleep(delay)print(message)async def main():# 顺序执行await say_after(1, 'hello')await say_after(2, 'world')# 并发执行task1 = asyncio.create_task(say_after(1, 'hello'))task2 = asyncio.create_task(say_after(2, 'world'))await task1await task2asyncio.run(main())

并发运行多个协程

async def fetch_data(task_id, delay):print(f'Task {task_id} started')await asyncio.sleep(delay)print(f'Task {task_id} completed')return f'result-{task_id}'async def main():# 使用gather并发运行results = await asyncio.gather(fetch_data(1, 2),fetch_data(2, 1),fetch_data(3, 3))print(f'All results: {results}')asyncio.run(main())

异步上下文管理器

class AsyncResource:async def __aenter__(self):print('Acquiring resource')await asyncio.sleep(0.5)return selfasync def __aexit__(self, exc_type, exc, tb):print('Releasing resource')await asyncio.sleep(0.5)async def use_resource():async with AsyncResource() as resource:print('Using resource')await asyncio.sleep(1)asyncio.run(use_resource())

异步迭代器

class AsyncCounter:def __init__(self, stop):self.current = 0self.stop = stopdef __aiter__(self):return selfasync def __anext__(self):if self.current >= self.stop:raise StopAsyncIterationawait asyncio.sleep(0.5)self.current += 1return self.currentasync def main():async for number in AsyncCounter(5):print(number)asyncio.run(main())

异步Http请求

import aiohttpasync def fetch_url(session, url):async with session.get(url) as response:return await response.text()async def main():urls = ['https://www.python.org','https://www.google.com','https://www.github.com']async with aiohttp.ClientSession() as session:tasks = [fetch_url(session, url) for url in urls]results = await asyncio.gather(*tasks)for url, content in zip(urls, results):print(f"{url} -> {len(content)} bytes")asyncio.run(main())

异步数据库访问

import asyncpgasync def fetch_data():conn = await asyncpg.connect(user='user', password='pass',database='db', host='localhost')try:# 执行查询result = await conn.fetch('SELECT * FROM users WHERE id = $1', 1)print(result)finally:await conn.close()asyncio.run(fetch_data())
http://www.dtcms.com/a/336508.html

相关文章:

  • Linux | i.MX6ULL网络通信-套字节 TCP(第十七章)
  • 【k8s】Kubernetes核心概念与架构详解
  • 4.8 Vue 3: provide / inject 详解
  • LEA(Load Effective Address)指令
  • MACS2简介
  • 欠拟合和过拟合的特征标志,有什么方法解决,又该如何避免
  • 评测系统构建
  • 20.LeNet
  • [逆向知识] AST抽象语法树:混淆与反混淆的逻辑互换(二)
  • 2001-2024年中国玉米种植分布数据集
  • Cesium学习(二)-地形可视化处理
  • AutoSar BSW介绍
  • PyTorch 面试题及详细答案120题(01-05)-- 基础概念与安装
  • 全星质量管理 QMS:驱动制造业高质量发展的核心工具
  • 雷卯针对香橙派Orange Pi 5 Ultra开发板防雷防静电方案
  • Java研学-SpringCloud(五)
  • 如何理解“速度模式间接实现收放卷恒张力控制“
  • 题目2:使用递归CTE分析产品层级关系
  • 【从零开始学习Redis】项目实战-黑马点评D2
  • 【会议跟踪】ICRA 2021 Workshop:Visual-Inertial Navigation Systems
  • 多线程—飞机大战(加入播放音乐功能版本)
  • 【Virtual Globe 渲染技术笔记】6 着色
  • C语言---第一个C语言程序
  • Tomcat下载、安装及配置详细教程
  • Hybrid Beamforming Design for OFDM Dual-Function Radar-Communication System
  • LaTeX中表示实数集R的方法
  • 零基础搭建公网 Nginx:通过 cpolar 内网穿透服务实现远程访问
  • 朝花夕拾(四) --------python中的os库全指南
  • 【计算机数学】关于全概率和贝叶斯公式的使用场景说明
  • Linux目录相关的命令