From 342c5fee89a63f2b8c0a11529e4b5bb5c619729f Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Sat, 28 Nov 2020 21:24:47 +0700
Subject: [PATCH] [to discuss] More compact append implementation (#1372)

* more compact append implementation

* do append for dupsort buckets

* do append for dupsort buckets

* do append for dupsort buckets

* do append for dupsort buckets

* do append for dupsort buckets

* fix tests

* fix tests
---
 common/changeset/storage_changeset_test.go |  6 ++---
 common/etl/collector.go                    | 21 +++++++++++++++--
 core/generate_index_test.go                |  2 +-
 core/state/db_state_writer.go              | 27 ++++++++++++++++++++--
 core/state/plain_state_writer.go           | 27 ++++++++++++++++++++--
 ethdb/interface.go                         |  1 +
 ethdb/kv_lmdb.go                           | 13 +++++++++++
 ethdb/kv_mdbx.go                           | 13 +++++++++++
 ethdb/mutation.go                          |  4 ++++
 ethdb/object_db.go                         |  8 +++++++
 ethdb/tx_db.go                             | 12 +++++-----
 11 files changed, 118 insertions(+), 16 deletions(-)

diff --git a/common/changeset/storage_changeset_test.go b/common/changeset/storage_changeset_test.go
index e366278773..9a4579e4ca 100644
--- a/common/changeset/storage_changeset_test.go
+++ b/common/changeset/storage_changeset_test.go
@@ -403,9 +403,9 @@ func doTestFind(
 		}
 
 		c := tx.Cursor(bucket)
-		defer c.Close()
+
 		err := encodeFunc(1, ch, func(k, v []byte) error {
-			if err2 := c.Append(common.CopyBytes(k), common.CopyBytes(v)); err2 != nil {
+			if err2 := c.Put(common.CopyBytes(k), common.CopyBytes(v)); err2 != nil {
 				return err2
 			}
 			return nil
@@ -555,7 +555,7 @@ func TestMultipleIncarnationsOfTheSameContract(t *testing.T) {
 	assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractC, 5, key4), val4))
 
 	assert.NoError(t, EncodeStoragePlain(1, ch, func(k, v []byte) error {
-		return c.Append(k, v)
+		return c.Put(k, v)
 	}))
 
 	data1, err1 := cs.FindWithIncarnation(1, dbutils.PlainGenerateCompositeStorageKey(contractA, 2, key1))
diff --git a/common/etl/collector.go b/common/etl/collector.go
index 38b611f50e..1c61fde90a 100644
--- a/common/etl/collector.go
+++ b/common/etl/collector.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/ledgerwatch/turbo-geth/common"
+	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/log"
 	"github.com/ugorji/go/codec"
@@ -170,9 +171,11 @@ func loadFilesIntoBucket(logPrefix string, db ethdb.Database, bucket string, pro
 		}
 	}
 	var canUseAppend bool
+	isDupSort := dbutils.BucketsConfigs[bucket].Flags&dbutils.DupSort != 0 && !dbutils.BucketsConfigs[bucket].AutoDupSortKeysConversion
 
 	logEvery := time.NewTicker(30 * time.Second)
 	defer logEvery.Stop()
+	var pervK []byte
 
 	i := 0
 	loadNextFunc := func(originalK, k, v []byte) error {
@@ -207,9 +210,23 @@ func loadFilesIntoBucket(logPrefix string, db ethdb.Database, bucket string, pro
 			return nil
 		}
 		if canUseAppend {
-			if err := tx.(*ethdb.TxDb).Append(bucket, k, v); err != nil {
-				return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err)
+			if isDupSort {
+				if bytes.Equal(k, pervK) {
+					if err := tx.AppendDup(bucket, k, v); err != nil {
+						return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err)
+					}
+				} else {
+					if err := tx.Append(bucket, k, v); err != nil {
+						return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err)
+					}
+				}
+				pervK = k
+			} else {
+				if err := tx.Append(bucket, k, v); err != nil {
+					return fmt.Errorf("%s: append: k=%x, %w", logPrefix, k, err)
+				}
 			}
+
 			return nil
 		}
 		if err := tx.Put(bucket, k, v); err != nil {
diff --git a/core/generate_index_test.go b/core/generate_index_test.go
index 0c9031c24b..351becfa7c 100644
--- a/core/generate_index_test.go
+++ b/core/generate_index_test.go
@@ -222,7 +222,7 @@ func generateTestData(t *testing.T, db ethdb.Database, csBucket string, numOfBlo
 			expected3[len(expected3)-1] = expected3[len(expected3)-1].Append(uint64(i), false)
 		}
 		err = csInfo.Encode(uint64(i), cs, func(k, v []byte) error {
-			return db.Append(csBucket, k, v)
+			return db.Put(csBucket, k, v)
 		})
 		if err != nil {
 			t.Fatal(err)
diff --git a/core/state/db_state_writer.go b/core/state/db_state_writer.go
index 3e1ab8b56c..8f8aa8a79b 100644
--- a/core/state/db_state_writer.go
+++ b/core/state/db_state_writer.go
@@ -1,6 +1,7 @@
 package state
 
 import (
+	"bytes"
 	"context"
 	"encoding/binary"
 	"fmt"
@@ -208,11 +209,23 @@ func (dsw *DbStateWriter) WriteChangeSets() error {
 	if err != nil {
 		return err
 	}
+	var prevK []byte
 	if err = changeset.Mapper[dbutils.AccountChangeSetBucket].Encode(dsw.blockNr, accountChanges, func(k, v []byte) error {
-		return dsw.db.Append(dbutils.AccountChangeSetBucket, k, v)
+		if bytes.Equal(k, prevK) {
+			if err = dsw.db.AppendDup(dbutils.AccountChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		} else {
+			if err = dsw.db.Append(dbutils.AccountChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		}
+		prevK = k
+		return nil
 	}); err != nil {
 		return err
 	}
+	prevK = nil
 
 	storageChanges, err := dsw.csw.GetStorageChanges()
 	if err != nil {
@@ -222,7 +235,17 @@ func (dsw *DbStateWriter) WriteChangeSets() error {
 		return nil
 	}
 	if err = changeset.Mapper[dbutils.StorageChangeSetBucket].Encode(dsw.blockNr, storageChanges, func(k, v []byte) error {
-		return dsw.db.Append(dbutils.StorageChangeSetBucket, k, v)
+		if bytes.Equal(k, prevK) {
+			if err = dsw.db.AppendDup(dbutils.StorageChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		} else {
+			if err = dsw.db.Append(dbutils.StorageChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		}
+		prevK = k
+		return nil
 	}); err != nil {
 		return err
 	}
diff --git a/core/state/plain_state_writer.go b/core/state/plain_state_writer.go
index 181139aeb1..00f96aa42d 100644
--- a/core/state/plain_state_writer.go
+++ b/core/state/plain_state_writer.go
@@ -1,6 +1,7 @@
 package state
 
 import (
+	"bytes"
 	"context"
 	"encoding/binary"
 
@@ -152,11 +153,23 @@ func (w *PlainStateWriter) WriteChangeSets() error {
 	if err != nil {
 		return err
 	}
+	var prevK []byte
 	if err = changeset.Mapper[dbutils.PlainAccountChangeSetBucket].Encode(w.blockNumber, accountChanges, func(k, v []byte) error {
-		return db.Append(dbutils.PlainAccountChangeSetBucket, k, v)
+		if bytes.Equal(k, prevK) {
+			if err = db.AppendDup(dbutils.PlainAccountChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		} else {
+			if err = db.Append(dbutils.PlainAccountChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		}
+		prevK = k
+		return nil
 	}); err != nil {
 		return err
 	}
+	prevK = nil
 
 	storageChanges, err := w.csw.GetStorageChanges()
 	if err != nil {
@@ -166,7 +179,17 @@ func (w *PlainStateWriter) WriteChangeSets() error {
 		return nil
 	}
 	if err = changeset.Mapper[dbutils.PlainStorageChangeSetBucket].Encode(w.blockNumber, storageChanges, func(k, v []byte) error {
-		return db.Append(dbutils.PlainStorageChangeSetBucket, k, v)
+		if bytes.Equal(k, prevK) {
+			if err = db.AppendDup(dbutils.PlainStorageChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		} else {
+			if err = db.Append(dbutils.PlainStorageChangeSetBucket, k, v); err != nil {
+				return err
+			}
+		}
+		prevK = k
+		return nil
 	}); err != nil {
 		return err
 	}
diff --git a/ethdb/interface.go b/ethdb/interface.go
index c68ab9caf1..81b7989945 100644
--- a/ethdb/interface.go
+++ b/ethdb/interface.go
@@ -104,6 +104,7 @@ type Database interface {
 	Ancients() (uint64, error)
 	TruncateAncients(items uint64) error
 	Append(bucket string, key, value []byte) error
+	AppendDup(bucket string, key, value []byte) error
 	Sequence(bucket string, amount uint64) (uint64, error)
 }
 
diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go
index b1c5582dda..0a1d91045b 100644
--- a/ethdb/kv_lmdb.go
+++ b/ethdb/kv_lmdb.go
@@ -1526,6 +1526,19 @@ func (c *LmdbDupSortCursor) LastDup(k []byte) ([]byte, error) {
 	return v, nil
 }
 
+func (c *LmdbDupSortCursor) Append(k []byte, v []byte) error {
+	if c.c == nil {
+		if err := c.initCursor(); err != nil {
+			return err
+		}
+	}
+
+	if err := c.c.Put(k, v, lmdb.Append|lmdb.AppendDup); err != nil {
+		return fmt.Errorf("in Append: %w", err)
+	}
+	return nil
+}
+
 func (c *LmdbDupSortCursor) AppendDup(k []byte, v []byte) error {
 	if c.c == nil {
 		if err := c.initCursor(); err != nil {
diff --git a/ethdb/kv_mdbx.go b/ethdb/kv_mdbx.go
index 4f8e63d84a..36dc9874d5 100644
--- a/ethdb/kv_mdbx.go
+++ b/ethdb/kv_mdbx.go
@@ -1486,6 +1486,19 @@ func (c *MdbxDupSortCursor) LastDup(k []byte) ([]byte, error) {
 	return v, nil
 }
 
+func (c *MdbxDupSortCursor) Append(k []byte, v []byte) error {
+	if c.c == nil {
+		if err := c.initCursor(); err != nil {
+			return err
+		}
+	}
+
+	if err := c.c.Put(k, v, mdbx.Append|mdbx.AppendDup); err != nil {
+		return fmt.Errorf("in Append: %w", err)
+	}
+	return nil
+}
+
 func (c *MdbxDupSortCursor) AppendDup(k []byte, v []byte) error {
 	if c.c == nil {
 		if err := c.initCursor(); err != nil {
diff --git a/ethdb/mutation.go b/ethdb/mutation.go
index 2e904b2599..15c526d867 100644
--- a/ethdb/mutation.go
+++ b/ethdb/mutation.go
@@ -144,6 +144,10 @@ func (m *mutation) Append(table string, key []byte, value []byte) error {
 	return m.Put(table, key, value)
 }
 
+func (m *mutation) AppendDup(table string, key []byte, value []byte) error {
+	return m.Put(table, key, value)
+}
+
 func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
diff --git a/ethdb/object_db.go b/ethdb/object_db.go
index 14544de717..6d2b940190 100644
--- a/ethdb/object_db.go
+++ b/ethdb/object_db.go
@@ -108,6 +108,14 @@ func (db *ObjectDatabase) Append(bucket string, key []byte, value []byte) error
 	return err
 }
 
+// AppendDup appends a single entry to the end of the bucket.
+func (db *ObjectDatabase) AppendDup(bucket string, key []byte, value []byte) error {
+	err := db.kv.Update(context.Background(), func(tx Tx) error {
+		return tx.CursorDupSort(bucket).AppendDup(key, value)
+	})
+	return err
+}
+
 // MultiPut - requirements: input must be sorted and without duplicates
 func (db *ObjectDatabase) MultiPut(tuples ...[]byte) (uint64, error) {
 	err := db.kv.Update(context.Background(), func(tx Tx) error {
diff --git a/ethdb/tx_db.go b/ethdb/tx_db.go
index 07a8ee11ac..4bdfbf7273 100644
--- a/ethdb/tx_db.go
+++ b/ethdb/tx_db.go
@@ -71,12 +71,12 @@ func (m *TxDb) Reserve(bucket string, key []byte, i int) ([]byte, error) {
 
 func (m *TxDb) Append(bucket string, key []byte, value []byte) error {
 	m.len += uint64(len(key) + len(value))
-	switch c := m.cursors[bucket].(type) {
-	case CursorDupSort:
-		return c.AppendDup(key, value)
-	default:
-		return c.Append(key, value)
-	}
+	return m.cursors[bucket].Append(key, value)
+}
+
+func (m *TxDb) AppendDup(bucket string, key []byte, value []byte) error {
+	m.len += uint64(len(key) + len(value))
+	return m.cursors[bucket].(CursorDupSort).AppendDup(key, value)
 }
 
 func (m *TxDb) Delete(bucket string, k, v []byte) error {
-- 
GitLab