From 2b59b45216ffe61311edeef491c5558c9e55d892 Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Tue, 22 Jun 2021 17:08:47 +0700
Subject: [PATCH] Do log tables size, db metrics - avoid concurrency, check
 stale readers hourly  (#2216)

* add table size logs and metrics

* log stale readers

* - don't collect db metrics in background
- do check stale readers once an hour

* execution to update metrics when print logs

* hide file exists err

* hide file exists err
---
 cmd/integration/commands/root.go |   3 -
 eth/stagedsync/stage_execute.go  |   6 +-
 eth/stagedsync/stage_tevm.go     |   4 +-
 eth/stagedsync/state.go          |  30 +++++++
 ethdb/kv/kv_mdbx.go              | 129 ++++++++++++++++++++-----------
 ethdb/kv/kv_remote.go            |   9 +--
 ethdb/kv/kv_snapshot.go          |  16 ++--
 ethdb/kv_interface.go            |  39 +++++++---
 go.mod                           |   2 +-
 go.sum                           |   4 +-
 turbo/node/node.go               |   2 -
 11 files changed, 155 insertions(+), 89 deletions(-)

diff --git a/cmd/integration/commands/root.go b/cmd/integration/commands/root.go
index 5d014ece32..6d78a06335 100644
--- a/cmd/integration/commands/root.go
+++ b/cmd/integration/commands/root.go
@@ -8,7 +8,6 @@ import (
 	kv2 "github.com/ledgerwatch/erigon/ethdb/kv"
 	"github.com/ledgerwatch/erigon/internal/debug"
 	"github.com/ledgerwatch/erigon/log"
-	"github.com/ledgerwatch/erigon/metrics"
 	"github.com/ledgerwatch/erigon/migrations"
 	"github.com/spf13/cobra"
 )
@@ -56,7 +55,6 @@ func openDB(path string, applyMigrations bool) ethdb.RwKV {
 			db = kv2.NewObjectDatabase(openKV(label, path, false))
 		}
 	}
-	metrics.AddCallback(db.RwKV().CollectMetrics)
 	return db.RwKV()
 }
 
@@ -69,6 +67,5 @@ func openKV(label ethdb.Label, path string, exclusive bool) ethdb.RwKV {
 		opts = opts.DBVerbosity(ethdb.DBVerbosityLvl(databaseVerbosity))
 	}
 	kv := opts.MustOpen()
-	metrics.AddCallback(kv.CollectMetrics)
 	return kv
 }
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index 4aab79c5fd..fcb18b519f 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -324,11 +324,9 @@ Loop:
 		case <-logEvery.C:
 			logBlock, logTx, logTime = logProgress(logPrefix, logBlock, logTime, blockNum, logTx, lastLogTx, gas, batch)
 			gas = 0
-			if hasTx, ok := tx.(ethdb.HasTx); ok {
-				hasTx.Tx().CollectMetrics()
-			}
+			tx.CollectMetrics()
+			stageExecutionGauge.Update(int64(blockNum))
 		}
