当前位置: 首页 > news >正文

一致性hash应用-分库分表

😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: XXXXXXX
⏱️ @ 创作时间: 2025年03月31日

目录

    • 前言
    • 核心概念
    • 路由&扩容原理
    • 资源倾斜问题
    • 代码实现
      • 基础实现
      • 资源倾斜问题
    • 整合Spring使用(以分库分表为例)
      • 一致性hash工具
      • hash路由配置
      • 测试
    • 数据迁移方案

前言

一致性哈希(Consistent Hashing)是分布式系统中常用的数据分片技术,将数据分布到一个哈希环上,,它能在节点增减时最小化数据迁移量,哈希环的大小为2的32次方-1;

通过这种一致性哈希的分库分表方案,可以实现数据的均匀分布和平滑扩容,对于水平扩展的业务场景;在实现扩容,会伴随数据的迁移,通过一致性Hash,环实现对旧数据影响最小。

核心概念

  1. 哈希环:将哈希空间组织成一个环形结构。
  2. 虚拟节点:每个物理节点对应多个虚拟节点,实现更均匀的分布。
  3. 数据路由:数据键的哈希值在环上顺时针找到的第一个节点即为存储节点。
  4. 扩容处理:新增节点时只影响相邻节点的数据,相比于取模方式,影响范围小。

路由&扩容原理

1、计算节点的hash值,比如存在node1、node2、node3,分别在服务器A/B/C上,并且每个节点都有自己的hash值,如下图分布
在这里插入图片描述

2、 数据在进行存储时,计算数据key的hash值,然后顺时针找到的第一节点所属的hash值,那么数据就存储到该节点上,如下图:
在这里插入图片描述

3、数据的扩容,影响范围最小化,比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动
在这里插入图片描述

资源倾斜问题

比如:

我们现在有三个节点(nodeA、nodeB、nodeC),这三个节点分别在Hash环的不同位置,并且数据Key分别为:a、b、c、d、e、f;

在进行hash计算时,可能出现 a、b、c、d可能在nodeA中,e在nodeB中、f在nodeC中,这就出现了资源倾斜;为了尽可能的实现平均分别,引入了虚拟节点;

设定每个节点都有100个虚拟接口,虚拟节点分别为:nodeA001…nodeA002…nodeA100、nodeB001…nodeB002…nodeB100、nodeC001…nodeC002…nodeC100;引入虚拟节点后此时三个真实节点实际在hash环上就有300个节点了,提高了节点的平均分布率,可以更好的实现数据均分到不同的节点上。

如图所示,节点A在Hash环上有A1和A2等虚拟节点,只要在A1和A2后面的数据key,都会存入到服务器A中;
在这里插入图片描述

代码实现

基础实现

引入依赖:

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>33.0.0-jre</version>
</dependency>

hash计算工具类:

import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.*;
public class ShardingRouterComponent {

    // 虚拟节点到物理节点的映射
    private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();
    
    public ShardingRouterComponent(List<String> physicalShards) {
        initVirtualNodes(physicalShards);
    }
    
   	private void initVirtualNodes(List<String> physicalShards) {
        for (String shard : physicalShards) {
            // addShard(shard);
            long hash = hash(shard);
            VIRTUAL_NODE_MAP.put(hash, shard);
        }
    }
    
    public String routeShard(Long key) {
        if (VIRTUAL_NODE_MAP.isEmpty()) {
            throw new RuntimeException("No shards available");
        }
        long hash = hash(key.toString());
        // 找到第一个大于等于该哈希值的节点
        SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);
        if (tail.isEmpty()) {
            return VIRTUAL_NODE_MAP.firstEntry().getValue();
        }
        return tail.get(tail.firstKey());
    }
    
   	public long hash(String key) {
        return Hashing.murmur3_128()
                .hashString(key, StandardCharsets.UTF_8)
                .asLong();
    }
}

验证

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MainTest {
        public static void main(String[] args) {
        // 创建4个节点
        List<String> list = new ArrayList<>();
        for (int i = 0; i <= 4; i++) {
            String s = "node_" + i;
            list.add(s);
        }
        ShardingRouterComponent routerComponent = new ShardingRouterComponent(list);
        System.out.println(routerComponent.getAllPhysicalShards());
        Map<String, List<Long>> m1 = new HashMap<>();
        for (long a = 1; a <= 30; a++) {
            String shard = routerComponent.routeShard(a);
            System.out.println("key=" + a + " -> shard=" + shard);
            List<Long> longList = m1.get(shard);
            if (longList == null) {
                longList = new ArrayList<>();
                longList.add(a);
                m1.put(shard, longList);
            } else {
                longList.add(a);
            }
        }
        System.out.println(m1);
    }
}

