diff --git a/eth/backend.go b/eth/backend.go
index 1600d0629e39f175d42055a2e14e4c845c7383a0..058358ff4688ceb31ab19955545e8cb350011909 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -273,6 +273,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 		return nil, err
 	}
 
+	eth.protocolManager.SetDataDir(ctx.Config.DataDir)
+
 	if config.SyncMode != downloader.StagedSync {
 		eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
 		_ = eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 59a241c2fe2bd045c4e21685cf943d2f191e23d6..0cf03d61c27f397a36b36e106ceadd983a2fbc95 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -157,6 +157,7 @@ type Downloader struct {
 	chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
 	// generate history index, disable/enable pruning
 	history bool
+	datadir string
 }
 
 // LightChain encapsulates functions required to synchronise a light chain.
@@ -262,6 +263,11 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
 	return dl
 }
 
+// SetDataDir sets the directory where the downloader is allowed to create temporary files.
+func (d *Downloader) SetDataDir(datadir string) {
+	d.datadir = datadir
+}
+
 // Progress retrieves the synchronisation boundaries, specifically the origin
 // block where synchronisation started at (may have failed/suspended); the block
 // or header sync is currently at; and the latest known block which the sync targets.
diff --git a/eth/downloader/stagedsync_downloader.go b/eth/downloader/stagedsync_downloader.go
index 9484ba4bb694f357b11f1faff440f40ffe22c0b3..b9db114da421864b451bcb698b480ef8fa2dd4e5 100644
--- a/eth/downloader/stagedsync_downloader.go
+++ b/eth/downloader/stagedsync_downloader.go
@@ -79,8 +79,8 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 		return err
 	}
 
-	log.Info("Sync stage 3/6. Recovering senders from tx signatures... Complete!")
-	log.Info("Sync stage 4/6. Executing blocks w/o hash checks...")
+	log.Info("Sync stage 3/7. Recovering senders from tx signatures... Complete!")
+	log.Info("Sync stage 4/7. Executing blocks w/o hash checks...")
 
 	/*
 	 * Stage 4. Execute block bodies w/o calculating trie roots
@@ -106,7 +106,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	if d.history {
 		log.Info("Sync stage 6/7. Generating account history index")
-		err = spawnAccountHistoryIndex(d.stateDB, core.UsePlainStateExecution)
+		err = spawnAccountHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution)
 		if err != nil {
 			return err
 		}
@@ -117,7 +117,7 @@ func (d *Downloader) doStagedSyncWithFetchers(p *peerConnection, headersFetchers
 
 	if d.history {
 		log.Info("Sync stage 7/7. Generating storage history index")
Generating storage history index") - err = spawnStorageHistoryIndex(d.stateDB, core.UsePlainStateExecution) + err = spawnStorageHistoryIndex(d.stateDB, d.datadir, core.UsePlainStateExecution) if err != nil { return err } diff --git a/eth/downloader/stagedsync_stage_hashcheck.go b/eth/downloader/stagedsync_stage_hashcheck.go index bda2ad6d8c5ff4ca38df73465eeac849ed684ff4..80e29096fde95a19b000f700d14f4d3043cfca12 100644 --- a/eth/downloader/stagedsync_stage_hashcheck.go +++ b/eth/downloader/stagedsync_stage_hashcheck.go @@ -14,6 +14,10 @@ func (d *Downloader) spawnCheckFinalHashStage(syncHeadNumber uint64) error { return err } + //REMOVE THE FOLLOWING LINE WHEN PLAIN => HASHED TRANSFORMATION IS READY + if hashProgress == 0 { + return nil + } if hashProgress == syncHeadNumber { // we already did hash check for this block // we don't do the obvious `if hashProgress > syncHeadNumber` to support reorgs more naturally @@ -29,7 +33,6 @@ func (d *Downloader) spawnCheckFinalHashStage(syncHeadNumber uint64) error { blockNr := syncHeadBlock.Header().Number.Uint64() log.Info("Validating root hash", "block", blockNr, "blockRoot", syncHeadBlock.Root().Hex()) - loader := trie.NewSubTrieLoader(blockNr) rl := trie.NewRetainList(0) subTries, err1 := loader.LoadFromFlatDB(euphemeralMutation, rl, [][]byte{nil}, []int{0}, false) diff --git a/eth/downloader/stagedsync_stage_indexes.go b/eth/downloader/stagedsync_stage_indexes.go index f1e03034c60a6dcc61de9ceebe455ae5e02787fa..7b0412c97a76a36924ae3343a416d0b47aeb247e 100644 --- a/eth/downloader/stagedsync_stage_indexes.go +++ b/eth/downloader/stagedsync_stage_indexes.go @@ -1,48 +1,376 @@ package downloader import ( + "bufio" "bytes" + "container/heap" + "encoding/binary" "fmt" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/changeset" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" + "io" + "io/ioutil" + "os" + "runtime" + "sort" ) -func spawnAccountHistoryIndex(db ethdb.Database, plainState bool) error { - lastProcessedBlockNumber, err := GetStageProgress(db, AccountHistoryIndex) - if err != nil { +func fillChangeSetBuffer(db ethdb.Database, bucket []byte, blockNum uint64, changesets []byte, offsets []int, blockNums []uint64) (bool, uint64, []int, []uint64, error) { + offset := 0 + offsets = offsets[:0] + blockNums = blockNums[:0] + startKey := dbutils.EncodeTimestamp(blockNum) + done := true + if err := db.Walk(bucket, startKey, 0, func(k, v []byte) (bool, error) { + blockNum, _ = dbutils.DecodeTimestamp(k) + if offset+len(v) > len(changesets) { // Adding the current changeset would overflow the buffer + done = false + return false, nil + } + copy(changesets[offset:], v) + offset += len(v) + offsets = append(offsets, offset) + blockNums = append(blockNums, blockNum) + return true, nil + }); err != nil { + return true, blockNum, offsets, blockNums, fmt.Errorf("walking over account changeset for block %d: %v", blockNum, err) + } + return done, blockNum, offsets, blockNums, nil +} + +const emptyValBit uint64 = 0x8000000000000000 + +// writeBufferMapToTempFile creates temp file in the datadir and writes bufferMap into it +// if sucessful, returns the name of the created file. 
+func writeBufferMapToTempFile(datadir string, pattern string, bufferMap map[string][]uint64) (string, error) {
+	var filename string
+	keys := make([]string, len(bufferMap))
+	i := 0
+	for key := range bufferMap {
+		keys[i] = key
+		i++
+	}
+	sort.Strings(keys)
+	var w *bufio.Writer
+	if bufferFile, err := ioutil.TempFile(datadir, pattern); err == nil {
+		//nolint:errcheck
+		defer bufferFile.Close()
+		filename = bufferFile.Name()
+		w = bufio.NewWriter(bufferFile)
+	} else {
+		return filename, fmt.Errorf("creating temp buf file %s: %v", pattern, err)
+	}
+	var nbytes [8]byte
+	for _, key := range keys {
+		if _, err := w.Write([]byte(key)); err != nil {
+			return filename, err
+		}
+		list := bufferMap[key]
+		binary.BigEndian.PutUint64(nbytes[:], uint64(len(list)))
+		if _, err := w.Write(nbytes[:]); err != nil {
+			return filename, err
+		}
+		for _, b := range list {
+			binary.BigEndian.PutUint64(nbytes[:], b)
+			if _, err := w.Write(nbytes[:]); err != nil {
+				return filename, err
+			}
+		}
+	}
+	if err := w.Flush(); err != nil {
+		return filename, fmt.Errorf("flushing file %s: %v", filename, err)
+	}
+	return filename, nil
+}
+
+type HeapElem struct {
+	key     []byte
+	timeIdx int
+}
+
+type Heap []HeapElem
+
+func (h Heap) Len() int {
+	return len(h)
+}
+
+func (h Heap) Less(i, j int) bool {
+	if c := bytes.Compare(h[i].key, h[j].key); c != 0 {
+		return c < 0
+	}
+	return h[i].timeIdx < h[j].timeIdx
+}
+
+func (h Heap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}
+
+func (h *Heap) Push(x interface{}) {
+	// Push and Pop use pointer receivers because they modify the slice's length,
+	// not just its contents.
+	*h = append(*h, x.(HeapElem))
+}
+
+func (h *Heap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+// mergeFilesIntoBucket performs a k-way merge of the sorted buffer files, using a min-heap of
+// (key, file index) pairs, and writes the merged history index chunks into the given bucket.
+func mergeFilesIntoBucket(bufferFileNames []string, db ethdb.Database, bucket []byte, keyLength int) error {
+	var m runtime.MemStats
+	h := &Heap{}
+	heap.Init(h)
+	readers := make([]io.Reader, len(bufferFileNames))
+	for i, fileName := range bufferFileNames {
+		if f, err := os.Open(fileName); err == nil {
+			readers[i] = bufio.NewReader(f)
+			//nolint:errcheck
+			defer f.Close()
+		} else {
+			return err
+		}
+		// Read first key
+		keyBuf := make([]byte, keyLength)
+		if n, err := io.ReadFull(readers[i], keyBuf); err == nil && n == keyLength {
+			heap.Push(h, HeapElem{keyBuf, i})
+		} else {
+			return fmt.Errorf("init reading from buffer file: %d %x %v", n, keyBuf[:n], err)
+		}
+	}
+	// By now, the heap has one element for each buffer file
+	batch := db.NewBatch()
+	var nbytes [8]byte
+	for h.Len() > 0 {
+		element := (heap.Pop(h)).(HeapElem)
+		reader := readers[element.timeIdx]
+		k := element.key
+		// Read number of items for this key
+		var count int
+		if n, err := io.ReadFull(reader, nbytes[:]); err == nil && n == 8 {
+			count = int(binary.BigEndian.Uint64(nbytes[:]))
+		} else {
+			return fmt.Errorf("reading from buffer file: %d %v", n, err)
+		}
+		for i := 0; i < count; i++ {
+			var b uint64
+			if n, err := io.ReadFull(reader, nbytes[:]); err == nil && n == 8 {
+				b = binary.BigEndian.Uint64(nbytes[:])
+			} else {
+				return fmt.Errorf("reading from buffer file: %d %v", n, err)
+			}
+			vzero := (b & emptyValBit) != 0
+			blockNr := b &^ emptyValBit
+			currentChunkKey := dbutils.IndexChunkKey(k, ^uint64(0))
+			indexBytes, err1 := batch.Get(bucket, currentChunkKey)
+			if err1 != nil && err1 != ethdb.ErrKeyNotFound {
+				return fmt.Errorf("find chunk failed: %w", err1)
+			}
+			var index dbutils.HistoryIndexBytes
+			if len(indexBytes) == 0 {
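+				// No index chunk exists for this key yet, start a fresh one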
+				index = dbutils.NewHistoryIndex()
+			} else if dbutils.CheckNewIndexChunk(indexBytes, blockNr) {
+				// Chunk overflow, need to write the "old" current chunk under its key derived from the last element
+				index = dbutils.WrapHistoryIndex(indexBytes)
+				indexKey, err3 := index.Key(k)
+				if err3 != nil {
+					return err3
+				}
+				// Flush the old chunk
+				if err4 := batch.Put(bucket, indexKey, index); err4 != nil {
+					return err4
+				}
+				// Start a new chunk
+				index = dbutils.NewHistoryIndex()
+			} else {
+				index = dbutils.WrapHistoryIndex(indexBytes)
+			}
+			index = index.Append(blockNr, vzero)
+
+			if err := batch.Put(bucket, currentChunkKey, index); err != nil {
+				return err
+			}
+			batchSize := batch.BatchSize()
+			if batchSize > batch.IdealBatchSize() {
+				if _, err := batch.Commit(); err != nil {
+					return err
+				}
+				runtime.ReadMemStats(&m)
+				log.Info("Committed index batch", "bucket", string(bucket), "size", common.StorageSize(batchSize), "current key", fmt.Sprintf("%x...", k[:4]),
+					"alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
+			}
+		}
+		// Try to read the next key (reuse the element)
+		if n, err := io.ReadFull(reader, element.key); err == nil && n == keyLength {
+			heap.Push(h, element)
+		} else if err != io.EOF {
+			// If it is EOF, we simply do not return anything into the heap
+			return fmt.Errorf("next reading from buffer file: %d %x %v", n, element.key[:n], err)
+		}
+	}
+	if _, err := batch.Commit(); err != nil {
 		return err
 	}
+	return nil
+}
+
+const changeSetBufSize = 256 * 1024 * 1024
+
+func spawnAccountHistoryIndex(db ethdb.Database, datadir string, plainState bool) error {
 	if plainState {
 		log.Info("Skipped account index generation for plain state")
 		return nil
 	}
-	ig := core.NewIndexGenerator(db)
-	if err := ig.GenerateIndex(lastProcessedBlockNumber, dbutils.AccountChangeSetBucket, dbutils.AccountsHistoryBucket, walkerFactory(dbutils.AccountChangeSetBucket, plainState), func(innerDB ethdb.Database, blockNum uint64) error {
-		return SaveStageProgress(innerDB, AccountHistoryIndex, blockNum)
-	}); err != nil {
-		fmt.Println("AccountChangeSetBucket, err", err)
+	var blockNum uint64
+	if lastProcessedBlockNumber, err := GetStageProgress(db, AccountHistoryIndex); err == nil {
+		if lastProcessedBlockNumber > 0 {
+			blockNum = lastProcessedBlockNumber + 1
+		}
+	} else {
+		return fmt.Errorf("reading account history progress: %v", err)
+	}
+	log.Info("Account history index generation started", "from", blockNum)
+	var m runtime.MemStats
+	var bufferFileNames []string
+	changesets := make([]byte, changeSetBufSize) // 256 MB buffer
+	var offsets []int
+	var blockNums []uint64
+	var done = false
+	// In the first loop, we read all the changesets, create partial history indices, sort them, and
+	// write each batch into a file
+	for !done {
+		if newDone, newBlockNum, newOffsets, newBlockNums, err := fillChangeSetBuffer(db, dbutils.AccountChangeSetBucket, blockNum, changesets, offsets, blockNums); err == nil {
+			done = newDone
+			blockNum = newBlockNum
+			offsets = newOffsets
+			blockNums = newBlockNums
+		} else {
+			return err
+		}
+		if len(offsets) == 0 {
+			break
+		}
+		bufferMap := make(map[string][]uint64)
+		prevOffset := 0
+		for i, offset := range offsets {
+			blockNr := blockNums[i]
+			if err := changeset.AccountChangeSetBytes(changesets[prevOffset:offset]).Walk(func(k, v []byte) error {
+				sKey := string(k)
+				list := bufferMap[sKey]
+				b := blockNr
+				if len(v) == 0 {
+					b |= emptyValBit
+				}
+				list = append(list, b)
+				bufferMap[sKey] = list
+				return nil
+			}); err != nil {
+				return err
+			}
+			prevOffset = offset
+		}
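+		// Sort the keys and spill the partial index for this batch of changesets into a temp file under datadir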
+		if filename, err := writeBufferMapToTempFile(datadir, "account-history-indx-", bufferMap); err == nil {
+			defer func() {
+				//nolint:errcheck
+				os.Remove(filename)
+			}()
+			bufferFileNames = append(bufferFileNames, filename)
+			runtime.ReadMemStats(&m)
+			log.Info("Created a buffer file", "name", filename, "up to block", blockNum,
+				"alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
+		} else {
+			return err
+		}
+	}
+	if len(offsets) > 0 {
+		if err := mergeFilesIntoBucket(bufferFileNames, db, dbutils.AccountsHistoryBucket, common.HashLength); err != nil {
+			return err
+		}
+	}
+	if err := SaveStageProgress(db, AccountHistoryIndex, blockNum); err != nil {
 		return err
 	}
 	return nil
 }
 
-func spawnStorageHistoryIndex(db ethdb.Database, plainState bool) error {
-	lastProcessedBlockNumber, err := GetStageProgress(db, StorageHistoryIndex)
-	if err != nil {
-		return err
-	}
+func spawnStorageHistoryIndex(db ethdb.Database, datadir string, plainState bool) error {
 	if plainState {
-		log.Info("Skipped storaged index generation for plain state")
+		log.Info("Skipped storage index generation for plain state")
 		return nil
 	}
-	ig := core.NewIndexGenerator(db)
-	if err := ig.GenerateIndex(lastProcessedBlockNumber, dbutils.StorageChangeSetBucket, dbutils.StorageHistoryBucket, walkerFactory(dbutils.StorageChangeSetBucket, plainState), func(innerDB ethdb.Database, blockNum uint64) error {
-		return SaveStageProgress(innerDB, StorageHistoryIndex, blockNum)
-	}); err != nil {
-		fmt.Println("StorageChangeSetBucket, err", err)
+	var blockNum uint64
+	if lastProcessedBlockNumber, err := GetStageProgress(db, StorageHistoryIndex); err == nil {
+		if lastProcessedBlockNumber > 0 {
+			blockNum = lastProcessedBlockNumber + 1
+		}
+	} else {
+		return fmt.Errorf("reading storage history progress: %v", err)
+	}
+	log.Info("Storage history index generation started", "from", blockNum)
+	var m runtime.MemStats
+	var bufferFileNames []string
+	changesets := make([]byte, changeSetBufSize) // 256 MB buffer
+	var offsets []int
+	var blockNums []uint64
+	var done = false
+	// In the first loop, we read all the changesets, create partial history indices, sort them, and
+	// write each batch into a file
+	for !done {
+		if newDone, newBlockNum, newOffsets, newBlockNums, err := fillChangeSetBuffer(db, dbutils.StorageChangeSetBucket, blockNum, changesets, offsets, blockNums); err == nil {
+			done = newDone
+			blockNum = newBlockNum
+			offsets = newOffsets
+			blockNums = newBlockNums
+		} else {
+			return err
+		}
+		if len(offsets) == 0 {
+			break
+		}
+		bufferMap := make(map[string][]uint64)
+		prevOffset := 0
+		for i, offset := range offsets {
+			blockNr := blockNums[i]
+			if err := changeset.StorageChangeSetBytes(changesets[prevOffset:offset]).Walk(func(k, v []byte) error {
+				sKey := string(k)
+				list := bufferMap[sKey]
+				b := blockNr
+				if len(v) == 0 {
+					b |= emptyValBit
+				}
+				list = append(list, b)
+				bufferMap[sKey] = list
+				return nil
+			}); err != nil {
+				return err
+			}
+			prevOffset = offset
+		}
+		if filename, err := writeBufferMapToTempFile(datadir, "storage-history-indx-", bufferMap); err == nil {
+			defer func() {
+				//nolint:errcheck
+				os.Remove(filename)
+			}()
+			bufferFileNames = append(bufferFileNames, filename)
+			runtime.ReadMemStats(&m)
+			log.Info("Created a buffer file", "name", filename, "up to block", blockNum,
+				"alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
+		} else {
+			return err
+		}
+	}
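+	// Merge all sorted buffer files into the storage history index bucket; storage keys are the
+	// composite of address hash, incarnation and storage key hash, hence the longer key length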
+	if len(offsets) > 0 {
+		if err := mergeFilesIntoBucket(bufferFileNames, db, dbutils.StorageHistoryBucket, 2*common.HashLength+common.IncarnationLength); err != nil {
+			return err
+		}
+	}
+	if err := SaveStageProgress(db, StorageHistoryIndex, blockNum); err != nil {
 		return err
 	}
 	return nil
 }
diff --git a/eth/handler.go b/eth/handler.go
index 98bc7384d4c6a1838b055141d03091a5b285e6e9..f6b87e964954986b8db43d0afcc27fc8770fa09b 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -107,6 +107,7 @@ type ProtocolManager struct {
 	broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
 
 	mode downloader.SyncMode // Sync mode passed from the command line
+	datadir string
 }
 
 // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -162,6 +163,13 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
 	return manager, nil
 }
 
+func (manager *ProtocolManager) SetDataDir(datadir string) {
+	manager.datadir = datadir
+	if manager.downloader != nil {
+		manager.downloader.SetDataDir(datadir)
+	}
+}
+
 func initPm(manager *ProtocolManager, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) {
 	sm, err := GetStorageModeFromDB(chaindb)
 	if err != nil {
@@ -169,6 +177,7 @@ func initPm(manager *ProtocolManager, txpool txPool, engine consensus.Engine, bl
 	}
 	// Construct the different synchronisation mechanisms
 	manager.downloader = downloader.New(manager.checkpointNumber, chaindb, nil /*stateBloom */, manager.eventMux, blockchain, nil, manager.removePeer, sm.History)
+	manager.downloader.SetDataDir(manager.datadir)
 
 	// Construct the fetcher (short sync)
 	validator := func(header *types.Header) error {