-		stageExecutionGauge.Update(int64(blockNum))
 	}
 
 	if err := s.Update(batch, stageProgress); err != nil {
diff --git a/eth/stagedsync/stage_tevm.go b/eth/stagedsync/stage_tevm.go
index 5952630967..6f3e75be68 100644
--- a/eth/stagedsync/stage_tevm.go
+++ b/eth/stagedsync/stage_tevm.go
@@ -97,9 +97,7 @@ func transpileBatch(logPrefix string, s *StageState, fromBlock uint64, toBlock u
 		select {
 		case <-logEvery.C:
 			logBlock, logTime = logTEVMProgress(logPrefix, logBlock, logTime, stageProgress)
-			if hasTx, ok := tx.(ethdb.HasTx); ok {
-				hasTx.Tx().CollectMetrics()
-			}
+			tx.CollectMetrics()
 		default:
 		}
 
diff --git a/eth/stagedsync/state.go b/eth/stagedsync/state.go
index ec974c7b40..2d81894b98 100644
--- a/eth/stagedsync/state.go
+++ b/eth/stagedsync/state.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/ledgerwatch/erigon/common"
+	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/common/debug"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/ethdb"
@@ -193,6 +194,13 @@ func (s *State) Run(db ethdb.RwKV, tx ethdb.RwTx) error {
 		timings = append(timings, string(stage.ID), time.Since(t))
 	}
 
+	if err := printLogs(tx, timings); err != nil {
+		return err
+	}
+	return nil
+}
+
+func printLogs(tx ethdb.RwTx, timings []interface{}) error {
 	var m runtime.MemStats
 	runtime.ReadMemStats(&m)
 	log.Info("Memory", "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
@@ -201,6 +209,28 @@ func (s *State) Run(db ethdb.RwKV, tx ethdb.RwTx) error {
 	} else {
 		log.Info("Timings", timings...)
 	}
+
+	if tx == nil {
+		return nil
+	}
+	buckets := []string{
+		"freelist",
+		dbutils.PlainStateBucket,
+		dbutils.AccountChangeSetBucket,
+		dbutils.StorageChangeSetBucket,
+		dbutils.EthTx,
+		dbutils.Log,
+	}
+	bucketSizes := make([]interface{}, 0, 2*len(buckets))
+	for _, bucket := range buckets {
+		sz, err1 := tx.BucketSize(bucket)
+		if err1 != nil {
+			return err1
+		}
+		bucketSizes = append(bucketSizes, bucket, common.StorageSize(sz))
+	}
+	log.Info("Tables", bucketSizes...)
+	tx.CollectMetrics()
 	return nil
 }
 
diff --git a/ethdb/kv/kv_mdbx.go b/ethdb/kv/kv_mdbx.go
index 1503d64c34..f4c6dca936 100644
--- a/ethdb/kv/kv_mdbx.go
+++ b/ethdb/kv/kv_mdbx.go
@@ -28,6 +28,7 @@ var _ DbCopier = &MdbxKV{}
 
 const expectMdbxVersionMajor = 0
 const expectMdbxVersionMinor = 10
+const pageSize = 4 * 1024
 
 const NonExistingDBI dbutils.DBI = 999_999_999
 
@@ -148,7 +149,6 @@ func (opts MdbxOpts) Open() (ethdb.RwKV, error) {
 			opts.mapSize = 2 * datasize.TB
 		}
 	}
-	const pageSize = 4 * 1024
 	if opts.flags&mdbx.Accede == 0 {
 		if opts.inMem {
 			if err = env.SetGeometry(-1, -1, int(opts.mapSize), int(2*datasize.MB), 0, 4*1024); err != nil {
@@ -206,13 +206,12 @@ func (opts MdbxOpts) Open() (ethdb.RwKV, error) {
 	}
 
 	db := &MdbxKV{
-		opts:     opts,
-		env:      env,
-		log:      logger,
-		wg:       &sync.WaitGroup{},
-		buckets:  dbutils.BucketsCfg{},
-		pageSize: pageSize,
-		txSize:   dirtyPagesLimit * pageSize,
+		opts:    opts,
+		env:     env,
+		log:     logger,
+		wg:      &sync.WaitGroup{},
+		buckets: dbutils.BucketsCfg{},
+		txSize:  dirtyPagesLimit * pageSize,
 	}
 	customBuckets := opts.bucketsCfg(dbutils.BucketsConfigs)
 	for name, cfg := range customBuckets { // copy map to avoid changing global variable
@@ -290,7 +289,7 @@ func (opts MdbxOpts) Open() (ethdb.RwKV, error) {
 		if staleReaders, err := db.env.ReaderCheck(); err != nil {
 			db.log.Error("failed ReaderCheck", "err", err)
 		} else if staleReaders > 0 {
-			db.log.Debug("cleared reader slots from dead processes", "amount", staleReaders)
+			db.log.Info("[db] cleared reader slots from dead processes", "amount", staleReaders)
 		}
 	}
 	return db, nil
@@ -305,13 +304,12 @@ func (opts MdbxOpts) MustOpen() ethdb.RwKV {
 }
 
 type MdbxKV struct {
-	env      *mdbx.Env
-	log      log.Logger
-	wg       *sync.WaitGroup
-	buckets  dbutils.BucketsCfg
-	opts     MdbxOpts
-	txSize   uint64
-	pageSize uint64
+	env     *mdbx.Env
+	log     log.Logger
+	wg      *sync.WaitGroup
+	buckets dbutils.BucketsCfg
+	opts    MdbxOpts
+	txSize  uint64
 }
 
 func (db *MdbxKV) NewDbWithTheSameParameters() *ObjectDatabase {
@@ -339,28 +337,6 @@ func (db *MdbxKV) Close() {
 	}
 }
 
-func (db *MdbxKV) CollectMetrics() {
-	if !metrics.Enabled {
-		return
-	}
-	if db.opts.label != ethdb.Chain {
-		return
-	}
-	info, err := db.env.Info()
-	if err != nil {
-		return // ignore error for metrics collection
-	}
-	ethdb.DbSize.Update(int64(info.Geo.Current))
-	ethdb.DbPgopsNewly.Update(int64(info.PageOps.Newly))
-	ethdb.DbPgopsCow.Update(int64(info.PageOps.Cow))
-	ethdb.DbPgopsClone.Update(int64(info.PageOps.Clone))
-	ethdb.DbPgopsSplit.Update(int64(info.PageOps.Split))
-	ethdb.DbPgopsMerge.Update(int64(info.PageOps.Merge))
-	ethdb.DbPgopsSpill.Update(int64(info.PageOps.Spill))
-	ethdb.DbPgopsUnspill.Update(int64(info.PageOps.Unspill))
-	ethdb.DbPgopsWops.Update(int64(info.PageOps.Wops))
-}
-
 func (db *MdbxKV) BeginRo(_ context.Context) (txn ethdb.Tx, err error) {
 	if db.env == nil {
 		return nil, fmt.Errorf("db closed")
@@ -498,12 +474,35 @@ func (tx *MdbxTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, wal
 }
 
 func (tx *MdbxTx) CollectMetrics() {
-	if !metrics.Enabled {
+	if tx.db.opts.label != ethdb.Chain {
 		return
 	}
-	if tx.db.opts.label != ethdb.Chain {
+
+	info, err := tx.db.env.Info()
+	if err != nil {
 		return
 	}
+	if info.SinceReaderCheck.Hours() > 1 {
+		if staleReaders, err := tx.db.env.ReaderCheck(); err != nil {
+			tx.db.log.Error("failed ReaderCheck", "err", err)
+		} else if staleReaders > 0 {
+			tx.db.log.Info("[db] cleared reader slots from dead processes", "amount", staleReaders)
+		}
+	}
+
+	if !metrics.Enabled {
+		return
+	}
+	ethdb.DbSize.Update(int64(info.Geo.Current))
+	ethdb.DbPgopsNewly.Update(int64(info.PageOps.Newly))
+	ethdb.DbPgopsCow.Update(int64(info.PageOps.Cow))
+	ethdb.DbPgopsClone.Update(int64(info.PageOps.Clone))
+	ethdb.DbPgopsSplit.Update(int64(info.PageOps.Split))
+	ethdb.DbPgopsMerge.Update(int64(info.PageOps.Merge))
+	ethdb.DbPgopsSpill.Update(int64(info.PageOps.Spill))
+	ethdb.DbPgopsUnspill.Update(int64(info.PageOps.Unspill))
+	ethdb.DbPgopsWops.Update(int64(info.PageOps.Wops))
+
 	txInfo, err := tx.tx.Info(true)
 	if err != nil {
 		return
@@ -520,14 +519,50 @@ func (tx *MdbxTx) CollectMetrics() {
 	}
 	ethdb.GcLeafMetric.Update(int64(gc.LeafPages))
 	ethdb.GcOverflowMetric.Update(int64(gc.OverflowPages))
-	ethdb.GcPagesMetric.Update(int64((gc.LeafPages + gc.OverflowPages) * tx.db.pageSize / 8))
+	ethdb.GcPagesMetric.Update(int64((gc.LeafPages + gc.OverflowPages) * pageSize / 8))
 
-	state, err := tx.BucketStat(dbutils.PlainStateBucket)
-	if err != nil {
-		return
+	{
+		st, err := tx.BucketStat(dbutils.PlainStateBucket)
+		if err != nil {
+			return
+		}
+		ethdb.TableStateLeaf.Update(int64(st.LeafPages))
+		ethdb.TableStateBranch.Update(int64(st.BranchPages))
+		ethdb.TableStateEntries.Update(int64(st.Entries))
+		ethdb.TableStateSize.Update(int64(st.LeafPages+st.BranchPages+st.OverflowPages) * pageSize)
+	}
+	{
+		st, err := tx.BucketStat(dbutils.StorageChangeSetBucket)
+		if err != nil {
+			return
+		}
+		ethdb.TableScsLeaf.Update(int64(st.LeafPages))
+		ethdb.TableScsBranch.Update(int64(st.BranchPages))
+		ethdb.TableScsEntries.Update(int64(st.Entries))
+		ethdb.TableScsSize.Update(int64(st.LeafPages+st.BranchPages+st.OverflowPages) * pageSize)
+	}
+	{
+		st, err := tx.BucketStat(dbutils.EthTx)
+		if err != nil {
+			return
+		}
+		ethdb.TableTxLeaf.Update(int64(st.LeafPages))
+		ethdb.TableTxBranch.Update(int64(st.BranchPages))
+		ethdb.TableTxOverflow.Update(int64(st.OverflowPages))
+		ethdb.TableTxEntries.Update(int64(st.Entries))
+		ethdb.TableTxSize.Update(int64(st.LeafPages+st.BranchPages+st.OverflowPages) * pageSize)
+	}
+	{
+		st, err := tx.BucketStat(dbutils.Log)
+		if err != nil {
+			return
+		}
+		ethdb.TableLogLeaf.Update(int64(st.LeafPages))
+		ethdb.TableLogBranch.Update(int64(st.BranchPages))
+		ethdb.TableLogOverflow.Update(int64(st.OverflowPages))
+		ethdb.TableLogEntries.Update(int64(st.Entries))
+		ethdb.TableLogSize.Update(int64(st.LeafPages+st.BranchPages+st.OverflowPages) * pageSize)
 	}
-	ethdb.StateLeafMetric.Update(int64(state.LeafPages))
-	ethdb.StateBranchesMetric.Update(int64(state.BranchPages))
 }
 
 func (tx *MdbxTx) Comparator(bucket string) dbutils.CmpFunc {
@@ -956,7 +991,7 @@ func (tx *MdbxTx) BucketSize(name string) (uint64, error) {
 	if err != nil {
 		return 0, err
 	}
-	return (st.LeafPages + st.BranchPages + st.OverflowPages) * uint64(os.Getpagesize()), nil
+	return (st.LeafPages + st.BranchPages + st.OverflowPages) * pageSize, nil
 }
 
 func (tx *MdbxTx) BucketStat(name string) (*mdbx.Stat, error) {
diff --git a/ethdb/kv/kv_remote.go b/ethdb/kv/kv_remote.go
index 4c96c292cd..6487d9cb2a 100644
--- a/ethdb/kv/kv_remote.go
+++ b/ethdb/kv/kv_remote.go
@@ -213,8 +213,6 @@ func (db *RemoteKV) Close() {
 	}
 }
 
-func (db *RemoteKV) CollectMetrics() {}
-
 func (db *RemoteKV) BeginRo(ctx context.Context) (ethdb.Tx, error) {
 	streamCtx, streamCancelFn := context.WithCancel(ctx) // We create child context for the stream so we can cancel it to prevent leak
 	stream, err := db.remoteKV.Tx(streamCtx)
@@ -243,9 +241,8 @@ func (db *RemoteKV) Update(ctx context.Context, f func(tx ethdb.RwTx) error) (er
 	return fmt.Errorf("remote db provider doesn't support .Update method")
 }
 
-func (tx *remoteTx) Comparator(bucket string) dbutils.CmpFunc { panic("not implemented yet") }
-func (tx *remoteTx) CollectMetrics()                          {}
-func (tx *remoteTx) CHandle() unsafe.Pointer                  { panic("not implemented yet") }
+func (tx *remoteTx) CollectMetrics()         {}
+func (tx *remoteTx) CHandle() unsafe.Pointer { panic("not implemented yet") }
 
 func (tx *remoteTx) IncrementSequence(bucket string, amount uint64) (uint64, error) {
 	panic("not implemented yet")
@@ -283,6 +280,8 @@ func (tx *remoteTx) statelessCursor(bucket string) (ethdb.Cursor, error) {
 	return c, nil
 }
 
+func (tx *remoteTx) BucketSize(name string) (uint64, error) { panic("not implemented") }
+
 // TODO: this must be optimized - and implemented as single command on server, with server-side buffered streaming
 func (tx *remoteTx) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
 	c, err := tx.Cursor(bucket)
diff --git a/ethdb/kv/kv_snapshot.go b/ethdb/kv/kv_snapshot.go
index 25e12400b8..95b2e13df2 100644
--- a/ethdb/kv/kv_snapshot.go
+++ b/ethdb/kv/kv_snapshot.go
@@ -150,10 +150,6 @@ func (s *SnapshotKV) SnapshotKV(bucket string) ethdb.RoKV {
 	return s.snapshots[bucket].snapshot
 }
 
-func (s *SnapshotKV) CollectMetrics() {
-	s.db.CollectMetrics()
-}
-
 func (s *SnapshotKV) BeginRo(ctx context.Context) (ethdb.Tx, error) {
 	dbTx, err := s.db.BeginRo(ctx)
 	if err != nil {
@@ -345,7 +341,9 @@ func (s *snTX) Delete(bucket string, k, v []byte) error {
 }
 
 func (s *snTX) CollectMetrics() {
-	s.dbTX.CollectMetrics()
+	if rw, ok := s.dbTX.(ethdb.RwTx); ok {
+		rw.CollectMetrics()
+	}
 }
 
 func (s *snTX) getSnapshotTX(bucket string) (ethdb.Tx, error) {
@@ -465,12 +463,8 @@ func (s *snTX) Rollback() {
 
 }
 
-func (s *snTX) BucketSize(name string) (uint64, error) {
-	panic("implement me")
-}
-
-func (s *snTX) Comparator(bucket string) dbutils.CmpFunc {
-	return s.dbTX.Comparator(bucket)
+func (s *snTX) BucketSize(bucket string) (uint64, error) {
+	return s.dbTX.BucketSize(bucket)
 }
 
 func (s *snTX) IncrementSequence(bucket string, amount uint64) (uint64, error) {
diff --git a/ethdb/kv_interface.go b/ethdb/kv_interface.go
index b1b072263f..41a87c19f4 100644
--- a/ethdb/kv_interface.go
+++ b/ethdb/kv_interface.go
@@ -37,14 +37,30 @@ var (
 	DbPgopsUnspill = metrics.GetOrRegisterGauge("db/pgops/unspill", metrics.DefaultRegistry) //nolint
 	DbPgopsWops    = metrics.GetOrRegisterGauge("db/pgops/wops", metrics.DefaultRegistry)    //nolint
 
+	DbCommitBigBatchTimer = metrics.NewRegisteredTimer("db/commit/big_batch", nil)
+
 	GcLeafMetric     = metrics.GetOrRegisterGauge("db/gc/leaf", metrics.DefaultRegistry)     //nolint
 	GcOverflowMetric = metrics.GetOrRegisterGauge("db/gc/overflow", metrics.DefaultRegistry) //nolint
 	GcPagesMetric    = metrics.GetOrRegisterGauge("db/gc/pages", metrics.DefaultRegistry)    //nolint
 
-	StateLeafMetric     = metrics.GetOrRegisterGauge("db/state/leaf", metrics.DefaultRegistry)   //nolint
-	StateBranchesMetric = metrics.GetOrRegisterGauge("db/state/branch", metrics.DefaultRegistry) //nolint
-
-	DbCommitBigBatchTimer = metrics.NewRegisteredTimer("db/commit/big_batch", nil)
+	TableScsLeaf      = metrics.GetOrRegisterGauge("table/scs/leaf", metrics.DefaultRegistry)      //nolint
+	TableScsBranch    = metrics.GetOrRegisterGauge("table/scs/branch", metrics.DefaultRegistry)    //nolint
+	TableScsEntries   = metrics.GetOrRegisterGauge("table/scs/entries", metrics.DefaultRegistry)   //nolint
+	TableScsSize      = metrics.GetOrRegisterGauge("table/scs/size", metrics.DefaultRegistry)      //nolint
+	TableStateLeaf    = metrics.GetOrRegisterGauge("table/state/leaf", metrics.DefaultRegistry)    //nolint
+	TableStateBranch  = metrics.GetOrRegisterGauge("table/state/branch", metrics.DefaultRegistry)  //nolint
+	TableStateEntries = metrics.GetOrRegisterGauge("table/state/entries", metrics.DefaultRegistry) //nolint
+	TableStateSize    = metrics.GetOrRegisterGauge("table/state/size", metrics.DefaultRegistry)    //nolint
+	TableLogLeaf      = metrics.GetOrRegisterGauge("table/log/leaf", metrics.DefaultRegistry)      //nolint
+	TableLogBranch    = metrics.GetOrRegisterGauge("table/log/branch", metrics.DefaultRegistry)    //nolint
+	TableLogOverflow  = metrics.GetOrRegisterGauge("table/log/overflow", metrics.DefaultRegistry)  //nolint
+	TableLogEntries   = metrics.GetOrRegisterGauge("table/log/entries", metrics.DefaultRegistry)   //nolint
+	TableLogSize      = metrics.GetOrRegisterGauge("table/log/size", metrics.DefaultRegistry)      //nolint
+	TableTxLeaf       = metrics.GetOrRegisterGauge("table/tx/leaf", metrics.DefaultRegistry)       //nolint
+	TableTxBranch     = metrics.GetOrRegisterGauge("table/tx/branch", metrics.DefaultRegistry)     //nolint
+	TableTxOverflow   = metrics.GetOrRegisterGauge("table/tx/overflow", metrics.DefaultRegistry)   //nolint
+	TableTxEntries    = metrics.GetOrRegisterGauge("table/tx/entries", metrics.DefaultRegistry)    //nolint
+	TableTxSize       = metrics.GetOrRegisterGauge("table/tx/size", metrics.DefaultRegistry)       //nolint
 )
 
 type DBVerbosityLvl int8
@@ -92,7 +108,7 @@ type Closer interface {
 	Close()
 }
 
-// Read-only version of KV.
+// RoKV - Read-only version of KV.
 type RoKV interface {
 	Closer
 
@@ -113,11 +129,9 @@ type RoKV interface {
 	//	Commit and Rollback while it has active child transactions.
 	BeginRo(ctx context.Context) (Tx, error)
 	AllBuckets() dbutils.BucketsCfg
-
-	CollectMetrics()
 }
 
-// KV low-level database interface - main target is - to provide common abstraction over top of MDBX and RemoteKV.
+// RwKV low-level database interface - main target is - to provide common abstraction over top of MDBX and RemoteKV.
 //
 // Common pattern for short-living transactions:
 //
@@ -160,6 +174,8 @@ type StatelessReadTx interface {
 	// Sequence changes become visible outside the current write transaction after it is committed, and discarded on abort.
 	// Starts from 0.
 	ReadSequence(bucket string) (uint64, error)
+
+	BucketSize(bucket string) (uint64, error)
 }
 
 type StatelessWriteTx interface {
@@ -192,10 +208,7 @@ type Tx interface {
 	ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error
 	ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error
 
-	Comparator(bucket string) dbutils.CmpFunc
-
 	CHandle() unsafe.Pointer // Pointer to the underlying C transaction handle (e.g. *C.MDB_txn)
-	CollectMetrics()
 }
 
 type RwTx interface {
@@ -205,6 +218,10 @@ type RwTx interface {
 
 	RwCursor(bucket string) (RwCursor, error)
 	RwCursorDupSort(bucket string) (RwCursorDupSort, error)
+
+	// CollectMetrics - does collect all DB-related and Tx-related metrics
+	// this method exists only in RwTx to avoid concurrency
+	CollectMetrics()
 }
 
 // BucketMigrator used for buckets migration, don't use it in usual app code
diff --git a/go.mod b/go.mod
index adbd1ec6ab..77c6724902 100644
--- a/go.mod
+++ b/go.mod
@@ -53,7 +53,7 @@ require (
 	github.com/spf13/cobra v1.1.3
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.7.0
-	github.com/torquem-ch/mdbx-go v0.12.0 // indirect
+	github.com/torquem-ch/mdbx-go v0.13.0
 	github.com/ugorji/go/codec v1.1.13
 	github.com/ugorji/go/codec/codecgen v1.1.13
 	github.com/urfave/cli v1.22.4
diff --git a/go.sum b/go.sum
index ef5673d74b..7a9acce2e7 100644
--- a/go.sum
+++ b/go.sum
@@ -984,8 +984,8 @@ github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZ
 github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1gBkr4QyP8=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
-github.com/torquem-ch/mdbx-go v0.12.0 h1:hhD9lJtKJPPKCyDoguxpgEoKgQC+derFUSeDGX0PyHk=
-github.com/torquem-ch/mdbx-go v0.12.0/go.mod h1:T2fsoJDVppxfAPTLd1svUgH1kpPmeXdPESmroSHcL1E=
+github.com/torquem-ch/mdbx-go v0.13.0 h1:+QSPxgDKXgv/5SlSXFgmM5OVL1KOB25y7j2U09wLsXg=
+github.com/torquem-ch/mdbx-go v0.13.0/go.mod h1:T2fsoJDVppxfAPTLd1svUgH1kpPmeXdPESmroSHcL1E=
 github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
 github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
 github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
diff --git a/turbo/node/node.go b/turbo/node/node.go
index c63d2c1351..33b04fe828 100644
--- a/turbo/node/node.go
+++ b/turbo/node/node.go
@@ -76,8 +76,6 @@ func New(
 
 	ethereum := RegisterEthService(node, ethConfig)
 
-	metrics.AddCallback(ethereum.ChainKV().CollectMetrics)
-
 	return &ErigonNode{stack: node, backend: ethereum}
 }
 
-- 
GitLab