Go和Java实现一致性Hash算法 一致性哈希算法在分布式系统的应用中是十分广泛的。常见的应用场景是分布式缓存。它主要解决了哈希取模算法在分布式系统中存在的动态伸缩等问题。
1. 为什么需要一致性哈希算法? 1.1 哈希取模算法的局限性 在分布式缓存集群中,当新增加缓存服务器或其中一台挂掉后,由路由算法发生改变,导致大量的缓存数据不能命中。从而造成数据库面临巨大压力而崩溃,可能导致整个系统崩溃。
在分布式缓存服务中,经常需要对服务进行节点添加和删除操作,我们希望的是节点添加和删除操作尽量减少数据-节点之间的映射关系更新。
假如我们使用的是哈希取模( hash(key)%nodes ) 算法作为路由策略:
哈希取模的缺点在于如果有节点的删除和添加操作,对 hash(key)%nodes 结果影响范围太大了,造成大量的请求无法命中从而导致缓存数据被重新加载。
基于上面的缺点提出了一种新的算法:一致性哈希。一致性哈希可以实现节点删除和添加只会影响一小部分数据的映射关系,由于这个特性哈希算法也常常用于各种均衡器中实现系统流量的平滑迁移。
1.2 一致性哈希工作原理 一致性哈希算法通过一个叫作一致性哈希环的数据结构实现。这个环的起点是0,终点是 $2^{32} - 1$,并且起点和终点相连接,故这个环的整数分布范围是 0 ~ $2^{32} - 1$。
具体原理如下:
首先对节点进行哈希计算,哈希值通常在 2^32-1 范围内。然后将 2^32-1 这个区间首尾连接抽象成一个环并将节点的哈希值映射到环上,当我们要查询 key 的目标节点时,同样的我们对 key 进行哈希计算,然后顺时针查找到的第一个节点就是目标节点。
根据原理我们分析一下节点添加和删除对数据范围的影响。
1. 添加节点(扩容) 只会影响新增节点与前一个节点(新增节点逆时针查找的第一个节点)之间的数据。
2. 删除节点(缩容) 只会影响删除节点与前一个节点(删除节点逆时针查找的第一个节点)之间的数据。
3. 服务器节点分布不均匀 如果服务器节点不是均匀的分布在哈希环上,那么有 可能造成服务节点负载压力不均衡。
试想一下假如环上的节点数量非常少,那么非常有可能造成数据分布不平衡,本质上是环上的区间分布粒度太粗。
怎么解决呢?不是粒度太粗吗?那就加入更多的节点,这就引出了一致性哈希的虚拟节点概念,虚拟节点的作用在于让环上的节点区间分布粒度变细。
一个真实节点对应多个虚拟节点,将虚拟节点的哈希值映射到环上,查询 key 的目标节点我们先查询虚拟节点再找到真实节点即可。
针对这个问题,我们可以通过引入虚拟节点来解决负载不均衡的问题。即将每台物理服务器虚拟为一组虚拟服务器,将虚拟服务器放置到哈希环上,如果要确定key所在的服务器,需先确定key所在的虚拟服务器,再由虚拟服务器确定物理服务器。
一台物理服务器,虚拟成若干个虚拟节点,随机分布在环上,压力近似均衡分摊。如有三台物理服务器,每台物理服务器虚拟出150个虚拟节点,随机分配在Hash环上的150个位置上。最后可使三台物理服务器近似均摊压力。当增加一台服务器时,先虚拟150个节点,然后散落在哈希环上。
2. Java实现一致性Hash算法 实现思路:
在每次添加物理节点的时候,根据物理节点的名字生成虚拟节点的名字,把虚拟节点的名字求hash值,然后把hash值作为key,物理节点作为value存放到Map中;这里我们选择使用TreeMap,因为需要key是顺序的存储。
在计算数据key需要存放到哪个物理节点时,先计算出key的hash值,然后调用TreeMap.tailMap()返回比hash值大的map子集,如果子集为空就需要把TreeMap的第一个元素返回,如果不为空,那么取子集中的第一个元素。
2.1 算法代码 public class ConsistentHashing { private SortedMap<Integer, Node> hashCircle = new TreeMap<>(); private int virtualNums; public ConsistentHashing (Node[] nodes, int virtualNums) { this .virtualNums = virtualNums; for (Node node : nodes) { add(node); } } public void add (Node node) { for (int i = 0 ; i < virtualNums; i++) { hashCircle.put(hash(node.toString() + i), node); } } public void remove (Node node) { for (int i = 0 ; i < virtualNums; i++) { hashCircle.remove(hash(node.toString() + i)); } } public Node getNode (String key) { if (key == null || hashCircle.isEmpty()) return null ; int hash = hash(key); if (!hashCircle.containsKey(hash)) { SortedMap<Integer, Node> tailMap = hashCircle.tailMap(hash); hash = tailMap.isEmpty() ? hashCircle.firstKey() : tailMap.firstKey(); } return hashCircle.get(hash); } private int hash (String key) { final int p = 16777619 ; int hash = (int ) 2166136261L ; for (int i = 0 ; i < key.length(); i++) { hash = (hash ^ key.charAt(i)) * p; } hash += hash << 13 ; hash ^= hash >> 7 ; hash += hash << 3 ; hash ^= hash >> 17 ; hash += hash << 5 ; if (hash < 0 ) { hash = Math.abs(hash); } return hash; } public static class Node { private String ipAddr; private int port; private String name; public Node (String ipAddr, int port, String name) { this .ipAddr = ipAddr; this .port = port; this .name = name; } @Override public String toString () { return name + ":<" + ipAddr + ":" + port + ">" ; } } }
2.2 评估服务器节点的负载均衡性 我们通过方差计算服务器节点的均衡性,代码如下:
public class ConsistentHashingTest { public static void main (String[] args) { ConsistentHashing.Node[] nodes = new ConsistentHashing.Node[4 ]; Map<ConsistentHashing.Node, List<String>> map = new HashMap<>(); for (int i = 0 ; i < nodes.length; i++) { nodes[i] = new ConsistentHashing.Node("10.1.32.2" + i, 8070 , "myNode" + i); } ConsistentHashing ch = new ConsistentHashing(nodes, 160 ); String[] keys = new String[1_000_000 ]; for (int i = 0 ; i < keys.length; i++) { keys[i] = "key" + (i + 17 ) + "ss" + (i * 19 ); } for (String key : keys) { ConsistentHashing.Node n = ch.getNode(key); List<String> list = map.computeIfAbsent(n, k -> new ArrayList<>()); list.add(key); } int [] loads = new int [nodes.length]; int x = 0 ; for (Iterator<ConsistentHashing.Node> i = map.keySet().iterator(); i.hasNext(); ) { ConsistentHashing.Node key = i.next(); List<String> list = map.get(key); loads[x++] = list.size(); } int min = Integer.MAX_VALUE; int max = 0 ; for (int load : loads) { min = Math.min(min, load); max = Math.max(max, load); } System.out.println("最小值: " + min + "; 最大值: " + max); System.out.println("方差:" + variance(loads)); } public static double variance (int [] data) { double variance = 0 ; double expect = (double ) sum(data) / data.length; for (double datum : data) { variance += (Math.pow(datum - expect, 2 )); } variance /= data.length; return Math.sqrt(variance); } private static int sum (int [] data) { int sum = 0 ; for (int i = 0 ; i < data.length; i++) { sum += data[i]; } return sum; } }
测试结果:
最小值: 243919; 最大值: 253236 方差:3692.7378054771234 最小值: 240190; 最大值: 257384 方差:7432.346466628153 最小值: 233002; 最大值: 260369 方差:10227.744937179456 最小值: 241154; 最大值: 253743 方差:5150.429156876153 最小值: 235938; 最大值: 260044 方差:9244.906895150432 最小值: 233187; 最大值: 263222 方差:11395.83342717855
通过测试,每台物理服务的虚拟节点在120到200之间,均衡性更好。
3. Go实现一致性Hash算法 import ( "errors" "hash/crc32" "sort" "strconv" "sync" ) type units []uint32 func (x units) Len () int { return len (x) } func (x units) Less (i, j int ) bool { return x[i] < x[j] } func (x units) Swap (i, j int ) { x[i], x[j] = x[j], x[i] } var errEmpty = errors.New("Hash 环没有数据" )type Consistent struct { circle map [uint32 ]string sortedHashes units VirtualNode int sync.RWMutex } func NewConsistent () *Consistent { return &Consistent{ circle: make (map [uint32 ]string ), VirtualNode: 20 , } } func (c *Consistent) generateKey (element string , index int ) string { return element + strconv.Itoa(index) } func (c *Consistent) hashkey (key string ) uint32 { if len (key) < 64 { var srcatch [64 ]byte copy (srcatch[:], key) return crc32.ChecksumIEEE(srcatch[:len (key)]) } return crc32.ChecksumIEEE([]byte (key)) } func (c *Consistent) updateSortedHashes () { hashes := c.sortedHashes[:0 ] if cap (c.sortedHashes)/(c.VirtualNode*4 ) > len (c.circle) { hashes = nil } for k := range c.circle { hashes = append (hashes, k) } sort.Sort(hashes) c.sortedHashes = hashes } func (c *Consistent) Add (element string ) { c.Lock() defer c.Unlock() c.add(element) } func (c *Consistent) add (element string ) { for i := 0 ; i < c.VirtualNode; i++ { c.circle[c.hashkey(c.generateKey(element, i))] = element } c.updateSortedHashes() } func (c *Consistent) remove (element string ) { for i := 0 ; i < c.VirtualNode; i++ { delete (c.circle, c.hashkey(c.generateKey(element, i))) } c.updateSortedHashes() } func (c *Consistent) Remove (element string ) { c.Lock() defer c.Unlock() c.remove(element) } func (c *Consistent) search (key uint32 ) int { f := func (x int ) bool { return c.sortedHashes[x] > key } i := sort.Search(len (c.sortedHashes), f) if i >= len (c.sortedHashes) { i = 0 } return i } func (c *Consistent) Get (name string ) (string , error) { c.RLock() defer c.RUnlock() if len (c.circle) == 0 { return "" , errEmpty } key := c.hashkey(name) i := c.search(key) return c.circle[c.sortedHashes[i]], nil }
4. 总结 一致性hash算法解决了分布式环境下机器增加或者减少时,简单的取模运算无法获取较高命中率的问题。通过虚拟节点的使用,一致性hash算法可以均匀分担机器的负载,使得这一算法更具现实的意义。正因如此,一致性hash算法被广泛应用于分布式系统中。
参考感谢基于 Go 语言实现一致性 Hash 算法 - 简书 (jianshu.com) Go语言实现一致性哈希(Consistent Hashing)算法 - Go语言中文网 - Golang中文社区 (studygolang.com) 3.7. 实现一致性hash算法 | 框架及架构优化 |《Go高并发实战 1.0》| Go 技术论坛 (learnku.com) 一文讲透一致性哈希的原理和实现 (qq.com) 基于 Go 语言实现一致性 Hash 算法 - 简书 (jianshu.com)