Legacy-to-Microservices Migration (Part 2): A Practical Guide to Data Migration and Consistency Guarantees
Data migration is the riskiest part of any legacy-to-microservices transformation. Picture this scenario: an e-commerce system that has been running for over a decade, with users, orders, products, inventory, and everything else crammed into a single database, and intricate relationships between the tables. Safely splitting such a monolithic database into multiple independent microservice databases is every bit as hard as it sounds.
I worked on a database-splitting project for a large e-commerce platform. The original system had over 500 tables, data growing by 100 GB per day, and a peak QPS of 500,000. Migrating data in a system like that is like dancing on a tightrope: one misstep can take the business down, with losses running into the tens of millions.
1. The Challenges of Database Splitting
1.1 Typical Problems in Legacy Databases
Legacy systems usually rely on one large shared database for all business data. Under a microservice architecture, this design causes real trouble:
Heavy data coupling: tables belonging to different business modules are intricately related. The order table directly references the user, product, and inventory tables, so even a simple order query may join a dozen or more tables.
Blurred transaction boundaries: the ACID guarantees that used to come from database transactions must be redesigned for a distributed environment. Placing an order involves user validation, inventory deduction, order creation, and payment processing; one local transaction used to cover all of it, but now distributed transactions come into play.
Performance bottlenecks: the single database becomes the bottleneck for the whole system and cannot be tuned for the characteristics of any one workload. User lookups and order analytics compete for the same database resources and drag each other down.
Limited flexibility: you cannot pick the storage engine best suited to each workload. User profiles fit a NoSQL store and financial data fits a relational database, but a monolith forces one choice for everything.
1.2 A Real Case: the State of an E-commerce Platform's Database
A real e-commerce platform illustrates just how complex the problem gets:
```sql
-- Core tables of the original system (simplified)

-- Users
CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    password_hash VARCHAR(255),
    profile_data JSON,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

-- Products
CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    name VARCHAR(200),
    description TEXT,
    price DECIMAL(10,2),
    category_id BIGINT,
    seller_id BIGINT,
    created_at TIMESTAMP,
    FOREIGN KEY (seller_id) REFERENCES users(id)
);

-- Orders
CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    total_amount DECIMAL(10,2),
    status VARCHAR(20),
    shipping_address TEXT,
    created_at TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- Order line items
CREATE TABLE order_items (
    id BIGINT PRIMARY KEY,
    order_id BIGINT,
    product_id BIGINT,
    quantity INT,
    price DECIMAL(10,2),
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);

-- Inventory
CREATE TABLE inventory (
    id BIGINT PRIMARY KEY,
    product_id BIGINT,
    quantity INT,
    reserved_quantity INT,
    updated_at TIMESTAMP,
    FOREIGN KEY (product_id) REFERENCES products(id)
);
```
This seemingly simple schema hides enormous complexity:
- A single order query joins the user, product, inventory, and other tables
- A user can be both a buyer and a seller, so the data relationships are tangled
- Inventory updates and order creation must happen in the same transaction
- Every business flow depends on this one database, so any change can ripple across the whole system
2. Principles for Splitting the Database
To split a database without breaking things, follow a few core principles:
2.1 Split Along Business Domains
Following domain-driven design (DDD), partition the database along business boundaries. This is not a mechanical technical split; it requires a deep understanding of the business logic.
User domain: registration, login, profile management
Product domain: product information, categories, search
Order domain: order creation, payment, logistics
Inventory domain: stock management, reservation, release
2.2 Data Independence
Every microservice owns its own database and must never reach into another service's data directly. This is an iron rule of microservice architecture.
```java
// Wrong: reaching directly into another service's database
@Service
public class OrderService {

    @Autowired
    private UserRepository userRepository; // direct access to the user database

    public void createOrder(OrderRequest request) {
        // Violates data independence
        User user = userRepository.findById(request.getUserId());
        // ...
    }
}

// Right: going through the service's API
@Service
public class OrderService {

    @Autowired
    private UserServiceClient userServiceClient; // access via the service interface

    public void createOrder(OrderRequest request) {
        UserInfo user = userServiceClient.getUserInfo(request.getUserId());
        // ...
    }
}
```
2.3 Minimize Cross-Service Queries
Design with cross-service queries in mind from the start: every cross-service call adds network overhead and opens the door to cascading failures.
The most common remedy is judicious data redundancy. For example, store a copy of the username and email on the order, so reading an order does not require a round trip to the user service. You pay with some duplicated data, but you buy performance and stability.
```java
// Avoiding cross-service queries through data redundancy
@Entity
public class Order {
    private Long id;
    private Long userId;
    private String userName;  // redundant username, so reads don't hit the user service every time
    private String userEmail; // redundant user email
    private BigDecimal totalAmount;
    // ...
}
```
2.4 Preserving Data Consistency
Data consistency is the thorniest problem in a microservice architecture. What a single transaction handled in the monolith now spans multiple services, and the complexity shoots up.
There are a few standard approaches. Eventual consistency fits most scenarios: it performs well but tolerates brief windows of inconsistency. Distributed transactions give strong consistency at a significant performance cost. Compensation mechanisms are a middle ground: the business logic itself guarantees that the data ends up correct.
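A compensation mechanism typically pairs with a retry schedule so that a failed compensation is re-attempted with growing delays. The sketch below is a minimal, hypothetical illustration (the `BackoffPolicy` name and its parameters are not from any particular framework) of exponential backoff between compensation attempts:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch: an exponential-backoff schedule for compensation retries.
public class BackoffPolicy {
    private final Duration baseDelay;
    private final int maxRetries;

    public BackoffPolicy(Duration baseDelay, int maxRetries) {
        this.baseDelay = baseDelay;
        this.maxRetries = maxRetries;
    }

    /** Next retry time for the given attempt (0-based), or null when retries are exhausted. */
    public Instant nextRetryAt(Instant now, int attempt) {
        if (attempt >= maxRetries) {
            return null; // give up and hand the record off to manual reconciliation
        }
        // The delay doubles on every attempt: base * 2^attempt
        return now.plus(baseDelay.multipliedBy(1L << attempt));
    }
}
```

With a 30-second base delay and three retries, this schedules attempts at +30s, +60s, and +120s before giving up.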
3. Database Splitting Strategies
3.1 Vertical Splitting: by Business Module
This is the most common and most intuitive approach: put each business's tables in its own database, so user-related tables go to the user service and order-related tables go to the order service.
Pay attention to business boundaries when splitting, and keep strongly related data together. For instance, basic user information and user addresses belong in the same service, which cuts down on cross-service calls.
```sql
-- User-service database after the split
CREATE DATABASE user_service;
USE user_service;

CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    password_hash VARCHAR(255),
    profile_data JSON,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

CREATE TABLE user_addresses (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    address_type VARCHAR(20),
    address_detail TEXT,
    is_default BOOLEAN,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- Product-service database after the split
CREATE DATABASE product_service;
USE product_service;

CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    name VARCHAR(200),
    description TEXT,
    price DECIMAL(10,2),
    category_id BIGINT,
    seller_id BIGINT, -- no longer a foreign key; resolved through service calls
    created_at TIMESTAMP
);

CREATE TABLE product_categories (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    parent_id BIGINT,
    level INT
);

-- Order-service database after the split
CREATE DATABASE order_service;
USE order_service;

CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,          -- no longer a foreign key; resolved through service calls
    user_name VARCHAR(50),   -- redundant data, avoids frequent calls to the user service
    total_amount DECIMAL(10,2),
    status VARCHAR(20),
    shipping_address TEXT,
    created_at TIMESTAMP
);

CREATE TABLE order_items (
    id BIGINT PRIMARY KEY,
    order_id BIGINT,
    product_id BIGINT,         -- no longer a foreign key
    product_name VARCHAR(200), -- redundant product name
    quantity INT,
    price DECIMAL(10,2),
    FOREIGN KEY (order_id) REFERENCES orders(id)
);
```
3.2 Horizontal Splitting: by Data Volume
Tables with very large data volumes are candidates for horizontal splitting (sharding).
```java
// Sharding the order table by month
@Component
public class OrderShardingStrategy {

    public String determineShardKey(Date orderDate) {
        Calendar cal = Calendar.getInstance();
        cal.setTime(orderDate);
        int year = cal.get(Calendar.YEAR);
        int month = cal.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based
        return String.format("order_%d_%02d", year, month);
    }
}
```

```sql
-- Monthly order shards
CREATE TABLE order_2024_01 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    total_amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP
);

CREATE TABLE order_2024_02 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    total_amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP
);
```
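One practical consequence of month-based sharding is that a date-range query must be fanned out across every shard it touches and the results merged. A minimal sketch, assuming the `order_YYYY_MM` naming above (the `ShardRangeResolver` class is hypothetical):

```java
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

// Hypothetical companion to OrderShardingStrategy: given the date range of a
// query, list every monthly shard (order_YYYY_MM) whose results must be unioned.
public class ShardRangeResolver {
    public List<String> tablesFor(YearMonth from, YearMonth to) {
        List<String> tables = new ArrayList<>();
        for (YearMonth ym = from; !ym.isAfter(to); ym = ym.plusMonths(1)) {
            tables.add(String.format("order_%d_%02d", ym.getYear(), ym.getMonthValue()));
        }
        return tables;
    }
}
```

This is also why the shard key should match the dominant query pattern: if most queries filter by user rather than by date, sharding by user ID avoids the fan-out entirely.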
4. Data Migration in Practice
4.1 Designing a Data Migration Tool
```java
// A generic data-migration framework
@Component
public class DataMigrationTool {

    private static final Logger logger = LoggerFactory.getLogger(DataMigrationTool.class);

    @Autowired
    private LegacyDataSource legacyDataSource;

    @Autowired
    private Map<String, DataSource> targetDataSources;

    @Autowired
    private MigrationMetrics migrationMetrics;

    public void migrateTable(MigrationConfig config) {
        Timer.Sample sample = migrationMetrics.startMigrationTimer();
        try {
            validateMigrationConfig(config);
            long totalRecords = countRecords(config);
            logger.info("Starting migration for table: {}, total records: {}",
                config.getTableName(), totalRecords);

            migrateData(config, totalRecords);

            migrationMetrics.recordMigrationSuccess(config.getTableName());
            logger.info("Migration completed for table: {}", config.getTableName());
        } catch (Exception e) {
            migrationMetrics.recordMigrationFailure(config.getTableName(), e.getMessage());
            logger.error("Migration failed for table: " + config.getTableName(), e);
            throw new DataMigrationException("Migration failed", e);
        } finally {
            sample.stop(migrationMetrics.getMigrationDuration());
        }
    }

    private void migrateData(MigrationConfig config, long totalRecords) {
        int batchSize = config.getBatchSize();
        long offset = 0;
        long migratedCount = 0;

        while (offset < totalRecords) {
            List<Map<String, Object>> batch = fetchBatch(config, offset, batchSize);
            if (batch.isEmpty()) {
                break;
            }

            // Apply the per-row transformer, if one is configured
            if (config.getDataTransformer() != null) {
                batch.replaceAll(config.getDataTransformer()::transform);
            }

            insertBatch(config, batch);
            offset += batchSize;
            migratedCount += batch.size();

            // Report progress (SLF4J has no {:.2f} placeholder, so format explicitly)
            double progress = (double) migratedCount / totalRecords * 100;
            logger.info("Migration progress for {}: {}% ({}/{})",
                config.getTableName(), String.format("%.2f", progress), migratedCount, totalRecords);

            // Throttle to avoid putting too much load on the source database
            if (config.getMigrationDelay() > 0) {
                try {
                    Thread.sleep(config.getMigrationDelay());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new DataMigrationException("Migration interrupted", e);
                }
            }
        }
    }

    private List<Map<String, Object>> fetchBatch(MigrationConfig config, long offset, int batchSize) {
        String sql = String.format("SELECT * FROM %s ORDER BY %s LIMIT %d OFFSET %d",
            config.getTableName(), config.getOrderByColumn(), batchSize, offset);

        try (Connection conn = legacyDataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql);
             ResultSet rs = stmt.executeQuery()) {

            List<Map<String, Object>> result = new ArrayList<>();
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();

            while (rs.next()) {
                Map<String, Object> row = new HashMap<>();
                for (int i = 1; i <= columnCount; i++) {
                    row.put(metaData.getColumnName(i), rs.getObject(i));
                }
                result.add(row);
            }
            return result;
        } catch (SQLException e) {
            throw new DataMigrationException("Failed to fetch batch", e);
        }
    }

    private void insertBatch(MigrationConfig config, List<Map<String, Object>> batch) {
        DataSource targetDataSource = targetDataSources.get(config.getTargetDataSource());

        try (Connection conn = targetDataSource.getConnection()) {
            conn.setAutoCommit(false);
            String insertSql = buildInsertSql(config, batch.get(0));

            try (PreparedStatement stmt = conn.prepareStatement(insertSql)) {
                for (Map<String, Object> row : batch) {
                    setStatementParameters(stmt, config, row);
                    stmt.addBatch();
                }
                stmt.executeBatch();
                conn.commit();
            } catch (SQLException e) {
                conn.rollback();
                throw e;
            }
        } catch (SQLException e) {
            throw new DataMigrationException("Failed to insert batch", e);
        }
    }
}
```
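One caveat about fetching batches with `LIMIT ... OFFSET`: the database has to skip over all previously migrated rows on every batch, so throughput decays as the migration progresses on very large tables. A common alternative is keyset (seek) pagination on the ordering column. The sketch below only builds the SQL and is illustrative (the `KeysetPager` name is hypothetical, not part of the tool above):

```java
// Hypothetical sketch: keyset ("seek") pagination for batch fetches. Instead of
// OFFSET, the caller remembers the largest key it has seen and filters on it,
// so every batch is a cheap index range scan no matter how far along we are.
public class KeysetPager {
    public String buildKeysetSql(String table, String keyColumn, int batchSize) {
        // The last key is bound as a JDBC parameter ("?"), never concatenated,
        // which avoids SQL injection and lets the driver reuse the statement plan.
        return String.format("SELECT * FROM %s WHERE %s > ? ORDER BY %s LIMIT %d",
            table, keyColumn, keyColumn, batchSize);
    }
}
```

This requires a unique, indexed ordering column (such as `id`); offset pagination remains simpler when the source table is small.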
4.2 Migration Configuration
```java
// Migration configuration
@Data
@Builder
public class MigrationConfig {
    private String tableName;
    private String targetDataSource;
    private String targetTableName;
    private String orderByColumn;
    private int batchSize;
    private long migrationDelay;               // milliseconds
    private Map<String, String> columnMapping; // source column -> target column
    private List<String> excludeColumns;
    private String whereCondition;
    private DataTransformer dataTransformer;
}

// Row-level transformer interface
public interface DataTransformer {
    Map<String, Object> transform(Map<String, Object> sourceRow);
}

// Example: transforming user rows during migration
@Component
public class UserDataTransformer implements DataTransformer {

    @Override
    public Map<String, Object> transform(Map<String, Object> sourceRow) {
        Map<String, Object> targetRow = new HashMap<>(sourceRow);

        // Upgrade legacy password hashes to bcrypt
        String oldPassword = (String) sourceRow.get("password");
        if (oldPassword != null && !oldPassword.startsWith("$2a$")) {
            String newPassword = BCrypt.hashpw(oldPassword, BCrypt.gensalt());
            targetRow.put("password_hash", newPassword);
            targetRow.remove("password");
        }

        // Normalize phone numbers
        String phone = (String) sourceRow.get("phone");
        if (phone != null) {
            phone = phone.replaceAll("[^0-9]", "");
            if (phone.length() == 11 && phone.startsWith("1")) {
                targetRow.put("phone", phone);
            } else {
                targetRow.put("phone", null);
            }
        }

        return targetRow;
    }
}
```
4.3 Orchestrating Migration Tasks
```java
// Migration task orchestrator
@Component
public class MigrationOrchestrator {

    private static final Logger logger = LoggerFactory.getLogger(MigrationOrchestrator.class);

    @Autowired
    private DataMigrationTool migrationTool;

    @Autowired
    private DataValidationService validationService;

    public void executeFullMigration() {
        List<MigrationTask> tasks = buildMigrationTasks();

        for (MigrationTask task : tasks) {
            try {
                logger.info("Starting migration task: {}", task.getName());

                // Run the migration
                migrationTool.migrateTable(task.getConfig());

                // Validate the migrated data
                ValidationResult validation = validationService.validateMigration(task);
                if (!validation.isValid()) {
                    throw new DataMigrationException("Data validation failed: " + validation.getErrors());
                }

                logger.info("Migration task completed: {}", task.getName());
            } catch (Exception e) {
                logger.error("Migration task failed: " + task.getName(), e);
                // Configuration decides whether later tasks still run
                if (task.isStopOnFailure()) {
                    throw new DataMigrationException("Migration stopped due to task failure", e);
                }
            }
        }
    }

    private List<MigrationTask> buildMigrationTasks() {
        return Arrays.asList(
            // 1. Base data first (no foreign-key dependencies)
            MigrationTask.builder()
                .name("migrate-users")
                .config(MigrationConfig.builder()
                    .tableName("users")
                    .targetDataSource("userService")
                    .targetTableName("users")
                    .orderByColumn("id")
                    .batchSize(1000)
                    .migrationDelay(100)
                    .dataTransformer(new UserDataTransformer())
                    .build())
                .stopOnFailure(true)
                .build(),
            // 2. Then product data
            MigrationTask.builder()
                .name("migrate-products")
                .config(MigrationConfig.builder()
                    .tableName("products")
                    .targetDataSource("productService")
                    .targetTableName("products")
                    .orderByColumn("id")
                    .batchSize(500)
                    .migrationDelay(50)
                    .build())
                .stopOnFailure(true)
                .build(),
            // 3. Dependent data last
            MigrationTask.builder()
                .name("migrate-orders")
                .config(MigrationConfig.builder()
                    .tableName("orders")
                    .targetDataSource("orderService")
                    .targetTableName("orders")
                    .orderByColumn("created_at")
                    .batchSize(200)
                    .migrationDelay(200)
                    .dataTransformer(new OrderDataTransformer())
                    .build())
                .stopOnFailure(false) // an order-migration failure does not block other tasks
                .build());
    }
}
```
5. Guaranteeing Data Consistency
5.1 The Dual-Write Strategy
The biggest fear during migration is data drifting out of sync; dual writes are an effective answer:
```java
// Dual-write service: the legacy database remains the source of truth
@Service
@Transactional
public class DualWriteService {

    private static final Logger logger = LoggerFactory.getLogger(DualWriteService.class);

    @Autowired
    private LegacyUserRepository legacyUserRepo;

    @Autowired
    private NewUserRepository newUserRepo;

    @Autowired
    private MessageQueue messageQueue;

    @Autowired
    private CompensationTaskService compensationTaskService;

    public void updateUser(Long userId, UserUpdateRequest request) {
        try {
            // 1. Update the legacy system (primary data source)
            User legacyUser = legacyUserRepo.findById(userId)
                .orElseThrow(() -> new UserNotFoundException("User not found: " + userId));
            legacyUser.setEmail(request.getEmail());
            legacyUser.setPhone(request.getPhone());
            legacyUser.setUpdatedAt(Instant.now());
            legacyUserRepo.save(legacyUser);

            // 2. Mirror the update into the new system (secondary data source)
            try {
                NewUser newUser = convertToNewUser(legacyUser);
                newUserRepo.save(newUser);
                // Publish a sync-success event
                messageQueue.send(new UserSyncEvent(userId, "UPDATE", "SUCCESS"));
            } catch (Exception e) {
                // The new system failed; schedule a compensation task instead of failing the write
                logger.error("Failed to sync user to new system: " + userId, e);
                scheduleCompensationTask(userId, request);
                messageQueue.send(new UserSyncEvent(userId, "UPDATE", "FAILED"));
            }
        } catch (Exception e) {
            logger.error("Failed to update user: " + userId, e);
            throw new UserUpdateException("User update failed", e);
        }
    }

    private void scheduleCompensationTask(Long userId, UserUpdateRequest request) {
        CompensationTask task = CompensationTask.builder()
            .taskType("USER_SYNC")
            .entityId(userId)
            .payload(JsonUtils.toJson(request))
            .maxRetries(3)
            .nextRetryTime(Instant.now().plusSeconds(30))
            .build();
        compensationTaskService.schedule(task);
    }
}
```
5.2 Handling Distributed Transactions
Some scenarios demand strong consistency, and that calls for distributed transactions:
```java
// Distributed transactions with the Saga pattern
@Component
public class OrderSagaOrchestrator {

    private static final Logger logger = LoggerFactory.getLogger(OrderSagaOrchestrator.class);

    @Autowired
    private UserServiceClient userService;

    @Autowired
    private InventoryServiceClient inventoryService;

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private PaymentServiceClient paymentService;

    public void processOrder(OrderRequest request) {
        SagaTransaction saga = SagaTransaction.builder()
            .addStep("validateUser",
                () -> validateUser(request),
                () -> compensateUserValidation(request))
            .addStep("reserveInventory",
                () -> reserveInventory(request),
                () -> releaseInventory(request))
            .addStep("createOrder",
                () -> createOrder(request),
                () -> cancelOrder(request))
            .addStep("processPayment",
                () -> processPayment(request),
                () -> refundPayment(request))
            .build();

        try {
            saga.execute();
            logger.info("Order processed successfully: {}", request.getOrderId());
        } catch (SagaExecutionException e) {
            logger.error("Order saga failed, compensation completed", e);
            throw new OrderProcessingException("Order processing failed", e);
        }
    }

    private void validateUser(OrderRequest request) {
        UserValidationResponse response = userService.validateUser(request.getUserId());
        if (!response.isValid()) {
            throw new InvalidUserException("User validation failed");
        }
        request.setUserValidated(true);
    }

    private void compensateUserValidation(OrderRequest request) {
        // Compensating a user validation is usually a no-op
        logger.info("User validation compensation for order: {}", request.getOrderId());
    }

    private void reserveInventory(OrderRequest request) {
        for (OrderItem item : request.getItems()) {
            InventoryReservationResponse response =
                inventoryService.reserveInventory(item.getProductId(), item.getQuantity());
            if (!response.isSuccess()) {
                throw new InsufficientInventoryException("Inventory reservation failed");
            }
            item.setReservationId(response.getReservationId());
        }
        request.setInventoryReserved(true);
    }

    private void releaseInventory(OrderRequest request) {
        if (request.isInventoryReserved()) {
            for (OrderItem item : request.getItems()) {
                if (item.getReservationId() != null) {
                    inventoryService.releaseReservation(item.getReservationId());
                }
            }
        }
    }

    private void createOrder(OrderRequest request) {
        Order order = Order.builder()
            .userId(request.getUserId())
            .items(request.getItems())
            .totalAmount(request.getTotalAmount())
            .status(OrderStatus.PENDING)
            .createdAt(Instant.now())
            .build();
        order = orderRepository.save(order);
        request.setOrderId(order.getId());
        request.setOrderCreated(true);
    }

    private void cancelOrder(OrderRequest request) {
        if (request.isOrderCreated() && request.getOrderId() != null) {
            Order order = orderRepository.findById(request.getOrderId()).orElse(null);
            if (order != null) {
                order.setStatus(OrderStatus.CANCELLED);
                orderRepository.save(order);
            }
        }
    }

    private void processPayment(OrderRequest request) {
        PaymentRequest paymentRequest = PaymentRequest.builder()
            .orderId(request.getOrderId())
            .userId(request.getUserId())
            .amount(request.getTotalAmount())
            .build();

        PaymentResponse response = paymentService.processPayment(paymentRequest);
        if (!response.isSuccess()) {
            throw new PaymentFailedException("Payment processing failed");
        }
        request.setPaymentId(response.getPaymentId());
        request.setPaymentProcessed(true);

        // Mark the order as paid
        Order order = orderRepository.findById(request.getOrderId()).orElse(null);
        if (order != null) {
            order.setStatus(OrderStatus.PAID);
            orderRepository.save(order);
        }
    }

    private void refundPayment(OrderRequest request) {
        if (request.isPaymentProcessed() && request.getPaymentId() != null) {
            RefundRequest refundRequest = RefundRequest.builder()
                .paymentId(request.getPaymentId())
                .amount(request.getTotalAmount())
                .reason("Order processing failed")
                .build();
            paymentService.processRefund(refundRequest);
        }
    }
}
```
5.3 Eventual Consistency
Not every scenario needs strong consistency; eventual consistency is often enough:
```java
// Event-driven eventual consistency
@Component
public class EventDrivenConsistencyHandler {

    private static final Logger logger = LoggerFactory.getLogger(EventDrivenConsistencyHandler.class);

    @Autowired
    private MessageQueue messageQueue;

    @Autowired
    private OrderUserCacheService orderUserCacheService;

    @EventListener
    public void handleUserUpdate(UserUpdateEvent event) {
        // Publish a user-update event
        UserSyncMessage message = UserSyncMessage.builder()
            .userId(event.getUserId())
            .operation("UPDATE")
            .timestamp(Instant.now())
            .data(event.getUserData())
            .version(event.getVersion())
            .build();
        messageQueue.send("user.sync.topic", message);
    }

    @RabbitListener(queues = "order.user.sync.queue")
    public void handleUserSyncInOrderService(UserSyncMessage message) {
        try {
            // Check the version first, so an out-of-order message can't clobber newer data
            UserCacheEntry currentEntry = orderUserCacheService.getUserCache(message.getUserId());
            if (currentEntry != null && currentEntry.getVersion() > message.getVersion()) {
                logger.warn("Received outdated user sync message, ignoring. User: {}, current version: {}, message version: {}",
                    message.getUserId(), currentEntry.getVersion(), message.getVersion());
                return;
            }

            // Update the user-info cache held by the order service
            orderUserCacheService.updateUserCache(message.getUserId(), message.getData());
            logger.info("User cache updated in order service for user: {}", message.getUserId());
        } catch (Exception e) {
            logger.error("Failed to sync user data in order service", e);
            // Reject without requeueing so the message goes to the dead-letter queue for retry
            throw new AmqpRejectAndDontRequeueException("Sync failed", e);
        }
    }
}
```
6. Data Validation and Monitoring
6.1 Validating Data Consistency
```java
// Data-consistency validation service
@Service
public class DataConsistencyValidator {

    @Autowired
    private LegacyDataSource legacyDataSource;

    @Autowired
    private Map<String, DataSource> targetDataSources;

    public ValidationResult validateUserData() {
        ValidationResult result = new ValidationResult();

        try {
            // 1. Row-count check
            long legacyCount = countUsers(legacyDataSource);
            long newCount = countUsers(targetDataSources.get("userService"));
            if (legacyCount != newCount) {
                result.addError(String.format("User count mismatch: legacy=%d, new=%d",
                    legacyCount, newCount));
            }

            // 2. Sample-based row comparison
            List<Long> sampleUserIds = getSampleUserIds(legacyDataSource, 1000);
            for (Long userId : sampleUserIds) {
                User legacyUser = getUserFromLegacy(userId);
                User newUser = getUserFromNew(userId);
                if (!isUserDataConsistent(legacyUser, newUser)) {
                    result.addError(String.format("User data inconsistent for user: %d", userId));
                }
            }

            // 3. Critical-field checks
            validateCriticalFields(result);
        } catch (Exception e) {
            result.addError("Validation failed: " + e.getMessage());
        }

        return result;
    }

    private boolean isUserDataConsistent(User legacyUser, User newUser) {
        if (legacyUser == null && newUser == null) {
            return true;
        }
        if (legacyUser == null || newUser == null) {
            return false;
        }
        return Objects.equals(legacyUser.getId(), newUser.getId())
            && Objects.equals(legacyUser.getUsername(), newUser.getUsername())
            && Objects.equals(legacyUser.getEmail(), newUser.getEmail())
            && Objects.equals(legacyUser.getPhone(), newUser.getPhone());
    }

    private void validateCriticalFields(ValidationResult result) {
        // Compare the value distribution of a critical field across both databases
        String sql = "SELECT COUNT(*) as cnt, email_domain FROM "
            + "(SELECT SUBSTRING_INDEX(email, '@', -1) as email_domain FROM users WHERE email IS NOT NULL) t "
            + "GROUP BY email_domain ORDER BY cnt DESC LIMIT 10";

        Map<String, Long> legacyDistribution = getEmailDomainDistribution(legacyDataSource, sql);
        Map<String, Long> newDistribution =
            getEmailDomainDistribution(targetDataSources.get("userService"), sql);

        for (Map.Entry<String, Long> entry : legacyDistribution.entrySet()) {
            String domain = entry.getKey();
            Long legacyCount = entry.getValue();
            Long newCount = newDistribution.get(domain);
            if (newCount == null || !newCount.equals(legacyCount)) {
                result.addError(String.format(
                    "Email domain distribution mismatch for %s: legacy=%d, new=%d",
                    domain, legacyCount, newCount));
            }
        }
    }
}
```
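For the sample-based comparison in step 2, it helps to know how big the sample must be. If a fraction p of migrated rows is inconsistent, a random sample of n rows misses all of them with probability (1-p)^n, which gives a quick way to size the sample for a target detection probability. The helper below is a hypothetical illustration, not part of the validator above:

```java
// Hypothetical helper for sizing the validation sample. If a fraction p of rows
// is inconsistent, a sample of n rows misses all of them with probability (1-p)^n.
public class SampleSizeEstimator {
    // Smallest n such that 1 - (1 - p)^n >= targetDetectionProb
    public static long samplesNeeded(double corruptFraction, double targetDetectionProb) {
        if (corruptFraction <= 0 || corruptFraction >= 1) {
            throw new IllegalArgumentException("corruptFraction must be in (0, 1)");
        }
        // Solve (1 - p)^n <= 1 - target  =>  n >= ln(1 - target) / ln(1 - p)
        return (long) Math.ceil(Math.log(1 - targetDetectionProb) / Math.log(1 - corruptFraction));
    }
}
```

By this estimate, catching a 0.1% inconsistency rate with 99% probability takes roughly 4,600 sampled rows, so a 1,000-row sample like the one above is only sensitive to larger error rates.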
6.2 Real-Time Monitoring
```java
// Migration monitoring via Micrometer
@Component
public class MigrationMonitor {

    private final MeterRegistry meterRegistry;
    private final Timer migrationDuration;
    private volatile double currentProgress = 0.0;

    public MigrationMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.migrationDuration = Timer.builder("migration.duration")
            .description("Migration execution time")
            .register(meterRegistry);
        Gauge.builder("migration.progress", this, MigrationMonitor::getCurrentProgress)
            .description("Migration progress percentage")
            .register(meterRegistry);
    }

    // Counters are resolved per tag set through the registry;
    // Counter.increment() itself does not accept tags
    public void recordMigrationSuccess(String table) {
        meterRegistry.counter("migration.success", "table", table).increment();
    }

    public void recordMigrationFailure(String table, String error) {
        meterRegistry.counter("migration.failure", "table", table, "error", error).increment();
    }

    public Timer.Sample startMigrationTimer() {
        return Timer.start(meterRegistry);
    }

    public Timer getMigrationDuration() {
        return migrationDuration;
    }

    public void updateProgress(double progress) {
        this.currentProgress = progress;
    }

    private double getCurrentProgress() {
        return currentProgress;
    }
}
```
7. Rollback Strategy
7.1 Automatic Rollback
```java
// Automatic rollback service
@Component
public class AutoRollbackService {

    private static final Logger logger = LoggerFactory.getLogger(AutoRollbackService.class);

    @Autowired
    private DataConsistencyValidator validator;

    @Autowired
    private TrafficRoutingService routingService;

    @Autowired
    private AlertService alertService;

    @Autowired
    private DataSyncService dataSyncService;

    @Autowired
    private AuditService auditService;

    @Scheduled(fixedDelay = 60000) // check once a minute
    public void checkDataConsistency() {
        try {
            ValidationResult result = validator.validateUserData();
            if (!result.isValid()) {
                logger.error("Data consistency check failed: {}", result.getErrors());

                // Gauge how severe the failure is
                int errorCount = result.getErrors().size();
                if (errorCount > 10) {
                    // More than 10 errors triggers an automatic rollback
                    triggerAutoRollback("Data consistency errors: " + errorCount);
                } else {
                    // Warn, but keep running
                    alertService.sendWarning("Data consistency issues detected", result.getErrors());
                }
            }
        } catch (Exception e) {
            logger.error("Data consistency check failed", e);
            alertService.sendError("Data consistency check failed", e.getMessage());
        }
    }

    private void triggerAutoRollback(String reason) {
        logger.error("Triggering automatic rollback due to: {}", reason);

        try {
            // 1. Route traffic back to the legacy system
            routingService.routeToLegacySystem();

            // 2. Stop data synchronization
            dataSyncService.stopSync();

            // 3. Raise an emergency alert
            alertService.sendEmergencyAlert("Automatic rollback triggered", reason);

            // 4. Record the rollback for auditing
            auditService.recordRollback(reason, Instant.now());

            logger.info("Automatic rollback completed");
        } catch (Exception e) {
            logger.error("Automatic rollback failed", e);
            alertService.sendEmergencyAlert("Automatic rollback failed", e.getMessage());
        }
    }
}
```
8. Best Practices Summary
8.1 Before the Migration
- Know your data: map out the existing schemas, data volumes, and data quality
- Build a test environment that mirrors production
- Plan the migration in phases, with a rollback plan for every step
- Train the team: everyone should know the migration procedure and the emergency playbook
8.2 During the Migration
- Start small: begin with small, low-risk tables to test the waters
- Watch the monitors: keep a constant eye on system performance and data consistency
- Adapt quickly: adjust the strategy as soon as a problem appears instead of pushing through
- Communicate: stay in close touch with the business teams and escalate issues immediately
8.3 After the Migration
- Keep validating: data-consistency checks must not stop
- Tune performance continuously based on real-world usage
- Update the documentation and runbooks promptly
- Capture lessons learned: record the pitfalls you hit as a reference for future projects