From 597597e8b27ee60a25b4533771702892e72898a5 Mon Sep 17 00:00:00 2001
From: Anton Evangelatov <anton.evangelatov@gmail.com>
Date: Fri, 1 Feb 2019 09:58:46 +0100
Subject: [PATCH] swarm/network: refactor simulation tests bootstrap (#18975)

---
 swarm/network/stream/common_test.go           |  77 ++++++++++++++
 swarm/network/stream/delivery_test.go         |  51 ++-------
 swarm/network/stream/intervals_test.go        |  28 ++---
 .../network/stream/snapshot_retrieval_test.go |  52 +++------
 swarm/network/stream/snapshot_sync_test.go    |  51 ++++-----
 swarm/network/stream/streamer_test.go         |  24 ++---
 swarm/network/stream/syncer_test.go           | 100 ++++--------------
 swarm/network/stream/testing/snapshot_4.json  |   2 +-
 .../visualized_snapshot_sync_sim_test.go      |  18 +---
 9 files changed, 156 insertions(+), 247 deletions(-)

diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 7b2962608..3b6e4a946 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -26,16 +26,19 @@ import (
 	"math/rand"
 	"os"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
 	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
+	mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
 	"github.com/ethereum/go-ethereum/swarm/testutil"
 	colorable "github.com/mattn/go-colorable"
 )
@@ -66,6 +69,80 @@ func init() {
 	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
 }
 
+// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
+func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+	addr := network.NewAddr(ctx.Config.Node())
+
+	netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+	if err != nil {
+		return nil, nil, nil, nil, err
+	}
+
+	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+	return addr, netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
+func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+	netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+	return netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
+func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+	addr := network.NewAddr(ctx.Config.Node())
+
+	netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+	if err != nil {
+		return nil, nil, nil, nil, err
+	}
+
+	netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
+
+	return addr, netStore, delivery, cleanup, nil
+}
+
+func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+	n := ctx.Config.Node()
+
+	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+	if *useMockStore {
+		store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
+	}
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	localStore := store.(*storage.LocalStore)
+	netStore, err := storage.NewNetStore(localStore, nil)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+
+	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+	delivery := NewDelivery(kad, netStore)
+
+	bucket.Store(bucketKeyStore, store)
+	bucket.Store(bucketKeyDB, netStore)
+	bucket.Store(bucketKeyDelivery, delivery)
+	bucket.Store(bucketKeyFileStore, fileStore)
+
+	cleanup := func() {
+		netStore.Close()
+		os.RemoveAll(datadir)
+	}
+
+	return netStore, delivery, cleanup, nil
+}
+
 func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
 	// setup
 	addr := network.RandomAddr() // tested peers peer address
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 6f1ddc659..cb7690f3e 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"os"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -457,27 +456,11 @@ func TestDeliveryFromNodes(t *testing.T) {
 func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				store.Close()
-			}
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
+			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
 			if err != nil {
 				return nil, nil, err
 			}
 
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				SkipCheck: skipCheck,
 				Syncing:   SyncingDisabled,
@@ -485,11 +468,12 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
 			}, nil)
 			bucket.Store(bucketKeyRegistry, r)
 
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
+			cleanup = func() {
+				r.Close()
+				clean()
+			}
 
 			return r, cleanup, nil
-
 		},
 	})
 	defer sim.Close()
@@ -644,25 +628,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
 func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
+			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
 			if err != nil {
 				return nil, nil, err
 			}
-			bucket.Store(bucketKeyStore, store)
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				store.Close()
-			}
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
 
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				SkipCheck:       skipCheck,
@@ -670,12 +639,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
 				Retrieval:       RetrievalDisabled,
 				SyncUpdateDelay: 0,
 			}, nil)
+			bucket.Store(bucketKeyRegistry, r)
 
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
+			cleanup = func() {
+				r.Close()
+				clean()
+			}
 
 			return r, cleanup, nil
-
 		},
 	})
 	defer sim.Close()
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 8f2bed9d6..248ba0c84 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -21,7 +21,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"os"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -31,7 +30,6 @@ import (
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
-	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
@@ -62,26 +60,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 	externalStreamMaxKeys := uint64(100)
 
 	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			n := ctx.Config.Node()
-			addr := network.NewAddr(n)
-			store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+		"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
 			if err != nil {
 				return nil, nil, err
 			}
-			bucket.Store(bucketKeyStore, store)
-			cleanup = func() {
-				store.Close()
-				os.RemoveAll(datadir)
-			}
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
 
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				Retrieval: RetrievalDisabled,
@@ -97,11 +80,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 				return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
 			})
 
-			fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
+			cleanup := func() {
+				r.Close()
+				clean()
+			}
 
 			return r, cleanup, nil
