Moonshine's Blog

日拱一卒无有尽,功不唐捐终入海

怎么实现一个分布式kv系统(2)-静态分区

摘要

本节要实现的有3点

  • 解析toml文件
  • 计算key的hash值
  • 将请求路由到对应的shard

编程实现

1. 定义&解析toml规则文件

定义sharding.toml文件

1
2
3
4
5
6
7
8
9
10
11
[[shards]]
name = "Moscow"
id = 0

[[shards]]
name = "Minsk"
id = 1

[[shards]]
name = "Kiev"
id = 2

导入解析toml的包

添加configFile参数解析

在main中读取配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import (
"github.com/BurntSushi/toml"
)

// Command-line flags for the server binary.
var (
// configFile is the path of the TOML file decoded into config.Config in
// main; it defaults to "sharding.toml".
configFile = flag.String("config-file", "sharding.toml", "The configuration file")
)

// main parses the command-line flags and decodes the TOML file named by
// -config-file into a config.Config. A decode failure is fatal.
func main() {
	flag.Parse()

	var c config.Config
	_, err := toml.DecodeFile(*configFile, &c)
	if err != nil {
		log.Fatalf("failed to decode config file(%q):%v", *configFile, err)
	}
}

2. 添加config模块

创建config/config.go

定义Config & Shard结构

1
2
3
4
5
6
7
8
9
10
package config

// Shard describes one [[shards]] entry of the sharding configuration file.
type Shard struct {
	// Name is the human-readable shard name, e.g. "Moscow".
	Name string
	// Id is the numeric shard id. It is an int because the TOML file stores
	// it as an integer (id = 0) and the rest of the code uses it as a map key
	// and as the server's shard index.
	Id int
	// Address is the host:port other shards use to reach this one. It stays
	// empty unless the TOML entry provides an "address" key.
	Address string
}

// Config is the decoded form of the sharding configuration file.
type Config struct {
// Shards holds one entry per [[shards]] table in the file.
Shards []Shard
}

3. 指定shard

先不自动shard,先手动指定shard

添加shard参数解析

检查shard是否存在

确认shard的id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// main.go
var (
// shardName is the name of the shard this process should run as; it is
// looked up in the configuration with Config.ExistsShard.
shardName = flag.String("shardName", "Moscow", "The name of the shard")
)

// Abort startup when the requested shard is not listed in the configuration.
ok, shard := c.ExistsShard(*shardName)
if !ok {
	// Dereference the flag: passing shardName itself would print the
	// pointer's address instead of the shard name.
	log.Fatalf("shard %v not exists", *shardName)
}

// config.go
// ExistsShard reports whether a shard called name is configured. On a match
// it returns true together with the first matching shard; otherwise it
// returns false and the zero-value Shard.
func (c *Config) ExistsShard(name string) (bool, Shard) {
	for i := range c.Shards {
		if candidate := c.Shards[i]; candidate.Name == name {
			return true, candidate
		}
	}
	return false, Shard{}
}

4. 写入数据到指定shard

写入规则:hash(key) % shardCounter 就是需要写入的分片

更新Server结构,添加shardCounter&shardIndex字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// web.go
// Server serves the key-value store over HTTP and knows which shard it is.
type Server struct {
	db *db.Database
	// shardCounter is the total number of shards; it is the modulus used when
	// mapping a key's hash to a shard.
	shardCounter int
	// shardIndex is the id of the shard this server instance runs as.
	shardIndex int
}

// NewServer returns a Server backed by db that is one of shardCounter shards
// and runs as shard number shardIndex.
func NewServer(db *db.Database, shardCounter, shardIndex int) *Server {
	return &Server{
		db:           db,
		shardCounter: shardCounter,
		shardIndex:   shardIndex,
	}
}

导入计算hash的包: hash/fnv包,计算shard

更新GetHandler、SetHandler,调用getShard来获取应该写入数据的分片。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// web.go
// getShard maps key to a shard number: the 64-bit FNV-1 hash of the key
// modulo the number of shards.
func (s *Server) getShard(key string) uint64 {
	hasher := fnv.New64()
	hasher.Write([]byte(key))

	shards := uint64(s.shardCounter)
	return hasher.Sum64() % shards
}

// GetHandler reads the "key" form value, looks it up in the database and
// writes the key, its value, the shard computed for the key and the lookup
// error to the response.
func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()

	k := r.Form.Get("key")
	v, getErr := s.db.GetKey(k)
	target := s.getShard(k)

	fmt.Fprintf(w, "%q:%q; shard: %d; %v Get Called\n", k, v, target, getErr)
}

// SetHandler reads the "key" and "value" form values, stores the pair in the
// database and writes the key, the value, the shard computed for the key and
// the store error to the response.
func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()

	k := r.Form.Get("key")
	v := r.Form.Get("value")
	setErr := s.db.SetKey(k, []byte(v))
	target := s.getShard(k)

	fmt.Fprintf(w, "%q:%q; shard: %d; %v; Set called\n", k, v, target, setErr)
}

