比特币全节点Go语言实现BTCD之网络连接过程

释放双眼,带上耳机,听听看~!

启动server:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1// Start begins accepting connections from peers.
2func (s *server) Start() {
3   ....
4   go s.peerHandler()
5
6   if !cfg.DisableRPC {
7      s.wg.Add(1)
8
9      go s.rebroadcastHandler()
10
11      s.rpcServer.Start()
12   }
13
14   // Start the CPU miner if generation is enabled.
15   if cfg.Generate {
16      s.cpuMiner.Start()
17   }
18}
19

peerHandler负责启动节点的地址管理、同步管理、连接管理等,主要代码逻辑:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
1func (s *server) peerHandler() {
2
3   s.addrManager.Start()
4   s.syncManager.Start()
5
6   srvrLog.Tracef("Starting peer handler")
7
8   state := &peerState{
9      inboundPeers:    make(map[int32]*serverPeer),
10      persistentPeers: make(map[int32]*serverPeer),
11      outboundPeers:   make(map[int32]*serverPeer),
12      banned:          make(map[string]time.Time),
13      outboundGroups:  make(map[string]int),
14   }
15
16   if !cfg.DisableDNSSeed {
17      connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices, btcdLookup, func(addrs []*wire.NetAddress) {
18            s.addrManager.AddAddresses(addrs, addrs[0])
19         })
20   }
21   go s.connManager.Start()
22
23out:
24   for {
25      select {
26      // New peers connected to the server.
27      case p := <-s.newPeers:
28         s.handleAddPeerMsg(state, p)
29
30      // Disconnected peers.
31      case p := <-s.donePeers:
32         s.handleDonePeerMsg(state, p)
33
34      // Block accepted in mainchain or orphan, update peer height.
35      case umsg := <-s.peerHeightsUpdate:
36         s.handleUpdatePeerHeights(state, umsg)
37
38      // Peer to ban.
39      case p := <-s.banPeers:
40         s.handleBanPeerMsg(state, p)
41
42      // New inventory to potentially be relayed to other peers.
43      case invMsg := <-s.relayInv:
44         s.handleRelayInvMsg(state, invMsg)
45
46      // Message to broadcast to all connected peers except those
47      // which are excluded by the message.
48      case bmsg := <-s.broadcast:
49         s.handleBroadcastMsg(state, &bmsg)
50
51      case qmsg := <-s.query:
52         s.handleQuery(state, qmsg)
53
54      case <-s.quit:
55         // Disconnect all peers on server shutdown.
56         state.forAllPeers(func(sp *serverPeer) {
57            srvrLog.Tracef("Shutdown peer %s", sp)
58            sp.Disconnect()
59         })
60         break out
61      }
62   }
63.............
64}
65

1.先执行节点地址管理


1
2
1s.addrManager.Start()
2

主要逻辑:


1
2
3
4
5
6
7
1func (a *AddrManager) Start() {
2.......
3   a.loadPeers() //从文件加载节点列表
4.......
5   go a.addressHandler()
6}
7

1
2
1go a.addressHandler()
2

定时保存地址列表到peer.json文件

2.启动同步管理


1
2
1s.syncManager.Start()
2

