diff --git a/common/etl/buffers.go b/common/etl/buffers.go index 013a4fa30b4017dd054c6390a0d74c571f735e9c..6f353d7c8d71981dffca76f6a442ddbdc967751e 100644 --- a/common/etl/buffers.go +++ b/common/etl/buffers.go @@ -4,13 +4,18 @@ import ( "bytes" "sort" "strconv" + + "github.com/ledgerwatch/turbo-geth/common" ) const ( //SliceBuffer - just simple slice w SortableSliceBuffer = iota - //SortableAppendBuffer - just simple slice w + //SortableAppendBuffer - map[k] [v1 v2 v3] SortableAppendBuffer + // SortableOldestAppearedBuffer - buffer that keeps only the oldest entries. + // if first v1 was added under key K, then v2; only v1 will stay + SortableOldestAppearedBuffer BufferOptimalSize = 256 * 1024 * 1024 /* 256 mb | var because we want to sometimes change it from tests */ ) @@ -155,12 +160,80 @@ func (b *appendSortableBuffer) CheckFlushSize() bool { return b.size >= b.optimalSize } +func NewOldestEntryBuffer(bufferOptimalSize int) *oldestEntrySortableBuffer { + return &oldestEntrySortableBuffer{ + entries: make(map[string][]byte), + size: 0, + optimalSize: bufferOptimalSize, + } +} + +type oldestEntrySortableBuffer struct { + entries map[string][]byte + size int + optimalSize int + sortedBuf []sortableBufferEntry +} + +func (b *oldestEntrySortableBuffer) Put(k, v []byte) { + _, ok := b.entries[string(k)] + if ok { + // if we already had this entry, we are going to keep it and ignore new value + return + } + + b.size += len(k) + b.size += len(v) + b.entries[string(k)] = common.CopyBytes(v) +} + +func (b *oldestEntrySortableBuffer) Size() int { + return b.size +} + +func (b *oldestEntrySortableBuffer) Len() int { + return len(b.entries) +} +func (b *oldestEntrySortableBuffer) Sort() { + for i := range b.entries { + b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(i), value: b.entries[i]}) + } + sort.Sort(b) +} + +func (b *oldestEntrySortableBuffer) Less(i, j int) bool { + return bytes.Compare(b.sortedBuf[i].key, b.sortedBuf[j].key) < 0 +} + +func (b *oldestEntrySortableBuffer) Swap(i, j int) { + b.sortedBuf[i], b.sortedBuf[j] = b.sortedBuf[j], b.sortedBuf[i] +} + +func (b *oldestEntrySortableBuffer) Get(i int) sortableBufferEntry { + return b.sortedBuf[i] +} +func (b *oldestEntrySortableBuffer) Reset() { + b.sortedBuf = b.sortedBuf[:0] + b.entries = make(map[string][]byte) + b.size = 0 +} + +func (b *oldestEntrySortableBuffer) GetEntries() []sortableBufferEntry { + return b.sortedBuf +} + +func (b *oldestEntrySortableBuffer) CheckFlushSize() bool { + return b.size >= b.optimalSize +} + func getBufferByType(tp int, size int) Buffer { switch tp { case SortableSliceBuffer: return NewSortableBuffer(size) case SortableAppendBuffer: return NewAppendBuffer(size) + case SortableOldestAppearedBuffer: + return NewOldestEntryBuffer(size) default: panic("unknown buffer type " + strconv.Itoa(tp)) } diff --git a/common/etl/collector.go b/common/etl/collector.go index 5088c840d19666c7c151c98bb5de8b98a0db19e4..df9e59612ad4ebc452f6d9b7b0fa95c62f23b123 100644 --- a/common/etl/collector.go +++ b/common/etl/collector.go @@ -4,12 +4,13 @@ import ( "bytes" "container/heap" "fmt" + "io" + "runtime" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ugorji/go/codec" - "io" - "runtime" ) type LoadNextFunc func(k []byte, v []byte) error @@ -112,12 +113,14 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, providers []dataProvi } batchSize := batch.BatchSize() if batchSize > batch.IdealBatchSize() || 
args.loadBatchSize > 0 && batchSize > args.loadBatchSize { + if args.OnLoadCommit != nil { + if err := args.OnLoadCommit(batch, k, false); err != nil { + return err + } + } if _, err := batch.Commit(); err != nil { return err } - if args.OnLoadCommit != nil { - args.OnLoadCommit(k, false) - } var currentKeyStr string if len(k) < 4 { currentKeyStr = fmt.Sprintf("%x", k) @@ -152,9 +155,11 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, providers []dataProvi return fmt.Errorf("error while reading next element from disk: %v", err) } } - _, err := batch.Commit() if args.OnLoadCommit != nil { - args.OnLoadCommit([]byte{}, true) + if err := args.OnLoadCommit(batch, []byte{}, true); err != nil { + return err + } } + _, err := batch.Commit() return err } diff --git a/common/etl/etl.go b/common/etl/etl.go index 9358a4b645d08e2680031a01ca1fead29cd2dc08..e611fde6e494b6567e3d2f118a1933838d539e72 100644 --- a/common/etl/etl.go +++ b/common/etl/etl.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "fmt" + "io" + "time" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ugorji/go/codec" "golang.org/x/sync/errgroup" - "io" - "time" ) var ( @@ -54,7 +55,7 @@ func NextKey(key []byte) ([]byte, error) { // loaded from files into a DB // * `key`: last commited key to the database (use etl.NextKey helper to use in LoadStartKey) // * `isDone`: true, if everything is processed -type LoadCommitHandler func(key []byte, isDone bool) +type LoadCommitHandler func(ethdb.Putter, []byte, bool) error type TransformArgs struct { ExtractStartKey []byte diff --git a/common/etl/etl_test.go b/common/etl/etl_test.go index d36bd7eec8b4824090f1a6a78b5fb00e1fe7c276..16994cd218ca1283abd40f5567a8509d07a00c5e 100644 --- a/common/etl/etl_test.go +++ b/common/etl/etl_test.go @@ -3,15 +3,16 @@ package etl import ( "bytes" "fmt" + "io" + "os" + "strings" + "testing" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/stretchr/testify/assert" "github.com/ugorji/go/codec" - "io" - "os" - "strings" - "testing" ) func TestWriteAndReadBufferEntry(t *testing.T) { @@ -164,11 +165,12 @@ func TestTransformOnLoadCommitCustomBatchSize(t *testing.T) { testExtractToMapFunc, testLoadFromMapFunc, TransformArgs{ - OnLoadCommit: func(_ []byte, isDone bool) { + OnLoadCommit: func(_ ethdb.Putter, _ []byte, isDone bool) error { numberOfCalls++ if isDone { finalized = true } + return nil }, loadBatchSize: 1, }, @@ -198,11 +200,12 @@ func TestTransformOnLoadCommitDefaultBatchSize(t *testing.T) { testExtractToMapFunc, testLoadFromMapFunc, TransformArgs{ - OnLoadCommit: func(_ []byte, isDone bool) { + OnLoadCommit: func(_ ethdb.Putter, _ []byte, isDone bool) error { numberOfCalls++ if isDone { finalized = true } + return nil }, }, ) diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go index 790ff9cfaeb54cb0d94e58d271d61ec5cfe4ae2d..035476aaf8503e38079ddf6fb9fbfb9395172661 100644 --- a/eth/stagedsync/stage_hashstate.go +++ b/eth/stagedsync/stage_hashstate.go @@ -1,15 +1,9 @@ package stagedsync import ( - "bufio" - "bytes" - "container/heap" + "errors" "fmt" - "io" - "io/ioutil" "os" - "runtime" - "sort" "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/changeset" @@ -21,7 +15,6 @@ import ( "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/trie" - "github.com/pkg/errors" 
"github.com/ugorji/go/codec" ) @@ -44,7 +37,7 @@ func SpawnHashStateStage(s *StageState, stateDB ethdb.Database, datadir string, if core.UsePlainStateExecution { log.Info("Promoting plain state", "from", hashProgress, "to", syncHeadNumber) - err := promoteHashedState(stateDB, hashProgress, syncHeadNumber, datadir, quit) + err := promoteHashedState(s, stateDB, hashProgress, syncHeadNumber, datadir, quit) if err != nil { return err } @@ -63,7 +56,7 @@ func verifyRootHash(stateDB ethdb.Database, syncHeadNumber uint64) error { rl := trie.NewRetainList(0) subTries, err1 := loader.LoadFromFlatDB(stateDB, rl, nil /*HashCollector*/, [][]byte{nil}, []int{0}, false) if err1 != nil { - return errors.Wrap(err1, "checking root hash failed") + return fmt.Errorf("checking root hash failed (err=%v)", err1) } if len(subTries.Hashes) != 1 { return fmt.Errorf("expected 1 hash, got %d", len(subTries.Hashes)) @@ -92,38 +85,76 @@ func unwindHashStateStageImpl(u *UnwindState, s *StageState, stateDB ethdb.Datab // and recomputes the state root from scratch prom := NewPromoter(stateDB, quit) prom.TempDir = datadir - if err := prom.Unwind(s.BlockNumber, u.UnwindPoint, dbutils.PlainAccountChangeSetBucket); err != nil { + if err := prom.Unwind(s, u, dbutils.PlainAccountChangeSetBucket, 0x00); err != nil { return err } - if err := prom.Unwind(s.BlockNumber, u.UnwindPoint, dbutils.PlainStorageChangeSetBucket); err != nil { + if err := prom.Unwind(s, u, dbutils.PlainStorageChangeSetBucket, 0x01); err != nil { return err } return nil } -func promoteHashedState(db ethdb.Database, from, to uint64, datadir string, quit chan struct{}) error { +func promoteHashedState(s *StageState, db ethdb.Database, from, to uint64, datadir string, quit chan struct{}) error { if from == 0 { - return promoteHashedStateCleanly(db, datadir, quit) + return promoteHashedStateCleanly(s, db, to, datadir, quit) } - return promoteHashedStateIncrementally(from, to, db, datadir, quit) + return promoteHashedStateIncrementally(s, from, to, db, datadir, quit) } -func promoteHashedStateCleanly(db ethdb.Database, datadir string, quit chan struct{}) error { - if err := common.Stopped(quit); err != nil { +func promoteHashedStateCleanly(s *StageState, db ethdb.Database, to uint64, datadir string, quit chan struct{}) error { + var err error + if err = common.Stopped(quit); err != nil { return err } - err := etl.Transform( - db, - dbutils.PlainStateBucket, - dbutils.CurrentStateBucket, - datadir, - keyTransformExtractFunc(transformPlainStateKey), - etl.IdentityLoadFunc, - etl.TransformArgs{Quit: quit}, - ) + var loadStartKey []byte + skipCurrentState := false + if len(s.StageData) == 1 && s.StageData[0] == byte(0xFF) { + skipCurrentState = true + } else if len(s.StageData) > 0 { + loadStartKey, err = etl.NextKey(s.StageData[1:]) + if err != nil { + return err + } + } - if err != nil { - return err + if !skipCurrentState { + toStateStageData := func(k []byte) []byte { + return append([]byte{0xFF}, k...) 
+ } + + err = etl.Transform( + db, + dbutils.PlainStateBucket, + dbutils.CurrentStateBucket, + datadir, + keyTransformExtractFunc(transformPlainStateKey), + etl.IdentityLoadFunc, + etl.TransformArgs{ + Quit: quit, + LoadStartKey: loadStartKey, + OnLoadCommit: func(batch ethdb.Putter, key []byte, isDone bool) error { + if isDone { + return s.UpdateWithStageData(batch, s.BlockNumber, toStateStageData(nil)) + } + return s.UpdateWithStageData(batch, s.BlockNumber, toStateStageData(key)) + }, + }, + ) + + if err != nil { + return err + } + } + + toCodeStageData := func(k []byte) []byte { + return append([]byte{0xCD}, k...) + } + + if len(s.StageData) > 0 && s.StageData[0] == byte(0xCD) { + loadStartKey, err = etl.NextKey(s.StageData[1:]) + if err != nil { + return err + } } return etl.Transform( @@ -133,7 +164,16 @@ func promoteHashedStateCleanly(db ethdb.Database, datadir string, quit chan stru datadir, keyTransformExtractFunc(transformContractCodeKey), etl.IdentityLoadFunc, - etl.TransformArgs{Quit: quit}, + etl.TransformArgs{ + Quit: quit, + LoadStartKey: loadStartKey, + OnLoadCommit: func(batch ethdb.Putter, key []byte, isDone bool) error { + if isDone { + return s.UpdateWithStageData(batch, to, nil) + } + return s.UpdateWithStageData(batch, s.BlockNumber, toCodeStageData(key)) + }, + }, ) } @@ -245,353 +285,154 @@ var promoterMapper = map[string]struct { }, } -func (p *Promoter) fillChangeSetBuffer(bucket []byte, blockNum, to uint64, changesets []byte, offsets []int) (bool, uint64, []int, error) { - offset := 0 - offsets = offsets[:0] - startKey := dbutils.EncodeTimestamp(blockNum) - done := true - if err := p.db.Walk(bucket, startKey, 0, func(k, v []byte) (bool, error) { - if err := common.Stopped(p.quitCh); err != nil { - return false, err - } - blockNum, _ = dbutils.DecodeTimestamp(k) - if blockNum > to { - return false, nil - } - if offset+len(v) > len(changesets) { // Adding the current changeset would overflow the buffer - done = false - return false, nil - } - copy(changesets[offset:], v) - offset += len(v) - offsets = append(offsets, offset) - return true, nil - }); err != nil { - return true, blockNum, offsets, fmt.Errorf("walking over account changeset for block %d: %v", blockNum, err) - } - return done, blockNum, offsets, nil -} - -// writeBufferMapToTempFile creates temp file in the datadir and writes bufferMap into it -// if sucessful, returns the name of the created file. 
File is closed -func (p *Promoter) writeBufferMapToTempFile(pattern string, bufferMap map[string]struct{}) (string, error) { - var filename string - keys := make([]string, len(bufferMap)) - i := 0 - for key := range bufferMap { - keys[i] = key - i++ - } - sort.Strings(keys) - var w *bufio.Writer - if bufferFile, err := ioutil.TempFile(p.TempDir, pattern); err == nil { - //nolint:errcheck - defer bufferFile.Close() - filename = bufferFile.Name() - w = bufio.NewWriter(bufferFile) - } else { - return filename, fmt.Errorf("creating temp buf file %s: %v", pattern, err) - } - for _, key := range keys { - if _, err := w.Write([]byte(key)); err != nil { - return filename, err +func getExtractFunc(changeSetBucket []byte) etl.ExtractFunc { + mapping, ok := promoterMapper[string(changeSetBucket)] + return func(_, changesetBytes []byte, next etl.ExtractNextFunc) error { + if !ok { + return fmt.Errorf("unknown bucket type: %s", changeSetBucket) } + return mapping.WalkerAdapter(changesetBytes).Walk(func(k, v []byte) error { + return next(k, k, nil) + }) } - if err := w.Flush(); err != nil { - return filename, fmt.Errorf("flushing file %s: %v", filename, err) - } - return filename, nil } -func (p *Promoter) writeUnwindBufferMapToTempFile(pattern string, bufferMap map[string][]byte) (string, error) { - var filename string - keys := make([]string, len(bufferMap)) - i := 0 - for key := range bufferMap { - keys[i] = key - i++ - } - sort.Strings(keys) - var w *bufio.Writer - if bufferFile, err := ioutil.TempFile(p.TempDir, pattern); err == nil { - //nolint:errcheck - defer bufferFile.Close() - filename = bufferFile.Name() - w = bufio.NewWriter(bufferFile) - } else { - return filename, fmt.Errorf("creating temp buf file %s: %v", pattern, err) - } - for _, key := range keys { - if _, err := w.Write([]byte(key)); err != nil { - return filename, err - } - value := bufferMap[key] - if err := w.WriteByte(byte(len(value))); err != nil { - return filename, err - } - if _, err := w.Write(value); err != nil { - return filename, err +func getUnwindExtractFunc(changeSetBucket []byte) etl.ExtractFunc { + mapping, ok := promoterMapper[string(changeSetBucket)] + return func(_, changesetBytes []byte, next etl.ExtractNextFunc) error { + if !ok { + return fmt.Errorf("unknown bucket type: %s", changeSetBucket) } + return mapping.WalkerAdapter(changesetBytes).Walk(func(k, v []byte) error { + return next(k, k, v) + }) } - if err := w.Flush(); err != nil { - return filename, fmt.Errorf("flushing file %s: %v", filename, err) - } - return filename, nil } -func (p *Promoter) mergeFilesAndCollect(bufferFileNames []string, keyLength int, collector *etl.Collector) error { - h := &etl.Heap{} - heap.Init(h) - readers := make([]io.Reader, len(bufferFileNames)) - for i, fileName := range bufferFileNames { - if f, err := os.Open(fileName); err == nil { - readers[i] = bufio.NewReader(f) - //nolint:errcheck - defer f.Close() - } else { - return err - } - // Read first key - keyBuf := make([]byte, keyLength) - if n, err := io.ReadFull(readers[i], keyBuf); err == nil && n == keyLength { - heap.Push(h, etl.HeapElem{keyBuf, i, nil}) - } else { - return fmt.Errorf("init reading from account buffer file: %d %x %v", n, keyBuf[:n], err) - } - } - // By now, the heap has one element for each buffer file - var prevKey []byte - for h.Len() > 0 { - if err := common.Stopped(p.quitCh); err != nil { - return err - } - element := (heap.Pop(h)).(etl.HeapElem) - if !bytes.Equal(element.Key, prevKey) { - // Ignore all the repeating keys - prevKey = 
common.CopyBytes(element.Key) - if v, err := p.db.Get(dbutils.PlainStateBucket, element.Key); err == nil || err == ethdb.ErrKeyNotFound { - if err1 := collector.Collect(element.Key, v); err1 != nil { - return err1 - } - } else { - return err - } - } - reader := readers[element.TimeIdx] - // Try to read the next key (reuse the element) - if n, err := io.ReadFull(reader, element.Key); err == nil && n == keyLength { - heap.Push(h, element) - } else if err != io.EOF { - // If it is EOF, we simply do not return anything into the heap - return fmt.Errorf("next reading from account buffer file: %d %x %v", n, element.Key[:n], err) - } - } - return nil -} - -func (p *Promoter) mergeUnwindFilesAndCollect(bufferFileNames []string, keyLength int, collector *etl.Collector) error { - h := &etl.Heap{} - heap.Init(h) - readers := make([]io.Reader, len(bufferFileNames)) - for i, fileName := range bufferFileNames { - if f, err := os.Open(fileName); err == nil { - readers[i] = bufio.NewReader(f) - //nolint:errcheck - defer f.Close() - } else { - return err - } - // Read first key - keyBuf := make([]byte, keyLength) - if n, err := io.ReadFull(readers[i], keyBuf); err != nil || n != keyLength { - return fmt.Errorf("init reading from account buffer file: %d %x %v", n, keyBuf[:n], err) - } - var l [1]byte - if n, err := io.ReadFull(readers[i], l[:]); err != nil || n != 1 { - return fmt.Errorf("init reading from account buffer file: %d %v", n, err) - } - var valBuf []byte - valLength := int(l[0]) - if valLength > 0 { - valBuf = make([]byte, valLength) - if n, err := io.ReadFull(readers[i], valBuf); err != nil || n != valLength { - return fmt.Errorf("init reading from account buffer file: %d %v", n, err) - } - } - heap.Push(h, etl.HeapElem{keyBuf, i, valBuf}) - } - // By now, the heap has one element for each buffer file - var prevKey []byte - for h.Len() > 0 { - if err := common.Stopped(p.quitCh); err != nil { - return err - } - element := (heap.Pop(h)).(etl.HeapElem) - if !bytes.Equal(element.Key, prevKey) { - // Ignore all the repeating keys, and take the earlist - prevKey = common.CopyBytes(element.Key) - if err := collector.Collect(element.Key, element.Value); err != nil { - return err - } - } - reader := readers[element.TimeIdx] - // Try to read the next key (reuse the element) - if n, err := io.ReadFull(reader, element.Key); err == nil && n == keyLength { - var l [1]byte - if n1, err1 := io.ReadFull(reader, l[:]); err1 != nil || n1 != 1 { - return fmt.Errorf("reading from account buffer file: %d %v", n1, err1) - } - var valBuf []byte - valLength := int(l[0]) - if valLength > 0 { - valBuf = make([]byte, valLength) - if n1, err1 := io.ReadFull(reader, valBuf); err1 != nil || n1 != valLength { - return fmt.Errorf("reading from account buffer file: %d %v", n1, err1) - } - } - element.Value = valBuf - heap.Push(h, element) - } else if err != io.EOF { - // If it is EOF, we simply do not return anything into the heap - return fmt.Errorf("next reading from account buffer file: %d %x %v", n, element.Key[:n], err) +func getFromPlainStateAndLoad(db ethdb.Getter, loadFunc etl.LoadFunc) etl.LoadFunc { + return func(k []byte, _ []byte, state etl.State, next etl.LoadNextFunc) error { + // ignoring value un purpose, we want the latest one and it is in PlainStateBucket + value, err := db.Get(dbutils.PlainStateBucket, k) + if err == nil || errors.Is(err, ethdb.ErrKeyNotFound) { + return loadFunc(k, value, state, next) } + return err } - return nil } -func (p *Promoter) Promote(from, to uint64, changeSetBucket []byte) error { 
- v, ok := promoterMapper[string(changeSetBucket)] - if !ok { - return fmt.Errorf("unknown bucket type: %s", changeSetBucket) - } +func (p *Promoter) Promote(s *StageState, from, to uint64, changeSetBucket []byte, index byte) error { log.Info("Incremental promotion started", "from", from, "to", to, "csbucket", string(changeSetBucket)) - var m runtime.MemStats - var bufferFileNames []string - changesets := make([]byte, p.ChangeSetBufSize) // 256 Mb buffer by default - var offsets []int - var done = false - blockNum := from + 1 - for !done { - if newDone, newBlockNum, newOffsets, err := p.fillChangeSetBuffer(changeSetBucket, blockNum, to, changesets, offsets); err == nil { - done = newDone - blockNum = newBlockNum - offsets = newOffsets - } else { - return err - } - if len(offsets) == 0 { - break - } - bufferMap := make(map[string]struct{}) - prevOffset := 0 - for _, offset := range offsets { - if err := v.WalkerAdapter(changesets[prevOffset:offset]).Walk(func(k, v []byte) error { - bufferMap[string(k)] = struct{}{} - return nil - }); err != nil { + startkey := dbutils.EncodeTimestamp(from + 1) + skip := false + + var loadStartKey []byte + if len(s.StageData) != 0 { + // we have finished this stage but didn't start the next one + if len(s.StageData) == 1 && s.StageData[0] == index { + skip = true + // we are already at the next stage + } else if s.StageData[0] > index { + skip = true + // if we at the current stage and we have something meaningful at StageData + } else if s.StageData[0] == index { + var err error + loadStartKey, err = etl.NextKey(s.StageData[1:]) + if err != nil { return err } - prevOffset = offset - } - - if filename, err := p.writeBufferMapToTempFile(v.Template, bufferMap); err == nil { - defer func() { - //nolint:errcheck - os.Remove(filename) - }() - bufferFileNames = append(bufferFileNames, filename) - runtime.ReadMemStats(&m) - log.Info("Created a buffer file", "name", filename, "up to block", blockNum, - "alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC)) - } else { - return err } } - if len(offsets) > 0 { - collector := etl.NewCollector(p.TempDir, etl.NewSortableBuffer(etl.BufferOptimalSize)) - if err := p.mergeFilesAndCollect(bufferFileNames, v.KeySize, collector); err != nil { - return err - } - if err := collector.Load(p.db, dbutils.CurrentStateBucket, keyTransformLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil { - return err - } + if skip { + return nil } - return nil + + return etl.Transform( + p.db, + changeSetBucket, + dbutils.CurrentStateBucket, + p.TempDir, + getExtractFunc(changeSetBucket), + // here we avoid getting the state from changesets, + // we just care about the accounts that did change, + // so we can directly read from the PlainTextBuffer + getFromPlainStateAndLoad(p.db, keyTransformLoadFunc), + etl.TransformArgs{ + BufferType: etl.SortableAppendBuffer, + ExtractStartKey: startkey, + LoadStartKey: loadStartKey, + OnLoadCommit: func(putter ethdb.Putter, key []byte, isDone bool) error { + if isDone { + return s.UpdateWithStageData(putter, from, []byte{index}) + } + return s.UpdateWithStageData(putter, from, append([]byte{index}, key...)) + }, + Quit: p.quitCh, + }, + ) } -func (p *Promoter) Unwind(from, to uint64, changeSetBucket []byte) error { - v, ok := promoterMapper[string(changeSetBucket)] - if !ok { - return fmt.Errorf("unknown bucket type: %s", changeSetBucket) - } +func (p *Promoter) Unwind(s *StageState, u *UnwindState, changeSetBucket []byte, index byte) error { + from := s.BlockNumber + to := 
u.UnwindPoint + log.Info("Unwinding started", "from", from, "to", to, "csbucket", string(changeSetBucket)) - var m runtime.MemStats - var bufferFileNames []string - changesets := make([]byte, p.ChangeSetBufSize) // 256 Mb buffer by default - var offsets []int - var done = false - blockNum := to + 1 - for !done { - if newDone, newBlockNum, newOffsets, err := p.fillChangeSetBuffer(changeSetBucket, blockNum, from, changesets, offsets); err == nil { - done = newDone - blockNum = newBlockNum - offsets = newOffsets - } else { - return err - } - if len(offsets) == 0 { - break - } - bufferMap := make(map[string][]byte) - prevOffset := 0 - for _, offset := range offsets { - if err := v.WalkerAdapter(changesets[prevOffset:offset]).Walk(func(k, v []byte) error { - ks := string(k) - if _, ok := bufferMap[ks]; !ok { - // Do not replace the existing values, so we end up with the earlier possible values - bufferMap[ks] = v - } - return nil - }); err != nil { + startkey := dbutils.EncodeTimestamp(to + 1) + + var loadStartKey []byte + skip := false + + if len(u.StageData) != 0 { + // we have finished this stage but didn't start the next one + if len(u.StageData) == 1 && u.StageData[0] == index { + skip = true + // we are already at the next stage + } else if u.StageData[0] > index { + skip = true + // if we at the current stage and we have something meaningful at StageData + } else if u.StageData[0] == index { + var err error + loadStartKey, err = etl.NextKey(u.StageData[1:]) + if err != nil { return err } - prevOffset = offset - } - - if filename, err := p.writeUnwindBufferMapToTempFile(v.Template, bufferMap); err == nil { - defer func() { - //nolint:errcheck - os.Remove(filename) - }() - bufferFileNames = append(bufferFileNames, filename) - runtime.ReadMemStats(&m) - log.Info("Created a buffer file", "name", filename, "up to block", blockNum, - "alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC)) - } else { - return err } } - if len(offsets) > 0 { - collector := etl.NewCollector(p.TempDir, etl.NewAppendBuffer(etl.BufferOptimalSize)) - if err := p.mergeUnwindFilesAndCollect(bufferFileNames, v.KeySize, collector); err != nil { - return err - } - if err := collector.Load(p.db, dbutils.CurrentStateBucket, keyTransformLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil { - return err - } + + if skip { + return nil } - return nil + + return etl.Transform( + p.db, + changeSetBucket, + dbutils.CurrentStateBucket, + p.TempDir, + getUnwindExtractFunc(changeSetBucket), + keyTransformLoadFunc, + etl.TransformArgs{ + BufferType: etl.SortableOldestAppearedBuffer, + LoadStartKey: loadStartKey, + ExtractStartKey: startkey, + OnLoadCommit: func(putter ethdb.Putter, key []byte, isDone bool) error { + if isDone { + return u.UpdateWithStageData(putter, []byte{index}) + } + return u.UpdateWithStageData(putter, append([]byte{index}, key...)) + }, + Quit: p.quitCh, + }, + ) } -func promoteHashedStateIncrementally(from, to uint64, db ethdb.Database, datadir string, quit chan struct{}) error { +func promoteHashedStateIncrementally(s *StageState, from, to uint64, db ethdb.Database, datadir string, quit chan struct{}) error { prom := NewPromoter(db, quit) prom.TempDir = datadir - if err := prom.Promote(from, to, dbutils.PlainAccountChangeSetBucket); err != nil { + if err := prom.Promote(s, from, to, dbutils.PlainAccountChangeSetBucket, 0x01); err != nil { return err } - if err := prom.Promote(from, to, dbutils.PlainStorageChangeSetBucket); err != nil { + if err := prom.Promote(s, from, to, 
dbutils.PlainStorageChangeSetBucket, 0x02); err != nil { return err } return nil diff --git a/eth/stagedsync/stage_hashstate_test.go b/eth/stagedsync/stage_hashstate_test.go index 90ee6107e9c2d0ebacffef506a774d3fb1832063..edc562117e34f79bd1a6befe2f25753afe2d8280 100644 --- a/eth/stagedsync/stage_hashstate_test.go +++ b/eth/stagedsync/stage_hashstate_test.go @@ -27,7 +27,7 @@ func TestPromoteHashedStateClearState(t *testing.T) { generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations) m2 := db2.NewBatch() - err := promoteHashedState(m2, 0, 50, getDataDir(), nil) + err := promoteHashedState(&StageState{}, m2, 0, 50, getDataDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) } @@ -49,7 +49,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) { generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations) m2 := db2.NewBatch() - err := promoteHashedState(m2, 0, 50, getDataDir(), nil) + err := promoteHashedState(&StageState{}, m2, 0, 50, getDataDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) } @@ -62,7 +62,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) { generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations) m2 = db2.NewBatch() - err = promoteHashedState(m2, 50, 101, getDataDir(), nil) + err = promoteHashedState(&StageState{BlockNumber: 50}, m2, 50, 101, getDataDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) } @@ -85,7 +85,7 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) { generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations) m2 := db2.NewBatch() - err := promoteHashedState(m2, 50, 101, getDataDir(), nil) + err := promoteHashedState(&StageState{}, m2, 50, 101, getDataDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) } @@ -106,7 +106,7 @@ func TestUnwindHashed(t *testing.T) { generateBlocks(t, 1, 50, hashedWriterGen(db1), changeCodeWithIncarnations) generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations) - err := promoteHashedState(db2, 0, 100, getDataDir(), nil) + err := promoteHashedState(&StageState{}, db2, 0, 100, getDataDir(), nil) if err != nil { t.Errorf("error while promoting state: %v", err) }
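Below is a minimal, self-contained sketch (not part of the patch) illustrating the first-value-wins semantics that the new SortableOldestAppearedBuffer introduces: when the same key is Put more than once, only the value from the first Put is kept, and entries come out key-sorted. The names here (oldestBuffer, Put, Sorted) are illustrative stand-ins and do not mirror the real etl package API beyond what the diff itself shows.

package main

import (
	"fmt"
	"sort"
)

// oldestBuffer is an illustrative stand-in for the patch's
// oldestEntrySortableBuffer: the first value stored under a key wins,
// later Puts for the same key are ignored.
type oldestBuffer struct {
	entries map[string][]byte
}

func newOldestBuffer() *oldestBuffer {
	return &oldestBuffer{entries: make(map[string][]byte)}
}

func (b *oldestBuffer) Put(k, v []byte) {
	if _, ok := b.entries[string(k)]; ok {
		return // keep the oldest value, ignore the new one
	}
	cp := make([]byte, len(v))
	copy(cp, v)
	b.entries[string(k)] = cp
}

// Sorted returns the keys in ascending order, mimicking how the real
// buffer sorts its entries before they are flushed to disk.
func (b *oldestBuffer) Sorted() []string {
	keys := make([]string, 0, len(b.entries))
	for k := range b.entries {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	b := newOldestBuffer()
	b.Put([]byte("acc1"), []byte("v1"))
	b.Put([]byte("acc1"), []byte("v2")) // ignored: v1 already present
	b.Put([]byte("acc0"), []byte("v3"))
	for _, k := range b.Sorted() {
		fmt.Printf("%s => %s\n", k, b.entries[k])
	}
	// Output:
	// acc0 => v3
	// acc1 => v1
}

This is why Unwind now collects changesets through SortableOldestAppearedBuffer: walking changesets forward from the unwind point, the first value seen for each key is the earliest (pre-change) one, which is exactly the value to restore. The other half of the patch, the new LoadCommitHandler signature func(ethdb.Putter, []byte, bool) error, lets each stage write its progress marker into the same batch that is about to be committed, so after an interrupted run the stage can resume loading from etl.NextKey(StageData) instead of starting from scratch.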