From 52d6f1d551f23ac6e241d1a42c1f388ca5401ea6 Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Tue, 24 May 2022 12:24:20 +0700 Subject: [PATCH] kv_mdbx: atomic closed flag #464 (#4247) --- cmd/sentry/sentry/sentry_multy_client.go | 2 +- eth/integrity/trie.go | 24 ++++++++++++++++-------- go.mod | 2 +- go.sum | 4 ++-- turbo/snapshotsync/block_snapshots.go | 6 ++++-- turbo/trie/structural_branch_test.go | 1 - 6 files changed, 24 insertions(+), 15 deletions(-) diff --git a/cmd/sentry/sentry/sentry_multy_client.go b/cmd/sentry/sentry/sentry_multy_client.go index 5fb8c1e212..f5e6035745 100644 --- a/cmd/sentry/sentry/sentry_multy_client.go +++ b/cmd/sentry/sentry/sentry_multy_client.go @@ -288,7 +288,7 @@ func RecvMessage( return default: } - if err = handleInboundMessage(ctx, req, sentry); err != nil { + if err := handleInboundMessage(ctx, req, sentry); err != nil { if rlp.IsInvalidRLPError(err) { log.Debug("[RecvMessage] Kick peer for invalid RLP", "err", err) outreq := proto_sentry.PenalizePeerRequest{ diff --git a/eth/integrity/trie.go b/eth/integrity/trie.go index ba6324899c..d3ff9a6095 100644 --- a/eth/integrity/trie.go +++ b/eth/integrity/trie.go @@ -27,6 +27,9 @@ func AssertSubset(prefix []byte, a, b uint16) { func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) { quit := ctx.Done() + readAheadCtx, cancel := context.WithCancel(ctx) + defer cancel() + logEvery := time.NewTicker(10 * time.Second) defer logEvery.Stop() seek := make([]byte, 256) @@ -34,23 +37,26 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) { buf2 := make([]byte, 256) { - kv.ReadAhead(ctx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32) - kv.ReadAhead(ctx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32) c, err := tx.Cursor(kv.TrieOfAccounts) if err != nil { panic(err) } + defer c.Close() + kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32) + trieAcc2, err := tx.Cursor(kv.TrieOfAccounts) if err != nil { panic(err) } + defer trieAcc2.Close() + accC, err := tx.Cursor(kv.HashedAccounts) if err != nil { panic(err) } - defer c.Close() - defer trieAcc2.Close() defer accC.Close() + kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32) + for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() { if errc != nil { panic(errc) @@ -145,23 +151,25 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) { } } { - kv.ReadAhead(ctx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32) - kv.ReadAhead(ctx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32) c, err := tx.Cursor(kv.TrieOfStorage) if err != nil { panic(err) } + defer c.Close() + kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32) + trieStorage, err := tx.Cursor(kv.TrieOfStorage) if err != nil { panic(err) } + defer trieStorage.Close() + storageC, err := tx.Cursor(kv.HashedStorage) if err != nil { panic(err) } - defer c.Close() - defer trieStorage.Close() defer storageC.Close() + kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32) for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() { if errc != nil { diff --git a/go.mod b/go.mod index c9311aed79..8c35906dc4 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20220523035115-a36ebfc60970 + github.com/ledgerwatch/erigon-lib v0.0.0-20220524050710-eff88cf2616c github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 github.com/pelletier/go-toml v1.9.5 diff --git a/go.sum b/go.sum index f72b718a2f..aa9948edc9 100644 --- a/go.sum +++ b/go.sum @@ -382,8 +382,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220523035115-a36ebfc60970 h1:aLs60OpPKGG3s+9xuPLWQdf/1yqsWFDSDYBlat6QvX8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220523035115-a36ebfc60970/go.mod h1:owzCqIrpNrvfFfvJ9dySWr5IPJOgCOdJ1C6iITZSPGQ= +github.com/ledgerwatch/erigon-lib v0.0.0-20220524050710-eff88cf2616c h1:/LJLg4aHLzHqLFdM1pjw2oCscfuNfL38/JRW6X7mKcw= +github.com/ledgerwatch/erigon-lib v0.0.0-20220524050710-eff88cf2616c/go.mod h1:owzCqIrpNrvfFfvJ9dySWr5IPJOgCOdJ1C6iITZSPGQ= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc= github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index e72f4ad13f..98c51a7137 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -1001,6 +1001,8 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockFrom, blockTo uint64, workers int, lvl log.Lvl) (firstTxID uint64, err error) { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() + warmupCtx, cancel := context.WithCancel(ctx) + defer cancel() chainConfig := tool.ChainConfigFromDB(db) chainID, _ := uint256.FromBig(chainConfig.ChainID) @@ -1081,10 +1083,10 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF return true, nil } if doWarmup && !warmupSenders.Load() && blockNum%1_000 == 0 { - kv.ReadAhead(ctx, db, warmupSenders, kv.Senders, dbutils.EncodeBlockNumber(blockNum), 10_000) + kv.ReadAhead(warmupCtx, db, warmupSenders, kv.Senders, dbutils.EncodeBlockNumber(blockNum), 10_000) } if doWarmup && !warmupTxs.Load() && blockNum%1_000 == 0 { - kv.ReadAhead(ctx, db, warmupTxs, kv.EthTx, dbutils.EncodeBlockNumber(body.BaseTxId), 100*10_000) + kv.ReadAhead(warmupCtx, db, warmupTxs, kv.EthTx, dbutils.EncodeBlockNumber(body.BaseTxId), 100*10_000) } senders, err := rawdb.ReadSenders(tx, h, blockNum) if err != nil { diff --git a/turbo/trie/structural_branch_test.go b/turbo/trie/structural_branch_test.go index 318e10604c..7512479eca 100644 --- a/turbo/trie/structural_branch_test.go +++ b/turbo/trie/structural_branch_test.go @@ -31,7 +31,6 @@ import ( ) func TestIHCursor(t *testing.T) { - t.Skip("FIXME #4236") db, tx := memdb.NewTestTx(t) require := require.New(t) hash := common.HexToHash(fmt.Sprintf("%064d", 0)) -- GitLab