Moonshine's Blog

日拱一卒无有尽,功不唐捐终入海

hashicorp:raft源码系列(1)--导读

Raft是干啥的?

动画说明

简述如下图:用户SET 5,3个不同节点都能获取SET 5的操作。所谓的最简单的分布式一致性。

oO3r0P.png

Raft项目包含哪些模块?

项目有多少行?

分析下整个项目大概是1W多行,并不是很大,代码注释非常齐全,理解起来比较轻松。代码抽象得很好,非常值得学习。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-> % cloc ./
77 text files.
77 unique files.
8 files ignored.

github.com/AlDanial/cloc v 1.90 T=0.09 s (814.1 files/s, 201614.7 lines/s)
-------------------------------------------------------------------------------
Language files blank comment code
-------------------------------------------------------------------------------
Go 57 2207 2786 11864
Markdown 4 106 0 255
YAML 4 26 29 141
XML 4 0 0 108
make 1 10 2 33
Bourne Shell 1 3 3 10
-------------------------------------------------------------------------------
SUM: 71 2352 2820 12411
-------------------------------------------------------------------------------

从哪里开始?

首先,我们来看下api.go/Raft结构,表示的是一个Raft节点,最核心的数据结构。划分3大块

  1. NewRaft节点需要的信息:下图中标红点的,几大核心组成。下一篇我们从网络层开始详细分析。

    除了raftState,其他部分全是interface,模块抽象的很细致。

  2. Leader操作相关:整个Raft最核心的就是Leader的状态转换过程。我们后续也会对这一部分的实现做详细说明。

  3. 一些异步操作:这一块是嵌入在各个模块的实现中的。项目里面一些chan的使用也是值得学习的。

oO0Ein.png

新建Raft节点逻辑?

  • ① 校验配置

  • ② 获取当前任期,从StableStore中获取

  • ③ 获取最近的日志号和具体Log,从LogStore中获取

  • ④ 创建Buffer applyCh,用来应用Log到FSM。这里Future的应用可以学习下,后面写applyLog的时候可以详细分析。

  • ⑤ 初始化Raft结构。

  • ⑥ Set各种变量,这里可以研究下config是怎么实现线程安全的变更。

  • ⑦ 尝试从备份中恢复,处理peer变更日志,eg:添加节点、删除节点。这里为啥只处理Peer变更信息,都已经将Log解析出来了,为啥不直接应用呢?

  • ⑧ 给传输层注册心跳处理器。在这里初始化,而不是在goroutine里面处理,是为了避免队头阻塞。

  • ⑨ 启动goroutine,如下启动了3个协程。

    oOhDud.png

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
// ① 校验配置
if err := ValidateConfig(conf); err != nil {
return nil, err
}

// ② 获取当前任务,这里是从StableStore中获取
currentTerm, err := stable.GetUint64(keyCurrentTerm)

// ③ 获取最近的日志号和具体Log,从LogStore中获取;
lastIndex, err := logs.LastIndex()
if err = logs.GetLog(lastIndex, &lastLog); err != nil {
return nil, fmt.Errorf("failed to get last log at index %d: %v", lastIndex, err)
}

// ④ 创建Buffer applyCh,用来应用Log到FSM。这里Future的应用可以学习下。
applyCh := make(chan *logFuture)
if conf.BatchApplyCh {
applyCh = make(chan *logFuture, conf.MaxAppendEntries)
}

// ⑤ 初始化Raft结构。
r := &Raft{
...
}

// ⑥ Set各种变量,下面这一行的实现可以研究下,为啥不直接r.conf = conf
r.conf.Store(*conf)
r.setState(Follower)
r.setCurrentTerm(currentTerm)
r.setLastLog(lastLog.Index, lastLog.Term)

// ⑦ 尝试从备份中恢复,处理peer变更日志,eg:添加节点、删除节点.
if err := r.restoreSnapshot(); err != nil {
return nil, err
}
snapshotIndex, _ := r.getLastSnapshot()
for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
var entry Log
if err := r.logs.GetLog(index, &entry); err != nil {
...
}
if err := r.processConfigurationLogEntry(&entry); err != nil {
return nil, err
}
}

// ⑧ 给传输层注册心跳处理器
trans.SetHeartbeatHandler(r.processHeartbeat)

// ⑨ 启动goroutine
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
return r, nil
}

至此,我们就将Raft节点启动起来了

技术点1:atomic.Value

atomic.Value Store 流程

1
2
3
4
// conf这个变量会涉及到多个goroutine的并发修改&读取。
// 为了保证读写的原子性,通常我们会加锁。但是加锁又有点过重了,为了更加高效的实现,使用了atomic。
conf atomic.Value
r.conf.Store(*conf)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 8个写routine + 1个读routine
func TestAtomic(t *testing.T) {
var wg sync.WaitGroup
var gloMy = my{}
for i := 0; i < 8; i++ {
go func() {
wg.Add(1)
for {
tmp := rand.Uint64() % 100
gloMy.v1 = tmp
// 在这两次操作之间就会产生中间状态,这个状态就很奇怪没有意义
gloMy.v2 = fmt.Sprintf("s%d", tmp)
}
wg.Done()
}()
}

wg.Add(1)
go func() {
for {
t.Log(gloMy.v1, gloMy.v2)
}
wg.Done()
}()

wg.Wait()
}


=== RUN TestAtomic
tmp_test.go:33: 75 s64 // 不合符预期,脏读了
tmp_test.go:33: 63 s63
tmp_test.go:33: 48 s48
tmp_test.go:33: 5 s5
tmp_test.go:33: 1 s1

推荐一篇文章 :文中将atomic的来龙去脉都解释了一遍。

技术点2:队头阻塞

1
2
// 这行的位置,在goroutine启动之前注册HeartbeatHandler处理器
trans.SetHeartbeatHandler(r.processHeartbeat)

小小疑问❓

Q: 为啥要叫FSM,我的理解FSM不就是类似于MySQL这种可以应用Log的地方吗?总感觉用状态机命名怪怪的,容易误会。

Q: Config 和 Configuration的区别? Config里面都是单个Raft节点的配置,比如超时之类的;Configuration是Raft Cluster相关的信息,比如有几个节点之类的。