python多线程开发
文章目录
- 前言
- 1.threading模块
- 2.threading传递参数
- 2.1args传递参数
- 2.2kwargs传递参数
- 2.3使用class封装线程函数
- 3.ThreadPoolExecutor
前言
在处理大量数据时,需要使用多线程,分布式等方法来提高速率。目前使用比较多的是python,但是我了解到阿里等大厂的安全开发开始用go语言来替换python了,所以建议后面再学习一下go语言。
1.threading模块
threading.Thread(target=目标函数):创建执行目标函数的线程
start():开始执行线程
join():等待线程结束
import threading
def test_thread():
print("aaa:")
def test2_thread():
print("bbb")
def test():
thread1 = threading.Thread(target=test_thread)
thread2 = threading.Thread(target=test2_thread)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
if __name__ == '__main__':
test()
2.threading传递参数
2.1args传递参数
单个参数,注意逗号
thread = threading.Thread(target=函数, args=(参数1,))
多个参数
thread = threading.Thread(target=函数, args=(参数1, 参数2))
2.2kwargs传递参数
通过key:value的形式传递的
thread = threading.Thread(target=函数, kwargs={"key1": "value1", "key2": value2})
2.3使用class封装线程函数
在调用start时,会先执行线程的init和run函数,我们重载init函数就可以传递参数。
同时重载run函数也可以自定义里面的功能
class test_thread(threading.Thread): #继承线程
def __init__(self, output): #重载init函数,self是函数自己,output是参数
super().__init__() #调用父类的init函数
self.output = output
def run(self):
print(self.output)
.... #自定义
完整代码:
import threading
class test_thread(threading.Thread):
def __init__(self, output):
super().__init__()
self.output = output
def run(self):
print(self.output)
def test():
threads = []
for i in range(10):
thread = test_thread(f"这是第{i}个")
threads.append(thread)
for i in threads:
i.start()
for i in threads:
i.join()
if __name__ == '__main__':
test()
3.ThreadPoolExecutor
如果任务数量较多,直接使用 threading.Thread 创建每个线程可能会导致性能问题。可以使用线程池来管理这些线程,这样可以提高性能并避免线程过多导致的资源浪费。
ThreadPoolExecutor是通过将任务不断提交到线程池来完成多线程的,我们首先定义最大线程数max_workers=?:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
with可以确保在任务完成后自动进行清理工作(如释放资源),即使过程中发生了异常也会确保资源的正确释放。
with 语句的工作原理:
with 语句的核心是上下文管理器,它是一种实现了 enter 和 exit 方法的对象。
enter:在 with 语句块开始时执行,通常用于资源的初始化。
exit:在 with 语句块结束时执行,无论是正常结束还是发生异常,通常用于清理资源。
在 with 语句块执行过程中,enter 方法会被调用并返回一个上下文对象,然后该对象会被用于执行 with 语句内部的代码。当 with 语句块执行完毕时,无论是正常结束还是抛出异常,exit 方法都会被调用,从而确保资源的释放。
加入线程需要我们submit(函数名,[参数])。
task = executor.submit()
future.result() :除了可以获取结果以外,还可以阻塞直到任务完成。当你调用 future.result() 时,它会等待任务执行完成并返回结果。如果任务抛出了异常,那么 future.result() 会重新抛出该异常。因此,我们需要在调用 future.result() 时使用 try-except 来捕获任务中的异常。
完整代码:
import concurrent.futures.thread
def test_fun():
print("a")
return 1
def test():
with concurrent.futures.thread.ThreadPoolExecutor(max_workers=3) as ss:
mytasks = [ss.submit(test_fun) for i in range(10)] #提交了10个任务
for mytask in concurrent.futures.as_completed(mytasks):
try:
print(mytask.result())
except Exception as e:
print(e)
if __name__ == '__main__':
test()
不使用with:
你必须手动调用 shutdown() 来关闭线程池
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
executor.shutdown()