From ae857e74bfda1f961dc5741441e5b36a2bb9aa93 Mon Sep 17 00:00:00 2001
From: holisticode <holistic.computing@gmail.com>
Date: Mon, 7 Jan 2019 18:59:00 -0500
Subject: [PATCH] swarm, p2p/protocols: Stream accounting (#18337)

* swarm: completed 1st phase of swap accounting

* swarm, p2p/protocols: added stream pricing

* swarm/network/stream: gofmt simplify stream.go

* swarm: fixed review comments

* swarm: used snapshots for swap tests

* swarm: custom retrieve for swap (less cascaded requests at any one time)

* swarm: addressed PR comments

* swarm: log output formatting

* swarm: removed parallelism in swap tests

* swarm: swap tests simplification

* swarm: removed swap_test.go

* swarm/network/stream: added prefix space for comments

* swarm/network/stream: unit test for prices

* swarm/network/stream: don't hardcode price

* swarm/network/stream: fixed invalid price check
---
 p2p/protocols/accounting.go           | 148 +++++++++++++-------------
 swarm/network/stream/stream.go        | 110 +++++++++++++------
 swarm/network/stream/streamer_test.go |  31 ++++++
 3 files changed, 185 insertions(+), 104 deletions(-)

diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go
index 770406a27..bdc490e59 100644
--- a/p2p/protocols/accounting.go
+++ b/p2p/protocols/accounting.go
@@ -22,31 +22,33 @@ import (
 	"github.com/ethereum/go-ethereum/metrics"
 )
 
-//define some metrics
+// define some metrics
 var (
-	//All metrics are cumulative
+	// All metrics are cumulative
 
-	//total amount of units credited
+	// total amount of units credited
 	mBalanceCredit metrics.Counter
-	//total amount of units debited
+	// total amount of units debited
 	mBalanceDebit metrics.Counter
-	//total amount of bytes credited
+	// total amount of bytes credited
 	mBytesCredit metrics.Counter
-	//total amount of bytes debited
+	// total amount of bytes debited
 	mBytesDebit metrics.Counter
-	//total amount of credited messages
+	// total amount of credited messages
 	mMsgCredit metrics.Counter
-	//total amount of debited messages
+	// total amount of debited messages
 	mMsgDebit metrics.Counter
-	//how many times local node had to drop remote peers
+	// how many times local node had to drop remote peers
 	mPeerDrops metrics.Counter
-	//how many times local node overdrafted and dropped
+	// how many times local node overdrafted and dropped
 	mSelfDrops metrics.Counter
+
+	MetricsRegistry metrics.Registry
 )
 
-//Prices defines how prices are being passed on to the accounting instance
+// Prices defines how prices are being passed on to the accounting instance
 type Prices interface {
-	//Return the Price for a message
+	// Return the Price for a message
 	Price(interface{}) *Price
 }
 
@@ -57,20 +59,20 @@ const (
 	Receiver = Payer(false)
 )
 
-//Price represents the costs of a message
+// Price represents the costs of a message
 type Price struct {
-	Value   uint64 //
-	PerByte bool   //True if the price is per byte or for unit
+	Value   uint64
+	PerByte bool // True if the price is per byte or for unit
 	Payer   Payer
 }
 
-//For gives back the price for a message
-//A protocol provides the message price in absolute value
-//This method then returns the correct signed amount,
-//depending on who pays, which is identified by the `payer` argument:
-//`Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
-//Thus: If Sending and sender pays, amount positive, otherwise negative
-//If Receiving, and receiver pays, amount positive, otherwise negative
+// For gives back the price for a message
+// A protocol provides the message price in absolute value
+// This method then returns the correct signed amount,
+// depending on who pays, which is identified by the `payer` argument:
+// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
+// Thus: If Sending and sender pays, amount positive, otherwise negative
+// If Receiving, and receiver pays, amount positive, otherwise negative
 func (p *Price) For(payer Payer, size uint32) int64 {
 	price := p.Value
 	if p.PerByte {
@@ -82,22 +84,22 @@ func (p *Price) For(payer Payer, size uint32) int64 {
 	return int64(price)
 }
 
-//Balance is the actual accounting instance
-//Balance defines the operations needed for accounting
-//Implementations internally maintain the balance for every peer
+// Balance is the actual accounting instance
+// Balance defines the operations needed for accounting
+// Implementations internally maintain the balance for every peer
 type Balance interface {
-	//Adds amount to the local balance with remote node `peer`;
-	//positive amount = credit local node
-	//negative amount = debit local node
+	// Adds amount to the local balance with remote node `peer`;
+	// positive amount = credit local node
+	// negative amount = debit local node
 	Add(amount int64, peer *Peer) error
 }
 
-//Accounting implements the Hook interface
-//It interfaces to the balances through the Balance interface,
-//while interfacing with protocols and its prices through the Prices interface
+// Accounting implements the Hook interface
+// It interfaces to the balances through the Balance interface,
+// while interfacing with protocols and its prices through the Prices interface
 type Accounting struct {
-	Balance //interface to accounting logic
-	Prices  //interface to prices logic
+	Balance // interface to accounting logic
+	Prices  // interface to prices logic
 }
 
 func NewAccounting(balance Balance, po Prices) *Accounting {
@@ -108,79 +110,77 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
 	return ah
 }
 
-//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
-//this registry should be independent of any other metrics as it persists at different endpoints.
-//It also instantiates the given metrics and starts the persisting go-routine which
-//at the passed interval writes the metrics to a LevelDB
+// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
+// this registry should be independent of any other metrics as it persists at different endpoints.
+// It also instantiates the given metrics and starts the persisting go-routine which
+// at the passed interval writes the metrics to a LevelDB
 func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
-	//create an empty registry
-	registry := metrics.NewRegistry()
-	//instantiate the metrics
-	mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry)
-	mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry)
-	mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry)
-	mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry)
-	mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry)
-	mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry)
-	mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry)
-	mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry)
-	//create the DB and start persisting
-	return NewAccountingMetrics(registry, reportInterval, path)
+	// create an empty registry
+	MetricsRegistry = metrics.NewRegistry()
+	// instantiate the metrics
+	mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
+	mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
+	mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
+	mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
+	mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
+	mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
+	mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
+	mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
+	// create the DB and start persisting
+	return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
 }
 
