当前位置: 首页 > news >正文

具有熔断能力和活性探测的服务负载均衡解决方案

一、整体架构设计

1.核心组件
  • 负载均衡器:负责选择可用的服务节点
  • 健康检查器:定期检测服务节点的可用性
  • 服务节点管理:维护所有可用节点的状态信息
2.负载均衡策略
  • 轮询(Round Robin)
  • 随机(Random)
  • 加权轮询(Weighted Round Robin)
  • 最少连接(Least Connection)
  • 一致性哈希(Consistent Hash)

二、完整代码

1.定义节点服务类
public class ServiceNode {private String ip;private int port;private boolean active;private int weight;private int currentConnections;private long lastActiveTime;// 熔断相关属性private int failureCount;private long circuitOpenedTime;private static final int FAILURE_THRESHOLD = 3;private static final long CIRCUIT_BREAKER_TIMEOUT = 30000; // 30秒// 判断节点是否可用(考虑熔断状态)public boolean isAvailable() {if (failureCount >= FAILURE_THRESHOLD) {// 检查是否应该尝试恢复if (System.currentTimeMillis() - circuitOpenedTime > CIRCUIT_BREAKER_TIMEOUT) 			  {return true; // 进入半开状态}return false; // 熔断中}return active; // 正常状态}public void recordFailure() {failureCount++;if (failureCount >= FAILURE_THRESHOLD) {circuitOpenedTime = System.currentTimeMillis();}}public void recordSuccess() {failureCount = 0;circuitOpenedTime = 0;}// 其他getter/setter方法...
}
2.负载均衡接口
public interface LoadBalancer {ServiceNode selectNode(List<ServiceNode> nodes);default void onRequestSuccess(ServiceNode node) {node.setLastActiveTime(System.currentTimeMillis());node.setCurrentConnections(node.getCurrentConnections() - 1);}default void onRequestFail(ServiceNode node) {node.setActive(false);}
}
3.实现不同的负载均衡策略
轮询策略,考虑熔断状态
public class RoundRobinLoadBalancer implements LoadBalancer {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic ServiceNode selectNode(List<ServiceNode> nodes) {List<ServiceNode> availableNodes = nodes.stream().filter(ServiceNode::isAvailable).collect(Collectors.toList());if (availableNodes.isEmpty()) {// 尝试从不可用节点中选择已经过了熔断超时的节点List<ServiceNode> halfOpenNodes = nodes.stream().filter(n -> !n.isHealthy() && n.getFailureCount() >= ServiceNode.FAILURE_THRESHOLD &&System.currentTimeMillis() - n.getCircuitOpenedTime() > ServiceNode.CIRCUIT_BREAKER_TIMEOUT).collect(Collectors.toList());if (!halfOpenNodes.isEmpty()) {// 选择其中一个进入半开状态的节点int index = counter.getAndIncrement() % halfOpenNodes.size();ServiceNode selected = halfOpenNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}throw new RuntimeException("No available nodes");}int index = counter.getAndIncrement() % availableNodes.size();ServiceNode selected = availableNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}@Overridepublic void onRequestSuccess(ServiceNode node) {node.recordSuccess();node.setLastActiveTime(System.currentTimeMillis());node.setCurrentConnections(node.getCurrentConnections() - 1);}@Overridepublic void onRequestFail(ServiceNode node) {node.recordFailure();node.setActive(false);node.setCurrentConnections(node.getCurrentConnections() - 1);}
}
加权轮询策略
public class WeightedRoundRobinLoadBalancer implements LoadBalancer {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic ServiceNode selectNode(List<ServiceNode> nodes) {List<ServiceNode> activeNodes = nodes.stream().filter(ServiceNode::isHealthy).collect(Collectors.toList());if (activeNodes.isEmpty()) {throw new RuntimeException("No available nodes");}int totalWeight = activeNodes.stream().mapToInt(ServiceNode::getWeight).sum();int current = counter.getAndIncrement() % totalWeight;for (ServiceNode node : activeNodes) {if (current < node.getWeight()) {node.setCurrentConnections(node.getCurrentConnections() + 1);return node;}current -= node.getWeight();}// fallback to simple round robinint index = counter.get() % activeNodes.size();ServiceNode selected = activeNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}
}
4.健康检查器
public class HealthChecker {private final List<ServiceNode> nodes;private final ScheduledExecutorService scheduler;private final HttpClient httpClient;public HealthChecker(List<ServiceNode> nodes) {this.nodes = nodes;this.scheduler = Executors.newSingleThreadScheduledExecutor();this.httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(Duration.ofSeconds(2)).build();}public void start() {scheduler.scheduleAtFixedRate(this::checkAllNodes, 0, 10, TimeUnit.SECONDS);}public void stop() {scheduler.shutdown();}private void checkAllNodes() {nodes.parallelStream().forEach(this::checkNode);}private void checkNode(ServiceNode node) {// 如果节点处于熔断状态但未超时,不检查if (node.getFailureCount() >= ServiceNode.FAILURE_THRESHOLD && System.currentTimeMillis() - node.getCircuitOpenedTime() < ServiceNode.CIRCUIT_BREAKER_TIMEOUT) {return;}try {HttpRequest request = HttpRequest.newBuilder().uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + "/health")).timeout(Duration.ofSeconds(2)).GET().build();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == 200) {node.setActive(true);node.recordSuccess(); // 重置熔断状态node.setLastActiveTime(System.currentTimeMillis());} else {node.recordFailure();node.setActive(false);}} catch (Exception e) {node.recordFailure();node.setActive(false);}}
}
5.服务调用封装
public class ExternalServiceInvoker {private final LoadBalancer loadBalancer;private final List<ServiceNode> nodes;private final HttpClient httpClient;public ExternalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {this.loadBalancer = loadBalancer;this.nodes = nodes;this.httpClient = HttpClient.newHttpClient();}public String invokeService(String path, String requestBody) throws Exception {ServiceNode node = null;try {node = loadBalancer.selectNode(nodes);HttpRequest request = HttpRequest.newBuilder().uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + path)).header("Content-Type", "application/json").timeout(Duration.ofSeconds(5)).POST(HttpRequest.BodyPublishers.ofString(requestBody)).build();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() >= 200 && response.statusCode() < 300) {loadBalancer.onRequestSuccess(node);return response.body();} else {loadBalancer.onRequestFail(node);throw new RuntimeException("Service call failed with status: " + response.statusCode());}} catch (Exception e) {if (node != null) {loadBalancer.onRequestFail(node);}throw e;}}
}

三、在Spring Boot中的使用

1.配置为Spring Bean
@Configuration
public class LoadBalancerConfiguration {@Beanpublic List<ServiceNode> serviceNodes() {return Arrays.asList(new ServiceNode("192.168.1.101", 8080, 1),new ServiceNode("192.168.1.102", 8080, 1),new ServiceNode("192.168.1.103", 8080, 1),new ServiceNode("192.168.1.104", 8080, 1));}@Beanpublic LoadBalancer loadBalancer() {return new WeightedRoundRobinLoadBalancer();}@Beanpublic HealthChecker healthChecker(List<ServiceNode> nodes) {HealthChecker checker = new HealthChecker(nodes);checker.start();return checker;}@Beanpublic ExternalServiceInvoker externalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {return new ExternalServiceInvoker(loadBalancer, nodes);}
}
2.在Controller中使用
@RestController
@RequestMapping("/api")
public class MyController {@Autowiredprivate ExternalServiceInvoker serviceInvoker;@PostMapping("/call-external")public ResponseEntity<String> callExternalService(@RequestBody String request) {try {String response = serviceInvoker.invokeService("/external-api", request);return ResponseEntity.ok(response);} catch (Exception e) {return ResponseEntity.status(500).body("Service call failed: " + e.getMessage());}}
}

四、总结与最佳实践

