Moonshine's Blog

日拱一卒无有尽,功不唐捐终入海

hashicorp:raft源码系列(2)--网络层

概述

接着上面–导读,本篇讲解Transport传输层实现。

如下NewRaft函数的参数中,除了Config结构,其他都是以interface形式实现。

接下来几篇我们分别对这5类接口进行详细分析。

1
2
3
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
......
}

传输层涉及哪些文件

  • transport.go: 定义传输层接口,下图中的Transport Interface对象。

  • inmem_transport.go: 以内存的方式实现Transport接口,用于测试。

  • net_transport.go: 网络方式实现Transport接口。

  • tcp_transport.go: 以TCP的方式实现了NetworkTransport需要的SteamLayer。

  • command.go:这个文件里面定义了各种RPC request和response的结构,eg: AppendEntriesRequest、AppendEntriesResponse

    ovlYo6.png

总结,NetworkTransport和InmemTransport是对Transport层的具体实现,接下来我们对NetworkTransport进行详细分析。

传输层具体是怎么实现的呢?

从测试用例TestNetworkTransport_AppendEntries开始

测试用例一般能告诉我们怎么玩,而AppendEntries发送日志又是最常用的功能。因此,我们就从这里开始。

ovW3X4.md.png

  1. 初始化消费者(trans1),这一步后面是要详细看的,所以我们标星。

  2. 启动消费者,监听rpcCh,也就是Consumer()返回的只读channel。🤔:rpc.Respond函数只是给respCh添加了一个元素,那谁来消费这个元素呢?

  3. 初始化生产者( trans2),同样使用newTCPTransport实现。

  4. 调用AppendEntries消息给消费者trans1,这里直接就获取返回了。为啥看着就是同步返回的呢? 按理说网络调用一般都是异步返回吧。这个星标步骤,我们后面来详细分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func TestNetworkTransport_AppendEntries(t *testing.T) {
for _, useAddrProvider := range []bool{true, false} {
// ①初始化trans1 -- 消费者
trans1, err := makeTransport(t, useAddrProvider, "localhost:0")

rpcCh := trans1.Consumer()

// 定义测试使用的RPC请求
args := AppendEntriesRequest{
...
}
resp := AppendEntriesResponse{
...
}

// ② 启动消费者监听
go func() {
select {
case rpc := <-rpcCh:
// 获取消息然后返回,注意Respond函数只是给RPC对象的RespChan添加一个值,那么谁来消费这和chan呢
rpc.Respond(&resp, nil)
...
}
}()

// ③ 初始化生产者 trans2
trans2, err := makeTransport(t, useAddrProvider, string(trans1.LocalAddr()))

// ④ 发送AppendEntries消息给trans1,这里直接就获取返回了。为啥看着就是同步返回的呢
if err := trans2.AppendEntries("id1", trans1.LocalAddr(), &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
}

makeTransport实现分析
  1. 先看下函数调用关系

    newTCPTransport:就是tcp端口绑定,生成TCPStreamLayer(上面的类图中知道是NetworkTransport结构需要的成员)

    NewNetworkTransportWithConfig:创建NetworkTransport对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
       makeTransport
    -> NewTCPTransportWithConfig
    -> newTCPTransport // 负责进行tcp端口绑定,生成TCPStreamLayer,包装器
    -> NewNetworkTransportWithConfig // 真正创建NetworkTransport对象

    2. newTCPTransport

    下面是newTCPTransport的实现,最后transportCreator就是网络层的创建器

    ```go
    // ① 调用net库绑定tcp监听端口
    list, err := net.Listen("tcp", bindAddr)
    // ② 将TCPListener -> TCPStreamLayer(上面的类图中NetworkTransport需要实现StreamLayer接口)
    stream := &TCPStreamLayer{
    advertise: advertise,
    listener: list.(*net.TCPListener),
    }
    // ③ 将上面的 TCPStreamLayer-> NetworkTransport,调用 NewNetworkTransportWithConfig
    trans := transportCreator(stream)
  2. newNetworkTransport

    下面是创建网络层的代码,一个Acceptor专门用来accetor连接,每个新的连接都会创建一个协程进行处理。

    T9BOV1.png

    • Main线程调用trans.listen()启动监听协程

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      // ① 将上面的config注入进来,主要是Stream
      trans := &NetworkTransport{
      connPool: make(map[ServerAddress][]*netConn),
      consumeCh: make(chan RPC),
      logger: config.Logger,
      maxPool: config.MaxPool,
      shutdownCh: make(chan struct{}),
      stream: config.Stream,
      timeout: config.Timeout,
      TimeoutScale: DefaultTimeoutScale,
      serverAddressProvider: config.ServerAddressProvider,
      }
      // ② 设置Stream上下文
      trans.setupStreamContext()
      // ③ 启动一个后台线程,专门来处理连接
      go trans.listen()
    • Acceptor监听新连接,并启动处理协程

      1
      2
      3
      4
      5
      6
      7
      8
      func (n *NetworkTransport) listen() {
      for {
      // ① 接收新连接,这个底层是epoll实现,
      conn, err := n.stream.Accept()
      // ② 启动处理协程
      go n.handleConn(n.getStreamContext(), conn)
      }
      }
    • 对连接进行处理

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
      r := bufio.NewReaderSize(conn, connReceiveBufferSize)
      w := bufio.NewWriter(conn)
      dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
      enc := codec.NewEncoder(w, &codec.MsgpackHandle{})

      for {
      // ① 读数据 -- 解码 -- 处理 -- 编码
      if err := n.handleCommand(r, dec, enc); err != nil {
      ...
      }
      // ② 返回
      if err := w.Flush(); err != nil {
      ...
      }
      }
      }

      handleCommand是核心的处理逻辑,这里通过chan让用户使用的时候感觉是同步的。

      1. 获取类型ReadByte,约定第一位表示的RPC类型,eg:AppendEntries RequestVote

      2. 定义respCh。

      3. 对不同类型的Request进行解码。

      4. 将解码出来的RPC消息放到消费channel中

      5. 等respCh返回处理后的结构

      6. 将结构进行编码。

        下面是和别的协程进行交互的方式,实际上处理逻辑是外包出去进行处理的。

      T9R9Hg.png

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, enc *codec.Encoder) error {
      getTypeStart := time.Now()

      // ① 获取类型,约定第一位表示的RPC类型,eg:AppendEntries RequestVote
      rpcType, err := r.ReadByte()

      // ② 定义respCh
      respCh := make(chan RPCResponse, 1)
      rpc := RPC{
      RespChan: respCh,
      }

      // ③ 对不同类型的Request进行解码
      switch rpcType {
      case rpcAppendEntries:
      var req AppendEntriesRequest
      if err := dec.Decode(&req); err != nil {
      return err
      }
      rpc.Command = &req

      case rpcRequestVote:
      var req RequestVoteRequest
      if err := dec.Decode(&req); err != nil {
      return err
      }
      rpc.Command = &req
      ...
      }

      // ④ 将解码出来的灌到Raft.consumeCh
      select {
      case n.consumeCh <- rpc:
      ...
      }

      // ⑤ 等结果,上面的consumeCh被Raft的runLeader之类的其他协程进行处理后,将结果塞会到respCh。
      RESP:
      select {
      case resp := <-respCh:
      // ⑥ 对结果进行编码
      if err := enc.Encode(resp.Response); err != nil {
      return err
      }
      }
      return nil
      }

AppendEntries实现分析

上面的传输层已经创建好了连接的处理器(消费者),现在需要消息的生产者,AppendEntries就能充当这一角色。

需要网络交互的RequestVote、AppendEntries、TimeoutNow都是调用的genericRPC实现的,genericRPC是真正执行请求的函数。

ovxupF.png

下面是一个genericRPC的流程:

① 从连接池获取连接对象,简单的维护了一个连接池。

② 在连接上发送RPC请求。

③ 解码Response,将conn返还连接池如果可以的话。

注意:这里发送请求之后,同步获取Response。如果是短链接,为了高效就不会直接在这里等结果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (n *NetworkTransport) genericRPC(id ServerID, target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error {
// ① 从连接池中获取连接,NetworkTransport维护了一个连接池
conn, err := n.getConnFromAddressProvider(id, target)

// ② 发送RPC请求
if err = sendRPC(conn, rpcType, args); err != nil {
return err
}

// ③ 解码Response,并且归还连接。注意:这里发送了请求之后,马上就解码Response了。想想我们的mysql客户端,(发消息,等结果),不可以一直发消息而不接收。
canReturn, err := decodeResponse(conn, resp)
if canReturn {
n.returnConn(conn)
}
return err
}

我们对比看看InmemTransport.makeRPC方法来体会其中的不同,这边获取结果需要从respCh读取。那么,这个Response是谁塞进去respCh的呢,毕竟我们只是获取了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
// 定义交互用的RPC对象
req := RPC{
Command: args,
Reader: r,
RespChan: respCh,
}

// 发送消息,直接将构造出来的req灌到peer.consumerCh消费channel,本来就是内存中的对象。
select {
case peer.consumerCh <- req:
...
}

// 等respCh结果,消费协程把RPC拿出来-处理-结果塞回到respCh。
select {
case rpcResp = <-respCh:
if rpcResp.Error != nil {
err = rpcResp.Error
}
....
}

思考:

1、上面的实现是一条一条的发,实际上为了高效,我们往往都是批处理的。

2、想RequestVote要给多个对象发投票消息,那么肯定不会发一个消息等一个结果,而是群发,然后处理结构。

AppendEntriesPipeline实现

同样的我们从测试用例开始,看看批处理是怎么玩的。主要看看批处理和单条处理的区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 初始化AppendEntriesPipeline对象,启动协程处理 inprogressCh -> doneCh
pipeline, err := trans2.AppendEntriesPipeline("id1", trans1.LocalAddr())

// 发10个消息,并且将RPC加入到inprogressCh
for i := 0; i < 10; i++ {
out := new(AppendEntriesResponse)
if _, err := pipeline.AppendEntries(&args, out); err != nil {
t.Fatalf("err: %v", err)
}
}

// Consumer()会返回doneCh,从doneCh获取数据,发了10个消息,所以需要获取10次结果
respCh := pipeline.Consumer()
for i := 0; i < 10; i++ {
select {
case ready := <-respCh:
...
}
}

下面是发消息的时候 非pipeline VS pipeline的区别

TCM7Us.md.png

下面是解码消息的时候 非pipeline VS pipeline的区别,pipeline的decodeResponses方法是初始化AppendEntriesPipeline对象的时候就启动的协程。

TClAFs.md.png

这里有一个❓疑问关于net.Conn的 A->B->A,这个过程在网络上是两边可以同时发送数据吗?为啥上面的非pipeline模式,send之后马上就可以decode消息了,conn这里帮我们做了什么。

批处理的整体实现就是,启动一个协程监控inprogress的任务。然后开发批量的发消息,比如一次发10条,然后inprogress协程被激活,开始处理。将处理的结果放到doneCh,最后用户从doneCh获取消息即可。

RequestVote过程

上面我们分析了一对一pipeline发消息,下面我们来看看1:N,在集群里面给所有人发消息,并且处理结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (r *Raft) electSelf() <-chan *voteResult {
// ① 创建一个包含peers数量的respCh
respCh := make(chan *voteResult, len(r.configurations.latest.Servers))

// ② 并发发消息,然后将结果灌回到respCh。后面从respCh获取投票结果即可进行处理。
askPeer := func(peer Server) {
r.goFunc(func() {
resp := &voteResult{voterID: peer.ID}
err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse)
...
respCh <- resp
})
}

return respCh
}

总结🤔

网络是一个很复杂的模块,后续可以看看比较经典的Redis、Nginx这些优秀组件的实现。

ProxySQL:惊群效应思考