diff --git a/common/changeset/storage_changeset_test.go b/common/changeset/storage_changeset_test.go index e3662787733b15dc42b8dfd908cbe13dd7e1a070..9a4579e4ca14ad3e6c525428af382274f4f135a7 100644 --- a/common/changeset/storage_changeset_test.go +++ b/common/changeset/storage_changeset_test.go @@ -403,9 +403,9 @@ func doTestFind( } c := tx.Cursor(bucket) - defer c.Close() + err := encodeFunc(1, ch, func(k, v []byte) error { - if err2 := c.Append(common.CopyBytes(k), common.CopyBytes(v)); err2 != nil { + if err2 := c.Put(common.CopyBytes(k), common.CopyBytes(v)); err2 != nil { return err2 } return nil @@ -555,7 +555,7 @@ func TestMultipleIncarnationsOfTheSameContract(t *testing.T) { assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractC, 5, key4), val4)) assert.NoError(t, EncodeStoragePlain(1, ch, func(k, v []byte) error { - return c.Append(k, v) + return c.Put(k, v) })) data1, err1 := cs.FindWithIncarnation(1, dbutils.PlainGenerateCompositeStorageKey(contractA, 2, key1)) diff --git a/common/etl/collector.go b/common/etl/collector.go index 38b611f50e12e04e17474bf528396f696fb22793..1c61fde90a4a155b855606e4c3c14cb7c11fc151 100644 --- a/common/etl/collector.go +++ b/common/etl/collector.go @@ -13,6 +13,7 @@ import ( "time" "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ugorji/go/codec" @@ -170,9 +171,11 @@ func loadFilesIntoBucket(logPrefix string, db ethdb.Database, bucket string, pro } } var canUseAppend bool + isDupSort := dbutils.BucketsConfigs[bucket].Flags&dbutils.DupSort != 0 && !dbutils.BucketsConfigs[bucket].AutoDupSortKeysConversion logEvery := time.NewTicker(30 * time.Second) defer logEvery.Stop() + var pervK []byte i := 0 loadNextFunc := func(originalK, k, v []byte) error { @@ -207,9 +210,23 @@ func loadFilesIntoBucket(logPrefix string, db ethdb.Database, bucket string, pro return nil } if canUseAppend { - if err := tx.(*ethdb.TxDb).Append(bucket, k, v); err != nil { - return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err) + if isDupSort { + if bytes.Equal(k, pervK) { + if err := tx.AppendDup(bucket, k, v); err != nil { + return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err) + } + } else { + if err := tx.Append(bucket, k, v); err != nil { + return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err) + } + } + pervK = k + } else { + if err := tx.Append(bucket, k, v); err != nil { + return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err) + } } + return nil } if err := tx.Put(bucket, k, v); err != nil { diff --git a/core/generate_index_test.go b/core/generate_index_test.go index 0c9031c24bcc93acf90f2a87483daea3ae30a7fe..351becfa7c06d816f5d3ef9f25ef767928d5cca2 100644 --- a/core/generate_index_test.go +++ b/core/generate_index_test.go @@ -222,7 +222,7 @@ func generateTestData(t *testing.T, db ethdb.Database, csBucket string, numOfBlo expected3[len(expected3)-1] = expected3[len(expected3)-1].Append(uint64(i), false) } err = csInfo.Encode(uint64(i), cs, func(k, v []byte) error { - return db.Append(csBucket, k, v) + return db.Put(csBucket, k, v) }) if err != nil { t.Fatal(err) diff --git a/core/state/db_state_writer.go b/core/state/db_state_writer.go index 3e1ab8b56c772d420f22809ac6c77260df224b62..8f8aa8a79b75a6c74e89a85f09b1f7a9d1762776 100644 --- a/core/state/db_state_writer.go +++ b/core/state/db_state_writer.go @@ -1,6 +1,7 @@ package state import ( + "bytes" "context" "encoding/binary" "fmt" @@ -208,11 +209,23 @@ func (dsw *DbStateWriter) WriteChangeSets() error { if err != nil { return err } + var prevK []byte if err = changeset.Mapper[dbutils.AccountChangeSetBucket].Encode(dsw.blockNr, accountChanges, func(k, v []byte) error { - return dsw.db.Append(dbutils.AccountChangeSetBucket, k, v) + if bytes.Equal(k, prevK) { + if err = dsw.db.AppendDup(dbutils.AccountChangeSetBucket, k, v); err != nil { + return err + } + } else { + if err = dsw.db.Append(dbutils.AccountChangeSetBucket, k, v); err != nil { + return err + } + } + prevK = k + return nil }); err != nil { return err } + prevK = nil storageChanges, err := dsw.csw.GetStorageChanges() if err != nil { @@ -222,7 +235,17 @@ func (dsw *DbStateWriter) WriteChangeSets() error { return nil } if err = changeset.Mapper[dbutils.StorageChangeSetBucket].Encode(dsw.blockNr, storageChanges, func(k, v []byte) error { - return dsw.db.Append(dbutils.StorageChangeSetBucket, k, v) + if bytes.Equal(k, prevK) { + if err = dsw.db.AppendDup(dbutils.StorageChangeSetBucket, k, v); err != nil { + return err + } + } else { + if err = dsw.db.Append(dbutils.StorageChangeSetBucket, k, v); err != nil { + return err + } + } + prevK = k + return nil }); err != nil { return err } diff --git a/core/state/plain_state_writer.go b/core/state/plain_state_writer.go index 181139aeb150c333346ece1278bbabdcb2da1c51..00f96aa42d8473aa1a66cc17323957a2d22e8d6e 100644 --- a/core/state/plain_state_writer.go +++ b/core/state/plain_state_writer.go @@ -1,6 +1,7 @@ package state import ( + "bytes" "context" "encoding/binary" @@ -152,11 +153,23 @@ func (w *PlainStateWriter) WriteChangeSets() error { if err != nil { return err } + var prevK []byte if err = changeset.Mapper[dbutils.PlainAccountChangeSetBucket].Encode(w.blockNumber, accountChanges, func(k, v []byte) error { - return db.Append(dbutils.PlainAccountChangeSetBucket, k, v) + if bytes.Equal(k, prevK) { + if err = db.AppendDup(dbutils.PlainAccountChangeSetBucket, k, v); err != nil { + return err + } + } else { + if err = db.Append(dbutils.PlainAccountChangeSetBucket, k, v); err != nil { + return err + } + } + prevK = k + return nil }); err != nil { return err } + prevK = nil storageChanges, err := w.csw.GetStorageChanges() if err != nil { @@ -166,7 +179,17 @@ func (w *PlainStateWriter) WriteChangeSets() error { return nil } if err = changeset.Mapper[dbutils.PlainStorageChangeSetBucket].Encode(w.blockNumber, storageChanges, func(k, v []byte) error { - return db.Append(dbutils.PlainStorageChangeSetBucket, k, v) + if bytes.Equal(k, prevK) { + if err = db.AppendDup(dbutils.PlainStorageChangeSetBucket, k, v); err != nil { + return err + } + } else { + if err = db.Append(dbutils.PlainStorageChangeSetBucket, k, v); err != nil { + return err + } + } + prevK = k + return nil }); err != nil { return err } diff --git a/ethdb/interface.go b/ethdb/interface.go index c68ab9caf1fe9ac80d6aa4e6ad98bcc67057546c..81b79899452b2cdd2acf98c19746d5b1fce5af22 100644 --- a/ethdb/interface.go +++ b/ethdb/interface.go @@ -104,6 +104,7 @@ type Database interface { Ancients() (uint64, error) TruncateAncients(items uint64) error Append(bucket string, key, value []byte) error + AppendDup(bucket string, key, value []byte) error Sequence(bucket string, amount uint64) (uint64, error) } diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go index b1c5582dda18f685184bddb13d411cdb7465908e..0a1d91045b4b725b710dd356de091463f477fed8 100644 --- a/ethdb/kv_lmdb.go +++ b/ethdb/kv_lmdb.go @@ -1526,6 +1526,19 @@ func (c *LmdbDupSortCursor) LastDup(k []byte) ([]byte, error) { return v, nil } +func (c *LmdbDupSortCursor) Append(k []byte, v []byte) error { + if c.c == nil { + if err := c.initCursor(); err != nil { + return err + } + } + + if err := c.c.Put(k, v, lmdb.Append|lmdb.AppendDup); err != nil { + return fmt.Errorf("in Append: %w", err) + } + return nil +} + func (c *LmdbDupSortCursor) AppendDup(k []byte, v []byte) error { if c.c == nil { if err := c.initCursor(); err != nil { diff --git a/ethdb/kv_mdbx.go b/ethdb/kv_mdbx.go index 4f8e63d84a248e21b3baeafcc933f262cebbfb4d..36dc9874d5839be98e310bcb40d92254ac9e634f 100644 --- a/ethdb/kv_mdbx.go +++ b/ethdb/kv_mdbx.go @@ -1486,6 +1486,19 @@ func (c *MdbxDupSortCursor) LastDup(k []byte) ([]byte, error) { return v, nil } +func (c *MdbxDupSortCursor) Append(k []byte, v []byte) error { + if c.c == nil { + if err := c.initCursor(); err != nil { + return err + } + } + + if err := c.c.Put(k, v, mdbx.Append|mdbx.AppendDup); err != nil { + return fmt.Errorf("in Append: %w", err) + } + return nil +} + func (c *MdbxDupSortCursor) AppendDup(k []byte, v []byte) error { if c.c == nil { if err := c.initCursor(); err != nil { diff --git a/ethdb/mutation.go b/ethdb/mutation.go index 2e904b2599104ed731879980a78bbd714b6cb895..15c526d8675f6284a449689887f1c2e103e5cc5a 100644 --- a/ethdb/mutation.go +++ b/ethdb/mutation.go @@ -144,6 +144,10 @@ func (m *mutation) Append(table string, key []byte, value []byte) error { return m.Put(table, key, value) } +func (m *mutation) AppendDup(table string, key []byte, value []byte) error { + return m.Put(table, key, value) +} + func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) { m.mu.Lock() defer m.mu.Unlock() diff --git a/ethdb/object_db.go b/ethdb/object_db.go index 14544de717ba5d40cd5e06bd2ad89fa7214e5ee9..6d2b940190b646518e1b293de115fc29489c30de 100644 --- a/ethdb/object_db.go +++ b/ethdb/object_db.go @@ -108,6 +108,14 @@ func (db *ObjectDatabase) Append(bucket string, key []byte, value []byte) error return err } +// AppendDup appends a single entry to the end of the bucket. +func (db *ObjectDatabase) AppendDup(bucket string, key []byte, value []byte) error { + err := db.kv.Update(context.Background(), func(tx Tx) error { + return tx.CursorDupSort(bucket).AppendDup(key, value) + }) + return err +} + // MultiPut - requirements: input must be sorted and without duplicates func (db *ObjectDatabase) MultiPut(tuples ...[]byte) (uint64, error) { err := db.kv.Update(context.Background(), func(tx Tx) error { diff --git a/ethdb/tx_db.go b/ethdb/tx_db.go index 07a8ee11acabbab7959614c0ffcd6b738253eef4..4bdfbf72734a43418e66bb7a722468bfa54012fd 100644 --- a/ethdb/tx_db.go +++ b/ethdb/tx_db.go @@ -71,12 +71,12 @@ func (m *TxDb) Reserve(bucket string, key []byte, i int) ([]byte, error) { func (m *TxDb) Append(bucket string, key []byte, value []byte) error { m.len += uint64(len(key) + len(value)) - switch c := m.cursors[bucket].(type) { - case CursorDupSort: - return c.AppendDup(key, value) - default: - return c.Append(key, value) - } + return m.cursors[bucket].Append(key, value) +} + +func (m *TxDb) AppendDup(bucket string, key []byte, value []byte) error { + m.len += uint64(len(key) + len(value)) + return m.cursors[bucket].(CursorDupSort).AppendDup(key, value) } func (m *TxDb) Delete(bucket string, k, v []byte) error {