From f06db2f37b06bf79b320b1a70fc26d54028bf15d Mon Sep 17 00:00:00 2001 From: ledgerwatch <akhounov@gmail.com> Date: Sat, 23 May 2020 10:19:56 +0100 Subject: [PATCH] Stages 6 and 7 for generating history indices (#569) * save state * add current index feature * fix test * remove logs * Only execute 1000 blocks * Reset history index * Correct action * Increase batch size * Increase chunk size, print memory stats * Fix linter * Remove unused from * Split into 2 staged * Use storage history gen * remove log * Not to run tx_cacher in staged mode * Not to recover during stage 2 * Not to recover during stage 2 * Remove counter Co-authored-by: b00ris <b00ris@mail.ru> --- cmd/geth/chaincmd.go | 2 +- cmd/hack/hack.go | 18 + cmd/state/commands/check_enc.go | 4 +- cmd/state/commands/check_index.go | 4 +- cmd/state/commands/regenerate_index.go | 4 +- .../regenerate_index.go | 6 +- cmd/state/stateless/state_snapshot.go | 2 +- .../{stats => verify}/check_changeset_enc.go | 2 +- .../{stateless => verify}/check_indexes.go | 2 +- common/dbutils/composite_keys.go | 22 - common/dbutils/history_index.go | 19 +- core/blockchain.go | 5 +- core/generate_index.go | 189 ++++++--- core/generate_index_test.go | 388 ++++++++++++++++++ core/tx_cacher.go | 10 +- eth/backend.go | 5 +- eth/backend_test.go | 4 +- eth/downloader/downloader.go | 6 +- eth/downloader/downloader_test.go | 2 +- eth/downloader/stagedsync_downloader.go | 48 ++- eth/downloader/stagedsync_stage_indexes.go | 83 ++++ .../stagedsync_stage_indexes_test.go | 15 + eth/downloader/stagedsync_stage_senders.go | 2 +- eth/downloader/stagedsync_stages.go | 2 + eth/downloader/stagedsync_test.go | 6 +- eth/handler.go | 6 +- ethdb/bolt_db.go | 3 +- migrations/migrations.go | 10 +- migrations/migrations_test.go | 12 +- 29 files changed, 745 insertions(+), 136 deletions(-) rename cmd/state/{stateless => generate}/regenerate_index.go (88%) rename cmd/state/{stats => verify}/check_changeset_enc.go (99%) rename cmd/state/{stateless => verify}/check_indexes.go (98%) create mode 100644 core/generate_index_test.go create mode 100644 eth/downloader/stagedsync_stage_indexes.go create mode 100644 eth/downloader/stagedsync_stage_indexes_test.go diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 89620cff36..422e2d0fab 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -412,7 +412,7 @@ func copyDb(ctx *cli.Context) error { if syncMode == downloader.FastSync { //syncBloom = trie.NewSyncBloom(uint64(ctx.GlobalInt(utils.CacheFlag.Name)/2), chainDb) } - dl := downloader.New(0, chainDb, syncBloom, new(event.TypeMux), chain, nil, nil) + dl := downloader.New(0, chainDb, syncBloom, new(event.TypeMux), chain, nil, nil, true) // Create a source peer to satisfy downloader requests from db, err := ethdb.NewBoltDatabase(ctx.Args().First()) diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index 687b2bbe98..a617f72d4c 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -2037,6 +2037,21 @@ func resetState(chaindata string) { fmt.Printf("Reset state done\n") } +func resetHistoryIndex(chaindata string) { + db, err := ethdb.NewBoltDatabase(chaindata) + check(err) + defer db.Close() + //nolint:errcheck + db.DeleteBucket(dbutils.AccountsHistoryBucket) + //nolint:errcheck + db.DeleteBucket(dbutils.StorageHistoryBucket) + err = downloader.SaveStageProgress(db, downloader.AccountHistoryIndex, 0) + check(err) + err = downloader.SaveStageProgress(db, downloader.StorageHistoryIndex, 0) + check(err) + fmt.Printf("Reset history index done\n") +} + type Receiver struct { 
defaultReceiver *trie.DefaultReceiver accountMap map[string]*accounts.Account @@ -2369,6 +2384,9 @@ func main() { if *action == "resetState" { resetState(*chaindata) } + if *action == "resetHistoryIndex" { + resetHistoryIndex(*chaindata) + } if *action == "getProof" { testGetProof(*chaindata, uint64(*block), common.HexToAddress(*account)) } diff --git a/cmd/state/commands/check_enc.go b/cmd/state/commands/check_enc.go index 5c19cfc4ae..ff65935d0d 100644 --- a/cmd/state/commands/check_enc.go +++ b/cmd/state/commands/check_enc.go @@ -1,7 +1,7 @@ package commands import ( - "github.com/ledgerwatch/turbo-geth/cmd/state/stats" + "github.com/ledgerwatch/turbo-geth/cmd/state/verify" "github.com/spf13/cobra" ) @@ -15,6 +15,6 @@ var checkEncCmd = &cobra.Command{ Use: "checkEnc", Short: "Check changesets Encoding", RunE: func(cmd *cobra.Command, args []string) error { - return stats.CheckEnc(chaindata) + return verify.CheckEnc(chaindata) }, } diff --git a/cmd/state/commands/check_index.go b/cmd/state/commands/check_index.go index fa1374d1e7..ab727b56cf 100644 --- a/cmd/state/commands/check_index.go +++ b/cmd/state/commands/check_index.go @@ -1,7 +1,7 @@ package commands import ( - "github.com/ledgerwatch/turbo-geth/cmd/state/stateless" + "github.com/ledgerwatch/turbo-geth/cmd/state/verify" "github.com/spf13/cobra" ) @@ -16,6 +16,6 @@ var checkIndexCMD = &cobra.Command{ Use: "checkIndex", Short: "Index checker", RunE: func(cmd *cobra.Command, args []string) error { - return stateless.CheckIndex(chaindata, []byte(changeSetBucket), []byte(indexBucket)) + return verify.CheckIndex(chaindata, []byte(changeSetBucket), []byte(indexBucket)) }, } diff --git a/cmd/state/commands/regenerate_index.go b/cmd/state/commands/regenerate_index.go index 779c3bfce0..2a5c9b0240 100644 --- a/cmd/state/commands/regenerate_index.go +++ b/cmd/state/commands/regenerate_index.go @@ -1,7 +1,7 @@ package commands import ( - "github.com/ledgerwatch/turbo-geth/cmd/state/stateless" + "github.com/ledgerwatch/turbo-geth/cmd/state/generate" "github.com/spf13/cobra" ) @@ -16,6 +16,6 @@ var regenerateIndexCmd = &cobra.Command{ Use: "regenerateIndex", Short: "Generate index for accounts/storage based on changesets", RunE: func(cmd *cobra.Command, args []string) error { - return stateless.RegenerateIndex(chaindata, []byte(indexBucket), []byte(changeSetBucket)) + return generate.RegenerateIndex(chaindata, []byte(indexBucket), []byte(changeSetBucket)) }, } diff --git a/cmd/state/stateless/regenerate_index.go b/cmd/state/generate/regenerate_index.go similarity index 88% rename from cmd/state/stateless/regenerate_index.go rename to cmd/state/generate/regenerate_index.go index 02d113f4a7..5901975f69 100644 --- a/cmd/state/stateless/regenerate_index.go +++ b/cmd/state/generate/regenerate_index.go @@ -1,4 +1,4 @@ -package stateless +package generate import ( "bytes" @@ -27,8 +27,8 @@ func RegenerateIndex(chaindata string, indexBucket []byte, csBucket []byte) erro } } - ig := core.NewIndexGenerator(db, csBucket, indexBucket, walker) - err = ig.GenerateIndex() + ig := core.NewIndexGenerator(db) + err = ig.GenerateIndex(0, csBucket, indexBucket, walker, nil) if err != nil { return err } diff --git a/cmd/state/stateless/state_snapshot.go b/cmd/state/stateless/state_snapshot.go index 4a1bc6f8f1..ea4ed6afa5 100644 --- a/cmd/state/stateless/state_snapshot.go +++ b/cmd/state/stateless/state_snapshot.go @@ -208,7 +208,7 @@ func loadSnapshot(db ethdb.Database, filename string, createDb CreateDbFunc) { err = copyDatabase(diskDb, db) check(err) - err = 
migrations.NewMigrator().Apply(diskDb, false, false, false, false, false) + err = migrations.NewMigrator().Apply(diskDb, false, false, false, false) check(err) } diff --git a/cmd/state/stats/check_changeset_enc.go b/cmd/state/verify/check_changeset_enc.go similarity index 99% rename from cmd/state/stats/check_changeset_enc.go rename to cmd/state/verify/check_changeset_enc.go index 3bd2954d11..78841e69a9 100644 --- a/cmd/state/stats/check_changeset_enc.go +++ b/cmd/state/verify/check_changeset_enc.go @@ -1,4 +1,4 @@ -package stats +package verify import ( "bytes" diff --git a/cmd/state/stateless/check_indexes.go b/cmd/state/verify/check_indexes.go similarity index 98% rename from cmd/state/stateless/check_indexes.go rename to cmd/state/verify/check_indexes.go index bf0da0e2be..e83ef76f36 100644 --- a/cmd/state/stateless/check_indexes.go +++ b/cmd/state/verify/check_indexes.go @@ -1,4 +1,4 @@ -package stateless +package verify import ( "bytes" diff --git a/common/dbutils/composite_keys.go b/common/dbutils/composite_keys.go index 7a9e20317d..cffc45b775 100644 --- a/common/dbutils/composite_keys.go +++ b/common/dbutils/composite_keys.go @@ -170,21 +170,6 @@ func ParseStoragePrefix(prefix []byte) (common.Hash, uint64) { return addrHash, inc } -func DecodeIncarnation(buf []byte) uint64 { - incarnation := binary.BigEndian.Uint64(buf) - return incarnation ^ ^uint64(0) -} - -func RemoveIncarnationFromKey(key []byte, buf *[]byte) { - tmp := *buf - if len(key) <= common.HashLength { - tmp = append(tmp, key...) - } else { - tmp = append(tmp, key[:common.HashLength]...) - tmp = append(tmp, key[common.HashLength+8:]...) - } - *buf = tmp -} // Key + blockNum func CompositeKeySuffix(key []byte, timestamp uint64) (composite, encodedTS []byte) { @@ -195,10 +180,3 @@ func CompositeKeySuffix(key []byte, timestamp uint64) (composite, encodedTS []by return composite, encodedTS } -// blockNum + history bucket -func CompositeChangeSetKey(encodedTS, hBucket []byte) []byte { - changeSetKey := make([]byte, len(encodedTS)+len(hBucket)) - copy(changeSetKey, encodedTS) - copy(changeSetKey[len(encodedTS):], hBucket) - return changeSetKey -} diff --git a/common/dbutils/history_index.go b/common/dbutils/history_index.go index 2965511e8b..35661d8365 100644 --- a/common/dbutils/history_index.go +++ b/common/dbutils/history_index.go @@ -48,7 +48,7 @@ func (hi HistoryIndexBytes) Decode() ([]uint64, []bool, error) { return numbers, sets, nil } -func (hi HistoryIndexBytes) Append(v uint64, s bool) HistoryIndexBytes { +func (hi HistoryIndexBytes) Append(v uint64, emptyValue bool) HistoryIndexBytes { if len(hi) < 8 { panic(fmt.Errorf("minimal length of index chunk is %d, got %d", 8, len(hi))) } @@ -67,7 +67,7 @@ func (hi HistoryIndexBytes) Append(v uint64, s bool) HistoryIndexBytes { panic(fmt.Errorf("item %d cannot be placed into the chunk with minElement %d", v, minElement)) } v -= minElement - if s { + if emptyValue { hi = append(hi, 0x80|byte(v>>16)) } else { hi = append(hi, byte(v>>16)) @@ -133,6 +133,7 @@ func (hi HistoryIndexBytes) Search(v uint64) (uint64, bool, bool) { func (hi HistoryIndexBytes) Key(key []byte) ([]byte, error) { blockNum, ok := hi.LastElement() if !ok { + fmt.Println(hi) return nil, errors.New("empty index") } return IndexChunkKey(key, blockNum), nil @@ -154,6 +155,10 @@ func (hi HistoryIndexBytes) LastElement() (uint64, bool) { return minElement + (uint64(hi[idx]&0x7f) << 16) + (uint64(hi[idx+1]) << 8) + uint64(hi[idx+2]), true } +func CurrentChunkKey(key []byte) []byte { + return IndexChunkKey(key, 
^uint64(0)) +} + func IndexChunkKey(key []byte, blockNumber uint64) []byte { var blockNumBytes []byte // make([]byte, len(key)+8) switch len(key) { @@ -173,7 +178,15 @@ func IndexChunkKey(key []byte, blockNumber uint64) []byte { return blockNumBytes } - +func CompositeKeyWithoutIncarnation(key []byte) []byte { + if len(key) == common.HashLength*2+common.IncarnationLength { + kk := make([]byte, common.HashLength*2) + copy(kk, key[:common.HashLength]) + copy(kk[common.HashLength:], key[common.HashLength+common.IncarnationLength:]) + return kk + } + return key +} func IsIndexBucket(b []byte) bool { return bytes.Equal(b, AccountsHistoryBucket) || bytes.Equal(b, StorageHistoryBucket) } diff --git a/core/blockchain.go b/core/blockchain.go index d6ca2d2eaf..7a25fdf42a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1527,7 +1527,10 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, verif return 0, nil } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) - senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + if execute { + InitTxCacher() + senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + } // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex diff --git a/core/generate_index.go b/core/generate_index.go index cbfc821077..5b9b50b2de 100644 --- a/core/generate_index.go +++ b/core/generate_index.go @@ -1,9 +1,10 @@ package core import ( - "bytes" + "encoding/binary" + "errors" + "runtime" "sort" - "time" "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" @@ -11,40 +12,29 @@ import ( "github.com/ledgerwatch/turbo-geth/log" ) -func NewIndexGenerator(db ethdb.Database, changeSetBucket []byte, indexBucket []byte, walkerAdapter func([]byte) ChangesetWalker) *IndexGenerator { - fixedBits := uint(common.HashLength) - if bytes.Equal(changeSetBucket, dbutils.StorageChangeSetBucket) { - fixedBits = common.HashLength*2 + common.IncarnationLength - } +func NewIndexGenerator(db ethdb.Database) *IndexGenerator { return &IndexGenerator{ - db: db, - csBucket: changeSetBucket, - bucketToWrite: indexBucket, - fixedBits: fixedBits, - csWalker: walkerAdapter, - cache: nil, + db: db, + cache: nil, } } type IndexGenerator struct { - db ethdb.Database - csBucket []byte - bucketToWrite []byte - fixedBits uint - csWalker func([]byte) ChangesetWalker - cache map[string][]IndexWithKey + db ethdb.Database + cache map[string][]IndexWithKey } type IndexWithKey struct { Val dbutils.HistoryIndexBytes } -func (ig *IndexGenerator) changeSetWalker(blockNum uint64) func([]byte, []byte) error { +func (ig *IndexGenerator) changeSetWalker(blockNum uint64, indexBucket []byte) func([]byte, []byte) error { return func(k, v []byte) error { - indexes, ok := ig.cache[string(k)] + cacheKey := k + indexes, ok := ig.cache[string(cacheKey)] if !ok || len(indexes) == 0 { - indexBytes, err := ig.db.GetIndexChunk(ig.bucketToWrite, k, blockNum) + indexBytes, err := ig.db.GetIndexChunk(indexBucket, k, blockNum) if err != nil && err != ethdb.ErrKeyNotFound { return err } @@ -61,83 +51,72 @@ func (ig *IndexGenerator) changeSetWalker(blockNum uint64) func([]byte, []byte) indexes = append(indexes, IndexWithKey{ Val: index, }) - ig.cache[string(k)] = indexes + ig.cache[string(cacheKey)] = indexes } lastIndex := indexes[len(indexes)-1] if dbutils.CheckNewIndexChunk(lastIndex.Val, 
blockNum) { lastIndex.Val = dbutils.NewHistoryIndex() indexes = append(indexes, lastIndex) - ig.cache[string(k)] = indexes + ig.cache[string(cacheKey)] = indexes } lastIndex.Val = lastIndex.Val.Append(blockNum, len(v) == 0) + indexes[len(indexes)-1] = lastIndex + ig.cache[string(cacheKey)] = indexes + return nil } } -/* -1) Key - the contract address/storage key + the inverted number of the first block in the index chunk -2) Lookup - Walk(bucket, contractAddress+blocknum, fixedBits=32/64, walker{ - k will hold the needed index, if it exists - return false, nil -}) -*/ -func (ig *IndexGenerator) GenerateIndex() error { - startTime := time.Now() - batchSize := ig.db.IdealBatchSize() * 3 - //addrHash -> index, or addrHash + inverted firstBlock for full chunks of contracts +func (ig *IndexGenerator) GenerateIndex(from uint64, changeSetBucket []byte, indexBucket []byte, walkerAdapter func([]byte) ChangesetWalker, commitHook func(db ethdb.Database, blockNum uint64) error) error { + batchSize := 1000000 + //addrHash -> index, or addrHash + last block for full chunks of contracts ig.cache = make(map[string][]IndexWithKey, batchSize) - //todo add truncate to all db - if bolt, ok := ig.db.(*ethdb.BoltDatabase); ok { - log.Warn("Remove bucket", "bucket", string(ig.bucketToWrite)) - err := bolt.DeleteBucket(ig.bucketToWrite) - if err != nil { - return err - } - } - + log.Info("Index generation started", "from", from) commit := func() error { tuples := make(ethdb.MultiPutTuples, 0, len(ig.cache)*3) for key, vals := range ig.cache { - for _, val := range vals { + for i, val := range vals { var ( chunkKey []byte err error ) - chunkKey, err = val.Val.Key([]byte(key)) - if err != nil { - return err + if i == len(vals)-1 { + chunkKey = dbutils.CurrentChunkKey([]byte(key)) + } else { + chunkKey, err = val.Val.Key([]byte(key)) + if err != nil { + return err + } } - tuples = append(tuples, ig.bucketToWrite, chunkKey, val.Val) + tuples = append(tuples, indexBucket, chunkKey, val.Val) } } sort.Sort(tuples) _, err := ig.db.MultiPut(tuples...)
if err != nil { + log.Error("Unable to put index", "err", err) return err } ig.cache = make(map[string][]IndexWithKey, batchSize) return nil } - currentKey := []byte{} + var blockNum uint64 + currentKey := dbutils.EncodeTimestamp(from) for { stop := true - err := ig.db.Walk(ig.csBucket, currentKey, 0, func(k, v []byte) (b bool, e error) { - blockNum, _ := dbutils.DecodeTimestamp(k) - currentKey = common.CopyBytes(k) - err := ig.csWalker(v).Walk(ig.changeSetWalker(blockNum)) + err := ig.db.Walk(changeSetBucket, currentKey, 0, func(k, v []byte) (b bool, e error) { + blockNum, _ = dbutils.DecodeTimestamp(k) + + err := walkerAdapter(v).Walk(ig.changeSetWalker(blockNum, indexBucket)) if err != nil { return false, err } if len(ig.cache) > batchSize { - log.Info("Next chunk", - "blocknum", blockNum, - "time", time.Since(startTime), - "chunk size", len(ig.cache), - ) + currentKey = common.CopyBytes(k) stop = false return false, nil } @@ -149,21 +128,111 @@ func (ig *IndexGenerator) GenerateIndex() error { } if len(ig.cache) > 0 { + chunkSize := len(ig.cache) err = commit() if err != nil { return err } + var m runtime.MemStats + runtime.ReadMemStats(&m) + log.Info("Committed batch", "blocknum", blockNum, "chunk size", chunkSize, + "alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC)) } + if commitHook != nil { + err = commitHook(ig.db, blockNum) + if err != nil { + return err + } + } + if stop { break } } - log.Info("Generation index finished", "bucket", string(ig.bucketToWrite)) + log.Info("Generation index finished", "bucket", string(indexBucket)) return nil } +func (ig *IndexGenerator) Truncate(timestampTo uint64, changeSetBucket []byte, indexBucket []byte, walkerAdapter func([]byte) ChangesetWalker) error { + currentKey := dbutils.EncodeTimestamp(timestampTo) + keys := make(map[string]struct{}) + err := ig.db.Walk(changeSetBucket, currentKey, 0, func(k, v []byte) (b bool, e error) { + currentKey = common.CopyBytes(k) + err := walkerAdapter(v).Walk(func(kk []byte, _ []byte) error { + keys[string(kk)] = struct{}{} + return nil + }) + if err != nil { + return false, err + } + return true, nil + }) + if err != nil { + return err + } + + accountHistoryEffects := make(map[string][]byte) + var startKey = make([]byte, common.HashLength+8) + + for key := range keys { + key := common.CopyBytes([]byte(key)) + copy(startKey, key) + + binary.BigEndian.PutUint64(startKey[common.HashLength:], timestampTo) + if err := ig.db.Walk(indexBucket, startKey, 8*common.HashLength, func(k, v []byte) (bool, error) { + timestamp := binary.BigEndian.Uint64(k[common.HashLength:]) // the last timestamp in the chunk + kStr := string(common.CopyBytes(k)) + //fmt.Println("Truncate", common.Bytes2Hex(k), timestamp, timestampTo) + if timestamp > timestampTo { + accountHistoryEffects[kStr] = nil + // truncate the chunk + index := dbutils.WrapHistoryIndex(v) + index = index.TruncateGreater(timestampTo) + if len(index) > 8 { // If the chunk is empty after truncation, it gets simply deleted + // Truncated chunk becomes "the last chunk" with the timestamp 0xffff....ffff + lastK, err := index.Key(key) + if err != nil { + return false, err + } + accountHistoryEffects[string(lastK)] = common.CopyBytes(index) + } + } + return true, nil + }); err != nil { + return err + } + } + + for key, value := range accountHistoryEffects { + if value == nil { + //fmt.Println("drop", common.Bytes2Hex([]byte(key)), binary.BigEndian.Uint64([]byte(key)[common.HashLength:])) + if err := ig.db.Delete(indexBucket, []byte(key)); err != nil 
{ + return err + } + } else { + //fmt.Println("write", common.Bytes2Hex([]byte(key)), binary.BigEndian.Uint64([]byte(key)[common.HashLength:])) + if err := ig.db.Put(indexBucket, []byte(key), value); err != nil { + return err + } + } + } + return nil +} + +func (ig *IndexGenerator) DropIndex(bucket []byte) error { + //todo add truncate to all db + if bolt, ok := ig.db.(*ethdb.BoltDatabase); ok { + log.Warn("Remove bucket", "bucket", string(bucket)) + err := bolt.DeleteBucket(bucket) + if err != nil { + return err + } + return nil + } + return errors.New("impossible to drop") +} + type ChangesetWalker interface { Walk(func([]byte, []byte) error) error } diff --git a/core/generate_index_test.go b/core/generate_index_test.go new file mode 100644 index 0000000000..895c2b919e --- /dev/null +++ b/core/generate_index_test.go @@ -0,0 +1,388 @@ +package core + +import ( + "bytes" + "encoding/binary" + "fmt" + "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/changeset" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/crypto" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" + "os" + "reflect" + "sort" + "strconv" + "testing" +) + +func TestIndexGenerator_GenerateIndex_SimpleCase(t *testing.T) { + db := ethdb.NewMemDatabase() + + ig := NewIndexGenerator(db) + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + hashes, expectedIndexes, err := generateTestData(t, db) + if err != nil { + t.Fatal(err) + } + + err = ig.GenerateIndex(0, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, func(bytes []byte) ChangesetWalker { + return changeset.AccountChangeSetBytes(bytes) + }, nil) + if err != nil { + t.Fatal(err) + } + + checkIndex(t, db, hashes[0].Bytes(), 0, expectedIndexes[hashes[0]][0]) + checkIndex(t, db, hashes[0].Bytes(), 999, expectedIndexes[hashes[0]][0]) + checkIndex(t, db, hashes[0].Bytes(), 1000, expectedIndexes[hashes[0]][1]) + checkIndex(t, db, hashes[0].Bytes(), 1999, expectedIndexes[hashes[0]][1]) + checkIndex(t, db, hashes[0].Bytes(), 2000, expectedIndexes[hashes[0]][2]) + checkIndex(t, db, hashes[1].Bytes(), 0, expectedIndexes[hashes[1]][0]) + checkIndex(t, db, hashes[1].Bytes(), 2000, expectedIndexes[hashes[1]][1]) + checkIndex(t, db, hashes[2].Bytes(), 0, expectedIndexes[hashes[2]][0]) + + //check last chunk + lastChunkCheck(t, db, hashes[0].Bytes(), expectedIndexes[hashes[0]][2]) + lastChunkCheck(t, db, hashes[1].Bytes(), expectedIndexes[hashes[1]][1]) + lastChunkCheck(t, db, hashes[2].Bytes(), expectedIndexes[hashes[2]][0]) + +} + +func TestIndexGenerator_Truncate(t *testing.T) { + //don't run it in parallel + db := ethdb.NewMemDatabase() + hashes, expected, err := generateTestData(t, db) + if err != nil { + t.Fatal(err) + } + + ig := NewIndexGenerator(db) + err = ig.GenerateIndex(0, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, func(bytes []byte) ChangesetWalker { + return changeset.AccountChangeSetBytes(bytes) + }, nil) + if err != nil { + t.Fatal(err) + } + + reduceSlice := func(arr []uint64, timestampTo uint64) []uint64 { + pos := sort.Search(len(arr), func(i int) bool { + return arr[i] > timestampTo + }) + return arr[:pos] + } + + t.Run("truncate to 2050", func(t *testing.T) { + expected[hashes[0]][2] = reduceSlice(expected[hashes[0]][2], 2050) + expected[hashes[1]][1] = reduceSlice(expected[hashes[1]][1], 2050) + expected[hashes[2]][0] = reduceSlice(expected[hashes[2]][0], 2050) + + err =
ig.Truncate(2050, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, func(bytes []byte) ChangesetWalker { + return changeset.AccountChangeSetBytes(bytes) + }) + if err != nil { + t.Fatal(err) + } + + checkIndex(t, db, hashes[0].Bytes(), 2030, expected[hashes[0]][2]) + checkIndex(t, db, hashes[1].Bytes(), 2030, expected[hashes[1]][1]) + checkIndex(t, db, hashes[2].Bytes(), 2030, expected[hashes[2]][0]) + checkIndex(t, db, hashes[0].Bytes(), 1999, expected[hashes[0]][1]) + checkIndex(t, db, hashes[1].Bytes(), 999, expected[hashes[1]][0]) + }) + + t.Run("truncate to 2000", func(t *testing.T) { + expected[hashes[0]][2] = reduceSlice(expected[hashes[0]][2], 2000) + expected[hashes[1]][1] = reduceSlice(expected[hashes[1]][1], 2000) + expected[hashes[2]][0] = reduceSlice(expected[hashes[2]][0], 2000) + + err = ig.Truncate(2000, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, func(bytes []byte) ChangesetWalker { + return changeset.AccountChangeSetBytes(bytes) + }) + if err != nil { + t.Fatal(err) + } + + checkIndex(t, db, hashes[0].Bytes(), 2000, expected[hashes[0]][2]) + checkIndex(t, db, hashes[1].Bytes(), 2000, expected[hashes[1]][1]) + checkIndex(t, db, hashes[2].Bytes(), expected[hashes[2]][0][len(expected[hashes[2]][0])-1], expected[hashes[2]][0]) + }) + + t.Run("truncate to 1999", func(t *testing.T) { + err = ig.Truncate(1999, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, func(bytes []byte) ChangesetWalker { + return changeset.AccountChangeSetBytes(bytes) + }) + if err != nil { + t.Fatal(err) + } + checkIndex(t, db, hashes[0].Bytes(), 1999, expected[hashes[0]][1]) + checkIndex(t, db, hashes[1].Bytes(), 1998, expected[hashes[1]][0]) + checkIndex(t, db, hashes[2].Bytes(), 1998, expected[hashes[2]][0]) + _, err = db.GetIndexChunk(dbutils.AccountsHistoryBucket, hashes[0].Bytes(), 2000) + if err != ethdb.ErrKeyNotFound { + t.Fatal() + } + _, err = db.GetIndexChunk(dbutils.AccountsHistoryBucket, hashes[1].Bytes(), 2000) + if err != ethdb.ErrKeyNotFound { + t.Fatal() + } + }) + + t.Run("truncate to 999", func(t *testing.T) { + expected[hashes[1]][0] = reduceSlice(expected[hashes[1]][0], 999) + expected[hashes[2]][0] = reduceSlice(expected[hashes[2]][0], 999) + + err = ig.Truncate(999, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, func(bytes []byte) ChangesetWalker { + return changeset.AccountChangeSetBytes(bytes) + }) + if err != nil { + t.Fatal(err) + } + checkIndex(t, db, hashes[0].Bytes(), 999, expected[hashes[0]][0]) + checkIndex(t, db, hashes[1].Bytes(), 998, expected[hashes[1]][0]) + checkIndex(t, db, hashes[2].Bytes(), 999, expected[hashes[2]][0]) + _, err = db.GetIndexChunk(dbutils.AccountsHistoryBucket, hashes[0].Bytes(), 1000) + if err != ethdb.ErrKeyNotFound { + t.Fatal() + } + _, err = db.GetIndexChunk(dbutils.AccountsHistoryBucket, hashes[1].Bytes(), 1000) + if err != ethdb.ErrKeyNotFound { + t.Fatal() + } + }) +} + +func TestIndexGenerator_GenerateIndexStorage(t *testing.T) { + db := ethdb.NewMemDatabase() + key1, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + addrHash1 := crypto.PubkeyToAddress(key1.PublicKey).Hash() + key2, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + + addrHash2 := crypto.PubkeyToAddress(key2.PublicKey).Hash() + key3, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + addrHash3 := crypto.PubkeyToAddress(key3.PublicKey).Hash() + + compositeKey1 := dbutils.GenerateCompositeStorageKey(addrHash1, ^uint64(1), common.Hash{111}) + compositeKey2 := 
dbutils.GenerateCompositeStorageKey(addrHash2, ^uint64(2), common.Hash{111}) + compositeKey3 := dbutils.GenerateCompositeStorageKey(addrHash3, ^uint64(3), common.Hash{111}) + expected11 := make([]uint64, 0) + expected12 := make([]uint64, 0) + expected13 := make([]uint64, 0) + expected21 := make([]uint64, 0) + expected22 := make([]uint64, 0) + expected3 := make([]uint64, 0) + const numOfBlocks = 2100 + + for i := 0; i < numOfBlocks; i++ { + cs := changeset.NewStorageChangeSet() + err = cs.Add(compositeKey1, []byte(strconv.Itoa(i))) + if err != nil { + t.Fatal(err) + } + + if i < 1000 { + expected11 = append(expected11, uint64(i)) + } else if i < 2000 { + expected12 = append(expected12, uint64(i)) + } else { + expected13 = append(expected13, uint64(i)) + } + + if i%2 == 0 { + err = cs.Add(compositeKey2, []byte(strconv.Itoa(i))) + if err != nil { + t.Fatal(err) + } + + if i < 2000 { + expected21 = append(expected21, uint64(i)) + } else { + expected22 = append(expected22, uint64(i)) + } + } + if i%3 == 0 { + err = cs.Add(compositeKey3, []byte(strconv.Itoa(i))) + if err != nil { + t.Fatal(err) + } + expected3 = append(expected3, uint64(i)) + } + v, innerErr := changeset.EncodeStorage(cs) + if innerErr != nil { + t.Fatal(innerErr) + } + err = db.Put(dbutils.StorageChangeSetBucket, dbutils.EncodeTimestamp(uint64(i)), v) + if err != nil { + t.Fatal(err) + } + } + + ig := NewIndexGenerator(db) + err = ig.GenerateIndex(0, dbutils.StorageChangeSetBucket, dbutils.StorageHistoryBucket, func(bytes []byte) ChangesetWalker { + return changeset.StorageChangeSetBytes(bytes) + }, nil) + if err != nil { + t.Fatal(err) + } + + check := func(compositeKey []byte, chunkBlock uint64, expected []uint64) { + t.Helper() + b, err := db.GetIndexChunk(dbutils.StorageHistoryBucket, compositeKey, chunkBlock) + if err != nil { + t.Fatal(err) + } + val, _, err := dbutils.HistoryIndexBytes(b).Decode() + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(val, expected) { + fmt.Println(val) + fmt.Println(expected) + t.Fatal() + } + } + + check(compositeKey1, 0, expected11) + check(compositeKey1, 999, expected11) + check(compositeKey1, 1000, expected12) + check(compositeKey1, 1999, expected12) + check(compositeKey1, 2000, expected13) + check(compositeKey2, 0, expected21) + check(compositeKey2, 2000, expected22) + check(compositeKey3, 0, expected3) +} + +func generateTestData(t *testing.T, db ethdb.Database) ([]common.Hash, map[common.Hash][][]uint64, error) { //nolint + key1, err := crypto.GenerateKey() + if err != nil { + return nil, nil, err + } + addrHash1 := crypto.PubkeyToAddress(key1.PublicKey).Hash() + key2, err := crypto.GenerateKey() + if err != nil { + return nil, nil, err + } + + addrHash2 := crypto.PubkeyToAddress(key2.PublicKey).Hash() + key3, err := crypto.GenerateKey() + if err != nil { + return nil, nil, err + } + + addrHash3 := crypto.PubkeyToAddress(key3.PublicKey).Hash() + + expected11 := make([]uint64, 0) + expected12 := make([]uint64, 0) + expected13 := make([]uint64, 0) + expected21 := make([]uint64, 0) + expected22 := make([]uint64, 0) + expected3 := make([]uint64, 0) + const numOfBlocks = 2100 + + for i := 0; i < numOfBlocks; i++ { + cs := changeset.NewAccountChangeSet() + err = cs.Add(addrHash1.Bytes(), []byte(strconv.Itoa(i))) + if err != nil { + return nil, nil, err + } + + if i < 1000 { + expected11 = append(expected11, uint64(i)) + } else if i < 2000 { + expected12 = append(expected12, uint64(i)) + } else { + expected13 = append(expected13, uint64(i)) + } + + if i%2 == 0 { + err = 
cs.Add(addrHash2.Bytes(), []byte(strconv.Itoa(i))) + if err != nil { + return nil, nil, err + } + + if i < 2000 { + expected21 = append(expected21, uint64(i)) + } else { + expected22 = append(expected22, uint64(i)) + } + } + if i%3 == 0 { + err = cs.Add(addrHash3.Bytes(), []byte(strconv.Itoa(i))) + if err != nil { + return nil, nil, err + } + expected3 = append(expected3, uint64(i)) + } + v, err := changeset.EncodeAccounts(cs) + if err != nil { + t.Fatal(err) + } + err = db.Put(dbutils.AccountChangeSetBucket, dbutils.EncodeTimestamp(uint64(i)), v) + if err != nil { + t.Fatal(err) + } + } + return []common.Hash{addrHash1, addrHash2, addrHash3}, map[common.Hash][][]uint64{ + addrHash1: {expected11, expected12, expected13}, + addrHash2: {expected21, expected22}, + addrHash3: {expected3}, + }, nil +} + +func checkIndex(t *testing.T, db ethdb.Database, addrHash []byte, chunkBlock uint64, expected []uint64) { + t.Helper() + b, err := db.GetIndexChunk(dbutils.AccountsHistoryBucket, addrHash, chunkBlock) + if err != nil { + t.Fatal(err) + } + val, _, err := dbutils.HistoryIndexBytes(b).Decode() + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(val, expected) { + fmt.Println(val) + fmt.Println(expected) + t.Fatal() + } +} + +func lastChunkCheck(t *testing.T, db ethdb.Database, key []byte, expected []uint64) { + t.Helper() + v, err := db.Get(dbutils.AccountsHistoryBucket, dbutils.CurrentChunkKey(key)) + if err != nil { + t.Fatal(err) + } + + val, _, err := dbutils.HistoryIndexBytes(v).Decode() + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(val, expected) { + fmt.Println(val) + fmt.Println(expected) + t.Fatal() + } +} +func debugIndexes(db ethdb.Database, bucket []byte) { //nolint + l := common.HashLength + if bytes.Equal(dbutils.StorageHistoryBucket, bucket) { + l = common.HashLength * 2 + } + db.Walk(bucket, []byte{}, 0, func(k []byte, v []byte) (bool, error) { //nolint + fmt.Println(common.Bytes2Hex(k), binary.BigEndian.Uint64(k[l:])) + return true, nil + }) +} diff --git a/core/tx_cacher.go b/core/tx_cacher.go index b1bab32235..4b5cc921ab 100644 --- a/core/tx_cacher.go +++ b/core/tx_cacher.go @@ -18,12 +18,20 @@ package core import ( "runtime" + "sync" "github.com/ledgerwatch/turbo-geth/core/types" ) // senderCacher is a concurrent transaction sender recoverer and cacher. -var senderCacher = newTxSenderCacher(runtime.NumCPU()) +var senderCacher *txSenderCacher +var once sync.Once + +func InitTxCacher() { + once.Do(func() { + senderCacher = newTxSenderCacher(runtime.NumCPU()) + }) +} // txSenderCacherRequest is a request for recovering transaction senders with a // specific signature scheme and caching it into the transactions themselves. 
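The tx_cacher.go hunk above switches senderCacher from eager package-level construction to lazy, once-guarded construction via InitTxCacher, so code paths that never call it (staged sync) do not spawn sender-recovery goroutines. A minimal runnable sketch of that pattern; txSenderCacher here is a reduced stub, whereas the real constructor also starts the background recovery workers:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// txSenderCacher is a stub standing in for core's real type; the real
// newTxSenderCacher also spawns `threads` worker goroutines.
type txSenderCacher struct{ threads int }

func newTxSenderCacher(threads int) *txSenderCacher {
	return &txSenderCacher{threads: threads}
}

var (
	senderCacher *txSenderCacher
	once         sync.Once
)

// InitTxCacher is safe to call repeatedly and from multiple goroutines;
// sync.Once guarantees the constructor body runs exactly once.
func InitTxCacher() {
	once.Do(func() {
		senderCacher = newTxSenderCacher(runtime.NumCPU())
	})
}

func main() {
	InitTxCacher()
	InitTxCacher() // no-op: already initialized
	fmt.Println("recovery workers:", senderCacher.threads)
}

This matches the blockchain.go hunk, where insertChain now calls InitTxCacher only when execute is set: staged sync recovers senders in its own stage 3, so the per-block cacher would be redundant overhead there.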
diff --git a/eth/backend.go b/eth/backend.go index 36e1dd64d5..1600d0629e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -193,7 +193,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { return nil, err } - sm, err := getStorageModeFromDB(chainDb) + sm, err := GetStorageModeFromDB(chainDb) if err != nil { return nil, err } @@ -207,7 +207,6 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { config.StorageMode.Receipts, config.StorageMode.TxIndex, config.StorageMode.Preimages, - true, ) if err != nil { return nil, err @@ -700,7 +699,7 @@ func setModeOnEmpty(db ethdb.Database, key []byte, currentValue bool) error { return nil } -func getStorageModeFromDB(db ethdb.Database) (StorageMode, error) { +func GetStorageModeFromDB(db ethdb.Database) (StorageMode, error) { var ( sm StorageMode v []byte diff --git a/eth/backend_test.go b/eth/backend_test.go index 35bfc1113e..c8c8019073 100644 --- a/eth/backend_test.go +++ b/eth/backend_test.go @@ -10,7 +10,7 @@ import ( func TestSetStorageModeIfNotExist(t *testing.T) { db := ethdb.NewMemDatabase() - sm, err := getStorageModeFromDB(db) + sm, err := GetStorageModeFromDB(db) if err != nil { t.Fatal(err) } @@ -29,7 +29,7 @@ func TestSetStorageModeIfNotExist(t *testing.T) { t.Fatal(err) } - sm, err = getStorageModeFromDB(db) + sm, err = GetStorageModeFromDB(db) if err != nil { t.Fatal(err) } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index ab043909d4..59a241c2fe 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -155,6 +155,8 @@ type Downloader struct { bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) + // generate history index, disable/enable pruning + history bool } // LightChain encapsulates functions required to synchronise a light chain. @@ -231,7 +233,7 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. 
-func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, history bool) *Downloader { if lightchain == nil { lightchain = chain } @@ -253,6 +255,8 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, receiptWakeCh: make(chan bool, 1), headerProcCh: make(chan []*types.Header, 1), quitCh: make(chan struct{}), + //generate index, disable/enable pruning + history: history, } go dl.qosTuner() return dl diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index c293a568a3..f75a66cf9f 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -88,7 +88,7 @@ func newTester() *downloadTester { } tester.stateDb = ethdb.NewMemDatabase() tester.stateDb.Put(nil, testGenesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(uint64(FullSync), tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer) + tester.downloader = New(uint64(FullSync), tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer, false) return tester } diff --git a/eth/downloader/stagedsync_downloader.go b/eth/downloader/stagedsync_downloader.go index b4e3442b51..9484ba4bb6 100644 --- a/eth/downloader/stagedsync_downloader.go +++ b/eth/downloader/stagedsync_downloader.go @@ -8,7 +8,8 @@ import ( ) func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers []func() error) error { - log.Info("Sync stage 1/5. Downloading headers...") + fmt.Println("doStagedSyncWithFetchers") + log.Info("Sync stage 1/7. Downloading headers...") var err error @@ -19,7 +20,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers return err } - log.Info("Sync stage 1/5. Downloading headers... Complete!") + log.Info("Sync stage 1/7. Downloading headers... Complete!") log.Info("Checking for unwinding...") // Check unwinds backwards and if they are outstanding, invoke corresponding functions for stage := Finish - 1; stage > Headers; stage-- { @@ -41,6 +42,10 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers if !core.UsePlainStateExecution { err = d.unwindHashCheckStage(unwindPoint) } + case AccountHistoryIndex: + err = unwindAccountHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution) + case StorageHistoryIndex: + err = unwindStorageHistoryIndex(unwindPoint, d.stateDB, core.UsePlainStateExecution) default: return fmt.Errorf("unrecognized stage for unwinding: %d", stage) } @@ -49,7 +54,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers } } log.Info("Checking for unwinding... Complete!") - log.Info("Sync stage 2/5. Downloading block bodies...") + log.Info("Sync stage 2/7. Downloading block bodies...") /* * Stage 2. Download Block bodies @@ -63,19 +68,19 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers return err } - log.Info("Sync stage 2/5. Downloading block bodies... Complete!") + log.Info("Sync stage 2/7. Downloading block bodies... Complete!") /* * Stage 3. Recover senders from tx signatures */ - log.Info("Sync stage 3/5. Recovering senders from tx signatures...") + log.Info("Sync stage 3/7. 
Recovering senders from tx signatures...") err = d.spawnRecoverSendersStage() if err != nil { return err } - log.Info("Sync stage 3/5. Recovering senders from tx signatures... Complete!") - log.Info("Sync stage 4/5. Executing blocks w/o hash checks...") + log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!") + log.Info("Sync stage 4/7. Executing blocks w/o hash checks...") /* * Stage 4. Execute block bodies w/o calculating trie roots @@ -87,16 +92,39 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers return err } - log.Info("Sync stage 4/5. Executing blocks w/o hash checks... Complete!") + log.Info("Sync stage 4/7. Executing blocks w/o hash checks... Complete!") // Further stages go there - log.Info("Sync stage 5/5. Validating final hash") + log.Info("Sync stage 5/7. Validating final hash") if !core.UsePlainStateExecution { if err = d.spawnCheckFinalHashStage(syncHeadNumber); err != nil { return err } } - log.Info("Sync stage 5/5. Validating final hash... Complete!") + + log.Info("Sync stage 5/7. Validating final hash... Complete!") + + if d.history { + log.Info("Sync stage 6/7. Generating account history index") + err = spawnAccountHistoryIndex(d.stateDB, core.UsePlainStateExecution) + if err != nil { + return err + } + log.Info("Sync stage 6/7. Generating account history index... Complete!") + } else { + log.Info("Sync stage 6/7, generating account history index is disabled. Enable by adding `h` to --storage-mode") + } + + if d.history { + log.Info("Sync stage 7/7. Generating storage history index") + err = spawnStorageHistoryIndex(d.stateDB, core.UsePlainStateExecution) + if err != nil { + return err + } + log.Info("Sync stage 7/7. Generating storage history index... Complete!") + } else { + log.Info("Sync stage 7/7, generating storage history index is disabled.
Enable by adding `h` to --storage-mode") + } return err } diff --git a/eth/downloader/stagedsync_stage_indexes.go b/eth/downloader/stagedsync_stage_indexes.go new file mode 100644 index 0000000000..f1e03034c6 --- /dev/null +++ b/eth/downloader/stagedsync_stage_indexes.go @@ -0,0 +1,83 @@ +package downloader + +import ( + "bytes" + "fmt" + "github.com/ledgerwatch/turbo-geth/common/changeset" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" +) + +func spawnAccountHistoryIndex(db ethdb.Database, plainState bool) error { + lastProcessedBlockNumber, err := GetStageProgress(db, AccountHistoryIndex) + if err != nil { + return err + } + if plainState { + log.Info("Skipped account index generation for plain state") + return nil + } + ig := core.NewIndexGenerator(db) + if err := ig.GenerateIndex(lastProcessedBlockNumber, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, walkerFactory(dbutils.AccountChangeSetBucket, plainState), func(innerDB ethdb.Database, blockNum uint64) error { + return SaveStageProgress(innerDB, AccountHistoryIndex, blockNum) + }); err != nil { + fmt.Println("AccountChangeSetBucket, err", err) + return err + } + return nil +} + +func spawnStorageHistoryIndex(db ethdb.Database, plainState bool) error { + lastProcessedBlockNumber, err := GetStageProgress(db, StorageHistoryIndex) + if err != nil { + return err + } + if plainState { + log.Info("Skipped storage index generation for plain state") + return nil + } + ig := core.NewIndexGenerator(db) + if err := ig.GenerateIndex(lastProcessedBlockNumber, dbutils.StorageChangeSetBucket, dbutils.StorageHistoryBucket, walkerFactory(dbutils.StorageChangeSetBucket, plainState), func(innerDB ethdb.Database, blockNum uint64) error { + return SaveStageProgress(innerDB, StorageHistoryIndex, blockNum) + }); err != nil { + fmt.Println("StorageChangeSetBucket, err", err) + return err + } + return nil +} + +func unwindAccountHistoryIndex(unwindPoint uint64, db ethdb.Database, plainState bool) error { + ig := core.NewIndexGenerator(db) + return ig.Truncate(unwindPoint, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, walkerFactory(dbutils.AccountChangeSetBucket, plainState)) +} + +func unwindStorageHistoryIndex(unwindPoint uint64, db ethdb.Database, plainState bool) error { + ig := core.NewIndexGenerator(db) + return ig.Truncate(unwindPoint, dbutils.StorageChangeSetBucket, dbutils.StorageHistoryBucket, walkerFactory(dbutils.StorageChangeSetBucket, plainState)) +} + +func walkerFactory(csBucket []byte, plainState bool) func(bytes []byte) core.ChangesetWalker { + switch { + case bytes.Equal(csBucket, dbutils.AccountChangeSetBucket) && !plainState: + return func(bytes []byte) core.ChangesetWalker { + return changeset.AccountChangeSetBytes(bytes) + } + case bytes.Equal(csBucket, dbutils.AccountChangeSetBucket) && plainState: + return func(bytes []byte) core.ChangesetWalker { + return changeset.AccountChangeSetPlainBytes(bytes) + } + case bytes.Equal(csBucket, dbutils.StorageChangeSetBucket) && !plainState: + return func(bytes []byte) core.ChangesetWalker { + return changeset.StorageChangeSetBytes(bytes) + } + case bytes.Equal(csBucket, dbutils.StorageChangeSetBucket) && plainState: + return func(bytes []byte) core.ChangesetWalker { + return changeset.StorageChangeSetPlainBytes(bytes) + } + default: + log.Error("incorrect bucket", "bucket", string(csBucket), "plainState", plainState) +
panic("incorrect bucket " + string(csBucket)) + } +} diff --git a/eth/downloader/stagedsync_stage_indexes_test.go b/eth/downloader/stagedsync_stage_indexes_test.go new file mode 100644 index 0000000000..e7e68948b4 --- /dev/null +++ b/eth/downloader/stagedsync_stage_indexes_test.go @@ -0,0 +1,15 @@ +package downloader + +import ( + "testing" +) + +func TestName(t *testing.T) { + tester := newStagedSyncTester(true) + if err := tester.newPeer("peer", 65, testChainForkLightA); err != nil { + t.Fatal(err) + } + if err := tester.sync("peer", nil); err != nil { + t.Fatal(err) + } +} diff --git a/eth/downloader/stagedsync_stage_senders.go b/eth/downloader/stagedsync_stage_senders.go index df804b8237..8d3277f8a2 100644 --- a/eth/downloader/stagedsync_stage_senders.go +++ b/eth/downloader/stagedsync_stage_senders.go @@ -51,7 +51,7 @@ func (d *Downloader) spawnRecoverSendersStage() error { emptyHash := common.Hash{} var blockNumber big.Int - const batchSize = 10000 + const batchSize = 1000 jobs := make(chan *senderRecoveryJob, batchSize) out := make(chan *senderRecoveryJob, batchSize) diff --git a/eth/downloader/stagedsync_stages.go b/eth/downloader/stagedsync_stages.go index 83446b9b52..cda48815e3 100644 --- a/eth/downloader/stagedsync_stages.go +++ b/eth/downloader/stagedsync_stages.go @@ -33,6 +33,8 @@ const ( Senders // "From" recovered from signatures, bodies re-written Execution // Executing each block w/o buildinf a trie HashCheck // Checking the root hash + AccountHistoryIndex // Generating history index for accounts + StorageHistoryIndex // Generating history index for storage Finish // Nominal stage after all other stages ) diff --git a/eth/downloader/stagedsync_test.go b/eth/downloader/stagedsync_test.go index 080dbe0557..65c149812d 100644 --- a/eth/downloader/stagedsync_test.go +++ b/eth/downloader/stagedsync_test.go @@ -30,7 +30,7 @@ type stagedSyncTester struct { lock sync.RWMutex } -func newStagedSyncTester() *stagedSyncTester { +func newStagedSyncTester(history bool) *stagedSyncTester { tester := &stagedSyncTester{ peers: make(map[string]*stagedSyncTesterPeer), genesis: testGenesis, @@ -41,7 +41,7 @@ func newStagedSyncTester() *stagedSyncTester { rawdb.WriteTd(tester.db, tester.genesis.Hash(), tester.genesis.NumberU64(), tester.genesis.Difficulty()) rawdb.WriteBlock(context.Background(), tester.db, testGenesis) tester.currentHeader = tester.genesis.Header() - tester.downloader = New(uint64(StagedSync), tester.db, trie.NewSyncBloom(1, tester.db), new(event.TypeMux), tester, nil, tester.dropPeer) + tester.downloader = New(uint64(StagedSync), tester.db, trie.NewSyncBloom(1, tester.db), new(event.TypeMux), tester, nil, tester.dropPeer, history) return tester } @@ -319,7 +319,7 @@ func (stp *stagedSyncTesterPeer) RequestReceipts(hashes []common.Hash) error { } func TestUnwind(t *testing.T) { - tester := newStagedSyncTester() + tester := newStagedSyncTester(false) if err := tester.newPeer("peer", 65, testChainForkLightA); err != nil { t.Fatal(err) } diff --git a/eth/handler.go b/eth/handler.go index ebdbde0975..98bc7384d4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -163,8 +163,12 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh } func initPm(manager *ProtocolManager, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) { + sm, err := GetStorageModeFromDB(chaindb) + if err != nil { + log.Error("Get storage mode", "err", err) + } // Construct the different synchronisation mechanisms - manager.downloader = 
downloader.New(manager.checkpointNumber, chaindb, nil /*stateBloom */, manager.eventMux, blockchain, nil, manager.removePeer) + manager.downloader = downloader.New(manager.checkpointNumber, chaindb, nil /*stateBloom */, manager.eventMux, blockchain, nil, manager.removePeer, sm.History) // Construct the fetcher (short sync) validator := func(header *types.Header) error { diff --git a/ethdb/bolt_db.go b/ethdb/bolt_db.go index 816a34433f..fbf2bbd32d 100644 --- a/ethdb/bolt_db.go +++ b/ethdb/bolt_db.go @@ -200,14 +200,13 @@ func (db *BoltDatabase) Get(bucket, key []byte) ([]byte, error) { // GetIndexChunk returns proper index chunk or return error if index is not created. // key must contain inverted block number in the end func (db *BoltDatabase) GetIndexChunk(bucket, key []byte, timestamp uint64) ([]byte, error) { - var dat []byte err := db.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(bucket) if b != nil { c := b.Cursor() k, v := c.Seek(dbutils.IndexChunkKey(key, timestamp)) - if !bytes.HasPrefix(k, key) { + if !bytes.HasPrefix(k, dbutils.CompositeKeyWithoutIncarnation(key)) { return ErrKeyNotFound } dat = make([]byte, len(v)) diff --git a/migrations/migrations.go b/migrations/migrations.go index 82129773f0..75cee8b1f9 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -8,7 +8,7 @@ import ( type Migration struct { Name string - Up func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error + Up func(db ethdb.Database, history, receipts, txIndex, preImages bool) error } func NewMigrator() *Migrator { @@ -21,7 +21,7 @@ type Migrator struct { Migrations []Migration } -func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { +func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImages bool) error { if len(m.Migrations) == 0 { return nil } @@ -41,7 +41,7 @@ func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImage m.Migrations = m.Migrations[i+1:] for _, v := range m.Migrations { log.Warn("Apply migration", "name", v.Name) - err := v.Up(db, history, receipts, txIndex, preImages, thinHistory) + err := v.Up(db, history, receipts, txIndex, preImages) if err != nil { return err } @@ -54,6 +54,4 @@ func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImage return nil } -var migrations = []Migration{ - -} +var migrations = []Migration{} diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go index cb4c2c7849..51f07db188 100644 --- a/migrations/migrations_test.go +++ b/migrations/migrations_test.go @@ -11,13 +11,13 @@ func TestApplyWithInit(t *testing.T) { migrations = []Migration{ { "one", - func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + func(db ethdb.Database, history, receipts, txIndex, preImages bool) error { return nil }, }, { "two", - func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + func(db ethdb.Database, history, receipts, txIndex, preImages bool) error { return nil }, }, @@ -25,7 +25,7 @@ func TestApplyWithInit(t *testing.T) { migrator := NewMigrator() migrator.Migrations = migrations - err := migrator.Apply(db, false, false, false, false, false) + err := migrator.Apply(db, false, false, false, false) if err != nil { t.Fatal() } @@ -43,14 +43,14 @@ func TestApplyWithoutInit(t *testing.T) { migrations = []Migration{ { "one", - func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + func(db 
ethdb.Database, history, receipts, txIndex, preImages bool) error { t.Fatal("shouldn't have been executed") return nil }, }, { "two", - func(db ethdb.Database, history, receipts, txIndex, preImages, thinHistory bool) error { + func(db ethdb.Database, history, receipts, txIndex, preImages bool) error { return nil }, }, @@ -62,7 +62,7 @@ func TestApplyWithoutInit(t *testing.T) { migrator := NewMigrator() migrator.Migrations = migrations - err = migrator.Apply(db, false, false, false, false, false) + err = migrator.Apply(db, false, false, false, false) if err != nil { t.Fatal() } -- GitLab
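The chunk-key scheme that GenerateIndex, Truncate, and BoltDatabase.GetIndexChunk rely on above: each sealed chunk of a key's history index is stored under key ++ big-endian(last block in chunk), while the open chunk is written under CurrentChunkKey, i.e. with the maximal suffix 0xffffffffffffffff, so a cursor Seek(key ++ blockNum) lands on the chunk covering blockNum. A runnable sketch of that scheme; chunkKey/currentChunkKey and hashLength are simplified stand-ins for dbutils.IndexChunkKey/dbutils.CurrentChunkKey and common.HashLength, and sort.Search over sorted keys emulates a bolt cursor's Seek:

package main

import (
	"encoding/binary"
	"fmt"
	"sort"
)

const hashLength = 32 // stand-in for common.HashLength

// chunkKey returns key ++ bigEndian(blockNumber), the assumed shape of
// dbutils.IndexChunkKey for account keys.
func chunkKey(key []byte, blockNumber uint64) []byte {
	out := make([]byte, len(key)+8)
	copy(out, key)
	binary.BigEndian.PutUint64(out[len(key):], blockNumber)
	return out
}

// currentChunkKey keys the open (last) chunk with the maximal suffix,
// mirroring dbutils.CurrentChunkKey(key) = IndexChunkKey(key, ^uint64(0)).
func currentChunkKey(key []byte) []byte {
	return chunkKey(key, ^uint64(0))
}

func main() {
	addrHash := make([]byte, hashLength)
	addrHash[0] = 0xaa

	// Two sealed chunks keyed by their last block, plus the current chunk.
	keys := [][]byte{
		chunkKey(addrHash, 999),
		chunkKey(addrHash, 1999),
		currentChunkKey(addrHash),
	}

	// A bolt cursor's Seek returns the first key >= target; emulate it.
	seek := func(blockNum uint64) []byte {
		target := chunkKey(addrHash, blockNum)
		i := sort.Search(len(keys), func(i int) bool {
			return string(keys[i]) >= string(target)
		})
		return keys[i]
	}

	for _, bn := range []uint64{0, 999, 1000, 2000} {
		fmt.Printf("block %4d -> chunk suffix %x\n", bn, seek(bn)[hashLength:])
	}
	// block    0 -> chunk suffix 00000000000003e7 (chunk covering blocks ..999)
	// block  999 -> chunk suffix 00000000000003e7
	// block 1000 -> chunk suffix 00000000000007cf (chunk covering ..1999)
	// block 2000 -> chunk suffix ffffffffffffffff (current chunk)
}

Keying sealed chunks by their last block (rather than the first) is what lets a single Seek resolve any historical block, and the all-ones suffix keeps the current chunk sorted after every sealed one, which is why Truncate can rewrite a truncated chunk as "the last chunk" under that key.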
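Inside each chunk, HistoryIndexBytes.Append packs every block number as a 3-byte big-endian delta, with the top bit of the first byte flagging that the value written at that block was empty; LastElement above reconstructs elements the same way with hi[idx]&0x7f. A simplified sketch of that packing, assuming for illustration that the 8-byte chunk header holds only the minimal element the deltas are relative to (the real header layout, and how minElement is derived, is not fully visible in this diff):

package main

import (
	"encoding/binary"
	"fmt"
)

type chunk []byte

// newChunk starts a chunk whose 8-byte header, in this simplification,
// stores the minimal element all later entries are encoded against.
func newChunk(minElement uint64) chunk {
	c := make(chunk, 8)
	binary.BigEndian.PutUint64(c, minElement)
	return c
}

// add appends one block number as a 3-byte big-endian delta from the
// header's minimal element; the 0x80 bit of the first byte records that
// the value at that block was empty (account deleted / slot zeroed),
// matching Append(v, emptyValue) in the diff.
func (c chunk) add(v uint64, empty bool) chunk {
	delta := v - binary.BigEndian.Uint64(c[:8]) // must fit into 23 bits
	b0 := byte(delta >> 16)
	if empty {
		b0 |= 0x80
	}
	return append(c, b0, byte(delta>>8), byte(delta))
}

// decode reverses add, returning block numbers and empty-value flags,
// analogous to HistoryIndexBytes.Decode.
func (c chunk) decode() ([]uint64, []bool) {
	minElement := binary.BigEndian.Uint64(c[:8])
	var nums []uint64
	var empties []bool
	for i := 8; i+2 < len(c); i += 3 {
		delta := uint64(c[i]&0x7f)<<16 | uint64(c[i+1])<<8 | uint64(c[i+2])
		nums = append(nums, minElement+delta)
		empties = append(empties, c[i]&0x80 != 0)
	}
	return nums, empties
}

func main() {
	c := newChunk(1000)
	c = c.add(1000, false)
	c = c.add(1001, true) // the account was emptied at block 1001
	c = c.add(1999, false)
	nums, empties := c.decode()
	fmt.Println(nums, empties) // [1000 1001 1999] [false true false]
}

Three bytes per entry caps a chunk's span at 2^23 blocks from its minimal element, which is one reason CheckNewIndexChunk starts a fresh chunk once the current one fills up.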