diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go index 7fc59661c12c6d1acd97c31d2bd8ccfa37e6b912..df05a1531534b09c1c66f7dbc39160aca8c91744 100644 --- a/cmd/txpool/main.go +++ b/cmd/txpool/main.go @@ -11,6 +11,7 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + "github.com/ledgerwatch/erigon-lib/kv/kvcache" "github.com/ledgerwatch/erigon-lib/kv/remotedb" "github.com/ledgerwatch/erigon-lib/txpool" "github.com/ledgerwatch/erigon-lib/txpool/txpooluitl" @@ -95,9 +96,13 @@ var rootCmd = &cobra.Command{ cfg.LogEvery = 5 * time.Minute cfg.CommitEvery = 5 * time.Minute + cacheConfig := kvcache.DefaultCoherentCacheConfig + cacheConfig.MetricsLabel = "txpool" + newTxs := make(chan txpool.Hashes, 1024) defer close(newTxs) - txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpooluitl.AllComponents(ctx, cfg, newTxs, coreDB, sentryClients, kvClient) + txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpooluitl.AllComponents(ctx, cfg, + kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient) if err != nil { return err } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e8a8f03a67bd8e2c6f89a38bdc69926d8566eec7..35694cca6b6aa96fe042e7d8a75a0d283bc79ffa 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -148,6 +148,10 @@ var ( // Transaction pool settings TxPoolV2Flag = cli.BoolFlag{ Name: "txpool.v2", + Usage: "experimental internal pool and block producer, see ./cmd/txpool/readme.md for more info. Disabling internal txpool and block producer.", + } + TxPoolDisableFlag = cli.BoolFlag{ + Name: "txpool.disable", Usage: "experimental external pool and block producer, see ./cmd/txpool/readme.md for more info. Disabling internal txpool and block producer.", } TxPoolLocalsFlag = cli.StringFlag{ @@ -920,6 +924,9 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolV2Flag.Name) { cfg.V2 = true } + if ctx.GlobalIsSet(TxPoolDisableFlag.Name) { + cfg.Disable = true + } if ctx.GlobalIsSet(TxPoolLocalsFlag.Name) { locals := strings.Split(ctx.GlobalString(TxPoolLocalsFlag.Name), ",") for _, account := range locals { diff --git a/eth/backend.go b/eth/backend.go index dfac89ac785aaf2d4169b23718f56d7b30ea3090..4cf0b06a050483065f84d9a352a38e0534d0e013 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -37,6 +37,7 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/kvcache" txpool2 "github.com/ledgerwatch/erigon-lib/txpool" "github.com/ledgerwatch/erigon-lib/txpool/txpooluitl" "github.com/ledgerwatch/erigon/cmd/sentry/download" @@ -362,13 +363,18 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere if config.TxPool.V2 { cfg := txpool2.DefaultConfig cfg.DBDir = path.Join(stack.Config().DataDir, "txpool") - cfg.LogEvery = 5 * time.Minute - cfg.CommitEvery = 5 * time.Minute + cfg.LogEvery = 15 * time.Second //5 * time.Minute + cfg.CommitEvery = 15 * time.Second //5 * time.Minute + + cacheConfig := kvcache.DefaultCoherentCacheConfig + cacheConfig.MetricsLabel = "txpool" stateDiffClient := direct.NewStateDiffClientDirect(kvRPC) backend.newTxs2 = make(chan txpool2.Hashes, 1024) //defer close(newTxs) - backend.txPool2DB, backend.txPool2, backend.txPool2Fetch, backend.txPool2Send, backend.txPool2GrpcServer, err = txpooluitl.AllComponents(ctx, cfg, backend.newTxs2, backend.chainDB, backend.sentries, stateDiffClient) + backend.txPool2DB, backend.txPool2, backend.txPool2Fetch, backend.txPool2Send, backend.txPool2GrpcServer, err = txpooluitl.AllComponents( + ctx, cfg, kvcache.New(cacheConfig), backend.newTxs2, backend.chainDB, backend.sentries, stateDiffClient, + ) if err != nil { return nil, err } @@ -435,34 +441,39 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere } } - if config.TxPool.V2 { - backend.txPool2Fetch.ConnectCore() - backend.txPool2Fetch.ConnectSentries() - go txpool2.MainLoop(backend.downloadCtx, backend.txPool2DB, backend.chainDB, backend.txPool2, backend.newTxs2, backend.txPool2Send, backend.txPool2GrpcServer.NewSlotsStreams, func() { - select { - case backend.notifyMiningAboutNewTxs <- struct{}{}: - default: - } - }) - } else { - go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer) - go func() { - newTransactions := make(chan core.NewTxsEvent, 128) - sub := backend.txPool.SubscribeNewTxsEvent(newTransactions) - defer sub.Unsubscribe() - defer close(newTransactions) - for { - select { - case <-ctx.Done(): - return - case <-newTransactions: + if !config.TxPool.Disable { + if config.TxPool.V2 { + backend.txPool2Fetch.ConnectCore() + backend.txPool2Fetch.ConnectSentries() + go txpool2.MainLoop(backend.downloadCtx, + backend.txPool2DB, backend.chainDB, + backend.txPool2, backend.newTxs2, backend.txPool2Send, backend.txPool2GrpcServer.NewSlotsStreams, + func() { select { case backend.notifyMiningAboutNewTxs <- struct{}{}: default: } + }) + } else { + go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer) + go func() { + newTransactions := make(chan core.NewTxsEvent, 128) + sub := backend.txPool.SubscribeNewTxsEvent(newTransactions) + defer sub.Unsubscribe() + defer close(newTransactions) + for { + select { + case <-ctx.Done(): + return + case <-newTransactions: + select { + case backend.notifyMiningAboutNewTxs <- struct{}{}: + default: + } + } } - } - }() + }() + } } go func() { defer debug.LogPanic() @@ -638,7 +649,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy if hh.BaseFee != nil { baseFee = hh.BaseFee.Uint64() } - return s.txPool2.OnNewBlock(nil, txpool2.TxSlots{}, txpool2.TxSlots{}, baseFee, hh.Number.Uint64(), hh.Hash()) + return s.txPool2.OnNewBlock(context.Background(), nil, txpool2.TxSlots{}, txpool2.TxSlots{}, baseFee, hh.Number.Uint64(), hh.Hash()) }); err != nil { return err } diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 79970288d708a7d40ef8e54b4e27d5631e774918..99f28c06e01c78fa081d86a0495db973f231e96f 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -208,7 +208,7 @@ func newStateReaderWriter( if block.BaseFee() != nil { blockBaseFee = block.BaseFee().Uint64() } - accumulator.StartChange(block.NumberU64(), block.Hash(), txs, blockBaseFee, false) + accumulator.StartChange(block.NumberU64(), block.Hash(), block.NumberU64()-1, block.ParentHash(), txs, blockBaseFee, false) } else { accumulator = nil } @@ -430,7 +430,12 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan if targetHeader.BaseFee != nil { protocolBaseFee = targetHeader.BaseFee.Uint64() } - accumulator.StartChange(u.UnwindPoint, hash, txs, protocolBaseFee, true /* unwind */) + prevHash, err := rawdb.ReadCanonicalHash(tx, s.BlockNumber) + if err != nil { + return fmt.Errorf("read canonical hash of unwind point: %w", err) + } + + accumulator.StartChange(u.UnwindPoint, hash, s.BlockNumber, prevHash, txs, protocolBaseFee, true /* unwind */) } changes := etl.NewCollector(cfg.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize)) diff --git a/ethdb/remotedbserver/server.go b/ethdb/remotedbserver/server.go index a77c2cf4a1481bbf9cf851f99b63a642c65373c4..e02f109628783df25f41a82d6acaa43fedb8bbce 100644 --- a/ethdb/remotedbserver/server.go +++ b/ethdb/remotedbserver/server.go @@ -240,9 +240,11 @@ func (s *KvServer) StateChanges(req *remote.StateChangeRequest, server remote.KV } func (s *KvServer) SendStateChanges(sc *remote.StateChange) { + log.Info("SendStateChanges start") if err := s.stateChangeStreams.Broadcast(sc); err != nil { log.Warn("Sending new peer notice to core P2P failed", "error", err) } + log.Info("SendStateChanges end") } type StateChangeStreams struct { diff --git a/go.mod b/go.mod index e488948f0eec047ecde9ce39cb4a707498d633c5..79844db9b6ff2549ecb1b004db52a0f7da3e33d5 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible github.com/kylelemons/godebug v1.1.0 // indirect - github.com/ledgerwatch/erigon-lib v0.0.0-20210911154831-f79629b98d81 + github.com/ledgerwatch/erigon-lib v0.0.0-20210913065323-1b66a410bef3 github.com/ledgerwatch/log/v3 v3.3.0 github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d github.com/logrusorgru/aurora/v3 v3.0.0 diff --git a/go.sum b/go.sum index 8b238a11d59d8efc8d5c326bf7151bfbfa4c157f..fca6bbfa173d95252efa6bc6e258680286ab7d22 100644 --- a/go.sum +++ b/go.sum @@ -493,8 +493,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20210911154831-f79629b98d81 h1:BnV+T0n7sSnNvFOOqk/1piYfeaE2KYEEP+UiERLuhBk= -github.com/ledgerwatch/erigon-lib v0.0.0-20210911154831-f79629b98d81/go.mod h1:T3Jsfvp5YkG2anXy53dEEM1FbvQA4dXrfp5WcSypMoE= +github.com/ledgerwatch/erigon-lib v0.0.0-20210913065323-1b66a410bef3 h1:LEE17DjFV3+2pRin3snY9mOXiNCJnaUXLU4HWuzu5RY= +github.com/ledgerwatch/erigon-lib v0.0.0-20210913065323-1b66a410bef3/go.mod h1:GiYMKOrTi4T2feH10Fvu8QTbvLm/GOckYzUmuk9WU8s= github.com/ledgerwatch/log/v3 v3.3.0 h1:k8N/3NQLILr8CKCMyza261vLFKU7VA+nMNNb0wVyQSc= github.com/ledgerwatch/log/v3 v3.3.0/go.mod h1:J58eOHHrIYHxl7LKkRsb/0YibKwtLfauUryl5SLRGm0= github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno= diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index a585271e9e09be030ffb82cbb7bb62acfba8a5bc..640beac7804812d38b1228ff089b8c5b7c06400c 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -11,6 +11,7 @@ var DefaultFlags = []cli.Flag{ utils.DataDirFlag, utils.EthashDatasetDirFlag, utils.TxPoolV2Flag, + utils.TxPoolDisableFlag, utils.TxPoolLocalsFlag, utils.TxPoolNoLocalsFlag, utils.TxPoolJournalFlag, diff --git a/turbo/shards/state_change_accumulator.go b/turbo/shards/state_change_accumulator.go index ee3c940e92abd4f899e170c87d7edb91f0134bfb..65698672be4885280ae795e8cd7393e360ceaf29 100644 --- a/turbo/shards/state_change_accumulator.go +++ b/turbo/shards/state_change_accumulator.go @@ -41,11 +41,13 @@ func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer) { } // StartChange begins accumulation of changes for a new block -func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, txs [][]byte, protocolBaseFee uint64, unwind bool) { +func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, prevBlockHeight uint64, prevBlockHash common.Hash, txs [][]byte, protocolBaseFee uint64, unwind bool) { a.changes = append(a.changes, remote.StateChange{}) a.latestChange = &a.changes[len(a.changes)-1] a.latestChange.BlockHeight = blockHeight a.latestChange.BlockHash = gointerfaces.ConvertHashToH256(blockHash) + a.latestChange.PrevBlockHeight = prevBlockHeight + a.latestChange.PrevBlockHash = gointerfaces.ConvertHashToH256(prevBlockHash) if unwind { a.latestChange.Direction = remote.Direction_UNWIND } else { diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 733cf3a9a16bfd11aa1e9c608b609630ffbea374..83a3e5ce34bb668066379ceafbf31fceb7c53119 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -171,12 +171,13 @@ func StageLoopStep( } updateHead(ctx, head, headHash, headTd256) + notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer) + err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, sync.PrevUnwindPoint(), notifications.Events, db) if err != nil { return err } - notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer) return nil }