From f3ce1ce423de3ad7d9eb5d734263b62beec1a386 Mon Sep 17 00:00:00 2001
From: Igor Mandrigin <mandrigin@users.noreply.github.com>
Date: Mon, 12 Oct 2020 15:58:45 +0200
Subject: [PATCH] turbo-api: add a plug-in mechanism for possible Bloom and
 other filters (#1229)

* Add a convenience method to replace stages

* allow more parameters to the execution function

* add a plug-in point for stuff like bloom filters, etc

* add comments

* fixups
---
 cmd/integration/commands/stages.go           | 11 +++-
 cmd/integration/commands/state_stages.go     | 12 ++++-
 cmd/tg/main.go                               |  1 +
 cmd/tgcustom/main.go                         |  8 +++
 eth/downloader/downloader_stagedsync_test.go |  1 +
 eth/handler.go                               |  2 +-
 eth/stagedsync/all_stages.go                 |  2 +-
 eth/stagedsync/stage_execute.go              | 35 +++++++++----
 eth/stagedsync/stagebuilder.go               | 48 +++++++++++++++---
 eth/stagedsync/stagedsync.go                 | 53 +++++++++++++-------
 10 files changed, 135 insertions(+), 38 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 60aba1262d..e6abc44bbd 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -265,7 +265,15 @@ func stageExec(ctx context.Context) error {
 		u := &stagedsync.UnwindState{Stage: stages.Execution, UnwindPoint: stage4.BlockNumber - unwind}
 		return stagedsync.UnwindExecutionStage(u, stage4, db, false)
 	}
-	return stagedsync.SpawnExecuteBlocksStage(stage4, db, bc.Config(), bc, bc.GetVMConfig(), block, ch, sm.Receipts, hdd, nil)
+	return stagedsync.SpawnExecuteBlocksStage(stage4, db,
+		bc.Config(), bc, bc.GetVMConfig(),
+		ch,
+		stagedsync.ExecuteBlockStageParams{
+			ToBlock:       block, // limit execution to the specified block
+			WriteReceipts: sm.Receipts,
+			Hdd:           hdd,
+		})
+
 }
 
 func stageIHash(ctx context.Context) error {
@@ -480,6 +488,7 @@ func newSync(quitCh <-chan struct{}, db ethdb.Database, tx ethdb.Database, hook
 	st, err := stagedsync.New(
 		stagedsync.DefaultStages(),
 		stagedsync.DefaultUnwindOrder(),
+		stagedsync.OptionalParameters{},
 	).Prepare(nil, chainConfig, bc, bc.GetVMConfig(), db, tx, "integration_test", sm, "", false, quitCh, nil, nil, func() error { return nil }, hook)
 	if err != nil {
 		panic(err)
diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go
index 61dd0a64d5..65ea7d44bd 100644
--- a/cmd/integration/commands/state_stages.go
+++ b/cmd/integration/commands/state_stages.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+
 	"github.com/ledgerwatch/turbo-geth/cmd/utils"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 
@@ -136,7 +137,16 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
 
 		// set block limit of execute stage
 		st.MockExecFunc(stages.Execution, func(stageState *stagedsync.StageState, unwinder stagedsync.Unwinder) error {
-			if err := stagedsync.SpawnExecuteBlocksStage(stageState, tx, bc.Config(), bc, bc.GetVMConfig(), execToBlock, ch, sm.Receipts, hdd, changeSetHook); err != nil {
+			if err := stagedsync.SpawnExecuteBlocksStage(
+				stageState, tx,
+				bc.Config(), bc, bc.GetVMConfig(),
+				ch,
+				stagedsync.ExecuteBlockStageParams{
+					ToBlock:       execToBlock, // limit execution to the specified block
+					WriteReceipts: sm.Receipts,
+					Hdd:           hdd,
+					ChangeSetHook: changeSetHook,
+				}); err != nil {
 				return fmt.Errorf("spawnExecuteBlocksStage: %w", err)
 			}
 			return nil
diff --git a/cmd/tg/main.go b/cmd/tg/main.go
index 3cb6d19235..da827102e0 100644
--- a/cmd/tg/main.go
+++ b/cmd/tg/main.go
@@ -32,6 +32,7 @@ func runTurboGeth(cliCtx *cli.Context) {
 	sync := stagedsync.New(
 		stagedsync.DefaultStages(),
 		stagedsync.DefaultUnwindOrder(),
+		stagedsync.OptionalParameters{},
 	)
 
 	ctx := utils.RootContext()
diff --git a/cmd/tgcustom/main.go b/cmd/tgcustom/main.go
index 016e054263..b4d8f02d5b 100644
--- a/cmd/tgcustom/main.go
+++ b/cmd/tgcustom/main.go
@@ -5,8 +5,10 @@ import (
 	"os"
 
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
+	"github.com/ledgerwatch/turbo-geth/core/state"
 	"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
 	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
+	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/log"
 	"github.com/ledgerwatch/turbo-geth/turbo/node"
 
@@ -72,6 +74,12 @@ func runTurboGeth(ctx *cli.Context) {
 	sync := stagedsync.New(
 		syncStages(ctx),
 		stagedsync.DefaultUnwindOrder(),
+		stagedsync.OptionalParameters{
+			StateReaderBuilder: func(getter ethdb.Getter) state.StateReader {
+				// put your custom caching code here
+				return state.NewPlainStateReader(getter)
+			},
+		},
 	)
 
 	// running a node and initializing a custom bucket with all default settings
diff --git a/eth/downloader/downloader_stagedsync_test.go b/eth/downloader/downloader_stagedsync_test.go
index f86b28e455..26cdf0b7cd 100644
--- a/eth/downloader/downloader_stagedsync_test.go
+++ b/eth/downloader/downloader_stagedsync_test.go
@@ -46,6 +46,7 @@ func newStagedSyncTester() (*stagedSyncTester, func()) {
 		stagedsync.New(
 			stagedsync.DefaultStages(),
 			stagedsync.DefaultUnwindOrder(),
+			stagedsync.OptionalParameters{},
 		),
 	)
 	clear := func() {
diff --git a/eth/handler.go b/eth/handler.go
index bb61e5de05..7ab804202b 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -112,7 +112,7 @@ type ProtocolManager struct {
 func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb *ethdb.ObjectDatabase, whitelist map[uint64]common.Hash, stagedSync *stagedsync.StagedSync) (*ProtocolManager, error) {
 	// Create the protocol manager with the base fields
 	if stagedSync == nil {
-		stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder())
+		stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{})
 	}
 	manager := &ProtocolManager{
 		networkID:   networkID,
diff --git a/eth/stagedsync/all_stages.go b/eth/stagedsync/all_stages.go
index 264209defe..4837afc27a 100644
--- a/eth/stagedsync/all_stages.go
+++ b/eth/stagedsync/all_stages.go
@@ -73,7 +73,7 @@ func InsertBlockInStages(db ethdb.Database, config *params.ChainConfig, engine c
 	if err := SpawnExecuteBlocksStage(&StageState{
 		Stage:       stages.Execution,
 		BlockNumber: num - 1,
-	}, db, config, bc, bc.GetVMConfig(), 0, nil, true, false, nil); err != nil {
+	}, db, config, bc, bc.GetVMConfig(), nil, ExecuteBlockStageParams{WriteReceipts: true}); err != nil {
 		return err
 	}
 
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index 76e8cd6e2e..cae2bb918f 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -4,12 +4,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
 	"os"
 	"runtime"
 	"runtime/pprof"
 	"time"
 
+	"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
+
 	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/core"
@@ -34,14 +35,24 @@ type HasChangeSetWriter interface {
 
 type ChangeSetHook func(blockNum uint64, wr *state.ChangeSetWriter)
 
-func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, vmConfig *vm.Config, toBlock uint64, quit <-chan struct{}, writeReceipts bool, hdd bool, changeSetHook ChangeSetHook) error {
+type StateReaderBuilder func(ethdb.Getter) state.StateReader
+
+type ExecuteBlockStageParams struct {
+	ToBlock       uint64 // not setting this params means no limit
+	WriteReceipts bool
+	Hdd           bool
+	ChangeSetHook ChangeSetHook
+	ReaderBuilder StateReaderBuilder
+}
+
+func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig *params.ChainConfig, chainContext core.ChainContext, vmConfig *vm.Config, quit <-chan struct{}, params ExecuteBlockStageParams) error {
 	prevStageProgress, _, errStart := stages.GetStageProgress(stateDB, stages.Senders)
 	if errStart != nil {
 		return errStart
 	}
 	var to = prevStageProgress
-	if toBlock > 0 {
-		to = min(prevStageProgress, toBlock)
+	if params.ToBlock > 0 {
+		to = min(prevStageProgress, params.ToBlock)
 	}
 	if to <= s.BlockNumber {
 		s.Done()
@@ -85,7 +96,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 	stageProgress := s.BlockNumber
 	logBlock := stageProgress
 	// Warmup only works for HDD sync, and for long ranges
-	var warmup = hdd && (to-s.BlockNumber) > 30000
+	var warmup = params.Hdd && (to-s.BlockNumber) > 30000
 
 	for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
 		if err := common.Stopped(quit); err != nil {
@@ -122,7 +133,11 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 		var stateReader state.StateReader
 		var stateWriter state.WriterWithChangeSets
 
-		stateReader = state.NewPlainStateReader(batch)
+		if params.ReaderBuilder != nil {
+			stateReader = params.ReaderBuilder(batch)
+		} else {
+			stateReader = state.NewPlainStateReader(batch)
+		}
 		stateWriter = state.NewPlainStateWriter(batch, tx, blockNum)
 
 		// where the magic happens
@@ -131,7 +146,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 			return err
 		}
 
-		if writeReceipts {
+		if params.WriteReceipts {
 			if err = appendReceipts(tx, receipts, block.NumberU64(), block.Hash()); err != nil {
 				return err
 			}
@@ -149,7 +164,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 					return err
 				}
 			}
-			warmup = hdd && (to-blockNum) > 30000
+			warmup = params.Hdd && (to-blockNum) > 30000
 		}
 
 		if prof {
@@ -159,9 +174,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 			}
 		}
 
-		if changeSetHook != nil {
+		if params.ChangeSetHook != nil {
 			if hasChangeSet, ok := stateWriter.(HasChangeSetWriter); ok {
-				changeSetHook(blockNum, hasChangeSet.ChangeSetWriter())
+				params.ChangeSetHook(blockNum, hasChangeSet.ChangeSetWriter())
 			}
 		}
 
diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go
index c3890b0ac1..bfbb7889ae 100644
--- a/eth/stagedsync/stagebuilder.go
+++ b/eth/stagedsync/stagebuilder.go
@@ -1,6 +1,8 @@
 package stagedsync
 
 import (
+	"fmt"
+	"strings"
 	"time"
 
 	"github.com/ledgerwatch/turbo-geth/core"
@@ -28,12 +30,13 @@ type StageParameters struct {
 	datadir     string
 	// QuitCh is a channel that is closed. This channel is useful to listen to when
 	// the stage can take significant time and gracefully shutdown at Ctrl+C.
-	QuitCh           <-chan struct{}
-	headersFetchers  []func() error
-	txPool           *core.TxPool
-	poolStart        func() error
-	changeSetHook    ChangeSetHook
-	prefetchedBlocks *PrefetchedBlocks
+	QuitCh             <-chan struct{}
+	headersFetchers    []func() error
+	txPool             *core.TxPool
+	poolStart          func() error
+	changeSetHook      ChangeSetHook
+	prefetchedBlocks   *PrefetchedBlocks
+	stateReaderBuilder StateReaderBuilder
 }
 
 // StageBuilder represent an object to create a single stage for staged sync
@@ -47,6 +50,29 @@ type StageBuilder struct {
 // StageBuilders represents an ordered list of builders to build different stages. It also contains helper methods to change the list of stages.
 type StageBuilders []StageBuilder
 
+// MustReplace finds a stage with a specific ID and then sets the new one instead of that.
+// Chainable but panics if it can't find stage to replace.
+func (bb StageBuilders) MustReplace(id stages.SyncStage, newBuilder StageBuilder) StageBuilders {
+	result := make([]StageBuilder, len(bb))
+
+	found := false
+
+	for i, originalBuilder := range bb {
+		if strings.EqualFold(string(originalBuilder.ID), string(id)) {
+			found = true
+			result[i] = newBuilder
+		} else {
+			result[i] = originalBuilder
+		}
+	}
+
+	if !found {
+		panic(fmt.Sprintf("StageBuilders#Replace can't find the stage with id %s", string(id)))
+	}
+
+	return result
+}
+
 // Build creates sync states out of builders
 func (bb StageBuilders) Build(world StageParameters) []*Stage {
 	stages := make([]*Stage, len(bb))
@@ -140,7 +166,15 @@ func DefaultStages() StageBuilders {
 					ID:          stages.Execution,
 					Description: "Execute blocks w/o hash checks",
 					ExecFunc: func(s *StageState, u Unwinder) error {
-						return SpawnExecuteBlocksStage(s, world.TX, world.chainConfig, world.chainContext, world.vmConfig, 0 /* limit (meaning no limit) */, world.QuitCh, world.storageMode.Receipts, world.hdd, world.changeSetHook)
+						return SpawnExecuteBlocksStage(s, world.TX,
+							world.chainConfig, world.chainContext, world.vmConfig,
+							world.QuitCh,
+							ExecuteBlockStageParams{
+								WriteReceipts: world.storageMode.Receipts,
+								Hdd:           world.hdd,
+								ChangeSetHook: world.changeSetHook,
+								ReaderBuilder: world.stateReaderBuilder,
+							})
 					},
 					UnwindFunc: func(u *UnwindState, s *StageState) error {
 						return UnwindExecutionStage(u, s, world.TX, world.storageMode.Receipts)
diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go
index 01f53721f8..5556fc1e9a 100644
--- a/eth/stagedsync/stagedsync.go
+++ b/eth/stagedsync/stagedsync.go
@@ -2,6 +2,7 @@ package stagedsync
 
 import (
 	"github.com/ledgerwatch/turbo-geth/core"
+	"github.com/ledgerwatch/turbo-geth/core/state"
 	"github.com/ledgerwatch/turbo-geth/core/vm"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/params"
@@ -13,13 +14,23 @@ type StagedSync struct {
 	PrefetchedBlocks *PrefetchedBlocks
 	stageBuilders    StageBuilders
 	unwindOrder      UnwindOrder
+	params           OptionalParameters
 }
 
-func New(stages StageBuilders, unwindOrder UnwindOrder) *StagedSync {
+// OptionalParameters contains any non-necessary parateres you can specify to fine-tune
+// and experiment on StagedSync.
+type OptionalParameters struct {
+	// StateReaderBuilder is a function that returns state reader for the block execution stage.
+	// It can be used to add someting like bloom filters to figure out non-existing accounts and similar experiments.
+	StateReaderBuilder StateReaderBuilder
+}
+
+func New(stages StageBuilders, unwindOrder UnwindOrder, params OptionalParameters) *StagedSync {
 	return &StagedSync{
 		PrefetchedBlocks: NewPrefetchedBlocks(),
 		stageBuilders:    stages,
 		unwindOrder:      unwindOrder,
+		params:           params,
 	}
 }
 
@@ -40,24 +51,32 @@ func (stagedSync *StagedSync) Prepare(
 	poolStart func() error,
 	changeSetHook ChangeSetHook,
 ) (*State, error) {
+	var readerBuilder StateReaderBuilder
+	if stagedSync.params.StateReaderBuilder != nil {
+		readerBuilder = stagedSync.params.StateReaderBuilder
+	} else {
+		readerBuilder = func(getter ethdb.Getter) state.StateReader { return state.NewPlainStateReader(getter) }
+	}
+
 	stages := stagedSync.stageBuilders.Build(
 		StageParameters{
-			d:                d,
-			chainConfig:      chainConfig,
-			chainContext:     chainContext,
-			vmConfig:         vmConfig,
-			db:               db,
-			TX:               tx,
-			pid:              pid,
-			storageMode:      storageMode,
-			datadir:          datadir,
-			QuitCh:           quitCh,
-			headersFetchers:  headersFetchers,
-			txPool:           txPool,
-			poolStart:        poolStart,
-			changeSetHook:    changeSetHook,
-			hdd:              hdd,
-			prefetchedBlocks: stagedSync.PrefetchedBlocks,
+			d:                  d,
+			chainConfig:        chainConfig,
+			chainContext:       chainContext,
+			vmConfig:           vmConfig,
+			db:                 db,
+			TX:                 tx,
+			pid:                pid,
+			storageMode:        storageMode,
+			datadir:            datadir,
+			QuitCh:             quitCh,
+			headersFetchers:    headersFetchers,
+			txPool:             txPool,
+			poolStart:          poolStart,
+			changeSetHook:      changeSetHook,
+			hdd:                hdd,
+			prefetchedBlocks:   stagedSync.PrefetchedBlocks,
+			stateReaderBuilder: readerBuilder,
 		},
 	)
 	state := NewState(stages)
-- 
GitLab