diff --git a/cmd/integration/commands/root.go b/cmd/integration/commands/root.go index 706f0042393b563a2fb0b122f29ffd2709b5f6bf..57f767b06373c1f6d676252ade06bc80089d8a8a 100644 --- a/cmd/integration/commands/root.go +++ b/cmd/integration/commands/root.go @@ -28,20 +28,29 @@ func RootCommand() *cobra.Command { } func openDatabase(path string, applyMigrations bool) *ethdb.ObjectDatabase { - var kv ethdb.KV + if applyMigrations { + db := ethdb.NewObjectDatabase(openKV(path, true)) + if err := migrations.NewMigrator().Apply(db, datadir); err != nil { + panic(err) + } + db.Close() + } + + db := ethdb.NewObjectDatabase(openKV(path, false)) + err := SetSnapshotKV(db, snapshotDir, snapshotMode) + if err != nil { + panic(err) + } + + return db +} + +func openKV(path string, exclusive bool) ethdb.KV { if database == "mdbx" { opts := ethdb.NewMDBX().Path(path) - if mapSizeStr != "" { - var mapSize datasize.ByteSize - must(mapSize.UnmarshalText([]byte(mapSizeStr))) - opts = opts.MapSize(mapSize) - } - if freelistReuse > 0 { - opts = opts.MaxFreelistReuse(uint(freelistReuse)) + if exclusive { + opts = opts.Exclusive() } - kv = opts.MustOpen() - } else { - opts := ethdb.NewLMDB().Path(path) if mapSizeStr != "" { var mapSize datasize.ByteSize must(mapSize.UnmarshalText([]byte(mapSizeStr))) @@ -50,19 +59,20 @@ func openDatabase(path string, applyMigrations bool) *ethdb.ObjectDatabase { if freelistReuse > 0 { opts = opts.MaxFreelistReuse(uint(freelistReuse)) } - kv = opts.MustOpen() + return opts.MustOpen() } - db := ethdb.NewObjectDatabase(kv) - if applyMigrations { - if err := migrations.NewMigrator().Apply(db, datadir); err != nil { - panic(err) - } + opts := ethdb.NewLMDB().Path(path) + if exclusive { + opts = opts.Exclusive() } - err := SetSnapshotKV(db, snapshotDir, snapshotMode) - if err != nil { - panic(err) + if mapSizeStr != "" { + var mapSize datasize.ByteSize + must(mapSize.UnmarshalText([]byte(mapSizeStr))) + opts = opts.MapSize(mapSize) } - - return db + if freelistReuse > 0 { + opts = opts.MaxFreelistReuse(uint(freelistReuse)) + } + return opts.MustOpen() } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 12ff66514af7482ad5acabddbc8d5e8949ca9cb5..d16a30fefc46fcfdc4deec3d62803f4acbc3432b 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1753,10 +1753,11 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readOnly bool) (chainConfig * } else { engine = ethash.NewFaker() if !ctx.GlobalBool(FakePoWFlag.Name) { + datasetDir, _ := stack.ResolvePath(eth.DefaultConfig.Ethash.DatasetDir) engine = ethash.New(ethash.Config{ CachesInMem: eth.DefaultConfig.Ethash.CachesInMem, CachesLockMmap: eth.DefaultConfig.Ethash.CachesLockMmap, - DatasetDir: stack.ResolvePath(eth.DefaultConfig.Ethash.DatasetDir), + DatasetDir: datasetDir, DatasetsInMem: eth.DefaultConfig.Ethash.DatasetsInMem, DatasetsOnDisk: eth.DefaultConfig.Ethash.DatasetsOnDisk, DatasetsLockMmap: eth.DefaultConfig.Ethash.DatasetsLockMmap, diff --git a/eth/backend.go b/eth/backend.go index 1099d3c72db57c94c99f4481ab0a3fe8d04fb950..57121c70b539f68da7ce8154bf0a034b369d3e0a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -57,7 +57,6 @@ import ( "github.com/ledgerwatch/turbo-geth/event" "github.com/ledgerwatch/turbo-geth/internal/ethapi" "github.com/ledgerwatch/turbo-geth/log" - "github.com/ledgerwatch/turbo-geth/migrations" "github.com/ledgerwatch/turbo-geth/miner" "github.com/ledgerwatch/turbo-geth/node" "github.com/ledgerwatch/turbo-geth/p2p" @@ -130,6 +129,8 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { config.TrieDirtyCache = 0 } + tmpdir := path.Join(stack.Config().DataDir, etl.TmpDirName) + // Assemble the Ethereum object var chainDb *ethdb.ObjectDatabase var err error @@ -139,18 +140,17 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { } chainDb = ethdb.MustOpen("simulator") } else { + err = stack.ApplyMigrations("chaindata", tmpdir) + if err != nil { + return nil, fmt.Errorf("failed stack.ApplyMigrations: %w", err) + } + chainDb, err = stack.OpenDatabaseWithFreezer("chaindata", 0, 0, "", "") if err != nil { return nil, err } } - tmpdir := path.Join(stack.Config().DataDir, etl.TmpDirName) - err = migrations.NewMigrator().Apply(chainDb, tmpdir) - if err != nil { - return nil, err - } - chainConfig, genesisHash, _, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis, config.StorageMode.History, false /* overwrite */) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { @@ -161,14 +161,19 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { var torrentClient *torrent.Client if config.SyncMode == downloader.StagedSync && config.SnapshotMode != (torrent.SnapshotMode{}) && config.NetworkID == params.MainnetChainConfig.ChainID.Uint64() { config.SnapshotSeeding = true - torrentClient = torrent.New(stack.Config().ResolvePath("snapshots"), config.SnapshotMode, config.SnapshotSeeding) + var dbPath string + dbPath, err = stack.Config().ResolvePath("snapshots") + if err != nil { + return nil, err + } + torrentClient = torrent.New(dbPath, config.SnapshotMode, config.SnapshotSeeding) err = torrentClient.Run(chainDb) if err != nil { return nil, err } snapshotKV := chainDb.KV() - snapshotKV, err = torrent.WrapBySnapshots(snapshotKV, stack.Config().ResolvePath("snapshots"), config.SnapshotMode) + snapshotKV, err = torrent.WrapBySnapshots(snapshotKV, dbPath, config.SnapshotMode) if err != nil { return nil, err } @@ -253,7 +258,10 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { } if config.TxPool.Journal != "" { - config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) + config.TxPool.Journal, err = stack.ResolvePath(config.TxPool.Journal) + if err != nil { + return nil, err + } } eth.txPool = core.NewTxPool(config.TxPool, chainConfig, chainDb, txCacher) diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go index 42f25365d53c3367ab11de67d66d2cd65d783571..436b87dbf2dc405cb8cdc0ab4349babdd1472241 100644 --- a/ethdb/kv_lmdb.go +++ b/ethdb/kv_lmdb.go @@ -86,7 +86,7 @@ func DefaultBucketConfigs(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg return defaultBuckets } -func (opts LmdbOpts) Open() (KV, error) { +func (opts LmdbOpts) Open() (kv KV, err error) { env, err := lmdb.NewEnv() if err != nil { return nil, err @@ -131,14 +131,6 @@ func (opts LmdbOpts) Open() (KV, error) { } } - if opts.exclusive { - releaser, _, err := fileutil.Flock(path.Join(opts.path, "data.mdb")) - if err != nil { - return nil, err - } - defer releaser.Release() - } - var flags uint = lmdb.NoReadahead if opts.readOnly { flags |= lmdb.Readonly @@ -147,18 +139,40 @@ func (opts LmdbOpts) Open() (KV, error) { flags |= lmdb.NoMetaSync } flags |= lmdb.NoSync + + var exclusiveLock fileutil.Releaser + if opts.exclusive { + exclusiveLock, _, err = fileutil.Flock(path.Join(opts.path, "LOCK")) + if err != nil { + return nil, fmt.Errorf("failed exclusive Flock for lmdb, path=%s: %w", opts.path, err) + } + defer func() { // if kv.Open() returns error - then kv.Close() will not called - just release lock + if err != nil && exclusiveLock != nil { + _ = exclusiveLock.Release() + } + }() + } else { // try exclusive lock (release immediately) + exclusiveLock, _, err = fileutil.Flock(path.Join(opts.path, "LOCK")) + if err != nil { + return nil, fmt.Errorf("failed exclusive Flock for lmdb, path=%s: %w", opts.path, err) + } + _ = exclusiveLock.Release() + } + + db := &LmdbKV{ + exclusiveLock: exclusiveLock, + opts: opts, + env: env, + log: logger, + wg: &sync.WaitGroup{}, + buckets: dbutils.BucketsCfg{}, + } + err = env.Open(opts.path, flags, 0664) if err != nil { return nil, fmt.Errorf("%w, path: %s", err, opts.path) } - db := &LmdbKV{ - opts: opts, - env: env, - log: logger, - wg: &sync.WaitGroup{}, - buckets: dbutils.BucketsCfg{}, - } customBuckets := opts.bucketsCfg(dbutils.BucketsConfigs) for name, cfg := range customBuckets { // copy map to avoid changing global variable db.buckets[name] = cfg @@ -251,11 +265,12 @@ func (opts LmdbOpts) MustOpen() KV { } type LmdbKV struct { - opts LmdbOpts - env *lmdb.Env - log log.Logger - buckets dbutils.BucketsCfg - wg *sync.WaitGroup + opts LmdbOpts + env *lmdb.Env + log log.Logger + buckets dbutils.BucketsCfg + wg *sync.WaitGroup + exclusiveLock fileutil.Releaser } func NewLMDB() LmdbOpts { @@ -269,6 +284,10 @@ func (db *LmdbKV) Close() { db.wg.Wait() } + if db.exclusiveLock != nil { + _ = db.exclusiveLock.Release() + } + if db.env != nil { env := db.env db.env = nil @@ -284,7 +303,6 @@ func (db *LmdbKV) Close() { db.log.Warn("failed to remove in-mem db file", "err", err) } } - } func (db *LmdbKV) DiskSize(_ context.Context) (uint64, error) { diff --git a/ethdb/kv_mdbx.go b/ethdb/kv_mdbx.go index f7ec60f37990c65d9a9728ee635cd5e88a81a712..b98b724d63b5de957728e37025db210e69697279 100644 --- a/ethdb/kv_mdbx.go +++ b/ethdb/kv_mdbx.go @@ -19,6 +19,7 @@ import ( type MdbxOpts struct { inMem bool + exclusive bool readOnly bool path string bucketsCfg BucketConfigsFunc @@ -40,6 +41,11 @@ func (opts MdbxOpts) InMem() MdbxOpts { return opts } +func (opts MdbxOpts) Exclusive() MdbxOpts { + opts.exclusive = true + return opts +} + func (opts MdbxOpts) MapSize(sz datasize.ByteSize) MdbxOpts { opts.mapSize = sz return opts @@ -111,6 +117,9 @@ func (opts MdbxOpts) Open() (KV, error) { if opts.inMem { flags |= mdbx.NoMetaSync | mdbx.SafeNoSync } + if opts.exclusive { + flags |= mdbx.Exclusive + } //flags |= mdbx.LifoReclaim flags |= mdbx.Coalesce diff --git a/migrations/migrations.go b/migrations/migrations.go index f4a234797b696e04fffde97179f8ada8960a84ab..49588a840f0c9a90d0eea762898dc16c1e173d0c 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -105,6 +105,40 @@ func AppliedMigrations(db ethdb.Database, withPayload bool) (map[string][]byte, return applied, err } +func (m *Migrator) HasPendingMigrations(db ethdb.Database) (bool, error) { + pending, err := m.PendingMigrations(db) + if err != nil { + return false, err + } + return len(pending) > 0, nil +} + +func (m *Migrator) PendingMigrations(db ethdb.Database) ([]Migration, error) { + applied, err := AppliedMigrations(db, false) + if err != nil { + return nil, err + } + + counter := 0 + for i := range m.Migrations { + v := m.Migrations[i] + if _, ok := applied[v.Name]; ok { + continue + } + counter++ + } + + pending := make([]Migration, 0, counter) + for i := range m.Migrations { + v := m.Migrations[i] + if _, ok := applied[v.Name]; ok { + continue + } + pending = append(pending, v) + } + return pending, nil +} + func (m *Migrator) Apply(db ethdb.Database, tmpdir string) error { if len(m.Migrations) == 0 { return nil diff --git a/node/config.go b/node/config.go index 0e7458c18031302d5642b7f9bab1883a6a1d2702..d5bd97911d3a5f93b1651a15b150e62c5bb73f15 100644 --- a/node/config.go +++ b/node/config.go @@ -229,7 +229,7 @@ func (c *Config) IPCEndpoint() string { } // NodeDB returns the path to the discovery node database. -func (c *Config) NodeDB() string { +func (c *Config) NodeDB() (string, error) { return c.ResolvePath(datadirNodeDatabase) } @@ -307,14 +307,14 @@ func (c *Config) name() string { } // ResolvePath resolves path in the instance directory. -func (c *Config) ResolvePath(path string) string { +func (c *Config) ResolvePath(path string) (string, error) { if filepath.IsAbs(path) { - return path + return path, nil } if c.DataDir == "" { - return "" + return filepath.Abs("") } - return filepath.Join(c.instanceDir(), path) + return filepath.Join(c.instanceDir(), path), nil } func (c *Config) instanceDir() string { @@ -331,23 +331,26 @@ func (c *Config) instanceDir() string { // NodeKey retrieves the currently configured private key of the node, checking // first any manually set key, falling back to the one found in the configured // data folder. If no key can be found, a new one is generated. -func (c *Config) NodeKey() *ecdsa.PrivateKey { +func (c *Config) NodeKey() (*ecdsa.PrivateKey, error) { // Use any specifically configured key. if c.P2P.PrivateKey != nil { - return c.P2P.PrivateKey + return c.P2P.PrivateKey, nil } // Generate ephemeral key if no datadir is being used. if c.DataDir == "" { key, err := crypto.GenerateKey() if err != nil { - log.Crit(fmt.Sprintf("Failed to generate ephemeral node key: %v", err)) + return key, fmt.Errorf("failed to generate ephemeral node key: %w", err) } - return key + return key, nil } - keyfile := c.ResolvePath(datadirPrivateKey) + keyfile, err := c.ResolvePath(datadirPrivateKey) + if err != nil { + return nil, err + } if key, err := crypto.LoadECDSA(keyfile); err == nil { - return key + return key, nil } // No persistent key found, generate and store a new one. key, err := crypto.GenerateKey() @@ -357,23 +360,31 @@ func (c *Config) NodeKey() *ecdsa.PrivateKey { instanceDir := c.instanceDir() if err := os.MkdirAll(instanceDir, 0700); err != nil { log.Error(fmt.Sprintf("Failed to persist node key: %v", err)) - return key + return key, nil } keyfile = filepath.Join(instanceDir, datadirPrivateKey) if err := crypto.SaveECDSA(keyfile, key); err != nil { log.Error(fmt.Sprintf("Failed to persist node key: %v", err)) } - return key + return key, nil } // StaticNodes returns a list of node enode URLs configured as static nodes. -func (c *Config) StaticNodes() []*enode.Node { - return c.parsePersistentNodes(&c.staticNodesWarning, c.ResolvePath(datadirStaticNodes)) +func (c *Config) StaticNodes() ([]*enode.Node, error) { + dbPath, err := c.ResolvePath(datadirStaticNodes) + if err != nil { + return nil, err + } + return c.parsePersistentNodes(&c.staticNodesWarning, dbPath), nil } // TrustedNodes returns a list of node enode URLs configured as trusted nodes. -func (c *Config) TrustedNodes() []*enode.Node { - return c.parsePersistentNodes(&c.trustedNodesWarning, c.ResolvePath(datadirTrustedNodes)) +func (c *Config) TrustedNodes() ([]*enode.Node, error) { + dbPath, err := c.ResolvePath(datadirTrustedNodes) + if err != nil { + return nil, err + } + return c.parsePersistentNodes(&c.trustedNodesWarning, dbPath), nil } // parsePersistentNodes parses a list of discovery node URLs loaded from a .json diff --git a/node/node.go b/node/node.go index 71c70689c6df473ae3a0ca20638bf4431f8bc374..73c41e19bc653c996b27f1d7bf75549edd413e1f 100644 --- a/node/node.go +++ b/node/node.go @@ -31,6 +31,7 @@ import ( "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/event" "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/migrations" "github.com/ledgerwatch/turbo-geth/p2p" "github.com/ledgerwatch/turbo-geth/rpc" "github.com/prometheus/tsdb/fileutil" @@ -122,17 +123,29 @@ func New(conf *Config) (*Node, error) { node.ephemKeystore = ephemeralKeystore // Initialize the p2p server. This creates the node key and discovery databases. - node.server.Config.PrivateKey = node.config.NodeKey() + node.server.Config.PrivateKey, err = node.config.NodeKey() + if err != nil { + return nil, err + } node.server.Config.Name = node.config.NodeName() node.server.Config.Logger = node.log if node.server.Config.StaticNodes == nil { - node.server.Config.StaticNodes = node.config.StaticNodes() + node.server.Config.StaticNodes, err = node.config.StaticNodes() + if err != nil { + return nil, err + } } if node.server.Config.TrustedNodes == nil { - node.server.Config.TrustedNodes = node.config.TrustedNodes() + node.server.Config.TrustedNodes, err = node.config.TrustedNodes() + if err != nil { + return nil, err + } } if node.server.Config.NodeDatabase == "" { - node.server.Config.NodeDatabase = node.config.NodeDB() + node.server.Config.NodeDatabase, err = node.config.NodeDB() + if err != nil { + return nil, err + } } // Configure RPC servers. @@ -544,6 +557,23 @@ func (n *Node) OpenDatabase(name string) (*ethdb.ObjectDatabase, error) { return n.OpenDatabaseWithFreezer(name, 0, 0, "", "") } +func (n *Node) ApplyMigrations(name string, tmpdir string) error { + n.lock.Lock() + defer n.lock.Unlock() + + dbPath, err := n.config.ResolvePath(name) + if err != nil { + return err + } + kv, err := ethdb.NewLMDB().Path(dbPath).MapSize(n.config.LMDBMapSize).MaxFreelistReuse(n.config.LMDBMaxFreelistReuse).Exclusive().Open() + if err != nil { + return fmt.Errorf("failed to open kv inside stack.ApplyMigrations: %w", err) + } + defer kv.Close() + + return migrations.NewMigrator().Apply(ethdb.NewObjectDatabase(kv), tmpdir) +} + // OpenDatabaseWithFreezer opens an existing database with the given name (or // creates one if no previous can be found) from within the node's data directory, // also attaching a chain freezer to it that moves ancient chain data from the @@ -565,14 +595,22 @@ func (n *Node) OpenDatabaseWithFreezer(name string, _, _ int, _, _ string) (*eth } else { if n.config.MDBX { log.Info("Opening Database (MDBX)", "mapSize", n.config.LMDBMapSize.HR()) - kv, err := ethdb.NewMDBX().Path(n.config.ResolvePath(name)).MapSize(n.config.LMDBMapSize).Open() + dbPath, err := n.config.ResolvePath(name) + if err != nil { + return nil, err + } + kv, err := ethdb.NewMDBX().Path(dbPath).MapSize(n.config.LMDBMapSize).Open() if err != nil { return nil, err } db = ethdb.NewObjectDatabase(kv) } else { log.Info("Opening Database (LMDB)", "mapSize", n.config.LMDBMapSize.HR(), "maxFreelistReuse", n.config.LMDBMaxFreelistReuse) - kv, err := ethdb.NewLMDB().Path(n.config.ResolvePath(name)).MapSize(n.config.LMDBMapSize).MaxFreelistReuse(n.config.LMDBMaxFreelistReuse).Open() + dbPath, err := n.config.ResolvePath(name) + if err != nil { + return nil, err + } + kv, err := ethdb.NewLMDB().Path(dbPath).MapSize(n.config.LMDBMapSize).MaxFreelistReuse(n.config.LMDBMaxFreelistReuse).Open() if err != nil { return nil, err } @@ -585,6 +623,6 @@ func (n *Node) OpenDatabaseWithFreezer(name string, _, _ int, _, _ string) (*eth } // ResolvePath returns the absolute path of a resource in the instance directory. -func (n *Node) ResolvePath(x string) string { +func (n *Node) ResolvePath(x string) (string, error) { return n.config.ResolvePath(x) }