当前位置: 首页 > news >正文

Python 实现多服务器并发启动 SDK-C Master 与 Viewer 的分布式方案

前一篇文章:https://blog.csdn.net/zhang_jiamin/article/details/149199519?spm=1011.2415.3001.5331
介绍了在一台服务器上并发启动master,另一台服务器上并发启动viewer。

这次介绍一下使用python实现分布式在多台服务器上并发启动master和viewer,用以实现更高的并发和负载。

远程 SSH 连接 EC2的python工具

Paramiko

安装

pip install paramiko

使用

import paramikohosts = ['70.xxx.250.xxx', '55.xx.222.xx']for host in hosts:with paramiko.SSHClient() as ssh:ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh.connect(hostname=host, username='ec2-user', key_filename='qa.pem')ssh.exec_command("touch 222.txt")

注:访问EC2的证书放在脚本的同目录。

Fabric

安装

pip install fabric

使用

from fabric import Connectionec2_hosts = ['70.xxx.250.xxx', '55.xx.222.xx']def connect_ec2(host):# 登录EC2conn = Connection(host=host, user='ec2-user', connect_kwargs={"key_filename": "qa.pem"})# 执行shell脚本conn.run("cd kvs-webrtc-sdk/ && bash multi_master_v2.sh")
  • cd kvs-webrtc-sdk/ && bash multi_master_v2.sh 是在 同一个 shell session 中连续执行的
  • 如果写成两条 conn.run(),每条命令会启动一个新的 shell,因此 cd 不会“保留路径”;
  • 可以执行任意复杂命令,比如:
conn.run("cd /data/scripts && chmod +x *.sh && ./deploy.sh")

Paramiko 与 Fabric 对比

工具本质底层依赖
Paramiko一个低层级的 SSH 库原生实现 SSH 协议
Fabric一个高级 SSH 操作库(自动化部署工具)使用了 Paramiko

Fabric 是基于 Paramiko 封装的更易用工具,提供了更简洁的 API 和自动化功能,适合执行多台服务器上的批量操作。

对比项ParamikoFabric
使用难度较低层级,需要手动管理连接、执行命令等较高层级,命令式风格更接近 Shell 脚本
灵活性高,可以精细控制每一步中,适合批量任务但不适合复杂交互
并发执行需要自己用 threading 或 multiprocessingFabric 2.x+ 原生支持并发执行(使用 Group 等)
自动化批处理要自己写逻辑很适合,比如部署、更新、批量执行命令
安装体积大(因为依赖更多,如 invoke)
学习成本稍高(偏底层)较低,语法接近 Shell,适合部署场景
需求类型推荐工具理由
只需连接一台机器,执行简单命令Paramiko更轻量、无额外依赖
批量连接多台服务器FabricAPI 更高级,支持并发、结果收集、任务复用等
自定义复杂 SSH 协议行为ParamikoFabric 无法精细操作底层,如端口转发、SFTP 文件校验等
熟悉 Shell + PythonFabric类似写 Shell 自动化脚本,集成方便

结论

基于以上分析,我选择的是用fabric。

python多线程工具选择

并发(Concurrency)指的是程序在同一个时间段内处理多个任务。

在 Python 中,实现并发的主要方式有:

并发模型核心模块适合场景
线程 Threadingthreading / concurrent.futures.ThreadPoolExecutorIO 密集型任务,如网络请求、文件读写
进程 Multiprocessingmultiprocessing / ProcessPoolExecutorCPU 密集型任务,如图像处理、加密计算
协程 Coroutineasyncio, trio, curio 等高并发、轻量级协程任务,如高并发服务器、爬虫

并发 vs 并行(多核)
并发:多个任务看起来在同时进行,实际是在一个线程中快速切换(适合 IO 密集)
并行:多个任务真正同时在多个 CPU 核心上执行(适合 CPU 密集)
在 Python 中,由于 GIL(全局解释器锁) 的存在,线程不能真正并行跑 Python 字节码,但对于 IO 密集型任务非常有效。

适合使用 ThreadPoolExecutor 的场景

  • 网络请求(如批量请求 API、ping 多个 EC2)
  • 文件操作(如并发读写多个日志)
  • 数据库访问(如查询多个数据源)

如果要跑大量 CPU 密集任务(如图像压缩、加密运算),应改用 ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor

我选择的是多线程并发。

最后代码

