From e01096f531862b982833732514376cead8d58e82 Mon Sep 17 00:00:00 2001
From: Martin Holst Swende <martin@swende.se>
Date: Wed, 17 Feb 2021 14:59:00 +0100
Subject: [PATCH] eth/handler, broadcast: optimize tx broadcast mechanism
 (#22176)

This PR optimizes the broadcast loop. Instead of iterating twice through a given set of transactions to weed out which peers have and which do not have a tx, to send/announce transactions, we do it only once.
---
 eth/handler.go                 | 56 ++++++++++++++++++----------------
 eth/protocols/eth/broadcast.go | 12 ++++----
 2 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/eth/handler.go b/eth/handler.go
index a5a62b894..13fa70193 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -456,44 +456,51 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
 	}
 }
 
-// BroadcastTransactions will propagate a batch of transactions to all peers which are not known to
+// BroadcastTransactions will propagate a batch of transactions
+// - To a square root of all peers
+// - And, separately, as announcements to all peers which are not known to
 // already have the given transaction.
-func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) {
+func (h *handler) BroadcastTransactions(txs types.Transactions) {
 	var (
-		txset = make(map[*ethPeer][]common.Hash)
-		annos = make(map[*ethPeer][]common.Hash)
+		annoCount   int // Count of announcements made
+		annoPeers   int
+		directCount int // Count of the txs sent directly to peers
+		directPeers int // Count of the peers that were sent transactions directly
+
+		txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
+		annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
+
 	)
 	// Broadcast transactions to a batch of peers not knowing about it
-	if propagate {
-		for _, tx := range txs {
-			peers := h.peers.peersWithoutTransaction(tx.Hash())
-
-			// Send the block to a subset of our peers
-			transfer := peers[:int(math.Sqrt(float64(len(peers))))]
-			for _, peer := range transfer {
-				txset[peer] = append(txset[peer], tx.Hash())
-			}
-			log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(transfer))
-		}
-		for peer, hashes := range txset {
-			peer.AsyncSendTransactions(hashes)
-		}
-		return
-	}
-	// Otherwise only broadcast the announcement to peers
 	for _, tx := range txs {
 		peers := h.peers.peersWithoutTransaction(tx.Hash())
-		for _, peer := range peers {
+		// Send the tx unconditionally to a subset of our peers
+		numDirect := int(math.Sqrt(float64(len(peers))))
+		for _, peer := range peers[:numDirect] {
+			txset[peer] = append(txset[peer], tx.Hash())
+		}
+		// For the remaining peers, send announcement only
+		for _, peer := range peers[numDirect:] {
 			annos[peer] = append(annos[peer], tx.Hash())
 		}
 	}
+	for peer, hashes := range txset {
+		directPeers++
+		directCount += len(hashes)
+		peer.AsyncSendTransactions(hashes)
+	}
 	for peer, hashes := range annos {
+		annoPeers++
+		annoCount += len(hashes)
 		if peer.Version() >= eth.ETH65 {
 			peer.AsyncSendPooledTransactionHashes(hashes)
 		} else {
 			peer.AsyncSendTransactions(hashes)
 		}
 	}
+	log.Debug("Transaction broadcast", "txs", len(txs),
+		"announce packs", annoPeers, "announced hashes", annoCount,
+		"tx packs", directPeers, "broadcast txs", directCount)
 }
 
 // minedBroadcastLoop sends mined blocks to connected peers.
@@ -511,13 +518,10 @@ func (h *handler) minedBroadcastLoop() {
 // txBroadcastLoop announces new transactions to connected peers.
 func (h *handler) txBroadcastLoop() {
 	defer h.wg.Done()
-
 	for {
 		select {
 		case event := <-h.txsCh:
-			h.BroadcastTransactions(event.Txs, true)  // First propagate transactions to peers
-			h.BroadcastTransactions(event.Txs, false) // Only then announce to the rest
-
+			h.BroadcastTransactions(event.Txs)
 		case <-h.txsSub.Err():
 			return
 		}
diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go
index 74ec2f065..328396d51 100644
--- a/eth/protocols/eth/broadcast.go
+++ b/eth/protocols/eth/broadcast.go
@@ -142,18 +142,18 @@ func (p *Peer) announceTransactions() {
 		if done == nil && len(queue) > 0 {
 			// Pile transaction hashes until we reach our allowed network limit
 			var (
-				hashes  []common.Hash
+				count   int
 				pending []common.Hash
 				size    common.StorageSize
 			)
-			for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
-				if p.txpool.Get(queue[i]) != nil {
-					pending = append(pending, queue[i])
+			for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
+				if p.txpool.Get(queue[count]) != nil {
+					pending = append(pending, queue[count])
 					size += common.HashLength
 				}
-				hashes = append(hashes, queue[i])
 			}
-			queue = queue[:copy(queue, queue[len(hashes):])]
+			// Shift and trim queue
+			queue = queue[:copy(queue, queue[count:])]
 
 			// If there's anything available to transfer, fire up an async writer
 			if len(pending) > 0 {
-- 
GitLab