Raft是干啥的? 动画说明
简述如下图:用户SET 5,3个不同节点都能获取SET 5的操作。所谓的最简单的分布式一致性。
Raft项目包含哪些模块? 项目有多少行? 分析下整个项目大概是1W多行,并不是很大,代码注释非常齐全,理解起来比较轻松。代码抽象得很好,非常值得学习。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 -> % cloc ./ 77 text files. 77 unique files. 8 files ignored. github.com/AlDanial/cloc v 1.90 T=0.09 s (814.1 files/s, 201614.7 lines/s) ------------------------------------------------------------------------------- Language files blank comment code ------------------------------------------------------------------------------- Go 57 2207 2786 11864 Markdown 4 106 0 255 YAML 4 26 29 141 XML 4 0 0 108 make 1 10 2 33 Bourne Shell 1 3 3 10 ------------------------------------------------------------------------------- SUM: 71 2352 2820 12411 -------------------------------------------------------------------------------
从哪里开始? 首先,我们来看下api.go/Raft
结构,表示的是一个Raft节点,最核心的数据结构。划分3大块
NewRaft节点需要的信息:下图中标红点的,几大核心组成。下一篇我们从网络层开始详细分析。
除了raftState,其他部分全是interface,模块抽象的很细致。
Leader操作相关:整个Raft最核心的就是Leader的状态转换过程。我们后续也会对这一部分的实现做详细说明。
一些异步操作:这一块是嵌入在各个模块的实现中的。项目里面一些chan的使用也是值得学习的。
新建Raft节点逻辑?
① 校验配置
② 获取当前任期,从StableStore中获取
③ 获取最近的日志号和具体Log,从LogStore中获取
④ 创建Buffer applyCh,用来应用Log到FSM。这里Future的应用可以学习下,后面写applyLog的时候可以详细分析。
⑤ 初始化Raft结构。
⑥ Set各种变量,这里可以研究下config是怎么实现线程安全的变更。
⑦ 尝试从备份中恢复,处理peer变更日志,eg:添加节点、删除节点。这里为啥只处理Peer变更信息,都已经将Log解析出来了,为啥不直接应用呢?
⑧ 给传输层注册心跳处理器。在这里初始化,而不是在goroutine里面处理,是为了避免队头阻塞。
⑨ 启动goroutine,如下启动了3个协程。
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 func NewRaft (conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) { if err := ValidateConfig(conf); err != nil { return nil , err } currentTerm, err := stable.GetUint64(keyCurrentTerm) lastIndex, err := logs.LastIndex() if err = logs.GetLog(lastIndex, &lastLog); err != nil { return nil , fmt.Errorf("failed to get last log at index %d: %v" , lastIndex, err) } applyCh := make (chan *logFuture) if conf.BatchApplyCh { applyCh = make (chan *logFuture, conf.MaxAppendEntries) } r := &Raft{ ... } r.conf.Store(*conf) r.setState(Follower) r.setCurrentTerm(currentTerm) r.setLastLog(lastLog.Index, lastLog.Term) if err := r.restoreSnapshot(); err != nil { return nil , err } snapshotIndex, _ := r.getLastSnapshot() for index := snapshotIndex + 1 ; index <= lastLog.Index; index++ { var entry Log if err := r.logs.GetLog(index, &entry); err != nil { ... } if err := r.processConfigurationLogEntry(&entry); err != nil { return nil , err } } trans.SetHeartbeatHandler(r.processHeartbeat) r.goFunc(r.run) r.goFunc(r.runFSM) r.goFunc(r.runSnapshots) return r, nil }
至此,我们就将Raft节点启动起来了
技术点1:atomic.Value
1 2 3 4 conf atomic.Value r.conf.Store(*conf)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func TestAtomic (t *testing.T) { var wg sync.WaitGroup var gloMy = my{} for i := 0 ; i < 8 ; i++ { go func () { wg.Add(1 ) for { tmp := rand.Uint64() % 100 gloMy.v1 = tmp gloMy.v2 = fmt.Sprintf("s%d" , tmp) } wg.Done() }() } wg.Add(1 ) go func () { for { t.Log(gloMy.v1, gloMy.v2) } wg.Done() }() wg.Wait() } === RUN TestAtomic tmp_test.go :33 : 75 s64 tmp_test.go :33 : 63 s63 tmp_test.go :33 : 48 s48 tmp_test.go :33 : 5 s5 tmp_test.go :33 : 1 s1
推荐一篇文章 :文中将atomic的来龙去脉都解释了一遍。
技术点2:队头阻塞 1 2 trans.SetHeartbeatHandler(r.processHeartbeat)
小小疑问❓ Q: 为啥要叫FSM,我的理解FSM不就是类似于MySQL这种可以应用Log的地方吗?总感觉用状态机命名怪怪的,容易误会。
Q: Config 和 Configuration的区别? Config里面都是单个Raft节点的配置,比如超时之类的;Configuration是Raft Cluster相关的信息,比如有几个节点之类的。