
Setting up a Redis cluster with docker-compose

To build a Redis cluster with three masters and three replicas, use the docker-compose file below. Bring it up with:

RC_WKDIR=/data/redis_dir docker-compose up -d
version: '3.8'

services:
  redis-node-11:
    image: redis
    container_name: redis-node-11
    hostname: redis-node-11
    ports:
      - "57011:6379"
    volumes:
      - ${RC_WKDIR:-.}/data/node-11:/data
    command:
      - redis-server
      - --cluster-enabled yes
      - --appendonly yes
      - --cluster-config-file nodes.conf
    networks:
      - redis-cluster

  redis-node-12:
    image: redis
    container_name: redis-node-12
    ports:
      - "57012:6379"
    volumes:
      - ${RC_WKDIR:-.}/data/node-12:/data
    command:
      - redis-server
      - --cluster-enabled yes
      - --appendonly yes
      - --cluster-config-file nodes.conf
    networks:
      - redis-cluster

  redis-node-21:
    image: redis
    container_name: redis-node-21
    ports:
      - "57021:6379"
    volumes:
      - ${RC_WKDIR:-.}/data/node-21:/data
    command:
      - redis-server
      - --cluster-enabled yes
      - --appendonly yes
      - --cluster-config-file nodes.conf
    networks:
      - redis-cluster

  redis-node-22:
    image: redis
    container_name: redis-node-22
    ports:
      - "57022:6379"
    volumes:
      - ${RC_WKDIR:-.}/data/node-22:/data
    command:
      - redis-server
      - --cluster-enabled yes
      - --appendonly yes
      - --cluster-config-file nodes.conf
    networks:
      - redis-cluster

  redis-node-31:
    image: redis
    container_name: redis-node-31
    ports:
      - "57031:6379"
    volumes:
      - ${RC_WKDIR:-.}/data/node-31:/data
    command:
      - redis-server
      - --cluster-enabled yes
      - --appendonly yes
      - --cluster-config-file nodes.conf
    networks:
      - redis-cluster

  redis-node-32:
    image: redis
    container_name: redis-node-32
    ports:
      - "57032:6379"
    volumes:
      - ${RC_WKDIR:-.}/data/node-32:/data
    command:
      - redis-server
      - --cluster-enabled yes
      - --appendonly yes
      - --cluster-config-file nodes.conf
    networks:
      - redis-cluster

  redis-cluster-init:
    image: redis
    container_name: redis-cluster-init
    depends_on:
      - redis-node-11
      - redis-node-12
      - redis-node-21
      - redis-node-22
      - redis-node-31
      - redis-node-32
    command: >
      /bin/bash -c "
      echo 'Waiting for Redis nodes to be ready...';
      sleep 10;
      echo $$(redis-cli -h redis-node-11 -p 6379 cluster nodes | grep myself);
      echo 'Creating the Redis cluster...';
      echo 'yes' | redis-cli --cluster create
      redis-node-11:6379 redis-node-12:6379
      redis-node-21:6379 redis-node-22:6379
      redis-node-31:6379 redis-node-32:6379
      --cluster-replicas 1 --cluster-yes;
      echo 'Configuring replica relationships...';
      redis-cli --cluster add-node redis-node-12:6379 redis-node-11:6379 --cluster-slave --cluster-master-id $$(redis-cli -h redis-node-11 -p 6379 cluster nodes | grep myself | awk '{print \$1}');
      redis-cli --cluster add-node redis-node-22:6379 redis-node-21:6379 --cluster-slave --cluster-master-id $$(redis-cli -h redis-node-21 -p 6379 cluster nodes | grep myself | awk '{print \$1}');
      redis-cli --cluster add-node redis-node-32:6379 redis-node-31:6379 --cluster-slave --cluster-master-id $$(redis-cli -h redis-node-31 -p 6379 cluster nodes | grep myself | awk '{print \$1}');
      echo 'Redis cluster initialization complete!';
      "
    networks:
      - redis-cluster

networks:
  redis-cluster:
    driver: bridge
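
Once the containers are up, a quick sanity check is to ask one of the nodes for `cluster info` and confirm the cluster actually formed. The sketch below is not part of the original setup: it assumes a local Docker daemon and the container names from the compose file above, and simply shells out to `docker exec`.

#!/usr/bin/env python3
# Minimal verification sketch (assumption: the container names match the
# compose file above and Docker is available on the local host).
import subprocess
import sys


def cluster_info(container: str = "redis-node-11") -> dict:
    # Run `redis-cli cluster info` inside the container and parse the
    # "key:value" lines it prints.
    out = subprocess.run(
        ["docker", "exec", container, "redis-cli", "cluster", "info"],
        check=True, capture_output=True, text=True,
    ).stdout
    return dict(line.strip().split(":", 1) for line in out.splitlines() if ":" in line)


if __name__ == "__main__":
    info = cluster_info()
    print("cluster_state       =", info.get("cluster_state"))
    print("cluster_known_nodes =", info.get("cluster_known_nodes"))
    print("cluster_slots_ok    =", info.get("cluster_slots_ok"))
    # A healthy 3-master / 3-replica cluster should report state "ok",
    # 6 known nodes and all 16384 slots served.
    ok = info.get("cluster_state") == "ok" and info.get("cluster_slots_ok") == "16384"
    sys.exit(0 if ok else 1)

The same check can also be done by hand with `docker exec redis-node-11 redis-cli cluster info`.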

There are also scenarios that call for scaling the cluster out or in, so I put together a Python script for it (not very rigorous; offered just as a starting point).

#!/usr/bin/env python3
import argparse
import subprocess
import sys
from datetime import datetime
from typing import List, Tuple, Optional, Dict


class RedisClusterResizer:
    def __init__(self, cluster_entry: str, password: Optional[str] = None):
        self.cluster_entry = cluster_entry
        self.password = password
        self.redis_cli = "redis-cli"
        if self.password:
            self.redis_cli += f" -a {self.password}"
        self.redis_cli += " --cluster"

    def log(self, message: str):
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}")

    def run_command(self, cmd: str, check: bool = True) -> Tuple[bool, str]:
        try:
            print('run cmd: {}'.format(cmd))
            result = subprocess.run(cmd, shell=True, check=check,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                    text=True)
            return True, result.stdout
        except subprocess.CalledProcessError as e:
            return False, f'{{stdout: {e.stdout}, stderr: {e.stderr}}}'

    def check_redis_node(self, node: str) -> bool:
        host, port = node.split(":")
        cmd = f"redis-cli -h {host} -p {port}"
        if self.password:
            cmd += f" -a {self.password}"
        cmd += " ping"
        success, output = self.run_command(cmd)
        return success and "PONG" in output

    def check_cluster_status(self) -> bool:
        self.log("Checking cluster status...")
        cmd = f"{self.redis_cli} check {self.cluster_entry}"
        success, output = self.run_command(cmd)
        if not success:
            self.log(f"Cluster status check failed: {output}")
        return success

    def get_node_id(self, node: str) -> Optional[str]:
        host, port = node.split(":")
        cmd = f"redis-cli -h {host} -p {port}"
        if self.password:
            cmd += f" -a {self.password}"
        cmd += " cluster nodes | grep myself"
        success, output = self.run_command(cmd)
        if not success or not output:
            return None
        return output.split()[0]

    def get_master_nodes(self) -> List[Tuple[str, str]]:
        """Return the masters as a list of (node_id, node_address) tuples."""
        cmd = f"redis-cli -h {self.cluster_entry.split(':')[0]} -p {self.cluster_entry.split(':')[1]}"
        if self.password:
            cmd += f" -a {self.password}"
        cmd += " cluster nodes | grep master | grep -v fail"
        success, output = self.run_command(cmd)
        if not success:
            return []
        nodes = []
        for line in output.splitlines():
            parts = line.split()
            node_id = parts[0]
            node_addr = parts[1].split("@")[0]
            nodes.append((node_id, node_addr))
        return nodes

    def get_slave_nodes(self, master_id: str) -> List[Tuple[str, str]]:
        """Return the replicas of the given master as (node_id, node_address) tuples."""
        cmd = f"redis-cli -h {self.cluster_entry.split(':')[0]} -p {self.cluster_entry.split(':')[1]}"
        if self.password:
            cmd += f" -a {self.password}"
        cmd += f" cluster nodes | grep slave | grep {master_id}"
        success, output = self.run_command(cmd)
        if not success:
            return []
        slaves = []
        for line in output.splitlines():
            parts = line.split()
            slave_id = parts[0]
            slave_addr = parts[1].split("@")[0]
            slaves.append((slave_id, slave_addr))
        return slaves

    def get_cluster_nodes(self) -> List[Dict]:
        """Fetch every node in the cluster and return structured data."""
        cmd = f"redis-cli -h {self.cluster_entry.split(':')[0]} -p {self.cluster_entry.split(':')[1]}"
        if self.password:
            cmd += f" -a {self.password}"
        cmd += " cluster nodes"
        success, output = self.run_command(cmd)
        if not success:
            return []
        nodes = []
        for line in output.splitlines():
            parts = line.split()
            if len(parts) < 8:
                continue
            node = {
                "id": parts[0],
                "address": parts[1].split("@")[0],
                "flags": parts[2].split(","),
                "master": parts[3],
                "ping_sent": parts[4],
                "ping_recv": parts[5],
                "config_epoch": parts[6],
                "link_state": parts[7],
                "slots": []
            }
            if len(parts) > 8:
                if parts[8] == "->":
                    node["migrating"] = parts[9]
                else:
                    node["slots"] = self.parse_slots(parts[8:])
            nodes.append(node)
        return nodes

    def parse_slots(self, slot_parts: List[str]) -> List[Tuple[int, int]]:
        """Parse slot ranges into (start, end) tuples."""
        slots = []
        for part in slot_parts:
            if part.startswith("["):
                continue
            if "-" in part:
                start, end = map(int, part.split("-"))
                slots.append((start, end))
            else:
                slot = int(part)
                slots.append((slot, slot))
        return slots

    def show_cluster_topology(self):
        """Print the cluster topology."""
        nodes = self.get_cluster_nodes()
        if not nodes:
            self.log("Unable to fetch cluster node information")
            return

        # Work out the master/replica relationships
        masters = {}
        for node in nodes:
            if "master" in node["flags"] and node["master"] == "-":
                masters[node["id"]] = {"node": node, "slaves": []}
        for node in nodes:
            if "slave" in node["flags"] and node["master"] in masters:
                masters[node["master"]]["slaves"].append(node)

        # Print the topology
        print("\nRedis cluster topology:")
        print("=" * 60)
        for master_id, master_data in masters.items():
            master = master_data["node"]
            print(f"Master [{master['id'][:8]}] {master['address']}")
            print(f"  flags: {'|'.join(master['flags'])}  slot ranges: {len(master['slots'])}")

            # Print slot ranges, merging adjacent ones
            if master["slots"]:
                slot_ranges = []
                current_start = master["slots"][0][0]
                current_end = master["slots"][0][1]
                for start, end in master["slots"][1:]:
                    if start == current_end + 1:
                        current_end = end
                    else:
                        slot_ranges.append(f"{current_start}-{current_end}")
                        current_start, current_end = start, end
                slot_ranges.append(f"{current_start}-{current_end}")
                print(f"  slots: {', '.join(slot_ranges)}")

            # Print the replicas
            if master_data["slaves"]:
                for slave in master_data["slaves"]:
                    print(f"  └─ replica [{slave['id'][:8]}] {slave['address']}")
            else:
                print("  └─ (no replicas)")
            print("-" * 60)

        # Print nodes not assigned to any master
        unassigned = [n for n in nodes if n["master"] == "-" and "master" not in n["flags"]]
        if unassigned:
            print("\nUnassigned nodes:")
            for node in unassigned:
                print(f"  - {node['address']} [{node['id'][:8]}]")
        print("=" * 60)
        self.print_cluster_summary(nodes)

    def print_cluster_summary(self, nodes: List[Dict]):
        """Print a summary of the cluster."""
        masters = [n for n in nodes if "master" in n["flags"] and n["master"] == "-"]
        slaves = [n for n in nodes if "slave" in n["flags"]]
        total_slots = sum((end - start + 1) for master in masters for start, end in master["slots"])
        print("\nCluster summary:")
        print(f"  - masters: {len(masters)}")
        print(f"  - replicas: {len(slaves)}")
        print(f"  - assigned slots: {total_slots}/16384 ({total_slots/16384:.1%})")
        print(f"  - total nodes: {len(nodes)}")

    def scale_out(self, new_master: str, new_slave: str):
        """Scale the cluster out by one master and one replica."""
        self.log(f"Starting scale-out - new master: {new_master}, new replica: {new_slave}")

        # Check that the new nodes are reachable
        if not self.check_redis_node(new_master):
            self.log(f"Error: new master {new_master} is not reachable")
            sys.exit(1)
        if not self.check_redis_node(new_slave):
            self.log(f"Error: new replica {new_slave} is not reachable")
            sys.exit(1)

        # Add the new master to the cluster
        self.log(f"Adding new master {new_master} to the cluster...")
        cmd = f"{self.redis_cli} add-node {new_master} {self.cluster_entry}"
        success, output = self.run_command(cmd)
        if not success:
            self.log(f"Failed to add master: {output}")
            # sys.exit(1)

        # Get the new master's node ID
        new_master_id = self.get_node_id(new_master)
        if not new_master_id:
            self.log("Unable to get the new master's node ID")
            sys.exit(1)

        # Reshard hash slots onto the new master
        self.log("Resharding hash slots...")
        masters = self.get_master_nodes()
        if not masters:
            self.log("Unable to get the master list")
            sys.exit(1)

        # How many slots each master should hand over: 16384 / (masters + 1)
        slots_per_node = 16384 // (len(masters) + 1)

        # Migrate slots from all existing masters to the new master
        cmd = (f"{self.redis_cli} reshard {self.cluster_entry} "
               f"--cluster-from all "
               f"--cluster-to {new_master_id} "
               f"--cluster-slots {slots_per_node} "
               f"--cluster-yes")
        success, output = self.run_command(cmd)
        if not success:
            self.log(f"Slot migration failed: {output}")
            # sys.exit(1)

        # Add the replica
        self.log(f"Adding replica {new_slave} to the cluster...")
        cmd = (f"{self.redis_cli} add-node {new_slave} {self.cluster_entry} "
               f"--cluster-slave --cluster-master-id {new_master_id}")
        success, output = self.run_command(cmd)
        if not success:
            self.log(f"Failed to add replica: {output}")
            # sys.exit(1)

        # Check cluster status
        if not self.check_cluster_status():
            sys.exit(1)
        self.log("Scale-out finished")

    def scale_in(self, do_master: str, do_slave: str):
        """Scale the cluster in by removing one master (and its replica)."""
        self.log("Starting scale-in...")

        # Get all masters
        masters = self.get_master_nodes()
        if len(masters) <= 1:
            self.log("Error: the cluster has only one master, cannot scale in")
            sys.exit(1)

        # Default to removing the last master; if the caller named one, use it
        # instead and drop it from the candidate list so it cannot also be
        # picked as the slot recipient.
        target_master_id, target_master = masters[-1]
        for mi in masters[::-1]:
            if mi[-1] == do_master:
                target_master_id, target_master = mi
                masters.remove(mi)
                break
        self.log(f"Master to remove: {target_master} (ID: {target_master_id})")

        # Find that master's replicas
        slaves = self.get_slave_nodes(target_master_id)
        if not slaves:
            self.log("Warning: the target master has no replicas")
            target_slave_id, target_slave = None, None
        else:
            target_slave_id, target_slave = slaves[0]
            self.log(f"Replica to remove: {target_slave} (ID: {target_slave_id})")

        # Pick a master to receive the slots (the first one)
        recipient_master_id, recipient_master = masters[0]
        self.log(f"Migrating slots to: {recipient_master} (ID: {recipient_master_id})")

        # Migrate the slots away
        self.log(f"Migrating hash slots off master {target_master}...")
        cmd = (f"{self.redis_cli} reshard {target_master} "
               f"--cluster-from {target_master_id} "
               f"--cluster-to {recipient_master_id} "
               f"--cluster-slots 16384 "  # move everything the node holds
               f"--cluster-yes")
        success, output = self.run_command(cmd)
        if not success:
            self.log(f"Slot migration failed: {output}")
            sys.exit(1)

        # Delete the replica
        if target_slave_id:
            self.log(f"Deleting replica {target_slave}...")
            cmd = f"{self.redis_cli} del-node {self.cluster_entry} {target_slave_id}"
            success, output = self.run_command(cmd)
            if not success:
                self.log(f"Failed to delete replica: {output}")
                # sys.exit(1)

        # Delete the master
        self.log(f"Deleting master {target_master}...")
        cmd = f"{self.redis_cli} del-node {self.cluster_entry} {target_master_id}"
        success, output = self.run_command(cmd)
        if not success:
            self.log(f"Failed to delete master: {output}")
            # sys.exit(1)

        # Check cluster status
        if not self.check_cluster_status():
            self.log("Cluster status is unhealthy")
            sys.exit(1)
        self.log("Scale-in finished")


def main():
    parser = argparse.ArgumentParser(description="Redis cluster management tool")
    parser.add_argument("action", choices=["scale-out", "scale-in", "show-topology"],
                        help="action: scale-out, scale-in or show-topology")
    parser.add_argument("--cluster-entry", default="10.150.24.18:57011",
                        help="cluster entry node (default: 10.150.24.18:57011)")
    parser.add_argument("--password", help="Redis password")
    parser.add_argument("action_nodes", nargs="*",
                        help="node addresses for scale-out/scale-in (master replica)")
    args = parser.parse_args()

    resizer = RedisClusterResizer(args.cluster_entry, args.password)

    if args.action == "scale-out":
        if len(args.action_nodes) < 2:
            print("Error: scale-out needs a new master and a new replica")
            sys.exit(1)
        resizer.scale_out(args.action_nodes[0], args.action_nodes[1])
    elif args.action == "scale-in":
        resizer.scale_in(args.action_nodes[0], args.action_nodes[1])
    elif args.action == "show-topology":
        resizer.show_cluster_topology()
    else:
        print(f"Error: unknown action {args.action}")
        sys.exit(1)


if __name__ == "__main__":
    main()
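
For reference, this is roughly how the class above can be driven programmatically (the CLI equivalent would be, e.g., `python3 redis_cluster_resizer.py show-topology --cluster-entry 10.150.24.18:57011`). The file name and the 5704x addresses are placeholders of mine, and the snippet assumes the new nodes are already running with cluster-enabled yes before scale-out is called.

# Usage sketch. Assumptions: the script above is saved as
# redis_cluster_resizer.py, and the 5704x nodes are placeholder addresses
# for Redis instances already started in cluster mode.
from redis_cluster_resizer import RedisClusterResizer

resizer = RedisClusterResizer("10.150.24.18:57011")

# Print masters, replicas and slot ranges.
resizer.show_cluster_topology()

# Add one master and one replica, then reshard part of the slots onto the new master.
resizer.scale_out("10.150.24.18:57041", "10.150.24.18:57042")

# Remove that master again: its slots are drained to another master and
# both the master and its replica are deleted from the cluster.
resizer.scale_in("10.150.24.18:57041", "10.150.24.18:57042")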