当前位置: 首页 > news >正文

Zookeeper实现分布式锁

本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。

文章目录

  • 概述
  • 一、基于Zookeeper临时节点的分布式锁
  • 二、基于Zookeeper临时有序节点的分布式锁
  • 三、Curator 分布式锁


概述

  实现分布式锁的思路有很多,关键点在于利用一个第三方组件,对多服务实例进行管理。常见的实现分布式锁的思路:

  • 使用数据库实现,利用表的唯一索引的约束,在数据库中建一张表,给某个字段加上唯一索引,在执行业务代码之前,首先往该表中插入一条记录,分为以下两种情况。
    • 如果插入成功,则继续执行业务,业务执行完成后,删除该条记录。
    • 如果插入失败,则会抛出数据库唯一索引的异常,则捕获异常,进行等待重试。
  • 使用Redis的set nx ex实现,如果自己去实现,会有非常多的问题。
  • 使用redisson或zookeeper等成熟的解决方案实现,无论是使用数据库还是redis,自己实现的分布式锁都是有非常多问题的,例如死锁,可重入性,续约,解锁判断等。Redisson实现的分布式锁,适用于对于并发要求较高,性能要求较高的场景,但是一致性不能很好的得到保证,例如在主从复制的过程中出现了问题,从节点没有同步到主节点的锁。基于Zookeeper实现的分布式锁,则是更加偏向于对于可靠性和一致性要求较高的场景,但是性能不如Redisson。

  使用Zookeeper实现分布式锁,可以利用临时节点,或临时有序节点,以及使用Curator框架。既然有成熟的解决方案,前两者自己手动实现肯定是不推荐的,但是在这里做一个简单实现,主要是学习思路。

一、基于Zookeeper临时节点的分布式锁

  首先,Zookeeper实现锁,是要用临时节点的,利用临时节点服务器重启,客户端与 Zookeeper 的会话失效则删除的特性,防止死锁的问题。
  临时节点实现分布式锁的思路,某个线程尝试获取锁,创建一个临时节点:
- 该节点不存在,则创建节点,加锁成功,执行业务代码,业务执行完成后删除节点。
- 该节点存在,则加锁失败,进入等待方法,监听节点(回调方法中,监听到节点被删除,解除阻塞),然后阻塞自身。

  首先需要定义一个锁规范的顶级接口。定义了加锁和解锁两个方法。

public interface Lock {/*** 加锁* @return*/void lock() throws InterruptedException, KeeperException;/*** 解锁*/void unlock() throws InterruptedException, KeeperException;
}

  然后定义一个中间抽象层,实现lock方法:

  • 首先尝试获取锁,获取到锁则执行业务代码
  • 否则等待加锁,并且再次重试。
public abstract class AbstractLock implements Lock {@Overridepublic void lock() throws InterruptedException, KeeperException {if (tryLock()) {System.out.println("获取锁");} else {//等待加锁(判断该路径对应的节点在zk中是否存在,并且设置监听器,监听节点删除事件,配合并发工具配合阻塞和解除阻塞)waitForLock();//再次尝试加锁lock();}}/*** 尝试加锁** @return*/public abstract boolean tryLock();/*** 等待锁*/public abstract void waitForLock() throws InterruptedException, KeeperException;
}

  具体的实现类:

public class TemporaryNodeLock extends AbstractLock {/*** 得到zk连接*/private ZooKeeper zooKeeper = null;private final static String CONNECT_STR = "192.168.198.128:2181";private final String LOCK_PATH = "/lock1";private final String LOCK_DATA = "lock1";public TemporaryNodeLock() {CountDownLatch countDownLatch = new CountDownLatch(1);try {zooKeeper = new ZooKeeper(CONNECT_STR, 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected) {countDownLatch.countDown();System.out.println("连接建立成功");}}});countDownLatch.await();} catch (IOException e) {throw new RuntimeException("zk连接初始化错误");} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 尝试加锁** @return*/@Overridepublic boolean tryLock() {try {zooKeeper.create(LOCK_PATH, LOCK_DATA.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception e) {return false;}return true;}/*** 等待锁*/@Overridepublic void waitForLock() throws InterruptedException, KeeperException {CountDownLatch countDownLatch = new CountDownLatch(1);//判断节点是否存在,并且绑定监听事件Stat stat = zooKeeper.exists(LOCK_PATH, event -> {// 发生了LOCK_PATH的删除事件if (event.getType() == Watcher.Event.EventType.NodeDeleted && event.getPath().equals(LOCK_PATH)) {countDownLatch.countDown();}});if (stat != null) {countDownLatch.await();}}/*** 解锁*/@Overridepublic void unlock() throws InterruptedException, KeeperException {zooKeeper.delete(LOCK_PATH, -1);}}

二、基于Zookeeper临时有序节点的分布式锁

