From 43a4e34a6e47c101dac6c006bc79962fd82dc121 Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Mon, 24 Aug 2020 18:07:59 +0700
Subject: [PATCH] [to discuss] exec blocks without ObjectDb: Mutation over TxDb
 (#947)

* use mutation over tx

* clean

* clear

* add .CommitAndBegin() method

* clean

* increase timings for logging

* return ideal batch size
---
 eth/stagedsync/stage_execute.go | 26 ++++++++++++++--------
 ethdb/interface.go              | 38 ++++++++++++++++++++++++++++++++-
 ethdb/kv_lmdb.go                |  4 ++--
 ethdb/mutation.go               |  5 +++++
 ethdb/object_db.go              |  2 +-
 ethdb/tx_db.go                  | 29 ++++++++++++++++++-------
 6 files changed, 83 insertions(+), 21 deletions(-)

diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index c94dfb54b8..c09b6afddc 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -58,11 +58,13 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 		}
 	}
 
-	batch := stateDB.NewBatch()
-	//batch, err := stateDB.Begin()
-	//if err != nil {
-	//	return err
-	//}
+	tx, err := stateDB.Begin()
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	batch := tx.NewBatch()
 	defer batch.Rollback()
 
 	engine := chainContext.Engine()
@@ -79,12 +81,12 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 
 		stageProgress = blockNum
 
-		blockHash := rawdb.ReadCanonicalHash(batch, blockNum)
-		block := rawdb.ReadBlock(batch, blockHash, blockNum)
+		blockHash := rawdb.ReadCanonicalHash(tx, blockNum)
+		block := rawdb.ReadBlock(tx, blockHash, blockNum)
 		if block == nil {
 			break
 		}
-		senders := rawdb.ReadSenders(batch, blockHash, blockNum)
+		senders := rawdb.ReadSenders(tx, blockHash, blockNum)
 		block.Body().SendersToTxs(senders)
 
 		var stateReader state.StateReader
@@ -107,7 +109,10 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 			if err = s.Update(batch, blockNum); err != nil {
 				return err
 			}
-			if _, err = batch.Commit(); err != nil {
+			if err = batch.CommitAndBegin(); err != nil {
+				return err
+			}
+			if err = tx.CommitAndBegin(); err != nil {
 				return err
 			}
 		}
@@ -138,6 +143,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 	if _, err := batch.Commit(); err != nil {
 		return fmt.Errorf("sync Execute: failed to write batch commit: %v", err)
 	}
+	if _, err = tx.Commit(); err != nil {
+		return err
+	}
 	log.Info("Completed on", "block", stageProgress)
 	s.Done()
 	return nil
