diff --git a/cmd/erigon/main.go b/cmd/erigon/main.go index 93400c9f0d9f59fee50c5829c9eb24244d114a40..a348ad84eb199d4a6194e47bc74b8e89693a231c 100644 --- a/cmd/erigon/main.go +++ b/cmd/erigon/main.go @@ -1,12 +1,15 @@ package main import ( + "context" "fmt" - "net" "os" "github.com/ledgerwatch/erigon/cmd/utils" + "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" + "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/ethdb" "github.com/ledgerwatch/erigon/log" "github.com/ledgerwatch/erigon/params" erigoncli "github.com/ledgerwatch/erigon/turbo/cli" @@ -25,19 +28,30 @@ func main() { } func runErigon(cliCtx *cli.Context) { - // creating staged sync with all default parameters - ctx, _ := utils.RootContext() - // initializing the node and providing the current git commit there log.Info("Build info", "git_branch", params.GitBranch, "git_tag", params.GitTag, "git_commit", params.GitCommit) - eri := node.New(cliCtx, node.Params{}) - eri.SetP2PListenFunc(func(network, addr string) (net.Listener, error) { - var lc net.ListenConfig - return lc.Listen(ctx, network, addr) - }) - // running the node - err := eri.Serve() + nodeCfg := node.NewNodConfigUrfave(cliCtx) + ethCfg := node.NewEthConfigUrfave(cliCtx, nodeCfg) + if cliCtx.GlobalIsSet(utils.DataDirFlag.Name) { + // Check if we have an already initialized chain and fall back to + // that if so. Otherwise we need to generate a new genesis spec. + chaindb := utils.MakeChainDatabase(nodeCfg) + if err := chaindb.View(context.Background(), func(tx ethdb.Tx) error { + h, err := rawdb.ReadCanonicalHash(tx, 0) + if err != nil { + panic(err) + } + if h != (common.Hash{}) { + ethCfg.Genesis = nil // fallback to db content + } + return nil + }); err != nil { + panic(err) + } + chaindb.Close() + } + err := node.New(nodeCfg, ethCfg).Serve() if err != nil { log.Error("error while serving a Erigon node", "err", err) } diff --git a/cmd/erigoncustom/main.go b/cmd/erigoncustom/main.go index 81f3378f955b93a02ccfd4d3d1db598275efd61e..f3d7f9db9241e99c623bb63dead8071e6e5a948e 100644 --- a/cmd/erigoncustom/main.go +++ b/cmd/erigoncustom/main.go @@ -4,10 +4,6 @@ import ( "fmt" "os" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/log" - "github.com/ledgerwatch/erigon/turbo/node" - erigoncli "github.com/ledgerwatch/erigon/turbo/cli" "github.com/urfave/cli" @@ -21,7 +17,7 @@ var flag = cli.StringFlag{ // defining a custom bucket name const ( - customBucketName = "ch.torquem.demo.tgcustom.CUSTOM_BUCKET" + customBucketName = "ch.torquem.demo.tgcustom.CUSTOM_BUCKET" //nolint ) // the regular main function @@ -39,15 +35,15 @@ func main() { // Erigon main function func runErigon(ctx *cli.Context) { // running a node and initializing a custom bucket with all default settings - eri := node.New(ctx, node.Params{ - CustomBuckets: map[string]dbutils.BucketConfigItem{ - customBucketName: {}, - }, - }) + //eri := node.New(ctx, node.Params{ + // CustomBuckets: map[string]dbutils.BucketConfigItem{ + // customBucketName: {}, + // }, + //}) - err := eri.Serve() + //err := eri.Serve() - if err != nil { - log.Error("error while serving a Erigon node", "err", err) - } + //if err != nil { + // log.Error("error while serving a Erigon node", "err", err) + //} } diff --git a/cmd/integration/commands/root.go b/cmd/integration/commands/root.go index 6d78a063357bf90fc19a528a3017c69f963348db..8f1a2b6f64e6bbc0e83b03292afbacbf81618e5f 100644 --- a/cmd/integration/commands/root.go +++ b/cmd/integration/commands/root.go @@ -38,24 +38,24 @@ func RootCommand() *cobra.Command { func openDB(path string, applyMigrations bool) ethdb.RwKV { label := ethdb.Chain - db := kv2.NewObjectDatabase(openKV(label, path, false)) + db := openKV(label, path, false) if applyMigrations { - has, err := migrations.NewMigrator(label).HasPendingMigrations(db.RwKV()) + has, err := migrations.NewMigrator(label).HasPendingMigrations(db) if err != nil { panic(err) } if has { log.Info("Re-Opening DB in exclusive mode to apply DB migrations") db.Close() - db = kv2.NewObjectDatabase(openKV(label, path, true)) + db = openKV(label, path, true) if err := migrations.NewMigrator(label).Apply(db, datadir); err != nil { panic(err) } db.Close() - db = kv2.NewObjectDatabase(openKV(label, path, false)) + db = openKV(label, path, false) } } - return db.RwKV() + return db } func openKV(label ethdb.Label, path string, exclusive bool) ethdb.RwKV { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c210a072ddf1bbd99b43cb73af2a5d080b62563f..f934d109ab93347efd1f6d5bf786b947d04bc1b5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -31,7 +31,6 @@ import ( "text/template" "github.com/ledgerwatch/erigon/eth/protocols/eth" - "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/urfave/cli" @@ -40,7 +39,6 @@ import ( "github.com/ledgerwatch/erigon/common/paths" "github.com/ledgerwatch/erigon/consensus/ethash" "github.com/ledgerwatch/erigon/core" - "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/gasprice" @@ -1185,18 +1183,18 @@ func CheckExclusive(ctx *cli.Context, args ...interface{}) { } // SetEthConfig applies eth-related command line flags to the config. -func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { +func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Config) { CheckExclusive(ctx, MinerSigningKeyFlag, MinerEtherbaseFlag) setEtherbase(ctx, cfg) setGPO(ctx, &cfg.GPO) setTxPool(ctx, &cfg.TxPool) - setEthash(ctx, stack.Config().DataDir, cfg) - setClique(ctx, &cfg.Clique, stack.Config().DataDir) - setAuRa(ctx, &cfg.Aura, stack.Config().DataDir) + setEthash(ctx, nodeConfig.DataDir, cfg) + setClique(ctx, &cfg.Clique, nodeConfig.DataDir) + setAuRa(ctx, &cfg.Aura, nodeConfig.DataDir) setMiner(ctx, &cfg.Miner) setWhitelist(ctx, cfg) - cfg.P2PEnabled = len(stack.Config().P2P.SentryAddr) == 0 + cfg.P2PEnabled = len(nodeConfig.P2P.SentryAddr) == 0 if ctx.GlobalIsSet(NetworkIdFlag.Name) { cfg.NetworkID = ctx.GlobalUint64(NetworkIdFlag.Name) @@ -1292,19 +1290,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { // Create a new developer genesis block or reuse existing one cfg.Genesis = core.DeveloperGenesisBlock(uint64(ctx.GlobalInt(DeveloperPeriodFlag.Name)), developer) - if ctx.GlobalIsSet(DataDirFlag.Name) { - // Check if we have an already initialized chain and fall back to - // that if so. Otherwise we need to generate a new genesis spec. - chaindb := MakeChainDatabase(ctx, stack) - h, err := rawdb.ReadCanonicalHash(chaindb, 0) - if err != nil { - panic(err) - } - if h != (common.Hash{}) { - cfg.Genesis = nil // fallback to db content - } - chaindb.Close() - } if !ctx.GlobalIsSet(MinerGasPriceFlag.Name) { cfg.Miner.GasPrice = big.NewInt(1) } @@ -1343,8 +1328,8 @@ func SplitTagsFlag(tagsFlag string) map[string]string { } // MakeChainDatabase open a database using the flags passed to the client and will hard crash if it fails. -func MakeChainDatabase(ctx *cli.Context, stack *node.Node) *kv.ObjectDatabase { - chainDb, err := stack.OpenDatabase(ethdb.Chain, stack.Config().DataDir) +func MakeChainDatabase(cfg *node.Config) ethdb.RwKV { + chainDb, err := node.OpenDatabase(cfg, ethdb.Chain) if err != nil { Fatalf("Could not open database: %v", err) } diff --git a/eth/backend.go b/eth/backend.go index a8611889c059af8cf7207994c12407033fe24b3e..b1fcac9069efa2952503cf33eddee88357f51b49 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -127,9 +127,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } // Assemble the Ethereum object - var chainDb ethdb.Database - var err error - chainDb, err = stack.OpenDatabase(ethdb.Chain, stack.Config().DataDir) + chainKv, err := node.OpenDatabase(stack.Config(), ethdb.Chain) if err != nil { return nil, err } @@ -137,33 +135,41 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { var torrentClient *snapshotsync.Client snapshotsDir := stack.Config().ResolvePath("snapshots") if config.SnapshotLayout { - v, err := chainDb.Get(dbutils.BittorrentInfoBucket, []byte(dbutils.BittorrentPeerID)) - if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { + var peerID string + if err = chainKv.View(context.Background(), func(tx ethdb.Tx) error { + v, err := tx.GetOne(dbutils.BittorrentInfoBucket, []byte(dbutils.BittorrentPeerID)) + if err != nil { + return err + } + peerID = string(v) + return nil + }); err != nil { log.Error("Get bittorrent peer", "err", err) } - torrentClient, err = snapshotsync.New(snapshotsDir, config.SnapshotSeeding, string(v)) + torrentClient, err = snapshotsync.New(snapshotsDir, config.SnapshotSeeding, peerID) if err != nil { return nil, err } - if len(v) == 0 { + if len(peerID) == 0 { log.Info("Generate new bittorent peerID", "id", common.Bytes2Hex(torrentClient.PeerID())) - err = torrentClient.SavePeerID(chainDb) - if err != nil { + if err = chainKv.Update(context.Background(), func(tx ethdb.RwTx) error { + return torrentClient.SavePeerID(tx) + }); err != nil { log.Error("Bittorrent peerID haven't saved", "err", err) } } - err = snapshotsync.WrapSnapshots(chainDb, snapshotsDir) + chainKv, err = snapshotsync.WrapSnapshots(chainKv, snapshotsDir) if err != nil { return nil, err } - err = snapshotsync.SnapshotSeeding(chainDb, torrentClient, "headers", snapshotsDir) + err = snapshotsync.SnapshotSeeding(chainKv, torrentClient, "headers", snapshotsDir) if err != nil { return nil, err } } - chainConfig, genesis, genesisErr := core.CommitGenesisBlock(chainDb.RwKV(), config.Genesis, config.StorageMode.History) + chainConfig, genesis, genesisErr := core.CommitGenesisBlock(chainKv, config.Genesis, config.StorageMode.History) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { return nil, genesisErr } @@ -171,7 +177,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { backend := &Ethereum{ config: config, - chainKV: chainDb.(ethdb.HasRwKV).RwKV(), + chainKV: chainKv, networkID: config.NetworkID, etherbase: config.Miner.Etherbase, torrentClient: torrentClient, @@ -198,7 +204,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Info("Initialising Ethereum protocol", "network", config.NetworkID) - if err := chainDb.RwKV().Update(context.Background(), func(tx ethdb.RwTx) error { + if err := chainKv.Update(context.Background(), func(tx ethdb.RwTx) error { if err := ethdb.SetStorageModeIfNotExist(tx, config.StorageMode); err != nil { return err } @@ -230,13 +236,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } - backend.txPool = core.NewTxPool(config.TxPool, chainConfig, chainDb.RwKV()) + backend.txPool = core.NewTxPool(config.TxPool, chainConfig, chainKv) // setting notifier to support streaming events to rpc daemon backend.events = remotedbserver.NewEvents() var mg *snapshotsync.SnapshotMigrator if config.SnapshotLayout { - currentSnapshotBlock, currentInfohash, err := snapshotsync.GetSnapshotInfo(chainDb.RwKV()) + currentSnapshotBlock, currentInfohash, err := snapshotsync.GetSnapshotInfo(chainKv) if err != nil { return nil, err } @@ -372,7 +378,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { backend.sentryServers = append(backend.sentryServers, server65) backend.sentries = append(backend.sentries, remote.NewSentryClientDirect(eth.ETH65, server65)) } - backend.downloadServer, err = download.NewControlServer(chainDb.RwKV(), stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, backend.sentries, config.BlockDownloaderWindow) + backend.downloadServer, err = download.NewControlServer(chainKv, stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, backend.sentries, config.BlockDownloaderWindow) if err != nil { return nil, err } diff --git a/migrations/header_prefix_test.go b/migrations/header_prefix_test.go index 8969a7f5d78fd5b6c4a8874b7c120e66d28fed5e..4b3c962d592ce6599aa928a8008d95e4f3ca9aee 100644 --- a/migrations/header_prefix_test.go +++ b/migrations/header_prefix_test.go @@ -17,9 +17,9 @@ import ( func TestHeaderPrefix(t *testing.T) { require := require.New(t) - db := kv.NewTestDB(t) + db := kv.NewTestKV(t) - err := db.RwKV().Update(context.Background(), func(tx ethdb.RwTx) error { + err := db.Update(context.Background(), func(tx ethdb.RwTx) error { err := tx.(ethdb.BucketMigrator).CreateBucket(dbutils.HeaderPrefixOld) if err != nil { return err @@ -48,31 +48,37 @@ func TestHeaderPrefix(t *testing.T) { require.NoError(err) num := 0 - err = db.Walk(dbutils.HeaderCanonicalBucket, []byte{}, 0, func(k, v []byte) (bool, error) { - require.Len(k, 8) - bytes.Equal(v, common.Hash{uint8(binary.BigEndian.Uint64(k))}.Bytes()) - num++ - return true, nil + err = db.View(context.Background(), func(tx ethdb.Tx) error { + return tx.ForEach(dbutils.HeaderCanonicalBucket, []byte{}, func(k, v []byte) error { + require.Len(k, 8) + bytes.Equal(v, common.Hash{uint8(binary.BigEndian.Uint64(k))}.Bytes()) + num++ + return nil + }) }) require.NoError(err) require.Equal(num, 10) num = 0 - err = db.Walk(dbutils.HeaderTDBucket, []byte{}, 0, func(k, v []byte) (bool, error) { - require.Len(k, 40) - bytes.Equal(v, []byte{uint8(binary.BigEndian.Uint64(k))}) - num++ - return true, nil + err = db.View(context.Background(), func(tx ethdb.Tx) error { + return tx.ForEach(dbutils.HeaderTDBucket, []byte{}, func(k, v []byte) error { + require.Len(k, 40) + bytes.Equal(v, []byte{uint8(binary.BigEndian.Uint64(k))}) + num++ + return nil + }) }) require.NoError(err) require.Equal(num, 10) num = 0 - err = db.Walk(dbutils.HeadersBucket, []byte{}, 0, func(k, v []byte) (bool, error) { - require.Len(k, 40) - bytes.Equal(v, []byte("header "+strconv.Itoa(int(binary.BigEndian.Uint64(k))))) - num++ - return true, nil + err = db.View(context.Background(), func(tx ethdb.Tx) error { + return tx.ForEach(dbutils.HeadersBucket, []byte{}, func(k, v []byte) error { + require.Len(k, 40) + bytes.Equal(v, []byte("header "+strconv.Itoa(int(binary.BigEndian.Uint64(k))))) + num++ + return nil + }) }) require.NoError(err) require.Equal(num, 10) diff --git a/migrations/migrations.go b/migrations/migrations.go index 220c07eab03e363a25ec29669cdb1026281e87a4..bdcc50ce8ced6b74a17eb7d5f245652524fc757f 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/common/etl" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb" + kv2 "github.com/ledgerwatch/erigon/ethdb/kv" "github.com/ledgerwatch/erigon/log" "github.com/ugorji/go/codec" ) @@ -145,13 +146,13 @@ func (m *Migrator) PendingMigrations(tx ethdb.Tx) ([]Migration, error) { return pending, nil } -func (m *Migrator) Apply(db ethdb.Database, datadir string) error { +func (m *Migrator) Apply(kv ethdb.RwKV, datadir string) error { if len(m.Migrations) == 0 { return nil } var applied map[string][]byte - if err := db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + if err := kv.View(context.Background(), func(tx ethdb.Tx) error { var err error applied, err = AppliedMigrations(tx, false) return err @@ -159,6 +160,7 @@ func (m *Migrator) Apply(db ethdb.Database, datadir string) error { return err } + db := kv2.NewObjectDatabase(kv) tx, err1 := db.Begin(context.Background(), ethdb.RW) if err1 != nil { return err1 diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go index 8c39813587d3041ef14942a6ce8121f072572907..c31027b6bee1a75ef10549eb32e32b4841a6060e 100644 --- a/migrations/migrations_test.go +++ b/migrations/migrations_test.go @@ -15,7 +15,7 @@ import ( ) func TestApplyWithInit(t *testing.T) { - require, db := require.New(t), kv.NewTestDB(t) + require, db := require.New(t), kv.NewTestKV(t) m := []Migration{ { "one", @@ -36,7 +36,7 @@ func TestApplyWithInit(t *testing.T) { err := migrator.Apply(db, "") require.NoError(err) var applied map[string][]byte - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied, err = AppliedMigrations(tx, false) require.NoError(err) @@ -51,7 +51,7 @@ func TestApplyWithInit(t *testing.T) { // apply again err = migrator.Apply(db, "") require.NoError(err) - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied2, err := AppliedMigrations(tx, false) require.NoError(err) require.Equal(applied, applied2) @@ -61,7 +61,7 @@ func TestApplyWithInit(t *testing.T) { } func TestApplyWithoutInit(t *testing.T) { - require, db := require.New(t), kv.NewTestDB(t) + require, db := require.New(t), kv.NewTestKV(t) m := []Migration{ { "one", @@ -77,7 +77,9 @@ func TestApplyWithoutInit(t *testing.T) { }, }, } - err := db.Put(dbutils.Migrations, []byte(m[0].Name), []byte{1}) + err := db.Update(context.Background(), func(tx ethdb.RwTx) error { + return tx.Put(dbutils.Migrations, []byte(m[0].Name), []byte{1}) + }) require.NoError(err) migrator := NewMigrator(ethdb.Chain) @@ -86,7 +88,7 @@ func TestApplyWithoutInit(t *testing.T) { require.NoError(err) var applied map[string][]byte - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied, err = AppliedMigrations(tx, false) require.NoError(err) @@ -103,7 +105,7 @@ func TestApplyWithoutInit(t *testing.T) { err = migrator.Apply(db, "") require.NoError(err) - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied2, err := AppliedMigrations(tx, false) require.NoError(err) require.Equal(applied, applied2) @@ -114,7 +116,7 @@ func TestApplyWithoutInit(t *testing.T) { } func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) { - require, db := require.New(t), kv.NewTestDB(t) + require, db := require.New(t), kv.NewTestKV(t) m := []Migration{ { "one", @@ -130,7 +132,9 @@ func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) { }, }, } - err := db.Put(dbutils.Migrations, []byte(m[1].Name), []byte{1}) // apply non-first migration + err := db.Update(context.Background(), func(tx ethdb.RwTx) error { + return tx.Put(dbutils.Migrations, []byte(m[1].Name), []byte{1}) // apply non-first migration + }) require.NoError(err) migrator := NewMigrator(ethdb.Chain) @@ -139,7 +143,7 @@ func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) { require.NoError(err) var applied map[string][]byte - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied, err = AppliedMigrations(tx, false) require.NoError(err) @@ -155,7 +159,7 @@ func TestWhenNonFirstMigrationAlreadyApplied(t *testing.T) { // apply again err = migrator.Apply(db, "") require.NoError(err) - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied2, err := AppliedMigrations(tx, false) require.NoError(err) require.Equal(applied, applied2) @@ -183,7 +187,7 @@ func TestMarshalStages(t *testing.T) { } func TestValidation(t *testing.T) { - require, db := require.New(t), kv.NewTestDB(t) + require, db := require.New(t), kv.NewTestKV(t) m := []Migration{ { Name: "repeated_name", @@ -204,7 +208,7 @@ func TestValidation(t *testing.T) { require.True(errors.Is(err, ErrMigrationNonUniqueName)) var applied map[string][]byte - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied, err = AppliedMigrations(tx, false) require.NoError(err) require.Equal(0, len(applied)) @@ -214,7 +218,7 @@ func TestValidation(t *testing.T) { } func TestCommitCallRequired(t *testing.T) { - require, db := require.New(t), kv.NewTestDB(t) + require, db := require.New(t), kv.NewTestKV(t) m := []Migration{ { Name: "one", @@ -229,7 +233,7 @@ func TestCommitCallRequired(t *testing.T) { require.True(errors.Is(err, ErrMigrationCommitNotCalled)) var applied map[string][]byte - err = db.RwKV().View(context.Background(), func(tx ethdb.Tx) error { + err = db.View(context.Background(), func(tx ethdb.Tx) error { applied, err = AppliedMigrations(tx, false) require.NoError(err) require.Equal(0, len(applied)) diff --git a/node/node.go b/node/node.go index 0f479ac69005eaddf3745ab10c5506e1789911a7..333364ec50c66afab62b5d7533081d2b58e11d01 100644 --- a/node/node.go +++ b/node/node.go @@ -506,7 +506,7 @@ func (n *Node) WSEndpoint() string { // OpenDatabase opens an existing database with the given name (or creates one if no // previous can be found) from within the node's instance directory. If the node is // ephemeral, a memory database is returned. -func (n *Node) OpenDatabase(label ethdb.Label, datadir string) (*kv2.ObjectDatabase, error) { +func (n *Node) OpenDatabase(label ethdb.Label, datadir string) (ethdb.RwKV, error) { n.lock.Lock() defer n.lock.Unlock() @@ -523,17 +523,17 @@ func (n *Node) OpenDatabase(label ethdb.Label, datadir string) (*kv2.ObjectDatab default: name = "test" } - var db *kv2.ObjectDatabase + var db ethdb.RwKV if n.config.DataDir == "" { - db = kv2.NewMemDatabase() + db = kv2.NewMemKV() n.databases = append(n.databases, db) return db, nil } dbPath := n.config.ResolvePath(name) - var openFunc func(exclusive bool) (*kv2.ObjectDatabase, error) + var openFunc func(exclusive bool) (ethdb.RwKV, error) log.Info("Opening Database", "label", name) - openFunc = func(exclusive bool) (*kv2.ObjectDatabase, error) { + openFunc = func(exclusive bool) (ethdb.RwKV, error) { opts := kv2.NewMDBX().Path(dbPath).Label(label).DBVerbosity(n.config.DatabaseVerbosity) if exclusive { opts = opts.Exclusive() @@ -542,7 +542,7 @@ func (n *Node) OpenDatabase(label ethdb.Label, datadir string) (*kv2.ObjectDatab if err1 != nil { return nil, err1 } - return kv2.NewObjectDatabase(kv), nil + return kv, nil } var err error db, err = openFunc(false) @@ -550,7 +550,7 @@ func (n *Node) OpenDatabase(label ethdb.Label, datadir string) (*kv2.ObjectDatab return nil, err } migrator := migrations.NewMigrator(label) - has, err := migrator.HasPendingMigrations(db.RwKV()) + has, err := migrator.HasPendingMigrations(db) if err != nil { return nil, err } @@ -575,6 +575,66 @@ func (n *Node) OpenDatabase(label ethdb.Label, datadir string) (*kv2.ObjectDatab return db, nil } +func OpenDatabase(config *Config, label ethdb.Label) (ethdb.RwKV, error) { + var name string + switch label { + case ethdb.Chain: + name = "chaindata" + case ethdb.TxPool: + name = "txpool" + default: + name = "test" + } + var db ethdb.RwKV + if config.DataDir == "" { + db = kv2.NewMemKV() + return db, nil + } + dbPath := config.ResolvePath(name) + + var openFunc func(exclusive bool) (ethdb.RwKV, error) + log.Info("Opening Database", "label", name) + openFunc = func(exclusive bool) (ethdb.RwKV, error) { + opts := kv2.NewMDBX().Path(dbPath).Label(label).DBVerbosity(config.DatabaseVerbosity) + if exclusive { + opts = opts.Exclusive() + } + kv, err1 := opts.Open() + if err1 != nil { + return nil, err1 + } + return kv, nil + } + var err error + db, err = openFunc(false) + if err != nil { + return nil, err + } + migrator := migrations.NewMigrator(label) + has, err := migrator.HasPendingMigrations(db) + if err != nil { + return nil, err + } + if has { + log.Info("Re-Opening DB in exclusive mode to apply migrations") + db.Close() + db, err = openFunc(true) + if err != nil { + return nil, err + } + if err = migrator.Apply(db, config.DataDir); err != nil { + return nil, err + } + db.Close() + db, err = openFunc(false) + if err != nil { + return nil, err + } + } + + return db, nil +} + // ResolvePath returns the absolute path of a resource in the instance directory. func (n *Node) ResolvePath(x string) string { return n.config.ResolvePath(x) diff --git a/node/node_test.go b/node/node_test.go index 6465b5cfece6761469ce24a1f6f17f68c0c71f43..caf4b4efcf4a20fc9c083bc582feadf004850c77 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -17,6 +17,7 @@ package node import ( + "context" "errors" "fmt" "io" @@ -174,12 +175,16 @@ func TestNodeCloseClosesDB(t *testing.T) { if err != nil { t.Fatal("can't open DB:", err) } - if err = db.Put(dbutils.HashedAccountsBucket, []byte("testK"), []byte{}); err != nil { + if err = db.Update(context.Background(), func(tx ethdb.RwTx) error { + return tx.Put(dbutils.HashedAccountsBucket, []byte("testK"), []byte{}) + }); err != nil { t.Fatal("can't Put on open DB:", err) } stack.Close() - if err = db.Put(dbutils.HashedAccountsBucket, []byte("testK"), []byte{}); err == nil { + if err = db.Update(context.Background(), func(tx ethdb.RwTx) error { + return tx.Put(dbutils.HashedAccountsBucket, []byte("testK"), []byte{}) + }); err == nil { t.Fatal("Put succeeded after node is closed") } } @@ -193,7 +198,7 @@ func TestNodeOpenDatabaseFromLifecycleStart(t *testing.T) { stack, _ := New(testNodeConfig()) defer stack.Close() - var db ethdb.Database + var db ethdb.RwKV var err error stack.RegisterLifecycle(&InstrumentedService{ startHook: func() { diff --git a/turbo/node/buckets.go b/turbo/node/buckets.go index 95c9c5fe6bf22e88ded5d9c098f4276bd23263dd..48b42d8568a58ab1148921d7585a84a18c8ce345 100644 --- a/turbo/node/buckets.go +++ b/turbo/node/buckets.go @@ -6,6 +6,7 @@ import ( "github.com/ledgerwatch/erigon/common/dbutils" ) +//nolint func prepareBuckets(customBuckets dbutils.BucketsCfg) { if len(customBuckets) == 0 { return diff --git a/turbo/node/node.go b/turbo/node/node.go index 33b04fe828fde9120067e5ef12a5ba805be24185..73e27bdc01bbb024eb4e0593be4422e833c831a9 100644 --- a/turbo/node/node.go +++ b/turbo/node/node.go @@ -2,15 +2,11 @@ package node import ( - "net" - "time" - "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/eth" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/log" - "github.com/ledgerwatch/erigon/metrics" "github.com/ledgerwatch/erigon/node" "github.com/ledgerwatch/erigon/params" erigoncli "github.com/ledgerwatch/erigon/turbo/cli" @@ -25,10 +21,6 @@ type ErigonNode struct { backend *eth.Ethereum } -func (eri *ErigonNode) SetP2PListenFunc(listenFunc func(network, addr string) (net.Listener, error)) { - eri.stack.SetP2PListenFunc(listenFunc) -} - // Serve runs the node and blocks the execution. It returns when the node is existed. func (eri *ErigonNode) Serve() error { defer eri.stack.Close() @@ -56,26 +48,53 @@ type Params struct { CustomBuckets dbutils.BucketsCfg } +func NewNodConfigUrfave(ctx *cli.Context) *node.Config { + // If we're running a known preset, log it for convenience. + chain := ctx.GlobalString(utils.ChainFlag.Name) + switch chain { + case params.RopstenChainName: + log.Info("Starting Erigon on Ropsten testnet...") + + case params.RinkebyChainName: + log.Info("Starting Erigon on Rinkeby testnet...") + + case params.GoerliChainName: + log.Info("Starting Erigon on Görli testnet...") + + case params.DevChainName: + log.Info("Starting Erigon in ephemeral dev mode...") + + case "", params.MainnetChainName: + if !ctx.GlobalIsSet(utils.NetworkIdFlag.Name) { + log.Info("Starting Erigon on Ethereum mainnet...") + } + default: + log.Info("Starting Erigon on", "devnet", chain) + } + + nodeConfig := NewNodeConfig() + utils.SetNodeConfig(ctx, nodeConfig) + erigoncli.ApplyFlagsForNodeConfig(ctx, nodeConfig) + return nodeConfig +} +func NewEthConfigUrfave(ctx *cli.Context, nodeConfig *node.Config) *ethconfig.Config { + ethConfig := ðconfig.Defaults + utils.SetEthConfig(ctx, nodeConfig, ethConfig) + erigoncli.ApplyFlagsForEthConfig(ctx, ethConfig) + return ethConfig +} + // New creates a new `ErigonNode`. // * ctx - `*cli.Context` from the main function. Necessary to be able to configure the node based on the command-line flags // * sync - `stagedsync.StagedSync`, an instance of staged sync, setup just as needed. // * optionalParams - additional parameters for running a node. func New( - ctx *cli.Context, - optionalParams Params, + nodeConfig *node.Config, + ethConfig *ethconfig.Config, ) *ErigonNode { - prepareBuckets(optionalParams.CustomBuckets) - prepare(ctx) - - nodeConfig := NewNodeConfig() - utils.SetNodeConfig(ctx, nodeConfig) - erigoncli.ApplyFlagsForNodeConfig(ctx, nodeConfig) - + //prepareBuckets(optionalParams.CustomBuckets) node := makeConfigNode(nodeConfig) - ethConfig := makeEthConfig(ctx, node) - ethereum := RegisterEthService(node, ethConfig) - return &ErigonNode{stack: node, backend: ethereum} } @@ -88,13 +107,6 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) *eth.Ethereum { return backend } -func makeEthConfig(ctx *cli.Context, node *node.Node) *ethconfig.Config { - ethConfig := ðconfig.Defaults - utils.SetEthConfig(ctx, node, ethConfig) - erigoncli.ApplyFlagsForEthConfig(ctx, ethConfig) - return ethConfig -} - func NewNodeConfig() *node.Config { nodeConfig := node.DefaultConfig // see simiar changes in `cmd/geth/config.go#defaultNodeConfig` @@ -116,33 +128,3 @@ func makeConfigNode(config *node.Config) *node.Node { return stack } - -// prepare manipulates memory cache allowance and setups metric system. -// This function should be called before launching devp2p stack. -func prepare(ctx *cli.Context) { - // If we're running a known preset, log it for convenience. - chain := ctx.GlobalString(utils.ChainFlag.Name) - switch chain { - case params.RopstenChainName: - log.Info("Starting Erigon on Ropsten testnet...") - - case params.RinkebyChainName: - log.Info("Starting Erigon on Rinkeby testnet...") - - case params.GoerliChainName: - log.Info("Starting Erigon on Görli testnet...") - - case params.DevChainName: - log.Info("Starting Erigon in ephemeral dev mode...") - - case "", params.MainnetChainName: - if !ctx.GlobalIsSet(utils.NetworkIdFlag.Name) { - log.Info("Starting Erigon on Ethereum mainnet...") - } - default: - log.Info("Starting Erigon on", "devnet", chain) - } - - // Start system runtime metrics collection - go metrics.CollectProcessMetrics(10 * time.Second) -} diff --git a/turbo/snapshotsync/downloader.go b/turbo/snapshotsync/downloader.go index 1d8dc184c8a9c6c7c505a8ed0efe561df49e1d6f..158d71437fa37edb7051c95e467ca2c9313b74b0 100644 --- a/turbo/snapshotsync/downloader.go +++ b/turbo/snapshotsync/downloader.go @@ -374,20 +374,35 @@ func ParseInfoHashKey(k []byte) (uint64, string) { return binary.BigEndian.Uint64(k), string(bytes.TrimPrefix(k[8:], []byte(SnapshotInfoHashPrefix))) } -func SnapshotSeeding(chainDB ethdb.Database, cli *Client, name string, snapshotsDir string) error { - snapshotBlock, err := chainDB.Get(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotBlock) - if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { +func GetInfo() { + +} + +func SnapshotSeeding(chainDB ethdb.RwKV, cli *Client, name string, snapshotsDir string) error { + var snapshotBlock uint64 + var hasSnapshotBlock bool + if err := chainDB.View(context.Background(), func(tx ethdb.Tx) error { + v, err := tx.GetOne(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotBlock) + if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { + return err + } + hasSnapshotBlock = len(v) == 8 + if hasSnapshotBlock { + snapshotBlock = binary.BigEndian.Uint64(v) + } else { + log.Warn("Snapshot block unknown", "snapshot", name, "v", common.Bytes2Hex(v)) + } + return nil + }); err != nil { return err } - if len(snapshotBlock) == 8 { - hash, err := cli.SeedSnapshot(name, SnapshotName(snapshotsDir, name, binary.BigEndian.Uint64(snapshotBlock))) + if hasSnapshotBlock { + hash, err := cli.SeedSnapshot(name, SnapshotName(snapshotsDir, name, snapshotBlock)) if err != nil { return err } log.Info("Start seeding", "snapshot", name, "hash", hash.String()) - } else { - log.Warn("Snapshot block unknown", "snapshot", name, "v", common.Bytes2Hex(snapshotBlock)) } return nil } diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go index 7d0787596281f466a8072d25e0013f6393bc908d..ad49be366672e920e31d6ae5de573bc82f6d8f9d 100644 --- a/turbo/snapshotsync/wrapdb.go +++ b/turbo/snapshotsync/wrapdb.go @@ -65,23 +65,32 @@ func WrapBySnapshotsFromDownloader(kv ethdb.RwKV, snapshots map[SnapshotType]*Sn return snKV.Open(), nil } -func WrapSnapshots(chainDb ethdb.Database, snapshotsDir string) error { - snapshotBlock, err := chainDb.Get(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotBlock) - if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { - return err +func WrapSnapshots(chainDb ethdb.RwKV, snapshotsDir string) (ethdb.RwKV, error) { + var snapshotBlock uint64 + var hasSnapshotBlock bool + if err := chainDb.View(context.Background(), func(tx ethdb.Tx) error { + v, err := tx.GetOne(dbutils.BittorrentInfoBucket, dbutils.CurrentHeadersSnapshotBlock) + if err != nil { + return err + } + hasSnapshotBlock = len(v) == 8 + if hasSnapshotBlock { + snapshotBlock = binary.BigEndian.Uint64(v) + } + return nil + }); err != nil { + return chainDb, err } - snKVOpts := kv2.NewSnapshotKV().DB(chainDb.(ethdb.HasRwKV).RwKV()) - if len(snapshotBlock) == 8 { - snKV, innerErr := OpenHeadersSnapshot(SnapshotName(snapshotsDir, "headers", binary.BigEndian.Uint64(snapshotBlock))) + + snKVOpts := kv2.NewSnapshotKV().DB(chainDb) + if hasSnapshotBlock { + snKV, innerErr := OpenHeadersSnapshot(SnapshotName(snapshotsDir, "headers", snapshotBlock)) if innerErr != nil { - return innerErr + return chainDb, innerErr } snKVOpts = snKVOpts.SnapshotDB([]string{dbutils.HeadersBucket}, snKV) } - //manually wrap current db for snapshot generation - chainDb.(ethdb.HasRwKV).SetRwKV(snKVOpts.Open()) - - return nil + return snKVOpts.Open(), nil } func DownloadSnapshots(torrentClient *Client, ExternalSnapshotDownloaderAddr string, networkID uint64, snapshotMode SnapshotMode, chainDb ethdb.Database) error {