From 384235129344b9ffa482f9769b23ed8609fa5e2b Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Tue, 9 Nov 2021 10:12:25 +0700
Subject: [PATCH] move DictAggregator to erigon-lib (#2933)

---
 cmd/hack/hack.go | 111 ++++++++---------------------------------------
 go.mod           |   2 +-
 go.sum           |   4 +-
 3 files changed, 20 insertions(+), 97 deletions(-)

diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index 44cf06ca95..e5e1c8588d 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -13,6 +13,7 @@ import (
 	"io"
 	"io/ioutil"
 	"math/big"
+	"net/http"
 	_ "net/http/pprof" //nolint:gosec
 	"os"
 	"os/signal"
@@ -55,7 +56,6 @@ import (
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/ethdb"
 	"github.com/ledgerwatch/erigon/ethdb/cbor"
-	"github.com/ledgerwatch/erigon/metrics/exp"
 	"github.com/ledgerwatch/erigon/migrations"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rlp"
@@ -64,7 +64,7 @@ import (
 	"github.com/wcharczuk/go-chart/v2"
 )
 
-const ASSERT = true
+const ASSERT = false
 
 var (
 	verbosity  = flag.Uint("verbosity", 3, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default 3)")
@@ -1311,7 +1311,7 @@ func mphf(chaindata string, block int) error {
 	if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{
 		KeyCount:   int(count),
 		BucketSize: 2000,
-		Salt:       0,
+		Salt:       1,
 		LeafSize:   8,
 		TmpDir:     "",
 		StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
@@ -1492,6 +1492,7 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
 		}
 		//log.Info("Kasai algorithm finished")
 		// Checking LCP array
+
 		if ASSERT {
 			for i := 0; i < n-1; i++ {
 				var prefixLen int
@@ -1502,6 +1503,7 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
 				}
 				if prefixLen != int(lcp[i]) {
 					log.Error("Mismatch", "prefixLen", prefixLen, "lcp[i]", lcp[i])
+					break
 				}
 				l := int(lcp[i]) // Length of potential dictionary word
 				if l < 2 {
@@ -1511,7 +1513,7 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
 				for s := 0; s < l; s++ {
 					dictKey[s] = superstring[(filtered[i]+s)*2+1]
 				}
-				fmt.Printf("%d %d %s\n", filtered[i], lcp[i], dictKey)
+				//fmt.Printf("%d %d %s\n", filtered[i], lcp[i], dictKey)
 			}
 		}
 		//log.Info("LCP array checked")
@@ -1567,73 +1569,6 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
 	completion.Done()
 }
 
-type DictionaryItem struct {
-	word  []byte
-	score uint64
-}
-
-type DictionaryBuilder struct {
-	limit         int
-	lastWord      []byte
-	lastWordScore uint64
-	items         []DictionaryItem
-}
-
-func (db DictionaryBuilder) Len() int {
-	return len(db.items)
-}
-
-func (db DictionaryBuilder) Less(i, j int) bool {
-	if db.items[i].score == db.items[j].score {
-		return bytes.Compare(db.items[i].word, db.items[j].word) < 0
-	}
-	return db.items[i].score < db.items[j].score
-}
-
-func (db *DictionaryBuilder) Swap(i, j int) {
-	db.items[i], db.items[j] = db.items[j], db.items[i]
-}
-
-func (db *DictionaryBuilder) Push(x interface{}) {
-	db.items = append(db.items, x.(DictionaryItem))
-}
-
-func (db *DictionaryBuilder) Pop() interface{} {
-	old := db.items
-	n := len(old)
-	x := old[n-1]
-	db.items = old[0 : n-1]
-	return x
-}
-
-func (db *DictionaryBuilder) processWord(word []byte, score uint64) {
-	heap.Push(db, DictionaryItem{word: word, score: score})
-	if db.Len() > db.limit {
-		// Remove the element with smallest score
-		heap.Pop(db)
-	}
-}
-
-func (db *DictionaryBuilder) compressLoadFunc(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
-	score := binary.BigEndian.Uint64(v)
-	if bytes.Equal(k, db.lastWord) {
-		db.lastWordScore += score
-	} else {
-		if db.lastWord != nil {
-			db.processWord(db.lastWord, db.lastWordScore)
-		}
-		db.lastWord = common.CopyBytes(k)
-		db.lastWordScore = score
-	}
-	return nil
-}
-
-func (db *DictionaryBuilder) finish() {
-	if db.lastWord != nil {
-		db.processWord(db.lastWord, db.lastWordScore)
-	}
-}
-
 type DictAggregator struct {
 	lastWord      []byte
 	lastWordScore uint64
@@ -1751,35 +1686,19 @@ func compress1(chaindata string, name string) error {
 	}
 	close(ch)
 	wg.Wait()
-	dictCollector := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
-	dictAggregator := &DictAggregator{collector: dictCollector}
-	for _, collector := range collectors {
-		if err = collector.Load(nil, "", dictAggregator.aggLoadFunc, etl.TransformArgs{}); err != nil {
-			return err
-		}
-		collector.Close()
-	}
-	if err = dictAggregator.finish(); err != nil {
-		return err
-	}
-	db := &DictionaryBuilder{limit: maxDictPatterns} // Only collect 1m words with highest scores
-	if err = dictCollector.Load(nil, "", db.compressLoadFunc, etl.TransformArgs{}); err != nil {
+
+	db, err := compress.DictionaryBuilderFromCollectors(context.Background(), CompressLogPrefix, tmpDir, collectors)
+	if err != nil {
 		return err
 	}
-	db.finish()
-	dictCollector.Close()
+
 	var df *os.File
 	df, err = os.Create(name + ".dictionary.txt")
 	if err != nil {
 		return err
 	}
 	w := bufio.NewWriterSize(df, etl.BufIOSize)
-	// Sort dictionary builder
-	sort.Sort(db)
-
-	for i := len(db.items); i > 0; i-- {
-		fmt.Fprintf(w, "%d %x\n", db.items[i-1].score, db.items[i-1].word)
-	}
+	db.ForEach(func(score uint64, word []byte) { fmt.Fprintf(w, "%d %x\n", score, word) })
 	if err = w.Flush(); err != nil {
 		return err
 	}
@@ -2920,7 +2839,7 @@ RETRY:
 
 	if err = rs.Build(); err != nil {
 		if errors.Is(err, recsplit.ErrCollision) {
-			log.Info("Building recsplit. Collision happened. It's ok. Restarting...")
+			log.Info("Building recsplit. Collision happened. It's ok. Restarting...", "err", err)
 			rs.ResetNextSalt()
 			goto RETRY
 		}
@@ -4142,7 +4061,11 @@ func main() {
 		}
 		defer pprof.StopCPUProfile()
 	}
-	exp.Setup("0.0.0.0:6060")
+	go func() {
+		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
+			log.Error("Failure in running pprof server", "err", err)
+		}
+	}()
 
 	var err error
 	switch *action {
diff --git a/go.mod b/go.mod
index 85940a1cfd..9a380c8e9b 100644
--- a/go.mod
+++ b/go.mod
@@ -35,7 +35,7 @@ require (
 	github.com/json-iterator/go v1.1.12
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/kevinburke/go-bindata v3.21.0+incompatible
-	github.com/ledgerwatch/erigon-lib v0.0.0-20211108033615-b426ff2271f0
+	github.com/ledgerwatch/erigon-lib v0.0.0-20211109030232-5677f0c2bd53
 	github.com/ledgerwatch/log/v3 v3.4.0
 	github.com/ledgerwatch/secp256k1 v1.0.0
 	github.com/logrusorgru/aurora/v3 v3.0.0
diff --git a/go.sum b/go.sum
index afe80cb796..d09d0c0362 100644
--- a/go.sum
+++ b/go.sum
@@ -596,8 +596,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
 github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20211108033615-b426ff2271f0 h1:uCFtgY4IegzfrNvfAwQNIebBk74I9folhLyyB4lSOjY=
-github.com/ledgerwatch/erigon-lib v0.0.0-20211108033615-b426ff2271f0/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
+github.com/ledgerwatch/erigon-lib v0.0.0-20211109030232-5677f0c2bd53 h1:SRxoOSlbv2o1qzU5Cqx59ZVBKh7x8baEF2taRySNubk=
+github.com/ledgerwatch/erigon-lib v0.0.0-20211109030232-5677f0c2bd53/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
 github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
 github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
 github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
-- 
GitLab