当前位置: 首页 > news >正文

分布式微服务--ZooKeeper的客户端常用命令 Java API 操作

一、ZooKeeper 客户端常用命令

1. 启动与退出

bin/zkCli.sh -server 127.0.0.1:2181  # 连接客户端
quit                                  # 退出客户端

2. 节点操作

# 查看子节点
ls /
ls -s /
ls /app# 查看节点详细信息
ls2 /app
stat /app
# 创建节点
create /node1 "hello"          # 持久节点
create -e /node2 "temp"        # 临时节点
create -s /node3 "seq"         # 顺序节点
create -e -s /node4 "tempSeq"  # 临时顺序节点创建node1下的子节点,不能node1和node1一起创建,必须创建了node1才能执行下面的否则报错
create /node1/node11 "hello1"  #子节点
# 获取/修改数据
get /node1
set /node1 "world"
# 删除节点
delete /node1   # 删除无子节点的
deleteall /节点path #删除带有子节点的节点
rmr /node1      # 递归删除

3. Watch 监听

get /node1 true   # 监听数据变化
ls / true         # 监听子节点变化

⚠️ 监听是一次性的,触发后失效。


4. ACL 权限控制

getAcl /node1
setAcl /node1 world:anyone:rw

权限:r=读,w=写,c=创建,d=删除,a=管理。
模式:world(所有人)、authdigest(用户名密码)、ip


5. 辅助命令

help        # 查看帮助
history     # 查看历史命令
redo <id>   # 重做历史命令

二、ZooKeeper Java API 操作

1. 原生 API(org.apache.zookeeper.ZooKeeper)

需要的依赖

<!-- 原生 ZooKeeper 依赖 -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.3</version>
</dependency>

代码实现

