当前位置: 首页 > news >正文

多线程-3-线程同步

目录

Lock

RLock

Condition

Condition

try使用两个线程启动大模型,进行辩论。

 semaphore


Lock

threading.Lock是Python threading模块中用于线程同步最基础的工具,可以把它理解为一把“锁”。

核心作用 :确保在任何时刻,只有一个线程能够执行某段被“锁”保护的代码。这可以有效防止多个线程同时修改共享数据时引发的 竞态条件 (Race Condition) 。

工作方式 :

1. 锁定 (Acquire) : 当一个线程想要访问共享资源时,它首先尝试获取锁 ( lock.acquire() )。
   如果锁是空闲的,该线程就能获得锁,并继续执行。
   如果锁已经被其他线程持有,该线程就会被 阻塞 ,直到锁被释放。
2. 释放 (Release) : 当线程完成了对共享资源的操作后,它必须释放锁 ( lock.release() ),这样其他等待的线程才有机会获取锁并执行。

import threading lock = threading.Lock()
# 获取lock
a = 0
def fun():global lockglobal afor i in range(10000):lock.acquire()a += 1lock.release
def fun_2():global lockglobal afor i in range(10000):lock.acquire()a -= 1lock.release# 如果不释放lock就会出现停滞  # 获取锁和释放锁会导致性能降低# 容易引起死锁  连续require会引起死锁

下面两种代码可以更好理解with lock的作用和实现:

import threading
from openai.types.beta import threadlock = threading.Lock()def fun(i):global locklock.acquire()print("hello ---->{}".format(i))lock.release()for i in range(10):t = threading.Thread(target=fun, args=(i,))t.start()t.join()
import threading
from openai.types.beta import threadlock = threading.Lock()def fun(i):global lockwith lock:print("hello ---->{}".format(i))for i in range(10):t = threading.Thread(target=fun, args=(i,))t.start()t.join()

RLock

可重入的锁

注意:在一个线程中,可以调用多个lock.acquire() 但是acquire的次数要和release对应。

import threadinga = 0
lock = threading.RLock()
def add():global a,lockfor i in range(1000):lock.acquire()a += 1lock.release()def sub():global a,lockfor i in range(1000):lock.acquire()lock.acquire()a -= 1lock.release()lock.release()t1 = threading.Thread(target=add)
t2 = threading.Thread(target=sub)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)

Condition

Condition

条件变量,用于线程同步

注意:

  1. 在该代码中,不能率先strat天猫,这是因为tianmao启动后会马上notify()但是此时小爱还没有启动,所以小爱接受不到,会陷入wait状态
  2. 而tianmao已经notify,也陷入wait状态,就会导致陷入wait死锁
  3. threading.Thread中strat()方法默认执行的是run()方法,所以需要重写或者将非重写函数在run方法中调用
import threading 
from threading import Conditionclass tianmao(threading.Thread):def __init__(self,condition : Condition,dialog : list[str]) -> None:super().__init__(name="天猫")self.condition = conditionself.dialog = dialogdef run(self) -> None:with self.condition:for s in self.dialog:print(f"天猫说:{s}")self.condition.notify()self.condition.wait()class xiaoai(threading.Thread):def __init__(self,condition : Condition,dialog : list[str]) -> None:super().__init__(name="小爱同学")self.condition = conditionself.dialog = dialogdef run(self) -> None:with self.condition:for line in self.dialog:self.condition.wait()self.condition.notify()print(f"小爱同学说:{line}")if __name__ == "__main__":tianmao_dialog = ["小爱同学","我们来对古诗吧","我住长江头","日日思君不见君","此水几时休","只愿君心似我心"]xiaoai_dialog = ["在","好啊","君住长江尾","共饮长江水","此恨何时已","定不负相思意"]cond = Condition()xiaoai = xiaoai(cond,xiaoai_dialog)tianmao = tianmao(cond,tianmao_dialog)xiaoai.start()tianmao.start()tianmao.join()xiaoai.join()

