广州做网站制作公司网络营销的整体概念
本文介绍通过sshtunnel类库建立SSH隧道,使用paramiko通过SSH来访问数据库。
实现了两种建立SSH方式:公私钥验证、密码验证。
公私钥可读本地,也可读取Aws S3上的私钥文件。
本质上就是在本机建立SSH隧道,然后将访问DB转发到本地SSH内去访问数据库。
简单易懂,上代码:
from sshtunnel import SSHTunnelForwarder
from sqlalchemy import create_engine, text
import paramiko
import io
import socket#### 都换成你自己的
# SSH配置
ssh_host = '' #主机
ssh_port = 22 #端口
ssh_user = 'ec2-user' #用户名
ssh_password = '' #密码(如果是密码验证)
ssh_key_path = r'C:\Users\Desktop\test_primi.pem' #私钥本地地址(如果是公私钥验证)
# 数据库配置
database_user = '' #用户名
database_password = '' #密码
database_name = '' #数据库名
database_host = '' #主机
database_port = 3306 #端口号def get_available_port():"""获取可用的本地端口。"""sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try:sock.bind(('127.0.0.1', 0))port = sock.getsockname()[1]finally:sock.close()return porttry:#读取S3上的私钥# s3_client = get_s3_client()# response = s3_client.get_object(Bucket='桶名称', Key='私钥在S3上的key')# private_key_content = response['Body'].read().decode('utf-8')#private_key = paramiko.RSAKey.from_private_key(io.StringIO(private_key_content))#读取本地私钥private_key = paramiko.RSAKey.from_private_key_file(ssh_key_path)local_port = get_available_port()# 创建 SSH 隧道tunnel= SSHTunnelForwarder((ssh_host, ssh_port),ssh_username=ssh_user,ssh_pkey=private_key,#ssh_password=ssh_password,remote_bind_address=(database_host, database_port),local_bind_address=('127.0.0.1', local_port),host_pkey_directories=[])try:# 启动 SSH 隧道tunnel.start()# 连接数据库engine = create_engine(f'mysql+pymysql://{database_user}:{database_password}@127.0.0.1:{local_port}/{database_name}')# 测试数据库连接with engine.connect() as connection:result = connection.execute(text("SELECT count(*) from activity_logs"))for row in result:print(row)finally:# 关闭 SSH 隧道tunnel.stop()
except Exception as e:print(f"出现错误: {e}")
执行结果:

注意:host_pkey_directories=[] 的意思是 不要在指定的目录中寻找密钥,如果没有将出现如下错误,但不影响程序正常执行。
可以先在本地用Navicat之类的客户端测好了,sshtunnel应该是三种验证方法都支持的,源码如下