-//Implement Hook.Send
 // Send takes a peer, a size and a msg and
-// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
-// - credits/debits local node using balance interface
+//   - calculates the cost for the local node sending a msg of size to peer using the Prices interface
+//   - credits/debits local node using balance interface
 func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
-	//get the price for a message (through the protocol spec)
+	// get the price for a message (through the protocol spec)
 	price := ah.Price(msg)
-	//this message doesn't need accounting
+	// this message doesn't need accounting
 	if price == nil {
 		return nil
 	}
-	//evaluate the price for sending messages
+	// evaluate the price for sending messages
 	costToLocalNode := price.For(Sender, size)
-	//do the accounting
+	// do the accounting
 	err := ah.Add(costToLocalNode, peer)
-	//record metrics: just increase counters for user-facing metrics
+	// record metrics: just increase counters for user-facing metrics
 	ah.doMetrics(costToLocalNode, size, err)
 	return err
 }
 
-//Implement Hook.Receive
 // Receive takes a peer, a size and a msg and
-// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
-// - credits/debits local node using balance interface
+//   - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
+//   - credits/debits local node using balance interface
 func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
-	//get the price for a message (through the protocol spec)
+	// get the price for a message (through the protocol spec)
 	price := ah.Price(msg)
-	//this message doesn't need accounting
+	// this message doesn't need accounting
 	if price == nil {
 		return nil
 	}
-	//evaluate the price for receiving messages
+	// evaluate the price for receiving messages
 	costToLocalNode := price.For(Receiver, size)
-	//do the accounting
+	// do the accounting
 	err := ah.Add(costToLocalNode, peer)
-	//record metrics: just increase counters for user-facing metrics
+	// record metrics: just increase counters for user-facing metrics
 	ah.doMetrics(costToLocalNode, size, err)
 	return err
 }
 
-//record some metrics
-//this is not an error handling. `err` is returned by both `Send` and `Receive`
-//`err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
-//if the limit has been violated and `err` is thus not nil:
-// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
-// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
+// record some metrics
+// this is not an error handling. `err` is returned by both `Send` and `Receive`
+// `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
+// if the limit has been violated and `err` is thus not nil:
+//   * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
+//   * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
 func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
 	if price > 0 {
 		mBalanceCredit.Inc(price)
 		mBytesCredit.Inc(int64(size))
 		mMsgCredit.Inc(1)
 		if err != nil {
-			//increase the number of times a remote node has been dropped due to "overdraft"
+			// increase the number of times a remote node has been dropped due to "overdraft"
 			mPeerDrops.Inc(1)
 		}
 	} else {
@@ -188,7 +188,7 @@ func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
 		mBytesDebit.Inc(int64(size))
 		mMsgDebit.Inc(1)
 		if err != nil {
-			//increase the number of times the local node has done an "overdraft" in respect to other nodes
+			// increase the number of times the local node has done an "overdraft" in respect to other nodes
 			mSelfDrops.Inc(1)
 		}
 	}
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 090bef8d1..2e2c3c418 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -48,28 +48,28 @@ const (
 	HashSize         = 32
 )
 
