Go和Java实现一致性Hash算法

一致性哈希算法在分布式系统的应用中是十分广泛的。常见的应用场景是分布式缓存。它主要解决了哈希取模算法在分布式系统中存在的动态伸缩等问题。

1. 为什么需要一致性哈希算法?

1.1 哈希取模算法的局限性

在分布式缓存集群中,当新增加缓存服务器或其中一台挂掉后,由路由算法发生改变,导致大量的缓存数据不能命中。从而造成数据库面临巨大压力而崩溃,可能导致整个系统崩溃。

在分布式缓存服务中,经常需要对服务进行节点添加和删除操作,我们希望的是节点添加和删除操作尽量减少数据-节点之间的映射关系更新。

假如我们使用的是哈希取模( hash(key)%nodes ) 算法作为路由策略:

哈希取模的缺点在于如果有节点的删除和添加操作,对 hash(key)%nodes 结果影响范围太大了,造成大量的请求无法命中从而导致缓存数据被重新加载。

基于上面的缺点提出了一种新的算法:一致性哈希。一致性哈希可以实现节点删除和添加只会影响一小部分数据的映射关系,由于这个特性哈希算法也常常用于各种均衡器中实现系统流量的平滑迁移。

1.2 一致性哈希工作原理

一致性哈希算法通过一个叫作一致性哈希环的数据结构实现。这个环的起点是0,终点是 $2^{32} - 1$,并且起点和终点相连接,故这个环的整数分布范围是 0 ~ $2^{32} - 1$。

具体原理如下:

首先对节点进行哈希计算,哈希值通常在 2^32-1 范围内。然后将 2^32-1 这个区间首尾连接抽象成一个环并将节点的哈希值映射到环上,当我们要查询 key 的目标节点时,同样的我们对 key 进行哈希计算,然后顺时针查找到的第一个节点就是目标节点。

根据原理我们分析一下节点添加和删除对数据范围的影响。

1. 添加节点(扩容)

只会影响新增节点与前一个节点(新增节点逆时针查找的第一个节点)之间的数据。

2. 删除节点(缩容)

只会影响删除节点与前一个节点(删除节点逆时针查找的第一个节点)之间的数据。

3. 服务器节点分布不均匀

如果服务器节点不是均匀的分布在哈希环上,那么有
可能造成服务节点负载压力不均衡。

试想一下假如环上的节点数量非常少,那么非常有可能造成数据分布不平衡,本质上是环上的区间分布粒度太粗。

怎么解决呢?不是粒度太粗吗?那就加入更多的节点,这就引出了一致性哈希的虚拟节点概念,虚拟节点的作用在于让环上的节点区间分布粒度变细。

一个真实节点对应多个虚拟节点,将虚拟节点的哈希值映射到环上,查询 key 的目标节点我们先查询虚拟节点再找到真实节点即可。

针对这个问题,我们可以通过引入虚拟节点来解决负载不均衡的问题。即将每台物理服务器虚拟为一组虚拟服务器,将虚拟服务器放置到哈希环上,如果要确定key所在的服务器,需先确定key所在的虚拟服务器,再由虚拟服务器确定物理服务器。

一台物理服务器,虚拟成若干个虚拟节点,随机分布在环上,压力近似均衡分摊。如有三台物理服务器,每台物理服务器虚拟出150个虚拟节点,随机分配在Hash环上的150个位置上。最后可使三台物理服务器近似均摊压力。当增加一台服务器时,先虚拟150个节点,然后散落在哈希环上。

2. Java实现一致性Hash算法

实现思路:

在每次添加物理节点的时候,根据物理节点的名字生成虚拟节点的名字,把虚拟节点的名字求hash值,然后把hash值作为key,物理节点作为value存放到Map中;这里我们选择使用TreeMap,因为需要key是顺序的存储。

在计算数据key需要存放到哪个物理节点时,先计算出key的hash值,然后调用TreeMap.tailMap()返回比hash值大的map子集,如果子集为空就需要把TreeMap的第一个元素返回,如果不为空,那么取子集中的第一个元素。

2.1 算法代码

