From 745757ac6bb10c296ab30874ddde774f4fcdec1e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Thu, 29 Apr 2021 17:33:45 +0300
Subject: [PATCH] core, eth: abort snapshot generation on snap sync and resume
 later

---
 core/blockchain.go                |  3 +-
 core/rawdb/accessors_snapshot.go  | 20 +++++++++
 core/rawdb/database.go            |  6 +--
 core/rawdb/schema.go              |  3 ++
 core/state/snapshot/generate.go   | 10 -----
 core/state/snapshot/journal.go    | 15 ++++---
 core/state/snapshot/snapshot.go   | 68 +++++++++++++++++++++++++++----
 eth/downloader/downloader.go      | 10 +++++
 eth/downloader/downloader_test.go |  6 +++
 eth/protocols/snap/sync.go        |  5 ---
 10 files changed, 115 insertions(+), 31 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 49aa1c3e8..c3562902d 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -640,7 +640,8 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
 	headBlockGauge.Update(int64(block.NumberU64()))
 	bc.chainmu.Unlock()
 
-	// Destroy any existing state snapshot and regenerate it in the background
+	// Destroy any existing state snapshot and regenerate it in the background,
+	// also resuming the normal maintenance of any previously paused snapshot.
 	if bc.snaps != nil {
 		bc.snaps.Rebuild(block.Root())
 	}
diff --git a/core/rawdb/accessors_snapshot.go b/core/rawdb/accessors_snapshot.go
index c3616ba3a..88446e079 100644
--- a/core/rawdb/accessors_snapshot.go
+++ b/core/rawdb/accessors_snapshot.go
@@ -24,6 +24,26 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 )
 
+// ReadSnapshotDisabled retrieves if the snapshot maintenance is disabled.
+func ReadSnapshotDisabled(db ethdb.KeyValueReader) bool {
+	disabled, _ := db.Has(snapshotDisabledKey)
+	return disabled
+}
+
+// WriteSnapshotDisabled stores the snapshot pause flag.
+func WriteSnapshotDisabled(db ethdb.KeyValueWriter) {
+	if err := db.Put(snapshotDisabledKey, []byte("42")); err != nil {
+		log.Crit("Failed to store snapshot disabled flag", "err", err)
+	}
+}
+
+// DeleteSnapshotDisabled deletes the flag keeping the snapshot maintenance disabled.
+func DeleteSnapshotDisabled(db ethdb.KeyValueWriter) {
+	if err := db.Delete(snapshotDisabledKey); err != nil {
+		log.Crit("Failed to remove snapshot disabled flag", "err", err)
+	}
+}
+
 // ReadSnapshotRoot retrieves the root of the block whose state is contained in
 // the persisted snapshot.
 func ReadSnapshotRoot(db ethdb.KeyValueReader) common.Hash {
diff --git a/core/rawdb/database.go b/core/rawdb/database.go
index 94759eb98..3a0a26c61 100644
--- a/core/rawdb/database.go
+++ b/core/rawdb/database.go
@@ -371,9 +371,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
 			var accounted bool
 			for _, meta := range [][]byte{
 				databaseVersionKey, headHeaderKey, headBlockKey, headFastBlockKey, lastPivotKey,
-				fastTrieProgressKey, snapshotRootKey, snapshotJournalKey, snapshotGeneratorKey,
-				snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey,
-				badBlockKey,
+				fastTrieProgressKey, snapshotDisabledKey, snapshotRootKey, snapshotJournalKey,
+				snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
+				uncleanShutdownKey, badBlockKey,
 			} {
 				if bytes.Equal(key, meta) {
 					metadata.Add(size)
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go
index 7a9738910..2505ce90b 100644
--- a/core/rawdb/schema.go
+++ b/core/rawdb/schema.go
@@ -45,6 +45,9 @@ var (
 	// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
 	fastTrieProgressKey = []byte("TrieSync")
 
+	// snapshotDisabledKey flags that the snapshot should not be maintained due to initial sync.
+	snapshotDisabledKey = []byte("SnapshotDisabled")
+
 	// snapshotRootKey tracks the hash of the last snapshot.
 	snapshotRootKey = []byte("SnapshotRoot")
 
diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go
index 8992d3f91..7e29e51b2 100644
--- a/core/state/snapshot/generate.go
+++ b/core/state/snapshot/generate.go
@@ -141,16 +141,6 @@ func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
 	log.Info(msg, ctx...)
 }
 
-// ClearSnapshotMarker sets the snapshot marker to zero, meaning that snapshots
-// are not usable.
-func ClearSnapshotMarker(diskdb ethdb.KeyValueStore) {
-	batch := diskdb.NewBatch()
-	journalProgress(batch, []byte{}, nil)
-	if err := batch.Write(); err != nil {
-		log.Crit("Failed to write initialized state marker", "err", err)
-	}
-}
-
 // generateSnapshot regenerates a brand new snapshot based on an existing state
 // database and head block asynchronously. The snapshot is returned immediately
 // and generation is continued in the background until done.
diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go
index f8cec4d4e..5cfb9a9f2 100644
--- a/core/state/snapshot/journal.go
+++ b/core/state/snapshot/journal.go
@@ -126,12 +126,17 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
 }
 
 // loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
-func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, error) {
+func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) {
+	// If snapshotting is disabled (initial sync in progress), don't do anything,
+	// wait for the chain to permit us to do something meaningful
+	if rawdb.ReadSnapshotDisabled(diskdb) {
+		return nil, true, nil
+	}
 	// Retrieve the block number and hash of the snapshot, failing if no snapshot
 	// is present in the database (or crashed mid-update).
 	baseRoot := rawdb.ReadSnapshotRoot(diskdb)
 	if baseRoot == (common.Hash{}) {
-		return nil, errors.New("missing or corrupted snapshot")
+		return nil, false, errors.New("missing or corrupted snapshot")
 	}
 	base := &diskLayer{
 		diskdb: diskdb,
@@ -142,7 +147,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
 	snapshot, generator, err := loadAndParseJournal(diskdb, base)
 	if err != nil {
 		log.Warn("Failed to load new-format journal", "error", err)
-		return nil, err
+		return nil, false, err
 	}
 	// Entire snapshot journal loaded, sanity check the head. If the loaded
 	// snapshot is not matched with current state root, print a warning log
@@ -157,7 +162,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
 		// it's not in recovery mode, returns the error here for
 		// rebuilding the entire snapshot forcibly.
 		if !recovery {
-			return nil, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root)
+			return nil, false, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root)
 		}
 		// It's in snapshot recovery, the assumption is held that
 		// the disk layer is always higher than chain head. It can
@@ -187,7 +192,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
 			storage:  common.StorageSize(generator.Storage),
 		})
 	}
