good morning!!!!

Skip to content
Snippets Groups Projects
Unverified Commit 80c2fc6a authored by Evgeny Danilenko's avatar Evgeny Danilenko Committed by GitHub
Browse files

Remove stacked defers (#2149)


* rewrite everything

* remove stacked defers

* debug

* fix panic on nil tx

* use db.View

* panic

* block based progress

* errors

* - assume that batches are big  enough to not worry about saving carefully logTime
- use only 1 digit as progress indicator
- log only if processing > 16 blocks

* - update metric when printing logs

* remove excluded

Co-authored-by: default avataralex.sharov <AskAlexSharov@gmail.com>
parent 24a76a00
No related branches found
No related tags found
No related merge requests found
......@@ -46,75 +46,134 @@ func StageTranspileCfg(
}
}
func transpileBatch(logPrefix string, s *StageState, fromBlock uint64, toBlock uint64, tx ethdb.RwTx, batch ethdb.DbWithPendingMutations, cfg TranspileCfg, useExternalTx bool, quitCh <-chan struct{}) error {
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
func SpawnTranspileStage(s *StageState, tx ethdb.RwTx, toBlock uint64, quit <-chan struct{}, cfg TranspileCfg) error {
var prevStageProgress uint64
var errStart error
if tx == nil {
errStart = cfg.db.View(context.Background(), func(tx ethdb.Tx) error {
prevStageProgress, errStart = stages.GetStageProgress(tx, stages.Execution)
return errStart
})
} else {
prevStageProgress, errStart = stages.GetStageProgress(tx, stages.Execution)
}
if errStart != nil {
return errStart
}
var to = prevStageProgress
if toBlock > 0 {
to = min(prevStageProgress, toBlock)
}
if to <= s.BlockNumber {
s.Done()
return nil
}
stageProgress := uint64(0)
logBlock := stageProgress
logTime := time.Now()
logPrefix := s.state.LogPrefix()
if to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Contract translation", logPrefix), "from", s.BlockNumber, "to", to)
}
empty := common.Address{}
observedAddresses := map[common.Address]struct{}{
empty: {},
}
observedCodeHashes := map[common.Hash]struct{}{}
var err error
for stageProgress <= toBlock {
stageProgress, err = transpileBatch(logPrefix, stageProgress, to, cfg, tx, observedAddresses, observedCodeHashes, quit)
if err != nil {
return fmt.Errorf("[%s] %w", logPrefix, err)
}
}
s.Done()
if to > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Completed on", logPrefix), "block", toBlock)
}
return nil
}
func transpileBatch(logPrefix string, stageProgress, toBlock uint64, cfg TranspileCfg, tx ethdb.RwTx, observedAddresses map[common.Address]struct{}, observedCodeHashes map[common.Hash]struct{}, quitCh <-chan struct{}) (uint64, error) {
useExternalTx := tx != nil
var err error
if !useExternalTx {
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return 0, err
}
defer tx.Rollback()
}
batch := kv.NewBatch(tx)
defer batch.Rollback()
// read contracts pending for translation
keyStart := dbutils.EncodeBlockNumber(fromBlock + 1)
c, err := tx.CursorDupSort(dbutils.CallTraceSet)
if err != nil {
return err
return 0, err
}
defer c.Close()
logTime := time.Now()
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
stateReader := state.NewPlainStateReader(batch)
var (
codeHash common.Hash
codeHashBytes []byte
addr common.Address
addrBytes []byte
acc *accounts.Account
block uint64
evmContract []byte
transpiledCode []byte
ok bool
)
stateReader := state.NewPlainStateReader(batch)
excludedAddress := common.Address{}
excludedAddress[len(excludedAddress)-1] = 1
empty := common.Address{}
observedAddresses := map[common.Address]struct{}{
empty: {},
excludedAddress: {},
}
observedCodeHashes := map[common.Hash]struct{}{}
prevContract := stageProgress
blockKey := dbutils.EncodeBlockNumber(stageProgress)
for k, addrStatus, err := c.Seek(keyStart); k != nil; k, addrStatus, err = c.Next() {
var addressStatus []byte
for blockKey, addressStatus, err = c.SeekExact(blockKey); blockKey != nil; blockKey, addressStatus, err = c.Next() {
if err != nil {
return fmt.Errorf("can't read pending code translations: %w", err)
}
if err = common.Stopped(quitCh); err != nil {
return fmt.Errorf("can't read pending code translations: %w", err)
return 0, fmt.Errorf("can't read pending code translations: %w", err)
}
select {
case <-quitCh:
return 0, common.ErrStopped
case <-logEvery.C:
logBlock, logTime = logTEVMProgress(logPrefix, logBlock, logTime, stageProgress)
prevContract, logTime = logTEVMProgress(logPrefix, prevContract, logTime, stageProgress)
tx.CollectMetrics()
stageTranspileGauge.Inc(int64(stageProgress))
default:
}
block, err = dbutils.DecodeBlockNumber(k)
stageProgress, err = dbutils.DecodeBlockNumber(blockKey)
if err != nil {
return fmt.Errorf("can't read pending code translations: %w", err)
return 0, fmt.Errorf("can't read pending code translations. incorrect block key: %w", err)
}
if block > toBlock {
if stageProgress > toBlock {
break
}
if addrStatus[len(addrStatus)-1]&4 == 0 {
if addressStatus[len(addressStatus)-1]&4 == 0 {
continue
}
addrBytes = addrStatus[:len(addrStatus)-1]
addrBytes = addressStatus[:len(addressStatus)-1]
addr = common.BytesToAddress(addrBytes)
_, ok = observedAddresses[addr]
......@@ -128,7 +187,7 @@ func transpileBatch(logPrefix string, s *StageState, fromBlock uint64, toBlock u
if errors.Is(err, ethdb.ErrKeyNotFound) {
continue
}
return fmt.Errorf("can't read account by address %q: %w", addr, err)
return 0, fmt.Errorf("can't read account by address %q: %w", addr, err)
}
if acc == nil {
continue
......@@ -149,7 +208,7 @@ func transpileBatch(logPrefix string, s *StageState, fromBlock uint64, toBlock u
// check if we already have TEVM code
ok, err = batch.Has(dbutils.ContractTEVMCodeBucket, codeHashBytes)
if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
return fmt.Errorf("can't read code TEVM bucket by contract hash %q: %w", codeHash, err)
return 0, fmt.Errorf("can't read code TEVM bucket by contract hash %q: %w", codeHash, err)
}
if ok && err == nil {
// already has TEVM code
......@@ -162,7 +221,7 @@ func transpileBatch(logPrefix string, s *StageState, fromBlock uint64, toBlock u
if errors.Is(err, ethdb.ErrKeyNotFound) {
continue
}
return fmt.Errorf("can't read pending code translations: %w", err)
return 0, fmt.Errorf("can't read pending code translations. incorrect code hash in the bucket: %w", err)
}
if len(evmContract) == 0 {
continue
......@@ -175,57 +234,31 @@ func transpileBatch(logPrefix string, s *StageState, fromBlock uint64, toBlock u
log.Warn("cannot find EVM contract", "address", addr, "hash", codeHash)
continue
}
return fmt.Errorf("contract %q cannot be translated: %w", codeHash, err)
return 0, fmt.Errorf("contract %q cannot be translated: %w", codeHash, err)
}
// store TEVM contract code
err = batch.Put(dbutils.ContractTEVMCodeBucket, codeHashBytes, transpiledCode)
if err != nil {
return fmt.Errorf("cannot store TEVM code %q: %w", codeHash, err)
return 0, fmt.Errorf("cannot store TEVM code %q: %w", codeHash, err)
}
stageProgress++
currentSize := batch.BatchSize()
updateProgress := currentSize >= int(cfg.batchSize)
if updateProgress {
if err = batch.Commit(); err != nil {
return fmt.Errorf("cannot commit the batch of translations on %q: %w", codeHash, err)
}
if !useExternalTx {
if err = s.Update(tx, stageProgress); err != nil {
return fmt.Errorf("cannot update the stage status on %q: %w", codeHash, err)
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("cannot commit the external transation on %q: %w", codeHash, err)
}
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return fmt.Errorf("cannot begin the batch transaction on %q: %w", codeHash, err)
}
//k, hash = common.CopyBytes(k), common.CopyBytes(hash)
//_, err = c.SeekBothRange(k, hash)
//if err != nil {
// return err
//}
// TODO: This creates stacked up deferrals
defer tx.Rollback()
}
if batch.BatchSize() >= int(cfg.batchSize) {
break // limit RAM usage. Break to commit batch
}
}
batch = kv.NewBatch(tx)
// TODO: This creates stacked up deferrals
defer batch.Rollback()
if err = batch.Commit(); err != nil {
return 0, fmt.Errorf("cannot commit the batch of translations on %q: %w", codeHash, err)
}
stageTranspileGauge.Inc(int64(currentSize))
if !useExternalTx {
if err = tx.Commit(); err != nil {
return 0, fmt.Errorf("cannot commit the external transation on %q: %w", codeHash, err)
}
}
log.Info(fmt.Sprintf("[%s] Completed on", logPrefix), "block", toBlock, "contracts", stageProgress)
return nil
return stageProgress, nil
}
func logTEVMProgress(logPrefix string, prevContract uint64, prevTime time.Time, currentContract uint64) (uint64, time.Time) {
......@@ -236,78 +269,14 @@ func logTEVMProgress(logPrefix string, prevContract uint64, prevTime time.Time,
runtime.ReadMemStats(&m)
var logpairs = []interface{}{
"number", currentContract,
"contracts/second", speed,
"contracts/s", speed,
}
logpairs = append(logpairs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
logpairs = append(logpairs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
log.Info(fmt.Sprintf("[%s] Translated contracts", logPrefix), logpairs...)
return currentContract, currentTime
}
func SpawnTranspileStage(s *StageState, tx ethdb.RwTx, toBlock uint64, quit <-chan struct{}, cfg TranspileCfg) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = cfg.db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
}
prevStageProgress, errStart := stages.GetStageProgress(tx, stages.Execution)
if errStart != nil {
return errStart
}
var to = prevStageProgress
if toBlock > 0 {
to = min(prevStageProgress, toBlock)
}
if to <= s.BlockNumber {
s.Done()
return nil
}
logPrefix := s.state.LogPrefix()
log.Info(fmt.Sprintf("[%s] Contract translation", logPrefix), "from", s.BlockNumber, "to", to)
batch := kv.NewBatch(tx)
defer batch.Rollback()
err := common.Stopped(quit)
if err != nil {
return err
}
if err = transpileBatch(logPrefix, s, s.BlockNumber, to, tx, batch, cfg, useExternalTx, quit); err != nil {
return err
}
currentSize := batch.BatchSize()
// commit the same number as execution
if err := s.Update(batch, prevStageProgress); err != nil {
return err
}
if err := batch.Commit(); err != nil {
return fmt.Errorf("%s: failed to write batch commit: %v", logPrefix, err)
}
if !useExternalTx {
if err := tx.Commit(); err != nil {
return err
}
}
stageTranspileGauge.Inc(int64(currentSize))
s.Done()
return nil
}
func UnwindTranspileStage(u *UnwindState, s *StageState, tx ethdb.RwTx, _ <-chan struct{}, cfg TranspileCfg) error {
useExternalTx := tx != nil
if !useExternalTx {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment