当前位置: 首页 > news >正文

Zookeeper分布式锁实现

        zookeeper最初设计的初衷就是为了保证分布式系统的一致性。本文将讲解如何利用zookeeper的临时顺序结点,实现分布式锁。

目录

1. 理论分析

        1.1 结点类型

        1.2 监听器

        1.3 实现原理

2. 手写实现简易zookeeper分布式锁

        1.1 依赖

         1.2 常量定义

        1.3 实现zookeeper分布式锁

        1.4 使用方式

3. 引入Curator框架实现zookeeper分布式锁

        2.1 框架依赖

        2.2 使用方式


1. 理论分析

        zookeeper 和Linux一样,采用目录树的方式管理结点,目录层级间以 / 区分

        每个数据节点在 ZooKeeper 中被称为 znode,它是 ZooKeeper 中数据的最小单元。由于ZooKeeper 主要用于协调服务,出于性能和一致性考虑,每个节点的存放数据上限为1M

     

        1.1 结点类型

        znode有四种类型:

        1.持久化结点  (PERSISTENT): 创建节点后一直存在

        2. 持久化有序结点(PERSISTENT_SEQUENTIAL):在持久化结点的基础上,zookeeper会自动根据创建顺序,在结点名称后面加上一串序号

        3. 临时结点(EPHEMERAL):在zookeeper与客户端失去连接后自动删除

        4. 临时有序结点(EPHEMERAL_SEQUENTIAL):在临时结点的基础上,zookeeper会自动根据创建顺序,在结点名称后面加上一串序号

        1.2 监听器

                Watcher 监听机制是 Zookeeper 中非常重要的特性。结点可以绑定监听事件,当监听事件发生的时候,Zookeeper会向客户端发送通知事件,执行监听器的回调方法。

        

        1.3 实现原理

        我们首先新建一个"/locks"的持久化结点,用来管理表示锁的子节点。(实际场景使用可以根据不同锁对象划分成更细致的持久化结点,比如"/locks/bilibili/comment/publish")

        当用户尝试获取锁的时候,在"locks"结点下新建一个临时有序结点,例如"seq-00001"

        新建结点成功后,系统进行检查,建立的结点是否是当前所有子节点中序号最小的一个

        如果是最小的一个,说明用户是当前锁的持有者,往下执行业务逻辑,执行完成后摧毁临时结点

        ​​​​​​​

        如果不是最小的一个,为了避免不断地自旋检查空耗性能,一般采用注册监听器的方式减少性能消耗:监听前一个结点的摧毁事件。如果用户持有的结点前面还有其他结点,说明用户不是持有的人,不能执行业务逻辑,应当阻塞等待;直到用户前一个结点被摧毁,说明轮到用户持有锁了,可以继续往下执行业务逻辑。

        

2. 手写实现简易zookeeper分布式锁

        1.1 依赖

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>

        <!--日志-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>

        <!--zookeeper-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.6</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>

         1.2 常量定义

public interface ZkConstants {
    //连接地址
    String connectString = "127.0.0.1:2181";
    // 连接超时时间
    int sessionTimeout = 2000;
}

        1.3 实现zookeeper分布式锁

public class DistributedLock {

    // zk客户端连接
    private ZooKeeper zkClient;

    // 连接成功等待
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // 前一个结点(锁)
    private String waitPath;
    // 结点删除等待
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // 当前创建的结点(锁)
    private String createNode;

