From 38b1e8ee207da636d95bea760f577eca462307a6 Mon Sep 17 00:00:00 2001
From: gluk256 <gluk256@users.noreply.github.com>
Date: Thu, 21 Dec 2017 16:17:27 +0200
Subject: [PATCH] whisper/whisperv6: PoW requirement  (#15701)

New Whisper-level message introduced (PoW requirement),
corresponding logic added, plus some tests.
---
 whisper/whisperv6/doc.go          | 11 ++--
 whisper/whisperv6/peer.go         | 31 ++++++----
 whisper/whisperv6/peer_test.go    | 97 +++++++++++++++++++++++++------
 whisper/whisperv6/whisper.go      | 86 ++++++++++++++++++++++-----
 whisper/whisperv6/whisper_test.go | 14 ++---
 5 files changed, 185 insertions(+), 54 deletions(-)

diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go
index 64925ba48..2a4911d65 100644
--- a/whisper/whisperv6/doc.go
+++ b/whisper/whisperv6/doc.go
@@ -40,10 +40,13 @@ const (
 	ProtocolVersionStr = "6.0"
 	ProtocolName       = "shh"
 
-	statusCode           = 0 // used by whisper protocol
-	messagesCode         = 1 // normal whisper message
-	p2pCode              = 2 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
-	p2pRequestCode       = 3 // peer-to-peer message, used by Dapp protocol
+	// whisper protocol message codes, according to EIP-627
+	statusCode           = 0   // used by whisper protocol
+	messagesCode         = 1   // normal whisper message
+	powRequirementCode   = 2   // PoW requirement
+	bloomFilterExCode    = 3   // bloom filter exchange
+	p2pRequestCode       = 126 // peer-to-peer message, used by Dapp protocol
+	p2pMessageCode       = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
 	NumberOfMessageCodes = 128
 
 	paddingMask   = byte(3)
diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go
index ffc39505e..65e0c77b0 100644
--- a/whisper/whisperv6/peer.go
+++ b/whisper/whisperv6/peer.go
@@ -18,6 +18,7 @@ package whisperv6
 
 import (
 	"fmt"
+	"math"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -29,10 +30,12 @@ import (
 
 // peer represents a whisper protocol peer connection.
 type Peer struct {
-	host    *Whisper
-	peer    *p2p.Peer
-	ws      p2p.MsgReadWriter
-	trusted bool
+	host *Whisper
+	peer *p2p.Peer
+	ws   p2p.MsgReadWriter
+
+	trusted        bool
+	powRequirement float64
 
 	known *set.Set // Messages already known by the peer to avoid wasting bandwidth
 
@@ -42,12 +45,13 @@ type Peer struct {
 // newPeer creates a new whisper peer object, but does not run the handshake itself.
 func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
 	return &Peer{
-		host:    host,
-		peer:    remote,
-		ws:      rw,
-		trusted: false,
-		known:   set.New(),
-		quit:    make(chan struct{}),
+		host:           host,
+		peer:           remote,
+		ws:             rw,
+		trusted:        false,
+		powRequirement: 0.0,
+		known:          set.New(),
+		quit:           make(chan struct{}),
 	}
 }
 
@@ -152,7 +156,7 @@ func (p *Peer) broadcast() error {
 	envelopes := p.host.Envelopes()
 	bundle := make([]*Envelope, 0, len(envelopes))
 	for _, envelope := range envelopes {
-		if !p.marked(envelope) {
+		if !p.marked(envelope) && envelope.PoW() >= p.powRequirement {
 			bundle = append(bundle, envelope)
 		}
 	}
@@ -177,3 +181,8 @@ func (p *Peer) ID() []byte {
 	id := p.peer.ID()
 	return id[:]
 }
+
+func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
+	i := math.Float64bits(pow)
+	return p2p.Send(p.ws, powRequirementCode, i)
+}
diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go
index 39a4ab198..599a479be 100644
--- a/whisper/whisperv6/peer_test.go
+++ b/whisper/whisperv6/peer_test.go
@@ -88,21 +88,31 @@ var sharedKey []byte = []byte("some arbitrary data here")
 var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
 var expectedMessage []byte = []byte("per rectum ad astra")
 
-// This test does the following:
-// 1. creates a chain of whisper nodes,
-// 2. installs the filters with shared (predefined) parameters,
-// 3. each node sends a number of random (undecryptable) messages,
-// 4. first node sends one expected (decryptable) message,
-// 5. checks if each node have received and decrypted exactly one message.
 func TestSimulation(t *testing.T) {
+	// create a chain of whisper nodes,
+	// installs the filters with shared (predefined) parameters
 	initialize(t)
 
+	// each node sends a number of random (undecryptable) messages
 	for i := 0; i < NumNodes; i++ {
 		sendMsg(t, false, i)
 	}
 
+	// node #0 sends one expected (decryptable) message
 	sendMsg(t, true, 0)
-	checkPropagation(t)
+
+	// check if each node have received and decrypted exactly one message
+	checkPropagation(t, true)
+
+	// send protocol-level messages (powRequirementCode) and check the new PoW requirement values
+	powReqExchange(t)
+
+	// node #1 sends one expected (decryptable) message
+	sendMsg(t, true, 1)
+
+	// check if each node (except node #0) have received and decrypted exactly one message
+	checkPropagation(t, false)
+
 	stopServers()
 }
 
@@ -114,7 +124,7 @@ func initialize(t *testing.T) {
 	for i := 0; i < NumNodes; i++ {
 		var node TestNode
 		node.shh = New(&DefaultConfig)
-		node.shh.SetMinimumPoW(0.00000001)
+		node.shh.SetMinimumPowTest(0.00000001)
 		node.shh.Start(nil)
 		topics := make([]TopicType, 0)
 		topics = append(topics, sharedTopic)
@@ -154,13 +164,18 @@ func initialize(t *testing.T) {
 			},
 		}
 
-		err = node.server.Start()
-		if err != nil {
-			t.Fatalf("failed to start server %d.", i)
-		}
-
 		nodes[i] = &node
 	}
+
+	for i := 1; i < NumNodes; i++ {
+		go nodes[i].server.Start()
+	}
+
+	// we need to wait until the first node actually starts
+	err = nodes[0].server.Start()
+	if err != nil {
+		t.Fatalf("failed to start the fisrt server.")
+	}
 }
 
 func stopServers() {
@@ -174,18 +189,21 @@ func stopServers() {
 	}
 }
 
-func checkPropagation(t *testing.T) {
+func checkPropagation(t *testing.T, includingNodeZero bool) {
 	if t.Failed() {
 		return
 	}
 
-	const cycle = 100
-	const iterations = 100
+	const cycle = 50
+	const iterations = 200
 
-	for j := 0; j < iterations; j++ {
-		time.Sleep(cycle * time.Millisecond)
+	first := 0
+	if !includingNodeZero {
+		first = 1
+	}
 
-		for i := 0; i < NumNodes; i++ {
+	for j := 0; j < iterations; j++ {
+		for i := first; i < NumNodes; i++ {
 			f := nodes[i].shh.GetFilter(nodes[i].filerId)
 			if f == nil {
 				t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i)
@@ -200,9 +218,18 @@ func checkPropagation(t *testing.T) {
 				return
 			}
 		}
+
+		time.Sleep(cycle * time.Millisecond)
 	}
 
 	t.Fatalf("Test was not complete: timeout %d seconds.", iterations*cycle/1000)
+
+	if !includingNodeZero {
+		f := nodes[0].shh.GetFilter(nodes[0].filerId)
+		if f != nil {
+			t.Fatalf("node zero received a message with low PoW.")
+		}
+	}
 }
 
 func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
@@ -304,3 +331,35 @@ func TestPeerBasic(t *testing.T) {
 		t.Fatalf("failed mark with seed %d.", seed)
 	}
 }
+
+func powReqExchange(t *testing.T) {
+	for i, node := range nodes {
+		for peer := range node.shh.peers {
+			if peer.powRequirement > 1000.0 {
+				t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement)
+			}
+		}
+	}
+
+	const pow float64 = 7777777.0
+	nodes[0].shh.SetMinimumPoW(pow)
+
+	// wait until all the messages are delivered
+	time.Sleep(64 * time.Millisecond)
+
+	cnt := 0
+	for i, node := range nodes {
+		for peer := range node.shh.peers {
+			if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
+				cnt++
+				if peer.powRequirement != pow {
+					t.Fatalf("node %d: failed to set the new pow requirement.", i)
+				}
+			}
+		}
+	}
+
+	if cnt == 0 {
+		t.Fatalf("no matching peers found.")
+	}
+}
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index 2cc1e64f5..492591486 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -22,6 +22,7 @@ import (
 	crand "crypto/rand"
 	"crypto/sha256"
 	"fmt"
+	"math"
 	"runtime"
 	"sync"
 	"time"
@@ -30,6 +31,7 @@ import (
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/syndtr/goleveldb/leveldb/errors"
 	"golang.org/x/crypto/pbkdf2"
@@ -74,6 +76,8 @@ type Whisper struct {
 
 	settings syncmap.Map // holds configuration settings that can be dynamically changed
 
+	reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages
+
 	statsMu sync.Mutex // guard stats
 	stats   Statistics // Statistics of whisper node
 
@@ -87,14 +91,15 @@ func New(cfg *Config) *Whisper {
 	}
 
 	whisper := &Whisper{
-		privateKeys:  make(map[string]*ecdsa.PrivateKey),
-		symKeys:      make(map[string][]byte),
-		envelopes:    make(map[common.Hash]*Envelope),
-		expirations:  make(map[uint32]*set.SetNonTS),
-		peers:        make(map[*Peer]struct{}),
-		messageQueue: make(chan *Envelope, messageQueueLimit),
-		p2pMsgQueue:  make(chan *Envelope, messageQueueLimit),
-		quit:         make(chan struct{}),
+		privateKeys:       make(map[string]*ecdsa.PrivateKey),
+		symKeys:           make(map[string][]byte),
+		envelopes:         make(map[common.Hash]*Envelope),
+		expirations:       make(map[uint32]*set.SetNonTS),
+		peers:             make(map[*Peer]struct{}),
+		messageQueue:      make(chan *Envelope, messageQueueLimit),
+		p2pMsgQueue:       make(chan *Envelope, messageQueueLimit),
+		quit:              make(chan struct{}),
+		reactionAllowance: SynchAllowance,
 	}
 
 	whisper.filters = NewFilters(whisper)
@@ -177,13 +182,50 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
 
 // SetMinimumPoW sets the minimal PoW required by this node
 func (w *Whisper) SetMinimumPoW(val float64) error {
-	if val <= 0.0 {
+	if val < 0.0 {
 		return fmt.Errorf("invalid PoW: %f", val)
 	}
-	w.settings.Store(minPowIdx, val)
+
+	w.notifyPeersAboutPowRequirementChange(val)
+
+	go func() {
+		// allow some time before all the peers have processed the notification
+		time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
+		w.settings.Store(minPowIdx, val)
+	}()
+
 	return nil
 }
 
+// SetMinimumPoW sets the minimal PoW in test environment
+func (w *Whisper) SetMinimumPowTest(val float64) {
+	w.notifyPeersAboutPowRequirementChange(val)
+	w.settings.Store(minPowIdx, val)
+}
+
+func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
+	arr := make([]*Peer, len(w.peers))
+	i := 0
+
+	w.peerMu.Lock()
+	for p := range w.peers {
+		arr[i] = p
+		i++
+	}
+	w.peerMu.Unlock()
+
+	for _, p := range arr {
+		err := p.notifyAboutPowRequirementChange(pow)
+		if err != nil {
+			// allow one retry
+			err = p.notifyAboutPowRequirementChange(pow)
+		}
+		if err != nil {
+			log.Warn("oversized message received", "peer", p.ID(), "error", err)
+		}
+	}
+}
+
 // getPeer retrieves peer by ID
 func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
 	w.peerMu.Lock()
@@ -233,7 +275,7 @@ func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
 
 // SendP2PDirect sends a peer-to-peer message to a specific peer.
 func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
-	return p2p.Send(peer.ws, p2pCode, envelope)
+	return p2p.Send(peer.ws, p2pMessageCode, envelope)
 }
 
 // NewKeyPair generates a new cryptographic identity for the client, and injects
@@ -536,7 +578,22 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
 			if trouble {
 				return errors.New("invalid envelope")
 			}
-		case p2pCode:
+		case powRequirementCode:
+			s := rlp.NewStream(packet.Payload, uint64(packet.Size))
+			i, err := s.Uint()
+			if err != nil {
+				log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+				return errors.New("invalid powRequirementCode message")
+			}
+			f := math.Float64frombits(i)
+			if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
+				log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+				return errors.New("invalid value in powRequirementCode message")
+			}
+			p.powRequirement = f
+		case bloomFilterExCode:
+			// to be implemented
+		case p2pMessageCode:
 			// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
 			// this message is not supposed to be forwarded to other peers, and
 			// therefore might not satisfy the PoW, expiry and other requirements.
@@ -599,7 +656,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
 
 	if envelope.PoW() < wh.MinPow() {
 		log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
-		return false, nil // drop envelope without error
+		return false, nil // drop envelope without error for now
+
+		// once the status message includes the PoW requirement, an error should be returned here:
+		//return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
 	}
 
 	hash := envelope.Hash()
diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go
index c8f3a9ed7..b391a1161 100644
--- a/whisper/whisperv6/whisper_test.go
+++ b/whisper/whisperv6/whisper_test.go
@@ -472,8 +472,8 @@ func TestExpiry(t *testing.T) {
 	InitSingleTest()
 
 	w := New(&DefaultConfig)
-	w.SetMinimumPoW(0.0000001)
-	defer w.SetMinimumPoW(DefaultMinimumPoW)
+	w.SetMinimumPowTest(0.0000001)
+	defer w.SetMinimumPowTest(DefaultMinimumPoW)
 	w.Start(nil)
 	defer w.Stop()
 
@@ -529,7 +529,7 @@ func TestCustomization(t *testing.T) {
 	InitSingleTest()
 
 	w := New(&DefaultConfig)
-	defer w.SetMinimumPoW(DefaultMinimumPoW)
+	defer w.SetMinimumPowTest(DefaultMinimumPoW)
 	defer w.SetMaxMessageSize(DefaultMaxMessageSize)
 	w.Start(nil)
 	defer w.Stop()
@@ -563,7 +563,7 @@ func TestCustomization(t *testing.T) {
 		t.Fatalf("successfully sent envelope with PoW %.06f, false positive (seed %d).", env.PoW(), seed)
 	}
 
-	w.SetMinimumPoW(smallPoW / 2)
+	w.SetMinimumPowTest(smallPoW / 2)
 	err = w.Send(env)
 	if err != nil {
 		t.Fatalf("failed to send envelope with seed %d: %s.", seed, err)
@@ -625,7 +625,7 @@ func TestSymmetricSendCycle(t *testing.T) {
 	InitSingleTest()
 
 	w := New(&DefaultConfig)
-	defer w.SetMinimumPoW(DefaultMinimumPoW)
+	defer w.SetMinimumPowTest(DefaultMinimumPoW)
 	defer w.SetMaxMessageSize(DefaultMaxMessageSize)
 	w.Start(nil)
 	defer w.Stop()
@@ -714,7 +714,7 @@ func TestSymmetricSendWithoutAKey(t *testing.T) {
 	InitSingleTest()
 
 	w := New(&DefaultConfig)
-	defer w.SetMinimumPoW(DefaultMinimumPoW)
+	defer w.SetMinimumPowTest(DefaultMinimumPoW)
 	defer w.SetMaxMessageSize(DefaultMaxMessageSize)
 	w.Start(nil)
 	defer w.Stop()
@@ -782,7 +782,7 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
 	InitSingleTest()
 
 	w := New(&DefaultConfig)
-	defer w.SetMinimumPoW(DefaultMinimumPoW)
+	defer w.SetMinimumPowTest(DefaultMinimumPoW)
 	defer w.SetMaxMessageSize(DefaultMaxMessageSize)
 	w.Start(nil)
 	defer w.Stop()
-- 
GitLab