From fe9f841172c79fe72ee0ae34698892e7cc6d34ab Mon Sep 17 00:00:00 2001
From: Andrew Ashikhmin <34320705+yperbasis@users.noreply.github.com>
Date: Fri, 11 Feb 2022 13:24:29 +0100
Subject: [PATCH] Add Beacon payloads to body downloader prefetch (#3482)

* Block instead of PayloadMessage

* tx.MarshalBinary instead of rlp.Encode in StartProposer

* Add Beacon payloads to body downloader prefetch

* Restore PayloadMessage
---
 eth/stagedsync/stage_headers.go | 31 +++++++----
 ethdb/privateapi/ethbackend.go  |  7 +--
 turbo/stages/mock_sentry.go     | 62 ++++++++++++----------
 turbo/stages/stageloop.go       | 94 ++++++++++++++++++---------------
 4 files changed, 108 insertions(+), 86 deletions(-)

diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go
index 6bab3dd74c..8369d10407 100644
--- a/eth/stagedsync/stage_headers.go
+++ b/eth/stagedsync/stage_headers.go
@@ -30,6 +30,7 @@ import (
 	"github.com/ledgerwatch/erigon/rlp"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
+	"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
 	"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
 	"github.com/ledgerwatch/log/v3"
 )
@@ -41,6 +42,7 @@ const ShortPoSReorgThresholdBlocks = 10
 type HeadersCfg struct {
 	db                    kv.RwDB
 	hd                    *headerdownload.HeaderDownload
+	bodyDownload          *bodydownload.BodyDownload
 	chainConfig           params.ChainConfig
 	headerReqSend         func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool)
 	announceNewHashes     func(context.Context, []headerdownload.Announce)
@@ -61,6 +63,7 @@ type HeadersCfg struct {
 func StageHeadersCfg(
 	db kv.RwDB,
 	headerDownload *headerdownload.HeaderDownload,
+	bodyDownload *bodydownload.BodyDownload,
 	chainConfig params.ChainConfig,
 	headerReqSend func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool),
 	announceNewHashes func(context.Context, []headerdownload.Announce),
@@ -78,6 +81,7 @@ func StageHeadersCfg(
 	return HeadersCfg{
 		db:                    db,
 		hd:                    headerDownload,
+		bodyDownload:          bodyDownload,
 		chainConfig:           chainConfig,
 		headerReqSend:         headerReqSend,
 		announceNewHashes:     announceNewHashes,
@@ -220,6 +224,7 @@ func handleForkChoice(
 			return nil
 		}
 
+		// FIXME(yperbasis): HeaderNumber is only populated at Stage 3
 		header, err = rawdb.ReadHeaderByHash(tx, headerHash)
 		if err != nil {
 			return err
@@ -229,13 +234,7 @@ func handleForkChoice(
 	headerNumber := header.Number.Uint64()
 	cfg.hd.UpdateTopSeenHeightPoS(headerNumber)
 
-	parent, err := rawdb.ReadHeaderByHash(tx, header.ParentHash)
-	if err != nil {
-		if !repliedWithSyncStatus {
-			cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
-		}
-		return err
-	}
+	parent := rawdb.ReadHeader(tx, header.ParentHash, headerNumber-1)
 
 	forkingPoint, err := headerInserter.ForkingPoint(tx, header, parent)
 	if err != nil {
@@ -304,10 +303,17 @@ func handleNewPayload(
 	}
 
 	// If we have the parent then we can move on with the stagedsync
-	parent, err := rawdb.ReadHeaderByHash(tx, header.ParentHash)
+	parent := rawdb.ReadHeader(tx, header.ParentHash, headerNumber-1)
+
+	transactions, err := types.DecodeTransactions(payloadMessage.Body.Transactions)
 	if err != nil {
-		cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
-		return err
+		log.Warn("Error during Beacon transaction decoding", "err", err.Error())
+		cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{
+			Status:          remote.EngineStatus_INVALID,
+			LatestValidHash: header.ParentHash, // TODO(yperbasis): potentially wrong when parent is nil
+			ValidationError: err,
+		}
+		return nil
 	}
 
 	if parent != nil {
@@ -336,7 +342,10 @@ func handleNewPayload(
 		}
 	}
 
-	// TODO(yperbasis): bodyDownloader.AddToPrefetch(block)
+	if cfg.bodyDownload != nil {
+		block := types.NewBlockFromStorage(headerHash, header, transactions, nil)
+		cfg.bodyDownload.AddToPrefetch(block)
+	}
 
 	return nil
 }
diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go
index 1a95b4ef79..f3c4f79d56 100644
--- a/ethdb/privateapi/ethbackend.go
+++ b/ethdb/privateapi/ethbackend.go
@@ -440,10 +440,11 @@ func (s *EthBackendServer) StartProposer() {
 
 				for _, tx := range block.Transactions() {
 					buf.Reset()
-
-					err := rlp.Encode(buf, tx)
+					// EIP-2718 txn shouldn't be additionally wrapped as RLP strings,
+					// so MarshalBinary instead of rlp.Encode
+					err := tx.MarshalBinary(buf)
 					if err != nil {
-						log.Warn("Broken tx rlp", "err", err.Error())
+						log.Warn("Failed to marshal transaction", "err", err.Error())
 						return
 					}
 					encodedTransactions = append(encodedTransactions, common.CopyBytes(buf.Bytes()))
diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go
index e45edce86a..914c7e9193 100644
--- a/turbo/stages/mock_sentry.go
+++ b/turbo/stages/mock_sentry.go
@@ -276,34 +276,40 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 
 	isBor := mock.ChainConfig.Bor != nil
 	mock.Sync = stagedsync.New(
-		stagedsync.DefaultStages(mock.Ctx, prune, stagedsync.StageHeadersCfg(
-			mock.DB,
-			mock.downloader.Hd,
-			*mock.ChainConfig,
-			sendHeaderRequest,
-			propagateNewBlockHashes,
-			penalize,
-			cfg.BatchSize,
-			false,
-			nil,
-			nil,
-			nil,
-			allSnapshots,
-			snapshotsDownloader,
-			blockReader,
-			mock.tmpdir,
-		), stagedsync.StageCumulativeIndexCfg(mock.DB), stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig), stagedsync.StageBodiesCfg(
-			mock.DB,
-			mock.downloader.Bd,
-			sendBodyRequest,
-			penalize,
-			blockPropagator,
-			cfg.BodyDownloadTimeoutSeconds,
-			*mock.ChainConfig,
-			cfg.BatchSize,
-			allSnapshots,
-			blockReader,
-		), stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true),
+		stagedsync.DefaultStages(mock.Ctx, prune,
+			stagedsync.StageHeadersCfg(
+				mock.DB,
+				mock.downloader.Hd,
+				mock.downloader.Bd,
+				*mock.ChainConfig,
+				sendHeaderRequest,
+				propagateNewBlockHashes,
+				penalize,
+				cfg.BatchSize,
+				false,
+				nil,
+				nil,
+				nil,
+				allSnapshots,
+				snapshotsDownloader,
+				blockReader,
+				mock.tmpdir,
+			),
+			stagedsync.StageCumulativeIndexCfg(mock.DB),
+			stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig),
+			stagedsync.StageBodiesCfg(
+				mock.DB,
+				mock.downloader.Bd,
+				sendBodyRequest,
+				penalize,
+				blockPropagator,
+				cfg.BodyDownloadTimeoutSeconds,
+				*mock.ChainConfig,
+				cfg.BatchSize,
+				allSnapshots,
+				blockReader,
+			),
+			stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true),
 			stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, allSnapshots),
 			stagedsync.StageExecuteBlocksCfg(
 				mock.DB,
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index b6cf757ab1..0c5cc0cbed 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -268,50 +268,56 @@ func NewStagedSync(
 	runInTestMode := cfg.ImportMode
 	isBor := controlServer.ChainConfig.Bor != nil
 	return stagedsync.New(
-		stagedsync.DefaultStages(ctx, cfg.Prune, stagedsync.StageHeadersCfg(
-			db,
-			controlServer.Hd,
-			*controlServer.ChainConfig,
-			controlServer.SendHeaderRequest,
-			controlServer.PropagateNewBlockHashes,
-			controlServer.Penalize,
-			cfg.BatchSize,
-			p2pCfg.NoDiscovery,
-			newPayloadCh,
-			forkChoiceCh,
-			waitingForBeaconChain,
-			allSnapshots,
-			snapshotDownloader,
-			blockReader,
-			tmpdir,
-		), stagedsync.StageCumulativeIndexCfg(db), stagedsync.StageBlockHashesCfg(db, tmpdir, controlServer.ChainConfig), stagedsync.StageBodiesCfg(
-			db,
-			controlServer.Bd,
-			controlServer.SendBodyRequest,
-			controlServer.Penalize,
-			controlServer.BroadcastNewBlock,
-			cfg.BodyDownloadTimeoutSeconds,
-			*controlServer.ChainConfig,
-			cfg.BatchSize,
-			allSnapshots,
-			blockReader,
-		), stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, allSnapshots), stagedsync.StageExecuteBlocksCfg(
-			db,
-			cfg.Prune,
-			cfg.BatchSize,
-			nil,
-			controlServer.ChainConfig,
-			controlServer.Engine,
-			&vm.Config{EnableTEMV: cfg.Prune.Experiments.TEVM},
-			accumulator,
-			cfg.StateStream,
-			tmpdir,
-			blockReader,
-		), stagedsync.StageTranspileCfg(
-			db,
-			cfg.BatchSize,
-			controlServer.ChainConfig,
-		), stagedsync.StageHashStateCfg(db, tmpdir),
+		stagedsync.DefaultStages(ctx, cfg.Prune,
+			stagedsync.StageHeadersCfg(
+				db,
+				controlServer.Hd,
+				controlServer.Bd,
+				*controlServer.ChainConfig,
+				controlServer.SendHeaderRequest,
+				controlServer.PropagateNewBlockHashes,
+				controlServer.Penalize,
+				cfg.BatchSize,
+				p2pCfg.NoDiscovery,
+				newPayloadCh,
+				forkChoiceCh,
+				waitingForBeaconChain,
+				allSnapshots,
+				snapshotDownloader,
+				blockReader,
+				tmpdir,
+			),
+			stagedsync.StageCumulativeIndexCfg(db),
+			stagedsync.StageBlockHashesCfg(db, tmpdir, controlServer.ChainConfig),
+			stagedsync.StageBodiesCfg(
+				db,
+				controlServer.Bd,
+				controlServer.SendBodyRequest,
+				controlServer.Penalize,
+				controlServer.BroadcastNewBlock,
+				cfg.BodyDownloadTimeoutSeconds,
+				*controlServer.ChainConfig,
+				cfg.BatchSize,
+				allSnapshots,
+				blockReader,
+			),
+			stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance),
+			stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, allSnapshots),
+			stagedsync.StageExecuteBlocksCfg(
+				db,
+				cfg.Prune,
+				cfg.BatchSize,
+				nil,
+				controlServer.ChainConfig,
+				controlServer.Engine,
+				&vm.Config{EnableTEMV: cfg.Prune.Experiments.TEVM},
+				accumulator,
+				cfg.StateStream,
+				tmpdir,
+				blockReader,
+			),
+			stagedsync.StageTranspileCfg(db, cfg.BatchSize, controlServer.ChainConfig),
+			stagedsync.StageHashStateCfg(db, tmpdir),
 			stagedsync.StageTrieCfg(db, true, true, tmpdir, blockReader),
 			stagedsync.StageHistoryCfg(db, cfg.Prune, tmpdir),
 			stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir),
-- 
GitLab