From 62d36ada39038e68fd6c46effd1196db4fc2914f Mon Sep 17 00:00:00 2001 From: andrew <andrey.ashikhmin@gmail.com> Date: Wed, 13 Nov 2019 17:43:26 +0100 Subject: [PATCH] BadgerDatabase Walk --- ethdb/badger_db.go | 67 ++++++++++++++++++++++++++++++++++-------- ethdb/database_test.go | 52 ++++++++++++++++++++++++++++++++ ethdb/interface.go | 9 ++++++ 3 files changed, 116 insertions(+), 12 deletions(-) diff --git a/ethdb/badger_db.go b/ethdb/badger_db.go index d09955c709..633ffcdd65 100644 --- a/ethdb/badger_db.go +++ b/ethdb/badger_db.go @@ -17,6 +17,8 @@ package ethdb import ( + "bytes" + "github.com/dgraph-io/badger" "github.com/ledgerwatch/turbo-geth/common/dbutils" @@ -65,6 +67,13 @@ func bucketKey(bucket, key []byte) []byte { return composite } +func keyWithoutBucket(key, bucket []byte) []byte { + if len(key) <= len(bucket) || !bytes.HasPrefix(key, bucket) || key[len(bucket)] != bucketSeparator { + return nil + } + return key[len(bucket)+1:] +} + // Delete removes a single entry. func (db *BadgerDatabase) Delete(bucket, key []byte) error { return db.db.Update(func(txn *badger.Txn) error { @@ -99,14 +108,9 @@ func (db *BadgerDatabase) Get(bucket, key []byte) ([]byte, error) { // PutS adds a new entry to the historical buckets: // hBucket (unless changeSetBucketOnly) and ChangeSet. func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, changeSetBucketOnly bool) error { - composite, encodedStamp := dbutils.CompositeKeySuffix(key, timestamp) + composite, encodedTS := dbutils.CompositeKeySuffix(key, timestamp) hKey := bucketKey(hBucket, composite) - - suffixkey := make([]byte, len(encodedStamp)+len(hBucket)) - copy(suffixkey, encodedStamp) - copy(suffixkey[len(encodedStamp):], hBucket) - - changeSetKey := bucketKey(dbutils.ChangeSetBucket, suffixkey) + changeSetKey := bucketKey(dbutils.ChangeSetBucket, dbutils.CompositeChangeSetKey(encodedTS, hBucket)) return db.db.Update(func(tx *badger.Txn) error { if !changeSetBucketOnly { @@ -149,8 +153,8 @@ func (db *BadgerDatabase) PutS(hBucket, key, value []byte, timestamp uint64, cha // DeleteTimestamp removes data for a given timestamp from all historical buckets (incl. ChangeSet). func (db *BadgerDatabase) DeleteTimestamp(timestamp uint64) error { - encodedStamp := dbutils.EncodeTimestamp(timestamp) - prefix := bucketKey(dbutils.ChangeSetBucket, encodedStamp) + encodedTS := dbutils.EncodeTimestamp(timestamp) + prefix := bucketKey(dbutils.ChangeSetBucket, encodedTS) return db.db.Update(func(tx *badger.Txn) error { var keys [][]byte it := tx.NewIterator(badger.DefaultIteratorOptions) @@ -169,9 +173,9 @@ func (db *BadgerDatabase) DeleteTimestamp(timestamp uint64) error { return err } - bucket := k[len(encodedStamp):] + bucket := k[len(prefix):] err = changedAccounts.Walk(func(kk, _ []byte) error { - kk = append(kk, encodedStamp...) + kk = append(kk, encodedTS...) return tx.Delete(bucketKey(bucket, kk)) }) if err != nil { @@ -181,7 +185,7 @@ func (db *BadgerDatabase) DeleteTimestamp(timestamp uint64) error { keys = append(keys, k) } for _, k := range keys { - if err := tx.Delete(bucketKey(dbutils.ChangeSetBucket, k)); err != nil { + if err := tx.Delete(k); err != nil { return err } } @@ -237,4 +241,43 @@ func (db *BadgerDatabase) Has(bucket, key []byte) (bool, error) { return err == nil, err } +// Walk iterates over entries with keys greater or equals to startkey. +// Only the keys whose first fixedbits match those of startkey are iterated over. +// walker is called for each eligible entry. +// If walker returns false or an error, the walk stops. +func (db *BadgerDatabase) Walk(bucket, startkey []byte, fixedbits uint, walker func(k, v []byte) (bool, error)) error { + fixedbytes, mask := bytesmask(fixedbits) + prefix := bucketKey(bucket, startkey) + err := db.db.View(func(tx *badger.Txn) error { + it := tx.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + for it.Seek(prefix); ; it.Next() { + item := it.Item() + k := keyWithoutBucket(item.Key(), bucket) + if k == nil { + break + } + + goOn := fixedbits == 0 || bytes.Equal(k[:fixedbytes-1], startkey[:fixedbytes-1]) && (k[fixedbytes-1]&mask) == (startkey[fixedbytes-1]&mask) + if !goOn { + break + } + + err := item.Value(func(v []byte) error { + var err2 error + goOn, err2 = walker(k, v) + return err2 + }) + if err != nil { + return err + } + if !goOn { + break + } + } + return nil + }) + return err +} + // TODO [Andrew] implement the full Database interface diff --git a/ethdb/database_test.go b/ethdb/database_test.go index eeaf71c21a..070c64043c 100644 --- a/ethdb/database_test.go +++ b/ethdb/database_test.go @@ -27,6 +27,9 @@ import ( "strconv" "sync" "testing" + + "github.com/ledgerwatch/turbo-geth/common" + "github.com/stretchr/testify/assert" ) func newTestBoltDB() (*BoltDatabase, func()) { @@ -239,3 +242,52 @@ func testParallelPutGet(db MinDatabase) { } pending.Wait() } + +func TestMemoryDB_Walk(t *testing.T) { + testWalk(NewMemDatabase(), t) +} + +func TestBoltDB_Walk(t *testing.T) { + db, remove := newTestBoltDB() + defer remove() + testWalk(db, t) +} + +func TestBadgerDB_Walk(t *testing.T) { + db, remove := newTestBadgerDB() + defer remove() + testWalk(db, t) +} + +var hexEntries = map[string]string{ + "6b": "89c6", + "91": "c476", + "a8": "0a514e", + "bb": "7a", + "bd": "fe76", + "c0": "12", +} + +var startKey = common.FromHex("a0") +var fixedBits uint = 3 + +var keysInRange = [][]byte{common.FromHex("a8"), common.FromHex("bb"), common.FromHex("bd")} + +func testWalk(db MinDatabase, t *testing.T) { + for k, v := range hexEntries { + err := db.Put(bucket, common.FromHex(k), common.FromHex(v)) + if err != nil { + t.Fatalf("put failed: %v", err) + } + } + + var gotKeys [][]byte + + err := db.Walk(bucket, startKey, fixedBits, func(key, val []byte) (bool, error) { + gotKeys = append(gotKeys, key) + return true, nil + }) + assert.NoError(t, err) + + assert.Equal(t, keysInRange, gotKeys) +} diff --git a/ethdb/interface.go b/ethdb/interface.go index 4b552f7eb5..8f5c2e8a44 100644 --- a/ethdb/interface.go +++ b/ethdb/interface.go @@ -20,6 +20,9 @@ import ( "errors" ) +// TODO [Andrew] Add some comments about historical buckets & ChangeSet. +// https://github.com/AlexeyAkhunov/papers/blob/master/TurboGeth-Devcon4.pdf + // ErrKeyNotFound is returned when key isn't found in the database. var ErrKeyNotFound = errors.New("db: key not found") @@ -53,6 +56,10 @@ type Getter interface { // Has indicates whether a key exists in the database. Has(bucket, key []byte) (bool, error) + // Walk iterates over entries with keys greater or equal to startkey. + // Only the keys whose first fixedbits match those of startkey are iterated over. + // walker is called for each eligible entry. + // If walker returns false or an error, the walk stops. Walk(bucket, startkey []byte, fixedbits uint, walker func([]byte, []byte) (bool, error)) error MultiWalk(bucket []byte, startkeys [][]byte, fixedbits []uint, walker func(int, []byte, []byte) (bool, error)) error WalkAsOf(bucket, hBucket, startkey []byte, fixedbits uint, timestamp uint64, walker func([]byte, []byte) (bool, error)) error @@ -92,6 +99,8 @@ type MinDatabase interface { Get(bucket, key []byte) ([]byte, error) Put(bucket, key, value []byte) error Delete(bucket, key []byte) error + + Walk(bucket, startkey []byte, fixedbits uint, walker func([]byte, []byte) (bool, error)) error } // DbWithPendingMutations is an extended version of the Database, -- GitLab