diff --git a/cmd/snapshots/generator/commands/verify_state_snapshot.go b/cmd/snapshots/generator/commands/verify_state_snapshot.go index 48d72ddb19a455132c87f86a32cf1cf12469447a..f268df8fe00df335727a3b629be8d9eaa5020a13 100644 --- a/cmd/snapshots/generator/commands/verify_state_snapshot.go +++ b/cmd/snapshots/generator/commands/verify_state_snapshot.go @@ -34,7 +34,8 @@ var verifyStateSnapshotCmd = &cobra.Command{ } func VerifyStateSnapshot(ctx context.Context, dbPath, snapshotPath string, block uint64) error { - snkv := ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { + var snkv ethdb.RwKV + snkv = ethdb.NewLMDB().WithBucketsConfig(func(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg { return dbutils.BucketsCfg{ dbutils.PlainStateBucket: dbutils.BucketsConfigs[dbutils.PlainStateBucket], dbutils.PlainContractCodeBucket: dbutils.BucketsConfigs[dbutils.PlainContractCodeBucket], diff --git a/ethdb/database_test.go b/ethdb/database_test.go index 90264a2fdbacb564fa620075b031986a454b4b5c..8bca7eeea23b18e79743b191ad64865ebbe0fc25 100644 --- a/ethdb/database_test.go +++ b/ethdb/database_test.go @@ -33,28 +33,21 @@ import ( "github.com/stretchr/testify/require" ) -func newTestLmdb() *ObjectDatabase { - return NewObjectDatabase(NewMemKV()) -} - var testBucket = dbutils.HashedAccountsBucket var testValues = []string{"a", "1251", "\x00123\x00"} func TestMemoryDB_PutGet(t *testing.T) { - db := NewMemDatabase() + db := NewMemKV() defer db.Close() testPutGet(db, t) testNoPanicAfterDbClosed(db, t) } -func TestLMDB_PutGet(t *testing.T) { - db := newTestLmdb() - defer db.Close() - testPutGet(db, t) - testNoPanicAfterDbClosed(db, t) -} +func testPutGet(db RwKV, t *testing.T) { + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer tx.Rollback() -func testPutGet(db MinDatabase, t *testing.T) { //for _, k := range testValues { // err := db.Put(testBucket, []byte(k), []byte{}) // if err != nil { @@ -72,80 +65,50 @@ func testPutGet(db MinDatabase, t *testing.T) { // } //} - _, err := db.Get(testBucket, []byte("non-exist-key")) - if err == nil { - t.Fatalf("expect to return a not found error") - } + _, err = tx.GetOne(testBucket, []byte("non-exist-key")) + require.NoError(t, err) for _, v := range testValues { - err := db.Put(testBucket, []byte(v), []byte(v)) - if err != nil { - t.Fatalf("put failed: %v", err) - } + err := tx.Put(testBucket, []byte(v), []byte(v)) + require.NoError(t, err) } for _, v := range testValues { - data, err := db.Get(testBucket, []byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } + data, err := tx.GetOne(testBucket, []byte(v)) + require.NoError(t, err) if !bytes.Equal(data, []byte(v)) { t.Fatalf("get returned wrong result, got %q expected %q", string(data), v) } } for _, v := range testValues { - err := db.Put(testBucket, []byte(v), []byte("?")) - if err != nil { - t.Fatalf("put override failed: %v", err) - } + err := tx.Put(testBucket, []byte(v), []byte("?")) + require.NoError(t, err) } for _, v := range testValues { - data, err := db.Get(testBucket, []byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } + data, err := tx.GetOne(testBucket, []byte(v)) + require.NoError(t, err) if !bytes.Equal(data, []byte("?")) { t.Fatalf("get returned wrong result, got %q expected ?", string(data)) } } for _, v := range testValues { - orig, err := db.Get(testBucket, []byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } - orig[0] = byte(0xff) - data, err := db.Get(testBucket, []byte(v)) - if err != nil { - t.Fatalf("get failed: %v", err) - } - if !bytes.Equal(data, []byte("?")) { - fmt.Printf("Error: %s %s\n", v, data) - t.Fatalf("get returned wrong result, got %s expected ?", string(data)) - } + err := tx.Delete(testBucket, []byte(v), nil) + require.NoError(t, err) } for _, v := range testValues { - err := db.Delete(testBucket, []byte(v), nil) - if err != nil { - t.Fatalf("delete %q failed: %v", v, err) - } - } - - for _, v := range testValues { - _, err := db.Get(testBucket, []byte(v)) - if err == nil { - t.Fatalf("got deleted value %q", v) - } + _, err := tx.GetOne(testBucket, []byte(v)) + require.NoError(t, err) } } -func testNoPanicAfterDbClosed(db Database, t *testing.T) { - tx, err := db.(HasRwKV).RwKV().BeginRo(context.Background()) +func testNoPanicAfterDbClosed(db RwKV, t *testing.T) { + tx, err := db.BeginRo(context.Background()) require.NoError(t, err) - writeTx, err := db.(HasRwKV).RwKV().BeginRw(context.Background()) + writeTx, err := db.BeginRw(context.Background()) require.NoError(t, err) closeCh := make(chan struct{}, 1) @@ -168,27 +131,21 @@ func testNoPanicAfterDbClosed(db Database, t *testing.T) { close(closeCh) // after db closed, methods must not panic but return some error - require.NotPanics(t, func() { - _, err := db.Get(testBucket, []byte{11}) - require.Error(t, err) - err = db.Put(testBucket, []byte{11}, []byte{11}) - require.Error(t, err) - }) + //require.NotPanics(t, func() { + // _, err := tx.GetOne(testBucket, []byte{11}) + // require.Error(t, err) + // err = writeTx.Put(testBucket, []byte{11}, []byte{11}) + // require.Error(t, err) + //}) } func TestMemoryDB_ParallelPutGet(t *testing.T) { - db := NewMemDatabase() + db := NewMemKV() defer db.Close() testParallelPutGet(db) } -func TestLMDB_ParallelPutGet(t *testing.T) { - db := newTestLmdb() - defer db.Close() - testParallelPutGet(db) -} - -func testParallelPutGet(db MinDatabase) { +func testParallelPutGet(db RwKV) { const n = 8 var pending sync.WaitGroup @@ -196,10 +153,13 @@ func testParallelPutGet(db MinDatabase) { for i := 0; i < n; i++ { go func(key string) { defer pending.Done() - err := db.Put(testBucket, []byte(key), []byte("v"+key)) - if err != nil { - panic("put failed: " + err.Error()) - } + _ = db.Update(context.Background(), func(tx RwTx) error { + err := tx.Put(testBucket, []byte(key), []byte("v"+key)) + if err != nil { + panic("put failed: " + err.Error()) + } + return nil + }) }(strconv.Itoa(i)) } pending.Wait() @@ -208,13 +168,16 @@ func testParallelPutGet(db MinDatabase) { for i := 0; i < n; i++ { go func(key string) { defer pending.Done() - data, err := db.Get(testBucket, []byte(key)) - if err != nil { - panic("get failed: " + err.Error()) - } - if !bytes.Equal(data, []byte("v"+key)) { - panic(fmt.Sprintf("get failed, got %q expected %q", data, []byte("v"+key))) - } + _ = db.View(context.Background(), func(tx Tx) error { + data, err := tx.GetOne(testBucket, []byte(key)) + if err != nil { + panic("get failed: " + err.Error()) + } + if !bytes.Equal(data, []byte("v"+key)) { + panic(fmt.Sprintf("get failed, got %q expected %q", data, []byte("v"+key))) + } + return nil + }) }(strconv.Itoa(i)) } pending.Wait() @@ -223,10 +186,13 @@ func testParallelPutGet(db MinDatabase) { for i := 0; i < n; i++ { go func(key string) { defer pending.Done() - err := db.Delete(testBucket, []byte(key), nil) - if err != nil { - panic("delete failed: " + err.Error()) - } + _ = db.Update(context.Background(), func(tx RwTx) error { + err := tx.Delete(testBucket, []byte(key), nil) + if err != nil { + panic("delete failed: " + err.Error()) + } + return nil + }) }(strconv.Itoa(i)) } pending.Wait() @@ -235,23 +201,23 @@ func testParallelPutGet(db MinDatabase) { for i := 0; i < n; i++ { go func(key string) { defer pending.Done() - _, err := db.Get(testBucket, []byte(key)) - if err == nil { - panic("get succeeded") - } + _ = db.Update(context.Background(), func(tx RwTx) error { + v, err := tx.GetOne(testBucket, []byte(key)) + if err != nil { + panic(err) + } + if v != nil { + panic("get returned something") + } + return nil + }) }(strconv.Itoa(i)) } pending.Wait() } func TestMemoryDB_Walk(t *testing.T) { - db := NewMemDatabase() - defer db.Close() - testWalk(db, t) -} - -func TestLMDB_Walk(t *testing.T) { - db := newTestLmdb() + db := NewMemKV() defer db.Close() testWalk(db, t) } @@ -270,21 +236,31 @@ var fixedBits = 3 var keysInRange = [][]byte{common.FromHex("a8"), common.FromHex("bb"), common.FromHex("bd")} -func testWalk(db Database, t *testing.T) { - for k, v := range hexEntries { - err := db.Put(testBucket, common.FromHex(k), common.FromHex(v)) - if err != nil { - t.Fatalf("put failed: %v", err) +func testWalk(db RwKV, t *testing.T) { + _ = db.Update(context.Background(), func(tx RwTx) error { + for k, v := range hexEntries { + err := tx.Put(testBucket, common.FromHex(k), common.FromHex(v)) + if err != nil { + t.Fatalf("put failed: %v", err) + } } - } + return nil + }) var gotKeys [][]byte - - err := db.Walk(testBucket, startKey, fixedBits, func(key, val []byte) (bool, error) { - gotKeys = append(gotKeys, common.CopyBytes(key)) - return true, nil + _ = db.Update(context.Background(), func(tx RwTx) error { + c, err := tx.Cursor(testBucket) + if err != nil { + panic(err) + } + defer c.Close() + err = Walk(c, startKey, fixedBits, func(key, val []byte) (bool, error) { + gotKeys = append(gotKeys, common.CopyBytes(key)) + return true, nil + }) + assert.NoError(t, err) + return nil }) - assert.NoError(t, err) assert.Equal(t, keysInRange, gotKeys) } diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go index 4b26e62b82d1914caebb94019f2b00fcab298e56..f19f781807ba5f63a7236595a950bb8ffc3f19f2 100644 --- a/ethdb/kv_lmdb.go +++ b/ethdb/kv_lmdb.go @@ -98,7 +98,7 @@ func DefaultBucketConfigs(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg return defaultBuckets } -func (opts LmdbOpts) Open() (kv RwKV, err error) { +func (opts LmdbOpts) Open() (kv *LmdbKV, err error) { env, err := lmdb.NewEnv() if err != nil { return nil, err @@ -266,7 +266,7 @@ func (opts LmdbOpts) Open() (kv RwKV, err error) { return db, nil } -func (opts LmdbOpts) MustOpen() RwKV { +func (opts LmdbOpts) MustOpen() *LmdbKV { db, err := opts.Open() if err != nil { panic(fmt.Errorf("fail to open lmdb: %w", err)) diff --git a/ethdb/kv_snapshot.go b/ethdb/kv_snapshot.go index 2bf90be1010ee96252a9c5f9a93f8d07b9705323..4fa7f12c4f39c09eb6a064883052a24eff4770a8 100644 --- a/ethdb/kv_snapshot.go +++ b/ethdb/kv_snapshot.go @@ -56,7 +56,7 @@ func (opts snapshotOpts) DB(db RwKV) snapshotOpts { return opts } -func (opts snapshotOpts) Open() RwKV { +func (opts snapshotOpts) Open() *SnapshotKV { snapshots := make(map[string]snapshotData) for i, v := range opts.snapshots { for _, bucket := range v.buckets { diff --git a/ethdb/kv_snapshot_test.go b/ethdb/kv_snapshot_test.go index e4577d69d5eb82bff16f9c8c9784aa2a2dadaf27..4a1ccde4dc0fab08ed295c79a6f1127fef718435 100644 --- a/ethdb/kv_snapshot_test.go +++ b/ethdb/kv_snapshot_test.go @@ -991,7 +991,7 @@ func TestSnapshotUpdateSnapshot(t *testing.T) { checkKVErr(t, k, v, err, []byte{1}, []byte{1}) done := make(chan struct{}) - kv.(*SnapshotKV).UpdateSnapshots([]string{dbutils.PlainStateBucket}, snapshotDB2, done) + kv.UpdateSnapshots([]string{dbutils.PlainStateBucket}, snapshotDB2, done) tx2, err := kv.BeginRo(context.Background()) if err != nil { diff --git a/turbo/snapshotsync/postprocessing_test.go b/turbo/snapshotsync/postprocessing_test.go index 623dc7e9d78775ac6b4a8e3156e8b9da2b3efd3b..bf83017db38c0f8ed371488a0edad72550afc8ec 100644 --- a/turbo/snapshotsync/postprocessing_test.go +++ b/turbo/snapshotsync/postprocessing_test.go @@ -56,7 +56,8 @@ func TestHeadersGenerateIndex(t *testing.T) { if err != nil { t.Fatal(err) } - snKV := ethdb.NewLMDB().Path(snPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(ethdb.DefaultBucketConfigs).MustOpen() + var snKV ethdb.RwKV + snKV = ethdb.NewLMDB().Path(snPath).Flags(func(flags uint) uint { return flags | lmdb.Readonly }).WithBucketsConfig(ethdb.DefaultBucketConfigs).MustOpen() snKV = ethdb.NewSnapshotKV().SnapshotDB([]string{dbutils.HeadersSnapshotInfoBucket, dbutils.HeadersBucket}, snKV).DB(db).Open() snDb := ethdb.NewObjectDatabase(snKV) diff --git a/turbo/snapshotsync/snapshot_builder_test.go b/turbo/snapshotsync/snapshot_builder_test.go index 1078116b89b151079b9dc49c225aa07c5845bc00..a52b0fcf74a7490fdfe0a896c1dddc0cde0be69f 100644 --- a/turbo/snapshotsync/snapshot_builder_test.go +++ b/turbo/snapshotsync/snapshot_builder_test.go @@ -115,8 +115,7 @@ func TestSnapshotMigratorStage(t *testing.T) { time.Sleep(time.Second) } - sa := db.(ethdb.WriteDB) - rotx, err := sa.WriteDB().BeginRo(context.Background()) + rotx, err := db.BeginRo(context.Background()) require.NoError(t, err) defer rotx.Rollback() roc, err := rotx.Cursor(dbutils.HeadersBucket) @@ -136,7 +135,7 @@ func TestSnapshotMigratorStage(t *testing.T) { t.Fatal(headerNumber) } - snodb := ethdb.NewObjectDatabase(db.(ethdb.SnapshotUpdater).SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)) + snodb := ethdb.NewObjectDatabase(db.SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)) headerNumber = 0 err = snodb.Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { if !bytes.Equal(k, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)})) { @@ -231,7 +230,7 @@ func TestSnapshotMigratorStage(t *testing.T) { t.Fatal("it's not possible to close db without rollback. something went wrong") } - rotx, err = sa.WriteDB().BeginRo(context.Background()) + rotx, err = db.BeginRo(context.Background()) require.NoError(t, err) defer rotx.Rollback() roc, err = rotx.Cursor(dbutils.HeadersBucket) @@ -246,7 +245,7 @@ func TestSnapshotMigratorStage(t *testing.T) { } headerNumber = 0 - err = ethdb.NewObjectDatabase(db.(ethdb.SnapshotUpdater).SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)).Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { + err = ethdb.NewObjectDatabase(db.SnapshotKV(dbutils.HeadersBucket).(ethdb.RwKV)).Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { if !bytes.Equal(k, dbutils.HeaderKey(headerNumber, common.Hash{uint8(headerNumber)})) { t.Fatal(k) }