Moonshine's Blog

日拱一卒无有尽,功不唐捐终入海

hashicorp-raft源码系列-4-leader逻辑

前面三篇我们分别介绍了Raft的API、网络层、存储层,从这一篇开始我们来看Raft的Leader、Candidate、Follower实现。

r.runLeader逻辑

事务提交的过程:接着我们一点点分析,

发送请求,Apply方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Apply
// -> ApplyLog
// 写的时候也不需要返回什么,主要就是Check Error

func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: log.Data,
Extensions: log.Extensions,
},
}
logFuture.init()

select {
...
// 这个写法很有意思,到底是什么意思呢?放到applyCh马上返回,这个时候return logFuture不一定有数据的吧,client怎么判断操作是成功还是失败呢。
case r.applyCh <- logFuture:
return logFuture
}
}
runLeader()

接着我们看下leader初始化的时候会干些啥,重点看下初始化的后台处理协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (r *Raft) runLeader() {
// ?? leaderCh这个chan还真的是不知道干啥的,后面有时间在重点看下
overrideNotifyBool(r.leaderCh, true)

// setup leader state,主要是节点的信息
r.setupLeaderState()

// 定义清理逻辑
defer func() {
...
}()

// 重点:为每一个peer启动一个Log复制协程,
r.startStopReplication()
// 调用-> r.goFunc(func() { r.replicate(s) })

// 开始leader循环
r.leaderLoop()
}
Leader消费r.applyCh

将客户端的写凑成一批,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
case newLog := <-r.applyCh:
// Group commit, 组提交
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.config().MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}

// Dispatch the logs
if stepDown {
....
} else {
r.dispatchLogs(ready)
}

接着看看dispatchLogs逻辑

加r.leaderState.inflight list, 表示需要处理的applyLogs

日志持久化 r.logs.StoreLogs(logs),类比mysql 持久化redo log

将自己的log index往前,并且通知commitCh

持久化lastIndex,r.setLastLog(lastIndex, term)

通知每个replication新Log – asyncNotifyCh(f.triggerCh)

接着就是上面的初始化的时候干的事情了,给每个channel发消息

想想哪些是异步的?为啥要异步处理呢?

搞清楚重点

下面的observe是干啥的?

1
r.observe(LeaderObservation{Leader: leader})

client 写请求 :api.go Apply方法,返回一个ApplyFuture对象,包含req、response、error

1
2


–> 发消息 append entity –> 多数派的回复 – > 客户端返回 –> 应用FSM(另一个流程)–> 通知其他节点可以应用 –

client返回的标准是啥?日志多数派 or Master apply成功(不可能)

认真琢磨其中的优化

问题记录

1、chan里面只有一个元素,并且不能并发调用。下面代码的理解,到底是怎么避免并发调用的?还有别的实现吗?这个实现是不是有点trick?

1
2
3
4
5
6
7
8
9
10
11
12
13
func overrideNotifyBool(ch chan bool, v bool) {
select {
case ch <- v:
// value sent, all done
case <-ch:
// channel had an old value
select {
case ch <- v:
default:
panic("race: channel was sent concurrently")
}
}
}

2、leaderLoop这个里面全是异步的chan,代码规划的非常好,很值得学习