From d36e974ba303d12d79d769d0811dd5babcf6688f Mon Sep 17 00:00:00 2001
From: lash <nolash@users.noreply.github.com>
Date: Wed, 20 Feb 2019 14:50:37 +0100
Subject: [PATCH] swarm/network: Keep span across roundtrip (#19140)

* swarm/newtork: WIP Span request span until delivery and put

* swarm/storage: Introduce new trace across single fetcher lifespan

* swarm/network: Put span ids for sendpriority in context value

* swarm: Add global span store in tracing

* swarm/tracing: Add context key constants

* swarm/tracing: Add comments

* swarm/storage: Remove redundant fix for filestore

* swarm/tracing: Elaborate constants comments

* swarm/network, swarm/storage, swarm:tracing: Minor cleanup
---
 swarm/network/stream/delivery.go | 27 +++++++----
 swarm/network/stream/messages.go |  2 +-
 swarm/network/stream/peer.go     | 29 +++++-------
 swarm/network/stream/stream.go   |  4 +-
 swarm/storage/netstore.go        | 32 ++++++++-----
 swarm/swarm.go                   |  1 +
 swarm/tracing/tracing.go         | 77 ++++++++++++++++++++++++++++++--
 7 files changed, 127 insertions(+), 45 deletions(-)

diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index fae6994f0..02c5f222c 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -27,6 +27,7 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/spancontext"
 	"github.com/ethereum/go-ethereum/swarm/storage"
+	"github.com/ethereum/go-ethereum/swarm/tracing"
 	opentracing "github.com/opentracing/opentracing-go"
 )
 
@@ -143,7 +144,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 	var osp opentracing.Span
 	ctx, osp = spancontext.StartSpan(
 		ctx,
-		"retrieve.request")
+		"stream.handle.retrieve")
 
 	s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
 	if err != nil {
@@ -207,17 +208,19 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
 //defines a chunk delivery for syncing (without accounting)
 type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
 
-// TODO: Fix context SNAFU
+// chunk delivery msg is response to retrieverequest msg
 func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
-	var osp opentracing.Span
-	ctx, osp = spancontext.StartSpan(
-		ctx,
-		"chunk.delivery")
 
 	processReceivedChunksCount.Inc(1)
 
+	// retrieve the span for the originating retrieverequest
+	spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
+	span := tracing.ShiftSpanByKey(spanId)
+
 	go func() {
-		defer osp.Finish()
+		if span != nil {
+			defer span.(opentracing.Span).Finish()
+		}
 
 		req.peer = sp
 		err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
@@ -233,7 +236,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
 	return nil
 }
 
-// RequestFromPeers sends a chunk retrieve request to
+// RequestFromPeers sends a chunk retrieve request to a peer
+// The most eligible peer that hasn't already been sent to is chosen
+// TODO: define "eligible"
 func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
 	requestFromPeersCount.Inc(1)
 	var sp *Peer
@@ -268,11 +273,15 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
 		}
 	}
 
+	// setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip
+	// this span will finish only when delivery is handled (or times out)
+	ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
+	ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
 	err := sp.SendPriority(ctx, &RetrieveRequestMsg{
 		Addr:      req.Addr,
 		SkipCheck: req.SkipCheck,
 		HopCount:  req.HopCount,
-	}, Top, "request.from.peers")
+	}, Top)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index de4e8a3bb..b293724cc 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
 			return
 		}
 		log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
-		err := p.SendPriority(ctx, msg, c.priority, "")
+		err := p.SendPriority(ctx, msg, c.priority)
 		if err != nil {
 			log.Warn("SendPriority error", "err", err)
 		}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 68da8f44a..c59799e08 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -31,6 +31,7 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/spancontext"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
+	"github.com/ethereum/go-ethereum/swarm/tracing"
 	opentracing "github.com/opentracing/opentracing-go"
 )
 
