启动server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 1// Start begins accepting connections from peers.
2func (s *server) Start() {
3 ....
4 go s.peerHandler()
5
6 if !cfg.DisableRPC {
7 s.wg.Add(1)
8
9 go s.rebroadcastHandler()
10
11 s.rpcServer.Start()
12 }
13
14 // Start the CPU miner if generation is enabled.
15 if cfg.Generate {
16 s.cpuMiner.Start()
17 }
18}
19 |
peerHandler负责启动节点的地址管理、同步管理、连接管理等,主要代码逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| 1func (s *server) peerHandler() {
2
3 s.addrManager.Start()
4 s.syncManager.Start()
5
6 srvrLog.Tracef("Starting peer handler")
7
8 state := &peerState{
9 inboundPeers: make(map[int32]*serverPeer),
10 persistentPeers: make(map[int32]*serverPeer),
11 outboundPeers: make(map[int32]*serverPeer),
12 banned: make(map[string]time.Time),
13 outboundGroups: make(map[string]int),
14 }
15
16 if !cfg.DisableDNSSeed {
17 connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices, btcdLookup, func(addrs []*wire.NetAddress) {
18 s.addrManager.AddAddresses(addrs, addrs[0])
19 })
20 }
21 go s.connManager.Start()
22
23out:
24 for {
25 select {
26 // New peers connected to the server.
27 case p := <-s.newPeers:
28 s.handleAddPeerMsg(state, p)
29
30 // Disconnected peers.
31 case p := <-s.donePeers:
32 s.handleDonePeerMsg(state, p)
33
34 // Block accepted in mainchain or orphan, update peer height.
35 case umsg := <-s.peerHeightsUpdate:
36 s.handleUpdatePeerHeights(state, umsg)
37
38 // Peer to ban.
39 case p := <-s.banPeers:
40 s.handleBanPeerMsg(state, p)
41
42 // New inventory to potentially be relayed to other peers.
43 case invMsg := <-s.relayInv:
44 s.handleRelayInvMsg(state, invMsg)
45
46 // Message to broadcast to all connected peers except those
47 // which are excluded by the message.
48 case bmsg := <-s.broadcast:
49 s.handleBroadcastMsg(state, &bmsg)
50
51 case qmsg := <-s.query:
52 s.handleQuery(state, qmsg)
53
54 case <-s.quit:
55 // Disconnect all peers on server shutdown.
56 state.forAllPeers(func(sp *serverPeer) {
57 srvrLog.Tracef("Shutdown peer %s", sp)
58 sp.Disconnect()
59 })
60 break out
61 }
62 }
63.............
64}
65 |
1.先执行节点地址管理
1 2
| 1s.addrManager.Start()
2 |
主要逻辑:
1 2 3 4 5 6 7
| 1func (a *AddrManager) Start() {
2.......
3 a.loadPeers() //从文件加载节点列表
4.......
5 go a.addressHandler()
6}
7 |
1 2
| 1go a.addressHandler()
2 |
定时保存地址列表到peer.json文件
2.启动同步管理
1 2
| 1s.syncManager.Start()
2 |
主要逻辑是调用blockHandler()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| 1func (sm *SyncManager) blockHandler() {
2out:
3 for {
4 select {
5 case m := <-sm.msgChan:
6 switch msg := m.(type) {
7 case *newPeerMsg:
8 sm.handleNewPeerMsg(msg.peer)
9
10 case *txMsg:
11 sm.handleTxMsg(msg)
12 msg.reply <- struct{}{}
13
14 case *blockMsg:
15 sm.handleBlockMsg(msg)
16 msg.reply <- struct{}{}
17
18 case *invMsg:
19 sm.handleInvMsg(msg)
20
21 case *headersMsg:
22 sm.handleHeadersMsg(msg)
23
24 case *donePeerMsg:
25 sm.handleDonePeerMsg(msg.peer)
26
27 case getSyncPeerMsg:
28 var peerID int32
29 if sm.syncPeer != nil {
30 peerID = sm.syncPeer.ID()
31 }
32 msg.reply <- peerID
33
34 case processBlockMsg:
35 _, isOrphan, err := sm.chain.ProcessBlock(
36 msg.block, msg.flags)
37 if err != nil {
38 msg.reply <- processBlockResponse{
39 isOrphan: false,
40 err: err,
41 }
42 }
43
44 msg.reply <- processBlockResponse{
45 isOrphan: isOrphan,
46 err: nil,
47 }
48
49 case isCurrentMsg:
50 msg.reply <- sm.current()
51
52 case pauseMsg:
53 // Wait until the sender unpauses the manager.
54 <-msg.unpause
55
56 default:
57 log.Warnf("Invalid message type in block "+
58 "handler: %T", msg)
59 }
60
61 case <-sm.quit:
62 break out
63 }
64 }
65
66 sm.wg.Done()
67 log.Trace("Block handler done")
68}
69 |
从通道case m := <-sm.msgChan:处理块消息,然后分发处理。
3.解析dns seed
1 2 3
| 1connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices,
2 btcdLookup, func(addrs []*wire.NetAddress) {
3 |
通过dns seed获得节点地址,将返回的节点地址添加到addrManager
4.连接管理
1 2
| 1go s.connManager.Start()
2 |
启动连接管理,主要逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| 1func (cm *ConnManager) Start() {
2 ......
3 go cm.connHandler()
4
5 if cm.cfg.OnAccept != nil {
6 for _, listner := range cm.cfg.Listeners {
7 cm.wg.Add(1)
8 go cm.listenHandler(listner)
9 }
10 }
11
12 for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
13 go cm.NewConnReq()
14 }
15}
16 |
从cm.requests通道获得不同类型消息
1 2
| 1go cm.listenHandler(listner)
2 |
**go **cm.cfg.OnAccept(conn)
OnAccept即server的inboundPeerConnected
1 2 3 4 5 6 7 8
| 1func (s *server) inboundPeerConnected(conn net.Conn) {
2 sp := newServerPeer(s, false)
3 sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
4 sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
5 sp.AssociateConnection(conn)
6 go s.peerDoneHandler(sp)
7}
8 |
1 2
| 1sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
2 |
对peer进行初始配置,配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| 1// newPeerConfig returns the configuration for the given serverPeer.
2func newPeerConfig(sp *serverPeer) *peer.Config {
3 return &peer.Config{
4 Listeners: peer.MessageListeners{
5 OnVersion: sp.OnVersion,
6 OnMemPool: sp.OnMemPool,
7 OnTx: sp.OnTx,
8 OnBlock: sp.OnBlock,
9 OnInv: sp.OnInv,
10 OnHeaders: sp.OnHeaders,
11 OnGetData: sp.OnGetData,
12 OnGetBlocks: sp.OnGetBlocks,
13 OnGetHeaders: sp.OnGetHeaders,
14 OnGetCFilters: sp.OnGetCFilters,
15 OnGetCFHeaders: sp.OnGetCFHeaders,
16 OnGetCFCheckpt: sp.OnGetCFCheckpt,
17 OnFeeFilter: sp.OnFeeFilter,
18 OnFilterAdd: sp.OnFilterAdd,
19 OnFilterClear: sp.OnFilterClear,
20 OnFilterLoad: sp.OnFilterLoad,
21 OnGetAddr: sp.OnGetAddr,
22 OnAddr: sp.OnAddr,
23 OnRead: sp.OnRead,
24 OnWrite: sp.OnWrite,
25
26 // Note: The reference client currently bans peers that send alerts
27 // not signed with its key. We could verify against their key, but
28 // since the reference client is currently unwilling to support
29 // other implementations' alert messages, we will not relay theirs.
30 OnAlert: nil,
31 },
32 NewestBlock: sp.newestBlock,
33 HostToNetAddress: sp.server.addrManager.HostToNetAddress,
34 Proxy: cfg.Proxy,
35 UserAgentName: userAgentName,
36 UserAgentVersion: userAgentVersion,
37 UserAgentComments: cfg.UserAgentComments,
38 ChainParams: sp.server.chainParams,
39 Services: sp.server.services,
40 DisableRelayTx: cfg.BlocksOnly,
41 ProtocolVersion: peer.MaxProtocolVersion,
42 }
43}
44 |
1 2
| 1以上On\*\*\*会在收到消息时处理。
2 |
handleConnected消息会调用
1 2
| 1go cm.cfg.OnConnection(connReq, msg.conn)
2 |
cm.cfg.OnConnection是在newserver时初始化的,即方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 1func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
2 sp := newServerPeer(s, c.Permanent)
3 p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
4 if err != nil {
5 srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
6 s.connManager.Disconnect(c.ID())
7 }
8 sp.Peer = p
9 sp.connReq = c
10 sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
11 sp.AssociateConnection(conn)
12 go s.peerDoneHandler(sp)
13 s.addrManager.Attempt(sp.NA())
14}
15 |
1 2
| 1sp.AssociateConnection(conn)
2 |
将sp(ServerPeer)对象与节点关联,并启动节点,主要逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13
| 1func (p *Peer) AssociateConnection(conn net.Conn) {
2 ....
3 p.conn = conn
4 p.timeConnected = time.Now()
5......
6 go func() {
7 if err := p.start(); err != nil {
8 log.Debugf("Cannot start peer %v: %v", p, err)
9 p.Disconnect()
10 }
11 }()
12}
13 |
异步启动节点p.start(),
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| 1func (p *Peer) start() error {
2 log.Tracef("Starting peer %s", p)
3
4 negotiateErr := make(chan error)
5 go func() {
6 if p.inbound {
7 negotiateErr <- p.negotiateInboundProtocol()
8 } else {
9 negotiateErr <- p.negotiateOutboundProtocol()
10 }
11 }()
12
13 // Negotiate the protocol within the specified negotiateTimeout.
14 select {
15 case err := <-negotiateErr:
16 if err != nil {
17 return err
18 }
19 case <-time.After(negotiateTimeout):
20 return errors.New("protocol negotiation timeout")
21 }
22 log.Debugf("Connected to %s", p.Addr())
23
24 // The protocol has been negotiated successfully so start processing input
25 // and output messages.
26 go p.stallHandler()
27 go p.inHandler()
28 go p.queueHandler()
29 go p.outHandler()
30 go p.pingHandler()
31
32 // Send our verack message now that the IO processing machinery has started.
33 p.QueueMessage(wire.NewMsgVerAck(), nil)
34 return nil
35}
36 |
节点是否失联的处理方法,维持一个deadline时间,每次连接请求会增大deadline时间
接受消息的处理方法,收到消息会调用p.cfg.Listeners.*****(p, msg)方法
维护发送队列,从outputQueue通道读取放到sendQueue通道
1 2
| 1此方法负责从sendQueue通道读取消息,然后发送消息p.writeMessage(msg.msg, msg.encoding)
2 |
1 2
| 1定时发送ping消息到outputQueue通道
2 |
本文作者:architect.bian,欢迎收藏,转载请保留原文地址并保留版权声明!谢谢~
还没完!往下看!!!