diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index aa6005b5c15e7602e69ab3ea7748836a81de6fe6..5ca29c632d5eb14c846f3b3d3ee1be1d7ff376ff 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -816,7 +816,7 @@ Loop:
 	for i := importedBn; i <= block; i++ {
 		lastBlock = bcb.GetBlockByNumber(i)
 		blocks = append(blocks, lastBlock)
-		if len(blocks) >= 20000 || i == block {
+		if len(blocks) >= 1000 || i == block {
 			_, err = bc.InsertChain(context.Background(), blocks)
 			if err != nil {
 				log.Error("Could not insert blocks (group)", "number", len(blocks), "error", err)
@@ -831,7 +831,7 @@ Loop:
 			}
 			blocks = types.Blocks{}
 		}
-		if i%20000 == 0 {
+		if i%10000 == 0 {
 			fmt.Printf("Inserted %dK, %s \n", i/1000, time.Since(now))
 		}
 	}
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index 70bdae1a8781e25fe2051798565bd1aca7dc52ea..7d9ac664ba9e01f92c904c8c8a8ab4944faba6f2 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -213,9 +213,6 @@ func WriteHeader(ctx context.Context, db DatabaseWriter, header *types.Header) {
 	if err != nil {
 		log.Crit("Failed to RLP encode header", "err", err)
 	}
-	if common.IsCanceled(ctx) {
-		return
-	}
 	if err := db.Put(dbutils.HeaderPrefix, dbutils.HeaderKey(number, hash), data); err != nil {
 		log.Crit("Failed to store header", "err", err)
 	}
diff --git a/ethdb/abstractbench/abstract_bench_test.go b/ethdb/abstractbench/abstract_bench_test.go
index 6bc7042248e05bef8d65726e9da09a6efb2c3780..a5a8e7b8fb9ead98c71c7a73325ea78a93962eb0 100644
--- a/ethdb/abstractbench/abstract_bench_test.go
+++ b/ethdb/abstractbench/abstract_bench_test.go
@@ -3,14 +3,12 @@ package abstractbench
 import (
 	"context"
 	"encoding/binary"
-	"fmt"
 	"os"
+	"sort"
 	"testing"
-	"time"
 
 	"github.com/dgraph-io/badger/v2"
 	"github.com/ledgerwatch/bolt"
-	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/ethdb"
 )
@@ -21,13 +19,20 @@
 var boltDb ethdb.KV
 var badgerDb ethdb.KV
 var lmdbKV ethdb.KV
 
-var keysAmount = 100_000
+var keysAmount = 1_000_000
 
-func setupDatabases() {
-	vsize, ctx := 10, context.Background()
+func setupDatabases() func() {
+	//vsize, ctx := 10, context.Background()
+	clean := func() {
+		os.Remove("test")
+		os.RemoveAll("test2")
+		os.Remove("test3")
+		os.RemoveAll("test4")
+		os.RemoveAll("test5")
+	}
 	boltDb = ethdb.NewBolt().Path("test").MustOpen()
-	badgerDb = ethdb.NewBadger().Path("test2").MustOpen()
+	//badgerDb = ethdb.NewBadger().Path("test2").MustOpen()
 	lmdbKV = ethdb.NewLMDB().Path("test4").MustOpen()
 	var errOpen error
 	boltOriginDb, errOpen = bolt.Open("test3", 0600, &bolt.Options{KeysPrefixCompressionDisable: true})
@@ -35,110 +40,107 @@ func setupDatabases() {
 		panic(errOpen)
 	}
 
-	badgerOriginDb, errOpen = badger.Open(badger.DefaultOptions("test4"))
-	if errOpen != nil {
-		panic(errOpen)
-	}
+	//badgerOriginDb, errOpen = badger.Open(badger.DefaultOptions("test4"))
+	//if errOpen != nil {
+	//	panic(errOpen)
+	//}
 
 	_ = boltOriginDb.Update(func(tx *bolt.Tx) error {
 		_, _ = tx.CreateBucketIfNotExists(dbutils.CurrentStateBucket, false)
 		return nil
 	})
 
-	if err := boltOriginDb.Update(func(tx *bolt.Tx) error {
-		defer func(t time.Time) { fmt.Println("origin bolt filled:", time.Since(t)) }(time.Now())
-		for i := 0; i < keysAmount; i++ {
-			v := make([]byte, vsize)
-			k := make([]byte, 8)
-			binary.BigEndian.PutUint64(k, uint64(i))
-			bucket := tx.Bucket(dbutils.CurrentStateBucket)
-			if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
-				return err
-			}
-		}
-		return nil
-	}); err != nil {
-		panic(err)
-	}
-
-	if err := boltDb.Update(ctx, func(tx ethdb.Tx) error {
-		defer func(t time.Time) { fmt.Println("abstract bolt filled:", time.Since(t)) }(time.Now())
-
-		for i := 0; i < keysAmount; i++ {
-			v := make([]byte, vsize)
-			k := make([]byte, 8)
-			binary.BigEndian.PutUint64(k, uint64(i))
-			bucket := tx.Bucket(dbutils.CurrentStateBucket)
-			if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
-				panic(err)
-			}
-		}
-
-		return nil
-	}); err != nil {
-		panic(err)
-	}
-
-	if err := badgerDb.Update(ctx, func(tx ethdb.Tx) error {
-		defer func(t time.Time) { fmt.Println("abstract badger filled:", time.Since(t)) }(time.Now())
-
-		for i := 0; i < keysAmount; i++ {
-			v := make([]byte, vsize)
-			k := make([]byte, 8)
-			binary.BigEndian.PutUint64(k, uint64(i))
-			bucket := tx.Bucket(dbutils.CurrentStateBucket)
-			if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
-				panic(err)
-			}
-		}
-
-		return nil
-	}); err != nil {
-		panic(err)
-	}
-
-	if err := badgerOriginDb.Update(func(tx *badger.Txn) error {
-		defer func(t time.Time) { fmt.Println("pure badger filled:", time.Since(t)) }(time.Now())
-
-		for i := 0; i < keysAmount; i++ {
-			v := make([]byte, vsize)
-			k := make([]byte, 8)
-			binary.BigEndian.PutUint64(k, uint64(i))
-			_ = tx.Set(append(dbutils.CurrentStateBucket, k...), common.CopyBytes(v))
-		}
-
-		return nil
-	}); err != nil {
-		panic(err)
-	}
-
-	if err := lmdbKV.Update(ctx, func(tx ethdb.Tx) error {
-		defer func(t time.Time) { fmt.Println("abstract lmdb filled:", time.Since(t)) }(time.Now())
-
-		bucket := tx.Bucket(dbutils.CurrentStateBucket)
-		for i := 0; i < keysAmount; i++ {
-			v := make([]byte, vsize)
-			k := make([]byte, 8)
-			binary.BigEndian.PutUint64(k, uint64(i))
-			if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
-				panic(err)
-			}
-		}
-
-		return nil
-	}); err != nil {
-		panic(err)
-	}
+	//if err := boltOriginDb.Update(func(tx *bolt.Tx) error {
+	//	defer func(t time.Time) { fmt.Println("origin bolt filled:", time.Since(t)) }(time.Now())
+	//	for i := 0; i < keysAmount; i++ {
+	//		v := make([]byte, vsize)
+	//		k := make([]byte, 8)
+	//		binary.BigEndian.PutUint64(k, uint64(i))
+	//		bucket := tx.Bucket(dbutils.CurrentStateBucket)
+	//		if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
+	//			return err
+	//		}
+	//	}
+	//	return nil
+	//}); err != nil {
+	//	panic(err)
+	//}
+	//
+	//if err := boltDb.Update(ctx, func(tx ethdb.Tx) error {
+	//	defer func(t time.Time) { fmt.Println("abstract bolt filled:", time.Since(t)) }(time.Now())
+	//
+	//	for i := 0; i < keysAmount; i++ {
+	//		v := make([]byte, vsize)
+	//		k := make([]byte, 8)
+	//		binary.BigEndian.PutUint64(k, uint64(i))
+	//		bucket := tx.Bucket(dbutils.CurrentStateBucket)
+	//		if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
+	//			panic(err)
+	//		}
+	//	}
+	//
+	//	return nil
+	//}); err != nil {
+	//	panic(err)
+	//}
+	//
+	//if err := badgerDb.Update(ctx, func(tx ethdb.Tx) error {
+	//	defer func(t time.Time) { fmt.Println("abstract badger filled:", time.Since(t)) }(time.Now())
+	//
+	//	//for i := 0; i < keysAmount; i++ {
+	//	//	v := make([]byte, vsize)
+	//	//	k := make([]byte, 8)
+	//	//	binary.BigEndian.PutUint64(k, uint64(i))
+	//	//	bucket := tx.Bucket(dbutils.CurrentStateBucket)
+	//	//	if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
+	//	//		panic(err)
+	//	//	}
+	//	//}
+	//
+	//	return nil
+	//}); err != nil {
+	//	panic(err)
+	//}
+	//
+	//if err := badgerOriginDb.Update(func(tx *badger.Txn) error {
+	//	defer func(t time.Time) { fmt.Println("pure badger filled:", time.Since(t)) }(time.Now())
+	//
+	//	for i := 0; i < keysAmount; i++ {
+	//		v := make([]byte, vsize)
+	//		k := make([]byte, 8)
+	//		binary.BigEndian.PutUint64(k, uint64(i))
+	//		_ = tx.Set(append(dbutils.CurrentStateBucket, k...), common.CopyBytes(v))
+	//	}
+	//
+	//	return nil
+	//}); err != nil {
+	//	panic(err)
+	//}
+	//
+	//if err := lmdbKV.Update(ctx, func(tx ethdb.Tx) error {
+	//	defer func(t time.Time) { fmt.Println("abstract lmdb filled:", time.Since(t)) }(time.Now())
+	//
+	//	bucket := tx.Bucket(dbutils.CurrentStateBucket)
+	//	for i := 0; i < keysAmount; i++ {
+	//		v := make([]byte, vsize)
+	//		k := make([]byte, 8)
+	//		binary.BigEndian.PutUint64(k, uint64(i))
+	//		if err := bucket.Put(k, common.CopyBytes(v)); err != nil {
+	//			panic(err)
+	//		}
+	//	}
+	//
+	//	return nil
+	//}); err != nil {
+	//	panic(err)
+	//}
+	return clean
 }
 
 func BenchmarkGet(b *testing.B) {
-	setupDatabases()
-	defer os.Remove("test")
-	defer os.RemoveAll("test2")
-	defer os.Remove("test3")
-	defer os.RemoveAll("test4")
-	defer os.RemoveAll("test5")
+	clean := setupDatabases()
+	defer clean()
 
 	k := make([]byte, 8)
 	binary.BigEndian.PutUint64(k, uint64(keysAmount-1))
 
@@ -168,13 +170,43 @@ func BenchmarkGet(b *testing.B) {
 	})
 }
 
+func BenchmarkPut(b *testing.B) {
+	clean := setupDatabases()
+	defer clean()
+	tuples := make(ethdb.MultiPutTuples, 0, keysAmount*3)
+	for i := 0; i < keysAmount; i++ {
+		k := make([]byte, 8)
+		binary.BigEndian.PutUint64(k, uint64(i))
+		v := []byte{1, 2, 3, 4, 5, 6, 7, 8}
+		tuples = append(tuples, dbutils.CurrentStateBucket, k, v)
+	}
+	sort.Sort(tuples)
+
+	b.Run("bolt", func(b *testing.B) {
+		db := ethdb.NewWrapperBoltDatabase(boltOriginDb).NewBatch()
+		for i := 0; i < b.N; i++ {
+			_, _ = db.MultiPut(tuples...)
+			_, _ = db.Commit()
+		}
+	})
+	//b.Run("badger", func(b *testing.B) {
+	//	db := ethdb.NewObjectDatabase(badgerDb)
+	//	for i := 0; i < b.N; i++ {
+	//		_, _ = db.MultiPut(tuples...)
+	//	}
+	//})
+	b.Run("lmdb", func(b *testing.B) {
+		db := ethdb.NewObjectDatabase(lmdbKV).NewBatch()
+		for i := 0; i < b.N; i++ {
+			_, _ = db.MultiPut(tuples...)
+			_, _ = db.Commit()
+		}
+	})
+}
+
 func BenchmarkCursor(b *testing.B) {
-	setupDatabases()
-	defer os.Remove("test")
-	defer os.RemoveAll("test2")
-	defer os.Remove("test3")
-	defer os.RemoveAll("test4")
-	defer os.RemoveAll("test5")
+	clean := setupDatabases()
+	defer clean()
 
 	ctx := context.Background()
 
diff --git a/ethdb/bolt_db.go b/ethdb/bolt_db.go
index ab0991c200a4b71dd4dc3be3780b873f95f2e6e3..2b99724012c782cf388257998d04a53eb34efcbb 100644
--- a/ethdb/bolt_db.go
+++ b/ethdb/bolt_db.go
@@ -140,16 +140,15 @@ func (db *BoltDatabase) MultiPut(tuples ...[]byte) (uint64, error) {
 			}
 			c := b.Cursor()
 			l := (bucketEnd - bucketStart) / 3
-			pairs := make([][]byte, 2*l)
 			for i := 0; i < l; i++ {
-				pairs[2*i] = tuples[bucketStart+3*i+1]
-				pairs[2*i+1] = tuples[bucketStart+3*i+2]
-				if pairs[2*i+1] == nil {
-					if err := c.Delete2(pairs[2*i]); err != nil {
+				k := tuples[bucketStart+3*i+1]
+				v := tuples[bucketStart+3*i+2]
+				if v == nil {
+					if err := c.Delete2(k); err != nil {
 						return err
 					}
 				} else {
-					if err := c.Put(pairs[2*i], pairs[2*i+1]); err != nil {
+					if err := c.Put(k, v); err != nil {
 						return err
 					}
 				}
diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go
index 924dd9408959e7993633dd69337465d745b2cac4..657a1abbb0ae7764a68d5c311f66554c9c84275a 100644
--- a/ethdb/kv_lmdb.go
+++ b/ethdb/kv_lmdb.go
@@ -20,6 +20,7 @@ import (
 var (
 	lmdbKvTxPool     = sync.Pool{New: func() interface{} { return &lmdbTx{} }}
 	lmdbKvCursorPool = sync.Pool{New: func() interface{} { return &lmdbCursor{} }}
+	lmdbKvBucketPool = sync.Pool{New: func() interface{} { return &lmdbBucket{} }}
 )
 
 type lmdbOpts struct {
@@ -295,6 +296,7 @@ type lmdbTx struct {
 	ctx     context.Context
 	db      *LmdbKV
 	cursors []*lmdbCursor
+	buckets []*lmdbBucket
 }
 
 type lmdbBucket struct {
@@ -305,7 +307,7 @@
 
 type lmdbCursor struct {
 	ctx    context.Context
-	bucket lmdbBucket
+	bucket *lmdbBucket
 	prefix []byte
 
 	cursor *lmdb.Cursor
@@ -347,7 +349,18 @@ func (tx *lmdbTx) Bucket(name []byte) Bucket {
 		panic(fmt.Errorf("unknown bucket: %s. add it to dbutils.Buckets", string(name)))
 	}
 
-	return lmdbBucket{tx: tx, dbi: tx.db.buckets[id], id: id}
+	b := lmdbKvBucketPool.Get().(*lmdbBucket)
+	b.tx = tx
+	b.dbi = tx.db.buckets[id]
+	b.id = id
+
+	// add to auto-close on end of transactions
+	if b.tx.buckets == nil {
+		b.tx.buckets = make([]*lmdbBucket, 0, 1)
+	}
+	b.tx.buckets = append(b.tx.buckets, b)
+
+	return b
 }
 
 func (tx *lmdbTx) Commit(ctx context.Context) error {
@@ -373,6 +386,10 @@ func (tx *lmdbTx) closeCursors() {
 		lmdbKvCursorPool.Put(c)
 	}
 	tx.cursors = tx.cursors[:0]
+	for _, b := range tx.buckets {
+		lmdbKvBucketPool.Put(b)
+	}
+	tx.buckets = tx.buckets[:0]
 }
 
 func (c *lmdbCursor) Prefix(v []byte) Cursor {
@@ -408,7 +425,7 @@ func (b lmdbBucket) Get(key []byte) (val []byte, err error) {
 	return val, err
 }
 
-func (b lmdbBucket) Put(key []byte, value []byte) error {
+func (b *lmdbBucket) Put(key []byte, value []byte) error {
 	select {
 	case <-b.tx.ctx.Done():
 		return b.tx.ctx.Err()
@@ -422,7 +439,7 @@
 	return nil
 }
 
-func (b lmdbBucket) Delete(key []byte) error {
+func (b *lmdbBucket) Delete(key []byte) error {
 	select {
 	case <-b.tx.ctx.Done():
 		return b.tx.ctx.Err()
@@ -436,7 +453,7 @@
 	return err
 }
 
-func (b lmdbBucket) Size() (uint64, error) {
+func (b *lmdbBucket) Size() (uint64, error) {
 	st, err := b.tx.tx.Stat(b.dbi)
 	if err != nil {
 		return 0, err
@@ -444,7 +461,7 @@
 	return (st.LeafPages + st.BranchPages + st.OverflowPages) * uint64(os.Getpagesize()), nil
 }
 
-func (b lmdbBucket) Cursor() Cursor {
+func (b *lmdbBucket) Cursor() Cursor {
 	c := lmdbKvCursorPool.Get().(*lmdbCursor)
 	c.ctx = b.tx.ctx
 	c.bucket = b
diff --git a/ethdb/mutation.go b/ethdb/mutation.go
index dff46e5478d55a4348e9a1e2b61947c579903d2b..b1b318be1c68783c36b9f6cda5049b147d5e6081 100644
--- a/ethdb/mutation.go
+++ b/ethdb/mutation.go
@@ -11,9 +11,10 @@
 )
 
 type mutation struct {
-	puts *puts // Map buckets to map[key]value
-	mu   sync.RWMutex
-	db   Database
+	puts   *puts // Map buckets to map[key]value
+	mu     sync.RWMutex
+	db     Database
+	tuples MultiPutTuples
 }
 
 func (m *mutation) KV() KV {
@@ -145,22 +146,26 @@ func (m *mutation) Commit() (uint64, error) {
 	}
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	tuples := make(MultiPutTuples, 0, m.puts.Len()*3)
+	if m.tuples == nil {
+		m.tuples = make(MultiPutTuples, 0, m.puts.Len()*3)
+	}
+	m.tuples = m.tuples[:0]
 	for bucketStr, bt := range m.puts.mp {
 		bucketB := []byte(bucketStr)
 		for key := range bt {
 			value, _ := bt.GetStr(key)
-			tuples = append(tuples, bucketB, []byte(key), value)
+			m.tuples = append(m.tuples, bucketB, []byte(key), value)
 		}
 	}
-	sort.Sort(tuples)
+	sort.Sort(m.tuples)
 
-	written, err := m.db.MultiPut(tuples...)
+	written, err := m.db.MultiPut(m.tuples...)
 	if err != nil {
 		return 0, fmt.Errorf("db.MultiPut failed: %w", err)
 	}
 	m.puts = newPuts()
+	m.tuples = m.tuples[:0]
 	return written, nil
 }
 
@@ -168,6 +173,7 @@ func (m *mutation) Rollback() {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	m.puts = newPuts()
+	m.tuples = m.tuples[:0]
 }
 
 func (m *mutation) Keys() ([][]byte, error) {
diff --git a/ethdb/object_db.go b/ethdb/object_db.go
index 392a938d28af571e9fb49d5db8225b9fc328128d..80b4fddfdf2b3c35f9124657437fcd81e7ab4a0d 100644
--- a/ethdb/object_db.go
+++ b/ethdb/object_db.go
@@ -59,16 +59,15 @@ func (db *ObjectDatabase) MultiPut(tuples ...[]byte) (uint64, error) {
 		b := tx.Bucket(tuples[bucketStart])
 		c := b.Cursor()
 		l := (bucketEnd - bucketStart) / 3
-		pairs := make([][]byte, 2*l)
 		for i := 0; i < l; i++ {
-			pairs[2*i] = tuples[bucketStart+3*i+1]
-			pairs[2*i+1] = tuples[bucketStart+3*i+2]
-			if pairs[2*i+1] == nil {
-				if err := c.Delete(pairs[2*i]); err != nil {
+			k := tuples[bucketStart+3*i+1]
+			v := tuples[bucketStart+3*i+2]
+			if v == nil {
+				if err := c.Delete(k); err != nil {
 					return err
 				}
 			} else {
-				if err := c.Put(pairs[2*i], pairs[2*i+1]); err != nil {
+				if err := c.Put(k, v); err != nil {
 					return err
 				}
 			}
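
Note on the new BenchmarkPut in abstract_bench_test.go (this note is not part of the patch): the benchmark can presumably be run with the standard Go tooling from the repository root, for example "go test -bench . ./ethdb/abstractbench/" (package path assumed from the file location); each b.N iteration replays the pre-built, sorted MultiPutTuples batch against bolt and lmdb.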
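The kv_lmdb.go hunks add a sync.Pool for lmdbBucket handles: Bucket() takes a handle from the pool, the transaction records it in tx.buckets, and closeCursors() returns every recorded handle to the pool, so repeated Bucket() calls stop allocating. Below is a minimal, self-contained sketch of that reuse pattern; the bucket, tx, and pool names are illustrative stand-ins, not the turbo-geth API.

package main

import (
	"fmt"
	"sync"
)

// bucket is a stand-in for lmdbBucket: a small, cheap-to-reset handle.
type bucket struct {
	tx *tx
	id int
}

// tx is a stand-in for lmdbTx: it remembers every handle it lent out.
type tx struct {
	buckets []*bucket
}

var bucketPool = sync.Pool{New: func() interface{} { return &bucket{} }}

// Bucket hands out a pooled handle and records it for release at tx end.
func (t *tx) Bucket(id int) *bucket {
	b := bucketPool.Get().(*bucket)
	b.tx = t
	b.id = id
	t.buckets = append(t.buckets, b)
	return b
}

// close returns all handles to the pool, mirroring closeCursors in the patch.
func (t *tx) close() {
	for _, b := range t.buckets {
		*b = bucket{} // clear references so the pool does not pin the tx
		bucketPool.Put(b)
	}
	t.buckets = t.buckets[:0]
}

func main() {
	t := &tx{}
	b := t.Bucket(1)
	fmt.Println("bucket id:", b.id)
	t.close() // b must not be used after this point
}

The essential detail, as in the patch, is that handles go back to the pool only once the transaction is finished with them; a pooled object must never be reused while still reachable.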
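The mutation.go hunks keep a single MultiPutTuples slice on the mutation and truncate it with m.tuples[:0] in Commit and Rollback, so the flattened bucket/key/value buffer reuses its capacity instead of being reallocated on every commit. A rough stand-alone sketch of the same slice-reuse idea, using hypothetical names rather than the real mutation type:

package main

import "fmt"

// batch is a stand-in for ethdb's mutation: it reuses one flat buffer across commits.
type batch struct {
	buf [][]byte // reused backing slice, analogous to mutation.tuples
}

// commit flattens the pending pairs into buf, keeping buf's capacity between calls.
func (b *batch) commit(pending map[string][]byte) int {
	if b.buf == nil {
		b.buf = make([][]byte, 0, len(pending)*2)
	}
	b.buf = b.buf[:0] // keep capacity, drop old contents
	for k, v := range pending {
		b.buf = append(b.buf, []byte(k), v)
	}
	written := len(b.buf) / 2
	b.buf = b.buf[:0] // ready for the next commit without reallocating
	return written
}

func main() {
	b := &batch{}
	n := b.commit(map[string][]byte{"key1": {1}, "key2": {2}})
	fmt.Println("written:", n)
}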