From 6d5e100d0dc6fc0b905610850a75b5d4fa907739 Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@twurst.com>
Date: Tue, 19 Jul 2016 01:39:12 +0200
Subject: [PATCH] event: add new Subscription type and related utilities

This commit introduces a new Subscription type, which is synonymous with
ethereum.Subscription. It also adds a couple of utilities that make
working with Subscriptions easier. The mot complex utility is Feed, a
synchronisation device that implements broadcast subscriptions. Feed is
slightly faster than TypeMux and will replace uses of TypeMux across the
go-ethereum codebase in the future.
---
 event/event.go                     |   2 +-
 event/event_test.go                |  30 +++-
 event/example_feed_test.go         |  73 ++++++++
 event/example_scope_test.go        | 128 ++++++++++++++
 event/example_subscription_test.go |  56 ++++++
 event/feed.go                      | 240 +++++++++++++++++++++++++
 event/feed_test.go                 | 226 ++++++++++++++++++++++++
 event/subscription.go              | 275 +++++++++++++++++++++++++++++
 event/subscription_test.go         | 121 +++++++++++++
 9 files changed, 1144 insertions(+), 7 deletions(-)
 create mode 100644 event/example_feed_test.go
 create mode 100644 event/example_scope_test.go
 create mode 100644 event/example_subscription_test.go
 create mode 100644 event/feed.go
 create mode 100644 event/feed_test.go
 create mode 100644 event/subscription.go
 create mode 100644 event/subscription_test.go

diff --git a/event/event.go b/event/event.go
index f8a2eb013e..d3e84f0f7f 100644
--- a/event/event.go
+++ b/event/event.go
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-// Package event implements an event multiplexer.
+// Package event deals with subscriptions to real-time events.
 package event
 
 import (
diff --git a/event/event_test.go b/event/event_test.go
index 2c56ecf29f..a12945a471 100644
--- a/event/event_test.go
+++ b/event/event_test.go
@@ -149,16 +149,34 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) {
 	}()
 }
 