  1. 选择合适的负载均衡策略

    • 对于性能相近的节点,使用轮询或随机
    • 对于性能差异大的节点,使用加权轮询
    • 对于长连接服务,考虑最少连接算法
  2. 健康检查配置建议

    • 检查间隔:5-30秒
    • 超时时间:1-3秒
    • 检查端点:/health或/actuator/health
  3. 生产环境注意事项

    • 添加日志记录节点选择过程和健康状态变化
    • 实现优雅降级,当所有节点不可用时返回缓存数据或友好错误
    • 考虑使用分布式配置中心动态调整节点列表
  4. 性能优化

    • 使用连接池减少连接建立开销
    • 实现请求重试机制(带退避策略)
    • 添加熔断器模式防止级联故障
  5. 结合熔断机制

    1. 完整的熔断机制:基于失败计数和超时的自动熔断/恢复
    2. 三种状态
      • 关闭(CLOSED):正常状态
      • 打开(OPEN):熔断状态,拒绝请求
      • 半开(HALF-OPEN):尝试恢复状态
    3. 与负载均衡深度集成
      • 节点选择时自动跳过熔断中的节点
      • 健康检查会重置成功节点的熔断状态
      • 支持半开状态下的试探性请求
    4. 可视化监控:通过REST端点查看节点和熔断状态
http://www.dtcms.com/a/329397.html

相关文章:

  • Go与Python爬虫实战对比:从开发效率到性能瓶颈的深度解析
  • 【车联网kafka】Kafka核心架构与实战经验(第四篇)
  • docker镜像状态监控
  • 磁悬浮轴承转子设计避坑指南:深度解析核心要点与高可靠性策略
  • 【网络运维】Playbook进阶: 管理变量
  • 如何在 Spring Boot 中设计和返回树形结构的组织和部门信息
  • [AI React Web] E2B沙箱 | WebGPU | 组件树 | 智能重构 | 架构异味检测
  • [AI React Web] 包与依赖管理 | `axios`库 | `framer-motion`库
  • Matlab(4)
  • STM32H5 的 PB14 引脚被意外拉低的问题解析
  • STM32读内部FLASH,偶尔读取错误
  • 三、非线性规划
  • 基于C#、.net、asp.net的心理健康咨询系统设计与实现/心理辅导系统设计与实现
  • 【vue3】v-model 的 “新玩法“
  • 基于人工智能和物联网融合跌倒监控系统(LW+源码+讲解+部署)
  • 面试实战 问题二十六 JDK 1.8 核心新特性详解
  • 猫头虎AI分享:Word MCP,让AI具备Word文档操作能力,文档创建、内容添加、格式编辑等AI能力
  • 【Word VBA Zotero 引用宏错误分析与改正指南】【解决[21–23]参考文献格式插入超链接问题】
  • 基于Java的Markdown转Word工具(标题、段落、表格、Echarts图等)
  • Linux 桌面到工作站的“性能炼金术”——开发者效率的 6 个隐形瓶颈与破解方案
  • 一条n8n工作流
  • vscode+phpstudy+xdebug如何调试php
  • windows10装Ubuntu22.04系统(双系统)
  • VS2022+Qt 5.15.2+FreeCAD 0.21.2开发环境配置流程
  • C# WPF本地Deepseek部署
  • 洛谷 P2607 [ZJOI2008] 骑士-提高+/省选-
  • M4T无人机在外墙脱落监测中的应用:从被动补救到主动预防的技术革新
  • 【代码随想录day 19】 力扣 450.删除二叉搜索树中的节点
  • 从原材料到成品,光模块 PCB 制造工艺全剖析
  • hutool 作为http 客户端工具调用的一点点总结