From 20de87c1bf25e516cf8cd304838c3e532425d497 Mon Sep 17 00:00:00 2001 From: gary rong <garyrong0905@gmail.com> Date: Wed, 24 Jun 2020 18:54:13 +0800 Subject: [PATCH] eth: don't block if transaction broadcast loop fails (#21255) * eth: don't block if transaction broadcast loop is returned * eth: kick out peer if we failed to send message * eth: address comment --- eth/handler.go | 2 +- eth/peer.go | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index d7be45ed1c..f84ed04a51 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -406,7 +406,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Make sure that we first exchange headers and only then announce transactions p.HandshakeOrderMux.Lock() // Register the peer locally - if err := pm.peers.Register(p); err != nil { + if err := pm.peers.Register(p, pm.removePeer); err != nil { p.Log().Error("Ethereum peer registration failed", "err", err) p.HandshakeOrderMux.Lock() return err diff --git a/eth/peer.go b/eth/peer.go index bd4c94285e..5307304fb7 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -132,17 +132,19 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(ha // broadcastBlocks is a write loop that multiplexes blocks and block accouncements // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. -func (p *peer) broadcastBlocks() { +func (p *peer) broadcastBlocks(removePeer func(string)) { for { select { case prop := <-p.queuedBlocks: if err := p.SendNewBlock(prop.block, prop.td); err != nil { + removePeer(p.id) return } p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) case block := <-p.queuedBlockAnns: if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { + removePeer(p.id) return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) @@ -156,7 +158,7 @@ func (p *peer) broadcastBlocks() { // broadcastTransactions is a write loop that schedules transaction broadcasts // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. -func (p *peer) broadcastTransactions() { +func (p *peer) broadcastTransactions(removePeer func(string)) { var ( queue []common.Hash // Queue of hashes to broadcast as full transactions done chan struct{} // Non-nil if background broadcaster is running @@ -207,6 +209,7 @@ func (p *peer) broadcastTransactions() { done = nil case <-fail: + removePeer(p.id) return case <-p.term: @@ -218,7 +221,7 @@ func (p *peer) broadcastTransactions() { // announceTransactions is a write loop that schedules transaction broadcasts // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. -func (p *peer) announceTransactions() { +func (p *peer) announceTransactions(removePeer func(string)) { var ( queue []common.Hash // Queue of hashes to announce as transaction stubs done chan struct{} // Non-nil if background announcer is running @@ -277,6 +280,7 @@ func (p *peer) announceTransactions() { done = nil case <-fail: + removePeer(p.id) return case <-p.term: @@ -677,7 +681,7 @@ func newPeerSet() *peerSet { // Register injects a new peer into the working set, or returns an error if the // peer is already known. If a new peer it registered, its broadcast loop is also // started. -func (ps *peerSet) Register(p *peer) error { +func (ps *peerSet) Register(p *peer, removePeer func(string)) error { ps.lock.Lock() defer ps.lock.Unlock() @@ -689,10 +693,10 @@ func (ps *peerSet) Register(p *peer) error { } ps.peers[p.id] = p - go p.broadcastBlocks() - go p.broadcastTransactions() + go p.broadcastBlocks(removePeer) + go p.broadcastTransactions(removePeer) if p.version >= eth65 { - go p.announceTransactions() + go p.announceTransactions(removePeer) } return nil } -- GitLab