try使用两个线程启动大模型,进行辩论。

from openai import AzureOpenAI,OpenAI
import threading
from threading import Condition
import os
import time
class People(threading.Thread):def __init__(self,cond : Condition,name : str,client : AzureOpenAI,model : str,prompt : str,):super().__init__(name=name)self.cond = condself.client = clientself.model = modelself.prompt = promptdef run(self) -> None:passclass LuXun(People):def __init__(self,cond : Condition,name : str,client : AzureOpenAI,model : str,prompt : str,):super().__init__(cond=cond, name=name, client=client, model=model, prompt=prompt)def run(self) -> None:global HISTORYwith self.cond:messages = []while True:if not messages:messages = [{"role": "system", "content": self.prompt}]else:messages.append({"role": "user","content": HISTORY[-1]["content"]})response = self.client.chat.completions.create(model=self.model,messages=messages,)reply = response.choices[0].message.contentHISTORY.append({"role": "luxun","content": reply})print(f"\033[32m{self.name}:{reply}\033[0m")time.sleep(5)self.cond.notify()self.cond.wait()class YuHua(People):def __init__(self,cond : Condition,name : str,client : AzureOpenAI,model : str,prompt : str,):super().__init__(cond=cond, name=name, client=client, model=model, prompt=prompt)def run(self) -> None:global HISTORYmessages =   []with self.cond:while True:self.cond.wait()messages.append({"role": "system", "content": self.prompt})messages.append({"role": "user","content": HISTORY[-1]["content"]})response = self.client.chat.completions.create(model=self.model,messages=messages,)reply = response.choices[0].message.contentHISTORY.append({"role": "yuhua","content": reply})print(f"\033[33m{self.name}:{reply}\033[0m")time.sleep(3)self.cond.notify()if __name__ == "__main__":HISTORY : list[dict] = []os.environ['AZURE_API_KEY'] = 'XXXXXXXX'os.environ['AZURE_API_VERSION'] = "XXXXX"os.environ['AZURE_ENDPOINT'] = "https://search.bytedance.net/gpt/openapi/online/v2/crawl"client = AzureOpenAI(api_key=os.getenv("AZURE_API_KEY"),api_version=os.getenv("AZURE_API_VERSION"),azure_endpoint=os.getenv("AZURE_ENDPOINT"))model = "gpt-4o-2024-08-06"client_ = OpenAI(base_url="https://ark-cn-beijing.bytedance.net/api/v3",api_key="XXXXXX")model_="XXXXXXXXX"luxun_p = "你正在参加一场激烈的辩论赛。辩题是:“如果你有超能力,可以让你爱的人也爱你,你要不要使用这项超能力?”你代表正方,主张“要使用超能力”。\n你代表正方,主张“要使用超能力”。你首先发言,在对方没有发表观点前不要反驳。每轮发言须:1. 简明表达自身观点。2. 针对反方上一轮具体观点进行有针对性的反驳、质疑或提出新论据。3. 必须尝试引入新的论据、现实案例、哲学思辨或情感共鸣,避免重复。4. 可以对反方观点提出追问,引导对方自我反思。5. 每次回答控制在120字以内,语言犀利、逻辑清晰,体现辩论攻防。"yuhua_p = "你正在参加一场激烈的辩论赛。辩题是:“如果你有超能力,可以让你爱的人也爱你,你要不要使用这项超能力?”你代表反方,主张“不使用超能力”。你代表反方,主张“不使用超能力”。每轮发言须:1. 简明表达自身观点。2. 针对正方上一轮具体观点进行有针对性的反驳、质疑或提出新论据。3. 必须尝试引入新的论据、现实案例、哲学思辨或情感共鸣,避免重复。4. 可以对正方观点提出追问,引导对方自我反思。5. 每次回答控制在120字以内,语言犀利、逻辑清晰,体现辩论攻防。"cond = Condition()yuhua = YuHua(cond=cond, name= "余华", client=client_, model=model_, prompt=yuhua_p)luxun = LuXun(cond=cond, name="鲁迅", client=client, model=model, prompt=luxun_p)yuhua.start()luxun.start()luxun.join()yuhua.join()

 semaphore

