From 0c10d376066cb7e57d3bfc03f950c7750cd90640 Mon Sep 17 00:00:00 2001
From: lash <nolash@users.noreply.github.com>
Date: Fri, 8 Feb 2019 16:57:48 +0100
Subject: [PATCH] swarm/network, swarm/storage: Preserve opentracing contexts
 (#19022)

---
 swarm/network/fetcher.go         |  36 +++++-----
 swarm/network/fetcher_test.go    | 113 +++++++++++++++----------------
 swarm/network/stream/delivery.go |   7 +-
 swarm/network/stream/messages.go |   2 +-
 swarm/network/stream/peer.go     |  26 ++++---
 swarm/network/stream/stream.go   |   5 +-
 swarm/storage/chunker.go         |  16 +++--
 swarm/storage/feed/testutil.go   |   4 +-
 swarm/storage/netstore.go        |  16 ++---
 swarm/storage/netstore_test.go   |   4 +-
 10 files changed, 122 insertions(+), 107 deletions(-)

diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
index 3043f6475..6b2175166 100644
--- a/swarm/network/fetcher.go
+++ b/swarm/network/fetcher.go
@@ -52,6 +52,7 @@ type Fetcher struct {
 	requestC         chan uint8      // channel for incoming requests (with the hopCount value in it)
 	searchTimeout    time.Duration
 	skipCheck        bool
+	ctx              context.Context
 }
 
 type Request struct {
@@ -109,14 +110,14 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
 // contain the peers which are actively requesting this chunk, to make sure we
 // don't request back the chunks from them.
 // The created Fetcher is started and returned.
-func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
-	fetcher := NewFetcher(source, f.request, f.skipCheck)
-	go fetcher.run(ctx, peersToSkip)
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
+	fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
+	go fetcher.run(peers)
 	return fetcher
 }
 
 // NewFetcher creates a new Fetcher for the given chunk address using the given request function.
-func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
 	return &Fetcher{
 		addr:             addr,
 		protoRequestFunc: rf,
@@ -124,14 +125,15 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
 		requestC:         make(chan uint8),
 		searchTimeout:    defaultSearchTimeout,
 		skipCheck:        skipCheck,
+		ctx:              ctx,
 	}
 }
 
 // Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
-func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
+func (f *Fetcher) Offer(source *enode.ID) {
 	// First we need to have this select to make sure that we return if context is done
 	select {
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 		return
 	default:
 	}
@@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
 	// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
 	select {
 	case f.offerC <- source:
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 	}
 }
 
 // Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
-func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
+func (f *Fetcher) Request(hopCount uint8) {
 	// First we need to have this select to make sure that we return if context is done
 	select {
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 		return
 	default:
 	}
@@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
 	// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
 	select {
 	case f.requestC <- hopCount + 1:
-	case <-ctx.Done():
+	case <-f.ctx.Done():
 	}
 }
 
 // start prepares the Fetcher
 // it keeps the Fetcher alive within the lifecycle of the passed context
