[每周一更]-(第145期):分表数据扩容处理:原理与实战
文章目录
- 一、为什么需要分表扩容?
- 二、分表 vs 分库:先分表,还是同步进行?
- 三、分表策略有哪些?
- 四、扩容的挑战与处理方式
- 五、如何设计支持“动态扩容”的分表系统?
- 六、典型实践案例简述
- 七、总结建议
- 八、示例
- (1)**实现分表路由策略逻辑**的示例
- 1. 分表路由器结构定义
- 2. 获取目标表名
- 3. 用法示例
- 输出示例:
- 扩展建议
- (2)自动迁移脚本基础版
- 脚本要点说明
- 迁移后建议的验证
- (3)升级改进方案设计
- 1. 迁移状态表结构
- 2.日志表设计(可选持久化到数据库)
- 3.支持断点续传的多协程分表迁移脚本
- 3.辅助函数:迁移进度读取与更新
- 脚本优势
- 示例查看进度
一、为什么需要分表扩容?
随着系统业务量增长,单表数据不断膨胀,可能会出现:
- 查询性能下降(索引失效、全表扫描)
- 写入速度变慢(锁竞争、I/O 瓶颈)
- 数据库单点风险增加
因此,分表是一种常用的“水平扩展(Horizontal Scaling)”策略。
二、分表 vs 分库:先分表,还是同步进行?
- 分表:一个数据库中,把某个大表按某种规则拆成多张小表。
- 分库分表:数据分散到多个数据库实例的多个表中,适合更大规模场景。
初期大多采用先分表策略,便于控制和迁移。
三、分表策略有哪些?
- 按范围分表(Range Sharding)
例:按时间、ID段划分,如 user_202401、user_202402- 优点:易于维护,查询有边界
- 缺点:冷热数据不均衡,某些表会成为热点
- 按哈希分表(Hash Sharding)
例:user_%(user_id % 4) → user_0、user_1、user_2、user_3- 优点:数据分布均匀
- 缺点:不易范围查询,扩容麻烦
- 按业务维度分表(如地区、商户、租户)
适合多租户系统,实现物理隔离或逻辑隔离
四、扩容的挑战与处理方式
1. 扩容时机如何判断?
- 单表记录数达到上限(如千万级)
- 查询、写入明显变慢
- 某个表频繁成为访问瓶颈
2. 扩容策略:如何处理已有数据?
- 新旧表并行:老表保留不动,新增数据写入新表
- 全量迁移:将旧数据迁移到新表结构,做过渡处理(高风险)
- 分区管理:借助中间层(如 ShardingSphere、MyCAT)统一管理分表逻辑
3. 扩容过程需解决的问题:
- 数据迁移的一致性与幂等处理
- 历史数据查询兼容
- 应用层改造:支持多表路由
- 灰度发布与回滚策略
五、如何设计支持“动态扩容”的分表系统?
- 抽象出路由层(sharding router):根据 key 决定数据落表
- 保持扩容前后的路由规则兼容性(如:支持2路转4路)
- 使用配置中心动态维护分表映射关系
- 日志追踪/审计:便于调试和排查错误路由
六、典型实践案例简述
示例:用户表 user 表按 user_id 取模分 4 表,未来需扩到 8 表。
迁移步骤大致如下:
- 开新表 user_4 ~ user_7
- 建立新分表规则(如 %8)
- 数据批量迁移(可用临时双写、或异步复制)
- 应用灰度切换至新规则
- 验证无误后,淘汰旧分表规则
七、总结建议
- 提前设计好路由层与分表逻辑,避免耦合业务代码
- 监控分表热点情况,及时预警扩容
- 如条件允许,可借助 中间件统一管理分表,减少扩容痛点
八、示例
(1)实现分表路由策略逻辑的示例
适合你在文章中展示如何根据业务主键(如
user_id
)将数据路由到对应的子表。
我们以「按哈希分表」策略为例,即:
tableIndex = user_id % tableCount
1. 分表路由器结构定义
package shardingimport ("fmt"
)type ShardRouter struct {TableBaseName string // 基础表名,如 "user"TableCount int // 表的总数,如 4 表:user_0 ~ user_3
}// NewShardRouter 创建一个分表路由器
func NewShardRouter(baseName string, count int) *ShardRouter {return &ShardRouter{TableBaseName: baseName,TableCount: count,}
}
2. 获取目标表名
// GetTableName 返回指定 userID 应该落到哪张表
func (r *ShardRouter) GetTableName(userID int64) string {index := userID % int64(r.TableCount)return fmt.Sprintf("%s_%d", r.TableBaseName, index)
}
3. 用法示例
package mainimport ("fmt""your_project/sharding"
)func main() {router := sharding.NewShardRouter("user", 4)testUserIDs := []int64{1001, 2022, 3033, 4044, 5055}for _, uid := range testUserIDs {table := router.GetTableName(uid)fmt.Printf("UserID: %d => Table: %s\n", uid, table)}
}
输出示例:
UserID: 1001 => Table: user_1
UserID: 2022 => Table: user_2
UserID: 3033 => Table: user_1
UserID: 4044 => Table: user_0
UserID: 5055 => Table: user_3
扩展建议
如果你计划支持 动态扩容(如从 4 表扩到 8 表),可以这样设计:
type VersionedShardRouter struct {VersionMapping map[int]*ShardRouter // version => routerCurrentVersion int
}// 可按时间或ID范围映射使用不同版本的路由
或者,如果未来要支持跨库分表:
type ShardTarget struct {DBName stringTableName string
}
func (r *ShardRouter) Route(userID int64) ShardTarget {// hash or range logic, return db+table
}
(2)自动迁移脚本基础版
下面看一个实际可能用到的需求,数据表扩容,用过分表的可能都清楚,比如早起分了10个表,随着数据量增加发现又得到了单表的限制,需要再次分表来扩容,那下边我们看看如何处理。
我们以:
- 原表为:
user_0 ~ user_3
(4 表) - 扩容到新表:
user_0 ~ user_7
(8 表) - 按
user_id % 8
的新规则重新分布数据。
package mainimport ("database/sql""fmt""log"_ "github.com/go-sql-driver/mysql"
)const (oldTableCount = 4newTableCount = 8baseTableName = "user"batchSize = 1000
)func main() {dsn := "user:password@tcp(127.0.0.1:3306)/your_db?charset=utf8mb4&parseTime=True&loc=Local"db, err := sql.Open("mysql", dsn)if err != nil {log.Fatalf("Connect error: %v", err)}defer db.Close()for i := 0; i < oldTableCount; i++ {oldTable := fmt.Sprintf("%s_%d", baseTableName, i)fmt.Printf("Migrating from table: %s\n", oldTable)migrateTable(db, oldTable)}
}func migrateTable(db *sql.DB, oldTable string) {var lastID int64 = 0for {// 读取一批数据query := fmt.Sprintf("SELECT id, user_id, name, email FROM %s WHERE id > ? ORDER BY id ASC LIMIT ?", oldTable)rows, err := db.Query(query, lastID, batchSize)if err != nil {log.Fatalf("Query error from %s: %v", oldTable, err)}defer rows.Close()type User struct {ID int64UserID int64Name stringEmail string}var users []Userfor rows.Next() {var u Userif err := rows.Scan(&u.ID, &u.UserID, &u.Name, &u.Email); err != nil {log.Fatalf("Scan error: %v", err)}users = append(users, u)lastID = u.ID}if len(users) == 0 {break}// 插入到新表for _, u := range users {newTable := fmt.Sprintf("%s_%d", baseTableName, u.UserID%newTableCount)insertSQL := fmt.Sprintf("INSERT INTO %s (id, user_id, name, email) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), email=VALUES(email)", newTable)_, err := db.Exec(insertSQL, u.ID, u.UserID, u.Name, u.Email)if err != nil {log.Printf("Insert error into %s: %v", newTable, err)}}fmt.Printf("Migrated batch up to ID %d from %s\n", lastID, oldTable)}
}
脚本要点说明
点位 | 说明 |
---|---|
分批处理 | 避免内存和锁压力,提升容错能力 |
ON DUPLICATE KEY | 保证幂等执行,可重复运行 |
多协程可扩展 | 若你数据量大,可以将每张旧表并发迁移 |
支持中断续跑 | 可持久化 lastID 或用状态表做断点续传 |
避免写入死锁 | 插入顺序与主键升序有助于减少锁冲突 |
迁移后建议的验证
-
统计校验:
SELECT COUNT(*) FROM user_0 + user_1 + ... + user_3; SELECT COUNT(*) FROM user_0 + ... + user_7;
-
spot-check 用户:
SELECT * FROM user_5 WHERE user_id = 10055;
-
应用层切换到新路由策略前,建议“新旧表双写”一段时间以保障数据一致性。
(3)升级改进方案设计
增加支持断点续传、每张表迁移进度、多协程处理、迁移日志等功能
我们引入一个迁移状态表(migration_progress
)来记录每张旧表的迁移进度(即每张表上次迁移到的最大 id
值)。
1. 迁移状态表结构
CREATE TABLE migration_progress (table_name VARCHAR(64) PRIMARY KEY,last_id BIGINT NOT NULL,updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
2.日志表设计(可选持久化到数据库)
如果你想将日志存入数据库:
CREATE TABLE migration_log (id BIGINT AUTO_INCREMENT PRIMARY KEY,table_name VARCHAR(64),batch_start BIGINT,batch_end BIGINT,record_count INT,status VARCHAR(20),message TEXT,created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
推荐使用文件日志为主,数据库日志为补充。
3.支持断点续传的多协程分表迁移脚本
package mainimport ("database/sql""fmt""log""sync""time"_ "github.com/go-sql-driver/mysql"
)const (oldTableCount = 4newTableCount = 8baseTableName = "user"batchSize = 1000maxGoroutines = 4
)type User struct {ID int64UserID int64Name stringEmail string
}func main() {dsn := "user:password@tcp(127.0.0.1:3306)/your_db?charset=utf8mb4&parseTime=True&loc=Local"db, err := sql.Open("mysql", dsn)if err != nil {log.Fatalf("Connect error: %v", err)}defer db.Close()var wg sync.WaitGroupsem := make(chan struct{}, maxGoroutines)for i := 0; i < oldTableCount; i++ {wg.Add(1)sem <- struct{}{}go func(index int) {defer wg.Done()defer func() { <-sem }()oldTable := fmt.Sprintf("%s_%d", baseTableName, index)log.Printf("Starting migration from %s\n", oldTable)if err := migrateTable(db, oldTable); err != nil {log.Printf("Migration failed for %s: %v", oldTable, err)}log.Printf("Finished migration from %s\n", oldTable)}(i)}wg.Wait()log.Println("All migrations completed.")
}func migrateTable(db *sql.DB, oldTable string) error {lastID, err := getLastMigratedID(db, oldTable)if err != nil {return err}for {query := fmt.Sprintf("SELECT id, user_id, name, email FROM %s WHERE id > ? ORDER BY id ASC LIMIT ?", oldTable)rows, err := db.Query(query, lastID, batchSize)if err != nil {logError(oldTable, lastID, lastID, 0, "query_error", err.Error())return fmt.Errorf("query failed: %w", err)}var users []Uservar batchStartID = lastIDfor rows.Next() {var u Userif err := rows.Scan(&u.ID, &u.UserID, &u.Name, &u.Email); err != nil {rows.Close()logError(oldTable, batchStartID, u.ID, len(users), "scan_error", err.Error())return fmt.Errorf("scan failed: %w", err)}users = append(users, u)lastID = u.ID}rows.Close()if len(users) == 0 {break}var successCount intfor _, u := range users {newTable := fmt.Sprintf("%s_%d", baseTableName, u.UserID%newTableCount)insertSQL := fmt.Sprintf("INSERT INTO %s (id, user_id, name, email) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), email=VALUES(email)",newTable,)_, err := db.Exec(insertSQL, u.ID, u.UserID, u.Name, u.Email)if err != nil {log.Printf("Insert failed to %s: %v", newTable, err)logError(oldTable, batchStartID, lastID, successCount, "insert_error", err.Error())continue}successCount++}err = updateLastMigratedID(db, oldTable, lastID)if err != nil {logError(oldTable, batchStartID, lastID, successCount, "update_progress_error", err.Error())return fmt.Errorf("update progress failed: %w", err)}log.Printf("[%s] Migrated batch: %d records, ID %d to %d", oldTable, successCount, batchStartID, lastID)logSuccess(oldTable, batchStartID, lastID, successCount)time.Sleep(10 * time.Millisecond)}return nil
}func getLastMigratedID(db *sql.DB, tableName string) (int64, error) {var lastID int64err := db.QueryRow("SELECT last_id FROM migration_progress WHERE table_name = ?", tableName).Scan(&lastID)if err == sql.ErrNoRows {// 初始化_, err := db.Exec("INSERT INTO migration_progress (table_name, last_id) VALUES (?, 0)", tableName)if err != nil {return 0, fmt.Errorf("init progress row failed: %w", err)}return 0, nil} else if err != nil {return 0, fmt.Errorf("query progress failed: %w", err)}return lastID, nil
}func updateLastMigratedID(db *sql.DB, tableName string, lastID int64) error {_, err := db.Exec("UPDATE migration_progress SET last_id = ? WHERE table_name = ?", lastID, tableName)return err
}func logSuccess(table string, startID, endID int64, count int) {logger.Printf("[SUCCESS] table=%s, id_range=%d-%d, count=%d\n",table, startID, endID, count)
}func logError(table string, startID, endID int64, count int, code string, msg string) {logger.Printf("[ERROR] table=%s, id_range=%d-%d, count=%d, code=%s, msg=%s\n",table, startID, endID, count, code, msg)
}
3.辅助函数:迁移进度读取与更新
func getLastMigratedID(db *sql.DB, tableName string) (int64, error) {var lastID int64err := db.QueryRow("SELECT last_id FROM migration_progress WHERE table_name = ?", tableName).Scan(&lastID)if err == sql.ErrNoRows {// 初始化_, err := db.Exec("INSERT INTO migration_progress (table_name, last_id) VALUES (?, 0)", tableName)if err != nil {return 0, fmt.Errorf("init progress row failed: %w", err)}return 0, nil} else if err != nil {return 0, fmt.Errorf("query progress failed: %w", err)}return lastID, nil
}func updateLastMigratedID(db *sql.DB, tableName string, lastID int64) error {_, err := db.Exec("UPDATE migration_progress SET last_id = ? WHERE table_name = ?", lastID, tableName)return err
}
脚本优势
能力 | 描述 |
---|---|
并发迁移 | 提升速度,充分利用多核和数据库连接池 |
支持断点续传 | 每张旧表独立记录进度,不怕重启或失败 |
幂等可重复运行 | 使用 ON DUPLICATE KEY 保证多次运行不重复插入 |
细粒度进度监控 | 可实时查看每张表迁移到哪个 ID |
示例查看进度
SELECT * FROM migration_progress ORDER BY updated_at DESC;