public class ConsistentHashing {

private SortedMap<Integer, Node> hashCircle = new TreeMap<>();
private int virtualNums; // 虚拟节点数

public ConsistentHashing(Node[] nodes, int virtualNums) {
this.virtualNums = virtualNums;
// 初始化一致性hash环
for (Node node : nodes) {
// 创建虚拟节点
add(node);
}
}

/**
* 添加服务器节点
*
* @param node the server
*/
public void add(Node node) {
for (int i = 0; i < virtualNums; i++) {
hashCircle.put(hash(node.toString() + i), node);
}
}

/**
* 删除服务器节点
*
* @param node the server
*/
public void remove(Node node) {
for (int i = 0; i < virtualNums; i++) {
hashCircle.remove(hash(node.toString() + i));
}
}

/**
* 获取服务器节点
*
* @param key the key
* @return the server
*/
public Node getNode(String key) {
if (key == null || hashCircle.isEmpty())
return null;
int hash = hash(key);
if (!hashCircle.containsKey(hash)) {
// 未命中对应的节点
SortedMap<Integer, Node> tailMap = hashCircle.tailMap(hash);
hash = tailMap.isEmpty() ? hashCircle.firstKey() : tailMap.firstKey();
}
return hashCircle.get(hash);
}

/**
* FNV1_32_HASH算法
*
* @param key the key
* @return
*/
private int hash(String key) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < key.length(); i++) {
hash = (hash ^ key.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 如果算出来的值为负数则取其绝对值
if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}

/**
* 集群节点的机器地址
*/
public static class Node {
private String ipAddr;
private int port;
private String name;

public Node(String ipAddr, int port, String name) {
this.ipAddr = ipAddr;
this.port = port;
this.name = name;
}

@Override
public String toString() {
return name + ":<" + ipAddr + ":" + port + ">";
}
}
}

2.2 评估服务器节点的负载均衡性

我们通过方差计算服务器节点的均衡性,代码如下:

public class ConsistentHashingTest {

public static void main(String[] args) {
ConsistentHashing.Node[] nodes = new ConsistentHashing.Node[4];
Map<ConsistentHashing.Node, List<String>> map = new HashMap<>();

// make nodes 4台服务器节点
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new ConsistentHashing.Node("10.1.32.2" + i, 8070, "myNode" + i);
}

ConsistentHashing ch = new ConsistentHashing(nodes, 160);

// make keys 100万个key
String[] keys = new String[1_000_000];
for (int i = 0; i < keys.length; i++) {
keys[i] = "key" + (i + 17) + "ss" + (i * 19);
}

// make results
for (String key : keys) {
ConsistentHashing.Node n = ch.getNode(key);
List<String> list = map.computeIfAbsent(n, k -> new ArrayList<>());
list.add(key);
}

// 统计标准差,评估服务器节点的负载均衡性
int[] loads = new int[nodes.length];
int x = 0;
for (Iterator<ConsistentHashing.Node> i = map.keySet().iterator(); i.hasNext(); ) {
ConsistentHashing.Node key = i.next();
List<String> list = map.get(key);
loads[x++] = list.size();
}
int min = Integer.MAX_VALUE;
int max = 0;
for (int load : loads) {
min = Math.min(min, load);
max = Math.max(max, load);
}
System.out.println("最小值: " + min + "; 最大值: " + max);
System.out.println("方差:" + variance(loads));
}

public static double variance(int[] data) {
double variance = 0;
double expect = (double) sum(data) / data.length;
for (double datum : data) {
variance += (Math.pow(datum - expect, 2));
}
variance /= data.length;
return Math.sqrt(variance);
}

private static int sum(int[] data) {
int sum = 0;
for (int i = 0; i < data.length; i++) {
sum += data[i];
}
return sum;
}
}

测试结果:

# 虚拟节点是 120
最小值: 243919; 最大值: 253236
方差:3692.7378054771234

# 虚拟节点是 130
最小值: 240190; 最大值: 257384
方差:7432.346466628153

# 虚拟节点是 150
最小值: 233002; 最大值: 260369
方差:10227.744937179456

# 虚拟节点是 160
最小值: 241154; 最大值: 253743
方差:5150.429156876153

# 虚拟节点是 170
最小值: 235938; 最大值: 260044
方差:9244.906895150432

# 虚拟节点是 200
最小值: 233187; 最大值: 263222
方差:11395.83342717855

通过测试,每台物理服务的虚拟节点在120到200之间,均衡性更好。

3. Go实现一致性Hash算法

import (
"errors"
"hash/crc32"
"sort"
"strconv"
"sync"
)

//声明新切片类型
type units []uint32

//返回切片长度
func (x units) Len() int {
return len(x)
}