@@ -65,7 +66,6 @@ type Peer struct {
 	// on creating a new client in offered hashes handler.
 	clientParams map[Stream]*clientParams
 	quit         chan struct{}
-	spans        sync.Map
 }
 
 type WrappedPriorityMsg struct {
@@ -83,16 +83,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 		clients:      make(map[Stream]*client),
 		clientParams: make(map[Stream]*clientParams),
 		quit:         make(chan struct{}),
-		spans:        sync.Map{},
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 	go p.pq.Run(ctx, func(i interface{}) {
 		wmsg := i.(WrappedPriorityMsg)
-		defer p.spans.Delete(wmsg.Context)
-		sp, ok := p.spans.Load(wmsg.Context)
-		if ok {
-			defer sp.(opentracing.Span).Finish()
-		}
 		err := p.Send(wmsg.Context, wmsg.Msg)
 		if err != nil {
 			log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
@@ -129,6 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 
 	go func() {
 		<-p.quit
+
 		cancel()
 	}()
 	return p
@@ -158,21 +153,15 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
 		spanName += ".retrieval"
 	}
 
-	return p.SendPriority(ctx, msg, priority, spanName)
+	ctx = context.WithValue(ctx, "stream_send_tag", nil)
+	return p.SendPriority(ctx, msg, priority)
 }
 
 // SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
 	defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
+	tracing.StartSaveSpan(ctx)
 	metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
-	if traceId != "" {
-		var sp opentracing.Span
-		ctx, sp = spancontext.StartSpan(
-			ctx,
-			traceId,
-		)
-		p.spans.Store(ctx, sp)
-	}
 	wmsg := WrappedPriorityMsg{
 		Context: ctx,
 		Msg:     msg,
@@ -190,7 +179,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
 	var sp opentracing.Span
 	ctx, sp := spancontext.StartSpan(
 		context.TODO(),
-		"send.offered.hashes")
+		"send.offered.hashes",
+	)
 	defer sp.Finish()
 
 	hashes, from, to, proof, err := s.setNextBatch(f, t)
@@ -215,7 +205,8 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
 		Stream:        s.stream,
 	}
 	log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
-	return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
+	ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
+	return p.SendPriority(ctx, msg, s.priority)
 }
 
 func (p *Peer) getServer(s Stream) (*server, error) {
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 622b46e4c..8e2a5f31a 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -381,7 +381,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
 	}
 	log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
 
-	return peer.SendPriority(context.TODO(), msg, priority, "")
+	return peer.SendPriority(context.TODO(), msg, priority)
 }
 
 func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -757,7 +757,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
 			return err
 		}
 