运行上面代码后,可以看到不同的key进入了不同的node节点中,但是存在一个问题:有点节点数据较少,有的节点数据较多,这就是资源倾斜了;要处理资源倾斜,我们需要增加一个虚拟节点来处理;
在这里插入图片描述

资源倾斜问题

增加虚拟节点:

修改ShardingRouterComponent类来实现虚拟节点的功能,所谓虚拟节点就是我们虚构的节点,不存在真实的服务器,我们将使用真实节点+'#VN'+虚拟数的方式拼接一个key,并且将该key的hash值与真实节点进行绑定;

完整代码如下:

import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.*;

public class ShardingRouterComponent {

    // 虚拟节点到物理节点的映射
    private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();

    // 每个物理节点对应的虚拟节点数
    private final int VIRTUAL_NODE_SHARD_NUM;

    public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {
        this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;
        initVirtualNodes(physicalShards);
    }

    private void initVirtualNodes(List<String> physicalShards) {
        for (String shard : physicalShards) {
            addShard(shard);
        }
    }

    public String routeShard(Long key) {
        if (VIRTUAL_NODE_MAP.isEmpty()) {
            throw new RuntimeException("No shards available");
        }
        long hash = hash(key.toString());
        // 找到第一个大于等于该哈希值的节点
        SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);
        if (tail.isEmpty()) {
            return VIRTUAL_NODE_MAP.firstEntry().getValue();
        }
        return tail.get(tail.firstKey());
    }

    public long hash(String key) {
        return Hashing.murmur3_128()
                .hashString(key, StandardCharsets.UTF_8)
                .asLong();
    }


    public void addShard(String shard) {
        for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {
            long hash = hash(shard + "#VN#" + i);
            VIRTUAL_NODE_MAP.put(hash, shard);
        }
    }

}

验证

我们现在有4个物理节点(node0...node3),并且为每个节点,都设置100个虚拟节点,验证代码如下:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MainTest {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        for (int i = 0; i <= 4; i++) {
            String s = "node_" + i;
            list.add(s);
        }
        ShardingRouterComponent routerComponent = new ShardingRouterComponent(list, 10);
        System.out.println(routerComponent.getAllPhysicalShards());
        Map<String, List<Long>> m1 = new HashMap<>();
        for (long a = 1; a <= 30; a++) {
            String shard = routerComponent.routeShard(a);
            System.out.println("key=" + a + " -> shard=" + shard);
            List<Long> longList = m1.get(shard);
            if (longList == null) {
                longList = new ArrayList<>();
                longList.add(a);
                m1.put(shard, longList);
            } else {
                longList.add(a);
            }
        }
        System.out.println(m1);

    }
}

运行上面代码后,可以看到数据相对较均匀
在这里插入图片描述

整合Spring使用(以分库分表为例)

一致性hash工具

import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.*;

public class ShardingRouterComponent {

    // 虚拟节点到物理节点的映射
    private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();

    // 每个物理节点对应的虚拟节点数
    private final int VIRTUAL_NODE_SHARD_NUM;

    public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {
        this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;
        initVirtualNodes(physicalShards);
    }

    private void initVirtualNodes(List<String> physicalShards) {
        for (String shard : physicalShards) {
            addShard(shard);
        }
    }

    public String routeShard(Long key) {
        if (VIRTUAL_NODE_MAP.isEmpty()) {
            throw new RuntimeException("No shards available");
        }
        long hash = hash(key.toString());
        // 找到第一个大于等于该哈希值的节点
        SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);
        if (tail.isEmpty()) {
            return VIRTUAL_NODE_MAP.firstEntry().getValue();
        }
        return tail.get(tail.firstKey());
    }

    public long hash(String key) {
        return Hashing.murmur3_128()
                .hashString(key, StandardCharsets.UTF_8)
                .asLong();
    }

    public void addShard(String shard) {
        for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {
            long hash = hash(shard + "#VN#" + i);
            VIRTUAL_NODE_MAP.put(hash, shard);
        }
    }

}

hash路由配置

模拟场景:对数据进行分库分表操作,定义了4个库和9个表,并且每个节点进行200个虚拟节点;

