Python 实现多服务器并发启动 SDK-C Master 与 Viewer 的分布式方案
前一篇文章:https://blog.csdn.net/zhang_jiamin/article/details/149199519?spm=1011.2415.3001.5331
介绍了在一台服务器上并发启动master,另一台服务器上并发启动viewer。
这次介绍一下使用python实现分布式在多台服务器上并发启动master和viewer,用以实现更高的并发和负载。
远程 SSH 连接 EC2的python工具
Paramiko
安装
pip install paramiko
使用
import paramikohosts = ['70.xxx.250.xxx', '55.xx.222.xx']for host in hosts:with paramiko.SSHClient() as ssh:ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh.connect(hostname=host, username='ec2-user', key_filename='qa.pem')ssh.exec_command("touch 222.txt")
注:访问EC2的证书放在脚本的同目录。
Fabric
安装
pip install fabric
使用
from fabric import Connectionec2_hosts = ['70.xxx.250.xxx', '55.xx.222.xx']def connect_ec2(host):# 登录EC2conn = Connection(host=host, user='ec2-user', connect_kwargs={"key_filename": "qa.pem"})# 执行shell脚本conn.run("cd kvs-webrtc-sdk/ && bash multi_master_v2.sh")
- cd kvs-webrtc-sdk/ && bash multi_master_v2.sh 是在 同一个 shell session 中连续执行的
- 如果写成两条 conn.run(),每条命令会启动一个新的 shell,因此 cd 不会“保留路径”;
- 可以执行任意复杂命令,比如:
conn.run("cd /data/scripts && chmod +x *.sh && ./deploy.sh")
Paramiko 与 Fabric 对比
工具 | 本质 | 底层依赖 |
---|---|---|
Paramiko | 一个低层级的 SSH 库 | 原生实现 SSH 协议 |
Fabric | 一个高级 SSH 操作库(自动化部署工具) | 使用了 Paramiko |
Fabric 是基于 Paramiko 封装的更易用工具,提供了更简洁的 API 和自动化功能,适合执行多台服务器上的批量操作。
对比项 | Paramiko | Fabric |
---|---|---|
使用难度 | 较低层级,需要手动管理连接、执行命令等 | 较高层级,命令式风格更接近 Shell 脚本 |
灵活性 | 高,可以精细控制每一步 | 中,适合批量任务但不适合复杂交互 |
并发执行 | 需要自己用 threading 或 multiprocessing | Fabric 2.x+ 原生支持并发执行(使用 Group 等) |
自动化批处理 | 要自己写逻辑 | 很适合,比如部署、更新、批量执行命令 |
安装体积 | 小 | 大(因为依赖更多,如 invoke) |
学习成本 | 稍高(偏底层) | 较低,语法接近 Shell,适合部署场景 |
需求类型 | 推荐工具 | 理由 |
---|---|---|
只需连接一台机器,执行简单命令 | Paramiko | 更轻量、无额外依赖 |
批量连接多台服务器 | Fabric | API 更高级,支持并发、结果收集、任务复用等 |
自定义复杂 SSH 协议行为 | Paramiko | Fabric 无法精细操作底层,如端口转发、SFTP 文件校验等 |
熟悉 Shell + Python | Fabric | 类似写 Shell 自动化脚本,集成方便 |
结论
基于以上分析,我选择的是用fabric。
python多线程工具选择
并发(Concurrency)指的是程序在同一个时间段内处理多个任务。
在 Python 中,实现并发的主要方式有:
并发模型 | 核心模块 | 适合场景 |
---|---|---|
线程 Threading | threading / concurrent.futures.ThreadPoolExecutor | IO 密集型任务,如网络请求、文件读写 |
进程 Multiprocessing | multiprocessing / ProcessPoolExecutor | CPU 密集型任务,如图像处理、加密计算 |
协程 Coroutine | asyncio, trio, curio 等 | 高并发、轻量级协程任务,如高并发服务器、爬虫 |
并发 vs 并行(多核)
并发:多个任务看起来在同时进行,实际是在一个线程中快速切换(适合 IO 密集)
并行:多个任务真正同时在多个 CPU 核心上执行(适合 CPU 密集)
在 Python 中,由于 GIL(全局解释器锁) 的存在,线程不能真正并行跑 Python 字节码,但对于 IO 密集型任务非常有效。
适合使用 ThreadPoolExecutor 的场景
- 网络请求(如批量请求 API、ping 多个 EC2)
- 文件操作(如并发读写多个日志)
- 数据库访问(如查询多个数据源)
如果要跑大量 CPU 密集任务(如图像压缩、加密运算),应改用 ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
我选择的是多线程并发。
最后代码
from fabric import Connection
from concurrent.futures import ThreadPoolExecutorec2_hosts = ['70.xxx.250.xxx', '55.xx.222.xx']def connect_ec2(host):# 登录EC2conn = Connection(host=host, user='ec2-user', connect_kwargs={"key_filename": "qa.pem"})# 执行shell脚本conn.run("cd kvs-webrtc-sdk/ && bash multi_master_v2.sh")with ThreadPoolExecutor() as pool:futures = [pool.submit(connect_ec2, host) for host in ec2_hosts]for future in futures:future.result()
- 如果机器很多(10+),可以设置线程池最大并发数max_workers:
with ThreadPoolExecutor(max_workers=10) as pool:
表示最多允许10个线程并发。如果没传 max_workers,Python 会默认用 min(32, os.cpu_count() + 4)。
- ThreadPoolExecutor()
这是 Python 提供的线程池执行器。它管理一个线程池来运行函数。
等价于手动用 threading.Thread(target=func).start() 启动多个线程,但它更安全、结构更清晰、具备异常处理能力。
- pool.submit(connect_ec2, host)
submit 表示提交任务到线程池异步执行:
future = pool.submit(func, *args, **kwargs)
它会立即返回一个 Future 对象,任务会在后台某个线程中执行。
所以:
futures = [pool.submit(connect_ec2, host) for host in ec2_hosts]
表示:对 ec2_hosts 中每个主机,启动一个线程去运行 connect_ec2(host)。
- future.result()
future.result() 表示阻塞等待某个任务执行完,并返回执行结果。
for future in futures:future.result()
1、表示要等所有线程都跑完,确保全部执行完成后再往下执行。
2、这段代码是“并发执行任务,顺序获取结果”,它不是串行运行函数,而是:
(1)任务已经同时提交、并发执行(线程池里跑着)
(2)for future in futures: 只是按顺序取回每个任务的结果
(3)取结果的时候,如果该任务还没执行完,就会阻塞等它完成
(4)实际任务早就并发地在跑了。
3、这个等待是必要的,因为线程执行是异步的,如果不等,有可能主线程就结束了。
总结
元素 | 含义 |
---|---|
ThreadPoolExecutor | 线程池执行器,用于 IO 并发任务 |
submit(func, arg) | 提交任务,返回 Future |
future.result() | 阻塞等待任务完成,返回执行结果 |
max_workers=N | 控制最多并发线程数量 |
在pycharm中执行结果如下:
/Users/testmanzhang/PycharmProjects/multi_process_launch_viewer.py
🔄 当前并发数:1
🔄 当前并发数:2
🔄 当前并发数:3
🔄 当前并发数:4
🔄 当前并发数:5
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:1
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:2
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:3
🔄 当前并发数:4
🔄 当前并发数:5
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
...
这些打印实际上是服务器上面shell脚本的执行打印,其实是通过fabric同步到了当前代码执行的IDE中。