[MySQL Series] Part 8: MySQL Cluster Architecture in Depth
Table of Contents
- Preface
- Section 1: Cluster Architecture Design
- 1.1 Design Philosophy
- Core design principles
- Design challenges of distributed database systems
- Applying the CAP theorem to MySQL clusters
- Balancing performance and reliability
- 1.2 Availability Design
- High availability (HA) solutions
- Failure detection and automatic recovery
- Heartbeat and health checks
- RTO and RPO metrics
- 1.3 Scalability Design
- Horizontal and vertical scaling
- Sharding techniques
- Read/write splitting for scale
- Data rebalancing strategies
- 1.4 Consistency Design
- ACID properties in a cluster environment
- Distributed transaction processing
- Use cases for eventual vs. strong consistency
- Solutions for master-slave replication lag
- Section 2: Master-Slave Mode
- 2.1 Applicable Scenarios
- Read-heavy, write-light workloads
- Reporting and OLAP applications
- Backup and disaster recovery
- Development and test environments
- 2.2 How It Works
- Binary log (binlog) replication
- How GTID (global transaction identifiers) works
- The master-slave data flow
- Using replication filters
- 2.3 Parallel Replication
- Group-commit-based parallel replication
- Multi-threaded replication modes
- WRITESET parallel replication
- Tuning parallel replication performance
- 2.4 Read/Write Splitting
- ProxySQL-based approach
- MySQL Router-based approach
- Application-level read/write splitting
- Load balancing strategies
- Handling consistent reads
- Section 3: Dual-Master Mode
- 3.1 Applicable Scenarios
- Businesses with strict high-availability requirements
- Active-active data center deployments
- Systems that must avoid single points of failure
- Applications needing fast failover
- 3.2 MMM Architecture
- Multi-Master Replication Manager in detail
- How VIP failover works
- Preventing split-brain
- Monitoring and management
- 3.3 MHA Architecture
- Master High Availability implementation
- Failure detection and master-slave switchover flow
- Binary log compensation
- The MHA toolset
- 3.4 Master-Standby Switchover
- Manual vs. automatic switchover
- Guaranteeing data consistency during switchover
- Topology reconstruction after switchover
- Switchover drills and contingency plans
- Practical configuration examples and best practices
- Performance test data
- Best practice recommendations
- Summary
Preface
With the rapid growth of internet businesses and the explosive increase in data volume, a single MySQL instance can no longer meet the demands of high concurrency, high availability, and high scalability.
This article takes a deep look at the design principles, implementation options, and best practices of MySQL cluster architectures, helping readers gain a thorough command of MySQL clustering technology.
Section 1: Cluster Architecture Design
1.1 Design Philosophy
Core design principles
MySQL cluster architectures follow these core principles:
1. High Availability
- Eliminate single points of failure so the system can serve traffic 24/7
- Detect failures automatically and recover quickly
- Provide multiple layers of redundancy
2. Scalability
- Support both horizontal and vertical scaling
- Adjust cluster size dynamically as the business grows
- Offer transparent scale-out and scale-in
3. Consistency
- Guarantee data integrity in a distributed environment
- Provide multiple levels of consistency guarantees
- Balance consistency against performance
4. Performance
- Improve query throughput with read/write splitting
- Spread load with sharding
- Optimize network communication and data transfer
Design challenges of distributed database systems
Network partitions
# Example network-partition detection script
#!/bin/bash
NODES=("192.168.1.10" "192.168.1.11" "192.168.1.12")

for node in "${NODES[@]}"; do
    if ! ping -c 3 -W 1 $node > /dev/null 2>&1; then
        echo "Network partition detected: $node is unreachable"
        # Trigger failure-handling logic here
    fi
done
Data synchronization lag
- Data inconsistency caused by master-slave replication lag
- The impact of network jitter on replication performance
- The impact of large transactions on replication throughput
Failure detection and recovery
- How to detect node failures quickly and accurately
- How to make automatic failover reliable
- How to prevent and handle split-brain
Applying the CAP theorem to MySQL clusters
Consistency
-- Strongly consistent read example
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
SELECT balance FROM accounts WHERE user_id = 12345 FOR UPDATE;
-- Business logic here
UPDATE accounts SET balance = balance - 100 WHERE user_id = 12345;
COMMIT;
Availability
# Example high-availability configuration for a MySQL cluster
mysql_cluster:
  master:
    host: 192.168.1.10
    port: 3306
    weight: 100
  slaves:
    - host: 192.168.1.11
      port: 3306
      weight: 50
    - host: 192.168.1.12
      port: 3306
      weight: 50
  failover:
    enabled: true
    timeout: 30s
    retry_count: 3
Partition Tolerance
- MySQL clusters usually choose CP (consistency + partition tolerance)
- During a network partition they prioritize data consistency
- A majority (quorum) mechanism avoids split-brain; see the sketch below
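To make the majority mechanism concrete, here is a minimal quorum sketch in Python. It is illustrative only: the peer-counting probe and thresholds are assumptions, not part of any MySQL component.
# Minimal majority-quorum sketch (illustrative; reachable_peers would
# come from a hypothetical probe such as a TCP ping to each peer).
def has_quorum(reachable_peers: int, cluster_size: int) -> bool:
    # A node may keep serving writes only if it can see a strict
    # majority of the cluster, itself included.
    return reachable_peers + 1 > cluster_size // 2

# Example: in a 3-node cluster, a node cut off from both peers loses
# quorum and must stop accepting writes, which prevents both sides of
# a partition from acting as master at the same time.
assert has_quorum(1, 3) is True
assert has_quorum(0, 3) is False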
Balancing performance and reliability
Performance gains from read/write splitting
# Example read/write splitting router
import random

class DatabaseRouter:
    def __init__(self):
        self.master = "mysql://master:3306/db"
        self.slaves = [
            "mysql://slave1:3306/db",
            "mysql://slave2:3306/db"
        ]

    def get_connection(self, operation_type):
        # Writes go to the master; reads are spread across the slaves
        if operation_type in ['INSERT', 'UPDATE', 'DELETE']:
            return self.master
        else:
            return random.choice(self.slaves)
Reliability mechanisms
- Store multiple replicas of the data
- Take regular backups and test restores
- Build out a monitoring and alerting system
1.2 Availability Design
High availability (HA) solutions
Master-slave replication architecture
# Master configuration
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
gtid-mode = ON
enforce-gtid-consistency = ON

# Slave configuration
[mysqld]
server-id = 2
relay-log = relay-bin
read-only = 1
gtid-mode = ON
enforce-gtid-consistency = ON
Dual-master (active-standby) architecture
# Node 1 configuration
[mysqld]
server-id = 1
log-bin = mysql-bin
auto-increment-increment = 2
auto-increment-offset = 1

# Node 2 configuration
[mysqld]
server-id = 2
log-bin = mysql-bin
auto-increment-increment = 2
auto-increment-offset = 2
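The two auto-increment settings make the nodes generate disjoint ID sequences, so inserts on both masters never collide on an auto-increment primary key. A tiny sketch of the arithmetic (Python, purely illustrative):
# Each node generates: offset, offset + increment, offset + 2*increment, ...
def next_ids(offset: int, increment: int, count: int) -> list:
    return [offset + i * increment for i in range(count)]

print(next_ids(offset=1, increment=2, count=5))  # node 1 -> [1, 3, 5, 7, 9]
print(next_ids(offset=2, increment=2, count=5))  # node 2 -> [2, 4, 6, 8, 10]
# The sequences are disjoint, so concurrent inserts on the two
# masters cannot produce duplicate auto-increment keys.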
Failure detection and automatic recovery
Failure detection strategy
#!/bin/bash
# MySQL health-check script
check_mysql_health() {
    local host=$1
    local port=$2
    local user=$3
    local password=$4

    # Connectivity check
    if ! mysqladmin -h$host -P$port -u$user -p$password ping > /dev/null 2>&1; then
        return 1
    fi

    # Replication status check
    local slave_status=$(mysql -h$host -P$port -u$user -p$password -e "SHOW SLAVE STATUS\G" 2>/dev/null)
    if [[ $slave_status == *"Slave_IO_Running: No"* ]] || [[ $slave_status == *"Slave_SQL_Running: No"* ]]; then
        return 2
    fi

    return 0
}
Automatic recovery flow
import time

class AutoFailover:
    def __init__(self, cluster_config):
        self.cluster = cluster_config
        self.check_interval = 10  # seconds

    def monitor_cluster(self):
        while True:
            for node in self.cluster.nodes:
                if not self.health_check(node):
                    self.handle_node_failure(node)
            time.sleep(self.check_interval)

    def handle_node_failure(self, failed_node):
        if failed_node.role == 'master':
            self.promote_slave_to_master()
        elif failed_node.role == 'slave':
            self.remove_failed_slave(failed_node)
Heartbeat and health checks
Heartbeat implementation
-- Create the heartbeat table
CREATE TABLE heartbeat (
    id INT PRIMARY KEY,
    server_id INT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_server_timestamp (server_id, timestamp)
);

-- Heartbeat update
INSERT INTO heartbeat (id, server_id) VALUES (1, @@server_id)
ON DUPLICATE KEY UPDATE timestamp = CURRENT_TIMESTAMP;
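Because the heartbeat row replicates like any other write, replication lag can be estimated by comparing the replica's clock with the replicated timestamp. A minimal sketch using the pymysql driver (the connection parameters are placeholders, and it assumes master and replica clocks are NTP-synced):
import pymysql

def heartbeat_lag_seconds(replica_host: str) -> float:
    """Estimate replication lag from the replicated heartbeat row."""
    conn = pymysql.connect(host=replica_host, user='monitor',
                           password='monitor_password', database='mydb')
    try:
        with conn.cursor() as cur:
            # NOW() is the replica's clock; timestamp is the master's
            # clock at the moment the heartbeat row was last written.
            cur.execute(
                "SELECT TIMESTAMPDIFF(MICROSECOND, timestamp, NOW()) / 1e6 "
                "FROM heartbeat WHERE id = 1")
            (lag,) = cur.fetchone()
            return max(0.0, float(lag))
    finally:
        conn.close()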
Health-check metrics
import shutil

class HealthChecker:
    def __init__(self, connection):
        self.conn = connection

    def check_replication_lag(self):
        """Check replication lag."""
        result = self.conn.execute("SHOW SLAVE STATUS")
        if result:
            return result['Seconds_Behind_Master']
        return None

    def check_connection_count(self):
        """Check the number of connections."""
        result = self.conn.execute("SHOW STATUS LIKE 'Threads_connected'")
        return int(result['Value'])

    def check_disk_space(self):
        """Check free disk space under the data directory."""
        result = self.conn.execute("SHOW VARIABLES LIKE 'datadir'")
        datadir = result['Value']
        return shutil.disk_usage(datadir).free
RTO and RPO metrics
Optimizing RTO (Recovery Time Objective)
# Example MHA configuration
[server default]
manager_workdir=/var/log/masterha/app1
manager_log=/var/log/masterha/app1/manager.log
remote_workdir=/var/log/masterha/app1
ssh_user=root
repl_user=repl
repl_password=password
ping_interval=3
shutdown_script=/script/masterha/power_manager
master_ip_failover_script=/script/masterha/master_ip_failover

[server1]
hostname=192.168.1.10
candidate_master=1
check_repl_delay=0

[server2]
hostname=192.168.1.11
candidate_master=1
check_repl_delay=0
Guaranteeing RPO (Recovery Point Objective)
-- Semi-synchronous replication
-- On the master
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';
SET GLOBAL rpl_semi_sync_master_enabled = 1;
SET GLOBAL rpl_semi_sync_master_timeout = 1000;

-- On the slave
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';
SET GLOBAL rpl_semi_sync_slave_enabled = 1;
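Semi-sync silently falls back to asynchronous replication when the timeout fires, which weakens the RPO guarantee, so it is worth checking that it is actually active. A small monitoring sketch (pymysql; credentials are placeholders) that reads the plugin's status counter:
import pymysql

def semi_sync_active(master_host: str) -> bool:
    """Return True if the master is currently running in semi-sync mode."""
    conn = pymysql.connect(host=master_host, user='monitor',
                           password='monitor_password')
    try:
        with conn.cursor() as cur:
            # Rpl_semi_sync_master_status is ON while at least one
            # semi-sync replica is acknowledging commits; it flips to
            # OFF after a fallback to asynchronous replication.
            cur.execute("SHOW STATUS LIKE 'Rpl_semi_sync_master_status'")
            row = cur.fetchone()
            return row is not None and row[1] == 'ON'
    finally:
        conn.close()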
1.3 Scalability Design
Horizontal and vertical scaling
Vertical scaling (scale up)
-- Monitoring hardware resource usage (performance_schema in MySQL 8.0)
SELECT
    VARIABLE_NAME,
    VARIABLE_VALUE
FROM performance_schema.global_status
WHERE VARIABLE_NAME IN (
    'Threads_connected',
    'Threads_running',
    'Innodb_buffer_pool_pages_free',
    'Innodb_buffer_pool_pages_total'
);
Horizontal scaling (scale out)
# Example shard router
class ShardRouter:
    def __init__(self, shard_config):
        self.shards = shard_config

    def get_shard(self, user_id):
        """Route to a shard based on the user ID."""
        shard_key = user_id % len(self.shards)
        return self.shards[shard_key]

    def get_all_shards(self):
        """Return all shards, for scatter-gather queries."""
        return self.shards
Sharding techniques
Horizontal sharding strategy
-- Shard by user ID
CREATE TABLE users_shard_0 (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;

CREATE TABLE users_shard_1 (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
Sharding middleware configuration
# Example MyCAT sharding configuration
schema:
  name: "testdb"
  checkSQLschema: false
  sqlMaxLimit: 100
  table:
    - name: "users"
      dataNode: "dn1,dn2,dn3,dn4"
      rule: "mod-long"
dataNode:
  - name: "dn1"
    dataHost: "localhost1"
    database: "db1"
  - name: "dn2"
    dataHost: "localhost1"
    database: "db2"
Read/write splitting for scale
ProxySQL configuration
-- ProxySQL configuration
INSERT INTO mysql_servers(hostgroup_id, hostname, port, weight) VALUES
(0, '192.168.1.10', 3306, 1000),  -- write hostgroup
(1, '192.168.1.11', 3306, 900),   -- read hostgroup
(1, '192.168.1.12', 3306, 900);   -- read hostgroup

INSERT INTO mysql_query_rules(rule_id, active, match_pattern, destination_hostgroup, apply) VALUES
(1, 1, '^SELECT.*', 1, 1),
(2, 1, '^INSERT|UPDATE|DELETE.*', 0, 1);
Application-level read/write splitting
@Service
public class UserService {
    @Autowired
    @Qualifier("masterDataSource")
    private DataSource masterDataSource;

    @Autowired
    @Qualifier("slaveDataSource")
    private DataSource slaveDataSource;

    @ReadOnly
    public User getUserById(Long id) {
        // Query goes to a slave
        return userRepository.findById(id);
    }

    @Transactional
    public void updateUser(User user) {
        // Update goes to the master
        userRepository.save(user);
    }
}
Data rebalancing strategies
Online data migration
class DataRebalancer:
    def __init__(self, source_shard, target_shard):
        self.source = source_shard
        self.target = target_shard

    def migrate_data(self, table_name, batch_size=1000, delete_source=True):
        """Migrate data online, one batch at a time."""
        offset = 0
        while True:
            # Read a batch from the source shard
            data = self.source.execute(f"""
                SELECT * FROM {table_name}
                LIMIT {batch_size} OFFSET {offset}
            """)
            if not data:
                break

            # Write the batch to the target shard
            self.target.bulk_insert(table_name, data)

            if delete_source:
                # Deleting shifts the remaining rows forward, so keep
                # reading from offset 0 instead of advancing it
                ids = [row['id'] for row in data]
                self.source.execute(f"""
                    DELETE FROM {table_name}
                    WHERE id IN ({','.join(map(str, ids))})
                """)
            else:
                offset += batch_size
1.4 Consistency Design
ACID properties in a cluster environment
Guaranteeing atomicity
-- Distributed transaction example: each shard runs its own XA branch,
-- and a coordinator drives the two-phase commit

-- On shard 1
XA START 'tx_001';
INSERT INTO orders_shard_1 (user_id, amount) VALUES (1001, 100.00);
XA END 'tx_001';
XA PREPARE 'tx_001';

-- On shard 2
XA START 'tx_001';
UPDATE accounts_shard_2 SET balance = balance - 100.00 WHERE user_id = 1001;
XA END 'tx_001';
XA PREPARE 'tx_001';

-- Phase 2: the coordinator commits both prepared branches
XA COMMIT 'tx_001';  -- on shard 1
XA COMMIT 'tx_001';  -- on shard 2
Maintaining consistency
class ConsistencyChecker:
    def __init__(self, cluster_nodes):
        self.nodes = cluster_nodes

    def check_data_consistency(self, table_name, columns):
        """Compare per-node checksums to verify consistency."""
        checksums = {}
        # CONCAT_WS needs an explicit column list; '*' is not valid here
        column_list = ', '.join(columns)
        for node in self.nodes:
            checksum = node.execute(f"""
                SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', {column_list})) AS UNSIGNED)) AS checksum
                FROM {table_name}
            """)
            checksums[node.id] = checksum

        # Consistent iff every node reports the same checksum
        return len(set(checksums.values())) == 1
Distributed transaction processing
Two-phase commit (2PC)
class TwoPhaseCommitCoordinator:
    def __init__(self, participants):
        self.participants = participants

    def execute_transaction(self, transaction):
        # Phase 1: prepare
        prepare_results = []
        for participant in self.participants:
            result = participant.prepare(transaction)
            prepare_results.append(result)

        # If every participant is prepared...
        if all(prepare_results):
            # Phase 2: commit
            for participant in self.participants:
                participant.commit(transaction)
            return True
        else:
            # Roll the transaction back
            for participant in self.participants:
                participant.rollback(transaction)
            return False
Three-phase commit (3PC)
class ThreePhaseCommitCoordinator:
    def execute_transaction(self, transaction):
        # Phase 1: CanCommit
        if not self.can_commit_phase(transaction):
            return False

        # Phase 2: PreCommit
        if not self.pre_commit_phase(transaction):
            self.abort_transaction(transaction)
            return False

        # Phase 3: DoCommit
        return self.do_commit_phase(transaction)
Use cases for eventual vs. strong consistency
Eventual consistency
# User activity logs can tolerate brief inconsistency
class UserActivityLogger:
    def log_activity(self, user_id, activity):
        # Write asynchronously to every replica
        for replica in self.replicas:
            replica.async_write(user_id, activity)
Strong consistency
# Financial transactions require strong consistency
class PaymentProcessor:
    def process_payment(self, from_account, to_account, amount):
        with self.distributed_lock:
            # Make sure every node applies the update synchronously
            self.sync_update_balance(from_account, -amount)
            self.sync_update_balance(to_account, amount)
Solutions for master-slave replication lag
Parallel replication tuning
-- Enable parallel replication
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL slave_parallel_workers = 8;
SET GLOBAL slave_preserve_commit_order = 1;
Handling lag with read/write splitting
@Service
public class ConsistentReadService {
    public User getUserWithConsistency(Long userId) {
        // If strong consistency is required, read from the master
        if (requireStrongConsistency()) {
            return masterRepository.findById(userId);
        }
        // Otherwise a slave read is acceptable
        return slaveRepository.findById(userId);
    }
}
Section 2: Master-Slave Mode
2.1 Applicable Scenarios
Read-heavy, write-light workloads
E-commerce product display
-- Product listing query (reads, ~90% of traffic)
SELECT p.*, c.name AS category_name, b.name AS brand_name
FROM products p
JOIN categories c ON p.category_id = c.id
JOIN brands b ON p.brand_id = b.id
WHERE p.status = 'active'
ORDER BY p.sales_count DESC
LIMIT 20;

-- Product update (writes, ~10% of traffic)
UPDATE products
SET stock_count = stock_count - 1, sales_count = sales_count + 1
WHERE id = 12345;
Content management systems
class CMSService:
    def get_articles(self, category_id, page=1, size=10):
        """Article listing query: runs on a slave."""
        return self.slave_db.query("""
            SELECT id, title, summary, author, publish_time
            FROM articles
            WHERE category_id = %s AND status = 'published'
            ORDER BY publish_time DESC
            LIMIT %s OFFSET %s
        """, [category_id, size, (page - 1) * size])

    def publish_article(self, article_data):
        """Publish an article: runs on the master."""
        return self.master_db.execute("""
            INSERT INTO articles (title, content, author_id, category_id, status)
            VALUES (%s, %s, %s, %s, 'published')
        """, article_data)
Reporting and OLAP applications
Business report queries
-- Sales report query (run on a slave to avoid loading the master)
SELECT
    DATE(order_time) AS order_date,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_sales,
    AVG(total_amount) AS avg_order_value
FROM orders
WHERE order_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
  AND status = 'completed'
GROUP BY DATE(order_time)
ORDER BY order_date DESC;
Data warehouse ETL
class ETLProcessor:
    def __init__(self):
        self.oltp_slave = MySQLConnection('slave_host')
        self.olap_warehouse = MySQLConnection('warehouse_host')

    def extract_daily_sales(self, date):
        """Extract data from an OLTP slave."""
        return self.oltp_slave.query("""
            SELECT
                product_id,
                SUM(quantity) AS total_quantity,
                SUM(amount) AS total_amount
            FROM order_items oi
            JOIN orders o ON oi.order_id = o.id
            WHERE DATE(o.order_time) = %s
            GROUP BY product_id
        """, [date])

    def load_to_warehouse(self, data):
        """Load into the data warehouse."""
        self.olap_warehouse.bulk_insert('daily_sales_fact', data)
Backup and disaster recovery
Real-time backup strategy
#!/bin/bash
# Backup script that runs against a slave
BACKUP_DIR="/backup/mysql/$(date +%Y%m%d)"
SLAVE_HOST="192.168.1.11"
SLAVE_USER="backup"
SLAVE_PASS="backup_password"

# Create the backup directory
mkdir -p $BACKUP_DIR

# Back up from the slave so the master is unaffected
mysqldump -h$SLAVE_HOST -u$SLAVE_USER -p$SLAVE_PASS \
    --single-transaction \
    --routines \
    --triggers \
    --all-databases > $BACKUP_DIR/full_backup.sql

# Compress the dump
gzip $BACKUP_DIR/full_backup.sql

# Upload to remote storage
aws s3 cp $BACKUP_DIR/full_backup.sql.gz s3://mysql-backups/$(date +%Y%m%d)/
Disaster recovery drills
import time

class DisasterRecoveryManager:
    def __init__(self):
        self.master = MySQLConnection('master_host')
        self.slave = MySQLConnection('slave_host')
        self.backup_storage = S3Storage('mysql-backups')

    def simulate_master_failure(self):
        """Simulate a master failure."""
        print("Simulating master failure...")

        # 1. Stop writes on the master
        self.master.execute("SET GLOBAL read_only = 1")

        # 2. Wait for the slave to catch up
        self.wait_for_slave_sync()

        # 3. Promote the slave to master
        self.promote_slave_to_master()

        # 4. Verify data integrity
        self.verify_data_integrity()

    def wait_for_slave_sync(self):
        """Wait until the slave has fully caught up."""
        while True:
            status = self.slave.query("SHOW SLAVE STATUS")[0]
            if status['Seconds_Behind_Master'] == 0:
                break
            time.sleep(1)
Development and test environments
Environment isolation configuration
# Docker Compose configuration
version: '3.8'
services:
  mysql-master:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root_password
      MYSQL_REPLICATION_USER: repl
      MYSQL_REPLICATION_PASSWORD: repl_password
    volumes:
      - ./master.cnf:/etc/mysql/conf.d/master.cnf
    ports:
      - "3306:3306"

  mysql-slave-dev:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root_password
    volumes:
      - ./slave.cnf:/etc/mysql/conf.d/slave.cnf
    ports:
      - "3307:3306"
    depends_on:
      - mysql-master

  mysql-slave-test:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root_password
    volumes:
      - ./slave.cnf:/etc/mysql/conf.d/slave.cnf
    ports:
      - "3308:3306"
    depends_on:
      - mysql-master
2.2 How It Works
Binary log (binlog) replication
Binlog format configuration
# Master configuration
[mysqld]
# Unique server ID
server-id = 1

# Enable the binary log
log-bin = mysql-bin
binlog-format = ROW  # ROW format is recommended

# Binary log retention
binlog_expire_logs_seconds = 604800  # 7 days

# Durability
sync_binlog = 1  # flush the binlog to disk on every commit

# GTID settings
gtid-mode = ON
enforce-gtid-consistency = ON
Binlog event types
class BinlogEventTypes:
    """Binary log event types."""
    # Transaction-related
    GTID_LOG_EVENT = "GTID event"
    QUERY_EVENT = "SQL statement event"

    # Data changes
    WRITE_ROWS_EVENT = "insert-rows event"
    UPDATE_ROWS_EVENT = "update-rows event"
    DELETE_ROWS_EVENT = "delete-rows event"

    # Transaction control
    XID_EVENT = "transaction commit event"
    BEGIN_LOAD_QUERY_EVENT = "LOAD DATA start event"

    def parse_binlog_event(self, event):
        """Dispatch on the binlog event type."""
        if event.event_type == self.WRITE_ROWS_EVENT:
            return self.parse_insert_event(event)
        elif event.event_type == self.UPDATE_ROWS_EVENT:
            return self.parse_update_event(event)
        elif event.event_type == self.DELETE_ROWS_EVENT:
            return self.parse_delete_event(event)
How the replication threads work
-- Inspect replication status
SHOW SLAVE STATUS\G

-- Key fields explained
/*
Slave_IO_State: state of the slave's I/O thread
Master_Log_File: the master binlog file currently being read
Read_Master_Log_Pos: read position within that file
Relay_Log_File: current relay log file
Relay_Log_Pos: position within the relay log
Slave_IO_Running: whether the I/O thread is running
Slave_SQL_Running: whether the SQL thread is running
Seconds_Behind_Master: replication lag in seconds
*/
How GTID (global transaction identifiers) works
GTID structure
-- GTID format: source_id:transaction_id
-- Example: 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5

-- Inspect GTID state
SHOW MASTER STATUS;
SHOW SLAVE STATUS;

-- Inspect the executed and purged GTID sets
SELECT @@GLOBAL.gtid_executed;
SELECT @@GLOBAL.gtid_purged;
GTID-based replication setup
# Master configuration
[mysqld]
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON

# Slave configuration
[mysqld]
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
read-only = ON

-- Establish replication
CHANGE MASTER TO
    MASTER_HOST = '192.168.1.10',
    MASTER_USER = 'repl',
    MASTER_PASSWORD = 'repl_password',
    MASTER_AUTO_POSITION = 1;  -- position automatically via GTID

START SLAVE;
GTID-based failure recovery
class GTIDRecoveryManager:
    def __init__(self, master_conn, slave_conn):
        self.master = master_conn
        self.slave = slave_conn

    def recover_from_gtid_gap(self):
        """Recover from a GTID gap."""
        # 1. Fetch GTID state from the master and the slave
        master_gtid = self.master.query("SELECT @@GLOBAL.gtid_executed")[0][0]
        slave_gtid = self.slave.query("SELECT @@GLOBAL.gtid_executed")[0][0]

        # 2. Compute the missing GTIDs
        missing_gtids = self.calculate_missing_gtids(master_gtid, slave_gtid)

        # 3. Skip the missing GTIDs (only if the data is known to be consistent)
        for gtid in missing_gtids:
            self.slave.execute(f"SET GTID_NEXT='{gtid}'")
            self.slave.execute("BEGIN; COMMIT;")
        self.slave.execute("SET GTID_NEXT='AUTOMATIC'")

        # 4. Restart replication
        self.slave.execute("START SLAVE")
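The calculate_missing_gtids helper above is left undefined; one simple way to implement it is to let MySQL do the set arithmetic with its built-in GTID_SUBTRACT() function. A hedged sketch, assuming the same query interface as the class above:
def calculate_missing_gtids(self, master_gtid: str, slave_gtid: str) -> list:
    """Individual GTIDs executed on the master but missing on the slave."""
    # GTID_SUBTRACT(a, b) returns the members of set a that are absent from b
    diff = self.master.query(
        f"SELECT GTID_SUBTRACT('{master_gtid}', '{slave_gtid}')")[0][0]
    gtids = []
    if not diff:
        return gtids
    # Expand e.g. 'uuid:5-7' into uuid:5, uuid:6, uuid:7 so the caller can
    # inject them one at a time with SET GTID_NEXT. (Sets with several
    # intervals per UUID are not handled in this sketch.)
    for part in diff.replace('\n', '').split(','):
        uuid, _, rng = part.partition(':')
        lo, _, hi = rng.partition('-')
        for txn in range(int(lo), int(hi or lo) + 1):
            gtids.append(f"{uuid}:{txn}")
    return gtids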
The master-slave data flow
The flow in detail
A commit on the master is first written to its binary log. The slave's I/O thread connects to the master, pulls the binlog events over the network, and appends them to the local relay log. The slave's SQL thread (or, with parallel replication, a pool of applier workers) then reads the relay log and replays the events against the slave's data. See the sketch below.
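To make the two-thread pipeline concrete, here is a toy Python sketch of the flow. It only models the hand-off (binlog → relay log → apply); every name in it is illustrative, not a real MySQL API.
import queue
import threading

binlog = queue.Queue()      # stands in for the master's binlog stream
relay_log = queue.Queue()   # stands in for the slave's relay log

def io_thread():
    # Slave I/O thread: pull binlog events, append them to the relay log
    while True:
        event = binlog.get()
        relay_log.put(event)
        if event is None:          # end-of-stream marker
            break

def sql_thread(applied: list):
    # Slave SQL thread: replay relay-log events against local data
    while True:
        event = relay_log.get()
        if event is None:
            break
        applied.append(event)      # "apply" the event

applied = []
threads = [threading.Thread(target=io_thread),
           threading.Thread(target=sql_thread, args=(applied,))]
for t in threads:
    t.start()
for event in ["INSERT t1", "UPDATE t2", None]:
    binlog.put(event)
for t in threads:
    t.join()
print(applied)  # ['INSERT t1', 'UPDATE t2']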
Monitoring replication performance
-- Key metrics for monitoring replication
SELECT
    CHANNEL_NAME,
    SERVICE_STATE,
    LAST_ERROR_MESSAGE,
    LAST_ERROR_TIMESTAMP
FROM performance_schema.replication_connection_status;

SELECT
    CHANNEL_NAME,
    WORKER_ID,
    SERVICE_STATE,
    LAST_ERROR_MESSAGE,
    LAST_APPLIED_TRANSACTION
FROM performance_schema.replication_applier_status_by_worker;
Using replication filters
Database-level filters
# Slave configuration file
[mysqld]
# Replicate only these databases
replicate-do-db = production_db
replicate-do-db = analytics_db

# Ignore these databases
replicate-ignore-db = test_db
replicate-ignore-db = temp_db
Table-level filters
# Table-level replication filters
[mysqld]
# Replicate only these tables
replicate-do-table = production_db.users
replicate-do-table = production_db.orders

# Ignore these tables
replicate-ignore-table = production_db.logs
replicate-ignore-table = production_db.sessions

# Wildcard filters
replicate-wild-do-table = production_db.user_%
replicate-wild-ignore-table = %.temp_%
Managing filters dynamically
class ReplicationFilterManager:
    def __init__(self, slave_connection):
        self.slave = slave_connection

    def add_table_filter(self, database, table, filter_type='do'):
        """Add a table filter at runtime."""
        if filter_type == 'do':
            filter_name = 'REPLICATE_DO_TABLE'
        else:
            filter_name = 'REPLICATE_IGNORE_TABLE'

        # The SQL thread must be stopped before filters can change
        self.slave.execute("STOP SLAVE SQL_THREAD")

        # Apply the filter (table names are unquoted in this syntax)
        self.slave.execute(
            f"CHANGE REPLICATION FILTER {filter_name} = ({database}.{table})")

        # Restart the SQL thread
        self.slave.execute("START SLAVE SQL_THREAD")

    def show_current_filters(self):
        """Show the current filter settings."""
        return self.slave.query("SHOW SLAVE STATUS")[0]
2.3 Parallel Replication
Group-commit-based parallel replication
Group commit mechanism
# Group-commit parameters
[mysqld]
binlog_group_commit_sync_delay = 1000  # microseconds
binlog_group_commit_sync_no_delay_count = 10

# Parallel replication parameters
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 8
slave_preserve_commit_order = ON
How parallel replication works
class ParallelReplicationCoordinator:
    def __init__(self, worker_count=8):
        self.workers = [ReplicationWorker(i) for i in range(worker_count)]
        self.coordinator_thread = None

    def distribute_transactions(self, relay_log_events):
        """Distribute transactions across the worker threads."""
        for event in relay_log_events:
            if event.type == 'GTID_LOG_EVENT':
                # Assign a worker based on the logical clock
                worker_id = self.calculate_worker_id(event)
                self.workers[worker_id].add_transaction(event)

    def calculate_worker_id(self, gtid_event):
        """Pick a worker from the transaction's logical clock."""
        # Transactions in the same logical-clock group can run in parallel
        logical_clock = gtid_event.logical_timestamp
        return logical_clock % len(self.workers)
Multi-threaded replication modes
DATABASE-level parallelism
# Database-level parallel replication
[mysqld]
slave_parallel_type = DATABASE
slave_parallel_workers = 4

# One worker per database
# Suited to multi-database environments
LOGICAL_CLOCK-level parallelism
# Logical-clock parallel replication
[mysqld]
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 8
slave_preserve_commit_order = ON

# Parallelism based on each transaction's logical clock
# Finer-grained control than DATABASE mode
Performance monitoring script
class ParallelReplicationMonitor:
    def __init__(self, connection):
        self.conn = connection

    def monitor_worker_status(self):
        """Monitor the state of each applier worker."""
        workers = self.conn.query("""
            SELECT
                WORKER_ID,
                SERVICE_STATE,
                LAST_ERROR_MESSAGE,
                LAST_APPLIED_TRANSACTION,
                APPLYING_TRANSACTION
            FROM performance_schema.replication_applier_status_by_worker
        """)

        for worker in workers:
            print(f"Worker {worker['WORKER_ID']}: {worker['SERVICE_STATE']}")
            if worker['LAST_ERROR_MESSAGE']:
                print(f"  Error: {worker['LAST_ERROR_MESSAGE']}")

    def get_replication_lag_by_worker(self):
        """Get per-worker replication lag."""
        return self.conn.query("""
            SELECT
                WORKER_ID,
                LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,
                LAST_APPLIED_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP,
                TIMESTAMPDIFF(MICROSECOND,
                    LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,
                    NOW(6)) / 1000000 AS lag_seconds
            FROM performance_schema.replication_applier_status_by_worker
        """)
WRITESET parallel replication
WRITESET configuration
# WRITESET parallel replication (MySQL 8.0)
[mysqld]
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 8
binlog_transaction_dependency_tracking = WRITESET
transaction_write_set_extraction = XXHASH64
Conflict detection
class WriteSetConflictDetector:
    def __init__(self):
        self.write_sets = {}  # write set per transaction

    def detect_conflict(self, transaction1, transaction2):
        """Check whether two transactions have a write conflict."""
        writeset1 = self.extract_writeset(transaction1)
        writeset2 = self.extract_writeset(transaction2)

        # Conflict iff the write sets intersect
        return bool(writeset1.intersection(writeset2))

    def extract_writeset(self, transaction):
        """Extract a transaction's write set."""
        writeset = set()
        for statement in transaction.statements:
            if statement.type in ['INSERT', 'UPDATE', 'DELETE']:
                # Record the table plus each touched primary-key value
                table_key = f"{statement.database}.{statement.table}"
                for pk_value in statement.primary_key_values:
                    writeset.add(f"{table_key}#{pk_value}")
        return writeset
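A quick usage sketch of the detector above, with throwaway stand-ins for the transaction and statement objects (the real write-set hashes are computed inside the server; this only illustrates the intersection test):
from types import SimpleNamespace

stmt = lambda db, tbl, pks: SimpleNamespace(
    type='UPDATE', database=db, table=tbl, primary_key_values=pks)

tx_a = SimpleNamespace(statements=[stmt('shop', 'orders', [1, 2])])
tx_b = SimpleNamespace(statements=[stmt('shop', 'orders', [2, 3])])
tx_c = SimpleNamespace(statements=[stmt('shop', 'users', [7])])

detector = WriteSetConflictDetector()
print(detector.detect_conflict(tx_a, tx_b))  # True  - both touch orders#2
print(detector.detect_conflict(tx_a, tx_c))  # False - disjoint rows, can run in parallel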
Tuning parallel replication performance
Parameter tuning
# Parallel replication tuning parameters
[mysqld]
# Number of worker threads (roughly the CPU core count)
slave_parallel_workers = 16

# Preserve commit order
slave_preserve_commit_order = ON

# Checkpoint interval
slave_checkpoint_period = 300

# Maximum size of the pending-event queue
slave_pending_jobs_size_max = 134217728  # 128MB
监控和调优脚本
#!/bin/bash
# 并行复制性能监控脚本echo "=== 并行复制状态监控 ==="mysql -e "
SELECT WORKER_ID,SERVICE_STATE,LAST_APPLIED_TRANSACTION_RETRIES_COUNT,LAST_APPLIED_TRANSACTION_START_APPLY_TIMESTAMP,LAST_APPLIED_TRANSACTION_END_APPLY_TIMESTAMP
FROM performance_schema.replication_applier_status_by_worker;
"echo "=== 复制延迟统计 ==="mysql -e "
SELECT AVG(TIMESTAMPDIFF(MICROSECOND,LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,LAST_APPLIED_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) / 1000000) AS avg_lag_seconds,MAX(TIMESTAMPDIFF(MICROSECOND,LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP,LAST_APPLIED_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) / 1000000) AS max_lag_seconds
FROM performance_schema.replication_applier_status_by_worker
WHERE LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP IS NOT NULL;
"
2.4 Read/Write Splitting
ProxySQL-based approach
Installing ProxySQL
# Install ProxySQL
yum install -y proxysql

# Start ProxySQL
systemctl start proxysql
systemctl enable proxysql
Base configuration
-- Connect to the ProxySQL admin interface
mysql -u admin -padmin -h 127.0.0.1 -P6032

-- Register the MySQL servers
INSERT INTO mysql_servers(hostgroup_id, hostname, port, weight, comment) VALUES
(0, '192.168.1.10', 3306, 1000, 'Master'),
(1, '192.168.1.11', 3306, 900, 'Slave1'),
(1, '192.168.1.12', 3306, 900, 'Slave2');

-- Configure users
INSERT INTO mysql_users(username, password, default_hostgroup) VALUES
('app_user', 'app_password', 0);

-- Configure query routing rules
INSERT INTO mysql_query_rules(rule_id, active, match_pattern, destination_hostgroup, apply) VALUES
(1, 1, '^SELECT.*FOR UPDATE', 0, 1),
(2, 1, '^SELECT.*', 1, 1),
(3, 1, '^INSERT|UPDATE|DELETE.*', 0, 1);

-- Load the configuration into the runtime
LOAD MYSQL SERVERS TO RUNTIME;
LOAD MYSQL USERS TO RUNTIME;
LOAD MYSQL QUERY RULES TO RUNTIME;

-- Persist the configuration to disk
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL USERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
Advanced routing rules
-- Regex-based routing
INSERT INTO mysql_query_rules(rule_id, active, match_pattern, destination_hostgroup, apply, comment) VALUES
(10, 1, '^SELECT.*FROM users WHERE.*', 1, 1, 'user lookups go to slaves'),
(11, 1, '^SELECT.*FROM orders WHERE.*created_at.*', 1, 1, 'historical order queries go to slaves'),
(12, 1, '^SELECT COUNT\(\*\).*', 1, 1, 'count queries go to slaves'),
(13, 1, '^SELECT.*JOIN.*', 1, 1, 'complex queries go to slaves');

-- Per-user routing rules
INSERT INTO mysql_query_rules(rule_id, active, username, match_pattern, destination_hostgroup, apply) VALUES
(20, 1, 'report_user', '^SELECT.*', 1, 1),  -- reporting user may only read from slaves
(21, 1, 'admin_user', '.*', 0, 1);          -- admin user always goes to the master
ProxySQL monitoring
class ProxySQLMonitor:
    def __init__(self, admin_connection):
        self.admin_conn = admin_connection

    def get_connection_stats(self):
        """Connection pool statistics."""
        return self.admin_conn.query("""
            SELECT
                hostgroup,
                srv_host,
                srv_port,
                status,
                ConnUsed,
                ConnFree,
                ConnOK,
                ConnERR,
                Queries,
                Bytes_data_sent,
                Bytes_data_recv
            FROM stats_mysql_connection_pool
        """)

    def get_query_stats(self):
        """Query digest statistics."""
        return self.admin_conn.query("""
            SELECT
                hostgroup,
                schemaname,
                username,
                digest_text,
                count_star,
                first_seen,
                last_seen,
                sum_time,
                min_time,
                max_time
            FROM stats_mysql_query_digest
            ORDER BY sum_time DESC
            LIMIT 20
        """)
MySQL Router-based approach
MySQL Router configuration
# /etc/mysqlrouter/mysqlrouter.conf
[DEFAULT]
logging_folder = /var/log/mysqlrouter
plugin_folder = /usr/lib64/mysqlrouter
config_folder = /etc/mysqlrouter

[logger]
level = INFO

# Read/write split routing
[routing:primary]
bind_address = 0.0.0.0
bind_port = 7001
destinations = 192.168.1.10:3306
routing_strategy = first-available
mode = read-write

[routing:secondary]
bind_address = 0.0.0.0
bind_port = 7002
destinations = 192.168.1.11:3306,192.168.1.12:3306
routing_strategy = round-robin
mode = read-only
Application integration
@Configuration
public class DatabaseConfig {
    @Bean
    @Primary
    public DataSource writeDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://mysql-router:7001/mydb");
        config.setUsername("app_user");
        config.setPassword("app_password");
        config.setMaximumPoolSize(20);
        return new HikariDataSource(config);
    }

    @Bean
    public DataSource readDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://mysql-router:7002/mydb");
        config.setUsername("app_user");
        config.setPassword("app_password");
        config.setMaximumPoolSize(50);
        return new HikariDataSource(config);
    }
}
Application-level read/write splitting
Spring Boot implementation
@Component
public class DatabaseRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DatabaseContextHolder.getDbType();
    }
}

@Component
public class DatabaseContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public static void setDbType(String dbType) {
        contextHolder.set(dbType);
    }

    public static String getDbType() {
        return contextHolder.get();
    }

    public static void clearDbType() {
        contextHolder.remove();
    }
}

@Aspect
@Component
public class ReadWriteSplitAspect {
    @Before("@annotation(readOnly)")
    public void setReadDataSourceType(ReadOnly readOnly) {
        DatabaseContextHolder.setDbType("read");
    }

    @Before("@annotation(transactional)")
    public void setWriteDataSourceType(Transactional transactional) {
        if (!transactional.readOnly()) {
            DatabaseContextHolder.setDbType("write");
        }
    }

    @After("@annotation(ReadOnly) || @annotation(Transactional)")
    public void clearDataSourceType() {
        DatabaseContextHolder.clearDbType();
    }
}
Custom annotation
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

@Service
public class UserService {
    @ReadOnly
    public List<User> findAllUsers() {
        // Routed to a slave
        return userRepository.findAll();
    }

    @Transactional
    public User saveUser(User user) {
        // Routed to the master
        return userRepository.save(user);
    }
}
Load balancing strategies
Round robin
class RoundRobinLoadBalancer:
    def __init__(self, servers):
        self.servers = servers
        self.current = 0

    def get_server(self):
        server = self.servers[self.current]
        self.current = (self.current + 1) % len(self.servers)
        return server
Weighted round robin
class WeightedRoundRobinLoadBalancer:
    def __init__(self, servers_with_weights):
        self.servers = []
        for server, weight in servers_with_weights:
            self.servers.extend([server] * weight)
        self.current = 0

    def get_server(self):
        server = self.servers[self.current]
        self.current = (self.current + 1) % len(self.servers)
        return server
Least connections
class LeastConnectionsLoadBalancer:
    def __init__(self, servers):
        self.servers = {server: 0 for server in servers}

    def get_server(self):
        # Pick the server with the fewest active connections
        return min(self.servers.keys(), key=lambda s: self.servers[s])

    def add_connection(self, server):
        self.servers[server] += 1

    def remove_connection(self, server):
        self.servers[server] -= 1
Handling consistent reads
Forcing master reads
@Service
public class OrderService {
    @Transactional
    public Order createOrder(OrderRequest request) {
        // Create the order (write to the master)
        Order order = orderRepository.save(new Order(request));

        // Immediately re-read the new order (forced master read)
        return this.getOrderFromMaster(order.getId());
    }

    @ReadFromMaster // custom annotation that forces a master read
    public Order getOrderFromMaster(Long orderId) {
        return orderRepository.findById(orderId);
    }
}
Lag-compensation strategy
class ConsistencyManager:
    def __init__(self, master_conn, slave_conn):
        self.master = master_conn
        self.slave = slave_conn
        self.lag_threshold = 1  # 1-second lag threshold

    def read_with_consistency_check(self, query, params):
        """Read with a replication-lag check."""
        # 1. Check replication lag
        lag = self.get_replication_lag()

        if lag > self.lag_threshold:
            # Lag is too high: read from the master
            return self.master.query(query, params)
        else:
            # Lag is acceptable: read from the slave
            return self.slave.query(query, params)

    def get_replication_lag(self):
        """Fetch the current replication lag."""
        result = self.slave.query("SHOW SLAVE STATUS")
        return result[0]['Seconds_Behind_Master'] or 0
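With GTID enabled, a session can also get read-your-writes without pinning everything to the master: record the GTID set after the write, then ask the replica to wait until it has applied that set. A sketch using MySQL's built-in WAIT_FOR_EXECUTED_GTID_SET() function (the connection objects and their query interface are placeholders):
def read_your_writes(master, replica, query, params, timeout=1):
    """Replica read that is guaranteed to see this session's last write."""
    # Snapshot of everything the master has committed so far
    gtid_set = master.query("SELECT @@GLOBAL.gtid_executed")[0][0]

    # Block (up to `timeout` seconds) until the replica has applied it.
    # WAIT_FOR_EXECUTED_GTID_SET returns 0 when the set is applied,
    # 1 on timeout.
    timed_out = replica.query(
        f"SELECT WAIT_FOR_EXECUTED_GTID_SET('{gtid_set}', {timeout})")[0][0]

    if timed_out:
        # Replica is too far behind; fall back to the master
        return master.query(query, params)
    return replica.query(query, params)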
Read/write splitting middleware comparison
| Aspect | ProxySQL | MySQL Router | Application-level |
|---|---|---|---|
| Deployment complexity | Medium | Simple | Complex |
| Performance overhead | Low | Low | Lowest |
| Feature richness | High | Medium | High |
| Operational complexity | Medium | Low | High |
| Failover | Automatic | Automatic | Must be implemented |
| Query caching | Supported | Not supported | Can be implemented |
| Connection pooling | Built in | Built in | Must be configured |
Section 3: Dual-Master Mode
3.1 Applicable Scenarios
Businesses with strict high-availability requirements
Financial trading systems
class FinancialTransactionSystem:
    def __init__(self):
        self.primary_db = MySQLConnection('primary_host')
        self.secondary_db = MySQLConnection('secondary_host')
        self.current_master = 'primary'

    def process_payment(self, from_account, to_account, amount):
        """Process a payment; high availability is required."""
        try:
            master_conn = self.get_current_master()

            with master_conn.transaction():
                # Check the source account balance
                balance = master_conn.query(
                    "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                    [from_account])[0]['balance']

                if balance < amount:
                    raise InsufficientFundsError()

                # Perform the transfer
                master_conn.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    [amount, from_account])
                master_conn.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    [amount, to_account])

                # Record the transaction
                master_conn.execute(
                    "INSERT INTO transactions (from_account, to_account, amount, status) "
                    "VALUES (%s, %s, %s, 'completed')",
                    [from_account, to_account, amount])
        except DatabaseConnectionError:
            # Master failure: fail over to the secondary and retry
            self.failover_to_secondary()
            return self.process_payment(from_account, to_account, amount)
Core e-commerce flows
-- Order-processing stored procedure for a high-availability setup
DELIMITER $$
CREATE PROCEDURE ProcessOrder(
    IN p_user_id INT,
    IN p_product_id INT,
    IN p_quantity INT,
    OUT p_order_id INT,
    OUT p_result_code INT
)
BEGIN
    DECLARE v_stock INT DEFAULT 0;
    DECLARE v_price DECIMAL(10,2) DEFAULT 0;
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        SET p_result_code = -1;
    END;

    START TRANSACTION;

    -- Lock the row and check stock
    SELECT stock, price INTO v_stock, v_price
    FROM products WHERE id = p_product_id FOR UPDATE;

    IF v_stock < p_quantity THEN
        SET p_result_code = -2;  -- out of stock
        ROLLBACK;
    ELSE
        -- Decrement stock
        UPDATE products SET stock = stock - p_quantity WHERE id = p_product_id;

        -- Create the order
        INSERT INTO orders (user_id, total_amount, status, created_at)
        VALUES (p_user_id, v_price * p_quantity, 'pending', NOW());

        SET p_order_id = LAST_INSERT_ID();

        -- Create the order line item
        INSERT INTO order_items (order_id, product_id, quantity, price)
        VALUES (p_order_id, p_product_id, p_quantity, v_price);

        SET p_result_code = 0;  -- success
        COMMIT;
    END IF;
END$$
DELIMITER ;
Active-active data center deployments
Cross-region active-active architecture
# Active-active data center configuration
datacenter_config:
  beijing:
    mysql_master:
      host: "10.1.1.10"
      port: 3306
      server_id: 1
      auto_increment_offset: 1
      auto_increment_increment: 2
    mysql_slave:
      host: "10.1.1.11"
      port: 3306
      server_id: 2
  shanghai:
    mysql_master:
      host: "10.2.1.10"
      port: 3306
      server_id: 3
      auto_increment_offset: 2
      auto_increment_increment: 2
    mysql_slave:
      host: "10.2.1.11"
      port: 3306
      server_id: 4

# Network configuration
network:
  beijing_to_shanghai:
    bandwidth: "100Mbps"
    latency: "30ms"
  shanghai_to_beijing:
    bandwidth: "100Mbps"
    latency: "30ms"
Geographic routing strategy
class GeographicRoutingManager:
    def __init__(self):
        self.datacenters = {
            'beijing': {
                'master': 'mysql://10.1.1.10:3306/db',
                'slave': 'mysql://10.1.1.11:3306/db'
            },
            'shanghai': {
                'master': 'mysql://10.2.1.10:3306/db',
                'slave': 'mysql://10.2.1.11:3306/db'
            }
        }

    def route_request(self, client_ip, operation_type):
        """Route a request based on client IP and operation type."""
        client_region = self.get_client_region(client_ip)

        if operation_type == 'write':
            # Writes go to the local master
            return self.datacenters[client_region]['master']
        else:
            # Reads go to the local slave
            return self.datacenters[client_region]['slave']

    def get_client_region(self, client_ip):
        """Determine the region from the client IP."""
        if client_ip.startswith('10.1.'):
            return 'beijing'
        elif client_ip.startswith('10.2.'):
            return 'shanghai'
        else:
            return 'beijing'  # default region
Systems that must avoid single points of failure
Architecture for business-critical systems
class CriticalSystemArchitecture:
    def __init__(self):
        self.master_nodes = [
            MySQLNode('master1', '192.168.1.10', is_master=True),
            MySQLNode('master2', '192.168.1.11', is_master=True)
        ]
        self.current_active_master = 0
        self.health_checker = HealthChecker()

    def execute_critical_operation(self, operation):
        """Execute a business-critical operation with retries."""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                active_master = self.get_active_master()
                result = active_master.execute(operation)

                # Verify the result
                if self.verify_operation_result(result):
                    return result
            except DatabaseError as e:
                self.handle_master_failure(e)
                if attempt == max_retries - 1:
                    raise CriticalOperationFailedException()

    def get_active_master(self):
        """Return the currently active master."""
        active_master = self.master_nodes[self.current_active_master]
        if not self.health_checker.is_healthy(active_master):
            self.switch_to_backup_master()
            active_master = self.master_nodes[self.current_active_master]
        return active_master

    def switch_to_backup_master(self):
        """Switch to the standby master."""
        self.current_active_master = 1 - self.current_active_master
        print(f"Switched to backup master: {self.master_nodes[self.current_active_master].host}")
Applications needing fast failover
Automatic failover system
import time

class AutoFailoverSystem:
    def __init__(self, cluster_config):
        self.cluster = cluster_config
        self.failover_timeout = 30       # 30-second failover timeout
        self.health_check_interval = 5   # 5-second health-check interval

    def monitor_and_failover(self):
        """Monitor the master and fail over automatically."""
        while True:
            try:
                if not self.check_master_health():
                    self.execute_failover()
                time.sleep(self.health_check_interval)
            except Exception as e:
                self.log_error(f"Failover monitoring error: {e}")

    def execute_failover(self):
        """Perform the failover."""
        start_time = time.time()
        try:
            # 1. Stop application writes
            self.stop_application_writes()

            # 2. Wait for the slave to catch up
            self.wait_for_slave_sync()

            # 3. Promote the slave to master
            self.promote_slave_to_master()

            # 4. Update the application configuration
            self.update_application_config()

            # 5. Resume application writes
            self.resume_application_writes()

            elapsed_time = time.time() - start_time
            print(f"Failover completed in {elapsed_time:.2f} seconds")
        except Exception as e:
            self.rollback_failover()
            raise FailoverException(f"Failover failed: {e}")
3.2 MMM Architecture
Multi-Master Replication Manager in detail
MMM components
# Install MMM
yum install -y mysql-mmm-agent mysql-mmm-monitor

# Configuration file layout
/etc/mysql-mmm/
├── mmm_agent.conf   # agent configuration
├── mmm_mon.conf     # monitor configuration
└── mmm_common.conf  # shared configuration
Shared MMM configuration
# /etc/mysql-mmm/mmm_common.conf
active_master_role writer

<host default>
    cluster_interface       eth0
    pid_path                /var/run/mysql-mmm/mmm_agentd.pid
    bin_path                /usr/libexec/mysql-mmm/
    replication_user        replication
    replication_password    repl_password
    agent_user              mmm_agent
    agent_password          agent_password
</host>

<host db1>
    ip      192.168.1.10
    mode    master
    peer    db2
</host>

<host db2>
    ip      192.168.1.11
    mode    master
    peer    db1
</host>

<host db3>
    ip      192.168.1.12
    mode    slave
</host>

<host db4>
    ip      192.168.1.13
    mode    slave
</host>

<role writer>
    hosts   db1, db2
    ips     192.168.1.100
    mode    exclusive
</role>

<role reader>
    hosts   db1, db2, db3, db4
    ips     192.168.1.101, 192.168.1.102
    mode    balanced
</role>
MMM monitor configuration
# /etc/mysql-mmm/mmm_mon.conf
include mmm_common.conf

<monitor>
    ip                  127.0.0.1
    pid_path            /var/run/mysql-mmm/mmm_mond.pid
    bin_path            /usr/libexec/mysql-mmm
    status_path         /var/lib/mysql-mmm/mmm_mond.status
    ping_ips            192.168.1.10,192.168.1.11,192.168.1.12,192.168.1.13
    auto_set_online     10

    # Failure-detection parameters
    ping_interval       1
    ping_timeout        3
    flap_detection      true
    flap_duration       3600
    flap_count          3
</monitor>

<host default>
    monitor_user        mmm_monitor
    monitor_password    monitor_password
</host>
How VIP failover works
VIP management script
#!/bin/bash
# VIP failover script

VIP="192.168.1.100"
INTERFACE="eth0:1"
NETMASK="255.255.255.0"

case "$1" in
    start)
        echo "Adding VIP $VIP to $INTERFACE"
        /sbin/ifconfig $INTERFACE $VIP netmask $NETMASK up
        /sbin/arping -q -A -c 1 -I eth0 $VIP
        ;;
    stop)
        echo "Removing VIP $VIP from $INTERFACE"
        /sbin/ifconfig $INTERFACE down
        ;;
    status)
        /sbin/ifconfig $INTERFACE | grep $VIP > /dev/null 2>&1
        if [ $? -eq 0 ]; then
            echo "VIP $VIP is active"
            exit 0
        else
            echo "VIP $VIP is not active"
            exit 1
        fi
        ;;
    *)
        echo "Usage: $0 {start|stop|status}"
        exit 1
        ;;
esac
VIP switchover monitoring
import subprocess

class VIPManager:
    def __init__(self, vip_config):
        self.vip = vip_config['ip']
        self.interface = vip_config['interface']
        self.current_master = None

    def check_vip_status(self):
        """Check whether the VIP is bound locally."""
        try:
            result = subprocess.run(['ip', 'addr', 'show', self.interface],
                                    capture_output=True, text=True)
            return self.vip in result.stdout
        except Exception as e:
            print(f"Error checking VIP status: {e}")
            return False

    def move_vip_to_host(self, target_host):
        """Move the VIP to the target host."""
        try:
            # Remove the VIP from the current host
            if self.current_master:
                self.remove_vip_from_host(self.current_master)

            # Add the VIP on the target host
            self.add_vip_to_host(target_host)

            # Send gratuitous ARP
            self.send_gratuitous_arp(target_host)

            self.current_master = target_host
            print(f"VIP {self.vip} moved to {target_host}")
        except Exception as e:
            print(f"Error moving VIP: {e}")
            raise VIPMigrationException()

    def send_gratuitous_arp(self, host):
        """Send gratuitous ARP to refresh the network's ARP tables."""
        subprocess.run(['ssh', host,
                        f'arping -q -A -c 3 -I eth0 {self.vip}'])
Preventing split-brain
Split-brain detection
class SplitBrainDetector:
    def __init__(self, cluster_nodes):
        self.nodes = cluster_nodes
        self.quorum_size = len(cluster_nodes) // 2 + 1

    def detect_split_brain(self):
        """Detect a split-brain condition."""
        active_masters = []
        for node in self.nodes:
            if self.is_node_acting_as_master(node):
                active_masters.append(node)

        if len(active_masters) > 1:
            return True, active_masters
        return False, []

    def is_node_acting_as_master(self, node):
        """Check whether a node is acting as a master."""
        try:
            result = node.query("SHOW MASTER STATUS")
            return len(result) > 0
        except:
            return False

    def resolve_split_brain(self, conflicting_masters):
        """Resolve a split-brain condition."""
        # Keep the node with the most advanced GTID set as master
        best_master = self.select_best_master(conflicting_masters)

        for master in conflicting_masters:
            if master != best_master:
                # Demote every other node to slave
                self.demote_to_slave(master, best_master)

    def select_best_master(self, masters):
        """Select the best master."""
        best_master = None
        latest_gtid = None
        for master in masters:
            gtid_executed = master.query("SELECT @@GLOBAL.gtid_executed")[0][0]
            if self.compare_gtid(gtid_executed, latest_gtid) > 0:
                latest_gtid = gtid_executed
                best_master = master
        return best_master
Arbitration mechanism
import requests

class ArbitrationManager:
    def __init__(self, arbitrator_hosts):
        self.arbitrators = arbitrator_hosts
        self.quorum_threshold = len(arbitrator_hosts) // 2 + 1

    def get_cluster_consensus(self, proposed_master):
        """Collect votes from the arbitrators."""
        votes = 0
        for arbitrator in self.arbitrators:
            if self.request_vote(arbitrator, proposed_master):
                votes += 1
        return votes >= self.quorum_threshold

    def request_vote(self, arbitrator, candidate):
        """Request a vote from one arbitrator."""
        try:
            response = requests.post(
                f"http://{arbitrator}/vote",
                json={'candidate': candidate.host},
                timeout=5)
            return response.json().get('vote') == 'yes'
        except:
            return False
Monitoring and management
MMM status monitoring
#!/bin/bash
# MMM status monitoring script

echo "=== MMM cluster status ==="
mmm_control show

echo "=== Host check details ==="
mmm_control checks all

echo "=== Role assignments ==="
mmm_control roles

echo "=== Replication status check ==="
for host in db1 db2 db3 db4; do
    echo "--- $host ---"
    mysql -h$host -e "SHOW SLAVE STATUS\G" | grep -E "(Slave_IO_Running|Slave_SQL_Running|Seconds_Behind_Master)"
done
Automated management script
import subprocess

class MMManagementSystem:
    def __init__(self):
        self.mmm_cmd = "/usr/bin/mmm_control"

    def get_cluster_status(self):
        """Fetch cluster status."""
        result = subprocess.run([self.mmm_cmd, 'show'],
                                capture_output=True, text=True)
        return self.parse_cluster_status(result.stdout)

    def set_host_online(self, hostname):
        """Bring a host online."""
        subprocess.run([self.mmm_cmd, 'set_online', hostname])
        print(f"Host {hostname} set online")

    def set_host_offline(self, hostname):
        """Take a host offline."""
        subprocess.run([self.mmm_cmd, 'set_offline', hostname])
        print(f"Host {hostname} set offline")

    def move_role(self, role, target_host):
        """Move a role to the given host."""
        subprocess.run([self.mmm_cmd, 'move_role', role, target_host])
        print(f"Role {role} moved to {target_host}")

    def perform_health_check(self):
        """Run health checks on every host."""
        hosts = ['db1', 'db2', 'db3', 'db4']
        for host in hosts:
            result = subprocess.run([self.mmm_cmd, 'checks', host],
                                    capture_output=True, text=True)
            if 'OK' not in result.stdout:
                print(f"Health check failed for {host}: {result.stdout}")
                self.handle_unhealthy_host(host)

    def handle_unhealthy_host(self, host):
        """Handle an unhealthy host (implementation-specific)."""
        pass
3.3 MHA Architecture
Master High Availability implementation
Installing the MHA components
# Install MHA Manager (on the management node)
yum install -y mha4mysql-manager

# Install MHA Node (on every MySQL node)
yum install -y mha4mysql-node

# Create the MHA working directory
mkdir -p /var/log/masterha/app1
MHA configuration file
# /etc/masterha/app1.cnf
[server default]
# MHA Manager working directory
manager_workdir=/var/log/masterha/app1
manager_log=/var/log/masterha/app1/manager.log

# Remote working directory
remote_workdir=/var/log/masterha/app1

# SSH user
ssh_user=root

# Replication user
repl_user=replication
repl_password=repl_password

# Monitoring user
user=mha_monitor
password=monitor_password

# Check interval
ping_interval=3

# Shutdown script
shutdown_script=/usr/local/bin/power_manager

# Failover script
master_ip_failover_script=/usr/local/bin/master_ip_failover

# Online switchover script
master_ip_online_change_script=/usr/local/bin/master_ip_online_change

# Reporting script
report_script=/usr/local/bin/send_report

[server1]
hostname=192.168.1.10
port=3306
candidate_master=1
check_repl_delay=0

[server2]
hostname=192.168.1.11
port=3306
candidate_master=1
check_repl_delay=0

[server3]
hostname=192.168.1.12
port=3306
no_master=1

[server4]
hostname=192.168.1.13
port=3306
no_master=1
Failure detection and master-slave switchover flow
How MHA detects failures
import socket
import pymysql

class MHAFailureDetector:
    def __init__(self, config):
        self.config = config
        self.ping_interval = config.get('ping_interval', 3)
        self.ping_timeout = config.get('ping_timeout', 1)

    def detect_master_failure(self, master_host):
        """Detect a master failure."""
        # 1. TCP connectivity check
        if not self.tcp_ping(master_host):
            return True

        # 2. MySQL connection check
        if not self.mysql_ping(master_host):
            return True

        # 3. Replication health check
        if not self.check_replication_health(master_host):
            return True

        return False

    def tcp_ping(self, host, port=3306):
        """TCP connectivity check."""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.ping_timeout)
            result = sock.connect_ex((host, port))
            sock.close()
            return result == 0
        except:
            return False

    def mysql_ping(self, host):
        """MySQL connection check."""
        try:
            conn = pymysql.connect(
                host=host,
                user=self.config['user'],
                password=self.config['password'],
                connect_timeout=self.ping_timeout)
            conn.ping()
            conn.close()
            return True
        except:
            return False
Automatic failover flow
class MHAFailoverManager:
    def __init__(self, cluster_config):
        self.config = cluster_config
        self.slaves = self.get_slave_list()

    def execute_failover(self, failed_master):
        """Run the failover procedure."""
        try:
            # Phase 1: pick the new master
            new_master = self.select_new_master()

            # Phase 2: apply the differential logs
            self.apply_differential_logs(new_master)

            # Phase 3: promote the new master
            self.promote_new_master(new_master)

            # Phase 4: repoint the remaining slaves
            self.reconfigure_slaves(new_master)

            # Phase 5: update the application configuration
            self.update_application_config(new_master)

            print(f"Failover completed. New master: {new_master.host}")
        except Exception as e:
            print(f"Failover failed: {e}")
            raise FailoverException()

    def select_new_master(self):
        """Choose the new master."""
        candidates = [s for s in self.slaves if s.is_candidate_master]
        if not candidates:
            candidates = self.slaves

        # Pick the slave with the most advanced replication position
        best_candidate = max(candidates,
                             key=lambda s: s.get_relay_log_position())
        return best_candidate

    def apply_differential_logs(self, new_master):
        """Apply the differential logs."""
        # Collect relay-log events from every slave
        relay_logs = {}
        for slave in self.slaves:
            relay_logs[slave.host] = slave.get_relay_log_events()

        # Find events missing on the new master and apply them
        missing_events = self.find_missing_events(relay_logs, new_master)
        for event in missing_events:
            new_master.apply_binlog_event(event)
Binary log compensation
How log compensation works
import subprocess

class BinlogCompensationManager:
    def __init__(self):
        self.mysqlbinlog_cmd = "/usr/bin/mysqlbinlog"

    def extract_differential_logs(self, failed_master, slaves):
        """Extract the differential logs."""
        differential_logs = {}

        for slave in slaves:
            # Get the GTID set the slave has already applied
            slave_position = slave.get_executed_gtid_set()

            # Extract the unapplied events from the failed master's binlog
            missing_events = self.extract_missing_events(
                failed_master, slave_position)
            differential_logs[slave.host] = missing_events

        return differential_logs

    def extract_missing_events(self, master_host, slave_gtid_set):
        """Extract missing events from the master's binlog."""
        try:
            # Use the mysqlbinlog utility
            cmd = [
                self.mysqlbinlog_cmd,
                "--read-from-remote-server",
                f"--host={master_host}",
                "--user=root",
                "--password=password",
                f"--exclude-gtids={slave_gtid_set}",
                "--base64-output=DECODE-ROWS",
                "--verbose",
                "mysql-bin.000001"  # the latest binlog file
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return self.parse_binlog_events(result.stdout)
        except Exception as e:
            print(f"Error extracting missing events: {e}")
            return []

    def apply_compensation_logs(self, target_slave, compensation_logs):
        """Apply the compensation logs."""
        for log_event in compensation_logs:
            try:
                target_slave.execute(log_event.sql_statement)
            except Exception as e:
                print(f"Error applying compensation log: {e}")
                # Record the failed event; it may need manual handling
                self.log_failed_compensation(log_event, e)
The MHA toolset
MHA command-line tools
# Verify the MHA configuration
masterha_check_ssh --conf=/etc/masterha/app1.cnf
masterha_check_repl --conf=/etc/masterha/app1.cnf

# Start MHA Manager
masterha_manager --conf=/etc/masterha/app1.cnf --remove_dead_master_conf --ignore_last_failover

# Check MHA status
masterha_check_status --conf=/etc/masterha/app1.cnf

# Manual failover
masterha_master_switch --conf=/etc/masterha/app1.cnf --master_state=dead --dead_master_host=192.168.1.10 --new_master_host=192.168.1.11

# Online master switchover
masterha_master_switch --conf=/etc/masterha/app1.cnf --master_state=alive --new_master_host=192.168.1.11 --orig_master_is_new_slave
MHA monitoring script
import os
import subprocess

class MHAMonitoringSystem:
    def __init__(self, config_file):
        self.config_file = config_file
        self.status_file = "/var/log/masterha/app1/app1.master_status"

    def get_mha_status(self):
        """Fetch MHA status."""
        try:
            result = subprocess.run(
                ['masterha_check_status', f'--conf={self.config_file}'],
                capture_output=True, text=True)
            return self.parse_status_output(result.stdout)
        except Exception as e:
            return {'status': 'error', 'message': str(e)}

    def is_mha_running(self):
        """Check whether MHA Manager is running."""
        return os.path.exists(self.status_file)

    def start_mha_manager(self):
        """Start MHA Manager."""
        cmd = ['masterha_manager',
               f'--conf={self.config_file}',
               '--remove_dead_master_conf',
               '--ignore_last_failover']
        subprocess.Popen(cmd, stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)

    def stop_mha_manager(self):
        """Stop MHA Manager."""
        subprocess.run(['masterha_stop', f'--conf={self.config_file}'])

    def perform_health_check(self):
        """Run the full health check."""
        checks = {
            'ssh_check': self.check_ssh_connectivity(),
            'replication_check': self.check_replication_status(),
            'mha_status': self.get_mha_status()
        }
        return checks

    def check_ssh_connectivity(self):
        """Check SSH connectivity."""
        result = subprocess.run(
            ['masterha_check_ssh', f'--conf={self.config_file}'],
            capture_output=True, text=True)
        return 'OK' in result.stdout

    def check_replication_status(self):
        """Check replication status."""
        result = subprocess.run(
            ['masterha_check_repl', f'--conf={self.config_file}'],
            capture_output=True, text=True)
        return 'OK' in result.stdout
3.4 Master-Standby Switchover
Manual vs. automatic switchover
Manual switchover flow
class ManualFailoverManager:
    def __init__(self, cluster_config):
        self.config = cluster_config

    def planned_switchover(self, current_master, new_master):
        """Planned master-slave switchover."""
        try:
            # 1. Pre-checks
            self.pre_switchover_checks(current_master, new_master)

            # 2. Stop application writes
            self.stop_application_writes()

            # 3. Wait for the new master to catch up
            self.wait_for_slave_sync(new_master)

            # 4. Make the current master read-only
            current_master.execute("SET GLOBAL read_only = 1")

            # 5. Promote the new master
            new_master.execute("STOP SLAVE")
            new_master.execute("RESET SLAVE ALL")
            new_master.execute("SET GLOBAL read_only = 0")

            # 6. Reconfigure the old master as a slave
            self.configure_as_slave(current_master, new_master)

            # 7. Update the application configuration
            self.update_application_config(new_master)

            # 8. Resume application writes
            self.resume_application_writes()

            print("Manual switchover completed successfully")
        except Exception as e:
            self.rollback_switchover(current_master, new_master)
            raise SwitchoverException(f"Manual switchover failed: {e}")

    def pre_switchover_checks(self, current_master, new_master):
        """Checks before the switchover."""
        # Replication must be running
        slave_status = new_master.query("SHOW SLAVE STATUS")[0]
        if slave_status['Slave_IO_Running'] != 'Yes' or slave_status['Slave_SQL_Running'] != 'Yes':
            raise Exception("Slave replication is not running properly")

        # Replication lag must be small
        if slave_status['Seconds_Behind_Master'] > 10:
            raise Exception("Slave lag is too high for switchover")

        # GTID consistency check
        master_gtid = current_master.query("SELECT @@GLOBAL.gtid_executed")[0][0]
        slave_gtid = new_master.query("SELECT @@GLOBAL.gtid_executed")[0][0]
        if not self.gtid_subset(slave_gtid, master_gtid):
            raise Exception("GTID consistency check failed")
Automatic switchover mechanism
import time

class AutomaticFailoverManager:
    def __init__(self, cluster_config):
        self.config = cluster_config
        self.failure_detector = FailureDetector()
        self.failover_timeout = 60  # 60-second failover timeout

    def monitor_and_failover(self):
        """Monitor and fail over automatically."""
        while True:
            try:
                current_master = self.get_current_master()
                if not self.failure_detector.is_master_healthy(current_master):
                    self.execute_automatic_failover(current_master)
                time.sleep(self.config.get('check_interval', 5))
            except Exception as e:
                self.log_error(f"Automatic failover monitoring error: {e}")

    def execute_automatic_failover(self, failed_master):
        """Run the automatic failover."""
        start_time = time.time()
        try:
            # 1. Confirm the master really failed
            if not self.confirm_master_failure(failed_master):
                return

            # 2. Select the new master
            new_master = self.select_best_slave()

            # 3. Apply the differential logs
            self.apply_missing_transactions(new_master)

            # 4. Promote the new master
            self.promote_slave_to_master(new_master)

            # 5. Repoint the remaining slaves
            self.reconfigure_remaining_slaves(new_master)

            # 6. Update the VIP or DNS
            self.update_service_endpoint(new_master)

            # 7. Send an alert notification
            self.send_failover_notification(failed_master, new_master)

            elapsed_time = time.time() - start_time
            print(f"Automatic failover completed in {elapsed_time:.2f} seconds")
        except Exception as e:
            self.handle_failover_failure(e)
            raise AutoFailoverException(f"Automatic failover failed: {e}")
Guaranteeing data consistency during switchover
GTID consistency checks
class ConsistencyGuard:
    def __init__(self):
        self.gtid_parser = GTIDParser()

    def ensure_data_consistency(self, old_master, new_master, slaves):
        """Ensure data consistency across the cluster."""
        # 1. Collect GTID state from every node
        gtid_status = self.collect_gtid_status([old_master, new_master] + slaves)

        # 2. Find the most complete GTID set
        complete_gtid_set = self.find_complete_gtid_set(gtid_status)

        # 3. Compensate for missing transactions
        for node in [new_master] + slaves:
            missing_gtids = self.find_missing_gtids(
                node.gtid_executed, complete_gtid_set)
            if missing_gtids:
                self.apply_missing_transactions(node, missing_gtids)

        # 4. Verify consistency
        return self.verify_consistency([new_master] + slaves)

    def collect_gtid_status(self, nodes):
        """Collect GTID state from each node."""
        gtid_status = {}
        for node in nodes:
            try:
                gtid_executed = node.query("SELECT @@GLOBAL.gtid_executed")[0][0]
                gtid_purged = node.query("SELECT @@GLOBAL.gtid_purged")[0][0]
                gtid_status[node.host] = {
                    'executed': gtid_executed,
                    'purged': gtid_purged,
                    'available': self.subtract_gtid_sets(gtid_executed, gtid_purged)
                }
            except Exception as e:
                print(f"Error collecting GTID status from {node.host}: {e}")
                gtid_status[node.host] = None
        return gtid_status

    def apply_missing_transactions(self, target_node, missing_gtids):
        """Apply the missing transactions."""
        for gtid in missing_gtids:
            try:
                # Fetch the transaction body from another node
                transaction_sql = self.get_transaction_sql(gtid)
                if transaction_sql:
                    target_node.execute(transaction_sql)
                else:
                    # If the body is unavailable, inject an empty transaction
                    self.inject_empty_transaction(target_node, gtid)
            except Exception as e:
                print(f"Error applying transaction {gtid}: {e}")

    def inject_empty_transaction(self, node, gtid):
        """Inject an empty transaction for a GTID."""
        node.execute(f"SET GTID_NEXT = '{gtid}'")
        node.execute("BEGIN")
        node.execute("COMMIT")
        node.execute("SET GTID_NEXT = 'AUTOMATIC'")
Topology reconstruction after switchover
Topology reconstruction manager
class TopologyReconstructionManager:
    def __init__(self, cluster_config):
        self.config = cluster_config

    def reconstruct_topology(self, new_master, failed_master, slaves):
        """Rebuild the cluster topology."""
        try:
            # 1. Set up the new replication topology
            self.setup_new_replication_topology(new_master, slaves)

            # 2. Deal with the failed old master
            self.handle_failed_master(failed_master, new_master)

            # 3. Optimize the replication topology
            self.optimize_replication_topology(new_master, slaves)

            # 4. Validate the new topology
            self.validate_new_topology(new_master, slaves)

            print("Topology reconstruction completed")
        except Exception as e:
            print(f"Topology reconstruction failed: {e}")
            raise TopologyReconstructionException()

    def setup_new_replication_topology(self, new_master, slaves):
        """Point every slave at the new master."""
        for slave in slaves:
            if slave.host == new_master.host:
                continue
            try:
                # Stop current replication
                slave.execute("STOP SLAVE")

                # Point at the new master
                slave.execute(f"""
                    CHANGE MASTER TO
                        MASTER_HOST = '{new_master.host}',
                        MASTER_USER = 'replication',
                        MASTER_PASSWORD = 'repl_password',
                        MASTER_AUTO_POSITION = 1
                """)

                # Restart replication
                slave.execute("START SLAVE")
                print(f"Reconfigured {slave.host} to replicate from {new_master.host}")
            except Exception as e:
                print(f"Error reconfiguring slave {slave.host}: {e}")

    def handle_failed_master(self, failed_master, new_master):
        """Handle the failed old master."""
        if self.is_node_recoverable(failed_master):
            # If recoverable, rejoin it as a slave
            self.configure_as_slave(failed_master, new_master)
        else:
            # Otherwise remove it from the cluster
            self.remove_from_cluster(failed_master)

    def optimize_replication_topology(self, master, slaves):
        """Optimize the replication topology."""
        # Tune the replication chain based on latency and load
        if len(slaves) > 3:
            # Add an intermediate master to offload the primary
            intermediate_master = self.select_intermediate_master(slaves)
            self.setup_intermediate_master(master, intermediate_master, slaves)

    def validate_new_topology(self, master, slaves):
        """Validate the new topology."""
        for slave in slaves:
            slave_status = slave.query("SHOW SLAVE STATUS")[0]
            if (slave_status['Slave_IO_Running'] != 'Yes' or
                    slave_status['Slave_SQL_Running'] != 'Yes'):
                raise Exception(f"Replication not working on {slave.host}")
        print("New topology validation passed")
Switchover drills and contingency plans
Failure drill system
import json
import subprocess
import time
from datetime import datetime

class DisasterRecoveryDrill:
    def __init__(self, cluster_config):
        self.config = cluster_config
        self.drill_scenarios = [
            'master_crash',
            'network_partition',
            'disk_failure',
            'memory_exhaustion'
        ]

    def execute_drill(self, scenario):
        """Run a disaster-recovery drill."""
        print(f"Starting disaster recovery drill: {scenario}")
        try:
            # 1. Capture the current state
            initial_state = self.capture_cluster_state()

            # 2. Simulate the failure
            self.simulate_failure(scenario)

            # 3. Run the recovery procedure
            recovery_start = time.time()
            self.execute_recovery_procedure(scenario)
            recovery_time = time.time() - recovery_start

            # 4. Verify the recovery
            self.validate_recovery()

            # 5. Generate the drill report
            self.generate_drill_report(scenario, recovery_time, initial_state)

            print(f"Drill completed successfully in {recovery_time:.2f} seconds")
        except Exception as e:
            print(f"Drill failed: {e}")
            self.cleanup_drill_environment()

    def simulate_failure(self, scenario):
        """Simulate a failure scenario."""
        if scenario == 'master_crash':
            self.simulate_master_crash()
        elif scenario == 'network_partition':
            self.simulate_network_partition()
        elif scenario == 'disk_failure':
            self.simulate_disk_failure()
        elif scenario == 'memory_exhaustion':
            self.simulate_memory_exhaustion()

    def simulate_master_crash(self):
        """Simulate a master crash."""
        master = self.get_current_master()
        # Block MySQL traffic with iptables to mimic a network failure
        subprocess.run(['ssh', master.host,
                        'iptables -A INPUT -p tcp --dport 3306 -j DROP'])
        print(f"Simulated master crash on {master.host}")

    def generate_drill_report(self, scenario, recovery_time, initial_state):
        """Generate the drill report."""
        report = {
            'scenario': scenario,
            'drill_time': datetime.now().isoformat(),
            'recovery_time_seconds': recovery_time,
            'initial_state': initial_state,
            'final_state': self.capture_cluster_state(),
            'rto_target': self.config.get('rto_target', 60),
            'rto_achieved': recovery_time <= self.config.get('rto_target', 60)
        }

        # Save the report
        with open(f'/var/log/drill_report_{scenario}_{int(time.time())}.json', 'w') as f:
            json.dump(report, f, indent=2)

        print(f"Drill report saved. RTO target: {report['rto_target']}s, "
              f"Achieved: {recovery_time:.2f}s")
Contingency plan documentation
class EmergencyResponsePlan:
    def __init__(self):
        self.procedures = {
            'master_failure': self.master_failure_procedure,
            'slave_failure': self.slave_failure_procedure,
            'network_partition': self.network_partition_procedure,
            'data_corruption': self.data_corruption_procedure
        }

    def get_emergency_procedure(self, incident_type):
        """Return the emergency procedure for an incident type."""
        return self.procedures.get(incident_type, self.generic_procedure)

    def master_failure_procedure(self):
        """Emergency procedure for a master failure."""
        return {
            'immediate_actions': [
                '1. Confirm the master failure (ping, telnet, MySQL connection)',
                '2. Check slave status and replication lag',
                '3. Choose the best slave as the new master',
                '4. Stop application writes'
            ],
            'recovery_steps': [
                '1. Promote the chosen slave to master',
                '2. Repoint the remaining slaves at the new master',
                '3. Update the application configuration or VIP',
                '4. Verify data consistency',
                '5. Resume application writes'
            ],
            'verification_checklist': [
                '[ ] New master accepts reads and writes',
                '[ ] All slaves are replicating normally',
                '[ ] Application connections are healthy',
                '[ ] Data consistency checks pass',
                '[ ] Monitoring and alerting are working'
            ],
            'rollback_plan': [
                'If the new master misbehaves, switch to another slave immediately',
                'If data is inconsistent, stop the service and repair the data',
                'Restore from backup if necessary'
            ]
        }
Practical configuration examples and best practices
Performance test data
Master-slave replication benchmark
# Performance testing with sysbench
sysbench oltp_read_write \
    --mysql-host=192.168.1.10 \
    --mysql-user=test \
    --mysql-password=test \
    --mysql-db=testdb \
    --tables=10 \
    --table-size=100000 \
    --threads=16 \
    --time=300 \
    --report-interval=10 \
    prepare

sysbench oltp_read_write \
    --mysql-host=192.168.1.10 \
    --mysql-user=test \
    --mysql-password=test \
    --mysql-db=testdb \
    --tables=10 \
    --table-size=100000 \
    --threads=16 \
    --time=300 \
    --report-interval=10 \
    run
Analyzing the results
Performance test comparison:

Standalone MySQL:
- QPS: 15,000
- TPS: 3,000
- Average latency: 5.3ms
- 95th-percentile latency: 12.5ms

Master-slave (1 master, 2 slaves):
- Write QPS: 12,000 (-20%)
- Read QPS: 35,000 (+133%)
- Total QPS: 47,000 (+213%)
- Average latency: 4.2ms
- 95th-percentile latency: 9.8ms

Dual-master:
- Write QPS: 18,000 (+20%)
- Read QPS: 28,000 (+87%)
- Total QPS: 46,000 (+207%)
- Average latency: 4.8ms
- 95th-percentile latency: 11.2ms
Best practice recommendations
1. Choosing an architecture (see the sketch after this list)
- Read-heavy, write-light workloads: master-slave with several read-only slaves
- Strict high-availability requirements: dual-master plus MHA or MMM
- Geographic distribution: active-active multi-data-center architecture
- Very large data volumes: consider sharding combined with clustering
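The guide above can be boiled down to a small decision helper. This is purely illustrative; the workload attributes and thresholds are assumptions, not product recommendations:
from dataclasses import dataclass

@dataclass
class Workload:
    read_write_ratio: float   # reads per write
    needs_strict_ha: bool
    multi_region: bool
    data_volume_tb: float

def suggest_architecture(w: Workload) -> str:
    # Mirrors the selection guide above, checked in priority order
    if w.multi_region:
        return "active-active multi-data-center"
    if w.data_volume_tb > 10:
        return "sharding + cluster"
    if w.needs_strict_ha:
        return "dual-master with MHA/MMM"
    if w.read_write_ratio >= 5:
        return "master-slave with read-only slaves"
    return "single instance with backups"

print(suggest_architecture(Workload(9.0, False, False, 0.5)))
# -> master-slave with read-only slaves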
2. Configuration tuning
# Optimized master configuration
[mysqld]
# Basics
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
gtid-mode = ON
enforce-gtid-consistency = ON

# Performance
innodb_buffer_pool_size = 8G  # 70-80% of physical memory
innodb_log_file_size = 1G
innodb_flush_log_at_trx_commit = 1
sync_binlog = 1

# Replication
slave_parallel_workers = 8
slave_parallel_type = LOGICAL_CLOCK
slave_preserve_commit_order = ON
3. Monitoring and alerting
# Key monitoring metrics
monitoring_metrics = {
    'replication_lag': {
        'warning_threshold': 5,    # seconds
        'critical_threshold': 30   # seconds
    },
    'connection_usage': {
        'warning_threshold': 70,   # %
        'critical_threshold': 90   # %
    },
    'disk_usage': {
        'warning_threshold': 80,   # %
        'critical_threshold': 95   # %
    },
    'cpu_usage': {
        'warning_threshold': 80,   # %
        'critical_threshold': 95   # %
    }
}
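A minimal sketch of how an alerting loop might consume these thresholds (the metric-collection side is assumed; only the classification logic is shown):
def classify(metric_name: str, value: float) -> str:
    """Map a metric sample to OK / WARNING / CRITICAL using the table above."""
    thresholds = monitoring_metrics[metric_name]
    if value >= thresholds['critical_threshold']:
        return 'CRITICAL'
    if value >= thresholds['warning_threshold']:
        return 'WARNING'
    return 'OK'

print(classify('replication_lag', 12))  # WARNING
print(classify('disk_usage', 96))       # CRITICAL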
Summary
MySQL clustering is a core technology behind modern high-concurrency, high-availability systems. Through this article's deep dive, we covered:
Key takeaways:
- Design principles: balancing high availability, scalability, data consistency, and performance
- Master-slave mode: suits read-heavy workloads and boosts performance via read/write splitting
- Dual-master mode: offers higher availability and suits business-critical systems
- Implementation techniques: the evolution from binlog replication to GTID, and from MMM to MHA
Practical advice:
- Choose an architecture that matches your workload characteristics
- Invest in monitoring, alerting, and failure drills
- Run regular performance tests and capacity planning
- Establish solid operational processes and contingency plans
Looking ahead:
With the rise of cloud-native technology, MySQL cluster architectures are moving toward greater automation and intelligence. Containerized deployment, automatic failover, and smart load balancing will further improve the availability and maintainability of MySQL clusters.