-
 		},
 	})
 	defer sim.Close()
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index d345ac8d0..f097e4180 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -18,7 +18,6 @@ package stream
 import (
 	"context"
 	"fmt"
-	"os"
 	"sync"
 	"testing"
 	"time"
@@ -27,7 +26,6 @@ import (
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethereum/go-ethereum/swarm/log"
-	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
@@ -105,43 +103,25 @@ func TestRetrieval(t *testing.T) {
 }
 
 var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
-	"streamer": retrievalStreamerFunc,
-}
-
-func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-	n := ctx.Config.Node()
-	addr := network.NewAddr(n)
-	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
-	if err != nil {
-		return nil, nil, err
-	}
-	bucket.Store(bucketKeyStore, store)
-
-	localStore := store.(*storage.LocalStore)
-	netStore, err := storage.NewNetStore(localStore, nil)
-	if err != nil {
-		return nil, nil, err
-	}
-	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-	delivery := NewDelivery(kad, netStore)
-	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-	r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-		Retrieval:       RetrievalEnabled,
-		Syncing:         SyncingAutoSubscribe,
-		SyncUpdateDelay: 3 * time.Second,
-	}, nil)
+	"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+		addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+		if err != nil {
+			return nil, nil, err
+		}
 
-	fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-	bucket.Store(bucketKeyFileStore, fileStore)
+		r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+			Retrieval:       RetrievalEnabled,
+			Syncing:         SyncingAutoSubscribe,
+			SyncUpdateDelay: 3 * time.Second,
+		}, nil)
 
-	cleanup = func() {
-		os.RemoveAll(datadir)
-		netStore.Close()
-		r.Close()
-	}
+		cleanup = func() {
+			r.Close()
+			clean()
+		}
 
