分布式专题——16 ZooKeeper经典应用场景实战(上)
1 ZooKeeper Java 客户端实战
-
Zookeeper 应用开发主要依靠 Java 客户端 API 来连接和操作 Zookeeper 集群,可供选择的 Java 客户端 API 有:
- Zookeeper 官方的 Java 客户端 API;
- 第三方的 Java 客户端 API,比如 Curator;
-
Zookeeper 官方 API 提供了基本的操作,比如:创建会话、节点,读取、更新节点数据,删除节点以及检查节点是否存在等基本操作。但同时也存在不少不足:
- Watcher 监测是一次性的,每次触发后得重新注册;
- 会话超时没有重连机制;
- 异常处理麻烦,Zookeeper 提供了很多异常,开发者难以处理抛出的众多异常;
- 只提供简单的
byte[]
数组类型接口,没有 Java POJO 级别的序列化数据处理接口; - 创建节点抛异常时,需自行检查节点是否存在;
- 无法实现级联删除;
-
官方 API 功能博靠谱简单,实际开发中使用起来繁琐且笨重,一般不推荐使用。
1.1 ZooKeeper 原生 Java 客户端的使用
1.1.1 引入依赖
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency>
- 注意:保持与服务端版本一致,不然会有很多兼容性的问题。
1.1.2 ZooKeeper 常用构造器
-
Zookeeper 原生客户端主要借助
org.apache.zookeeper.ZooKeeper
类来使用 Zookeeper 服务,这个类是与 Zookeeper 集群交互的核心类;ZooKeeper (connectString, sessionTimeout, watcher)
-
connectString:是用逗号分隔的 Zookeeper 节点列表,每个节点格式为
host:port
,host
可以是机器名或 IP 地址,port
是 Zookeeper 节点对外提供服务的端口号。客户端会从connectString
中随机选一个节点建立连接; -
sessionTimeout:表示会话超时时间,单位通常是毫秒,用于定义客户端与 Zookeeper 集群会话的有效时长;
-
watcher:是一个监听器,用于接收来自 Zookeeper 集群的事件,比如连接状态变化、节点数据变化等事件;
-
-
使用原生 API 连接 Zookeeper 集群及操作示例:
public class ZkClientDemo {// 定义了单个 Zookeeper 节点的连接字符串,地址为 localhost,端口 2181private static final String CONNECT_STR="localhost:2181";// 定义了 Zookeeper 集群的连接字符串,包含多个节点的 host:port 组合,客户端可从这些节点中选择进行连接private final static String CLUSTER_CONNECT_STR="192.168.65.156:2181, 192.168.65.190:2181, 192.168.65.200:2181";public static void main(String[] args) throws Exception {// 创建 CountDownLatch 实例,初始计数为 1。它用于同步,让主线程等待 Zookeeper 连接建立成功后再继续执行后续操作final CountDownLatch countDownLatch = new CountDownLatch(1);// 调用 ZooKeeper 构造器,传入集群连接字符串 CLUSTER_CONNECT_STR、会话超时时间 4000(毫秒)以及一个自定义的 Watcher 匿名内部类ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, new Watcher() {// 在 Watcher 的 process 方法中,监听连接状态@Overridepublic void process(WatchedEvent event) {// 当事件状态为 Event.KeeperState.SyncConnected(同步连接状态)// 且事件类型为 Event.EventType.None(无特定事件类型,通常表示连接成功建立)时if(Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None){// 调用 countDownLatch.countDown(),将计数减 1,并打印 “连接建立”countDownLatch.countDown();System.out.println("连接建立");}}});System.out.printf("连接中");// 调用 countDownLatch.await(),主线程会阻塞,直到 countDownLatch 计数变为 0(即连接建立成功)countDownLatch.await();// 打印 Zookeeper 客户端的状态System.out.println(zooKeeper.getState());// 调用 zooKeeper.create 方法创建一个持久节点// 节点路径 "/user"// 节点数据(将字符串 "fox" 转为字节数组)// 访问控制列表 ZooDefs.Ids.OPEN_ACL_UNSAFE(表示完全开放的访问控制,任何客户端都可操作该节点)// 创建模式 CreateMode.PERSISTENT(表示创建的是持久节点,Zookeeper 服务重启后节点依然存在)zooKeeper.create("/user", "fox".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} }
1.1.3 Zookeeper 主要方法
-
ZooKeeper 原生 Java 客户端的主要方法有:
-
create(path, data, acl, createMode)
:用于创建指定路径的 ZNode,并在该 ZNode 中保存data[]
数据,createMode
指定 ZNode 的类型(如持久节点、临时节点等); -
delete(path, version)
:若指定路径path
上的 ZNode 版本与给定的version
匹配,就删除该 ZNode; -
exists(path, watch)
:判断指定路径path
上的 ZNode 是否存在,同时在该 ZNode 上设置一个监听器watch
,用于监听节点的变化; -
getData(path, watch)
:获取指定路径path
上 ZNode 的数据,并且在该 ZNode 上设置监听器watch
; -
setData(path, data, version)
:当指定路径path
上的 ZNode 版本与给定的version
匹配时,设置该 ZNode 的数据; -
getChildren(path, watch)
:获取指定路径path
上 ZNode 的子 ZNode 名称,同时在该 ZNode 上设置监听器watch
; -
sync(path)
:将客户端 session 连接的节点与 leader 节点进行同步;
-
-
方法特点:
-
监听器(watch)相关:所有获取 ZNode 数据的 API(如
exists
、getData
、getChildren
)都能设置watch
,用于监控 ZNode 的变化,当 ZNode 发生变化(如数据修改、子节点增减等)时,会触发相应的事件通知; -
更新操作版本控制:所有更新 ZNode 数据的 API(如
delete
、setData
)都有两个版本,无条件更新和条件更新。若version
为-1
,则为无条件更新;否则,只有当给定的version
与 ZNode 当前的version
一致时,才会进行更新,这属于条件更新; -
同步与异步方法:所有方法都有同步和异步两个版本。同步版本的方法会发送请求给 Zookeeper 并等待服务器响应;异步版本的方法会把请求放入客户端的请求队列,然后立即返回,后续通过回调(callback)来接收来自服务端的响应;
-
-
同步创建节点:
@Test public void createTest() throws KeeperException, InterruptedException {// 调用 zooKeeper.create 方法同步创建节点:// 指定节点路径(由 ZK_NODE 定义)// 数据(将字符串 "data" 转为字节数组)// 访问控制列表 ZooDefs.Ids.OPEN_ACL_UNSAFE(表示完全开放的访问控制)// 创建模式 CreateMode.PERSISTENT(创建持久节点,Zookeeper 服务重启后节点仍存在)String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("created path: {}",path); }
-
异步创建节点:
@Test public void createAsycTest() throws InterruptedException {// 调用 zooKeeper.create 的异步版本创建节点,参数与同步创建类似,还传入了一个回调函数// 当服务端处理完创建请求后,会调用该回调函数,在回调中记录相关信息(如响应码 rc、路径 path、上下文 ctx、节点名 name 等)zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,(rc, path, ctx, name) -> log.info("rc {},path {},ctx {},name {}",rc,path,ctx,name),"context");TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }
-
修改节点数据:
@Test public void setTest() throws KeeperException, InterruptedException {// 创建 Stat 对象,用于存储节点的状态信息(如版本等)Stat stat = new Stat();// 调用 zooKeeper.getData 方法获取节点(由 ZK_NODE 指定)当前的数据,并将状态信息存入 Stat 对象,然后记录修改前的数据byte[] data = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改前: {}",new String(data));// 调用 zooKeeper.setData 方法修改节点数据// 传入新数据(将字符串 "changed!" 转为字节数组)以及从 Stat 对象中获取的当前版本,实现条件更新(只有版本匹配时才会修改)zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改后: {}",new String(dataAfter)); }
1.2 Curator 开源客户端的使用
1.2.1 简介
-
Curator 是 Netflix 公司开源的一套 ZooKeeper 客户端框架,和 ZkClient 类似,它解决了 ZooKeeper 底层开发的诸多细节问题,像连接、重连、反复注册 Watcher 以及处理
NodeExistsException
异常等; -
Curator 是 Apache 基金会的顶级项目之一,具备更加完善的文档。同时,它提供了一套易用性和可读性更强的 Fluent 风格的客户端 API 框架,能让开发者更便捷、清晰地进行 ZooKeeper 相关开发;
-
Curator 为 ZooKeeper 客户端框架提供了一些普遍的、开箱即用的分布式开发解决方案,例如 Recipe、共享锁服务、Master 选举机制和分布式计算器等。这些方案帮助开发者避免了“重复造轮子”的无效开发工作,可直接用于满足分布式场景下的各种需求;
-
在实际的开发场景中,使用 Curator 客户端就足以应付日常的 ZooKeeper 集群操作的需求,能高效地完成与 ZooKeeper 集群相关的开发任务;
-
官网:Welcome to Apache Curator | Apache Curator。
1.2.2 引入依赖
-
Curator 包含的包:
curator-framework
:是对 ZooKeeper 底层 API 的封装,简化了 ZooKeeper 原生 API 的使用,让开发者能更便捷地与 ZooKeeper 集群交互;curator-client
:提供了一些客户端相关的操作,比如重试策略等,有助于提升客户端与 ZooKeeper 集群连接的稳定性和可靠性;curator-recipes
:封装了一些高级特性,像 Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等,这些特性为分布式场景下的开发提供了现成的解决方案,避免开发者重复开发;
<!-- zookeeper client --> <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version> </dependency><!--curator--> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions> </dependency>
1.2.3 创建客户端实例
-
在使用
curator - framework
操作 ZooKeeper 前,需要创建CuratorFramework
类型的客户端实例,有两种方式:-
使用工厂类
CuratorFrameworkFactory
的静态newClient()
方法:// 定义重试策略,下面使用 ExponentialBackoffRetry 重试策略 // 参数 1000 是初始休眠时间(单位毫秒),3 是最大重试次数 // 该策略的特点是随着重试次数增加,重试间隔时间按指数倍增长 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)// 创建客户端实例 // 传入 ZooKeeper 连接字符串 zookeeperConnectionString 和重试策略 retryPolicy CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);// 启动客户端 client.start();
-
使用工厂类
CuratorFrameworkFactory
的静态builder
构造者方法:// 定义 ExponentialBackoffRetry 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);// 构建客户端 CuratorFramework client = CuratorFrameworkFactory.builder()// 服务器地址列表,可以是单个地址,也可以是多个地址,多个地址用逗号分隔.connectString("192.168.128.129:2181") // 会话超时时间(5000 毫秒,作用于服务端,设置会话在 ZooKeeper 服务端的失效时间).sessionTimeoutMs(5000) // 连接超时时间(5000 毫秒,作用于客户端,限制客户端发起会话连接到接收 ZooKeeper 服务端应答的时间).connectionTimeoutMs(5000)// 重试策略.retryPolicy(retryPolicy)// 隔离名称.namespace("base").build();// 启动客户端 client.start();
-
-
关键参数说明:
-
retryPolicy
(重试策略):当客户端异常退出或与服务端失去连接时,用于设置客户端重新连接 ZooKeeper 服务端的策略。Curator 提供了多种实现方式:ExponentialBackoffRetry
:重试一组次数,且重试之间的睡眠时间增加;RetryNTimes
:设置最大重试次数;RetryOneTime
:只重试一次;RetryUntilElapsed
:在给定的时间结束之前重试;- 在 Curator 内部,可通过判断服务器返回的
KeeperException
状态代码来决定是否进行重试处理,如返回OK
表示操作无问题,SYSTEMERROR
表示系统或服务端错误;
-
connectionString
:服务器地址列表,可以是单个地址,也可以是多个地址,多个地址用逗号分隔,格式如host1:port1, host2:port2, host3:port3
; -
超时时间:
sessionTimeoutMs
(会话超时时间):作用于服务端,设置该会话在 ZooKeeper 服务端的失效时间;connectionTimeoutMs
(客户端创建会话的超时时间):作用于客户端,限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。
-
1.2.4 创建节点
-
在 ZooKeeper 中,创建一个节点需要包含节点类型(临时节点或持久节点)、节点的数据信息、节点是否为有序节点等属性和性质;
@Test public void testCreate() throws Exception {// 使用 curatorFramework 的 create() 方法创建节点,通过 forPath("/curator-node") 指定节点路径为 /curator-node、// 由于没有指定节点类型,默认创建的是持久节点,且没有设置节点数据(使用默认的空数据)String path = curatorFramework.create().forPath("/curator-node");// withMode(CreateMode.PERSISTENT) 明确指定节点类型为持久节点// CreateMode 还有其他类型,如临时节点 EPHEMERAL、持久有序节点 PERSISTENT_SEQUENTIAL、临时有序节点 EPHEMERAL_SEQUENTIAL 等// 指定节点路径为 /curator-node,并将字符串 "some-data" 转为字节数组作为节点数据curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes());log.info("curator create node :{} successfully.", path); }
-
在 Curator 中:
- 使用
create
函数创建数据节点; - 通过
withMode
函数指定节点类型(包括持久化节点、临时节点、顺序节点、临时顺序节点、持久化顺序节点等),默认是持久化节点; - 之后调用
forPath
函数来指定节点的路径和数据信息。
- 使用
1.2.5 一次性创建带层级结构的节点
@Test
public void testCreateWithParent() throws Exception {// 定义节点路径,这是一个带有父节点 node-parent 和子节点 sub-node-1 的层级路径String pathWithParent="/node-parent/sub-node-1";// creatingParentsIfNeeded() 方法的作用是:如果待创建节点的父节点不存在,会自动创建所有需要的父节点,从而一次性创建出带层级结构的节点String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.",path);
}
1.2.6 获取数据
@Test
public void testGetData() throws Exception {// 从路径为 "/curator-node" 的节点获取数据,返回的是字节数组 bytesbyte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.",new String(bytes));
}
1.2.7 更新节点
@Test
public void testSetData() throws Exception {// 将路径为 "/curator-node" 的节点数据更新为 "changed!"(转为字节数组)curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());// 获取更新后的数byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.",new String(bytes));
}
1.2.8 删除节点
@Test
public void testDelete() throws Exception {// 定义要删除的节点路径String pathWithParent="/node-parent";// 删除节点// guaranteed():起到保障删除成功的作用,只要客户端会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除// deletingChildrenIfNeeded():指定后,系统在删除该数据节点时会以递归的方式删除其所有子节点,以及子节点的子节点等。curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
1.2.9 异步接口
-
Curator 引入了
BackgroundCallback
接口:- 用于处理服务器端返回的信息,处理过程在异步线程中调用,默认在
EventThread
中调用,也可自定义线程池; BackgroundCallback
接口的processResult
方法会在异步后台操作完成时被调用,参数包括客户端client
和操作结果详情event
;
public interface BackgroundCallback {/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception; }
- 用于处理服务器端返回的信息,处理过程在异步线程中调用,默认在
-
inBackground()
异步处理默认在EventThread
中执行,例:@Test public void test() throws Exception {// inBackground 表示采用异步方式处理,当获取数据的操作在服务端完成后,会在异步线程中执行传入的 Lambda 表达式,打印相关信息curatorFramework.getData().inBackground((item1, item2) -> {log.info("background: {}", item2);}).forPath(ZK_NODE);// 让线程休眠很长时间,以便观察异步操作的执行TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }
-
也可以指定线程池执行,例:
@Test public void test() throws Exception {// 创建一个单线程的线程池 executorServiceExecutorService executorService = Executors.newSingleThreadExecutor();// 传入自定义的线程池 executorService,表示异步处理将在该自定义线程池中执行,而不是默认的 EventThreadcuratorFramework.getData().inBackground((item1, item2) -> {log.info(" background: {}", item2);}, executorService).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }
1.2.10 Curator 监听器
1.2.10.1 CuratorListener
接口
-
CuratorListener
接口用于接收关于错误和后台事件的通知,其eventReceived
方法会在后台任务完成或监听器(watch)触发时被调用,参数包括:CuratorFramework
客户端实例和CuratorEvent
事件对象,可在该方法中处理相应的事件;- 使用此监听器后,调用
inBackground
方法能异步获得监听通知(针对 background 通知和错误通知);
/*** Receives notifications about errors and background events*/ public interface CuratorListener {/*** Called when a background task has completed or a watch has triggered** @param client client* @param event the event* @throws Exception any errors*/public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception; }
1.2.10.2 Curator Caches
- Curator 引入 Cache 来实现对 ZooKeeper 服务端事件的监听,Cache 事件监听可理解为本地缓存视图与远程 ZooKeeper 视图的对比过程,且提供了反复注册的功能。Cache 分为节点监听和子节点监听两类注册类型。
node cache(NodeCache
)
-
作用:对某一个节点进行监听;
-
构造:
// client:CuratorFramework 客户端 // path:要缓存的节点路径 public NodeCache(CuratorFramework client, String path)
-
事件处理:
- 可通过注册监听器(调用
addListener(NodeCacheListener listener)
方法)来实现对当前节点数据变化的处理; - 当节点数据变化时,
NodeCacheListener
的nodeChanged
方法会被调用;
public void addListener(NodeCacheListener listener)
- 可通过注册监听器(调用
-
例:
@Slf4j public class NodeCacheTest extends AbstractCuratorTest{// 定义要监听的节点路径常量public static final String NODE_CACHE = "/node-cache";@Testpublic void testNodeCacheTest() throws Exception {// 调用工具方法,确保监听的节点存在,如果不存在则创建createIfNeed(NODE_CACHE);// 创建NodeCache实例,参数为Curator客户端和要监听的节点路径// NodeCache用于监听指定节点的数据变化和节点的创建/删除NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);// 为NodeCache添加监听器,当节点发生变化时触发回调nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {// 节点变化时输出日志,提示哪个节点发生了变化log.info("{} path nodeChanged: ", NODE_CACHE);// 调用自定义方法,打印节点的最新数据printNodeData();}});// 启动NodeCache监听// 启动后会立即缓存当前节点的数据,并开始监听节点的变化nodeCache.start();}// 自定义方法,用于获取并打印指定节点的数据public void printNodeData() throws Exception {// 通过Curator客户端获取节点数据,返回字节数组byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);// 将字节数组转为字符串并输出日志log.info("data: {}",new String(bytes));} }
path cache(PathChildrenCache
)
-
作用:对子节点进行监听,但不会对二级子节点进行监听;
-
构造:
public PathChildrenCache(CuratorFramework client, // 客户端String path, // 要监听的路径boolean cacheData // 是否缓存节点内容(cacheData 为 true 时,除了节点状态,还会缓存节点内容))
-
事件处理:
- 通过注册
PathChildrenCacheListener
监听器(调用addListener(PathChildrenCacheListener listener)
方法)来实现对当前节点的子节点数据变化的处理; - 子节点变化时,
PathChildrenCacheListener
的childEvent
方法会被调用;
public void addListener(PathChildrenCacheListener listener)
- 通过注册
-
例:
@Slf4j public class PathCacheTest extends AbstractCuratorTest{public static final String PATH = "/path-cache";@Testpublic void testPathCache() throws Exception {createIfNeed(PATH);// 创建 PathChildrenCache 实例PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);// 添加监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {log.info("event: {}",event);}});// 启动监听(true 表示首次启动时缓存节点内容到 Cache 中)pathChildrenCache.start(true);} }
tree cache(TreeCache
)
-
作用:
TreeCache
使用内部类TreeNode
来维护树结构,并与 ZK 节点进行映射,所以可以监听当前节点下所有节点的事件(包括子节点及递归子节点); -
构造:
public TreeCache(CuratorFramework client, // 客户端String path, // 要监听的路径boolean cacheData // 是否缓存节点内容)
-
事件处理:
- 通过注册
TreeCacheListener
监听器(调用addListener(TreeCacheListener listener)
方法)来实现对当前节点的子节点及递归子节点数据变化的处理; - 节点变化时,
TreeCacheListener
的childEvent
方法会被调用;
public void addListener(TreeCacheListener listener)
- 通过注册
-
例:
@Slf4j public class TreeCacheTest extends AbstractCuratorTest{public static final String TREE_CACHE = "/tree-path";@Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);// 创建 TreeCache 实例TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);// 添加监听器treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {log.info(" tree cache: {}",event);}});// 启动监听treeCache.start();} }
2 ZooKeeper 在分布式命名服务中的实战
- 命名服务的作用是为系统中的资源提供标识能力;
- ZooKeeper 的命名服务主要利用其节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名;
- 典型的应用场景有:分布式 API 目录、分布式节点命名、分布式 ID 生成器。
2.1 分布式 API 目录
-
功能:为分布式系统中各种 API 接口服务的名称、链接地址,提供类似 JNDI(Java 命名和目录接口)中的文件系统的功能。借助 ZooKeeper 的树形分层结构,就能提供分布式的 API 调用功能;
-
著名的 Dubbo 分布式框架就用了 ZooKeeper 的分布式 JNDI 功能,在 Dubbo 中,使用 ZooKeeper 维护全局服务接口 API 的地址列表。大致思路为:
- 服务提供者(Service Provider)在启动时,向 ZooKeeper 上的指定节点
/dubbo/${serviceName}/providers
写入自己的 API 地址,这一操作相当于服务的公开; - 服务消费者(Consumer)启动时,订阅节点
/dubbo/${serviceName}/providers
下的服务提供者的 URL 地址,从而获得所有服务提供者的 API;
- 服务提供者(Service Provider)在启动时,向 ZooKeeper 上的指定节点
-
下图展示了 ZooKeeper 的节点结构:
dubbo
为根节点下的一级节点,其下有服务相关节点(如com.foo.BarService
),服务节点下又有providers
(服务提供者注册地址节点)和consumers
(服务消费者相关节点)等;- 服务提供者向
providers
节点注册自己的 URL 地址(如10.20.153.10:20880
等); - 服务消费者订阅
providers
节点下的地址; - 监控(Monitor)也会订阅相关节点以获取信息。
2.2 分布式节点的命名
-
一个分布式系统通常由很多节点组成,节点的数量并非固定,而是不断动态变化的。比如业务不断膨胀和流量高峰来临时,大量节点可能动态加入集群;当流量高峰过去,又需要下线大量节点。此外,由于机器或网络原因,一些节点会主动离开集群;
-
如果节点数据量大或者变动频繁,通过配置文件手动为每个节点命名是不现实的,这就需要用到分布式节点的命名服务。生成集群节点编号的方案:
-
方案一:使用数据库的自增 ID 特性,用数据表存储机器的 MAC 地址或者 IP 来维护;
-
方案二:使用 ZooKeeper 持久顺序节点的顺序特性来维护节点的 NodeId 编号;
-
-
基于 ZooKeeper 的集群节点命名服务(方案二)流程:
-
启动节点服务,连接 ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点;
-
在根节点下创建一个临时顺序 Node 节点,取回 ZNode 的编号并把它作为分布式系统中节点的 NODEID;
-
如果临时节点太多,可以根据需要删除临时顺序 ZNode 节点。
-
2.3 分布式 ID 生成器
2.3.1 简介
-
在分布式系统中,分布式 ID 生成器应用场景众多:
-
大量的数据记录,需要分布式 ID 来唯一标识每条记录;
-
大量的系统消息,需要分布式 ID 区分不同消息;
-
大量的请求日志,如 restful 的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析;
-
分布式节点的命名服务,往往也需要分布式 ID 来标识不同节点等;
-
-
传统的数据库自增主键已不能满足分布式系统需求,分布式环境下的唯一 ID 系统需满足:
-
全局唯一:不能出现重复 ID,确保在整个分布式系统中每个 ID 都是独一无二的;
-
高可用:ID 生成系统是基础系统,被许多关键系统调用,一旦宕机,会对依赖它的系统造成严重影响,所以必须保证高可用性;
-
-
分布式 ID 生成器的方案,大致有以下几种:
-
Java 的 UUID:利用 Java 自身提供的 UUID 生成机制,生成全局唯一的标识符;
-
分布式缓存 Redis 生成 ID:利用 Redis 的原子操作
INCR
和INCRBY
,通过原子性的递增操作来生成全局唯一的 ID; -
Twitter 的 SnowFlake 算法:一种知名的分布式 ID 生成算法,能生成有序的、全局唯一的 ID;
-
ZooKeeper 生成 ID:利用 ZooKeeper 的顺序节点特性,生成全局唯一的 ID;
-
MongoDb 的 ObjectId:MongoDB 是分布式的非结构化 NoSQL 数据库,每插入一条记录会自动生成全局唯一的“_id”字段值,它是一个 12 字节的字符串,可作为分布式系统中全局唯一的 ID。
-
2.3.2 基于 ZooKeeper 实现分布式 ID 生成器
-
原理:
- 在 ZooKeeper 节点的四种类型中,
PERSISTENT_SEQUENTIAL
(持久化顺序节点)和EPHEMERAL_SEQUENTIAL
(临时顺序节点)具备自动编号的能力; - ZooKeeper 的每一个节点都会为它的第一级子节点维护一份顺序编号,记录每个子节点创建的先后顺序,且这个顺序编号是分布式同步的,也是全局唯一的;
- 所以可以通过创建 ZooKeeper 的临时顺序节点的方法,生成全局唯一的 ID;
@Slf4j public class IDMaker extends CuratorBaseOperations {// 创建临时顺序节点private String createSeqNode(String pathPefix) throws Exception {// 获取 CuratorFramework 实例(curatorFramework)CuratorFramework curatorFramework = getCuratorFramework();// 调用其 create() 方法进行节点创建String destPath = curatorFramework.create().creatingParentsIfNeeded() // 表示如果父节点不存在,会自动创建所需的父节点.withMode(CreateMode.EPHEMERAL_SEQUENTIAL) // 指定创建的是临时顺序节点.forPath(pathPefix); // 指定节点的路径前缀// 返回创建的节点路径 destPathreturn destPath;}// 生成分布式IDpublic String makeId(String path) throws Exception {// 创建临时顺序节点,得到节点路径 strString str = createSeqNode(path);// 如果 str 不为 null,则通过字符串操作获取末尾的序号if(null != str){// 找到 path 在 str 中最后一次出现的索引 indexint index = str.lastIndexOf(path);// 如果 index 大于等于 0if(index >= 0){// 从 index + path.length() 的位置开始截取子字符串,得到的就是生成的全局唯一 ID,若截取位置超出字符串长度则返回空字符串index += path.length();return index <= str.length() ? str.substring(index) : "";}}// 如果 index 小于 0,则直接返回 strreturn str;} }
- 在 ZooKeeper 节点的四种类型中,
-
测试:
@Test public void testMarkId() throws Exception {// 创建 IDMaker 实例 idMaker,并调用 init() 方法进行初始化IDMaker idMaker = new IDMaker();idMaker.init();// 定义节点路径前缀 pathPrefix 为 "/idmarker/id-"String pathPrefix = "/idmarker/id-";// 通过两层循环创建多个线程(外层循环创建 5 个线程,内层循环每个线程执行 10 次 ID 生成操作)for(int i = 0; i < 5; i++){// 每个线程中,调用 idMaker.makeId(pathPrefix) 生成 ID,并记录线程名、操作次数和生成的 ID 到日志中new Thread(() -> {for (int j = 0; j < 10; j++){String id = null;try {id = idMaker.makeId(pathPrefix);log.info("{}线程第{}个创建的id为{}", Thread.currentThread().getName(), j, id);} catch (Exception e) {e.printStackTrace();}}},"thread"+i).start();}// 让当前线程休眠很长时间,以便观察多线程环境下 ID 生成的情况Thread.sleep(Integer.MAX_VALUE); }
- 从测试结果可以看到:
- 不同线程生成的 ID 是全局唯一的(如
thread4
线程第 9 个创建的 id 为00000000154
,thread1
线程第 9 个创建的 id 为00000000155
等); - 验证了基于 ZooKeeper 临时顺序节点生成分布式 ID 的全局唯一性。
- 不同线程生成的 ID 是全局唯一的(如
- 从测试结果可以看到:
2.3.3 基于 ZooKeeper 实现 SnowFlake ID 算法
2.3.3.1 简介
-
Twitter 的 SnowFlake 算法是著名的分布式服务器用户 ID 生成算法,生成的 ID 是 64bit 的长整型数字,分为四个部分:
-
第一位:占用 1bit,值始终为 0,无实际作用;
-
时间戳:占用 41bit,精确到毫秒,总共可容纳约 69 年的时间;
-
工作机器 ID:占用 10bit,最多可容纳 1024 个节点;
-
序列号:占用 12bit,在同一毫秒同一节点上从 0 开始不断累加,最多可累加至 4095;
-
在工作节点达 1024 个的场景下,同一毫秒最多可生成 (1024 \times 4096 = 4194304) 个 ID,能满足绝大多数并发场景需求;
-
2.3.3.2 SnowFlake 算法优缺点
- 优点:
- 生成 ID 不依赖数据库,完全在内存生成,具有高性能和高可用性;
- 容量大,每秒可生成几百万个 ID;
- ID 呈趋势递增,后续插入数据库索引树时性能较高;
- 缺点:
- 依赖系统时钟一致性,若某台机器系统时钟回拨,可能造成 ID 冲突或乱序;
- 若机器系统时间在启动前回拨过,可能出现 ID 重复的危险。
2.3.3.3 实现
public class SnowflakeIdGenerator {// 类采用单例模式,instance 是静态的 SnowflakeIdGenerator 实例,通过下面定义的私有构造方法 SnowflakeIdGenerator() 保证单例public static SnowflakeIdGenerator instance = new SnowflakeIdGenerator();// init 方法用于初始化单例,接收 workerId(节点 ID,最大为 8091)public synchronized void init(long workerId) {// 若 workerId 超过 MAX_WORKER_ID,会抛出 IllegalArgumentException 异常if (workerId > MAX_WORKER_ID) {throw new IllegalArgumentException("woker Id wrong: " + workerId);}// 否则将 workerId 赋值给实例的 workerId 属性instance.workerId = workerId;}private SnowflakeIdGenerator() {}// 开始使用该算法的时间(2017-01-01 00:00:00),用于计算相对时间戳private static final long START_TIME = 1483200000000L;// 工作机器 ID 的位数,最多支持 8192 个节点(2的13次方)private static final int WORKER_ID_BITS = 13;// 序列号的位数,支持单节点最高每毫秒生成 1024 个 ID(2的10次方)private final static int SEQUENCE_BITS = 10;// 最大工作机器 ID(8091),通过位运算 ~(-1L << WORKER_ID_BITS) 计算得到private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);// 最大序列号(1023),通过位运算 ~(-1L << SEQUENCE_BITS) 计算得到private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);// 工作机器编号的移位位数,值为 SEQUENCE_BITSprivate final static long WORKER_ID_SHIFT = SEQUENCE_BITS;// 时间戳的移位位数,值为 WORKER_ID_BITS + SEQUENCE_BITSprivate final static long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;// 当前项目的工作机器 IDprivate long workerId;// 上次生成 ID 的时间戳,初始为 -1Lprivate long lastTimestamp = -1L;// 当前毫秒生成的序列号,初始为 0Lprivate long sequence = 0L;// 调用 generateId 方法生成下一个 IDpublic Long nextId() {return generateId();}// 生成唯一 ID(synchronized 保证线程安全)private synchronized long generateId() {// 获取当前时间戳long current = System.currentTimeMillis();// 若当前时间小于上一次ID生成的时间戳,说明系统时钟回退,返回 -1 表示出现问题if (current < lastTimestamp) {return -1;}// 若当前生成id的时间还是上次的时间,对 sequence 加 1 并与 MAX_SEQUENCE 进行与运算(防止溢出)if (current == lastTimestamp) {sequence = (sequence + 1) & MAX_SEQUENCE;// 若当前毫秒生成的序列数已经大于最大值,则调用 nextMs 方法阻塞到下一个毫秒,重新获取时间戳if (sequence == MAX_SEQUENCE) {current = this.nextMs(lastTimestamp);}} else {// 若 current > lastTimestamp,说明到了新的毫秒,将 sequence 重置为 0Lsequence = 0L;}// 更新上次生成id的时间戳为当前时间戳lastTimestamp = current;// 通过移位操作生成 64 位唯一 ID// 计算相对时间戳:时间戳右移动23位long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;// 计算工作机器 ID 移位后的值:workerId 右移动10位long workerId = this.workerId << WORKER_ID_SHIFT;// 将相对时间戳、工作机器 ID 移位后的值和序列号进行按位或运算,得到最终的 ID 并返回return time | workerId | sequence;}// 用于阻塞到下一个毫秒,不断获取当前时间戳,直到大于传入的 timeStamp,保证时间戳的递增性private long nextMs(long timeStamp) {long current = System.currentTimeMillis();while (current <= timeStamp) {current = System.currentTimeMillis();}return current;}
}
3 ZooKeeper 实现分布式队列
- 常见的消息队列有 RabbitMQ、RocketMQ、Kafka 等;
- ZooKeeper 作为分布式小文件管理系统,能实现简单队列功能。但它不适合大数据量存储,官方不推荐作为队列使用,不过由于实现简单、集群搭建便利,在一些吞吐量不高的小型系统中仍比较好用。
3.1 设计思路
- 创建队列根节点:在 ZooKeeper 中创建一个持久节点,作为队列的根节点,所有队列元素的节点都放在该根节点下;
- 实现入队操作:当要添加元素到队列时,在队列根节点下创建一个临时有序节点,节点数据可包含队列元素信息;
- 实现出队操作:
- 获取根节点下的所有子节点;
- 找到具有最小序号的子节点;
- 获取该节点的数据;
- 删除该节点;
- 返回节点的数据;
-
代码示例:
/*** 入队* @param data* @throws Exception*/ public void enqueue(String data) throws Exception {// 调用 ZooKeeper 客户端(zk)的 create 方法,在队列根节点(QUEUE_ROOT)下创建临时有序子节点// 节点路径为 QUEUE_ROOT + "/queue-"// 数据为 data 转成的 UTF - 8 字节数组// 访问控制列表为 ZooDefs.Ids.OPEN_ACL_UNSAFE(完全开放访问)// 创建模式为 CreateMode.EPHEMERAL_SEQUENTIAL(临时有序节点)zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); }/*** 出队* @return* @throws Exception*/ public String dequeue() throws Exception {// 采用无限循环尝试出队while (true) {// 先获取队列根节点(QUEUE_ROOT)下的所有子节点列表 childrenList<String> children = zk.getChildren(QUEUE_ROOT, false);// 若子节点列表为空,返回 null,表示队列为空if (children.isEmpty()) {return null;}// 对 children 进行排序,以便找到序号最小的子节点(因为是有序节点,排序后可得到入队顺序的节点)Collections.sort(children);// 遍历排序后的子节点for (String child : children) {// 构造子节点路径 childPathString childPath = QUEUE_ROOT + "/" + child;try {// 获取该子节点的数据(zk.getData)byte[] data = zk.getData(childPath, false, null);// 删除该子节点(zk.delete,版本号为 -1 表示无条件删除)zk.delete(childPath, -1);// 将数据转成 UTF - 8 字符串返回return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 若捕获到 KeeperException.NoNodeException 异常,说明该节点已被其他消费者删除,继续尝试下一个节点}}} }
3.2 使用 Apache Curator 实现分布式队列
-
Apache Curator 是对 ZooKeeper 客户端的封装库,提供了诸多高级功能,其中就包括分布式队列。通过 Curator 可以更便捷地实现 ZooKeeper 相关的分布式功能,简化了开发流程;
public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {// 创建客户端,连接地址为 localhost:2181,重试策略采用 ExponentialBackoffRetry,初始休眠时间为 1000 毫秒,最大重试次数为 3 次CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));// 启动客户端client.start();// 实现 QueueSerializer<String> 接口QueueSerializer<String> serializer = new QueueSerializer<String>() {// serialize 方法将字符串转为字节数组@Overridepublic byte[] serialize(String item) {return item.getBytes();}// deserialize 方法将字节数组转回字符串@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}// 以上两个方法用于队列中数据的序列化和反序列化操作,以便在 ZooKeeper 节点中存储和读取数据};// 定义队列消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {// consumeMessage 方法用于处理消费到的消息@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}// stateChanged 方法用于处理连接状态变化@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列// 传入 Curator 客户端 client、队列消费者 consumer、序列化器 serializer 以及队列根节点路径 QUEUE_ROOTDistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue(); // 调用 buildQueue 方法构建分布式队列 DistributedQueue<String>queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}// 主线程休眠 10000 毫秒,给队列足够的时间处理消息Thread.sleep(10000);// 关闭队列queue.close();// 关闭 Curator 客户端,释放资源client.close();} }
3.3 注意事项
-
使用 Curator 的
DistributedQueue
时,默认情况下不使用锁。只有当调用QueueBuilder
的lockPath()
方法并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,队列操作可能会受到并发问题的影响; -
在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。在分布式环境中,多个消费者可能同时尝试消费队列中的消息,如果不使用锁来同步这些操作,可能会导致消息被多次处理或者处理顺序出现混乱;
-
并非所有场景都需要指定锁节点路径:
-
如果应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性能,因为不再需要等待获取锁;
-
如果需要保证队列操作的原子性和顺序性(例如避免消息重复处理、确保处理顺序正确等场景),则需要指定锁节点路径来启用锁;
-
-
代码示例:
// 构建队列构建器,传入 Curator 客户端 client、队列消费者 consumer、序列化器 serializer 以及队列根节点路径 "/order" QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/order"); // 指定锁节点路径为 "/orderlock",用于实现分布式锁,以保证队列操作的原子性和顺序性,之后调用 buildQueue() 构建队列 queue = builder.lockPath("/orderlock").buildQueue(); // 启动队列,此时队列开始监听 ZooKeeper 中 /order 节点下的消息 queue.start();