diff --git a/ethdb/interface.go b/ethdb/interface.go
index 934a418f19..62a3d9075b 100644
--- a/ethdb/interface.go
+++ b/ethdb/interface.go
@@ -79,7 +79,17 @@ type Database interface {
 	// Entries are passed as an array:
 	// bucket0, key0, val0, bucket1, key1, val1, ...
 	MultiPut(tuples ...[]byte) (uint64, error)
-	NewBatch() DbWithPendingMutations       // starts in-mem batch
+
+	// NewBatch - starts in-mem batch
+	//
+	// Common pattern:
+	//
+	// batch := db.NewBatch()
+	// defer batch.Rollback()
+	// ... some calculations on `batch`
+	// batch.Commit()
+	//
+	NewBatch() DbWithPendingMutations       //
 	Begin() (DbWithPendingMutations, error) // starts db transaction
 	Last(bucket string) ([]byte, []byte, error)
 
@@ -106,7 +116,33 @@ type MinDatabase interface {
 // Later they can either be committed to the database or rolled back.
 type DbWithPendingMutations interface {
 	Database
+
+	// Commit - commits transaction (or flush data into underlying db object in case of `mutation`)
+	//
+	// Common pattern:
+	//
+	// tx := db.Begin()
+	// defer tx.Rollback()
+	// ... some calculations on `tx`
+	// tx.Commit()
+	//
 	Commit() (uint64, error)
+
+	// CommitAndBegin - commits and starts new transaction inside same db object.
+	// useful for periodical commits implementation.
+	//
+	// Common pattern:
+	//
+	// tx := db.Begin()
+	// defer tx.Rollback()
+	// for {
+	// 		... some calculations on `tx`
+	//       tx.CommitAndBegin()
+	//       // defer here - is not useful, because 'tx' object is reused and first `defer` will work perfectly
+	// }
+	// tx.Commit()
+	//
+	CommitAndBegin() error
 	Rollback()
 	BatchSize() int
 }
diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go
index da9ddcea26..6c9f3d4992 100644
--- a/ethdb/kv_lmdb.go
+++ b/ethdb/kv_lmdb.go
@@ -328,7 +328,7 @@ func (db *LmdbKV) Update(ctx context.Context, f func(tx Tx) error) (err error) {
 	}
 
 	commitTook := time.Since(commitTimer)
-	if commitTook > 10*time.Second {
+	if commitTook > 20*time.Second {
 		log.Info("Batch", "commit", commitTook)
 	}
 
@@ -337,7 +337,7 @@ func (db *LmdbKV) Update(ctx context.Context, f func(tx Tx) error) (err error) {
 		log.Warn("fsync after commit failed: \n", err)
 	}
 	fsyncTook := time.Since(fsyncTimer)
-	if fsyncTook > 1*time.Second {
+	if fsyncTook > 20*time.Second {
 		log.Info("Batch", "fsync", fsyncTook)
 	}
 	return nil
diff --git a/ethdb/mutation.go b/ethdb/mutation.go
index 386090427d..78fae46ab2 100644
--- a/ethdb/mutation.go
+++ b/ethdb/mutation.go
@@ -138,6 +138,11 @@ func (m *mutation) Delete(bucket string, key []byte) error {
 	return nil
 }
 
+func (m *mutation) CommitAndBegin() error {
+	_, err := m.Commit()
+	return err
+}
+
 func (m *mutation) Commit() (uint64, error) {
 	if metrics.Enabled {
 		if m.puts.Size() >= m.db.IdealBatchSize() {
diff --git a/ethdb/object_db.go b/ethdb/object_db.go
index 1b9d9f329e..ebde4dacb0 100644
--- a/ethdb/object_db.go
+++ b/ethdb/object_db.go
@@ -361,7 +361,7 @@ func (db *ObjectDatabase) NewBatch() DbWithPendingMutations {
 }
 
 func (db *ObjectDatabase) Begin() (DbWithPendingMutations, error) {
-	batch := &TxDb{db: db, cursors: map[string]*LmdbCursor{}}
+	batch := &TxDb{db: db}
 	if err := batch.begin(nil); err != nil {
 		panic(err)
 	}
diff --git a/ethdb/tx_db.go b/ethdb/tx_db.go
index bad00d685d..0391d37b7f 100644
--- a/ethdb/tx_db.go
+++ b/ethdb/tx_db.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/c2h5oh/datasize"
 	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/log"
@@ -14,14 +13,15 @@ import (
 
 // TxDb - provides Database interface around ethdb.Tx
 // It's not thread-safe!
-// It's not usable after .Commit()/.Rollback() call
+// TxDb not usable after .Commit()/.Rollback() call, but usable after .CommitAndBegin() call
 // you can put unlimited amount of data into this class, call IdealBatchSize is unnecessary
 // Walk and MultiWalk methods - work outside of Tx object yet, will implement it later
 type TxDb struct {
-	db      Database
-	Tx      Tx
-	cursors map[string]*LmdbCursor
-	len     uint64
+	db       Database
+	Tx       Tx
+	ParentTx Tx
+	cursors  map[string]*LmdbCursor
+	len      uint64
 }
 
 func (m *TxDb) Close() {
@@ -29,7 +29,7 @@ func (m *TxDb) Close() {
 }
 
 func (m *TxDb) Begin() (DbWithPendingMutations, error) {
-	batch := &TxDb{db: m.db, cursors: map[string]*LmdbCursor{}}
+	batch := &TxDb{db: m.db}
 	if err := batch.begin(m.Tx); err != nil {
 		return nil, err
 	}
@@ -64,6 +64,8 @@ func (m *TxDb) begin(parent Tx) error {
 		return err
 	}
 	m.Tx = tx
+	m.ParentTx = parent
+	m.cursors = make(map[string]*LmdbCursor, 16)
 	for i := range dbutils.Buckets {
 		m.cursors[dbutils.Buckets[i]] = tx.Cursor(dbutils.Buckets[i]).(*LmdbCursor)
 		if err := m.cursors[dbutils.Buckets[i]].initCursor(); err != nil {
@@ -196,7 +198,7 @@ func (m *TxDb) BatchSize() int {
 
 // IdealBatchSize defines the size of the data batches should ideally add in one write.
 func (m *TxDb) IdealBatchSize() int {
-	return int(1 * datasize.GB)
+	return m.db.IdealBatchSize()
 }
 
 func (m *TxDb) Walk(bucket string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error {
@@ -285,6 +287,15 @@ func MultiWalk(c Cursor, startkeys [][]byte, fixedbits []int, walker func(int, [
 	return nil
 }
 
+func (m *TxDb) CommitAndBegin() error {
+	_, err := m.Commit()
+	if err != nil {
+		return err
+	}
+
+	return m.begin(m.ParentTx)
+}
+
 func (m *TxDb) Commit() (uint64, error) {
 	if m.Tx == nil {
 		return 0, fmt.Errorf("second call .Commit() on same transaction")
@@ -293,6 +304,7 @@ func (m *TxDb) Commit() (uint64, error) {
 		return 0, err
 	}
 	m.Tx = nil
+	m.ParentTx = nil
 	m.cursors = nil
 	m.len = 0
 	return 0, nil
@@ -305,6 +317,7 @@ func (m *TxDb) Rollback() {
 	m.Tx.Rollback()
 	m.cursors = nil
 	m.Tx = nil
+	m.ParentTx = nil
 	m.len = 0
 }
 
-- 
GitLab