diff --git a/cmd/evm/internal/t8ntool/execution.go b/cmd/evm/internal/t8ntool/execution.go index 10e4d98a3cfd79e8b11a74aca3e546dcadfa4fe7..bce525de0f0ee0261419cb742f7d84a422b4773c 100644 --- a/cmd/evm/internal/t8ntool/execution.go +++ b/cmd/evm/internal/t8ntool/execution.go @@ -106,9 +106,16 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, } return h } + db := kv.NewMemKV() + + tx, err := db.BeginRw(context.Background()) + if err != nil { + return nil, nil, err + } + defer tx.Rollback() + var ( - db = kv.NewMemDatabase() - ibs = MakePreState(chainConfig.Rules(0), db, pre.Pre) + ibs = MakePreState(chainConfig.Rules(0), tx, pre.Pre) signer = types.MakeSigner(chainConfig, pre.Env.Number) gaspool = new(core.GasPool) blockHash = common.Hash{0x13, 0x37} @@ -146,20 +153,20 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, misc.ApplyDAOHardFork(ibs) } - for i, tx := range txs { - msg, err := tx.AsMessage(*signer, pre.Env.BaseFee) + for i, txn := range txs { + msg, err := txn.AsMessage(*signer, pre.Env.BaseFee) if err != nil { - log.Warn("rejected tx", "index", i, "hash", tx.Hash(), "error", err) + log.Warn("rejected txn", "index", i, "hash", txn.Hash(), "error", err) rejectedTxs = append(rejectedTxs, &rejectedTx{i, err.Error()}) continue } - tracer, err := getTracerFn(txIndex, tx.Hash()) + tracer, err := getTracerFn(txIndex, txn.Hash()) if err != nil { return nil, nil, err } vmConfig.Tracer = tracer vmConfig.Debug = (tracer != nil) - ibs.Prepare(tx.Hash(), blockHash, txIndex) + ibs.Prepare(txn.Hash(), blockHash, txIndex) txContext := core.NewEVMTxContext(msg) snapshot := ibs.Snapshot() evm := vm.NewEVM(vmContext, txContext, ibs, chainConfig, vmConfig) @@ -168,11 +175,11 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, msgResult, err := core.ApplyMessage(evm, msg, gaspool, true /* refunds */, false /* gasBailout */) if err != nil { ibs.RevertToSnapshot(snapshot) - log.Info("rejected tx", "index", i, "hash", tx.Hash(), "from", msg.From(), "error", err) + log.Info("rejected txn", "index", i, "hash", txn.Hash(), "from", msg.From(), "error", err) rejectedTxs = append(rejectedTxs, &rejectedTx{i, err.Error()}) continue } - includedTxs = append(includedTxs, tx) + includedTxs = append(includedTxs, txn) if hashError != nil { return nil, nil, NewError(ErrorMissingBlockhash, hashError) } @@ -181,23 +188,23 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, // Receipt: { // Create a new receipt for the transaction, storing the intermediate root and - // gas used by the tx. - receipt := &types.Receipt{Type: tx.Type(), CumulativeGasUsed: gasUsed} + // gas used by the txn. + receipt := &types.Receipt{Type: txn.Type(), CumulativeGasUsed: gasUsed} if msgResult.Failed() { receipt.Status = types.ReceiptStatusFailed } else { receipt.Status = types.ReceiptStatusSuccessful } - receipt.TxHash = tx.Hash() + receipt.TxHash = txn.Hash() receipt.GasUsed = msgResult.UsedGas // If the transaction created a contract, store the creation address in the receipt. if msg.To() == nil { - receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.GetNonce()) + receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, txn.GetNonce()) } // Set the receipt logs and create a bloom for filtering - receipt.Logs = ibs.GetLogs(tx.Hash()) + receipt.Logs = ibs.GetLogs(txn.Hash()) receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) // These three are non-consensus fields: //receipt.BlockHash @@ -233,19 +240,19 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, ibs.AddBalance(pre.Env.Coinbase, minerReward) } - err := ibs.FinalizeTx(chainConfig.Rules(1), state.NewDbStateWriter(db, 1)) - if err != nil { - return nil, nil, err - } // Commit block var root common.Hash - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { - root, err = trie.CalcRoot("", tx) - return err - }) + if err = ibs.FinalizeTx(chainConfig.Rules(1), state.NewPlainStateWriter(tx, tx, 1)); err != nil { + return nil, nil, err + } + root, err = trie.CalcRoot("", tx) if err != nil { return nil, nil, err } + if err = tx.Commit(); err != nil { + return nil, nil, err + } + execRs := &ExecutionResult{ StateRoot: root, TxRoot: types.DeriveSha(includedTxs), @@ -255,12 +262,12 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, Receipts: receipts, Rejected: rejectedTxs, } - return db.RwKV(), execRs, nil + return db, execRs, nil } -func MakePreState(chainRules params.Rules, db ethdb.Database, accounts core.GenesisAlloc) *state.IntraBlockState { +func MakePreState(chainRules params.Rules, db ethdb.RwTx, accounts core.GenesisAlloc) *state.IntraBlockState { var blockNr uint64 = 0 - r, _ := state.NewDbStateReader(db), state.NewDbStateWriter(db, blockNr) + r, _ := state.NewPlainStateReader(db), state.NewPlainStateWriter(db, db, blockNr) statedb := state.New(r) for addr, a := range accounts { statedb.SetCode(addr, a.Code) @@ -278,10 +285,10 @@ func MakePreState(chainRules params.Rules, db ethdb.Database, accounts core.Gene } } // Commit and re-open to start with a clean state. - if err := statedb.FinalizeTx(chainRules, state.NewDbStateWriter(db, blockNr+1)); err != nil { + if err := statedb.FinalizeTx(chainRules, state.NewPlainStateWriter(db, db, blockNr+1)); err != nil { panic(err) } - if err := statedb.CommitBlock(chainRules, state.NewDbStateWriter(db, blockNr+1)); err != nil { + if err := statedb.CommitBlock(chainRules, state.NewPlainStateWriter(db, db, blockNr+1)); err != nil { panic(err) } return statedb diff --git a/cmd/snapshots/tracker/commands/root.go b/cmd/snapshots/tracker/commands/root.go index ecbb90a7005cd561bbe591ecdf06c97fa95bc161..798d758d38dca016351be34ddd8bfaa4dc6d0696 100644 --- a/cmd/snapshots/tracker/commands/root.go +++ b/cmd/snapshots/tracker/commands/root.go @@ -76,7 +76,7 @@ var rootCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { db := kv.MustOpen(args[0]) m := http.NewServeMux() - m.Handle("/announce", &Tracker{db: kv.NewObjectDatabase(db)}) + m.Handle("/announce", &Tracker{db: db}) m.HandleFunc("/scrape", func(writer http.ResponseWriter, request *http.Request) { log.Warn("scrape", "url", request.RequestURI) ih := request.URL.Query().Get("info_hash") @@ -145,7 +145,7 @@ var rootCmd = &cobra.Command{ } type Tracker struct { - db ethdb.Database + db ethdb.RwKV } /* @@ -210,32 +210,43 @@ func (t *Tracker) ServeHTTP(w http.ResponseWriter, r *http.Request) { key := append(req.InfoHash, req.PeerID...) if req.Event == tracker.Stopped.String() { - err = t.db.Delete(dbutils.SnapshotInfoBucket, key, nil) + err = t.db.Update(context.Background(), func(tx ethdb.RwTx) error { + return tx.Delete(dbutils.SnapshotInfoBucket, key, nil) + }) if err != nil { log.Error("Json marshal", "err", err) WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) return } } else { - if prevBytes, err := t.db.Get(dbutils.SnapshotInfoBucket, key); err == nil && len(prevBytes) > 0 { - prev := new(AnnounceReqWithTime) - err = json.Unmarshal(prevBytes, prev) - if err != nil { - log.Error("Unable to unmarshall", "err", err) - } - if time.Since(prev.UpdatedAt) < time.Second*SoftLimit { - //too early to update - WriteResp(w, ErrResponse{FailureReason: "too early to update"}, req.Compact) - return - - } - } else if !errors.Is(err, ethdb.ErrKeyNotFound) && err != nil { + var prevBytes []byte + err = t.db.View(context.Background(), func(tx ethdb.Tx) error { + prevBytes, err = tx.GetOne(dbutils.SnapshotInfoBucket, key) + return err + }) + if err != nil { log.Error("get from db is return error", "err", err) WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) return } - err = t.db.Put(dbutils.SnapshotInfoBucket, key, peerBytes) + if prevBytes == nil { + return + } + + prev := new(AnnounceReqWithTime) + err = json.Unmarshal(prevBytes, prev) if err != nil { + log.Error("Unable to unmarshall", "err", err) + } + if time.Since(prev.UpdatedAt) < time.Second*SoftLimit { + //too early to update + WriteResp(w, ErrResponse{FailureReason: "too early to update"}, req.Compact) + return + + } + if err = t.db.Update(context.Background(), func(tx ethdb.RwTx) error { + return tx.Put(dbutils.SnapshotInfoBucket, key, peerBytes) + }); err != nil { log.Error("db.Put", "err", err) WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) return @@ -247,31 +258,32 @@ func (t *Tracker) ServeHTTP(w http.ResponseWriter, r *http.Request) { TrackerId: trackerID, } - err = t.db.ForPrefix(dbutils.SnapshotInfoBucket, append(req.InfoHash, make([]byte, 20)...), func(k, v []byte) error { - a := AnnounceReqWithTime{} - err = json.Unmarshal(v, &a) - if err != nil { - log.Error("Fail to unmarshall", "k", common.Bytes2Hex(k), "err", err) - //skip failed - return nil - } - if time.Since(a.UpdatedAt) > 5*DisconnectInterval { - log.Debug("Skipped requset", "peer", common.Bytes2Hex(a.PeerID), "last updated", a.UpdatedAt, "now", time.Now()) + if err := t.db.View(context.Background(), func(tx ethdb.Tx) error { + return tx.ForPrefix(dbutils.SnapshotInfoBucket, append(req.InfoHash, make([]byte, 20)...), func(k, v []byte) error { + a := AnnounceReqWithTime{} + err = json.Unmarshal(v, &a) + if err != nil { + log.Error("Fail to unmarshall", "k", common.Bytes2Hex(k), "err", err) + //skip failed + return nil + } + if time.Since(a.UpdatedAt) > 5*DisconnectInterval { + log.Debug("Skipped requset", "peer", common.Bytes2Hex(a.PeerID), "last updated", a.UpdatedAt, "now", time.Now()) + return nil + } + if a.Left == 0 { + resp.Complete++ + } else { + resp.Incomplete++ + } + resp.Peers = append(resp.Peers, map[string]interface{}{ + "ip": a.RemoteAddr.String(), + "peer id": a.PeerID, + "port": a.Port, + }) return nil - } - if a.Left == 0 { - resp.Complete++ - } else { - resp.Incomplete++ - } - resp.Peers = append(resp.Peers, map[string]interface{}{ - "ip": a.RemoteAddr.String(), - "peer id": a.PeerID, - "port": a.Port, }) - return nil - }) - if err != nil { + }); err != nil { log.Error("Walk", "err", err) WriteResp(w, ErrResponse{FailureReason: err.Error()}, req.Compact) return diff --git a/core/genesis.go b/core/genesis.go index c59482699e90c61d9212efb6736c4947372d4d81..bcfcc1a5b6715845039e819d2364f7ae337742cb 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -447,7 +447,7 @@ func (g *Genesis) MustWrite(tx ethdb.RwTx, history bool) (*types.Block, *state.I return b, s } -// Commit writes the block and state of a genesis specification to the database. +// Write writes the block and state of a genesis specification to the database. // The block is committed as the canonical head block. func (g *Genesis) Write(tx ethdb.RwTx) (*types.Block, *state.IntraBlockState, error) { block, statedb, err2 := g.WriteGenesisState(tx) @@ -485,22 +485,6 @@ func (g *Genesis) Write(tx ethdb.RwTx) (*types.Block, *state.IntraBlockState, er return block, statedb, nil } -func (g *Genesis) Commit(db ethdb.Database, history bool) (*types.Block, error) { - tx, err := db.Begin(context.Background(), ethdb.RW) - if err != nil { - return nil, err - } - defer tx.Rollback() - block, _, err := g.Write(tx.(ethdb.HasTx).Tx().(ethdb.RwTx)) - if err != nil { - return block, err - } - if err := tx.Commit(); err != nil { - return block, err - } - return block, nil -} - // MustCommit writes the genesis block and state to db, panicking on error. // The block is committed as the canonical head block. func (g *Genesis) MustCommit(db ethdb.RwKV) *types.Block { diff --git a/core/rawdb/accessors_account.go b/core/rawdb/accessors_account.go index 9d031419a10f6a3260558c9747adf72ee2881258..0bbe72f4ea8ded5c3794290a80a1f49ad8e7f331 100644 --- a/core/rawdb/accessors_account.go +++ b/core/rawdb/accessors_account.go @@ -23,19 +23,6 @@ import ( "github.com/ledgerwatch/erigon/ethdb" ) -// ReadAccountDeprecated reading account object from multiple buckets of db -func ReadAccountDeprecated(db ethdb.DatabaseReader, addrHash common.Hash, acc *accounts.Account) (bool, error) { - addrHashBytes := addrHash[:] - enc, err := db.Get(dbutils.HashedAccountsBucket, addrHashBytes) - if err != nil { - return false, err - } - if err = acc.DecodeForStorage(enc); err != nil { - return false, err - } - return true, nil -} - func ReadAccount(db ethdb.Tx, addrHash common.Address, acc *accounts.Account) (bool, error) { addrHashBytes := addrHash[:] enc, err := db.GetOne(dbutils.PlainStateBucket, addrHashBytes) diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 18ff34556016d8ebd18ce124c0d1d554efcb3d30..65cf9ccab0c408a58eecd71bd0dd21f9250d2111 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -130,17 +130,3 @@ func ReadReceipt(db ethdb.Tx, txHash common.Hash) (*types.Receipt, common.Hash, log.Error("Receipt not found", "number", blockNumber, "hash", blockHash, "txhash", txHash) return nil, common.Hash{}, 0, 0, nil } - -// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given -// section and bit index from the. -func ReadBloomBits(db ethdb.DatabaseReader, bit uint, section uint64, head common.Hash) ([]byte, error) { - return db.Get(dbutils.BloomBitsPrefix, dbutils.BloomBitsKey(bit, section, head)) -} - -// WriteBloomBits stores the compressed bloom bits vector belonging to the given -// section and bit index. -func WriteBloomBits(db ethdb.Putter, bit uint, section uint64, head common.Hash, bits []byte) { - if err := db.Put(dbutils.BloomBitsPrefix, dbutils.BloomBitsKey(bit, section, head), bits); err != nil { - log.Crit("Failed to store bloom bits", "err", err) - } -} diff --git a/core/state/plain_state_writer.go b/core/state/plain_state_writer.go index 369b07e4c5e42b871fad58b39987928a8eba36ea..a2b25c846292710bc1dccdcb3c804a54450bf7b0 100644 --- a/core/state/plain_state_writer.go +++ b/core/state/plain_state_writer.go @@ -30,7 +30,7 @@ func NewPlainStateWriter(db plainStateWriterDB, changeSetsDB ethdb.RwTx, blockNu } } -func NewPlainStateWriterNoHistory(db ethdb.Database) *PlainStateWriter { +func NewPlainStateWriterNoHistory(db plainStateWriterDB) *PlainStateWriter { return &PlainStateWriter{ db: db, } diff --git a/eth/tracers/tracers_test.go b/eth/tracers/tracers_test.go index 60db77bc436d507d80a56d26d85fc170e009ff03..022f95e483c742c61d1c861fa838b4a2c718ed14 100644 --- a/eth/tracers/tracers_test.go +++ b/eth/tracers/tracers_test.go @@ -39,6 +39,7 @@ import ( "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/tests" + "github.com/stretchr/testify/require" "github.com/holiman/uint256" ) @@ -132,7 +133,7 @@ func TestPrestateTracerCreate2(t *testing.T) { t.Fatalf("err %v", err) } signer := types.LatestSignerForChainID(big.NewInt(1)) - tx, err := types.SignTx(unsignedTx, *signer, privateKeyECDSA) + txn, err := types.SignTx(unsignedTx, *signer, privateKeyECDSA) if err != nil { t.Fatalf("err %v", err) } @@ -145,7 +146,7 @@ func TestPrestateTracerCreate2(t *testing.T) { gas (assuming no mem expansion): 32006 result: 0x60f3f640a8508fC6a86d45DF051962668E1e8AC7 */ - origin, _ := signer.Sender(tx) + origin, _ := signer.Sender(txn) txContext := vm.TxContext{ Origin: origin, GasPrice: big.NewInt(1), @@ -174,7 +175,9 @@ func TestPrestateTracerCreate2(t *testing.T) { Code: []byte{}, Balance: big.NewInt(500000000000000), } - statedb, _ := tests.MakePreState(params.Rules{}, kv.NewTestDB(t), alloc, context.BlockNumber) + + _, tx := kv.NewTestTx(t) + statedb, _ := tests.MakePreState(params.Rules{}, tx, alloc, context.BlockNumber) // Create the tracer, the EVM environment and run it tracer, err := New("prestateTracer", txContext) @@ -183,11 +186,11 @@ func TestPrestateTracerCreate2(t *testing.T) { } evm := vm.NewEVM(context, txContext, statedb, params.MainnetChainConfig, vm.Config{Debug: true, Tracer: tracer}) - msg, err := tx.AsMessage(*signer, nil) + msg, err := txn.AsMessage(*signer, nil) if err != nil { t.Fatalf("failed to prepare transaction for tracing: %v", err) } - st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(tx.GetGas())) + st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(txn.GetGas())) if _, err = st.TransitionDb(false, false); err != nil { t.Fatalf("failed to execute transaction: %v", err) } @@ -230,15 +233,15 @@ func TestCallTracer(t *testing.T) { t.Fatalf("failed to parse testcase: %v", err) } // Configure a blockchain with the given prestate - tx, err := types.DecodeTransaction(rlp.NewStream(bytes.NewReader(common.FromHex(test.Input)), 0)) + txn, err := types.DecodeTransaction(rlp.NewStream(bytes.NewReader(common.FromHex(test.Input)), 0)) if err != nil { t.Fatalf("failed to parse testcase input: %v", err) } signer := types.MakeSigner(test.Genesis.Config, uint64(test.Context.Number)) - origin, _ := signer.Sender(tx) + origin, _ := signer.Sender(txn) txContext := vm.TxContext{ Origin: origin, - GasPrice: big.NewInt(int64(tx.GetPrice().Uint64())), + GasPrice: big.NewInt(int64(txn.GetPrice().Uint64())), } context := vm.BlockContext{ CanTransfer: core.CanTransfer, @@ -250,7 +253,10 @@ func TestCallTracer(t *testing.T) { GasLimit: uint64(test.Context.GasLimit), CheckTEVM: func(common.Hash) (bool, error) { return false, nil }, } - statedb, _ := tests.MakePreState(params.Rules{}, kv.NewTestDB(t), test.Genesis.Alloc, uint64(test.Context.Number)) + + _, tx := kv.NewTestTx(t) + statedb, err := tests.MakePreState(params.Rules{}, tx, test.Genesis.Alloc, uint64(test.Context.Number)) + require.NoError(t, err) // Create the tracer, the EVM environment and run it tracer, err := New("callTracer", txContext) @@ -259,11 +265,11 @@ func TestCallTracer(t *testing.T) { } evm := vm.NewEVM(context, txContext, statedb, test.Genesis.Config, vm.Config{Debug: true, Tracer: tracer}) - msg, err := tx.AsMessage(*signer, nil) + msg, err := txn.AsMessage(*signer, nil) if err != nil { t.Fatalf("failed to prepare transaction for tracing: %v", err) } - st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(tx.GetGas())) + st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(txn.GetGas())) if _, err = st.TransitionDb(false, false); err != nil { t.Fatalf("failed to execute transaction: %v", err) } diff --git a/ethdb/kv/kv_mdbx.go b/ethdb/kv/kv_mdbx.go index 4185d181bedbf51fbd6d0999689045f463e98514..6a3a1d4edbb7d8040bf0e72408cc2220c9984832 100644 --- a/ethdb/kv/kv_mdbx.go +++ b/ethdb/kv/kv_mdbx.go @@ -678,11 +678,11 @@ func (tx *MdbxTx) DropBucket(bucket string) error { return tx.dropEvenIfBucketIsNotDeprecated(bucket) } -func (tx *MdbxTx) ExistsBucket(bucket string) bool { +func (tx *MdbxTx) ExistsBucket(bucket string) (bool, error) { if cfg, ok := tx.db.buckets[bucket]; ok { - return cfg.DBI != NonExistingDBI + return cfg.DBI != NonExistingDBI, nil } - return false + return false, nil } func (tx *MdbxTx) Commit() error { diff --git a/ethdb/kv/kv_snapshot.go b/ethdb/kv/kv_snapshot.go index 85f27c66505bf05f2f49cd71ab50ae138d8ced1c..373b650fde92e5dacc9eaa61f92b0e78c517dca5 100644 --- a/ethdb/kv/kv_snapshot.go +++ b/ethdb/kv/kv_snapshot.go @@ -7,6 +7,8 @@ import ( "fmt" "sync" + "github.com/ledgerwatch/erigon/log" + "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/ethdb" @@ -346,7 +348,7 @@ func (s *snTX) CreateBucket(bucket string) error { return s.dbTX.(ethdb.BucketMigrator).CreateBucket(bucket) } -func (s *snTX) ExistsBucket(bucket string) bool { +func (s *snTX) ExistsBucket(bucket string) (bool, error) { return s.dbTX.(ethdb.BucketMigrator).ExistsBucket(bucket) } diff --git a/ethdb/kv/object_db.go b/ethdb/kv/object_db.go index 21e22aa3d35de6bb19caa730c83b419d3cd297d1..2f7dd68b110b56d9eff796a27bbe7382a75e1bd3 100644 --- a/ethdb/kv/object_db.go +++ b/ethdb/kv/object_db.go @@ -217,12 +217,15 @@ func (db *ObjectDatabase) Delete(bucket string, k, v []byte) error { func (db *ObjectDatabase) BucketExists(name string) (bool, error) { exists := false - if err := db.kv.View(context.Background(), func(tx ethdb.Tx) error { + if err := db.kv.View(context.Background(), func(tx ethdb.Tx) (err error) { migrator, ok := tx.(ethdb.BucketMigrator) if !ok { return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv) } - exists = migrator.ExistsBucket(name) + exists, err = migrator.ExistsBucket(name) + if err != nil { + return err + } return nil }); err != nil { return false, err diff --git a/ethdb/kv/tx_db.go b/ethdb/kv/tx_db.go index 19d64fc3b54d008631e974edd88e38a6a24b5b33..987f7642851cd21d8e63dfba97c73ee05cd071b6 100644 --- a/ethdb/kv/tx_db.go +++ b/ethdb/kv/tx_db.go @@ -103,13 +103,6 @@ func (m *TxDb) Close() { panic("don't call me") } -// NewTxDbWithoutTransaction creates TxDb object without opening transaction, -// such TxDb not usable before .Begin() call on it -// It allows inject TxDb object into class hierarchy, but open write transaction later -func NewTxDbWithoutTransaction(db ethdb.Database, flags ethdb.TxFlags) ethdb.DbWithPendingMutations { - return &TxDb{db: db, txFlags: flags} -} - func (m *TxDb) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { batch := m if m.tx != nil { @@ -327,13 +320,11 @@ func (m *TxDb) Tx() ethdb.Tx { } func (m *TxDb) BucketExists(name string) (bool, error) { - exists := false migrator, ok := m.tx.(ethdb.BucketMigrator) if !ok { return false, fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx) } - exists = migrator.ExistsBucket(name) - return exists, nil + return migrator.ExistsBucket(name) } func (m *TxDb) ClearBuckets(buckets ...string) error { diff --git a/ethdb/kv_interface.go b/ethdb/kv_interface.go index 28800cc12ee2b0051a22396792307e895e0c7803..9d6dd99cfa91ec0012496d3814e0cd8d8504787a 100644 --- a/ethdb/kv_interface.go +++ b/ethdb/kv_interface.go @@ -241,7 +241,7 @@ type RwTx interface { type BucketMigrator interface { DropBucket(string) error CreateBucket(string) error - ExistsBucket(string) bool + ExistsBucket(string) (bool, error) ClearBucket(string) error ListBuckets() ([]string, error) } diff --git a/migrations/call_trace_index.go b/migrations/call_trace_index.go index 0fff9faf85bfde2d7a6bf16e7615bd03b9981699..2faf07ec2a4013a9d707dcd7e952e1c6206be2f3 100644 --- a/migrations/call_trace_index.go +++ b/migrations/call_trace_index.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb" @@ -15,9 +14,14 @@ import ( var rebuilCallTraceIndex = Migration{ Name: "rebuild_call_trace_index", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + // Find the lowest key in the TraceCallSet table - tx := db.(ethdb.HasTx).Tx().(ethdb.RwTx) c, err := tx.CursorDupSort(dbutils.CallTraceSet) if err != nil { return err @@ -30,12 +34,12 @@ var rebuilCallTraceIndex = Migration{ } if k == nil { log.Warn("Nothing to rebuild, CallTraceSet table is empty") - return CommitProgress(db, nil, true) + return BeforeCommit(tx, nil, true) } blockNum := binary.BigEndian.Uint64(k) if blockNum == 0 { log.Warn("Nothing to rebuild, CallTraceSet's first record", "number", blockNum) - return CommitProgress(db, nil, true) + return BeforeCommit(tx, nil, true) } logPrefix := "db migration rebuild_call_trace_index" @@ -48,9 +52,12 @@ var rebuilCallTraceIndex = Migration{ } log.Info("First record in CallTraceTable", "number", blockNum) - if err = stages.SaveStageProgress(db, stages.CallTraces, blockNum-1); err != nil { + if err = stages.SaveStageProgress(tx, stages.CallTraces, blockNum-1); err != nil { + return err + } + if err = BeforeCommit(tx, nil, true); err != nil { return err } - return CommitProgress(db, nil, true) + return tx.Commit() }, } diff --git a/migrations/db_schema_version.go b/migrations/db_schema_version.go index 822cd4704b056758f2895125792c4ffd2dadf981..a5d07aa2b634eeee6da6c67142b3223defc0e3ac 100644 --- a/migrations/db_schema_version.go +++ b/migrations/db_schema_version.go @@ -1,14 +1,24 @@ package migrations import ( - "github.com/ledgerwatch/erigon/common/etl" + "context" + "github.com/ledgerwatch/erigon/ethdb" ) var dbSchemaVersion = Migration{ Name: "db_schema_version", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + // This migration is no-op, but it forces the migration mechanism to apply it and thus write the DB schema version info - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, } diff --git a/migrations/fix_sequences.go b/migrations/fix_sequences.go index 2d52aa5959f9ebfd0bde55779d9e9b235959a73b..2bb679af856438e18f03028c73c79ef3d4664323 100644 --- a/migrations/fix_sequences.go +++ b/migrations/fix_sequences.go @@ -1,8 +1,9 @@ package migrations import ( + "context" + "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/ethdb" ) @@ -12,21 +13,30 @@ var oldSequences = map[string]string{ var fixSequences = Migration{ Name: "fix_sequences", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + for bkt, oldbkt := range oldSequences { - seq, getErr := db.GetOne(dbutils.Sequence, []byte(oldbkt)) + seq, getErr := tx.GetOne(dbutils.Sequence, []byte(oldbkt)) if getErr != nil { return getErr } if seq != nil { - putErr := db.Put(dbutils.Sequence, []byte(bkt), seq) + putErr := tx.Put(dbutils.Sequence, []byte(bkt), seq) if putErr != nil { return putErr } } } - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, } diff --git a/migrations/header_prefix.go b/migrations/header_prefix.go index ee3595d3f1beef6db8b012a5bbbb91193014445d..ae9e47fe525df1dee7c74af16a71d1af7eedf654 100644 --- a/migrations/header_prefix.go +++ b/migrations/header_prefix.go @@ -2,6 +2,7 @@ package migrations import ( "bytes" + "context" "fmt" "github.com/ledgerwatch/erigon/common" @@ -12,16 +13,28 @@ import ( var headerPrefixToSeparateBuckets = Migration{ Name: "header_prefix_to_separate_buckets", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { - exists, err := db.(ethdb.BucketsMigrator).BucketExists(dbutils.HeaderPrefixOld) + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + exists, err := tx.ExistsBucket(dbutils.HeaderPrefixOld) if err != nil { return err } if !exists { - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() } - if err = db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.HeaderCanonicalBucket, dbutils.HeaderCanonicalBucket, dbutils.HeaderTDBucket); err != nil { + if err = tx.ClearBucket(dbutils.HeaderCanonicalBucket); err != nil { + return err + } + if err = tx.ClearBucket(dbutils.HeaderTDBucket); err != nil { return err } logPrefix := "split_header_prefix_bucket" @@ -91,7 +104,7 @@ var headerPrefixToSeparateBuckets = Migration{ headersCollector.Close(logPrefix) }() - err = db.ForEach(dbutils.HeaderPrefixOld, []byte{}, func(k, v []byte) error { + err = tx.ForEach(dbutils.HeaderPrefixOld, []byte{}, func(k, v []byte) error { var innerErr error switch { case IsHeaderKey(k): @@ -108,22 +121,25 @@ var headerPrefixToSeparateBuckets = Migration{ } return nil }) - if err = db.(ethdb.BucketsMigrator).DropBuckets(dbutils.HeaderPrefixOld); err != nil { + if err = tx.DropBucket(dbutils.HeaderPrefixOld); err != nil { return err } LoadStep: // Now transaction would have been re-opened, and we should be re-using the space - if err = canonicalCollector.Load(logPrefix, db.(ethdb.HasTx).Tx().(ethdb.RwTx), dbutils.HeaderCanonicalBucket, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil { + if err = canonicalCollector.Load(logPrefix, tx, dbutils.HeaderCanonicalBucket, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil { return fmt.Errorf("loading the transformed data back into the storage table: %w", err) } - if err = tdCollector.Load(logPrefix, db.(ethdb.HasTx).Tx().(ethdb.RwTx), dbutils.HeaderTDBucket, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil { + if err = tdCollector.Load(logPrefix, tx, dbutils.HeaderTDBucket, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil { return fmt.Errorf("loading the transformed data back into the acc table: %w", err) } - if err = headersCollector.Load(logPrefix, db.(ethdb.HasTx).Tx().(ethdb.RwTx), dbutils.HeadersBucket, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil { + if err = headersCollector.Load(logPrefix, tx, dbutils.HeadersBucket, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil { return fmt.Errorf("loading the transformed data back into the acc table: %w", err) } - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, } diff --git a/migrations/header_prefix_test.go b/migrations/header_prefix_test.go index 4b3c962d592ce6599aa928a8008d95e4f3ca9aee..70020175e10db6f5daed1fdc703991e33b44f1a7 100644 --- a/migrations/header_prefix_test.go +++ b/migrations/header_prefix_test.go @@ -20,22 +20,18 @@ func TestHeaderPrefix(t *testing.T) { db := kv.NewTestKV(t) err := db.Update(context.Background(), func(tx ethdb.RwTx) error { - err := tx.(ethdb.BucketMigrator).CreateBucket(dbutils.HeaderPrefixOld) - if err != nil { - return err - } - c, err := tx.RwCursor(dbutils.HeaderPrefixOld) + err := tx.CreateBucket(dbutils.HeaderPrefixOld) if err != nil { return err } for i := uint64(0); i < 10; i++ { //header - err = c.Put(dbutils.HeaderKey(i, common.Hash{uint8(i)}), []byte("header "+strconv.Itoa(int(i)))) + err = tx.Put(dbutils.HeaderPrefixOld, dbutils.HeaderKey(i, common.Hash{uint8(i)}), []byte("header "+strconv.Itoa(int(i)))) require.NoError(err) //canonical - err = c.Put(HeaderHashKey(i), common.Hash{uint8(i)}.Bytes()) + err = tx.Put(dbutils.HeaderPrefixOld, HeaderHashKey(i), common.Hash{uint8(i)}.Bytes()) require.NoError(err) - err = c.Put(append(dbutils.HeaderKey(i, common.Hash{uint8(i)}), HeaderTDSuffix...), []byte{uint8(i)}) + err = tx.Put(dbutils.HeaderPrefixOld, append(dbutils.HeaderKey(i, common.Hash{uint8(i)}), HeaderTDSuffix...), []byte{uint8(i)}) require.NoError(err) } return nil diff --git a/migrations/migrations.go b/migrations/migrations.go index 2e149cfeff61b0feb29fead88bddae0c2f151336..3ab17d410f00e7472cc719b41caa1d539b6efc35 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -9,10 +9,8 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb" - kv2 "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/ledgerwatch/erigon/log" "github.com/ugorji/go/codec" ) @@ -68,9 +66,10 @@ var migrations = map[ethdb.Label][]Migration{ ethdb.Sentry: {}, } +type Callback func(tx ethdb.RwTx, progress []byte, isDone bool) error type Migration struct { Name string - Up func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommitOnLoadCommit etl.LoadCommitHandler) error + Up func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) error } var ( @@ -146,13 +145,13 @@ func (m *Migrator) PendingMigrations(tx ethdb.Tx) ([]Migration, error) { return pending, nil } -func (m *Migrator) Apply(kv ethdb.RwKV, datadir string) error { +func (m *Migrator) Apply(db ethdb.RwKV, datadir string) error { if len(m.Migrations) == 0 { return nil } var applied map[string][]byte - if err := kv.View(context.Background(), func(tx ethdb.Tx) error { + if err := db.View(context.Background(), func(tx ethdb.Tx) error { var err error applied, err = AppliedMigrations(tx, false) return err @@ -160,13 +159,6 @@ func (m *Migrator) Apply(kv ethdb.RwKV, datadir string) error { return err } - db := kv2.NewObjectDatabase(kv) - tx, err1 := db.Begin(context.Background(), ethdb.RW) - if err1 != nil { - return err1 - } - defer tx.Rollback() - // migration names must be unique, protection against people's mistake uniqueNameCheck := map[string]bool{} for i := range m.Migrations { @@ -183,29 +175,27 @@ func (m *Migrator) Apply(kv ethdb.RwKV, datadir string) error { continue } - commitFuncCalled := false // commit function must be called if no error, protection against people's mistake + callbackCalled := false // commit function must be called if no error, protection against people's mistake log.Info("Apply migration", "name", v.Name) - progress, err := tx.GetOne(dbutils.Migrations, []byte("_progress_"+v.Name)) - if err != nil { + var progress []byte + if err := db.View(context.Background(), func(tx ethdb.Tx) (err error) { + progress, err = tx.GetOne(dbutils.Migrations, []byte("_progress_"+v.Name)) + return err + }); err != nil { return err } - if err = v.Up(tx, path.Join(datadir, "migrations", v.Name), progress, func(_ ethdb.Putter, key []byte, isDone bool) error { + if err := v.Up(db, path.Join(datadir, "migrations", v.Name), progress, func(tx ethdb.RwTx, key []byte, isDone bool) error { if !isDone { if key != nil { - err = tx.Put(dbutils.Migrations, []byte("_progress_"+v.Name), key) - if err != nil { + if err := tx.Put(dbutils.Migrations, []byte("_progress_"+v.Name), key); err != nil { return err } } - // do commit, but don't save partial progress - if err := tx.CommitAndBegin(context.Background()); err != nil { - return err - } return nil } - commitFuncCalled = true + callbackCalled = true stagesProgress, err := MarshalMigrationPayload(tx) if err != nil { @@ -221,15 +211,12 @@ func (m *Migrator) Apply(kv ethdb.RwKV, datadir string) error { return err } - if err := tx.CommitAndBegin(context.Background()); err != nil { - return err - } return nil }); err != nil { return err } - if !commitFuncCalled { + if !callbackCalled { return fmt.Errorf("%w: %s", ErrMigrationCommitNotCalled, v.Name) } log.Info("Applied migration", "name", v.Name) @@ -239,11 +226,13 @@ func (m *Migrator) Apply(kv ethdb.RwKV, datadir string) error { binary.BigEndian.PutUint32(version[:], dbutils.DBSchemaVersion.Major) binary.BigEndian.PutUint32(version[4:], dbutils.DBSchemaVersion.Minor) binary.BigEndian.PutUint32(version[8:], dbutils.DBSchemaVersion.Patch) - if err := tx.Put(dbutils.DatabaseInfoBucket, dbutils.DBSchemaVersionKey, version[:]); err != nil { - return fmt.Errorf("writing DB schema version: %w", err) - } - if err := tx.Commit(); err != nil { - return fmt.Errorf("committing DB version update: %w", err) + if err := db.Update(context.Background(), func(tx ethdb.RwTx) error { + if err := tx.Put(dbutils.DatabaseInfoBucket, dbutils.DBSchemaVersionKey, version[:]); err != nil { + return fmt.Errorf("writing DB schema version: %w", err) + } + return nil + }); err != nil { + return err } log.Info("Updated DB schema to", "version", fmt.Sprintf("%d.%d.%d", dbutils.DBSchemaVersion.Major, dbutils.DBSchemaVersion.Minor, dbutils.DBSchemaVersion.Patch)) return nil diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go index 191a2919f77821fe1bdc6c92fdf26c8d051434f9..20a2f9455097eabf0e3154584cfef7fc0bc36ff5 100644 --- a/migrations/migrations_test.go +++ b/migrations/migrations_test.go @@ -9,7 +9,6 @@ import ( "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/ethdb" "github.com/stretchr/testify/require" ) @@ -19,14 +18,32 @@ func TestApplyWithInit(t *testing.T) { m := []Migration{ { "one", - func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { - return OnLoadCommit(db, nil, true) + func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, }, { "two", - func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { - return OnLoadCommit(db, nil, true) + func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, }, } @@ -65,15 +82,24 @@ func TestApplyWithoutInit(t *testing.T) { m := []Migration{ { "one", - func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { + func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { t.Fatal("shouldn't been executed") return nil }, }, { "two", - func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { - return OnLoadCommit(db, nil, true) + func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, }, } @@ -120,13 +146,22 @@ func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) { m := []Migration{ { "one", - func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { - return OnLoadCommit(db, nil, true) + func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, }, { "two", - func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { + func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { t.Fatal("shouldn't been executed") return nil }, @@ -192,14 +227,32 @@ func TestValidation(t *testing.T) { m := []Migration{ { Name: "repeated_name", - Up: func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { - return OnLoadCommit(db, nil, true) + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, }, { Name: "repeated_name", - Up: func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { - return OnLoadCommit(db, nil, true) + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() + + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, }, } @@ -223,8 +276,9 @@ func TestCommitCallRequired(t *testing.T) { m := []Migration{ { Name: "one", - Up: func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error { - return nil // don't call OnLoadCommit + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + //don't call BeforeCommit + return nil }, }, } diff --git a/migrations/prune.go b/migrations/prune.go index d12c1a40ed2884944817c082d974c255ab4cf3d7..920db5a66fe7201bef575b2286e651b39d3d3b14 100644 --- a/migrations/prune.go +++ b/migrations/prune.go @@ -1,8 +1,9 @@ package migrations import ( + "context" + "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/common/math" "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/erigon/ethdb/prune" @@ -11,7 +12,12 @@ import ( var storageMode = Migration{ Name: "storage_mode", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() var ( // old db keys //StorageModeHistory - does node save history. StorageModeHistory = []byte("smHistory") @@ -30,7 +36,7 @@ var storageMode = Migration{ return math.MaxUint64 // means, prune disabled } { - v, err := db.GetOne(dbutils.DatabaseInfoBucket, StorageModeHistory) + v, err := tx.GetOne(dbutils.DatabaseInfoBucket, StorageModeHistory) if err != nil { return err } @@ -38,39 +44,42 @@ var storageMode = Migration{ } { - v, err := db.GetOne(dbutils.DatabaseInfoBucket, StorageModeReceipts) + v, err := tx.GetOne(dbutils.DatabaseInfoBucket, StorageModeReceipts) if err != nil { return err } pm.Receipts = castToPruneDistance(v) } { - v, err := db.GetOne(dbutils.DatabaseInfoBucket, StorageModeTxIndex) + v, err := tx.GetOne(dbutils.DatabaseInfoBucket, StorageModeTxIndex) if err != nil { return err } pm.TxIndex = castToPruneDistance(v) } { - v, err := db.GetOne(dbutils.DatabaseInfoBucket, StorageModeCallTraces) + v, err := tx.GetOne(dbutils.DatabaseInfoBucket, StorageModeCallTraces) if err != nil { return err } pm.CallTraces = castToPruneDistance(v) } { - v, err := db.GetOne(dbutils.DatabaseInfoBucket, dbutils.StorageModeTEVM) + v, err := tx.GetOne(dbutils.DatabaseInfoBucket, dbutils.StorageModeTEVM) if err != nil { return err } pm.Experiments.TEVM = len(v) == 1 && v[0] == 1 } - err = prune.SetIfNotExist(db, pm) + err = prune.SetIfNotExist(tx, pm) if err != nil { return err } - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, } diff --git a/migrations/receipt_cbor.go b/migrations/receipt_cbor.go index b4aec90bae7a61e6dab2902067a2d22ccb5d140c..32e3628be71e80a1140fe8e7ae94dedd8588c00e 100644 --- a/migrations/receipt_cbor.go +++ b/migrations/receipt_cbor.go @@ -2,13 +2,16 @@ package migrations import ( "bytes" + "context" "encoding/binary" - "fmt" + "errors" + pkg2_big "math/big" + "runtime" + "strconv" "time" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" @@ -16,11 +19,6 @@ import ( "github.com/ledgerwatch/erigon/ethdb/cbor" "github.com/ledgerwatch/erigon/log" - "errors" - pkg2_big "math/big" - "runtime" - "strconv" - pkg1_common "github.com/ledgerwatch/erigon/common" codec1978 "github.com/ugorji/go/codec" ) @@ -38,20 +36,23 @@ type OldReceipts []*OldReceipt var ReceiptCbor = Migration{ Name: "receipt_cbor", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { - var tx ethdb.RwTx - if hasTx, ok := db.(ethdb.HasTx); ok { - tx = hasTx.Tx().(ethdb.RwTx) - } else { - return fmt.Errorf("no transaction") + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err } + defer tx.Rollback() + genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0) if err != nil { return err } if genesisBlock == nil { // Empty database check - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() } chainConfig, cerr := rawdb.ReadChainConfig(tx, genesisBlock.Hash()) if cerr != nil { @@ -112,7 +113,10 @@ var ReceiptCbor = Migration{ return err } } - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, } diff --git a/migrations/receipt_repair.go b/migrations/receipt_repair.go index 36ca8a41460d7c05074b2e3104f4ccf4c2e9f8ff..259031c01782aa6d29a6ae4f7df5ff2cf45233ef 100644 --- a/migrations/receipt_repair.go +++ b/migrations/receipt_repair.go @@ -2,6 +2,7 @@ package migrations import ( "bytes" + "context" "encoding/binary" "fmt" "time" @@ -9,7 +10,6 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/consensus/ethash" "github.com/ledgerwatch/erigon/consensus/misc" "github.com/ledgerwatch/erigon/core" @@ -41,13 +41,12 @@ func availableReceiptFrom(tx ethdb.Tx) (uint64, error) { var ReceiptRepair = Migration{ Name: "receipt_repair", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { - var tx ethdb.RwTx - if hasTx, ok := db.(ethdb.HasTx); ok { - tx = hasTx.Tx().(ethdb.RwTx) - } else { - return fmt.Errorf("no transaction") + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err } + defer tx.Rollback() blockNum, err := changeset.AvailableFrom(tx) if err != nil { @@ -141,7 +140,10 @@ var ReceiptRepair = Migration{ fixedCount++ } } - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, } diff --git a/migrations/remove_clique.go b/migrations/remove_clique.go index 048a0ea4112334adf4566c903a91a64c1dccb2d2..9241b833abfb20f0f6dc07d68d7a3a41f9b62079 100644 --- a/migrations/remove_clique.go +++ b/migrations/remove_clique.go @@ -1,25 +1,37 @@ package migrations import ( + "context" + "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/ethdb" ) var removeCliqueBucket = Migration{ Name: "remove_clique_bucket", - Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) { + Up: func(db ethdb.RwKV, tmpdir string, progress []byte, BeforeCommit Callback) (err error) { + tx, err := db.BeginRw(context.Background()) + if err != nil { + return err + } + defer tx.Rollback() - if exists, err := db.(ethdb.BucketsMigrator).BucketExists(dbutils.CliqueBucket); err != nil { + if exists, err := tx.ExistsBucket(dbutils.CliqueBucket); err != nil { return err } else if !exists { - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() } - if err := db.(ethdb.BucketsMigrator).DropBuckets(dbutils.CliqueBucket); err != nil { + if err := tx.DropBucket(dbutils.CliqueBucket); err != nil { return err } - return CommitProgress(db, nil, true) + if err := BeforeCommit(tx, nil, true); err != nil { + return err + } + return tx.Commit() }, } diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 79f8c095d354636f07c9dbb82fe18b56049f134a..73ee78fa0d9d74dd51bb2c711edd0feb8114de8e 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -34,7 +34,6 @@ import ( "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/trie" @@ -177,8 +176,7 @@ func (t *StateTest) Run(rules params.Rules, tx ethdb.RwTx, subtest StateSubtest, } // RunNoVerify runs a specific subtest and returns the statedb and post-state root -func (t *StateTest) RunNoVerify(rules params.Rules, kvtx ethdb.RwTx, subtest StateSubtest, vmconfig vm.Config) (*state.IntraBlockState, common.Hash, error) { - tx := kv.WrapIntoTxDB(kvtx) +func (t *StateTest) RunNoVerify(rules params.Rules, tx ethdb.RwTx, subtest StateSubtest, vmconfig vm.Config) (*state.IntraBlockState, common.Hash, error) { config, eips, err := GetChainConfig(subtest.Fork) if err != nil { return nil, common.Hash{}, UnsupportedForkError{subtest.Fork} @@ -247,7 +245,7 @@ func (t *StateTest) RunNoVerify(rules params.Rules, kvtx ethdb.RwTx, subtest Sta return nil, common.Hash{}, err } // Generate hashed state - c, err := kvtx.RwCursor(dbutils.PlainStateBucket) + c, err := tx.RwCursor(dbutils.PlainStateBucket) if err != nil { return nil, common.Hash{}, err } @@ -286,7 +284,7 @@ func (t *StateTest) RunNoVerify(rules params.Rules, kvtx ethdb.RwTx, subtest Sta } c.Close() - root, err := trie.CalcRoot("", kvtx) + root, err := trie.CalcRoot("", tx) if err != nil { return nil, common.Hash{}, fmt.Errorf("error calculating state root: %v", err) } @@ -298,7 +296,7 @@ func (t *StateTest) gasLimit(subtest StateSubtest) uint64 { return t.json.Tx.GasLimit[t.json.Post[subtest.Fork][subtest.Index].Indexes.Gas] } -func MakePreState(rules params.Rules, db ethdb.Database, accounts core.GenesisAlloc, blockNr uint64) (*state.IntraBlockState, error) { +func MakePreState(rules params.Rules, db ethdb.RwTx, accounts core.GenesisAlloc, blockNr uint64) (*state.IntraBlockState, error) { r := state.NewPlainStateReader(db) statedb := state.New(r) for addr, a := range accounts { diff --git a/tests/vm_test_util.go b/tests/vm_test_util.go index 4b192c21d6ffaf4ff233eda9893f4574352bffef..a26a8a6b2babd5d5ac5ecdc8c2093b1f905ecfef 100644 --- a/tests/vm_test_util.go +++ b/tests/vm_test_util.go @@ -23,7 +23,6 @@ import ( "math/big" "github.com/holiman/uint256" - "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/ledgerwatch/erigon/turbo/trie" "github.com/ledgerwatch/erigon/common" @@ -82,7 +81,7 @@ type vmExecMarshaling struct { } func (t *VMTest) Run(tx ethdb.RwTx, vmconfig vm.Config, blockNr uint64) error { - state, err := MakePreState(params.MainnetChainConfig.Rules(blockNr), kv.WrapIntoTxDB(tx), t.json.Pre, blockNr) + state, err := MakePreState(params.MainnetChainConfig.Rules(blockNr), tx, t.json.Pre, blockNr) if err != nil { return fmt.Errorf("error in MakePreState: %v", err) } diff --git a/turbo/snapshotsync/downloader.go b/turbo/snapshotsync/downloader.go index 158d71437fa37edb7051c95e467ca2c9313b74b0..a817b2d926958b8fb924186c6639dcfd29259737 100644 --- a/turbo/snapshotsync/downloader.go +++ b/turbo/snapshotsync/downloader.go @@ -67,16 +67,16 @@ func (cli *Client) Torrents() []metainfo.Hash { } return hashes } -func (cli *Client) Load(db ethdb.Database) error { +func (cli *Client) Load(tx ethdb.Tx) error { log.Info("Load added torrents") - return db.ForEach(dbutils.SnapshotInfoBucket, []byte{}, func(k, infoHashBytes []byte) error { + return tx.ForEach(dbutils.SnapshotInfoBucket, []byte{}, func(k, infoHashBytes []byte) error { if !bytes.HasPrefix(k[8:], []byte(SnapshotInfoHashPrefix)) { return nil } networkID, snapshotName := ParseInfoHashKey(k) infoHash := metainfo.Hash{} copy(infoHash[:], infoHashBytes) - infoBytes, err := db.GetOne(dbutils.SnapshotInfoBucket, MakeInfoBytesKey(snapshotName, networkID)) + infoBytes, err := tx.GetOne(dbutils.SnapshotInfoBucket, MakeInfoBytesKey(snapshotName, networkID)) if err != nil { return err } @@ -117,7 +117,7 @@ func (cli *Client) AddTorrentSpec(snapshotName string, snapshotHash metainfo.Has return t, err } -func (cli *Client) AddTorrent(ctx context.Context, db ethdb.Database, snapshotType SnapshotType, networkID uint64) error { //nolint: interfacer +func (cli *Client) AddTorrent(ctx context.Context, db ethdb.RwTx, snapshotType SnapshotType, networkID uint64) error { //nolint: interfacer infoHashBytes, infoBytes, err := getTorrentSpec(db, snapshotType.String(), networkID) if err != nil { return err @@ -178,7 +178,7 @@ func (cli *Client) GetInfoBytes(ctx context.Context, snapshotHash metainfo.Hash) } } -func (cli *Client) AddSnapshotsTorrents(ctx context.Context, db ethdb.Database, networkId uint64, mode SnapshotMode) error { +func (cli *Client) AddSnapshotsTorrents(ctx context.Context, db ethdb.RwTx, networkId uint64, mode SnapshotMode) error { ctx, cancel := context.WithTimeout(ctx, time.Minute*10) defer cancel() eg := errgroup.Group{} @@ -256,11 +256,11 @@ func (cli *Client) Download() { } } -func (cli *Client) GetSnapshots(db ethdb.Database, networkID uint64) (map[SnapshotType]*SnapshotsInfo, error) { +func (cli *Client) GetSnapshots(tx ethdb.Tx, networkID uint64) (map[SnapshotType]*SnapshotsInfo, error) { mp := make(map[SnapshotType]*SnapshotsInfo) networkIDBytes := make([]byte, 8) binary.BigEndian.PutUint64(networkIDBytes, networkID) - err := db.ForPrefix(dbutils.SnapshotInfoBucket, append(networkIDBytes, []byte(SnapshotInfoHashPrefix)...), func(k, v []byte) error { + err := tx.ForPrefix(dbutils.SnapshotInfoBucket, append(networkIDBytes, []byte(SnapshotInfoHashPrefix)...), func(k, v []byte) error { var hash metainfo.Hash if len(v) != metainfo.HashSize { return nil @@ -331,7 +331,7 @@ func (cli *Client) StopSeeding(hash metainfo.Hash) error { return nil } -func getTorrentSpec(db ethdb.Database, snapshotName string, networkID uint64) ([]byte, []byte, error) { +func getTorrentSpec(db ethdb.Tx, snapshotName string, networkID uint64) ([]byte, []byte, error) { var infohash, infobytes []byte var err error b := make([]byte, 8) diff --git a/turbo/snapshotsync/postprocessing.go b/turbo/snapshotsync/postprocessing.go index b8796228d1c02b84c0d93f6b64926348a94f1f1f..9e1d0840e0f16a47afea9da4962c7d833b36c10a 100644 --- a/turbo/snapshotsync/postprocessing.go +++ b/turbo/snapshotsync/postprocessing.go @@ -15,7 +15,6 @@ import ( "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/ledgerwatch/erigon/log" "github.com/ledgerwatch/erigon/rlp" ) @@ -41,23 +40,26 @@ var ( Snapshot11kkTD = []byte{138, 3, 199, 118, 5, 203, 95, 162, 81, 64, 161} ) -func PostProcessing(db ethdb.Database, downloadedSnapshots map[SnapshotType]*SnapshotsInfo) error { +func PostProcessing(db ethdb.RwKV, downloadedSnapshots map[SnapshotType]*SnapshotsInfo) error { if _, ok := downloadedSnapshots[SnapshotType_headers]; ok { - err := GenerateHeaderIndexes(context.Background(), db) - if err != nil { + if err := db.Update(context.Background(), func(tx ethdb.RwTx) error { + return GenerateHeaderIndexes(context.Background(), tx) + }); err != nil { return err } } if _, ok := downloadedSnapshots[SnapshotType_state]; ok { - err := PostProcessState(db, downloadedSnapshots[SnapshotType_state]) - if err != nil { + if err := db.Update(context.Background(), func(tx ethdb.RwTx) error { + return PostProcessState(tx, downloadedSnapshots[SnapshotType_state]) + }); err != nil { return err } } if _, ok := downloadedSnapshots[SnapshotType_bodies]; ok { - err := PostProcessBodies(db) - if err != nil { + if err := db.Update(context.Background(), func(tx ethdb.RwTx) error { + return PostProcessBodies(tx) + }); err != nil { return err } } @@ -65,8 +67,8 @@ func PostProcessing(db ethdb.Database, downloadedSnapshots map[SnapshotType]*Sna return nil } -func PostProcessBodies(db ethdb.Database) error { - v, err := stages.GetStageProgress(db, stages.Bodies) +func PostProcessBodies(tx ethdb.RwTx) error { + v, err := stages.GetStageProgress(tx, stages.Bodies) if err != nil { return err } @@ -74,18 +76,16 @@ func PostProcessBodies(db ethdb.Database) error { if v > 0 { return nil } - err = db.(*kv.ObjectDatabase).ClearBuckets(dbutils.TxLookupPrefix) + err = tx.ClearBucket(dbutils.TxLookupPrefix) if err != nil { return err } - tx, err := db.Begin(context.Background(), ethdb.RW) + ethTxC, err := tx.Cursor(dbutils.EthTx) if err != nil { return err } - defer tx.Rollback() - - k, _, err := tx.Last(dbutils.EthTx) + k, _, err := ethTxC.Last() if err != nil { return err } @@ -99,7 +99,11 @@ func PostProcessBodies(db ethdb.Database) error { return err } - k, body, err := tx.Last(dbutils.BlockBodyPrefix) + bodyC, err := tx.Cursor(dbutils.BlockBodyPrefix) + if err != nil { + return err + } + k, body, err := bodyC.Last() if err != nil { return err } @@ -116,7 +120,7 @@ func PostProcessBodies(db ethdb.Database) error { return tx.Commit() } -func PostProcessState(db ethdb.GetterPutter, info *SnapshotsInfo) error { +func PostProcessState(db ethdb.RwTx, info *SnapshotsInfo) error { v, err := stages.GetStageProgress(db, stages.Execution) if err != nil { return err @@ -126,8 +130,10 @@ func PostProcessState(db ethdb.GetterPutter, info *SnapshotsInfo) error { return nil } // clear genesis state - err = db.(*kv.ObjectDatabase).ClearBuckets(dbutils.PlainStateBucket, dbutils.EthTx) - if err != nil { + if err = db.ClearBucket(dbutils.PlainStateBucket); err != nil { + return err + } + if err = db.ClearBucket(dbutils.EthTx); err != nil { return err } err = stages.SaveStageProgress(db, stages.Execution, info.SnapshotBlock) @@ -225,19 +231,25 @@ func PostProcessNoBlocksSync(db ethdb.Database, blockNum uint64, blockHash commo return tx.Commit() } -func generateHeaderHashToNumberIndex(ctx context.Context, tx ethdb.DbWithPendingMutations) error { +func generateHeaderHashToNumberIndex(ctx context.Context, tx ethdb.RwTx) error { + c, err := tx.Cursor(dbutils.HeadersBucket) + if err != nil { + return err + } log.Info("Generate headers hash to number index") - lastHeader, _, innerErr := tx.Last(dbutils.HeadersBucket) + lastHeader, _, innerErr := c.Last() if innerErr != nil { return innerErr } + c.Close() + headNumberBytes := lastHeader[:8] headHashBytes := lastHeader[8:] headNumber := big.NewInt(0).SetBytes(headNumberBytes).Uint64() headHash := common.BytesToHash(headHashBytes) - return etl.Transform("Torrent post-processing 1", tx.(ethdb.HasTx).Tx().(ethdb.RwTx), dbutils.HeadersBucket, dbutils.HeaderNumberBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error { + return etl.Transform("Torrent post-processing 1", tx, dbutils.HeadersBucket, dbutils.HeaderNumberBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error { return next(k, common.CopyBytes(k[8:]), common.CopyBytes(k[:8])) }, etl.IdentityLoadFunc, etl.TransformArgs{ Quit: ctx.Done(), @@ -245,7 +257,7 @@ func generateHeaderHashToNumberIndex(ctx context.Context, tx ethdb.DbWithPending }) } -func generateHeaderTDAndCanonicalIndexes(ctx context.Context, tx ethdb.DbWithPendingMutations) error { +func generateHeaderTDAndCanonicalIndexes(ctx context.Context, tx ethdb.RwTx) error { var hash common.Hash var number uint64 var err error @@ -254,7 +266,7 @@ func generateHeaderTDAndCanonicalIndexes(ctx context.Context, tx ethdb.DbWithPen td := h.Difficulty log.Info("Generate TD index & canonical") - err = etl.Transform("Torrent post-processing 2", tx.(ethdb.HasTx).Tx().(ethdb.RwTx), dbutils.HeadersBucket, dbutils.HeaderTDBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error { + err = etl.Transform("Torrent post-processing 2", tx, dbutils.HeadersBucket, dbutils.HeaderTDBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error { header := &types.Header{} innerErr := rlp.DecodeBytes(v, header) if innerErr != nil { @@ -276,7 +288,7 @@ func generateHeaderTDAndCanonicalIndexes(ctx context.Context, tx ethdb.DbWithPen return err } log.Info("Generate TD index & canonical") - err = etl.Transform("Torrent post-processing 2", tx.(ethdb.HasTx).Tx().(ethdb.RwTx), dbutils.HeadersBucket, dbutils.HeaderCanonicalBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error { + err = etl.Transform("Torrent post-processing 2", tx, dbutils.HeadersBucket, dbutils.HeaderCanonicalBucket, os.TempDir(), func(k []byte, v []byte, next etl.ExtractNextFunc) error { return next(k, common.CopyBytes(k[:8]), common.CopyBytes(k[8:])) }, etl.IdentityLoadFunc, etl.TransformArgs{ Quit: ctx.Done(), @@ -299,30 +311,24 @@ func generateHeaderTDAndCanonicalIndexes(ctx context.Context, tx ethdb.DbWithPen return nil } -func GenerateHeaderIndexes(ctx context.Context, db ethdb.Database) error { - v, err1 := stages.GetStageProgress(db, HeadersPostProcessingStage) +func GenerateHeaderIndexes(ctx context.Context, tx ethdb.RwTx) error { + v, err1 := stages.GetStageProgress(tx, HeadersPostProcessingStage) if err1 != nil { return err1 } if v == 0 { - tx, err := db.Begin(context.Background(), ethdb.RW) - if err != nil { + if err := generateHeaderHashToNumberIndex(ctx, tx); err != nil { return err } - defer tx.Rollback() - if err = generateHeaderHashToNumberIndex(ctx, tx); err != nil { + if err := generateHeaderTDAndCanonicalIndexes(ctx, tx); err != nil { return err } - if err = generateHeaderTDAndCanonicalIndexes(ctx, tx); err != nil { - return err - } - err = stages.SaveStageProgress(tx, HeadersPostProcessingStage, 1) - if err != nil { + if err := stages.SaveStageProgress(tx, HeadersPostProcessingStage, 1); err != nil { return err1 } - return tx.Commit() + return nil } return nil } diff --git a/turbo/snapshotsync/postprocessing_test.go b/turbo/snapshotsync/postprocessing_test.go index ec47a1afcfca0a6e38d0ec05348a44d469904294..f0b89c99380f31b5cc318f8c2fca54326a9112e4 100644 --- a/turbo/snapshotsync/postprocessing_test.go +++ b/turbo/snapshotsync/postprocessing_test.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/ledgerwatch/erigon/rlp" + "github.com/stretchr/testify/require" "github.com/torquem-ch/mdbx-go/mdbx" ) @@ -41,8 +42,10 @@ func TestHeadersGenerateIndex(t *testing.T) { db := kv.NewMDBX().InMem().WithBucketsConfig(kv.DefaultBucketConfigs).MustOpen() defer db.Close() //we need genesis - err = rawdb.WriteCanonicalHash(kv.NewObjectDatabase(db), headers[0].Hash(), headers[0].Number.Uint64()) - if err != nil { + if err := db.Update(context.Background(), func(tx ethdb.RwTx) error { + return rawdb.WriteCanonicalHash(tx, headers[0].Hash(), headers[0].Number.Uint64()) + + }); err != nil { t.Fatal(err) } @@ -50,16 +53,18 @@ func TestHeadersGenerateIndex(t *testing.T) { defer snKV.Close() snKV = kv.NewSnapshotKV().HeadersSnapshot(snKV).DB(db).Open() - snDb := kv.NewObjectDatabase(snKV) - err = GenerateHeaderIndexes(context.Background(), snDb) + snTx, err := snKV.BeginRw(context.Background()) + require.NoError(t, err) + defer snTx.Rollback() + + err = GenerateHeaderIndexes(context.Background(), snTx) if err != nil { t.Fatal(err) } - snDB := kv.NewObjectDatabase(snKV) td := big.NewInt(0) for i, header := range headers { td = td.Add(td, header.Difficulty) - canonical, err1 := rawdb.ReadCanonicalHash(snDB, header.Number.Uint64()) + canonical, err1 := rawdb.ReadCanonicalHash(snTx, header.Number.Uint64()) if err1 != nil { t.Errorf("reading canonical hash for block %d: %v", header.Number.Uint64(), err1) } @@ -67,11 +72,11 @@ func TestHeadersGenerateIndex(t *testing.T) { t.Error(i, "canonical not correct", canonical) } - hasHeader := rawdb.HasHeader(snDB, header.Hash(), header.Number.Uint64()) + hasHeader := rawdb.HasHeader(snTx, header.Hash(), header.Number.Uint64()) if !hasHeader { t.Error(i, header.Hash(), header.Number.Uint64(), "not exists") } - headerNumber := rawdb.ReadHeaderNumber(snDB, header.Hash()) + headerNumber := rawdb.ReadHeaderNumber(snTx, header.Hash()) if headerNumber == nil { t.Error(i, "empty header number") } else if *headerNumber != header.Number.Uint64() { @@ -80,7 +85,7 @@ func TestHeadersGenerateIndex(t *testing.T) { if td == nil { t.Error(i, "empty td") } else { - td, err := rawdb.ReadTd(snDB, header.Hash(), header.Number.Uint64()) + td, err := rawdb.ReadTd(snTx, header.Hash(), header.Number.Uint64()) if err != nil { panic(err) } diff --git a/turbo/snapshotsync/server.go b/turbo/snapshotsync/server.go index 99e9d1d27a837f02de5d9579e053094e78e48de3..446d477c98e8f833bcf0db0ce103fd6e1dacb7cb 100644 --- a/turbo/snapshotsync/server.go +++ b/turbo/snapshotsync/server.go @@ -23,7 +23,7 @@ var ( func NewServer(dir string, seeding bool) (*SNDownloaderServer, error) { db := kv.MustOpen(dir + "/db") sn := &SNDownloaderServer{ - db: kv.NewObjectDatabase(db), + db: db, } if err := db.Update(context.Background(), func(tx ethdb.RwTx) error { peerID, err := tx.GetOne(dbutils.BittorrentInfoBucket, []byte(dbutils.BittorrentPeerID)) @@ -51,23 +51,31 @@ func NewServer(dir string, seeding bool) (*SNDownloaderServer, error) { type SNDownloaderServer struct { DownloaderServer t *Client - db ethdb.Database + db ethdb.RwKV } func (s *SNDownloaderServer) Download(ctx context.Context, request *DownloadSnapshotRequest) (*empty.Empty, error) { - err := s.t.AddSnapshotsTorrents(ctx, s.db, request.NetworkId, FromSnapshotTypes(request.Type)) - if err != nil { + if err := s.db.Update(ctx, func(tx ethdb.RwTx) error { + return s.t.AddSnapshotsTorrents(ctx, tx, request.NetworkId, FromSnapshotTypes(request.Type)) + }); err != nil { return nil, err } return &empty.Empty{}, nil } func (s *SNDownloaderServer) Load() error { - return s.t.Load(s.db) + return s.db.View(context.Background(), func(tx ethdb.Tx) error { + return s.t.Load(tx) + }) } func (s *SNDownloaderServer) Snapshots(ctx context.Context, request *SnapshotsRequest) (*SnapshotsInfoReply, error) { reply := SnapshotsInfoReply{} - resp, err := s.t.GetSnapshots(s.db, request.NetworkId) + tx, err := s.db.BeginRo(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback() + resp, err := s.t.GetSnapshots(tx, request.NetworkId) if err != nil { return nil, err } diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go index 503bfd1f44182c5c3983442c52f7279b3cf380b0..57a23f8b039b93b572f01e0d9d0416d685940666 100644 --- a/turbo/snapshotsync/wrapdb.go +++ b/turbo/snapshotsync/wrapdb.go @@ -171,45 +171,45 @@ func DownloadSnapshots(torrentClient *Client, ExternalSnapshotDownloaderAddr str } chainDb.(ethdb.HasRwKV).SetRwKV(snapshotKV) - innerErr = PostProcessing(chainDb, downloadedSnapshots) - if innerErr != nil { - return innerErr - } - } else { - err := torrentClient.Load(chainDb) - if err != nil { + if err := PostProcessing(chainDb.RwKV(), downloadedSnapshots); err != nil { return err } - err = torrentClient.AddSnapshotsTorrents(context.Background(), chainDb, networkID, snapshotMode) - if err == nil { + + } else { + if err := chainDb.RwKV().Update(context.Background(), func(tx ethdb.RwTx) error { + err := torrentClient.Load(tx) + if err != nil { + return err + } + return torrentClient.AddSnapshotsTorrents(context.Background(), tx, networkID, snapshotMode) + }); err != nil { + log.Error("There was an error in snapshot init. Swithing to regular sync", "err", err) + } else { torrentClient.Download() var innerErr error - snapshotKV := chainDb.(ethdb.HasRwKV).RwKV() - downloadedSnapshots, innerErr := torrentClient.GetSnapshots(chainDb, networkID) - if innerErr != nil { - return innerErr + var downloadedSnapshots map[SnapshotType]*SnapshotsInfo + if err := chainDb.RwKV().View(context.Background(), func(tx ethdb.Tx) (err error) { + downloadedSnapshots, err = torrentClient.GetSnapshots(tx, networkID) + if err != nil { + return err + } + return nil + }); err != nil { + return err } + snapshotKV := chainDb.(ethdb.HasRwKV).RwKV() snapshotKV, innerErr = WrapBySnapshotsFromDownloader(snapshotKV, downloadedSnapshots) if innerErr != nil { return innerErr } chainDb.(ethdb.HasRwKV).SetRwKV(snapshotKV) - tx, err := chainDb.Begin(context.Background(), ethdb.RW) - if err != nil { - return err - } - defer tx.Rollback() - innerErr = PostProcessing(chainDb, downloadedSnapshots) - if err = tx.Commit(); err != nil { + if err := PostProcessing(snapshotKV, downloadedSnapshots); err != nil { + return err } - if innerErr != nil { - return innerErr - } - } else { - log.Error("There was an error in snapshot init. Swithing to regular sync", "err", err) } + } return nil } diff --git a/turbo/trie/sync_bloom.go b/turbo/trie/sync_bloom.go deleted file mode 100644 index bbce53a404a16de61bce0a7350a2f15f3966e916..0000000000000000000000000000000000000000 --- a/turbo/trie/sync_bloom.go +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package trie - -/* -import ( - "encoding/binary" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/log" - "github.com/ledgerwatch/erigon/metrics" - - bloomfilter "github.com/holiman/bloomfilter/v2" -) - -var ( - bloomAddMeter = metrics.NewRegisteredMeter("trie/bloom/add", nil) - bloomLoadMeter = metrics.NewRegisteredMeter("trie/bloom/load", nil) - bloomTestMeter = metrics.NewRegisteredMeter("trie/bloom/test", nil) - bloomMissMeter = metrics.NewRegisteredMeter("trie/bloom/miss", nil) - bloomFaultMeter = metrics.NewRegisteredMeter("trie/bloom/fault", nil) - bloomErrorGauge = metrics.NewRegisteredGauge("trie/bloom/error", nil) -) - -// SyncBloom is a bloom filter used during fast sync to quickly decide if a trie -// node or contract code already exists on disk or not. It self populates from the -// provided disk database on creation in a background thread and will only start -// returning live results once that's finished. -type SyncBloom struct { - bloom *bloomfilter.Filter - inited uint32 - closer sync.Once - closed uint32 - pend sync.WaitGroup -} - -// NewSyncBloom creates a new bloom filter of the given size (in megabytes) and -// initializes it from the database. The bloom is hard coded to use 3 filters. -func NewSyncBloom(memory uint64, database ethdb.Database) *SyncBloom { - // Create the bloom filter to track known trie nodes - bloom, err := bloomfilter.New(memory*1024*1024*8, 4) - if err != nil { - panic(fmt.Sprintf("failed to create bloom: %v", err)) - } - log.Info("Allocated fast sync bloom", "size", common.StorageSize(memory*1024*1024)) - - // Assemble the fast sync bloom and init it from previous sessions - b := &SyncBloom{ - bloom: bloom, - } - b.pend.Add(2) - go func() { - defer b.pend.Done() - b.init(database) - }() - go func() { - defer b.pend.Done() - b.meter() - }() - return b -} - -// init iterates over the database, pushing every trie hash into the bloom filter. -func (b *SyncBloom) init(database ethdb.Database) { - // Iterate over the database, but restart every now and again to avoid holding - // a persistent snapshot since fast sync can push a ton of data concurrently, - // bloating the disk. - // - // Note, this is fine, because everything inserted into leveldb by fast sync is - // also pushed into the bloom directly, so we're not missing anything when the - // iterator is swapped out for a new one. - var ( - start = time.Now() - //swap = time.Now() - ) - if atomic.LoadUint32(&b.closed) == 0 { - _ = database.Walk(dbutils.CurrentStateBucket, []byte{}, 0, func(key, val []byte) (bool, error) { - // If the database entry is a trie node, add it to the bloom - if len(key) == common.HashLength { - b.bloom.Add(syncBloomHasher(common.CopyBytes(key))) - bloomLoadMeter.Mark(1) - } - return true, nil - // FIXME: restore or remove in Erigon - // If enough time elapsed since the last iterator swap, restart - //if time.Since(swap) > 8*time.Second { - // key := common.CopyBytes(it.Key()) - // - // it.Release() - // it = database.NewIteratorWithStart(key) - // - // log.Info("Initializing fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", common.PrettyDuration(time.Since(start))) - // swap = time.Now() - //} - }) - } - - // Mark the bloom filter inited and return - log.Info("Initialized state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability(), "elapsed", common.PrettyDuration(time.Since(start))) - atomic.StoreUint32(&b.inited, 1) -} - -// meter periodically recalculates the false positive error rate of the bloom -// filter and reports it in a metric. -func (b *SyncBloom) meter() { - for { - // Report the current error ration. No floats, lame, scale it up. - bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000)) - - // Wait one second, but check termination more frequently - for i := 0; i < 10; i++ { - if atomic.LoadUint32(&b.closed) == 1 { - return - } - time.Sleep(100 * time.Millisecond) - } - } -} - -// Close terminates any background initializer still running and releases all the -// memory allocated for the bloom. -func (b *SyncBloom) Close() error { - b.closer.Do(func() { - // Ensure the initializer is stopped - atomic.StoreUint32(&b.closed, 1) - b.pend.Wait() - - // Wipe the bloom, but mark it "uninited" just in case someone attempts an access - log.Info("Deallocated state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability()) - - atomic.StoreUint32(&b.inited, 0) - b.bloom = nil - }) - return nil -} - -// Add inserts a new trie node hash into the bloom filter. -func (b *SyncBloom) Add(hash []byte) { - if atomic.LoadUint32(&b.closed) == 1 { - return - } - b.bloom.AddHash(binary.BigEndian.Uint64(hash)) - bloomAddMeter.Mark(1) -} - -// Contains tests if the bloom filter contains the given hash: -// - false: the bloom definitely does not contain hash -// - true: the bloom maybe contains hash -// -// While the bloom is being initialized, any query will return true. -func (b *SyncBloom) Contains(hash []byte) bool { - bloomTestMeter.Mark(1) - if atomic.LoadUint32(&b.inited) == 0 { - // We didn't load all the trie nodes from the previous run of Geth yet. As - // such, we can't say for sure if a hash is not present for anything. Until - // the init is done, we're faking "possible presence" for everything. - return true - } - // Bloom initialized, check the real one and report any successful misses - maybe := b.bloom.ContainsHash(binary.BigEndian.Uint64(hash)) - if !maybe { - bloomMissMeter.Mark(1) - } - return maybe -} - -// errorRate calculates the probability of a random containment test returning a -// false positive. -// -// We're calculating it ourselves because the bloom library we used missed a -// parentheses in the formula and calculates it wrong. And it's discontinued... -func (b *SyncBloom) errorRate() float64 { - k := float64(b.bloom.K()) - n := float64(b.bloom.N()) - m := float64(b.bloom.M()) - - return math.Pow(1.0-math.Exp((-k)*(n+0.5)/(m-1)), k) -} - -*/