【多进线程】python多进线程与通信
多进线程
选择多进程还是多线程取决于任务类型:
-
CPU密集型任务: 多进程,可充分利用多核
-
I/O密集型任务:多线程,创建和切换开销小
-
进程间不共享内存,需要通过队列、管道等机制进行通信
1. python多进程实现方法
1.1 multiprocessing
import multiprocessingdef worker1(task_name, duration):'''进程1'''print("进程1")def worker2(task_name):'''进程2'''print("进程2")def main():# 创建进程child_process1 = multiprocessing.Process(target=worker1, args=("arg1", "arg2"))child_process2 = multiprocessing.Process(target=worker2, args=("arg1"))# 启动进程child_process1.start()child_process2.start()# 等待进程结束child_process1.join()child_process2.join()print("所有任务完成")
1.2 concurrent.futures
1.2.1 ProcessPool 进程池
from concurrent.futures import ProcessPoolExecutorPROCESS_POLL_MAX = 10def worker1(task_name, duration):'''进程1'''print("进程1")def worker2(task_name):'''进程2'''print("进程2")def main():executor = ProcessPool(max_workers=THREAD_POLL_MAX)process_list = []while True:if 1:future = executor.submit(worker1, arg1, arg2)process_list.append(future)if 2:future = executor.submit(worker2, arg1)process_list.append(future)# 未被占用的线程process_list = [ p for p in process_list if not p.done ]# 资源回收executor.shutdown(wait=True)
1.3 os.fork 父子2个进程
import osdef main():# 创建子进程pid = os.fork()if pid > 0:# 父进程print(f"父进程:我的pid {os.getpid()}")print(f"父进程:创建的子进程ppid: {pid}")elif pid == 0:# 子进程print(f"子进程:我的pid:{os.getpid()}")print(f"子进程:父进程的pid:{os.ggetppid()}")else:print("创建进程失败")
2. python多线程实现方法
2.1 threading
import threadingdef worker(task_name, duration):"""线程执行的任务函数"""print(f"任务 {task_name} 开始执行")time.sleep(duration)print(f"任务 {task_name} 完成,耗时 {duration} 秒")# 创建线程
thread1 = threading.Thread(target=worker, args=("A", 2))
thread2 = threading.Thread(target=worker, args=("B", 3))# 启动线程
thread1.start()
thread2.start()# 等待线程完成
thread1.join()
thread2.join()print("所有任务完成")
2.2 concurrent.futures 线程池
2.2.1 ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
THREAD_POLL_MAX = 10 #现成池线程数def task1(arg):print("args:", args)def task2(arg1, arg2, arg3):print("args:", args)def main():executor = ThreadPoolExecutor(max_workers=THREAD_POLL_MAX)thread_list = []while True:if 1:future = executor.submit(task1, arg1)thread_list.append(future)if 2:future = executor.submit(task2, arg1, arg2, arg3) # 每个线程的参数个数等可以不同thread_list.append(future)# 未被占用的线程thread_list = [ t for t in thread_list if not t.done ]# 资源回收executor.shutdown(wait=True)
3. 进线程间通信
3.1 Queue 队列
import threading
import queuedef producer(q, producer_id):'''生产者线程,往队列添加产物'''# 如要求每个id 生产者,生产5个产物for i in range(5):item = f"{producer_id}产出的{i}产物"q.put(item)print(f"生产者{producer_id}生产了:{item}")def consumer(q, consumer_id):'''消费者线程,从队列拿出产物'''while True:item = q.get()if item is None:q.put(None) # 将结束信号放回,让其他消费者也能结束breakelse:print(f"消费者{consumer_id}消费了{item}")q.task_done()def main():# 创建队列q = queue.Queue()#创建生产者与消费者线程producers = []consumers = []# 2个生产者线程for i in range(2):t = threading.Thread(target=producer, args=(q, i))t.start()producers.append(t)# 2个消费者线程for i in range(2):t = threading.Thread(target=consumer, args=(q, i))t.start()consumers.append(t) # 等待生产者完成
for t in producers:t.join()# 等待队列中所有的任务被处理
q.join()# 添加结束信号
q.put(None)# 等待消费者完成
for t in consumers:t.join()print("所有任务完成")
3.2 multiprocessing 管道
import multiprocessingdef worker(conn, worker_id):'''工作进程'''while True:data = conn.recv()if data = 'exit':break;print(f"工作进程{eorker_id} 接收到{data}")# 处理rusult = data.upper()# 并发回处理结果conn.send(rusult)conn.close()def main():# 创建管道parent_conn, chile_conn = multiprocessing.Pipe()# 创建子进程p = multiprocessing.Process(target=worker, args=(child_conn, 1))p.start()# 主进程发送数据messages = ['hell', 'world', 'python', 'exit']for msg in message:parent_conn.send(msg)if msg != 'exit'response = parent_conn.recv()print(f"主进程收到{msg}发送结果响应{response}")# 进程等待回收p.join()print("所有任务均已完成")
4. 信号同步
import signal
import threadingsignal_user1_flag = threading.Event()# 信号接口处理函数
def signal_user1_handle_worker():signal_user1_flag.set()def worker():if not signal_user1_flag.is_set():print("work....")# 信号注册(可注册多个)
signal.signal(signal.SIGUSER1, signal_handle_worker)# 信号发送
os.kill(pid, signal.SIGUSER1)