-//Enumerate options for syncing and retrieval
+// Enumerate options for syncing and retrieval
 type SyncingOption int
 type RetrievalOption int
 
-//Syncing options
+// Syncing options
 const (
-	//Syncing disabled
+	// Syncing disabled
 	SyncingDisabled SyncingOption = iota
-	//Register the client and the server but not subscribe
+	// Register the client and the server but not subscribe
 	SyncingRegisterOnly
-	//Both client and server funcs are registered, subscribe sent automatically
+	// Both client and server funcs are registered, subscribe sent automatically
 	SyncingAutoSubscribe
 )
 
 const (
-	//Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
+	// Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
 	RetrievalDisabled RetrievalOption = iota
-	//Only the client side of the retrieve request is registered.
-	//(light nodes do not serve retrieve requests)
-	//once the client is registered, subscription to retrieve request stream is always sent
+	// Only the client side of the retrieve request is registered.
+	// (light nodes do not serve retrieve requests)
+	// once the client is registered, subscription to retrieve request stream is always sent
 	RetrievalClientOnly
-	//Both client and server funcs are registered, subscribe sent automatically
+	// Both client and server funcs are registered, subscribe sent automatically
 	RetrievalEnabled
 )
 
@@ -86,18 +86,18 @@ type Registry struct {
 	peers          map[enode.ID]*Peer
 	delivery       *Delivery
 	intervalsStore state.Store
-	autoRetrieval  bool //automatically subscribe to retrieve request stream
+	autoRetrieval  bool // automatically subscribe to retrieve request stream
 	maxPeerServers int
-	spec           *protocols.Spec   //this protocol's spec
-	balance        protocols.Balance //implements protocols.Balance, for accounting
-	prices         protocols.Prices  //implements protocols.Prices, provides prices to accounting
+	balance        protocols.Balance // implements protocols.Balance, for accounting
+	prices         protocols.Prices  // implements protocols.Prices, provides prices to accounting
+	spec           *protocols.Spec   // this protocol's spec
 }
 
 // RegistryOptions holds optional values for NewRegistry constructor.
 type RegistryOptions struct {
 	SkipCheck       bool
-	Syncing         SyncingOption   //Defines syncing behavior
-	Retrieval       RetrievalOption //Defines retrieval behavior
+	Syncing         SyncingOption   // Defines syncing behavior
+	Retrieval       RetrievalOption // Defines retrieval behavior
 	SyncUpdateDelay time.Duration
 	MaxPeerServers  int // The limit of servers for each peer in registry
 }
@@ -110,7 +110,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
 	if options.SyncUpdateDelay <= 0 {
 		options.SyncUpdateDelay = 15 * time.Second
 	}
-	//check if retriaval has been disabled
+	// check if retrieval has been disabled
 	retrieval := options.Retrieval != RetrievalDisabled
 
 	streamer := &Registry{
@@ -130,7 +130,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
 	streamer.api = NewAPI(streamer)
 	delivery.getPeer = streamer.getPeer
 
-	//if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
+	// if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
 	if options.Retrieval == RetrievalEnabled {
 		streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
 			if !live {
@@ -140,20 +140,20 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
 		})
 	}
 
-	//if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
+	// if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
 	if options.Retrieval != RetrievalDisabled {
 		streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
 			return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
 		})
 	}
 
-	//If syncing is not disabled, the syncing functions are registered (both client and server)
+	// If syncing is not disabled, the syncing functions are registered (both client and server)
 	if options.Syncing != SyncingDisabled {
 		RegisterSwarmSyncerServer(streamer, syncChunkStore)
 		RegisterSwarmSyncerClient(streamer, syncChunkStore)
 	}
 