-	return snapshot, nil
+	return snapshot, false, nil
 }
 
 // loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new
diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go
index 9ecbd4a6c..cb8ec7a70 100644
--- a/core/state/snapshot/snapshot.go
+++ b/core/state/snapshot/snapshot.go
@@ -148,11 +148,11 @@ type snapshot interface {
 	StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool)
 }
 
-// SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent
-// base layer backed by a key-value store, on top of which arbitrarily many in-
-// memory diff layers are topped. The memory diffs can form a tree with branching,
-// but the disk layer is singleton and common to all. If a reorg goes deeper than
-// the disk layer, everything needs to be deleted.
+// Tree is an Ethereum state snapshot tree. It consists of one persistent base
+// layer backed by a key-value store, on top of which arbitrarily many in-memory
+// diff layers are topped. The memory diffs can form a tree with branching, but
+// the disk layer is singleton and common to all. If a reorg goes deeper than the
+// disk layer, everything needs to be deleted.
 //
 // The goal of a state snapshot is twofold: to allow direct access to account and
 // storage data to avoid expensive multi-level trie lookups; and to allow sorted,
@@ -186,7 +186,11 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root comm
 		defer snap.waitBuild()
 	}
 	// Attempt to load a previously persisted snapshot and rebuild one if failed
