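Implementing a Distributed Lock with ZooKeeper
ZooKeeper was originally designed to keep distributed systems consistent. This article explains how to build a distributed lock on top of ZooKeeper's ephemeral sequential nodes.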
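Contents
1. Theoretical analysis
1.1 Node types
1.2 Watchers
1.3 How the lock works
2. Hand-writing a simple ZooKeeper distributed lock
2.1 Dependencies
2.2 Constants
2.3 Implementing the ZooKeeper distributed lock
2.4 Usage
3. Implementing a ZooKeeper distributed lock with the Curator framework
3.1 Framework dependencies
3.2 Usage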
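1. Theoretical analysis
Like a Linux file system, ZooKeeper organizes its nodes as a directory tree, with levels separated by /.
Each data node in ZooKeeper is called a znode, and it is the smallest unit of data in ZooKeeper. Because ZooKeeper is mainly a coordination service, each node can store at most 1 MB of data by default, for reasons of performance and consistency.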
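1.1 Node types
There are four basic types of znode:
1. Persistent node (PERSISTENT): once created, it exists until it is explicitly deleted.
2. Persistent sequential node (PERSISTENT_SEQUENTIAL): a persistent node whose name ZooKeeper automatically extends with a sequence number that reflects creation order.
3. Ephemeral node (EPHEMERAL): deleted automatically when the session of the client that created it ends, for example after the client loses its connection and the session times out.
4. Ephemeral sequential node (EPHEMERAL_SEQUENTIAL): an ephemeral node whose name ZooKeeper automatically extends with a sequence number that reflects creation order.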
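To make the sequence number concrete: ZooKeeper appends a 10-digit, zero-padded counter to the requested path. The sketch below reproduces only that naming rule in plain Java, with no ZooKeeper dependency; `SequentialNames` and `withSequence` are hypothetical names for illustration, and the real counter is maintained on the server.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustration only: mimics how ZooKeeper names sequential znodes.
class SequentialNames {
    // Appends a 10-digit zero-padded counter, e.g. "seq-" and 7 give "seq-0000000007".
    static String withSequence(String prefix, int counter) {
        return String.format("%s%010d", prefix, counter);
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add(withSequence("seq-", 12));
        names.add(withSequence("seq-", 3));
        // Zero padding makes lexicographic order match numeric order.
        Collections.sort(names);
        System.out.println(names); // prints [seq-0000000003, seq-0000000012]
    }
}
```

Because of the padding, sorting the child names as plain strings gives the creation order, which is what the lock implementation later in this article relies on.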
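1.2 Watchers
The watcher mechanism is one of ZooKeeper's most important features. A client can attach a watch to a node; when the watched event occurs, ZooKeeper sends a notification to the client, which then runs the watcher's callback method.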
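One detail worth keeping in mind is that a standard watch fires at most once and has to be registered again to receive further events. The following sketch models that one-shot behaviour in memory; `OneShotWatches` is a hypothetical class written for this article and is not the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// In-memory model of one-time watches; NOT the ZooKeeper API.
class OneShotWatches {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Register a callback for the next event on the given path.
    void watch(String path, Consumer<String> callback) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // Fire an event: callbacks are removed before they run, so each runs at most once.
    // Returns the number of callbacks that were notified.
    int fire(String path) {
        List<Consumer<String>> callbacks = watches.remove(path);
        if (callbacks == null) {
            return 0;
        }
        callbacks.forEach(cb -> cb.accept(path));
        return callbacks.size();
    }
}
```

Firing the same path a second time notifies nobody, which is why a lock implementation has to set a fresh watch each time it goes back to waiting.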
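1.3 How the lock works
First we create a persistent node "/locks" that serves as the parent of the child nodes representing lock requests. (In practice you can use a finer-grained persistent node per lock target, for example "/locks/bilibili/comment/publish".)
When a user tries to acquire the lock, an ephemeral sequential node is created under "/locks", for example "seq-0000000001".
Once the node has been created, the client checks whether it has the smallest sequence number among all current children.
If it is the smallest, the user holds the lock and goes on to run the business logic, deleting the ephemeral node when finished.
If it is not the smallest, there are other nodes ahead of it, so the user does not hold the lock and must block instead of running the business logic. To avoid wasting resources by polling in a loop, the client registers a watcher on the deletion of the node immediately before its own. When that node is deleted, the client checks again, and once its node is the smallest it holds the lock and can continue with the business logic.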
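The decision described in the steps above can be sketched as a pure function over the child names, independent of any ZooKeeper connection. `LockQueue` and `nodeToWatch` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Illustration only: decides whether a node holds the lock or must wait.
class LockQueue {
    // Returns empty if `self` holds the lock, otherwise the child that `self` should watch.
    static Optional<String> nodeToWatch(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        // Zero-padded sequence numbers sort correctly as strings.
        Collections.sort(sorted);
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("node not among children: " + self);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }
}
```

Watching only the immediate predecessor, rather than the parent node, means that a release wakes up a single waiter instead of every client in the queue.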
2. Hand-writing a simple ZooKeeper distributed lock
2.1 Dependencies
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    <scope>test</scope>
</dependency>
<!-- logging -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
<!-- zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.6</version>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>RELEASE</version>
    <scope>compile</scope>
</dependency>
2.2 Constants
public interface ZkConstants {
    // connection address
    String connectString = "127.0.0.1:2181";
    // session timeout in milliseconds
    int sessionTimeout = 2000;
}
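2.3 Implementing the ZooKeeper distributed lock
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DistributedLock {
    // ZooKeeper client connection
    private ZooKeeper zkClient;
    // released once the connection is established
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // path of the previous node (the lock request ahead of ours); read by the watcher thread
    private volatile String waitPath;
    // released when the previous node is deleted; read by the watcher thread
    private volatile CountDownLatch waitLatch = new CountDownLatch(1);
    // path of the node created by this client (our lock request)
    private String createNode;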
    /**
     * Constructor: opens the client connection and makes sure the root node exists.
     *
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     */
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        // open the connection
        zkClient = new ZooKeeper(ZkConstants.connectString, ZkConstants.sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connected: release the connect latch
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // the previous node was deleted
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    // wake up the thread waiting in zkLock()
                    waitLatch.countDown();
                }
            }
        });
        // wait until the connection is established before continuing
        connectLatch.await();
        // check whether the root node /locks exists
        Stat exists = zkClient.exists("/locks", false);
        if (exists == null) {
            try {
                // create the root node as a persistent node
                zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // another client created it between exists() and create(): nothing to do
            }
        }
    }
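    /**
     * Acquire the lock, blocking until it is held.
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkLock() throws InterruptedException, KeeperException {
        // create an ephemeral sequential node for this lock request
        createNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // name of the created node without the "/locks/" prefix
        String thisNode = createNode.substring("/locks/".length());
        while (true) {
            // sort the children to find this node's position in the queue
            List<String> children = zkClient.getChildren("/locks", false);
            Collections.sort(children);
            int index = children.indexOf(thisNode);
            if (index == -1) {
                // our node is gone (for example the session expired): fail instead of acting as the holder
                throw new IllegalStateException("lock node not found: " + createNode);
            }
            if (index == 0) {
                // smallest sequence number: the lock is acquired
                return;
            }
            // watch the node immediately before ours; use a fresh latch for every wait
            waitLatch = new CountDownLatch(1);
            waitPath = "/locks/" + children.get(index - 1);
            // true means: use the default watcher registered when zkClient was created;
            // exists() returns null instead of throwing if the node has already been deleted
            if (zkClient.exists(waitPath, true) != null) {
                waitLatch.await();
            }
            // the previous node is gone: loop and check again whether we are now the smallest
        }
    }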
    /**
     * Release the lock.
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkUnLock() throws InterruptedException, KeeperException {
        // delete our ephemeral sequential node; -1 matches any version
        zkClient.delete(createNode, -1);
    }
}
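The blocking in zkLock() relies on CountDownLatch: the calling thread parks in await() until another thread, here the ZooKeeper event thread, calls countDown(). The sketch below shows that hand-off in isolation, with a plain thread standing in for the watcher callback; `LatchHandoff` and its parameters are hypothetical, and unlike the lock above it waits with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: one thread waits on a latch, another releases it.
class LatchHandoff {
    // Returns true if the signal arrived within timeoutMs, false on timeout or interruption.
    static boolean waitForSignal(long delayMs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread notifier = new Thread(() -> {
            try {
                // stands in for the time until the previous node is deleted
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            latch.countDown();
        });
        notifier.setDaemon(true);
        notifier.start();
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A latch cannot be reset after it reaches zero, so an implementation that waits more than once needs a new latch for each wait.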
2.4 Usage
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.KeeperException;

public class DistributedLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // two lock objects, each with its own ZooKeeper session, competing for the same lock
        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();
        // task 1: acquire the lock through lock1
        CompletableFuture.supplyAsync(() -> {
            try {
                lock1.zkLock();
                try {
                    System.out.println("Thread " + Thread.currentThread().getName() + " acquired the lock......");
                    Thread.sleep(5000);
                } finally {
                    // always release, even if the work above fails
                    lock1.zkUnLock();
                    System.out.println("Thread " + Thread.currentThread().getName() + " released the lock......");
                }
            } catch (InterruptedException | KeeperException e) {
                throw new RuntimeException(e);
            }
            return true;
        }, executorService);
        // task 2: acquire the lock through lock2
        CompletableFuture.supplyAsync(() -> {
            try {
                lock2.zkLock();
                try {
                    System.out.println("Thread " + Thread.currentThread().getName() + " acquired the lock......");
                    Thread.sleep(5000);
                } finally {
                    // always release, even if the work above fails
                    lock2.zkUnLock();
                    System.out.println("Thread " + Thread.currentThread().getName() + " released the lock......");
                }
            } catch (InterruptedException | KeeperException e) {
                throw new RuntimeException(e);
            }
            return true;
        }, executorService);
        executorService.shutdown();
    }
}
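3. Implementing a ZooKeeper distributed lock with the Curator framework
In production you would not hand-write this much code for a distributed lock: much of it is boilerplate that could be reused, juggling CountDownLatch instances adds complexity, and features such as reentrancy and thorough error handling are not covered by the implementation above.
Curator is a framework widely used for ZooKeeper distributed locks in production.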
3.1 Framework dependencies
<!-- earlier dependencies omitted -->
<!-- Curator -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>
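3.2 Usage
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {
    public static void main(String[] args) {
        // create two distributed locks on the same path, each with its own client
        InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
        InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            try {
                lock1.acquire();
                System.out.println("Thread 1 acquired the lock");
                // InterProcessMutex is reentrant: the same thread can acquire it again
                lock1.acquire();
                System.out.println("Thread 1 acquired the lock again");
                Thread.sleep(5000);
                lock1.release();
                System.out.println("Thread 1 released the lock");
                // every acquire() must be balanced by a release()
                lock1.release();
                System.out.println("Thread 1 released the lock again");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        executorService.execute(() -> {
            try {
                lock2.acquire();
                System.out.println("Thread 2 acquired the lock");
                lock2.acquire();
                System.out.println("Thread 2 acquired the lock again");
                Thread.sleep(5000);
                lock2.release();
                System.out.println("Thread 2 released the lock");
                lock2.release();
                System.out.println("Thread 2 released the lock again");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        executorService.shutdown();
    }

    public static CuratorFramework getCuratorFramework() {
        // retry policy: base sleep time of 4000 ms between retries, at most 3 retries
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(4000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(ZkConstants.connectString)
                .connectionTimeoutMs(ZkConstants.sessionTimeout)
                .sessionTimeoutMs(ZkConstants.sessionTimeout)
                .retryPolicy(exponentialBackoffRetry)
                .build();
        // start() returns immediately; the connection is established in the background
        client.start();
        System.out.println("Curator client started...");
        return client;
    }
}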
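The demo above acquires each lock twice from the same thread. One way to picture reentrancy is a per-thread hold count, where only the first acquire and the last release would need to touch ZooKeeper. The sketch below is a simplified in-memory model under that assumption; `ReentrantHolds` is a hypothetical class and not Curator's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of per-thread reentrancy bookkeeping; not Curator's actual code.
class ReentrantHolds {
    private final Map<Thread, Integer> holds = new ConcurrentHashMap<>();

    // Returns true when this is the thread's first hold, i.e. the lock node would have to be created.
    boolean acquire() {
        return holds.merge(Thread.currentThread(), 1, Integer::sum) == 1;
    }

    // Returns true when the thread's last hold is released, i.e. the lock node would be deleted.
    boolean release() {
        Thread current = Thread.currentThread();
        Integer count = holds.get(current);
        if (count == null) {
            throw new IllegalMonitorStateException("current thread does not hold the lock");
        }
        if (count == 1) {
            holds.remove(current);
            return true;
        }
        holds.put(current, count - 1);
        return false;
    }
}
```

This also shows why each acquire() in the demo is paired with a release(): the lock only becomes available to other clients once the count returns to zero.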
I hope this helps you understand how ZooKeeper distributed locks work.