From 96750b371145e12609a5f6986cec99aac3c76a22 Mon Sep 17 00:00:00 2001
From: Igor Mandrigin <mandrigin@users.noreply.github.com>
Date: Sat, 13 Jun 2020 18:03:38 +0300
Subject: [PATCH] Support ETL interruptions while promoting plain state (#662)
* lil etl changez
* fix test compile
* fixups to the unwinds
* add new buffer mode
* support unfinished transitions
* fixups
* fix tests
* linters
* linters
---
common/etl/buffers.go | 75 +++-
common/etl/collector.go | 19 +-
common/etl/etl.go | 7 +-
common/etl/etl_test.go | 15 +-
eth/stagedsync/stage_hashstate.go | 525 +++++++++----------------
eth/stagedsync/stage_hashstate_test.go | 10 +-
6 files changed, 287 insertions(+), 364 deletions(-)
diff --git a/common/etl/buffers.go b/common/etl/buffers.go
index 013a4fa30b..6f353d7c8d 100644
--- a/common/etl/buffers.go
+++ b/common/etl/buffers.go
@@ -4,13 +4,18 @@ import (
"bytes"
"sort"
"strconv"
+
+ "github.com/ledgerwatch/turbo-geth/common"
)
const (
//SliceBuffer - just simple slice w
SortableSliceBuffer = iota
- //SortableAppendBuffer - just simple slice w
+ //SortableAppendBuffer - map[k] [v1 v2 v3]
SortableAppendBuffer
+ // SortableOldestAppearedBuffer - buffer that keeps only the oldest entries.
+ // if first v1 was added under key K, then v2; only v1 will stay
+ SortableOldestAppearedBuffer
BufferOptimalSize = 256 * 1024 * 1024 /* 256 mb | var because we want to sometimes change it from tests */
)
@@ -155,12 +160,80 @@ func (b *appendSortableBuffer) CheckFlushSize() bool {
return b.size >= b.optimalSize
}
+func NewOldestEntryBuffer(bufferOptimalSize int) *oldestEntrySortableBuffer {
+ return &oldestEntrySortableBuffer{
+ entries: make(map[string][]byte),
+ size: 0,
+ optimalSize: bufferOptimalSize,
+ }
+}
+
+type oldestEntrySortableBuffer struct {
+ entries map[string][]byte
+ size int
+ optimalSize int
+ sortedBuf []sortableBufferEntry
+}
+
+func (b *oldestEntrySortableBuffer) Put(k, v []byte) {
+ _, ok := b.entries[string(k)]
+ if ok {
+ // if we already had this entry, we are going to keep it and ignore new value
+ return
+ }
+
+ b.size += len(k)
+ b.size += len(v)
+ b.entries[string(k)] = common.CopyBytes(v)
+}
+
+func (b *oldestEntrySortableBuffer) Size() int {
+ return b.size
+}
+
+func (b *oldestEntrySortableBuffer) Len() int {
+ return len(b.entries)
+}
+func (b *oldestEntrySortableBuffer) Sort() {
+ for i := range b.entries {
+ b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(i), value: b.entries[i]})
+ }
+ sort.Sort(b)
+}
+
+func (b *oldestEntrySortableBuffer) Less(i, j int) bool {
+ return bytes.Compare(b.sortedBuf[i].key, b.sortedBuf[j].key) < 0
+}
+
+func (b *oldestEntrySortableBuffer) Swap(i, j int) {
+ b.sortedBuf[i], b.sortedBuf[j] = b.sortedBuf[j], b.sortedBuf[i]
+}
+
+func (b *oldestEntrySortableBuffer) Get(i int) sortableBufferEntry {
+ return b.sortedBuf[i]
+}
+func (b *oldestEntrySortableBuffer) Reset() {
+ b.sortedBuf = b.sortedBuf[:0]
+ b.entries = make(map[string][]byte)
+ b.size = 0
+}
+
+func (b *oldestEntrySortableBuffer) GetEntries() []sortableBufferEntry {
+ return b.sortedBuf
+}
+
+func (b *oldestEntrySortableBuffer) CheckFlushSize() bool {
+ return b.size >= b.optimalSize
+}
+
func getBufferByType(tp int, size int) Buffer {
switch tp {
case SortableSliceBuffer:
return NewSortableBuffer(size)
case SortableAppendBuffer:
return NewAppendBuffer(size)
+ case SortableOldestAppearedBuffer:
+ return NewOldestEntryBuffer(size)
default:
panic("unknown buffer type " + strconv.Itoa(tp))
}
diff --git a/common/etl/collector.go b/common/etl/collector.go
index 5088c840d1..df9e59612a 100644
--- a/common/etl/collector.go
+++ b/common/etl/collector.go
@@ -4,12 +4,13 @@ import (
"bytes"
"container/heap"
"fmt"
+ "io"
+ "runtime"
+
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ugorji/go/codec"
- "io"
- "runtime"
)
type LoadNextFunc func(k []byte, v []byte) error
@@ -112,12 +113,14 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, providers []dataProvi
}
batchSize := batch.BatchSize()
if batchSize > batch.IdealBatchSize() || args.loadBatchSize > 0 && batchSize > args.loadBatchSize {
+ if args.OnLoadCommit != nil {
+ if err := args.OnLoadCommit(batch, k, false); err != nil {
+ return err
+ }
+ }
if _, err := batch.Commit(); err != nil {
return err
}
- if args.OnLoadCommit != nil {
- args.OnLoadCommit(k, false)
- }
var currentKeyStr string
if len(k) < 4 {
currentKeyStr = fmt.Sprintf("%x", k)
@@ -152,9 +155,11 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, providers []dataProvi
return fmt.Errorf("error while reading next element from disk: %v", err)
}
}
- _, err := batch.Commit()
if args.OnLoadCommit != nil {
- args.OnLoadCommit([]byte{}, true)
+ if err := args.OnLoadCommit(batch, []byte{}, true); err != nil {
+ return err
+ }
}
+ _, err := batch.Commit()
return err
}
diff --git a/common/etl/etl.go b/common/etl/etl.go
index 9358a4b645..e611fde6e4 100644
--- a/common/etl/etl.go
+++ b/common/etl/etl.go
@@ -4,13 +4,14 @@ import (
"bytes"
"context"
"fmt"
+ "io"
+ "time"
+
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ugorji/go/codec"
"golang.org/x/sync/errgroup"
- "io"
- "time"
)
var (
@@ -54,7 +55,7 @@ func NextKey(key []byte) ([]byte, error) {
// loaded from files into a DB
// * `key`: last commited key to the database (use etl.NextKey helper to use in LoadStartKey)
// * `isDone`: true, if everything is processed
-type LoadCommitHandler func(key []byte, isDone bool)
+type LoadCommitHandler func(ethdb.Putter, []byte, bool) error
type TransformArgs struct {
ExtractStartKey []byte
diff --git a/common/etl/etl_test.go b/common/etl/etl_test.go
index d36bd7eec8..16994cd218 100644
--- a/common/etl/etl_test.go
+++ b/common/etl/etl_test.go
@@ -3,15 +3,16 @@ package etl
import (
"bytes"
"fmt"
+ "io"
+ "os"
+ "strings"
+ "testing"
+
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/stretchr/testify/assert"
"github.com/ugorji/go/codec"
- "io"
- "os"
- "strings"
- "testing"
)
func TestWriteAndReadBufferEntry(t *testing.T) {
@@ -164,11 +165,12 @@ func TestTransformOnLoadCommitCustomBatchSize(t *testing.T) {
testExtractToMapFunc,
testLoadFromMapFunc,
TransformArgs{
- OnLoadCommit: func(_ []byte, isDone bool) {
+ OnLoadCommit: func(_ ethdb.Putter, _ []byte, isDone bool) error {
numberOfCalls++
if isDone {
finalized = true
}
+ return nil
},
loadBatchSize: 1,
},
@@ -198,11 +200,12 @@ func TestTransformOnLoadCommitDefaultBatchSize(t *testing.T) {
testExtractToMapFunc,
testLoadFromMapFunc,
TransformArgs{
- OnLoadCommit: func(_ []byte, isDone bool) {
+ OnLoadCommit: func(_ ethdb.Putter, _ []byte, isDone bool) error {
numberOfCalls++
if isDone {
finalized = true
}
+ return nil
},
},
)
diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go
index 790ff9cfae..035476aaf8 100644
--- a/eth/stagedsync/stage_hashstate.go
+++ b/eth/stagedsync/stage_hashstate.go
@@ -1,15 +1,9 @@
package stagedsync
import (
- "bufio"
- "bytes"
- "container/heap"
+ "errors"
"fmt"
- "io"
- "io/ioutil"
"os"
- "runtime"
- "sort"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/changeset"
@@ -21,7 +15,6 @@ import (
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/trie"
- "github.com/pkg/errors"
"github.com/ugorji/go/codec"
)
@@ -44,7 +37,7 @@ func SpawnHashStateStage(s *StageState, stateDB ethdb.Database, datadir string,
if core.UsePlainStateExecution {
log.Info("Promoting plain state", "from", hashProgress, "to", syncHeadNumber)
- err := promoteHashedState(stateDB, hashProgress, syncHeadNumber, datadir, quit)
+ err := promoteHashedState(s, stateDB, hashProgress, syncHeadNumber, datadir, quit)
if err != nil {
return err
}
@@ -63,7 +56,7 @@ func verifyRootHash(stateDB ethdb.Database, syncHeadNumber uint64) error {
rl := trie.NewRetainList(0)
subTries, err1 := loader.LoadFromFlatDB(stateDB, rl, nil /*HashCollector*/, [][]byte{nil}, []int{0}, false)
if err1 != nil {
- return errors.Wrap(err1, "checking root hash failed")
+ return fmt.Errorf("checking root hash failed (err=%v)", err1)
}
if len(subTries.Hashes) != 1 {
return fmt.Errorf("expected 1 hash, got %d", len(subTries.Hashes))
@@ -92,38 +85,76 @@ func unwindHashStateStageImpl(u *UnwindState, s *StageState, stateDB ethdb.Datab
// and recomputes the state root from scratch
prom := NewPromoter(stateDB, quit)
prom.TempDir = datadir
- if err := prom.Unwind(s.BlockNumber, u.UnwindPoint, dbutils.PlainAccountChangeSetBucket); err != nil {
+ if err := prom.Unwind(s, u, dbutils.PlainAccountChangeSetBucket, 0x00); err != nil {
return err
}
- if err := prom.Unwind(s.BlockNumber, u.UnwindPoint, dbutils.PlainStorageChangeSetBucket); err != nil {
+ if err := prom.Unwind(s, u, dbutils.PlainStorageChangeSetBucket, 0x01); err != nil {
return err
}
return nil
}
-func promoteHashedState(db ethdb.Database, from, to uint64, datadir string, quit chan struct{}) error {
+func promoteHashedState(s *StageState, db ethdb.Database, from, to uint64, datadir string, quit chan struct{}) error {
if from == 0 {
- return promoteHashedStateCleanly(db, datadir, quit)
+ return promoteHashedStateCleanly(s, db, to, datadir, quit)
}
- return promoteHashedStateIncrementally(from, to, db, datadir, quit)
+ return promoteHashedStateIncrementally(s, from, to, db, datadir, quit)
}
-func promoteHashedStateCleanly(db ethdb.Database, datadir string, quit chan struct{}) error {
- if err := common.Stopped(quit); err != nil {
+func promoteHashedStateCleanly(s *StageState, db ethdb.Database, to uint64, datadir string, quit chan struct{}) error {
+ var err error
+ if err = common.Stopped(quit); err != nil {
return err
}
- err := etl.Transform(
- db,
- dbutils.PlainStateBucket,
- dbutils.CurrentStateBucket,
- datadir,
- keyTransformExtractFunc(transformPlainStateKey),
- etl.IdentityLoadFunc,
- etl.TransformArgs{Quit: quit},
- )
+ var loadStartKey []byte
+ skipCurrentState := false
+ if len(s.StageData) == 1 && s.StageData[0] == byte(0xFF) {
+ skipCurrentState = true
+ } else if len(s.StageData) > 0 {
+ loadStartKey, err = etl.NextKey(s.StageData[1:])
+ if err != nil {
+ return err
+ }
+ }
- if err != nil {
- return err
+ if !skipCurrentState {
+ toStateStageData := func(k []byte) []byte {
+ return append([]byte{0xFF}, k...)
+ }
+
+ err = etl.Transform(
+ db,
+ dbutils.PlainStateBucket,
+ dbutils.CurrentStateBucket,
+ datadir,
+ keyTransformExtractFunc(transformPlainStateKey),
+ etl.IdentityLoadFunc,
+ etl.TransformArgs{
+ Quit: quit,
+ LoadStartKey: loadStartKey,
+ OnLoadCommit: func(batch ethdb.Putter, key []byte, isDone bool) error {
+ if isDone {
+ return s.UpdateWithStageData(batch, s.BlockNumber, toStateStageData(nil))
+ }
+ return s.UpdateWithStageData(batch, s.BlockNumber, toStateStageData(key))
+ },
+ },
+ )
+
+ if err != nil {
+ return err
+ }
+ }
+
+ toCodeStageData := func(k []byte) []byte {
+ return append([]byte{0xCD}, k...)
+ }
+
+ if len(s.StageData) > 0 && s.StageData[0] == byte(0xCD) {
+ loadStartKey, err = etl.NextKey(s.StageData[1:])
+ if err != nil {
+ return err
+ }
}
return etl.Transform(
@@ -133,7 +164,16 @@ func promoteHashedStateCleanly(db ethdb.Database, datadir string, quit chan stru
datadir,
keyTransformExtractFunc(transformContractCodeKey),
etl.IdentityLoadFunc,
- etl.TransformArgs{Quit: quit},
+ etl.TransformArgs{
+ Quit: quit,
+ LoadStartKey: loadStartKey,
+ OnLoadCommit: func(batch ethdb.Putter, key []byte, isDone bool) error {
+ if isDone {
+ return s.UpdateWithStageData(batch, to, nil)
+ }
+ return s.UpdateWithStageData(batch, s.BlockNumber, toCodeStageData(key))
+ },
+ },
)
}
@@ -245,353 +285,154 @@ var promoterMapper = map[string]struct {
},
}
-func (p *Promoter) fillChangeSetBuffer(bucket []byte, blockNum, to uint64, changesets []byte, offsets []int) (bool, uint64, []int, error) {
- offset := 0
- offsets = offsets[:0]
- startKey := dbutils.EncodeTimestamp(blockNum)
- done := true
- if err := p.db.Walk(bucket, startKey, 0, func(k, v []byte) (bool, error) {
- if err := common.Stopped(p.quitCh); err != nil {
- return false, err
- }
- blockNum, _ = dbutils.DecodeTimestamp(k)
- if blockNum > to {
- return false, nil
- }
- if offset+len(v) > len(changesets) { // Adding the current changeset would overflow the buffer
- done = false
- return false, nil
- }
- copy(changesets[offset:], v)
- offset += len(v)
- offsets = append(offsets, offset)
- return true, nil
- }); err != nil {
- return true, blockNum, offsets, fmt.Errorf("walking over account changeset for block %d: %v", blockNum, err)
- }
- return done, blockNum, offsets, nil
-}
-
-// writeBufferMapToTempFile creates temp file in the datadir and writes bufferMap into it
-// if sucessful, returns the name of the created file. File is closed
-func (p *Promoter) writeBufferMapToTempFile(pattern string, bufferMap map[string]struct{}) (string, error) {
- var filename string
- keys := make([]string, len(bufferMap))
- i := 0
- for key := range bufferMap {
- keys[i] = key
- i++
- }
- sort.Strings(keys)
- var w *bufio.Writer
- if bufferFile, err := ioutil.TempFile(p.TempDir, pattern); err == nil {
- //nolint:errcheck
- defer bufferFile.Close()
- filename = bufferFile.Name()
- w = bufio.NewWriter(bufferFile)
- } else {
- return filename, fmt.Errorf("creating temp buf file %s: %v", pattern, err)
- }
- for _, key := range keys {
- if _, err := w.Write([]byte(key)); err != nil {
- return filename, err
+func getExtractFunc(changeSetBucket []byte) etl.ExtractFunc {
+ mapping, ok := promoterMapper[string(changeSetBucket)]
+ return func(_, changesetBytes []byte, next etl.ExtractNextFunc) error {
+ if !ok {
+ return fmt.Errorf("unknown bucket type: %s", changeSetBucket)
}
+ return mapping.WalkerAdapter(changesetBytes).Walk(func(k, v []byte) error {
+ return next(k, k, nil)
+ })
}
- if err := w.Flush(); err != nil {
- return filename, fmt.Errorf("flushing file %s: %v", filename, err)
- }
- return filename, nil
}
-func (p *Promoter) writeUnwindBufferMapToTempFile(pattern string, bufferMap map[string][]byte) (string, error) {
- var filename string
- keys := make([]string, len(bufferMap))
- i := 0
- for key := range bufferMap {
- keys[i] = key
- i++
- }
- sort.Strings(keys)
- var w *bufio.Writer
- if bufferFile, err := ioutil.TempFile(p.TempDir, pattern); err == nil {
- //nolint:errcheck
- defer bufferFile.Close()
- filename = bufferFile.Name()
- w = bufio.NewWriter(bufferFile)
- } else {
- return filename, fmt.Errorf("creating temp buf file %s: %v", pattern, err)
- }
- for _, key := range keys {
- if _, err := w.Write([]byte(key)); err != nil {
- return filename, err
- }
- value := bufferMap[key]
- if err := w.WriteByte(byte(len(value))); err != nil {
- return filename, err
- }
- if _, err := w.Write(value); err != nil {
- return filename, err
+func getUnwindExtractFunc(changeSetBucket []byte) etl.ExtractFunc {
+ mapping, ok := promoterMapper[string(changeSetBucket)]
+ return func(_, changesetBytes []byte, next etl.ExtractNextFunc) error {
+ if !ok {
+ return fmt.Errorf("unknown bucket type: %s", changeSetBucket)
}
+ return mapping.WalkerAdapter(changesetBytes).Walk(func(k, v []byte) error {
+ return next(k, k, v)
+ })
}
- if err := w.Flush(); err != nil {
- return filename, fmt.Errorf("flushing file %s: %v", filename, err)
- }
- return filename, nil
}
-func (p *Promoter) mergeFilesAndCollect(bufferFileNames []string, keyLength int, collector *etl.Collector) error {
- h := &etl.Heap{}
- heap.Init(h)
- readers := make([]io.Reader, len(bufferFileNames))
- for i, fileName := range bufferFileNames {
- if f, err := os.Open(fileName); err == nil {
- readers[i] = bufio.NewReader(f)
- //nolint:errcheck
- defer f.Close()
- } else {
- return err
- }
- // Read first key
- keyBuf := make([]byte, keyLength)
- if n, err := io.ReadFull(readers[i], keyBuf); err == nil && n == keyLength {
- heap.Push(h, etl.HeapElem{keyBuf, i, nil})
- } else {
- return fmt.Errorf("init reading from account buffer file: %d %x %v", n, keyBuf[:n], err)
- }
- }
- // By now, the heap has one element for each buffer file
- var prevKey []byte
- for h.Len() > 0 {
- if err := common.Stopped(p.quitCh); err != nil {
- return err
- }
- element := (heap.Pop(h)).(etl.HeapElem)
- if !bytes.Equal(element.Key, prevKey) {
- // Ignore all the repeating keys
- prevKey = common.CopyBytes(element.Key)
- if v, err := p.db.Get(dbutils.PlainStateBucket, element.Key); err == nil || err == ethdb.ErrKeyNotFound {
- if err1 := collector.Collect(element.Key, v); err1 != nil {
- return err1
- }
- } else {
- return err
- }
- }
- reader := readers[element.TimeIdx]
- // Try to read the next key (reuse the element)
- if n, err := io.ReadFull(reader, element.Key); err == nil && n == keyLength {
- heap.Push(h, element)
- } else if err != io.EOF {
- // If it is EOF, we simply do not return anything into the heap
- return fmt.Errorf("next reading from account buffer file: %d %x %v", n, element.Key[:n], err)
- }
- }
- return nil
-}
-
-func (p *Promoter) mergeUnwindFilesAndCollect(bufferFileNames []string, keyLength int, collector *etl.Collector) error {
- h := &etl.Heap{}
- heap.Init(h)
- readers := make([]io.Reader, len(bufferFileNames))
- for i, fileName := range bufferFileNames {
- if f, err := os.Open(fileName); err == nil {
- readers[i] = bufio.NewReader(f)
- //nolint:errcheck
- defer f.Close()
- } else {
- return err
- }
- // Read first key
- keyBuf := make([]byte, keyLength)
- if n, err := io.ReadFull(readers[i], keyBuf); err != nil || n != keyLength {
- return fmt.Errorf("init reading from account buffer file: %d %x %v", n, keyBuf[:n], err)
- }
- var l [1]byte
- if n, err := io.ReadFull(readers[i], l[:]); err != nil || n != 1 {
- return fmt.Errorf("init reading from account buffer file: %d %v", n, err)
- }
- var valBuf []byte
- valLength := int(l[0])
- if valLength > 0 {
- valBuf = make([]byte, valLength)
- if n, err := io.ReadFull(readers[i], valBuf); err != nil || n != valLength {
- return fmt.Errorf("init reading from account buffer file: %d %v", n, err)
- }
- }
- heap.Push(h, etl.HeapElem{keyBuf, i, valBuf})
- }
- // By now, the heap has one element for each buffer file
- var prevKey []byte
- for h.Len() > 0 {
- if err := common.Stopped(p.quitCh); err != nil {
- return err
- }
- element := (heap.Pop(h)).(etl.HeapElem)
- if !bytes.Equal(element.Key, prevKey) {
- // Ignore all the repeating keys, and take the earlist
- prevKey = common.CopyBytes(element.Key)
- if err := collector.Collect(element.Key, element.Value); err != nil {
- return err
- }
- }
- reader := readers[element.TimeIdx]
- // Try to read the next key (reuse the element)
- if n, err := io.ReadFull(reader, element.Key); err == nil && n == keyLength {
- var l [1]byte
- if n1, err1 := io.ReadFull(reader, l[:]); err1 != nil || n1 != 1 {
- return fmt.Errorf("reading from account buffer file: %d %v", n1, err1)
- }
- var valBuf []byte
- valLength := int(l[0])
- if valLength > 0 {
- valBuf = make([]byte, valLength)
- if n1, err1 := io.ReadFull(reader, valBuf); err1 != nil || n1 != valLength {
- return fmt.Errorf("reading from account buffer file: %d %v", n1, err1)
- }
- }
- element.Value = valBuf
- heap.Push(h, element)
- } else if err != io.EOF {
- // If it is EOF, we simply do not return anything into the heap
- return fmt.Errorf("next reading from account buffer file: %d %x %v", n, element.Key[:n], err)
+func getFromPlainStateAndLoad(db ethdb.Getter, loadFunc etl.LoadFunc) etl.LoadFunc {
+ return func(k []byte, _ []byte, state etl.State, next etl.LoadNextFunc) error {
+ // ignoring value on purpose, we want the latest one and it is in PlainStateBucket
+ value, err := db.Get(dbutils.PlainStateBucket, k)
+ if err == nil || errors.Is(err, ethdb.ErrKeyNotFound) {
+ return loadFunc(k, value, state, next)
}
+ return err
}
- return nil
}
-func (p *Promoter) Promote(from, to uint64, changeSetBucket []byte) error {
- v, ok := promoterMapper[string(changeSetBucket)]
- if !ok {
- return fmt.Errorf("unknown bucket type: %s", changeSetBucket)
- }
+func (p *Promoter) Promote(s *StageState, from, to uint64, changeSetBucket []byte, index byte) error {
log.Info("Incremental promotion started", "from", from, "to", to, "csbucket", string(changeSetBucket))
- var m runtime.MemStats
- var bufferFileNames []string
- changesets := make([]byte, p.ChangeSetBufSize) // 256 Mb buffer by default
- var offsets []int
- var done = false
- blockNum := from + 1
- for !done {
- if newDone, newBlockNum, newOffsets, err := p.fillChangeSetBuffer(changeSetBucket, blockNum, to, changesets, offsets); err == nil {
- done = newDone
- blockNum = newBlockNum
- offsets = newOffsets
- } else {
- return err
- }
- if len(offsets) == 0 {
- break
- }
- bufferMap := make(map[string]struct{})
- prevOffset := 0
- for _, offset := range offsets {
- if err := v.WalkerAdapter(changesets[prevOffset:offset]).Walk(func(k, v []byte) error {
- bufferMap[string(k)] = struct{}{}
- return nil
- }); err != nil {
+ startkey := dbutils.EncodeTimestamp(from + 1)
+ skip := false
+
+ var loadStartKey []byte
+ if len(s.StageData) != 0 {
+ // we have finished this stage but didn't start the next one
+ if len(s.StageData) == 1 && s.StageData[0] == index {
+ skip = true
+ // we are already at the next stage
+ } else if s.StageData[0] > index {
+ skip = true
+ // if we are at the current stage and we have something meaningful at StageData
+ } else if s.StageData[0] == index {
+ var err error
+ loadStartKey, err = etl.NextKey(s.StageData[1:])
+ if err != nil {
return err
}
- prevOffset = offset
- }
-
- if filename, err := p.writeBufferMapToTempFile(v.Template, bufferMap); err == nil {
- defer func() {
- //nolint:errcheck
- os.Remove(filename)
- }()
- bufferFileNames = append(bufferFileNames, filename)
- runtime.ReadMemStats(&m)
- log.Info("Created a buffer file", "name", filename, "up to block", blockNum,
- "alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
- } else {
- return err
}
}
- if len(offsets) > 0 {
- collector := etl.NewCollector(p.TempDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
- if err := p.mergeFilesAndCollect(bufferFileNames, v.KeySize, collector); err != nil {
- return err
- }
- if err := collector.Load(p.db, dbutils.CurrentStateBucket, keyTransformLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil {
- return err
- }
+ if skip {
+ return nil
}
- return nil
+
+ return etl.Transform(
+ p.db,
+ changeSetBucket,
+ dbutils.CurrentStateBucket,
+ p.TempDir,
+ getExtractFunc(changeSetBucket),
+ // here we avoid getting the state from changesets,
+ // we just care about the accounts that did change,
+ // so we can directly read from the PlainStateBucket
+ getFromPlainStateAndLoad(p.db, keyTransformLoadFunc),
+ etl.TransformArgs{
+ BufferType: etl.SortableAppendBuffer,
+ ExtractStartKey: startkey,
+ LoadStartKey: loadStartKey,
+ OnLoadCommit: func(putter ethdb.Putter, key []byte, isDone bool) error {
+ if isDone {
+ return s.UpdateWithStageData(putter, from, []byte{index})
+ }
+ return s.UpdateWithStageData(putter, from, append([]byte{index}, key...))
+ },
+ Quit: p.quitCh,
+ },
+ )
}
-func (p *Promoter) Unwind(from, to uint64, changeSetBucket []byte) error {
- v, ok := promoterMapper[string(changeSetBucket)]
- if !ok {
- return fmt.Errorf("unknown bucket type: %s", changeSetBucket)
- }
+func (p *Promoter) Unwind(s *StageState, u *UnwindState, changeSetBucket []byte, index byte) error {
+ from := s.BlockNumber
+ to := u.UnwindPoint
+
log.Info("Unwinding started", "from", from, "to", to, "csbucket", string(changeSetBucket))
- var m runtime.MemStats
- var bufferFileNames []string
- changesets := make([]byte, p.ChangeSetBufSize) // 256 Mb buffer by default
- var offsets []int
- var done = false
- blockNum := to + 1
- for !done {
- if newDone, newBlockNum, newOffsets, err := p.fillChangeSetBuffer(changeSetBucket, blockNum, from, changesets, offsets); err == nil {
- done = newDone
- blockNum = newBlockNum
- offsets = newOffsets
- } else {
- return err
- }
- if len(offsets) == 0 {
- break
- }
- bufferMap := make(map[string][]byte)
- prevOffset := 0
- for _, offset := range offsets {
- if err := v.WalkerAdapter(changesets[prevOffset:offset]).Walk(func(k, v []byte) error {
- ks := string(k)
- if _, ok := bufferMap[ks]; !ok {
- // Do not replace the existing values, so we end up with the earlier possible values
- bufferMap[ks] = v
- }
- return nil
- }); err != nil {
+ startkey := dbutils.EncodeTimestamp(to + 1)
+
+ var loadStartKey []byte
+ skip := false
+
+ if len(u.StageData) != 0 {
+ // we have finished this stage but didn't start the next one
+ if len(u.StageData) == 1 && u.StageData[0] == index {
+ skip = true
+ // we are already at the next stage
+ } else if u.StageData[0] > index {
+ skip = true
+ // if we are at the current stage and we have something meaningful at StageData
+ } else if u.StageData[0] == index {
+ var err error
+ loadStartKey, err = etl.NextKey(u.StageData[1:])
+ if err != nil {
return err
}
- prevOffset = offset
- }
-
- if filename, err := p.writeUnwindBufferMapToTempFile(v.Template, bufferMap); err == nil {
- defer func() {
- //nolint:errcheck
- os.Remove(filename)
- }()
- bufferFileNames = append(bufferFileNames, filename)
- runtime.ReadMemStats(&m)
- log.Info("Created a buffer file", "name", filename, "up to block", blockNum,
- "alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
- } else {
- return err
}
}
- if len(offsets) > 0 {
- collector := etl.NewCollector(p.TempDir, etl.NewAppendBuffer(etl.BufferOptimalSize))
- if err := p.mergeUnwindFilesAndCollect(bufferFileNames, v.KeySize, collector); err != nil {
- return err
- }
- if err := collector.Load(p.db, dbutils.CurrentStateBucket, keyTransformLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil {
- return err
- }
+
+ if skip {
+ return nil
}
- return nil
+
+ return etl.Transform(
+ p.db,
+ changeSetBucket,
+ dbutils.CurrentStateBucket,
+ p.TempDir,
+ getUnwindExtractFunc(changeSetBucket),
+ keyTransformLoadFunc,
+ etl.TransformArgs{
+ BufferType: etl.SortableOldestAppearedBuffer,
+ LoadStartKey: loadStartKey,
+ ExtractStartKey: startkey,
+ OnLoadCommit: func(putter ethdb.Putter, key []byte, isDone bool) error {
+ if isDone {
+ return u.UpdateWithStageData(putter, []byte{index})
+ }
+ return u.UpdateWithStageData(putter, append([]byte{index}, key...))
+ },
+ Quit: p.quitCh,
+ },
+ )
}
-func promoteHashedStateIncrementally(from, to uint64, db ethdb.Database, datadir string, quit chan struct{}) error {
+func promoteHashedStateIncrementally(s *StageState, from, to uint64, db ethdb.Database, datadir string, quit chan struct{}) error {
prom := NewPromoter(db, quit)
prom.TempDir = datadir
- if err := prom.Promote(from, to, dbutils.PlainAccountChangeSetBucket); err != nil {
+ if err := prom.Promote(s, from, to, dbutils.PlainAccountChangeSetBucket, 0x01); err != nil {
return err
}
- if err := prom.Promote(from, to, dbutils.PlainStorageChangeSetBucket); err != nil {
+ if err := prom.Promote(s, from, to, dbutils.PlainStorageChangeSetBucket, 0x02); err != nil {
return err
}
return nil
diff --git a/eth/stagedsync/stage_hashstate_test.go b/eth/stagedsync/stage_hashstate_test.go
index 90ee6107e9..edc562117e 100644
--- a/eth/stagedsync/stage_hashstate_test.go
+++ b/eth/stagedsync/stage_hashstate_test.go
@@ -27,7 +27,7 @@ func TestPromoteHashedStateClearState(t *testing.T) {
generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations)
m2 := db2.NewBatch()
- err := promoteHashedState(m2, 0, 50, getDataDir(), nil)
+ err := promoteHashedState(&StageState{}, m2, 0, 50, getDataDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@@ -49,7 +49,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations)
m2 := db2.NewBatch()
- err := promoteHashedState(m2, 0, 50, getDataDir(), nil)
+ err := promoteHashedState(&StageState{}, m2, 0, 50, getDataDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@@ -62,7 +62,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations)
m2 = db2.NewBatch()
- err = promoteHashedState(m2, 50, 101, getDataDir(), nil)
+ err = promoteHashedState(&StageState{BlockNumber: 50}, m2, 50, 101, getDataDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@@ -85,7 +85,7 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations)
m2 := db2.NewBatch()
- err := promoteHashedState(m2, 50, 101, getDataDir(), nil)
+ err := promoteHashedState(&StageState{}, m2, 50, 101, getDataDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
@@ -106,7 +106,7 @@ func TestUnwindHashed(t *testing.T) {
generateBlocks(t, 1, 50, hashedWriterGen(db1), changeCodeWithIncarnations)
generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations)
- err := promoteHashedState(db2, 0, 100, getDataDir(), nil)
+ err := promoteHashedState(&StageState{}, db2, 0, 100, getDataDir(), nil)
if err != nil {
t.Errorf("error while promoting state: %v", err)
}
--
GitLab