From 3de50637cd38a422a09f585c63d96d2b9e3fd8a2 Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Wed, 22 Sep 2021 07:54:29 +0700 Subject: [PATCH] remove changeset.Walk func (#2716) --- cmd/hack/hack.go | 60 +++++------------ cmd/integration/commands/state_stages.go | 20 +++--- cmd/rpcdaemon/commands/debug_api.go | 8 +-- cmd/state/commands/check_change_sets.go | 12 ++-- cmd/state/verify/check_indexes.go | 10 +-- common/changeset/account_changeset.go | 8 +-- common/changeset/changeset.go | 25 ++++++- common/changeset/storage_changeset.go | 22 ++----- core/state/history_test.go | 20 +++--- eth/stagedsync/stage_indexes.go | 27 +++----- ethdb/bitmapdb/dbutils.go | 33 ++++++---- ethdb/olddb/object_db.go | 11 ---- ethdb/olddb/tx_db.go | 21 ------ turbo/snapshotsync/snapshot_builder_test.go | 72 +++++++-------------- 14 files changed, 131 insertions(+), 218 deletions(-) diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index 9e3d889012..94eb75b035 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -888,19 +888,6 @@ func validateTxLookups2(db kv.RwDB, startBlock uint64, interruptCh chan bool) { } } -func getModifiedAccounts(chaindata string) { - // TODO(tjayrush): The call to GetModifiedAccounts needs a database tx - fmt.Println("hack - getModiiedAccounts is temporarily disabled.") - db := mdbx.MustOpen(chaindata) - defer db.Close() - tool.Check(db.View(context.Background(), func(tx kv.Tx) error { - addrs, err := changeset.GetModifiedAccounts(tx, 49300, 49400) - tool.Check(err) - fmt.Printf("Len(addrs)=%d\n", len(addrs)) - return nil - })) -} - type Receiver struct { defaultReceiver *trie.RootHashAggregator accountMap map[string]*accounts.Account @@ -1012,17 +999,12 @@ func testGetProof(chaindata string, address common.Address, rewind int, regen bo log.Info("GetProof", "address", address, "storage keys", len(storageKeys), "head", *headNumber, "block", block, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) - ts := dbutils.EncodeBlockNumber(block + 1) accountMap := make(map[string]*accounts.Account) - if err := changeset.Walk(tx, kv.AccountChangeSet, ts, 0, func(blockN uint64, address, v []byte) (bool, error) { - if blockN > *headNumber { - return false, nil - } - + if err := changeset.ForRange(tx, kv.AccountChangeSet, block+1, *headNumber+1, func(blockN uint64, address, v []byte) error { var addrHash, err = common.HashData(address) if err != nil { - return false, err + return err } k := addrHash[:] @@ -1030,14 +1012,14 @@ func testGetProof(chaindata string, address common.Address, rewind int, regen bo if len(v) > 0 { var a accounts.Account if innerErr := a.DecodeForStorage(v); innerErr != nil { - return false, innerErr + return innerErr } accountMap[string(k)] = &a } else { accountMap[string(k)] = nil } } - return true, nil + return nil }); err != nil { return err } @@ -1045,19 +1027,16 @@ func testGetProof(chaindata string, address common.Address, rewind int, regen bo log.Info("Constructed account map", "size", len(accountMap), "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys)) storageMap := make(map[string][]byte) - if err := changeset.Walk(tx, kv.StorageChangeSet, ts, 0, func(blockN uint64, address, v []byte) (bool, error) { - if blockN > *headNumber { - return false, nil - } + if err := changeset.ForRange(tx, kv.StorageChangeSet, block+1, *headNumber+1, func(blockN uint64, address, v []byte) error { var addrHash, err = common.HashData(address) if err != nil { - return false, err + return err } k := addrHash[:] if _, ok := storageMap[string(k)]; !ok { storageMap[string(k)] = v } - return true, nil + return nil }); err != nil { return err } @@ -1278,29 +1257,23 @@ func changeSetStats(chaindata string, block1, block2 uint64) error { return err1 } defer tx.Rollback() - if err := changeset.Walk(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(block1), 0, func(blockN uint64, k, v []byte) (bool, error) { - if blockN >= block2 { - return false, nil - } + if err := changeset.ForRange(tx, kv.AccountChangeSet, block1, block2, func(blockN uint64, k, v []byte) error { if (blockN-block1)%100000 == 0 { fmt.Printf("at the block %d for accounts, booster size: %d\n", blockN, len(accounts)) } accounts[string(common.CopyBytes(k))] = struct{}{} - return true, nil + return nil }); err != nil { return err } storage := make(map[string]struct{}) - if err := changeset.Walk(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(block1), 0, func(blockN uint64, k, v []byte) (bool, error) { - if blockN >= block2 { - return false, nil - } + if err := changeset.ForRange(tx, kv.StorageChangeSet, block1, block2, func(blockN uint64, k, v []byte) error { if (blockN-block1)%100000 == 0 { fmt.Printf("at the block %d for accounts, booster size: %d\n", blockN, len(accounts)) } storage[string(common.CopyBytes(k))] = struct{}{} - return true, nil + return nil }); err != nil { return err } @@ -1319,11 +1292,11 @@ func searchChangeSet(chaindata string, key []byte, block uint64) error { } defer tx.Rollback() - if err := changeset.Walk(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(block), 0, func(blockN uint64, k, v []byte) (bool, error) { + if err := changeset.ForEach(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(block), func(blockN uint64, k, v []byte) error { if bytes.Equal(k, key) { fmt.Printf("Found in block %d with value %x\n", blockN, v) } - return true, nil + return nil }); err != nil { return err } @@ -1339,11 +1312,11 @@ func searchStorageChangeSet(chaindata string, key []byte, block uint64) error { return err1 } defer tx.Rollback() - if err := changeset.Walk(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(block), 0, func(blockN uint64, k, v []byte) (bool, error) { + if err := changeset.ForEach(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(block), func(blockN uint64, k, v []byte) error { if bytes.Equal(k, key) { fmt.Printf("Found in block %d with value %x\n", blockN, v) } - return true, nil + return nil }); err != nil { return err } @@ -2384,9 +2357,6 @@ func main() { case "val-tx-lookup-2": ValidateTxLookups2(*chaindata) - case "modiAccounts": - getModifiedAccounts(*chaindata) - case "slice": dbSlice(*chaindata, *bucket, common.FromHex(*hash)) diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index d2d2d3a6a9..470a52ab6f 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -534,11 +534,11 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error { func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset.ChangeSet, expectedStorageChanges *changeset.ChangeSet) error { i := 0 sort.Sort(expectedAccountChanges) - err := changeset.Walk(db, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) { + err := changeset.ForPrefix(db, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error { c := expectedAccountChanges.Changes[i] i++ if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) { - return true, nil + return nil } fmt.Printf("Unexpected account changes in block %d\n", blockNum) @@ -546,7 +546,7 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset fmt.Printf("0x%x: %x\n", k, v) fmt.Printf("Expected: ==========================\n") fmt.Printf("0x%x %x\n", c.Key, c.Value) - return false, fmt.Errorf("check change set failed") + return fmt.Errorf("check change set failed") }) if err != nil { return err @@ -560,11 +560,11 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset i = 0 sort.Sort(expectedStorageChanges) - err = changeset.Walk(db, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) { + err = changeset.ForPrefix(db, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error { c := expectedStorageChanges.Changes[i] i++ if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) { - return true, nil + return nil } fmt.Printf("Unexpected storage changes in block %d\n", blockNum) @@ -572,7 +572,7 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset fmt.Printf("0x%x: %x\n", k, v) fmt.Printf("Expected: ==========================\n") fmt.Printf("0x%x %x\n", c.Key, c.Value) - return false, fmt.Errorf("check change set failed") + return fmt.Errorf("check change set failed") }) if err != nil { return err @@ -587,7 +587,7 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset func checkHistory(tx kv.Tx, changeSetBucket string, blockNum uint64) error { indexBucket := changeset.Mapper[changeSetBucket].IndexBucket blockNumBytes := dbutils.EncodeBlockNumber(blockNum) - if err := changeset.Walk(tx, changeSetBucket, blockNumBytes, 0, func(blockN uint64, address, v []byte) (bool, error) { + if err := changeset.ForEach(tx, changeSetBucket, blockNumBytes, func(blockN uint64, address, v []byte) error { k := dbutils.CompositeKeyWithoutIncarnation(address) from := blockN if from > 0 { @@ -595,12 +595,12 @@ func checkHistory(tx kv.Tx, changeSetBucket string, blockNum uint64) error { } bm, innerErr := bitmapdb.Get64(tx, indexBucket, k, from, blockN+1) if innerErr != nil { - return false, innerErr + return innerErr } if !bm.Contains(blockN) { - return false, fmt.Errorf("checkHistory failed: bucket=%s,block=%d,addr=%x", changeSetBucket, blockN, k) + return fmt.Errorf("checkHistory failed: bucket=%s,block=%d,addr=%x", changeSetBucket, blockN, k) } - return true, nil + return nil }); err != nil { return err } diff --git a/cmd/rpcdaemon/commands/debug_api.go b/cmd/rpcdaemon/commands/debug_api.go index 8902cd4b2d..3bcbedbf77 100644 --- a/cmd/rpcdaemon/commands/debug_api.go +++ b/cmd/rpcdaemon/commands/debug_api.go @@ -159,10 +159,10 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByNumber(ctx context.Context, return nil, fmt.Errorf("start block (%d) is later than the latest block (%d)", startNum, latestBlock) } - endNum := startNum // allows for single param calls + endNum := startNum + 1 // allows for single param calls if endNumber != nil { // forces negative numbers to fail (too large) but allows zero - endNum = uint64(endNumber.Int64()) + endNum = uint64(endNumber.Int64()) + 1 } // is endNum too big? @@ -193,7 +193,7 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByHash(ctx context.Context, s return nil, fmt.Errorf("start block %x not found", startHash) } startNum := startBlock.NumberU64() - endNum := startNum // allows for single parameter calls + endNum := startNum + 1 // allows for single parameter calls if endHash != nil { endBlock, err := rawdb.ReadBlockByHash(tx, *endHash) @@ -203,7 +203,7 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByHash(ctx context.Context, s if endBlock == nil { return nil, fmt.Errorf("end block %x not found", *endHash) } - endNum = endBlock.NumberU64() + endNum = endBlock.NumberU64() + 1 } if startNum > endNum { diff --git a/cmd/state/commands/check_change_sets.go b/cmd/state/commands/check_change_sets.go index acbaf1a50e..ea9d50cc6d 100644 --- a/cmd/state/commands/check_change_sets.go +++ b/cmd/state/commands/check_change_sets.go @@ -165,11 +165,11 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64, sort.Sort(accountChanges) i := 0 match := true - err = changeset.Walk(historyTx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) { + err = changeset.ForPrefix(historyTx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error { c := accountChanges.Changes[i] if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) { i++ - return true, nil + return nil } match = false @@ -178,7 +178,7 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64, fmt.Printf("%d: 0x%x: %x\n", i, k, v) fmt.Printf("Expected: ==========================\n") fmt.Printf("%d: 0x%x %x\n", i, c.Key, c.Value) - return false, nil + return nil }) if err != nil { return err @@ -197,18 +197,18 @@ func CheckChangeSets(genesis *core.Genesis, logger log.Logger, blockNum uint64, expectedStorageChanges = changeset.NewChangeSet() } sort.Sort(expectedStorageChanges) - err = changeset.Walk(historyTx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), 8*8, func(blockN uint64, k, v []byte) (bool, error) { + err = changeset.ForPrefix(historyTx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error { c := expectedStorageChanges.Changes[i] i++ if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) { - return false, nil + return nil } fmt.Printf("Unexpected storage changes in block %d\nIn the database: ======================\n", blockNum) fmt.Printf("0x%x: %x\n", k, v) fmt.Printf("Expected: ==========================\n") fmt.Printf("0x%x %x\n", c.Key, c.Value) - return true, fmt.Errorf("check change set failed") + return fmt.Errorf("check change set failed") }) if err != nil { return err diff --git a/cmd/state/verify/check_indexes.go b/cmd/state/verify/check_indexes.go index f11b3459d5..de9489ba91 100644 --- a/cmd/state/verify/check_indexes.go +++ b/cmd/state/verify/check_indexes.go @@ -24,7 +24,7 @@ func CheckIndex(ctx context.Context, chaindata string, changeSetBucket string, i startTime := time.Now() i := 0 - if err := changeset.Walk(tx, changeSetBucket, nil, 0, func(blockN uint64, k, v []byte) (bool, error) { + if err := changeset.ForEach(tx, changeSetBucket, nil, func(blockN uint64, k, v []byte) error { i++ if i%100_000 == 0 { fmt.Printf("Processed %dK, %s\n", blockN/1000, time.Since(startTime)) @@ -32,17 +32,17 @@ func CheckIndex(ctx context.Context, chaindata string, changeSetBucket string, i select { default: case <-ctx.Done(): - return false, ctx.Err() + return ctx.Err() } bm, innerErr := bitmapdb.Get64(tx, indexBucket, dbutils.CompositeKeyWithoutIncarnation(k), blockN-1, blockN+1) if innerErr != nil { - return false, innerErr + return innerErr } if !bm.Contains(blockN) { - return false, fmt.Errorf("%v,%v", blockN, common.Bytes2Hex(k)) + return fmt.Errorf("%v,%v", blockN, common.Bytes2Hex(k)) } - return true, nil + return nil }); err != nil { return err } diff --git a/common/changeset/account_changeset.go b/common/changeset/account_changeset.go index 57f3b1d2bf..98a2a77de4 100644 --- a/common/changeset/account_changeset.go +++ b/common/changeset/account_changeset.go @@ -56,14 +56,12 @@ func FindAccount(c kv.CursorDupSort, blockNumber uint64, key []byte) ([]byte, er } // GetModifiedAccounts returns a list of addresses that were modified in the block range +// [startNum:endNum) func GetModifiedAccounts(db kv.Tx, startNum, endNum uint64) ([]common.Address, error) { changedAddrs := make(map[common.Address]struct{}) - if err := Walk(db, kv.AccountChangeSet, dbutils.EncodeBlockNumber(startNum), 0, func(blockN uint64, k, v []byte) (bool, error) { - if blockN > endNum { - return false, nil - } + if err := ForRange(db, kv.AccountChangeSet, startNum, endNum, func(blockN uint64, k, v []byte) error { changedAddrs[common.BytesToAddress(k)] = struct{}{} - return true, nil + return nil }); err != nil { return nil, err } diff --git a/common/changeset/changeset.go b/common/changeset/changeset.go index 36799ebb16..e915c5fbee 100644 --- a/common/changeset/changeset.go +++ b/common/changeset/changeset.go @@ -144,14 +144,35 @@ func AvailableStorageFrom(tx kv.Tx) (uint64, error) { return binary.BigEndian.Uint64(k), nil } -func Walk(db kv.Tx, bucket string, startkey []byte, fixedbits int, walker func(blockN uint64, k, v []byte) (bool, error)) error { +// [from:to) +func ForRange(db kv.Tx, bucket string, from, to uint64, walker func(blockN uint64, k, v []byte) error) error { var blockN uint64 c, err := db.Cursor(bucket) if err != nil { return err } defer c.Close() - return ethdb.Walk(c, startkey, fixedbits, func(k, v []byte) (bool, error) { + return ethdb.Walk(c, dbutils.EncodeBlockNumber(from), 0, func(k, v []byte) (bool, error) { + blockN, k, v = FromDBFormat(k, v) + if blockN >= to { + return false, nil + } + if err := walker(blockN, k, v); err != nil { + return false, err + } + return true, nil + }) +} +func ForEach(db kv.Tx, bucket string, startkey []byte, walker func(blockN uint64, k, v []byte) error) error { + var blockN uint64 + return db.ForEach(bucket, startkey, func(k, v []byte) error { + blockN, k, v = FromDBFormat(k, v) + return walker(blockN, k, v) + }) +} +func ForPrefix(db kv.Tx, bucket string, startkey []byte, walker func(blockN uint64, k, v []byte) error) error { + var blockN uint64 + return db.ForPrefix(bucket, startkey, func(k, v []byte) error { blockN, k, v = FromDBFormat(k, v) return walker(blockN, k, v) }) diff --git a/common/changeset/storage_changeset.go b/common/changeset/storage_changeset.go index 29b42a3fc9..e979f52663 100644 --- a/common/changeset/storage_changeset.go +++ b/common/changeset/storage_changeset.go @@ -10,8 +10,6 @@ import ( "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/ethdb" ) const ( @@ -19,8 +17,7 @@ const ( ) var ( - ErrNotFound = errors.New("not found") - ErrFindValue = errors.New("find value error") + ErrNotFound = errors.New("not found") ) func NewStorageChangeSet() *ChangeSet { @@ -100,22 +97,13 @@ func RewindData(db kv.Tx, timestampSrc, timestampDst uint64, changes *etl.Collec } func walkAndCollect(collectorFunc func([]byte, []byte) error, db kv.Tx, bucket string, timestampDst, timestampSrc uint64, quit <-chan struct{}) error { - c, err := db.Cursor(bucket) - if err != nil { - return err - } - defer c.Close() - return ethdb.Walk(c, dbutils.EncodeBlockNumber(timestampDst), 0, func(dbKey, dbValue []byte) (bool, error) { + return ForRange(db, bucket, timestampDst, timestampSrc+1, func(_ uint64, k, v []byte) error { if err := libcommon.Stopped(quit); err != nil { - return false, err - } - timestamp, k, v := Mapper[bucket].Decode(dbKey, dbValue) - if timestamp > timestampSrc { - return false, nil + return err } if innerErr := collectorFunc(libcommon.Copy(k), libcommon.Copy(v)); innerErr != nil { - return false, innerErr + return innerErr } - return true, nil + return nil }) } diff --git a/core/state/history_test.go b/core/state/history_test.go index 4728b9a300..ef82c84cc6 100644 --- a/core/state/history_test.go +++ b/core/state/history_test.go @@ -45,9 +45,9 @@ func TestMutationDeleteTimestamp(t *testing.T) { } i := 0 - err := changeset.Walk(tx, kv.AccountChangeSet, nil, 0, func(blockN uint64, k, v []byte) (bool, error) { + err := changeset.ForEach(tx, kv.AccountChangeSet, nil, func(blockN uint64, k, v []byte) error { i++ - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -67,9 +67,9 @@ func TestMutationDeleteTimestamp(t *testing.T) { } count := 0 - err = changeset.Walk(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(1), 8*8, func(blockN uint64, k, v []byte) (bool, error) { + err = changeset.ForPrefix(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(1), func(blockN uint64, k, v []byte) error { count++ - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -147,11 +147,11 @@ func TestMutationCommit(t *testing.T) { } changeSetInDB := changeset.NewAccountChangeSet() - err := changeset.Walk(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(2), 8*8, func(_ uint64, k, v []byte) (bool, error) { + err := changeset.ForPrefix(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(2), func(_ uint64, k, v []byte) error { if err := changeSetInDB.Add(k, v); err != nil { - return false, err + return err } - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -179,11 +179,11 @@ func TestMutationCommit(t *testing.T) { } cs := changeset.NewStorageChangeSet() - err = changeset.Walk(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(2), 8*8, func(_ uint64, k, v []byte) (bool, error) { + err = changeset.ForPrefix(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(2), func(_ uint64, k, v []byte) error { if err2 := cs.Add(k, v); err2 != nil { - return false, err2 + return err2 } - return true, nil + return nil }) if err != nil { t.Fatal(err) diff --git a/eth/stagedsync/stage_indexes.go b/eth/stagedsync/stage_indexes.go index c5084076f0..2d600d3bf8 100644 --- a/eth/stagedsync/stage_indexes.go +++ b/eth/stagedsync/stage_indexes.go @@ -143,12 +143,9 @@ func promoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start, collectorUpdates := etl.NewCollector(cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize)) defer collectorUpdates.Close(logPrefix) - if err := changeset.Walk(tx, changesetBucket, dbutils.EncodeBlockNumber(start), 0, func(blockN uint64, k, v []byte) (bool, error) { - if blockN >= stop { - return false, nil - } + if err := changeset.ForRange(tx, changesetBucket, start, stop, func(blockN uint64, k, v []byte) error { if err := libcommon.Stopped(quit); err != nil { - return false, err + return err } k = dbutils.CompositeKeyWithoutIncarnation(k) @@ -162,7 +159,7 @@ func promoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start, case <-checkFlushEvery.C: if needFlush64(updates, cfg.bufLimit) { if err := flushBitmaps64(collectorUpdates, updates); err != nil { - return false, err + return err } updates = map[string]*roaring64.Bitmap{} } @@ -175,7 +172,7 @@ func promoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start, } m.Add(blockN) - return true, nil + return nil }); err != nil { return err } @@ -290,19 +287,19 @@ func unwindHistory(logPrefix string, db kv.RwTx, csBucket string, to uint64, cfg defer logEvery.Stop() updates := map[string]struct{}{} - if err := changeset.Walk(db, csBucket, dbutils.EncodeBlockNumber(to), 0, func(blockN uint64, k, v []byte) (bool, error) { + if err := changeset.ForEach(db, csBucket, dbutils.EncodeBlockNumber(to), func(blockN uint64, k, v []byte) error { select { case <-logEvery.C: var m runtime.MemStats runtime.ReadMemStats(&m) log.Info(fmt.Sprintf("[%s] Progress", logPrefix), "number", blockN, "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys)) case <-quitCh: - return false, libcommon.ErrStopped + return libcommon.ErrStopped default: } k = dbutils.CompositeKeyWithoutIncarnation(k) updates[string(k)] = struct{}{} - return true, nil + return nil }); err != nil { return err } @@ -422,14 +419,8 @@ func pruneHistoryIndex(tx kv.RwTx, csTable, logPrefix, tmpDir string, pruneTo ui collector := etl.NewCollector(tmpDir, etl.NewOldestEntryBuffer(etl.BufferOptimalSize)) defer collector.Close(logPrefix) - if err := changeset.Walk(tx, csTable, nil, 0, func(blockNum uint64, k, _ []byte) (bool, error) { - if blockNum >= pruneTo { - return false, nil - } - if err := collector.Collect(k, nil); err != nil { - return false, err - } - return true, nil + if err := changeset.ForRange(tx, csTable, 0, pruneTo, func(blockNum uint64, k, _ []byte) error { + return collector.Collect(k, nil) }); err != nil { return err } diff --git a/ethdb/bitmapdb/dbutils.go b/ethdb/bitmapdb/dbutils.go index db196718db..ca7a52fa2f 100644 --- a/ethdb/bitmapdb/dbutils.go +++ b/ethdb/bitmapdb/dbutils.go @@ -130,19 +130,21 @@ func Get(db kv.Tx, bucket string, key []byte, from, to uint32) (*roaring.Bitmap, return nil, err } defer c.Close() - if err := ethdb.Walk(c, fromKey, len(key)*8, func(k, v []byte) (bool, error) { - bm := roaring.New() - _, err := bm.ReadFrom(bytes.NewReader(v)) + for k, v, err := c.Seek(fromKey); k != nil; k, v, err = c.Next() { if err != nil { - return false, err + return nil, err + } + if !bytes.HasPrefix(k, key) { + break + } + bm := roaring.New() + if _, err := bm.ReadFrom(bytes.NewReader(v)); err != nil { + return nil, err } chunks = append(chunks, bm) if binary.BigEndian.Uint32(k[len(k)-4:]) >= to { - return false, nil + break } - return true, nil - }); err != nil { - return nil, err } if len(chunks) == 0 { @@ -283,19 +285,22 @@ func Get64(db kv.Tx, bucket string, key []byte, from, to uint64) (*roaring64.Bit return nil, err } defer c.Close() - if err := ethdb.Walk(c, fromKey, len(key)*8, func(k, v []byte) (bool, error) { + for k, v, err := c.Seek(fromKey); k != nil; k, v, err = c.Next() { + if err != nil { + return nil, err + } + if !bytes.HasPrefix(k, key) { + break + } bm := roaring64.New() _, err := bm.ReadFrom(bytes.NewReader(v)) if err != nil { - return false, err + return nil, err } chunks = append(chunks, bm) if binary.BigEndian.Uint64(k[len(k)-8:]) >= to { - return false, nil + break } - return true, nil - }); err != nil { - return nil, err } if len(chunks) == 0 { diff --git a/ethdb/olddb/object_db.go b/ethdb/olddb/object_db.go index 37a5abc01f..63d84d11eb 100644 --- a/ethdb/olddb/object_db.go +++ b/ethdb/olddb/object_db.go @@ -143,17 +143,6 @@ func (db *ObjectDatabase) Last(bucket string) ([]byte, []byte, error) { return key, value, nil } -func (db *ObjectDatabase) Walk(bucket string, startkey []byte, fixedbits int, walker func(k, v []byte) (bool, error)) error { - err := db.kv.View(context.Background(), func(tx kv.Tx) error { - c, err := tx.Cursor(bucket) - if err != nil { - return err - } - return ethdb.Walk(c, startkey, fixedbits, walker) - }) - return err -} - func (db *ObjectDatabase) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { return db.kv.View(context.Background(), func(tx kv.Tx) error { return tx.ForEach(bucket, fromPrefix, walker) diff --git a/ethdb/olddb/tx_db.go b/ethdb/olddb/tx_db.go index b8cb58d6b3..737e166862 100644 --- a/ethdb/olddb/tx_db.go +++ b/ethdb/olddb/tx_db.go @@ -159,27 +159,6 @@ func (m *TxDb) BatchSize() int { return int(m.len) } -func (m *TxDb) Walk(bucket string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error { - // get cursor out of pool, then calls txDb.Put/Get/Delete on same bucket inside Walk callback - will not affect state of Walk - c, ok := m.cursors[bucket] - if ok { - delete(m.cursors, bucket) - } else { - var err error - c, err = m.tx.Cursor(bucket) - if err != nil { - return err - } - } - defer func() { // put cursor back to pool if can - if _, ok = m.cursors[bucket]; ok { - c.Close() - } else { - m.cursors[bucket] = c - } - }() - return ethdb.Walk(c, startkey, fixedbits, walker) -} func (m *TxDb) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { return m.tx.ForEach(bucket, fromPrefix, walker) } diff --git a/turbo/snapshotsync/snapshot_builder_test.go b/turbo/snapshotsync/snapshot_builder_test.go index 9911ef903f..41c508f32f 100644 --- a/turbo/snapshotsync/snapshot_builder_test.go +++ b/turbo/snapshotsync/snapshot_builder_test.go @@ -23,7 +23,6 @@ import ( "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/erigon/ethdb/snapshotdb" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/log/v3" @@ -177,15 +176,13 @@ func TestSnapshotMigratorStageAsync(t *testing.T) { rotx, err := db.WriteDB().BeginRo(context.Background()) require.NoError(t, err) defer rotx.Rollback() - roc, err := rotx.Cursor(kv.Headers) - require.NoError(t, err) var headerNumber uint64 headerNumber = 11 - err = ethdb.Walk(roc, []byte{}, 0, func(k, v []byte) (bool, error) { + err = rotx.ForEach(kv.Headers, nil, func(k, v []byte) error { require.Equal(t, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)}), k) headerNumber++ - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -300,12 +297,10 @@ func TestSnapshotMigratorStageAsync(t *testing.T) { rotx, err = db.WriteDB().BeginRo(context.Background()) require.NoError(t, err) defer rotx.Rollback() - roc, err = rotx.Cursor(kv.Headers) - require.NoError(t, err) - err = ethdb.Walk(roc, []byte{}, 0, func(k, v []byte) (bool, error) { + err = rotx.ForEach(kv.Headers, nil, func(k, v []byte) error { t.Fatal("main db must be empty here", k) - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -492,15 +487,13 @@ func TestSnapshotMigratorStageSyncMode(t *testing.T) { rotx, err := db.WriteDB().BeginRo(context.Background()) require.NoError(t, err) defer rotx.Rollback() - roc, err := rotx.Cursor(kv.Headers) - require.NoError(t, err) var headerNumber uint64 headerNumber = 11 - err = ethdb.Walk(roc, []byte{}, 0, func(k, v []byte) (bool, error) { + err = rotx.ForEach(kv.Headers, nil, func(k, v []byte) error { require.Equal(t, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)}), k) headerNumber++ - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -513,17 +506,15 @@ func TestSnapshotMigratorStageSyncMode(t *testing.T) { snokv := db.HeadersSnapshot() snRoTx, err := snokv.BeginRo(context.Background()) require.NoError(t, err) - headersCursor, err := snRoTx.Cursor(kv.Headers) - require.NoError(t, err) headerNumber = 0 - err = ethdb.Walk(headersCursor, []byte{}, 0, func(k, v []byte) (bool, error) { + err = snRoTx.ForEach(kv.Headers, nil, func(k, v []byte) error { if !bytes.Equal(k, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)})) { t.Fatal(k) } headerNumber++ - - return true, nil + return nil }) + snRoTx.Rollback() if err != nil { t.Fatal(err) @@ -827,14 +818,9 @@ func GenerateBodyData(tx kv.RwTx, from, to uint64) error { // check snapshot data based on GenerateBodyData func verifyBodiesSnapshot(t *testing.T, bodySnapshotTX kv.Tx, snapshotTo uint64) { t.Helper() - bodyCursor, err := bodySnapshotTX.Cursor(kv.BlockBody) - if err != nil { - t.Fatal(err) - } - defer bodyCursor.Close() var blockNum uint64 - err = ethdb.Walk(bodyCursor, []byte{}, 0, func(k, v []byte) (bool, error) { + err := bodySnapshotTX.ForEach(kv.BlockBody, nil, func(k, v []byte) error { //fmt.Println(common.Bytes2Hex(k)) if binary.BigEndian.Uint64(k[:8]) != blockNum { t.Fatal("incorrect block number", blockNum, binary.BigEndian.Uint64(k[:8]), common.Bytes2Hex(k)) @@ -843,7 +829,7 @@ func verifyBodiesSnapshot(t *testing.T, bodySnapshotTX kv.Tx, snapshotTo uint64) t.Fatal("block is not canonical", blockNum, common.Bytes2Hex(k)) } bfs := types.BodyForStorage{} - err = rlp.DecodeBytes(v, &bfs) + err := rlp.DecodeBytes(v, &bfs) if err != nil { t.Fatal(err, v) } @@ -861,7 +847,7 @@ func verifyBodiesSnapshot(t *testing.T, bodySnapshotTX kv.Tx, snapshotTo uint64) txNum++ } blockNum++ - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -874,14 +860,9 @@ func verifyBodiesSnapshot(t *testing.T, bodySnapshotTX kv.Tx, snapshotTo uint64) // check headers snapshot data based on GenerateBodyData func verifyHeadersSnapshot(t *testing.T, headersSnapshotTX kv.Tx, snapshotTo uint64) { t.Helper() - headersCursor, err := headersSnapshotTX.Cursor(kv.Headers) - if err != nil { - t.Fatal(err) - } - defer headersCursor.Close() var blockNum uint64 - err = ethdb.Walk(headersCursor, []byte{}, 0, func(k, v []byte) (bool, error) { + err := headersSnapshotTX.ForEach(kv.Headers, nil, func(k, v []byte) error { //fmt.Println(common.Bytes2Hex(k)) if binary.BigEndian.Uint64(k[:8]) != blockNum { t.Fatal("incorrect block number", blockNum, binary.BigEndian.Uint64(k[:8]), common.Bytes2Hex(k)) @@ -890,7 +871,7 @@ func verifyHeadersSnapshot(t *testing.T, headersSnapshotTX kv.Tx, snapshotTo uin t.Fatal("block is not canonical", blockNum, common.Bytes2Hex(k)) } blockNum++ - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -902,14 +883,9 @@ func verifyHeadersSnapshot(t *testing.T, headersSnapshotTX kv.Tx, snapshotTo uin func verifyFullBodiesData(t *testing.T, bodySnapshotTX kv.Tx, dataTo uint64) { t.Helper() - bodyCursor, err := bodySnapshotTX.Cursor(kv.BlockBody) - if err != nil { - t.Fatal(err) - } - defer bodyCursor.Close() var blockNum uint64 var numOfDuplicateBlocks uint8 - err = ethdb.Walk(bodyCursor, []byte{}, 0, func(k, v []byte) (bool, error) { + err := bodySnapshotTX.ForEach(kv.BlockBody, nil, func(k, v []byte) error { numOfDuplicateBlocks++ if binary.BigEndian.Uint64(k[:8]) != blockNum { t.Fatal("incorrect block number", blockNum, binary.BigEndian.Uint64(k[:8]), common.Bytes2Hex(k)) @@ -918,7 +894,7 @@ func verifyFullBodiesData(t *testing.T, bodySnapshotTX kv.Tx, dataTo uint64) { t.Fatal("incorrect block hash", blockNum, numOfDuplicateBlocks, common.Bytes2Hex(k)) } bfs := types.BodyForStorage{} - err = rlp.DecodeBytes(v, &bfs) + err := rlp.DecodeBytes(v, &bfs) if err != nil { t.Fatal(err, v) } @@ -949,7 +925,7 @@ func verifyFullBodiesData(t *testing.T, bodySnapshotTX kv.Tx, dataTo uint64) { numOfDuplicateBlocks = 0 } - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -968,7 +944,7 @@ func verifyPrunedBlocksData(t *testing.T, tx kv.Tx, dataFrom, dataTo, snapshotTx defer bodyCursor.Close() var blockNum uint64 var numOfDuplicateBlocks uint8 - err = ethdb.Walk(bodyCursor, []byte{}, 0, func(k, v []byte) (bool, error) { + err = tx.ForEach(kv.BlockBody, nil, func(k, v []byte) error { numOfDuplicateBlocks++ if binary.BigEndian.Uint64(k[:8]) != blockNum { t.Fatal("incorrect block number", blockNum, binary.BigEndian.Uint64(k[:8]), common.Bytes2Hex(k)) @@ -1009,7 +985,7 @@ func verifyPrunedBlocksData(t *testing.T, tx kv.Tx, dataFrom, dataTo, snapshotTx blockNum++ numOfDuplicateBlocks = 0 } - return true, nil + return nil }) if err != nil { t.Fatal(err) @@ -1262,13 +1238,9 @@ func TestPruneBlocks(t *testing.T) { } func PrintBodyBuckets(t *testing.T, tx kv.Tx) { //nolint: deadcode - bodyCursor, err := tx.Cursor(kv.BlockBody) - if err != nil { - t.Fatal(err) - } - err = ethdb.Walk(bodyCursor, []byte{}, 0, func(k, v []byte) (bool, error) { + err := tx.ForEach(kv.BlockBody, nil, func(k, v []byte) error { bfs := types.BodyForStorage{} - err = rlp.DecodeBytes(v, &bfs) + err := rlp.DecodeBytes(v, &bfs) if err != nil { t.Fatal(err, v) } @@ -1281,7 +1253,7 @@ func PrintBodyBuckets(t *testing.T, tx kv.Tx) { //nolint: deadcode for _, transaction := range transactions { fmt.Println("----", transaction.GetTo()) } - return true, nil + return nil }) if err != nil { t.Fatal(err) -- GitLab