概述
接着上面–导读,本篇讲解Transport传输层实现。
如下NewRaft函数的参数中,除了Config结构,其他都是以interface形式实现。
接下来几篇我们分别对这5类接口进行详细分析。
1 | func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) { |
传输层涉及哪些文件
transport.go: 定义传输层接口,下图中的Transport Interface对象。
inmem_transport.go: 以内存的方式实现Transport接口,用于测试。
net_transport.go: 网络方式实现Transport接口。
tcp_transport.go: 以TCP的方式实现了NetworkTransport需要的SteamLayer。
command.go:这个文件里面定义了各种RPC request和response的结构,eg: AppendEntriesRequest、AppendEntriesResponse
总结,NetworkTransport和InmemTransport是对Transport层的具体实现,接下来我们对NetworkTransport进行详细分析。
传输层具体是怎么实现的呢?
从测试用例TestNetworkTransport_AppendEntries开始
测试用例一般能告诉我们怎么玩,而AppendEntries发送日志又是最常用的功能。因此,我们就从这里开始。
初始化消费者(trans1),这一步后面是要详细看的,所以我们标星。
启动消费者,监听rpcCh,也就是Consumer()返回的只读channel。🤔:rpc.Respond函数只是给respCh添加了一个元素,那谁来消费这个元素呢?
初始化生产者( trans2),同样使用newTCPTransport实现。
调用AppendEntries消息给消费者trans1,这里直接就获取返回了。为啥看着就是同步返回的呢? 按理说网络调用一般都是异步返回吧。这个星标步骤,我们后面来详细分析。
1 | func TestNetworkTransport_AppendEntries(t *testing.T) { |
makeTransport实现分析
先看下函数调用关系
newTCPTransport:就是tcp端口绑定,生成TCPStreamLayer(上面的类图中知道是NetworkTransport结构需要的成员)
NewNetworkTransportWithConfig:创建NetworkTransport对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19makeTransport
-> NewTCPTransportWithConfig
-> newTCPTransport // 负责进行tcp端口绑定,生成TCPStreamLayer,包装器
-> NewNetworkTransportWithConfig // 真正创建NetworkTransport对象
2. newTCPTransport
下面是newTCPTransport的实现,最后transportCreator就是网络层的创建器
```go
// ① 调用net库绑定tcp监听端口
list, err := net.Listen("tcp", bindAddr)
// ② 将TCPListener -> TCPStreamLayer(上面的类图中NetworkTransport需要实现StreamLayer接口)
stream := &TCPStreamLayer{
advertise: advertise,
listener: list.(*net.TCPListener),
}
// ③ 将上面的 TCPStreamLayer-> NetworkTransport,调用 NewNetworkTransportWithConfig
trans := transportCreator(stream)newNetworkTransport
下面是创建网络层的代码,一个Acceptor专门用来accetor连接,每个新的连接都会创建一个协程进行处理。
Main线程调用trans.listen()启动监听协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// ① 将上面的config注入进来,主要是Stream
trans := &NetworkTransport{
connPool: make(map[ServerAddress][]*netConn),
consumeCh: make(chan RPC),
logger: config.Logger,
maxPool: config.MaxPool,
shutdownCh: make(chan struct{}),
stream: config.Stream,
timeout: config.Timeout,
TimeoutScale: DefaultTimeoutScale,
serverAddressProvider: config.ServerAddressProvider,
}
// ② 设置Stream上下文
trans.setupStreamContext()
// ③ 启动一个后台线程,专门来处理连接
go trans.listen()Acceptor监听新连接,并启动处理协程
1
2
3
4
5
6
7
8func (n *NetworkTransport) listen() {
for {
// ① 接收新连接,这个底层是epoll实现,
conn, err := n.stream.Accept()
// ② 启动处理协程
go n.handleConn(n.getStreamContext(), conn)
}
}对连接进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
r := bufio.NewReaderSize(conn, connReceiveBufferSize)
w := bufio.NewWriter(conn)
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
enc := codec.NewEncoder(w, &codec.MsgpackHandle{})
for {
// ① 读数据 -- 解码 -- 处理 -- 编码
if err := n.handleCommand(r, dec, enc); err != nil {
...
}
// ② 返回
if err := w.Flush(); err != nil {
...
}
}
}handleCommand是核心的处理逻辑,这里通过chan让用户使用的时候感觉是同步的。
获取类型ReadByte,约定第一位表示的RPC类型,eg:AppendEntries RequestVote
定义respCh。
对不同类型的Request进行解码。
将解码出来的RPC消息放到消费channel中
等respCh返回处理后的结构
将结构进行编码。
下面是和别的协程进行交互的方式,实际上处理逻辑是外包出去进行处理的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, enc *codec.Encoder) error {
getTypeStart := time.Now()
// ① 获取类型,约定第一位表示的RPC类型,eg:AppendEntries RequestVote
rpcType, err := r.ReadByte()
// ② 定义respCh
respCh := make(chan RPCResponse, 1)
rpc := RPC{
RespChan: respCh,
}
// ③ 对不同类型的Request进行解码
switch rpcType {
case rpcAppendEntries:
var req AppendEntriesRequest
if err := dec.Decode(&req); err != nil {
return err
}
rpc.Command = &req
case rpcRequestVote:
var req RequestVoteRequest
if err := dec.Decode(&req); err != nil {
return err
}
rpc.Command = &req
...
}
// ④ 将解码出来的灌到Raft.consumeCh
select {
case n.consumeCh <- rpc:
...
}
// ⑤ 等结果,上面的consumeCh被Raft的runLeader之类的其他协程进行处理后,将结果塞会到respCh。
RESP:
select {
case resp := <-respCh:
// ⑥ 对结果进行编码
if err := enc.Encode(resp.Response); err != nil {
return err
}
}
return nil
}
AppendEntries实现分析
上面的传输层已经创建好了连接的处理器(消费者),现在需要消息的生产者,AppendEntries就能充当这一角色。
需要网络交互的RequestVote、AppendEntries、TimeoutNow都是调用的genericRPC实现的,genericRPC是真正执行请求的函数。
下面是一个genericRPC的流程:
① 从连接池获取连接对象,简单的维护了一个连接池。
② 在连接上发送RPC请求。
③ 解码Response,将conn返还连接池如果可以的话。
注意:这里发送请求之后,同步获取Response。如果是短链接,为了高效就不会直接在这里等结果了。
1 | func (n *NetworkTransport) genericRPC(id ServerID, target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { |
我们对比看看InmemTransport.makeRPC方法来体会其中的不同,这边获取结果需要从respCh读取。那么,这个Response是谁塞进去respCh的呢,毕竟我们只是获取了。
1 | func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) { |
思考:
1、上面的实现是一条一条的发,实际上为了高效,我们往往都是批处理的。
2、想RequestVote要给多个对象发投票消息,那么肯定不会发一个消息等一个结果,而是群发,然后处理结构。
AppendEntriesPipeline实现
同样的我们从测试用例开始,看看批处理是怎么玩的。主要看看批处理和单条处理的区别。
1 | // 初始化AppendEntriesPipeline对象,启动协程处理 inprogressCh -> doneCh |
下面是发消息的时候 非pipeline VS pipeline的区别
下面是解码消息的时候 非pipeline VS pipeline的区别,pipeline的decodeResponses方法是初始化AppendEntriesPipeline对象的时候就启动的协程。
这里有一个❓疑问关于net.Conn的 A->B->A,这个过程在网络上是两边可以同时发送数据吗?为啥上面的非pipeline模式,send之后马上就可以decode消息了,conn这里帮我们做了什么。
批处理的整体实现就是,启动一个协程监控inprogress的任务。然后开发批量的发消息,比如一次发10条,然后inprogress协程被激活,开始处理。将处理的结果放到doneCh,最后用户从doneCh获取消息即可。
RequestVote过程
上面我们分析了一对一pipeline发消息,下面我们来看看1:N,在集群里面给所有人发消息,并且处理结构
1 | func (r *Raft) electSelf() <-chan *voteResult { |
总结🤔
网络是一个很复杂的模块,后续可以看看比较经典的Redis、Nginx这些优秀组件的实现。
ProxySQL:惊群效应思考