-	//if syncing is set to automatically subscribe to the syncing stream, start the subscription process
+	// if syncing is set to automatically subscribe to the syncing stream, start the subscription process
 	if options.Syncing == SyncingAutoSubscribe {
 		// latestIntC function ensures that
 		//   - receiving from the in chan is not blocked by processing inside the for loop
@@ -235,13 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
 	return streamer
 }
 
-//we need to construct a spec instance per node instance
+// This is an accounted protocol, therefore we need to provide a pricing Hook to the spec
+// For simulations to be able to run multiple nodes and not override the hook's balance,
+// we need to construct a spec instance per node instance
 func (r *Registry) setupSpec() {
-	//first create the "bare" spec
+	// first create the "bare" spec
 	r.createSpec()
-	//if balance is nil, this node has been started without swap support (swapEnabled flag is false)
+	// now create the pricing object
+	r.createPriceOracle()
+	// if balance is nil, this node has been started without swap support (swapEnabled flag is false)
 	if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
-		//swap is enabled, so setup the hook
+		// swap is enabled, so setup the hook
 		r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
 	}
 }
@@ -533,11 +537,11 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
 		return p.handleWantedHashesMsg(ctx, msg)
 
 	case *ChunkDeliveryMsgRetrieval:
-		//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+		// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
 		return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
 
 	case *ChunkDeliveryMsgSyncing:
-		//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+		// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
 		return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
 
 	case *RetrieveRequestMsg:
@@ -726,9 +730,9 @@ func (c *clientParams) clientCreated() {
 	close(c.clientCreatedC)
 }
 
-//GetSpec returns the streamer spec to callers
-//This used to be a global variable but for simulations with
-//multiple nodes its fields (notably the Hook) would be overwritten
+// GetSpec returns the streamer spec to callers
+// This used to be a global variable but for simulations with
+// multiple nodes its fields (notably the Hook) would be overwritten
 func (r *Registry) GetSpec() *protocols.Spec {
 	return r.spec
 }
@@ -756,6 +760,52 @@ func (r *Registry) createSpec() {
 	r.spec = spec
 }
 
+// An accountable message needs some meta information attached to it
+// in order to evaluate the correct price
+type StreamerPrices struct {
+	priceMatrix map[reflect.Type]*protocols.Price
+	registry    *Registry
+}
+
+// Price implements the accounting interface and returns the price for a specific message
+func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price {
+	t := reflect.TypeOf(msg).Elem()
+	return sp.priceMatrix[t]
+}
+
+// Instead of hardcoding the price, get it
+// through a function - it could be quite complex in the future
+func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 {
+	return uint64(1)
+}
+
+// Instead of hardcoding the price, get it
+// through a function - it could be quite complex in the future
+func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 {
+	return uint64(1)
+}
+
+// createPriceOracle sets up a matrix which can be queried to get
+// the price for a message via the Price method
+func (r *Registry) createPriceOracle() {
+	sp := &StreamerPrices{
+		registry: r,
+	}
+	sp.priceMatrix = map[reflect.Type]*protocols.Price{
+		reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): {
+			Value:   sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now
+			PerByte: true,
+			Payer:   protocols.Receiver,
+		},
+		reflect.TypeOf(RetrieveRequestMsg{}): {
+			Value:   sp.getRetrieveRequestMsgPrice(), // arbitrary price for now
+			PerByte: false,
+			Payer:   protocols.Sender,
+		},
+	}
+	r.prices = sp
+}
+
 func (r *Registry) Protocols() []p2p.Protocol {
 	return []p2p.Protocol{
 		{
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 77fe55d34..e1b1c8286 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -921,3 +921,34 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
 		}
 	}
 }
+
+//TestHasPriceImplementation is to check that the Registry has a
+//`Price` interface implementation
+func TestHasPriceImplementation(t *testing.T) {
+	_, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+		Retrieval: RetrievalDisabled,
+		Syncing:   SyncingDisabled,
+	})
+	defer teardown()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if r.prices == nil {
+		t.Fatal("No prices implementation available for the stream protocol")
+	}
+
+	pricesInstance, ok := r.prices.(*StreamerPrices)
+	if !ok {
+		t.Fatal("`Registry` does not have the expected Prices instance")
+	}
+	price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{})
+	if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() {
+		t.Fatal("No prices set for chunk delivery msg")
+	}
+
+	price = pricesInstance.Price(&RetrieveRequestMsg{})
+	if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() {
+		t.Fatal("No prices set for chunk delivery msg")
+	}
+}
-- 
GitLab