-	head, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
+	head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
+	if disabled {
+		log.Warn("Snapshot maintenance disabled (syncing)")
+		return snap, nil
+	}
 	if err != nil {
 		if rebuild {
 			log.Warn("Failed to load snapshot, regenerating", "err", err)
@@ -224,6 +228,55 @@ func (t *Tree) waitBuild() {
 	}
 }
 
+// Disable interrupts any pending snapshot generator, deletes all the snapshot
+// layers in memory and marks snapshots disabled globally. In order to resume
+// the snapshot functionality, the caller must invoke Rebuild.
+func (t *Tree) Disable() {
+	// Interrupt any live snapshot layers
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	for _, layer := range t.layers {
+		switch layer := layer.(type) {
+		case *diskLayer:
+			// If the base layer is generating, abort it
+			if layer.genAbort != nil {
+				abort := make(chan *generatorStats)
+				layer.genAbort <- abort
+				<-abort
+			}
+			// Layer should be inactive now, mark it as stale
+			layer.lock.Lock()
+			layer.stale = true
+			layer.lock.Unlock()
+
+		case *diffLayer:
+			// If the layer is a simple diff, simply mark as stale
+			layer.lock.Lock()
+			atomic.StoreUint32(&layer.stale, 1)
+			layer.lock.Unlock()
+
+		default:
+			panic(fmt.Sprintf("unknown layer type: %T", layer))
+		}
+	}
+	t.layers = map[common.Hash]snapshot{}
+
+	// Delete all snapshot liveness information from the database
+	batch := t.diskdb.NewBatch()
+
+	rawdb.WriteSnapshotDisabled(batch)
+	rawdb.DeleteSnapshotRoot(batch)
+	rawdb.DeleteSnapshotJournal(batch)
+	rawdb.DeleteSnapshotGenerator(batch)
+	rawdb.DeleteSnapshotRecoveryNumber(batch)
+	// Note, we don't delete the sync progress
+
+	if err := batch.Write(); err != nil {
+		log.Crit("Failed to disable snapshots", "err", err)
+	}
+}
+
 // Snapshot retrieves a snapshot belonging to the given block root, or nil if no
 // snapshot is maintained for that block.
 func (t *Tree) Snapshot(blockRoot common.Hash) Snapshot {
@@ -626,8 +679,9 @@ func (t *Tree) Rebuild(root common.Hash) {
 	defer t.lock.Unlock()
 
 	// Firstly delete any recovery flag in the database. Because now we are
-	// building a brand new snapshot.
+	// building a brand new snapshot. Also reenable the snapshot feature.
 	rawdb.DeleteSnapshotRecoveryNumber(t.diskdb)
+	rawdb.DeleteSnapshotDisabled(t.diskdb)
 
 	// Iterate over and mark all layers stale
 	for _, layer := range t.layers {
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index b8cb48914..6f59b29a5 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -28,6 +28,7 @@ import (
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/state/snapshot"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/eth/protocols/eth"
 	"github.com/ethereum/go-ethereum/eth/protocols/snap"
@@ -214,6 +215,9 @@ type BlockChain interface {
 
 	// InsertReceiptChain inserts a batch of receipts into the local chain.
 	InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
+
+	// Snapshots returns the blockchain snapshot tree to paused it during sync.
+	Snapshots() *snapshot.Tree
 }
 
 // New creates a new downloader to fetch hashes and blocks from remote peers.
@@ -393,6 +397,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	// but until snap becomes prevalent, we should support both. TODO(karalabe).
 	if mode == SnapSync {
 		if !d.snapSync {
+			// Snap sync uses the snapshot namespace to store potentially flakey data until
+			// sync completely heals and finishes. Pause snapshot maintenance in the mean
+			// time to prevent access.
+			if snapshots := d.blockchain.Snapshots(); snapshots != nil { // Only nil in tests
+				snapshots.Disable()
+			}
 			log.Warn("Enabling snapshot sync prototype")
 			d.snapSync = true
 		}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 1140a444c..794160993 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/state/snapshot"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/eth/protocols/eth"
 	"github.com/ethereum/go-ethereum/ethdb"
@@ -409,6 +410,11 @@ func (dl *downloadTester) dropPeer(id string) {
 	dl.downloader.UnregisterPeer(id)
 }
 
+// Snapshots implements the BlockChain interface for the downloader, but is a noop.
+func (dl *downloadTester) Snapshots() *snapshot.Tree {
+	return nil
+}
+
 type downloadTesterPeer struct {
 	dl            *downloadTester
 	id            string
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index d9c0cb9b1..237314916 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -551,11 +551,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 		log.Debug("Snapshot sync already completed")
 		return nil
 	}
-	// If sync is still not finished, we need to ensure that any marker is wiped.
-	// Otherwise, it may happen that requests for e.g. genesis-data is delivered
-	// from the snapshot data, instead of from the trie
-	snapshot.ClearSnapshotMarker(s.db)
-
 	defer func() { // Persist any progress, independent of failure
 		for _, task := range s.tasks {
 			s.forwardAccountTask(task)
-- 
GitLab