一致性hash应用-分库分表
目录
- 前言
- 核心概念
- 路由&扩容原理
- 资源倾斜问题
- 代码实现
- 基础实现
- 资源倾斜问题
- 整合Spring使用(以分库分表为例)
- 一致性hash工具
- hash路由配置
- 测试
- 数据迁移方案
前言
一致性哈希(Consistent Hashing)是分布式系统中常用的数据分片技术,将数据分布到一个哈希环上,,它能在节点增减时最小化数据迁移量,哈希环的大小为2的32次方-1;
通过这种一致性哈希的分库分表方案,可以实现数据的均匀分布和平滑扩容,对于水平扩展的业务场景;在实现扩容,会伴随数据的迁移,通过一致性Hash,环实现对旧数据影响最小。
核心概念
- 哈希环:将哈希空间组织成一个环形结构。
- 虚拟节点:每个物理节点对应多个虚拟节点,实现更均匀的分布。
- 数据路由:数据键的哈希值在环上顺时针找到的第一个节点即为存储节点。
- 扩容处理:新增节点时只影响相邻节点的数据,相比于取模方式,影响范围小。
路由&扩容原理
1、计算节点的hash值,比如存在node1、node2、node3,分别在服务器A/B/C上,并且每个节点都有自己的hash值,如下图分布
2、 数据在进行存储时,计算数据key的hash值,然后顺时针找到的第一节点所属的hash值,那么数据就存储到该节点上,如下图:
3、数据的扩容,影响范围最小化,比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动
资源倾斜问题
比如:
我们现在有三个节点(nodeA、nodeB、nodeC),这三个节点分别在Hash环的不同位置,并且数据Key分别为:a、b、c、d、e、f;
在进行hash计算时,可能出现 a、b、c、d可能在nodeA中,e在nodeB中、f在nodeC中,这就出现了资源倾斜;为了尽可能的实现平均分别,引入了虚拟节点;
设定每个节点都有100个虚拟接口,虚拟节点分别为:nodeA001…nodeA002…nodeA100、nodeB001…nodeB002…nodeB100、nodeC001…nodeC002…nodeC100;引入虚拟节点后此时三个真实节点实际在hash环上就有300个节点了,提高了节点的平均分布率,可以更好的实现数据均分到不同的节点上。
如图所示,节点A在Hash环上有A1和A2等虚拟节点,只要在A1和A2后面的数据key,都会存入到服务器A中;
代码实现
基础实现
引入依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
</dependency>
hash计算工具类:
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.*;
public class ShardingRouterComponent {
// 虚拟节点到物理节点的映射
private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();
public ShardingRouterComponent(List<String> physicalShards) {
initVirtualNodes(physicalShards);
}
private void initVirtualNodes(List<String> physicalShards) {
for (String shard : physicalShards) {
// addShard(shard);
long hash = hash(shard);
VIRTUAL_NODE_MAP.put(hash, shard);
}
}
public String routeShard(Long key) {
if (VIRTUAL_NODE_MAP.isEmpty()) {
throw new RuntimeException("No shards available");
}
long hash = hash(key.toString());
// 找到第一个大于等于该哈希值的节点
SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);
if (tail.isEmpty()) {
return VIRTUAL_NODE_MAP.firstEntry().getValue();
}
return tail.get(tail.firstKey());
}
public long hash(String key) {
return Hashing.murmur3_128()
.hashString(key, StandardCharsets.UTF_8)
.asLong();
}
}
验证
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MainTest {
public static void main(String[] args) {
// 创建4个节点
List<String> list = new ArrayList<>();
for (int i = 0; i <= 4; i++) {
String s = "node_" + i;
list.add(s);
}
ShardingRouterComponent routerComponent = new ShardingRouterComponent(list);
System.out.println(routerComponent.getAllPhysicalShards());
Map<String, List<Long>> m1 = new HashMap<>();
for (long a = 1; a <= 30; a++) {
String shard = routerComponent.routeShard(a);
System.out.println("key=" + a + " -> shard=" + shard);
List<Long> longList = m1.get(shard);
if (longList == null) {
longList = new ArrayList<>();
longList.add(a);
m1.put(shard, longList);
} else {
longList.add(a);
}
}
System.out.println(m1);
}
}
运行上面代码后,可以看到不同的key进入了不同的node节点中,但是存在一个问题:有点节点数据较少,有的节点数据较多,这就是资源倾斜了;要处理资源倾斜,我们需要增加一个虚拟节点
来处理;
资源倾斜问题
增加虚拟节点:
修改ShardingRouterComponent
类来实现虚拟节点的功能,所谓虚拟节点
就是我们虚构的节点,不存在真实的服务器,我们将使用真实节点+'#VN'+虚拟数
的方式拼接一个key,并且将该key的hash值与真实节点
进行绑定;
完整代码如下:
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.*;
public class ShardingRouterComponent {
// 虚拟节点到物理节点的映射
private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();
// 每个物理节点对应的虚拟节点数
private final int VIRTUAL_NODE_SHARD_NUM;
public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {
this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;
initVirtualNodes(physicalShards);
}
private void initVirtualNodes(List<String> physicalShards) {
for (String shard : physicalShards) {
addShard(shard);
}
}
public String routeShard(Long key) {
if (VIRTUAL_NODE_MAP.isEmpty()) {
throw new RuntimeException("No shards available");
}
long hash = hash(key.toString());
// 找到第一个大于等于该哈希值的节点
SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);
if (tail.isEmpty()) {
return VIRTUAL_NODE_MAP.firstEntry().getValue();
}
return tail.get(tail.firstKey());
}
public long hash(String key) {
return Hashing.murmur3_128()
.hashString(key, StandardCharsets.UTF_8)
.asLong();
}
public void addShard(String shard) {
for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {
long hash = hash(shard + "#VN#" + i);
VIRTUAL_NODE_MAP.put(hash, shard);
}
}
}
验证
我们现在有4个物理节点(node0...node3)
,并且为每个节点,都设置100个虚拟节点,验证代码如下:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MainTest {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i <= 4; i++) {
String s = "node_" + i;
list.add(s);
}
ShardingRouterComponent routerComponent = new ShardingRouterComponent(list, 10);
System.out.println(routerComponent.getAllPhysicalShards());
Map<String, List<Long>> m1 = new HashMap<>();
for (long a = 1; a <= 30; a++) {
String shard = routerComponent.routeShard(a);
System.out.println("key=" + a + " -> shard=" + shard);
List<Long> longList = m1.get(shard);
if (longList == null) {
longList = new ArrayList<>();
longList.add(a);
m1.put(shard, longList);
} else {
longList.add(a);
}
}
System.out.println(m1);
}
}
运行上面代码后,可以看到数据相对较均匀
整合Spring使用(以分库分表为例)
一致性hash工具
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.*;
public class ShardingRouterComponent {
// 虚拟节点到物理节点的映射
private final TreeMap<Long, String> VIRTUAL_NODE_MAP = new TreeMap<>();
// 每个物理节点对应的虚拟节点数
private final int VIRTUAL_NODE_SHARD_NUM;
public ShardingRouterComponent(List<String> physicalShards, int virtualNodePerShard) {
this.VIRTUAL_NODE_SHARD_NUM = virtualNodePerShard;
initVirtualNodes(physicalShards);
}
private void initVirtualNodes(List<String> physicalShards) {
for (String shard : physicalShards) {
addShard(shard);
}
}
public String routeShard(Long key) {
if (VIRTUAL_NODE_MAP.isEmpty()) {
throw new RuntimeException("No shards available");
}
long hash = hash(key.toString());
// 找到第一个大于等于该哈希值的节点
SortedMap<Long, String> tail = VIRTUAL_NODE_MAP.tailMap(hash);
if (tail.isEmpty()) {
return VIRTUAL_NODE_MAP.firstEntry().getValue();
}
return tail.get(tail.firstKey());
}
public long hash(String key) {
return Hashing.murmur3_128()
.hashString(key, StandardCharsets.UTF_8)
.asLong();
}
public void addShard(String shard) {
for (int i = 0; i < VIRTUAL_NODE_SHARD_NUM; i++) {
long hash = hash(shard + "#VN#" + i);
VIRTUAL_NODE_MAP.put(hash, shard);
}
}
}
hash路由配置
模拟场景:对数据进行分库分表操作,定义了4个库和9个表,并且每个节点进行200个虚拟节点;
@Component
public class ShardingRouterConfig {
@Bean
public ShardingRouterComponent databaseSharding() {
// 可以写到yml中定义
int dbNum = 4;
int tableNum = 9;
// 可以写到yml中定义
int virtualNodePerShard = 200;
List<String> list = new ArrayList<>();
for (int i = 0; i <= dbNum; i++) {
list.add("db_" + i);
}
return new ShardingRouterComponent(list, virtualNodePerShard);
}
@Bean
public ShardingRouterComponent tableSharding() {
List<String> list = new ArrayList<>();
for (int i = 0; i <= tableNum; i++) {
list.add("table_" + i);
}
return new ShardingRouterComponent(list, virtualNodePerShard);
}
}
测试
为了演示demo,直接在controller中进行使用,实际可以单独创建一个bean进行key路由的计算;
@RestController
@Slf4j
public class TestController {
@Resource
private ShardingRouterComponent databaseSharding;
@Resource
private ShardingRouterComponent tableSharding;
@GetMapping("/test")
public void test() throws Exception {
// 随机一个key
long key = System.currentTimeMillis();
// 计算所属库
log.info(databaseSharding.routeShard(key));
// 计算所属表
log.info(tableSharding.routeShard(key));
}
}
数据迁移方案
一致性hash的一个优点就是:数据的扩容,影响范围最小化
比如新增了一个nodeX,并且nodeX节点的Hash值比nodeC的hash值,此时有一个数据C顺时针的第一个节点就是nodeX了,但是数据A、和数据B并不会收到影响;对于扩容后我们只需要将nodeC的数据迁移到nodeX即可,其他节点不用动
数据迁移方案
第一步:单独一个应用,应用会做一下操作
- 该应用中的hash节点数是扩容后节点数据,应用通过多线程的方式对所有的表按照
id升序
进行查询,将查询后的数据重新计算分片键的hash值,并且将数据insert到新hash值所映射的库/表
中;PS:在主体应用中 如果不做新节点的配置,是不会有数据被使用到的 - 该应用通过
Canal组件
监听扩容前的所有表,如何是旧的节点表
出现insert、delete、update
,则需要判断分片健的hash是否匹配当前节点表,如果不匹配 就需要更新或者新增
到新表中;
第二步:主体应用发布,主体应用发布后,进行以下操作
-
主体应用成功发布后,数据的新增和修改都是在新表进行的,持续观察一段时间;
-
独立应用再进行一个新的任务,该任务扫描旧表的所有数据,通过id进行倒叙查询
- 如果旧数据的hash值属于新表,并且新表中不存在新表则进行新增;
- 旧数据已经存在新表时 ,则比较旧表数据的修改时间与新表的修改时间。如果旧表数据修改时间大于新表修改时间,则更新新表数据+删除旧表数据(一个事务中);如果旧表数据更新时间小于等于新表数据, 则直接删除旧表;