From 1bed9b3fea9939581b03cae9d6b4984ced456748 Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@twurst.com>
Date: Thu, 26 Jan 2017 11:57:31 +0100
Subject: [PATCH] event: address review issues (multiple commits)

event: address Feed review issues

event: clarify role of NewSubscription function

event: more Feed review fixes

* take sendLock after dropping f.mu
* add constant for number of special cases

event: fix subscribing/unsubscribing while Send is blocked
---
 event/feed.go         | 31 +++++++++++++-------
 event/feed_test.go    | 68 +++++++++++++++++++++++++++++++++++++++++++
 event/subscription.go | 10 +++----
 3 files changed, 93 insertions(+), 16 deletions(-)

diff --git a/event/feed.go b/event/feed.go
index bd8e26321..4568304df 100644
--- a/event/feed.go
+++ b/event/feed.go
@@ -33,7 +33,9 @@ var errBadChannel = errors.New("event: Subscribe argument does not have sendable
 //
 // The zero value is ready to use.
 type Feed struct {
-	sendLock  chan struct{}    // one-element buffer, empty when held
+	// sendLock has a one-element buffer and is empty when held.
+	// It protects sendCases.
+	sendLock  chan struct{}
 	removeSub chan interface{} // interrupts Send
 	sendCases caseList         // the active set of select cases used by Send
 
@@ -44,6 +46,10 @@ type Feed struct {
 	closed bool
 }
 
+// This is the index of the first actual subscription channel in sendCases.
+// sendCases[0] is a SelectRecv case for the removeSub channel.
+const firstSubSendCase = 1
+
 type feedTypeError struct {
 	got, want reflect.Type
 	op        string
@@ -67,6 +73,7 @@ func (f *Feed) init() {
 // until the subscription is canceled. All channels added must have the same element type.
 //
 // The channel should have ample buffer space to avoid blocking other subscribers.
+// Slow subscribers are not dropped.
 func (f *Feed) Subscribe(channel interface{}) Subscription {
 	chanval := reflect.ValueOf(channel)
 	chantyp := chanval.Type()
@@ -125,13 +132,14 @@ func (f *Feed) remove(sub *feedSub) {
 func (f *Feed) Send(value interface{}) (nsent int) {
 	f.mu.Lock()
 	f.init()
+	f.mu.Unlock()
+
 	<-f.sendLock
-	// Add new subscriptions from the inbox, then clear it.
+
+	// Add new cases from the inbox after taking the send lock.
+	f.mu.Lock()
 	f.sendCases = append(f.sendCases, f.inbox...)
-	for i := range f.inbox {
-		f.inbox[i] = reflect.SelectCase{}
-	}
-	f.inbox = f.inbox[:0]
+	f.inbox = nil
 	f.mu.Unlock()
 
 	// Set the sent value on all channels.
@@ -140,7 +148,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
 		f.sendLock <- struct{}{}
 		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
 	}
-	for i := 1; i < len(f.sendCases); i++ {
+	for i := firstSubSendCase; i < len(f.sendCases); i++ {
 		f.sendCases[i].Send = rvalue
 	}
 
@@ -150,13 +158,14 @@ func (f *Feed) Send(value interface{}) (nsent int) {
 		// Fast path: try sending without blocking before adding to the select set.
 		// This should usually succeed if subscribers are fast enough and have free
 		// buffer space.
-		for i := 1; i < len(cases); i++ {
+		for i := firstSubSendCase; i < len(cases); i++ {
 			if cases[i].Chan.TrySend(rvalue) {
-				cases = cases.deactivate(i)
 				nsent++
+				cases = cases.deactivate(i)
+				i--
 			}
 		}
-		if len(cases) == 1 {
+		if len(cases) == firstSubSendCase {
 			break
 		}
 		// Select on all the receivers, waiting for them to unblock.
@@ -174,7 +183,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
 	}
 
 	// Forget about the sent value and hand off the send lock.
-	for i := 1; i < len(f.sendCases); i++ {
+	for i := firstSubSendCase; i < len(f.sendCases); i++ {
 		f.sendCases[i].Send = reflect.Value{}
 	}
 	f.sendLock <- struct{}{}
diff --git a/event/feed_test.go b/event/feed_test.go
index 4f897c162..a82c10303 100644
--- a/event/feed_test.go
+++ b/event/feed_test.go
@@ -167,6 +167,74 @@ func TestFeedSubscribeSameChannel(t *testing.T) {
 	done.Wait()
 }
 
+func TestFeedSubscribeBlockedPost(t *testing.T) {
+	var (
+		feed   Feed
+		nsends = 2000
+		ch1    = make(chan int)
+		ch2    = make(chan int)
+		wg     sync.WaitGroup
+	)
+	defer wg.Wait()
+
+	feed.Subscribe(ch1)
+	wg.Add(nsends)
+	for i := 0; i < nsends; i++ {
+		go func() {
+			feed.Send(99)
+			wg.Done()
+		}()
+	}
+
+	sub2 := feed.Subscribe(ch2)
+	defer sub2.Unsubscribe()
+
+	// We're done when ch1 has received N times.
+	// The number of receives on ch2 depends on scheduling.
+	for i := 0; i < nsends; {
+		select {
+		case <-ch1:
+			i++
+		case <-ch2:
+		}
+	}
+}
+
+func TestFeedUnsubscribeBlockedPost(t *testing.T) {
+	var (
+		feed   Feed
+		nsends = 200
+		chans  = make([]chan int, 2000)
+		subs   = make([]Subscription, len(chans))
+		bchan  = make(chan int)
+		bsub   = feed.Subscribe(bchan)
+		wg     sync.WaitGroup
+	)
+	for i := range chans {
+		chans[i] = make(chan int, nsends)
+	}
+
+	// Queue up some Sends. None of these can make progress while bchan isn't read.
+	wg.Add(nsends)
+	for i := 0; i < nsends; i++ {
+		go func() {
+			feed.Send(99)
+			wg.Done()
+		}()
+	}
+	// Subscribe the other channels.
+	for i, ch := range chans {
+		subs[i] = feed.Subscribe(ch)
+	}
+	// Unsubscribe them again.
+	for _, sub := range subs {
+		sub.Unsubscribe()
+	}
+	// Unblock the Sends.
+	bsub.Unsubscribe()
+	wg.Wait()
+}
+
 func TestFeedUnsubscribeFromInbox(t *testing.T) {
 	var (
 		feed Feed
diff --git a/event/subscription.go b/event/subscription.go
index 7f2619b2d..83bd21213 100644
--- a/event/subscription.go
+++ b/event/subscription.go
@@ -43,14 +43,14 @@ type Subscription interface {
 	Unsubscribe()      // cancels sending of events, closing the error channel
 }
 
-// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn
-// is closed when Unsubscribe is called. If fn returns an error, it is sent on the
-// subscription's error channel.
-func NewSubscription(fn func(<-chan struct{}) error) Subscription {
+// NewSubscription runs a producer function as a subscription in a new goroutine. The
+// channel given to the producer is closed when Unsubscribe is called. If fn returns an
+// error, it is sent on the subscription's error channel.
+func NewSubscription(producer func(<-chan struct{}) error) Subscription {
 	s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
 	go func() {
 		defer close(s.err)
-		err := fn(s.unsub)
+		err := producer(s.unsub)
 		s.mu.Lock()
 		defer s.mu.Unlock()
 		if !s.unsubscribed {
-- 
GitLab