From c1213bd00c2a84a9dfc218e44cc2f85902f91128 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= <janos@users.noreply.github.com>
Date: Thu, 25 Apr 2019 10:22:57 +0200
Subject: [PATCH] swarm: LocalStore metrics

* swarm/shed: remove metrics fields from DB struct

* swarm/chunk: add String methods to modes

* swarm/storage/localstore: add metrics and traces

* swarm/chunk: format unknown modes without spaces in String methods

* swarm/storage/localstore: remove bin number from pull subscription metrics

* swarm/storage/localstore: add resetting time metrics and code improvements
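
The per-mode metric names used below are built from the new String
methods, so counters read as words instead of integers. A minimal
illustration (not part of the patch; it only uses the swarm/chunk
constants touched here):

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/swarm/chunk"
    )

    func main() {
        // fmt's %s verb picks up the new Stringer implementations.
        fmt.Printf("localstore.Get.%s\n", chunk.ModeGetRequest) // localstore.Get.Request
        fmt.Printf("localstore.Put.%s\n", chunk.ModePutUpload)  // localstore.Put.Upload
        fmt.Printf("localstore.Set.%s\n", chunk.ModeSetSync)    // localstore.Set.Sync
    }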
---
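Note on the resetting time metrics: in the added
defer totalTimeMetric(metricName, time.Now()) calls, the time.Now()
argument is evaluated when the defer statement executes, so the
".total-time" resetting timer records the full duration of the
enclosing operation. A standalone illustration of that evaluation
order (not part of the patch):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // The argument is captured here, at the defer statement...
        defer func(start time.Time) {
            // ...and used when the function returns.
            fmt.Println("total time:", time.Since(start))
        }(time.Now())

        time.Sleep(50 * time.Millisecond) // stands in for the real work
    }
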
 swarm/chunk/chunk.go                          | 39 ++++++++++
 swarm/shed/db.go                              | 71 ++++++++-----------
 swarm/storage/localstore/gc.go                | 12 ++++
 swarm/storage/localstore/localstore.go        | 10 +++
 swarm/storage/localstore/mode_get.go          | 28 +++++++-
 swarm/storage/localstore/mode_has.go          | 21 +++++-
 swarm/storage/localstore/mode_put.go          | 22 +++++-
 swarm/storage/localstore/mode_set.go          | 22 +++++-
 swarm/storage/localstore/subscription_pull.go | 32 +++++++++
 swarm/storage/localstore/subscription_push.go | 29 ++++++++
 10 files changed, 239 insertions(+), 47 deletions(-)

diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go
index 2455904f3..9ae59c95f 100644
--- a/swarm/chunk/chunk.go
+++ b/swarm/chunk/chunk.go
@@ -112,6 +112,19 @@ func Proximity(one, other []byte) (ret int) {
 // ModeGet enumerates different Getter modes.
 type ModeGet int
 
+func (m ModeGet) String() string {
+	switch m {
+	case ModeGetRequest:
+		return "Request"
+	case ModeGetSync:
+		return "Sync"
+	case ModeGetLookup:
+		return "Lookup"
+	default:
+		return "Unknown"
+	}
+}
+
 // Getter modes.
 const (
 	// ModeGetRequest: when accessed for retrieval
@@ -125,6 +138,19 @@ const (
 // ModePut enumerates different Putter modes.
 type ModePut int
 
+func (m ModePut) String() string {
+	switch m {
+	case ModePutRequest:
+		return "Request"
+	case ModePutSync:
+		return "Sync"
+	case ModePutUpload:
+		return "Upload"
+	default:
+		return "Unknown"
+	}
+}
+
 // Putter modes.
 const (
 	// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
@@ -138,6 +164,19 @@ const (
 // ModeSet enumerates different Setter modes.
 type ModeSet int
 
+func (m ModeSet) String() string {
+	switch m {
+	case ModeSetAccess:
+		return "Access"
+	case ModeSetSync:
+		return "Sync"
+	case ModeSetRemove:
+		return "Remove"
+	default:
+		return "Unknown"
+	}
+}
+
 // Setter modes.
 const (
 	// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
diff --git a/swarm/shed/db.go b/swarm/shed/db.go
index 8c11bf48b..6fc520866 100644
--- a/swarm/shed/db.go
+++ b/swarm/shed/db.go
@@ -45,16 +45,7 @@ const (
 // It provides a schema functionality to store fields and indexes
 // information about naming and types.
 type DB struct {
-	ldb *leveldb.DB
-
-	compTimeMeter    metrics.Meter // Meter for measuring the total time spent in database compaction
-	compReadMeter    metrics.Meter // Meter for measuring the data read during compaction
-	compWriteMeter   metrics.Meter // Meter for measuring the data written during compaction
-	writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
-	writeDelayMeter  metrics.Meter // Meter for measuring the write delay duration due to database compaction
-	diskReadMeter    metrics.Meter // Meter for measuring the effective amount of data read
-	diskWriteMeter   metrics.Meter // Meter for measuring the effective amount of data written
-
+	ldb  *leveldb.DB
 	quit chan struct{} // Quit channel to stop the metrics collection before closing the database
 }
 
@@ -86,13 +77,10 @@ func NewDB(path string, metricsPrefix string) (db *DB, err error) {
 		}
 	}
 
-	// Configure meters for DB
-	db.configure(metricsPrefix)
-
 	// Create a quit channel for the periodic metrics collector and run it
 	db.quit = make(chan struct{})
 
-	go db.meter(10 * time.Second)
+	go db.meter(metricsPrefix, 10*time.Second)
 
 	return db, nil
 }
@@ -169,19 +157,22 @@ func (db *DB) Close() (err error) {
 	return db.ldb.Close()
 }
 
-// Configure configures the database metrics collectors
-func (db *DB) configure(prefix string) {
-	// Initialize all the metrics collector at the requested prefix
-	db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
-	db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
-	db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
-	db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
-	db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
-	db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
-	db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
-}
+func (db *DB) meter(prefix string, refresh time.Duration) {
+	// Meter for measuring the total time spent in database compaction
+	compTimeMeter := metrics.NewRegisteredMeter(prefix+"compact/time", nil)
+	// Meter for measuring the data read during compaction
+	compReadMeter := metrics.NewRegisteredMeter(prefix+"compact/input", nil)
+	// Meter for measuring the data written during compaction
+	compWriteMeter := metrics.NewRegisteredMeter(prefix+"compact/output", nil)
+	// Meter for measuring the write delay duration due to database compaction
+	writeDelayMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
+	// Meter for measuring the write delay number due to database compaction
+	writeDelayNMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
+	// Meter for measuring the effective amount of data read
+	diskReadMeter := metrics.NewRegisteredMeter(prefix+"disk/read", nil)
+	// Meter for measuring the effective amount of data written
+	diskWriteMeter := metrics.NewRegisteredMeter(prefix+"disk/write", nil)
 
-func (db *DB) meter(refresh time.Duration) {
 	// Create the counters to store current and previous compaction values
 	compactions := make([][]float64, 2)
 	for i := 0; i < 2; i++ {
@@ -234,14 +225,14 @@ func (db *DB) meter(refresh time.Duration) {
 			}
 		}
 		// Update all the requested meters
-		if db.compTimeMeter != nil {
-			db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
+		if compTimeMeter != nil {
+			compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
 		}
-		if db.compReadMeter != nil {
-			db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
+		if compReadMeter != nil {
+			compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
 		}
-		if db.compWriteMeter != nil {
-			db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
+		if compWriteMeter != nil {
+			compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
 		}
 
 		// Retrieve the write delay statistic
@@ -265,11 +256,11 @@ func (db *DB) meter(refresh time.Duration) {
 			log.Error("Failed to parse delay duration", "err", err)
 			continue
 		}
-		if db.writeDelayNMeter != nil {
-			db.writeDelayNMeter.Mark(delayN - delaystats[0])
+		if writeDelayNMeter != nil {
+			writeDelayNMeter.Mark(delayN - delaystats[0])
 		}
-		if db.writeDelayMeter != nil {
-			db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
+		if writeDelayMeter != nil {
+			writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
 		}
 		// If a warning that db is performing compaction has been displayed, any subsequent
 		// warnings will be withheld for one minute not to overwhelm the user.
@@ -300,11 +291,11 @@ func (db *DB) meter(refresh time.Duration) {
 			log.Error("Bad syntax of write entry", "entry", parts[1])
 			continue
 		}
-		if db.diskReadMeter != nil {
-			db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
+		if diskReadMeter != nil {
+			diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
 		}
-		if db.diskWriteMeter != nil {
-			db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
+		if diskWriteMeter != nil {
+			diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
 		}
 		iostats[0], iostats[1] = nRead, nWrite
 
diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go
index 84c4f596d..28c7b6db9 100644
--- a/swarm/storage/localstore/gc.go
+++ b/swarm/storage/localstore/gc.go
@@ -17,7 +17,10 @@
 package localstore
 
 import (
+	"time"
+
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/shed"
 	"github.com/syndtr/goleveldb/leveldb"
 )
@@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() {
 // the rest of the garbage as the batch size limit is reached.
 // This function is called in collectGarbageWorker.
 func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
+	metricName := "localstore.gc"
+	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+	defer totalTimeMetric(metricName, time.Now())
+	defer func() {
+		if err != nil {
+			metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+		}
+	}()
+
 	batch := new(leveldb.Batch)
 	target := db.gcTarget()
 
diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go
index 56a6d10e6..c32d2972d 100644
--- a/swarm/storage/localstore/localstore.go
+++ b/swarm/storage/localstore/localstore.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/shed"
 	"github.com/ethereum/go-ethereum/swarm/storage/mock"
@@ -388,3 +389,12 @@ func init() {
 		return time.Now().UTC().UnixNano()
 	}
 }
+
+// totalTimeMetric logs the time elapsed since the provided start time
+// and sends a resetting timer metric with the provided name appended
+// with ".total-time".
+func totalTimeMetric(name string, start time.Time) {
+	totalTime := time.Since(start)
+	log.Trace(name+" total time", "time", totalTime)
+	metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
+}
diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go
index 0df0e9b7d..48603550c 100644
--- a/swarm/storage/localstore/mode_get.go
+++ b/swarm/storage/localstore/mode_get.go
@@ -18,10 +18,15 @@ package localstore
 
 import (
 	"context"
+	"fmt"
+	"time"
 
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/shed"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+	olog "github.com/opentracing/opentracing-go/log"
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
@@ -30,7 +35,22 @@ import (
 // All required indexes will be updated required by the
 // Getter Mode. Get is required to implement chunk.Store
 // interface.
-func (db *DB) Get(_ context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
+func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
+	metricName := fmt.Sprintf("localstore.Get.%s", mode)
+
+	ctx, sp := spancontext.StartSpan(ctx, metricName)
+	defer sp.Finish()
+	sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-get", mode.String()))
+
+	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+	defer totalTimeMetric(metricName, time.Now())
+
+	defer func() {
+		if err != nil {
+			metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+		}
+	}()
+
 	out, err := db.get(mode, addr)
 	if err != nil {
 		if err == leveldb.ErrNotFound {
@@ -66,8 +86,14 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
 				// for a new goroutine
 				defer func() { <-db.updateGCSem }()
 			}
+
+			metricName := "localstore.updateGC"
+			metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+			defer totalTimeMetric(metricName, time.Now())
+
 			err := db.updateGC(out)
 			if err != nil {
+				metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
 				log.Error("localstore update gc", "err", err)
 			}
 			// if gc update hook is defined, call it
diff --git a/swarm/storage/localstore/mode_has.go b/swarm/storage/localstore/mode_has.go
index fea8a50bf..ae1a8970a 100644
--- a/swarm/storage/localstore/mode_has.go
+++ b/swarm/storage/localstore/mode_has.go
@@ -18,11 +18,28 @@ package localstore
 
 import (
 	"context"
+	"time"
 
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/chunk"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+	olog "github.com/opentracing/opentracing-go/log"
 )
 
 // Has returns true if the chunk is stored in database.
-func (db *DB) Has(_ context.Context, addr chunk.Address) (bool, error) {
-	return db.retrievalDataIndex.Has(addressToItem(addr))
+func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
+	metricName := "localstore.Has"
+
+	ctx, sp := spancontext.StartSpan(ctx, metricName)
+	defer sp.Finish()
+	sp.LogFields(olog.String("ref", addr.String()))
+
+	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+	defer totalTimeMetric(metricName, time.Now())
+
+	has, err := db.retrievalDataIndex.Has(addressToItem(addr))
+	if err != nil {
+		metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+	}
+	return has, err
 }
diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go
index 488e4d8e1..c91a394a0 100644
--- a/swarm/storage/localstore/mode_put.go
+++ b/swarm/storage/localstore/mode_put.go
@@ -18,9 +18,14 @@ package localstore
 
 import (
 	"context"
+	"fmt"
+	"time"
 
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/shed"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+	olog "github.com/opentracing/opentracing-go/log"
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
@@ -28,8 +33,21 @@ import (
 // on the Putter mode, it updates required indexes.
 // Put is required to implement chunk.Store
 // interface.
-func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
-	return db.put(mode, chunkToItem(ch))
+func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
+	metricName := fmt.Sprintf("localstore.Put.%s", mode)
+
+	ctx, sp := spancontext.StartSpan(ctx, metricName)
+	defer sp.Finish()
+	sp.LogFields(olog.String("ref", ch.Address().String()), olog.String("mode-put", mode.String()))
+
+	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+	defer totalTimeMetric(metricName, time.Now())
+
+	exists, err = db.put(mode, chunkToItem(ch))
+	if err != nil {
+		metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+	}
+	return exists, err
 }
 
 // put stores Item to database and updates other
diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go
index 13e98d1ec..7edfa6703 100644
--- a/swarm/storage/localstore/mode_set.go
+++ b/swarm/storage/localstore/mode_set.go
@@ -18,8 +18,13 @@ package localstore
 
 import (
 	"context"
+	"fmt"
+	"time"
 
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/chunk"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+	olog "github.com/opentracing/opentracing-go/log"
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
@@ -27,8 +32,21 @@ import (
 // chunk represented by the address.
 // Set is required to implement chunk.Store
 // interface.
-func (db *DB) Set(_ context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
-	return db.set(mode, addr)
+func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
+	metricName := fmt.Sprintf("localstore.Set.%s", mode)
+
+	ctx, sp := spancontext.StartSpan(ctx, metricName)
+	defer sp.Finish()
+	sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-set", mode.String()))
+
+	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+	defer totalTimeMetric(metricName, time.Now())
+
+	err = db.set(mode, addr)
+	if err != nil {
+		metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
+	}
+	return err
 }
 
 // set updates database indexes for a specific
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
index fd81b045b..7a18141b3 100644
--- a/swarm/storage/localstore/subscription_pull.go
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -20,10 +20,15 @@ import (
 	"context"
 	"errors"
 	"sync"
+	"time"
 
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/shed"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+	"github.com/opentracing/opentracing-go"
+	olog "github.com/opentracing/opentracing-go/log"
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
@@ -36,6 +41,9 @@ import (
 // Make sure that you check the second returned parameter from the channel to stop iteration when its value
 // is false.
 func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
+	metricName := "localstore.SubscribePull"
+	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
 	chunkDescriptors := make(chan chunk.Descriptor)
 	trigger := make(chan struct{}, 1)
 
@@ -57,6 +65,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
 	var errStopSubscription = errors.New("stop subscription")
 
 	go func() {
+		defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
 		// close the returned chunk.Descriptor channel at the end to
 		// signal that the subscription is done
 		defer close(chunkDescriptors)
@@ -77,12 +86,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
 				// - last index Item is reached
 				// - subscription stop is called
 				// - context is done
+				metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+				ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+				sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until))
+
+				iterStart := time.Now()
+				var count int
 				err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
 					select {
 					case chunkDescriptors <- chunk.Descriptor{
 						Address: item.Address,
 						BinID:   item.BinID,
 					}:
+						count++
 						// until chunk descriptor is sent
 						// break the iteration
 						if until > 0 && item.BinID >= until {
@@ -111,12 +128,25 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
 					SkipStartFromItem: !first,
 					Prefix:            []byte{bin},
 				})
+
+				totalTimeMetric(metricName+".iter", iterStart)
+
+				sp.FinishWithOptions(opentracing.FinishOptions{
+					LogRecords: []opentracing.LogRecord{
+						{
+							Timestamp: time.Now(),
+							Fields:    []olog.Field{olog.Int("count", count)},
+						},
+					},
+				})
+
 				if err != nil {
 					if err == errStopSubscription {
 						// stop subscription without any errors
 						// if until is reached
 						return
 					}
+					metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
 					log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
 					return
 				}
@@ -162,6 +192,8 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
 // in pull syncing index for a provided bin. If there are no chunks in
 // that bin, 0 value is returned.
 func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
+	metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1)
+
 	item, err := db.pullIndex.Last([]byte{bin})
 	if err != nil {
 		if err == leveldb.ErrNotFound {
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
index 5cbc2eb6f..f2463af2a 100644
--- a/swarm/storage/localstore/subscription_push.go
+++ b/swarm/storage/localstore/subscription_push.go
@@ -19,10 +19,15 @@ package localstore
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/shed"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+	"github.com/opentracing/opentracing-go"
+	olog "github.com/opentracing/opentracing-go/log"
 )
 
 // SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
@@ -30,6 +35,9 @@ import (
 // the returned channel without any errors. Make sure that you check the second returned parameter
 // from the channel to stop iteration when its value is false.
 func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
+	metricName := "localstore.SubscribePush"
+	metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
+
 	chunks := make(chan chunk.Chunk)
 	trigger := make(chan struct{}, 1)
 
@@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 	var stopChanOnce sync.Once
 
 	go func() {
+		defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
 		// close the returned chunkInfo channel at the end to
 		// signal that the subscription is done
 		defer close(chunks)
@@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 				// - last index Item is reached
 				// - subscription stop is called
 				// - context is done
+				metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
+
+				ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
+
+				iterStart := time.Now()
+				var count int
 				err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
 					// get chunk data
 					dataItem, err := db.retrievalDataIndex.Get(item)
@@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 
 					select {
 					case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+						count++
 						// set next iteration start item
 						// when its chunk is successfully sent to channel
 						sinceItem = &item
@@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
 					// iterator call, skip it in this one
 					SkipStartFromItem: true,
 				})
+
+				totalTimeMetric(metricName+".iter", iterStart)
+
+				sp.FinishWithOptions(opentracing.FinishOptions{
+					LogRecords: []opentracing.LogRecord{
+						{
+							Timestamp: time.Now(),
+							Fields:    []olog.Field{olog.Int("count", count)},
+						},
+					},
+				})
+
 				if err != nil {
+					metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
 					log.Error("localstore push subscription iteration", "err", err)
 					return
 				}
-- 
GitLab