  按照上述的思路,在不考虑重入,续约等情况下 ,即可实现简单的分布式锁,但是有优化的空间。假设有三十个线程,A线程通过tryLock方法成功执行,获取到了锁,其他二十九个线程,都没有获取到,在waitForLock方法中等待。当A线程执行完业务代码,执行unlock解锁操作时,其他二十九个线程,同时监听到了解锁的事件,都被唤醒,重新执行tryLock方法尝试加锁,但是最后同样只有一个线程能加锁成功,其余的线程依旧要继续等待,唤醒了不必要的节点,这就称之为惊群效应
  应该是需要避免的,否则可能造成高并发场景下服务器瞬时压力过大。那可以换一种思路,使用临时有序节点的特性,使用临时有序节点,在根节点下创建出的节点,是类似于这样的:

/lock/sub0000000001
/lock/sub0000000002

/lock/sub0000000030

  既然同一时间,只有一个线程能获取到锁,大多数的线程都需要阻塞,那么就可以列表中的后一个节点,监听前一个节点。,当前一个节点执行完成业务代码删除节点解锁,后一个节点监听到事件,被唤醒,继续执行。
  实现思路,同样需要定义一个锁规范的顶级接口。定义了加锁和解锁两个方法,但是具体的实现类,可以不需要抽象层,而是直接重写加锁和解锁两个方法即可。

@Slf4j
public class TemporarySequenceNodeLock implements Lock {private final String CONNECT_STR = "192.168.198.128:2181";private String ROOT_LOCK_PATH = "/lock";private String ROOT_LOCK_NAME = "lock";private String CHILDREN_LOCK_PATH = "/sub";private CountDownLatch connectCountDownLatch = new CountDownLatch(1);private CountDownLatch lockCountDownLatch = new CountDownLatch(1);private String waitPath = null;public ZooKeeper zooKeeper = null;public TemporarySequenceNodeLock() {try {zooKeeper = new ZooKeeper(CONNECT_STR, 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {//监听连接事件if (event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected) {connectCountDownLatch.countDown();System.out.println("连接建立成功");}//监听解锁事件if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {lockCountDownLatch.countDown();}}});connectCountDownLatch.await();} catch (IOException e) {throw new RuntimeException("zk连接初始化错误");} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 加锁** @return*/@Overridepublic void lock() throws InterruptedException, KeeperException {//创建根节点(永久节点)try {zooKeeper.create(ROOT_LOCK_PATH, ROOT_LOCK_NAME.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} catch (KeeperException e) {if (e.code() == KeeperException.Code.NODEEXISTS) {log.info("节点已存在,创建失败");}}//在根节点下创建临时有序节点final String path = ROOT_LOCK_PATH + CHILDREN_LOCK_PATH;// /lock/sub0000000005String currentNode = zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);ZkNodeThreadLocal.set(currentNode);//获取该根节点下的所有子节点List<String> subs = zooKeeper.getChildren(ROOT_LOCK_PATH, false);//如果只有一个节点,直接获取到锁if (subs.size() == 1) {log.info("当前集合只有一个节点,获取锁成功:{}", Thread.currentThread().getName());return;}//对节点进行排序//sub0000000000//sub0000000001Collections.sort(subs);//获取当前节点在集合中的位置int lastIndexOf = currentNode.lastIndexOf("/");String subNode = currentNode.substring(lastIndexOf + 1);int index = subs.indexOf(subNode);if (index == -1) {throw new RuntimeException("index error!");}//如果当前元素的索引是集合中的第一个元素,则直接加锁成功if (index == 0) {log.info("当前节点是集合中的头部元素,获取锁成功:{}", Thread.currentThread().getName());return;}//否则等待waitPath = ROOT_LOCK_PATH + "/" + subs.get(index - 1);zooKeeper.getData(waitPath, true, new Stat());lockCountDownLatch.await();}/*** 解锁*/@Overridepublic void unlock() throws InterruptedException, KeeperException {String currentNode = ZkNodeThreadLocal.get();zooKeeper.delete(currentNode, -1);log.info("{}删除节点成功", Thread.currentThread().getName());}
}

三、Curator 分布式锁

  上面两种方案,仅仅是提供了一种实现分布式锁的思路,如果要深究是存在很多问题的,也不能直接在生产环境使用:

  • 每个Lock中,都持有一个自己的Zookeeper连接,而连接是有上限的。
  • 不支持锁重入
  • 无法处理续约
  • 未处理锁误删

  而Curator是一个成熟的解决方案,底层依旧使用的是有序临时节点的思想,但是支持锁重入,以及读写锁等模式的实现。

public class Test implements Runnable {private final static String CONNECT_STR = "192.168.198.128:2181";private static final CuratorFramework CLIENT = CuratorFrameworkFactory.builder().connectString(CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(100, 1)).build();private OrderCodeGenerator orderCodeGenerator = new OrderCodeGenerator();//可重入互斥锁final InterProcessMutex lock = new InterProcessMutex(CLIENT, "/curator_lock");public static void main(String[] args) {CLIENT.start();for (int i = 0; i < 30; i++) {new Thread(new Test()).start();}}/*** When an object implementing interface {@code Runnable} is used* to create a thread, starting the thread causes the object's* {@code run} method to be called in that separately executing* thread.* <p>* The general contract of the method {@code run} is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {// 加锁lock.acquire();String orderCode = orderCodeGenerator.getOrderCode();System.out.println("生成订单号 " + orderCode);} catch (Exception e) {e.printStackTrace();} finally {try {// 释放锁lock.release();} catch (Exception e) {e.printStackTrace();}}}static class OrderCodeGenerator {private static int count = 0;/*** 生成订单号*/public String getOrderCode(){SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddhhmmss");return simpleDateFormat.format(new Date()) + "-" + ++count;}}
}
http://www.dtcms.com/a/585599.html

相关文章:

  • 好看的个人网站设计专做轮胎的网站
  • VGG论文精细解读
  • 抖音自动化-实现给特定用户发私信
  • 安徽省教育基本建设学会网站查看网站被百度收录
  • LeetCode算法学习之旋转数组
  • webrtc降噪-NoiseSuppressor类源码分析与算法原理
  • openEuler容器化实践:从Docker入门到生产部署
  • Spring Security实战代码详解
  • ES6 Promise:告别回调地狱的异步编程革命
  • 企业网站备案教程免费建设网站抽取佣金
  • seo网站诊断流程公司网站建设费用会计处理
  • 与Figma AI对话的对话框在哪里?
  • 【科研绘图系列】R语言绘制微生物箱线图(box plot)
  • 禅城区网站建设管理网站模板下载免费下载
  • 前端微服务化
  • Linux 软件安装 “命令密码本”:yum/apt/brew 一网打尽
  • 做网站框架显示不出来中国最大的软件公司
  • 轻量级云原生体验:在OpenEuler 25.09上快速部署单节点K3s
  • 程序员 给老婆做网站网站建设 海南
  • 解释 StringRedisTemplate 类和对象的作用与关系
  • MATLAB遗传算法优化RVFL神经网络回归预测(随机函数链接神经网络)
  • 建设网站的知识竞赛国家建设网站
  • ROS2 Humble 笔记(七)标准与自定义 Interface 接口
  • 深入探索序列学习:循环神经网络(RNN)及其变体(LSTM、GRU)的详尽解析
  • 永川区做网站临沂网站建设方案报价
  • B哩B哩车机版专为汽车端或大屏设备开发
  • 一种基于视网膜图像的深度学习系统 DeepRETStroke
  • 2025汽车零部件行业数字化转型落地方案
  • 前后端分离和传统非分离(后端渲染)方案的核心差异
  • 【ZeroRange WebRTC】在自有 AWS 环境实现与 Amazon KVS 等效的 WebRTC 安全方案(落地指南)