diff --git a/cmd/integration/commands/reset_state.go b/cmd/integration/commands/reset_state.go
index 3c455f2969aa569f058588aa534a38ce5eec8c68..206f725d8692d902a4be482578960b18b7f0ced3 100644
--- a/cmd/integration/commands/reset_state.go
+++ b/cmd/integration/commands/reset_state.go
@@ -6,6 +6,7 @@ import (
 	"os"
 	"path"
 	"sync"
+	"text/tabwriter"
 	"time"

 	"github.com/ledgerwatch/lmdb-go/lmdb"
@@ -161,11 +162,14 @@ func resetTxLookup(db *ethdb.ObjectDatabase) error {
 func printStages(db *ethdb.ObjectDatabase) error {
 	var err error
 	var progress uint64
+	w := new(tabwriter.Writer)
+	defer w.Flush()
+	w.Init(os.Stdout, 8, 8, 0, '\t', 0)
 	for stage := stages.SyncStage(0); stage < stages.Finish; stage++ {
 		if progress, _, err = stages.GetStageProgress(db, stage); err != nil {
 			return err
 		}
-		fmt.Printf("Stage: %d, progress: %d\n", stage, progress)
+		fmt.Fprintf(w, "%s \t %d\n", string(stages.DBKeys[stage]), progress)
 	}
 	return nil
 }
diff --git a/cmd/integration/commands/root.go b/cmd/integration/commands/root.go
index 579e3192da5707443333c1d7857e0dfdda6ad871..9f9ae2d337bb17bb533484d230d03dd9eb8735c6 100644
--- a/cmd/integration/commands/root.go
+++ b/cmd/integration/commands/root.go
@@ -4,8 +4,10 @@ import (
 	"context"
 	"fmt"
 	"github.com/ledgerwatch/turbo-geth/cmd/utils"
+	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/internal/debug"
 	"github.com/ledgerwatch/turbo-geth/log"
+	"github.com/ledgerwatch/turbo-geth/migrations"
 	"github.com/spf13/cobra"
 	"os"
 	"os/signal"
@@ -19,6 +21,14 @@ var rootCmd = &cobra.Command{
 		if err := debug.SetupCobra(cmd); err != nil {
 			panic(err)
 		}
+
+		if len(chaindata) > 0 {
+			db := ethdb.MustOpen(chaindata)
+			defer db.Close()
+			if err := migrations.NewMigrator().Apply(db, ""); err != nil {
+				panic(err)
+			}
+		}
 	},
 	PersistentPostRun: func(cmd *cobra.Command, args []string) {
 		debug.Exit()
diff --git a/cmd/pics/state.go b/cmd/pics/state.go
index 879ad5536273929306ccc16c0848d7dbcd07996e..4fac949e3095fe58df9a3ad736caf801ef2da48d 100644
--- a/cmd/pics/state.go
+++ b/cmd/pics/state.go
@@ -116,9 +116,6 @@ var bucketLabels = map[string]string{
 	string(dbutils.HeaderPrefix):           "Headers",
 	string(dbutils.ConfigPrefix):           "Config",
 	string(dbutils.BlockBodyPrefix):        "Block Bodies",
-	string(dbutils.HeadHeaderKey):          "Last Header",
-	string(dbutils.HeadFastBlockKey):       "Last Fast",
-	string(dbutils.HeadBlockKey):           "Last Block",
 	string(dbutils.HeaderNumberPrefix):     "Header Numbers",
 	string(dbutils.AccountChangeSetBucket): "Account Change Sets",
 	string(dbutils.StorageChangeSetBucket): "Storage Change Sets",
diff --git a/cmd/state/stateless/state_snapshot.go b/cmd/state/stateless/state_snapshot.go
index a7315dc425a4034a10319e3c2a502c14920c923a..66ebeb301f62e931c7d8f12ce3e1b6c79075d98d 100644
--- a/cmd/state/stateless/state_snapshot.go
+++ b/cmd/state/stateless/state_snapshot.go
@@ -117,7 +117,7 @@ func loadSnapshot(db ethdb.Database, filename string, createDb CreateDbFunc) {
 	err = copyDatabase(diskDb, db)
 	check(err)
-	err = migrations.NewMigrator().Apply(diskDb, false, false, false, false)
+	err = migrations.NewMigrator().Apply(diskDb, "")
 	check(err)
 }
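For reference, a minimal standalone sketch of the tabwriter pattern that printStages now uses; the stage names and numbers below are placeholders, the real code iterates stages.DBKeys:

```go
package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

func main() {
	// Same Init parameters as printStages: minwidth=8, tabwidth=8, padding=0.
	w := new(tabwriter.Writer)
	w.Init(os.Stdout, 8, 8, 0, '\t', 0)
	defer w.Flush() // Flush renders the aligned columns

	for _, s := range []struct {
		name     string
		progress uint64
	}{{"Headers", 1150000}, {"Execution", 1149000}} {
		fmt.Fprintf(w, "%s \t %d\n", s.name, s.progress)
	}
}
```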
diff --git a/common/dbutils/bucket.go b/common/dbutils/bucket.go
index 5fe68fc755bff03530f39b9568d6daaba652208f..6e4324ed2fd2036441aadf7ed1de1cb6eb9022f6 100644
--- a/common/dbutils/bucket.go
+++ b/common/dbutils/bucket.go
@@ -8,7 +8,7 @@ import (
 	"github.com/ledgerwatch/turbo-geth/metrics"
 )

-// The fields below define the low level database schema prefixing.
+// Buckets
 var (
 	// "Plain State". The same as CurrentStateBucket, but the keys aren't hashed.
@@ -91,18 +91,6 @@ var (
 	// databaseVerisionKey tracks the current database version.
 	DatabaseVerisionKey = []byte("DatabaseVersion")

-	// headHeaderKey tracks the latest know header's hash.
-	HeadHeaderKey = []byte("LastHeader")
-
-	// headBlockKey tracks the latest know full block's hash.
-	HeadBlockKey = []byte("LastBlock")
-
-	// headFastBlockKey tracks the latest known incomplete block's hash during fast sync.
-	HeadFastBlockKey = []byte("LastFast")
-
-	// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
-	FastTrieProgressKey = []byte("TrieSync")
-
 	// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
 	HeaderPrefix       = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
 	HeaderTDSuffix     = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
@@ -119,39 +107,55 @@ var (
 	ConfigPrefix = []byte("ethereum-config-") // config prefix for the db

 	// Chain index prefixes (use `i` + single byte to avoid mixing data types).
-	BloomBitsIndexPrefix      = []byte("iB")      // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress
-	BloomBitsIndexPrefixShead = []byte("iBshead") // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress
+	BloomBitsIndexPrefix = []byte("iB") // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress

-	PreimageCounter    = metrics.NewRegisteredCounter("db/preimage/total", nil)
-	PreimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
+	// Progress of sync stages: stageName -> stageData
+	SyncStageProgress     = []byte("SSP2")
+	SyncStageProgressOld1 = []byte("SSP")
+	// Position to where to unwind sync stages: stageName -> stageData
+	SyncStageUnwind     = []byte("SSU2")
+	SyncStageUnwindOld1 = []byte("SSU")
+
+	CliqueBucket = []byte("clique-")
+	// this bucket is stored in a separate database
+	InodesBucket = []byte("inodes")
+
+	// Transaction senders - stored separately from the block bodies
+	Senders = []byte("txSenders")
+
+	// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
+	FastTrieProgressKey = []byte("TrieSync")
+	// headBlockKey tracks the latest known full block's hash.
+	HeadBlockKey = []byte("LastBlock")
+	// headFastBlockKey tracks the latest known incomplete block's hash during fast sync.
+	HeadFastBlockKey = []byte("LastFast")
+	// headHeaderKey tracks the latest known header's hash.
+	HeadHeaderKey = []byte("LastHeader")
+
+	// migrationName -> serialized SyncStageProgress and SyncStageUnwind buckets
+	// it stores the stage progress so it is clear in which context a migration was executed
+	// in case of a bug report, a developer can ask for the content of this bucket
+	Migrations = []byte("migrations")
+)
+
+// Keys
+var (
 	// last block that was pruned
 	// it's saved once in 5 minutes
 	LastPrunedBlockKey = []byte("LastPrunedBlock")
-
-	// LastAppliedMigration keep the name of tle last applied migration.
-	LastAppliedMigration = []byte("lastAppliedMigration")
-
 	//StorageModeHistory - does node save history.
 	StorageModeHistory = []byte("smHistory")
 	//StorageModeReceipts - does node save receipts.
 	StorageModeReceipts = []byte("smReceipts")
 	//StorageModeTxIndex - does node save transactions index.
StorageModeTxIndex = []byte("smTxIndex") - //StorageModeIntermediateTrieHash - does IntermediateTrieHash feature enabled - StorageModeIntermediateTrieHash = []byte("smIntermediateTrieHash") - - // Progress of sync stages - SyncStageProgress = []byte("SSP") - // Position to where to unwind sync stages - SyncStageUnwind = []byte("SSU") - CliqueBucket = []byte("clique-") - - // this bucket stored in separated database - InodesBucket = []byte("inodes") +) - // Transaction senders - stored separately from the block bodies - Senders = []byte("txSenders") +// Metrics +var ( + PreimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil) + PreimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil) ) // Buckets - list of all buckets. App will panic if some bucket is not in this list. @@ -167,13 +171,7 @@ var Buckets = [][]byte{ StorageChangeSetBucket, IntermediateTrieHashBucket, DatabaseVerisionKey, - HeadHeaderKey, - HeadBlockKey, - HeadFastBlockKey, - FastTrieProgressKey, HeaderPrefix, - HeaderTDSuffix, - HeaderHashSuffix, HeaderNumberPrefix, BlockBodyPrefix, BlockReceiptsPrefix, @@ -182,14 +180,8 @@ var Buckets = [][]byte{ PreimagePrefix, ConfigPrefix, BloomBitsIndexPrefix, - BloomBitsIndexPrefixShead, - LastPrunedBlockKey, DatabaseInfoBucket, IncarnationMapBucket, - LastAppliedMigration, - StorageModeHistory, - StorageModeReceipts, - StorageModeTxIndex, CliqueBucket, SyncStageProgress, SyncStageUnwind, @@ -199,6 +191,17 @@ var Buckets = [][]byte{ PlainStorageChangeSetBucket, InodesBucket, Senders, + FastTrieProgressKey, + HeadBlockKey, + HeadFastBlockKey, + HeadHeaderKey, + Migrations, +} + +// DeprecatedBuckets - list of buckets which can be programmatically deleted - for example after migration +var DeprecatedBuckets = [][]byte{ + SyncStageProgressOld1, + SyncStageUnwindOld1, } var BucketsCfg = map[string]*BucketConfigItem{} @@ -236,26 +239,32 @@ func init() { }) for i := range Buckets { - BucketsCfg[string(Buckets[i])] = &BucketConfigItem{ID: i} - - for _, cfg := range dupSortConfig { - if cfg.ID != i { - continue - } - bucketCfg, ok := BucketsCfg[string(cfg.Bucket)] - if !ok { - continue - } - cfg.FromLen = bucketCfg.DupFromLen - cfg.ToLen = bucketCfg.DupToLen - - if bytes.Equal(cfg.Bucket, CurrentStateBucket) { - bucketCfg.IsDupsort = debug.IsHashedStateDupsortEnabled() - } - if bytes.Equal(cfg.Bucket, PlainStateBucket) { - bucketCfg.IsDupsort = debug.IsPlainStateDupsortEnabled() - } + BucketsCfg[string(Buckets[i])] = createBucketConfig(i, Buckets[i]) + } + + for i := range DeprecatedBuckets { + BucketsCfg[string(DeprecatedBuckets[i])] = createBucketConfig(len(Buckets)+i, DeprecatedBuckets[i]) + } +} + +func createBucketConfig(id int, name []byte) *BucketConfigItem { + cfg := &BucketConfigItem{ID: id} + + for _, dupCfg := range dupSortConfig { + if !bytes.Equal(dupCfg.Bucket, name) { + continue + } + + cfg.DupFromLen = dupCfg.FromLen + cfg.DupToLen = dupCfg.ToLen + + if bytes.Equal(dupCfg.Bucket, CurrentStateBucket) { + cfg.IsDupsort = debug.IsHashedStateDupsortEnabled() + } + if bytes.Equal(dupCfg.Bucket, PlainStateBucket) { + cfg.IsDupsort = debug.IsPlainStateDupsortEnabled() } } + return cfg } diff --git a/common/etl/etl.go b/common/etl/etl.go index a28de66ed069dcd7a1925d5da234f2eff6afc7f0..c1b5f98801784cf34c917edf99d9056fb5d6c09d 100644 --- a/common/etl/etl.go +++ b/common/etl/etl.go @@ -53,7 +53,7 @@ func NextKey(key []byte) ([]byte, error) { // loaded from files into a DB // * `key`: last commited key to the database (use etl.NextKey helper to use in 
diff --git a/common/etl/etl.go b/common/etl/etl.go
index a28de66ed069dcd7a1925d5da234f2eff6afc7f0..c1b5f98801784cf34c917edf99d9056fb5d6c09d 100644
--- a/common/etl/etl.go
+++ b/common/etl/etl.go
@@ -53,7 +53,7 @@ func NextKey(key []byte) ([]byte, error) {
 // loaded from files into a DB
 // * `key`: last committed key to the database (use etl.NextKey helper to use in LoadStartKey)
 // * `isDone`: true, if everything is processed
-type LoadCommitHandler func(ethdb.Putter, []byte, bool) error
+type LoadCommitHandler func(db ethdb.Putter, key []byte, isDone bool) error

 type TransformArgs struct {
 	ExtractStartKey []byte
diff --git a/core/pruner.go b/core/pruner.go
index 7457a140221205e31d609d449c825d57a0cb6d65..358e4f6a99e5fef797d86023cf91c9a107e7610c 100644
--- a/core/pruner.go
+++ b/core/pruner.go
@@ -125,7 +125,7 @@ func (p *BasicPruner) Stop() {
 }

 func (p *BasicPruner) ReadLastPrunedBlockNum() uint64 {
-	data, _ := p.db.Get(dbutils.LastPrunedBlockKey, dbutils.LastPrunedBlockKey)
+	data, _ := p.db.Get(dbutils.DatabaseInfoBucket, dbutils.LastPrunedBlockKey)
 	if len(data) == 0 {
 		return 0
 	}
@@ -136,7 +136,7 @@ func (p *BasicPruner) WriteLastPrunedBlockNum(num uint64) {
 	b := make([]byte, 8)
 	binary.LittleEndian.PutUint64(b, num)
-	if err := p.db.Put(dbutils.LastPrunedBlockKey, dbutils.LastPrunedBlockKey, b); err != nil {
+	if err := p.db.Put(dbutils.DatabaseInfoBucket, dbutils.LastPrunedBlockKey, b); err != nil {
 		log.Crit("Failed to store last pruned block's num", "err", err)
 	}
 }
diff --git a/core/state/history.go b/core/state/history.go
index 7b641d3969969671836abd88547922ce6a8fa3ea..cf2e7ba47e8c18eb497f00734ad2edd96bfa5179 100644
--- a/core/state/history.go
+++ b/core/state/history.go
@@ -127,7 +127,7 @@ func FindByHistory(tx ethdb.Tx, plain, storage bool, key []byte, timestamp uint6
 	var lastChangesetBlock, lastIndexBlock uint64
 	stageBucket := tx.Bucket(dbutils.SyncStageProgress)
 	if stageBucket != nil {
-		v1, err1 := stageBucket.Get([]byte{byte(stages.Execution)})
+		v1, err1 := stageBucket.Get(stages.DBKeys[stages.Execution])
 		if err1 != nil && !errors.Is(err1, ethdb.ErrKeyNotFound) {
 			return nil, err1
 		}
@@ -135,9 +135,9 @@ func FindByHistory(tx ethdb.Tx, plain, storage bool, key []byte, timestamp uint6
 			lastChangesetBlock = binary.BigEndian.Uint64(v1[:8])
 		}
 		if storage {
-			v1, err1 = stageBucket.Get([]byte{byte(stages.StorageHistoryIndex)})
+			v1, err1 = stageBucket.Get(stages.DBKeys[stages.StorageHistoryIndex])
 		} else {
-			v1, err1 = stageBucket.Get([]byte{byte(stages.AccountHistoryIndex)})
+			v1, err1 = stageBucket.Get(stages.DBKeys[stages.AccountHistoryIndex])
 		}
 		if err1 != nil && !errors.Is(err1, ethdb.ErrKeyNotFound) {
 			return nil, err1
@@ -559,14 +559,14 @@ func returnCorrectWalker(bucket, hBucket []byte) func(v []byte) changeset.Walker

 func getIndexGenerationProgress(tx ethdb.Tx, stage stages.SyncStage) (generatedTo uint64, executedTo uint64, err error) {
 	b := tx.Bucket(dbutils.SyncStageProgress)
-	v, err := b.Get([]byte{byte(stage)})
+	v, err := b.Get(stages.DBKeys[stage])
 	if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
 		return 0, 0, err
 	}
 	if len(v) >= 8 {
 		generatedTo = binary.BigEndian.Uint64(v[:8])
 	}
-	v, err = b.Get([]byte{byte(stages.Execution)})
+	v, err = b.Get(stages.DBKeys[stages.Execution])
 	if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
 		return 0, 0, err
 	}
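The reads above take the first 8 bytes of the stored stage value as a big-endian block number; a minimal sketch of that layout, mirroring marshalData in eth/stagedsync/stages:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Stage-progress value layout: 8 bytes of big-endian block number,
// followed by opaque stage data.
func marshalData(blockNumber uint64, stageData []byte) []byte {
	v := make([]byte, 8+len(stageData))
	binary.BigEndian.PutUint64(v, blockNumber)
	copy(v[8:], stageData)
	return v
}

func unmarshalData(v []byte) (uint64, []byte) {
	if len(v) < 8 {
		return 0, nil
	}
	return binary.BigEndian.Uint64(v[:8]), v[8:]
}

func main() {
	n, _ := unmarshalData(marshalData(42, nil))
	fmt.Println(n) // 42
}
```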
diff --git a/core/state/history_test.go b/core/state/history_test.go
index 391097374cd82de61e6c490f6bb0815d5ecf3883..88255030db7448a42b35f32525a36037fa5d9e6f 100644
--- a/core/state/history_test.go
+++ b/core/state/history_test.go
@@ -1376,12 +1376,12 @@ func TestWalkAsOfStateHashed_WithoutIndex(t *testing.T) {

 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.StorageHistoryIndex)}, b)
+	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.StorageHistoryIndex], b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
+	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1531,12 +1531,12 @@ func TestWalkAsOfStatePlain_WithoutIndex(t *testing.T) {

 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.StorageHistoryIndex)}, b)
+	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.StorageHistoryIndex], b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
+	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1686,12 +1686,12 @@ func TestWalkAsOfAccountHashed_WithoutIndex(t *testing.T) {

 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.AccountHistoryIndex)}, b)
+	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.AccountHistoryIndex], b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
+	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1849,12 +1849,12 @@ func TestWalkAsOfAccountPlain_WithoutIndex(t *testing.T) {

 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.AccountHistoryIndex)}, b)
+	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.AccountHistoryIndex], b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
+	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/eth/backend.go b/eth/backend.go
index 9c4300348dd932f1ff64af6d94bbb0c06e6f2feb..8b6b48125f0fa8c2fec2714298898d05a62e15eb 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -47,6 +47,7 @@ import (
 	"github.com/ledgerwatch/turbo-geth/event"
 	"github.com/ledgerwatch/turbo-geth/internal/ethapi"
 	"github.com/ledgerwatch/turbo-geth/log"
+	"github.com/ledgerwatch/turbo-geth/migrations"
 	"github.com/ledgerwatch/turbo-geth/miner"
 	"github.com/ledgerwatch/turbo-geth/node"
 	"github.com/ledgerwatch/turbo-geth/p2p"
@@ -154,10 +155,14 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 		}
 	}
 	if ctx.Config.PrivateApiAddr != "" {
-		//remotedbserver.StartDeprecated(chainDb.KV(), ctx.Config.PrivateApiAddr)
 		remotedbserver.StartGrpc(chainDb.KV(), ctx.Config.PrivateApiAddr)
 	}

+	err = migrations.NewMigrator().Apply(chainDb, ctx.Config.DataDir)
+	if err != nil {
+		return nil, err
+	}
+
 	chainConfig, genesisHash, _, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis, config.StorageMode.History, false /* overwrite */)
 	if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
[]byte("Senders"), + Execution: []byte("Execution"), + IntermediateHashes: []byte("IntermediateHashes"), + HashState: []byte("HashState"), + AccountHistoryIndex: []byte("AccountHistoryIndex"), + StorageHistoryIndex: []byte("StorageHistoryIndex"), + TxLookup: []byte("TxLookup"), + TxPool: []byte("TxPool"), + Finish: []byte("Finish"), +} + // GetStageProgress retrieves saved progress of given sync stage from the database func GetStageProgress(db ethdb.Getter, stage SyncStage) (uint64, []byte, error) { - v, err := db.Get(dbutils.SyncStageProgress, []byte{byte(stage)}) + v, err := db.Get(dbutils.SyncStageProgress, DBKeys[stage]) if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { return 0, nil, err } @@ -54,14 +68,14 @@ func GetStageProgress(db ethdb.Getter, stage SyncStage) (uint64, []byte, error) // SaveStageProgress saves the progress of the given stage in the database func SaveStageProgress(db ethdb.Putter, stage SyncStage, progress uint64, stageData []byte) error { - return db.Put(dbutils.SyncStageProgress, []byte{byte(stage)}, marshalData(progress, stageData)) + return db.Put(dbutils.SyncStageProgress, DBKeys[stage], marshalData(progress, stageData)) } // GetStageUnwind retrieves the invalidation for the given stage // Invalidation means that that stage needs to rollback to the invalidation // point and be redone func GetStageUnwind(db ethdb.Getter, stage SyncStage) (uint64, []byte, error) { - v, err := db.Get(dbutils.SyncStageUnwind, []byte{byte(stage)}) + v, err := db.Get(dbutils.SyncStageUnwind, DBKeys[stage]) if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { return 0, nil, err } @@ -70,7 +84,7 @@ func GetStageUnwind(db ethdb.Getter, stage SyncStage) (uint64, []byte, error) { // SaveStageUnwind saves the progress of the given stage in the database func SaveStageUnwind(db ethdb.Putter, stage SyncStage, invalidation uint64, stageData []byte) error { - return db.Put(dbutils.SyncStageUnwind, []byte{byte(stage)}, marshalData(invalidation, stageData)) + return db.Put(dbutils.SyncStageUnwind, DBKeys[stage], marshalData(invalidation, stageData)) } func marshalData(blockNumber uint64, stageData []byte) []byte { diff --git a/ethdb/interface.go b/ethdb/interface.go index 5a618dbf4bb3dfacfcb42c2527b8a77ebc6936e3..b2a15f9c5768e3c4a5e82623c52c412f8ead68f5 100644 --- a/ethdb/interface.go +++ b/ethdb/interface.go @@ -47,7 +47,7 @@ type Getter interface { // Only the keys whose first fixedbits match those of startkey are iterated over. // walker is called for each eligible entry. // If walker returns false or an error, the walk stops. - Walk(bucket, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error + Walk(bucket, startkey []byte, fixedbits int, walker func(k, v []byte) (bool, error)) error // MultiWalk is similar to multiple Walk calls folded into one. 
diff --git a/ethdb/interface.go b/ethdb/interface.go
index 5a618dbf4bb3dfacfcb42c2527b8a77ebc6936e3..b2a15f9c5768e3c4a5e82623c52c412f8ead68f5 100644
--- a/ethdb/interface.go
+++ b/ethdb/interface.go
@@ -47,7 +47,7 @@ type Getter interface {
 	// Only the keys whose first fixedbits match those of startkey are iterated over.
 	// walker is called for each eligible entry.
 	// If walker returns false or an error, the walk stops.
-	Walk(bucket, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error
+	Walk(bucket, startkey []byte, fixedbits int, walker func(k, v []byte) (bool, error)) error

 	// MultiWalk is similar to multiple Walk calls folded into one.
 	MultiWalk(bucket []byte, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error
@@ -116,7 +116,8 @@ type HasNetInterface interface {
 }

 type NonTransactional interface {
-	ClearBuckets(buckets ...[]byte) error
+	ClearBuckets(buckets ...[]byte) error // makes them empty
+	DropBuckets(buckets ...[]byte) error  // drops them; using them after a drop will panic
 }

 var errNotSupported = errors.New("not supported")
diff --git a/ethdb/kv_abstract.go b/ethdb/kv_abstract.go
index 4eb8272acbca189addb22e375c50c86b9568eb78..82fdcbda4a6f2bbbcdc6b33800e0ee5ecf279e4e 100644
--- a/ethdb/kv_abstract.go
+++ b/ethdb/kv_abstract.go
@@ -11,6 +11,8 @@ type KV interface {
 	Begin(ctx context.Context, writable bool) (Tx, error)
 	IdealBatchSize() int

+	DropBuckets(buckets ...[]byte) error
+	CreateBuckets(buckets ...[]byte) error
 }

 type Tx interface {
@@ -27,7 +29,6 @@ type Bucket interface {
 	Cursor() Cursor
 	Size() (uint64, error)
-	Clear() error
 }

 type Cursor interface {
diff --git a/ethdb/kv_badger.go b/ethdb/kv_badger.go
index 16962950e5dda9fe385c63afb503d9418e109ef3..f14f3bcd31122cc3b60912cf7aac97dfd7ec35d7 100644
--- a/ethdb/kv_badger.go
+++ b/ethdb/kv_badger.go
@@ -127,6 +127,14 @@ func (db *badgerKV) vlogGCLoop(ctx context.Context, gcTicker *time.Ticker) {
 	}
 }

+func (db *badgerKV) DropBuckets(buckets ...[]byte) error {
+	panic("not implemented")
+}
+
+func (db *badgerKV) CreateBuckets(buckets ...[]byte) error {
+	panic("not implemented")
+}
+
 // Close closes BoltKV
 // All transactions must be closed before closing the database.
 func (db *badgerKV) Close() {
diff --git a/ethdb/kv_bolt.go b/ethdb/kv_bolt.go
index 507f17df866418e0414ed1661d24362de80b5f79..4ec00d8e047ac896f647a3fc1ca9dedd6d625adc 100644
--- a/ethdb/kv_bolt.go
+++ b/ethdb/kv_bolt.go
@@ -215,6 +215,14 @@ func NewBolt() boltOpts {
 	return o
 }

+func (db *BoltKV) DropBuckets(buckets ...[]byte) error {
+	panic("not implemented")
+}
+
+func (db *BoltKV) CreateBuckets(buckets ...[]byte) error {
+	panic("not implemented")
+}
+
 // Close closes BoltKV
 // All transactions must be closed before closing the database.
 func (db *BoltKV) Close() {
diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go
index 5a2b4be4772758adeaf33e6e8c5f17fc0582f26f..4bd43936b0cf19b28085b0a59afe72c9e143869e 100644
--- a/ethdb/kv_lmdb.go
+++ b/ethdb/kv_lmdb.go
@@ -87,7 +87,7 @@ func (opts lmdbOpts) Open() (KV, error) {
 		wg:  &sync.WaitGroup{},
 	}

-	db.buckets = make([]lmdb.DBI, len(dbutils.Buckets))
+	db.buckets = make([]lmdb.DBI, len(dbutils.Buckets)+len(dbutils.DeprecatedBuckets))
 	if opts.readOnly {
 		if err := env.View(func(tx *lmdb.Txn) error {
 			for _, name := range dbutils.Buckets {
@@ -102,16 +102,23 @@ func (opts lmdbOpts) Open() (KV, error) {
 			return nil, err
 		}
 	} else {
-		if err := env.Update(func(tx *lmdb.Txn) error {
-			for id := range dbutils.Buckets {
-				if err := createBucket(tx, db, id); err != nil {
-					return err
-				}
-			}
-			return nil
-		}); err != nil {
+		if err := db.CreateBuckets(dbutils.Buckets...); err != nil {
 			return nil, err
 		}
+		// don't create deprecated buckets
+	}
+
+	if err := env.View(func(tx *lmdb.Txn) error {
+		for _, name := range dbutils.DeprecatedBuckets {
+			dbi, createErr := tx.OpenDBI(string(name), 0)
+			if createErr != nil {
+				continue // if a deprecated bucket can't be opened, it was already deleted and that's fine
+			}
+			db.buckets[dbutils.BucketsCfg[string(name)].ID] = dbi
+		}
+		return nil
+	}); err != nil {
+		return nil, err
 	}

 	if !opts.inMem {
@@ -125,21 +132,6 @@ func (opts lmdbOpts) Open() (KV, error) {
 	return db, nil
 }

-func createBucket(tx *lmdb.Txn, db *LmdbKV, id int) error {
-	var flags uint = lmdb.Create
-	name := string(dbutils.Buckets[id])
-	cfg := dbutils.BucketsCfg[name]
-	if cfg.IsDupsort {
-		flags |= lmdb.DupSort
-	}
-	dbi, err := tx.OpenDBI(name, flags)
-	if err != nil {
-		return err
-	}
-	db.buckets[id] = dbi
-	return nil
-}
-
 func (opts lmdbOpts) MustOpen() KV {
 	db, err := opts.Open()
 	if err != nil {
@@ -161,6 +153,66 @@ func NewLMDB() lmdbOpts {
 	return lmdbOpts{}
 }

+func (db *LmdbKV) CreateBuckets(buckets ...[]byte) error {
+	for _, name := range buckets {
+		name := name
+		cfg, ok := dbutils.BucketsCfg[string(name)]
+		if !ok {
+			continue
+		}
+
+		var flags uint = lmdb.Create
+		if cfg.IsDupsort {
+			flags |= lmdb.DupSort
+		}
+		if err := db.Update(context.Background(), func(tx Tx) error {
+			dbi, err := tx.(*lmdbTx).tx.OpenDBI(string(name), flags)
+			if err != nil {
+				return err
+			}
+			db.buckets[cfg.ID] = dbi
+			return nil
+		}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (db *LmdbKV) DropBuckets(buckets ...[]byte) error {
+	if db.env == nil {
+		return fmt.Errorf("db closed")
+	}
+
+	for _, name := range buckets {
+		name := name
+		cfg, ok := dbutils.BucketsCfg[string(name)]
+		if !ok {
+			panic(fmt.Errorf("unknown bucket: %s. add it to dbutils.Buckets", string(name)))
+		}
+
+		if cfg.ID < len(dbutils.Buckets) {
+			return fmt.Errorf("only buckets from dbutils.DeprecatedBuckets can be deleted, bucket: %s", name)
+		}
+
+		if err := db.env.Update(func(txn *lmdb.Txn) error {
+			dbi := db.buckets[cfg.ID]
+			if dbi == 0 { // if the bucket was not opened at DB start, try to open it now; if that fails there is nothing to drop
+				var openErr error
+				dbi, openErr = txn.OpenDBI(string(name), 0)
+				if openErr != nil {
+					return nil // the DBI doesn't exist, so no drop is needed
+				}
+			}
+			return txn.Drop(dbi, true)
+		}); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // Close closes db
 // All transactions must be closed before closing the database.
 func (db *LmdbKV) Close() {
@@ -240,6 +292,7 @@ type lmdbBucket struct {
 	isDupsort bool
 	dupFrom   int
 	dupTo     int
+	name      []byte
 	tx        *lmdbTx
 	dbi       lmdb.DBI
 }
@@ -288,7 +341,7 @@ func (tx *lmdbTx) Bucket(name []byte) Bucket {
 		panic(fmt.Errorf("unknown bucket: %s. add it to dbutils.Buckets", string(name)))
 	}

-	return &lmdbBucket{tx: tx, id: cfg.ID, dbi: tx.db.buckets[cfg.ID], isDupsort: cfg.IsDupsort, dupFrom: cfg.DupFromLen, dupTo: cfg.DupToLen}
+	return &lmdbBucket{tx: tx, id: cfg.ID, dbi: tx.db.buckets[cfg.ID], isDupsort: cfg.IsDupsort, dupFrom: cfg.DupFromLen, dupTo: cfg.DupToLen, name: name}
 }

 func (tx *lmdbTx) Commit(ctx context.Context) error {
@@ -401,16 +454,6 @@ func (b *lmdbBucket) Size() (uint64, error) {
 	return (st.LeafPages + st.BranchPages + st.OverflowPages) * uint64(os.Getpagesize()), nil
 }

-func (b *lmdbBucket) Clear() error {
-	if err := b.tx.tx.Drop(b.dbi, true); err != nil {
-		return err
-	}
-	if err := createBucket(b.tx.tx, b.tx.db, b.id); err != nil {
-		return err
-	}
-	return nil
-}
-
 func (b *lmdbBucket) Cursor() Cursor {
 	return &LmdbCursor{bucket: b, ctx: b.tx.ctx}
 }
@@ -465,7 +508,7 @@ func (c *LmdbCursor) Seek(seek []byte) (k, v []byte, err error) {
 		if lmdb.IsNotFound(err) {
 			return nil, nil, nil
 		}
-		err = fmt.Errorf("failed LmdbKV cursor.Seek(): %w, bucket: %d %s, isDupsort: %t, key: %x", err, c.bucket.id, dbutils.Buckets[c.bucket.id], c.bucket.isDupsort, seek)
+		err = fmt.Errorf("failed LmdbKV cursor.Seek(): %w, bucket: %s, isDupsort: %t, key: %x", err, c.bucket.name, c.bucket.isDupsort, seek)
 		return []byte{}, nil, err
 	}
 	if c.prefix != nil && !bytes.HasPrefix(k, c.prefix) {
@@ -656,7 +699,7 @@ func (c *LmdbCursor) Put(key []byte, value []byte) error {
 	}

 	if len(key) == 0 {
-		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[c.bucket.id])
+		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", c.bucket.name)
 	}
 	if c.cursor == nil {
 		if err := c.initCursor(); err != nil {
@@ -750,7 +793,7 @@ func (c *LmdbCursor) putCurrent(key []byte, value []byte) error {
 // Danger: if provided data will not sorted (or bucket have old records which mess with new in sorting manner) - db will corrupt.
 func (c *LmdbCursor) Append(key []byte, value []byte) error {
 	if len(key) == 0 {
-		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[c.bucket.id])
+		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", c.bucket.name)
 	}

 	if c.cursor == nil {
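A usage sketch of the guard above: dropping a deprecated bucket succeeds (or is a no-op if it is already gone), while dropping an active bucket is refused because its ID is below len(dbutils.Buckets). The in-memory option lmdbOpts.InMem() is assumed here, as used by the package's tests:

```go
package main

import (
	"fmt"

	"github.com/ledgerwatch/turbo-geth/common/dbutils"
	"github.com/ledgerwatch/turbo-geth/ethdb"
)

func main() {
	kv := ethdb.NewLMDB().InMem().MustOpen() // InMem assumed
	defer kv.Close()

	// Deprecated bucket: allowed.
	if err := kv.DropBuckets(dbutils.SyncStageProgressOld1); err != nil {
		panic(err)
	}
	// Active bucket: rejected by the ID guard.
	if err := kv.DropBuckets(dbutils.SyncStageProgress); err != nil {
		fmt.Println(err) // "only buckets from dbutils.DeprecatedBuckets can be deleted, ..."
	}
}
```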
diff --git a/ethdb/kv_remote.go b/ethdb/kv_remote.go
index f6176d2ba1173419b289cf574c346a3deca62811..eed3569b0318e4dacba0f0a6a4404aef174a826f 100644
--- a/ethdb/kv_remote.go
+++ b/ethdb/kv_remote.go
@@ -105,6 +105,14 @@ func NewRemote() remoteOpts {
 	return remoteOpts{Remote: remote.DefaultOpts}
 }

+func (db *RemoteKV) CreateBuckets(buckets ...[]byte) error {
+	panic("not implemented")
+}
+
+func (db *RemoteKV) DropBuckets(buckets ...[]byte) error {
+	panic("not supported")
+}
+
 // Close closes BoltKV
 // All transactions must be closed before closing the database.
 func (db *RemoteKV) Close() {
diff --git a/ethdb/kv_remote2.go b/ethdb/kv_remote2.go
index 33a4afc6065396c2a77cbdc914d976aa35ec3c1b..f1196815b20651b25f7d17c197efaedec780885c 100644
--- a/ethdb/kv_remote2.go
+++ b/ethdb/kv_remote2.go
@@ -115,6 +115,14 @@ func NewRemote2() remote2Opts {
 	return remote2Opts{}
 }

+func (db *Remote2KV) CreateBuckets(buckets ...[]byte) error {
+	panic("not implemented")
+}
+
+func (db *Remote2KV) DropBuckets(buckets ...[]byte) error {
+	panic("not supported")
+}
+
 // Close
 // All transactions must be closed before closing the database.
 func (db *Remote2KV) Close() {
diff --git a/ethdb/object_db.go b/ethdb/object_db.go
index 0197230a2161213224cd984685834be00f2d0927..2bb7e428965a2b992d835f73450159f217e9d47e 100644
--- a/ethdb/object_db.go
+++ b/ethdb/object_db.go
@@ -330,16 +330,14 @@ func (db *ObjectDatabase) Delete(bucket, key []byte) error {
 }

 func (db *ObjectDatabase) ClearBuckets(buckets ...[]byte) error {
-	for _, bucket := range buckets {
-		bucket := bucket
-		if err := db.kv.Update(context.Background(), func(tx Tx) error {
-			return tx.Bucket(bucket).Clear()
-		}); err != nil {
-			return err
-		}
+	if err := db.kv.DropBuckets(buckets...); err != nil {
+		return err
 	}
+	return db.kv.CreateBuckets(buckets...)
+}

-	return nil
+func (db *ObjectDatabase) DropBuckets(buckets ...[]byte) error {
+	return db.kv.DropBuckets(buckets...)
 }

 func (db *ObjectDatabase) Close() {
diff --git a/migrations/migrations.go b/migrations/migrations.go
index 75cee8b1f9b7521ea5079981fe2d5ab6f98df624..97e781abc8a1e474518a04421b8f0758dd532e3d 100644
--- a/migrations/migrations.go
+++ b/migrations/migrations.go
@@ -1,14 +1,56 @@
 package migrations

 import (
+	"bytes"
+	"errors"
+
+	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
+	"github.com/ledgerwatch/turbo-geth/common/etl"
+	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/log"
+	"github.com/ugorji/go/codec"
 )

+// migrations are applied sequentially in the order of this array; already applied migrations are skipped
+// this way we don't have to worry about merge conflicts and can switch branches freely
+// see also dbutils.Migrations - it stores the context in which each migration was executed - useful for bug reports
+//
+// Idempotency is expected
+// Best practices to achieve idempotency:
+// - in dbutils/bucket.go add a suffix to the existing bucket variable, create a new bucket with the same variable name.
+//   Example:
+//   - SyncStageProgress = []byte("SSP1")
+//   + SyncStageProgressOld1 = []byte("SSP1")
+//   + SyncStageProgress = []byte("SSP2")
+// - clear the new bucket at the beginning of the migration, drop the old bucket at the end (not in defer!).
+//   Example:
+//   Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+//       if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageProgress); err != nil { // clear new bucket
+//           return err
+//       }
+//
+//       extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
+//           ... // migration logic
+//       }
+//       if err := etl.Transform(...); err != nil {
+//           return err
+//       }
+//
+//       if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageProgressOld1); err != nil { // drop old bucket
+//           return err
+//       }
+//   },
+// - if you need to migrate multiple buckets - create a separate migration for each bucket
+// - write a test for the new migration
+var migrations = []Migration{
+	stagesToUseNamedKeys,
+	unwindStagesToUseNamedKeys,
+}
+
 type Migration struct {
 	Name string
-	Up   func(db ethdb.Database, history, receipts, txIndex, preImages bool) error
+	Up   func(db ethdb.Database, dataDir string, OnLoadCommit etl.LoadCommitHandler) error
 }

 func NewMigrator() *Migrator {
@@ -21,37 +63,92 @@ type Migrator struct {
 	Migrations []Migration
 }

-func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
+func AppliedMigrations(db ethdb.Database, withPayload bool) (map[string][]byte, error) {
+	applied := map[string][]byte{}
+	err := db.Walk(dbutils.Migrations, nil, 0, func(k []byte, v []byte) (bool, error) {
+		if withPayload {
+			applied[string(common.CopyBytes(k))] = common.CopyBytes(v)
+		} else {
+			applied[string(common.CopyBytes(k))] = []byte{}
+		}
+		return true, nil
+	})
+	return applied, err
+}
+
+func (m *Migrator) Apply(db ethdb.Database, datadir string) error {
 	if len(m.Migrations) == 0 {
 		return nil
 	}
-	lastApplied, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
-	if err != nil && err != ethdb.ErrKeyNotFound {
+	applied, err := AppliedMigrations(db, false)
+	if err != nil {
 		return err
 	}

-	i := len(m.Migrations) - 1
-	for ; i >= 0; i-- {
-		if m.Migrations[i].Name == string(lastApplied) {
-			break
+	for i := range m.Migrations {
+		v := m.Migrations[i]
+		if _, ok := applied[v.Name]; ok {
+			continue
+		}
+		log.Info("Apply migration", "name", v.Name)
+		if err := v.Up(db, datadir, func(putter ethdb.Putter, key []byte, isDone bool) error {
+			if !isDone {
+				return nil // don't save partial progress
+			}
+			stagesProgress, err := MarshalMigrationPayload(db)
+			if err != nil {
+				return err
+			}
+			err = db.Put(dbutils.Migrations, []byte(v.Name), stagesProgress)
+			if err != nil {
+				return err
+			}
+			return nil
+		}); err != nil {
+			return err
 		}
+
+		log.Info("Applied migration", "name", v.Name)
 	}
+	return nil
+}

-	m.Migrations = m.Migrations[i+1:]
-	for _, v := range m.Migrations {
-		log.Warn("Apply migration", "name", v.Name)
-		err := v.Up(db, history, receipts, txIndex, preImages)
-		if err != nil {
-			return err
+func MarshalMigrationPayload(db ethdb.Getter) ([]byte, error) {
+	s := map[string][]byte{}
+
+	buf := bytes.NewBuffer(nil)
+	encoder := codec.NewEncoder(buf, &codec.CborHandle{})
+
+	for i := range stages.DBKeys {
+		v, err := db.Get(dbutils.SyncStageProgress, stages.DBKeys[i])
+		if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
+			return nil, err
 		}
-		err = db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(v.Name))
-		if err != nil {
-			return err
+		if len(v) > 0 {
+			s[string(stages.DBKeys[i])] = common.CopyBytes(v)
+		}
+
+		v, err = db.Get(dbutils.SyncStageUnwind, stages.DBKeys[i])
+		if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
+			return nil, err
+		}
+		if len(v) > 0 {
+			s["unwind_"+string(stages.DBKeys[i])] = common.CopyBytes(v)
 		}
-		log.Warn("Applied migration", "name", v.Name)
 	}
-	return nil
+
+	if err := encoder.Encode(s); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
 }

-var migrations = []Migration{}
+func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error) {
+	s := map[string][]byte{}
+
+	if err := codec.NewDecoder(bytes.NewReader(data), &codec.CborHandle{}).Decode(&s); err != nil {
+		return nil, err
+	}
+	return s, nil
+}
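For a bug report, the recorded payload can be decoded back with UnmarshalMigrationPayload; a sketch, with printMigrationContext being a hypothetical helper (not part of this change):

```go
package main

import (
	"fmt"

	"github.com/ledgerwatch/turbo-geth/common/dbutils"
	"github.com/ledgerwatch/turbo-geth/ethdb"
	"github.com/ledgerwatch/turbo-geth/migrations"
)

// printMigrationContext (hypothetical) decodes the CBOR payload stored when
// the named migration ran, i.e. the stage progress at that moment.
func printMigrationContext(db ethdb.Database, name string) error {
	payload, err := db.Get(dbutils.Migrations, []byte(name))
	if err != nil {
		return err
	}
	stagesAtRun, err := migrations.UnmarshalMigrationPayload(payload)
	if err != nil {
		return err
	}
	for stage, v := range stagesAtRun {
		fmt.Printf("%s: %x\n", stage, v)
	}
	return nil
}

func main() {
	db := ethdb.NewMemDatabase()
	defer db.Close()
	_ = printMigrationContext(db, "stages_to_use_named_keys") // ErrKeyNotFound on an empty DB
}
```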
diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go
index 51f07db1888e6e331f366f2e9e0d9041ca65090c..ea3c74a7229278a0c881758bbf42723ef950dccb 100644
--- a/migrations/migrations_test.go
+++ b/migrations/migrations_test.go
@@ -1,76 +1,154 @@
 package migrations

 import (
+	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
+	"testing"
+
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
+	"github.com/ledgerwatch/turbo-geth/common/etl"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
-	"testing"
+	"github.com/stretchr/testify/require"
 )

 func TestApplyWithInit(t *testing.T) {
-	db := ethdb.NewMemDatabase()
+	require, db := require.New(t), ethdb.NewMemDatabase()
 	migrations = []Migration{
 		{
 			"one",
-			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
-				return nil
+			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+				return OnLoadCommit(db, nil, true)
 			},
 		},
 		{
 			"two",
-			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
-				return nil
+			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+				return OnLoadCommit(db, nil, true)
 			},
 		},
 	}

 	migrator := NewMigrator()
 	migrator.Migrations = migrations
-	err := migrator.Apply(db, false, false, false, false)
-	if err != nil {
-		t.Fatal()
-	}
-	v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if string(v) != migrations[1].Name {
-		t.Fatal()
-	}
+	err := migrator.Apply(db, "")
+	require.NoError(err)
+
+	applied, err := AppliedMigrations(db, false)
+	require.NoError(err)
+
+	_, ok := applied[migrations[0].Name]
+	require.True(ok)
+	_, ok = applied[migrations[1].Name]
+	require.True(ok)
+
+	// apply again
+	err = migrator.Apply(db, "")
+	require.NoError(err)
+
+	applied2, err := AppliedMigrations(db, false)
+	require.NoError(err)
+	require.Equal(applied, applied2)
 }

 func TestApplyWithoutInit(t *testing.T) {
-	db := ethdb.NewMemDatabase()
+	require, db := require.New(t), ethdb.NewMemDatabase()
 	migrations = []Migration{
 		{
 			"one",
-			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
+			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
 				t.Fatal("shouldn't have been executed")
 				return nil
 			},
 		},
 		{
 			"two",
-			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
-				return nil
+			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+				return OnLoadCommit(db, nil, true)
 			},
 		},
 	}
-	err := db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(migrations[0].Name))
-	if err != nil {
-		t.Fatal()
-	}
+	err := db.Put(dbutils.Migrations, []byte(migrations[0].Name), []byte{1})
+	require.NoError(err)

 	migrator := NewMigrator()
 	migrator.Migrations = migrations
-	err = migrator.Apply(db, false, false, false, false)
-	if err != nil {
-		t.Fatal()
-	}
-	v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if string(v) != migrations[1].Name {
-		t.Fatal()
+	err = migrator.Apply(db, "")
+	require.NoError(err)
+
+	applied, err := AppliedMigrations(db, false)
+	require.NoError(err)
+
+	require.Equal(2, len(applied))
+	_, ok := applied[migrations[1].Name]
+	require.True(ok)
+	_, ok = applied[migrations[0].Name]
+	require.True(ok)
+
+	// apply again
+	err = migrator.Apply(db, "")
+	require.NoError(err)
+
+	applied2, err := AppliedMigrations(db, false)
+	require.NoError(err)
+	require.Equal(applied, applied2)
+}
+
+func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) {
+	require, db := require.New(t), ethdb.NewMemDatabase()
+	migrations = []Migration{
+		{
+			"one",
+			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+				return OnLoadCommit(db, nil, true)
+			},
+		},
+		{
+			"two",
+			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+				t.Fatal("shouldn't have been executed")
+				return nil
+			},
+		},
 	}
+	err := db.Put(dbutils.Migrations, []byte(migrations[1].Name), []byte{1}) // apply non-first migration
+	require.NoError(err)
+
+	migrator := NewMigrator()
+	migrator.Migrations = migrations
+	err = migrator.Apply(db, "")
+	require.NoError(err)
+
+	applied, err := AppliedMigrations(db, false)
+	require.NoError(err)
+
+	require.Equal(2, len(applied))
+	_, ok := applied[migrations[1].Name]
+	require.True(ok)
+	_, ok = applied[migrations[0].Name]
+	require.True(ok)
+
+	// apply again
+	err = migrator.Apply(db, "")
+	require.NoError(err)
+
+	applied2, err := AppliedMigrations(db, false)
+	require.NoError(err)
+	require.Equal(applied, applied2)
+}
+
+func TestMarshalStages(t *testing.T) {
+	require, db := require.New(t), ethdb.NewMemDatabase()
+
+	err := stages.SaveStageProgress(db, stages.Execution, 42, []byte{})
+	require.NoError(err)
+
+	data, err := MarshalMigrationPayload(db)
+	require.NoError(err)
+
+	res, err := UnmarshalMigrationPayload(data)
+	require.NoError(err)
+
+	require.Equal(1, len(res))
+	v, ok := res[string(stages.DBKeys[stages.Execution])]
+	require.True(ok)
+	require.NotNil(v)
 }
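A minimal no-op migration in the new style, mirroring the inline migrations the tests above construct; calling OnLoadCommit(db, nil, true) is what records the migration as applied (the key is nil because no ETL load happened). The migration name is hypothetical:

```go
package main

import (
	"github.com/ledgerwatch/turbo-geth/common/etl"
	"github.com/ledgerwatch/turbo-geth/ethdb"
	"github.com/ledgerwatch/turbo-geth/migrations"
)

var exampleNoop = migrations.Migration{
	Name: "example_noop", // hypothetical name
	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
		// ...idempotent work would go here...
		return OnLoadCommit(db, nil, true) // isDone=true persists the marker
	},
}

func main() {
	db := ethdb.NewMemDatabase()
	defer db.Close()

	m := migrations.NewMigrator()
	m.Migrations = []migrations.Migration{exampleNoop}
	if err := m.Apply(db, ""); err != nil {
		panic(err)
	}
}
```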
diff --git a/migrations/stages_to_use_named_keys.go b/migrations/stages_to_use_named_keys.go
new file mode 100644
index 0000000000000000000000000000000000000000..4bac29943436d909220325594a7a2b360f905d60
--- /dev/null
+++ b/migrations/stages_to_use_named_keys.go
@@ -0,0 +1,85 @@
+package migrations
+
+import (
+	"github.com/ledgerwatch/turbo-geth/common/dbutils"
+	"github.com/ledgerwatch/turbo-geth/common/etl"
+	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
+	"github.com/ledgerwatch/turbo-geth/ethdb"
+)
+
+var stagesToUseNamedKeys = Migration{
+	Name: "stages_to_use_named_keys",
+	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+		if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageProgress); err != nil {
+			return err
+		}
+
+		extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
+			newKey, ok := stages.DBKeys[stages.SyncStage(k[0])]
+			if !ok {
+				return nil // nothing to do
+			}
+			// create a new version of the key with the same data
+			if err := next(k, newKey, v); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		if err := etl.Transform(
+			db,
+			dbutils.SyncStageProgressOld1,
+			dbutils.SyncStageProgress,
+			datadir,
+			extractFunc,
+			etl.IdentityLoadFunc,
+			etl.TransformArgs{OnLoadCommit: OnLoadCommit},
+		); err != nil {
+			return err
+		}
+
+		if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageProgressOld1); err != nil {
+			return err
+		}
+		return nil
+	},
+}
+
+var unwindStagesToUseNamedKeys = Migration{
+	Name: "unwind_stages_to_use_named_keys",
+	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+		if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageUnwind); err != nil {
+			return err
+		}
+
+		extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
+			newKey, ok := stages.DBKeys[stages.SyncStage(k[0])]
+			if !ok {
+				return nil // nothing to do
+			}
+			// create a new version of the key with the same data
+			if err := next(k, newKey, v); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		if err := etl.Transform(
+			db,
+			dbutils.SyncStageUnwindOld1,
+			dbutils.SyncStageUnwind,
+			datadir,
+			extractFunc,
+			etl.IdentityLoadFunc,
+			etl.TransformArgs{OnLoadCommit: OnLoadCommit},
+		); err != nil {
+			return err
+		}
+
+		if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageUnwindOld1); err != nil {
+			return err
+		}
+		return nil
+	},
+}
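A standalone sketch of the rewrite extractFunc performs: a legacy single-byte stage key becomes the named key from stages.DBKeys, and bytes without a named key are skipped. The map here is an abbreviated stand-in:

```go
package main

import "fmt"

func main() {
	dbKeys := map[byte][]byte{3: []byte("Execution")} // abbreviated stages.DBKeys

	for _, oldKey := range [][]byte{{3}, {0xff}} {
		newKey, ok := dbKeys[oldKey[0]]
		if !ok {
			fmt.Printf("%x: skipped (no named key)\n", oldKey)
			continue
		}
		fmt.Printf("%x -> %q\n", oldKey, newKey)
	}
}
```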
migrator.Apply(db, "") + require.NoError(err) + + applied2, err := AppliedMigrations(db, false) + require.NoError(err) + require.Equal(applied, applied2) +} + +func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) { + require, db := require.New(t), ethdb.NewMemDatabase() + migrations = []Migration{ + { + "one", + func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error { + return OnLoadCommit(db, nil, true) + }, + }, + { + "two", + func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error { + t.Fatal("shouldn't been executed") + return nil + }, + }, } + err := db.Put(dbutils.Migrations, []byte(migrations[1].Name), []byte{1}) // apply non-first migration + require.NoError(err) + + migrator := NewMigrator() + migrator.Migrations = migrations + err = migrator.Apply(db, "") + require.NoError(err) + + applied, err := AppliedMigrations(db, false) + require.NoError(err) + + require.Equal(2, len(applied)) + _, ok := applied[migrations[1].Name] + require.True(ok) + _, ok = applied[migrations[0].Name] + require.True(ok) + + // apply again + err = migrator.Apply(db, "") + require.NoError(err) + + applied2, err := AppliedMigrations(db, false) + require.NoError(err) + require.Equal(applied, applied2) +} + +func TestMarshalStages(t *testing.T) { + require, db := require.New(t), ethdb.NewMemDatabase() + + err := stages.SaveStageProgress(db, stages.Execution, 42, []byte{}) + require.NoError(err) + + data, err := MarshalMigrationPayload(db) + require.NoError(err) + + res, err := UnmarshalMigrationPayload(data) + require.NoError(err) + + require.Equal(1, len(res)) + v, ok := res[string(stages.DBKeys[stages.Execution])] + require.True(ok) + require.NotNil(v) } diff --git a/migrations/stages_to_use_named_keys.go b/migrations/stages_to_use_named_keys.go new file mode 100644 index 0000000000000000000000000000000000000000..4bac29943436d909220325594a7a2b360f905d60 --- /dev/null +++ b/migrations/stages_to_use_named_keys.go @@ -0,0 +1,85 @@ +package migrations + +import ( + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/common/etl" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" +) + +var stagesToUseNamedKeys = Migration{ + Name: "stages_to_use_named_keys", + Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error { + if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageProgress); err != nil { + return err + } + + extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error { + newKey, ok := stages.DBKeys[stages.SyncStage(k[0])] + if !ok { + return nil // nothing to do + } + // create new version of keys with same data + if err := next(k, newKey, v); err != nil { + return err + } + return nil + } + + if err := etl.Transform( + db, + dbutils.SyncStageProgressOld1, + dbutils.SyncStageProgress, + datadir, + extractFunc, + etl.IdentityLoadFunc, + etl.TransformArgs{OnLoadCommit: OnLoadCommit}, + ); err != nil { + return err + } + + if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageProgressOld1); err != nil { + return err + } + return nil + }, +} + +var unwindStagesToUseNamedKeys = Migration{ + Name: "unwind_stages_to_use_named_keys", + Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error { + if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageUnwind); err != nil { + return err + } + + extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) 
error { + newKey, ok := stages.DBKeys[stages.SyncStage(k[0])] + if !ok { + return nil // nothing to do + + } + // create new version of keys with same data + if err := next(k, newKey, v); err != nil { + return err + } + return nil + } + + if err := etl.Transform( + db, + dbutils.SyncStageUnwindOld1, + dbutils.SyncStageUnwind, + datadir, + extractFunc, + etl.IdentityLoadFunc, + etl.TransformArgs{OnLoadCommit: OnLoadCommit}, + ); err != nil { + return err + } + + if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageUnwindOld1); err != nil { + return err + } + return nil + }, +} diff --git a/migrations/stages_to_use_named_keys_test.go b/migrations/stages_to_use_named_keys_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cf4f9ede2d13ad2a0cd36fd24818e6125db16c78 --- /dev/null +++ b/migrations/stages_to_use_named_keys_test.go @@ -0,0 +1,74 @@ +package migrations + +import ( + "encoding/binary" + "errors" + "testing" + + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/stretchr/testify/require" +) + +func TestSyncStagesToUseNamedKeys(t *testing.T) { + require, db := require.New(t), ethdb.NewMemDatabase() + + err := db.KV().CreateBuckets(dbutils.SyncStageProgressOld1) + require.NoError(err) + + // pretend that execution stage is at block 42 + err = db.Put(dbutils.SyncStageProgressOld1, []byte{byte(stages.Execution)}, dbutils.EncodeBlockNumber(42)) + require.NoError(err) + + migrator := NewMigrator() + migrator.Migrations = []Migration{stagesToUseNamedKeys} + err = migrator.Apply(db, "") + require.NoError(err) + + i := 0 + err = db.Walk(dbutils.SyncStageProgress, nil, 0, func(k, v []byte) (bool, error) { + i++ + return true, nil + }) + require.NoError(err) + require.Equal(1, i) + + _, err = db.Get(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}) + require.True(errors.Is(err, ethdb.ErrKeyNotFound)) + + v, err := db.Get(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution]) + require.NoError(err) + require.Equal(42, int(binary.BigEndian.Uint64(v))) +} + +func TestUnwindStagesToUseNamedKeys(t *testing.T) { + require, db := require.New(t), ethdb.NewMemDatabase() + + err := db.KV().CreateBuckets(dbutils.SyncStageUnwindOld1) + require.NoError(err) + + // pretend that execution stage is at block 42 + err = db.Put(dbutils.SyncStageUnwindOld1, []byte{byte(stages.Execution)}, dbutils.EncodeBlockNumber(42)) + require.NoError(err) + + migrator := NewMigrator() + migrator.Migrations = []Migration{unwindStagesToUseNamedKeys} + err = migrator.Apply(db, "") + require.NoError(err) + + i := 0 + err = db.Walk(dbutils.SyncStageUnwind, nil, 0, func(k, v []byte) (bool, error) { + i++ + return true, nil + }) + require.NoError(err) + require.Equal(1, i) + + _, err = db.Get(dbutils.SyncStageUnwind, []byte{byte(stages.Execution)}) + require.True(errors.Is(err, ethdb.ErrKeyNotFound)) + + v, err := db.Get(dbutils.SyncStageUnwind, stages.DBKeys[stages.Execution]) + require.NoError(err) + require.Equal(42, int(binary.BigEndian.Uint64(v))) +}