From 30263ad37d49c014512a9a7d8abdd17f305843e9 Mon Sep 17 00:00:00 2001
From: Anton Evangelatov <anton.evangelatov@gmail.com>
Date: Fri, 31 May 2019 11:13:34 +0200
Subject: [PATCH] swarm/storage: set false, only when we get a chunk back
 (#19599)

---
 swarm/network/stream/syncer.go                |  4 --
 swarm/storage/localstore/subscription_pull.go |  8 +--
 .../localstore/subscription_pull_test.go      | 49 +++++++++++++++++++
 3 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index 7a61950ed..7957a8bf7 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -93,10 +93,6 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
 // are added in batchTimeout period, the batch will be returned. This function
 // will block until new chunks are received from localstore pull subscription.
 func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
-	//TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer?
-	if from > 0 {
-		from--
-	}
 	batchStart := time.Now()
 	descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
 	defer stop()
diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go
index ce539924b..0f7e48729 100644
--- a/swarm/storage/localstore/subscription_pull.go
+++ b/swarm/storage/localstore/subscription_pull.go
@@ -31,9 +31,9 @@ import (
 
 // SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
 // Pull syncing index can be only subscribed to a particular proximity order bin. If since
-// is not 0, the iteration will start from the first item stored after that id. If until is not 0,
+// is not 0, the iteration will start from the since item (the item with binID == since). If until is not 0,
 // only chunks stored up to this id will be sent to the channel, and the returned channel will be
-// closed. The since-until interval is open on since side, and closed on until side: (since,until] <=> [since+1,until]. Returned stop
+// closed. The since-until interval is closed on both sides: [since,until]. Returned stop
 // function will terminate current and further iterations without errors, and also close the returned channel.
 // Make sure that you check the second returned parameter from the channel to stop iteration when its value
 // is false.
@@ -135,7 +135,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
 					log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
 					return
 				}
-				first = false
+				if count > 0 {
+					first = false
+				}
 			case <-stopChan:
 				// terminate the subscription
 				// on stop
diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go
index bf364ed44..95a2fa8b1 100644
--- a/swarm/storage/localstore/subscription_pull_test.go
+++ b/swarm/storage/localstore/subscription_pull_test.go
@@ -28,6 +28,55 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/shed"
 )
 
+// TestDB_SubscribePull_first is a regression test for the first=false (from-1) bug.
+// The bug was that `first = false` was set unconditionally instead of only when `count > 0`. As a result, a chunk could be
+// missed whenever the subscription was established before the chunk was actually uploaded. For example, if a subscription is
+// established with since=49 (meaning `SubscribePull` should send the chunk with BinID=49 on the channel), and that chunk is
+// uploaded only after the subscription starts, it would have been skipped; the correct behaviour is to send it on the channel.
+func TestDB_SubscribePull_first(t *testing.T) {
+	db, cleanupFunc := newTestDB(t, nil)
+	defer cleanupFunc()
+
+	addrs := make(map[uint8][]chunk.Address)
+	var addrsMu sync.Mutex
+	var wantedChunksCount int
+
+	// prepopulate database with some chunks
+	// before the subscription
+	uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 100)
+
+	// any bin should do the trick
+	bin := uint8(1)
+
+	chunksInGivenBin := uint64(len(addrs[bin]))
+
+	errc := make(chan error)
+
+	since := chunksInGivenBin + 1
+
+	go func() {
+		ch, stop := db.SubscribePull(context.TODO(), bin, since, 0)
+		defer stop()
+
+		chnk := <-ch
+
+		if chnk.BinID != since {
+			errc <- fmt.Errorf("expected chunk.BinID to be %v , but got %v", since, chnk.BinID)
+		} else {
+			errc <- nil
+		}
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+
+	uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 100)
+
+	err := <-errc
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 // TestDB_SubscribePull uploads some chunks before and after
 // pull syncing subscription is created and validates if
 // all addresses are received in the right order
-- 
GitLab