diff --git a/common/etl/etl.go b/common/etl/etl.go
index d7320d1dbe800f82abfcb614480cffe1f9070a80..429af44d8f04bbd4921499a23de2713b4f1df7dd 100644
--- a/common/etl/etl.go
+++ b/common/etl/etl.go
@@ -203,8 +203,15 @@ func loadFilesIntoBucket(db ethdb.Database, bucket []byte, providers []dataProvi
 		if bytes.Compare(k, args.LoadStartKey) < 0 {
 			return nil
 		}
-		if err := batch.Put(bucket, k, v); err != nil {
-			return err
+		if len(v) == 0 {
+			// An empty value is a deletion marker, so remove the key instead of storing it
+			if err := batch.Delete(bucket, k); err != nil {
+				return err
+			}
+		} else {
+			if err := batch.Put(bucket, k, v); err != nil {
+				return err
+			}
 		}
 		batchSize := batch.BatchSize()
 		if batchSize > batch.IdealBatchSize() || args.loadBatchSize > 0 && batchSize > args.loadBatchSize {
diff --git a/eth/stagedsync/stage_hashcheck.go b/eth/stagedsync/stage_hashcheck.go
index 3e534408ab7d5cdd3a49bf092a3e9dbb8193da49..66fdec544f6c7d33612a64713fabb106d8cacf5c 100644
--- a/eth/stagedsync/stage_hashcheck.go
+++ b/eth/stagedsync/stage_hashcheck.go
@@ -1,9 +1,18 @@
 package stagedsync
 
 import (
+	"bufio"
+	"bytes"
+	"container/heap"
 	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"runtime"
+	"sort"
 
 	"github.com/ledgerwatch/turbo-geth/common"
+	"github.com/ledgerwatch/turbo-geth/common/changeset"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/common/etl"
 	"github.com/ledgerwatch/turbo-geth/core"
@@ -36,7 +45,7 @@ func SpawnCheckFinalHashStage(s *StageState, stateDB ethdb.Database, datadir str
 
 	if core.UsePlainStateExecution {
 		log.Info("Promoting plain state", "from", hashProgress, "to", syncHeadNumber)
-		err := promoteHashedState(stateDB, hashProgress, datadir, quit)
+		err := promoteHashedState(stateDB, hashProgress, syncHeadNumber, datadir, quit)
 		if err != nil {
 			return err
 		}
@@ -90,11 +99,11 @@ func unwindHashCheckStage(unwindPoint uint64, stateDB ethdb.Database) error {
 	return nil
 }
 
-func promoteHashedState(db ethdb.Database, progress uint64, datadir string, quit chan struct{}) error {
-	if progress == 0 {
+func promoteHashedState(db ethdb.Database, from, to uint64, datadir string, quit chan struct{}) error {
+	if from == 0 {
 		return promoteHashedStateCleanly(db, datadir, quit)
 	}
-	return errors.New("incremental state promotion not implemented")
+	return promoteHashedStateIncrementally(from, to, db, datadir, quit)
 }
 
 func promoteHashedStateCleanly(db ethdb.Database, datadir string, quit chan struct{}) error {
@@ -175,3 +184,247 @@ func transformContractCodeKey(key []byte) ([]byte, error) {
 	compositeKey := dbutils.GenerateStoragePrefix(addrHash[:], incarnation)
 	return compositeKey, nil
 }
+
+// keyTransformLoadFunc converts the plain state key of every collected entry to its
+// hashed form before handing it to the ETL loader (empty values become deletes there).
+func keyTransformLoadFunc(k []byte, valueDecoder etl.Decoder, state etl.State, next etl.LoadNextFunc) error {
+	var v []byte
+	if err := valueDecoder.Decode(&v); err != nil {
+		return err
+	}
+	newK, err := transformPlainStateKey(k)
+	if err != nil {
+		return err
+	}
+	return next(newK, v)
+}
+
+func NewPromoter(db ethdb.Database, quitCh chan struct{}) *Promoter {
+	return &Promoter{
+		db:               db,
+		ChangeSetBufSize: 256 * 1024 * 1024,
+		TempDir:          os.TempDir(),
+		quitCh:           quitCh,
+	}
+}
+
+type Promoter struct {
+	db               ethdb.Database
+	ChangeSetBufSize uint64
+	TempDir          string
+	quitCh           chan struct{}
+}
+
+// promoterMapper connects a changeset bucket with the walker that decodes it,
+// the length of its keys and the name prefix for its temp buffer files.
+var promoterMapper = map[string]struct {
+	WalkerAdapter func(v []byte) changeset.Walker
+	KeySize       int
+	Template      string
+}{
+	string(dbutils.PlainAccountChangeSetBucket): {
+		WalkerAdapter: func(v []byte) changeset.Walker {
+			return changeset.AccountChangeSetPlainBytes(v)
+		},
+		KeySize:  common.AddressLength,
+		Template: "acc-prom-",
+	},
+	string(dbutils.PlainStorageChangeSetBucket): {
+		WalkerAdapter: func(v []byte) changeset.Walker {
+			return changeset.StorageChangeSetPlainBytes(v)
+		},
+		KeySize:  common.AddressLength + common.IncarnationLength + common.HashLength,
+		Template: "st-prom-",
+	},
+}
+
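+// fillChangeSetBuffer walks the changeset bucket from blockNum towards `to`, copying raw
+// changesets into the fixed-size buffer and recording the end offset of each one. It
+// returns done=false when the buffer fills up early, so the caller can resume the walk.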
+func (p *Promoter) fillChangeSetBuffer(bucket []byte, blockNum, to uint64, changesets []byte, offsets []int) (bool, uint64, []int, error) {
+	offset := 0
+	offsets = offsets[:0]
+	startKey := dbutils.EncodeTimestamp(blockNum)
+	done := true
+	if err := p.db.Walk(bucket, startKey, 0, func(k, v []byte) (bool, error) {
+		if err := common.Stopped(p.quitCh); err != nil {
+			return false, err
+		}
+		blockNum, _ = dbutils.DecodeTimestamp(k)
+		if blockNum > to {
+			return false, nil
+		}
+		if offset+len(v) > len(changesets) { // Adding the current changeset would overflow the buffer
+			done = false
+			return false, nil
+		}
+		copy(changesets[offset:], v)
+		offset += len(v)
+		offsets = append(offsets, offset)
+		return true, nil
+	}); err != nil {
+		return true, blockNum, offsets, fmt.Errorf("walking over changeset for block %d: %v", blockNum, err)
+	}
+	return done, blockNum, offsets, nil
+}
+
+// writeBufferMapToTempFile creates a temp file in the datadir and writes the sorted keys
+// of bufferMap into it. If successful, it returns the name of the created (and closed) file.
+func (p *Promoter) writeBufferMapToTempFile(pattern string, bufferMap map[string]struct{}) (string, error) {
+	var filename string
+	keys := make([]string, len(bufferMap))
+	i := 0
+	for key := range bufferMap {
+		keys[i] = key
+		i++
+	}
+	sort.Strings(keys)
+	var w *bufio.Writer
+	if bufferFile, err := ioutil.TempFile(p.TempDir, pattern); err == nil {
+		//nolint:errcheck
+		defer bufferFile.Close()
+		filename = bufferFile.Name()
+		w = bufio.NewWriter(bufferFile)
+	} else {
+		return filename, fmt.Errorf("creating temp buf file %s: %v", pattern, err)
+	}
+	for _, key := range keys {
+		if _, err := w.Write([]byte(key)); err != nil {
+			return filename, err
+		}
+	}
+	if err := w.Flush(); err != nil {
+		return filename, fmt.Errorf("flushing file %s: %v", filename, err)
+	}
+	return filename, nil
+}
+
+// mergeFilesAndCollect performs a k-way merge over the sorted buffer files, skips
+// duplicate keys, and hands each key with its current plain state value to the collector.
+func (p *Promoter) mergeFilesAndCollect(bufferFileNames []string, keyLength int, collector *etl.Collector) error {
+	h := &etl.Heap{}
+	heap.Init(h)
+	readers := make([]io.Reader, len(bufferFileNames))
+	for i, fileName := range bufferFileNames {
+		if f, err := os.Open(fileName); err == nil {
+			readers[i] = bufio.NewReader(f)
+			//nolint:errcheck
+			defer f.Close()
+		} else {
+			return err
+		}
+		// Read first key
+		keyBuf := make([]byte, keyLength)
+		if n, err := io.ReadFull(readers[i], keyBuf); err == nil && n == keyLength {
+			heap.Push(h, etl.HeapElem{keyBuf, i, nil})
+		} else {
+			return fmt.Errorf("init reading from buffer file: %d %x %v", n, keyBuf[:n], err)
+		}
+	}
+	// By now, the heap has one element for each buffer file
+	var prevKey []byte
+	for h.Len() > 0 {
+		if err := common.Stopped(p.quitCh); err != nil {
+			return err
+		}
+		element := (heap.Pop(h)).(etl.HeapElem)
+		if !bytes.Equal(element.Key, prevKey) {
+			// Ignore all the repeating keys
+			prevKey = common.CopyBytes(element.Key)
+			if v, err := p.db.Get(dbutils.PlainStateBucket, element.Key); err == nil || err == ethdb.ErrKeyNotFound {
+				if err1 := collector.Collect(element.Key, v); err1 != nil {
+					return err1
+				}
+			} else {
+				return err
+			}
+		}
+		reader := readers[element.TimeIdx]
+		// Try to read the next key (reuse the element)
+		if n, err := io.ReadFull(reader, element.Key); err == nil && n == keyLength {
+			heap.Push(h, element)
+		} else if err != io.EOF {
+			// If it is EOF, we simply do not return anything into the heap
+			return fmt.Errorf("next reading from buffer file: %d %x %v", n, element.Key[:n], err)
+		}
+	}
+	return nil
+}
+
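+// Promote replays the changesets of the given bucket for blocks (from, to], collects every
+// touched plain state key into sorted temp files, merges them, and loads the current value
+// of each key into the hashed state bucket via the ETL collector.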
+func (p *Promoter) Promote(from, to uint64, changeSetBucket []byte) error {
+	v, ok := promoterMapper[string(changeSetBucket)]
+	if !ok {
+		return fmt.Errorf("unknown bucket type: %s", changeSetBucket)
+	}
+	log.Info("Incremental promotion started", "from", from, "to", to, "csbucket", string(changeSetBucket))
+	var m runtime.MemStats
+	var bufferFileNames []string
+	changesets := make([]byte, p.ChangeSetBufSize) // 256MB buffer by default
+	var offsets []int
+	var done = false
+	blockNum := from + 1
+	for !done {
+		if newDone, newBlockNum, newOffsets, err := p.fillChangeSetBuffer(changeSetBucket, blockNum, to, changesets, offsets); err == nil {
+			done = newDone
+			blockNum = newBlockNum
+			offsets = newOffsets
+		} else {
+			return err
+		}
+		if len(offsets) == 0 {
+			break
+		}
+
+		bufferMap := make(map[string]struct{})
+		prevOffset := 0
+		for _, offset := range offsets {
+			if err := v.WalkerAdapter(changesets[prevOffset:offset]).Walk(func(k, v []byte) error {
+				bufferMap[string(k)] = struct{}{}
+				return nil
+			}); err != nil {
+				return err
+			}
+			prevOffset = offset
+		}
+
+		if filename, err := p.writeBufferMapToTempFile(v.Template, bufferMap); err == nil {
+			defer func() {
+				//nolint:errcheck
+				os.Remove(filename)
+			}()
+			bufferFileNames = append(bufferFileNames, filename)
+			runtime.ReadMemStats(&m)
+			log.Info("Created a buffer file", "name", filename, "up to block", blockNum,
+				"alloc", int(m.Alloc/1024), "sys", int(m.Sys/1024), "numGC", int(m.NumGC))
+		} else {
+			return err
+		}
+	}
+	if len(bufferFileNames) > 0 {
+		collector := etl.NewCollector(p.TempDir)
+		if err := p.mergeFilesAndCollect(bufferFileNames, v.KeySize, collector); err != nil {
+			return err
+		}
+		if err := collector.Load(p.db, dbutils.CurrentStateBucket, keyTransformLoadFunc, etl.TransformArgs{Quit: p.quitCh}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
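+// promoteHashedStateIncrementally runs two promotion passes over the same block range:
+// first the account changesets, then the storage changesets.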
+func promoteHashedStateIncrementally(from, to uint64, db ethdb.Database, datadir string, quit chan struct{}) error {
+	prom := NewPromoter(db, quit)
+	prom.TempDir = datadir
+	if err := prom.Promote(from, to, dbutils.PlainAccountChangeSetBucket); err != nil {
+		return err
+	}
+	if err := prom.Promote(from, to, dbutils.PlainStorageChangeSetBucket); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/eth/stagedsync/stage_hashcheck_test.go b/eth/stagedsync/stage_hashcheck_test.go
index 5fe429a4fa7aee1aa93ac91818d0525a0838991f..cd36ad444b48704a624a973cb799e0565bdc7e07 100644
--- a/eth/stagedsync/stage_hashcheck_test.go
+++ b/eth/stagedsync/stage_hashcheck_test.go
@@ -25,7 +25,7 @@ func TestPromoteHashedStateClearState(t *testing.T) {
 	generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 := db2.NewBatch()
-	err := promoteHashedState(m2, 0, getDataDir(), nil)
+	err := promoteHashedState(m2, 0, 50, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -38,7 +38,6 @@ func TestPromoteHashedStateClearState(t *testing.T) {
 }
 
 func TestPromoteHashedStateIncremental(t *testing.T) {
-	t.Skip("not implemented yet")
 	db1 := ethdb.NewMemDatabase()
 	db2 := ethdb.NewMemDatabase()
 
@@ -46,7 +45,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
 	generateBlocks(t, 1, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 := db2.NewBatch()
-	err := promoteHashedState(m2, 0, getDataDir(), nil)
+	err := promoteHashedState(m2, 0, 50, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -59,7 +58,7 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
 	generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 = db2.NewBatch()
-	err = promoteHashedState(m2, 50, getDataDir(), nil)
+	err = promoteHashedState(m2, 50, 101, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -68,20 +67,19 @@ func TestPromoteHashedStateIncremental(t *testing.T) {
 		t.Errorf("error while commiting state: %v", err)
 	}
 
-	compareCurrentState(t, db1, db2, dbutils.CurrentStateBucket, dbutils.ContractCodeBucket)
+	compareCurrentState(t, db1, db2, dbutils.CurrentStateBucket)
 }
 
 func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
-	t.Skip("not implemented yet")
 	db1 := ethdb.NewMemDatabase()
 	db2 := ethdb.NewMemDatabase()
 
 	generateBlocks(t, 1, 100, hashedWriterGen(db1), changeCodeWithIncarnations)
-	generateBlocks(t, 1, 50, hashedWriterGen(db1), changeCodeWithIncarnations)
+	generateBlocks(t, 1, 50, hashedWriterGen(db2), changeCodeWithIncarnations)
 	generateBlocks(t, 51, 50, plainWriterGen(db2), changeCodeWithIncarnations)
 
 	m2 := db2.NewBatch()
-	err := promoteHashedState(m2, 50, getDataDir(), nil)
+	err := promoteHashedState(m2, 50, 101, getDataDir(), nil)
 	if err != nil {
 		t.Errorf("error while promoting state: %v", err)
 	}
@@ -91,5 +89,5 @@ func TestPromoteHashedStateIncrementalMixed(t *testing.T) {
 		t.Errorf("error while commiting state: %v", err)
 	}
 
-	compareCurrentState(t, db1, db2, dbutils.CurrentStateBucket, dbutils.ContractCodeBucket)
+	compareCurrentState(t, db1, db2, dbutils.CurrentStateBucket)
 }
diff --git a/eth/stagedsync/testutil.go b/eth/stagedsync/testutil.go
index 9a24a68bbb223aab9e582c2dd457484c32102882..f08546de4408bf6cd5b21c2410069149fe462c37 100644
--- a/eth/stagedsync/testutil.go
+++ b/eth/stagedsync/testutil.go
@@ -81,7 +81,9 @@ func generateBlocks(t *testing.T, from uint64, numberOfBlocks uint64, stateWrite
 	}
 
 	ctx := context.Background()
-	for blockNumber := from; blockNumber < from+numberOfBlocks; blockNumber++ {
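+	// Replay every block from 1 so that incarnations and code histories build up as usual,
+	// but only perform database writes once blockNumber >= from (see the guards below).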
+	for blockNumber := uint64(1); blockNumber < from+numberOfBlocks; blockNumber++ {
 		updateIncarnation := difficulty != staticCodeStaticIncarnations && blockNumber%10 == 0
 		blockWriter := stateWriterGen(blockNumber)
 
@@ -95,17 +97,20 @@ func generateBlocks(t *testing.T, from uint64, numberOfBlocks uint64, stateWrite
 		}
 
 		if blockNumber == 1 && newAcc.Incarnation > 0 {
-			err := blockWriter.CreateContract(addr)
-			if err != nil {
-				t.Fatal(err)
+			if blockNumber >= from {
+				if err := blockWriter.CreateContract(addr); err != nil {
+					t.Fatal(err)
+				}
 			}
 		}
 		if blockNumber == 1 || updateIncarnation || difficulty == changeCodeIndepenentlyOfIncarnations {
 			if newAcc.Incarnation > 0 {
 				code := []byte(fmt.Sprintf("acc-code-%v", blockNumber))
 				codeHash, _ := common.HashData(code)
-				if err := blockWriter.UpdateAccountCode(addr, newAcc.Incarnation, codeHash, code); err != nil {
-					t.Fatal(err)
+				if blockNumber >= from {
+					if err := blockWriter.UpdateAccountCode(addr, newAcc.Incarnation, codeHash, code); err != nil {
+						t.Fatal(err)
+					}
 				}
 				newAcc.CodeHash = codeHash
 			}
@@ -116,15 +121,21 @@ func generateBlocks(t *testing.T, from uint64, numberOfBlocks uint64, stateWrite
 			newValue.SetOne()
 			var location common.Hash
 			location.SetBytes(big.NewInt(int64(blockNumber)).Bytes())
-			if err := blockWriter.WriteAccountStorage(ctx, addr, newAcc.Incarnation, &location, &oldValue, &newValue); err != nil {
-				t.Fatal(err)
+			if blockNumber >= from {
+				if err := blockWriter.WriteAccountStorage(ctx, addr, newAcc.Incarnation, &location, &oldValue, &newValue); err != nil {
+					t.Fatal(err)
+				}
 			}
 		}
-		if err := blockWriter.UpdateAccountData(ctx, addr, oldAcc /* original */, newAcc /* new account */); err != nil {
-			t.Fatal(err)
+		if blockNumber >= from {
+			if err := blockWriter.UpdateAccountData(ctx, addr, oldAcc /* original */, newAcc /* new account */); err != nil {
+				t.Fatal(err)
+			}
 		}
-		if err := blockWriter.WriteChangeSets(); err != nil {
-			t.Fatal(err)
+		if blockNumber >= from {
+			if err := blockWriter.WriteChangeSets(); err != nil {
+				t.Fatal(err)
+			}
 		}
 		testAccounts[i] = newAcc
 	}