diff --git a/core/state/cached_reader2.go b/core/state/cached_reader2.go new file mode 100644 index 0000000000000000000000000000000000000000..f8a05164722a98ca2441a7a0c0e03d8541ecfad7 --- /dev/null +++ b/core/state/cached_reader2.go @@ -0,0 +1,80 @@ +package state + +import ( + "bytes" + "encoding/binary" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/kvcache" + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/common/dbutils" + "github.com/ledgerwatch/erigon/core/types/accounts" +) + +// CachedReader2 is a wrapper for an instance of type StateReader +// This wrapper only makes calls to the underlying reader if the item is not in the cache +type CachedReader2 struct { + r StateReader + cache kvcache.CacheView + db kv.Tx +} + +// NewCachedReader2 wraps a given state reader into the cached reader +func NewCachedReader2(r StateReader, cache kvcache.CacheView, tx kv.Tx) *CachedReader2 { + return &CachedReader2{r: r, cache: cache, db: tx} +} + +// ReadAccountData is called when an account needs to be fetched from the state +func (cr *CachedReader2) ReadAccountData(address common.Address) (*accounts.Account, error) { + enc, err := cr.cache.Get(address.Bytes()) + if err != nil { + return nil, err + } + if len(enc) == 0 { + return nil, nil + } + var a accounts.Account + if err = a.DecodeForStorage(enc); err != nil { + return nil, err + } + return &a, nil +} + +func (r *CachedReader2) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) { + compositeKey := dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes()) + enc, err := r.cache.Get(compositeKey) + if err != nil { + return nil, err + } + if len(enc) == 0 { + return nil, nil + } + return enc, nil +} + +func (r *CachedReader2) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) { + if bytes.Equal(codeHash.Bytes(), emptyCodeHash) { + return nil, nil + } + code, err := r.db.GetOne(kv.Code, codeHash.Bytes()) + if len(code) == 0 { + return nil, nil + } + return code, err +} + +func (r *CachedReader2) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) { + code, err := r.ReadAccountCode(address, incarnation, codeHash) + return len(code), err +} + +func (r *CachedReader2) ReadAccountIncarnation(address common.Address) (uint64, error) { + b, err := r.db.GetOne(kv.IncarnationMap, address.Bytes()) + if err != nil { + return 0, err + } + if len(b) == 0 { + return 0, nil + } + return binary.BigEndian.Uint64(b), nil +} diff --git a/eth/backend.go b/eth/backend.go index cf0a81344f809a533d4297aa6b9d976515676404..3d8fb1ae4169ba7a1db34bf01930bab8e9fcf80d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -49,6 +49,7 @@ import ( "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/clique" "github.com/ledgerwatch/erigon/consensus/ethash" + "github.com/ledgerwatch/erigon/consensus/misc" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" @@ -226,7 +227,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere sentries: []direct.SentryClient{}, notifications: &stagedsync.Notifications{ Events: privateapi.NewEvents(), - Accumulator: &shards.Accumulator{}, + Accumulator: shards.NewAccumulator(chainConfig), StateChangesConsumer: kvRPC, }, } @@ -494,13 +495,12 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere if backend.config.TxPool.V2 { if err := backend.txPool2DB.View(context.Background(), func(tx kv.Tx) error { - var baseFee uint64 - if hh.BaseFee != nil { - baseFee = hh.BaseFee.Uint64() - } + pendingBaseFee := misc.CalcBaseFee(chainConfig, hh) return backend.txPool2.OnNewBlock(context.Background(), &remote.StateChangeBatch{ - DatabaseViewID: tx.ViewID(), ChangeBatch: []*remote.StateChange{ - {BlockHeight: hh.Number.Uint64(), BlockHash: gointerfaces.ConvertHashToH256(hh.Hash()), ProtocolBaseFee: baseFee}, + PendingBlockBaseFee: pendingBaseFee.Uint64(), + DatabaseViewID: tx.ViewID(), + ChangeBatch: []*remote.StateChange{ + {BlockHeight: hh.Number.Uint64(), BlockHash: gointerfaces.ConvertHashToH256(hh.Hash())}, }, }, txpool2.TxSlots{}, txpool2.TxSlots{}, tx) }); err != nil { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 8d3cb7e3336eb637e28d973c016e3a5b1d1fa40c..4180ba54620e7581725556ef3e1987746e23d595 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -204,11 +204,7 @@ func newStateReaderWriter( if err != nil { return nil, nil, err } - var blockBaseFee uint64 - if block.BaseFee() != nil { - blockBaseFee = block.BaseFee().Uint64() - } - accumulator.StartChange(block.NumberU64(), block.Hash(), block.NumberU64()-1, block.ParentHash(), txs, blockBaseFee, false) + accumulator.StartChange(block.NumberU64(), block.Hash(), txs, false) } else { accumulator = nil } @@ -425,17 +421,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, tx kv.RwTx, quit <-chan if err != nil { return err } - targetHeader := rawdb.ReadHeader(tx, hash, u.UnwindPoint) - var protocolBaseFee uint64 - if targetHeader.BaseFee != nil { - protocolBaseFee = targetHeader.BaseFee.Uint64() - } - prevHash, err := rawdb.ReadCanonicalHash(tx, s.BlockNumber) - if err != nil { - return fmt.Errorf("read canonical hash of unwind point: %w", err) - } - - accumulator.StartChange(u.UnwindPoint, hash, s.BlockNumber, prevHash, txs, protocolBaseFee, true) + accumulator.StartChange(u.UnwindPoint, hash, txs, true) } changes := etl.NewCollector(cfg.tmpdir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize)) diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index 08b62b6d8ebfc94b7b9b6fcd3bc37c6683bee355..3cbe501db8598415b5a1a5d06bf551f193c7969f 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -3,6 +3,7 @@ package stagedsync import ( "context" "fmt" + "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon-lib/kv" @@ -123,12 +124,7 @@ func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context) return nil } -func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, unwindTo *uint64, notifier ChainEventNotifier, db kv.RwDB) error { - tx, err := db.BeginRo(ctx) - if err != nil { - return err - } - defer tx.Rollback() +func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx) error { notifyTo, err := stages.GetStageProgress(tx, stages.Finish) // because later stages can be disabled if err != nil { return err diff --git a/go.mod b/go.mod index 8520408f8a1070a02d9c5a3eaaf1b361ad67b972..3a07d6de4f9bfafd3d32789d52fbfd5e2fcdeb86 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/goccy/go-json v0.7.4 github.com/gofrs/flock v0.8.1 github.com/golang/protobuf v1.5.2 - github.com/golang/snappy v0.0.3 + github.com/golang/snappy v0.0.4 github.com/google/btree v1.0.1 github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa github.com/gorilla/websocket v1.4.2 @@ -36,7 +36,7 @@ require ( github.com/json-iterator/go v1.1.11 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20210922103429-7740382188a6 + github.com/ledgerwatch/erigon-lib v0.0.0-20210926124830-9b3efaa33e2a github.com/ledgerwatch/log/v3 v3.3.1 github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d github.com/logrusorgru/aurora/v3 v3.0.0 diff --git a/go.sum b/go.sum index 93b17301f70e0678842292997080609765cdf828..9e9a00939cc1d39420ebc3087d888ee290e6dc9f 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,9 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -493,8 +494,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20210922103429-7740382188a6 h1:jlAd4fIbjy+82GJUa/D+RGr5ZgDBQomsmaON0p9BGAM= -github.com/ledgerwatch/erigon-lib v0.0.0-20210922103429-7740382188a6/go.mod h1:WgyjBACSDhgfepaaDJIbzd2TV868EjOrp2ILnEMKspY= +github.com/ledgerwatch/erigon-lib v0.0.0-20210926124830-9b3efaa33e2a h1:zK1CbuTV8MClnVuxH35a4CqAXcJoEcAaqBLb9DqBjTc= +github.com/ledgerwatch/erigon-lib v0.0.0-20210926124830-9b3efaa33e2a/go.mod h1:WgyjBACSDhgfepaaDJIbzd2TV868EjOrp2ILnEMKspY= github.com/ledgerwatch/log/v3 v3.3.1 h1:HmvLeTEvtCtqSvtu4t/a5MAdcLfeBcbIeowXbLYuzLc= github.com/ledgerwatch/log/v3 v3.3.1/go.mod h1:S3VJqhhVX32rbp1JyyvhJou12twtFwNEPESBgpbNkRk= github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno= diff --git a/turbo/shards/state_change_accumulator.go b/turbo/shards/state_change_accumulator.go index 6fafe8a078bf790303573e2a8dd6b8c0d2415dce..dab38689560aa3b0665addf67c5688c0892991cb 100644 --- a/turbo/shards/state_change_accumulator.go +++ b/turbo/shards/state_change_accumulator.go @@ -7,17 +7,23 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/params" ) // Accumulator collects state changes in a form that can then be delivered to the RPC daemon type Accumulator struct { viewID uint64 // mdbx's txID + chainConfig *params.ChainConfig changes []*remote.StateChange latestChange *remote.StateChange accountChangeIndex map[common.Address]int // For the latest changes, allows finding account change by account's address storageChangeIndex map[common.Address]map[common.Hash]int } +func NewAccumulator(chainConfig *params.ChainConfig) *Accumulator { + return &Accumulator{chainConfig: chainConfig} +} + type StateChangeConsumer interface { SendStateChanges(ctx context.Context, sc *remote.StateChangeBatch) } @@ -29,22 +35,22 @@ func (a *Accumulator) Reset(viewID uint64) { a.storageChangeIndex = nil a.viewID = viewID } -func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer) { +func (a *Accumulator) ChainConfig() *params.ChainConfig { return a.chainConfig } +func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer, pendingBaseFee uint64) { if a == nil || c == nil || len(a.changes) == 0 { return } - c.SendStateChanges(ctx, &remote.StateChangeBatch{DatabaseViewID: a.viewID, ChangeBatch: a.changes}) + sc := &remote.StateChangeBatch{DatabaseViewID: a.viewID, ChangeBatch: a.changes, PendingBlockBaseFee: pendingBaseFee} + c.SendStateChanges(ctx, sc) a.Reset(0) // reset here for GC, but there will be another Reset with correct viewID } // StartChange begins accumulation of changes for a new block -func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, prevBlockHeight uint64, prevBlockHash common.Hash, txs [][]byte, protocolBaseFee uint64, unwind bool) { +func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, txs [][]byte, unwind bool) { a.changes = append(a.changes, &remote.StateChange{}) a.latestChange = a.changes[len(a.changes)-1] a.latestChange.BlockHeight = blockHeight a.latestChange.BlockHash = gointerfaces.ConvertHashToH256(blockHash) - a.latestChange.PrevBlockHeight = prevBlockHeight - a.latestChange.PrevBlockHash = gointerfaces.ConvertHashToH256(prevBlockHash) if unwind { a.latestChange.Direction = remote.Direction_UNWIND } else { @@ -58,7 +64,6 @@ func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, pre a.latestChange.Txs[i] = libcommon.Copy(txs[i]) } } - a.latestChange.ProtocolBaseFee = protocolBaseFee } // ChangeAccount adds modification of account balance or nonce (or both) to the latest change diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 63c9613489e528032a19259f01c29b84a0b6de9f..4118152a4d6243dffec3e9586b95ac896008515d 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -162,7 +162,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey Key: key, Notifications: &stagedsync.Notifications{ Events: privateapi.NewEvents(), - Accumulator: &shards.Accumulator{}, + Accumulator: shards.NewAccumulator(gspec.Config), StateChangesConsumer: nil, }, UpdateHead: func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) { @@ -275,7 +275,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.ChainConfig, mock.Engine, &vm.Config{}, - &shards.Accumulator{}, + mock.Notifications.Accumulator, cfg.StateStream, mock.tmpdir, ), diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 9d1d72af59eb462aa9eba44b52920870336a2e73..7cac20c053b43a07e16cd7064aba8f1e6737c605 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -13,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" + "github.com/ledgerwatch/erigon/consensus/misc" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/vm" @@ -171,11 +172,23 @@ func StageLoopStep( } updateHead(ctx, head, headHash, headTd256) - notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer) + if notifications.Accumulator != nil { + if err := db.View(ctx, func(tx kv.Tx) error { + header := rawdb.ReadCurrentHeader(tx) + if header == nil { + return nil + } + pendingBaseFee := misc.CalcBaseFee(notifications.Accumulator.ChainConfig(), header) + notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64()) - err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, sync.PrevUnwindPoint(), notifications.Events, db) - if err != nil { - return err + err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, sync.PrevUnwindPoint(), notifications.Events, tx) + if err != nil { + return err + } + return nil + }); err != nil { + return err + } } return nil