Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537 Warning: error_log(/data/www/wwwroot/hmttv.cn/caches/error_log.php): failed to open stream: Permission denied in /data/www/wwwroot/hmttv.cn/phpcms/libs/functions/global.func.php on line 537
者:kylinkzhang,CSIG后臺開發工程師
| 導語 一致性Hash算法是解決分布式緩存等問題的一種算法; 本文介紹了一致性Hash算法的原理,并給出了一種實現和實際運用的案例;
考慮這么一種場景:
我們有三臺緩存服務器編號node0、node1、node2,現在有3000萬個key,希望可以將這些個key均勻的緩存到三臺機器上,你會想到什么方案呢?
我們可能首先想到的方案是:取模算法hash(key)% N,即:對key進行hash運算后取模,N是機器的數量;
這樣,對key進行hash后的結果對3取模,得到的結果一定是0、1或者2,正好對應服務器node0、node1、node2,存取數據直接找對應的服務器即可,簡單粗暴,完全可以解決上述的問題;
取模算法雖然使用簡單,但對機器數量取模,在集群擴容和收縮時卻有一定的局限性:因為在生產環境中根據業務量的大小,調整服務器數量是常有的事;
而服務器數量N發生變化后hash(key)% N計算的結果也會隨之變化!
比如:一個服務器節點掛了,計算公式從hash(key)% 3變成了hash(key)% 2,結果會發生變化,此時想要訪問一個key,這個key的緩存位置大概率會發生改變,那么之前緩存key的數據也會失去作用與意義;
大量緩存在同一時間失效,造成緩存的雪崩,進而導致整個緩存系統的不可用,這基本上是不能接受的;
為了解決優化上述情況,一致性hash算法應運而生~
一致性哈希算法在 1997 年由麻省理工學院提出,是一種特殊的哈希算法,在移除或者添加一個服務器時,能夠盡可能小地改變已存在的服務請求與處理請求服務器之間的映射關系;
一致性哈希解決了簡單哈希算法在分布式哈希表(Distributed Hash Table,DHT)中存在的動態伸縮等問題;
一致性hash算法本質上也是一種取模算法;
不過,不同于上邊按服務器數量取模,一致性hash是對固定值2^32取模;
IPv4的地址是4組8位2進制數組成,所以用2^32可以保證每個IP地址會有唯一的映射;
我們可以將這2^32個值抽象成一個圓環??,圓環的正上方的點代表0,順時針排列,以此類推:1、2、3…直到2^32-1,而這個由2的32次方個點組成的圓環統稱為hash環;
在對服務器進行映射時,使用hash(服務器ip)% 2^32,即:
使用服務器IP地址進行hash計算,用哈希后的結果對2^32取模,結果一定是一個0到2^32-1之間的整數;
而這個整數映射在hash環上的位置代表了一個服務器,依次將node0、node1、node2三個緩存服務器映射到hash環上;
在對對應的Key映射到具體的服務器時,需要首先計算Key的Hash值:hash(key)% 2^32;
注:此處的Hash函數可以和之前計算服務器映射至Hash環的函數不同,只要保證取值范圍和Hash環的范圍相同即可(即:2^32);
將Key映射至服務器遵循下面的邏輯:
從緩存對象key的位置開始,沿順時針方向遇到的第一個服務器,便是當前對象將要緩存到的服務器;
假設我們有 "semlinker"、"kakuqo"、"lolo"、"fer" 四個對象,分別簡寫為 o1、o2、o3 和 o4;
首先,使用哈希函數計算這個對象的 hash 值,值的范圍是 [0, 2^32-1]:
圖中對象的映射關系如下:
hash(o1)=k1; hash(o2)=k2;
hash(o3)=k3; hash(o4)=k4;
同時 3 臺緩存服務器,分別為 CS1、CS2 和 CS3:
則可知,各對象和服務器的映射關系如下:
K1=> CS1
K4=> CS3
K2=> CS2
K3=> CS1
即:
以上便是一致性Hash的工作原理;
可以看到,一致性Hash就是:將原本單個點的Hash映射,轉變為了在一個環上的某個片段上的映射!
下面我們來看幾種服務器擴縮容的場景;
假設 CS3 服務器出現故障導致服務下線,這時原本存儲于 CS3 服務器的對象 o4,需要被重新分配至 CS2 服務器,其它對象仍存儲在原有的機器上:
此時受影響的數據只有 CS2 和 CS3 服務器之間的部分數據!
假如業務量激增,我們需要增加一臺服務器 CS4,經過同樣的 hash 運算,該服務器最終落于 t1 和 t2 服務器之間,具體如下圖所示:
此時,只有 t1 和 t2 服務器之間的部分對象需要重新分配;
在以上示例中只有 o3 對象需要重新分配,即它被重新到 CS4 服務器;
在前面我們已經說過:如果使用簡單的取模方法,當新添加服務器時可能會導致大部分緩存失效,而使用一致性哈希算法后,這種情況得到了較大的改善,因為只有少部分對象需要重新分配!
在上面給出的例子中,各個服務器幾乎是平均被均攤到Hash環上;
但是在實際場景中很難選取到一個Hash函數這么完美的將各個服務器散列到Hash環上;
此時,在服務器節點數量太少的情況下,很容易因為節點分布不均勻而造成數據傾斜問題;
如下圖被緩存的對象大部分緩存在node-4服務器上,導致其他節點資源浪費,系統壓力大部分集中在node-4節點上,這樣的集群是非常不健康的:
同時,還有另一個問題:
在上面新增服務器 CS4 時,CS4 只分擔了 CS1 服務器的負載,服務器 CS2 和 CS3 并沒有因為 CS4 服務器的加入而減少負載壓力;如果 CS4 服務器的性能與原有服務器的性能一致甚至可能更高,那么這種結果并不是我們所期望的;
針對上面的問題,我們可以通過:引入虛擬節點來解決負載不均衡的問題:
即將每臺物理服務器虛擬為一組虛擬服務器,將虛擬服務器放置到哈希環上,如果要確定對象的服務器,需先確定對象的虛擬服務器,再由虛擬服務器確定物理服務器;
如下圖所示:
在圖中:o1 和 o2 表示對象,v1 ~ v6 表示虛擬服務器,s1 ~ s3 表示實際的物理服務器;
虛擬節點的hash計算通常可以采用:對應節點的IP地址加數字編號后綴 hash(10.24.23.227#1) 的方式;
舉個例子,node-1節點IP為10.24.23.227,正常計算node-1的hash值:
假設我們給node-1設置三個虛擬節點,node-1#1、node-1#2、node-1#3,對它們進行hash后取模:
注意:
分配的虛擬節點個數越多,映射在hash環上才會越趨于均勻,節點太少的話很難看出效果;
引入虛擬節點的同時也增加了新的問題,要做虛擬節點和真實節點間的映射,對象key->虛擬節點->實際節點之間的轉換;
一致性hash在分布式系統中應該是實現負載均衡的首選算法,它的實現比較靈活,既可以在客戶端實現,也可以在中間件上實現,比如日常使用較多的緩存中間件memcached和redis集群都有用到它;
memcached的集群比較特殊,嚴格來說它只能算是偽集群,因為它的服務器之間不能通信,請求的分發路由完全靠客戶端來的計算出緩存對象應該落在哪個服務器上,而它的路由算法用的就是一致性hash;
還有redis集群中hash槽的概念,雖然實現不盡相同,但思想萬變不離其宗,看完本篇的一致性hash,你再去理解redis槽位就輕松多了;
其它的應用場景還有很多:
下面我們根據上面的講述,使用Golang實現一個一致性Hash算法,這個算法具有一些下面的功能特性:
具體源代碼見:
https://github.com/JasonkayZK/consistent-hashing-demo
下面開始實現吧!
首先定義每一臺緩存服務器的數據結構:
core/host.go
type Host struct {
// the host id: ip:port
Name string
// the load bound of the host
LoadBound int64
}
其中:
其次,定義一致性Hash的結構:
core/algorithm.go
// Consistent is an implementation of consistent-hashing-algorithm
type Consistent struct {
// the number of replicas
replicaNum int
// the total loads of all replicas
totalLoad int64
// the hash function for keys
hashFunc func(key string) uint64
// the map of virtual nodes to hosts
hostMap map[string]*Host
// the map of hashed virtual nodes to host name
replicaHostMap map[uint64]string
// the hash ring
sortedHostsHashSet []uint64
// the hash ring lock
sync.RWMutex
}
其中:
大概的結構如上所示,下面我們來看一些常量和錯誤的定義;
常量的定義如下:
core/algorithm.go
const (
// The format of the host replica name
hostReplicaFormat=`%s%d`
)
var (
// the default number of replicas
defaultReplicaNum=10
// the load bound factor
// ref: https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
loadBoundFactor=0.25
// the default Hash function for keys
defaultHashFunc=func(key string) uint64 {
out :=sha512.Sum512([]byte(key))
return binary.LittleEndian.Uint64(out[:])
}
)
分別表示:
還有一些錯誤的定義:
core/error.go
var (
ErrHostAlreadyExists=errors.New("host already exists")
ErrHostNotFound=errors.New("host not found")
)
分別表示服務器已經注冊,以及緩存服務器未找到;
下面來看具體的方法實現!
注冊緩存服務器的代碼如下:
core/algorithm.go
func (c *Consistent) RegisterHost(hostName string) error {
c.Lock()
defer c.Unlock()
if _, ok :=c.hostMap[hostName]; ok {
return ErrHostAlreadyExists
}
c.hostMap[hostName]=&Host{
Name: hostName,
LoadBound: 0,
}
for i :=0; i < c.replicaNum; i++ {
hashedIdx :=c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i))
c.replicaHostMap[hashedIdx]=hostName
c.sortedHostsHashSet=append(c.sortedHostsHashSet, hashedIdx)
}
// sort hashes in ascending order
sort.Slice(c.sortedHostsHashSet, func(i int, j int) bool {
if c.sortedHostsHashSet[i] < c.sortedHostsHashSet[j] {
return true
}
return false
})
return nil
}
代碼比較簡單,簡單說一下;
首先,檢查服務器是否已經注冊,如果已經注冊,則直接返回已經注冊的錯誤;
隨后,創建一個Host對象,并且在 for 循環中創建多個虛擬節點:
最后,對Hash環進行排序;
這里使用數組作為Hash環只是為了便于說明,在實際實現中建議選用其他數據結構進行實現,以獲取更好的性能;
當緩存服務器信息寫入 replicaHostMap 映射以及 Hash 環后,即完成了緩存服務器的注冊;
注銷緩存服務器的代碼如下:
core/algorithm.go
func (c *Consistent) UnregisterHost(hostName string) error {
c.Lock()
defer c.Unlock()
if _, ok :=c.hostMap[hostName]; !ok {
return ErrHostNotFound
}
delete(c.hostMap, hostName)
for i :=0; i < c.replicaNum; i++ {
hashedIdx :=c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i))
delete(c.replicaHostMap, hashedIdx)
c.delHashIndex(hashedIdx)
}
return nil
}
// Remove hashed host index from the hash ring
func (c *Consistent) delHashIndex(val uint64) {
idx :=-1
l :=0
r :=len(c.sortedHostsHashSet) - 1
for l <=r {
m :=(l + r) / 2
if c.sortedHostsHashSet[m]==val {
idx=m
break
} else if c.sortedHostsHashSet[m] < val {
l=m + 1
} else if c.sortedHostsHashSet[m] > val {
r=m - 1
}
}
if idx !=-1 {
c.sortedHostsHashSet=append(c.sortedHostsHashSet[:idx], c.sortedHostsHashSet[idx+1:]...)
}
}
和注冊緩存服務器相反,將服務器在 Map 映射以及 Hash 環中去除即完成了注銷;
這里的邏輯和上面注冊的邏輯極為類似,這里不再贅述!
查詢 Key 是整個一致性 Hash 算法的核心,但是實現起來也并不復雜;
代碼如下:
core/algorithm.go
func (c *Consistent) GetKey(key string) (string, error) {
hashedKey :=c.hashFunc(key)
idx :=c.searchKey(hashedKey)
return c.replicaHostMap[c.sortedHostsHashSet[idx]], nil
}
func (c *Consistent) searchKey(key uint64) int {
idx :=sort.Search(len(c.sortedHostsHashSet), func(i int) bool {
return c.sortedHostsHashSet[i] >=key
})
if idx >=len(c.sortedHostsHashSet) {
// make search as a ring
idx=0
}
return idx
}
代碼首先計算 key 的散列值;
隨后,在Hash環上“順時針”尋找可以緩存的第一臺緩存服務器:
idx :=sort.Search(len(c.sortedHostsHashSet), func(i int) bool {
return c.sortedHostsHashSet[i] >=key
})
注意到,如果 key 比當前Hash環中最大的虛擬節點的 hash 值還大,則選擇當前 Hash環 中 hash 值最小的一個節點(即“環形”的邏輯):
if idx >=len(c.sortedHostsHashSet) {
// make search as a ring
idx=0
}
searchKey 返回了虛擬節點在 Hash 環數組中的 index;
隨后,我們使用 map 返回 index 對應的緩存服務器的名稱即可;
至此,一致性 Hash 算法基本實現,接下來我們來驗證一下;
在驗證算法之前,我們還需要準備幾臺緩存服務器;
為了簡單起見,這里使用了 HTTP 服務器作為緩存服務器,具體代碼如下所示:
server/main.go
package main
import (
"flag"
"fmt"
"net/http"
"sync"
"time"
)
type CachedMap struct {
KvMap sync.Map
Lock sync.RWMutex
}
var (
cache=CachedMap{KvMap: sync.Map{}}
port=flag.String("p", "8080", "port")
regHost="http://localhost:18888"
expireTime=10
)
func main() {
flag.Parse()
stopChan :=make(chan interface{})
startServer(*port)
<-stopChan
}
func startServer(port string) {
hostName :=fmt.Sprintf("localhost:%s", port)
fmt.Printf("start server: %s\n", port)
err :=registerHost(hostName)
if err !=nil {
panic(err)
}
http.HandleFunc("/", kvHandle)
err=http.ListenAndServe(":"+port, nil)
if err !=nil {
err=unregisterHost(hostName)
if err !=nil {
panic(err)
}
panic(err)
}
}
func kvHandle(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
if _, ok :=cache.KvMap.Load(r.Form["key"][0]); !ok {
val :=fmt.Sprintf("hello: %s", r.Form["key"][0])
cache.KvMap.Store(r.Form["key"][0], val)
fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val)
time.AfterFunc(time.Duration(expireTime)*time.Second, func() {
cache.KvMap.Delete(r.Form["key"][0])
fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val)
})
}
val, _ :=cache.KvMap.Load(r.Form["key"][0])
_, err :=fmt.Fprintf(w, val.(string))
if err !=nil {
panic(err)
}
}
func registerHost(host string) error {
resp, err :=http.Get(fmt.Sprintf("%s/register?host=%s", regHost, host))
if err !=nil {
return err
}
defer resp.Body.Close()
return nil
}
func unregisterHost(host string) error {
resp, err :=http.Get(fmt.Sprintf("%s/unregister?host=%s", regHost, host))
if err !=nil {
return err
}
defer resp.Body.Close()
return nil
}
代碼接受由命令行指定的 -p 參數指定服務器端口號;
代碼執行后,會調用 startServer 函數啟動一個http服務器;
在 startServer 函數中,首先調用 registerHost 在代理服務器上進行注冊(下文會講),并監聽 /路徑,具體代碼如下:
func startServer(port string) {
hostName :=fmt.Sprintf("localhost:%s", port)
fmt.Printf("start server: %s\n", port)
err :=registerHost(hostName)
if err !=nil {
panic(err)
}
http.HandleFunc("/", kvHandle)
err=http.ListenAndServe(":"+port, nil)
if err !=nil {
err=unregisterHost(hostName)
if err !=nil {
panic(err)
}
panic(err)
}
}
kvHandle 函數對請求進行處理:
func kvHandle(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
if _, ok :=cache.KvMap.Load(r.Form["key"][0]); !ok {
val :=fmt.Sprintf("hello: %s", r.Form["key"][0])
cache.KvMap.Store(r.Form["key"][0], val)
fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val)
time.AfterFunc(time.Duration(expireTime)*time.Second, func() {
cache.KvMap.Delete(r.Form["key"][0])
fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val)
})
}
val, _ :=cache.KvMap.Load(r.Form["key"][0])
_, err :=fmt.Fprintf(w, val.(string))
if err !=nil {
panic(err)
}
}
首先,解析來自路徑的參數:?key=xxx;
隨后,查詢服務器中的緩存(為了簡單起見,這里使用 sync.Map 來模擬緩存):
最后,返回緩存;
有了緩存服務器之后,我們還需要一個代理服務器來選擇具體選擇哪個緩存服務器來請求;
代碼如下:
proxy/proxy.go
package proxy
import (
"fmt"
"github.com/jasonkayzk/consistent-hashing-demo/core"
"io/ioutil"
"net/http"
"time"
)
type Proxy struct {
consistent *core.Consistent
}
// NewProxy creates a new Proxy
func NewProxy(consistent *core.Consistent) *Proxy {
proxy :=&Proxy{
consistent: consistent,
}
return proxy
}
func (p *Proxy) GetKey(key string) (string, error) {
host, err :=p.consistent.GetKey(key)
if err !=nil {
return "", err
}
resp, err :=http.Get(fmt.Sprintf("http://%s?key=%s", host, key))
if err !=nil {
return "", err
}
defer resp.Body.Close()
body, _ :=ioutil.ReadAll(resp.Body)
fmt.Printf("Response from host %s: %s\n", host, string(body))
return string(body), nil
}
func (p *Proxy) RegisterHost(host string) error {
err :=p.consistent.RegisterHost(host)
if err !=nil {
return err
}
fmt.Println(fmt.Sprintf("register host: %s success", host))
return nil
}
func (p *Proxy) UnregisterHost(host string) error {
err :=p.consistent.UnregisterHost(host)
if err !=nil {
return err
}
fmt.Println(fmt.Sprintf("unregister host: %s success", host))
return nil
}
代理服務器的邏輯很簡單,就是創建一個一致性Hash結構: Consistent,把 Consistent 和請求緩存服務器的邏輯進行了一層封裝;
啟動代理服務器的代碼如下:
package main
import (
"fmt"
"github.com/jasonkayzk/consistent-hashing-demo/core"
"github.com/jasonkayzk/consistent-hashing-demo/proxy"
"net/http"
)
var (
port="18888"
p=proxy.NewProxy(core.NewConsistent(10, nil))
)
func main() {
stopChan :=make(chan interface{})
startServer(port)
<-stopChan
}
func startServer(port string) {
http.HandleFunc("/register", registerHost)
http.HandleFunc("/unregister", unregisterHost)
http.HandleFunc("/key", getKey)
fmt.Printf("start proxy server: %s\n", port)
err :=http.ListenAndServe(":"+port, nil)
if err !=nil {
panic(err)
}
}
func registerHost(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
err :=p.RegisterHost(r.Form["host"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("register host: %s success", r.Form["host"][0]))
}
func unregisterHost(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
err :=p.UnregisterHost(r.Form["host"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("unregister host: %s success", r.Form["host"][0]))
}
func getKey(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
val, err :=p.GetKey(r.Form["key"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))
}
和緩存服務器類似,這里采用 HTTP 服務器來模擬;
代理服務器監聽 18888 端口的幾個路由:
這里為了簡單起見,使用了這種方式進行服務注冊,實際使用時請使用其他組件進行實現!
接下來啟動緩存服務器:
start proxy server: 18888
分別啟動三個緩存服務器:
$ go run server/main.go -p 8080
start server: 8080
$ go run server/main.go -p 8081
start server: 8081
$ go run server/main.go -p 8082
start server: 8082
同時,代理服務器輸出:
register host: localhost:8080 success
register host: localhost:8081 success
register host: localhost:8082 success
可以看到緩存服務器已經成功注冊;
可以使用 curl 命令請求代理服務器獲取緩存 key:
$ curl localhost:18888/key?key=123
key: 123, val: hello: 123
此時,代理服務器輸出:
Response from host localhost:8080: hello: 123
同時,8000端口的緩存服務器輸出:
cached key: {123: hello: 123}
removed cached key after 10s: {123: hello: 123}
可以看到,8000端口的服務器對key值進行了緩存,并在10秒后清除了緩存;
嘗試獲取多個Key:
Response from host localhost:8082: hello: 45363456
Response from host localhost:8080: hello: 4
Response from host localhost:8082: hello: 1
Response from host localhost:8080: hello: 2
Response from host localhost:8082: hello: 3
Response from host localhost:8080: hello: 4
Response from host localhost:8082: hello: 5
Response from host localhost:8080: hello: 6
Response from host localhost:8082: hello: sdkbnfoerwtnbre
Response from host localhost:8082: hello: sd45555254tg423i5gvj4v5
Response from host localhost:8081: hello: 0
Response from host localhost:8082: hello: 032452345
可以看到不同的key被散列到了不同的緩存服務器;
接下來我們通過debug查看具體的變量來一探究竟;
開啟debug,并注冊單個緩存服務器后,查看 Consistent 中的值:
注冊三個緩存服務器后,查看 Consistent 中的值:
從debug中的變量,我們就可以很清楚的看到注冊不同數量的服務器時,一致性Hash上服務器的動態變化!
以上就是基本的一致性Hash算法的實現了!
但是很多時候,我們的緩存服務器需要同時處理大量的緩存請求,而通過上面的算法,我們總是會去同一臺緩存服務器去獲取緩存數據;
如果很多的熱點數據都落在了同一臺緩存服務器上,則可能會出現性能瓶頸;
Google 在2017年提出了: 含有負載邊界值的一致性Hash算法;
下面我們在基本的一致性Hash算法的基礎上,實現含有負載邊界值的一致性Hash!
17年時,Google 提出了含有負載邊界值的一致性Hash算法,此算法主要應用于在實現一致性的同時,實現負載的平均性;
此算法最初由 Vimeo 的 Andrew Rodland 在 haproxy 中實現并開源;
參考:
https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
arvix論文地址:
https://arxiv.org/abs/1608.01350
這個算法將緩存服務器視為一個含有一定容量的桶(可以簡單理解為Hash桶),將客戶端視為球,則平均性目標表示為:所有約等于平均密度(球的數量除以桶的數量):
實際使用時,可以設定一個平均密度的參數 ε,將每個桶的容量設置為平均加載時間的 下上限 (1+ε);
具體的計算過程如下:
例如下面的圖:
使用哈希函數將 6 個球和 3 個桶分配給 Hash環 上的隨機位置,假設每個桶的容量設置為 2,按 ID 值的遞增順序分配球;
在上面基本一致性 Hash 算法實現的基礎上,我們繼續實現含有負載邊界值的一致性Hash算法;
在核心算法中添加根據負載情況查詢Key的函數,以及增加/釋放負載值的函數;
根據負載情況查詢 Key 的函數:
core/algorithm.go
func (c *Consistent) GetKeyLeast(key string) (string, error) {
c.RLock()
defer c.RUnlock()
if len(c.replicaHostMap)==0 {
return "", ErrHostNotFound
}
hashedKey :=c.hashFunc(key)
idx :=c.searchKey(hashedKey) // Find the first host that may serve the key
i :=idx
for {
host :=c.replicaHostMap[c.sortedHostsHashSet[i]]
loadChecked, err :=c.checkLoadCapacity(host)
if err !=nil {
return "", err
}
if loadChecked {
return host, nil
}
i++
// if idx goes to the end of the ring, start from the beginning
if i >=len(c.replicaHostMap) {
i=0
}
}
}
func (c *Consistent) checkLoadCapacity(host string) (bool, error) {
// a safety check if someone performed c.Done more than needed
if c.totalLoad < 0 {
c.totalLoad=0
}
var avgLoadPerNode float64
avgLoadPerNode=float64((c.totalLoad + 1) / int64(len(c.hostMap)))
if avgLoadPerNode==0 {
avgLoadPerNode=1
}
avgLoadPerNode=math.Ceil(avgLoadPerNode * (1 + loadBoundFactor))
candidateHost, ok :=c.hostMap[host]
if !ok {
return false, ErrHostNotFound
}
if float64(candidateHost.LoadBound)+1 <=avgLoadPerNode {
return true, nil
}
return false, nil
}
在 GetKeyLeast 函數中,首先根據 searchKey 函數,順時針獲取可能滿足條件的第一個虛擬節點;
隨后調用 checkLoadCapacity 校驗當前緩存服務器的負載數是否滿足條件:
如果不滿足條件,則沿著 Hash 環走到下一個虛擬節點,繼續判斷是否滿足條件,直到滿足條件;
這里使用的是無條件的 for 循環,因為一定存在低于 平均負載(1 + loadBoundFactor) 的虛擬節點!*
增加/釋放負載值的函數:
core/algorithm.go
func (c *Consistent) Inc(hostName string) {
c.Lock()
defer c.Unlock()
atomic.AddInt64(&c.hostMap[hostName].LoadBound, 1)
atomic.AddInt64(&c.totalLoad, 1)
}
func (c *Consistent) Done(host string) {
c.Lock()
defer c.Unlock()
if _, ok :=c.hostMap[host]; !ok {
return
}
atomic.AddInt64(&c.hostMap[host].LoadBound, -1)
atomic.AddInt64(&c.totalLoad, -1)
}
邏輯比較簡單,就是原子的對對應緩存服務器進行負載加減一操作;
在代理服務器中增加路由:
proxy/proxy.go
func (p *Proxy) GetKeyLeast(key string) (string, error) {
host, err :=p.consistent.GetKeyLeast(key)
if err !=nil {
return "", err
}
p.consistent.Inc(host)
time.AfterFunc(time.Second*10, func() { // drop the host after 10 seconds(for testing)!
fmt.Printf("dropping host: %s after 10 second\n", host)
p.consistent.Done(host)
})
resp, err :=http.Get(fmt.Sprintf("http://%s?key=%s", host, key))
if err !=nil {
return "", err
}
defer resp.Body.Close()
body, _ :=ioutil.ReadAll(resp.Body)
fmt.Printf("Response from host %s: %s\n", host, string(body))
return string(body), nil
}
注意:這里模擬的是單個key請求可能會持續10s鐘;
啟動代理服務器時增加路由:
main.go
func startServer(port string) {
// ......
http.HandleFunc("/key_least", getKeyLeast)
// ......
}
func getKeyLeast(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
val, err :=p.GetKeyLeast(r.Form["key"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))
}
啟動代理服務器,并開啟三臺緩存服務器;
通過下面的命令獲取含有負載邊界的Key:
$ curl localhost:18888/key_least?key=123
key: 123, val: hello: 123
多次請求后的結果如下:
```
start proxy server: 18888
register host: localhost:8080 success
register host: localhost:8081 success
register host: localhost:8082 success
Response from host localhost:8080: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8082: hello: 123
```
可以看到,緩存被均攤到了其他服務器(這是由于一個緩存請求會持續10s導致的)!
本文拋磚引玉的講解了一致性Hash算法的原理,并提供了Go的實現;
在此基礎之上,根據 Google 的論文實現了帶有負載邊界的一致性Hash算法;
當然上面的代碼在實際生產環境下仍然需要部分改進,如:
大家在實際使用時,可以根據需要,搭配實際的組件!
希表和哈希函數
在記錄的存儲位置和它的關鍵字之間是建立一個確定的對應關系(映射函數),使每個關鍵字和一個存儲位置能唯一對應。
這個映射函數稱為哈希函數,根據這個原則建立的表稱為哈希表(Hash Table),也叫散列表。
以上描述,如果通過數學形式來描述就是:
若查找關鍵字為 key,則其值存放在 f(key) 的存儲位置上。由此,不需比較便可直接取得所查記錄。
注:哈希查找與線性表查找和樹表查找最大的區別在于,不用數值比較。
沖突
若 key1 ≠ key2 ,而 f(key1)=f(key2),這種情況稱為沖突(Collision)。
根據哈希函數f(key)和處理沖突的方法將一組關鍵字映射到一個有限的連續的地址集(區間)上,并以關鍵字在地址集中的“像”作為記錄在表中的存儲位置,這一映射過程稱為構造哈希表。
構造哈希表這個場景就像汽車找停車位,如果車位被人占了,只能找空的地方停。
構造哈希表
由以上內容可知,哈希查找本身其實不費吹灰之力,問題的關鍵在于如何構造哈希表和處理沖突。
常見的構造哈希表的方法有 5 種:
(1)直接定址法
說白了,就是小學時學過的一元一次方程。
即 f(key)=a * key + b。其中,a和b 是常數。
(2)數字分析法
假設關鍵字是R進制數(如十進制)。并且哈希表中可能出現的關鍵字都是事先知道的,則可選取關鍵字的若干數位組成哈希地址。
選取的原則是使得到的哈希地址盡量避免沖突,即所選數位上的數字盡可能是隨機的。
(3)平方取中法
取關鍵字平方后的中間幾位為哈希地址。通常在選定哈希函數時不一定能知道關鍵字的全部情況,僅取其中的幾位為地址不一定合適;
而一個數平方后的中間幾位數和數的每一位都相關, 由此得到的哈希地址隨機性更大。取的位數由表長決定。
(4)除留余數法
取關鍵字被某個不大于哈希表表長 m 的數 p 除后所得的余數為哈希地址。
即 f(key)=key % p (p ≤ m)
這是一種最簡單、最常用的方法,它不僅可以對關鍵字直接取模,也可在折疊、平方取中等運算之后取模。
注意:p的選擇很重要,如果選的不好,容易產生沖突。根據經驗,一般情況下可以選p為素數。
(5)隨機數法
選擇一個隨機函數,取關鍵字的隨機函數值為它的哈希地址,即 f(key)=random(key)。
通常,在關鍵字長度不等時采用此法構造哈希函數較為恰當。
解決沖突
設計合理的哈希函數可以減少沖突,但不能完全避免沖突。
所以需要有解決沖突的方法,常見有兩類
(1)開放定址法
如果兩個數據元素的哈希值相同,則在哈希表中為后插入的數據元素另外選擇一個表項。
當程序查找哈希表時,如果沒有在第一個對應的哈希表項中找到符合查找要求的數據元素,程序就會繼續往后查找,直到找到一個符合查找要求的數據元素,或者遇到一個空的表項。
例子
若要將一組關鍵字序列 {1, 9, 25, 11, 12, 35, 17, 29} 存放到哈希表中。
采用除留余數法構造哈希表;采用開放定址法處理沖突。
不妨設選取的p和m為13,由 f(key)=key % 13 可以得到下表。
需要注意的是,在上圖中有兩個關鍵字的探查次數為 2 ,其他都是1。
這個過程是這樣的:
a. 12 % 13 結果是12,而它的前面有個 25 ,25 % 13 也是12,存在沖突。
我們使用開放定址法 (12 + 1) % 13=0,沒有沖突,完成。
b. 35 % 13 結果是 9,而它的前面有個 9,9 % 13也是 9,存在沖突。
我們使用開放定址法 (9 + 1) % 13=10,沒有沖突,完成。
(2)拉鏈法
將哈希值相同的數據元素存放在一個鏈表中,在查找哈希表的過程中,當查找到這個鏈表時,必須采用線性查找方法。
在這種方法中,哈希表中每個單元存放的不再是記錄本身,而是相應同義詞單鏈表的頭指針。
例子
如果對開放定址法例子中提到的序列使用拉鏈法,得到的結果如下圖所示:
實現一個哈希表
假設要實現一個哈希表,要求
a. 哈希函數采用除留余數法,即 f(key)=key % p (p ≤ m)
b. 解決沖突采用開放定址法,即 f2(key)=(f(key)+i) % size (p ≤ m)
(1)定義哈希表的數據結構
class HashTable {
public int key=0; // 關鍵字
public int data=0; // 數值
public int count=0; // 探查次數
}
(2)在哈希表中查找關鍵字key
根據設定的哈希函數,計算哈希地址。如果出現地址沖突,則按設定的處理沖突的方法尋找下一個地址。
如此反復,直到不沖突為止(查找成功)或某個地址為空(查找失敗)。
/**
* 查找哈希表
* 構造哈希表采用除留取余法,即f(key)=key mod p (p ≤ size)
* 解決沖突采用開放定址法,即f2(key)=(f(key) + i) mod p (1 ≤ i ≤ size-1)
* ha為哈希表,p為模,size為哈希表大小,key為要查找的關鍵字
*/
public int searchHashTable(HashTable[] ha, int p, int size, int key) {
int addr=key % p; // 采用除留取余法找哈希地址
// 若發生沖突,用開放定址法找下一個哈希地址
while (ha[addr].key !=KEY && ha[addr].key !=key) {
addr=(addr + 1) % size;
}
if (ha[addr].key==key) {
return addr; // 查找成功
} else {
return FAILED; // 查找失敗
}
}
(3)刪除關鍵字為key的記錄
在采用開放定址法處理沖突的哈希表上執行刪除操作,只能在被刪記錄上做刪除標記,而不能真正刪除記錄。
找到要刪除的記錄,將關鍵字置為刪除標記DELKEY。
public int deleteHashTable(HashTable[] ha, int p, int size, int key) {
int addr=0;
addr=searchHashTable(ha, p, size, key);
if (FAILED !=addr) { // 找到記錄
ha[addr].key=DELKEY; // 將該位置的關鍵字置為DELKEY
return SUCCESS;
} else {
return KEY; // 查找不到記錄,直接返回KEY
}
}
(4)插入關鍵字為key的記錄
將待插入的關鍵字key插入哈希表
先調用查找算法,若在表中找到待插入的關鍵字,則插入失敗;
若在表中找到一個開放地址,則將待插入的結點插入到其中,則插入成功。
public void insertHashTable(HashTable[] ha, int p, int size, int key) {
int i=1;
int addr=0;
addr=key % p; // 通過哈希函數獲取哈希地址
if (ha[addr].key==KEY || ha[addr].key==DELKEY) { // 如果沒有沖突,直接插入
ha[addr].key=key;
ha[addr].count=1;
} else { // 如果有沖突,使用開放定址法處理沖突
do {
addr=(addr + 1) % size; // 尋找下一個哈希地址
i++;
} while (ha[addr].key !=KEY && ha[addr].key !=DELKEY);
ha[addr].key=key;
ha[addr].count=i;
}
}
(5)建立哈希表
先將哈希表中各關鍵字清空,使其地址為開放的,然后調用插入算法將給定的關鍵字序列依次插入。
public void createHashTable(HashTable[] ha, int[] list, int p, int size) {
int i=0;
// 將哈希表中的所有關鍵字清空
for (i=0; i < ha.length; i++) {
ha[i].key=KEY;
ha[i].count=0;
}
// 將關鍵字序列依次插入哈希表中
for (i=0; i < list.length; i++) {
this.insertHashTable(ha, p, size, list[i]);
}
}
完整代碼
class HashTable {
public int key=0; // 關鍵字
public int data=0; // 數值
public int count=0; // 探查次數
}
public class HashSearch {
private final static int MAXSIZE=20;
private final static int KEY=1;
private final static int DELKEY=2;
private final static int SUCCESS=0;
private final static int FAILED=0xFFFFFFFF;
/**
* 查找哈希表
* 構造哈希表采用除留取余法,即f(key)=key mod p (p ≤ size)
* 解決沖突采用開放定址法,即f2(key)=(f(key) + i) mod p (1 ≤ i ≤ size-1)
* ha為哈希表,p為模,size為哈希表大小,key為要查找的關鍵字
*/
public int searchHashTable(HashTable[] ha, int p, int size, int key) {
int addr=key % p; // 采用除留取余法找哈希地址
// 若發生沖突,用開放定址法找下一個哈希地址
while (ha[addr].key !=KEY && ha[addr].key !=key) {
addr=(addr + 1) % size;
}
if (ha[addr].key==key) {
return addr; // 查找成功
} else {
return FAILED; // 查找失敗
}
}
/**
* 刪除哈希表中關鍵字為key的記錄
* 找到要刪除的記錄,將關鍵字置為刪除標記DELKEY
*/
public int deleteHashTable(HashTable[] ha, int p, int size, int key) {
int addr=0;
addr=searchHashTable(ha, p, size, key);
if (FAILED !=addr) { // 找到記錄
ha[addr].key=DELKEY; // 將該位置的關鍵字置為DELKEY
return SUCCESS;
} else {
return KEY; // 查找不到記錄,直接返回KEY
}
}
/**
* 將待插入的關鍵字key插入哈希表
* 先調用查找算法,若在表中找到待插入的關鍵字,則插入失敗;
* 若在表中找到一個開放地址,則將待插入的結點插入到其中,則插入成功。
*/
public void insertHashTable(HashTable[] ha, int p, int size, int key) {
int i=1;
int addr=0;
addr=key % p; // 通過哈希函數獲取哈希地址
if (ha[addr].key==KEY || ha[addr].key==DELKEY) { // 如果沒有沖突,直接插入
ha[addr].key=key;
ha[addr].count=1;
} else { // 如果有沖突,使用開放定址法處理沖突
do {
addr=(addr + 1) % size; // 尋找下一個哈希地址
i++;
} while (ha[addr].key !=KEY && ha[addr].key !=DELKEY);
ha[addr].key=key;
ha[addr].count=i;
}
}
/**
* 創建哈希表
* 先將哈希表中各關鍵字清空,使其地址為開放的,然后調用插入算法將給定的關鍵字序列依次插入。
*/
public void createHashTable(HashTable[] ha, int[] list, int p, int size) {
int i=0
// 將哈希表中的所有關鍵字清空
for (i=0; i < ha.length; i++) {
ha[i].key=KEY;
ha[i].count=0;
}
// 將關鍵字序列依次插入哈希表中
for (i=0; i < list.length; i++) {
this.insertHashTable(ha, p, size, list[i]);
}
}
/**
* 輸出哈希表
*/
public void displayHashTable(HashTable[] ha) {
int i=0;
System.out.format("pos:\t", "pos");
for (i=0; i < ha.length; i++) {
System.out.format("%4d", i);
}
System.out.println;
System.out.format("key:\t");
for (i=0; i < ha.length; i++) {
if (ha[i].key !=KEY) {
System.out.format("%4d", ha[i].key);
} else {
System.out.format(" ");
}
}
System.out.println;
System.out.format("count:\t");
for (i=0; i < ha.length; i++) {
if (0 !=ha[i].count) {
System.out.format("%4d", ha[i].count);
} else {
System.out.format(" ");
}
}
System.out.println;
}
public static void main(String[] args) {
int list={ 3, 112, 245, 27, 44, 19, 76, 29, 90 };
HashTable ha=new HashTable[MAXSIZE];
for (int i=0; i < ha.length; i++) {
ha[i]=new HashTable;
}
HashSearch search=new HashSearch;
search.createHashTable(ha, list, 19, MAXSIZE);
search.displayHashTable(ha);
}
}
參考資料
《數據結構習題與解析》(B級第3版)
轉自:靜默虛空
http://www.cnblogs.com/jingmoxukong/p/4332252.html
-----這里是數學思維的聚集地------
“超級數學建模”(微信號supermodeling),每天學一點小知識,輕松了解各種思維,做個好玩的理性派。50萬數學精英都在關注!
篇文章給大家帶來的內容是關于JavaScript中散列表(哈希表)的詳細介紹(代碼示例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。
散列表
散列表(Hash table,也叫哈希表),是根據鍵(Key)而直接訪問在內存存儲位置的數據結構。也就是說,它通過計算一個關于鍵值的函數,將所需查詢的數據映射到表中一個位置來訪問記錄,這加快了查找速度。這個映射函數稱做散列函數,存放記錄的數組稱做散列表。
我們從上圖開始分析
通過上面簡單的例子,應該會有一下幾點一個大致的理解
那么我們在接著看一般我們會怎么去取值呢?
比如我們存儲一個key為1000,value為'張三' ---> {key:1000,value:'張三'}
從我們上述的解釋,它是不是應該存放在1000%10的這個插槽里。
當我們通過key想要找到value張三,是不是到key%10這個插槽里找就可以了呢?到了這里你可以停下來思考一下。
散列的一些術語(可以簡單的看一下)
看到這里不知道你是否大致理解了散列函數是什么了沒。通過例子,再通過你的思考,你可以回頭在讀一遍文章頭部關于散列表的定義。如果你能讀懂了,那么我估計你應該是懂了。
常用的散列函數
處理整數 h=>k%M (也就是我們上面所舉的例子)
處理字符串:
function h_str(str,M){
return [...str].reduce((hash,c)=>{
hash=(31*hash + c.charCodeAt(0)) % M
},0)
}
hash算法不是這里的重點,我也沒有很深入的去研究,這里主要還是去理解散列表是個怎樣的數據結構,它有哪些優點,它具體做了怎樣一件事。
而hash函數它只是通過某種算法把key映射到列表中。
構建散列表
通過上面的解釋,我們這里做一個簡單的散列表
散列表的組成
初始化
- 初始化散列表有多少個槽
- 用一個數組來創建M個槽
class HashTable {
constructor(num=1000){
this.M=num;
this.slots=new Array(num);
}
}
散列函數
處理字符串的散列函數,這里使用字符串是因為,數值也可以傳換成字符串比較通用一些
先將傳遞過來的key值轉為字符串,為了能夠嚴謹一些
將字符串轉換為數組, 例如'abc'=> [...'abc']=> ['a','b','c']
分別對每個字符的charCodeAt進行計算,取M余數是為了剛好對應插槽的數量,你總共就10個槽,你的數值%10 肯定會落到 0-9的槽里
h(str){
str=str + '';
return [...str].reduce((hash,c)=>{
hash=(331 * hash + c.charCodeAt()) % this.M;
return hash;
},0)
}
添加
調用hash函數得到對應的存儲地址(就是我們之間類比的槽)
因為一個槽中可能會存多個值,所以需要用一個二維數組去表示,比如我們計算得來的槽的編號是0,也就是slot[0],那么我們應該往slot[0] [0]里存,后面進來的同樣是編號為0的槽的話就接著往slot[0] [1]里存
add(key,value) {
const h=this.h(key);
// 判斷這個槽是否是一個二維數組, 不是則創建二維數組
if(!this.slots[h]){
this.slots[h]=[];
}
// 將值添加到對應的槽中
this.slots[h].push(value);
}
刪除
通過hash算法,找到所在的槽
通過過濾來刪除
delete(key){
const h=this.h(key);
this.slots[h]=this.slots[h].filter(item=>item.key!==key);
}
查找
通過hash算法找到對應的槽
用find函數去找同一個key的值
返回對應的值
search(key){
const h=this.h(key);
const list=this.slots[h];
const data=list.find(x=> x.key===key);
return data ? data.value : null;
}
總結
講到這里,散列表的數據結構已經講完了,其實我們每學一種數據結構或算法的時候,不是去照搬實現的代碼,我們要學到的是思想,比如說散列表它究竟做了什么,它是一種存儲方式,可以快速的通過鍵去查找到對應的值。那么我們會思考,如果我們設計的槽少了,在同一個槽里存放了大量的數據,那么這個散列表它的搜索速度肯定是會大打折扣的,這種情況又應該用什么方式去解決,又或者是否用其他的數據結構的代替它。
補充一個小知識點
v8引擎中的數組 arr=[1,2,3,4,5] 或 new Array(100) 我們都知道它是開辟了一塊連續的空間去存儲,而arr=[] , arr[100000]=10 這樣的操作它是使用的散列,因為這種操作如果連續開辟100萬個空間去存儲一個值,那么顯然是在浪費空間。
以上就是JavaScript中散列表(哈希表)的詳細介紹(代碼示例)的詳細內容,更多請關注其它相關文章!
更多技巧請《轉發 + 關注》哦!
*請認真填寫需求信息,我們會在24小時內與您取得聯系。