主要逻辑是调用blockHandler()方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
1func (sm *SyncManager) blockHandler() {
2out:
3   for {
4      select {
5      case m := <-sm.msgChan:
6         switch msg := m.(type) {
7         case *newPeerMsg:
8            sm.handleNewPeerMsg(msg.peer)
9
10         case *txMsg:
11            sm.handleTxMsg(msg)
12            msg.reply <- struct{}{}
13
14         case *blockMsg:
15            sm.handleBlockMsg(msg)
16            msg.reply <- struct{}{}
17
18         case *invMsg:
19            sm.handleInvMsg(msg)
20
21         case *headersMsg:
22            sm.handleHeadersMsg(msg)
23
24         case *donePeerMsg:
25            sm.handleDonePeerMsg(msg.peer)
26
27         case getSyncPeerMsg:
28            var peerID int32
29            if sm.syncPeer != nil {
30               peerID = sm.syncPeer.ID()
31            }
32            msg.reply <- peerID
33
34         case processBlockMsg:
35            _, isOrphan, err := sm.chain.ProcessBlock(
36               msg.block, msg.flags)
37            if err != nil {
38               msg.reply <- processBlockResponse{
39                  isOrphan: false,
40                  err:      err,
41               }
42            }
43
44            msg.reply <- processBlockResponse{
45               isOrphan: isOrphan,
46               err:      nil,
47            }
48
49         case isCurrentMsg:
50            msg.reply <- sm.current()
51
52         case pauseMsg:
53            // Wait until the sender unpauses the manager.
54            <-msg.unpause
55
56         default:
57            log.Warnf("Invalid message type in block "+
58               "handler: %T", msg)
59         }
60
61      case <-sm.quit:
62         break out
63      }
64   }
65
66   sm.wg.Done()
67   log.Trace("Block handler done")
68}
69

从通道case m := <-sm.msgChan:处理块消息,然后分发处理。

3.解析dns seed


1
2
3
1connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices,
2   btcdLookup, func(addrs []*wire.NetAddress) {
3

通过dns seed获得节点地址,将返回的节点地址添加到addrManager

4.连接管理


1
2
1go s.connManager.Start()
2

启动连接管理,主要逻辑:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1func (cm *ConnManager) Start() {
2   ......
3   go cm.connHandler()
4
5   if cm.cfg.OnAccept != nil {
6      for _, listner := range cm.cfg.Listeners {
7         cm.wg.Add(1)
8         go cm.listenHandler(listner)
9      }
10   }
11
12   for i := atomic.LoadUint64(&amp;cm.connReqCount); i &lt; uint64(cm.cfg.TargetOutbound); i++ {
13      go cm.NewConnReq()
14   }
15}
16

1
2
1go cm.connHandler()
2

从cm.requests通道获得不同类型消息


1
2
1go cm.listenHandler(listner)
2

1
2
1此方法会调用
2

**go **cm.cfg.OnAccept(conn)

OnAccept即server的inboundPeerConnected


1
2
3
4
5
6
7
8
1func (s *server) inboundPeerConnected(conn net.Conn) {
2   sp := newServerPeer(s, false)
3   sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
4   sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
5   sp.AssociateConnection(conn)
6   go s.peerDoneHandler(sp)
7}
8

1
2
1sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
2

对peer进行初始配置,配置如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1// newPeerConfig returns the configuration for the given serverPeer.
2func newPeerConfig(sp *serverPeer) *peer.Config {
3   return &amp;peer.Config{
4      Listeners: peer.MessageListeners{
5         OnVersion:      sp.OnVersion,
6         OnMemPool:      sp.OnMemPool,
7         OnTx:           sp.OnTx,
8         OnBlock:        sp.OnBlock,
9         OnInv:          sp.OnInv,
10         OnHeaders:      sp.OnHeaders,
11         OnGetData:      sp.OnGetData,
12         OnGetBlocks:    sp.OnGetBlocks,
13         OnGetHeaders:   sp.OnGetHeaders,
14         OnGetCFilters:  sp.OnGetCFilters,
15         OnGetCFHeaders: sp.OnGetCFHeaders,
16         OnGetCFCheckpt: sp.OnGetCFCheckpt,
17         OnFeeFilter:    sp.OnFeeFilter,
18         OnFilterAdd:    sp.OnFilterAdd,
19         OnFilterClear:  sp.OnFilterClear,
20         OnFilterLoad:   sp.OnFilterLoad,
21         OnGetAddr:      sp.OnGetAddr,
22         OnAddr:         sp.OnAddr,
23         OnRead:         sp.OnRead,
24         OnWrite:        sp.OnWrite,
25
26         // Note: The reference client currently bans peers that send alerts
27         // not signed with its key.  We could verify against their key, but
28         // since the reference client is currently unwilling to support
29         // other implementations&#x27; alert messages, we will not relay theirs.
30         OnAlert: nil,
31      },
32      NewestBlock:       sp.newestBlock,
33      HostToNetAddress:  sp.server.addrManager.HostToNetAddress,
34      Proxy:             cfg.Proxy,
35      UserAgentName:     userAgentName,
36      UserAgentVersion:  userAgentVersion,
37      UserAgentComments: cfg.UserAgentComments,
38      ChainParams:       sp.server.chainParams,
39      Services:          sp.server.services,
40      DisableRelayTx:    cfg.BlocksOnly,
41      ProtocolVersion:   peer.MaxProtocolVersion,
42   }
43}
44

1
2
1以上On\*\*\*会在收到消息时处理。
2

handleConnected消息会调用


1
2
1go cm.cfg.OnConnection(connReq, msg.conn)
2

cm.cfg.OnConnection是在newserver时初始化的,即方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
2   sp := newServerPeer(s, c.Permanent)
3   p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
4   if err != nil {
5      srvrLog.Debugf(&quot;Cannot create outbound peer %s: %v&quot;, c.Addr, err)
6      s.connManager.Disconnect(c.ID())
7   }
8   sp.Peer = p
9   sp.connReq = c
10   sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
11   sp.AssociateConnection(conn)
12   go s.peerDoneHandler(sp)
13   s.addrManager.Attempt(sp.NA())
14}
15

