From 0d34a5b9c571ab2f88d7c69ade884771cb34cf59 Mon Sep 17 00:00:00 2001
From: ledgerwatch <akhounov@gmail.com>
Date: Mon, 27 Sep 2021 21:32:53 +0100
Subject: [PATCH] [State sync experiment] Building dictionary for state file compression (#2721)

* Kasai experiments

* Use divsufsort instead of standard lib

* Refactor experiments, split dictionary building into chunks

* Fixes

* Estimate compression, sample

* More stats

* Parallelise

* Fix lint

* dictionary aggregator

* Actual replacement

* Fixes, separate dictionary processing

* Test file

* More correct dictionary, decompression

* Use dynamic programming to reduce dictionary

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
---
 cmd/hack/hack.go | 1037 +++++++++++++++++++++++++++++++++++++++++++---
 go.mod           |    2 +
 go.sum           |    4 +
 3 files changed, 984 insertions(+), 59 deletions(-)

diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index 94eb75b035..d7f302ba3e 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -3,10 +3,14 @@ package main
 import (
 	"bufio"
 	"bytes"
+	"container/heap"
 	"context"
 	"encoding/binary"
+	"encoding/hex"
+	"errors"
 	"flag"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"math/big"
 	"os"
@@ -14,12 +18,16 @@ import (
 	"runtime"
 	"runtime/pprof"
 	"sort"
+	"strconv"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/holiman/uint256"
+	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
 	"github.com/ledgerwatch/erigon-lib/recsplit"
@@ -30,6 +38,9 @@ import (
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/wcharczuk/go-chart/v2"
 
+	ahocorasick "github.com/BobuSumisu/aho-corasick"
+	"github.com/flanglet/kanzi-go/transform"
+
 	hackdb "github.com/ledgerwatch/erigon/cmd/hack/db"
 	"github.com/ledgerwatch/erigon/cmd/hack/flow"
 	"github.com/ledgerwatch/erigon/cmd/hack/tool"
@@ -1117,19 +1128,18 @@ func testGetProof(chaindata string, address common.Address, rewind int, regen bo
 	return nil
 }
 
-func dumpState(chaindata string, block uint64) error {
+// dumpState writes the content of the current state into a file with the given name
+func dumpState(chaindata string, statefile string) error {
 	db := mdbx.MustOpen(chaindata)
 	defer db.Close()
-	f, err := os.Create("statedump")
+	f, err := os.Create(statefile)
 	if err != nil {
 		return err
 	}
 	defer f.Close()
 	w := bufio.NewWriter(f)
 	defer w.Flush()
-	stAccounts := 0
-	stStorage := 0
-	var rs *recsplit.RecSplit
+	i := 0
 	if err := db.View(context.Background(), func(tx kv.Tx) error {
 		c, err := tx.Cursor(kv.PlainState)
 		if err != nil {
@@ -1139,86 +1149,987 @@ func dumpState(chaindata string, block uint64) error {
 		if count, err = c.Count(); err != nil {
 			return err
 		}
-		if block > 1 {
-			count = block
-		}
-		if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{
-			KeyCount:   int(count),
-			BucketSize: 2000,
-			Salt:       0,
-			LeafSize:   8,
-			TmpDir:     "",
-			StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
-				0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d,
-				0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a},
-		}); err != nil {
+		// Write out number of key/value pairs first
+		var countBytes [8]byte
+		binary.BigEndian.PutUint64(countBytes[:], count)
+		if _, err = w.Write(countBytes[:]); err != nil {
+			return err
+		}
-		k, _, e := c.First()
-		i := 0
-		for ; k != nil && e == nil; k, _, e = c.Next() {
-			if err := rs.AddKey(k); err != nil {
+		k, v, e := c.First()
+		for ; k != nil && e == nil; k, v, e = c.Next() {
+			if err = w.WriteByte(byte(len(k))); err != nil {
 				return err
 			}
-			i++
-			if i == int(count) {
-				break
+			if _, err = w.Write(k); err != nil {
+				return err
 			}
+			if err = w.WriteByte(byte(len(v))); err != nil {
+				return err
+			}
+			if len(v) > 0 {
+				if _, err = w.Write(v); err != nil {
+					return err
+				}
+			}
+			i++
 			if i%1_000_000 == 0 {
-				log.Info("Added", "keys", i)
+				log.Info("Written into file", "key/value pairs", i)
 			}
 		}
 		if e != nil {
 			return e
 		}
-		start := time.Now()
-		log.Info("Building recsplit...")
-		if err = rs.Build(); err != nil {
+		return nil
+	}); err != nil {
+		return err
+	}
+	return nil
+}
+
+func mphf(chaindata string, block uint64) error {
+	// Create the state dump file if it does not exist already
+	statefile := "statedump.dat"
+	if _, err := os.Stat(statefile); err != nil {
+		if !os.IsNotExist(err) {
+			return fmt.Errorf("not sure if statedump.dat exists: %v", err)
+		}
+		if err = dumpState(chaindata, statefile); err != nil {
 			return err
 		}
-		s1, s2 := rs.Stats()
-		log.Info("Done", "time", time.Since(start), "s1", s1, "s2", s2)
-		log.Info("Testing bijection")
-		bitCount := (count + 63) / 64
-		bits := make([]uint64, bitCount)
-		k, _, e = c.First()
-		i = 0
-		var collisionMap map[int]string
-		if count < 1000000 {
-			collisionMap = make(map[int]string)
-		}
-		var lookupTime time.Duration
-		for ; k != nil && e == nil; k, _, e = c.Next() {
+	}
+	var rs *recsplit.RecSplit
+	f, err := os.Open(statefile)
+	if err != nil {
+		return err
+	}
+	r := bufio.NewReader(f)
+	defer f.Close()
+	var countBuf [8]byte
+	if _, err = io.ReadFull(r, countBuf[:]); err != nil {
+		return err
+	}
+	count := binary.BigEndian.Uint64(countBuf[:])
+	if block > 1 {
+		count = block
+	}
+	if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{
+		KeyCount:   int(count),
+		BucketSize: 2000,
+		Salt:       0,
+		LeafSize:   8,
+		TmpDir:     "",
+		StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
+			0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d,
+			0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a},
+	}); err != nil {
+		return err
+	}
+	var buf [256]byte
+	l, e := r.ReadByte()
+	i := 0
+	for ; e == nil; l, e = r.ReadByte() {
+		if _, e = io.ReadFull(r, buf[:l]); e != nil {
+			return e
+		}
+		if i%2 == 0 {
+			// It is a key, we skip the values here
+			if err := rs.AddKey(buf[:l]); err != nil {
+				return err
+			}
+		}
+		i++
+		if i == int(count*2) {
+			break
+		}
+	}
+	if e != nil && !errors.Is(e, io.EOF) {
+		return e
+	}
+	start := time.Now()
+	log.Info("Building recsplit...")
+	if err = rs.Build(); err != nil {
+		return err
+	}
+	s1, s2 := rs.Stats()
+	log.Info("Done", "time", time.Since(start), "s1", s1, "s2", s2)
+	log.Info("Testing bijection")
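+	// The bijection test below re-reads all count keys from the state file and looks
+	// each one up: every index returned by Lookup must be below count and must not
+	// repeat. The bits slice is a bitmap of the indices seen so far, so finding a bit
+	// already set means a collision, i.e. the function is not a bijection on the keys.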
+	bitCount := (count + 63) / 64
+	bits := make([]uint64, bitCount)
+	if _, err = f.Seek(8, 0); err != nil {
+		return err
+	}
+	r = bufio.NewReader(f)
+	l, e = r.ReadByte()
+	i = 0
+	var lookupTime time.Duration
+	for ; e == nil; l, e = r.ReadByte() {
+		if _, e = io.ReadFull(r, buf[:l]); e != nil {
+			return e
+		}
+		if i%2 == 0 {
+			// It is a key, we skip the values here
+			start := time.Now()
+			idx := rs.Lookup(buf[:l], false /* trace */)
+			lookupTime += time.Since(start)
 			if idx >= int(count) {
 				return fmt.Errorf("idx %d >= count %d", idx, count)
 			}
 			mask := uint64(1) << (idx & 63)
 			if bits[idx>>6]&mask != 0 {
-				if collisionMap != nil {
-					fmt.Printf("Key %x collided with key %x\n", k, collisionMap[idx])
-				}
-				rs.Lookup([]byte(collisionMap[idx]), true)
-				rs.Lookup(k, true)
 				return fmt.Errorf("no bijection key idx=%d, lookup idx = %d", i, idx)
 			}
-			if collisionMap != nil {
-				collisionMap[idx] = string(k)
-			}
 			bits[idx>>6] |= mask
-			i++
-			if i == int(count) {
-				break
+		}
+		i++
+		if i == int(count*2) {
+			break
+		}
+	}
+	if e != nil && !errors.Is(e, io.EOF) {
+		return e
+	}
+	log.Info("Average lookup time", "per key", time.Duration(uint64(lookupTime)/count))
+	return nil
+}
+
+// genstate generates a statedump.dat file for testing
+func genstate() error {
+	f, err := os.Create("statedump.dat")
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	w := bufio.NewWriter(f)
+	defer w.Flush()
+	var count uint64 = 25
+	var countBuf [8]byte
+	binary.BigEndian.PutUint64(countBuf[:], count)
+	if _, err = w.Write(countBuf[:]); err != nil {
+		return err
+	}
+	for i := 0; i < 5; i++ {
+		for j := 0; j < 5; j++ {
+			key := fmt.Sprintf("addr%dxlocation%d", i, j)
+			val := "value"
+			if err = w.WriteByte(byte(len(key))); err != nil {
+				return err
+			}
+			if _, err = w.Write([]byte(key)); err != nil {
+				return err
+			}
+			if err = w.WriteByte(byte(len(val))); err != nil {
+				return err
+			}
+			if _, err = w.Write([]byte(val)); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// processSuperstring is a worker that processes superstrings from the input channel
+// and puts the results into its own collector. At the end (when the input channel is
+// closed), it notifies the waitgroup before exiting, so that the caller knows when
+// all work is done.
+// No error channels for now
+func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector, completion *sync.WaitGroup) {
+	for superstring := range superstringCh {
+		//log.Info("Superstring", "len", len(superstring))
+		sa := make([]int32, len(superstring))
+		divsufsort, err := transform.NewDivSufSort()
+		if err != nil {
+			log.Error("processSuperstring", "create divsufsort", err)
+		}
+		//start := time.Now()
+		divsufsort.ComputeSuffixArray(superstring, sa)
+		//log.Info("Suffix array built", "in", time.Since(start))
+		// filter out suffixes that start at odd positions
+		n := len(sa) / 2
+		filtered := make([]int, n)
+		var j int
+		for i := 0; i < len(sa); i++ {
+			if sa[i]&1 == 0 {
+				filtered[j] = int(sa[i] >> 1)
+				j++
+			}
+		}
+		//log.Info("Suffix array filtered")
+		// invert suffixes
+		inv := make([]int, n)
+		for i := 0; i < n; i++ {
+			inv[filtered[i]] = i
+		}
+		//log.Info("Inverted array done")
+		lcp := make([]byte, n)
+		var k int
+		// Process all suffixes one by one starting from
+		// first suffix in txt[]
+		for i := 0; i < n; i++ {
+			/* If the current suffix is at n-1, then we don’t
+			   have next substring to consider. So lcp is not
+			   defined for this substring, we put zero.
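+
+			   Kasai's observation makes this O(n): if the suffix starting at i
+			   matches its successor in suffix order for k characters, then the
+			   suffix starting at i+1 matches its successor for at least k-1
+			   characters, so k only needs to be decremented once per step and
+			   never recomputed from scratch.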
+			*/
+			if inv[i] == n-1 {
+				k = 0
+				continue
+			}
+
+			/* j contains index of the next substring to
+			   be considered to compare with the present
+			   substring, i.e., next string in suffix array */
+			j := int(filtered[inv[i]+1])
+
+			// Directly start matching from k'th index as
+			// at least k-1 characters will match
+			for i+k < n && j+k < n && superstring[(i+k)*2] != 0 && superstring[(j+k)*2] != 0 && superstring[(i+k)*2+1] == superstring[(j+k)*2+1] {
+				k++
+			}
+
+			lcp[inv[i]] = byte(k) // lcp for the present suffix.
+
+			// Deleting the starting character from the string.
+			if k > 0 {
+				k--
+			}
+		}
+		//log.Info("Kasai algorithm finished")
+		// Checking LCP array
+		/*
+			for i := 0; i < n-1; i++ {
+				var prefixLen int
+				p1 := int(filtered[i])
+				p2 := int(filtered[i+1])
+				for p1+prefixLen < n && p2+prefixLen < n && superstring[(p1+prefixLen)*2] != 0 && superstring[(p2+prefixLen)*2] != 0 && superstring[(p1+prefixLen)*2+1] == superstring[(p2+prefixLen)*2+1] {
+					prefixLen++
+				}
+				if prefixLen != int(lcp[i]) {
+					log.Error("Mismatch", "prefixLen", prefixLen, "lcp[i]", lcp[i])
+				}
+				l := int(lcp[i]) // Length of potential dictionary word
+				if l < 2 {
+					continue
+				}
+				dictKey := make([]byte, l)
+				for s := 0; s < l; s++ {
+					dictKey[s] = superstring[(filtered[i]+s)*2+1]
+				}
+				fmt.Printf("%d %d %s\n", filtered[i], lcp[i], dictKey)
+			}
+		*/
+		//log.Info("LCP array checked")
+		b := make([]int, 1000) // Sorting buffer
+		// Walk over LCP array and compute the scores of the strings
+		for i := 0; i < n-1; i++ {
+			// Only when there is a drop in LCP value
+			if lcp[i+1] >= lcp[i] {
+				continue
+			}
+			j := i
+			for l := int(lcp[i]); l > int(lcp[i+1]); l-- {
+				if l < 2 {
+					continue
+				}
+				// Go back
+				for j > 0 && int(lcp[j-1]) >= l {
+					j--
+				}
+				window := i - j + 2
+				for len(b) < window {
+					b = append(b, 0)
+				}
+				copy(b, filtered[j:i+2])
+				sort.Ints(b[:window])
+				repeats := 1
+				lastK := 0
+				for k := 1; k < window; k++ {
+					if b[k] >= b[lastK]+l {
+						repeats++
+						lastK = k
+					}
+				}
+				//if repeats > 1 {
+				score := uint64(repeats * int(l-1))
+				// The dictionary word is the collector key and its score is the value, so that
+				// scores from multiple chunks can be aggregated later
+				dictKey := make([]byte, l)
+				for s := 0; s < l; s++ {
+					dictKey[s] = superstring[(filtered[i]+s)*2+1]
+				}
+				var dictVal [8]byte
+				binary.BigEndian.PutUint64(dictVal[:], score)
+				if err = dictCollector.Collect(dictKey, dictVal[:]); err != nil {
+					log.Error("processSuperstring", "collect", err)
+				}
+				//}
+			}
+		}
+	}
+	completion.Done()
+}
+
+type DictionaryItem struct {
+	word  []byte
+	score uint64
+}
+
+type DictionaryBuilder struct {
+	limit         int
+	lastWord      []byte
+	lastWordScore uint64
+	items         []DictionaryItem
+}
+
+func (db DictionaryBuilder) Len() int {
+	return len(db.items)
+}
+
+func (db DictionaryBuilder) Less(i, j int) bool {
+	if db.items[i].score == db.items[j].score {
+		return bytes.Compare(db.items[i].word, db.items[j].word) < 0
+	}
+	return db.items[i].score < db.items[j].score
+}
+
+func (db *DictionaryBuilder) Swap(i, j int) {
+	db.items[i], db.items[j] = db.items[j], db.items[i]
+}
+
+func (db *DictionaryBuilder) Push(x interface{}) {
+	db.items = append(db.items, x.(DictionaryItem))
+}
+
+func (db *DictionaryBuilder) Pop() interface{} {
+	old := db.items
+	n := len(old)
+	x := old[n-1]
+	db.items = old[0 : n-1]
+	return x
+}
+
+func (db *DictionaryBuilder) processWord(word []byte, score uint64) {
+	heap.Push(db, DictionaryItem{word: word, score: score})
+	if db.Len() > db.limit {
+		// Remove the element with the smallest score
+		heap.Pop(db)
+	}
+}
+
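+// DictionaryBuilder doubles as a min-heap (via container/heap) ordered by score,
+// so processWord can keep only the limit highest-scoring words: every candidate is
+// pushed, and the minimum is popped as soon as the size exceeds the limit. A
+// minimal sketch of the same top-K pattern, using the types above (the words and
+// scores are illustrative only):
+//
+//	b := &DictionaryBuilder{limit: 2}
+//	b.processWord([]byte("low"), 1)
+//	b.processWord([]byte("mid"), 10)
+//	b.processWord([]byte("high"), 100)
+//	sort.Sort(b) // b.items is now [{mid 10} {high 100}]; "low" was evicted
+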
+func (db *DictionaryBuilder) compressLoadFunc(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
+	score := binary.BigEndian.Uint64(v)
+	if bytes.Equal(k, db.lastWord) {
+		db.lastWordScore += score
+	} else {
+		if db.lastWord != nil {
+			db.processWord(db.lastWord, db.lastWordScore)
+		}
+		db.lastWord = common.CopyBytes(k)
+		db.lastWordScore = score
+	}
+	return nil
+}
+
+func (db *DictionaryBuilder) finish() {
+	if db.lastWord != nil {
+		db.processWord(db.lastWord, db.lastWordScore)
+	}
+}
+
+type DictAggregator struct {
+	lastWord      []byte
+	lastWordScore uint64
+	collector     *etl.Collector
+}
+
+func (da *DictAggregator) processWord(word []byte, score uint64) error {
+	var scoreBuf [8]byte
+	binary.BigEndian.PutUint64(scoreBuf[:], score)
+	return da.collector.Collect(word, scoreBuf[:])
+}
+
+func (da *DictAggregator) aggLoadFunc(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
+	score := binary.BigEndian.Uint64(v)
+	if bytes.Equal(k, da.lastWord) {
+		da.lastWordScore += score
+	} else {
+		if da.lastWord != nil {
+			if err := da.processWord(da.lastWord, da.lastWordScore); err != nil {
+				return err
+			}
+		}
+		da.lastWord = common.CopyBytes(k)
+		da.lastWordScore = score
+	}
+	return nil
+}
+
+func (da *DictAggregator) finish() error {
+	if da.lastWord != nil {
+		return da.processWord(da.lastWord, da.lastWordScore)
+	}
+	return nil
+}
+
+// MatchSorter is a helper type to sort matches by their position in the input
+type MatchSorter []*ahocorasick.Match
+
+func (ms MatchSorter) Len() int {
+	return len(ms)
+}
+
+func (ms MatchSorter) Less(i, j int) bool {
+	return ms[i].Pos() < ms[j].Pos()
+}
+
+func (ms *MatchSorter) Swap(i, j int) {
+	(*ms)[i], (*ms)[j] = (*ms)[j], (*ms)[i]
+}
+
+const CompressLogPrefix = "compress"
+
+func compress(chaindata string, block uint64) error {
+	// Create a file to compress if it does not exist already
+	statefile := "statedump.dat"
+	if _, err := os.Stat(statefile); err != nil {
+		if !os.IsNotExist(err) {
+			return fmt.Errorf("not sure if statedump.dat exists: %v", err)
+		}
+		if err = dumpState(chaindata, statefile); err != nil {
+			return err
+		}
+	}
+	var superstring []byte
+	const superstringLimit = 128 * 1024 * 1024
+	// Read keys from the file and generate superstring (with extra byte 0x1 prepended to each character, and with 0x0 0x0 pair inserted between keys and values)
+	// We only consider keys and values with length > 4, because smaller ones are not compressible without going into bits
+	f, err := os.Open(statefile)
+	if err != nil {
+		return err
+	}
+	r := bufio.NewReader(f)
+	defer f.Close()
+	// Collector for dictionary words (sorted by their score)
+	tmpDir := ""
+	// Read number of keys
+	var countBuf [8]byte
+	if _, err = io.ReadFull(r, countBuf[:]); err != nil {
+		return err
+	}
+	count := binary.BigEndian.Uint64(countBuf[:])
+	var stride int
+	if block > 1 {
+		stride = int(count) / int(block)
+	}
+	ch := make(chan []byte, runtime.NumCPU())
+	var wg sync.WaitGroup
+	wg.Add(runtime.NumCPU())
+	collectors := make([]*etl.Collector, runtime.NumCPU())
+	for i := 0; i < runtime.NumCPU(); i++ {
+		collector := etl.NewCollector(tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+		collectors[i] = collector
+		go processSuperstring(ch, collector, &wg)
+	}
+	var buf [256]byte
+	l, e := r.ReadByte()
+	i := 0
+	p := 0
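+	// Superstring layout: every data byte b is emitted as the pair 0x01 b, and each
+	// key/value is terminated by the pair 0x00 0x00. Data bytes therefore sit at odd
+	// offsets, which is why processSuperstring keeps only even-position suffixes, and
+	// the zero separators stop any match from crossing a word boundary.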
+	for ; e == nil; l, e = r.ReadByte() {
+		if _, e = io.ReadFull(r, buf[:l]); e != nil {
+			return e
+		}
+		if (stride == 0 || (i/2)%stride == 0) && l > 4 {
+			if len(superstring)+2*int(l)+2 > superstringLimit {
+				ch <- superstring
+				superstring = nil
+			}
+			for _, a := range buf[:l] {
+				superstring = append(superstring, 1, a)
+			}
+			superstring = append(superstring, 0, 0)
+			p++
+			if p%2_000_000 == 0 {
+				log.Info("Dictionary preprocessing", "key/value pairs", p/2)
 			}
 		}
-		log.Info("Average lookup time", "per key", time.Duration(uint64(lookupTime)/count))
+		i++
+	}
+	if e != nil && !errors.Is(e, io.EOF) {
 		return e
-	}); err != nil {
+	}
+	if len(superstring) > 0 {
+		ch <- superstring
+	}
+	close(ch)
+	wg.Wait()
+	dictCollector := etl.NewCollector(tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+	dictAggregator := &DictAggregator{collector: dictCollector}
+	for _, collector := range collectors {
+		if err = collector.Load(CompressLogPrefix, nil /* db */, "" /* toBucket */, dictAggregator.aggLoadFunc, etl.TransformArgs{}); err != nil {
+			return err
+		}
+		collector.Close(CompressLogPrefix)
+	}
+	if err = dictAggregator.finish(); err != nil {
 		return err
 	}
-	fmt.Printf("stAccounts = %d, stStorage = %d\n", stAccounts, stStorage)
+	// Aggregate the dictionary and build the Aho-Corasick matcher
+	defer dictCollector.Close(CompressLogPrefix)
+	db := &DictionaryBuilder{limit: 16_000_000} // Only collect 16m words with the highest scores
+	if err = dictCollector.Load(CompressLogPrefix, nil /* db */, "" /* toBucket */, db.compressLoadFunc, etl.TransformArgs{}); err != nil {
+		return err
+	}
+	db.finish()
+	var df *os.File
+	df, err = os.Create("dictionary.txt")
+	if err != nil {
+		return err
+	}
+	w := bufio.NewWriter(df)
+	// Sort the dictionary builder
+	sort.Sort(db)
+	for _, item := range db.items {
+		fmt.Fprintf(w, "%d %x\n", item.score, item.word)
+	}
+	if err = w.Flush(); err != nil {
+		return err
+	}
+	df.Close()
+	return nil
+}
+
+func encodeWithoutDictionary(buf []byte, encodedPos, limit int, w *bufio.Writer, setTerminal bool) error {
+	for encodedPos < limit {
+		codingLen := limit - encodedPos
+		var header byte = 0x00 // No dictionary bit set
+		if codingLen > 64 {
+			codingLen = 64
+		} else if setTerminal {
+			header |= 0x80 // Terminal bit set
+		}
+		header |= byte(codingLen - 1)
+		if err := w.WriteByte(header); err != nil {
+			return err
+		}
+		if _, err := w.Write(buf[encodedPos : encodedPos+codingLen]); err != nil {
+			return err
+		}
+		encodedPos += codingLen
+	}
+	return nil
+}
+
+// DynamicCell is a cell for the dynamic programming below
+type DynamicCell struct {
+	scores         int64
+	compression    int
+	lastCompressed int      // position of last compressed word
+	words          [][]byte // dictionary words used for this cell
+}
+
+func reduceDictWorker(inputCh chan []byte, dict map[string]int64, usedDict map[string]int, totalCompression *uint64, completion *sync.WaitGroup) {
+	for input := range inputCh {
+		l := len(input)
+		// Dynamic programming. We use two programs: one always ending with a
+		// compressed word (if possible), the other one without.
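+		// compression counts bytes saved against a plain literal encoding: replacing
+		// input[j:k] with a codeword saves (k-j)-3 bytes (codewords are at most 3
+		// bytes long), while letting a literal run grow across a 64-byte boundary
+		// costs one extra header byte; that is what the ceil-division terms below
+		// account for. compressedEnd[k] is the best cell for input[:k] that ends
+		// with a dictionary word, simpleEnd[k] the best one that ends with literals;
+		// scores break ties in favour of higher-scoring dictionary words.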
+		compressedEnd := make(map[int]DynamicCell)
+		simpleEnd := make(map[int]DynamicCell)
+		compressedEnd[0] = DynamicCell{scores: 0, compression: 0, lastCompressed: 0, words: nil}
+		simpleEnd[0] = DynamicCell{scores: 0, compression: 0, lastCompressed: 0, words: nil}
+		for k := 1; k <= l; k++ {
+			var maxC, maxS int
+			var wordsC, wordsS [][]byte
+			var scoresC, scoresS int64
+			var lastC, lastS int
+			for j := 0; j < k; j++ {
+				cc := compressedEnd[j]
+				sc := simpleEnd[j]
+				// First assume we are not compressing the slice [j:k]
+				ccCompression := ((j - cc.lastCompressed + 63) / 64) - ((k - cc.lastCompressed + 63) / 64) + cc.compression
+				scCompression := ((j - sc.lastCompressed + 63) / 64) - ((k - sc.lastCompressed + 63) / 64) + sc.compression
+				var compression int
+				var words [][]byte
+				var scores int64
+				var lastCompressed int
+				if (ccCompression == scCompression && cc.scores > sc.scores) || ccCompression > scCompression {
+					compression = ccCompression
+					words = cc.words
+					scores = cc.scores
+					lastCompressed = cc.lastCompressed
+				} else {
+					compression = scCompression
+					words = sc.words
+					scores = sc.scores
+					lastCompressed = sc.lastCompressed
+				}
+				if j == 0 || (compression == maxS && scores > scoresS) || compression > maxS {
+					maxS = compression
+					wordsS = words
+					scoresS = scores
+					lastS = lastCompressed
+				}
+				// Now try to apply compression, if possible
+				if dictScore, ok := dict[string(input[j:k])]; ok {
+					words = append(append([][]byte{}, words...), input[j:k])
+					lastCompressed = k
+					if (cc.compression == sc.compression && cc.scores > sc.scores) || cc.compression > sc.compression {
+						compression = cc.compression + (k - j) - 3
+						scores = cc.scores + dictScore
+					} else {
+						compression = sc.compression + (k - j) - 3
+						scores = sc.scores + dictScore
+					}
+				}
+				if j == 0 || compression > maxC {
+					maxC = compression
+					wordsC = words
+					scoresC = scores
+					lastC = lastCompressed
+				}
+			}
+			compressedEnd[k] = DynamicCell{scores: scoresC, compression: maxC, lastCompressed: lastC, words: wordsC}
+			simpleEnd[k] = DynamicCell{scores: scoresS, compression: maxS, lastCompressed: lastS, words: wordsS}
+		}
+		if (compressedEnd[l].compression == simpleEnd[l].compression && compressedEnd[l].scores > simpleEnd[l].scores) || compressedEnd[l].compression > simpleEnd[l].compression {
+			atomic.AddUint64(totalCompression, uint64(compressedEnd[l].compression))
+			for _, word := range compressedEnd[int(l)].words {
+				usedDict[string(word)]++
+			}
+		} else {
+			atomic.AddUint64(totalCompression, uint64(simpleEnd[l].compression))
+			for _, word := range simpleEnd[int(l)].words {
+				usedDict[string(word)]++
+			}
+		}
+
+	}
+	completion.Done()
+}
+
+// reducedict reduces the dictionary by trying the substitutions and counting the frequency of each word
+func reducedict() error {
+	// Read up the dictionary
+	df, err := os.Open("dictionary.txt")
+	if err != nil {
+		return err
+	}
+	defer df.Close()
+	// Load the dictionary into a map from word to score
+	dict := make(map[string]int64)
+	ds := bufio.NewScanner(df)
+	for ds.Scan() {
+		tokens := strings.Split(ds.Text(), " ")
+		var score int64
+		if score, err = strconv.ParseInt(tokens[0], 10, 64); err != nil {
+			return err
+		}
+		var word []byte
+		if word, err = hex.DecodeString(tokens[1]); err != nil {
+			return err
+		}
+		dict[string(word)] = score
+	}
+	df.Close()
+	log.Info("dictionary file parsed", "entries", len(dict))
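+	// Each line of dictionary.txt is "<decimal score> <hex-encoded word>", matching
+	// the fmt.Fprintf(w, "%d %x\n", ...) writer in compress above. For example, a
+	// (hypothetical) line "42 636f6465" gives the word "code" a score of 42.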
+	statefile := "statedump.dat"
+	f, err := os.Open(statefile)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if _, err = f.Seek(8, 0); err != nil {
+		return err
+	}
+
+	var usedDicts []map[string]int
+	var totalCompression uint64
+	inputCh := make(chan []byte, 10000)
+	var completion sync.WaitGroup
+	for p := 0; p < runtime.NumCPU(); p++ {
+		usedDict := make(map[string]int)
+		usedDicts = append(usedDicts, usedDict)
+		completion.Add(1)
+		go reduceDictWorker(inputCh, dict, usedDict, &totalCompression, &completion)
+	}
+	r := bufio.NewReader(f)
+	var buf [256]byte
+	l, e := r.ReadByte()
+	i := 0
+	for ; e == nil; l, e = r.ReadByte() {
+		if _, e = io.ReadFull(r, buf[:l]); e != nil {
+			return e
+		}
+		inputCh <- common.CopyBytes(buf[:l])
+		i++
+		if i%2_000_000 == 0 {
+			log.Info("Replacement preprocessing", "key/value pairs", i/2, "estimated saving", common.StorageSize(atomic.LoadUint64(&totalCompression)))
+		}
+	}
+	if e != nil && !errors.Is(e, io.EOF) {
+		return e
+	}
+	close(inputCh)
+	completion.Wait()
+	usedDict := make(map[string]int)
+	for _, d := range usedDicts {
+		for word, r := range d {
+			usedDict[word] += r
+		}
+	}
+	log.Info("Done", "estimated saving", common.StorageSize(totalCompression), "effective dict size", len(usedDict))
+	var rf *os.File
+	rf, err = os.Create("reduced_dictionary.txt")
+	if err != nil {
+		return err
+	}
+	rw := bufio.NewWriter(rf)
+	for word, used := range usedDict {
+		fmt.Fprintf(rw, "%d %x\n", used, word)
+	}
+	if err = rw.Flush(); err != nil {
+		return err
+	}
+	rf.Close()
+	return nil
+}
+
+// truecompress compresses statedump.dat using the dictionary in reduced_dictionary.txt
+func truecompress() error {
+	// Read up the reduced dictionary
+	df, err := os.Open("reduced_dictionary.txt")
+	if err != nil {
+		return err
+	}
+	defer df.Close()
+	// DictionaryBuilder is for sorting words by their frequency (to assign codes)
+	db := DictionaryBuilder{}
+	ds := bufio.NewScanner(df)
+	for ds.Scan() {
+		tokens := strings.Split(ds.Text(), " ")
+		var freq int64
+		if freq, err = strconv.ParseInt(tokens[0], 10, 64); err != nil {
+			return err
+		}
+		var word []byte
+		if word, err = hex.DecodeString(tokens[1]); err != nil {
+			return err
+		}
+		db.items = append(db.items, DictionaryItem{score: uint64(freq), word: word})
+	}
+	df.Close()
+	log.Info("Loaded dictionary", "items", len(db.items))
+	sort.Sort(&db)
+	// Matcher
+	trieBuilder := ahocorasick.NewTrieBuilder()
+	for _, item := range db.items {
+		trieBuilder.AddPattern(item.word)
+	}
+	trie := trieBuilder.Build()
+	// Assign codewords to dictionary items
+	word2code := make(map[string][]byte)
+	code2word := make(map[string][]byte)
+	word2codeTerm := make(map[string][]byte)
+	code2wordTerm := make(map[string][]byte)
+	lastw := len(db.items) - 1
+	// One byte codewords
+	for c1 := 0; c1 < 32 && lastw >= 0; c1++ {
+		code := []byte{0x80 | 0x40 | byte(c1)} // 0x40 is "dictionary encoded" flag, and 0x20 is flag meaning "last term of the key or value", 0x80 means "last byte of codeword"
+		codeTerm := []byte{0x80 | 0x40 | 0x20 | byte(c1)}
+		word := db.items[lastw].word
+		code2word[string(code)] = word
+		word2code[string(word)] = code
+		code2wordTerm[string(codeTerm)] = word
+		word2codeTerm[string(word)] = codeTerm
+		lastw--
+	}
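+	// Codeword capacity: 32 one-byte codes, 32*128 = 4,096 two-byte codes and
+	// 32*128*128 = 524,288 three-byte codes, 528,416 in total. The first byte keeps
+	// 0x40 as the "dictionary encoded" flag and 0x20 as the "terminal" flag, leaving
+	// 5 payload bits; later bytes carry 7 bits, and 0x80 marks the last byte of a
+	// codeword, so the decoder can find codeword boundaries without knowing their
+	// lengths up front.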
+	// Two byte codewords
+	for c1 := 0; c1 < 32 && lastw >= 0; c1++ {
+		for c2 := 0; c2 < 128 && lastw >= 0; c2++ {
+			code := []byte{0x40 | byte(c1), 0x80 | byte(c2)} // 0x40 is "dictionary encoded" flag, and 0x20 is flag meaning "last term of the key or value", 0x80 means "last byte of codeword"
+			codeTerm := []byte{0x40 | 0x20 | byte(c1), 0x80 | byte(c2)}
+			word := db.items[lastw].word
+			code2word[string(code)] = word
+			word2code[string(word)] = code
+			code2wordTerm[string(codeTerm)] = word
+			word2codeTerm[string(word)] = codeTerm
+			lastw--
+		}
+	}
+	// Three byte codewords
+	for c1 := 0; c1 < 32 && lastw >= 0; c1++ {
+		for c2 := 0; c2 < 128 && lastw >= 0; c2++ {
+			for c3 := 0; c3 < 128 && lastw >= 0; c3++ {
+				code := []byte{0x40 | byte(c1), byte(c2), 0x80 | byte(c3)} // 0x40 is "dictionary encoded" flag, and 0x20 is flag meaning "last term of the key or value", 0x80 means "last byte of codeword"
+				codeTerm := []byte{0x40 | 0x20 | byte(c1), byte(c2), 0x80 | byte(c3)}
+				word := db.items[lastw].word
+				code2word[string(code)] = word
+				word2code[string(word)] = code
+				code2wordTerm[string(codeTerm)] = word
+				word2codeTerm[string(word)] = codeTerm
+				lastw--
+			}
+		}
+	}
+	statefile := "statedump.dat"
+	f, err := os.Open(statefile)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	// File to output the compression
+	var cf *os.File
+	if cf, err = os.Create("compressed.dat"); err != nil {
+		return err
+	}
+	w := bufio.NewWriter(cf)
+	if _, err = f.Seek(0, 0); err != nil {
+		return err
+	}
+	r := bufio.NewReader(f)
+	// Copy key count
+	if _, err = io.CopyN(w, r, 8); err != nil {
+		return err
+	}
+	var buf [256]byte
+	l, e := r.ReadByte()
+	i := 0
+	output := 0
+	for ; e == nil; l, e = r.ReadByte() {
+		if _, e = io.ReadFull(r, buf[:l]); e != nil {
+			return e
+		}
+		encodedPos := 0
+		matches := MatchSorter(trie.Match(buf[:l]))
+		if matches.Len() > 0 {
+			sort.Sort(&matches)
+			lastMatch := matches[0]
+			for j := 1; j < matches.Len(); j++ {
+				match := matches[j]
+				// check overlap with lastMatch
+				if int(match.Pos()) < int(lastMatch.Pos())+len(lastMatch.Match()) {
+					// match with the highest pattern ID (corresponds to the pattern with the higher score) wins
+					if match.Pattern() > lastMatch.Pattern() {
+						// Replace the last match
+						lastMatch = match
+					}
+				} else {
+					// No overlap, make a substitution
+					// Encode any leading characters
+					if err = encodeWithoutDictionary(buf[:], encodedPos, int(lastMatch.Pos()), w, false /* setTerminal */); err != nil {
+						return err
+					}
+					if encodedPos < int(lastMatch.Pos()) {
+						output += 1 + int(lastMatch.Pos()) - encodedPos
+					}
+					if int(l) == int(lastMatch.Pos())+len(lastMatch.Match()) {
+						if _, err = w.Write(word2codeTerm[lastMatch.MatchString()]); err != nil {
+							return err
+						}
+						output += len(word2codeTerm[lastMatch.MatchString()])
+					} else {
+						if _, err = w.Write(word2code[lastMatch.MatchString()]); err != nil {
+							return err
+						}
+						output += len(word2code[lastMatch.MatchString()])
+					}
+					encodedPos = int(lastMatch.Pos()) + len(lastMatch.Match())
+					lastMatch = match
+				}
+			}
+			// Encode any leading characters
+			if err = encodeWithoutDictionary(buf[:], encodedPos, int(lastMatch.Pos()), w, false /* setTerminal */); err != nil {
+				return err
+			}
+			if encodedPos < int(lastMatch.Pos()) {
+				output += 1 + int(lastMatch.Pos()) - encodedPos
+			}
+			if int(l) == int(lastMatch.Pos())+len(lastMatch.Match()) {
+				if _, err = w.Write(word2codeTerm[lastMatch.MatchString()]); err != nil {
+					return err
+				}
+				output += len(word2codeTerm[lastMatch.MatchString()])
+			} else {
+				if _, err = w.Write(word2code[lastMatch.MatchString()]); err != nil {
+					return err
+				}
+				output += len(word2code[lastMatch.MatchString()])
+			}
+			encodedPos = int(lastMatch.Pos()) + len(lastMatch.Match())
+		}
+		// Encode any remaining things
+		if err = encodeWithoutDictionary(buf[:], encodedPos, int(l), w, true /* setTerminal */); err != nil {
+			return err
+		}
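+		// Note: output is only a logging estimate of the compressed size; a literal
+		// run longer than 64 bytes actually carries one header byte per 64-byte
+		// chunk, so counting a single header byte per run slightly undercounts here.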
+		if encodedPos < int(l) {
+			output += 1 + int(l) - encodedPos
+		}
+		i++
+		if i%2_000_000 == 0 {
+			log.Info("Replacement", "key/value pairs", i/2, "output", common.StorageSize(output))
+		}
+	}
+	if e != nil && !errors.Is(e, io.EOF) {
+		return e
+	}
+	f.Close()
+	w.Flush()
+	cf.Close()
+	// Decompress
+	if cf, err = os.Open("compressed.dat"); err != nil {
+		return err
+	}
+	if f, err = os.Create("decompressed.dat"); err != nil {
+		return err
+	}
+	r = bufio.NewReader(cf)
+	w = bufio.NewWriter(f)
+	// Copy key count
+	if _, err = io.CopyN(w, r, 8); err != nil {
+		return err
+	}
+	var readBuf [256]byte
+	var decodeBuf = make([]byte, 0, 256)
+	h, e := r.ReadByte()
+	for ; e == nil; h, e = r.ReadByte() {
+		var term bool
+		if h&0x40 == 0 {
+			term = h&0x80 != 0
+			l := h&63 + 1
+			if _, err = io.ReadFull(r, readBuf[:l]); err != nil {
+				return err
+			}
+			decodeBuf = append(decodeBuf, readBuf[:l]...)
+		} else {
+			term = h&0x20 != 0
+			readBuf[0] = h
+			j := 0
+			for readBuf[j]&0x80 == 0 {
+				j++
+				if readBuf[j], e = r.ReadByte(); e != nil {
+					return e
+				}
+			}
+			if term {
+				decodeBuf = append(decodeBuf, code2wordTerm[string(readBuf[:j+1])]...)
+			} else {
+				decodeBuf = append(decodeBuf, code2word[string(readBuf[:j+1])]...)
+			}
+		}
+		if term {
+			if err = w.WriteByte(byte(len(decodeBuf))); err != nil {
+				return err
+			}
+			if len(decodeBuf) > 0 {
+				if _, err = w.Write(decodeBuf); err != nil {
+					return err
+				}
+			}
+			decodeBuf = decodeBuf[:0]
+		}
+	}
+	if e != nil && !errors.Is(e, io.EOF) {
+		return e
+	}
+	cf.Close()
+	if err = w.Flush(); err != nil {
+		return err
+	}
+	f.Close()
 	return nil
 }
 
@@ -2426,8 +3337,8 @@ func main() {
 	case "snapSizes":
 		err = snapSizes(*chaindata)
 
-	case "dumpState":
-		err = dumpState(*chaindata, uint64(*block))
+	case "mphf":
+		err = mphf(*chaindata, uint64(*block))
 
 	case "readCallTraces":
 		err = readCallTraces(*chaindata, uint64(*block))
@@ -2461,6 +3372,14 @@ func main() {
 
 	case "devTx":
 		err = devTx(*chaindata)
+	case "compress":
+		err = compress(*chaindata, uint64(*block))
+	case "reducedict":
+		err = reducedict()
+	case "truecompress":
+		err = truecompress()
+	case "genstate":
+		err = genstate()
 	}
 
 	if err != nil {
diff --git a/go.mod b/go.mod
index beac1d3a36..14917e744b 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/ledgerwatch/erigon
 go 1.16
 
 require (
+	github.com/BobuSumisu/aho-corasick v1.0.3
 	github.com/RoaringBitmap/roaring v0.9.4
 	github.com/VictoriaMetrics/fastcache v1.6.0
 	github.com/VictoriaMetrics/metrics v1.18.0
@@ -20,6 +21,7 @@ require (
 	github.com/emicklei/dot v0.16.0
 	github.com/fatih/color v1.12.0
 	github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f
+	github.com/flanglet/kanzi-go v1.9.0
 	github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
 	github.com/goccy/go-json v0.7.4
 	github.com/gofrs/flock v0.8.1
diff --git a/go.sum b/go.sum
index b3baa7c233..030a57a942 100644
--- a/go.sum
+++ b/go.sum
@@ -51,6 +51,8 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr
 dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4=
 dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
 git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
+github.com/BobuSumisu/aho-corasick v1.0.3 h1:uuf+JHwU9CHP2Vx+wAy6jcksJThhJS9ehR8a+4nPE9g=
+github.com/BobuSumisu/aho-corasick v1.0.3/go.mod h1:hm4jLcvZKI2vRF2WDU1N4p/jpWtpOzp3nLmi9AzX/XE=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
@@ -263,6 +265,8 @@ github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
 github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
 github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f h1:Y/gg/utVetS+WS6htAKCTDralkm/8hLIIUAtLFdbdQ8=
 github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f/go.mod h1:q+7Z5oyy8cvKF3TakcuihvQvBHFTnXjB+7UP1e2Q+1o=
+github.com/flanglet/kanzi-go v1.9.0 h1:bhpFJaGIKGio575OO6mWFec658OKKt7DKS9kMRSl6/U=
+github.com/flanglet/kanzi-go v1.9.0/go.mod h1:/sUSVgDcbjsisuW42GPDgaMqvJ0McZERNICnD7b1nRA=
 github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
 github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
 github.com/frankban/quicktest v1.9.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
-- 
GitLab