From 40230d5ab5531781958920ab1cdf62b3ff9f58db Mon Sep 17 00:00:00 2001 From: Giulio rebuffo <giulio.rebuffo@gmail.com> Date: Wed, 1 Jun 2022 23:48:42 +0200 Subject: [PATCH] Refactor mining batch (#4322) * refactooor * updated ethdb * more fixing * more fixing * more * refactoor * fixed mdbx crash --- ethdb/olddb/miningmutation.go | 163 +++++-------- ethdb/olddb/miningmutationcursor.go | 346 +++++++++------------------- turbo/stages/stageloop.go | 5 +- 3 files changed, 168 insertions(+), 346 deletions(-) diff --git a/ethdb/olddb/miningmutation.go b/ethdb/olddb/miningmutation.go index edf8d9f7c4..7581ac1f79 100644 --- a/ethdb/olddb/miningmutation.go +++ b/ethdb/olddb/miningmutation.go @@ -1,30 +1,24 @@ package olddb import ( - "bytes" "context" "encoding/binary" - "sort" - "sync" - "unsafe" + + "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/ethdb" ) type miningmutation struct { // Bucket => Key => Value - puts map[string]map[string][]byte - dupsortPuts map[string]map[string][][]byte - - dupsortTables map[string]struct{} - clearedTables map[string]struct{} - - ignoreDbEntries map[string]map[string]struct{} - - db kv.Tx - mu sync.RWMutex + memTx kv.RwTx + memDb kv.RwDB + deletedEntries map[string]map[string]struct{} + clearedTables map[string]struct{} + dupsortTables map[string]struct{} + db kv.Tx } // NewBatch - starts in-mem batch @@ -36,16 +30,22 @@ type miningmutation struct { // ... some calculations on `batch` // batch.Commit() func NewMiningBatch(tx kv.Tx) *miningmutation { + tmpDB := mdbx.NewMDBX(log.New()).InMem().MustOpen() + memTx, err := tmpDB.BeginRw(context.Background()) + if err != nil { + panic(err) + } return &miningmutation{ - db: tx, - puts: make(map[string]map[string][]byte), - dupsortPuts: make(map[string]map[string][][]byte), + db: tx, + memDb: tmpDB, + memTx: memTx, + deletedEntries: make(map[string]map[string]struct{}), + clearedTables: make(map[string]struct{}), dupsortTables: map[string]struct{}{ kv.AccountChangeSet: {}, kv.StorageChangeSet: {}, + kv.HashedStorage: {}, }, - clearedTables: make(map[string]struct{}), - ignoreDbEntries: make(map[string]map[string]struct{}), } } @@ -56,28 +56,27 @@ func (m *miningmutation) RwKV() kv.RwDB { return nil } -func (m *miningmutation) isDupsortedTable(table string) bool { - _, ok := m.dupsortTables[table] +func (m *miningmutation) isTableCleared(table string) bool { + _, ok := m.clearedTables[table] return ok } -func (m *miningmutation) ignoreDb(table string, key []byte) bool { - _, ok := m.ignoreDbEntries[table][string(key)] +func (m *miningmutation) isEntryDeleted(table string, key []byte) bool { + _, ok := m.deletedEntries[table] + if !ok { + return ok + } + _, ok = m.deletedEntries[table][string(key)] return ok } // getMem Retrieve database entry from memory (hashed storage will be left out for now because it is the only non auto-DupSorted table) func (m *miningmutation) getMem(table string, key []byte) ([]byte, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - if _, ok := m.puts[table]; !ok { - return nil, false - } - if value, ok := m.puts[table][*(*string)(unsafe.Pointer(&key))]; ok { - return value, ok + val, err := m.memTx.GetOne(table, key) + if err != nil { + panic(err) } - - return nil, false + return val, val != nil } func (m *miningmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { @@ -127,7 +126,7 @@ func (m *miningmutation) GetOne(table string, key []byte) ([]byte, error) { } return value, nil } - if m.db != nil && !m.isBucketCleared(table) && !m.ignoreDb(table, key) { + if m.db != nil && !m.isTableCleared(table) && !m.isEntryDeleted(table, key) { // TODO: simplify when tx can no longer be parent of mutation value, err := m.db.GetOne(table, key) if err != nil { @@ -169,34 +168,7 @@ func (m *miningmutation) Has(table string, key []byte) (bool, error) { // Put insert a new entry in the database, if it is hashed storage it will add it to a slice instead of a map. func (m *miningmutation) Put(table string, key []byte, value []byte) error { - m.mu.Lock() - defer m.mu.Unlock() - if value == nil { - if _, ok := m.ignoreDbEntries[table][string(key)]; !ok { - m.ignoreDbEntries[table] = make(map[string]struct{}) - } - m.ignoreDbEntries[table][string(key)] = struct{}{} - } - dupsort := m.isDupsortedTable(table) - if _, ok := m.puts[table]; !ok && !dupsort { - m.puts[table] = make(map[string][]byte) - } - - cValue := common.CopyBytes(value) - stringKey := string(key) - - if dupsort { - if _, ok := m.dupsortPuts[table]; !ok { - m.dupsortPuts[table] = make(map[string][][]byte) - } - if _, ok := m.dupsortPuts[table][stringKey]; !ok { - m.dupsortPuts[table][stringKey] = [][]byte{} - } - m.dupsortPuts[table][stringKey] = append(m.dupsortPuts[table][stringKey], cValue) - return nil - } - m.puts[table][stringKey] = cValue - return nil + return m.memTx.Put(table, key, value) } func (m *miningmutation) Append(table string, key []byte, value []byte) error { @@ -208,8 +180,6 @@ func (m *miningmutation) AppendDup(table string, key []byte, value []byte) error } func (m *miningmutation) BatchSize() int { - m.mu.RLock() - defer m.mu.RUnlock() return 0 } @@ -229,7 +199,11 @@ func (m *miningmutation) ForAmount(bucket string, prefix []byte, amount uint32, } func (m *miningmutation) Delete(table string, k, v []byte) error { - return m.Put(table, k, nil) + if _, ok := m.deletedEntries[table]; !ok { + m.deletedEntries[table] = make(map[string]struct{}) + } + m.deletedEntries[table][string(k)] = struct{}{} + return m.memTx.Delete(table, k, v) } func (m *miningmutation) Commit() error { @@ -237,9 +211,9 @@ func (m *miningmutation) Commit() error { } func (m *miningmutation) Rollback() { - m.mu.Lock() - defer m.mu.Unlock() - m.puts = map[string]map[string][]byte{} + m.memTx.Rollback() + m.memDb.Close() + return } func (m *miningmutation) Close() { @@ -261,7 +235,7 @@ func (m *miningmutation) SetRwKV(kv kv.RwDB) { } func (m *miningmutation) BucketSize(bucket string) (uint64, error) { - panic("Not implemented") + return 0, nil } func (m *miningmutation) DropBucket(bucket string) error { @@ -278,9 +252,7 @@ func (m *miningmutation) ListBuckets() ([]string, error) { func (m *miningmutation) ClearBucket(bucket string) error { m.clearedTables[bucket] = struct{}{} - m.puts[bucket] = make(map[string][]byte) - m.dupsortPuts[bucket] = make(map[string][][]byte) - return nil + return m.memTx.ClearBucket(bucket) } func (m *miningmutation) isBucketCleared(bucket string) bool { @@ -299,48 +271,23 @@ func (m *miningmutation) CreateBucket(bucket string) error { func (m *miningmutation) makeCursor(bucket string) (kv.RwCursorDupSort, error) { c := &miningmutationcursor{} // We can filter duplicates in dup sorted table - filterMap := make(map[string]struct{}) c.table = bucket + var err error - if bucket == kv.HashedStorage { - for key, value := range m.puts[bucket] { - byteKey := []byte(key) - c.pairs = append(c.pairs, cursorentry{byteKey[:40], append(byteKey[40:], value...)}) - } - c.dupCursor, err = m.db.CursorDupSort(bucket) - c.cursor = c.dupCursor - } else if !m.isDupsortedTable(bucket) { - for key, value := range m.puts[bucket] { - c.pairs = append(c.pairs, cursorentry{[]byte(key), value}) - } - if !m.isBucketCleared(bucket) { - c.cursor, err = m.db.Cursor(bucket) - } - } else { - if !m.isBucketCleared(bucket) { - c.dupCursor, err = m.db.CursorDupSort(bucket) - c.cursor = c.dupCursor - } - for key, values := range m.dupsortPuts[bucket] { - for _, value := range values { - var dbValue []byte - if !m.isBucketCleared(bucket) { - dbValue, err = c.dupCursor.SeekBothRange([]byte(key), value) - if err != nil { - return nil, err - } - } - if _, ok := filterMap[string(append([]byte(key), value...))]; bytes.Compare(dbValue, value) != 0 && !ok { - c.pairs = append(c.pairs, cursorentry{[]byte(key), value}) - } - filterMap[string(append([]byte(key), value...))] = struct{}{} - } - } + // Initialize db cursors + c.dupCursor, err = m.db.CursorDupSort(bucket) + if err != nil { + return nil, err } + c.cursor = c.dupCursor + // Initialize memory cursors + c.memDupCursor, err = m.memTx.RwCursorDupSort(bucket) if err != nil { return nil, err } - sort.Sort(c.pairs) + _, isDupsort := m.dupsortTables[bucket] + c.isDupsort = isDupsort + c.memCursor = c.memDupCursor c.mutation = m return c, err } diff --git a/ethdb/olddb/miningmutationcursor.go b/ethdb/olddb/miningmutationcursor.go index b6b57275af..2d1db38c8f 100644 --- a/ethdb/olddb/miningmutationcursor.go +++ b/ethdb/olddb/miningmutationcursor.go @@ -13,92 +13,32 @@ type cursorentry struct { value []byte } -func compareEntries(a, b cursorentry) bool { - if bytes.Compare(a.key, b.key) == 0 { - return bytes.Compare(a.value, b.value) < 0 - } - return bytes.Compare(a.key, b.key) < 0 -} - -type cursorentries []cursorentry - -func (cur cursorentries) Less(i, j int) bool { - return compareEntries(cur[i], cur[j]) -} - -func (cur cursorentries) Len() int { - return len(cur) -} - -func (cur cursorentries) Swap(i, j int) { - cur[j], cur[i] = cur[i], cur[j] -} - // cursor type miningmutationcursor struct { - // sorted slice of the entries in the bucket we iterate on. - pairs cursorentries // we can keep one cursor type if we store 2 of each kind. cursor kv.Cursor dupCursor kv.CursorDupSort + // Mem cursors + memCursor kv.RwCursor + memDupCursor kv.RwCursorDupSort // we keep the index in the slice of pairs we are at. - current int isPrevFromDb bool - // current cursor entry - currentPair cursorentry + // Flag for dupsort mode + isDupsort bool + // entry history + currentPair cursorentry + currentDbEntry cursorentry + currentMemEntry cursorentry // we keep the mining mutation so that we can insert new elements in db mutation *miningmutation table string } -func (m *miningmutationcursor) endOfNextDb() (bool, error) { - dbCurrK, dbCurrV, _ := m.cursor.Current() - lastK, lastV, err := m.cursor.Last() - if err != nil { - return false, err - } - if m.table == kv.HashedStorage && len(dbCurrK) == 72 { - dbCurrV = append(dbCurrK[40:], dbCurrV...) - dbCurrK = dbCurrK[:40] - } - - if m.table == kv.HashedStorage && len(lastK) == 72 { - lastV = append(lastK[40:], lastV...) - lastK = lastK[:40] - } - currK, currV, _ := m.Current() - if m.dupCursor != nil { - _, err = m.dupCursor.SeekBothRange(dbCurrK, dbCurrV) - } else { - _, _, err = m.cursor.Seek(dbCurrK) - } - if err != nil { - return false, err - } - - if bytes.Compare(lastK, currK) == 0 { - return bytes.Compare(lastV, currV) <= 0, nil - } - return bytes.Compare(lastK, currK) <= 0, nil -} - -func (m *miningmutationcursor) convertToHashedStoraFormat(k []byte, v []byte) ([]byte, []byte, error) { - if len(k) == 72 && m.table == kv.HashedStorage { - return k[:40], append(k[40:], v...), nil - } - return k, v, nil -} - -func (m miningmutationcursor) isDupsortedEnabled() bool { - return m.dupCursor != nil -} - // First move cursor to first position and return key and value accordingly. func (m *miningmutationcursor) First() ([]byte, []byte, error) { - m.current = 0 - - if m.cursor == nil { - return m.goForward(nil, nil) + memKey, memValue, err := m.memCursor.First() + if err != nil { + return nil, nil, err } dbKey, dbValue, err := m.cursor.First() @@ -106,7 +46,7 @@ func (m *miningmutationcursor) First() ([]byte, []byte, error) { return nil, nil, err } - return m.goForward(dbKey, dbValue) + return m.goForward(memKey, memValue, dbKey, dbValue) } // Current return the current key and values the cursor is on. @@ -114,203 +54,144 @@ func (m *miningmutationcursor) Current() ([]byte, []byte, error) { return common.CopyBytes(m.currentPair.key), common.CopyBytes(m.currentPair.value), nil } -func (m *miningmutationcursor) goForward(dbKey, dbValue []byte) ([]byte, []byte, error) { - // is this db less than memory? - if m.current > m.pairs.Len()-1 { - m.isPrevFromDb = true - m.currentPair = cursorentry{dbKey, dbValue} - return dbKey, dbValue, nil - } +func (m *miningmutationcursor) goForward(memKey, memValue, dbKey, dbValue []byte) ([]byte, []byte, error) { var err error - if !m.isDupsortedEnabled() && bytes.Compare(dbKey, m.pairs[m.current].key) == 0 { - if dbKey, dbValue, err = m.cursor.Next(); err != nil { - return nil, nil, err - } - } else if m.isDupsortedEnabled() && bytes.Compare(dbValue, m.pairs[m.current].value) == 0 { - if dbKey, dbValue, err = m.dupCursor.NextDup(); err != nil { - return nil, nil, err - } - } else if m.table == kv.HashedStorage && len(dbKey) == 40 && len(dbValue) >= 32 && - bytes.Compare(append(dbKey, dbValue[:32]...), append(m.pairs[m.current].key, m.pairs[m.current].value[:32]...)) == 0 { - if dbKey, dbValue, err = m.dupCursor.NextDup(); err != nil { - return nil, nil, err + if memValue == nil && dbValue == nil { + return nil, nil, nil + } + // Check for duplicates + if bytes.Compare(memKey, dbKey) == 0 { + if !m.isDupsort { + if dbKey, dbValue, err = m.cursor.Next(); err != nil { + return nil, nil, err + } + } else if bytes.Compare(memValue, dbValue) == 0 { + if dbKey, dbValue, err = m.dupCursor.NextDup(); err != nil { + return nil, nil, err + } + } else if len(memValue) >= 32 && len(dbValue) >= 32 && m.table == kv.HashedStorage && bytes.Compare(memValue[:32], dbValue[:32]) == 0 { + if dbKey, dbValue, err = m.dupCursor.NextDup(); err != nil { + return nil, nil, err + } } } - - if dbKey != nil && dbValue != nil && compareEntries(cursorentry{dbKey, dbValue}, m.pairs[m.current]) { - m.isPrevFromDb = true + m.currentDbEntry = cursorentry{dbKey, dbValue} + m.currentMemEntry = cursorentry{memKey, memValue} + // compare entries + if bytes.Compare(memKey, dbKey) == 0 { + m.isPrevFromDb = dbValue != nil && (memValue == nil || bytes.Compare(memValue, dbValue) > 0) + } else { + m.isPrevFromDb = dbValue != nil && (memKey == nil || bytes.Compare(memKey, dbKey) > 0) + } + if dbValue == nil { + m.currentDbEntry = cursorentry{} + } + if memValue == nil { + m.currentMemEntry = cursorentry{} + } + if m.isPrevFromDb { m.currentPair = cursorentry{dbKey, dbValue} return dbKey, dbValue, nil } - m.isPrevFromDb = false - m.currentPair = cursorentry{common.CopyBytes(m.pairs[m.current].key), common.CopyBytes(m.pairs[m.current].value)} - // return current - return common.CopyBytes(m.pairs[m.current].key), common.CopyBytes(m.pairs[m.current].value), nil + + m.currentPair = cursorentry{memKey, memValue} + return memKey, memValue, nil } -func (m *miningmutationcursor) goNext(nextDup bool) ([]byte, []byte, error) { - if m.cursor == nil { - return m.goForward(nil, nil) - } - var err error - if m.pairs.Len()-1 < m.current { - var nextK, nextV []byte - if m.isPrevFromDb { - if nextDup { - nextK, nextV, err = m.dupCursor.NextDup() - } else { - nextK, nextV, err = m.cursor.Next() - } - } else { - nextK, nextV, err = m.cursor.Current() - if m.table == kv.HashedStorage && len(nextK) == 72 { - nextK = nextK[:40] - nextV = append(nextK[40:], nextV...) - } - } +// Next returns the next element of the mutation. +func (m *miningmutationcursor) Next() ([]byte, []byte, error) { + if m.isPrevFromDb { + k, v, err := m.cursor.Next() if err != nil { return nil, nil, err } - - if nextK != nil { - m.currentPair = cursorentry{nextK, nextV} - } - m.isPrevFromDb = true - return nextK, nextV, nil + return m.goForward(m.currentMemEntry.key, m.currentMemEntry.value, k, v) } - isEndDb, err := m.endOfNextDb() + memK, memV, err := m.memCursor.Next() if err != nil { return nil, nil, err } - if isEndDb { - if !m.isPrevFromDb { - m.current++ - } - m.isPrevFromDb = false - if m.current > m.pairs.Len()-1 { - return nil, nil, nil - } - m.currentPair = cursorentry{common.CopyBytes(m.pairs[m.current].key), common.CopyBytes(m.pairs[m.current].value)} - return common.CopyBytes(m.pairs[m.current].key), common.CopyBytes(m.pairs[m.current].value), nil - } + return m.goForward(memK, memV, m.currentDbEntry.key, m.currentDbEntry.value) +} +// NextDup returns the next element of the mutation. +func (m *miningmutationcursor) NextDup() ([]byte, []byte, error) { if m.isPrevFromDb { - var dbKey, dbValue []byte - // we check current of memory against next in db - if nextDup { - dbKey, dbValue, err = m.dupCursor.NextDup() - } else { - dbKey, dbValue, err = m.cursor.Next() - } + k, v, err := m.dupCursor.NextDup() + if err != nil { return nil, nil, err } - return m.goForward(dbKey, dbValue) - } - // We check current of memory, against next in db - var dbKey, dbValue []byte - if nextDup { - dbKey, dbValue, err = m.dupCursor.Current() - } else { - dbKey, dbValue, err = m.cursor.Current() + return m.goForward(m.currentMemEntry.key, m.currentMemEntry.value, k, v) } + memK, memV, err := m.memDupCursor.NextDup() if err != nil { return nil, nil, err } - m.current++ - if m.table == kv.HashedStorage && len(dbKey) == 72 { - return m.goForward(common.CopyBytes(dbKey[:40]), append(dbKey[40:], dbValue...)) - } - return m.goForward(dbKey, dbValue) -} - -// Next returns the next element of the mutation. -func (m *miningmutationcursor) Next() ([]byte, []byte, error) { - return m.goNext(false) -} - -// NextDup returns the next dupsorted element of the mutation (We do not apply recovery when ending of nextDup) -func (m *miningmutationcursor) NextDup() ([]byte, []byte, error) { - currK := m.currentPair.key - nextK, nextV, err := m.goNext(true) - if err != nil { - return nil, nil, err - } - - if bytes.Compare(currK, nextK) != 0 { - return nil, nil, nil - } - return nextK, nextV, nil + return m.goForward(memK, memV, m.currentDbEntry.key, m.currentDbEntry.value) } // Seek move pointer to a key at a certain position. func (m *miningmutationcursor) Seek(seek []byte) ([]byte, []byte, error) { - var dbKey, dbValue []byte - var err error - if m.cursor != nil { - dbKey, dbValue, err = m.cursor.Seek(seek) - if err != nil { - return nil, nil, err - } + dbKey, dbValue, err := m.cursor.Seek(seek) + if err != nil { + return nil, nil, err } - // TODO(Giulio2002): Use Golang search - for i := range m.pairs { - if len(m.pairs[i].key) >= len(seek) && bytes.Compare(m.pairs[i].key[:len(seek)], seek) >= 0 { - m.current = i - return m.goForward(dbKey, dbValue) - } + memKey, memValue, err := m.memCursor.Seek(seek) + if err != nil { + return nil, nil, err } - m.current = len(m.pairs) - m.isPrevFromDb = true - return dbKey, dbValue, nil + return m.goForward(memKey, memValue, dbKey, dbValue) } // Seek move pointer to a key at a certain position. func (m *miningmutationcursor) SeekExact(seek []byte) ([]byte, []byte, error) { - current := -1 - for i, pair := range m.pairs { - if bytes.Compare(pair.key, seek) == 0 { - current = i - break - } + memKey, memValue, err := m.memCursor.SeekExact(seek) + if err != nil { + return nil, nil, err } - if current >= 0 { - m.current = current - dbKey, dbValue, err := m.cursor.Seek(seek) - if err != nil { - return nil, nil, err - } - return m.goForward(dbKey, dbValue) + if memKey != nil { + m.currentMemEntry.key = memKey + m.currentMemEntry.value = memValue + m.currentDbEntry.key, m.currentDbEntry.value, err = m.cursor.Seek(seek) + m.isPrevFromDb = false + m.currentPair = cursorentry{memKey, memValue} + return memKey, memValue, err } dbKey, dbValue, err := m.cursor.SeekExact(seek) if err != nil { return nil, nil, err } + if dbKey != nil { + m.currentDbEntry.key = dbKey + m.currentDbEntry.value = dbValue + m.currentMemEntry.key, m.currentMemEntry.value, err = m.memCursor.Seek(seek) + m.isPrevFromDb = true m.currentPair = cursorentry{dbKey, dbValue} - m.current = len(m.pairs) + return dbKey, dbValue, err } - m.isPrevFromDb = true - return dbKey, dbValue, err + return nil, nil, nil } func (m *miningmutationcursor) Put(k, v []byte) error { - return m.mutation.Put(m.table, k, v) + return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v)) } func (m *miningmutationcursor) Append(k []byte, v []byte) error { - return m.Put(k, v) + return m.mutation.Put(m.table, common.CopyBytes(k), common.CopyBytes(v)) + } func (m *miningmutationcursor) AppendDup(k []byte, v []byte) error { - return m.Put(k, v) + return m.memDupCursor.AppendDup(common.CopyBytes(k), common.CopyBytes(v)) } func (m *miningmutationcursor) PutNoDupData(key, value []byte) error { @@ -332,48 +213,36 @@ func (m *miningmutationcursor) DeleteCurrentDuplicates() error { // Seek move pointer to a key at a certain position. func (m *miningmutationcursor) SeekBothRange(key, value []byte) ([]byte, error) { if value == nil { - k, v, err := m.SeekExact(key) - m.currentPair = cursorentry{common.CopyBytes(k), common.CopyBytes(v)} - m.isPrevFromDb = true + _, v, err := m.SeekExact(key) return v, err } + dbValue, err := m.dupCursor.SeekBothRange(key, value) if err != nil { return nil, err } - // TODO(Giulio2002): Use Golang search - for i := range m.pairs { - if bytes.Compare(m.pairs[i].key, key) == 0 && len(m.pairs[i].value) >= len(value) && bytes.Compare(m.pairs[i].value[:len(value)], value) >= 0 { - m.current = i - _, retValue, err := m.goForward(key, dbValue) - return retValue, err - } + + memValue, err := m.memDupCursor.SeekBothRange(key, value) + if err != nil { + return nil, err } - m.currentPair = cursorentry{common.CopyBytes(key), common.CopyBytes(dbValue)} - m.current = len(m.pairs) - m.isPrevFromDb = true - return dbValue, nil + _, retValue, err := m.goForward(key, memValue, key, dbValue) + return retValue, err } func (m *miningmutationcursor) Last() ([]byte, []byte, error) { - m.current = len(m.pairs) - 1 - if m.cursor == nil { - if m.current == -1 { - return nil, nil, nil - } - return m.goForward(nil, nil) - } - dbKey, dbValue, err := m.cursor.Last() + // TODO(Giulio2002): make fixes. + memKey, memValue, err := m.memCursor.Last() if err != nil { return nil, nil, err } - if m.current == -1 { - m.currentPair = cursorentry{dbKey, dbValue} - return dbKey, dbValue, nil + dbKey, dbValue, err := m.cursor.Last() + if err != nil { + return nil, nil, err } - return m.goForward(dbKey, dbValue) + return m.goForward(memKey, memValue, dbKey, dbValue) } func (m *miningmutationcursor) Prev() ([]byte, []byte, error) { @@ -384,6 +253,9 @@ func (m *miningmutationcursor) Close() { if m.cursor != nil { m.cursor.Close() } + if m.memCursor != nil { + m.memCursor.Close() + } return } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 7d80f4b3cb..7b2758e689 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -245,7 +245,10 @@ func MiningStep(ctx context.Context, kv kv.RwDB, mining *stagedsync.Sync) (err e } defer tx.Rollback() - if err = mining.Run(nil, olddb.NewMiningBatch(tx), false); err != nil { + miningBatch := olddb.NewMiningBatch(tx) + defer miningBatch.Rollback() + + if err = mining.Run(nil, miningBatch, false); err != nil { return err } tx.Rollback() -- GitLab