From 3fd6db2bf63ce90232de445c7f33943406a5e634 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= <janos@users.noreply.github.com>
Date: Wed, 13 Feb 2019 13:03:23 +0100
Subject: [PATCH] swarm: fix network/stream data races (#19051)

* swarm/network/stream: newStreamerTester cleanup only if err is nil

* swarm/network/stream: raise newStreamerTester waitForPeers timeout

* swarm/network/stream: fix data races in GetPeerSubscriptions

* swarm/storage: prevent data race on LDBStore.batchesC

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461775049

* swarm/network/stream: fix TestGetSubscriptionsRPC data race

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461768477

* swarm/network/stream: correctly use Simulation.Run callback

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-461783804
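
The callback passed to Simulation.Run executes on its own goroutine, so
t.Fatal and b.Fatal must not be called from inside it; the callbacks now
return an error and the test goroutine inspects the returned result. A
minimal, self-contained sketch of that pattern, with a hypothetical run
helper standing in for the real simulation API:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // result mimics the simulation result that carries the callback error.
    type result struct{ Error error }

    // run stands in for Simulation.Run: the callback runs on another
    // goroutine and its error is reported back through the result.
    func run(ctx context.Context, fn func(ctx context.Context) error) result {
        errc := make(chan error, 1)
        go func() { errc <- fn(ctx) }()
        select {
        case err := <-errc:
            return result{Error: err}
        case <-ctx.Done():
            return result{Error: ctx.Err()}
        }
    }

    func main() {
        res := run(context.Background(), func(ctx context.Context) error {
            // Inside the callback: no t.Fatal here, just return the error.
            return errors.New("chunks not available on all nodes")
        })
        // Back on the test goroutine, where t.Fatal(res.Error) belongs.
        if res.Error != nil {
            fmt.Println("test would fail with:", res.Error)
        }
    }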

* swarm/network: protect addrCountC in Kademlia.AddrCountC function

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462273444
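
Without synchronization, two goroutines calling AddrCountC concurrently can
both observe a nil addrCountC and each allocate a channel, leaving one caller
blocked on a channel that is never written to. A minimal sketch of the
lock-guarded lazy initialization (hypothetical kad type, not the real
Kademlia):

    package main

    import (
        "fmt"
        "sync"
    )

    type kad struct {
        lock       sync.RWMutex
        addrCountC chan int
    }

    // AddrCountC creates the notification channel once, under the lock,
    // so every caller receives the same channel.
    func (k *kad) AddrCountC() <-chan int {
        k.lock.Lock()
        defer k.lock.Unlock()

        if k.addrCountC == nil {
            k.addrCountC = make(chan int)
        }
        return k.addrCountC
    }

    func main() {
        k := &kad{}
        fmt.Println(k.AddrCountC() == k.AddrCountC()) // true: one shared channel
    }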

* p2p/simulations: fix a deadlock calling getRandomNode with lock

https://github.com/ethersphere/go-ethereum/issues/1198#issuecomment-462317407
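
getRandomNode was calling the exported GetNode, which takes the network lock
already held by the caller; re-acquiring the same RWMutex can deadlock as soon
as a writer queues up between the two acquisitions, so the fix calls the
unexported getNode, which assumes the lock is held. The idiom, sketched with a
hypothetical registry type rather than the real simulations.Network:

    package main

    import (
        "fmt"
        "sync"
    )

    type registry struct {
        mu    sync.RWMutex
        nodes map[string]int
    }

    // Get is the exported entry point; it takes the read lock itself.
    func (r *registry) Get(id string) int {
        r.mu.RLock()
        defer r.mu.RUnlock()
        return r.get(id)
    }

    // get assumes the caller already holds r.mu.
    func (r *registry) get(id string) int {
        return r.nodes[id]
    }

    // First already holds the lock, so it must use get, not Get.
    func (r *registry) First(ids []string) int {
        r.mu.RLock()
        defer r.mu.RUnlock()
        if len(ids) == 0 {
            return 0
        }
        return r.get(ids[0])
    }

    func main() {
        r := &registry{nodes: map[string]int{"a": 1}}
        fmt.Println(r.Get("a"), r.First([]string{"a"}))
    }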

* swarm/network/stream: terminate disconnect goroutines in tests

* swarm/network/stream: reduce memory consumption when testing data races

* swarm/network/stream: add watchDisconnections helper function

* swarm/network/stream: add concurrent counter for tests

* swarm/network/stream: rename race/norace test files and use const

* swarm/network/stream: remove watchSim and its panic

* swarm/network/stream: pass context in watchDisconnections

* swarm/network/stream: add concurrent safe bool for watchDisconnections

* swarm/storage: fix LDBStore.batchesC data race by not closing it
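
The data race came from closing batchesC in Close while other goroutines
could still send on it, and a send on a closed channel panics. The channel is
now left open: the consumer goroutine is stopped through a separate quit
signal and the channel is reclaimed together with the store. A rough,
self-contained sketch of that shape (hypothetical names, not the actual
LDBStore internals):

    package main

    import (
        "fmt"
        "sync"
    )

    type store struct {
        batchesC chan struct{}
        quit     chan struct{}
        wg       sync.WaitGroup
    }

    func newStore() *store {
        s := &store{
            batchesC: make(chan struct{}, 1),
            quit:     make(chan struct{}),
        }
        s.wg.Add(1)
        go func() {
            defer s.wg.Done()
            for {
                select {
                case <-s.batchesC:
                    // write out the current batch
                case <-s.quit:
                    return
                }
            }
        }()
        return s
    }

    // signal requests a batch write; batchesC is never closed.
    func (s *store) signal() {
        select {
        case s.batchesC <- struct{}{}:
        default: // a write is already pending
        }
    }

    func (s *store) Close() {
        close(s.quit) // stop the consumer, leave batchesC open
        s.wg.Wait()
    }

    func main() {
        s := newStore()
        s.signal()
        s.Close()
        fmt.Println("closed without touching batchesC")
    }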
---
 p2p/simulations/network.go                    |  2 +-
 swarm/network/kademlia.go                     |  3 +
 swarm/network/stream/common_test.go           | 62 ++++++++++++++-
 swarm/network/stream/delivery_test.go         | 75 +++++--------------
 swarm/network/stream/intervals_test.go        | 30 ++------
 swarm/network/stream/lightnode_test.go        |  8 +-
 swarm/network/stream/norace_test.go           | 24 ++++++
 swarm/network/stream/race_test.go             | 23 ++++++
 swarm/network/stream/snapshot_sync_test.go    | 58 ++++++++------
 swarm/network/stream/stream.go                |  6 ++
 swarm/network/stream/streamer_test.go         | 70 +++++++++++------
 swarm/network/stream/syncer_test.go           | 65 +++++++++-------
 .../visualized_snapshot_sync_sim_test.go      | 44 +++--------
 swarm/storage/ldbstore.go                     |  1 -
 14 files changed, 274 insertions(+), 197 deletions(-)
 create mode 100644 swarm/network/stream/norace_test.go
 create mode 100644 swarm/network/stream/race_test.go

diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go
index c2a3b9647..d99633c9d 100644
--- a/p2p/simulations/network.go
+++ b/p2p/simulations/network.go
@@ -461,7 +461,7 @@ func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
 	if l == 0 {
 		return nil
 	}
-	return net.GetNode(filtered[rand.Intn(l)])
+	return net.getNode(filtered[rand.Intn(l)])
 }
 
 func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index 9f4245603..1193e3b65 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -353,6 +353,9 @@ func (k *Kademlia) sendNeighbourhoodDepthChange() {
 // Not receiving from the returned channel will block Register function
 // when address count value changes.
 func (k *Kademlia) AddrCountC() <-chan int {
+	k.lock.Lock()
+	defer k.lock.Unlock()
+
 	if k.addrCountC == nil {
 		k.addrCountC = make(chan int)
 	}
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 8a7d851fb..afd08d275 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -151,7 +151,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
 	// temp datadir
 	datadir, err := ioutil.TempDir("", "streamer")
 	if err != nil {
-		return nil, nil, nil, func() {}, err
+		return nil, nil, nil, nil, err
 	}
 	removeDataDir := func() {
 		os.RemoveAll(datadir)
@@ -163,12 +163,14 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
 
 	localStore, err := storage.NewTestLocalStoreForAddr(params)
 	if err != nil {
-		return nil, nil, nil, removeDataDir, err
+		removeDataDir()
+		return nil, nil, nil, nil, err
 	}
 
 	netStore, err := storage.NewNetStore(localStore, nil)
 	if err != nil {
-		return nil, nil, nil, removeDataDir, err
+		removeDataDir()
+		return nil, nil, nil, nil, err
 	}
 
 	delivery := NewDelivery(to, netStore)
@@ -180,8 +182,9 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
 	}
 	protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)
 
-	err = waitForPeers(streamer, 1*time.Second, 1)
+	err = waitForPeers(streamer, 10*time.Second, 1)
 	if err != nil {
+		teardown()
 		return nil, nil, nil, nil, errors.New("timeout: peer is not created")
 	}
 
@@ -317,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch
 	}
 	return store, datadir, nil
 }
+
+// watchDisconnections receives simulation peer events in a new goroutine and sets the returned
+// disconnected boolean to true in case of a disconnect event.
+func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
+	log.Debug("Watching for disconnections")
+	disconnections := sim.PeerEvents(
+		ctx,
+		sim.NodeIDs(),
+		simulation.NewPeerEventsFilter().Drop(),
+	)
+	disconnected = new(boolean)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case d := <-disconnections:
+				if d.Error != nil {
+					log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
+				} else {
+					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
+				}
+				disconnected.set(true)
+			}
+		}
+	}()
+	return disconnected
+}
+
+// boolean is used to concurrently set
+// and read a boolean value.
+type boolean struct {
+	v  bool
+	mu sync.RWMutex
+}
+
+// set sets the value.
+func (b *boolean) set(v bool) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	b.v = v
+}
+
+// bool reads the value.
+func (b *boolean) bool() bool {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	return b.v
+}
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index cb7690f3e..e5821df4f 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -48,10 +47,10 @@ func TestStreamerRetrieveRequest(t *testing.T) {
 		Syncing:   SyncingDisabled,
 	}
 	tester, streamer, _, teardown, err := newStreamerTester(regOpts)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	node := tester.Nodes[0]
 
@@ -100,10 +99,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
 		Retrieval: RetrievalEnabled,
 		Syncing:   SyncingDisabled, //do no syncing
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	node := tester.Nodes[0]
 
@@ -172,10 +171,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
 		Retrieval: RetrievalEnabled,
 		Syncing:   SyncingDisabled,
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	node := tester.Nodes[0]
 
@@ -362,10 +361,10 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
 		Retrieval: RetrievalDisabled,
 		Syncing:   SyncingDisabled,
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
 		return &testClient{
@@ -485,7 +484,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
 	}
 
 	log.Info("Starting simulation")
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
 		nodeIDs := sim.UpNodeIDs()
 		//determine the pivot node to be the first node of the simulation
@@ -548,27 +548,10 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
 			retErrC <- err
 		}()
 
-		log.Debug("Watching for disconnections")
-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
-
-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
-				}
-			}
-		}()
+		disconnected := watchDisconnections(ctx, sim)
 		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
-				}
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
 			}
 		}()
 
@@ -589,7 +572,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
 			return fmt.Errorf("Test failed, chunks not available on all nodes")
 		}
 		if err := <-retErrC; err != nil {
-			t.Fatalf("requesting chunks: %v", err)
+			return fmt.Errorf("requesting chunks: %v", err)
 		}
 		log.Debug("Test terminated successfully")
 		return nil
@@ -657,21 +640,22 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 		b.Fatal(err)
 	}
 
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
 		nodeIDs := sim.UpNodeIDs()
 		node := nodeIDs[len(nodeIDs)-1]
 
 		item, ok := sim.NodeItem(node, bucketKeyFileStore)
 		if !ok {
-			b.Fatal("No filestore")
+			return errors.New("No filestore")
 		}
 		remoteFileStore := item.(*storage.FileStore)
 
 		pivotNode := nodeIDs[0]
 		item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
 		if !ok {
-			b.Fatal("No filestore")
+			return errors.New("No filestore")
 		}
 		netStore := item.(*storage.NetStore)
 
@@ -679,26 +663,10 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 			return err
 		}
 
-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
-
-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
-				}
-			}
-		}()
+		disconnected := watchDisconnections(ctx, sim)
 		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
-				}
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
 			}
 		}()
 		// benchmark loop
@@ -713,12 +681,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 				ctx := context.TODO()
 				hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
 				if err != nil {
-					b.Fatalf("expected no error. got %v", err)
+					return fmt.Errorf("store: %v", err)
 				}
 				// wait until all chunks stored
 				err = wait(ctx)
 				if err != nil {
-					b.Fatalf("expected no error. got %v", err)
+					return fmt.Errorf("wait store: %v", err)
 				}
 				// collect the hashes
 				hashes[i] = hash
@@ -754,10 +722,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 				break Loop
 			}
 		}
-		if err != nil {
-			b.Fatal(err)
-		}
-		return nil
+		return err
 	})
 	if result.Error != nil {
 		b.Fatal(result.Error)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 248ba0c84..009a941ef 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -22,7 +22,6 @@ import (
 	"errors"
 	"fmt"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -118,13 +117,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 
 		_, wait, err := fileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
 		if err != nil {
-			log.Error("Store error: %v", "err", err)
-			t.Fatal(err)
+			return fmt.Errorf("store: %v", err)
 		}
 		err = wait(ctx)
 		if err != nil {
-			log.Error("Wait error: %v", "err", err)
-			t.Fatal(err)
+			return fmt.Errorf("wait store: %v", err)
 		}
 
 		item, ok = sim.NodeItem(checker, bucketKeyRegistry)
@@ -136,32 +133,15 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 		liveErrC := make(chan error)
 		historyErrC := make(chan error)
 
-		log.Debug("Watching for disconnections")
-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
-
 		err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
 		if err != nil {
 			return err
 		}
 
-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
-				}
-			}
-		}()
+		disconnected := watchDisconnections(ctx, sim)
 		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
-				}
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
 			}
 		}()
 
diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go
index 043f2ea77..501660fab 100644
--- a/swarm/network/stream/lightnode_test.go
+++ b/swarm/network/stream/lightnode_test.go
@@ -29,10 +29,10 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
 		Syncing:   SyncingDisabled,
 	}
 	tester, _, _, teardown, err := newStreamerTester(registryOptions)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	node := tester.Nodes[0]
 
@@ -68,10 +68,10 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
 		Syncing:   SyncingDisabled,
 	}
 	tester, _, _, teardown, err := newStreamerTester(registryOptions)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	node := tester.Nodes[0]
 
@@ -112,10 +112,10 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
 		Syncing:   SyncingRegisterOnly,
 	}
 	tester, _, _, teardown, err := newStreamerTester(registryOptions)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	node := tester.Nodes[0]
 
@@ -157,10 +157,10 @@ func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
 		Syncing:   SyncingDisabled,
 	}
 	tester, _, _, teardown, err := newStreamerTester(registryOptions)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	node := tester.Nodes[0]
 
diff --git a/swarm/network/stream/norace_test.go b/swarm/network/stream/norace_test.go
new file mode 100644
index 000000000..b324f6939
--- /dev/null
+++ b/swarm/network/stream/norace_test.go
@@ -0,0 +1,24 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build !race
+
+package stream
+
+// Provide a flag to reduce the scope of tests when running them
+// with race detector. Some of the tests are doing a lot of allocations
+// on the heap, and race detector uses much more memory to track them.
+const raceTest = false
diff --git a/swarm/network/stream/race_test.go b/swarm/network/stream/race_test.go
new file mode 100644
index 000000000..8aed3542b
--- /dev/null
+++ b/swarm/network/stream/race_test.go
@@ -0,0 +1,23 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build race
+
+package stream
+
+// Reduce the scope of some tests when running with race detector,
+// as it raises the memory consumption significantly.
+const raceTest = true
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index c32ed7d07..b45d0aed5 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -17,11 +17,12 @@ package stream
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"runtime"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -92,6 +93,15 @@ func TestSyncingViaGlobalSync(t *testing.T) {
 		if *longrunning {
 			chnkCnt = []int{1, 8, 32, 256, 1024}
 			nodeCnt = []int{16, 32, 64, 128, 256}
+		} else if raceTest {
+			// TestSyncingViaGlobalSync allocates a lot of memory
+			// with race detector. By reducing the number of chunks
+			// and nodes, memory consumption is lower and data races
+			// are still checked, while correctness of syncing is
+			// tested with more chunks and nodes in regular (!race)
+			// tests.
+			chnkCnt = []int{4}
+			nodeCnt = []int{16}
 		} else {
 			//default test
 			chnkCnt = []int{4, 32}
@@ -113,7 +123,23 @@ var simServiceMap = map[string]simulation.ServiceFunc{
 			return nil, nil, err
 		}
 
-		r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+		var dir string
+		var store *state.DBStore
+		if raceTest {
+			// Use on-disk DBStore to reduce memory consumption in race tests.
+			dir, err = ioutil.TempDir("", "swarm-stream-")
+			if err != nil {
+				return nil, nil, err
+			}
+			store, err = state.NewDBStore(dir)
+			if err != nil {
+				return nil, nil, err
+			}
+		} else {
+			store = state.NewInmemoryStore()
+		}
+
+		r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
 			Retrieval:       RetrievalDisabled,
 			Syncing:         SyncingAutoSubscribe,
 			SyncUpdateDelay: 3 * time.Second,
@@ -156,36 +182,24 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 		t.Fatal(err)
 	}
 
-	disconnections := sim.PeerEvents(
-		context.Background(),
-		sim.NodeIDs(),
-		simulation.NewPeerEventsFilter().Drop(),
-	)
-
-	var disconnected atomic.Value
-	go func() {
-		for d := range disconnections {
-			if d.Error != nil {
-				log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-				disconnected.Store(true)
-			}
-		}
-	}()
-
 	result := runSim(conf, ctx, sim, chunkCount)
 
 	if result.Error != nil {
 		t.Fatal(result.Error)
 	}
-	if yes, ok := disconnected.Load().(bool); ok && yes {
-		t.Fatal("disconnect events received")
-	}
 	log.Info("Simulation ended")
 }
 
 func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result {
 
-	return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+	return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+		disconnected := watchDisconnections(ctx, sim)
+		defer func() {
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
+			}
+		}()
+
 		nodeIDs := sim.UpNodeIDs()
 		for _, n := range nodeIDs {
 			//get the kademlia overlay address from this ID
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 5d3e23eb1..65bcce8b9 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -939,16 +939,22 @@ It returns a map of node IDs with an array of string representations of Stream o
 func (api *API) GetPeerSubscriptions() map[string][]string {
 	//create the empty map
 	pstreams := make(map[string][]string)
+
 	//iterate all streamer peers
+	api.streamer.peersMu.RLock()
+	defer api.streamer.peersMu.RUnlock()
+
 	for id, p := range api.streamer.peers {
 		var streams []string
 		//every peer has a map of stream servers
 		//every stream server represents a subscription
+		p.serverMu.RLock()
 		for s := range p.servers {
 			//append the string representation of the stream
 			//to the list for this peer
 			streams = append(streams, s.String())
 		}
+		p.serverMu.RUnlock()
 		//set the array of stream servers to the map
 		pstreams[id.String()] = streams
 	}
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index c2aee61b7..e92ee3783 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -41,10 +41,10 @@ import (
 
 func TestStreamerSubscribe(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	stream := NewStream("foo", "", true)
 	err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
@@ -55,10 +55,10 @@ func TestStreamerSubscribe(t *testing.T) {
 
 func TestStreamerRequestSubscription(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	stream := NewStream("foo", "", false)
 	err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
@@ -146,10 +146,10 @@ func (self *testServer) Close() {
 
 func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
 		return newTestClient(t), nil
@@ -239,10 +239,10 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
 
 func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	stream := NewStream("foo", "", false)
 
@@ -306,10 +306,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
 
 func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	stream := NewStream("foo", "", true)
 
@@ -372,10 +372,10 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
 
 func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 		return newTestServer(t, 0), nil
@@ -416,10 +416,10 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
 
 func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	stream := NewStream("foo", "", true)
 
@@ -479,10 +479,10 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
 
 func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	stream := NewStream("foo", "", true)
 
@@ -544,10 +544,10 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
 
 func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	stream := NewStream("foo", "", true)
 
@@ -643,10 +643,10 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
 
 func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(nil)
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 		return newTestServer(t, 10), nil
@@ -780,10 +780,10 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
 		Syncing:        SyncingDisabled,
 		MaxPeerServers: maxPeerServers,
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 		return newTestServer(t, 0), nil
@@ -854,10 +854,10 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
 	tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
 		MaxPeerServers: maxPeerServers,
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
 		return newTestServer(t, 0), nil
@@ -940,10 +940,10 @@ func TestHasPriceImplementation(t *testing.T) {
 		Retrieval: RetrievalDisabled,
 		Syncing:   SyncingDisabled,
 	})
-	defer teardown()
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer teardown()
 
 	if r.prices == nil {
 		t.Fatal("No prices implementation available for the stream protocol")
@@ -1177,6 +1177,7 @@ starts the simulation, waits for SyncUpdateDelay in order to kick off
 stream registration, then tests that there are subscriptions.
 */
 func TestGetSubscriptionsRPC(t *testing.T) {
+
 	// arbitrarily set to 4
 	nodeCount := 4
 	// run with more nodes if `longrunning` flag is set
@@ -1188,19 +1189,16 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 	// holds the msg code for SubscribeMsg
 	var subscribeMsgCode uint64
 	var ok bool
-	var expectedMsgCount = 0
+	var expectedMsgCount counter
 
 	// this channel signalizes that the expected amount of subscriptiosn is done
 	allSubscriptionsDone := make(chan struct{})
-	lock := sync.RWMutex{}
 	// after the test, we need to reset the subscriptionFunc to the default
 	defer func() { subscriptionFunc = doRequestSubscription }()
 
 	// we use this subscriptionFunc for this test: just increases count and calls the actual subscription
 	subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
-		lock.Lock()
-		expectedMsgCount++
-		lock.Unlock()
+		expectedMsgCount.inc()
 		doRequestSubscription(r, p, bin, subs)
 		return true
 	}
@@ -1290,24 +1288,24 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 		select {
 		case <-allSubscriptionsDone:
 		case <-ctx.Done():
-			t.Fatal("Context timed out")
+			return errors.New("Context timed out")
 		}
 
-		log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount)
+		log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
 		//now iterate again, this time we call each node via RPC to get its subscriptions
 		realCount := 0
 		for _, node := range nodes {
 			//create rpc client
 			client, err := node.Client()
 			if err != nil {
-				t.Fatalf("create node 1 rpc client fail: %v", err)
+				return fmt.Errorf("create node 1 rpc client fail: %v", err)
 			}
 
 			//ask it for subscriptions
 			pstreams := make(map[string][]string)
 			err = client.Call(&pstreams, "stream_getPeerSubscriptions")
 			if err != nil {
-				t.Fatal(err)
+				return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
 			}
 			//length of the subscriptions can not be smaller than number of peers
 			log.Debug("node subscriptions", "node", node.String())
@@ -1324,8 +1322,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 			}
 		}
 		// every node is mutually subscribed to each other, so the actual count is half of it
-		if realCount/2 != expectedMsgCount {
-			return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, expectedMsgCount)
+		emc := expectedMsgCount.count()
+		if realCount/2 != emc {
+			return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
 		}
 		return nil
 	})
@@ -1333,3 +1332,26 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 		t.Fatal(result.Error)
 	}
 }
+
+// counter is used to concurrently increment
+// and read an integer value.
+type counter struct {
+	v  int
+	mu sync.RWMutex
+}
+
+// Increment the counter.
+func (c *counter) inc() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.v++
+}
+
+// Read the counter value.
+func (c *counter) count() int {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	return c.v
+}
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index 5656963d9..be0752a9d 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -22,8 +22,8 @@ import (
 	"fmt"
 	"io/ioutil"
 	"math"
+	"os"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -44,9 +44,15 @@ const dataChunkCount = 200
 
 func TestSyncerSimulation(t *testing.T) {
 	testSyncBetweenNodes(t, 2, dataChunkCount, true, 1)
-	testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
-	testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
-	testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
+	// This test uses much more memory when running with
+	// race detector. Allow it to finish successfully by
+	// reducing its scope, and still check for data races
+	// with the smallest number of nodes.
+	if !raceTest {
+		testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
+		testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
+		testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
+	}
 }
 
 func createMockStore(globalStore mock.GlobalStorer, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) {
@@ -80,7 +86,23 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 				return nil, nil, err
 			}
 
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+			var dir string
+			var store *state.DBStore
+			if raceTest {
+				// Use on-disk DBStore to reduce memory consumption in race tests.
+				dir, err = ioutil.TempDir("", "swarm-stream-")
+				if err != nil {
+					return nil, nil, err
+				}
+				store, err = state.NewDBStore(dir)
+				if err != nil {
+					return nil, nil, err
+				}
+			} else {
+				store = state.NewInmemoryStore()
+			}
+
+			r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
 				Retrieval: RetrievalDisabled,
 				Syncing:   SyncingAutoSubscribe,
 				SkipCheck: skipCheck,
@@ -89,6 +111,9 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 			cleanup = func() {
 				r.Close()
 				clean()
+				if dir != "" {
+					os.RemoveAll(dir)
+				}
 			}
 
 			return r, cleanup, nil
@@ -114,26 +139,10 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 			nodeIndex[id] = i
 		}
 
-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
-
-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
-				}
-			}
-		}()
+		disconnected := watchDisconnections(ctx, sim)
 		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
-				}
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
 			}
 		}()
 
@@ -142,7 +151,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 			id := nodeIDs[j]
 			client, err := sim.Net.GetNode(id).Client()
 			if err != nil {
-				t.Fatal(err)
+				return fmt.Errorf("node %s client: %v", id, err)
 			}
 			sid := nodeIDs[j+1]
 			client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
@@ -158,7 +167,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 				size := chunkCount * chunkSize
 				_, wait, err := fileStore.Store(ctx, testutil.RandomReader(j, size), int64(size), false)
 				if err != nil {
-					t.Fatal(err.Error())
+					return fmt.Errorf("fileStore.Store: %v", err)
 				}
 				wait(ctx)
 			}
@@ -273,7 +282,7 @@ func TestSameVersionID(t *testing.T) {
 
 		//the peers should connect, thus getting the peer should not return nil
 		if registry.getPeer(nodes[1]) == nil {
-			t.Fatal("Expected the peer to not be nil, but it is")
+			return errors.New("Expected the peer to not be nil, but it is")
 		}
 		return nil
 	})
@@ -338,7 +347,7 @@ func TestDifferentVersionID(t *testing.T) {
 
 		//getting the other peer should fail due to the different version numbers
 		if registry.getPeer(nodes[1]) != nil {
-			t.Fatal("Expected the peer to be nil, but it is not")
+			return errors.New("Expected the peer to be nil, but it is not")
 		}
 		return nil
 	})
diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
index 3694dd311..2e091f991 100644
--- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go
+++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
@@ -66,31 +66,6 @@ func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulati
 	return nodeCount, chunkCount, sim
 }
 
-//watch for disconnections and wait for healthy
-func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) {
-	ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
-
-	if _, err := sim.WaitTillHealthy(ctx); err != nil {
-		panic(err)
-	}
-
-	disconnections := sim.PeerEvents(
-		context.Background(),
-		sim.NodeIDs(),
-		simulation.NewPeerEventsFilter().Drop(),
-	)
-
-	go func() {
-		for d := range disconnections {
-			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-			panic("unexpected disconnect")
-			cancelSimRun()
-		}
-	}()
-
-	return ctx, cancelSimRun
-}
-
 //This test requests bogus hashes into the network
 func TestNonExistingHashesWithServer(t *testing.T) {
 
@@ -102,19 +77,23 @@ func TestNonExistingHashesWithServer(t *testing.T) {
 		panic(err)
 	}
 
-	ctx, cancelSimRun := watchSim(sim)
-	defer cancelSimRun()
-
 	//in order to get some meaningful visualization, it is beneficial
 	//to define a minimum duration of this test
 	testDuration := 20 * time.Second
 
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+		disconnected := watchDisconnections(ctx, sim)
+		defer func() {
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
+			}
+		}()
+
 		//check on the node's FileStore (netstore)
 		id := sim.Net.GetRandomUpNode().ID()
 		item, ok := sim.NodeItem(id, bucketKeyFileStore)
 		if !ok {
-			t.Fatalf("No filestore")
+			return errors.New("No filestore")
 		}
 		fileStore := item.(*storage.FileStore)
 		//create a bogus hash
@@ -213,9 +192,6 @@ func TestSnapshotSyncWithServer(t *testing.T) {
 		panic(err)
 	}
 
-	ctx, cancelSimRun := watchSim(sim)
-	defer cancelSimRun()
-
 	//run the sim
 	result := runSim(conf, ctx, sim, chunkCount)
 
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index a2f24eff0..f98809fc6 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -1049,7 +1049,6 @@ func (s *LDBStore) Close() {
 	s.lock.Unlock()
 	// force writing out current batch
 	s.writeCurrentBatch()
-	close(s.batchesC)
 	s.db.Close()
 }
 
-- 
GitLab