-func BenchmarkPost3(b *testing.B) {
-	var mux = new(TypeMux)
-	defer mux.Stop()
-	emptySubscriber(mux, testEvent(0))
-	emptySubscriber(mux, testEvent(0))
-	emptySubscriber(mux, testEvent(0))
+func BenchmarkPost1000(b *testing.B) {
+	var (
+		mux              = new(TypeMux)
+		subscribed, done sync.WaitGroup
+		nsubs            = 1000
+	)
+	subscribed.Add(nsubs)
+	done.Add(nsubs)
+	for i := 0; i < nsubs; i++ {
+		go func() {
+			s := mux.Subscribe(testEvent(0))
+			subscribed.Done()
+			for range s.Chan() {
+			}
+			done.Done()
+		}()
+	}
+	subscribed.Wait()
 
+	// The actual benchmark.
+	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		mux.Post(testEvent(0))
 	}
+
+	b.StopTimer()
+	mux.Stop()
+	done.Wait()
 }
 
 func BenchmarkPostConcurrent(b *testing.B) {
diff --git a/event/example_feed_test.go b/event/example_feed_test.go
new file mode 100644
index 0000000000..63436b2269
--- /dev/null
+++ b/event/example_feed_test.go
@@ -0,0 +1,73 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event_test
+
+import (
+	"fmt"
+
+	"github.com/ethereum/go-ethereum/event"
+)
+
+func ExampleFeed_acknowledgedEvents() {
+	// This example shows how the return value of Send can be used for request/reply
+	// interaction between event consumers and producers.
+	var feed event.Feed
+	type ackedEvent struct {
+		i   int
+		ack chan<- struct{}
+	}
+
+	// Consumers wait for events on the feed and acknowledge processing.
+	done := make(chan struct{})
+	defer close(done)
+	for i := 0; i < 3; i++ {
+		ch := make(chan ackedEvent, 100)
+		sub := feed.Subscribe(ch)
+		go func() {
+			defer sub.Unsubscribe()
+			for {
+				select {
+				case ev := <-ch:
+					fmt.Println(ev.i) // "process" the event
+					ev.ack <- struct{}{}
+				case <-done:
+					return
+				}
+			}
+		}()
+	}
+
+	// The producer sends values of type ackedEvent with increasing values of i.
+	// It waits for all consumers to acknowledge before sending the next event.
+	for i := 0; i < 3; i++ {
+		acksignal := make(chan struct{})
+		n := feed.Send(ackedEvent{i, acksignal})
+		for ack := 0; ack < n; ack++ {
+			<-acksignal
+		}
+	}
+	// Output:
+	// 0
+	// 0
+	// 0
+	// 1
+	// 1
+	// 1
+	// 2
+	// 2
+	// 2
+}
diff --git a/event/example_scope_test.go b/event/example_scope_test.go
new file mode 100644
index 0000000000..c517a8324d
--- /dev/null
+++ b/event/example_scope_test.go
@@ -0,0 +1,128 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event_test
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/event"
+)
+
+// This example demonstrates how SubscriptionScope can be used to control the lifetime of
+// subscriptions.
+//
+// Our example program consists of two servers, each of which performs a calculation when
+// requested. The servers also allow subscribing to results of all computations.
+type divServer struct{ results event.Feed }
+type mulServer struct{ results event.Feed }
+
+func (s *divServer) do(a, b int) int {
+	r := a / b
+	s.results.Send(r)
+	return r
+}
+
+func (s *mulServer) do(a, b int) int {
+	r := a * b
+	s.results.Send(r)
+	return r
+}
+
+// The servers are contained in an App. The app controls the servers and exposes them
+// through its API.
+type App struct {
+	divServer
+	mulServer
+	scope event.SubscriptionScope
+}
+
+func (s *App) Calc(op byte, a, b int) int {
+	switch op {
+	case '/':
+		return s.divServer.do(a, b)
+	case '*':
+		return s.mulServer.do(a, b)
+	default:
+		panic("invalid op")
+	}
+}
+
+// The app's SubscribeResults method starts sending calculation results to the given
+// channel. Subscriptions created through this method are tied to the lifetime of the App
+// because they are registered in the scope.
+func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
+	switch op {
+	case '/':
+		return s.scope.Track(s.divServer.results.Subscribe(ch))
+	case '*':
+		return s.scope.Track(s.mulServer.results.Subscribe(ch))
+	default:
+		panic("invalid op")
+	}
+}
+
+// Stop stops the App, closing all subscriptions created through SubscribeResults.
+func (s *App) Stop() {
+	s.scope.Close()
+}
+
+func ExampleSubscriptionScope() {
+	// Create the app.
+	var (
+		app  App
+		wg   sync.WaitGroup
+		divs = make(chan int)
+		muls = make(chan int)
+	)
+
+	// Run a subscriber in the background.
+	divsub := app.SubscribeResults('/', divs)
+	mulsub := app.SubscribeResults('*', muls)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer fmt.Println("subscriber exited")
+		defer divsub.Unsubscribe()
+		defer mulsub.Unsubscribe()
+		for {
+			select {
+			case result := <-divs:
+				fmt.Println("division happened:", result)
+			case result := <-muls:
+				fmt.Println("multiplication happened:", result)
+			case <-divsub.Err():
+				return
+			case <-mulsub.Err():
+				return
+			}
+		}
+	}()
+
+	// Interact with the app.
+	app.Calc('/', 22, 11)
+	app.Calc('*', 3, 4)
+
+	// Stop the app. This shuts down the subscriptions, causing the subscriber to exit.
+	app.Stop()
+	wg.Wait()
+
+	// Output:
+	// division happened: 2
+	// multiplication happened: 12
+	// subscriber exited
+}
diff --git a/event/example_subscription_test.go b/event/example_subscription_test.go
new file mode 100644
index 0000000000..de11266893
--- /dev/null
+++ b/event/example_subscription_test.go
@@ -0,0 +1,56 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event_test
+
+import (
+	"fmt"
+
+	"github.com/ethereum/go-ethereum/event"
+)
+
+func ExampleNewSubscription() {
+	// Create a subscription that sends 10 integers on ch.
+	ch := make(chan int)
+	sub := event.NewSubscription(func(quit <-chan struct{}) error {
+		for i := 0; i < 10; i++ {
+			select {
+			case ch <- i:
+			case <-quit:
+				fmt.Println("unsubscribed")
+				return nil
+			}
+		}
+		return nil
+	})
+
+	// This is the consumer. It reads 5 integers, then aborts the subscription.
+	// Note that Unsubscribe waits until the producer has shut down.
+	for i := range ch {
+		fmt.Println(i)
+		if i == 4 {
+			sub.Unsubscribe()
+			break
+		}
+	}
+	// Output:
+	// 0
+	// 1
+	// 2
+	// 3
+	// 4
+	// unsubscribed
+}
diff --git a/event/feed.go b/event/feed.go
new file mode 100644
index 0000000000..bd8e26321d
--- /dev/null
+++ b/event/feed.go
@@ -0,0 +1,240 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"errors"
+	"reflect"
+	"sync"
+)
+
+var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type")
+
+// Feed implements one-to-many subscriptions where the carrier of events is a channel.
+// Values sent to a Feed are delivered to all subscribed channels simultaneously.
+//
+// Feeds can only be used with a single type. The type is determined by the first Send or
+// Subscribe operation. Subsequent calls to these methods panic if the type does not
+// match.
+//
+// The zero value is ready to use.
+type Feed struct {
+	sendLock  chan struct{}    // one-element buffer, empty when held
+	removeSub chan interface{} // interrupts Send
+	sendCases caseList         // the active set of select cases used by Send
+
+	// The inbox holds newly subscribed channels until they are added to sendCases.
+	mu     sync.Mutex
+	inbox  caseList
+	etype  reflect.Type
+	closed bool
+}
+
+type feedTypeError struct {
+	got, want reflect.Type
+	op        string
+}
+
+func (e feedTypeError) Error() string {
+	return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String()
+}
+
+func (f *Feed) init() {
+	if f.sendLock != nil {
+		return
+	}
+	f.removeSub = make(chan interface{})
+	f.sendLock = make(chan struct{}, 1)
+	f.sendLock <- struct{}{}
+	f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
+}
+
+// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
+// until the subscription is canceled. All channels added must have the same element type.
+//
+// The channel should have ample buffer space to avoid blocking other subscribers.
+func (f *Feed) Subscribe(channel interface{}) Subscription {
+	chanval := reflect.ValueOf(channel)
+	chantyp := chanval.Type()
+	if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
+		panic(errBadChannel)
+	}
+	sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
+
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.init()
+	if !f.typecheck(chantyp.Elem()) {
+		panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
+	}
+	// Add the select case to the inbox.
+	// The next Send will add it to f.sendCases.
+	cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
+	f.inbox = append(f.inbox, cas)
+	return sub
+}
+
+// note: callers must hold f.mu
+func (f *Feed) typecheck(typ reflect.Type) bool {
+	if f.etype == nil {
+		f.etype = typ
+		return true
+	}
+	return f.etype == typ
+}
+
+func (f *Feed) remove(sub *feedSub) {
+	// Delete from inbox first, which covers channels
+	// that have not been added to f.sendCases yet.
+	ch := sub.channel.Interface()
+	f.mu.Lock()
+	index := f.inbox.find(ch)
+	if index != -1 {
+		f.inbox = f.inbox.delete(index)
+		f.mu.Unlock()
+		return
+	}
+	f.mu.Unlock()
+
+	select {
+	case f.removeSub <- ch:
+		// Send will remove the channel from f.sendCases.
+	case <-f.sendLock:
+		// No Send is in progress, delete the channel now that we have the send lock.
+		f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
+		f.sendLock <- struct{}{}
+	}
+}
+
+// Send delivers to all subscribed channels simultaneously.
+// It returns the number of subscribers that the value was sent to.
+func (f *Feed) Send(value interface{}) (nsent int) {
+	f.mu.Lock()
+	f.init()
+	<-f.sendLock
+	// Add new subscriptions from the inbox, then clear it.
+	f.sendCases = append(f.sendCases, f.inbox...)
+	for i := range f.inbox {
+		f.inbox[i] = reflect.SelectCase{}
+	}
+	f.inbox = f.inbox[:0]
+	f.mu.Unlock()
+
+	// Set the sent value on all channels.
+	rvalue := reflect.ValueOf(value)
+	if !f.typecheck(rvalue.Type()) {
+		f.sendLock <- struct{}{}
+		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
+	}
+	for i := 1; i < len(f.sendCases); i++ {
+		f.sendCases[i].Send = rvalue
+	}
+
+	// Send until all channels except removeSub have been chosen.
+	cases := f.sendCases
+	for {
+		// Fast path: try sending without blocking before adding to the select set.
+		// This should usually succeed if subscribers are fast enough and have free
+		// buffer space.
+		for i := 1; i < len(cases); i++ {
+			if cases[i].Chan.TrySend(rvalue) {
+				cases = cases.deactivate(i)
+				nsent++
+			}
+		}
+		if len(cases) == 1 {
+			break
+		}
+		// Select on all the receivers, waiting for them to unblock.
+		chosen, recv, _ := reflect.Select(cases)
+		if chosen == 0 /* <-f.removeSub */ {
+			index := f.sendCases.find(recv.Interface())
+			f.sendCases = f.sendCases.delete(index)
+			if index >= 0 && index < len(cases) {
+				cases = f.sendCases[:len(cases)-1]
+			}
+		} else {
+			cases = cases.deactivate(chosen)
+			nsent++
+		}
+	}
+
+	// Forget about the sent value and hand off the send lock.
+	for i := 1; i < len(f.sendCases); i++ {
+		f.sendCases[i].Send = reflect.Value{}
+	}
+	f.sendLock <- struct{}{}
+	return nsent
+}
+
+type feedSub struct {
+	feed    *Feed
+	channel reflect.Value
+	errOnce sync.Once
+	err     chan error
+}
+
+func (sub *feedSub) Unsubscribe() {
+	sub.errOnce.Do(func() {
+		sub.feed.remove(sub)
+		close(sub.err)
+	})
+}
+
+func (sub *feedSub) Err() <-chan error {
+	return sub.err
+}
+
+type caseList []reflect.SelectCase
+
+// find returns the index of a case containing the given channel.
+func (cs caseList) find(channel interface{}) int {
+	for i, cas := range cs {
+		if cas.Chan.Interface() == channel {
+			return i
+		}
+	}
+	return -1
+}
+
+// delete removes the given case from cs.
+func (cs caseList) delete(index int) caseList {
+	return append(cs[:index], cs[index+1:]...)
+}
+
+// deactivate moves the case at index into the non-accessible portion of the cs slice.
+func (cs caseList) deactivate(index int) caseList {
+	last := len(cs) - 1
+	cs[index], cs[last] = cs[last], cs[index]
+	return cs[:last]
+}
+
+// func (cs caseList) String() string {
+//     s := "["
+//     for i, cas := range cs {
+//             if i != 0 {
+//                     s += ", "
+//             }
+//             switch cas.Dir {
+//             case reflect.SelectSend:
+//                     s += fmt.Sprintf("%v<-", cas.Chan.Interface())
+//             case reflect.SelectRecv:
+//                     s += fmt.Sprintf("<-%v", cas.Chan.Interface())
+//             }
+//     }
+//     return s + "]"
+// }
diff --git a/event/feed_test.go b/event/feed_test.go
new file mode 100644
index 0000000000..4f897c162c
--- /dev/null
+++ b/event/feed_test.go
@@ -0,0 +1,226 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"fmt"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestFeedPanics(t *testing.T) {
+	{
+		var f Feed
+		f.Send(int(2))
+		want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+		if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		ch := make(chan int)
+		f.Subscribe(ch)
+		want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+		if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		f.Send(int(2))
+		want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
+		if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
+			t.Error(err)
+		}
+	}
+	{
+		var f Feed
+		if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
+			t.Error(err)
+		}
+	}
+}
+
+func checkPanic(want error, fn func()) (err error) {
+	defer func() {
+		panic := recover()
+		if panic == nil {
+			err = fmt.Errorf("didn't panic")
+		} else if !reflect.DeepEqual(panic, want) {
+			err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
+		}
+	}()
+	fn()
+	return nil
+}
+
+func TestFeed(t *testing.T) {
+	var feed Feed
+	var done, subscribed sync.WaitGroup
+	subscriber := func(i int) {
+		defer done.Done()
+
+		subchan := make(chan int)
+		sub := feed.Subscribe(subchan)
+		timeout := time.NewTimer(2 * time.Second)
+		subscribed.Done()
+
+		select {
+		case v := <-subchan:
+			if v != 1 {
+				t.Errorf("%d: received value %d, want 1", i, v)
+			}
+		case <-timeout.C:
+			t.Errorf("%d: receive timeout", i)
+		}
+
+		sub.Unsubscribe()
+		select {
+		case _, ok := <-sub.Err():
+			if ok {
+				t.Errorf("%d: error channel not closed after unsubscribe", i)
+			}
+		case <-timeout.C:
+			t.Errorf("%d: unsubscribe timeout", i)
+		}
+	}
+
+	const n = 1000
+	done.Add(n)
+	subscribed.Add(n)
+	for i := 0; i < n; i++ {
+		go subscriber(i)
+	}
+	subscribed.Wait()
+	if nsent := feed.Send(1); nsent != n {
+		t.Errorf("first send delivered %d times, want %d", nsent, n)
+	}
+	if nsent := feed.Send(2); nsent != 0 {
+		t.Errorf("second send delivered %d times, want 0", nsent)
+	}
+	done.Wait()
+}
+
+func TestFeedSubscribeSameChannel(t *testing.T) {
+	var (
+		feed Feed
+		done sync.WaitGroup
+		ch   = make(chan int)
+		sub1 = feed.Subscribe(ch)
+		sub2 = feed.Subscribe(ch)
+		_    = feed.Subscribe(ch)
+	)
+	expectSends := func(value, n int) {
+		if nsent := feed.Send(value); nsent != n {
+			t.Errorf("send delivered %d times, want %d", nsent, n)
+		}
+		done.Done()
+	}
+	expectRecv := func(wantValue, n int) {
+		for i := 0; i < n; i++ {
+			if v := <-ch; v != wantValue {
+				t.Errorf("received %d, want %d", v, wantValue)
+			}
+		}
+	}
+
+	done.Add(1)
+	go expectSends(1, 3)
+	expectRecv(1, 3)
+	done.Wait()
+
+	sub1.Unsubscribe()
+
+	done.Add(1)
+	go expectSends(2, 2)
+	expectRecv(2, 2)
+	done.Wait()
+
+	sub2.Unsubscribe()
+
+	done.Add(1)
+	go expectSends(3, 1)
+	expectRecv(3, 1)
+	done.Wait()
+}
+
+func TestFeedUnsubscribeFromInbox(t *testing.T) {
+	var (
+		feed Feed
+		ch1  = make(chan int)
+		ch2  = make(chan int)
+		sub1 = feed.Subscribe(ch1)
+		sub2 = feed.Subscribe(ch1)
+		sub3 = feed.Subscribe(ch2)
+	)
+	if len(feed.inbox) != 3 {
+		t.Errorf("inbox length != 3 after subscribe")
+	}
+	if len(feed.sendCases) != 1 {
+		t.Errorf("sendCases is non-empty after unsubscribe")
+	}
+
+	sub1.Unsubscribe()
+	sub2.Unsubscribe()
+	sub3.Unsubscribe()
+	if len(feed.inbox) != 0 {
+		t.Errorf("inbox is non-empty after unsubscribe")
+	}
+	if len(feed.sendCases) != 1 {
+		t.Errorf("sendCases is non-empty after unsubscribe")
+	}
+}
+
+func BenchmarkFeedSend1000(b *testing.B) {
+	var (
+		done  sync.WaitGroup
+		feed  Feed
+		nsubs = 1000
+	)
+	subscriber := func(ch <-chan int) {
+		for i := 0; i < b.N; i++ {
+			<-ch
+		}
+		done.Done()
+	}
+	done.Add(nsubs)
+	for i := 0; i < nsubs; i++ {
+		ch := make(chan int, 200)
+		feed.Subscribe(ch)
+		go subscriber(ch)
+	}
+
+	// The actual benchmark.
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if feed.Send(i) != nsubs {
+			panic("wrong number of sends")
+		}
+	}
+
+	b.StopTimer()
+	done.Wait()
+}
diff --git a/event/subscription.go b/event/subscription.go
new file mode 100644
index 0000000000..7f2619b2d7
--- /dev/null
+++ b/event/subscription.go
@@ -0,0 +1,275 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/mclock"
+	"golang.org/x/net/context"
+)
+
+// Subscription represents a stream of events. The carrier of the events is typically a
+// channel, but isn't part of the interface.
+//
+// Subscriptions can fail while established. Failures are reported through an error
+// channel. It receives a value if there is an issue with the subscription (e.g. the
+// network connection delivering the events has been closed). Only one value will ever be
+// sent.
+//
+// The error channel is closed when the subscription ends successfully (i.e. when the
+// source of events is closed). It is also closed when Unsubscribe is called.
+//
+// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
+// cases to ensure that resources related to the subscription are released. It can be
+// called any number of times.
+type Subscription interface {
+	Err() <-chan error // returns the error channel
+	Unsubscribe()      // cancels sending of events, closing the error channel
+}
+
+// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn
+// is closed when Unsubscribe is called. If fn returns an error, it is sent on the
+// subscription's error channel.
+func NewSubscription(fn func(<-chan struct{}) error) Subscription {
+	s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
+	go func() {
+		defer close(s.err)
+		err := fn(s.unsub)
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		if !s.unsubscribed {
+			if err != nil {
+				s.err <- err
+			}
+			s.unsubscribed = true
+		}
+	}()
+	return s
+}
+
+type funcSub struct {
+	unsub        chan struct{}
+	err          chan error
+	mu           sync.Mutex
+	unsubscribed bool
+}
+
+func (s *funcSub) Unsubscribe() {
+	s.mu.Lock()
+	if s.unsubscribed {
+		s.mu.Unlock()
+		return
+	}
+	s.unsubscribed = true
+	close(s.unsub)
+	s.mu.Unlock()
+	// Wait for producer shutdown.
+	<-s.err
+}
+
+func (s *funcSub) Err() <-chan error {
+	return s.err
+}
+
+// Resubscribe calls fn repeatedly to keep a subscription established. When the
+// subscription is established, Resubscribe waits for it to fail and calls fn again. This
+// process repeats until Unsubscribe is called or the active subscription ends
+// successfully.
+//
+// Resubscribe applies backoff between calls to fn. The time between calls is adapted
+// based on the error rate, but will never exceed backoffMax.
+func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
+	s := &resubscribeSub{
+		waitTime:   backoffMax / 10,
+		backoffMax: backoffMax,
+		fn:         fn,
+		err:        make(chan error),
+		unsub:      make(chan struct{}),
+	}
+	go s.loop()
+	return s
+}
+
+// A ResubscribeFunc attempts to establish a subscription.
+type ResubscribeFunc func(context.Context) (Subscription, error)
+
+type resubscribeSub struct {
+	fn                   ResubscribeFunc
+	err                  chan error
+	unsub                chan struct{}
+	unsubOnce            sync.Once
+	lastTry              mclock.AbsTime
+	waitTime, backoffMax time.Duration
+}
+
+func (s *resubscribeSub) Unsubscribe() {
+	s.unsubOnce.Do(func() {
+		s.unsub <- struct{}{}
+		<-s.err
+	})
+}
+
+func (s *resubscribeSub) Err() <-chan error {
+	return s.err
+}
+
+func (s *resubscribeSub) loop() {
+	defer close(s.err)
+	var done bool
+	for !done {
+		sub := s.subscribe()
+		if sub == nil {
+			break
+		}
+		done = s.waitForError(sub)
+		sub.Unsubscribe()
+	}
+}
+
+func (s *resubscribeSub) subscribe() Subscription {
+	subscribed := make(chan error)
+	var sub Subscription
+retry:
+	for {
+		s.lastTry = mclock.Now()
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			rsub, err := s.fn(ctx)
+			sub = rsub
+			subscribed <- err
+		}()
+		select {
+		case err := <-subscribed:
+			cancel()
+			if err != nil {
+				// Subscribing failed, wait before launching the next try.
+				if s.backoffWait() {
+					return nil
+				}
+				continue retry
+			}
+			if sub == nil {
+				panic("event: ResubscribeFunc returned nil subscription and no error")
+			}
+			return sub
+		case <-s.unsub:
+			cancel()
+			return nil
+		}
+	}
+}
+
+func (s *resubscribeSub) waitForError(sub Subscription) bool {
+	defer sub.Unsubscribe()
+	select {
+	case err := <-sub.Err():
+		return err == nil
+	case <-s.unsub:
+		return true
+	}
+}
+
+func (s *resubscribeSub) backoffWait() bool {
+	if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
+		s.waitTime = s.backoffMax / 10
+	} else {
+		s.waitTime *= 2
+		if s.waitTime > s.backoffMax {
+			s.waitTime = s.backoffMax
+		}
+	}
+
+	t := time.NewTimer(s.waitTime)
+	defer t.Stop()
+	select {
+	case <-t.C:
+		return false
+	case <-s.unsub:
+		return true
+	}
+}
+
+// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
+//
+// For code that handle more than one subscription, a scope can be used to conveniently
+// unsubscribe all of them with a single call. The example demonstrates a typical use in a
+// larger program.
+//
+// The zero value is ready to use.
+type SubscriptionScope struct {
+	mu     sync.Mutex
+	subs   map[*scopeSub]struct{}
+	closed bool
+}
+
+type scopeSub struct {
+	sc *SubscriptionScope
+	s  Subscription
+}
+
+// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
+// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
+// scope.
+func (sc *SubscriptionScope) Track(s Subscription) Subscription {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	if sc.closed {
+		return nil
+	}
+	if sc.subs == nil {
+		sc.subs = make(map[*scopeSub]struct{})
+	}
+	ss := &scopeSub{sc, s}
+	sc.subs[ss] = struct{}{}
+	return ss
+}
+
+// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
+// the tracked set. Calls to Track after Close return nil.
+func (sc *SubscriptionScope) Close() {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	if sc.closed {
+		return
+	}
+	sc.closed = true
+	for s := range sc.subs {
+		s.s.Unsubscribe()
+	}
+	sc.subs = nil
+}
+
+// Count returns the number of tracked subscriptions.
+// It is meant to be used for debugging.
+func (sc *SubscriptionScope) Count() int {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+	return len(sc.subs)
+}
+
+func (s *scopeSub) Unsubscribe() {
+	s.s.Unsubscribe()
+	s.sc.mu.Lock()
+	defer s.sc.mu.Unlock()
+	delete(s.sc.subs, s)
+}
+
+func (s *scopeSub) Err() <-chan error {
+	return s.s.Err()
+}
diff --git a/event/subscription_test.go b/event/subscription_test.go
new file mode 100644
index 0000000000..a4fe30298c
--- /dev/null
+++ b/event/subscription_test.go
@@ -0,0 +1,121 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"golang.org/x/net/context"
+)
+
+var errInts = errors.New("error in subscribeInts")
+
+func subscribeInts(max, fail int, c chan<- int) Subscription {
+	return NewSubscription(func(quit <-chan struct{}) error {
+		for i := 0; i < max; i++ {
+			if i >= fail {
+				return errInts
+			}
+			select {
+			case c <- i:
+			case <-quit:
+				return nil
+			}
+		}
+		return nil
+	})
+}
+
+func TestNewSubscriptionError(t *testing.T) {
+	t.Parallel()
+
+	channel := make(chan int)
+	sub := subscribeInts(10, 2, channel)
+loop:
+	for want := 0; want < 10; want++ {
+		select {
+		case got := <-channel:
+			if got != want {
+				t.Fatalf("wrong int %d, want %d", got, want)
+			}
+		case err := <-sub.Err():
+			if err != errInts {
+				t.Fatalf("wrong error: got %q, want %q", err, errInts)
+			}
+			if want != 2 {
+				t.Fatalf("got errInts at int %d, should be received at 2", want)
+			}
+			break loop
+		}
+	}
+	sub.Unsubscribe()
+
+	err, ok := <-sub.Err()
+	if err != nil {
+		t.Fatal("got non-nil error after Unsubscribe")
+	}
+	if ok {
+		t.Fatal("channel still open after Unsubscribe")
+	}
+}
+
+func TestResubscribe(t *testing.T) {
+	t.Parallel()
+
+	var i int
+	nfails := 6
+	sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) {
+		// fmt.Printf("call #%d @ %v\n", i, time.Now())
+		i++
+		if i == 2 {
+			// Delay the second failure a bit to reset the resubscribe interval.
+			time.Sleep(200 * time.Millisecond)
+		}
+		if i < nfails {
+			return nil, errors.New("oops")
+		}
+		sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil })
+		return sub, nil
+	})
+
+	<-sub.Err()
+	if i != nfails {
+		t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
+	}
+}
+
+func TestResubscribeAbort(t *testing.T) {
+	t.Parallel()
+
+	done := make(chan error)
+	sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
+		select {
+		case <-ctx.Done():
+			done <- nil
+		case <-time.After(2 * time.Second):
+			done <- errors.New("context given to resubscribe function not canceled within 2s")
+		}
+		return nil, nil
+	})
+
+	sub.Unsubscribe()
+	if err := <-done; err != nil {
+		t.Fatal(err)
+	}
+}
-- 
GitLab