From a139f31dcd219788d3bec028e2deb550ec9fedfd Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Wed, 28 Oct 2020 16:52:15 +0700
Subject: [PATCH] Exclusive lock manual (#1302)

* exclusive lock for chaindata folder

* exclusive lock for chaindata folder

* exclusive lock for chaindata folder

* mdb tools

* resolve abs path

* rpcdaemon to check exclusive lock

* rpcdaemon to check exclusive lock

* exclusive lock for mdbx

* exclusive lock for mdbx
---
 cmd/integration/commands/root.go | 54 ++++++++++++++++------------
 cmd/utils/flags.go               |  3 +-
 eth/backend.go                   | 28 +++++++++------
 ethdb/kv_lmdb.go                 | 62 ++++++++++++++++++++------------
 ethdb/kv_mdbx.go                 |  9 +++++
 migrations/migrations.go         | 34 ++++++++++++++++++
 node/config.go                   | 45 ++++++++++++++---------
 node/node.go                     | 52 +++++++++++++++++++++++----
 8 files changed, 208 insertions(+), 79 deletions(-)

diff --git a/cmd/integration/commands/root.go b/cmd/integration/commands/root.go
index 706f004239..57f767b063 100644
--- a/cmd/integration/commands/root.go
+++ b/cmd/integration/commands/root.go
@@ -28,20 +28,29 @@ func RootCommand() *cobra.Command {
 }
 
 func openDatabase(path string, applyMigrations bool) *ethdb.ObjectDatabase {
-	var kv ethdb.KV
+	if applyMigrations {
+		db := ethdb.NewObjectDatabase(openKV(path, true))
+		if err := migrations.NewMigrator().Apply(db, datadir); err != nil {
+			panic(err)
+		}
+		db.Close()
+	}
+
+	db := ethdb.NewObjectDatabase(openKV(path, false))
+	err := SetSnapshotKV(db, snapshotDir, snapshotMode)
+	if err != nil {
+		panic(err)
+	}
+
+	return db
+}
+
+func openKV(path string, exclusive bool) ethdb.KV {
 	if database == "mdbx" {
 		opts := ethdb.NewMDBX().Path(path)
-		if mapSizeStr != "" {
-			var mapSize datasize.ByteSize
-			must(mapSize.UnmarshalText([]byte(mapSizeStr)))
-			opts = opts.MapSize(mapSize)
-		}
-		if freelistReuse > 0 {
-			opts = opts.MaxFreelistReuse(uint(freelistReuse))
+		if exclusive {
+			opts = opts.Exclusive()
 		}
-		kv = opts.MustOpen()
-	} else {
-		opts := ethdb.NewLMDB().Path(path)
 		if mapSizeStr != "" {
 			var mapSize datasize.ByteSize
 			must(mapSize.UnmarshalText([]byte(mapSizeStr)))
@@ -50,19 +59,20 @@ func openDatabase(path string, applyMigrations bool) *ethdb.ObjectDatabase {
 		if freelistReuse > 0 {
 			opts = opts.MaxFreelistReuse(uint(freelistReuse))
 		}
-		kv = opts.MustOpen()
+		return opts.MustOpen()
 	}
 
-	db := ethdb.NewObjectDatabase(kv)
-	if applyMigrations {
-		if err := migrations.NewMigrator().Apply(db, datadir); err != nil {
-			panic(err)
-		}
+	opts := ethdb.NewLMDB().Path(path)
+	if exclusive {
+		opts = opts.Exclusive()
 	}
-	err := SetSnapshotKV(db, snapshotDir, snapshotMode)
-	if err != nil {
-		panic(err)
+	if mapSizeStr != "" {
+		var mapSize datasize.ByteSize
+		must(mapSize.UnmarshalText([]byte(mapSizeStr)))
+		opts = opts.MapSize(mapSize)
 	}
-
-	return db
+	if freelistReuse > 0 {
+		opts = opts.MaxFreelistReuse(uint(freelistReuse))
+	}
+	return opts.MustOpen()
 }
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 12ff66514a..d16a30fefc 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1753,10 +1753,11 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readOnly bool) (chainConfig *
 	} else {
 		engine = ethash.NewFaker()
 		if !ctx.GlobalBool(FakePoWFlag.Name) {
+			datasetDir, _ := stack.ResolvePath(eth.DefaultConfig.Ethash.DatasetDir)
 			engine = ethash.New(ethash.Config{
 				CachesInMem:      eth.DefaultConfig.Ethash.CachesInMem,
 				CachesLockMmap:   eth.DefaultConfig.Ethash.CachesLockMmap,
-				DatasetDir:       stack.ResolvePath(eth.DefaultConfig.Ethash.DatasetDir),
+				DatasetDir:       datasetDir,
 				DatasetsInMem:    eth.DefaultConfig.Ethash.DatasetsInMem,
 				DatasetsOnDisk:   eth.DefaultConfig.Ethash.DatasetsOnDisk,
 				DatasetsLockMmap: eth.DefaultConfig.Ethash.DatasetsLockMmap,
diff --git a/eth/backend.go b/eth/backend.go
index 1099d3c72d..57121c70b5 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -57,7 +57,6 @@ import (
 	"github.com/ledgerwatch/turbo-geth/event"
 	"github.com/ledgerwatch/turbo-geth/internal/ethapi"
 	"github.com/ledgerwatch/turbo-geth/log"
-	"github.com/ledgerwatch/turbo-geth/migrations"
 	"github.com/ledgerwatch/turbo-geth/miner"
 	"github.com/ledgerwatch/turbo-geth/node"
 	"github.com/ledgerwatch/turbo-geth/p2p"
@@ -130,6 +129,8 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
 		config.TrieDirtyCache = 0
 	}
 
+	tmpdir := path.Join(stack.Config().DataDir, etl.TmpDirName)
+
 	// Assemble the Ethereum object
 	var chainDb *ethdb.ObjectDatabase
 	var err error
@@ -139,18 +140,17 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
 		}
 		chainDb = ethdb.MustOpen("simulator")
 	} else {
+		err = stack.ApplyMigrations("chaindata", tmpdir)
+		if err != nil {
+			return nil, fmt.Errorf("failed stack.ApplyMigrations: %w", err)
+		}
+
 		chainDb, err = stack.OpenDatabaseWithFreezer("chaindata", 0, 0, "", "")
 		if err != nil {
 			return nil, err
 		}
 	}
 
-	tmpdir := path.Join(stack.Config().DataDir, etl.TmpDirName)
-	err = migrations.NewMigrator().Apply(chainDb, tmpdir)
-	if err != nil {
-		return nil, err
-	}
-
 	chainConfig, genesisHash, _, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis, config.StorageMode.History, false /* overwrite */)
 
 	if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
@@ -161,14 +161,19 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
 	var torrentClient *torrent.Client
 	if config.SyncMode == downloader.StagedSync && config.SnapshotMode != (torrent.SnapshotMode{}) && config.NetworkID == params.MainnetChainConfig.ChainID.Uint64() {
 		config.SnapshotSeeding = true
-		torrentClient = torrent.New(stack.Config().ResolvePath("snapshots"), config.SnapshotMode, config.SnapshotSeeding)
+		var dbPath string
+		dbPath, err = stack.Config().ResolvePath("snapshots")
+		if err != nil {
+			return nil, err
+		}
+		torrentClient = torrent.New(dbPath, config.SnapshotMode, config.SnapshotSeeding)
 		err = torrentClient.Run(chainDb)
 		if err != nil {
 			return nil, err
 		}
 
 		snapshotKV := chainDb.KV()
-		snapshotKV, err = torrent.WrapBySnapshots(snapshotKV, stack.Config().ResolvePath("snapshots"), config.SnapshotMode)
+		snapshotKV, err = torrent.WrapBySnapshots(snapshotKV, dbPath, config.SnapshotMode)
 		if err != nil {
 			return nil, err
 		}
@@ -253,7 +258,10 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
 	}
 
 	if config.TxPool.Journal != "" {
-		config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
+		config.TxPool.Journal, err = stack.ResolvePath(config.TxPool.Journal)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, chainDb, txCacher)
diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go
index 42f25365d5..436b87dbf2 100644
--- a/ethdb/kv_lmdb.go
+++ b/ethdb/kv_lmdb.go
@@ -86,7 +86,7 @@ func DefaultBucketConfigs(defaultBuckets dbutils.BucketsCfg) dbutils.BucketsCfg
 	return defaultBuckets
 }
 
-func (opts LmdbOpts) Open() (KV, error) {
+func (opts LmdbOpts) Open() (kv KV, err error) {
 	env, err := lmdb.NewEnv()
 	if err != nil {
 		return nil, err
@@ -131,14 +131,6 @@ func (opts LmdbOpts) Open() (KV, error) {
 		}
 	}
 
-	if opts.exclusive {
-		releaser, _, err := fileutil.Flock(path.Join(opts.path, "data.mdb"))
-		if err != nil {
-			return nil, err
-		}
-		defer releaser.Release()
-	}
-
 	var flags uint = lmdb.NoReadahead
 	if opts.readOnly {
 		flags |= lmdb.Readonly
@@ -147,18 +139,40 @@ func (opts LmdbOpts) Open() (KV, error) {
 		flags |= lmdb.NoMetaSync
 	}
 	flags |= lmdb.NoSync
+
+	var exclusiveLock fileutil.Releaser
+	if opts.exclusive {
+		exclusiveLock, _, err = fileutil.Flock(path.Join(opts.path, "LOCK"))
+		if err != nil {
+			return nil, fmt.Errorf("failed exclusive Flock for lmdb, path=%s: %w", opts.path, err)
+		}
+		defer func() { // if kv.Open() returns error - then kv.Close() will not called - just release lock
+			if err != nil && exclusiveLock != nil {
+				_ = exclusiveLock.Release()
+			}
+		}()
+	} else { // try exclusive lock (release immediately)
+		exclusiveLock, _, err = fileutil.Flock(path.Join(opts.path, "LOCK"))
+		if err != nil {
+			return nil, fmt.Errorf("failed exclusive Flock for lmdb, path=%s: %w", opts.path, err)
+		}
+		_ = exclusiveLock.Release()
+	}
+
+	db := &LmdbKV{
+		exclusiveLock: exclusiveLock,
+		opts:          opts,
+		env:           env,
+		log:           logger,
+		wg:            &sync.WaitGroup{},
+		buckets:       dbutils.BucketsCfg{},
+	}
+
 	err = env.Open(opts.path, flags, 0664)
 	if err != nil {
 		return nil, fmt.Errorf("%w, path: %s", err, opts.path)
 	}
 
-	db := &LmdbKV{
-		opts:    opts,
-		env:     env,
-		log:     logger,
-		wg:      &sync.WaitGroup{},
-		buckets: dbutils.BucketsCfg{},
-	}
 	customBuckets := opts.bucketsCfg(dbutils.BucketsConfigs)
 	for name, cfg := range customBuckets { // copy map to avoid changing global variable
 		db.buckets[name] = cfg
@@ -251,11 +265,12 @@ func (opts LmdbOpts) MustOpen() KV {
 }
 
 type LmdbKV struct {
-	opts    LmdbOpts
-	env     *lmdb.Env
-	log     log.Logger
-	buckets dbutils.BucketsCfg
-	wg      *sync.WaitGroup
+	opts          LmdbOpts
+	env           *lmdb.Env
+	log           log.Logger
+	buckets       dbutils.BucketsCfg
+	wg            *sync.WaitGroup
+	exclusiveLock fileutil.Releaser
 }
 
 func NewLMDB() LmdbOpts {
@@ -269,6 +284,10 @@ func (db *LmdbKV) Close() {
 		db.wg.Wait()
 	}
 
+	if db.exclusiveLock != nil {
+		_ = db.exclusiveLock.Release()
+	}
+
 	if db.env != nil {
 		env := db.env
 		db.env = nil
@@ -284,7 +303,6 @@ func (db *LmdbKV) Close() {
 			db.log.Warn("failed to remove in-mem db file", "err", err)
 		}
 	}
-
 }
 
 func (db *LmdbKV) DiskSize(_ context.Context) (uint64, error) {
diff --git a/ethdb/kv_mdbx.go b/ethdb/kv_mdbx.go
index f7ec60f379..b98b724d63 100644
--- a/ethdb/kv_mdbx.go
+++ b/ethdb/kv_mdbx.go
@@ -19,6 +19,7 @@ import (
 
 type MdbxOpts struct {
 	inMem            bool
+	exclusive        bool
 	readOnly         bool
 	path             string
 	bucketsCfg       BucketConfigsFunc
@@ -40,6 +41,11 @@ func (opts MdbxOpts) InMem() MdbxOpts {
 	return opts
 }
 
+func (opts MdbxOpts) Exclusive() MdbxOpts {
+	opts.exclusive = true
+	return opts
+}
+
 func (opts MdbxOpts) MapSize(sz datasize.ByteSize) MdbxOpts {
 	opts.mapSize = sz
 	return opts
@@ -111,6 +117,9 @@ func (opts MdbxOpts) Open() (KV, error) {
 	if opts.inMem {
 		flags |= mdbx.NoMetaSync | mdbx.SafeNoSync
 	}
+	if opts.exclusive {
+		flags |= mdbx.Exclusive
+	}
 
 	//flags |= mdbx.LifoReclaim
 	flags |= mdbx.Coalesce
diff --git a/migrations/migrations.go b/migrations/migrations.go
index f4a234797b..49588a840f 100644
--- a/migrations/migrations.go
+++ b/migrations/migrations.go
@@ -105,6 +105,40 @@ func AppliedMigrations(db ethdb.Database, withPayload bool) (map[string][]byte,
 	return applied, err
 }
 
+func (m *Migrator) HasPendingMigrations(db ethdb.Database) (bool, error) {
+	pending, err := m.PendingMigrations(db)
+	if err != nil {
+		return false, err
+	}
+	return len(pending) > 0, nil
+}
+
+func (m *Migrator) PendingMigrations(db ethdb.Database) ([]Migration, error) {
+	applied, err := AppliedMigrations(db, false)
+	if err != nil {
+		return nil, err
+	}
+
+	counter := 0
+	for i := range m.Migrations {
+		v := m.Migrations[i]
+		if _, ok := applied[v.Name]; ok {
+			continue
+		}
+		counter++
+	}
+
+	pending := make([]Migration, 0, counter)
+	for i := range m.Migrations {
+		v := m.Migrations[i]
+		if _, ok := applied[v.Name]; ok {
+			continue
+		}
+		pending = append(pending, v)
+	}
+	return pending, nil
+}
+
 func (m *Migrator) Apply(db ethdb.Database, tmpdir string) error {
 	if len(m.Migrations) == 0 {
 		return nil
diff --git a/node/config.go b/node/config.go
index 0e7458c180..d5bd97911d 100644
--- a/node/config.go
+++ b/node/config.go
@@ -229,7 +229,7 @@ func (c *Config) IPCEndpoint() string {
 }
 
 // NodeDB returns the path to the discovery node database.
-func (c *Config) NodeDB() string {
+func (c *Config) NodeDB() (string, error) {
 	return c.ResolvePath(datadirNodeDatabase)
 }
 
@@ -307,14 +307,14 @@ func (c *Config) name() string {
 }
 
 // ResolvePath resolves path in the instance directory.
-func (c *Config) ResolvePath(path string) string {
+func (c *Config) ResolvePath(path string) (string, error) {
 	if filepath.IsAbs(path) {
-		return path
+		return path, nil
 	}
 	if c.DataDir == "" {
-		return ""
+		return filepath.Abs("")
 	}
-	return filepath.Join(c.instanceDir(), path)
+	return filepath.Join(c.instanceDir(), path), nil
 }
 
 func (c *Config) instanceDir() string {
@@ -331,23 +331,26 @@ func (c *Config) instanceDir() string {
 // NodeKey retrieves the currently configured private key of the node, checking
 // first any manually set key, falling back to the one found in the configured
 // data folder. If no key can be found, a new one is generated.
-func (c *Config) NodeKey() *ecdsa.PrivateKey {
+func (c *Config) NodeKey() (*ecdsa.PrivateKey, error) {
 	// Use any specifically configured key.
 	if c.P2P.PrivateKey != nil {
-		return c.P2P.PrivateKey
+		return c.P2P.PrivateKey, nil
 	}
 	// Generate ephemeral key if no datadir is being used.
 	if c.DataDir == "" {
 		key, err := crypto.GenerateKey()
 		if err != nil {
-			log.Crit(fmt.Sprintf("Failed to generate ephemeral node key: %v", err))
+			return key, fmt.Errorf("failed to generate ephemeral node key: %w", err)
 		}
-		return key
+		return key, nil
 	}
 
-	keyfile := c.ResolvePath(datadirPrivateKey)
+	keyfile, err := c.ResolvePath(datadirPrivateKey)
+	if err != nil {
+		return nil, err
+	}
 	if key, err := crypto.LoadECDSA(keyfile); err == nil {
-		return key
+		return key, nil
 	}
 	// No persistent key found, generate and store a new one.
 	key, err := crypto.GenerateKey()
@@ -357,23 +360,31 @@ func (c *Config) NodeKey() *ecdsa.PrivateKey {
 	instanceDir := c.instanceDir()
 	if err := os.MkdirAll(instanceDir, 0700); err != nil {
 		log.Error(fmt.Sprintf("Failed to persist node key: %v", err))
-		return key
+		return key, nil
 	}
 	keyfile = filepath.Join(instanceDir, datadirPrivateKey)
 	if err := crypto.SaveECDSA(keyfile, key); err != nil {
 		log.Error(fmt.Sprintf("Failed to persist node key: %v", err))
 	}
-	return key
+	return key, nil
 }
 
 // StaticNodes returns a list of node enode URLs configured as static nodes.
-func (c *Config) StaticNodes() []*enode.Node {
-	return c.parsePersistentNodes(&c.staticNodesWarning, c.ResolvePath(datadirStaticNodes))
+func (c *Config) StaticNodes() ([]*enode.Node, error) {
+	dbPath, err := c.ResolvePath(datadirStaticNodes)
+	if err != nil {
+		return nil, err
+	}
+	return c.parsePersistentNodes(&c.staticNodesWarning, dbPath), nil
 }
 
 // TrustedNodes returns a list of node enode URLs configured as trusted nodes.
-func (c *Config) TrustedNodes() []*enode.Node {
-	return c.parsePersistentNodes(&c.trustedNodesWarning, c.ResolvePath(datadirTrustedNodes))
+func (c *Config) TrustedNodes() ([]*enode.Node, error) {
+	dbPath, err := c.ResolvePath(datadirTrustedNodes)
+	if err != nil {
+		return nil, err
+	}
+	return c.parsePersistentNodes(&c.trustedNodesWarning, dbPath), nil
 }
 
 // parsePersistentNodes parses a list of discovery node URLs loaded from a .json
diff --git a/node/node.go b/node/node.go
index 71c70689c6..73c41e19bc 100644
--- a/node/node.go
+++ b/node/node.go
@@ -31,6 +31,7 @@ import (
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/event"
 	"github.com/ledgerwatch/turbo-geth/log"
+	"github.com/ledgerwatch/turbo-geth/migrations"
 	"github.com/ledgerwatch/turbo-geth/p2p"
 	"github.com/ledgerwatch/turbo-geth/rpc"
 	"github.com/prometheus/tsdb/fileutil"
@@ -122,17 +123,29 @@ func New(conf *Config) (*Node, error) {
 	node.ephemKeystore = ephemeralKeystore
 
 	// Initialize the p2p server. This creates the node key and discovery databases.
-	node.server.Config.PrivateKey = node.config.NodeKey()
+	node.server.Config.PrivateKey, err = node.config.NodeKey()
+	if err != nil {
+		return nil, err
+	}
 	node.server.Config.Name = node.config.NodeName()
 	node.server.Config.Logger = node.log
 	if node.server.Config.StaticNodes == nil {
-		node.server.Config.StaticNodes = node.config.StaticNodes()
+		node.server.Config.StaticNodes, err = node.config.StaticNodes()
+		if err != nil {
+			return nil, err
+		}
 	}
 	if node.server.Config.TrustedNodes == nil {
-		node.server.Config.TrustedNodes = node.config.TrustedNodes()
+		node.server.Config.TrustedNodes, err = node.config.TrustedNodes()
+		if err != nil {
+			return nil, err
+		}
 	}
 	if node.server.Config.NodeDatabase == "" {
-		node.server.Config.NodeDatabase = node.config.NodeDB()
+		node.server.Config.NodeDatabase, err = node.config.NodeDB()
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	// Configure RPC servers.
@@ -544,6 +557,23 @@ func (n *Node) OpenDatabase(name string) (*ethdb.ObjectDatabase, error) {
 	return n.OpenDatabaseWithFreezer(name, 0, 0, "", "")
 }
 
+func (n *Node) ApplyMigrations(name string, tmpdir string) error {
+	n.lock.Lock()
+	defer n.lock.Unlock()
+
+	dbPath, err := n.config.ResolvePath(name)
+	if err != nil {
+		return err
+	}
+	kv, err := ethdb.NewLMDB().Path(dbPath).MapSize(n.config.LMDBMapSize).MaxFreelistReuse(n.config.LMDBMaxFreelistReuse).Exclusive().Open()
+	if err != nil {
+		return fmt.Errorf("failed to open kv inside stack.ApplyMigrations: %w", err)
+	}
+	defer kv.Close()
+
+	return migrations.NewMigrator().Apply(ethdb.NewObjectDatabase(kv), tmpdir)
+}
+
 // OpenDatabaseWithFreezer opens an existing database with the given name (or
 // creates one if no previous can be found) from within the node's data directory,
 // also attaching a chain freezer to it that moves ancient chain data from the
@@ -565,14 +595,22 @@ func (n *Node) OpenDatabaseWithFreezer(name string, _, _ int, _, _ string) (*eth
 	} else {
 		if n.config.MDBX {
 			log.Info("Opening Database (MDBX)", "mapSize", n.config.LMDBMapSize.HR())
-			kv, err := ethdb.NewMDBX().Path(n.config.ResolvePath(name)).MapSize(n.config.LMDBMapSize).Open()
+			dbPath, err := n.config.ResolvePath(name)
+			if err != nil {
+				return nil, err
+			}
+			kv, err := ethdb.NewMDBX().Path(dbPath).MapSize(n.config.LMDBMapSize).Open()
 			if err != nil {
 				return nil, err
 			}
 			db = ethdb.NewObjectDatabase(kv)
 		} else {
 			log.Info("Opening Database (LMDB)", "mapSize", n.config.LMDBMapSize.HR(), "maxFreelistReuse", n.config.LMDBMaxFreelistReuse)
-			kv, err := ethdb.NewLMDB().Path(n.config.ResolvePath(name)).MapSize(n.config.LMDBMapSize).MaxFreelistReuse(n.config.LMDBMaxFreelistReuse).Open()
+			dbPath, err := n.config.ResolvePath(name)
+			if err != nil {
+				return nil, err
+			}
+			kv, err := ethdb.NewLMDB().Path(dbPath).MapSize(n.config.LMDBMapSize).MaxFreelistReuse(n.config.LMDBMaxFreelistReuse).Open()
 			if err != nil {
 				return nil, err
 			}
@@ -585,6 +623,6 @@ func (n *Node) OpenDatabaseWithFreezer(name string, _, _ int, _, _ string) (*eth
 }
 
 // ResolvePath returns the absolute path of a resource in the instance directory.
-func (n *Node) ResolvePath(x string) string {
+func (n *Node) ResolvePath(x string) (string, error) {
 	return n.config.ResolvePath(x)
 }
-- 
GitLab