From f6be0b601e7ea2e9b53b215c48881b13243fd054 Mon Sep 17 00:00:00 2001
From: ledgerwatch <akhounov@gmail.com>
Date: Sun, 24 May 2020 15:48:15 +0100
Subject: [PATCH] Fix history generation and optimise via pre-sorting in the
 files (#572)

* Rewrite

* Fix linter, make counter

* Skip hash stage

* Fix linter

* Add common

* Fix wrong bucket

* Use temp files to generate index

* Fix hack, defer

* Add logging when creating files

* Properly pass datadir

* Bigger buffers, less logging

* Log current key, enforce batching

* Limit to 4m, print more

* Use ReadFull

* Optimised version of storage generation

* Don't print entire key

* Commit at the end

* Fix linter

* Remove a bit of copy-paste

* More copy-paste reduction

* Fixes

* Fix key length

* Don't delete files too early

* Fix linter

* Fix logging

* Fix starting block

* Skip test

* Remove limit on stage 4

* Fix test while disabling the hash check

* Fix tests

* Fix unreachable
---
 eth/backend.go                               |   2 +
 eth/downloader/downloader.go                 |   6 +
 eth/downloader/stagedsync_downloader.go      |   8 +-
 eth/downloader/stagedsync_stage_hashcheck.go |   5 +-
 eth/downloader/stagedsync_stage_indexes.go   | 366 ++++++++++++++++++-
 eth/handler.go                               |   9 +
 6 files changed, 372 insertions(+), 24 deletions(-)

diff --git a/eth/backend.go b/eth/backend.go
index 1600d0629e..058358ff46 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -273,6 +273,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 		return nil, err
 	}
 
+	eth.protocolManager.SetDataDir(ctx.Config.DataDir)
+
 	if config.SyncMode != downloader.StagedSync {
 		eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
 		_ = eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 59a241c2fe..0cf03d61c2 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -157,6 +157,7 @@ type Downloader struct {
 	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
 	// generate history index, disable/enable pruning
 	history bool
+	datadir string
 }
 
 // LightChain encapsulates functions required to synchronise a light chain.
@@ -262,6 +263,11 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
 	return dl
 }
 
+// SetDataDir sets the directory where the downloader is allowed to create temporary files
+func (d *Downloader) SetDataDir(datadir string) {
+	d.datadir = datadir
+}
+
 // Progress retrieves the synchronisation boundaries, specifically the origin
 // block where synchronisation started at (may have failed/suspended); the block
 // or header sync is currently at; and the latest known block which the sync targets.
diff --git a/eth/downloader/stagedsync_downloader.go b/eth/downloader/stagedsync_downloader.go
index 9484ba4bb6..b9db114da4 100644
--- a/eth/downloader/stagedsync_downloader.go
+++ b/eth/downloader/stagedsync_downloader.go
@@ -79,8 +79,8 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 		return err
 	}
 
-	log.Info("Sync stage 3/6. Recovering senders from tx signatures... Complete!")
-	log.Info("Sync stage 4/6. Executing blocks w/o hash checks...")
+	log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!")
+	log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")
 
 	/*
 	* Stage 4. Execute block bodies w/o calculating trie roots
@@ -106,7 +106,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	if d.history {
 		log.Info("Sync stage 6/7. Generating account history index")
-		err = spawnAccountHistoryIndex(d.stateDB, core.UsePlainStateExecution)
+		err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution)
 		if err != nil {
 			return err
 		}
@@ -117,7 +117,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	if d.history {
 		log.Info("Sync stage 7/7. Generating storage history index")
-		err = spawnStorageHistoryIndex(d.stateDB, core.UsePlainStateExecution)
+		err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution)
 		if err != nil {
 			return err
 		}
diff --git a/eth/downloader/stagedsync_stage_hashcheck.go b/eth/downloader/stagedsync_stage_hashcheck.go
index bda2ad6d8c..80e29096fd 100644
--- a/eth/downloader/stagedsync_stage_hashcheck.go
+++ b/eth/downloader/stagedsync_stage_hashcheck.go
@@ -14,6 +14,10 @@ func (d *Downloader) spawnCheckFinalHashStage(syncHeadNumber uint64) error {
 		return err
 	}
 
+	// REMOVE THE FOLLOWING CHECK WHEN PLAIN => HASHED TRANSFORMATION IS READY
+	if hashProgress == 0 {
+		return nil
+	}
 	if hashProgress == syncHeadNumber {
 		// we already did hash check for this block
 		// we don't do the obvious `if hashProgress > syncHeadNumber` to support reorgs more naturally
@@ -29,7 +33,6 @@ func (d *Downloader) spawnCheckFinalHashStage(syncHeadNumber uint64) error {
 	blockNr := syncHeadBlock.Header().Number.Uint64()
 
 	log.Info("Validating root hash", "block", blockNr, "blockRoot", syncHeadBlock.Root().Hex())
-
 	loader := trie.NewSubTrieLoader(blockNr)
 	rl := trie.NewRetainList(0)
 	subTries, err1 := loader.LoadFromFlatDB(euphemeralMutation, rl, [][]byte{nil}, []int{0}, false)
diff --git a/eth/downloader/stagedsync_stage_indexes.go b/eth/downloader/stagedsync_stage_indexes.go
index f1e03034c6..7b0412c97a 100644
--- a/eth/downloader/stagedsync_stage_indexes.go
+++ b/eth/downloader/stagedsync_stage_indexes.go
@@ -1,48 +1,376 @@
 package downloader
 
 import (
+	"bufio"
 	"bytes"
+	"container/heap"
+	"encoding/binary"
 	"fmt"
+	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/common/changeset"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/core"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 	"github.com/ledgerwatch/turbo-geth/log"
+	"io"
+	"io/ioutil"
+	"os"
+	"runtime"
+	"sort"
 )
 
-func spawnAccountHistoryIndex(db ethdb.Database, plainState bool) error {
-	lastProcessedBlockNumber, err := GetStageProgress(db, AccountHistoryIndex)
-	if err != nil {
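+// fillChangeSetBuffer walks changesets in the given bucket starting from blockNum and copies them into the
+// changesets buffer, recording the end offset and block number of each one. It stops early (done=false) when
+// the next changeset would not fit, so the caller can process the current batch and resume from the returned block number.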
+func fillChangeSetBuffer(db ethdb.Database, bucket []byte, blockNum uint64, changesets []byte, offsets []int, blockNums []uint64) (bool, uint64, []int, []uint64, error) {
+	offset := 0
+	offsets = offsets[:0]
+	blockNums = blockNums[:0]
+	startKey := dbutils.EncodeTimestamp(blockNum)
+	done := true
+	if err := db.Walk(bucket, startKey, 0, func(k, v []byte) (bool, error) {
+		blockNum, _ = dbutils.DecodeTimestamp(k)
+		if offset+len(v) > len(changesets) { // Adding the current changeset would overflow the buffer
+			done = false
+			return false, nil
+		}
+		copy(changesets[offset:], v)
+		offset += len(v)
+		offsets = append(offsets, offset)
+		blockNums = append(blockNums, blockNum)
+		return true, nil
+	}); err != nil {
+		return true, blockNum, offsets, blockNums, fmt.Errorf("walking over changeset for block %d: %v", blockNum, err)
+	}
+	return done, blockNum, offsets, blockNums, nil
+}
+
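+// emptyValBit is set in the highest bit of an encoded block number when the changeset value was empty,
+// so a single uint64 carries both the block number and the "value was empty" flag.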
+const emptyValBit uint64 = 0x8000000000000000
+
+// writeBufferMapToTempFile creates a temp file in the datadir and writes bufferMap into it.
+// If successful, it returns the name of the created file; the file is closed before returning.
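+// The file format is: for each key in ascending order, the raw key bytes, an 8-byte big-endian count,
+// followed by that many 8-byte big-endian encoded block numbers (with emptyValBit possibly set).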
+func writeBufferMapToTempFile(datadir string, pattern string, bufferMap map[string][]uint64) (string, error) {
+	var filename string
+	keys := make([]string, len(bufferMap))
+	i := 0
+	for key := range bufferMap {
+		keys[i] = key
+		i++
+	}
+	sort.Strings(keys)
+	var w *bufio.Writer
+	if bufferFile, err := ioutil.TempFile(datadir, pattern); err == nil {
+		//nolint:errcheck
+		defer bufferFile.Close()
+		filename = bufferFile.Name()
+		w = bufio.NewWriter(bufferFile)
+	} else {
+		return filename, fmt.Errorf("creating temp buf file %s: %v", pattern, err)
+	}
+	var nbytes [8]byte
+	for _, key := range keys {
+		if _, err := w.Write([]byte(key)); err != nil {
+			return filename, err
+		}
+		list := bufferMap[key]
+		binary.BigEndian.PutUint64(nbytes[:], uint64(len(list)))
+		if _, err := w.Write(nbytes[:]); err != nil {
+			return filename, err
+		}
+		for _, b := range list {
+			binary.BigEndian.PutUint64(nbytes[:], b)
+			if _, err := w.Write(nbytes[:]); err != nil {
+				return filename, err
+			}
+		}
+	}
+	if err := w.Flush(); err != nil {
+		return filename, fmt.Errorf("flushing file %s: %v", filename, err)
+	}
+	return filename, nil
+}
+
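+// HeapElem pairs the key most recently read from one of the buffer files with the index of that file's
+// reader. Heap is a min-heap ordered by key (and by file index on ties), used to drive the k-way merge.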
+type HeapElem struct {
+	key     []byte
+	timeIdx int
+}
+
+type Heap []HeapElem
+
+func (h Heap) Len() int {
+	return len(h)
+}
+
+func (h Heap) Less(i, j int) bool {
+	if c := bytes.Compare(h[i].key, h[j].key); c != 0 {
+		return c < 0
+	}
+	return h[i].timeIdx < h[j].timeIdx
+}
+
+func (h Heap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}
+
+func (h *Heap) Push(x interface{}) {
+	// Push and Pop use pointer receivers because they modify the slice's length,
+	// not just its contents.
+	*h = append(*h, x.(HeapElem))
+}
+
+func (h *Heap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
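+// mergeFilesIntoBucket performs a k-way merge over the sorted buffer files and appends the block numbers
+// for each key to its history index chunks in the given bucket, committing the batch whenever it grows
+// past the ideal batch size.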
+func mergeFilesIntoBucket(bufferFileNames []string, db ethdb.Database, bucket []byte, keyLength int) error {
+	var m runtime.MemStats
+	h := &Heap{}
+	heap.Init(h)
+	readers := make([]io.Reader, len(bufferFileNames))
+	for i, fileName := range bufferFileNames {
+		if f, err := os.Open(fileName); err == nil {
+			readers[i] = bufio.NewReader(f)
+			//nolint:errcheck
+			defer f.Close()
+		} else {
+			return err
+		}
+		// Read first key
+		keyBuf := make([]byte, keyLength)
+		if n, err := io.ReadFull(readers[i], keyBuf); err == nil && n == keyLength {
+			heap.Push(h, HeapElem{keyBuf, i})
+		} else {
+			return fmt.Errorf("init reading from buffer file: %d %x %v", n, keyBuf[:n], err)
+		}
+	}
+	// By now, the heap has one element for each buffer file
+	batch := db.NewBatch()
+	var nbytes [8]byte
+	for h.Len() > 0 {
+		element := (heap.Pop(h)).(HeapElem)
+		reader := readers[element.timeIdx]
+		k := element.key
+		// Read number of items for this key
+		var count int
+		if n, err := io.ReadFull(reader, nbytes[:]); err == nil && n == 8 {
+			count = int(binary.BigEndian.Uint64(nbytes[:]))
+		} else {
+			return fmt.Errorf("reading from buffer file: %d %v", n, err)
+		}
+		for i := 0; i < count; i++ {
+			var b uint64
+			if n, err := io.ReadFull(reader, nbytes[:]); err == nil && n == 8 {
+				b = binary.BigEndian.Uint64(nbytes[:])
+			} else {
+				return fmt.Errorf("reading from buffer file: %d %v", n, err)
+			}
+			vzero := (b & emptyValBit) != 0
+			blockNr := b &^ emptyValBit
+			currentChunkKey := dbutils.IndexChunkKey(k, ^uint64(0))
+			indexBytes, err1 := batch.Get(bucket, currentChunkKey)
+			if err1 != nil && err1 != ethdb.ErrKeyNotFound {
+				return fmt.Errorf("find chunk failed: %w", err1)
+			}
+			var index dbutils.HistoryIndexBytes
+			if len(indexBytes) == 0 {
+				index = dbutils.NewHistoryIndex()
+			} else if dbutils.CheckNewIndexChunk(indexBytes, blockNr) {
+				// Chunk overflow, need to write the "old" current chunk under its key derived from the last element
+				index = dbutils.WrapHistoryIndex(indexBytes)
+				indexKey, err3 := index.Key(k)
+				if err3 != nil {
+					return err3
+				}
+				// Flush the old chunk
+				if err4 := batch.Put(bucket, indexKey, index); err4 != nil {
+					return err4
+				}
+				// Start a new chunk
+				index = dbutils.NewHistoryIndex()
+			} else {
+				index = dbutils.WrapHistoryIndex(indexBytes)
+			}
+			index = index.Append(blockNr, vzero)
+
+			if err := batch.Put(bucket, currentChunkKey, index); err != nil {
+				return err
+			}
+			batchSize := batch.BatchSize()
+			if batchSize > batch.IdealBatchSize() {
+				if _, err := batch.Commit(); err != nil {
+					return err
+				}
+				runtime.ReadMemStats(&m)
+				log.Info("Committed index batch", "bucket", string(bucket), "size", common.StorageSize(batchSize), "current key", fmt.Sprintf("%x...", k[:4]),
+					"alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
+			}
+		}
+		// Try to read the next key (reuse the element)
+		if n, err := io.ReadFull(reader, element.key); err == nil && n == keyLength {
+			heap.Push(h, element)
+		} else if err != io.EOF {
+			// If it is EOF, we simply do not return anything into the heap
+			return fmt.Errorf("next reading from buffer file: %d %x %v", n, element.key[:n], err)
+		}
+	}
+	if _, err := batch.Commit(); err != nil {
 		return err
 	}
+	return nil
+}
+
+const changeSetBufSize = 256 * 1024 * 1024
+
+func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool) error {
 	if plainState {
 		log.Info("Skipped account index generation for plain state")
 		return nil
 	}
-	ig := core.NewIndexGenerator(db)
-	if err := ig.GenerateIndex(lastProcessedBlockNumber, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, walkerFactory(dbutils.AccountChangeSetBucket, plainState), func(innerDB ethdb.Database, blockNum uint64) error {
-		return SaveStageProgress(innerDB, AccountHistoryIndex, blockNum)
-	}); err != nil {
-		fmt.Println("AccountChangeSetBucket, err", err)
+	var blockNum uint64
+	if lastProcessedBlockNumber, err := GetStageProgress(db, AccountHistoryIndex); err == nil {
+		if lastProcessedBlockNumber > 0 {
+			blockNum = lastProcessedBlockNumber + 1
+		}
+	} else {
+		return fmt.Errorf("reading account history progress: %v", err)
+	}
+	log.Info("Account history index generation started", "from", blockNum)
+	var m runtime.MemStats
+	var bufferFileNames []string
+	changesets := make([]byte, changeSetBufSize) // 256 MB buffer
+	var offsets []int
+	var blockNums []uint64
+	var done = false
+	// In the first loop, we read all the changesets, create partial history indices, sort them, and
+	// write each batch into a file
+	for !done {
+		if newDone, newBlockNum, newOffsets, newBlockNums, err := fillChangeSetBuffer(db, dbutils.AccountChangeSetBucket, blockNum, changesets, offsets, blockNums); err == nil {
+			done = newDone
+			blockNum = newBlockNum
+			offsets = newOffsets
+			blockNums = newBlockNums
+		} else {
+			return err
+		}
+		if len(offsets) == 0 {
+			break
+		}
+		bufferMap := make(map[string][]uint64)
+		prevOffset := 0
+		for i, offset := range offsets {
+			blockNr := blockNums[i]
+			if err := changeset.AccountChangeSetBytes(changesets[prevOffset:offset]).Walk(func(k, v []byte) error {
+				sKey := string(k)
+				list := bufferMap[sKey]
+				b := blockNr
+				if len(v) == 0 {
+					b |= emptyValBit
+				}
+				list = append(list, b)
+				bufferMap[sKey] = list
+				return nil
+			}); err != nil {
+				return err
+			}
+			prevOffset = offset
+		}
+		if filename, err := writeBufferMapToTempFile(datadir, "account-history-indx-", bufferMap); err == nil {
+			defer func() {
+				//nolint:errcheck
+				os.Remove(filename)
+			}()
+			bufferFileNames = append(bufferFileNames, filename)
+			runtime.ReadMemStats(&m)
+			log.Info("Created a buffer file", "name", filename, "up to block", blockNum,
+				"alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
+		} else {
+			return err
+		}
+	}
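+	// In the second phase, merge the sorted buffer files into the history index bucket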
+	if len(offsets) > 0 {
+		if err := mergeFilesIntoBucket(bufferFileNames, db, dbutils.AccountsHistoryBucket, common.HashLength); err != nil {
+			return err
+		}
+	}
+	if err := SaveStageProgress(db, AccountHistoryIndex, blockNum); err != nil {
 		return err
 	}
 	return nil
 }
 
-func spawnStorageHistoryIndex(db ethdb.Database, plainState bool) error {
-	lastProcessedBlockNumber, err := GetStageProgress(db, StorageHistoryIndex)
-	if err != nil {
-		return err
-	}
+func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool) error {
 	if plainState {
-		log.Info("Skipped storaged index generation for plain state")
+		log.Info("Skipped storage index generation for plain state")
 		return nil
 	}
-	ig := core.NewIndexGenerator(db)
-	if err := ig.GenerateIndex(lastProcessedBlockNumber, dbutils.StorageChangeSetBucket, dbutils.StorageHistoryBucket, walkerFactory(dbutils.StorageChangeSetBucket, plainState), func(innerDB ethdb.Database, blockNum uint64) error {
-		return SaveStageProgress(innerDB, StorageHistoryIndex, blockNum)
-	}); err != nil {
-		fmt.Println("StorageChangeSetBucket, err", err)
+	var blockNum uint64
+	if lastProcessedBlockNumber, err := GetStageProgress(db, StorageHistoryIndex); err == nil {
+		if lastProcessedBlockNumber > 0 {
+			blockNum = lastProcessedBlockNumber + 1
+		}
+	} else {
+		return fmt.Errorf("reading storage history progress: %v", err)
+	}
+	log.Info("Storage history index generation started", "from", blockNum)
+	var m runtime.MemStats
+	var bufferFileNames []string
+	changesets := make([]byte, changeSetBufSize) // 256 MB buffer
+	var offsets []int
+	var blockNums []uint64
+	var done = false
+	// In the first loop, we read all the changesets, create partial history indices, sort them, and
+	// write each batch into a file
+	for !done {
+		if newDone, newBlockNum, newOffsets, newBlockNums, err := fillChangeSetBuffer(db, dbutils.StorageChangeSetBucket, blockNum, changesets, offsets, blockNums); err == nil {
+			done = newDone
+			blockNum = newBlockNum
+			offsets = newOffsets
+			blockNums = newBlockNums
+		} else {
+			return err
+		}
+		if len(offsets) == 0 {
+			break
+		}
+		bufferMap := make(map[string][]uint64)
+		prevOffset := 0
+		for i, offset := range offsets {
+			blockNr := blockNums[i]
+			if err := changeset.StorageChangeSetBytes(changesets[prevOffset:offset]).Walk(func(k, v []byte) error {
+				sKey := string(k)
+				list := bufferMap[sKey]
+				b := blockNr
+				if len(v) == 0 {
+					b |= emptyValBit
+				}
+				list = append(list, b)
+				bufferMap[sKey] = list
+				return nil
+			}); err != nil {
+				return err
+			}
+			prevOffset = offset
+		}
+		if filename, err := writeBufferMapToTempFile(datadir, "storage-history-indx-", bufferMap); err == nil {
+			defer func() {
+				//nolint:errcheck
+				os.Remove(filename)
+			}()
+			bufferFileNames = append(bufferFileNames, filename)
+			runtime.ReadMemStats(&m)
+			log.Info("Created a buffer file", "name", filename, "up to block", blockNum,
+				"alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
+		} else {
+			return err
+		}
+	}
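+	// In the second phase, merge the sorted buffer files into the history index bucket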
+	if len(offsets) > 0 {
+		if err := mergeFilesIntoBucket(bufferFileNames, db, dbutils.StorageHistoryBucket, 2*common.HashLength+common.IncarnationLength); err != nil {
+			return err
+		}
+	}
+	if err := SaveStageProgress(db, StorageHistoryIndex, blockNum); err != nil {
 		return err
 	}
 	return nil
diff --git a/eth/handler.go b/eth/handler.go
index 98bc7384d4..f6b87e9649 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -107,6 +107,7 @@ type ProtocolManager struct {
 	broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
 
 	mode downloader.SyncMode // Sync mode passed from the command line
+	datadir string
 }
 
 // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -162,6 +163,13 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
 	return manager, nil
 }
 
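+// SetDataDir stores the data directory and, if the downloader has already been constructed, passes it on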
+func (manager *ProtocolManager) SetDataDir(datadir string) {
+	manager.datadir = datadir
+	if manager.downloader != nil {
+		manager.downloader.SetDataDir(datadir)
+	}
+}
+
 func initPm(manager *ProtocolManager, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) {
 	sm, err := GetStorageModeFromDB(chaindb)
 	if err != nil {
@@ -169,6 +177,7 @@ func initPm(manager *ProtocolManager, txpool txPool, engine consensus.Engine, bl
 	}
 	// Construct the different synchronisation mechanisms
 	manager.downloader = downloader.New(manager.checkpointNumber, chaindb, nil /*stateBloom */, manager.eventMux, blockchain, nil, manager.removePeer, sm.History)
+	manager.downloader.SetDataDir(manager.datadir)
 
 	// Construct the fetcher (short sync)
 	validator := func(header *types.Header) error {
-- 
GitLab