Implementing PostgreSQL High Availability with Patroni
1. Environment Preparation
System architecture
Hostname | IP | OS | Deployed software |
---|---|---|---|
pg1 | 11.0.1.4 | CentOS 7 | Python 3.13.5, OpenSSL, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18 |
pg2 | 11.0.1.5 | CentOS 7 | Python 3.13.5, OpenSSL, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18 |
pg3 | 11.0.1.6 | CentOS 7 | Python 3.13.5, OpenSSL, etcd 3.6.2, Patroni 4.0.6, PostgreSQL 14.18 |
demo1 | 11.0.1.7 | CentOS 7 | HAProxy 2.9.6 |
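Configure a local yum repository
Upload the CentOS-7-x86_64-Everything-2207-02.iso image to the server (I am using a VMware virtual machine, where the image shows up as /dev/sr0), mount it at /mnt/cdrom, and configure the yum repository.
mkdir /mnt/cdrom
mount /dev/sr0 /mnt/cdrom
sudo tee /etc/yum.repos.d/local.repo <<EOF
[local]
name=CentOS-7 Local DVD Repo
baseurl=file:///mnt/cdrom
enabled=1
gpgcheck=0
EOF
# Remove the default online repos and rebuild the cache
rm -rf /etc/yum.repos.d/CentOS-*
sudo yum clean all
sudo yum makecache
# List the available yum repositories
sudo yum repolist
# Install common tools
yum install -y vim wget
Edit /etc/fstab and append the following line so that the image is mounted automatically at boot:
/dev/sr0 /mnt/cdrom iso9660 defaults 0 0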
Disable the firewall
# Stop the firewall service
sudo systemctl stop firewalld
# Prevent the firewall from starting at boot
sudo systemctl disable firewalld
# Verify the status (Active should read "inactive")
sudo systemctl status firewalld
Disable SELinux
sudo sed -i 's/^SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config && sudo setenforce 0
Download the required packages
# openssl
wget https://github.com/openssl/openssl/releases/download/openssl-3.5.1/openssl-3.5.1.tar.gz
# Python
wget https://www.python.org/ftp/python/3.13.5/Python-3.13.5.tgz
# etcd
wget https://github.com/etcd-io/etcd/releases/download/v3.6.2/etcd-v3.6.2-linux-amd64.tar.gz
# postgresql
wget https://ftp.postgresql.org/pub/source/v14.18/postgresql-14.18.tar.gz
# haproxy
wget https://www.haproxy.org/download/2.9/src/haproxy-2.9.6.tar.gz
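2. Install OpenSSL
The OpenSSL version shipped with the system is too old (Python 3.10 and later require OpenSSL 1.1.1 or newer), so a newer version has to be downloaded and built from source.
Build and install
# Extract
tar -zxf openssl-3.5.1.tar.gz
cd openssl-3.5.1
# Make sure the required build tools are installed first
yum install -y perl-IPC-Cmd perl-Data-Dumper gcc make perl wget zlib-devel
# Configure (use the default lib directory to avoid lib64 path issues)
./config --prefix=/usr/local/openssl --openssldir=/usr/local/openssl \
    shared zlib no-ssl3-method enable-tls1_3 \
    --libdir=lib   # force lib instead of lib64
make -j $(nproc) && make install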
Configure the dynamic linker
# Create the dynamic linker configuration file
echo "/usr/local/openssl/lib" | sudo tee /etc/ld.so.conf.d/openssl.conf
# Refresh the dynamic linker cache
sudo ldconfig
# Verify the configuration
ldconfig -p | grep openssl
# Expected output is similar to:
# libssl.so.3 (libc6,x86-64) => /usr/local/openssl/lib/libssl.so.3
# libcrypto.so.3 (libc6,x86-64) => /usr/local/openssl/lib/libcrypto.so.3
Set environment variables
# Append to ~/.bashrc or /etc/profile.d/openssl.sh
echo 'export PATH="/usr/local/openssl/bin:$PATH"' | sudo tee -a /etc/profile.d/openssl.sh
echo 'export LD_LIBRARY_PATH="/usr/local/openssl/lib:$LD_LIBRARY_PATH"' | sudo tee -a /etc/profile.d/openssl.sh
# Apply immediately
source /etc/profile
Check the version
openssl version
# Expected output is similar to:
# OpenSSL 3.5.1 1 Jul 2025 (Library: OpenSSL 3.5.1 1 Jul 2025)
3. Build and Install Python 3.13
The Python that ships with the server is 2.7, which is too old to meet the requirements of the software installed below.
Build and install
# Extract
tar -zxf Python-3.13.5.tgz
cd Python-3.13.5
# Install the required development packages
yum install -y zlib-devel bzip2-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel wget
# Build and install
./configure --prefix=/usr/local/python3.13 --with-openssl=/usr/local/openssl --enable-shared
make -j $(nproc) && make altinstall
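Configure the dynamic linker path
Add Python's library directory to the system dynamic linker configuration so that Python keeps working after the server reboots:
# Create the Python library path configuration file
sudo tee /etc/ld.so.conf.d/python313.conf <<EOF
/usr/local/python3.13/lib
EOF
# Refresh the dynamic linker cache
sudo ldconfig
Set environment variables
echo 'export PATH="/usr/local/python3.13/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc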
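A quick way to confirm that the new interpreter was really built against the OpenSSL under /usr/local/openssl is to ask the ssl module which library it is linked to. The following is only a minimal sketch; the helper name linked_openssl is hypothetical, and the expectation of a 3.x major version simply assumes the OpenSSL 3.5.1 build from the previous section.

```python
import ssl


def linked_openssl():
    """Return the version string and version tuple of the OpenSSL this Python is linked against."""
    return ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO


text, info = linked_openssl()
print(text)
# On the hosts built above, the major version is expected to be 3
print("major version:", info[0])
```

Run it with python3.13; if it still reports the old system library, the --with-openssl path or the ldconfig step is the first thing to re-check.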
4. Install etcd
Extract and install
tar -zxf etcd-v3.6.2-linux-amd64.tar.gz
cd etcd-v3.6.2-linux-amd64
cp etcd etcdctl /usr/local/bin/
Create the etcd configuration file
mkdir /etc/etcd
cat > /etc/etcd/etcd.conf << EOF
#[Member]
ETCD_NAME=$(hostname)
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"
ETCD_LISTEN_PEER_URLS="http://0.0.0.0:12380"
ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:12379"
#[Clustering]
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://$(hostname -i):12380"
ETCD_ADVERTISE_CLIENT_URLS="http://$(hostname -i):12379"
# Member names must match each node's ETCD_NAME (the hostnames pg1, pg2, pg3)
ETCD_INITIAL_CLUSTER="pg1=http://11.0.1.4:12380,pg2=http://11.0.1.5:12380,pg3=http://11.0.1.6:12380"
ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
ETCD_INITIAL_CLUSTER_STATE="new"
EOF
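When bootstrapping, etcd looks for its own ETCD_NAME among the member names listed in ETCD_INITIAL_CLUSTER, so the two settings have to agree on every node. The sketch below is a hypothetical pre-flight check (the function names are illustrative and not part of etcd) that parses the cluster string and reports whether a given node name is listed.

```python
def parse_initial_cluster(value):
    """Split 'name=url,name=url' into a {name: peer_url} dict."""
    members = {}
    for item in value.split(","):
        name, _, url = item.strip().partition("=")
        members[name] = url
    return members


def name_is_member(etcd_name, initial_cluster):
    """True when this node's ETCD_NAME is one of the initial cluster members."""
    return etcd_name in parse_initial_cluster(initial_cluster)


cluster = "pg1=http://11.0.1.4:12380,pg2=http://11.0.1.5:12380,pg3=http://11.0.1.6:12380"
print(name_is_member("pg1", cluster))    # True
print(name_is_member("etcd1", cluster))  # False
```

Because ETCD_NAME is set from $(hostname) here, the member names in the cluster string need to be the hostnames of the three nodes.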
Create the systemd service file
cat > /etc/systemd/system/etcd.service << EOF
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target

[Service]
Type=notify
EnvironmentFile=/etc/etcd/etcd.conf
ExecStart=/usr/local/bin/etcd
Restart=on-failure
RestartSec=5
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
EOF
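Start the etcd service (run on all three nodes at roughly the same time; with Type=notify, systemctl start is likely to wait on the first node until a majority of members are up)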
systemctl daemon-reload
systemctl enable etcd
systemctl start etcd
Verify the etcd cluster status
etcdctl --endpoints=http://11.0.1.4:12379,http://11.0.1.5:12379,http://11.0.1.6:12379 member list
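The reason for running three etcd members is quorum: the cluster only accepts writes while a majority of members is reachable. A small sketch of that arithmetic follows (the function names are my own and purely illustrative).

```python
def quorum(members):
    """Smallest number of members that forms a majority."""
    return members // 2 + 1


def fault_tolerance(members):
    """How many members can fail while a majority is still available."""
    return members - quorum(members)


for n in (1, 3, 5):
    print(f"members={n} quorum={quorum(n)} tolerated_failures={fault_tolerance(n)}")
# members=1 quorum=1 tolerated_failures=0
# members=3 quorum=2 tolerated_failures=1
# members=5 quorum=3 tolerated_failures=2
```

With the three nodes used here, one etcd member can be lost without Patroni losing access to its DCS.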
5. Build and Install PostgreSQL
Build and install
# Extract
tar -zxf postgresql-14.18.tar.gz
cd postgresql-14.18
# Install development tools
yum -y groupinstall "Development Tools" "Legacy UNIX Compatibility"
yum -y install bison flex readline* zlib-devel gcc* make
# Build and install
./configure --prefix=/usr/local/pgsql
make && make install
Create the user and directories
useradd postgres
mkdir -p /data/pgsql
chown -R postgres:postgres /data/pgsql
chown -R postgres:postgres /usr/local/pgsql
Set environment variables
# The quoted 'EOF' writes $PATH and $LD_LIBRARY_PATH literally instead of expanding root's values
cat >> /home/postgres/.bashrc << 'EOF'
export PATH=/usr/local/pgsql/bin:$PATH
export LD_LIBRARY_PATH=/usr/local/pgsql/lib:$LD_LIBRARY_PATH
export PGDATA=/data/pgsql/data
EOF
source /home/postgres/.bashrc
Initialization
Initialize the database (run only on the primary node, 11.0.1.4)
su - postgres -c "initdb -D /data/pgsql/data"
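6. Install Patroni
Install Patroni on all three nodes.
Choose either the online or the offline method.
Online installation of Patroni and its dependencies
pip3.13 install patroni[etcd] psycopg2-binary etcd3
- With etcd 3.x or later (this setup uses etcd 3.6.2), etcd3 is strongly recommended, for the following reasons:
  - etcd 3.6.2 natively supports the v3 API, and etcd3 is a client designed specifically for v3, so it can make full use of v3 features (such as distributed locks and efficient key-value storage).
  - python-etcd supports only the v2 API, which etcd 3.x may disable or restrict by default. This can cause compatibility problems (for example, 404 errors when v2 API paths are requested).
- If compatibility with etcd 2.x is required, python-etcd is the only option rather than etcd3. Note, however, that it is no longer maintained and may have security issues or missing features.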
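Offline installation of Patroni and its dependencies
In an environment with internet access, use the pip download command to fetch Patroni and all of its dependencies:
# Create the download directory
mkdir patroni_dependencies && cd patroni_dependencies
# Download Patroni and its dependencies (the latest versions by default)
pip3.13 download patroni[etcd] psycopg2-binary etcd3 setuptools wheel
Copy all downloaded .whl and .tar.gz files to a directory on the offline server (for example, /tmp/patroni_deps).
In the offline environment, use pip install to install from the local files (run from that directory):
# Install all downloaded packages (pip resolves the dependency order)
pip3.13 install --no-index --find-links=. ./*
# Check the Patroni version
patroni --version
# Check that the module can be imported
python3.13 -c "from patroni.version import __version__; print(__version__)"
# Expected output:
4.0.6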
Create the Patroni configuration directory
mkdir -p /etc/patroni
chown -R postgres:postgres /etc/patroni
Create the Patroni configuration file (on all nodes)
cat > /etc/patroni/patroni.yml << EOF
scope: pg-cluster
namespace: /postgres/
name: $(hostname)

restapi:
  listen: 0.0.0.0:8008
  connect_address: $(hostname -i):8008

etcd3:
  hosts:
    - 11.0.1.4:12379
    - 11.0.1.5:12379
    - 11.0.1.6:12379
  protocol: http
  # Optional: add credentials (if etcd authentication is enabled)
  # username: user
  # password: password

bootstrap:
  dcs:
    ttl: 30
    loop_wait: 10
    retry_timeout: 10
    maximum_lag_on_failover: 1048576
    postgresql:
      use_pg_rewind: true
      use_slots: true
      parameters:
        wal_level: replica
        hot_standby: "on"
        max_wal_senders: 10
        max_replication_slots: 10
        # Note: PostgreSQL 13+ replaced wal_keep_segments with wal_keep_size
        wal_keep_segments: 32
        listen_addresses: '*'
        port: 15432
        archive_mode: "on"
        archive_command: 'test ! -f /data/pgsql/archive/%f && cp %p /data/pgsql/archive/%f'
  initdb:
    - encoding: UTF8
    - data-checksums

postgresql:
  listen: 0.0.0.0:15432
  connect_address: $(hostname -i):15432
  data_dir: /data/pgsql/data
  bin_dir: /usr/local/pgsql/bin
  pgpass: /tmp/pgpass
  authentication:
    replication:
      username: replicator
      password: rep-pass
    superuser:
      username: postgres
      password: postgres
  parameters:
    unix_socket_directories: '/tmp'

tags:
  nofailover: false
  noloadbalance: false
  clonefrom: false
  nosync: false
EOF
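The three timing values under bootstrap.dcs are related. As I understand the Patroni documentation, they should satisfy loop_wait + 2 * retry_timeout <= ttl, and maximum_lag_on_failover is expressed in bytes. The sketch below just checks the values used above; the helper names are hypothetical.

```python
def timing_is_consistent(ttl, loop_wait, retry_timeout):
    """Check the rule loop_wait + 2 * retry_timeout <= ttl."""
    return loop_wait + 2 * retry_timeout <= ttl


def lag_in_mib(max_lag_bytes):
    """Convert a maximum_lag_on_failover value from bytes to MiB."""
    return max_lag_bytes / (1024 * 1024)


print(timing_is_consistent(ttl=30, loop_wait=10, retry_timeout=10))  # True
print(timing_is_consistent(ttl=20, loop_wait=10, retry_timeout=10))  # False
print(lag_in_mib(1048576))  # 1.0
```

So with the configuration above, a replica that is more than 1 MiB behind the last known leader position is not considered as a failover candidate.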
Create the archive directory
mkdir -p /data/pgsql/archive
chown -R postgres:postgres /data/pgsql/archive
Create the systemd service file
# The quoted 'EOF' keeps the shell from expanding $MAINPID while the file is written
cat > /etc/systemd/system/patroni.service << 'EOF'
[Unit]
Description=Patroni high-availability PostgreSQL
After=syslog.target network.target etcd.service
Requires=etcd.service

[Service]
Type=simple
User=postgres
Group=postgres
ExecStart=/usr/local/python3.13/bin/patroni /etc/patroni/patroni.yml
ExecReload=/bin/kill -HUP $MAINPID
KillMode=process
Restart=always
RestartSec=10
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
EOF
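Start the Patroni service (run only on the primary node, 11.0.1.4)
systemctl daemon-reload
systemctl enable patroni
systemctl start patroni
Verify the Patroni cluster
After starting Patroni on the primary node, verify the cluster status:
patronictl -c /etc/patroni/patroni.yml list
The output should look similar to the following: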
+ Cluster: pg-cluster (7528443352098796254) -+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+--------+---------+----+-----------+
| pg1 | 11.0.1.4:15432 | Leader | running | 2 | |
+--------+----------------+--------+---------+----+-----------+
Create the replication role and change the password of the postgres user:
[root@pg1 ~]# su - postgres -c 'psql -p 15432'
psql (17.5)
Type "help" for help.

postgres=# CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'rep-pass'; -- the user name and password must match patroni.yml
CREATE ROLE
postgres=# ALTER USER postgres PASSWORD 'postgres'; -- the postgres password must match patroni.yml
ALTER ROLE
postgres=# \q
Edit /data/pgsql/data/pg_hba.conf and add the following entries so that the three servers are allowed to replicate from one another:
# Allow the client machine to connect to the database
host all postgres 11.0.1.1/32 md5
# Allow the nodes to reach one another as the postgres user
host all postgres 11.0.1.4/32 md5
host all postgres 11.0.1.5/32 md5
host all postgres 11.0.1.6/32 md5
# Allow the HAProxy server to run health checks as the postgres user
host all postgres 11.0.1.7/32 md5
# Allow the nodes to replicate from one another as the replicator user
host replication replicator 11.0.1.4/32 md5
host replication replicator 11.0.1.5/32 md5
host replication replicator 11.0.1.6/32 md5
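The per-node pg_hba.conf entries follow a simple pattern, so they can be generated rather than typed by hand when the node list changes. This is only an illustrative sketch: NODES and hba_lines are names I made up, and the client and HAProxy entries are left out.

```python
NODES = ["11.0.1.4", "11.0.1.5", "11.0.1.6"]


def hba_lines(nodes, repl_user="replicator", method="md5"):
    """Build the per-node pg_hba.conf entries for superuser and replication access."""
    lines = []
    for ip in nodes:
        # Node-to-node access as the postgres user
        lines.append(f"host all postgres {ip}/32 {method}")
    for ip in nodes:
        # Streaming replication as the replication user
        lines.append(f"host replication {repl_user} {ip}/32 {method}")
    return lines


print("\n".join(hba_lines(NODES)))
```

The generated lines can be compared against the file before they are appended, which avoids duplicates on repeated runs.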
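After the change, PostgreSQL has to pick up the new pg_hba.conf; otherwise the two replicas cannot connect to the primary for replication. A configuration reload is normally enough for pg_hba.conf, but the simplest way here is to restart Patroni, which also restarts the database:
systemctl restart patroni
Then start the Patroni service on the replica nodes 11.0.1.5 and 11.0.1.6: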
systemctl daemon-reload
systemctl enable patroni
systemctl start patroni
Verify the cluster status again:
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1 | 11.0.1.4:15432 | Leader | running | 3 | |
| pg2 | 11.0.1.5:15432 | Replica | streaming | 3 | 0 |
| pg3 | 11.0.1.6:15432 | Replica | running | 3 | 0 |
+--------+----------------+---------+-----------+----+-----------+
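At this point you can use Navicat to connect to each of the three databases in turn and test reads and writes.
Automatic failover test
First stop Patroni on pg1; pg2 then becomes the new leader:
[root@pg1 ~]# systemctl stop patroni
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list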
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1 | 11.0.1.4:15432 | Replica | stopped | | unknown |
| pg2 | 11.0.1.5:15432 | Leader | running | 4 | |
| pg3 | 11.0.1.6:15432 | Replica | streaming | 4 | 0 |
+--------+----------------+---------+-----------+----+-----------+
After the test, remember to bring the service back up:
[root@pg1 ~]# systemctl restart patroni
[root@pg1 ~]# patronictl -c /etc/patroni/patroni.yml list
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1 | 11.0.1.4:15432 | Replica | streaming | 4 | 0 |
| pg2 | 11.0.1.5:15432 | Leader | running | 4 | |
| pg3 | 11.0.1.6:15432 | Replica | streaming | 4 | 0 |
+--------+----------------+---------+-----------+----+-----------+
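When repeating the failover test several times, it can help to extract the current leader from the patronictl list output automatically instead of reading the table by eye. A minimal sketch follows; it parses the text table exactly as shown above, and the function names are hypothetical.

```python
SAMPLE = """\
+ Cluster: pg-cluster (7528443352098796254) ----+----+-----------+
| Member | Host           | Role    | State     | TL | Lag in MB |
+--------+----------------+---------+-----------+----+-----------+
| pg1    | 11.0.1.4:15432 | Replica | streaming |  4 |         0 |
| pg2    | 11.0.1.5:15432 | Leader  | running   |  4 |           |
| pg3    | 11.0.1.6:15432 | Replica | streaming |  4 |         0 |
+--------+----------------+---------+-----------+----+-----------+
"""


def parse_members(table_text):
    """Turn the '|'-delimited rows of the table into a list of dicts keyed by header."""
    header = None
    rows = []
    for line in table_text.splitlines():
        if not line.startswith("|"):
            continue  # skip the title and border lines
        cells = [c.strip() for c in line.strip().strip("|").split("|")]
        if header is None:
            header = cells
        else:
            rows.append(dict(zip(header, cells)))
    return rows


def find_leader(table_text):
    """Return the member name whose Role is Leader, or None if there is none."""
    for row in parse_members(table_text):
        if row["Role"] == "Leader":
            return row["Member"]
    return None


print(find_leader(SAMPLE))  # pg2
```

In practice the table text would come from running patronictl, for example through subprocess, rather than from a hard-coded sample.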
7. Install HAProxy
Build and install
**(HAProxy is installed only on 11.0.1.7)** Building from a recent source release is recommended here instead of using the package shipped with CentOS, which is too old for this setup.
# Install dependencies
sudo yum install -y gcc make pcre-devel openssl-devel systemd-devel
# Extract
tar -zxvf haproxy-2.9.6.tar.gz
cd haproxy-2.9.6
# Build (with SSL and systemd support)
make TARGET=linux-glibc USE_PCRE=1 USE_OPENSSL=1 USE_SYSTEMD=1 USE_CPU_AFFINITY=1
# Install
sudo make install PREFIX=/usr/local/haproxy
# Create a symlink so that haproxy can be called globally
sudo ln -s /usr/local/haproxy/sbin/haproxy /usr/sbin/haproxy
Configure the system service
# Create the systemd service file
sudo vim /usr/lib/systemd/system/haproxy.service
Add the following content:
[Unit]
Description=HAProxy Load Balancer
After=network.target

[Service]
# haproxy -D forks into the background, so systemd tracks it through the PID file
Type=forking
ExecStart=/usr/sbin/haproxy -D -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid
ExecReload=/usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy.pid -sf $MAINPID
PIDFile=/run/haproxy.pid
ExecStop=/bin/kill -TERM $MAINPID
Restart=always
User=root

[Install]
WantedBy=multi-user.target
Edit the service configuration file /etc/haproxy/haproxy.cfg:
[root@localhost ~]# cat /etc/haproxy/haproxy.cfg
global
    log /dev/log local0 info
    log /dev/log local1 notice
    daemon
    maxconn 4096
    user root

defaults
    log global
    mode tcp
    retries 3
    timeout connect 5s
    timeout client 30s
    timeout server 30s
    option tcplog

# Stats page
listen stats
    bind *:8080
    mode http
    stats enable
    stats uri /haproxy-stats
    stats auth admin:admin123
    stats refresh 10s

# Write traffic (primary, port 15000)
frontend pg_write
    bind *:15000
    mode tcp
    default_backend pg_primary

# Read traffic (replicas, port 15001)
frontend pg_read
    bind *:15001
    mode tcp
    default_backend pg_replicas

# Primary backend (only the node with the primary role stays up)
backend pg_primary
    mode tcp
    balance roundrobin
    # Health check: query Patroni's REST API at /role and match the "primary" role
    option httpchk GET /role
    http-check send meth GET uri /role
    # Match the role returned by Patroni exactly (note: the value is quoted in the response, e.g. "primary"; test and adjust as needed)
    http-check expect string '"role": "primary"'
    # Servers: all nodes are listed; the health check filters out non-primary nodes
    server pg1 11.0.1.4:15432 check port 8008 inter 2s rise 2 fall 3
    server pg2 11.0.1.5:15432 check port 8008 inter 2s rise 2 fall 3
    server pg3 11.0.1.6:15432 check port 8008 inter 2s rise 2 fall 3

# Replica backend (only nodes with the replica role stay up)
backend pg_replicas
    mode tcp
    balance leastconn
    # Health check: query Patroni's REST API at /role and match the "replica" role
    option httpchk GET /role
    http-check send meth GET uri /role
    http-check expect string '"role": "replica"'
    # Servers: all nodes are listed; the health check filters out non-replica nodes
    server pg1 11.0.1.4:15432 check port 8008 inter 2s rise 2 fall 3
    server pg2 11.0.1.5:15432 check port 8008 inter 2s rise 2 fall 3
    server pg3 11.0.1.6:15432 check port 8008 inter 2s rise 2 fall 3
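The http-check expect string rules above are plain substring matches on the response body, so they depend on the exact spacing of the JSON that Patroni returns. The sketch below mimics that match in Python to show why the space after the colon matters; the sample bodies are made up for illustration and were not captured from a real Patroni node.

```python
import json

EXPECTED = '"role": "primary"'


def check_passes(body, expected=EXPECTED):
    """Mimic a substring health check: pass only if the body contains the expected text."""
    return expected in body


status = {"state": "running", "role": "primary"}
default_body = json.dumps(status)                         # ", " and ": " separators
compact_body = json.dumps(status, separators=(",", ":"))  # no spaces at all
replica_body = json.dumps({"state": "running", "role": "replica"})

print(check_passes(default_body))  # True
print(check_passes(compact_body))  # False: no space after the colon
print(check_passes(replica_body))  # False: role does not match
```

If the check never turns green, fetching the endpoint with curl on port 8008 and comparing the raw body with the expected string is a quick way to find the mismatch.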
Verify the installation
# Reload systemd units
sudo systemctl daemon-reload
# Start HAProxy and enable it at boot
sudo systemctl start haproxy
sudo systemctl enable haproxy
# Check the version and status
haproxy -v
sudo systemctl status haproxy
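Test read/write splitting
In the HAProxy configuration above, the primary is writable and the replicas are read-only; the primary is reached on port 15000 and the replicas on port 15001. Use Navicat to connect to 11.0.1.7:15000 (write) and 11.0.1.7:15001 (read).
Create a new table on port 15000 as a write test:
Inserting data works, and the data can be saved with Ctrl+S.
Open the same table on port 15001 and insert more data:
Saving with Ctrl+S fails with "ERROR: cannot execute INSERT in a read-only transaction".
8. Test Primary/Replica Switchover
Write a Python script, haproxy_db_test, to test reads and writes: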
import json
import random
import time

import psycopg2
from psycopg2 import OperationalError, InterfaceError, Error

# Database configuration (adjust to your environment)
CONFIG = {
    "write_host": "11.0.1.7",  # HAProxy server IP
    "write_port": 15000,       # write port
    "read_host": "11.0.1.7",   # HAProxy server IP
    "read_port": 15001,        # read port
    "user": "postgres",
    "password": "postgres",    # database password
    "dbname": "postgres",
    "retry_attempts": 10,      # maximum connection attempts
    "retry_delay": 2,          # delay between connection attempts (seconds)
    "test_interval": 5,        # delay between test rounds (seconds)
    "sync_retries": 3,         # read attempts per row
    "sync_delay": 2,           # delay between read attempts (seconds)
}


def get_connection(host, port):
    """Open a connection and look up the actual backend node behind the proxy."""
    attempts = 0
    while attempts < CONFIG["retry_attempts"]:
        try:
            conn = psycopg2.connect(
                host=host,
                port=port,
                user=CONFIG["user"],
                password=CONFIG["password"],
                dbname=CONFIG["dbname"],
                connect_timeout=5,
            )
            conn.autocommit = True  # matters for the write connection; harmless for reads
            # Ask the server for its own IP address and port
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT inet_server_addr() AS server_ip,
                           inet_server_port() AS server_port
                """)
                actual_host, actual_port = cur.fetchone()
            print(f"✅ Connected to {host}:{port} (actual node: {actual_host}:{actual_port})")
            return conn, actual_host, actual_port
        except (OperationalError, InterfaceError) as e:
            attempts += 1
            print(f"❌ Connecting to {host}:{port} failed (attempt {attempts}/{CONFIG['retry_attempts']}): {e}")
            if attempts < CONFIG["retry_attempts"]:
                time.sleep(CONFIG["retry_delay"])
    print(f"❌ Maximum number of attempts reached; cannot connect to {host}:{port}")
    return None, None, None


def update_table_schema(conn):
    """Check the table and add any missing columns (write connection only)."""
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT column_name
                FROM information_schema.columns
                WHERE table_name = 'ha_test'
            """)
            existing_columns = [row[0] for row in cur.fetchall()]
            required_columns = [
                ("write_proxy_host", "TEXT"),
                ("write_proxy_port", "INT"),
                ("actual_write_host", "TEXT"),
                ("actual_write_port", "INT"),
                ("read_info", "TEXT"),  # read-side info stored as JSON
                ("read_time", "TIMESTAMPTZ"),
            ]
            for col_name, col_type in required_columns:
                if col_name not in existing_columns:
                    cur.execute(f"ALTER TABLE ha_test ADD COLUMN {col_name} {col_type}")
                    print(f"📝 Added missing column: {col_name}")
        print("✅ Table structure check and update finished")
        return True
    except Exception as e:
        print(f"❌ Table structure update failed: {e}")
        return False


def init_test_table(conn):
    """Create the test table if it does not exist (write connection only)."""
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS ha_test (
                    id SERIAL PRIMARY KEY,
                    write_time TIMESTAMPTZ,
                    write_proxy_host TEXT,
                    write_proxy_port INT,
                    actual_write_host TEXT,
                    actual_write_port INT,
                    read_info TEXT,  -- read-side info: {"proxy_host": "...", "actual_host": "...", ...}
                    read_time TIMESTAMPTZ
                )
            """)
        print("📋 Test table initialized")
        return True
    except Exception as e:
        print(f"❌ Test table initialization failed: {e}")
        return False


def test_write(conn, actual_write_host, actual_write_port):
    """Insert a row through the write port and record the primary's address."""
    try:
        with conn.cursor() as cur:
            test_id = random.randint(1, 1000000)
            cur.execute("""
                INSERT INTO ha_test (id, write_time, write_proxy_host, write_proxy_port,
                                     actual_write_host, actual_write_port)
                VALUES (%s, NOW(), %s, %s, %s, %s) RETURNING id
            """, (test_id,
                  CONFIG["write_host"], CONFIG["write_port"],
                  actual_write_host, actual_write_port))
            inserted_id = cur.fetchone()[0]
            print(f"✍️ Wrote row, ID: {inserted_id} (primary: {actual_write_host}:{actual_write_port})")
            return test_id
    except Exception as e:
        print(f"❌ Write failed: {e}")
        return None


def test_read(read_conn, test_id, actual_read_host, actual_read_port):
    """Read-only check through the read port (performs no writes)."""
    if not test_id:
        return False
    for attempt in range(CONFIG["sync_retries"]):
        try:
            with read_conn.cursor() as cur:
                # Query only; no updates on the replica
                cur.execute("SELECT id FROM ha_test WHERE id = %s", (test_id,))
                result = cur.fetchone()
                if result:
                    print(f"📖 Read row, ID: {result[0]} (replica: {actual_read_host}:{actual_read_port}, attempt {attempt + 1}/{CONFIG['sync_retries']})")
                    return True
                if attempt < CONFIG["sync_retries"] - 1:
                    print(f"⌛ Row not replicated yet; retrying in {CONFIG['sync_delay']} s (attempt {attempt + 1}/{CONFIG['sync_retries']})")
                    time.sleep(CONFIG["sync_delay"])
        except Error as e:
            print(f"⚠️ Read attempt {attempt + 1} failed: {e}")
            if attempt < CONFIG["sync_retries"] - 1:
                time.sleep(CONFIG["sync_delay"])
    print(f"❌ All read attempts failed; no row with ID {test_id} (replica: {actual_read_host}:{actual_read_port})")
    return False


def update_read_info(write_conn, test_id, actual_read_host, actual_read_port):
    """Record the read-side info through the write connection (primary only)."""
    if not test_id or not write_conn:
        return
    try:
        with write_conn.cursor() as cur:
            # Build the read-side info as JSON
            read_info = json.dumps({
                "read_proxy_host": CONFIG["read_host"],
                "read_proxy_port": CONFIG["read_port"],
                "actual_read_host": actual_read_host,
                "actual_read_port": actual_read_port,
            })
            cur.execute("""
                UPDATE ha_test
                SET read_time = NOW(),
                    read_info = %s
                WHERE id = %s
            """, (read_info, test_id))
            print(f"📝 Recorded read-side info through the primary, ID: {test_id}")
    except Exception as e:
        print(f"❌ Recording read-side info failed: {e}")


def main():
    # Initialize the table structure through the write connection
    init_conn, _, _ = get_connection(CONFIG["write_host"], CONFIG["write_port"])
    if init_conn:
        init_test_table(init_conn)
        update_table_schema(init_conn)
        init_conn.close()

    write_conn = None
    read_conn = None
    actual_write_host = None
    actual_write_port = None
    actual_read_host = None
    actual_read_port = None
    try:
        while True:
            # Re-establish connections that are missing or closed
            if not write_conn or write_conn.closed:
                write_conn, actual_write_host, actual_write_port = get_connection(
                    CONFIG["write_host"], CONFIG["write_port"])
            if not read_conn or read_conn.closed:
                read_conn, actual_read_host, actual_read_port = get_connection(
                    CONFIG["read_host"], CONFIG["read_port"])
            # Run one test round
            if write_conn and read_conn and actual_write_host and actual_read_host:
                test_id = test_write(write_conn, actual_write_host, actual_write_port)
                if test_id:
                    time.sleep(1)
                    # Pure read on a replica (no writes)
                    read_success = test_read(read_conn, test_id, actual_read_host, actual_read_port)
                    # Record the read-side info through the primary (writes only go to the primary)
                    if read_success:
                        update_read_info(write_conn, test_id, actual_read_host, actual_read_port)
            time.sleep(CONFIG["test_interval"])
    except KeyboardInterrupt:
        print("\n⏹️ Stopped by user")
    except Exception as e:
        print(f"\n💥 Unexpected error: {e}")
    finally:
        # Clean up connections
        if write_conn and not write_conn.closed:
            write_conn.close()
        if read_conn and not read_conn.closed:
            read_conn.close()
        print("🔚 Program finished")


if __name__ == "__main__":
    main()