@Component
public class ShardingRouterConfig {

    @Bean
    public ShardingRouterComponent databaseSharding() {

        // 可以写到yml中定义
        int dbNum = 4;
        int tableNum = 9;

        // 可以写到yml中定义
        int virtualNodePerShard = 200;

        List<String> list = new ArrayList<>();
        for (int i = 0; i <= dbNum; i++) {
            list.add("db_" + i);
        }
        return new ShardingRouterComponent(list, virtualNodePerShard);
    }

    @Bean
    public ShardingRouterComponent tableSharding() {
        List<String> list = new ArrayList<>();
        for (int i = 0; i <= tableNum; i++) {
            list.add("table_" + i);
        }
        return new ShardingRouterComponent(list, virtualNodePerShard);
    }
}

测试

为了演示demo,直接在controller中进行使用,实际可以单独创建一个bean进行key路由的计算;

@RestController
@Slf4j
public class TestController {
    
    
    @Resource
    private ShardingRouterComponent databaseSharding;

    @Resource
    private ShardingRouterComponent tableSharding;
    
    @GetMapping("/test")
    public void test() throws Exception {
        // 随机一个key
        long key = System.currentTimeMillis();
        // 计算所属库
        log.info(databaseSharding.routeShard(key)); 
        // 计算所属表
        log.info(tableSharding.routeShard(key));
    }
}

数据迁移方案

一致性hash的一个优点就是:数据的扩容,影响范围最小化

比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动

在这里插入图片描述

数据迁移方案

第一步:单独一个应用,应用会做一下操作

  • 该应用中的hash节点数是扩容后节点数据,应用通过多线程的方式对所有的表按照id升序进行查询,将查询后的数据重新计算分片键的hash值,并且将数据insert到新hash值所映射的库/表中;PS:在主体应用中 如果不做新节点的配置,是不会有数据被使用到的
  • 该应用通过Canal组件监听扩容前的所有表,如何是旧的节点表出现insert、delete、update,则需要判断分片健的hash是否匹配当前节点表,如果不匹配 就需要更新或者新增到新表中;

第二步:主体应用发布,主体应用发布后,进行以下操作

  • 主体应用成功发布后,数据的新增和修改都是在新表进行的,持续观察一段时间;

  • 独立应用再进行一个新的任务,该任务扫描旧表的所有数据,通过id进行倒叙查询

    • 如果旧数据的hash值属于新表,并且新表中不存在新表则进行新增;
    • 旧数据已经存在新表时 ,则比较旧表数据的修改时间与新表的修改时间。如果旧表数据修改时间大于新表修改时间,则更新新表数据+删除旧表数据(一个事务中);如果旧表数据更新时间小于等于新表数据, 则直接删除旧表;

在这里插入图片描述

相关文章:

  • github 页面超时解决方法
  • ai画图hiresfix放大算法。
  • 蓝桥杯每天5题
  • SQL注入:基于GET和POST的报错注入详解
  • 【含文档+PPT+源码】基于微信小程序的在线考试与选课教学辅助系统
  • RAG 优化 Embedding 模型或调整检索策略
  • VBA代码解决方案第二十三讲 EXCEL中,如何删除工作表中的空白行
  • XSLT Apply:深入解析XSLT在XML转换中的应用
  • Qt之QTextEdit控制文本滚动, 停止滚动, 开始滚动, 鼠标控制滚动
  • 单调队列-滑动窗口算法一篇学会-AcWing 154. 滑动窗口
  • js中的document.querySelect()
  • 哈希表 - 两数之和(Map) - JS
  • OpenBMC:BmcWeb 处理http请求2 查找路由对象
  • 0102-web架构网站搭建-基础入门-网络安全
  • 我的世界1.20.1进阶模组开发教程——升级模板与文字格式
  • Nginx 配置 HTTPS 与 WSS 完整指南
  • 亚马逊新卖家破局指南:从0到1搭建可持续出单模型
  • Linux内核编程
  • 关于CodeJava的学习笔记——11
  • 贪心算法(13)(java)合并区间
  • 国外做美食视频网站有哪些/今日短新闻20条
  • 如何做销售直播网站/新品上市怎么推广词
  • 互联网技术专业学什么/沈阳seo按天计费
  • 做视频写真网站犯法吗/网站维护中
  • 电工应用技术网站资源建设/巨量算数数据分析入口
  • 微信扫一扫登录网站如何做/注册域名查询网站官网