From 1b0d7c80bf556bd147b368279d8dfd8ff739ba53 Mon Sep 17 00:00:00 2001
From: Giulio rebuffo <giulio.rebuffo@gmail.com>
Date: Mon, 4 Apr 2022 09:11:59 +0200
Subject: [PATCH] Better batches for Execution (Hashmaps based instead of BTree
 based) (#3814)

---
 core/rawdb/accessors_chain.go   |   2 +-
 eth/calltracer/calltracer.go    |   2 +-
 eth/stagedsync/stage_execute.go |  24 ++-
 ethdb/olddb/mapmutation.go      | 304 ++++++++++++++++++++++++++++++++
 turbo/cli/flags.go              |   1 -
 5 files changed, 321 insertions(+), 12 deletions(-)
 create mode 100644 ethdb/olddb/mapmutation.go

diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index 58d571044e..49596a8871 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -979,7 +979,7 @@ func WriteReceipts(tx kv.Putter, number uint64, receipts types.Receipts) error {
 }
 
 // AppendReceipts stores all the transaction receipts belonging to a block.
-func AppendReceipts(tx kv.RwTx, blockNumber uint64, receipts types.Receipts) error {
+func AppendReceipts(tx kv.StatelessWriteTx, blockNumber uint64, receipts types.Receipts) error {
 	buf := bytes.NewBuffer(make([]byte, 0, 1024))
 
 	for txId, r := range receipts {
diff --git a/eth/calltracer/calltracer.go b/eth/calltracer/calltracer.go
index 3d3b919603..2c697a3f2a 100644
--- a/eth/calltracer/calltracer.go
+++ b/eth/calltracer/calltracer.go
@@ -67,7 +67,7 @@ func (ct *CallTracer) CaptureAccountWrite(account common.Address) error {
 	return nil
 }
 
-func (ct *CallTracer) WriteToDb(tx kv.RwTx, block *types.Block, vmConfig vm.Config) error {
+func (ct *CallTracer) WriteToDb(tx kv.StatelessWriteTx, block *types.Block, vmConfig vm.Config) error {
 	ct.tos[block.Coinbase()] = false
 	for _, uncle := range block.Uncles() {
 		ct.tos[uncle.Coinbase] = false
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index 6fb6dd0478..f6e76d64b7 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -219,16 +219,20 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint
 	startTime := time.Now()
 
 	var batch ethdb.DbWithPendingMutations
-	batch = olddb.NewBatch(tx, quit)
+	// state is stored through ethdb batches
+	batch = olddb.NewHashBatch(tx, quit, cfg.tmpdir)
 	defer batch.Rollback()
-
+	// changes are stored through memory buffer
 	logEvery := time.NewTicker(logInterval)
 	defer logEvery.Stop()
 	stageProgress := s.BlockNumber
 	logBlock := stageProgress
 	logTx, lastLogTx := uint64(0), uint64(0)
 	logTime := time.Now()
-	var gas uint64
+	var gas uint64             // used for logs
+	var currentStateGas uint64 // used for batch commits of state
+	// Transform batch_size limit into Ggas
+	gasState := uint64(cfg.batchSize) * uint64(datasize.KB) * 3
 
 	startGasUsed, err := rawdb.ReadCumulativeGasUsed(tx, s.BlockNumber)
 	if err != nil {
@@ -278,8 +282,9 @@ Loop:
 		}
 		stageProgress = blockNum
 
-		updateProgress := batch.BatchSize() >= int(cfg.batchSize)
-		if updateProgress {
+		if currentStateGas >= gasState {
+			log.Info("Committed State", "gas reached", currentStateGas, "gasTarget", gasState)
+			currentStateGas = 0
 			if err = batch.Commit(); err != nil {
 				return err
 			}
@@ -297,13 +302,13 @@ Loop:
 				// TODO: This creates stacked up deferrals
 				defer tx.Rollback()
 			}
-			batch = olddb.NewBatch(tx, quit)
+			batch = olddb.NewHashBatch(tx, quit, cfg.tmpdir)
 			// TODO: This creates stacked up deferrals
 			defer batch.Rollback()
 		}
 
 		gas = gas + block.GasUsed()
-
+		currentStateGas = currentStateGas + block.GasUsed()
 		select {
 		default:
 		case <-logEvery.C:
@@ -318,7 +323,7 @@ Loop:
 			if estimateRatio != 0 {
 				estimatedTime = common.PrettyDuration((elapsed.Seconds() / estimateRatio) * float64(time.Second))
 			}
-			logBlock, logTx, logTime = logProgress(logPrefix, logBlock, logTime, blockNum, logTx, lastLogTx, gas, estimatedTime, batch)
+			logBlock, logTx, logTime = logProgress(logPrefix, logBlock, logTime, blockNum, logTx, lastLogTx, gas, float64(currentStateGas)/float64(gasState), estimatedTime, batch)
 			gas = 0
 			tx.CollectMetrics()
 			syncMetrics[stages.Execution].Set(blockNum)
@@ -342,7 +347,7 @@ Loop:
 	return stoppedErr
 }
 
-func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64, estimatedTime common.PrettyDuration, batch ethdb.DbWithPendingMutations) (uint64, uint64, time.Time) {
+func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64, gasState float64, estimatedTime common.PrettyDuration, batch ethdb.DbWithPendingMutations) (uint64, uint64, time.Time) {
 	currentTime := time.Now()
 	interval := currentTime.Sub(prevTime)
 	speed := float64(currentBlock-prevBlock) / (float64(interval) / float64(time.Second))
@@ -356,6 +361,7 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current
 		"blk/s", speed,
 		"tx/s", speedTx,
 		"Mgas/s", speedMgas,
+		"gasState", gasState,
 	}
 	if estimatedTime > 0 {
 		logpairs = append(logpairs, "estimated duration", estimatedTime)
diff --git a/ethdb/olddb/mapmutation.go b/ethdb/olddb/mapmutation.go
new file mode 100644
index 0000000000..5c9f20a8dd
--- /dev/null
+++ b/ethdb/olddb/mapmutation.go
@@ -0,0 +1,304 @@
+package olddb
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"sync"
+	"time"
+	"unsafe"
+
+	"github.com/ledgerwatch/erigon-lib/etl"
+	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/ethdb"
+	"github.com/ledgerwatch/log/v3"
+)
+
+type mapmutation struct {
+	puts   map[string]map[string][]byte
+	db     kv.RwTx
+	quit   <-chan struct{}
+	clean  func()
+	mu     sync.RWMutex
+	size   int
+	count  uint64
+	tmpdir string
+}
+
+// NewBatch - starts in-mem batch
+//
+// Common pattern:
+//
+// batch := db.NewBatch()
+// defer batch.Rollback()
+// ... some calculations on `batch`
+// batch.Commit()
+func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string) *mapmutation {
+	clean := func() {}
+	if quit == nil {
+		ch := make(chan struct{})
+		clean = func() { close(ch) }
+		quit = ch
+	}
+	return &mapmutation{
+		db:     tx,
+		puts:   make(map[string]map[string][]byte),
+		quit:   quit,
+		clean:  clean,
+		tmpdir: tmpdir,
+	}
+}
+
+func (m *mapmutation) RwKV() kv.RwDB {
+	if casted, ok := m.db.(ethdb.HasRwKV); ok {
+		return casted.RwKV()
+	}
+	return nil
+}
+
+func (m *mapmutation) getMem(table string, key []byte) ([]byte, bool) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	if _, ok := m.puts[table]; !ok {
+		return nil, false
+	}
+	var value []byte
+	var ok bool
+	if value, ok = m.puts[table][*(*string)(unsafe.Pointer(&key))]; !ok {
+		return nil, false
+	}
+	return value, ok
+}
+
+func (m *mapmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) {
+	v, ok := m.getMem(kv.Sequence, []byte(bucket))
+	if !ok && m.db != nil {
+		v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	var currentV uint64 = 0
+	if len(v) > 0 {
+		currentV = binary.BigEndian.Uint64(v)
+	}
+
+	newVBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(newVBytes, currentV+amount)
+	if err = m.Put(kv.Sequence, []byte(bucket), newVBytes); err != nil {
+		return 0, err
+	}
+
+	return currentV, nil
+}
+func (m *mapmutation) ReadSequence(bucket string) (res uint64, err error) {
+	v, ok := m.getMem(kv.Sequence, []byte(bucket))
+	if !ok && m.db != nil {
+		v, err = m.db.GetOne(kv.Sequence, []byte(bucket))
+		if err != nil {
+			return 0, err
+		}
+	}
+	var currentV uint64 = 0
+	if len(v) > 0 {
+		currentV = binary.BigEndian.Uint64(v)
+	}
+
+	return currentV, nil
+}
+
+// Can only be called from the worker thread
+func (m *mapmutation) GetOne(table string, key []byte) ([]byte, error) {
+	if value, ok := m.getMem(table, key); ok {
+		if value == nil {
+			return nil, nil
+		}
+		return value, nil
+	}
+	if m.db != nil {
+		// TODO: simplify when tx can no longer be parent of mutation
+		value, err := m.db.GetOne(table, key)
+		if err != nil {
+			return nil, err
+		}
+
+		return value, nil
+	}
+	return nil, nil
+}
+
+// Can only be called from the worker thread
+func (m *mapmutation) Get(table string, key []byte) ([]byte, error) {
+	value, err := m.GetOne(table, key)
+	if err != nil {
+		return nil, err
+	}
+
+	if value == nil {
+		return nil, ethdb.ErrKeyNotFound
+	}
+
+	return value, nil
+}
+
+func (m *mapmutation) Last(table string) ([]byte, []byte, error) {
+	c, err := m.db.Cursor(table)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer c.Close()
+	return c.Last()
+}
+
+func (m *mapmutation) hasMem(table string, key []byte) bool {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	if _, ok := m.puts[table]; !ok {
+		return false
+	}
+
+	_, ok := m.puts[table][*(*string)(unsafe.Pointer(&key))]
+	return ok
+}
+
+func (m *mapmutation) Has(table string, key []byte) (bool, error) {
+	if m.hasMem(table, key) {
+		return true, nil
+	}
+	if m.db != nil {
+		return m.db.Has(table, key)
+	}
+	return false, nil
+}
+
+func (m *mapmutation) Put(table string, key []byte, value []byte) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if _, ok := m.puts[table]; !ok {
+		m.puts[table] = make(map[string][]byte)
+	}
+	var ok bool
+	if _, ok = m.puts[table][*(*string)(unsafe.Pointer(&key))]; !ok {
+		m.size += len(value) - len(m.puts[table][*(*string)(unsafe.Pointer(&key))])
+		m.puts[table][*(*string)(unsafe.Pointer(&key))] = value
+		return nil
+	}
+	m.puts[table][*(*string)(unsafe.Pointer(&key))] = value
+	m.size += len(key) + len(value)
+	m.count++
+	return nil
+}
+
+func (m *mapmutation) Append(table string, key []byte, value []byte) error {
+	return m.Put(table, key, value)
+}
+
+func (m *mapmutation) AppendDup(table string, key []byte, value []byte) error {
+	return m.Put(table, key, value)
+}
+
+func (m *mapmutation) BatchSize() int {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return m.size
+}
+
+func (m *mapmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
+	m.panicOnEmptyDB()
+	return m.db.ForEach(bucket, fromPrefix, walker)
+}
+
+func (m *mapmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
+	m.panicOnEmptyDB()
+	return m.db.ForPrefix(bucket, prefix, walker)
+}
+
+func (m *mapmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error {
+	m.panicOnEmptyDB()
+	return m.db.ForAmount(bucket, prefix, amount, walker)
+}
+
+func (m *mapmutation) Delete(table string, k, v []byte) error {
+	if v != nil {
+		return m.db.Delete(table, k, v) // TODO: mutation to support DupSort deletes
+	}
+	return m.Put(table, k, nil)
+}
+
+func (m *mapmutation) doCommit(tx kv.RwTx) error {
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
+	count := 0
+	total := float64(m.count)
+
+	for table, bucket := range m.puts {
+		collector := etl.NewCollector("", m.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+		defer collector.Close()
+		for key, value := range bucket {
+			collector.Collect([]byte(key), value)
+			count++
+			select {
+			default:
+			case <-logEvery.C:
+				progress := fmt.Sprintf("%.1fM/%.1fM", float64(count)/1_000_000, total/1_000_000)
+				log.Info("Write to db", "progress", progress, "current table", table)
+				tx.CollectMetrics()
+			case <-m.quit:
+				return nil
+			}
+		}
+		if err := collector.Load(m.db, table, etl.IdentityLoadFunc, etl.TransformArgs{Quit: m.quit}); err != nil {
+			return err
+		}
+	}
+
+	tx.CollectMetrics()
+	return nil
+}
+
+func (m *mapmutation) Commit() error {
+	if m.db == nil {
+		return nil
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if err := m.doCommit(m.db); err != nil {
+		return err
+	}
+
+	m.puts = map[string]map[string][]byte{}
+	m.size = 0
+	m.count = 0
+	m.clean()
+	return nil
+}
+
+func (m *mapmutation) Rollback() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.puts = map[string]map[string][]byte{}
+	m.size = 0
+	m.count = 0
+	m.size = 0
+	m.clean()
+}
+
+func (m *mapmutation) Close() {
+	m.Rollback()
+}
+
+func (m *mapmutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) {
+	panic("mutation can't start transaction, because doesn't own it")
+}
+
+func (m *mapmutation) panicOnEmptyDB() {
+	if m.db == nil {
+		panic("Not implemented")
+	}
+}
+
+func (m *mapmutation) SetRwKV(kv kv.RwDB) {
+	m.db.(ethdb.HasRwKV).SetRwKV(kv)
+}
diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go
index 9c5823aaa0..42074ffcc1 100644
--- a/turbo/cli/flags.go
+++ b/turbo/cli/flags.go
@@ -176,7 +176,6 @@ func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config) {
 		utils.Fatalf(fmt.Sprintf("error while parsing mode: %v", err))
 	}
 	cfg.Prune = mode
-
 	if ctx.GlobalString(BatchSizeFlag.Name) != "" {
 		err := cfg.BatchSize.UnmarshalText([]byte(ctx.GlobalString(BatchSizeFlag.Name)))
 		if err != nil {
-- 
GitLab