-	return r, cleanup, nil
+		return r, cleanup, nil
+	},
 }
 
 /*
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 6af19c12a..c32ed7d07 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -107,42 +107,27 @@ func TestSyncingViaGlobalSync(t *testing.T) {
 }
 
 var simServiceMap = map[string]simulation.ServiceFunc{
-	"streamer": streamerFunc,
-}
+	"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+		addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
+		if err != nil {
+			return nil, nil, err
+		}
 
-func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-	n := ctx.Config.Node()
-	addr := network.NewAddr(n)
-	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
-	if err != nil {
-		return nil, nil, err
-	}
-	bucket.Store(bucketKeyStore, store)
-	localStore := store.(*storage.LocalStore)
-	netStore, err := storage.NewNetStore(localStore, nil)
-	if err != nil {
-		return nil, nil, err
-	}
-	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-	delivery := NewDelivery(kad, netStore)
-	netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
-
-	r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-		Retrieval:       RetrievalDisabled,
-		Syncing:         SyncingAutoSubscribe,
-		SyncUpdateDelay: 3 * time.Second,
-	}, nil)
-
-	bucket.Store(bucketKeyRegistry, r)
-
-	cleanup = func() {
-		os.RemoveAll(datadir)
-		netStore.Close()
-		r.Close()
-	}
+		r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+			Retrieval:       RetrievalDisabled,
+			Syncing:         SyncingAutoSubscribe,
+			SyncUpdateDelay: 3 * time.Second,
+		}, nil)
 
-	return r, cleanup, nil
+		bucket.Store(bucketKeyRegistry, r)
+
+		cleanup = func() {
+			r.Close()
+			clean()
+		}
 
+		return r, cleanup, nil
+	},
 }
 
 func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index b83521f06..c2aee61b7 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -37,7 +36,6 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
-	"github.com/ethereum/go-ethereum/swarm/storage"
 	"golang.org/x/crypto/sha3"
 )
 
@@ -1209,26 +1207,18 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 	// create a standard sim
 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			n := ctx.Config.Node()
-			addr := network.NewAddr(n)
-			store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+			addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
 			if err != nil {
 				return nil, nil, err
 			}
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
+
 			// configure so that sync registrations actually happen
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				Retrieval:       RetrievalEnabled,
 				Syncing:         SyncingAutoSubscribe, //enable sync registrations
 				SyncUpdateDelay: syncUpdateDelay,
 			}, nil)
+
 			// get the SubscribeMsg code
 			subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
 			if !ok {
@@ -1236,13 +1226,11 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 			}
 
 			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
 				r.Close()
+				clean()
 			}
 
 			return r, cleanup, nil
-
 		},
 	})
 	defer sim.Close()
@@ -1322,9 +1310,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
 				t.Fatal(err)
 			}
 			//length of the subscriptions can not be smaller than number of peers
-			log.Debug("node subscriptions:", "node", node.String())
+			log.Debug("node subscriptions", "node", node.String())
 			for p, ps := range pstreams {
-				log.Debug("... with: ", "peer", p)
+				log.Debug("... with", "peer", p)
 				for _, s := range ps {
 					log.Debug(".......", "stream", s)
 					// each node also has subscriptions to RETRIEVE_REQUEST streams,
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index 014ec9a98..5656963d9 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -22,7 +22,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"math"
-	"os"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -38,7 +37,6 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
 	"github.com/ethereum/go-ethereum/swarm/storage/mock"
-	mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
 	"github.com/ethereum/go-ethereum/swarm/testutil"
 )
 
@@ -73,38 +71,14 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 
 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			var store storage.ChunkStore
-			var datadir string
-
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
+			addr := network.NewAddr(ctx.Config.Node())
 			//hack to put addresses in same space
 			addr.OAddr[0] = byte(0)
 
-			if *useMockStore {
-				store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr)
-			} else {
-				store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
-			}
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-			cleanup = func() {
-				store.Close()
-				os.RemoveAll(datadir)
-			}
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
+			netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
 			if err != nil {
 				return nil, nil, err
 			}
-			bucket.Store(bucketKeyDB, netStore)
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-			bucket.Store(bucketKeyDelivery, delivery)
 
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				Retrieval: RetrievalDisabled,
@@ -112,11 +86,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 				SkipCheck: skipCheck,
 			}, nil)
 
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
+			cleanup = func() {
+				r.Close()
+				clean()
+			}
 
 			return r, cleanup, nil
-
 		},
 	})
 	defer sim.Close()
@@ -251,44 +226,26 @@ func TestSameVersionID(t *testing.T) {
 	v := uint(1)
 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			var store storage.ChunkStore
-			var datadir string
-
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-
-			store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
+			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
 			if err != nil {
 				return nil, nil, err
 			}
-			bucket.Store(bucketKeyStore, store)
-			cleanup = func() {
-				store.Close()
-				os.RemoveAll(datadir)
-			}
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyDB, netStore)
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-			bucket.Store(bucketKeyDelivery, delivery)
 
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				Retrieval: RetrievalDisabled,
 				Syncing:   SyncingAutoSubscribe,
 			}, nil)
+			bucket.Store(bucketKeyRegistry, r)
+
 			//assign to each node the same version ID
 			r.spec.Version = v
 
-			bucket.Store(bucketKeyRegistry, r)
+			cleanup = func() {
+				r.Close()
+				clean()
+			}
 
 			return r, cleanup, nil
-
 		},
 	})
 	defer sim.Close()
@@ -333,46 +290,27 @@ func TestDifferentVersionID(t *testing.T) {
 	v := uint(0)
 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			var store storage.ChunkStore
-			var datadir string
-
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-
-			store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
+			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
 			if err != nil {
 				return nil, nil, err
 			}
-			bucket.Store(bucketKeyStore, store)
-			cleanup = func() {
-				store.Close()
-				os.RemoveAll(datadir)
-			}
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyDB, netStore)
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-			bucket.Store(bucketKeyDelivery, delivery)
 
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				Retrieval: RetrievalDisabled,
 				Syncing:   SyncingAutoSubscribe,
 			}, nil)
+			bucket.Store(bucketKeyRegistry, r)
 
 			//increase the version ID for each node
 			v++
 			r.spec.Version = v
 
-			bucket.Store(bucketKeyRegistry, r)
+			cleanup = func() {
+				r.Close()
+				clean()
+			}
 
 			return r, cleanup, nil
-
 		},
 	})
 	defer sim.Close()
diff --git a/swarm/network/stream/testing/snapshot_4.json b/swarm/network/stream/testing/snapshot_4.json
index a64f31375..a8b617407 100644
--- a/swarm/network/stream/testing/snapshot_4.json
+++ b/swarm/network/stream/testing/snapshot_4.json
@@ -1 +1 @@
-{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]}
\ No newline at end of file
+{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]}
diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
index 3b7d0d743..3694dd311 100644
--- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go
+++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
@@ -24,7 +24,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"os"
 	"sync"
 	"testing"
 	"time"
@@ -37,7 +36,6 @@ import (
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/swarm/log"
-	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
@@ -169,21 +167,10 @@ func TestSnapshotSyncWithServer(t *testing.T) {
 
 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			n := ctx.Config.Node()
-			addr := network.NewAddr(n)
-			store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+			addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
 			if err != nil {
 				return nil, nil, err
 			}
-			bucket.Store(bucketKeyStore, store)
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
 
 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 				Retrieval:       RetrievalDisabled,
@@ -199,9 +186,8 @@ func TestSnapshotSyncWithServer(t *testing.T) {
 			bucket.Store(bucketKeyRegistry, tr)
 
 			cleanup = func() {
-				netStore.Close()
 				tr.Close()
-				os.RemoveAll(datadir)
+				clean()
 			}
 
 			return tr, cleanup, nil
-- 
GitLab