在并发编程中,信号量(semaphore)是一种经典的同步机制,用于控制多个线程对共享资源的访问。有荷兰科学家Edsger Dijkstra在1960年提出,是操作系统和并发编程中的最基础的同步原语之一。

注意:

  1. 操作系统调度线程的方式是非确定性 的,调试器(debug)的介入会显著改变线程的执行顺序。
  2. Python 的 threading.Semaphore 实现不保证公平性 (即等待线程的唤醒顺序可能不是 FIFO)。
  • 正常运行 线程调度由操作系统动态决定,可能并发执行多个线程。
  • 调试模式 :设置断点、单步执行会强制线程按调试器的节奏运行,导致:
    • 阻塞操作延迟 :如 acquire()release() 的执行顺序被打乱。
    • 线程切换延迟 :调试器暂停某个线程时,其他线程可能无法及时抢占资源。
from concurrent.futures import thread
import threading
from threading import Semaphore
import time
class user(threading.Thread):def __init__(self,semaphore: Semaphore):super().__init__()self.semaphore = semaphoredef run(self):time.sleep(5)print(f"\033[32m{self.semaphore._value} is releasing\033[0m")self.semaphore.release()class assistant(threading.Thread):def __init__(self,semaphore:Semaphore):super().__init__()self.semaphore = semaphoredef run(self) -> None:for i in range(10):self.semaphore.acquire()user_ = user(self.semaphore)user_.start()print(f"\033[33m{self.semaphore._value + 1} is running\033[0m")if  __name__ == "__main__":semaphore = Semaphore(3)assistant_ = assistant(semaphore)assistant_.start()assistant_.join()

http://www.dtcms.com/a/286914.html

相关文章:

  • HTTPie: 开发者友好的http客户端工具
  • 数据排序
  • 特种作业操作证(制冷空调)的考试科目有哪些?
  • Xilinx Zynq:一款适用于软件定义无线电的现代片上系统
  • 使用 C# 实现移动加权平均(Weighted Moving Average)算法
  • java基础-5 : 面向对象
  • python网络爬虫(第三章/共三章:驱动浏览器窗口界面,网页元素定位,模拟用户交互(输入操作、点击操作、文件上传),浏览器窗口切换,循环爬取存储)
  • RPG60.生成可拾取物品
  • 拓扑排序/
  • 安卓Android项目 报错:系统找不到指定文件
  • Python编程:从入门到实践
  • rpa机器人流程自动化软件公司是做什么的?如何选择RPA厂商?简要介绍RPA技术、应用场景和未来趋势
  • Shell变量操作
  • Linux内核设计与实现 - 第4章 进程的调度
  • 函数返回值问题,以及返回值的使用问题(c/c++)
  • [FDBUS4.2] watcher的使用
  • STM32-CAN
  • vs openssl编译提示无法打开文件“libssl.lib”或“libcrypto.lib”
  • 理解 VMA 与 LMA
  • 【实战】Dify从0到100进阶--文档解读(8)文档列表节点
  • 深入剖析 Delta Live Tables (DLT):声明式数据管道的核心原理与底层实现
  • git:tag标签远程管理
  • 公贝固定资产管理系统对接HR、财务及采购系统的方案与效益
  • 【实用工具】HDCleaner:高效、安全、免费的系统清洁工具,免费的电脑清理垃圾神器,20秒扫出20G垃圾!
  • LP-MSPM0G3507学习--05中断及管脚中断
  • 习题4.1 输出3个人的顺序
  • APIs案例及知识点串讲(下)
  • NFS读写性能评估与优化指南(上)
  • Android性能优化之电量优化
  • C 语言字符大小写互转:tolower / toupper 详解与实战