    /**
     * 构造方法:初始化客户端连接
     *
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     */
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //获取连接
        zkClient = new ZooKeeper(ZkConstants.connectString, ZkConstants.sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //连接成功,释放countDownLatch
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                //前一个结点删除
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    //解锁下一个结点
                    waitLatch.countDown();
                }
            }
        });
        //等待zk正常连接后,再往下执行
        connectLatch.await();
        //判断根节点/locks是否存在
        Stat exists = zkClient.exists("/locks", false);
        if (exists == null) {
            //创建根节点 -- 持久结点
            zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * 加锁
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkLock() throws InterruptedException, KeeperException {
        //创建对应的临时带序号结点
        createNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        //判断创建节点是否是序号最小的结点
        List<String> children = zkClient.getChildren("/locks", false);
        if (children.size() == 1) {
            return;
        } else {
            //排序结点以得到当前创建结点的序号(等待锁的序位)
            Collections.sort(children);
            //获取生成的临时结点序号
            String thisNode = createNode.substring("/locks/".length());
            //获得排序
            int index = children.indexOf(thisNode);

            if (index == -1) {
                System.out.println("数据异常");
            } else if (index == 0) {
                //最小序号结点,直接获取锁
                return;
            } else {
                //监听序号前一个结点
                waitPath = "/locks/" + children.get(index - 1);
                //true代表使用创建zkClient时初始化的监听器
                zkClient.getData(waitPath, true, null);
                waitLatch.await();
            }
        }

    }

    /**
     * 解锁
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void zkUnLock() throws InterruptedException, KeeperException {
        //删除临时带序号结点
        zkClient.delete(createNode, -1);
    }

}

        1.4 使用方式

public class DistributedLockTest {


    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();
        //多线程获取锁1
        CompletableFuture.supplyAsync(() -> {
            try {
                lock1.zkLock();
                System.out.println("线程" + Thread.currentThread().getName() + "获取到锁......");
                Thread.sleep(5000);
                lock1.zkUnLock();
                System.out.println("线程" + Thread.currentThread().getName() + "释放锁......");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (KeeperException e) {
                throw new RuntimeException(e);
            }
            return true;
        }, executorService);
        //多线程获取锁2
        CompletableFuture.supplyAsync(() -> {
            try {
                lock2.zkLock();
                System.out.println("线程" + Thread.currentThread().getName() + "获取到锁......");
                Thread.sleep(5000);
                lock2.zkUnLock();
                System.out.println("线程" + Thread.currentThread().getName() + "释放锁......");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (KeeperException e) {
                throw new RuntimeException(e);
            }
            return true;
        }, executorService);

        executorService.shutdown();
    }
}

3. 引入Curator框架实现zookeeper分布式锁

        实际生产环境下,自然不可能手写这么多代码处理分布式锁,且不提很多地方的代码可复用,CountDownLatch反复处理带来的代码复杂性高,并且一些可重入锁、异常处理等逻辑上文也并没有完善。

        生产场景中被广泛使用的zookeeper分布式锁的框架便是Curator

        2.1 框架依赖

        /..
           省略
        ../
        <!--Curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.3.0</version>
        </dependency>

        2.2 使用方式

public class CuratorLockTest {

    public static void main(String[] args) {
        //创建分布式锁1
        InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
        InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.execute(() -> {
            try {
                lock1.acquire();
                System.out.println("线程1获取到锁");

                //curator支持可重入锁
                lock1.acquire();
                System.out.println("线程1 再次获取到锁");

                Thread.sleep(5000);

                lock1.release();
                System.out.println("线程1 释放锁");

                lock1.release();
                System.out.println("线程1 再次释放锁");

            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });


        executorService.execute(() -> {
                try {
                    lock2.acquire();
                    System.out.println("线程2获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(5000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2 再次释放锁");

                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        );
        executorService.shutdown();
    }

    public static CuratorFramework getCuratorFramework() {
        //4秒超时,重试3次
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(4000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(ZkConstants.connectString)
                .connectionTimeoutMs(ZkConstants.sessionTimeout)
                .sessionTimeoutMs(ZkConstants.sessionTimeout)
                .retryPolicy(exponentialBackoffRetry)
                .build();
        client.start();
        System.out.println("zookeeper 启动成功...");
        return client;
    }
}

        希望能对大家理解zookeeper分布式锁有所帮助

相关文章:

  • rust学习笔记1-window安装开发环境
  • 上线了一个微软工具(免费),我独自开发,本篇有源码
  • python类方法名加前缀下划线
  • vue3的响应式的理解,与普通对象的区别(一)
  • 非docker安装open-webui连接ollama实现deepseek本地使用,集成其他openai模型,常见启动报错问题解决。
  • SpringAI-开启 Java AI 新纪元
  • Twitter 安卓客户端安装包下载
  • Linux——库函数
  • DeepSeek渣机部署编程用的模型,边缘设备部署模型
  • 【云安全】云原生- K8S etcd 未授权访问
  • Day1:强化学习基本概念
  • 2025最新Java面试题大全(整理版)2000+ 面试题附答案详解
  • 【linux】ubunbu切换到root
  • (四)Axure学习图文教程
  • Mybatis-扩展功能
  • 学习资料整合记录
  • 【Kubernetes】k8s 部署指南
  • Copilot基于企业PPT模板生成演示文稿
  • Apache Struts2 - 任意文件上传漏洞 - CVE-2024-53677
  • Linux学习笔记之进程
  • 海南机场拟超23亿元收购美兰空港控股权,进一步聚焦机场主业
  • 外交部:中美双方并未就关税问题进行磋商或谈判
  • 国家能源局通报上月投诉情况:赤峰有群众反映电费异常增高,已退费
  • “五一”假期逛上海车展请提前购票,展会现场不售当日票
  • 新开发银行如何开启第二个“金色十年”?
  • 媒体:黑话烂梗包围小学生,“有话好好说”很难吗?