-func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+func (f *Fetcher) run(peers *sync.Map) {
 	var (
 		doRequest bool             // determines if retrieval is initiated in the current iteration
 		wait      *time.Timer      // timer for search timeout
@@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 			doRequest = requested
 
 			// all Fetcher context closed, can quit
-		case <-ctx.Done():
+		case <-f.ctx.Done():
 			log.Trace("terminate fetcher", "request addr", f.addr)
 			// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
 			return
@@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 		// need to issue a new request
 		if doRequest {
 			var err error
-			sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
+			sources, err = f.doRequest(gone, peers, sources, hopCount)
 			if err != nil {
 				log.Info("unable to request", "request addr", f.addr, "err", err)
 			}
@@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 // * the peer's address is added to the set of peers to skip
 // * the peer's address is removed from prospective sources, and
 // * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
-func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
+func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
 	var i int
 	var sourceID *enode.ID
 	var quit chan struct{}
@@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
 	for i = 0; i < len(sources); i++ {
 		req.Source = sources[i]
 		var err error
-		sourceID, quit, err = f.protoRequestFunc(ctx, req)
+		sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
 		if err == nil {
 			// remove the peer from known sources
 			// Note: we can modify the source although we are looping on it, because we break from the loop immediately
@@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
 	if !foundSource {
 		req.Source = nil
 		var err error
-		sourceID, quit, err = f.protoRequestFunc(ctx, req)
+		sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
 		if err != nil {
 			// if no peers found to request from
 			return sources, err
@@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
 		select {
 		case <-quit:
 			gone <- sourceID
-		case <-ctx.Done():
+		case <-f.ctx.Done():
 		}
 	}()
 	return sources, nil
diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go
index 563c6cece..4e464f10f 100644
--- a/swarm/network/fetcher_test.go
+++ b/swarm/network/fetcher_test.go
@@ -69,7 +69,11 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
 func TestFetcherSingleRequest(t *testing.T) {
 	requester := newMockRequester()
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
 
 	peers := []string{"a", "b", "c", "d"}
 	peersToSkip := &sync.Map{}
@@ -77,13 +81,9 @@ func TestFetcherSingleRequest(t *testing.T) {
 		peersToSkip.Store(p, time.Now())
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	go fetcher.run(ctx, peersToSkip)
+	go fetcher.run(peersToSkip)
 
-	rctx := context.Background()
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	select {
 	case request := <-requester.requestC:
@@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) {
 func TestFetcherCancelStopsFetcher(t *testing.T) {
 	requester := newMockRequester()
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
-
-	peersToSkip := &sync.Map{}
 
 	ctx, cancel := context.WithCancel(context.Background())
 
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+	peersToSkip := &sync.Map{}
+
 	// we start the fetcher, and then we immediately cancel the context
-	go fetcher.run(ctx, peersToSkip)
+	go fetcher.run(peersToSkip)
 	cancel()
 
-	rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
-	defer rcancel()
 	// we call Request with an active context
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
 	select {
@@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {
 
 // TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
 func TestFetcherCancelStopsRequest(t *testing.T) {
+	t.Skip("since context is now per fetcher, this test is likely redundant")
+
 	requester := newMockRequester(100 * time.Millisecond)
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
-
-	peersToSkip := &sync.Map{}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// we start the fetcher with an active context
-	go fetcher.run(ctx, peersToSkip)
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
 
-	rctx, rcancel := context.WithCancel(context.Background())
-	rcancel()
+	peersToSkip := &sync.Map{}
+
+	// we start the fetcher with an active context
+	go fetcher.run(peersToSkip)
 
 	// we call Request with a cancelled context
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
 	select {
@@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
 	}
 
 	// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
-	rctx = context.Background()
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	select {
 	case <-requester.requestC:
@@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
 func TestFetcherOfferUsesSource(t *testing.T) {
 	requester := newMockRequester(100 * time.Millisecond)
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
-
-	peersToSkip := &sync.Map{}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+	peersToSkip := &sync.Map{}
+
 	// start the fetcher
-	go fetcher.run(ctx, peersToSkip)
+	go fetcher.run(peersToSkip)
 
-	rctx := context.Background()
 	// call the Offer function with the source peer
-	fetcher.Offer(rctx, &sourcePeerID)
+	fetcher.Offer(&sourcePeerID)
 
 	// fetcher should not initiate request
 	select {
@@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
 	}
 
 	// call Request after the Offer
-	rctx = context.Background()
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	// there should be exactly 1 request coming from fetcher
 	var request *Request
@@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) {
 func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
 	requester := newMockRequester(100 * time.Millisecond)
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
-
-	peersToSkip := &sync.Map{}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+	peersToSkip := &sync.Map{}
+
 	// start the fetcher
-	go fetcher.run(ctx, peersToSkip)
+	go fetcher.run(peersToSkip)
 
 	// call Request first
-	rctx := context.Background()
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	// there should be a request coming from fetcher
 	var request *Request
@@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
 	}
 
 	// after the Request call Offer
-	fetcher.Offer(context.Background(), &sourcePeerID)
+	fetcher.Offer(&sourcePeerID)
 
 	// there should be a request coming from fetcher
 	select {
@@ -283,21 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
 func TestFetcherRetryOnTimeout(t *testing.T) {
 	requester := newMockRequester()
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
 	// set searchTimeOut to low value so the test is quicker
 	fetcher.searchTimeout = 250 * time.Millisecond
 
 	peersToSkip := &sync.Map{}
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	// start the fetcher
-	go fetcher.run(ctx, peersToSkip)
+	go fetcher.run(peersToSkip)
 
 	// call the fetch function with an active context
-	rctx := context.Background()
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	// after 100ms the first request should be initiated
 	time.Sleep(100 * time.Millisecond)
@@ -339,7 +336,7 @@ func TestFetcherFactory(t *testing.T) {
 
 	fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
 
-	fetcher.Request(context.Background(), 0)
+	fetcher.Request(0)
 
 	// check if the created fetchFunction really starts a fetcher and initiates a request
 	select {
@@ -353,7 +350,11 @@ func TestFetcherFactory(t *testing.T) {
 func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
 	requester := newMockRequester()
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
 
 	// make sure the searchTimeout is long so it is sure the request is not
 	// retried because of timeout
@@ -361,13 +362,9 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
 
 	peersToSkip := &sync.Map{}
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	go fetcher.run(ctx, peersToSkip)
+	go fetcher.run(peersToSkip)
 
-	rctx := context.Background()
-	fetcher.Request(rctx, 0)
+	fetcher.Request(0)
 
 	select {
 	case <-requester.requestC:
@@ -460,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
 func TestFetcherMaxHopCount(t *testing.T) {
 	requester := newMockRequester()
 	addr := make([]byte, 32)
-	fetcher := NewFetcher(addr, requester.doRequest, true)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	peersToSkip := &sync.Map{}
+	fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
 
-	go fetcher.run(ctx, peersToSkip)
+	peersToSkip := &sync.Map{}
 
-	rctx := context.Background()
-	fetcher.Request(rctx, maxHopCount)
+	go fetcher.run(peersToSkip)
 
 	// if hopCount is already at max no request should be initiated
 	select {
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 988afcce8..fae6994f0 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 	ctx, osp = spancontext.StartSpan(
 		ctx,
 		"retrieve.request")
-	defer osp.Finish()
 
 	s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
 	if err != nil {
@@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 	}()
 
 	go func() {
+		defer osp.Finish()
 		chunk, err := d.chunkStore.Get(ctx, req.Addr)
 		if err != nil {
 			retrieveChunkFail.Inc(1)
@@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
 	ctx, osp = spancontext.StartSpan(
 		ctx,
 		"chunk.delivery")
-	defer osp.Finish()
 
 	processReceivedChunksCount.Inc(1)
 
 	go func() {
+		defer osp.Finish()
+
 		req.peer = sp
 		err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
 		if err != nil {
@@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
 		Addr:      req.Addr,
 		SkipCheck: req.SkipCheck,
 		HopCount:  req.HopCount,
-	}, Top)
+	}, Top, "request.from.peers")
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc..de4e8a3bb 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
 			return
 		}
 		log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
-		err := p.SendPriority(ctx, msg, c.priority)
+		err := p.SendPriority(ctx, msg, c.priority, "")
 		if err != nil {
 			log.Warn("SendPriority error", "err", err)
 		}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 4bccf56f5..68da8f44a 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -65,6 +65,7 @@ type Peer struct {
 	// on creating a new client in offered hashes handler.
 	clientParams map[Stream]*clientParams
 	quit         chan struct{}
+	spans        sync.Map
 }
 
 type WrappedPriorityMsg struct {
@@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 		clients:      make(map[Stream]*client),
 		clientParams: make(map[Stream]*clientParams),
 		quit:         make(chan struct{}),
+		spans:        sync.Map{},
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 	go p.pq.Run(ctx, func(i interface{}) {
 		wmsg := i.(WrappedPriorityMsg)
+		defer p.spans.Delete(wmsg.Context)
+		sp, ok := p.spans.Load(wmsg.Context)
+		if ok {
+			defer sp.(opentracing.Span).Finish()
+		}
 		err := p.Send(wmsg.Context, wmsg.Msg)
 		if err != nil {
 			log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
@@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
 // Deliver sends a storeRequestMsg protocol message to the peer
 // Depending on the `syncing` parameter we send different message types
 func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
-	var sp opentracing.Span
 	var msg interface{}
 
 	spanName := "send.chunk.delivery"
@@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
 		}
 		spanName += ".retrieval"
 	}
-	ctx, sp = spancontext.StartSpan(
-		ctx,
-		spanName)
-	defer sp.Finish()
 
-	return p.SendPriority(ctx, msg, priority)
+	return p.SendPriority(ctx, msg, priority, spanName)
 }
 
 // SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
 	defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
 	metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
+	if traceId != "" {
+		var sp opentracing.Span
+		ctx, sp = spancontext.StartSpan(
+			ctx,
+			traceId,
+		)
+		p.spans.Store(ctx, sp)
+	}
 	wmsg := WrappedPriorityMsg{
 		Context: ctx,
 		Msg:     msg,
@@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
 		Stream:        s.stream,
 	}
 	log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
-	return p.SendPriority(ctx, msg, s.priority)
+	return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
 }
 
 func (p *Peer) getServer(s Stream) (*server, error) {
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 485a69ea2..5d3e23eb1 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
 	}
 	log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
 
-	return peer.SendPriority(context.TODO(), msg, priority)
+	return peer.SendPriority(context.TODO(), msg, priority, "")
 }
 
 func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -729,7 +729,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
 		if err != nil {
 			return err
 		}
-		if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+
+		if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
 			return err
 		}
 		if c.to > 0 && tp.Takeover.End >= c.to {
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index a8bfe2d1c..0fa5026dc 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -465,7 +465,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 		length *= r.chunkSize
 	}
 	wg.Add(1)
-	go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
+	go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
 	go func() {
 		wg.Wait()
 		close(errC)
@@ -485,7 +485,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 	return len(b), nil
 }
 
-func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
+func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
 	defer parentWg.Done()
 	// find appropriate block level
 	for chunkData.Size() < uint64(treeSize) && depth > r.depth {
@@ -533,7 +533,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
 		go func(j int64) {
 			childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
 			startTime := time.Now()
-			chunkData, err := r.getter.Get(r.ctx, Reference(childAddress))
+			chunkData, err := r.getter.Get(ctx, Reference(childAddress))
 			if err != nil {
 				metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
 				log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
@@ -554,7 +554,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
 			if soff < off {
 				soff = off
 			}
-			r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
+			r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
 		}(i)
 	} //for
 }
@@ -581,6 +581,11 @@ var errWhence = errors.New("Seek: invalid whence")
 var errOffset = errors.New("Seek: invalid offset")
 
 func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
+	cctx, sp := spancontext.StartSpan(
+		r.ctx,
+		"lcr.seek")
+	defer sp.Finish()
+
 	log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset)
 	switch whence {
 	default:
@@ -590,8 +595,9 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
 	case 1:
 		offset += r.off
 	case 2:
+
 		if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first
-			_, err := r.Size(context.TODO(), nil)
+			_, err := r.Size(cctx, nil)
 			if err != nil {
 				return 0, fmt.Errorf("can't get size: %v", err)
 			}
diff --git a/swarm/storage/feed/testutil.go b/swarm/storage/feed/testutil.go
index b513fa1f2..caa39d9ff 100644
--- a/swarm/storage/feed/testutil.go
+++ b/swarm/storage/feed/testutil.go
@@ -40,9 +40,9 @@ func (t *TestHandler) Close() {
 
 type mockNetFetcher struct{}
 
-func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) {
+func (m *mockNetFetcher) Request(hopCount uint8) {
 }
-func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
+func (m *mockNetFetcher) Offer(source *enode.ID) {
 }
 
 func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher {
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index b24d08bc2..a2595d9fa 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -34,8 +34,8 @@ type (
 )
 
 type NetFetcher interface {
-	Request(ctx context.Context, hopCount uint8)
-	Offer(ctx context.Context, source *enode.ID)
+	Request(hopCount uint8)
+	Offer(source *enode.ID)
 }
 
 // NetStore is an extension of local storage
@@ -150,7 +150,7 @@ func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Co
 		}
 		// The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
 		// if it doesn't exist yet
-		f := n.getOrCreateFetcher(ref)
+		f := n.getOrCreateFetcher(ctx, ref)
 		// If the caller needs the chunk, it has to use the returned fetch function to get it
 		return nil, f.Fetch, nil
 	}
@@ -168,7 +168,7 @@ func (n *NetStore) Has(ctx context.Context, ref Address) bool {
 // getOrCreateFetcher attempts at retrieving an existing fetchers
 // if none exists, creates one and saves it in the fetchers cache
 // caller must hold the lock
-func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
+func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher {
 	if f := n.getFetcher(ref); f != nil {
 		return f
 	}
@@ -176,7 +176,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
 	// no fetcher for the given address, we have to create a new one
 	key := hex.EncodeToString(ref)
 	// create the context during which fetching is kept alive
-	ctx, cancel := context.WithTimeout(context.Background(), fetcherTimeout)
+	cctx, cancel := context.WithTimeout(ctx, fetcherTimeout)
 	// destroy is called when all requests finish
 	destroy := func() {
 		// remove fetcher from fetchers
@@ -190,7 +190,7 @@ func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
 	// the peers which requested the chunk should not be requested to deliver it.
 	peers := &sync.Map{}
 
-	fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC)
+	fetcher := newFetcher(ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
 	n.fetchers.Add(key, fetcher)
 
 	return fetcher
@@ -278,9 +278,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
 		if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
 			return nil, err
 		}
-		f.netFetcher.Offer(rctx, &source)
+		f.netFetcher.Offer(&source)
 	} else {
-		f.netFetcher.Request(rctx, hopCount)
+		f.netFetcher.Request(hopCount)
 	}
 
 	// wait until either the chunk is delivered or the context is done
diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go
index a6a9f551a..88ec6c28f 100644
--- a/swarm/storage/netstore_test.go
+++ b/swarm/storage/netstore_test.go
@@ -46,12 +46,12 @@ type mockNetFetcher struct {
 	mu              sync.Mutex
 }
 
-func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
+func (m *mockNetFetcher) Offer(source *enode.ID) {
 	m.offerCalled = true
 	m.sources = append(m.sources, source)
 }
 
-func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) {
+func (m *mockNetFetcher) Request(hopCount uint8) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-- 
GitLab