From d96870428f116494d5190a8e595189e283dd144b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= <zsfelfoldi@gmail.com>
Date: Mon, 1 Mar 2021 10:24:20 +0100
Subject: [PATCH] les: UDP pre-negotiation of available server capacity
 (#22183)

This PR implements the first of the "lespay" UDP queries, one that is
already useful on its own: the capacity query. The server pool uses it
as a cheap UDP pre-check to determine whether it is worth starting the
more expensive TCP connection process.
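
The query is carried over the discv5 "talk request" mechanism: the
client RLP-encodes a batch of requests and sends it under the "vfx"
talk protocol, where the vflux server dispatches each request to the
registered "les" service. A capacity query carries a bias time (in
seconds) and a list of token amounts; for each amount the server
replies with the capacity that would be achievable if that amount
were added to the node's balance, a reply of 0 meaning that no
connection slot is available.

For illustration, a client-side query roughly takes the following
form (mirroring prenegQuery in les/client.go below):

    var reqs vflux.Requests
    reqs.Add("les", vflux.CapacityQueryName, vflux.CapacityQueryReq{
        Bias:      180,                  // bias time in seconds
        AddTokens: []vflux.IntOrInf{{}}, // zero tokens: query with current balance
    })
    replies := leth.VfluxRequest(node, reqs)
    var cqr vflux.CapacityQueryReply
    if replies.Get(0, &cqr) == nil && len(cqr) == 1 && cqr[0] > 0 {
        // a connection slot is available; worth dialing over TCP
    }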
---
 common/prque/lazyqueue.go             |   5 +-
 common/prque/lazyqueue_test.go        |   2 +-
 les/client.go                         |  85 +++++++++--
 les/clientpool.go                     |  55 +++++++
 les/clientpool_test.go                |   6 +-
 les/enr_entry.go                      |   3 +-
 les/server.go                         |  29 +++-
 les/vflux/client/serverpool.go        | 105 +++++++++++--
 les/vflux/client/serverpool_test.go   |   8 +-
 les/vflux/requests.go                 | 180 +++++++++++++++++++++++
 les/vflux/server/balance.go           |  58 +++++---
 les/vflux/server/balance_test.go      |   4 +-
 les/vflux/server/prioritypool.go      | 202 +++++++++++++++++++++++---
 les/vflux/server/prioritypool_test.go | 126 +++++++++++++++-
 les/vflux/server/service.go           | 122 ++++++++++++++++
 p2p/discover/v5_udp.go                |  11 +-
 p2p/discover/v5_udp_test.go           |   2 +-
 p2p/nodestate/nodestate.go            |   1 +
 18 files changed, 915 insertions(+), 89 deletions(-)
 create mode 100644 les/vflux/requests.go
 create mode 100644 les/vflux/server/service.go

diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go
index 52403df46..c74faab7e 100644
--- a/common/prque/lazyqueue.go
+++ b/common/prque/lazyqueue.go
@@ -48,7 +48,7 @@ type LazyQueue struct {
 }
 
 type (
-	PriorityCallback    func(data interface{}, now mclock.AbsTime) int64   // actual priority callback
+	PriorityCallback    func(data interface{}) int64                       // actual priority callback
 	MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
 )
 
@@ -139,11 +139,10 @@ func (q *LazyQueue) peekIndex() int {
 // Pop multiple times. Popped items are passed to the callback. MultiPop returns
 // when the callback returns false or there are no more items to pop.
 func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
-	now := q.clock.Now()
 	nextIndex := q.peekIndex()
 	for nextIndex != -1 {
 		data := heap.Pop(q.queue[nextIndex]).(*item).value
-		heap.Push(q.popQueue, &item{data, q.priority(data, now)})
+		heap.Push(q.popQueue, &item{data, q.priority(data)})
 		nextIndex = q.peekIndex()
 		for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
 			i := heap.Pop(q.popQueue).(*item)
diff --git a/common/prque/lazyqueue_test.go b/common/prque/lazyqueue_test.go
index be9491e24..9a831d628 100644
--- a/common/prque/lazyqueue_test.go
+++ b/common/prque/lazyqueue_test.go
@@ -40,7 +40,7 @@ type lazyItem struct {
 	index   int
 }
 
-func testPriority(a interface{}, now mclock.AbsTime) int64 {
+func testPriority(a interface{}) int64 {
 	return a.(*lazyItem).p
 }
 
diff --git a/les/client.go b/les/client.go
index 4d07f844f..ecabfdf50 100644
--- a/les/client.go
+++ b/les/client.go
@@ -36,30 +36,33 @@ import (
 	"github.com/ethereum/go-ethereum/eth/gasprice"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/internal/ethapi"
+	"github.com/ethereum/go-ethereum/les/vflux"
 	vfc "github.com/ethereum/go-ethereum/les/vflux/client"
 	"github.com/ethereum/go-ethereum/light"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/enr"
 	"github.com/ethereum/go-ethereum/params"
+	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/rpc"
 )
 
 type LightEthereum struct {
 	lesCommons
 
-	peers          *serverPeerSet
-	reqDist        *requestDistributor
-	retriever      *retrieveManager
-	odr            *LesOdr
-	relay          *lesTxRelay
-	handler        *clientHandler
-	txPool         *light.TxPool
-	blockchain     *light.LightChain
-	serverPool     *vfc.ServerPool
-	dialCandidates enode.Iterator
-	pruner         *pruner
+	peers              *serverPeerSet
+	reqDist            *requestDistributor
+	retriever          *retrieveManager
+	odr                *LesOdr
+	relay              *lesTxRelay
+	handler            *clientHandler
+	txPool             *light.TxPool
+	blockchain         *light.LightChain
+	serverPool         *vfc.ServerPool
+	serverPoolIterator enode.Iterator
+	pruner             *pruner
 
 	bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
 	bloomIndexer  *core.ChainIndexer             // Bloom indexer operating during block imports
@@ -112,7 +115,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 		p2pConfig:      &stack.Config().P2P,
 	}
 
-	leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
+	leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, leth.prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
 	leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)
 
 	leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
@@ -189,6 +192,62 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 	return leth, nil
 }
 
+// VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
+func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
+	reqsEnc, _ := rlp.EncodeToBytes(&reqs)
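+	// DialNode may return a locally generated variant of the node that is
+	// dialable at a local network address (see ServerPool.DialNode)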
+	repliesEnc, _ := s.p2pServer.DiscV5.TalkRequest(s.serverPool.DialNode(n), "vfx", reqsEnc)
+	var replies vflux.Replies
+	if len(repliesEnc) == 0 || rlp.DecodeBytes(repliesEnc, &replies) != nil {
+		return nil
+	}
+	return replies
+}
+
+// vfxVersion returns the version number of the "les" service subdomain of the vflux UDP
+// service, as advertised in the ENR record
+func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
+	if n.Seq() == 0 {
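+		// no signed ENR is known for this node yet; try to fetch it via discv5
+		// and persist the result in the server pool database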
+		var err error
+		if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
+			s.serverPool.Persist(n)
+		} else {
+			return 0
+		}
+	}
+
+	var les []rlp.RawValue
+	if err := n.Load(enr.WithEntry("les", &les)); err != nil || len(les) < 1 {
+		return 0
+	}
+	var version uint
+	rlp.DecodeBytes(les[0], &version) // Ignore additional fields (for forward compatibility).
+	return version
+}
+
+// prenegQuery sends a capacity query to the given server node to determine whether
+// a connection slot is immediately available
+func (s *LightEthereum) prenegQuery(n *enode.Node) int {
+	if s.vfxVersion(n) < 1 {
+		// UDP query not supported, always try TCP connection
+		return 1
+	}
+
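+	// build a single capacity query with a zero token amount; the zero-value
+	// IntOrInf asks for the capacity achievable with the current balance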
+	var requests vflux.Requests
+	requests.Add("les", vflux.CapacityQueryName, vflux.CapacityQueryReq{
+		Bias:      180,
+		AddTokens: []vflux.IntOrInf{{}},
+	})
+	replies := s.VfluxRequest(n, requests)
+	var cqr vflux.CapacityQueryReply
+	if replies.Get(0, &cqr) != nil || len(cqr) != 1 { // Note: Get returns an error if replies is nil
+		return -1
+	}
+	if cqr[0] > 0 {
+		return 1
+	}
+	return 0
+}
+
 type LightDummyAPI struct{}
 
 // Etherbase is the address that mining rewards will be send to
@@ -269,7 +328,7 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
 			return p.Info()
 		}
 		return nil
-	}, s.dialCandidates)
+	}, s.serverPoolIterator)
 }
 
 // Start implements node.Lifecycle, starting all internal goroutines needed by the
diff --git a/les/clientpool.go b/les/clientpool.go
index 4e1499bf5..1aa63a281 100644
--- a/les/clientpool.go
+++ b/les/clientpool.go
@@ -24,11 +24,13 @@ import (
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/les/utils"
+	"github.com/ethereum/go-ethereum/les/vflux"
 	vfs "github.com/ethereum/go-ethereum/les/vflux/server"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/enr"
 	"github.com/ethereum/go-ethereum/p2p/nodestate"
+	"github.com/ethereum/go-ethereum/rlp"
 )
 
 const (
@@ -382,3 +384,56 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
 		}
 	}
 }
+
+// serveCapQuery serves a vflux capacity query. It receives multiple token amounts
+// and a bias time value. For each given token amount it calculates the maximum
+// capacity that would be achievable if that amount were added to the balance.
+func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte {
+	var req vflux.CapacityQueryReq
+	if rlp.DecodeBytes(data, &req) != nil {
+		return nil
+	}
+	if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
+		return nil
+	}
+	node := f.ns.GetNode(id)
+	if node == nil {
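+		// the node is not yet known to the state machine; create a dummy record
+		// so that it can be referenced for the duration of the query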
+		node = enode.SignNull(&enr.Record{}, id)
+	}
+	c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
+	if c == nil {
+		c = &clientInfo{node: node}
+		f.ns.SetField(node, clientInfoField, c)
+		f.ns.SetField(node, connAddressField, freeID)
+		defer func() {
+			f.ns.SetField(node, connAddressField, nil)
+			f.ns.SetField(node, clientInfoField, nil)
+		}()
+		if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil {
+			log.Error("BalanceField is missing", "node", node.ID())
+			return nil
+		}
+	}
+	// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
+	curve := f.pp.GetCapacityCurve().Exclude(id)
+	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
+	bias := time.Second * time.Duration(req.Bias)
+	if f.connectedBias > bias {
+		bias = f.connectedBias
+	}
+	pb, _ := c.balance.GetBalance()
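+	// for each requested token amount, calculate the maximum capacity that the
+	// pool could grant based on the node's estimated priority at that capacity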
+	for i, addTokens := range req.AddTokens {
+		add := addTokens.Int64()
+		result[i] = curve.MaxCapacity(func(capacity uint64) int64 {
+			return c.balance.EstimatePriority(capacity, add, 0, bias, false) / int64(capacity)
+		})
+		if add <= 0 && uint64(-add) >= pb && result[i] > f.minCap {
+			result[i] = f.minCap
+		}
+		if result[i] < f.minCap {
+			result[i] = 0
+		}
+	}
+	reply, _ := rlp.EncodeToBytes(&result)
+	return reply
+}
diff --git a/les/clientpool_test.go b/les/clientpool_test.go
index 5cff01040..345b373b0 100644
--- a/les/clientpool_test.go
+++ b/les/clientpool_test.go
@@ -508,8 +508,10 @@ func TestNegativeBalanceCalculation(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		pool.disconnect(newPoolTestPeer(i, nil))
 		_, nb := getBalance(pool, newPoolTestPeer(i, nil))
-		if checkDiff(nb, uint64(time.Minute)/1000) {
-			t.Fatalf("Negative balance mismatch, want %v, got %v", uint64(time.Minute)/1000, nb)
+		exp := uint64(time.Minute) / 1000
+		exp -= exp / 120 // correct for negative balance expiration
+		if checkDiff(nb, exp) {
+			t.Fatalf("Negative balance mismatch, want %v, got %v", exp, nb)
 		}
 	}
 }
diff --git a/les/enr_entry.go b/les/enr_entry.go
index 1e56c1f17..8be4a7a00 100644
--- a/les/enr_entry.go
+++ b/les/enr_entry.go
@@ -27,7 +27,8 @@ import (
 // lesEntry is the "les" ENR entry. This is set for LES servers only.
 type lesEntry struct {
 	// Ignore additional fields (for forward compatibility).
-	_ []rlp.RawValue `rlp:"tail"`
+	VfxVersion uint
+	Rest       []rlp.RawValue `rlp:"tail"`
 }
 
 func (lesEntry) ENRKey() string { return "les" }
diff --git a/les/server.go b/les/server.go
index 359784cf7..63feaf892 100644
--- a/les/server.go
+++ b/les/server.go
@@ -26,6 +26,7 @@ import (
 	"github.com/ethereum/go-ethereum/eth/ethconfig"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/les/flowcontrol"
+	"github.com/ethereum/go-ethereum/les/vflux"
 	vfs "github.com/ethereum/go-ethereum/les/vflux/server"
 	"github.com/ethereum/go-ethereum/light"
 	"github.com/ethereum/go-ethereum/log"
@@ -68,6 +69,7 @@ type LesServer struct {
 	archiveMode bool // Flag whether the ethereum node runs in archive mode.
 	handler     *serverHandler
 	broadcaster *broadcaster
+	vfluxServer *vfs.Server
 	privateKey  *ecdsa.PrivateKey
 
 	// Flow control and capacity management
@@ -112,12 +114,14 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
 		ns:           ns,
 		archiveMode:  e.ArchiveMode(),
 		broadcaster:  newBroadcaster(ns),
+		vfluxServer:  vfs.NewServer(time.Millisecond * 10),
 		fcManager:    flowcontrol.NewClientManager(nil, &mclock.System{}),
 		servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
 		threadsBusy:  config.LightServ/100 + 1,
 		threadsIdle:  threads,
 		p2pSrv:       node.Server(),
 	}
+	srv.vfluxServer.Register(srv)
 	issync := e.Synced
 	if config.LightNoSyncServe {
 		issync = func() bool { return true }
@@ -201,7 +205,9 @@ func (s *LesServer) Protocols() []p2p.Protocol {
 	}, nil)
 	// Add "les" ENR entries.
 	for i := range ps {
-		ps[i].Attributes = []enr.Entry{&lesEntry{}}
+		ps[i].Attributes = []enr.Entry{&lesEntry{
+			VfxVersion: 1,
+		}}
 	}
 	return ps
 }
@@ -211,10 +217,11 @@ func (s *LesServer) Start() error {
 	s.privateKey = s.p2pSrv.PrivateKey
 	s.broadcaster.setSignerKey(s.privateKey)
 	s.handler.start()
-
 	s.wg.Add(1)
 	go s.capacityManagement()
-
+	if s.p2pSrv.DiscV5 != nil {
+		s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
+	}
 	return nil
 }
 
@@ -228,6 +235,7 @@ func (s *LesServer) Stop() error {
 	s.costTracker.stop()
 	s.handler.stop()
 	s.servingQueue.stop()
+	s.vfluxServer.Stop()
 
 	// Note, bloom trie indexer is closed by parent bloombits indexer.
 	s.chtIndexer.Close()
@@ -311,3 +319,18 @@ func (s *LesServer) dropClient(id enode.ID) {
 		p.Peer.Disconnect(p2p.DiscRequested)
 	}
 }
+
+// ServiceInfo implements vfs.Service
+func (s *LesServer) ServiceInfo() (string, string) {
+	return "les", "Ethereum light client service"
+}
+
+// Handle implements vfs.Service
+func (s *LesServer) Handle(id enode.ID, address string, name string, data []byte) []byte {
+	switch name {
+	case vflux.CapacityQueryName:
+		return s.clientPool.serveCapQuery(id, address, data)
+	default:
+		return nil
+	}
+}
diff --git a/les/vflux/client/serverpool.go b/les/vflux/client/serverpool.go
index 95f724609..47ec4fee7 100644
--- a/les/vflux/client/serverpool.go
+++ b/les/vflux/client/serverpool.go
@@ -94,7 +94,7 @@ type nodeHistoryEnc struct {
 type queryFunc func(*enode.Node) int
 
 var (
-	clientSetup        = &nodestate.Setup{Version: 1}
+	clientSetup        = &nodestate.Setup{Version: 2}
 	sfHasValue         = clientSetup.NewPersistentFlag("hasValue")
 	sfQueried          = clientSetup.NewFlag("queried")
 	sfCanDial          = clientSetup.NewFlag("canDial")
@@ -131,9 +131,25 @@ var (
 	)
 	sfiNodeWeight     = clientSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
 	sfiConnectedStats = clientSetup.NewField("connectedStats", reflect.TypeOf(ResponseTimeStats{}))
+	sfiLocalAddress   = clientSetup.NewPersistentField("localAddress", reflect.TypeOf(&enr.Record{}),
+		func(field interface{}) ([]byte, error) {
+			if enr, ok := field.(*enr.Record); ok {
+				enc, err := rlp.EncodeToBytes(enr)
+				return enc, err
+			}
+			return nil, errors.New("invalid field type")
+		},
+		func(enc []byte) (interface{}, error) {
+			var enr enr.Record
+			if err := rlp.DecodeBytes(enc, &enr); err != nil {
+				return nil, err
+			}
+			return &enr, nil
+		},
+	)
 )
 
-// newServerPool creates a new server pool
+// NewServerPool creates a new server pool
 func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator) {
 	s := &ServerPool{
 		db:           db,
@@ -151,15 +167,10 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio
 	s.mixSources = append(s.mixSources, knownSelector)
 	s.mixSources = append(s.mixSources, alwaysConnect)
 
-	iter := enode.Iterator(s.mixer)
+	s.dialIterator = s.mixer
 	if query != nil {
-		iter = s.addPreNegFilter(iter, query)
+		s.dialIterator = s.addPreNegFilter(s.dialIterator, query)
 	}
-	s.dialIterator = enode.Filter(iter, func(node *enode.Node) bool {
-		s.ns.SetState(node, sfDialing, sfCanDial, 0)
-		s.ns.SetState(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
-		return true
-	})
 
 	s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) {
 		if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
@@ -169,7 +180,41 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio
 		}
 	})
 
-	return s, s.dialIterator
+	return s, &serverPoolIterator{
+		dialIterator: s.dialIterator,
+		nextFn: func(node *enode.Node) {
+			s.ns.Operation(func() {
+				s.ns.SetStateSub(node, sfDialing, sfCanDial, 0)
+				s.ns.SetStateSub(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
+			})
+		},
+		nodeFn: s.DialNode,
+	}
+}
+
+type serverPoolIterator struct {
+	dialIterator enode.Iterator
+	nextFn       func(*enode.Node)
+	nodeFn       func(*enode.Node) *enode.Node
+}
+
+// Next implements enode.Iterator
+func (s *serverPoolIterator) Next() bool {
+	if s.dialIterator.Next() {
+		s.nextFn(s.dialIterator.Node())
+		return true
+	}
+	return false
+}
+
+// Node implements enode.Iterator
+func (s *serverPoolIterator) Node() *enode.Node {
+	return s.nodeFn(s.dialIterator.Node())
+}
+
+// Close implements enode.Iterator
+func (s *serverPoolIterator) Close() {
+	s.dialIterator.Close()
 }
 
 // AddMetrics adds metrics to the server pool. Should be called before Start().
@@ -285,7 +330,6 @@ func (s *ServerPool) Start() {
 
 // stop stops the server pool
 func (s *ServerPool) Stop() {
-	s.dialIterator.Close()
 	if s.fillSet != nil {
 		s.fillSet.Close()
 	}
@@ -299,18 +343,23 @@ func (s *ServerPool) Stop() {
 	s.vt.Stop()
 }
 
-// registerPeer implements serverPeerSubscriber
+// RegisterNode implements serverPeerSubscriber
 func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error) {
 	if atomic.LoadUint32(&s.started) == 0 {
 		return nil, errors.New("server pool not started yet")
 	}
-	s.ns.SetState(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
 	nvt := s.vt.Register(node.ID())
-	s.ns.SetField(node, sfiConnectedStats, nvt.RtStats())
+	s.ns.Operation(func() {
+		s.ns.SetStateSub(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
+		s.ns.SetFieldSub(node, sfiConnectedStats, nvt.RtStats())
+		if node.IP().IsLoopback() {
+			s.ns.SetFieldSub(node, sfiLocalAddress, node.Record())
+		}
+	})
 	return nvt, nil
 }
 
-// unregisterPeer implements serverPeerSubscriber
+// UnregisterNode implements serverPeerSubscriber
 func (s *ServerPool) UnregisterNode(node *enode.Node) {
 	s.ns.Operation(func() {
 		s.setRedialWait(node, dialCost, dialWaitStep)
@@ -430,6 +479,7 @@ func (s *ServerPool) updateWeight(node *enode.Node, totalValue float64, totalDia
 		s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
 		s.ns.SetFieldSub(node, sfiNodeWeight, nil)
 		s.ns.SetFieldSub(node, sfiNodeHistory, nil)
+		s.ns.SetFieldSub(node, sfiLocalAddress, nil)
 	}
 	s.ns.Persist(node) // saved if node history or hasValue changed
 }
@@ -520,3 +570,28 @@ func (s *ServerPool) calculateWeight(node *enode.Node) {
 func (s *ServerPool) API() *PrivateClientAPI {
 	return NewPrivateClientAPI(s.vt)
 }
+
+type dummyIdentity enode.ID
+
+func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil }
+func (id dummyIdentity) NodeAddr(r *enr.Record) []byte          { return id[:] }
+
+// DialNode replaces the given enode with a locally generated one containing the ENR
+// stored in the sfiLocalAddress field if present. This workaround ensures that nodes
+// on the local network can be dialed at the local address if a connection has been
+// successfully established previously.
+// Note that NodeStateMachine always remembers the enode with the latest version of
+// the remote signed ENR. ENR filtering should be performed on that version while
+// DialNode should be used for dialing the node over TCP or UDP.
+func (s *ServerPool) DialNode(n *enode.Node) *enode.Node {
+	if enr, ok := s.ns.GetField(n, sfiLocalAddress).(*enr.Record); ok {
+		n, _ := enode.New(dummyIdentity(n.ID()), enr)
+		return n
+	}
+	return n
+}
+
+// Persist immediately stores the state of a node in the node database
+func (s *ServerPool) Persist(n *enode.Node) {
+	s.ns.Persist(n)
+}
diff --git a/les/vflux/client/serverpool_test.go b/les/vflux/client/serverpool_test.go
index 3af3db95b..ee299618c 100644
--- a/les/vflux/client/serverpool_test.go
+++ b/les/vflux/client/serverpool_test.go
@@ -56,6 +56,7 @@ type ServerPoolTest struct {
 	preNeg, preNegFail   bool
 	vt                   *ValueTracker
 	sp                   *ServerPool
+	spi                  enode.Iterator
 	input                enode.Iterator
 	testNodes            []spTestNode
 	trusted              []string
@@ -148,7 +149,7 @@ func (s *ServerPoolTest) start() {
 		requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1}
 	}
 
-	s.sp, _ = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList)
+	s.sp, s.spi = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList)
 	s.sp.AddSource(s.input)
 	s.sp.validSchemes = enode.ValidSchemesForTesting
 	s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
@@ -176,6 +177,7 @@ func (s *ServerPoolTest) start() {
 func (s *ServerPoolTest) stop() {
 	close(s.quit)
 	s.sp.Stop()
+	s.spi.Close()
 	for i := range s.testNodes {
 		n := &s.testNodes[i]
 		if n.connected {
@@ -208,9 +210,9 @@ func (s *ServerPoolTest) run() {
 		if s.conn < spTestTarget {
 			s.dialCount++
 			s.beginWait()
-			s.sp.dialIterator.Next()
+			s.spi.Next()
 			s.endWait()
-			dial := s.sp.dialIterator.Node()
+			dial := s.spi.Node()
 			id := dial.ID()
 			idx := testNodeIndex(id)
 			n := &s.testNodes[idx]
diff --git a/les/vflux/requests.go b/les/vflux/requests.go
new file mode 100644
index 000000000..11255607e
--- /dev/null
+++ b/les/vflux/requests.go
@@ -0,0 +1,180 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package vflux
+
+import (
+	"errors"
+	"math"
+	"math/big"
+
+	"github.com/ethereum/go-ethereum/rlp"
+)
+
+var ErrNoReply = errors.New("no reply for given request")
+
+const (
+	MaxRequestLength    = 16 // max number of individual requests in a batch
+	CapacityQueryName   = "cq"
+	CapacityQueryMaxLen = 16
+)
+
+type (
+	// Request describes a single vflux request inside a batch. Service and request
+	// type are identified by strings, parameters are RLP encoded.
+	Request struct {
+		Service, Name string
+		Params        []byte
+	}
+	// Requests are a batch of vflux requests
+	Requests []Request
+
+	// Replies are the replies to a batch of requests
+	Replies [][]byte
+
+	// CapacityQueryReq is the encoding format of the capacity query
+	CapacityQueryReq struct {
+		Bias      uint64 // seconds
+		AddTokens []IntOrInf
+	}
+	// CapacityQueryReply is the encoding format of the response to the capacity query
+	CapacityQueryReply []uint64
+)
+
+// Add encodes and adds a new request to the batch
+func (r *Requests) Add(service, name string, val interface{}) (int, error) {
+	enc, err := rlp.EncodeToBytes(val)
+	if err != nil {
+		return -1, err
+	}
+	*r = append(*r, Request{
+		Service: service,
+		Name:    name,
+		Params:  enc,
+	})
+	return len(*r) - 1, nil
+}
+
+// Get decodes the reply to the i-th request in the batch
+func (r Replies) Get(i int, val interface{}) error {
+	if i < 0 || i >= len(r) {
+		return ErrNoReply
+	}
+	return rlp.DecodeBytes(r[i], val)
+}
+
+const (
+	IntNonNegative = iota
+	IntNegative
+	IntPlusInf
+	IntMinusInf
+)
+
+// IntOrInf is the encoding format for arbitrary length signed integers that can also
+// hold the values of +Inf or -Inf
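+// Note: Value always stores the absolute value; Type encodes the sign or infinity.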
+type IntOrInf struct {
+	Type  uint8
+	Value big.Int
+}
+
+// BigInt returns the value as a big.Int or panics if the value is infinity
+func (i *IntOrInf) BigInt() *big.Int {
+	switch i.Type {
+	case IntNonNegative:
+		return new(big.Int).Set(&i.Value)
+	case IntNegative:
+		return new(big.Int).Neg(&i.Value)
+	case IntPlusInf:
+		panic(nil) // caller should check Inf() before trying to convert to big.Int
+	case IntMinusInf:
+		panic(nil)
+	}
+	return &big.Int{} // invalid type decodes to 0 value
+}
+
+// Inf returns 1 if the value is +Inf, -1 if it is -Inf, 0 otherwise
+func (i *IntOrInf) Inf() int {
+	switch i.Type {
+	case IntPlusInf:
+		return 1
+	case IntMinusInf:
+		return -1
+	}
+	return 0 // invalid type decodes to 0 value
+}
+
+// Int64 limits the value between MinInt64 and MaxInt64 (even if it is +-Inf) and returns an int64 type
+func (i *IntOrInf) Int64() int64 {
+	switch i.Type {
+	case IntNonNegative:
+		if i.Value.IsInt64() {
+			return i.Value.Int64()
+		} else {
+			return math.MaxInt64
+		}
+	case IntNegative:
+		if i.Value.IsInt64() {
+			return -i.Value.Int64()
+		} else {
+			return math.MinInt64
+		}
+	case IntPlusInf:
+		return math.MaxInt64
+	case IntMinusInf:
+		return math.MinInt64
+	}
+	return 0 // invalid type decodes to 0 value
+}
+
+// SetBigInt sets the value to the given big.Int
+func (i *IntOrInf) SetBigInt(v *big.Int) {
+	if v.Sign() >= 0 {
+		i.Type = IntNonNegative
+		i.Value.Set(v)
+	} else {
+		i.Type = IntNegative
+		i.Value.Neg(v)
+	}
+}
+
+// SetInt64 sets the value to the given int64. Note that MaxInt64 translates to +Inf
+// while MinInt64 translates to -Inf.
+func (i *IntOrInf) SetInt64(v int64) {
+	if v >= 0 {
+		if v == math.MaxInt64 {
+			i.Type = IntPlusInf
+		} else {
+			i.Type = IntNonNegative
+			i.Value.SetInt64(v)
+		}
+	} else {
+		if v == math.MinInt64 {
+			i.Type = IntMinusInf
+		} else {
+			i.Type = IntNegative
+			i.Value.SetInt64(-v)
+		}
+	}
+}
+
+// SetInf sets the value to +Inf or -Inf
+func (i *IntOrInf) SetInf(sign int) {
+	if sign == 1 {
+		i.Type = IntPlusInf
+	} else {
+		i.Type = IntMinusInf
+	}
+}
diff --git a/les/vflux/server/balance.go b/les/vflux/server/balance.go
index f5073d0db..db12a5c57 100644
--- a/les/vflux/server/balance.go
+++ b/les/vflux/server/balance.go
@@ -243,11 +243,11 @@ func (n *NodeBalance) RequestServed(cost uint64) uint64 {
 }
 
 // Priority returns the actual priority based on the current balance
-func (n *NodeBalance) Priority(now mclock.AbsTime, capacity uint64) int64 {
+func (n *NodeBalance) Priority(capacity uint64) int64 {
 	n.lock.Lock()
 	defer n.lock.Unlock()
 
-	n.updateBalance(now)
+	n.updateBalance(n.bt.clock.Now())
 	return n.balanceToPriority(n.balance, capacity)
 }
 
@@ -256,16 +256,35 @@ func (n *NodeBalance) Priority(now mclock.AbsTime, capacity uint64) int64 {
 // in the current session.
 // If update is true then a priority callback is added that turns UpdateFlag on and off
 // in case the priority goes below the estimated minimum.
-func (n *NodeBalance) EstMinPriority(at mclock.AbsTime, capacity uint64, update bool) int64 {
+func (n *NodeBalance) EstimatePriority(capacity uint64, addBalance int64, future, bias time.Duration, update bool) int64 {
 	n.lock.Lock()
 	defer n.lock.Unlock()
 
-	var avgReqCost float64
-	dt := time.Duration(n.lastUpdate - n.initTime)
-	if dt > time.Second {
-		avgReqCost = float64(n.sumReqCost) * 2 / float64(dt)
+	now := n.bt.clock.Now()
+	n.updateBalance(now)
+	b := n.balance
+	if addBalance != 0 {
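+		// apply the hypothetical balance change, saturating the positive balance
+		// at maxBalance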
+		offset := n.bt.posExp.LogOffset(now)
+		old := n.balance.pos.Value(offset)
+		if addBalance > 0 && (addBalance > maxBalance || old > maxBalance-uint64(addBalance)) {
+			b.pos = utils.ExpiredValue{}
+			b.pos.Add(maxBalance, offset)
+		} else {
+			b.pos.Add(addBalance, offset)
+		}
 	}
-	pri := n.balanceToPriority(n.reducedBalance(at, capacity, avgReqCost), capacity)
+	if future > 0 {
+		var avgReqCost float64
+		dt := time.Duration(n.lastUpdate - n.initTime)
+		if dt > time.Second {
+			avgReqCost = float64(n.sumReqCost) * 2 / float64(dt)
+		}
+		b = n.reducedBalance(b, now, future, capacity, avgReqCost)
+	}
+	if bias > 0 {
+		b = n.reducedBalance(b, now+mclock.AbsTime(future), bias, capacity, 0)
+	}
+	pri := n.balanceToPriority(b, capacity)
 	if update {
 		n.addCallback(balanceCallbackUpdate, pri, n.signalPriorityUpdate)
 	}
@@ -366,7 +385,7 @@ func (n *NodeBalance) deactivate() {
 // updateBalance updates balance based on the time factor
 func (n *NodeBalance) updateBalance(now mclock.AbsTime) {
 	if n.active && now > n.lastUpdate {
-		n.balance = n.reducedBalance(now, n.capacity, 0)
+		n.balance = n.reducedBalance(n.balance, n.lastUpdate, time.Duration(now-n.lastUpdate), n.capacity, 0)
 		n.lastUpdate = now
 	}
 }
@@ -546,23 +565,25 @@ func (n *NodeBalance) balanceToPriority(b balance, capacity uint64) int64 {
 }
 
 // reducedBalance estimates the reduced balance at a given time in the future based
-// on the current balance, the time factor and an estimated average request cost per time ratio
-func (n *NodeBalance) reducedBalance(at mclock.AbsTime, capacity uint64, avgReqCost float64) balance {
-	dt := float64(at - n.lastUpdate)
-	b := n.balance
+// on the given balance, the time factor and an estimated average request cost per time ratio
+func (n *NodeBalance) reducedBalance(b balance, start mclock.AbsTime, dt time.Duration, capacity uint64, avgReqCost float64) balance {
+	// since the costs are applied continuously during the dt time period we calculate
+	// the expiration offset at the middle of the period
+	at := start + mclock.AbsTime(dt/2)
+	dtf := float64(dt)
 	if !b.pos.IsZero() {
 		factor := n.posFactor.timePrice(capacity) + n.posFactor.RequestFactor*avgReqCost
-		diff := -int64(dt * factor)
+		diff := -int64(dtf * factor)
 		dd := b.pos.Add(diff, n.bt.posExp.LogOffset(at))
 		if dd == diff {
-			dt = 0
+			dtf = 0
 		} else {
-			dt += float64(dd) / factor
+			dtf += float64(dd) / factor
 		}
 	}
-	if dt > 0 {
+	if dtf > 0 {
 		factor := n.negFactor.timePrice(capacity) + n.negFactor.RequestFactor*avgReqCost
-		b.neg.Add(int64(dt*factor), n.bt.negExp.LogOffset(at))
+		b.neg.Add(int64(dtf*factor), n.bt.negExp.LogOffset(at))
 	}
 	return b
 }
@@ -588,8 +609,9 @@ func (n *NodeBalance) timeUntil(priority int64) (time.Duration, bool) {
 			}
 			dt = float64(posBalance-newBalance) / timePrice
 			return time.Duration(dt), true
+		} else {
+			dt = float64(posBalance) / timePrice
 		}
-		dt = float64(posBalance) / timePrice
 	} else {
 		if priority > 0 {
 			return 0, false
diff --git a/les/vflux/server/balance_test.go b/les/vflux/server/balance_test.go
index 6c817aa26..e22074db2 100644
--- a/les/vflux/server/balance_test.go
+++ b/les/vflux/server/balance_test.go
@@ -231,7 +231,7 @@ func TestBalanceToPriority(t *testing.T) {
 	}
 	for _, i := range inputs {
 		node.SetBalance(i.pos, i.neg)
-		priority := node.Priority(b.clock.Now(), 1000)
+		priority := node.Priority(1000)
 		if priority != i.priority {
 			t.Fatalf("Priority mismatch, want %v, got %v", i.priority, priority)
 		}
@@ -272,7 +272,7 @@ func TestEstimatedPriority(t *testing.T) {
 	for _, i := range inputs {
 		b.clock.Run(i.runTime)
 		node.RequestServed(i.reqCost)
-		priority := node.EstMinPriority(b.clock.Now()+mclock.AbsTime(i.futureTime), 1000000000, false)
+		priority := node.EstimatePriority(1000000000, 0, i.futureTime, 0, false)
 		if priority != i.priority {
 			t.Fatalf("Estimated priority mismatch, want %v, got %v", i.priority, priority)
 		}
diff --git a/les/vflux/server/prioritypool.go b/les/vflux/server/prioritypool.go
index e3327aba7..e940ac7c6 100644
--- a/les/vflux/server/prioritypool.go
+++ b/les/vflux/server/prioritypool.go
@@ -101,17 +101,21 @@ type PriorityPool struct {
 	minCap                 uint64
 	activeBias             time.Duration
 	capacityStepDiv        uint64
+
+	cachedCurve    *CapacityCurve
+	ccUpdatedAt    mclock.AbsTime
+	ccUpdateForced bool
 }
 
 // nodePriority interface provides current and estimated future priorities on demand
 type nodePriority interface {
 	// Priority should return the current priority of the node (higher is better)
-	Priority(now mclock.AbsTime, cap uint64) int64
+	Priority(cap uint64) int64
-	// EstMinPriority should return a lower estimate for the minimum of the node priority
+	// EstimatePriority should return a lower estimate for the minimum of the node priority
 	// value starting from the current moment until the given time. If the priority goes
 	// under the returned estimate before the specified moment then it is the caller's
 	// responsibility to signal with updateFlag.
-	EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64
+	EstimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64
 }
 
 // ppNodeInfo is the internal node descriptor of PriorityPool
@@ -131,12 +135,12 @@ func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, cl
 		ns:                ns,
 		PriorityPoolSetup: setup,
 		clock:             clock,
-		activeQueue:       prque.NewLazyQueue(activeSetIndex, activePriority, activeMaxPriority, clock, lazyQueueRefresh),
 		inactiveQueue:     prque.New(inactiveSetIndex),
 		minCap:            minCap,
 		activeBias:        activeBias,
 		capacityStepDiv:   capacityStepDiv,
 	}
+	pp.activeQueue = prque.NewLazyQueue(activeSetIndex, activePriority, pp.activeMaxPriority, clock, lazyQueueRefresh)
 
 	ns.SubscribeField(pp.priorityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
 		if newValue != nil {
@@ -197,6 +201,9 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias
 	if targetCap < pp.minCap {
 		targetCap = pp.minCap
 	}
+	if bias < pp.activeBias {
+		bias = pp.activeBias
+	}
 	c, _ := pp.ns.GetField(node, pp.ppNodeInfoField).(*ppNodeInfo)
 	if c == nil {
 		log.Error("RequestCapacity called for unknown node", "id", node.ID())
@@ -204,9 +211,9 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias
 	}
 	var priority int64
 	if targetCap > c.capacity {
-		priority = c.nodePriority.EstMinPriority(pp.clock.Now()+mclock.AbsTime(bias), targetCap, false)
+		priority = c.nodePriority.EstimatePriority(targetCap, 0, 0, bias, false)
 	} else {
-		priority = c.nodePriority.Priority(pp.clock.Now(), targetCap)
+		priority = c.nodePriority.Priority(targetCap)
 	}
 	pp.markForChange(c)
 	pp.setCapacity(c, targetCap)
@@ -214,7 +221,7 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias
 	pp.activeQueue.Remove(c.activeIndex)
 	pp.inactiveQueue.Remove(c.inactiveIndex)
 	pp.activeQueue.Push(c)
-	minPriority = pp.enforceLimits()
+	_, minPriority = pp.enforceLimits()
 	// if capacity update is possible now then minPriority == math.MinInt64
 	// if it is not possible at all then minPriority == math.MaxInt64
 	allowed = priority > minPriority
@@ -281,29 +288,34 @@ func invertPriority(p int64) int64 {
 }
 
 // activePriority callback returns actual priority of ppNodeInfo item in activeQueue
-func activePriority(a interface{}, now mclock.AbsTime) int64 {
+func activePriority(a interface{}) int64 {
 	c := a.(*ppNodeInfo)
 	if c.forced {
 		return math.MinInt64
 	}
 	if c.bias == 0 {
-		return invertPriority(c.nodePriority.Priority(now, c.capacity))
+		return invertPriority(c.nodePriority.Priority(c.capacity))
+	} else {
+		return invertPriority(c.nodePriority.EstimatePriority(c.capacity, 0, 0, c.bias, true))
 	}
-	return invertPriority(c.nodePriority.EstMinPriority(now+mclock.AbsTime(c.bias), c.capacity, true))
 }
 
 // activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue
-func activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
+func (pp *PriorityPool) activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
 	c := a.(*ppNodeInfo)
 	if c.forced {
 		return math.MinInt64
 	}
-	return invertPriority(c.nodePriority.EstMinPriority(until+mclock.AbsTime(c.bias), c.capacity, false))
+	future := time.Duration(until - pp.clock.Now())
+	if future < 0 {
+		future = 0
+	}
+	return invertPriority(c.nodePriority.EstimatePriority(c.capacity, 0, future, c.bias, false))
 }
 
 // inactivePriority callback returns actual priority of ppNodeInfo item in inactiveQueue
 func (pp *PriorityPool) inactivePriority(p *ppNodeInfo) int64 {
-	return p.nodePriority.Priority(pp.clock.Now(), pp.minCap)
+	return p.nodePriority.Priority(pp.minCap)
 }
 
 // connectedNode is called when a new node has been added to the pool (InactiveFlag set)
@@ -379,16 +391,19 @@ func (pp *PriorityPool) setCapacity(n *ppNodeInfo, cap uint64) {
 // enforceLimits enforces active node count and total capacity limits. It returns the
-// lowest active node priority. Note that this function is performed on the temporary
-// internal state.
+// lowest active node priority and the node whose capacity was changed last. Note
+// that this function is performed on the temporary internal state.
-func (pp *PriorityPool) enforceLimits() int64 {
+func (pp *PriorityPool) enforceLimits() (*ppNodeInfo, int64) {
 	if pp.activeCap <= pp.maxCap && pp.activeCount <= pp.maxCount {
-		return math.MinInt64
+		return nil, math.MinInt64
 	}
-	var maxActivePriority int64
+	var (
+		c                 *ppNodeInfo
+		maxActivePriority int64
+	)
 	pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool {
-		c := data.(*ppNodeInfo)
+		c = data.(*ppNodeInfo)
 		pp.markForChange(c)
 		maxActivePriority = priority
-		if c.capacity == pp.minCap {
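+		// if the node count limit is exceeded, reducing capacity cannot fix the
+		// violation, so the node has to be deactivated entirely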
+		if c.capacity == pp.minCap || pp.activeCount > pp.maxCount {
 			pp.setCapacity(c, 0)
 		} else {
 			sub := c.capacity / pp.capacityStepDiv
@@ -400,7 +415,7 @@ func (pp *PriorityPool) enforceLimits() int64 {
 		}
 		return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
 	})
-	return invertPriority(maxActivePriority)
+	return c, invertPriority(maxActivePriority)
 }
 
 // finalizeChanges either commits or reverts temporary changes. The necessary capacity
@@ -430,6 +445,9 @@ func (pp *PriorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
 		c.origCap = 0
 	}
 	pp.changed = nil
+	if commit {
+		pp.ccUpdateForced = true
+	}
 	return
 }
 
@@ -472,6 +490,7 @@ func (pp *PriorityPool) tryActivate() []capUpdate {
 			break
 		}
 	}
+	pp.ccUpdateForced = true
 	return pp.finalizeChanges(commit)
 }
 
@@ -500,3 +519,150 @@ func (pp *PriorityPool) updatePriority(node *enode.Node) {
 	}
 	updates = pp.tryActivate()
 }
+
+// CapacityCurve is a snapshot of the priority pool contents in a format that allows efficient
+// estimation of how much capacity could be granted to a given node at a given priority level.
+type CapacityCurve struct {
+	points       []curvePoint       // curve points sorted in ascending order of required priority
+	index        map[enode.ID][]int // curve point indexes belonging to each node
+	exclude      []int              // curve point indexes of excluded node
+	excludeFirst bool               // true if activeCount == maxCount
+}
+
+type curvePoint struct {
+	freeCap uint64 // available capacity at the current priority level
+	nextPri int64  // next priority level where more capacity will be available
+}
+
+// GetCapacityCurve returns a new or recently cached CapacityCurve based on the contents of the pool
+func (pp *PriorityPool) GetCapacityCurve() *CapacityCurve {
+	pp.lock.Lock()
+	defer pp.lock.Unlock()
+
+	now := pp.clock.Now()
+	dt := time.Duration(now - pp.ccUpdatedAt)
+	if !pp.ccUpdateForced && pp.cachedCurve != nil && dt < time.Second*10 {
+		return pp.cachedCurve
+	}
+
+	pp.ccUpdateForced = false
+	pp.ccUpdatedAt = now
+	curve := &CapacityCurve{
+		index: make(map[enode.ID][]int),
+	}
+	pp.cachedCurve = curve
+
+	var excludeID enode.ID
+	excludeFirst := pp.maxCount == pp.activeCount
+	// reduce node capacities or remove nodes until nothing is left in the queue;
+	// record the available capacity and the necessary priority after each step
+	for pp.activeCap > 0 {
+		cp := curvePoint{}
+		if pp.activeCap > pp.maxCap {
+			log.Error("Active capacity is greater than allowed maximum", "active", pp.activeCap, "maximum", pp.maxCap)
+		} else {
+			cp.freeCap = pp.maxCap - pp.activeCap
+		}
+		// temporarily increase activeCap to enforce reducing or removing a node capacity
+		tempCap := cp.freeCap + 1
+		pp.activeCap += tempCap
+		var next *ppNodeInfo
+		// enforceLimits removes the lowest priority node if it has minimal capacity,
+		// otherwise reduces its capacity
+		next, cp.nextPri = pp.enforceLimits()
+		pp.activeCap -= tempCap
+		if next == nil {
+			log.Error("GetCapacityCurve: cannot remove next element from the priority queue")
+			break
+		}
+		id := next.node.ID()
+		if excludeFirst {
+			// if the node count limit is already reached then mark the node with the
+			// lowest priority for exclusion
+			curve.excludeFirst = true
+			excludeID = id
+			excludeFirst = false
+		}
+		// multiple curve points and therefore multiple indexes may belong to a node
+		// if it was removed in multiple steps (if its capacity was more than the minimum)
+		curve.index[id] = append(curve.index[id], len(curve.points))
+		curve.points = append(curve.points, cp)
+	}
+	// restore original state of the queue
+	pp.finalizeChanges(false)
+	curve.points = append(curve.points, curvePoint{
+		freeCap: pp.maxCap,
+		nextPri: math.MaxInt64,
+	})
+	if curve.excludeFirst {
+		curve.exclude = curve.index[excludeID]
+	}
+	return curve
+}
+
+// Exclude returns a CapacityCurve with the given node excluded from the original curve
+func (cc *CapacityCurve) Exclude(id enode.ID) *CapacityCurve {
+	if exclude, ok := cc.index[id]; ok {
+		// return a new version of the curve (only one excluded node can be selected)
+		// Note: if the first node was excluded by default (excludeFirst == true) then
+		// we can forget about that and exclude the node with the given id instead.
+		return &CapacityCurve{
+			points:  cc.points,
+			index:   cc.index,
+			exclude: exclude,
+		}
+	}
+	return cc
+}
+
+func (cc *CapacityCurve) getPoint(i int) curvePoint {
+	cp := cc.points[i]
+	if i == 0 && cc.excludeFirst {
+		cp.freeCap = 0
+		return cp
+	}
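+	// count the excluded node's capacity, which is only freed up at higher curve
+	// points, as already available at this level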
+	for ii := len(cc.exclude) - 1; ii >= 0; ii-- {
+		ei := cc.exclude[ii]
+		if ei < i {
+			break
+		}
+		e1, e2 := cc.points[ei], cc.points[ei+1]
+		cp.freeCap += e2.freeCap - e1.freeCap
+	}
+	return cp
+}
+
+// MaxCapacity calculates the maximum capacity available for a node with a given
+// (monotonically decreasing) priority vs. capacity function. Note that if the requesting
+// node is already in the pool then it should be excluded from the curve in order to get
+// the correct result.
+func (cc *CapacityCurve) MaxCapacity(priority func(cap uint64) int64) uint64 {
+	min, max := 0, len(cc.points)-1 // the curve always has at least one point
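+	// coarse step: binary search for the first curve point at which the node's
+	// priority no longer exceeds the priority needed to free up more capacity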
+	for min < max {
+		mid := (min + max) / 2
+		cp := cc.getPoint(mid)
+		if cp.freeCap == 0 || priority(cp.freeCap) > cp.nextPri {
+			min = mid + 1
+		} else {
+			max = mid
+		}
+	}
+	cp2 := cc.getPoint(min)
+	if cp2.freeCap == 0 || min == 0 {
+		return cp2.freeCap
+	}
+	cp1 := cc.getPoint(min - 1)
+	if priority(cp2.freeCap) > cp1.nextPri {
+		return cp2.freeCap
+	}
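+	// fine step: binary search for the highest capacity between the two curve
+	// points at which the node's priority still exceeds cp1.nextPri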
+	minc, maxc := cp1.freeCap, cp2.freeCap-1
+	for minc < maxc {
+		midc := (minc + maxc + 1) / 2
+		if midc == 0 || priority(midc) > cp1.nextPri {
+			minc = midc
+		} else {
+			maxc = midc - 1
+		}
+	}
+	return maxc
+}
diff --git a/les/vflux/server/prioritypool_test.go b/les/vflux/server/prioritypool_test.go
index cbb3f5b37..d83ddc176 100644
--- a/les/vflux/server/prioritypool_test.go
+++ b/les/vflux/server/prioritypool_test.go
@@ -20,6 +20,7 @@ import (
 	"math/rand"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/p2p/enode"
@@ -42,6 +43,7 @@ func init() {
 const (
 	testCapacityStepDiv      = 100
 	testCapacityToleranceDiv = 10
+	testMinCap               = 100
 )
 
 type ppTestClient struct {
@@ -49,11 +51,11 @@ type ppTestClient struct {
 	balance, cap uint64
 }
 
-func (c *ppTestClient) Priority(now mclock.AbsTime, cap uint64) int64 {
+func (c *ppTestClient) Priority(cap uint64) int64 {
 	return int64(c.balance / cap)
 }
 
-func (c *ppTestClient) EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64 {
+func (c *ppTestClient) EstimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64 {
 	return int64(c.balance / cap)
 }
 
@@ -67,7 +69,7 @@ func TestPriorityPool(t *testing.T) {
 			c.cap = newValue.(uint64)
 		}
 	})
-	pp := NewPriorityPool(ns, ppTestSetup, clock, 100, 0, testCapacityStepDiv)
+	pp := NewPriorityPool(ns, ppTestSetup, clock, testMinCap, 0, testCapacityStepDiv)
 	ns.Start()
 	pp.SetLimits(100, 1000000)
 	clients := make([]*ppTestClient, 100)
@@ -94,7 +96,7 @@ func TestPriorityPool(t *testing.T) {
 	for i := range clients {
 		c := &ppTestClient{
 			node:    enode.SignNull(&enr.Record{}, enode.ID{byte(i)}),
-			balance: 1000000000,
+			balance: 100000000000,
 			cap:     1000,
 		}
 		sumBalance += c.balance
@@ -109,7 +111,7 @@ func TestPriorityPool(t *testing.T) {
 	for count := 0; count < 100; count++ {
 		c := clients[rand.Intn(len(clients))]
 		oldBalance := c.balance
-		c.balance = uint64(rand.Int63n(1000000000) + 1000000000)
+		c.balance = uint64(rand.Int63n(100000000000) + 100000000000)
 		sumBalance += c.balance - oldBalance
 		pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0)
 		pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0)
@@ -120,10 +122,124 @@ func TestPriorityPool(t *testing.T) {
 				raise(c)
 			}
 		}
+		// check whether capacities are proportional to balances
 		for _, c := range clients {
 			check(c)
 		}
+		if count%10 == 0 {
+			// test available capacity calculation with capacity curve
+			c = clients[rand.Intn(len(clients))]
+			curve := pp.GetCapacityCurve().Exclude(c.node.ID())
+
+			add := uint64(rand.Int63n(10000000000000))
+			c.balance += add
+			sumBalance += add
+			expCap := curve.MaxCapacity(func(cap uint64) int64 {
+				return int64(c.balance / cap)
+			})
+			var ok bool
+			expFail := expCap + 1
+			if expFail < testMinCap {
+				expFail = testMinCap
+			}
+			ns.Operation(func() {
+				_, ok = pp.RequestCapacity(c.node, expFail, 0, true)
+			})
+			if ok {
+				t.Errorf("Request for more than expected available capacity succeeded")
+			}
+			if expCap >= testMinCap {
+				ns.Operation(func() {
+					_, ok = pp.RequestCapacity(c.node, expCap, 0, true)
+				})
+				if !ok {
+					t.Errorf("Request for expected available capacity failed")
+				}
+			}
+			c.balance -= add
+			sumBalance -= add
+			pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0)
+			pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0)
+			for _, c := range clients {
+				raise(c)
+			}
+		}
 	}
 
 	ns.Stop()
 }
+
+func TestCapacityCurve(t *testing.T) {
+	clock := &mclock.Simulated{}
+	ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
+	pp := NewPriorityPool(ns, ppTestSetup, clock, 400000, 0, 2)
+	ns.Start()
+	pp.SetLimits(10, 10000000)
+	clients := make([]*ppTestClient, 10)
+
+	for i := range clients {
+		c := &ppTestClient{
+			node:    enode.SignNull(&enr.Record{}, enode.ID{byte(i)}),
+			balance: 100000000000 * uint64(i+1),
+			cap:     1000000,
+		}
+		clients[i] = c
+		ns.SetState(c.node, ppTestClientFlag, nodestate.Flags{}, 0)
+		ns.SetField(c.node, ppTestSetup.priorityField, c)
+		ns.SetState(c.node, ppTestSetup.InactiveFlag, nodestate.Flags{}, 0)
+		ns.Operation(func() {
+			pp.RequestCapacity(c.node, c.cap, 0, true)
+		})
+	}
+
+	curve := pp.GetCapacityCurve()
+	check := func(balance, expCap uint64) {
+		cap := curve.MaxCapacity(func(cap uint64) int64 {
+			return int64(balance / cap)
+		})
+		var fail bool
+		if cap == 0 || expCap == 0 {
+			fail = cap != expCap
+		} else {
+			pri := balance / cap
+			expPri := balance / expCap
+			fail = pri != expPri && pri != expPri+1
+		}
+		if fail {
+			t.Errorf("Incorrect capacity for %d balance (got %d, expected %d)", balance, cap, expCap)
+		}
+	}
+
+	check(0, 0)
+	check(10000000000, 100000)
+	check(50000000000, 500000)
+	check(100000000000, 1000000)
+	check(200000000000, 1000000)
+	check(300000000000, 1500000)
+	check(450000000000, 1500000)
+	check(600000000000, 2000000)
+	check(800000000000, 2000000)
+	check(1000000000000, 2500000)
+
+	pp.SetLimits(11, 10000000)
+	curve = pp.GetCapacityCurve()
+
+	check(0, 0)
+	check(10000000000, 100000)
+	check(50000000000, 500000)
+	check(150000000000, 750000)
+	check(200000000000, 1000000)
+	check(220000000000, 1100000)
+	check(275000000000, 1100000)
+	check(375000000000, 1500000)
+	check(450000000000, 1500000)
+	check(600000000000, 2000000)
+	check(800000000000, 2000000)
+	check(1000000000000, 2500000)
+
+	ns.Stop()
+}
diff --git a/les/vflux/server/service.go b/les/vflux/server/service.go
new file mode 100644
index 000000000..ab759ae44
--- /dev/null
+++ b/les/vflux/server/service.go
@@ -0,0 +1,122 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package server
+
+import (
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/les/utils"
+	"github.com/ethereum/go-ethereum/les/vflux"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/rlp"
+)
+
+type (
+	// Server serves vflux requests
+	Server struct {
+		limiter         *utils.Limiter
+		lock            sync.Mutex
+		services        map[string]*serviceEntry
+		delayPerRequest time.Duration
+	}
+
+	// Service is a service registered at the Server and identified by a string id
+	Service interface {
+		ServiceInfo() (id, desc string)                                      // only called during registration
+		Handle(id enode.ID, address string, name string, data []byte) []byte // never called concurrently
+	}
+
+	serviceEntry struct {
+		id, desc string
+		backend  Service
+	}
+)
+
+// NewServer creates a new Server
+func NewServer(delayPerRequest time.Duration) *Server {
+	return &Server{
+		limiter:         utils.NewLimiter(1000),
+		delayPerRequest: delayPerRequest,
+		services:        make(map[string]*serviceEntry),
+	}
+}
+
+// Register registers a Service
+func (s *Server) Register(b Service) {
+	srv := &serviceEntry{backend: b}
+	srv.id, srv.desc = b.ServiceInfo()
+	if strings.Contains(srv.id, ":") {
+		// srv.id + ":" will be used as a service database prefix
+		log.Error("Service ID contains ':'", "id", srv.id)
+		return
+	}
+	s.lock.Lock()
+	s.services[srv.id] = srv
+	s.lock.Unlock()
+}
+
+// Serve serves a vflux request batch
+// Note: requests are served by the Handle functions of the registered services. Serve
+// may be called concurrently but the Handle functions are called sequentially and
+// therefore thread safety is guaranteed.
+func (s *Server) Serve(id enode.ID, address string, requests vflux.Requests) vflux.Replies {
+	reqLen := uint(len(requests))
+	if reqLen == 0 || reqLen > vflux.MaxRequestLength {
+		return nil
+	}
+	// Note: the value parameter will be supplied by the token sale module (total amount paid)
+	ch := <-s.limiter.Add(id, address, 0, reqLen)
+	if ch == nil {
+		return nil
+	}
+	// Note: the limiter ensures that the following section is not running concurrently,
+	// the lock only protects against contention caused by new service registration
+	s.lock.Lock()
+	results := make(vflux.Replies, len(requests))
+	for i, req := range requests {
+		if service := s.services[req.Service]; service != nil {
+			results[i] = service.backend.Handle(id, address, req.Name, req.Params)
+		}
+	}
+	s.lock.Unlock()
+	time.Sleep(s.delayPerRequest * time.Duration(reqLen))
+	close(ch)
+	return results
+}
+
+// ServeEncoded serves an encoded vflux request batch and returns the encoded replies
+func (s *Server) ServeEncoded(id enode.ID, addr *net.UDPAddr, req []byte) []byte {
+	var requests vflux.Requests
+	if err := rlp.DecodeBytes(req, &requests); err != nil {
+		return nil
+	}
+	results := s.Serve(id, addr.String(), requests)
+	if results == nil {
+		return nil
+	}
+	res, _ := rlp.EncodeToBytes(&results)
+	return res
+}
+
+// Stop shuts down the server
+func (s *Server) Stop() {
+	s.limiter.Stop()
+}
diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go
index 9dd2b3173..eb01d95e9 100644
--- a/p2p/discover/v5_udp.go
+++ b/p2p/discover/v5_udp.go
@@ -74,7 +74,7 @@ type UDPv5 struct {
 
 	// talkreq handler registry
 	trlock     sync.Mutex
-	trhandlers map[string]func([]byte) []byte
+	trhandlers map[string]TalkRequestHandler
 
 	// channels into dispatch
 	packetInCh    chan ReadPacket
@@ -96,6 +96,9 @@ type UDPv5 struct {
 	wg             sync.WaitGroup
 }
 
+// TalkRequestHandler callback processes a talk request and optionally returns a reply
+type TalkRequestHandler func(enode.ID, *net.UDPAddr, []byte) []byte
+
 // callV5 represents a remote procedure call against another node.
 type callV5 struct {
 	node         *enode.Node
@@ -145,7 +148,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
 		log:          cfg.Log,
 		validSchemes: cfg.ValidSchemes,
 		clock:        cfg.Clock,
-		trhandlers:   make(map[string]func([]byte) []byte),
+		trhandlers:   make(map[string]TalkRequestHandler),
 		// channels into dispatch
 		packetInCh:    make(chan ReadPacket, 1),
 		readNextCh:    make(chan struct{}, 1),
@@ -233,7 +236,7 @@ func (t *UDPv5) LocalNode() *enode.LocalNode {
 // RegisterTalkHandler adds a handler for 'talk requests'. The handler function is called
 // whenever a request for the given protocol is received and should return the response
 // data or nil.
-func (t *UDPv5) RegisterTalkHandler(protocol string, handler func([]byte) []byte) {
+func (t *UDPv5) RegisterTalkHandler(protocol string, handler TalkRequestHandler) {
 	t.trlock.Lock()
 	defer t.trlock.Unlock()
 	t.trhandlers[protocol] = handler
@@ -841,7 +844,7 @@ func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAd
 
 	var response []byte
 	if handler != nil {
-		response = handler(p.Message)
+		response = handler(fromID, fromAddr, p.Message)
 	}
 	resp := &v5wire.TalkResponse{ReqID: p.ReqID, Message: response}
 	t.sendResponse(fromID, fromAddr, resp)
diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go
index d91a2097d..292785bd5 100644
--- a/p2p/discover/v5_udp_test.go
+++ b/p2p/discover/v5_udp_test.go
@@ -435,7 +435,7 @@ func TestUDPv5_talkHandling(t *testing.T) {
 	defer test.close()
 
 	var recvMessage []byte
-	test.udp.RegisterTalkHandler("test", func(message []byte) []byte {
+	test.udp.RegisterTalkHandler("test", func(id enode.ID, addr *net.UDPAddr, message []byte) []byte {
 		recvMessage = message
 		return []byte("test response")
 	})
diff --git a/p2p/nodestate/nodestate.go b/p2p/nodestate/nodestate.go
index def93bac4..d3166f1d8 100644
--- a/p2p/nodestate/nodestate.go
+++ b/p2p/nodestate/nodestate.go
@@ -599,6 +599,7 @@ func (ns *NodeStateMachine) updateEnode(n *enode.Node) (enode.ID, *nodeInfo) {
 	node := ns.nodes[id]
 	if node != nil && n.Seq() > node.node.Seq() {
 		node.node = n
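+		// the in-memory node record was updated; mark it dirty so that the newer
+		// ENR version is also persisted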
+		node.dirty = true
 	}
 	return id, node
 }
-- 
GitLab