当前位置: 首页 > news >正文

Java | 基于redis实现分布式批量设置各个数据中心的服务器配置方案设计和代码实践

关注:CodingTechWork

系统概述

项目背景

  在企业级应用系统中,经常需要对大量服务器进行配置的批量更新。传统的同步处理方式存在性能瓶颈和单点故障风险。本系统基于Redis实现分布式异步批量处理,支持大规模服务器的配置设置操作。

核心需求

  • 支持服务器级别的配置批量设置
  • 实现中心维度的并发处理
  • 提供完善的进度监控和状态查询
  • 支持失败配置的重试机制
  • 保证系统的高可用性和可扩展性

技术架构

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Web层     │    │  服务层     │    │ 数据层      │
│ Controller  │───▶│  Service   │───▶│  Redis     │
│   API       │    │  Processor │    │  Database  │
└─────────────┘    └─────────────┘    └─────────────┘│                  │                  │└──────────────────┼──────────────────┘│┌─────────────┐│  异步任务    ││ ThreadPool │└─────────────┘

核心设计

Redis数据结构设计

Key命名规范

// 暂定相关的
DC:PAUSED:{dcId}
SERVER:PAUSED:{dcId}
GLOBAL:PAUSED:{dcId} 
// 中心服务器映射
DC:SERVERS:{dcId}// 服务器配置队列
SERVER:CONFIGS:TASK:{serverId}
SERVER:CONFIGS:MAP:{serverId}// 配置状态管理
SERVER:PENDING:{serverId}      // 待处理配置
SERVER:PROCESSING:{serverId}   // 处理中配置  
SERVER:COMPLETED:{serverId}    // 完成配置
SERVER:FAILED:{serverId}       // 失败配置// 配置详情
CONFIG:DETAIL:{serverId}:{configId}
CONFIG:STATUS:{serverId}:{configId}

数据模型

@Data
class PauseStatus {private String targetId; // 中心ID或服务器IDprivate String targetType; // "DC" 或 "SERVER"private Boolean paused;private Date pauseTime;private String pausedBy;public PauseStatus(String targetId, String targetType) {this.targetId = targetId;this.targetType = targetType;this.pauseTime = new Date();}
}@Data
class GlobalPauseStatus {private Boolean globalPaused;private Date pauseTime;private String pausedBy;private Integer affectedDcs;private Integer affectedServers;public GlobalPauseStatus() {this.pauseTime = new Date();}
}@Data
class PauseOperationResult {private Boolean success;private String message;private String operation; // "PAUSE" 或 "RESUME"private String targetType; // "GLOBAL", "DC", "SERVER"private String targetId;private Date operationTime;public PauseOperationResult(String operation, String targetType, String targetId) {this.operation = operation;this.targetType = targetType;this.targetId = targetId;this.operationTime = new Date();}
}
@Data
public class ConfigInfo {private String dcId;private String serverId;private String configId;private String configName;
}@Data
public class ConfigStatusStats {private String serverId;private Long pendingCount = 0L;private Long processingCount = 0L;private Long completedCount = 0L;private Long failedCount = 0L;public Long getTotalCount() {return pendingCount + processingCount + completedCount + failedCount;}public Double getSuccessRate() {long total = getTotalCount();return total == 0 ? 0.0 : (double) completedCount / total * 100;}
}@Data
public class ServerCompletionDetail {private String serverId;private String serverName;/*** COMPLETED, PROCESSING, PENDING, FAILED*/private String status;private Integer totalConfigs = 0;private Integer completedConfigs = 0;private Integer processingConfigs = 0;private Integer pendingConfigs = 0;private Integer failedConfigs = 0;private String completionRate;private Date lastUpdateTime;public boolean isCompleted() {return "COMPLETED".equals(status);}
}

核心业务流程

配置设置流程

1. 初始化阶段↓
2. 服务器分批处理↓  
3. 配置逐个执行(30秒间隔)↓
4. 状态实时更新↓
5. 结果统计汇总

容错机制

  • 配置级别重试
  • 服务器级别恢复
  • 中心级别批量重试
  • 异常状态监控

核心代码实现

Redis管理服务