from fabric import Connection
from concurrent.futures import ThreadPoolExecutorec2_hosts = ['70.xxx.250.xxx', '55.xx.222.xx']def connect_ec2(host):# 登录EC2conn = Connection(host=host, user='ec2-user', connect_kwargs={"key_filename": "qa.pem"})# 执行shell脚本conn.run("cd kvs-webrtc-sdk/ && bash multi_master_v2.sh")with ThreadPoolExecutor() as pool:futures = [pool.submit(connect_ec2, host) for host in ec2_hosts]for future in futures:future.result()
  • 如果机器很多(10+),可以设置线程池最大并发数max_workers:
with ThreadPoolExecutor(max_workers=10) as pool:

表示最多允许10个线程并发。如果没传 max_workers,Python 会默认用 min(32, os.cpu_count() + 4)。

  • ThreadPoolExecutor()

这是 Python 提供的线程池执行器。它管理一个线程池来运行函数。
等价于手动用 threading.Thread(target=func).start() 启动多个线程,但它更安全、结构更清晰、具备异常处理能力。

  • pool.submit(connect_ec2, host)

submit 表示提交任务到线程池异步执行:

future = pool.submit(func, *args, **kwargs)

它会立即返回一个 Future 对象,任务会在后台某个线程中执行。

所以:

futures = [pool.submit(connect_ec2, host) for host in ec2_hosts]

表示:对 ec2_hosts 中每个主机,启动一个线程去运行 connect_ec2(host)。

  • future.result()

future.result() 表示阻塞等待某个任务执行完,并返回执行结果。

for future in futures:future.result()

1、表示要等所有线程都跑完,确保全部执行完成后再往下执行。
2、这段代码是“并发执行任务,顺序获取结果”,它不是串行运行函数,而是:
(1)任务已经同时提交、并发执行(线程池里跑着)
(2)for future in futures: 只是按顺序取回每个任务的结果
(3)取结果的时候,如果该任务还没执行完,就会阻塞等它完成
(4)实际任务早就并发地在跑了。
3、这个等待是必要的,因为线程执行是异步的,如果不等,有可能主线程就结束了。

总结

元素含义
ThreadPoolExecutor线程池执行器,用于 IO 并发任务
submit(func, arg)提交任务,返回 Future
future.result()阻塞等待任务完成,返回执行结果
max_workers=N控制最多并发线程数量

在pycharm中执行结果如下

/Users/testmanzhang/PycharmProjects/multi_process_launch_viewer.py 
🔄 当前并发数:1
🔄 当前并发数:2
🔄 当前并发数:3
🔄 当前并发数:4
🔄 当前并发数:5
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:1
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:2
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:3
🔄 当前并发数:4
🔄 当前并发数:5
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
...

这些打印实际上是服务器上面shell脚本的执行打印,其实是通过fabric同步到了当前代码执行的IDE中。

http://www.dtcms.com/a/303050.html

相关文章:

  • [尚庭公寓]15-个人中心
  • 力扣-22.括号生成
  • C++初学者4——标准数据类型
  • JavaScript对象与Math对象完全指南
  • 力扣7:整数反转
  • 利用DataStream和TrafficPeak实现大数据可观察性
  • jQuery 最新语法大全详解(2025版)
  • 下载k8s官方组件chart和容器镜像
  • JavaScript中的Promise.all方法详解
  • 坚鹏:AI智能体培训是知行学成为AI智能体创新应用引领者的基础
  • 【归并排序】排序数组(medium)
  • 阿里云【免费试用】Elasticsearch 智能运维 AI 助手
  • 应用信息更新至1.18.0
  • 加法器 以及ALU(逻辑算术单元)
  • 深入解析 Spring 获取 XML 验证模式的过程
  • redis数据库的四种取得 shell方法
  • C++模板进阶:从基础到实战的深度探索
  • python生成 requirement.txt 文件
  • 一个高效的阿里云漏洞库爬虫工具,用于自动化爬取和处理CVE数据
  • ROS2入门之开发环境搭建
  • AI-调查研究-40-多模态大模型量化 格局重塑:五大开源模型横评与技术对比
  • Navicat 17 教程:Windows 和 Mac 系统适用
  • 【运维】Smartctl安装及使用指南
  • Python爬虫实战:快速采集教育政策数据(附官网工具库API)
  • 设计模式实战:自定义SpringIOC(亲手实践)
  • 常见依赖于TCP/IP的应用层协议
  • Taro 网络请求相关 API 全面解析
  • 初识opencv05——图像预处理4
  • 【Linux系统】Ext2文件系统 | 软硬链接
  • 接口测试核心概念与实践指南