diff --git a/cmd/integration/commands/reset_state.go b/cmd/integration/commands/reset_state.go
index 206f725d8692d902a4be482578960b18b7f0ced3..3c455f2969aa569f058588aa534a38ce5eec8c68 100644
--- a/cmd/integration/commands/reset_state.go
+++ b/cmd/integration/commands/reset_state.go
@@ -6,7 +6,6 @@ import (
 	"os"
 	"path"
 	"sync"
-	"text/tabwriter"
 	"time"
 
 	"github.com/ledgerwatch/lmdb-go/lmdb"
@@ -162,14 +161,11 @@ func resetTxLookup(db *ethdb.ObjectDatabase) error {
 func printStages(db *ethdb.ObjectDatabase) error {
 	var err error
 	var progress uint64
-	w := new(tabwriter.Writer)
-	defer w.Flush()
-	w.Init(os.Stdout, 8, 8, 0, '\t', 0)
 	for stage := stages.SyncStage(0); stage < stages.Finish; stage++ {
 		if progress, _, err = stages.GetStageProgress(db, stage); err != nil {
 			return err
 		}
-		fmt.Fprintf(w, "%s \t %d\n", string(stages.DBKeys[stage]), progress)
+		fmt.Printf("Stage: %d, progress: %d\n", stage, progress)
 	}
 	return nil
 }
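With the DBKeys name map gone (see eth/stagedsync/stages below), printStages reports stages by ordinal only. If readable output is still wanted, the command could keep a purely local name table; a minimal, hypothetical sketch (the stage ordinals and names here are illustrative, not part of this change):

```go
package main

import "fmt"

// SyncStage mirrors stages.SyncStage: a stage is identified by its ordinal.
type SyncStage byte

const (
	Headers SyncStage = iota
	Bodies
	Senders
	Execution
	Finish // nominal last stage
)

// stageNames is a purely local, illustrative table; the on-disk keys stay numeric.
var stageNames = map[SyncStage]string{
	Headers:   "Headers",
	Bodies:    "Bodies",
	Senders:   "Senders",
	Execution: "Execution",
}

func main() {
	for stage := SyncStage(0); stage < Finish; stage++ {
		progress := uint64(0) // would come from stages.GetStageProgress(db, stage)
		fmt.Printf("Stage: %d (%s), progress: %d\n", stage, stageNames[stage], progress)
	}
}
```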
diff --git a/cmd/integration/commands/root.go b/cmd/integration/commands/root.go
index 9f9ae2d337bb17bb533484d230d03dd9eb8735c6..579e3192da5707443333c1d7857e0dfdda6ad871 100644
--- a/cmd/integration/commands/root.go
+++ b/cmd/integration/commands/root.go
@@ -4,10 +4,8 @@ import (
 	"context"
 	"fmt"
 	"github.com/ledgerwatch/turbo-geth/cmd/utils"
-	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/internal/debug"
 	"github.com/ledgerwatch/turbo-geth/log"
-	"github.com/ledgerwatch/turbo-geth/migrations"
 	"github.com/spf13/cobra"
 	"os"
 	"os/signal"
@@ -21,14 +19,6 @@ var rootCmd = &cobra.Command{
 		if err := debug.SetupCobra(cmd); err != nil {
 			panic(err)
 		}
-
-		if len(chaindata) > 0 {
-			db := ethdb.MustOpen(chaindata)
-			defer db.Close()
-			if err := migrations.NewMigrator().Apply(db, ""); err != nil {
-				panic(err)
-			}
-		}
 	},
 	PersistentPostRun: func(cmd *cobra.Command, args []string) {
 		debug.Exit()
diff --git a/cmd/pics/state.go b/cmd/pics/state.go
index 4fac949e3095fe58df9a3ad736caf801ef2da48d..879ad5536273929306ccc16c0848d7dbcd07996e 100644
--- a/cmd/pics/state.go
+++ b/cmd/pics/state.go
@@ -116,6 +116,9 @@ var bucketLabels = map[string]string{
 	string(dbutils.HeaderPrefix):           "Headers",
 	string(dbutils.ConfigPrefix):           "Config",
 	string(dbutils.BlockBodyPrefix):        "Block Bodies",
+	string(dbutils.HeadHeaderKey):          "Last Header",
+	string(dbutils.HeadFastBlockKey):       "Last Fast",
+	string(dbutils.HeadBlockKey):           "Last Block",
 	string(dbutils.HeaderNumberPrefix):     "Header Numbers",
 	string(dbutils.AccountChangeSetBucket): "Account Change Sets",
 	string(dbutils.StorageChangeSetBucket): "Storage Change Sets",
diff --git a/cmd/state/stateless/state_snapshot.go b/cmd/state/stateless/state_snapshot.go
index 66ebeb301f62e931c7d8f12ce3e1b6c79075d98d..a7315dc425a4034a10319e3c2a502c14920c923a 100644
--- a/cmd/state/stateless/state_snapshot.go
+++ b/cmd/state/stateless/state_snapshot.go
@@ -117,7 +117,7 @@ func loadSnapshot(db ethdb.Database, filename string, createDb CreateDbFunc) {
 	err = copyDatabase(diskDb, db)
 	check(err)
 
-	err = migrations.NewMigrator().Apply(diskDb, "")
+	err = migrations.NewMigrator().Apply(diskDb, false, false, false, false)
 	check(err)
 }
diff --git a/common/dbutils/bucket.go b/common/dbutils/bucket.go
index 6e4324ed2fd2036441aadf7ed1de1cb6eb9022f6..5fe68fc755bff03530f39b9568d6daaba652208f 100644
--- a/common/dbutils/bucket.go
+++ b/common/dbutils/bucket.go
@@ -8,7 +8,7 @@ import (
 	"github.com/ledgerwatch/turbo-geth/metrics"
 )
 
-// Buckets
+// The fields below define the low level database schema prefixing.
 var (
 	// "Plain State". The same as CurrentStateBucket, but the keys aren't hashed.
@@ -91,6 +91,18 @@ var (
 	// databaseVerisionKey tracks the current database version.
 	DatabaseVerisionKey = []byte("DatabaseVersion")
 
+	// HeadHeaderKey tracks the latest known header's hash.
+	HeadHeaderKey = []byte("LastHeader")
+
+	// HeadBlockKey tracks the latest known full block's hash.
+	HeadBlockKey = []byte("LastBlock")
+
+	// HeadFastBlockKey tracks the latest known incomplete block's hash during fast sync.
+	HeadFastBlockKey = []byte("LastFast")
+
+	// FastTrieProgressKey tracks the number of trie entries imported during fast sync.
+	FastTrieProgressKey = []byte("TrieSync")
+
 	// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
 	HeaderPrefix       = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
 	HeaderTDSuffix     = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
@@ -107,55 +119,39 @@ var (
 	ConfigPrefix = []byte("ethereum-config-") // config prefix for the db
 
 	// Chain index prefixes (use `i` + single byte to avoid mixing data types).
-	BloomBitsIndexPrefix = []byte("iB") // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress
-
-	// Progress of sync stages: stageName -> stageData
-	SyncStageProgress     = []byte("SSP2")
-	SyncStageProgressOld1 = []byte("SSP")
-	// Position to where to unwind sync stages: stageName -> stageData
-	SyncStageUnwind     = []byte("SSU2")
-	SyncStageUnwindOld1 = []byte("SSU")
-
-	CliqueBucket = []byte("clique-")
-
-	// this bucket stored in separated database
-	InodesBucket = []byte("inodes")
+	BloomBitsIndexPrefix      = []byte("iB")      // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress
+	BloomBitsIndexPrefixShead = []byte("iBshead") // BloomBitsIndexPrefixShead is the data table of a chain indexer to track its progress
 
-	// Transaction senders - stored separately from the block bodies
-	Senders = []byte("txSenders")
-
-	// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
-	FastTrieProgressKey = []byte("TrieSync")
-	// headBlockKey tracks the latest know full block's hash.
-	HeadBlockKey = []byte("LastBlock")
-	// headFastBlockKey tracks the latest known incomplete block's hash during fast sync.
-	HeadFastBlockKey = []byte("LastFast")
-	// headHeaderKey tracks the latest know header's hash.
-	HeadHeaderKey = []byte("LastHeader")
-
-	// migrationName -> serialized SyncStageProgress and SyncStageUnwind buckets
-	// it stores stages progress to understand in which context was executed migration
-	// in case of bug-report developer can ask content of this bucket
-	Migrations = []byte("migrations")
-)
+	PreimageCounter    = metrics.NewRegisteredCounter("db/preimage/total", nil)
+	PreimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
 
-// Keys
-var (
 	// last block that was pruned
 	// it's saved once in 5 minutes
 	LastPrunedBlockKey = []byte("LastPrunedBlock")
+
+	// LastAppliedMigration keeps the name of the last applied migration.
+	LastAppliedMigration = []byte("lastAppliedMigration")
+
 	//StorageModeHistory - does node save history.
 	StorageModeHistory = []byte("smHistory")
 	//StorageModeReceipts - does node save receipts.
 	StorageModeReceipts = []byte("smReceipts")
 	//StorageModeTxIndex - does node save transactions index.
 	StorageModeTxIndex = []byte("smTxIndex")
-)
+	//StorageModeIntermediateTrieHash - is the IntermediateTrieHash feature enabled
+	StorageModeIntermediateTrieHash = []byte("smIntermediateTrieHash")
 
-// Metrics
-var (
-	PreimageCounter    = metrics.NewRegisteredCounter("db/preimage/total", nil)
-	PreimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
+	// Progress of sync stages
+	SyncStageProgress = []byte("SSP")
+	// Position to where to unwind sync stages
+	SyncStageUnwind = []byte("SSU")
+
+	CliqueBucket = []byte("clique-")
+
+	// this bucket stored in separated database
+	InodesBucket = []byte("inodes")
+
+	// Transaction senders - stored separately from the block bodies
+	Senders = []byte("txSenders")
 )
 
 // Buckets - list of all buckets. App will panic if some bucket is not in this list.
@@ -171,7 +167,13 @@ var Buckets = [][]byte{
 	StorageChangeSetBucket,
 	IntermediateTrieHashBucket,
 	DatabaseVerisionKey,
+	HeadHeaderKey,
+	HeadBlockKey,
+	HeadFastBlockKey,
+	FastTrieProgressKey,
 	HeaderPrefix,
+	HeaderTDSuffix,
+	HeaderHashSuffix,
 	HeaderNumberPrefix,
 	BlockBodyPrefix,
 	BlockReceiptsPrefix,
@@ -180,8 +182,14 @@ var Buckets = [][]byte{
 	PreimagePrefix,
 	ConfigPrefix,
 	BloomBitsIndexPrefix,
+	BloomBitsIndexPrefixShead,
+	LastPrunedBlockKey,
 	DatabaseInfoBucket,
 	IncarnationMapBucket,
+	LastAppliedMigration,
+	StorageModeHistory,
+	StorageModeReceipts,
+	StorageModeTxIndex,
 	CliqueBucket,
 	SyncStageProgress,
 	SyncStageUnwind,
@@ -191,17 +199,6 @@ var Buckets = [][]byte{
 	PlainStorageChangeSetBucket,
 	InodesBucket,
 	Senders,
-	FastTrieProgressKey,
-	HeadBlockKey,
-	HeadFastBlockKey,
-	HeadHeaderKey,
-	Migrations,
-}
-
-// DeprecatedBuckets - list of buckets which can be programmatically deleted - for example after migration
-var DeprecatedBuckets = [][]byte{
-	SyncStageProgressOld1,
-	SyncStageUnwindOld1,
 }
 
 var BucketsCfg = map[string]*BucketConfigItem{}
@@ -239,32 +236,26 @@ func init() {
 	})
 
 	for i := range Buckets {
-		BucketsCfg[string(Buckets[i])] = createBucketConfig(i, Buckets[i])
-	}
-
-	for i := range DeprecatedBuckets {
-		BucketsCfg[string(DeprecatedBuckets[i])] = createBucketConfig(len(Buckets)+i, DeprecatedBuckets[i])
-	}
-}
-
-func createBucketConfig(id int, name []byte) *BucketConfigItem {
-	cfg := &BucketConfigItem{ID: id}
-
-	for _, dupCfg := range dupSortConfig {
-		if !bytes.Equal(dupCfg.Bucket, name) {
-			continue
-		}
-
-		cfg.DupFromLen = dupCfg.FromLen
-		cfg.DupToLen = dupCfg.ToLen
-
-		if bytes.Equal(dupCfg.Bucket, CurrentStateBucket) {
-			cfg.IsDupsort = debug.IsHashedStateDupsortEnabled()
-		}
-		if bytes.Equal(dupCfg.Bucket, PlainStateBucket) {
-			cfg.IsDupsort = debug.IsPlainStateDupsortEnabled()
+		BucketsCfg[string(Buckets[i])] = &BucketConfigItem{ID: i}
+
+		for _, cfg := range dupSortConfig {
+			if cfg.ID != i {
+				continue
+			}
+			bucketCfg, ok := BucketsCfg[string(cfg.Bucket)]
+			if !ok {
+				continue
+			}
+			bucketCfg.DupFromLen = cfg.FromLen
+			bucketCfg.DupToLen = cfg.ToLen
+
+			if bytes.Equal(cfg.Bucket, CurrentStateBucket) {
+				bucketCfg.IsDupsort = debug.IsHashedStateDupsortEnabled()
+			}
+			if bytes.Equal(cfg.Bucket, PlainStateBucket) {
+				bucketCfg.IsDupsort = debug.IsPlainStateDupsortEnabled()
+			}
 		}
 	}
-	return cfg
 }
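With HeadHeaderKey, HeadBlockKey, HeadFastBlockKey and FastTrieProgressKey registered in Buckets, each of these former free-floating keys becomes a one-entry bucket whose name doubles as its only key, the same bucket==key convention the pruner change later in this diff uses for LastPrunedBlockKey. A toy sketch of the resulting access pattern (the two-level map below is a stand-in for ethdb, not the real API):

```go
package main

import "fmt"

// kv is a toy two-level store standing in for ethdb: bucket name -> key -> value.
type kv map[string]map[string][]byte

func (db kv) put(bucket, key, value []byte) {
	b, ok := db[string(bucket)]
	if !ok {
		b = map[string][]byte{}
		db[string(bucket)] = b
	}
	b[string(key)] = value
}

func (db kv) get(bucket, key []byte) []byte {
	return db[string(bucket)][string(key)]
}

var HeadHeaderKey = []byte("LastHeader")

func main() {
	db := kv{}
	hash := []byte{0xde, 0xad, 0xbe, 0xef} // head header hash (placeholder)
	// Single-entry bucket: the bucket name doubles as the only key.
	db.put(HeadHeaderKey, HeadHeaderKey, hash)
	fmt.Printf("head header hash: %x\n", db.get(HeadHeaderKey, HeadHeaderKey))
}
```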
diff --git a/common/etl/etl.go b/common/etl/etl.go
index c1b5f98801784cf34c917edf99d9056fb5d6c09d..a28de66ed069dcd7a1925d5da234f2eff6afc7f0 100644
--- a/common/etl/etl.go
+++ b/common/etl/etl.go
@@ -53,7 +53,7 @@ func NextKey(key []byte) ([]byte, error) {
 // loaded from files into a DB
 // * `key`: last committed key to the database (use etl.NextKey helper to use in LoadStartKey)
 // * `isDone`: true, if everything is processed
-type LoadCommitHandler func(db ethdb.Putter, key []byte, isDone bool) error
+type LoadCommitHandler func(ethdb.Putter, []byte, bool) error
 
 type TransformArgs struct {
 	ExtractStartKey []byte
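The handler type keeps its shape and only drops the parameter names. A sketch of a conforming handler, with a stub Putter standing in for ethdb.Putter:

```go
package main

import "fmt"

// Putter is a stand-in for ethdb.Putter, which the real handler receives.
type Putter interface {
	Put(bucket, key, value []byte) error
}

// LoadCommitHandler matches the new signature: the putter used by the load
// phase, the last committed key, and whether the whole load has finished.
type LoadCommitHandler func(Putter, []byte, bool) error

// logProgress is a sketch of a handler that only reacts to the final commit.
func logProgress(_ Putter, key []byte, isDone bool) error {
	if !isDone {
		return nil // ignore intermediate commits
	}
	fmt.Printf("load finished, last key: %x\n", key)
	return nil
}

func main() {
	var h LoadCommitHandler = logProgress
	_ = h(nil, []byte{0x01}, true)
}
```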
diff --git a/core/pruner.go b/core/pruner.go
index 358e4f6a99e5fef797d86023cf91c9a107e7610c..7457a140221205e31d609d449c825d57a0cb6d65 100644
--- a/core/pruner.go
+++ b/core/pruner.go
@@ -125,7 +125,7 @@ func (p *BasicPruner) Stop() {
 }
 
 func (p *BasicPruner) ReadLastPrunedBlockNum() uint64 {
-	data, _ := p.db.Get(dbutils.DatabaseInfoBucket, dbutils.LastPrunedBlockKey)
+	data, _ := p.db.Get(dbutils.LastPrunedBlockKey, dbutils.LastPrunedBlockKey)
 	if len(data) == 0 {
 		return 0
 	}
@@ -136,7 +136,7 @@ func (p *BasicPruner) WriteLastPrunedBlockNum(num uint64) {
 	b := make([]byte, 8)
 	binary.LittleEndian.PutUint64(b, num)
-	if err := p.db.Put(dbutils.DatabaseInfoBucket, dbutils.LastPrunedBlockKey, b); err != nil {
+	if err := p.db.Put(dbutils.LastPrunedBlockKey, dbutils.LastPrunedBlockKey, b); err != nil {
 		log.Crit("Failed to store last pruned block's num", "err", err)
 	}
 }
diff --git a/core/state/history.go b/core/state/history.go
index cf2e7ba47e8c18eb497f00734ad2edd96bfa5179..7b641d3969969671836abd88547922ce6a8fa3ea 100644
--- a/core/state/history.go
+++ b/core/state/history.go
@@ -127,7 +127,7 @@ func FindByHistory(tx ethdb.Tx, plain, storage bool, key []byte, timestamp uint6
 	var lastChangesetBlock, lastIndexBlock uint64
 	stageBucket := tx.Bucket(dbutils.SyncStageProgress)
 	if stageBucket != nil {
-		v1, err1 := stageBucket.Get(stages.DBKeys[stages.Execution])
+		v1, err1 := stageBucket.Get([]byte{byte(stages.Execution)})
 		if err1 != nil && !errors.Is(err1, ethdb.ErrKeyNotFound) {
 			return nil, err1
 		}
@@ -135,9 +135,9 @@ func FindByHistory(tx ethdb.Tx, plain, storage bool, key []byte, timestamp uint6
 			lastChangesetBlock = binary.BigEndian.Uint64(v1[:8])
 		}
 		if storage {
-			v1, err1 = stageBucket.Get(stages.DBKeys[stages.AccountHistoryIndex])
+			v1, err1 = stageBucket.Get([]byte{byte(stages.AccountHistoryIndex)})
 		} else {
-			v1, err1 = stageBucket.Get(stages.DBKeys[stages.StorageHistoryIndex])
+			v1, err1 = stageBucket.Get([]byte{byte(stages.StorageHistoryIndex)})
 		}
 		if err1 != nil && !errors.Is(err1, ethdb.ErrKeyNotFound) {
 			return nil, err1
@@ -559,14 +559,14 @@ func returnCorrectWalker(bucket, hBucket []byte) func(v []byte) changeset.Walker
 
 func getIndexGenerationProgress(tx ethdb.Tx, stage stages.SyncStage) (generatedTo uint64, executedTo uint64, err error) {
 	b := tx.Bucket(dbutils.SyncStageProgress)
-	v, err := b.Get(stages.DBKeys[stage])
+	v, err := b.Get([]byte{byte(stage)})
 	if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
 		return 0, 0, err
 	}
 	if len(v) >= 8 {
 		generatedTo = binary.BigEndian.Uint64(v[:8])
 	}
-	v, err = b.Get(stages.DBKeys[stages.Execution])
+	v, err = b.Get([]byte{byte(stages.Execution)})
 	if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
 		return 0, 0, err
 	}
diff --git a/core/state/history_test.go b/core/state/history_test.go
index 88255030db7448a42b35f32525a36037fa5d9e6f..391097374cd82de61e6c490f6bb0815d5ecf3883 100644
--- a/core/state/history_test.go
+++ b/core/state/history_test.go
@@ -1376,12 +1376,12 @@ func TestWalkAsOfStateHashed_WithoutIndex(t *testing.T) {
 
 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.StorageHistoryIndex], b)
+	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.StorageHistoryIndex)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
+	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1531,12 +1531,12 @@ func TestWalkAsOfStatePlain_WithoutIndex(t *testing.T) {
 
 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.StorageHistoryIndex], b)
+	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.StorageHistoryIndex)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
+	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1686,12 +1686,12 @@ func TestWalkAsOfAccountHashed_WithoutIndex(t *testing.T) {
 
 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.AccountHistoryIndex], b)
+	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.AccountHistoryIndex)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
+	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1849,12 +1849,12 @@ func TestWalkAsOfAccountPlain_WithoutIndex(t *testing.T) {
 
 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, 0)
-	err := db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.AccountHistoryIndex], b)
+	err := db.Put(dbutils.SyncStageProgress, []byte{byte(stages.AccountHistoryIndex)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
 	binary.BigEndian.PutUint64(b, 7)
-	err = db.Put(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution], b)
+	err = db.Put(dbutils.SyncStageProgress, []byte{byte(stages.Execution)}, b)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/eth/backend.go b/eth/backend.go
index 8b6b48125f0fa8c2fec2714298898d05a62e15eb..9c4300348dd932f1ff64af6d94bbb0c06e6f2feb 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -47,7 +47,6 @@ import (
 	"github.com/ledgerwatch/turbo-geth/event"
 	"github.com/ledgerwatch/turbo-geth/internal/ethapi"
 	"github.com/ledgerwatch/turbo-geth/log"
-	"github.com/ledgerwatch/turbo-geth/migrations"
 	"github.com/ledgerwatch/turbo-geth/miner"
 	"github.com/ledgerwatch/turbo-geth/node"
 	"github.com/ledgerwatch/turbo-geth/p2p"
@@ -155,14 +154,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 		}
 	}
 	if ctx.Config.PrivateApiAddr != "" {
+		//remotedbserver.StartDeprecated(chainDb.KV(), ctx.Config.PrivateApiAddr)
 		remotedbserver.StartGrpc(chainDb.KV(), ctx.Config.PrivateApiAddr)
 	}
 
-	err = migrations.NewMigrator().Apply(chainDb, ctx.Config.DataDir)
-	if err != nil {
-		return nil, err
-	}
-
 	chainConfig, genesisHash, _, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis, config.StorageMode.History, false /* overwrite */)
 	if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
diff --git a/eth/stagedsync/stages/stages.go b/eth/stagedsync/stages/stages.go
index 7322767628b59d72152cec7c917d57c20b5687a6..adb8c9b9a3f8bd9cb5bc4e235e0bd000c32cc9bf 100644
--- a/eth/stagedsync/stages/stages.go
+++ b/eth/stagedsync/stages/stages.go
@@ -43,23 +43,9 @@ const (
 	Finish // Nominal stage after all other stages
 )
 
-var DBKeys = map[SyncStage][]byte{
-	Headers:             []byte("Headers"),
-	Bodies:              []byte("Bodies"),
-	Senders:             []byte("Senders"),
-	Execution:           []byte("Execution"),
-	IntermediateHashes:  []byte("IntermediateHashes"),
-	HashState:           []byte("HashState"),
-	AccountHistoryIndex: []byte("AccountHistoryIndex"),
-	StorageHistoryIndex: []byte("StorageHistoryIndex"),
-	TxLookup:            []byte("TxLookup"),
-	TxPool:              []byte("TxPool"),
-	Finish:              []byte("Finish"),
-}
-
 // GetStageProgress retrieves saved progress of given sync stage from the database
 func GetStageProgress(db ethdb.Getter, stage SyncStage) (uint64, []byte, error) {
-	v, err := db.Get(dbutils.SyncStageProgress, DBKeys[stage])
+	v, err := db.Get(dbutils.SyncStageProgress, []byte{byte(stage)})
 	if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
 		return 0, nil, err
 	}
@@ -68,14 +54,14 @@ func GetStageProgress(db ethdb.Getter, stage SyncStage) (uint64, []byte, error)
 
 // SaveStageProgress saves the progress of the given stage in the database
 func SaveStageProgress(db ethdb.Putter, stage SyncStage, progress uint64, stageData []byte) error {
-	return db.Put(dbutils.SyncStageProgress, DBKeys[stage], marshalData(progress, stageData))
+	return db.Put(dbutils.SyncStageProgress, []byte{byte(stage)}, marshalData(progress, stageData))
 }
 
 // GetStageUnwind retrieves the invalidation for the given stage
 // Invalidation means that the stage needs to roll back to the invalidation
 // point and be redone
 func GetStageUnwind(db ethdb.Getter, stage SyncStage) (uint64, []byte, error) {
-	v, err := db.Get(dbutils.SyncStageUnwind, DBKeys[stage])
+	v, err := db.Get(dbutils.SyncStageUnwind, []byte{byte(stage)})
 	if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
 		return 0, nil, err
 	}
@@ -84,7 +70,7 @@ func GetStageUnwind(db ethdb.Getter, stage SyncStage) (uint64, []byte, error) {
 
 // SaveStageUnwind saves the progress of the given stage in the database
 func SaveStageUnwind(db ethdb.Putter, stage SyncStage, invalidation uint64, stageData []byte) error {
-	return db.Put(dbutils.SyncStageUnwind, DBKeys[stage], marshalData(invalidation, stageData))
+	return db.Put(dbutils.SyncStageUnwind, []byte{byte(stage)}, marshalData(invalidation, stageData))
 }
 
 func marshalData(blockNumber uint64, stageData []byte) []byte {
diff --git a/ethdb/interface.go b/ethdb/interface.go
index b2a15f9c5768e3c4a5e82623c52c412f8ead68f5..5a618dbf4bb3dfacfcb42c2527b8a77ebc6936e3 100644
--- a/ethdb/interface.go
+++ b/ethdb/interface.go
@@ -47,7 +47,7 @@ type Getter interface {
 	// Only the keys whose first fixedbits match those of startkey are iterated over.
 	// walker is called for each eligible entry.
 	// If walker returns false or an error, the walk stops.
-	Walk(bucket, startkey []byte, fixedbits int, walker func(k, v []byte) (bool, error)) error
+	Walk(bucket, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error
 
 	// MultiWalk is similar to multiple Walk calls folded into one.
 	MultiWalk(bucket []byte, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error
@@ -116,8 +116,7 @@ type HasNetInterface interface {
 }
 
 type NonTransactional interface {
-	ClearBuckets(buckets ...[]byte) error // makes them empty
-	DropBuckets(buckets ...[]byte) error  // drops them, use of them after drop will panic
+	ClearBuckets(buckets ...[]byte) error
 }
 
 var errNotSupported = errors.New("not supported")
diff --git a/ethdb/kv_abstract.go b/ethdb/kv_abstract.go
index 82fdcbda4a6f2bbbcdc6b33800e0ee5ecf279e4e..4eb8272acbca189addb22e375c50c86b9568eb78 100644
--- a/ethdb/kv_abstract.go
+++ b/ethdb/kv_abstract.go
@@ -11,8 +11,6 @@ type KV interface {
 	Begin(ctx context.Context, writable bool) (Tx, error)
 	IdealBatchSize() int
-
-	DropBuckets(buckets ...[]byte) error
-	CreateBuckets(buckets ...[]byte) error
 }
 
 type Tx interface {
@@ -29,6 +27,7 @@ type Bucket interface {
 
 	Cursor() Cursor
 	Size() (uint64, error)
+	Clear() error
 }
 
 type Cursor interface {
diff --git a/ethdb/kv_badger.go b/ethdb/kv_badger.go
index f14f3bcd31122cc3b60912cf7aac97dfd7ec35d7..16962950e5dda9fe385c63afb503d9418e109ef3 100644
--- a/ethdb/kv_badger.go
+++ b/ethdb/kv_badger.go
@@ -127,14 +127,6 @@ func (db *badgerKV) vlogGCLoop(ctx context.Context, gcTicker *time.Ticker) {
 	}
 }
 
-func (db *badgerKV) DropBuckets(buckets ...[]byte) error {
-	panic("not implemented")
-}
-
-func (db *badgerKV) CreateBuckets(buckets ...[]byte) error {
-	panic("not implemented")
-}
-
 // Close closes BoltKV
 // All transactions must be closed before closing the database.
 func (db *badgerKV) Close() {
diff --git a/ethdb/kv_bolt.go b/ethdb/kv_bolt.go
index 4ec00d8e047ac896f647a3fc1ca9dedd6d625adc..507f17df866418e0414ed1661d24362de80b5f79 100644
--- a/ethdb/kv_bolt.go
+++ b/ethdb/kv_bolt.go
@@ -215,14 +215,6 @@ func NewBolt() boltOpts {
 	return o
 }
 
-func (db *BoltKV) DropBuckets(buckets ...[]byte) error {
-	panic("not implemented")
-}
-
-func (db *BoltKV) CreateBuckets(buckets ...[]byte) error {
-	panic("not implemented")
-}
-
 // Close closes BoltKV
 // All transactions must be closed before closing the database.
 func (db *BoltKV) Close() {
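DropBuckets and CreateBuckets leave the KV interface; the replacement is a per-bucket Clear on the Bucket handle inside a write transaction. A compilable sketch of the calling pattern, with the interfaces trimmed to the methods used (the real ones are in ethdb/kv_abstract.go):

```go
package main

import (
	"context"
	"fmt"
)

// Minimal slices of the ethdb interfaces used here.
type Bucket interface {
	Clear() error // drops and recreates the bucket inside the current tx
}

type Tx interface {
	Bucket(name []byte) Bucket
}

type KV interface {
	Update(ctx context.Context, f func(tx Tx) error) error
}

// clearBuckets shows the pattern ObjectDatabase.ClearBuckets now follows:
// one write transaction per bucket, Clear instead of Drop+Create.
func clearBuckets(kv KV, buckets ...[]byte) error {
	for _, bucket := range buckets {
		bucket := bucket
		if err := kv.Update(context.Background(), func(tx Tx) error {
			return tx.Bucket(bucket).Clear()
		}); err != nil {
			return fmt.Errorf("clearing %s: %w", bucket, err)
		}
	}
	return nil
}

func main() {} // sketch only; a real KV implementation would be plugged in
```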
diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go
index 4bd43936b0cf19b28085b0a59afe72c9e143869e..5a2b4be4772758adeaf33e6e8c5f17fc0582f26f 100644
--- a/ethdb/kv_lmdb.go
+++ b/ethdb/kv_lmdb.go
@@ -87,7 +87,7 @@ func (opts lmdbOpts) Open() (KV, error) {
 		wg:  &sync.WaitGroup{},
 	}
 
-	db.buckets = make([]lmdb.DBI, len(dbutils.Buckets)+len(dbutils.DeprecatedBuckets))
+	db.buckets = make([]lmdb.DBI, len(dbutils.Buckets))
 	if opts.readOnly {
 		if err := env.View(func(tx *lmdb.Txn) error {
 			for _, name := range dbutils.Buckets {
@@ -102,23 +102,16 @@ func (opts lmdbOpts) Open() (KV, error) {
 			return nil, err
 		}
 	} else {
-		if err := db.CreateBuckets(dbutils.Buckets...); err != nil {
+		if err := env.Update(func(tx *lmdb.Txn) error {
+			for id := range dbutils.Buckets {
+				if err := createBucket(tx, db, id); err != nil {
+					return err
+				}
+			}
+			return nil
+		}); err != nil {
 			return nil, err
 		}
-		// don't create deprecated buckets
-	}
-
-	if err := env.View(func(tx *lmdb.Txn) error {
-		for _, name := range dbutils.DeprecatedBuckets {
-			dbi, createErr := tx.OpenDBI(string(name), 0)
-			if createErr == nil {
-				continue // if deprecated bucket couldn't be open - then it's deleted and it's fine
-			}
-			db.buckets[dbutils.BucketsCfg[string(name)].ID] = dbi
-		}
-		return nil
-	}); err != nil {
-		return nil, err
 	}
 
 	if !opts.inMem {
@@ -132,6 +125,21 @@ func (opts lmdbOpts) Open() (KV, error) {
 	return db, nil
 }
 
+func createBucket(tx *lmdb.Txn, db *LmdbKV, id int) error {
+	var flags uint = lmdb.Create
+	name := string(dbutils.Buckets[id])
+	cfg := dbutils.BucketsCfg[name]
+	if cfg.IsDupsort {
+		flags |= lmdb.DupSort
+	}
+	dbi, err := tx.OpenDBI(name, flags)
+	if err != nil {
+		return err
+	}
+	db.buckets[id] = dbi
+	return nil
+}
+
 func (opts lmdbOpts) MustOpen() KV {
 	db, err := opts.Open()
 	if err != nil {
@@ -153,66 +161,6 @@ func NewLMDB() lmdbOpts {
 	return lmdbOpts{}
 }
 
-func (db *LmdbKV) CreateBuckets(buckets ...[]byte) error {
-	for _, name := range buckets {
-		name := name
-		cfg, ok := dbutils.BucketsCfg[string(name)]
-		if !ok {
-			continue
-		}
-
-		var flags uint = lmdb.Create
-		if cfg.IsDupsort {
-			flags |= lmdb.DupSort
-		}
-		if err := db.Update(context.Background(), func(tx Tx) error {
-			dbi, err := tx.(*lmdbTx).tx.OpenDBI(string(name), flags)
-			if err != nil {
-				return err
-			}
-			db.buckets[cfg.ID] = dbi
-			return nil
-		}); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (db *LmdbKV) DropBuckets(buckets ...[]byte) error {
-	if db.env == nil {
-		return fmt.Errorf("db closed")
-	}
-
-	for _, name := range buckets {
-		name := name
-		cfg, ok := dbutils.BucketsCfg[string(name)]
-		if !ok {
-			panic(fmt.Errorf("unknown bucket: %s. add it to dbutils.Buckets", string(name)))
-		}
-
-		if cfg.ID < len(dbutils.Buckets) {
-			return fmt.Errorf("only buckets from dbutils.DeprecatedBuckets can be deleted, bucket: %s", name)
-		}
-
-		if err := db.env.Update(func(txn *lmdb.Txn) error {
-			dbi := db.buckets[cfg.ID]
-			if dbi == 0 { // if bucket was not open on db start, then try to open it now, and if fail then nothing to drop
-				var openErr error
-				dbi, openErr = txn.OpenDBI(string(name), 0)
-				if openErr != nil {
-					return nil // DBI doesn't exists means no drop needed
-				}
-			}
-			return txn.Drop(dbi, true)
-		}); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 // Close closes db
 // All transactions must be closed before closing the database.
 func (db *LmdbKV) Close() {
@@ -292,7 +240,6 @@ type lmdbBucket struct {
 	isDupsort bool
 	dupFrom   int
 	dupTo     int
-	name      []byte
 	tx        *lmdbTx
 	dbi       lmdb.DBI
 }
@@ -341,7 +288,7 @@ func (tx *lmdbTx) Bucket(name []byte) Bucket {
 		panic(fmt.Errorf("unknown bucket: %s. add it to dbutils.Buckets", string(name)))
 	}
 
-	return &lmdbBucket{tx: tx, id: cfg.ID, dbi: tx.db.buckets[cfg.ID], isDupsort: cfg.IsDupsort, dupFrom: cfg.DupFromLen, dupTo: cfg.DupToLen, name: name}
+	return &lmdbBucket{tx: tx, id: cfg.ID, dbi: tx.db.buckets[cfg.ID], isDupsort: cfg.IsDupsort, dupFrom: cfg.DupFromLen, dupTo: cfg.DupToLen}
 }
 
 func (tx *lmdbTx) Commit(ctx context.Context) error {
@@ -454,6 +401,16 @@ func (b *lmdbBucket) Size() (uint64, error) {
 	return (st.LeafPages + st.BranchPages + st.OverflowPages) * uint64(os.Getpagesize()), nil
 }
 
+func (b *lmdbBucket) Clear() error {
+	if err := b.tx.tx.Drop(b.dbi, true); err != nil {
+		return err
+	}
+	if err := createBucket(b.tx.tx, b.tx.db, b.id); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (b *lmdbBucket) Cursor() Cursor {
 	return &LmdbCursor{bucket: b, ctx: b.tx.ctx}
 }
@@ -508,7 +465,7 @@ func (c *LmdbCursor) Seek(seek []byte) (k, v []byte, err error) {
 		if lmdb.IsNotFound(err) {
 			return nil, nil, nil
 		}
-		err = fmt.Errorf("failed LmdbKV cursor.Seek(): %w, bucket: %s, isDupsort: %t, key: %x", err, c.bucket.name, c.bucket.isDupsort, seek)
+		err = fmt.Errorf("failed LmdbKV cursor.Seek(): %w, bucket: %d %s, isDupsort: %t, key: %x", err, c.bucket.id, dbutils.Buckets[c.bucket.id], c.bucket.isDupsort, seek)
 		return []byte{}, nil, err
 	}
 	if c.prefix != nil && !bytes.HasPrefix(k, c.prefix) {
@@ -699,7 +656,7 @@ func (c *LmdbCursor) Put(key []byte, value []byte) error {
 	}
 
 	if len(key) == 0 {
-		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", c.bucket.name)
+		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[c.bucket.id])
 	}
 	if c.cursor == nil {
 		if err := c.initCursor(); err != nil {
@@ -793,7 +750,7 @@ func (c *LmdbCursor) putCurrent(key []byte, value []byte) error {
 // Danger: if provided data will not be sorted (or bucket have old records which mess with new in sorting manner) - db will corrupt.
 func (c *LmdbCursor) Append(key []byte, value []byte) error {
 	if len(key) == 0 {
-		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", c.bucket.name)
+		return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[c.bucket.id])
 	}
 
 	if c.cursor == nil {
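lmdbBucket.Clear relies on Drop(dbi, true) deleting the whole DBI, so the bucket must be reopened with its original flags inside the same transaction, which is what the createBucket call does. A standalone sketch of that drop-and-recreate pattern against lmdb-go, assuming the ledgerwatch fork keeps the upstream bmatsuo/lmdb-go API:

```go
package main

import (
	"io/ioutil"
	"log"

	"github.com/ledgerwatch/lmdb-go/lmdb" // assumed to match bmatsuo/lmdb-go's API
)

func main() {
	dir, err := ioutil.TempDir("", "lmdb-clear")
	if err != nil {
		log.Fatal(err)
	}
	env, err := lmdb.NewEnv()
	if err != nil {
		log.Fatal(err)
	}
	if err = env.SetMaxDBs(1); err != nil {
		log.Fatal(err)
	}
	if err = env.Open(dir, 0, 0644); err != nil {
		log.Fatal(err)
	}
	defer env.Close()

	err = env.Update(func(txn *lmdb.Txn) error {
		dbi, err := txn.OpenDBI("stage-progress", lmdb.Create)
		if err != nil {
			return err
		}
		if err = txn.Put(dbi, []byte{1}, []byte("42"), 0); err != nil {
			return err
		}
		// Clear pattern: Drop with del=true deletes the DBI itself...
		if err = txn.Drop(dbi, true); err != nil {
			return err
		}
		// ...so it is immediately recreated, as lmdbBucket.Clear does.
		_, err = txn.OpenDBI("stage-progress", lmdb.Create)
		return err
	})
	if err != nil {
		log.Fatal(err)
	}
}
```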
diff --git a/ethdb/kv_remote.go b/ethdb/kv_remote.go
index eed3569b0318e4dacba0f0a6a4404aef174a826f..f6176d2ba1173419b289cf574c346a3deca62811 100644
--- a/ethdb/kv_remote.go
+++ b/ethdb/kv_remote.go
@@ -105,14 +105,6 @@ func NewRemote() remoteOpts {
 	return remoteOpts{Remote: remote.DefaultOpts}
 }
 
-func (db *RemoteKV) CreateBuckets(buckets ...[]byte) error {
-	panic("not implemented")
-}
-
-func (db *RemoteKV) DropBuckets(buckets ...[]byte) error {
-	panic("not supported")
-}
-
 // Close closes BoltKV
 // All transactions must be closed before closing the database.
 func (db *RemoteKV) Close() {
diff --git a/ethdb/kv_remote2.go b/ethdb/kv_remote2.go
index f1196815b20651b25f7d17c197efaedec780885c..33a4afc6065396c2a77cbdc914d976aa35ec3c1b 100644
--- a/ethdb/kv_remote2.go
+++ b/ethdb/kv_remote2.go
@@ -115,14 +115,6 @@ func NewRemote2() remote2Opts {
 	return remote2Opts{}
 }
 
-func (db *Remote2KV) CreateBuckets(buckets ...[]byte) error {
-	panic("not implemented")
-}
-
-func (db *Remote2KV) DropBuckets(buckets ...[]byte) error {
-	panic("not supported")
-}
-
 // Close
 // All transactions must be closed before closing the database.
 func (db *Remote2KV) Close() {
diff --git a/ethdb/object_db.go b/ethdb/object_db.go
index 2bb7e428965a2b992d835f73450159f217e9d47e..0197230a2161213224cd984685834be00f2d0927 100644
--- a/ethdb/object_db.go
+++ b/ethdb/object_db.go
@@ -330,14 +330,16 @@ func (db *ObjectDatabase) Delete(bucket, key []byte) error {
 }
 
 func (db *ObjectDatabase) ClearBuckets(buckets ...[]byte) error {
-	if err := db.kv.DropBuckets(buckets...); err != nil {
-		return nil
+	for _, bucket := range buckets {
+		bucket := bucket
+		if err := db.kv.Update(context.Background(), func(tx Tx) error {
+			return tx.Bucket(bucket).Clear()
+		}); err != nil {
+			return err
+		}
 	}
-	return db.kv.CreateBuckets(buckets...)
-}
 
-func (db *ObjectDatabase) DropBuckets(buckets ...[]byte) error {
-	return db.kv.DropBuckets(buckets...)
+	return nil
 }
 
 func (db *ObjectDatabase) Close() {
diff --git a/migrations/migrations.go b/migrations/migrations.go
index 97e781abc8a1e474518a04421b8f0758dd532e3d..75cee8b1f9b7521ea5079981fe2d5ab6f98df624 100644
--- a/migrations/migrations.go
+++ b/migrations/migrations.go
@@ -1,56 +1,14 @@
 package migrations
 
 import (
-	"bytes"
-	"errors"
-	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
-	"github.com/ledgerwatch/turbo-geth/common/etl"
-	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/log"
-	"github.com/ugorji/go/codec"
 )
 
-// migrations apply sequentially in order of this array, skips applied migrations
-// it allows - don't worry about merge conflicts and use switch branches
-// see also dbutils.Migrations - it stores context in which each transaction was exectured - useful for bug-reports
-//
-// Idempotency is expected
-// Best practices to achieve Idempotency:
-// - in dbutils/bucket.go add suffix for existing bucket variable, create new bucket with same variable name.
-// Example:
-//	- SyncStageProgress = []byte("SSP1")
-//	+ SyncStageProgressOld1 = []byte("SSP1")
-//	+ SyncStageProgress = []byte("SSP2")
-// - clear new bucket in the beginning of transaction, drop old bucket in the end (not defer!).
-// Example:
-//	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-//		if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageProgress); err != nil { // clear new bucket
-//			return err
-//		}
-//
-//		extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
-//			... // migration logic
-//		}
-//		if err := etl.Transform(...); err != nil {
-//			return err
-//		}
-//
-//		if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageProgressOld1); err != nil { // clear old bucket
-//			return err
-//		}
-//	},
-// - if you need migrate multiple buckets - create separate migration for each bucket
-// - write test for new transaction
-var migrations = []Migration{
-	stagesToUseNamedKeys,
-	unwindStagesToUseNamedKeys,
-}
-
 type Migration struct {
 	Name string
-	Up   func(db ethdb.Database, dataDir string, OnLoadCommit etl.LoadCommitHandler) error
+	Up   func(db ethdb.Database, history, receipts, txIndex, preImages bool) error
 }
 
 func NewMigrator() *Migrator {
@@ -63,92 +21,37 @@ type Migrator struct {
 	Migrations []Migration
 }
 
-func AppliedMigrations(db ethdb.Database, withPayload bool) (map[string][]byte, error) {
-	applied := map[string][]byte{}
-	err := db.Walk(dbutils.Migrations, nil, 0, func(k []byte, v []byte) (bool, error) {
-		if withPayload {
-			applied[string(common.CopyBytes(k))] = common.CopyBytes(v)
-		} else {
-			applied[string(common.CopyBytes(k))] = []byte{}
-		}
-		return true, nil
-	})
-	return applied, err
-}
-
-func (m *Migrator) Apply(db ethdb.Database, datadir string) error {
+func (m *Migrator) Apply(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
 	if len(m.Migrations) == 0 {
 		return nil
 	}
 
-	applied, err := AppliedMigrations(db, false)
-	if err != nil {
+	lastApplied, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
+	if err != nil && err != ethdb.ErrKeyNotFound {
 		return err
 	}
 
-	for i := range m.Migrations {
-		v := m.Migrations[i]
-		if _, ok := applied[v.Name]; ok {
-			continue
-		}
-
-		log.Info("Apply migration", "name", v.Name)
-		if err := v.Up(db, datadir, func(putter ethdb.Putter, key []byte, isDone bool) error {
-			if !isDone {
-				return nil // don't save partial progress
-			}
-			stagesProgress, err := MarshalMigrationPayload(db)
-			if err != nil {
-				return err
-			}
-			err = db.Put(dbutils.Migrations, []byte(v.Name), stagesProgress)
-			if err != nil {
-				return err
-			}
-			return nil
-		}); err != nil {
-			return err
+	i := len(m.Migrations) - 1
+	for ; i >= 0; i-- {
+		if m.Migrations[i].Name == string(lastApplied) {
+			break
 		}
-
-		log.Info("Applied migration", "name", v.Name)
 	}
-	return nil
-}
-
-func MarshalMigrationPayload(db ethdb.Getter) ([]byte, error) {
-	s := map[string][]byte{}
-
-	buf := bytes.NewBuffer(nil)
-	encoder := codec.NewEncoder(buf, &codec.CborHandle{})
-	for i := range stages.DBKeys {
-		v, err := db.Get(dbutils.SyncStageProgress, stages.DBKeys[i])
-		if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
-			return nil, err
-		}
-		if len(v) > 0 {
-			s[string(stages.DBKeys[i])] = common.CopyBytes(v)
-		}
-
-		v, err = db.Get(dbutils.SyncStageUnwind, stages.DBKeys[i])
-		if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
-			return nil, err
+	m.Migrations = m.Migrations[i+1:]
+	for _, v := range m.Migrations {
+		log.Warn("Apply migration", "name", v.Name)
+		err := v.Up(db, history, receipts, txIndex, preImages)
+		if err != nil {
+			return err
 		}
-		if len(v) > 0 {
-			s["unwind_"+string(stages.DBKeys[i])] = common.CopyBytes(v)
+		err = db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(v.Name))
+		if err != nil {
+			return err
 		}
+		log.Warn("Applied migration", "name", v.Name)
 	}
-
-	if err := encoder.Encode(s); err != nil {
-		return nil, err
-	}
-	return buf.Bytes(), nil
+	return nil
 }
 
-func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error) {
-	s := map[string][]byte{}
-
-	if err := codec.NewDecoder(bytes.NewReader(data), &codec.CborHandle{}).Decode(&s); err != nil {
-		return nil, err
-	}
-	return s, nil
-}
+var migrations = []Migration{}
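Migration bookkeeping shrinks from a per-migration Migrations bucket to a single lastAppliedMigration name, so Apply only scans the (append-only) migration list backwards for that name and runs everything after it; when the name is absent, as on a fresh database, the scan falls through to index -1 and every migration is pending. A self-contained sketch of that skip logic:

```go
package main

import "fmt"

type Migration struct {
	Name string
	Up   func() error // real signature: func(db ethdb.Database, history, receipts, txIndex, preImages bool) error
}

// pending returns the migrations that still have to run, given the name
// stored under dbutils.LastAppliedMigration ("" when nothing ran yet).
func pending(all []Migration, lastApplied string) []Migration {
	i := len(all) - 1
	for ; i >= 0; i-- {
		if all[i].Name == lastApplied {
			break
		}
	}
	// i == -1 when lastApplied was never found: everything is pending.
	return all[i+1:]
}

func main() {
	all := []Migration{{Name: "one"}, {Name: "two"}, {Name: "three"}}
	for _, m := range pending(all, "one") {
		fmt.Println("would apply:", m.Name) // -> two, three
	}
}
```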
diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go
index ea3c74a7229278a0c881758bbf42723ef950dccb..51f07db1888e6e331f366f2e9e0d9041ca65090c 100644
--- a/migrations/migrations_test.go
+++ b/migrations/migrations_test.go
@@ -1,154 +1,76 @@
 package migrations
 
 import (
-	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
-	"testing"
-
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
-	"github.com/ledgerwatch/turbo-geth/common/etl"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
-	"github.com/stretchr/testify/require"
+	"testing"
 )
 
 func TestApplyWithInit(t *testing.T) {
-	require, db := require.New(t), ethdb.NewMemDatabase()
+	db := ethdb.NewMemDatabase()
 	migrations = []Migration{
 		{
 			"one",
-			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-				return OnLoadCommit(db, nil, true)
+			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
+				return nil
 			},
 		},
 		{
 			"two",
-			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-				return OnLoadCommit(db, nil, true)
+			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
+				return nil
 			},
 		},
 	}
 
 	migrator := NewMigrator()
 	migrator.Migrations = migrations
-	err := migrator.Apply(db, "")
-	require.NoError(err)
-
-	applied, err := AppliedMigrations(db, false)
-	require.NoError(err)
-
-	_, ok := applied[migrations[0].Name]
-	require.True(ok)
-	_, ok = applied[migrations[1].Name]
-	require.True(ok)
-
-	// apply again
-	err = migrator.Apply(db, "")
-	require.NoError(err)
-
-	applied2, err := AppliedMigrations(db, false)
-	require.NoError(err)
-	require.Equal(applied, applied2)
+	err := migrator.Apply(db, false, false, false, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(v) != migrations[1].Name {
+		t.Fatal()
+	}
 }
 
 func TestApplyWithoutInit(t *testing.T) {
-	require, db := require.New(t), ethdb.NewMemDatabase()
+	db := ethdb.NewMemDatabase()
 	migrations = []Migration{
 		{
 			"one",
-			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
+			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
 				t.Fatal("shouldn't been executed")
 				return nil
 			},
 		},
 		{
 			"two",
-			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-				return OnLoadCommit(db, nil, true)
-			},
-		},
-	}
-	err := db.Put(dbutils.Migrations, []byte(migrations[0].Name), []byte{1})
-	require.NoError(err)
-
-	migrator := NewMigrator()
-	migrator.Migrations = migrations
-	err = migrator.Apply(db, "")
-	require.NoError(err)
-
-	applied, err := AppliedMigrations(db, false)
-	require.NoError(err)
-
-	require.Equal(2, len(applied))
-	_, ok := applied[migrations[1].Name]
-	require.True(ok)
-	_, ok = applied[migrations[0].Name]
-	require.True(ok)
-
-	// apply again
-	err = migrator.Apply(db, "")
-	require.NoError(err)
-
-	applied2, err := AppliedMigrations(db, false)
-	require.NoError(err)
-	require.Equal(applied, applied2)
-}
-
-func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) {
-	require, db := require.New(t), ethdb.NewMemDatabase()
-	migrations = []Migration{
-		{
-			"one",
-			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-				return OnLoadCommit(db, nil, true)
-			},
-		},
-		{
-			"two",
-			func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-				t.Fatal("shouldn't been executed")
+			func(db ethdb.Database, history, receipts, txIndex, preImages bool) error {
 				return nil
 			},
 		},
 	}
-	err := db.Put(dbutils.Migrations, []byte(migrations[1].Name), []byte{1}) // apply non-first migration
-	require.NoError(err)
+	err := db.Put(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration, []byte(migrations[0].Name))
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	migrator := NewMigrator()
 	migrator.Migrations = migrations
-	err = migrator.Apply(db, "")
-	require.NoError(err)
-
-	applied, err := AppliedMigrations(db, false)
-	require.NoError(err)
-
-	require.Equal(2, len(applied))
-	_, ok := applied[migrations[1].Name]
-	require.True(ok)
-	_, ok = applied[migrations[0].Name]
-	require.True(ok)
-
-	// apply again
-	err = migrator.Apply(db, "")
-	require.NoError(err)
-
-	applied2, err := AppliedMigrations(db, false)
-	require.NoError(err)
-	require.Equal(applied, applied2)
-}
-
-func TestMarshalStages(t *testing.T) {
-	require, db := require.New(t), ethdb.NewMemDatabase()
-
-	err := stages.SaveStageProgress(db, stages.Execution, 42, []byte{})
-	require.NoError(err)
-
-	data, err := MarshalMigrationPayload(db)
-	require.NoError(err)
-
-	res, err := UnmarshalMigrationPayload(data)
-	require.NoError(err)
-
-	require.Equal(1, len(res))
-	v, ok := res[string(stages.DBKeys[stages.Execution])]
-	require.True(ok)
-	require.NotNil(v)
+	err = migrator.Apply(db, false, false, false, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	v, err := db.Get(dbutils.DatabaseInfoBucket, dbutils.LastAppliedMigration)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(v) != migrations[1].Name {
+		t.Fatal()
+	}
 }
diff --git a/migrations/stages_to_use_named_keys.go b/migrations/stages_to_use_named_keys.go
deleted file mode 100644
index 4bac29943436d909220325594a7a2b360f905d60..0000000000000000000000000000000000000000
--- a/migrations/stages_to_use_named_keys.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package migrations
-
-import (
-	"github.com/ledgerwatch/turbo-geth/common/dbutils"
-	"github.com/ledgerwatch/turbo-geth/common/etl"
-	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
-	"github.com/ledgerwatch/turbo-geth/ethdb"
-)
-
-var stagesToUseNamedKeys = Migration{
-	Name: "stages_to_use_named_keys",
-	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-		if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageProgress); err != nil {
-			return err
-		}
-
-		extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
-			newKey, ok := stages.DBKeys[stages.SyncStage(k[0])]
-			if !ok {
-				return nil // nothing to do
-			}
-			// create new version of keys with same data
-			if err := next(k, newKey, v); err != nil {
-				return err
-			}
-			return nil
-		}
-
-		if err := etl.Transform(
-			db,
-			dbutils.SyncStageProgressOld1,
-			dbutils.SyncStageProgress,
-			datadir,
-			extractFunc,
-			etl.IdentityLoadFunc,
-			etl.TransformArgs{OnLoadCommit: OnLoadCommit},
-		); err != nil {
-			return err
-		}
-
-		if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageProgressOld1); err != nil {
-			return err
-		}
-		return nil
-	},
-}
-
-var unwindStagesToUseNamedKeys = Migration{
-	Name: "unwind_stages_to_use_named_keys",
-	Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
-		if err := db.(ethdb.NonTransactional).ClearBuckets(dbutils.SyncStageUnwind); err != nil {
-			return err
-		}
-
-		extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
-			newKey, ok := stages.DBKeys[stages.SyncStage(k[0])]
-			if !ok {
-				return nil // nothing to do
-
-			}
-			// create new version of keys with same data
-			if err := next(k, newKey, v); err != nil {
-				return err
-			}
-			return nil
-		}
-
-		if err := etl.Transform(
-			db,
-			dbutils.SyncStageUnwindOld1,
-			dbutils.SyncStageUnwind,
-			datadir,
-			extractFunc,
-			etl.IdentityLoadFunc,
-			etl.TransformArgs{OnLoadCommit: OnLoadCommit},
-		); err != nil {
-			return err
-		}
-
-		if err := db.(ethdb.NonTransactional).DropBuckets(dbutils.SyncStageUnwindOld1); err != nil {
-			return err
-		}
-		return nil
-	},
-}
diff --git a/migrations/stages_to_use_named_keys_test.go b/migrations/stages_to_use_named_keys_test.go
deleted file mode 100644
index cf4f9ede2d13ad2a0cd36fd24818e6125db16c78..0000000000000000000000000000000000000000
--- a/migrations/stages_to_use_named_keys_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package migrations
-
-import (
-	"encoding/binary"
-	"errors"
-	"testing"
-
-	"github.com/ledgerwatch/turbo-geth/common/dbutils"
-	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
-	"github.com/ledgerwatch/turbo-geth/ethdb"
-	"github.com/stretchr/testify/require"
-)
-
-func TestSyncStagesToUseNamedKeys(t *testing.T) {
-	require, db := require.New(t), ethdb.NewMemDatabase()
-
-	err := db.KV().CreateBuckets(dbutils.SyncStageProgressOld1)
-	require.NoError(err)
-
-	// pretend that execution stage is at block 42
-	err = db.Put(dbutils.SyncStageProgressOld1, []byte{byte(stages.Execution)}, dbutils.EncodeBlockNumber(42))
-	require.NoError(err)
-
-	migrator := NewMigrator()
-	migrator.Migrations = []Migration{stagesToUseNamedKeys}
-	err = migrator.Apply(db, "")
-	require.NoError(err)
-
-	i := 0
-	err = db.Walk(dbutils.SyncStageProgress, nil, 0, func(k, v []byte) (bool, error) {
-		i++
-		return true, nil
-	})
-	require.NoError(err)
-	require.Equal(1, i)
-
-	_, err = db.Get(dbutils.SyncStageProgress, []byte{byte(stages.Execution)})
-	require.True(errors.Is(err, ethdb.ErrKeyNotFound))
-
-	v, err := db.Get(dbutils.SyncStageProgress, stages.DBKeys[stages.Execution])
-	require.NoError(err)
-	require.Equal(42, int(binary.BigEndian.Uint64(v)))
-}
-
-func TestUnwindStagesToUseNamedKeys(t *testing.T) {
-	require, db := require.New(t), ethdb.NewMemDatabase()
-
-	err := db.KV().CreateBuckets(dbutils.SyncStageUnwindOld1)
-	require.NoError(err)
-
-	// pretend that execution stage is at block 42
-	err = db.Put(dbutils.SyncStageUnwindOld1, []byte{byte(stages.Execution)}, dbutils.EncodeBlockNumber(42))
-	require.NoError(err)
-
-	migrator := NewMigrator()
-	migrator.Migrations = []Migration{unwindStagesToUseNamedKeys}
-	err = migrator.Apply(db, "")
-	require.NoError(err)
-
-	i := 0
-	err = db.Walk(dbutils.SyncStageUnwind, nil, 0, func(k, v []byte) (bool, error) {
-		i++
-		return true, nil
-	})
-	require.NoError(err)
-	require.Equal(1, i)
-
-	_, err = db.Get(dbutils.SyncStageUnwind, []byte{byte(stages.Execution)})
-	require.True(errors.Is(err, ethdb.ErrKeyNotFound))
-
-	v, err := db.Get(dbutils.SyncStageUnwind, stages.DBKeys[stages.Execution])
-	require.NoError(err)
-	require.Equal(42, int(binary.BigEndian.Uint64(v)))
-}