From 324027640bcaf137b8c9e96bc26f0833711497af Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= <janos@users.noreply.github.com>
Date: Thu, 15 Nov 2018 21:06:27 +0100
Subject: [PATCH] swarm/network/simulation: use simulations.Event instead
 p2p.PeerEvent (#18098)

---
 swarm/network/simulation/events.go         | 114 +++++++++++++++------
 swarm/network/simulation/example_test.go   |  13 ++-
 swarm/network/stream/delivery_test.go      |   8 +-
 swarm/network/stream/intervals_test.go     |   5 +-
 swarm/network/stream/snapshot_sync_test.go |  11 +-
 swarm/network/stream/syncer_test.go        |   5 +-
 6 files changed, 101 insertions(+), 55 deletions(-)

diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go
index 594d36225..d73c3af4e 100644
--- a/swarm/network/simulation/events.go
+++ b/swarm/network/simulation/events.go
@@ -20,16 +20,18 @@ import (
 	"context"
 	"sync"
 
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/simulations"
 )
 
 // PeerEvent is the type of the channel returned by Simulation.PeerEvents.
 type PeerEvent struct {
 	// NodeID is the ID of node that the event is caught on.
 	NodeID enode.ID
+	// PeerID is the ID of the peer node that the event is caught on.
+	PeerID enode.ID
 	// Event is the event that is caught.
-	Event *p2p.PeerEvent
+	Event *simulations.Event
 	// Error is the error that may have happened during event watching.
 	Error error
 }
@@ -37,9 +39,13 @@ type PeerEvent struct {
 // PeerEventsFilter defines a filter on PeerEvents to exclude messages with
 // defined properties. Use PeerEventsFilter methods to set required options.
 type PeerEventsFilter struct {
-	t        *p2p.PeerEventType
-	protocol *string
-	msgCode  *uint64
+	eventType simulations.EventType
+
+	connUp *bool
+
+	msgReceive *bool
+	protocol   *string
+	msgCode    *uint64
 }
 
 // NewPeerEventsFilter returns a new PeerEventsFilter instance.
@@ -47,20 +53,48 @@ func NewPeerEventsFilter() *PeerEventsFilter {
 	return &PeerEventsFilter{}
 }
 
-// Type sets the filter to only one peer event type.
-func (f *PeerEventsFilter) Type(t p2p.PeerEventType) *PeerEventsFilter {
-	f.t = &t
+// Connect sets the filter to events when two nodes connect.
+func (f *PeerEventsFilter) Connect() *PeerEventsFilter {
+	f.eventType = simulations.EventTypeConn
+	b := true
+	f.connUp = &b
+	return f
+}
+
+// Drop sets the filter to events when two nodes disconnect.
+func (f *PeerEventsFilter) Drop() *PeerEventsFilter {
+	f.eventType = simulations.EventTypeConn
+	b := false
+	f.connUp = &b
+	return f
+}
+
+// ReceivedMessages sets the filter to only messages that are received.
+func (f *PeerEventsFilter) ReceivedMessages() *PeerEventsFilter {
+	f.eventType = simulations.EventTypeMsg
+	b := true
+	f.msgReceive = &b
+	return f
+}
+
+// SentMessages sets the filter to only messages that are sent.
+func (f *PeerEventsFilter) SentMessages() *PeerEventsFilter {
+	f.eventType = simulations.EventTypeMsg
+	b := false
+	f.msgReceive = &b
 	return f
 }
 
 // Protocol sets the filter to only one message protocol.
 func (f *PeerEventsFilter) Protocol(p string) *PeerEventsFilter {
+	f.eventType = simulations.EventTypeMsg
 	f.protocol = &p
 	return f
 }
 
 // MsgCode sets the filter to only one msg code.
 func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter {
+	f.eventType = simulations.EventTypeMsg
 	f.msgCode = &c
 	return f
 }
@@ -80,19 +114,8 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...
 		go func(id enode.ID) {
 			defer s.shutdownWG.Done()
 
-			client, err := s.Net.GetNode(id).Client()
-			if err != nil {
-				subsWG.Done()
-				eventC <- PeerEvent{NodeID: id, Error: err}
-				return
-			}
-			events := make(chan *p2p.PeerEvent)
-			sub, err := client.Subscribe(ctx, "admin", events, "peerEvents")
-			if err != nil {
-				subsWG.Done()
-				eventC <- PeerEvent{NodeID: id, Error: err}
-				return
-			}
+			events := make(chan *simulations.Event)
+			sub := s.Net.Events().Subscribe(events)
 			defer sub.Unsubscribe()
 
 			subsWG.Done()
@@ -110,28 +133,55 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...
 				case <-s.Done():
 					return
 				case e := <-events:
+					// ignore control events
+					if e.Control {
+						continue
+					}
 					match := len(filters) == 0 // if there are no filters match all events
 					for _, f := range filters {
-						if f.t != nil && *f.t != e.Type {
-							continue
+						if f.eventType == simulations.EventTypeConn && e.Conn != nil {
+							if *f.connUp != e.Conn.Up {
+								continue
+							}
+							// all connection filter parameters matched, break the loop
+							match = true
+							break
+						}
+						if f.eventType == simulations.EventTypeMsg && e.Msg != nil {
+							if f.msgReceive != nil && *f.msgReceive != e.Msg.Received {
+								continue
+							}
+							if f.protocol != nil && *f.protocol != e.Msg.Protocol {
+								continue
+							}
+							if f.msgCode != nil && *f.msgCode != e.Msg.Code {
+								continue
+							}
+							// all message filter parameters matched, break the loop
+							match = true
+							break
 						}
-						if f.protocol != nil && *f.protocol != e.Protocol {
-							continue
+					}
+					var peerID enode.ID
+					switch e.Type {
+					case simulations.EventTypeConn:
+						peerID = e.Conn.One
+						if peerID == id {
+							peerID = e.Conn.Other
 						}
-						if f.msgCode != nil && e.MsgCode != nil && *f.msgCode != *e.MsgCode {
-							continue
+					case simulations.EventTypeMsg:
+						peerID = e.Msg.One
+						if peerID == id {
+							peerID = e.Msg.Other
 						}
-						// all filter parameters matched, break the loop
-						match = true
-						break
 					}
 					if match {
 						select {
-						case eventC <- PeerEvent{NodeID: id, Event: e}:
+						case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Event: e}:
 						case <-ctx.Done():
 							if err := ctx.Err(); err != nil {
 								select {
-								case eventC <- PeerEvent{NodeID: id, Error: err}:
+								case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Error: err}:
 								case <-s.Done():
 								}
 							}
diff --git a/swarm/network/simulation/example_test.go b/swarm/network/simulation/example_test.go
index 84b0634b4..bacc64d53 100644
--- a/swarm/network/simulation/example_test.go
+++ b/swarm/network/simulation/example_test.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/node"
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethereum/go-ethereum/swarm/network"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
@@ -87,7 +86,7 @@ func ExampleSimulation_PeerEvents() {
 				log.Error("peer event", "err", e.Error)
 				continue
 			}
-			log.Info("peer event", "node", e.NodeID, "peer", e.Event.Peer, "msgcode", e.Event.MsgCode)
+			log.Info("peer event", "node", e.NodeID, "peer", e.PeerID, "type", e.Event.Type)
 		}
 	}()
 }
@@ -100,7 +99,7 @@ func ExampleSimulation_PeerEvents_disconnections() {
 	disconnections := sim.PeerEvents(
 		context.Background(),
 		sim.NodeIDs(),
-		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+		simulation.NewPeerEventsFilter().Drop(),
 	)
 
 	go func() {
@@ -109,7 +108,7 @@ func ExampleSimulation_PeerEvents_disconnections() {
 				log.Error("peer drop", "err", d.Error)
 				continue
 			}
-			log.Warn("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+			log.Warn("peer drop", "node", d.NodeID, "peer", d.PeerID)
 		}
 	}()
 }
@@ -124,8 +123,8 @@ func ExampleSimulation_PeerEvents_multipleFilters() {
 		context.Background(),
 		sim.NodeIDs(),
 		// Watch when bzz messages 1 and 4 are received.
-		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("bzz").MsgCode(1),
-		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("bzz").MsgCode(4),
+		simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(1),
+		simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(4),
 	)
 
 	go func() {
@@ -134,7 +133,7 @@ func ExampleSimulation_PeerEvents_multipleFilters() {
 				log.Error("bzz message", "err", m.Error)
 				continue
 			}
-			log.Info("bzz message", "node", m.NodeID, "peer", m.Event.Peer)
+			log.Info("bzz message", "node", m.NodeID, "peer", m.PeerID)
 		}
 	}()
 }
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index c9a530115..6b6025115 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -565,13 +565,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
 		disconnections := sim.PeerEvents(
 			context.Background(),
 			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+			simulation.NewPeerEventsFilter().Drop(),
 		)
 
 		go func() {
 			for d := range disconnections {
 				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
 					t.Fatal(d.Error)
 				}
 			}
@@ -697,13 +697,13 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
 		disconnections := sim.PeerEvents(
 			context.Background(),
 			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+			simulation.NewPeerEventsFilter().Drop(),
 		)
 
 		go func() {
 			for d := range disconnections {
 				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
 					b.Fatal(d.Error)
 				}
 			}
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 037984f22..b9525d4a4 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -27,7 +27,6 @@ import (
 
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/node"
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethereum/go-ethereum/swarm/network"
@@ -154,7 +153,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 		disconnections := sim.PeerEvents(
 			context.Background(),
 			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+			simulation.NewPeerEventsFilter().Drop(),
 		)
 
 		err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
@@ -165,7 +164,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
 		go func() {
 			for d := range disconnections {
 				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
 					t.Fatal(d.Error)
 				}
 			}
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 4bd7f38f5..96c37bddc 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/node"
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
@@ -210,12 +209,12 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 	disconnections := sim.PeerEvents(
 		context.Background(),
 		sim.NodeIDs(),
-		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+		simulation.NewPeerEventsFilter().Drop(),
 	)
 
 	go func() {
 		for d := range disconnections {
-			log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
 			t.Fatal("unexpected disconnect")
 			cancelSimRun()
 		}
@@ -402,12 +401,12 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
 	disconnections := sim.PeerEvents(
 		context.Background(),
 		sim.NodeIDs(),
-		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+		simulation.NewPeerEventsFilter().Drop(),
 	)
 
 	go func() {
 		for d := range disconnections {
-			log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
 			t.Fatal("unexpected disconnect")
 			cancelSimRun()
 		}
@@ -428,7 +427,7 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
 
 		var subscriptionCount int
 
-		filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4)
+		filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4)
 		eventC := sim.PeerEvents(ctx, nodeIDs, filter)
 
 		for j, node := range nodeIDs {
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index a543cae05..f4e055451 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -28,7 +28,6 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/node"
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethereum/go-ethereum/swarm/log"
@@ -151,13 +150,13 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
 		disconnections := sim.PeerEvents(
 			context.Background(),
 			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+			simulation.NewPeerEventsFilter().Drop(),
 		)
 
 		go func() {
 			for d := range disconnections {
 				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
 					t.Fatal(d.Error)
 				}
 			}
-- 
GitLab