//比对两个数大小
func (x units) Less(i, j int) bool {
return x[i] < x[j]
}

//切片中两个值的交换
func (x units) Swap(i, j int) {
x[i], x[j] = x[j], x[i]
}

//当hash环上没有数据时,提示错误
var errEmpty = errors.New("Hash 环没有数据")

//创建结构体保存一致性hash信息
type Consistent struct {
//hash环,key为哈希值,值存放节点的信息
circle map[uint32]string
//已经排序的节点hash切片
sortedHashes units
//虚拟节点个数,用来增加hash的平衡性
VirtualNode int
//map 读写锁
sync.RWMutex
}

//创建一致性hash算法结构体,设置默认节点数量
func NewConsistent() *Consistent {
return &Consistent{
//初始化变量
circle: make(map[uint32]string),
//设置虚拟节点个数
VirtualNode: 20,
}
}

//自动生成key值
func (c *Consistent) generateKey(element string, index int) string {
//副本key生成逻辑
return element + strconv.Itoa(index)
}

//获取hash位置
func (c *Consistent) hashkey(key string) uint32 {
if len(key) < 64 {
//声明一个数组长度为64
var srcatch [64]byte
//拷贝数据到数组中
copy(srcatch[:], key)
//使用IEEE 多项式返回数据的CRC-32校验和
return crc32.ChecksumIEEE(srcatch[:len(key)])
}
return crc32.ChecksumIEEE([]byte(key))
}

//更新排序,方便查找
func (c *Consistent) updateSortedHashes() {
hashes := c.sortedHashes[:0]
//判断切片容量,是否过大,如果过大则重置
if cap(c.sortedHashes)/(c.VirtualNode*4) > len(c.circle) {
hashes = nil
}

//添加hashes
for k := range c.circle {
hashes = append(hashes, k)
}

//对所有节点hash值进行排序,
//方便之后进行二分查找
sort.Sort(hashes)
//重新赋值
c.sortedHashes = hashes

}

//向hash环中添加节点
func (c *Consistent) Add(element string) {
//加锁
c.Lock()
//解锁
defer c.Unlock()
c.add(element)
}

//添加节点
func (c *Consistent) add(element string) {
//循环虚拟节点,设置副本
for i := 0; i < c.VirtualNode; i++ {
//根据生成的节点添加到hash环中
c.circle[c.hashkey(c.generateKey(element, i))] = element
}
//更新排序
c.updateSortedHashes()
}

//删除节点
func (c *Consistent) remove(element string) {
for i := 0; i < c.VirtualNode; i++ {
delete(c.circle, c.hashkey(c.generateKey(element, i)))
}
c.updateSortedHashes()
}

//删除一个节点
func (c *Consistent) Remove(element string) {
c.Lock()
defer c.Unlock()
c.remove(element)
}

//顺时针查找最近的节点
func (c *Consistent) search(key uint32) int {
//查找算法
f := func(x int) bool {
return c.sortedHashes[x] > key
}
//使用"二分查找"算法来搜索指定切片满足条件的最小值
i := sort.Search(len(c.sortedHashes), f)
//如果超出范围则设置i=0
if i >= len(c.sortedHashes) {
i = 0
}
return i
}

//根据数据标示获取最近的服务器节点信息
func (c *Consistent) Get(name string) (string, error) {
//添加锁
c.RLock()
//解锁
defer c.RUnlock()
//如果为零则返回错误
if len(c.circle) == 0 {
return "", errEmpty
}
//计算hash值
key := c.hashkey(name)
i := c.search(key)
return c.circle[c.sortedHashes[i]], nil
}

4. 总结

一致性hash算法解决了分布式环境下机器增加或者减少时,简单的取模运算无法获取较高命中率的问题。通过虚拟节点的使用,一致性hash算法可以均匀分担机器的负载,使得这一算法更具现实的意义。正因如此,一致性hash算法被广泛应用于分布式系统中。

参考感谢
基于 Go 语言实现一致性 Hash 算法 - 简书 (jianshu.com)
Go语言实现一致性哈希(Consistent Hashing)算法 - Go语言中文网 - Golang中文社区 (studygolang.com)
3.7. 实现一致性hash算法 | 框架及架构优化 |《Go高并发实战 1.0》| Go 技术论坛 (learnku.com)
一文讲透一致性哈希的原理和实现 (qq.com)
基于 Go 语言实现一致性 Hash 算法 - 简书 (jianshu.com)