@Component
@Slf4j
public class ConfigRedisManager {// Redis Key 常量// 暂定相关的private static final String DC_PAUSED_KEY = "dc:paused:%s";private static final String SERVER_PAUSED_KEY = "server:paused:%s";private static final String GLOBAL_PAUSED_KEY = "global:paused";// 中心服务器private static final String DC_SERVERS_KEY = "dc:servers:%s";private static final String SERVER_CONFIGS_KEY = "server:configs:%s";private static final String CONFIG_DETAIL_KEY = "config:detail:%s:%s";private static final String SERVER_PENDING_KEY = "server:pending:%s";private static final String SERVER_PROCESSING_KEY = "server:processing:%s";private static final String SERVER_COMPLETED_KEY = "server:completed:%s";private static final String SERVER_FAILED_KEY = "server:failed:%s";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 初始化中心服务器列表*/public void initDcServers(String dcId, List<String> serverIds) {String key = String.format(DC_SERVERS_KEY, dcId);redisTemplate.delete(key);if (CollUtil.isNotEmpty(serverIds)) {redisTemplate.opsForSet().add(key, serverIds.toArray());redisTemplate.expire(key, 48, TimeUnit.HOURS);}log.info("初始化中心服务器列表完成,中心ID: {},服务器数量: {}", dcId, CollUtil.isNotEmpty(serverIds) ? serverIds.size() : 0);}/*** 初始化服务器配置列表*/public void initServerConfigs(String dcId, String serverId, List<ConfigInfo> configs) {// 1. 存储服务器配置列表String serverConfigsKey = String.format(SERVER_CONFIGS_KEY, serverId);redisTemplate.delete(serverConfigsKey);if (CollUtil.isNotEmpty(configs)) {List<String> configIds = configs.stream().map(ConfigInfo::getConfigId).collect(Collectors.toList());redisTemplate.opsForList().rightPushAll(serverConfigsKey, configIds.toArray());redisTemplate.expire(serverConfigsKey, 48, TimeUnit.HOURS);}// 2. 存储配置详情for (ConfigInfo config : configs) {String configDetailKey = String.format(CONFIG_DETAIL_KEY, serverId, config.getConfigId());Map<String, Object> configDetail = new HashMap<>();configDetail.put("dcId", dcId);configDetail.put("serverId", serverId);configDetail.put("configId", config.getConfigId());configDetail.put("configName", config.getConfigName());configDetail.put("createTime", System.currentTimeMillis());redisTemplate.opsForHash().putAll(configDetailKey, configDetail);redisTemplate.expire(configDetailKey, 48, TimeUnit.HOURS);}// 3. 初始化待处理队列String pendingKey = String.format(SERVER_PENDING_KEY, serverId);redisTemplate.delete(pendingKey);if (CollUtil.isNotEmpty(configs)) {List<String> configIds = configs.stream().map(ConfigInfo::getConfigId).collect(Collectors.toList());redisTemplate.opsForList().rightPushAll(pendingKey, configIds.toArray());redisTemplate.expire(pendingKey, 48, TimeUnit.HOURS);}// 4. 清理其他状态队列redisTemplate.delete(String.format(SERVER_PROCESSING_KEY, serverId));redisTemplate.delete(String.format(SERVER_COMPLETED_KEY, serverId));redisTemplate.delete(String.format(SERVER_FAILED_KEY, serverId));log.info("初始化服务器配置完成,中心ID: {},服务器ID: {},配置数量: {}", dcId, serverId, CollUtil.isNotEmpty(configs) ? configs.size() : 0);}/*** 获取下一个待处理配置*/public ConfigInfo getNextPendingConfig(String serverId) {String pendingKey = String.format(SERVER_PENDING_KEY, serverId);String configId = (String) redisTemplate.opsForList().leftPop(pendingKey);if (configId == null) {return null;}// 获取配置详情String configDetailKey = String.format(CONFIG_DETAIL_KEY, serverId, configId);Map<Object, Object> configDetail = redisTemplate.opsForHash().entries(configDetailKey);if (MapUtil.isEmpty(configDetail)) {return null;}ConfigInfo config = new ConfigInfo();config.setDcId((String) configDetail.get("dcId"));config.setServerId((String) configDetail.get("serverId"));config.setConfigId((String) configDetail.get("configId"));config.setConfigName((String) configDetail.get("configName"));return config;}/*** 标记配置为处理中*/public void markConfigProcessing(ConfigInfo config) {String processingKey = String.format(SERVER_PROCESSING_KEY, config.getServerId());redisTemplate.opsForList().rightPush(processingKey, config.getConfigId());}/*** 标记配置为完成*/public void markConfigCompleted(ConfigInfo config) {String processingKey = String.format(SERVER_PROCESSING_KEY, config.getServerId());redisTemplate.opsForList().remove(processingKey, 0, config.getConfigId());String completedKey = String.format(SERVER_COMPLETED_KEY, config.getServerId());redisTemplate.opsForSet().add(completedKey, config.getConfigId());}/*** 标记配置为失败*/public void markConfigFailed(ConfigInfo config, String errorMsg) {String processingKey = String.format(SERVER_PROCESSING_KEY, config.getServerId());redisTemplate.opsForList().remove(processingKey, 0, config.getConfigId());String failedKey = String.format(SERVER_FAILED_KEY, config.getServerId());Map<String, String> failedInfo = new HashMap<>();failedInfo.put("configId", config.getConfigId());failedInfo.put("configName", config.getConfigName());failedInfo.put("errorMsg", errorMsg);failedInfo.put("failTime", String.valueOf(System.currentTimeMillis()));redisTemplate.opsForHash().put(failedKey, config.getConfigId(), JSON.toJSONString(failedInfo));}/*** 获取配置状态统计*/public ConfigStatusStats getConfigStatusStats(String serverId) {ConfigStatusStats stats = new ConfigStatusStats();stats.setServerId(serverId);try {// 待处理数量String pendingKey = String.format(SERVER_PENDING_KEY, serverId);Long pendingCount = redisTemplate.opsForList().size(pendingKey);stats.setPendingCount(pendingCount != null ? pendingCount : 0L);// 处理中数量String processingKey = String.format(SERVER_PROCESSING_KEY, serverId);Long processingCount = redisTemplate.opsForList().size(processingKey);stats.setProcessingCount(processingCount != null ? processingCount : 0L);// 完成数量String completedKey = String.format(SERVER_COMPLETED_KEY, serverId);Long completedCount = redisTemplate.opsForSet().size(completedKey);stats.setCompletedCount(completedCount != null ? completedCount : 0L);// 失败数量String failedKey = String.format(SERVER_FAILED_KEY, serverId);Long failedCount = redisTemplate.opsForHash().size(failedKey);stats.setFailedCount(failedCount != null ? failedCount : 0L);} catch (Exception e) {log.error("获取服务器配置状态统计异常,服务器ID: {}", serverId, e);stats.setPendingCount(0L);stats.setProcessingCount(0L);stats.setCompletedCount(0L);stats.setFailedCount(0L);}return stats;}/*** 重试失败配置*/public void retryFailedConfigs(String serverId) {String failedKey = String.format(SERVER_FAILED_KEY, serverId);String pendingKey = String.format(SERVER_PENDING_KEY, serverId);Map<Object, Object> failedMap = redisTemplate.opsForHash().entries(failedKey);if (MapUtil.isNotEmpty(failedMap)) {for (Object configId : failedMap.keySet()) {redisTemplate.opsForList().rightPush(pendingKey, configId);}redisTemplate.delete(failedKey);}}/*** 检查服务器是否还有待处理配置*/public boolean hasPendingConfigs(String serverId) {String pendingKey = String.format(SERVER_PENDING_KEY, serverId);Long count = redisTemplate.opsForList().size(pendingKey);return count != null && count > 0;}/*** 获取中心服务器列表*/public Set<String> getDcServers(String dcId) {String key = String.format(DC_SERVERS_KEY, dcId);Set<Object> serverObjects = redisTemplate.opsForSet().members(key);if (CollUtil.isNotEmpty(serverObjects)) {return serverObjects.stream().map(obj -> obj.toString()).collect(Collectors.toSet());}return Collections.emptySet();}/*** 暂停中心设置任务*/public void pauseDcSetup(String dcId) {String pausedKey = String.format(DC_PAUSED_KEY, dcId);redisTemplate.opsForValue().set(pausedKey, "true", 48, TimeUnit.HOURS);log.info("中心设置任务已暂停,中心ID: {}", dcId);}/*** 恢复中心设置任务*/public void resumeDcSetup(String dcId) {String pausedKey = String.format(DC_PAUSED_KEY, dcId);redisTemplate.delete(pausedKey);log.info("中心设置任务已恢复,中心ID: {}", dcId);}/*** 暂停服务器设置任务*/public void pauseServerSetup(String serverId) {String pausedKey = String.format(SERVER_PAUSED_KEY, serverId);redisTemplate.opsForValue().set(pausedKey, "true", 48, TimeUnit.HOURS);log.info("服务器设置任务已暂停,服务器ID: {}", serverId);}/*** 恢复服务器设置任务*/public void resumeServerSetup(String serverId) {String pausedKey = String.format(SERVER_PAUSED_KEY, serverId);redisTemplate.delete(pausedKey);log.info("服务器设置任务已恢复,服务器ID: {}", serverId);}/*** 全局暂停所有设置任务*/public void pauseGlobalSetup() {redisTemplate.opsForValue().set(GLOBAL_PAUSED_KEY, "true", 48, TimeUnit.HOURS);log.info("全局设置任务已暂停");}/*** 全局恢复所有设置任务*/public void resumeGlobalSetup() {redisTemplate.delete(GLOBAL_PAUSED_KEY);log.info("全局设置任务已恢复");}/*** 检查中心是否被暂停*/public boolean isDcPaused(String dcId) {String globalPaused = (String) redisTemplate.opsForValue().get(GLOBAL_PAUSED_KEY);if ("true".equals(globalPaused)) {return true;}String dcPausedKey = String.format(DC_PAUSED_KEY, dcId);String dcPaused = (String) redisTemplate.opsForValue().get(dcPausedKey);return "true".equals(dcPaused);}/*** 检查服务器是否被暂停*/public boolean isServerPaused(String serverId) {String globalPaused = (String) redisTemplate.opsForValue().get(GLOBAL_PAUSED_KEY);if ("true".equals(globalPaused)) {return true;}// 检查服务器所属中心是否暂停ConfigInfo sampleConfig = getSampleConfig(serverId);if (sampleConfig != null && isDcPaused(sampleConfig.getDcId())) {return true;}String serverPausedKey = String.format(SERVER_PAUSED_KEY, serverId);String serverPaused = (String) redisTemplate.opsForValue().get(serverPausedKey);return "true".equals(serverPaused);}/*** 获取服务器的示例配置(用于获取中心ID)*/private ConfigInfo getSampleConfig(String serverId) {String serverConfigsKey = String.format(SERVER_CONFIGS_KEY, serverId);List<Object> configIds = redisTemplate.opsForList().range(serverConfigsKey, 0, 0);if (CollUtil.isEmpty(configIds)) {return null;}String configId = (String) configIds.get(0);String configDetailKey = String.format(CONFIG_DETAIL_KEY, serverId, configId);Map<Object, Object> configDetail = redisTemplate.opsForHash().entries(configDetailKey);if (MapUtil.isEmpty(configDetail)) {return null;}ConfigInfo config = new ConfigInfo();config.setDcId((String) configDetail.get("dcId"));config.setServerId((String) configDetail.get("serverId"));config.setConfigId((String) configDetail.get("configId"));config.setConfigName((String) configDetail.get("configName"));return config;}/*** 获取所有暂停的中心列表*/public Set<String> getPausedDcs() {Set<String> pausedDcs = new HashSet<>();// 检查全局暂停String globalPaused = (String) redisTemplate.opsForValue().get(GLOBAL_PAUSED_KEY);if ("true".equals(globalPaused)) {// 如果全局暂停,返回空集合,表示所有中心都暂停return Collections.emptySet();}// 扫描所有中心暂停状态Set<String> keys = redisTemplate.keys("dc:paused:*");if (CollUtil.isNotEmpty(keys)) {for (String key : keys) {String dcId = key.substring("dc:paused:".length());pausedDcs.add(dcId);}}return pausedDcs;}/*** 获取所有暂停的服务器列表*/public Set<String> getPausedServers() {Set<String> pausedServers = new HashSet<>();// 检查全局暂停String globalPaused = (String) redisTemplate.opsForValue().get(GLOBAL_PAUSED_KEY);if ("true".equals(globalPaused)) {// 如果全局暂停,返回空集合,表示所有服务器都暂停return Collections.emptySet();}// 扫描所有服务器暂停状态Set<String> keys = redisTemplate.keys("server:paused:*");if (CollUtil.isNotEmpty(keys)) {for (String key : keys) {String serverId = key.substring("server:paused:".length());pausedServers.add(serverId);}}return pausedServers;}
}