1
2
1sp.AssociateConnection(conn)
2

将sp(ServerPeer)对象与节点关联,并启动节点,主要逻辑:


1
2
3
4
5
6
7
8
9
10
11
12
13
1func (p *Peer) AssociateConnection(conn net.Conn) {
2   ....
3   p.conn = conn
4   p.timeConnected = time.Now()
5......
6   go func() {
7      if err := p.start(); err != nil {
8         log.Debugf(&quot;Cannot start peer %v: %v&quot;, p, err)
9         p.Disconnect()
10      }
11   }()
12}
13

异步启动节点p.start(),


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1func (p *Peer) start() error {
2   log.Tracef(&quot;Starting peer %s&quot;, p)
3
4   negotiateErr := make(chan error)
5   go func() {
6      if p.inbound {
7         negotiateErr &lt;- p.negotiateInboundProtocol()
8      } else {
9         negotiateErr &lt;- p.negotiateOutboundProtocol()
10      }
11   }()
12
13   // Negotiate the protocol within the specified negotiateTimeout.
14   select {
15   case err := &lt;-negotiateErr:
16      if err != nil {
17         return err
18      }
19   case &lt;-time.After(negotiateTimeout):
20      return errors.New(&quot;protocol negotiation timeout&quot;)
21   }
22   log.Debugf(&quot;Connected to %s&quot;, p.Addr())
23
24   // The protocol has been negotiated successfully so start processing input
25   // and output messages.
26   go p.stallHandler()
27   go p.inHandler()
28   go p.queueHandler()
29   go p.outHandler()
30   go p.pingHandler()
31
32   // Send our verack message now that the IO processing machinery has started.
33   p.QueueMessage(wire.NewMsgVerAck(), nil)
34   return nil
35}
36

1
2
1go p.stallHandler()
2

节点是否失联的处理方法,维持一个deadline时间,每次连接请求会增大deadline时间


1
2
1go p.inHandler()
2

接受消息的处理方法,收到消息会调用p.cfg.Listeners.*****(p, msg)方法


1
2
1go p.queueHandler()
2

维护发送队列,从outputQueue通道读取放到sendQueue通道


1
2
1go p.outHandler()
2

1
2
1此方法负责从sendQueue通道读取消息,然后发送消息p.writeMessage(msg.msg, msg.encoding)
2

1
2
1go p.pingHandler()
2

1
2
1定时发送ping消息到outputQueue通道
2

本文作者:architect.bian,欢迎收藏,转载请保留原文地址并保留版权声明!谢谢~
还没完!往下看!!!

给TA打赏
共{{data.count}}人
人已打赏
安全技术

c++编码规范

2022-1-11 12:36:11

安全资讯

腾讯股票全年涨24.7% 马化腾财富因此增加近250亿

2016-12-30 17:52:48

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索