From 2a08dbd3d98ad0b9a56470cc68fbfc6c9a1834c9 Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Fri, 19 Nov 2021 12:12:25 +0700
Subject: [PATCH] Recsplit: move files read/write helpers to erigon-lib (#2993)
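
Move the hand-rolled file read/write loops out of cmd/hack and
consume the callback-based helpers now exported by erigon-lib's
compress package (ReadDatFile, ReadDictrionary, PersistDictrionary).
Along the way:

- reduceDictWorker takes its size counters as go.uber.org/atomic
  Uint64 pointers instead of raw uint64s mutated via sync/atomic
- dumpTxs iterates bodies and transactions via kv.BigChunks and
  tx.ForAmount instead of managing cursors by hand
- turbo/snapshotsync gains a FileName helper shared by
  CompressedFileName and IdxFileName

A .dat file is a sequence of uvarint-length-prefixed records, and
callers now look roughly like this (a minimal sketch assuming the
erigon-lib compress import; "example.dat" is a placeholder path):

	if err := compress.ReadDatFile("example.dat", func(v []byte) error {
		// v is one decoded record; a non-nil error aborts the scan
		return nil
	}); err != nil {
		return err
	}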

---
 cmd/hack/hack.go                      | 220 +++++++++-----------------
 go.mod                                |   2 +-
 go.sum                                |   4 +-
 turbo/snapshotsync/block_snapshots.go |   9 +-
 4 files changed, 83 insertions(+), 152 deletions(-)

diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index 6331db1afb..cc9dc58f6d 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -6,7 +6,6 @@ import (
 	"container/heap"
 	"context"
 	"encoding/binary"
-	"encoding/hex"
 	"errors"
 	"flag"
 	"fmt"
@@ -20,7 +19,6 @@ import (
 	"runtime"
 	"runtime/pprof"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -60,9 +58,11 @@ import (
 	"github.com/ledgerwatch/erigon/migrations"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rlp"
+	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
 	"github.com/ledgerwatch/erigon/turbo/trie"
 	"github.com/ledgerwatch/log/v3"
 	"github.com/wcharczuk/go-chart/v2"
+	atomic2 "go.uber.org/atomic"
 )
 
 const ASSERT = false
@@ -1594,14 +1594,10 @@ func compress1(chaindata string, name string) error {
 	logEvery := time.NewTicker(20 * time.Second)
 	defer logEvery.Stop()
 
-	var superstring []byte
 	// Read keys from the file and generate superstring (with extra byte 0x1 prepended to each character, and with 0x0 0x0 pair inserted between keys and values)
 	// We only consider values with length > 2, because smaller values are not compressible without going into bits
-	f, err := os.Open(name + ".dat")
-	if err != nil {
-		return err
-	}
-	r := bufio.NewReaderSize(f, etl.BufIOSize)
+	var superstring []byte
+
 	// Collector for dictionary words (sorted by their score)
 	tmpDir := ""
 	ch := make(chan []byte, runtime.NumCPU())
@@ -1613,23 +1609,13 @@ func compress1(chaindata string, name string) error {
 		collectors[i] = collector
 		go processSuperstring(ch, collector, &wg)
 	}
-	var buf []byte
-	l, e := binary.ReadUvarint(r)
 	i := 0
-	for ; e == nil; l, e = binary.ReadUvarint(r) {
-		if len(buf) < int(l) {
-			buf = make([]byte, l)
-		}
-		if _, e = io.ReadFull(r, buf[:l]); e != nil {
-			return e
-		}
-		if len(superstring)+2*int(l)+2 > superstringLimit {
+	if err := compress.ReadDatFile(name+".dat", func(v []byte) error {
+		if len(superstring)+2*len(v)+2 > superstringLimit {
 			ch <- superstring
 			superstring = nil
 		}
-		//fmt.Printf("\"%x\",\n", buf[:l])
-
-		for _, a := range buf[:l] {
+		for _, a := range v {
 			superstring = append(superstring, 1, a)
 		}
 		superstring = append(superstring, 0, 0)
@@ -1639,15 +1625,11 @@ func compress1(chaindata string, name string) error {
 		case <-logEvery.C:
 			log.Info("Dictionary preprocessing", "millions", i/1_000_000)
 		}
-
-	}
-	itemsCount := i
-	if e != nil && !errors.Is(e, io.EOF) {
-		return e
-	}
-	if err = f.Close(); err != nil {
+		return nil
+	}); err != nil {
 		return err
 	}
+	itemsCount := i
 	if len(superstring) > 0 {
 		ch <- superstring
 	}
@@ -1658,18 +1640,9 @@ func compress1(chaindata string, name string) error {
 	if err != nil {
 		return err
 	}
-
-	var df *os.File
-	df, err = os.Create(name + ".dictionary.txt")
-	if err != nil {
+	if err := compress.PersistDictrionary(name+".dictionary.txt", db); err != nil {
 		return err
 	}
-	w := bufio.NewWriterSize(df, etl.BufIOSize)
-	db.ForEach(func(score uint64, word []byte) { fmt.Fprintf(w, "%d %x\n", score, word) })
-	if err = w.Flush(); err != nil {
-		return err
-	}
-	df.Close()
 
 	if err := reducedict(name); err != nil {
 		return err
@@ -1680,7 +1653,6 @@ func compress1(chaindata string, name string) error {
 	return nil
 }
 
-// DynamicCell represents result of dynamic programming for certain starting position
 type DynamicCell struct {
 	optimStart  int
 	coverStart  int
@@ -1905,7 +1877,7 @@ func optimiseCluster(trace bool, numBuf []byte, input []byte, trie *patricia.Pat
 	return output, patterns, uncovered
 }
 
-func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *patricia.PatriciaTree, collector *etl.Collector, inputSize *uint64, outputSize *uint64, posMap map[uint64]uint64) {
+func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *patricia.PatriciaTree, collector *etl.Collector, inputSize, outputSize *atomic2.Uint64, posMap map[uint64]uint64) {
 	defer completion.Done()
 	var output = make([]byte, 0, 256)
 	var uncovered = make([]int, 256)
@@ -1924,8 +1896,8 @@ func reduceDictWorker(inputCh chan []byte, completion *sync.WaitGroup, trie *pat
 				return
 			}
 		}
-		atomic.AddUint64(inputSize, 1+uint64(len(input)-8))
-		atomic.AddUint64(outputSize, uint64(len(output)))
+		inputSize.Add(1 + uint64(len(input)-8))
+		outputSize.Add(uint64(len(output)))
 		posMap[uint64(len(input)-8+1)]++
 		posMap[0]++
 	}
@@ -2173,27 +2145,13 @@ func (hf *HuffmanCoder) flush() error {
 func reducedict(name string) error {
 	logEvery := time.NewTicker(20 * time.Second)
 	defer logEvery.Stop()
-	// Read up the dictionary
-	df, err := os.Open(name + ".dictionary.txt")
-	if err != nil {
-		return err
-	}
-	// DictonaryBuilder is for sorting words by their freuency (to assign codes)
+
+	// DictionaryBuilder is for sorting words by their frequency (to assign codes)
 	var pt patricia.PatriciaTree
 	code2pattern := make([]*Pattern, 0, 256)
-	ds := bufio.NewScanner(df)
-	for ds.Scan() {
-		tokens := strings.Split(ds.Text(), " ")
-		var score int64
-		if score, err = strconv.ParseInt(tokens[0], 10, 64); err != nil {
-			return err
-		}
-		var word []byte
-		if word, err = hex.DecodeString(tokens[1]); err != nil {
-			return err
-		}
+	if err := compress.ReadDictrionary(name+".dictionary.txt", func(score uint64, word []byte) error {
 		p := &Pattern{
-			score:    uint64(score),
+			score:    score,
 			uses:     0,
 			code:     uint64(len(code2pattern)),
 			codeBits: 0,
@@ -2201,44 +2159,31 @@ func reducedict(name string) error {
 		}
 		pt.Insert(word, p)
 		code2pattern = append(code2pattern, p)
-	}
-	if err = df.Close(); err != nil {
+		return nil
+	}); err != nil {
 		return err
 	}
 	log.Info("dictionary file parsed", "entries", len(code2pattern))
-	statefile := name + ".dat"
-	f, err := os.Open(statefile)
-	if err != nil {
-		return err
-	}
-	r := bufio.NewReader(f)
-	var buf []byte
 	tmpDir := ""
 	ch := make(chan []byte, 10000)
-	var inputSize, outputSize uint64
+	var inputSize, outputSize atomic2.Uint64
 	var wg sync.WaitGroup
+	workers := runtime.NumCPU()
 	var collectors []*etl.Collector
 	var posMaps []map[uint64]uint64
-	for i := 0; i < runtime.NumCPU(); i++ {
+	for i := 0; i < workers; i++ {
 		collector := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
 		collectors = append(collectors, collector)
 		posMap := make(map[uint64]uint64)
 		posMaps = append(posMaps, posMap)
 		wg.Add(1)
-		go reduceDictWorker(ch, &wg, &pt, collector, &inputSize, &outputSize, posMap)
+		go reduceDictWorker(ch, &wg, &pt, collector, &inputSize, &outputSize, posMap)
 	}
-	l, e := binary.ReadUvarint(r)
 	i := 0
-	for ; e == nil; l, e = binary.ReadUvarint(r) {
-		if len(buf) < int(l) {
-			buf = make([]byte, l)
-		}
-		if _, e = io.ReadFull(r, buf[:l]); e != nil {
-			return e
-		}
-		input := make([]byte, 8+int(l))
+	if err := compress.ReadDatFile(name+".dat", func(v []byte) error {
+		input := make([]byte, 8+len(v))
 		binary.BigEndian.PutUint64(input, uint64(i))
-		copy(input[8:], buf[:l])
+		copy(input[8:], v)
 		ch <- input
 		i++
 		select {
@@ -2246,20 +2191,17 @@ func reducedict(name string) error {
 		case <-logEvery.C:
 			var m runtime.MemStats
 			runtime.ReadMemStats(&m)
-			log.Info("Replacement preprocessing", "millions", i/1_000_000, "input", common.StorageSize(atomic.LoadUint64(&inputSize)), "output", common.StorageSize(atomic.LoadUint64(&outputSize)), "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
+			log.Info("Replacement preprocessing", "millions", i/1_000_000, "input", common.StorageSize(inputSize.Load()), "output", common.StorageSize(outputSize.Load()), "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
 		}
-	}
-	if e != nil && !errors.Is(e, io.EOF) {
-		return e
-	}
-	if err = f.Close(); err != nil {
+		return nil
+	}); err != nil {
 		return err
 	}
 	close(ch)
 	wg.Wait()
 	var m runtime.MemStats
 	runtime.ReadMemStats(&m)
-	log.Info("Done", "input", common.StorageSize(atomic.LoadUint64(&inputSize)), "output", common.StorageSize(atomic.LoadUint64(&outputSize)), "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
+	log.Info("Done", "input", common.StorageSize(inputSize.Load()), "output", common.StorageSize(outputSize.Load()), "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
 	posMap := make(map[uint64]uint64)
 	for _, m := range posMaps {
 		for l, c := range m {
@@ -2336,6 +2278,7 @@ func reducedict(name string) error {
 	}
 	root := heap.Pop(&codeHeap).(*PatternHuff)
 	var cf *os.File
+	var err error
 	if cf, err = os.Create(name + ".compressed.dat"); err != nil {
 		return err
 	}
@@ -2498,7 +2441,7 @@ func reducedict(name string) error {
 		}
 	}
 	log.Info("Positional dictionary", "size", offset, "position cutoff", positionCutoff)
-	df, err = os.Create("huffman_codes.txt")
+	df, err := os.Create("huffman_codes.txt")
 	if err != nil {
 		return err
 	}
@@ -2638,16 +2581,15 @@ func recsplitWholeChain(chaindata string) error {
 
 	log.Info("Last body number", "last", last)
 	for i := uint64(*block); i < last; i += blocksPerFile {
-		*name = fmt.Sprintf("bodies%d-%dm", i/1_000_000, i%1_000_000/100_000)
-		log.Info("Creating", "file", *name)
-
-		if err := dumpTxs(chaindata, i, *blockTotal, *name); err != nil {
+		fileName := snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Transactions)
+		log.Info("Creating", "file", fileName+".seg")
+		if err := dumpTxs(chaindata, i, *blockTotal, fileName); err != nil {
 			return err
 		}
-		if err := compress1(chaindata, *name); err != nil {
+		if err := compress1(chaindata, fileName); err != nil {
 			return err
 		}
-		_ = os.Remove(*name + ".dat")
+		_ = os.Remove(fileName + ".dat")
 	}
 	return nil
 }
@@ -3530,21 +3472,6 @@ func dumpTxs(chaindata string, block uint64, blockTotal int, name string) error
 	chainConfig := tool.ChainConfigFromDB(db)
 	chainID, _ := uint256.FromBig(chainConfig.ChainID)
 
-	tx, err := db.BeginRo(context.Background())
-	if err != nil {
-		return err
-	}
-	defer tx.Rollback()
-	var txs kv.Cursor
-	if txs, err = tx.Cursor(kv.EthTx); err != nil {
-		return err
-	}
-	defer txs.Close()
-	var bodies kv.Cursor
-	if bodies, err = tx.Cursor(kv.BlockBody); err != nil {
-		return err
-	}
-	defer bodies.Close()
 	f, err := os.Create(name + ".dat")
 	if err != nil {
 		return err
@@ -3554,56 +3481,55 @@ func dumpTxs(chaindata string, block uint64, blockTotal int, name string) error
 	defer w.Flush()
 	i := 0
 	numBuf := make([]byte, binary.MaxVarintLen64)
-	blockEncoded := dbutils.EncodeBlockNumber(block)
 	parseCtx := txpool.NewTxParseContext(*chainID)
 	parseCtx.WithSender(false)
 	slot := txpool.TxSlot{}
 	valueBuf := make([]byte, 16*4096)
-	k, v, e := bodies.Seek(blockEncoded)
-	for ; k != nil && e == nil; k, v, e = bodies.Next() {
+	from := dbutils.EncodeBlockNumber(block)
+	if err := kv.BigChunks(db, kv.BlockBody, from, func(tx kv.Tx, k, v []byte) (bool, error) {
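+		// BigChunks iterates kv.BlockBody starting at from; returning (false, nil) stops the scan early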
 		bodyNum := binary.BigEndian.Uint64(k)
 		if bodyNum >= block+uint64(blockTotal) {
-			break
+			return false, nil
 		}
+
 		var body types.BodyForStorage
-		if e = rlp.DecodeBytes(v, &body); err != nil {
-			return e
+		if e := rlp.DecodeBytes(v, &body); e != nil {
+			return false, e
 		}
-		if body.TxAmount > 0 {
-			binary.BigEndian.PutUint64(numBuf, body.BaseTxId)
-			tk, tv, te := txs.Seek(numBuf[:8])
-			for ; tk != nil && te == nil; tk, tv, te = txs.Next() {
-				txId := binary.BigEndian.Uint64(tk)
-				if txId >= body.BaseTxId+uint64(body.TxAmount) {
-					break
-				}
-				if _, err := parseCtx.ParseTransaction(tv, 0, &slot, nil); err != nil {
-					panic(err)
-				}
-				valueBuf = valueBuf[:0]
-				valueBuf = append(append(valueBuf, slot.IdHash[:1]...), tv...)
-				n := binary.PutUvarint(numBuf, uint64(len(valueBuf)))
-				if _, e = w.Write(numBuf[:n]); e != nil {
-					return err
-				}
-				if len(valueBuf) > 0 {
-					if _, e = w.Write(valueBuf); e != nil {
-						return e
-					}
-				}
-				i++
-				if i%1_000_000 == 0 {
-					log.Info("Wrote into file", "million txs", i/1_000_000, "block num", bodyNum)
+		if body.TxAmount == 0 {
+			return true, nil
+		}
+
+		binary.BigEndian.PutUint64(numBuf, body.BaseTxId)
+		if err := tx.ForAmount(kv.EthTx, numBuf[:8], body.TxAmount, func(tk, tv []byte) error {
+			if _, err := parseCtx.ParseTransaction(tv, 0, &slot, nil); err != nil {
+				return err
+			}
+			valueBuf = valueBuf[:0]
+			valueBuf = append(append(valueBuf, slot.IdHash[:1]...), tv...)
+			n := binary.PutUvarint(numBuf, uint64(len(valueBuf)))
+			if _, e := w.Write(numBuf[:n]); e != nil {
+				return e
+			}
+			if len(valueBuf) > 0 {
+				if _, e := w.Write(valueBuf); e != nil {
+					return e
 				}
 			}
-			if te != nil && !errors.Is(te, io.EOF) {
-				return te
+			i++
+			if i%1_000_000 == 0 {
+				log.Info("Wrote into file", "million txs", i/1_000_000, "block num", bodyNum)
 			}
+			return nil
+		}); err != nil {
+			return false, err
 		}
+		return true, nil
+	}); err != nil {
+		return err
 	}
-	if e != nil && !errors.Is(e, io.EOF) {
-		return e
-	}
+
 	return nil
 }
 
diff --git a/go.mod b/go.mod
index 519d1347bd..bbe4937333 100644
--- a/go.mod
+++ b/go.mod
@@ -35,7 +35,7 @@ require (
 	github.com/json-iterator/go v1.1.12
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/kevinburke/go-bindata v3.21.0+incompatible
-	github.com/ledgerwatch/erigon-lib v0.0.0-20211118093743-9ba9ebf3c17e
+	github.com/ledgerwatch/erigon-lib v0.0.0-20211119034502-bef3d1e3bd42
 	github.com/ledgerwatch/log/v3 v3.4.0
 	github.com/ledgerwatch/secp256k1 v1.0.0
 	github.com/logrusorgru/aurora/v3 v3.0.0
diff --git a/go.sum b/go.sum
index f09b0ee1bf..203126dc19 100644
--- a/go.sum
+++ b/go.sum
@@ -596,8 +596,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
 github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20211118093743-9ba9ebf3c17e h1:i0AuM72J9lbqTRwd5GPBQE/08QEM/9mGS5vLwMau2kw=
-github.com/ledgerwatch/erigon-lib v0.0.0-20211118093743-9ba9ebf3c17e/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
+github.com/ledgerwatch/erigon-lib v0.0.0-20211119034502-bef3d1e3bd42 h1:u6JmlvVVy285hy3H21PUOqMk6W3FWwDvcM/FVQtGwC0=
+github.com/ledgerwatch/erigon-lib v0.0.0-20211119034502-bef3d1e3bd42/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
 github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
 github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
 github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go
index 88a562f5a4..4b0564a704 100644
--- a/turbo/snapshotsync/block_snapshots.go
+++ b/turbo/snapshotsync/block_snapshots.go
@@ -27,12 +27,17 @@ var (
 	ErrInvalidCompressedFileName = fmt.Errorf("invalid compressed file name")
 )
 
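+// FileName returns the base snapshot file name, v1-<from>-<to>-<type>, with block numbers scaled to thousands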
+func FileName(from, to uint64, name SnapshotType) string {
+	return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, name)
+}
+
 func CompressedFileName(from, to uint64, name SnapshotType) string {
-	return fmt.Sprintf("v1-%06d-%06d-%s.seg", from/1_000, to/1_000, name)
+	return FileName(from, to, name) + ".seg"
 }
 
 func IdxFileName(from, to uint64, name SnapshotType) string {
-	return fmt.Sprintf("v1-%06d-%06d-%s.idx", from/1_000, to/1_000, name)
+	return FileName(from, to, name) + ".idx"
 }
 
 type Snapshot struct {
-- 
GitLab