暂停处理器

@Service
@Slf4j
public class PauseManagementService {@Autowiredprivate ConfigRedisManager redisManager;@Autowiredprivate BatchSetupService batchSetupService;/*** 暂停中心设置任务*/public PauseOperationResult pauseDcSetup(String dcId, String operator) {log.info("操作员 {} 暂停中心设置任务,中心ID: {}", operator, dcId);PauseOperationResult result = new PauseOperationResult("PAUSE", "DC", dcId);try {redisManager.pauseDcSetup(dcId);result.setSuccess(true);result.setMessage("中心设置任务暂停成功");log.info("中心设置任务暂停成功,中心ID: {}", dcId);} catch (Exception e) {log.error("暂停中心设置任务失败,中心ID: {}", dcId, e);result.setSuccess(false);result.setMessage("暂停失败: " + e.getMessage());}return result;}/*** 恢复中心设置任务*/public PauseOperationResult resumeDcSetup(String dcId, String operator) {log.info("操作员 {} 恢复中心设置任务,中心ID: {}", operator, dcId);PauseOperationResult result = new PauseOperationResult("RESUME", "DC", dcId);try {redisManager.resumeDcSetup(dcId);result.setSuccess(true);result.setMessage("中心设置任务恢复成功");log.info("中心设置任务恢复成功,中心ID: {}", dcId);} catch (Exception e) {log.error("恢复中心设置任务失败,中心ID: {}", dcId, e);result.setSuccess(false);result.setMessage("恢复失败: " + e.getMessage());}return result;}/*** 暂停服务器设置任务*/public PauseOperationResult pauseServerSetup(String serverId, String operator) {log.info("操作员 {} 暂停服务器设置任务,服务器ID: {}", operator, serverId);PauseOperationResult result = new PauseOperationResult("PAUSE", "SERVER", serverId);try {redisManager.pauseServerSetup(serverId);result.setSuccess(true);result.setMessage("服务器设置任务暂停成功");log.info("服务器设置任务暂停成功,服务器ID: {}", serverId);} catch (Exception e) {log.error("暂停服务器设置任务失败,服务器ID: {}", serverId, e);result.setSuccess(false);result.setMessage("暂停失败: " + e.getMessage());}return result;}/*** 恢复服务器设置任务*/public PauseOperationResult resumeServerSetup(String serverId, String operator) {log.info("操作员 {} 恢复服务器设置任务,服务器ID: {}", operator, serverId);PauseOperationResult result = new PauseOperationResult("RESUME", "SERVER", serverId);try {redisManager.resumeServerSetup(serverId);result.setSuccess(true);result.setMessage("服务器设置任务恢复成功");log.info("服务器设置任务恢复成功,服务器ID: {}", serverId);} catch (Exception e) {log.error("恢复服务器设置任务失败,服务器ID: {}", serverId, e);result.setSuccess(false);result.setMessage("恢复失败: " + e.getMessage());}return result;}/*** 全局暂停所有设置任务*/public PauseOperationResult pauseGlobalSetup(String operator) {log.info("操作员 {} 全局暂停所有设置任务", operator);PauseOperationResult result = new PauseOperationResult("PAUSE", "GLOBAL", "ALL");try {redisManager.pauseGlobalSetup();result.setSuccess(true);result.setMessage("全局设置任务暂停成功");log.info("全局设置任务暂停成功");} catch (Exception e) {log.error("全局暂停设置任务失败", e);result.setSuccess(false);result.setMessage("全局暂停失败: " + e.getMessage());}return result;}/*** 全局恢复所有设置任务*/public PauseOperationResult resumeGlobalSetup(String operator) {log.info("操作员 {} 全局恢复所有设置任务", operator);PauseOperationResult result = new PauseOperationResult("RESUME", "GLOBAL", "ALL");try {redisManager.resumeGlobalSetup();result.setSuccess(true);result.setMessage("全局设置任务恢复成功");log.info("全局设置任务恢复成功");} catch (Exception e) {log.error("全局恢复设置任务失败", e);result.setSuccess(false);result.setMessage("全局恢复失败: " + e.getMessage());}return result;}/*** 查询全局暂停状态*/public GlobalPauseStatus getGlobalPauseStatus() {GlobalPauseStatus status = new GlobalPauseStatus();try {String globalPaused = (String) redisTemplate.opsForValue().get("global:paused");status.setGlobalPaused("true".equals(globalPaused));if (status.getGlobalPaused()) {// 统计受影响的范围Set<String> allDcs = getAllDcs();Set<String> allServers = getAllServers();status.setAffectedDcs(allDcs.size());status.setAffectedServers(allServers.size());}} catch (Exception e) {log.error("查询全局暂停状态异常", e);status.setGlobalPaused(false);}return status;}/*** 查询暂停状态列表*/public PauseStatusList getPauseStatusList() {PauseStatusList result = new PauseStatusList();try {// 获取全局暂停状态GlobalPauseStatus globalStatus = getGlobalPauseStatus();result.setGlobalPaused(globalStatus.getGlobalPaused());if (!globalStatus.getGlobalPaused()) {// 获取暂停的中心列表Set<String> pausedDcs = redisManager.getPausedDcs();for (String dcId : pausedDcs) {PauseStatus pauseStatus = new PauseStatus(dcId, "DC");pauseStatus.setPaused(true);result.getPausedDcs().add(pauseStatus);}// 获取暂停的服务器列表Set<String> pausedServers = redisManager.getPausedServers();for (String serverId : pausedServers) {PauseStatus pauseStatus = new PauseStatus(serverId, "SERVER");pauseStatus.setPaused(true);result.getPausedServers().add(pauseStatus);}}} catch (Exception e) {log.error("查询暂停状态列表异常", e);}return result;}private Set<String> getAllDcs() {// 实现获取所有中心ID的逻辑// 这里需要根据实际情况实现return Collections.emptySet();}private Set<String> getAllServers() {// 实现获取所有服务器ID的逻辑  // 这里需要根据实际情况实现return Collections.emptySet();}
}@Data
class PauseStatusList {private Boolean globalPaused = false;private List<PauseStatus> pausedDcs = new ArrayList<>();private List<PauseStatus> pausedServers = new ArrayList<>();public Integer getTotalPausedDcs() {return pausedDcs.size();}public Integer getTotalPausedServers() {return pausedServers.size();}
}

配置设置处理器

@Component
@Slf4j
public class ConfigSetupProcessor {@Autowiredprivate ConfigRedisManager redisManager;@Autowiredprivate ExternalApiService externalApiService;/*** 设置服务器的所有配置*/public ConfigSetupResult setupServerConfigs(String serverId) {ConfigSetupResult result = new ConfigSetupResult();result.setServerId(serverId);log.info("开始设置服务器配置,服务器ID: {}", serverId);while (true) {ConfigInfo config = redisManager.getNextPendingConfig(serverId);if (config == null) {break;}long startTime = System.currentTimeMillis();try {// 标记配置为处理中redisManager.markConfigProcessing(config);log.info("开始设置配置,中心ID: {},服务器ID: {},配置ID: {},配置名称: {}", config.getDcId(), config.getServerId(), config.getConfigId(), config.getConfigName());// 执行配置设置boolean success = setupSingleConfig(config);if (success) {redisManager.markConfigCompleted(config);result.recordSuccess();log.info("配置设置成功,服务器ID: {},配置ID: {},配置名称: {},耗时: {}ms", serverId, config.getConfigId(), config.getConfigName(), System.currentTimeMillis() - startTime);} else {String errorMsg = "配置设置失败";redisManager.markConfigFailed(config, errorMsg);result.recordFailure(config.getConfigId(), config.getConfigName(), errorMsg);log.error("配置设置失败,服务器ID: {},配置ID: {},配置名称: {},耗时: {}ms", serverId, config.getConfigId(), config.getConfigName(), System.currentTimeMillis() - startTime);}} catch (Exception e) {String errorMsg = "配置设置异常: " + e.getMessage();redisManager.markConfigFailed(config, errorMsg);result.recordFailure(config.getConfigId(), config.getConfigName(), errorMsg);log.error("配置设置异常,服务器ID: {},配置ID: {},配置名称: {},异常: {},耗时: {}ms", serverId, config.getConfigId(), config.getConfigName(), e.getMessage(), System.currentTimeMillis() - startTime, e);}// 配置处理间隔30秒if (redisManager.hasPendingConfigs(serverId)) {safeSleep(30000);}}result.setMessage(String.format("服务器设置完成,总配置数: %d,成功: %d,失败: %d", result.getTotalCount(), result.getSuccessCount(), result.getFailureCount()));log.info("服务器配置设置完成,服务器ID: {},结果: {}", serverId, result.getMessage());return result;}/*** 设置单个配置*/private boolean setupSingleConfig(ConfigInfo config) {try {// 调用外部API设置配置ConfigSetupRequest setupRequest = new ConfigSetupRequest();setupRequest.setServerId(config.getServerId());setupRequest.setConfigId(config.getConfigId());// 设置配置内容String configContent = buildConfigContent();setupRequest.setConfigContent(configContent);ApiResponse response = externalApiService.setupConfig(setupRequest);if (!response.isSuccess()) {log.error("设置配置失败,中心ID: {},服务器ID: {},配置ID: {},配置名称: {},错误信息: {}",config.getDcId(), config.getServerId(), config.getConfigId(), config.getConfigName(), response.getMessage());return false;}log.info("设置配置成功,中心ID: {},服务器ID: {},配置ID: {},配置名称: {}",config.getDcId(), config.getServerId(), config.getConfigId(), config.getConfigName());return true;} catch (Exception e) {log.error("设置配置异常,中心ID: {},服务器ID: {},配置ID: {},配置名称: {}",config.getDcId(), config.getServerId(), config.getConfigId(), config.getConfigName(), e);return false;}}/*** 构建配置内容*/private String buildConfigContent() {// 示例配置内容return "config setup content";}private void safeSleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.warn("配置处理间隔睡眠被中断");}}/*** 设置服务器的所有配置*/public ConfigSetupResult setupServerConfigs(String serverId) {ConfigSetupResult result = new ConfigSetupResult();result.setServerId(serverId);log.info("开始设置服务器配置,服务器ID: {}", serverId);// 检查服务器是否被暂停if (redisManager.isServerPaused(serverId)) {result.setMessage("服务器设置任务已被暂停");log.info("服务器设置任务被暂停,跳过处理,服务器ID: {}", serverId);return result;}while (true) {// 每次处理前检查暂停状态if (redisManager.isServerPaused(serverId)) {result.setMessage("服务器设置任务在运行中被暂停");log.info("服务器设置任务在运行中被暂停,服务器ID: {}", serverId);break;}ConfigInfo config = redisManager.getNextPendingConfig(serverId);if (config == null) {break;}long startTime = System.currentTimeMillis();try {// 标记配置为处理中redisManager.markConfigProcessing(config);log.info("开始设置配置,中心ID: {},服务器ID: {},配置ID: {},配置名称: {}", config.getDcId(), config.getServerId(), config.getConfigId(), config.getConfigName());// 执行配置设置boolean success = setupSingleConfig(config);if (success) {redisManager.markConfigCompleted(config);result.recordSuccess();log.info("配置设置成功,服务器ID: {},配置ID: {},配置名称: {},耗时: {}ms", serverId, config.getConfigId(), config.getConfigName(), System.currentTimeMillis() - startTime);} else {String errorMsg = "配置设置失败";redisManager.markConfigFailed(config, errorMsg);result.recordFailure(config.getConfigId(), config.getConfigName(), errorMsg);log.error("配置设置失败,服务器ID: {},配置ID: {},配置名称: {},耗时: {}ms", serverId, config.getConfigId(), config.getConfigName(), System.currentTimeMillis() - startTime);}} catch (Exception e) {String errorMsg = "配置设置异常: " + e.getMessage();redisManager.markConfigFailed(config, errorMsg);result.recordFailure(config.getConfigId(), config.getConfigName(), errorMsg);log.error("配置设置异常,服务器ID: {},配置ID: {},配置名称: {},异常: {},耗时: {}ms", serverId, config.getConfigId(), config.getConfigName(), e.getMessage(), System.currentTimeMillis() - startTime, e);}// 配置处理间隔30秒if (redisManager.hasPendingConfigs(serverId)) {safeSleep(30000);}}if (result.getMessage() == null) {result.setMessage(String.format("服务器设置完成,总配置数: %d,成功: %d,失败: %d", result.getTotalCount(), result.getSuccessCount(), result.getFailureCount()));}log.info("服务器配置设置完成,服务器ID: {},结果: {}", serverId, result.getMessage());return result;}}@Data
class ConfigSetupResult {private String serverId;private int totalCount = 0;private int successCount = 0;private int failureCount = 0;private String message;private List<FailedConfig> failedConfigs = new ArrayList<>();public void recordSuccess() {totalCount++;successCount++;}public void recordFailure(String configId, String configName, String errorMsg) {totalCount++;failureCount++;FailedConfig failedConfig = new FailedConfig();failedConfig.setConfigId(configId);failedConfig.setConfigName(configName);failedConfig.setErrorMsg(errorMsg);failedConfig.setFailTime(System.currentTimeMillis());failedConfigs.add(failedConfig);}public boolean isAllSuccess() {return failureCount == 0;}
}@Data
class FailedConfig {private String configId;private String configName;private String errorMsg;private Long failTime;
}

批量设置服务

@Service
@Slf4j
public class BatchSetupService {@Autowiredprivate ConfigRedisManager redisManager;@Autowiredprivate ConfigSetupProcessor configSetupProcessor;@Autowiredprivate ThreadPoolTaskExecutor setupExecutor;/*** 触发中心配置设置(异步)*/public SetupTaskResult setupDcConfigs(String dcId) {log.info("开始触发中心配置设置,中心ID: {}", dcId);SetupTaskResult result = new SetupTaskResult();result.setDcId(dcId);result.setTaskId(generateTaskId());result.setStartTime(new Date());result.setStatus("STARTED");result.setMessage("设置任务已启动");CompletableFuture.runAsync(() -> {processDcSetup(dcId, result);}, setupExecutor);log.info("中心配置设置任务已提交,中心ID: {},任务ID: {}", dcId, result.getTaskId());return result;}/*** 处理中心设置*/private void processDcSetup(String dcId, SetupTaskResult result) {try {log.info("开始处理中心设置,中心ID: {},任务ID: {}", dcId, result.getTaskId());// 获取中心服务器列表Set<String> serverIds = redisManager.getDcServers(dcId);if (CollUtil.isEmpty(serverIds)) {result.setStatus("COMPLETED");result.setMessage("中心无服务器");result.setEndTime(new Date());return;}result.setTotalServers(serverIds.size());result.setStatus("PROCESSING");result.setMessage("开始处理 " + serverIds.size() + " 个服务器");// 并发处理服务器(控制并发数)int concurrentServers = 5;List<List<String>> serverBatches = Lists.partition(new ArrayList<>(serverIds), concurrentServers);int processedServers = 0;// 同时,实现分配处理的能力for (int i = 0; i < serverBatches.size(); i++) {List<String> batch = serverBatches.get(i);log.info("开始处理第 {}/{} 批服务器数: {},中心ID: {}", i + 1, serverBatches.size(), batch.size(), dcId);// 并发处理当前批次服务器List<CompletableFuture<ServerSetupResult>> futures = batch.stream().map(serverId -> setupServerAsync(serverId, dcId)).collect(Collectors.toList());CompletableFuture<Void> batchFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));try {batchFuture.get(30, TimeUnit.MINUTES);// 收集结果for (CompletableFuture<ServerSetupResult> future : futures) {ServerSetupResult serverResult = future.get();if (serverResult.isSuccess()) {result.getSuccessServers().add(serverResult.getServerId());} else {result.getFailedServers().add(serverResult.getServerId());}}processedServers += batch.size();result.setProcessedServers(processedServers);log.info("第 {} 批次处理完成,中心ID: {},成功服务器: {},失败服务器: {}", i + 1, dcId, result.getSuccessServers().size(), result.getFailedServers().size());} catch (TimeoutException e) {log.error("第 {} 批次处理超时,中心ID: {}", i + 1, dcId, e);for (String serverId : batch) {result.getFailedServers().add(serverId);}}// 批次间间隔if (i < serverBatches.size() - 1) {safeSleep(30000);}}result.setStatus("COMPLETED");result.setEndTime(new Date());result.setMessage(String.format("中心设置完成,成功: %d,失败: %d", result.getSuccessServers().size(), result.getFailedServers().size()));logDcCompletion(dcId, result);} catch (Exception e) {log.error("中心设置处理异常,中心ID: {}", dcId, e);result.setStatus("FAILED");result.setEndTime(new Date());result.setMessage("处理异常: " + e.getMessage());}}/*** 异步设置单个服务器*/private CompletableFuture<ServerSetupResult> setupServerAsync(String serverId, String dcId) {return CompletableFuture.supplyAsync(() -> {ServerSetupResult result = new ServerSetupResult();result.setServerId(serverId);result.setDcId(dcId);result.setStartTime(new Date());try {log.info("开始设置服务器配置,中心ID: {},服务器ID: {}", dcId, serverId);ConfigSetupResult setupResult = configSetupProcessor.setupServerConfigs(serverId);result.setSuccess(setupResult.isAllSuccess());result.setTotalConfigs(setupResult.getTotalCount());result.setSuccessConfigs(setupResult.getSuccessCount());result.setFailedConfigs(setupResult.getFailureCount());result.setMessage(setupResult.getMessage());if (setupResult.isAllSuccess()) {log.info("服务器配置设置成功,中心ID: {},服务器ID: {},配置数: {}", dcId, serverId, setupResult.getTotalCount());} else {log.warn("服务器配置设置完成但有失败,中心ID: {},服务器ID: {},成功: {},失败: {}", dcId, serverId, setupResult.getSuccessCount(), setupResult.getFailureCount());}} catch (Exception e) {log.error("服务器配置设置异常,中心ID: {},服务器ID: {}", dcId, serverId, e);result.setSuccess(false);result.setMessage("设置异常: " + e.getMessage());}result.setEndTime(new Date());return result;}, setupExecutor);}/*** 重试中心下所有服务器的失败配置*/public DcRetryResult retryDcFailedConfigs(String dcId) {log.info("开始重试中心下所有服务器的失败配置,中心ID: {}", dcId);DcRetryResult result = new DcRetryResult(dcId);result.setTaskId(generateTaskId());result.setStatus("STARTED");result.setMessage("中心重试任务已启动");CompletableFuture.runAsync(() -> {processDcRetry(dcId, result);}, setupExecutor);log.info("中心重试任务已提交,中心ID: {},任务ID: {}", dcId, result.getTaskId());return result;}private void processDcRetry(String dcId, DcRetryResult result) {try {Set<String> serverIds = redisManager.getDcServers(dcId);if (CollUtil.isEmpty(serverIds)) {result.setStatus("COMPLETED");result.setMessage("中心无服务器");result.setEndTime(new Date());return;}result.setTotalServers(serverIds.size());result.setStatus("PROCESSING");for (String serverId : serverIds) {try {redisManager.retryFailedConfigs(serverId);result.setRetriedServers(result.getRetriedServers() + 1);result.setSuccessServers(result.getSuccessServers() + 1);log.info("服务器重试成功,中心ID: {},服务器ID: {}", dcId, serverId);} catch (Exception e) {log.error("服务器重试失败,中心ID: {},服务器ID: {}", dcId, serverId, e);result.setRetriedServers(result.getRetriedServers() + 1);result.setFailedServers(result.getFailedServers() + 1);}}result.setStatus("COMPLETED");result.setEndTime(new Date());result.setMessage(String.format("中心重试完成,总服务器: %d,重试服务器: %d,成功: %d,失败: %d", result.getTotalServers(), result.getRetriedServers(),result.getSuccessServers(), result.getFailedServers()));logDcRetryCompletion(dcId, result);} catch (Exception e) {log.error("中心重试处理异常,中心ID: {}", dcId, e);result.setStatus("FAILED");result.setEndTime(new Date());result.setMessage("处理异常: " + e.getMessage());}}private String generateTaskId() {return "TASK_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8);}private void safeSleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void logDcCompletion(String dcId, SetupTaskResult result) {long duration = result.getEndTime().getTime() - result.getStartTime().getTime();String logMsg = String.format("中心设置完成 - 中心ID: %s, 总服务器数: %d, 成功服务器: %d, 失败服务器: %d, 总耗时: %dms",dcId, result.getTotalServers(), result.getSuccessServers().size(),result.getFailedServers().size(), duration);log.info(logMsg);}private void logDcRetryCompletion(String dcId, DcRetryResult result) {long duration = result.getEndTime().getTime() - result.getStartTime().getTime();String logMsg = String.format("中心重试完成 - 中心ID: %s, 总服务器数: %d, 重试服务器: %d, 成功服务器: %d, 失败服务器: %d, 总耗时: %dms",dcId, result.getTotalServers(), result.getRetriedServers(),result.getSuccessServers(), result.getFailedServers(), duration);log.info(logMsg);}/*** 处理中心设置(增强版,支持暂停检查)*/private void processDcSetup(String dcId, SetupTaskResult result) {try {log.info("开始处理中心设置,中心ID: {},任务ID: {}", dcId, result.getTaskId());// 检查中心是否被暂停if (redisManager.isDcPaused(dcId)) {result.setStatus("PAUSED");result.setMessage("中心设置任务已被暂停");result.setEndTime(new Date());log.info("中心设置任务被暂停,中心ID: {}", dcId);return;}// 获取中心服务器列表Set<String> serverIds = redisManager.getDcServers(dcId);if (CollUtil.isEmpty(serverIds)) {result.setStatus("COMPLETED");result.setMessage("中心无服务器");result.setEndTime(new Date());return;}result.setTotalServers(serverIds.size());result.setStatus("PROCESSING");result.setMessage("开始处理 " + serverIds.size() + " 个服务器");// 并发处理服务器(控制并发数)int concurrentServers = 5;List<List<String>> serverBatches = Lists.partition(new ArrayList<>(serverIds), concurrentServers);int processedServers = 0;for (int i = 0; i < serverBatches.size(); i++) {// 每次批次处理前检查暂停状态if (redisManager.isDcPaused(dcId)) {result.setStatus("PAUSED");result.setMessage("中心设置任务在运行中被暂停");result.setEndTime(new Date());log.info("中心设置任务在运行中被暂停,中心ID: {}", dcId);return;}List<String> batch = serverBatches.get(i);log.info("处理第 {}/{} 批次,服务器数: {},中心ID: {}", i + 1, serverBatches.size(), batch.size(), dcId);// 并发处理当前批次服务器List<CompletableFuture<ServerSetupResult>> futures = batch.stream().map(serverId -> setupServerAsync(serverId, dcId)).collect(Collectors.toList());CompletableFuture<Void> batchFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));try {batchFuture.get(30, TimeUnit.MINUTES);// 收集结果for (CompletableFuture<ServerSetupResult> future : futures) {ServerSetupResult serverResult = future.get();if (serverResult.isSuccess()) {result.getSuccessServers().add(serverResult.getServerId());} else {result.getFailedServers().add(serverResult.getServerId());}}processedServers += batch.size();result.setProcessedServers(processedServers);log.info("第 {} 批次处理完成,中心ID: {},成功服务器: {},失败服务器: {}", i + 1, dcId, result.getSuccessServers().size(), result.getFailedServers().size());} catch (TimeoutException e) {log.error("第 {} 批次处理超时,中心ID: {}", i + 1, dcId, e);for (String serverId : batch) {result.getFailedServers().add(serverId);}}// 批次间间隔if (i < serverBatches.size() - 1) {safeSleep(30000);}}result.setStatus("COMPLETED");result.setEndTime(new Date());result.setMessage(String.format("中心设置完成,成功: %d,失败: %d", result.getSuccessServers().size(), result.getFailedServers().size()));logDcCompletion(dcId, result);} catch (Exception e) {log.error("中心设置处理异常,中心ID: {}", dcId, e);result.setStatus("FAILED");result.setEndTime(new Date());result.setMessage("处理异常: " + e.getMessage());}}
}@Data
class SetupTaskResult {private String taskId;private String dcId;private String status;private String message;private Date startTime;private Date endTime;private Integer totalServers = 0;private Integer processedServers = 0;private List<String> successServers = new ArrayList<>();private List<String> failedServers = new ArrayList<>();
}@Data
class ServerSetupResult {private String serverId;private String dcId;private Boolean success;private String message;private Date startTime;private Date endTime;private Integer totalConfigs = 0;private Integer successConfigs = 0;private Integer failedConfigs = 0;
}@Data
class DcRetryResult {private String dcId;private String taskId;private String status;private String message;private Date startTime;private Date endTime;private Integer totalServers = 0;private Integer retriedServers = 0;private Integer successServers = 0;private Integer failedServers = 0;public DcRetryResult(String dcId) {this.dcId = dcId;this.startTime = new Date();}
}

查询服务

@Service
@Slf4j
public class ConfigQueryService {@Autowiredprivate ConfigRedisManager redisManager;@Autowiredprivate ServerService serverService;/*** 查询中心所有服务器设置完成状态*/public DcCompletionStatus getDcCompletionStatus(String dcId) {log.info("查询中心服务器设置完成状态,中心ID: {}", dcId);DcCompletionStatus status = new DcCompletionStatus(dcId);try {Set<String> serverIds = redisManager.getDcServers(dcId);if (CollUtil.isEmpty(serverIds)) {setEmptyStatus(status);return status;}status.setTotalServers(serverIds.size());long totalConfigs = 0;long completedConfigs = 0;for (String serverId : serverIds) {ServerCompletionDetail serverDetail = getServerCompletionDetail(serverId);status.getServerDetails().add(serverDetail);updateStatusCounters(status, serverDetail);if (serverDetail.getTotalConfigs() != null) {totalConfigs += serverDetail.getTotalConfigs();completedConfigs += serverDetail.getCompletedConfigs();}}calculateCompletionStatus(status, totalConfigs, completedConfigs);} catch (Exception e) {log.error("查询中心设置完成状态异常,中心ID: {}", dcId, e);status.setMessage("查询异常: " + e.getMessage());}return status;}/*** 查询中心下所有未完成设置的服务器列表*/public List<ServerCompletionDetail> getDcIncompleteServers(String dcId) {log.info("查询中心所有未完成设置服务器列表,中心ID: {}", dcId);List<ServerCompletionDetail> incompleteServers = new ArrayList<>();try {Set<String> serverIds = redisManager.getDcServers(dcId);if (CollUtil.isEmpty(serverIds)) {return incompleteServers;}for (String serverId : serverIds) {ServerCompletionDetail serverDetail = getServerCompletionDetail(serverId);if (serverDetail != null && !serverDetail.isCompleted()) {incompleteServers.add(serverDetail);}}log.info("中心未完成服务器列表查询完成,中心ID: {},未完成服务器数: {}", dcId, incompleteServers.size());} catch (Exception e) {log.error("查询中心未完成服务器列表异常,中心ID: {}", dcId, e);throw new RuntimeException("查询中心未完成服务器列表失败: " + e.getMessage(), e);}return incompleteServers;}private ServerCompletionDetail getServerCompletionDetail(String serverId) {ServerCompletionDetail detail = new ServerCompletionDetail();detail.setServerId(serverId);try {ServerInfo serverInfo = serverService.getServerById(serverId);if (serverInfo != null) {detail.setServerName(serverInfo.getServerName());}ConfigStatusStats stats = redisManager.getConfigStatusStats(serverId);if (stats != null) {mapStatsToDetail(detail, stats);}detail.setStatus(determineServerStatus(stats));detail.setCompletionRate(calculateCompletionRate(detail));detail.setLastUpdateTime(new Date());} catch (Exception e) {log.error("获取服务器完成详情异常,服务器ID: {}", serverId, e);detail.setStatus("ERROR");detail.setLastUpdateTime(new Date());}return detail;}private void mapStatsToDetail(ServerCompletionDetail detail, ConfigStatusStats stats) {detail.setTotalConfigs(stats.getTotalCount().intValue());detail.setCompletedConfigs(stats.getCompletedCount().intValue());detail.setProcessingConfigs(stats.getProcessingCount().intValue());detail.setPendingConfigs(stats.getPendingCount().intValue());detail.setFailedConfigs(stats.getFailedCount().intValue());}private String determineServerStatus(ConfigStatusStats stats) {if (stats == null) return "UNKNOWN";if (stats.getPendingCount() > 0) return "PENDING";else if (stats.getProcessingCount() > 0) return "PROCESSING";else if (stats.getFailedCount() > 0) return "FAILED";else return "COMPLETED";}private String calculateCompletionRate(ServerCompletionDetail detail) {if (detail.getTotalConfigs() == null || detail.getTotalConfigs() == 0) return "0%";if (detail.getCompletedConfigs() == null) return "0%";double rate = (double) detail.getCompletedConfigs() / detail.getTotalConfigs() * 100;return String.format("%.2f%%", rate);}private void updateStatusCounters(DcCompletionStatus status, ServerCompletionDetail serverDetail) {switch (serverDetail.getStatus()) {case "COMPLETED": status.setCompletedServers(status.getCompletedServers() + 1); break;case "PROCESSING": status.setProcessingServers(status.getProcessingServers() + 1); break;case "PENDING": status.setPendingServers(status.getPendingServers() + 1); break;case "FAILED": status.setFailedServers(status.getFailedServers() + 1); break;}}private void calculateCompletionStatus(DcCompletionStatus status, long totalConfigs, long completedConfigs) {double completionRate = totalConfigs > 0 ? (double) completedConfigs / totalConfigs * 100 : 0;status.setCompletionRate(String.format("%.2f%%", completionRate));status.setAllCompleted(status.getProcessingServers() == 0 && status.getPendingServers() == 0);}private void setEmptyStatus(DcCompletionStatus status) {status.setAllCompleted(true);status.setCompletionRate("100%");status.setTotalServers(0);status.setCompletedServers(0);status.setProcessingServers(0);status.setPendingServers(0);status.setFailedServers(0);status.setMessage("中心无服务器");}
}@Data
class DcCompletionStatus {private String dcId;private Boolean allCompleted = false;private String completionRate = "0%";private Integer totalServers = 0;private Integer completedServers = 0;private Integer processingServers = 0;private Integer pendingServers = 0;private Integer failedServers = 0;private Date checkTime;private List<ServerCompletionDetail> serverDetails = new ArrayList<>();private String message;public DcCompletionStatus(String dcId) {this.dcId = dcId;this.checkTime = new Date();}
}

配置类

@Configuration
public class ThreadPoolConfig {@Bean("setupExecutor")public ThreadPoolTaskExecutor setupExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(20);executor.setQueueCapacity(100);executor.setThreadNamePrefix("config-setup-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);executor.initialize();return executor;}
}

API接口定义

初始化类相关API

@FeignClient(contextId = "configInitApi", value = "demo-service",path = "/api/demo/init")
public interface ConfigInitApi {@Operation(summary = "初始化中心服务器", description = "initDcServers")@PostMapping("/dc/{dcId}/servers")Result<Boolean> initDcServers(@PathVariable("dcId") String dcId);@Operation(summary = "初始化服务器配置", description = "initServerConfigs")@PostMapping("/server/{serverId}")Result<Boolean> initServerConfigs(@PathVariable("serverId") String serverId, @RequestParam("dcId") String dcId);
}

设置类相关API

@FeignClient(contextId = "configSetupApi", value = "demo-service",path = "/api/demo/setup")
public interface ConfigSetupApi {@Operation(summary = "触发中心配置设置", description = "setupDcConfigs")@PostMapping("/dc/{dcId}")Result<SetupTaskResult> setupDcConfigs(@PathVariable("dcId") String dcId);@Operation(summary = "重试服务器失败配置", description = "retryServerFailedConfigs")@PostMapping("/server/{serverId}/retry")Result<Boolean> retryServerFailedConfigs(@PathVariable("serverId") String serverId);@Operation(summary = "重试中心下所有服务器的失败配置", description = "retryDcFailedConfigs")@PostMapping("/dc/{dcId}/retry")Result<DcRetryResult> retryDcFailedConfigs(@PathVariable("dcId") String dcId);
}

查询类相关API

@FeignClient(contextId = "configQueryApi", value = "demo-service",path = "/api/demo/query")
public interface ConfigQueryApi {@Operation(summary = "查询中心所有服务器设置完成状态", description = "getDcCompletionStatus")@GetMapping("/dc/{dcId}/completion-status")Result<DcCompletionStatus> getDcCompletionStatus(@PathVariable("dcId") String dcId);@Operation(summary = "查询中心下所有未完成设置的服务器列表", description = "getDcIncompleteServers")@GetMapping("/dc/{dcId}/incomplete-servers")Result<List<ServerCompletionDetail>> getDcIncompleteServers(@PathVariable("dcId") String dcId);
}

查询类相关API

@FeignClient(contextId = "configPauseApi", value = "demo-service",path = "/api/demo/pause")
public interface ConfigPauseApi {@Operation(summary = "暂停中心设置任务", description = "pauseDcSetup")@PostMapping("/dc/{dcId}/pause")Result<PauseOperationResult> pauseDcSetup(@PathVariable("dcId") String dcId,@RequestParam("operator") String operator);@Operation(summary = "恢复中心设置任务", description = "resumeDcSetup")@PostMapping("/dc/{dcId}/resume")Result<PauseOperationResult> resumeDcSetup(@PathVariable("dcId") String dcId,@RequestParam("operator") String operator);@Operation(summary = "暂停服务器设置任务", description = "pauseServerSetup")@PostMapping("/server/{serverId}/pause")Result<PauseOperationResult> pauseServerSetup(@PathVariable("serverId") String serverId,@RequestParam("operator") String operator);@Operation(summary = "恢复服务器设置任务", description = "resumeServerSetup")@PostMapping("/server/{serverId}/resume")Result<PauseOperationResult> resumeServerSetup(@PathVariable("serverId") String serverId,@RequestParam("operator") String operator);@Operation(summary = "全局暂停所有设置任务", description = "pauseGlobalSetup")@PostMapping("/global/pause")Result<PauseOperationResult> pauseGlobalSetup(@RequestParam("operator") String operator);@Operation(summary = "全局恢复所有设置任务", description = "resumeGlobalSetup")@PostMapping("/global/resume")Result<PauseOperationResult> resumeGlobalSetup(@RequestParam("operator") String operator);@Operation(summary = "查询暂停状态列表", description = "getPauseStatusList")@GetMapping("/status")Result<PauseStatusList> getPauseStatusList();
}

控制类代码

配置初始化控制器 (ConfigInitController)

@RestController
@RequestMapping("/api/demo/init")
@Validated
@Slf4j
public class ConfigInitController {@Autowiredprivate ConfigRedisManager configRedisManager;@PostMapping("/dc/{dcId}/servers")public Result<Boolean> initDcServers(@PathVariable("dcId") @NotBlank String dcId) {log.info("初始化中心服务器, dcId: {}", dcId);try {boolean success = configRedisManager.initDcServers(dcId);return Result.success(success);} catch (Exception e) {log.error("初始化中心服务器失败, dcId: {}", dcId, e);return Result.error("初始化中心服务器失败: " + e.getMessage());}}@PostMapping("/server/{serverId}")public Result<Boolean> initServerConfigs(@PathVariable("serverId") @NotBlank String serverId,@RequestParam("dcId") @NotBlank String dcId) {log.info("初始化服务器配置, serverId: {}, dcId: {}", serverId, dcId);try {boolean success = configRedisManager.initServerConfigs(serverId, dcId);return Result.success(success);} catch (Exception e) {log.error("初始化服务器配置失败, serverId: {}, dcId: {}", serverId, dcId, e);return Result.error("初始化服务器配置失败: " + e.getMessage());}}
}

配置设置控制器 (ConfigSetupController)

@RestController
@RequestMapping("/api/demo/setup")
@Validated
@Slf4j
public class ConfigSetupController {@Autowiredprivate ConfigSetupProcessor configSetupProcessor;@PostMapping("/dc/{dcId}")public Result<SetupTaskResult> setupDcConfigs(@PathVariable("dcId") @NotBlank String dcId) {log.info("触发中心配置设置, dcId: {}", dcId);try {SetupTaskResult result = configSetupProcessor.setupDcConfigs(dcId);return Result.success(result);} catch (Exception e) {log.error("触发中心配置设置失败, dcId: {}", dcId, e);return Result.error("触发中心配置设置失败: " + e.getMessage());}}@PostMapping("/server/{serverId}/retry")public Result<Boolean> retryServerFailedConfigs(@PathVariable("serverId") @NotBlank String serverId) {log.info("重试服务器失败配置, serverId: {}", serverId);try {boolean success = configSetupProcessor.retryServerFailedConfigs(serverId);return Result.success(success);} catch (Exception e) {log.error("重试服务器失败配置失败, serverId: {}", serverId, e);return Result.error("重试服务器失败配置失败: " + e.getMessage());}}@PostMapping("/dc/{dcId}/retry")public Result<DcRetryResult> retryDcFailedConfigs(@PathVariable("dcId") @NotBlank String dcId) {log.info("重试中心下所有服务器的失败配置, dcId: {}", dcId);try {DcRetryResult result = configSetupProcessor.retryDcFailedConfigs(dcId);return Result.success(result);} catch (Exception e) {log.error("重试中心失败配置失败, dcId: {}", dcId, e);return Result.error("重试中心失败配置失败: " + e.getMessage());}}
}

配置查询控制器 (ConfigQueryController)

@RestController
@RequestMapping("/api/demo/query")
@Validated
@Slf4j
public class ConfigQueryController {@Autowiredprivate ConfigQueryService configQueryService;@GetMapping("/dc/{dcId}/completion-status")public Result<DcCompletionStatus> getDcCompletionStatus(@PathVariable("dcId") @NotBlank String dcId) {log.info("查询中心所有服务器设置完成状态, dcId: {}", dcId);try {DcCompletionStatus status = configQueryService.getDcCompletionStatus(dcId);return Result.success(status);} catch (Exception e) {log.error("查询中心完成状态失败, dcId: {}", dcId, e);return Result.error("查询中心完成状态失败: " + e.getMessage());}}@GetMapping("/dc/{dcId}/incomplete-servers")public Result<List<ServerCompletionDetail>> getDcIncompleteServers(@PathVariable("dcId") @NotBlank String dcId) {log.info("查询中心下所有未完成设置的服务器列表, dcId: {}", dcId);try {List<ServerCompletionDetail> incompleteServers = configQueryService.getDcIncompleteServers(dcId);return Result.success(incompleteServers);} catch (Exception e) {log.error("查询中心未完成服务器列表失败, dcId: {}", dcId, e);return Result.error("查询中心未完成服务器列表失败: " + e.getMessage());}}
}

配置暂停控制器 (ConfigPauseController)

@RestController
@RequestMapping("/api/demo/pause")
@Validated
@Slf4j
public class ConfigPauseController {@Autowiredprivate PauseManagementService pauseManagementService;@PostMapping("/dc/{dcId}/pause")public Result<PauseOperationResult> pauseDcSetup(@PathVariable("dcId") @NotBlank String dcId,@RequestParam("operator") @NotBlank String operator) {log.info("暂停中心设置任务, dcId: {}, operator: {}", dcId, operator);try {PauseOperationResult result = pauseManagementService.pauseDcSetup(dcId, operator);return Result.success(result);} catch (Exception e) {log.error("暂停中心设置任务失败, dcId: {}", dcId, e);return Result.error("暂停中心设置任务失败: " + e.getMessage());}}@PostMapping("/dc/{dcId}/resume")public Result<PauseOperationResult> resumeDcSetup(@PathVariable("dcId") @NotBlank String dcId,@RequestParam("operator") @NotBlank String operator) {log.info("恢复中心设置任务, dcId: {}, operator: {}", dcId, operator);try {PauseOperationResult result = pauseManagementService.resumeDcSetup(dcId, operator);return Result.success(result);} catch (Exception e) {log.error("恢复中心设置任务失败, dcId: {}", dcId, e);return Result.error("恢复中心设置任务失败: " + e.getMessage());}}@PostMapping("/server/{serverId}/pause")public Result<PauseOperationResult> pauseServerSetup(@PathVariable("serverId") @NotBlank String serverId,@RequestParam("operator") @NotBlank String operator) {log.info("暂停服务器设置任务, serverId: {}, operator: {}", serverId, operator);try {PauseOperationResult result = pauseManagementService.pauseServerSetup(serverId, operator);return Result.success(result);} catch (Exception e) {log.error("暂停服务器设置任务失败, serverId: {}", serverId, e);return Result.error("暂停服务器设置任务失败: " + e.getMessage());}}@PostMapping("/server/{serverId}/resume")public Result<PauseOperationResult> resumeServerSetup(@PathVariable("serverId") @NotBlank String serverId,@RequestParam("operator") @NotBlank String operator) {log.info("恢复服务器设置任务, serverId: {}, operator: {}", serverId, operator);try {PauseOperationResult result = pauseManagementService.resumeServerSetup(serverId, operator);return Result.success(result);} catch (Exception e) {log.error("恢复服务器设置任务失败, serverId: {}", serverId, e);return Result.error("恢复服务器设置任务失败: " + e.getMessage());}}@PostMapping("/global/pause")public Result<PauseOperationResult> pauseGlobalSetup(@RequestParam("operator") @NotBlank String operator) {log.info("全局暂停所有设置任务, operator: {}", operator);try {PauseOperationResult result = pauseManagementService.pauseGlobalSetup(operator);return Result.success(result);} catch (Exception e) {log.error("全局暂停所有设置任务失败", e);return Result.error("全局暂停所有设置任务失败: " + e.getMessage());}}@PostMapping("/global/resume")public Result<PauseOperationResult> resumeGlobalSetup(@RequestParam("operator") @NotBlank String operator) {log.info("全局恢复所有设置任务, operator: {}", operator);try {PauseOperationResult result = pauseManagementService.resumeGlobalSetup(operator);return Result.success(result);} catch (Exception e) {log.error("全局恢复所有设置任务失败", e);return Result.error("全局恢复所有设置任务失败: " + e.getMessage());}}@GetMapping("/status")public Result<PauseStatusList> getPauseStatusList() {log.info("查询暂停状态列表");try {PauseStatusList statusList = pauseManagementService.getPauseStatusList();return Result.success(statusList);} catch (Exception e) {log.error("查询暂停状态列表失败", e);return Result.error("查询暂停状态列表失败: " + e.getMessage());}}
}

性能优化建议

Redis优化