4.获取所有节点的地址

以上虽然可以计算出分片,可是还没有办法路由给其他的分片。

首先,需要知道其他分片的地址。

其次,将信息传入到Server对象中。

接着,定义redirect函数,请求转发。

最后,改造GetHandler、SetHandler转发请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// config.go
// GetAddress returns a newly built map from shard id to shard address for
// every configured shard. When two shards share an id, the later entry wins.
func (c *Config) GetAddress() map[int]string {
	byID := make(map[int]string)
	for i := range c.Shards {
		shard := &c.Shards[i]
		byID[shard.Id] = shard.Address
	}
	return byID
}

// web.go
// Server serves the key-value store over HTTP and can forward requests to
// the shard that owns a key.
type Server struct {
db *db.Database
// shardCounter is the total number of shards, used as the hash modulus.
shardCounter int
// shardIndex is the id of the shard this server instance runs as.
shardIndex int
// addrs maps a shard id to the address used to reach that shard.
addrs map[int]string
}

// main.go
// Build the shard-id-to-address table and hand it to the server together
// with the shard count and the id of the shard selected for this process.
addrs := c.GetAddress()
svr := web.NewServer(db, len(c.Shards), shard.Id, addrs)

// web.go
// redirect forwards the request to the server responsible for shard and
// relays that server's status code and body to w.
//
// The request is re-issued as a GET to "http://<shard address>" followed by
// the original request URI. If the shard has no configured address or the
// upstream call fails, redirect answers with 500 and returns the error. An
// error while copying the upstream body is returned as well, but at that
// point the status line has already been sent.
func (s *Server) redirect(w http.ResponseWriter, r *http.Request, shard int) error {
	addr := s.addrs[shard]
	if addr == "" {
		// Fail with a clear message instead of building a host-less URL.
		err := fmt.Errorf("no address configured for shard %d", shard)
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error redirect request:%v", err)
		return err
	}

	resp, err := http.Get("http://" + addr + r.RequestURI)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error redirect request:%v", err)
		return err
	}
	defer resp.Body.Close()

	// Relay the upstream status so a failure on the owning shard is not
	// reported to the client as 200 OK.
	w.WriteHeader(resp.StatusCode)
	_, err = io.Copy(w, resp.Body)
	return err
}



// GetHandler serves a lookup of the "key" form value. When the key hashes to
// another shard the request is forwarded there; otherwise the value is read
// from the local database and written to the response together with the
// target shard, the current shard and the lookup error.
func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")

	// Compare as int: the hash result and shardIndex have different integer
	// types, and the shard number is always smaller than the shard count.
	shard := int(s.getShard(key))
	if shard != s.shardIndex {
		// redirect writes the error response itself, so there is nothing
		// left to do here on failure.
		_ = s.redirect(w, r, shard)
		return
	}

	value, err := s.db.GetKey(key)
	fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Get Called\n", key, value, shard, s.shardIndex, err)
}

// SetHandler stores the "value" form value under the "key" form value. When
// the key hashes to another shard the request is forwarded there; otherwise
// the pair is written to the local database and the key, value, target
// shard, current shard and store error are written to the response.
func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	value := r.Form.Get("value")

	// Compare as int: the hash result and shardIndex have different integer
	// types, and the shard number is always smaller than the shard count.
	shard := int(s.getShard(key))
	if shard != s.shardIndex {
		// redirect writes the error response itself, so there is nothing
		// left to do here on failure.
		_ = s.redirect(w, r, shard)
		return
	}

	err := s.db.SetKey(key, []byte(value))
	fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Set Called\n", key, value, shard, s.shardIndex, err)
}

5.测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Smoke test: write keys a, b and c through each of the three local ports,
# then read them back through each port.

# get_keys requests every key from every port.
get_keys() {
	for key in a b c; do
		printf '\n========= get %s=========\n' "$key"
		for port in 8000 8001 8002; do
			curl "http://127.0.0.1:$port/get?key=$key"
		done
	done
}

# set_keys stores every key (with the key itself as value) through every port.
# It is deliberately not called "set", which would shadow the shell builtin.
set_keys() {
	for key in a b c; do
		printf '\n========= set %s=========\n' "$key"
		for port in 8000 8001 8002; do
			curl "http://127.0.0.1:$port/set?key=$key&value=$key"
		done
	done
}

# Write first so the reads have something to return.
set_keys
get_keys

参考资料

本节完整代码:https://github.com/YuriyNasretdinov/distribkv/tree/part2

youtube视频:https://www.youtube.com/watch?v=5VK5tAyZDxQ&list=PLWwSgbaBp9XrMkjEhmTIC37WX2JfwZp7I&index=3

B站视频:https://www.bilibili.com/video/BV1nR4y177YM?p=2