import org.apache.zookeeper.*;public class ZkDemo {public static void main(String[] args) throws Exception {// 1. 连接 ZooKeeperZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {System.out.println("收到事件:" + event);});// 2. 创建节点zk.create("/node1", "hello".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);// 3. 获取节点数据byte[] data = zk.getData("/node1", false, null);System.out.println("节点数据:" + new String(data));// 4. 修改节点数据zk.setData("/node1", "world".getBytes(), -1);// 5. 获取子节点System.out.println("子节点:" + zk.getChildren("/", false));// 6. 删除节点zk.delete("/node1", -1);// 7. 关闭连接zk.close();}
}

2. Curator 客户端(推荐,简化 API

需要的依赖

 <!--curator-->
<!-- Curator(如果你要用 Curator 封装的API) -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version>
</dependency>

⚠️ curator-frameworkcurator-recipes 内部会依赖 zookeeper,所以一般不用单独引入 zookeeper 依赖,除非你要控制 zk 版本。

2.1 使用Curator实现增删改查的api

Curator 方法ZooKeeper CLI 类似命令
create().forPath("/node")create /node
getData().forPath("/node")get /node
getChildren().forPath("/")ls /
getData().storingStatIn(stat)ls -s /node
setData().forPath("/node", data)set /node data
delete().forPath("/node")delete /node
delete().deletingChildrenIfNeeded()rmr /node

代码实现:

public class CuratorTest {private CuratorFramework client; // Curator 客户端对象/*** 建立连接*/@Beforepublic void testConnect() {/*** @param connectString       连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181"* @param sessionTimeoutMs    会话超时时间 单位ms* @param connectionTimeoutMs 连接超时时间 单位ms* @param retryPolicy         重试策略*//* //重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);//1.第一种方式CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60 * 1000, 15 * 1000, retryPolicy);*///重试策略:初始等待3秒,重试10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//2.第二种方式:通过builder方式构建客户端//CuratorFrameworkFactory.builder();client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") // 设置ZK地址.sessionTimeoutMs(60 * 1000)           // 会话超时时间.connectionTimeoutMs(15 * 1000)        // 连接超时时间.retryPolicy(retryPolicy)              // 设置重试策略// namespace("czq")// 设置命名空间,将 "czq" 作为客户端操作的根路径// 这样之后创建节点时无需每次都写 /czq 前缀// 例如:client.create().forPath("/node11", "hello1".getBytes())// 实际会在 ZooKeeper 中创建 /czq/node11 节点.namespace("czq")   .build();//开启连接client.start();}//==============================create=============================================================================/*** 创建节点:create 持久 临时 顺序 数据* 1. 基本创建 :create().forPath("")* 2. 创建节点 带有数据:create().forPath("",data)* 3. 设置节点的类型:create().withMode().forPath("",data)* 4. 创建多级节点  /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)*/@Testpublic void testCreate() throws Exception {//2. 创建节点 带有数据//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储String path = client.create().forPath("/app2", "hehe".getBytes());System.out.println(path);}@Testpublic void testCreate2() throws Exception {//1. 基本创建//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储String path = client.create().forPath("/app1");System.out.println(path);}@Testpublic void testCreate3() throws Exception {//3. 设置节点的类型//默认类型:持久化String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");System.out.println(path);}@Testpublic void testCreate4() throws Exception {//4. 创建多级节点  /app1/p1//creatingParentsIfNeeded():如果父节点不存在,则创建父节点String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");System.out.println(path);}//===========================get================================================================================/*** 查询节点:* 1. 查询数据:get: getData().forPath()* 2. 查询子节点: ls /: getChildren().forPath()* 3. 查询节点状态信息:ls -s /:getData().storingStatIn(状态对象).forPath()*/@Testpublic void testGet1() throws Exception {//1. 查询数据:getbyte[] data = client.getData().forPath("/app1");System.out.println(new String(data));}@Testpublic void testGet2() throws Exception {// 2. 查询子节点: ls /List<String> path = client.getChildren().forPath("/");System.out.println(path);}@Testpublic void testGet3() throws Exception {Stat status = new Stat(); // 状态对象,用来存储节点元信息System.out.println(status);//3. 查询节点状态信息:ls -s ///.storingStatIn(status)代表存储状态信息到status对象中client.getData().storingStatIn(status).forPath("/app1");System.out.println(status);}//===========================set================================================================================/*** 修改数据* 1. 基本修改数据:setData().forPath()* 2. 根据版本修改: setData().withVersion().forPath()* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。** @throws Exception*/@Testpublic void testSet() throws Exception {// 基本修改节点数据client.setData().forPath("/app1", "itcast".getBytes());}@Testpublic void testSetForVersion() throws Exception {Stat status = new Stat();//3. 查询节点状态信息:ls -s//.storingStatIn(status)代表存储状态信息到status对象中client.getData().storingStatIn(status).forPath("/app1");int version = status.getVersion();//查询出来的节点版本System.out.println(version);// 根据版本修改节点数据,保证并发安全client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());}//===========================delete================================================================================/*** 删除节点: delete deleteall* 1. 删除单个节点:delete().forPath("/app1");* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");* 4. 回调:inBackground* @throws Exception*/@Testpublic void testDelete() throws Exception {// 1. 删除单个节点client.delete().forPath("/app1");}@Testpublic void testDelete2() throws Exception {//2. 删除带有子节点的节点client.delete().deletingChildrenIfNeeded().forPath("/app4");}@Testpublic void testDelete3() throws Exception {//3. 必须成功的删除(自动重试保证删除成功)client.delete().guaranteed().forPath("/app2");}@Testpublic void testDelete4() throws Exception {//4. 回调异步删除client.delete().guaranteed().inBackground(new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("我被删除了~");System.out.println(event);}}).forPath("/app1");}@Afterpublic void close() {// 关闭客户端连接if (client != null) {client.close();}}}
2.2 使用Curator实现Watch事件监听的api
(1)Curator 2.x/4.x 常见的写法
  • 使用者三个类(NodeCachePathChildrenCacheTreeCache

    • NodeCache(监听一个节点自己)

    • PathChildrenCache(监听某个节点的直接子节点)

    • TreeCache(监听某个节点和所有子节点)

        Curator 5.x 开始,这三个类(NodeCachePathChildrenCacheTreeCache)已经被 统一弃用,官方推荐用 CuratorCache 来代替。

监听器监听范围典型应用场景
NodeCache单个节点的数据变化监听配置节点变化
PathChildrenCache子节点的增删改,不监听本节点监听服务节点上下线
TreeCache节点及其所有子节点全量配置或服务树监控

代码实现    

public class CuratorWatcherTest {private CuratorFramework client; // Curator 客户端对象/*** 建立连接*/@Beforepublic void testConnect() {/*** @param connectString       连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181"* @param sessionTimeoutMs    会话超时时间 单位ms* @param connectionTimeoutMs 连接超时时间 单位ms* @param retryPolicy         重试策略*//* //重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);//1.第一种方式CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60 * 1000, 15 * 1000, retryPolicy);*///重试策略:初始等待3秒,重试10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//2.第二种方式:通过builder方式构建客户端//CuratorFrameworkFactory.builder();client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") // 设置ZK地址.sessionTimeoutMs(60 * 1000)           // 会话超时时间.connectionTimeoutMs(15 * 1000)        // 连接超时时间.retryPolicy(retryPolicy)              // 设置重试策略// namespace("czq")// 设置命名空间,将 "czq" 作为客户端操作的根路径// 这样之后创建节点时无需每次都写 /czq 前缀// 例如:client.create().forPath("/node11", "hello1".getBytes())// 实际会在 ZooKeeper 中创建 /czq/node11 节点.namespace("czq")   .build();//开启连接client.start();}@Afterpublic void close() {if (client != null) {client.close(); // 关闭客户端连接}}/*** 演示 NodeCache:给指定一个节点注册监听器* NodeCache 只能监听某个具体节点的数据变化(新增/修改/删除)*/@Testpublic void testNodeCache() throws Exception {// 1. 创建 NodeCache 对象,监听 /app1 节点final NodeCache nodeCache = new NodeCache(client,"/app1");// 2. 注册监听器nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("节点变化了~");// 获取修改后的节点数据(如果节点被删除,这里可能会是 null)byte[] data = nodeCache.getCurrentData().getData();System.out.println(new String(data));}});// 3. 开启监听// 参数 true 表示在启动监听时,立即加载一次缓存数据nodeCache.start(true);// 阻塞住主线程,保证监听器一直生效while (true){}}/*** 演示 PathChildrenCache:监听某个节点的所有子节点* 只能监听子节点的变化(新增/修改/删除),不能监听当前节点本身*/@Testpublic void testPathChildrenCache() throws Exception {// 1. 创建 PathChildrenCache 监听对象// 参数 true 表示对子节点数据进行缓存PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);// 2. 绑定监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("子节点变化了~");System.out.println(event);// 监听子节点数据变更PathChildrenCacheEvent.Type type = event.getType();if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){System.out.println("数据变了!!!");byte[] data = event.getData().getData();System.out.println(new String(data));}}});// 3. 开启监听pathChildrenCache.start();// 阻塞主线程,保证监听器一直生效while (true){}}/*** 演示 TreeCache:监听某个节点自己和它的所有子节点* 相当于 NodeCache + PathChildrenCache 的结合体*/@Testpublic void testTreeCache() throws Exception {// 1. 创建 TreeCache 监听对象TreeCache treeCache = new TreeCache(client,"/app2");// 2. 注册监听器treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println("节点变化了");System.out.println(event);}});// 3. 开启监听treeCache.start();// 阻塞主线程,保证监听器一直生效while (true){}}
}

(2)Curator 5.x以上常见的写法

 Curator 5.x 开始,这三个类(NodeCachePathChildrenCacheTreeCache)已经被 统一弃用,官方推荐用 CuratorCache 来代替。(新版本)

方法名对应旧API监听范围应用场景
testCuratorCacheNodeNodeCache单节点数据变化单个配置项
testCuratorCacheChildrenPathChildrenCache子节点(不含父节点)服务注册/发现
testCuratorCacheTreeTreeCache节点 + 全部子节点配置中心、全量监控

代码实现 

public class CuratorWatcherTest {// Curator 客户端对象,用于操作 ZooKeeperprivate CuratorFramework client;/*** 建立连接*/@Beforepublic void testConnect() {/*** @param connectString       连接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181"* @param sessionTimeoutMs    会话超时时间 单位ms* @param connectionTimeoutMs 连接超时时间 单位ms* @param retryPolicy         重试策略*//* //重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);//1.第一种方式CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60 * 1000, 15 * 1000, retryPolicy);*///重试策略:初始等待3秒,重试10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//2.第二种方式:通过builder方式构建客户端//CuratorFrameworkFactory.builder();client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") // 设置ZK地址.sessionTimeoutMs(60 * 1000)           // 会话超时时间.connectionTimeoutMs(15 * 1000)        // 连接超时时间.retryPolicy(retryPolicy)              // 设置重试策略// namespace("czq")// 设置命名空间,将 "czq" 作为客户端操作的根路径// 这样之后创建节点时无需每次都写 /czq 前缀// 例如:client.create().forPath("/node11", "hello1".getBytes())// 实际会在 ZooKeeper 中创建 /czq/node11 节点.namespace("czq")   .build();//开启连接client.start();}@Afterpublic void close() {// 测试完成后关闭客户端,释放资源if (client != null) {client.close();}}// ==============================替代 NodeCache=============================================================================/*** 1. 监听单个节点(替代 NodeCache)* 使用 CuratorCache + CuratorCacheListener 来代替旧的 NodeCache*/@Testpublic void testCuratorCacheNode() throws Exception {// 创建 CuratorCache,监听 /app1 节点CuratorCache cache = CuratorCache.build(client, "/app1");// 定义监听器,使用 forNodeCache 模式,只监听该节点数据变化CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new Runnable() {@Overridepublic void run() {System.out.println("节点变化了~");try {// 获取节点最新数据并打印byte[] data = client.getData().forPath("/app1");System.out.println("最新数据:" + new String(data));} catch (Exception e) {e.printStackTrace();}}}).build();// 将监听器绑定到缓存cache.listenable().addListener(listener);// 开启缓存(监听)cache.start();// 阻塞主线程,保证监听器一直运行Thread.sleep(Long.MAX_VALUE);}// ==============================替代 PathChildrenCache=============================================================================/*** 2. 监听子节点变化(替代 PathChildrenCache)* 使用 CuratorCache + PathChildrenCacheListener 监听某节点的所有子节点*/@Testpublic void testCuratorCacheChildren() throws Exception {// 创建 CuratorCache,监听 /app2 节点的子节点CuratorCache cache = CuratorCache.build(client, "/app2");// 定义监听器,forPathChildrenCache 表示只监听子节点的变化CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache("/app2", client, new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("子节点变化了~");System.out.println("类型:" + event.getType());if (event.getData() != null) {// 打印节点路径和数据System.out.println("节点:" + event.getData().getPath());System.out.println("数据:" + new String(event.getData().getData()));}}}).build();// 将监听器绑定到缓存cache.listenable().addListener(listener);// 开启缓存cache.start();// 阻塞主线程,保证监听器一直运行Thread.sleep(Long.MAX_VALUE);}// ==============================替代 TreeCache=============================================================================/*** 3. 监听节点及其所有子节点(替代 TreeCache)* 使用 CuratorCache + TreeCacheListener 监听整个节点树*/@Testpublic void testCuratorCacheTree() throws Exception {// 创建 CuratorCache,监听 /app2 节点及其子节点CuratorCache cache = CuratorCache.build(client, "/app2");// 定义监听器,forTreeCache 表示节点本身和子节点都监听CuratorCacheListener listener = CuratorCacheListener.builder().forTreeCache(client, new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println("树节点变化了~");System.out.println("类型:" + event.getType());if (event.getData() != null) {// 打印节点路径和数据System.out.println("节点:" + event.getData().getPath());System.out.println("数据:" + new String(event.getData().getData()));}}}).build();// 将监听器绑定到缓存cache.listenable().addListener(listener);// 开启缓存cache.start();// 阻塞主线程,保证监听器一直运行Thread.sleep(Long.MAX_VALUE);}}

2.3 实现分布式锁

 这里不做过多讲解详细去看我的另一篇博客:


文章转载自:

http://EXxxh7Ul.qtsks.cn
http://seLUeHAB.qtsks.cn
http://jGpodw84.qtsks.cn
http://dBQSIvz6.qtsks.cn
http://zGwFMbA5.qtsks.cn
http://w49Y63cy.qtsks.cn
http://4sGzcrlB.qtsks.cn
http://pZMV2yM6.qtsks.cn
http://mjdi8PlF.qtsks.cn
http://D8C3xSDG.qtsks.cn
http://sPOq7GUa.qtsks.cn
http://Y7ZLPttT.qtsks.cn
http://Fn22W227.qtsks.cn
http://VJG5sqd9.qtsks.cn
http://410xs5pg.qtsks.cn
http://sHc3AQcY.qtsks.cn
http://PYvXR1Ma.qtsks.cn
http://0y26Edpo.qtsks.cn
http://jaYPRqF0.qtsks.cn
http://h2l2Z6Cw.qtsks.cn
http://tBqtbFMK.qtsks.cn
http://MSRqm7Zn.qtsks.cn
http://LIsYgf2c.qtsks.cn
http://1wn5FJow.qtsks.cn
http://Zd04f0pr.qtsks.cn
http://3BS5lqdB.qtsks.cn
http://8p7aKy8B.qtsks.cn
http://U95hth2t.qtsks.cn
http://6VNHERMF.qtsks.cn
http://7KIJHV5b.qtsks.cn
http://www.dtcms.com/a/366541.html

相关文章:

  • 微软GraphRAG 端到端使用及自用工具类
  • Java场景题面试合集
  • ECMAScript (5)ES6前端开发核心:国际化与格式化、内存管理与性能
  • 日本移动应用市场营销分析:娱乐和金融应用增长强劲,游戏类广告支出最高!
  • UDS统一诊断服务
  • 服务器不支持node.js16以上版本安装?用Docker轻松部署Node.js 20+环境运行Strapi项目
  • Simulations RL 平台学习笔记
  • 基于华为云的STM32F103C8T6智能停车场管理系统
  • 分布式对象存储系统 Minio 之 Centos 环境安装
  • 不只是链接:我用“双向链表”思维做内容推广,效率飙升300%
  • 【Markdown转Word完整教程】从原理到实现
  • Matlab中的转置—— ‘ 和 .‘ 的区别
  • YOLOv8自定义目标检测模型训练与应用指南
  • 揭秘23种设计模式的艺术与技巧之结构型
  • Git常用命令大全:高效开发必备
  • Flowable——流程定义与部署(RepositoryService)
  • 【IO进程 共享内存、信号量集】
  • IBM穿孔卡片:现代计算技术的奠基之作
  • 技术视界 | 跨域机器人通信与智能系统:打破壁垒的开源探索
  • 【Python】pyinstaller:打包工具
  • Mac 使用 softhsm
  • 一文搞懂保险中的Nominee\Beneficiary\Trustee三个角色
  • 无线路由器:从家庭上网到智慧互联的核心设备
  • 文件传输工具rsync|rust开发环境安装|Ascend实验相关命令
  • 51单片机-按键、蜂鸣器、定时器模块及中断
  • Python学习3.0使用Unittest框架运行测试用例
  • MyBatis-Plus简介以及简单配置和使用
  • 2025全国总工会第二届职工数字化应用技术技能大赛 安徽省选拔赛—数据安全管理员赛项
  • 静态IP如何使用
  • 【Linux系统】线程同步