概述
接着上一篇我们讲了网络层,接下来我们来讲LogStore、StableStore、FSM、SnapshotStore
,为啥这些一起讲呢,因为这些都是存储相关,并且功能相对简单,并没有像Transport
那样有NetworkTransport
实现可以用来进行分析。
1
| func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) { ......}
|
StableStore
这个组件主要就是为了安全考虑而存在的,提供一个可以持久化k-v的存储的即可。
1 2 3 4 5 6 7 8 9
| type StableStore interface { Set(key []byte, val []byte) error Get(key []byte) ([]byte, error)
SetUint64(key []byte, val uint64) error GetUint64(key []byte) (uint64, error) }
|
LogStore
这个组件就是为了存日志的,比如用户发过来set a=1
, 将这个存起来即可。和上面的StableStore
能力是同一种,不过日志可能比较多,需要根据实现情况进行选择。项目在测试的时候实现了一个inmem
的日志存储,用了一个map
来进行存储。
1 2 3 4 5 6 7 8 9 10
| func NewInmemStore() *InmemStore { i := &InmemStore{ logs: make(map[uint64]*Log), kv: make(map[string][]byte), kvInt: make(map[string]uint64), } return i }
|
FSM
FMS的功能,②+③不是Raft的核心逻辑。Raft启动时候会有一个后台协程runFSM
负责来做下面的三件事,Main线程将应用日志请求放到fsmMutateCh
里面,将打快照的请求放到fsmSnapshotCh
里面,不阻塞Main
① 应用日志,raft log多数派已经确认,就可以应用到底层存储了。
② 快照
③ 节点恢复
1 2 3 4 5
| type FSM interface { Apply(*Log) interface{} Snapshot() (FSMSnapshot, error) Restore(io.ReadCloser) error }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func (r *Raft) runFSM() { commitSingle := func(req *commitTuple){...} commitBatch := func(reqs []*commitTuple){...} restore := func(req *restoreFuture){...} snapshot := func(req *reqSnapshotFuture){...} for { select { case ptr := <-r.fsmMutateCh: switch req := ptr.(type) { case []*commitTuple: commitBatch(req)
case *restoreFuture: restore(req)
default: panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr)) }
case req := <-r.fsmSnapshotCh: snapshot(req)
case <-r.shutdownCh: return } } }
|
snapshot不是重点,所以先简单介绍下,后续重读的时候补上这一模块。下一篇我们来看最重要的Raft的逻辑。