-		if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
+		if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
 			return err
 		}
 		if c.to > 0 && tp.Takeover.End >= c.to {
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index 202af2bf5..8a44f51a8 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -26,6 +26,9 @@ import (
 
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/swarm/log"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+	"github.com/opentracing/opentracing-go"
+
 	lru "github.com/hashicorp/golang-lru"
 )
 
@@ -208,7 +211,11 @@ func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher
 	// the peers which requested the chunk should not be requested to deliver it.
 	peers := &sync.Map{}
 
-	fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
+	cctx, sp := spancontext.StartSpan(
+		cctx,
+		"netstore.fetcher",
+	)
+	fetcher := newFetcher(sp, ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
 	n.fetchers.Add(key, fetcher)
 
 	return fetcher
@@ -233,15 +240,16 @@ func (n *NetStore) RequestsCacheLen() int {
 // One fetcher object is responsible to fetch one chunk for one address, and keep track of all the
 // peers who have requested it and did not receive it yet.
 type fetcher struct {
-	addr        Address       // address of chunk
-	chunk       Chunk         // fetcher can set the chunk on the fetcher
-	deliveredC  chan struct{} // chan signalling chunk delivery to requests
-	cancelledC  chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore)
-	netFetcher  NetFetcher    // remote fetch function to be called with a request source taken from the context
-	cancel      func()        // cleanup function for the remote fetcher to call when all upstream contexts are called
-	peers       *sync.Map     // the peers which asked for the chunk
-	requestCnt  int32         // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called
-	deliverOnce *sync.Once    // guarantees that we only close deliveredC once
+	addr        Address          // address of chunk
+	chunk       Chunk            // fetcher can set the chunk on the fetcher
+	deliveredC  chan struct{}    // chan signalling chunk delivery to requests
+	cancelledC  chan struct{}    // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore)
+	netFetcher  NetFetcher       // remote fetch function to be called with a request source taken from the context
+	cancel      func()           // cleanup function for the remote fetcher to call when all upstream contexts are called
+	peers       *sync.Map        // the peers which asked for the chunk
+	requestCnt  int32            // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called
+	deliverOnce *sync.Once       // guarantees that we only close deliveredC once
+	span        opentracing.Span // measure retrieve time per chunk
 }
 
 // newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually
@@ -250,7 +258,7 @@ type fetcher struct {
 //     1. when the chunk has been fetched all peers have been either notified or their context has been done
 //     2. the chunk has not been fetched but all context from all the requests has been done
 // The peers map stores all the peers which have requested chunk.
-func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher {
+func newFetcher(span opentracing.Span, addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher {
 	cancelOnce := &sync.Once{} // cancel should only be called once
 	return &fetcher{
 		addr:        addr,
@@ -264,6 +272,7 @@ func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, clo
 			})
 		},
 		peers: peers,
+		span:  span,
 	}
 }
 
@@ -276,6 +285,7 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
 		if atomic.AddInt32(&f.requestCnt, -1) == 0 {
 			f.cancel()
 		}
+		f.span.Finish()
 	}()
 
 	// The peer asking for the chunk. Store in the shared peers map, but delete after the request
diff --git a/swarm/swarm.go b/swarm/swarm.go
index 3ab98b3ab..651ad97c7 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -426,6 +426,7 @@ func (s *Swarm) Start(srv *p2p.Server) error {
 func (s *Swarm) Stop() error {
 	if s.tracerClose != nil {
 		err := s.tracerClose.Close()
+		tracing.FinishSpans()
 		if err != nil {
 			return err
 		}
diff --git a/swarm/tracing/tracing.go b/swarm/tracing/tracing.go
index f95fa41b8..55875464b 100644
--- a/swarm/tracing/tracing.go
+++ b/swarm/tracing/tracing.go
@@ -1,21 +1,39 @@
 package tracing
 
 import (
+	"context"
 	"io"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/swarm/spancontext"
+
+	opentracing "github.com/opentracing/opentracing-go"
 	jaeger "github.com/uber/jaeger-client-go"
 	jaegercfg "github.com/uber/jaeger-client-go/config"
 	cli "gopkg.in/urfave/cli.v1"
 )
 
-var Enabled bool = false
+var (
+	// Enabled turns tracing on for the current swarm instance
+	Enabled bool = false
+	store        = spanStore{}
+)
+
+const (
+	// TracingEnabledFlag is the CLI flag name to use to enable trace collections.
+	TracingEnabledFlag = "tracing"
+
+	// StoreLabelId is the context value key of the name of the span to be saved
+	StoreLabelId = "span_save_id"
 
-// TracingEnabledFlag is the CLI flag name to use to enable trace collections.
-const TracingEnabledFlag = "tracing"
+	// StoreLabelMeta is the context value key that together with StoreLabelId constitutes the retrieval key for saved spans in the span store
+	// StartSaveSpan and ShiftSpanByKey
+	StoreLabelMeta = "span_save_meta"
+)
 
 var (
 	Closer io.Closer
@@ -100,3 +118,56 @@ func initTracer(endpoint, svc string) (closer io.Closer) {
 
 	return closer
 }
+
+// spanStore holds saved spans
+type spanStore struct {
+	spans sync.Map
+}
+
+// StartSaveSpan stores the span specified in the passed context for later retrieval
+// The span object but be context value on the key StoreLabelId.
+// It will be stored under the the following string key context.Value(StoreLabelId)|.|context.Value(StoreLabelMeta)
+func StartSaveSpan(ctx context.Context) context.Context {
+	if !Enabled {
+		return ctx
+	}
+	traceId := ctx.Value(StoreLabelId)
+
+	if traceId != nil {
+		traceStr := traceId.(string)
+		var sp opentracing.Span
+		ctx, sp = spancontext.StartSpan(
+			ctx,
+			traceStr,
+		)
+		traceMeta := ctx.Value(StoreLabelMeta)
+		if traceMeta != nil {
+			traceStr = traceStr + "." + traceMeta.(string)
+		}
+		store.spans.Store(traceStr, sp)
+	}
+	return ctx
+}
+
+// ShiftSpanByKey retrieves the span stored under the key of the string given as argument
+// The span is then deleted from the store
+func ShiftSpanByKey(k string) opentracing.Span {
+	if !Enabled {
+		return nil
+	}
+	span, spanOk := store.spans.Load(k)
+	if !spanOk {
+		return nil
+	}
+	store.spans.Delete(k)
+	return span.(opentracing.Span)
+}
+
+// FinishSpans calls `Finish()` on all stored spans
+// It should be called on instance shutdown
+func FinishSpans() {
+	store.spans.Range(func(_, v interface{}) bool {
+		v.(opentracing.Span).Finish()
+		return true
+	})
+}
-- 
GitLab