  • 使用Redis集群提高可用性
  • 合理设置Key过期时间
  • 使用Pipeline批量操作
  • 监控Redis内存使用情况

系统优化

  • 根据服务器数量动态调整线程池大小
  • 实现优雅停机机制
  • 添加熔断器和限流器
  • 完善监控和告警体系

总结

本技术方案基于Redis实现了分布式配置批量设置系统,具有以下特点:

  1. 高可用性:基于Redis的分布式架构,避免单点故障
  2. 可扩展性:支持水平扩展,可处理大规模服务器
  3. 实时监控:提供完整的进度监控和状态查询
  4. 容错机制:支持多级重试和故障恢复
  5. 性能优化:合理的并发控制和间隔策略
http://www.dtcms.com/a/503254.html

相关文章:

  • STM32中硬件I2C的时钟占空比
  • iFlutter --> Flutter 开发者 的 IntelliJ IDEA / Android Studio 插件
  • Easyx图形库应用(和lua结合使用)
  • 网站建设计划表模板网络运营需要学什么专业
  • Scrapy 框架入门:高效搭建爬虫项目
  • 【JVM】详解 垃圾回收
  • 【前端魔法】实现网站一键切换主题
  • 电子 东莞网站建设wordpress 图片服务器配置
  • Spring Boot 3零基础教程,WEB 开发 通过配置类代码方式修改静态资源配置 笔记31
  • Vue模块与组件、模块化与组件化
  • SiriKali,一款跨平台的加密文件管理器
  • 构建优雅的 Spring Boot Starter:深入掌握国际化与配置覆盖的最佳实践
  • 网站建设的意义单页式网站
  • 易申建设网站兼职做ps网站
  • 力扣(LeetCode) ——11.盛水最多的容器(C++)
  • word插入页码?【图文详解】word怎么插入页码?word页码设置?
  • Leetcode 3719. Longest Balanced Subarray I
  • Rust unsafe
  • 辽宁省建设工程造价管理协会网站wordpress登陆按钮
  • 【pulldown-cmark】 初学者指南
  • [嵌入式系统-140]:Android以及其衍射版本都是基于Linux,Ubuntu、Rehat也都是基于Linux,异同进行比较。
  • GitLab 代码基础操作清单
  • 深度学习经典分类(算法分析与案例)
  • 做网站的叫什么百度seo引流怎么做
  • js,后端,css记录
  • 云服务器部署Python后端偶遇`ImportError`: 从依赖版本到Python升级的排错全攻略
  • 使用AI编程工具的“经济学”:成本控制与性能优化策略
  • 免费网站收录运营一个app大概多少钱
  • void编辑器+mcpsever本地demo部署
  • 企业做网站设计页面图片