From bf5c6b29faa2ed863ef48903321b2ce362ed6f83 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= <zsfelfoldi@gmail.com>
Date: Wed, 13 Nov 2019 23:47:03 +0100
Subject: [PATCH] les: implement server priority API (#20070)

This PR implements the LES server RPC API. It provides methods for querying
server capacity and for managing client balances and client priorities.
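
The new endpoints live in the "les" and "debug" namespaces. A minimal Go
sketch of driving them over RPC (the IPC path, node ID and parameter values
below are illustrative placeholders, not part of this change):

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/rpc"
    )

    func main() {
        // Attach to a running light server (hypothetical IPC path).
        client, err := rpc.Dial("/tmp/geth.ipc")
        if err != nil {
            panic(err)
        }
        defer client.Close()

        // Query the global capacity parameters.
        var info map[string]interface{}
        if err := client.Call(&info, "les_serverInfo"); err != nil {
            panic(err)
        }
        fmt.Println("total capacity:", info["totalCapacity"])

        // Set default pricing factors for future connections.
        params := map[string]interface{}{"pricing/timeFactor": 500000}
        if err := client.Call(nil, "les_setDefaultParams", params); err != nil {
            panic(err)
        }

        // Credit a client's balance (placeholder node ID).
        id := "0x0000000000000000000000000000000000000000000000000000000000000001"
        var balances map[string]uint64
        if err := client.Call(&balances, "les_updateBalance", id, 1000000000, "demo"); err != nil {
            panic(err)
        }
        fmt.Println("old:", balances["old"], "new:", balances["new"])
    }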
---
 internal/web3ext/web3ext.go |  34 +++++
 les/api.go                  | 281 +++++++++++++++++++++++++++++++++-
 les/balance.go              |   8 +
 les/clientpool.go           | 296 +++++++++++++++++++++++++++---------
 les/clientpool_test.go      |  54 +++----
 les/server.go               |  31 ++--
 6 files changed, 597 insertions(+), 107 deletions(-)

diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go
index 86e575439..ac6acc799 100644
--- a/internal/web3ext/web3ext.go
+++ b/internal/web3ext/web3ext.go
@@ -445,6 +445,11 @@ web3._extend({
 			params: 2,
 			inputFormatter:[null, null],
 		}),
+		new web3._extend.Method({
+			name: 'freezeClient',
+			call: 'debug_freezeClient',
+			params: 1,
+		}),
 	],
 	properties: []
 });
@@ -798,6 +803,31 @@ web3._extend({
 			call: 'les_getCheckpoint',
 			params: 1
 		}),
+		new web3._extend.Method({
+			name: 'clientInfo',
+			call: 'les_clientInfo',
+			params: 1
+		}),
+		new web3._extend.Method({
+			name: 'priorityClientInfo',
+			call: 'les_priorityClientInfo',
+			params: 3
+		}),
+		new web3._extend.Method({
+			name: 'setClientParams',
+			call: 'les_setClientParams',
+			params: 2
+		}),
+		new web3._extend.Method({
+			name: 'setDefaultParams',
+			call: 'les_setDefaultParams',
+			params: 1
+		}),
+		new web3._extend.Method({
+			name: 'updateBalance',
+			call: 'les_updateBalance',
+			params: 3
+		}),
 	],
 	properties:
 	[
@@ -809,6 +839,10 @@ web3._extend({
 			name: 'checkpointContractAddress',
 			getter: 'les_getCheckpointContractAddress'
 		}),
+		new web3._extend.Property({
+			name: 'serverInfo',
+			getter: 'les_serverInfo'
+		}),
 	]
 });
 `
diff --git a/les/api.go b/les/api.go
index bbef771f0..c417b4ec0 100644
--- a/les/api.go
+++ b/les/api.go
@@ -18,15 +18,292 @@ package les
 
 import (
 	"errors"
+	"fmt"
+	"math"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethereum/go-ethereum/common/mclock"
+	"github.com/ethereum/go-ethereum/p2p/enode"
 )
 
 var (
-	errNoCheckpoint = errors.New("no local checkpoint provided")
-	errNotActivated = errors.New("checkpoint registrar is not activated")
+	errNoCheckpoint         = errors.New("no local checkpoint provided")
+	errNotActivated         = errors.New("checkpoint registrar is not activated")
+	errUnknownBenchmarkType = errors.New("unknown benchmark type")
+	errBalanceOverflow      = errors.New("balance overflow")
+	errNoPriority           = errors.New("priority too low to raise capacity")
 )
 
+const maxBalance = math.MaxInt64
+
+// PrivateLightServerAPI provides an API to access the LES light server.
+type PrivateLightServerAPI struct {
+	server                               *LesServer
+	defaultPosFactors, defaultNegFactors priceFactors
+}
+
+// NewPrivateLightServerAPI creates a new LES light server API.
+func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
+	return &PrivateLightServerAPI{
+		server:            server,
+		defaultPosFactors: server.clientPool.defaultPosFactors,
+		defaultNegFactors: server.clientPool.defaultNegFactors,
+	}
+}
+
+// ServerInfo returns global server parameters
+func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} {
+	res := make(map[string]interface{})
+	res["minimumCapacity"] = api.server.minCapacity
+	res["maximumCapacity"] = api.server.maxCapacity
+	res["freeClientCapacity"] = api.server.freeCapacity
+	res["totalCapacity"], res["totalConnectedCapacity"], res["priorityConnectedCapacity"] = api.server.clientPool.capacityInfo()
+	return res
+}
+
+// ClientInfo returns information about clients listed in the ids list
+func (api *PrivateLightServerAPI) ClientInfo(ids []enode.ID) map[enode.ID]map[string]interface{} {
+	res := make(map[enode.ID]map[string]interface{})
+	api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error {
+		res[id] = api.clientInfo(client, id)
+		return nil
+	})
+	return res
+}
+
+// PriorityClientInfo returns information about clients with a positive balance
+// in the given ID range, stop excluded. If stop is the zero ID, iteration
+// continues to the end of the ID space. maxCount limits the number of results
+// returned. If the maxCount limit is applied and more results exist, the ID
+// of the next potential result is included in the map with an empty structure
+// assigned to it.
+func (api *PrivateLightServerAPI) PriorityClientInfo(start, stop enode.ID, maxCount int) map[enode.ID]map[string]interface{} {
+	res := make(map[enode.ID]map[string]interface{})
+	ids := api.server.clientPool.ndb.getPosBalanceIDs(start, stop, maxCount+1)
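+	// One extra ID is fetched; if present, it becomes the "more results
+	// available" marker described above.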
+	if len(ids) > maxCount {
+		res[ids[maxCount]] = make(map[string]interface{})
+		ids = ids[:maxCount]
+	}
+	if len(ids) != 0 {
+		api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error {
+			res[id] = api.clientInfo(client, id)
+			return nil
+		})
+	}
+	return res
+}
+
+// clientInfo creates a client info data structure
+func (api *PrivateLightServerAPI) clientInfo(c *clientInfo, id enode.ID) map[string]interface{} {
+	info := make(map[string]interface{})
+	if c != nil {
+		now := mclock.Now()
+		info["isConnected"] = true
+		info["connectionTime"] = float64(now-c.connectedAt) / float64(time.Second)
+		info["capacity"] = c.capacity
+		pb, nb := c.balanceTracker.getBalance(now)
+		info["pricing/balance"], info["pricing/negBalance"] = pb, nb
+		info["pricing/balanceMeta"] = c.balanceMetaInfo
+		info["priority"] = pb != 0
+	} else {
+		info["isConnected"] = false
+		pb := api.server.clientPool.getPosBalance(id)
+		info["pricing/balance"], info["pricing/balanceMeta"] = pb.value, pb.meta
+		info["priority"] = pb.value != 0
+	}
+	return info
+}
+
+// setParams either sets the given parameters for a single connected client (if specified)
+// or the default parameters applicable to clients connected in the future
+func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientInfo, posFactors, negFactors *priceFactors) (updateFactors bool, err error) {
+	defParams := client == nil
+	if !defParams {
+		posFactors, negFactors = &client.posFactors, &client.negFactors
+	}
+	for name, value := range params {
+		errValue := func() error {
+			return fmt.Errorf("invalid value for parameter '%s'", name)
+		}
+		setFactor := func(v *float64) {
+			if val, ok := value.(float64); ok && val >= 0 {
+				*v = val / float64(time.Second)
+				updateFactors = true
+			} else {
+				err = errValue()
+			}
+		}
+
+		switch {
+		case name == "pricing/timeFactor":
+			setFactor(&posFactors.timeFactor)
+		case name == "pricing/capacityFactor":
+			setFactor(&posFactors.capacityFactor)
+		case name == "pricing/requestCostFactor":
+			setFactor(&posFactors.requestFactor)
+		case name == "pricing/negative/timeFactor":
+			setFactor(&negFactors.timeFactor)
+		case name == "pricing/negative/capacityFactor":
+			setFactor(&negFactors.capacityFactor)
+		case name == "pricing/negative/requestCostFactor":
+			setFactor(&negFactors.requestFactor)
+		case !defParams && name == "capacity":
+			if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity {
+				err = api.server.clientPool.setCapacity(client, uint64(capacity))
+				// No need to update the price factors explicitly here;
+				// setCapacity already does that.
+			} else {
+				err = errValue()
+			}
+		default:
+			if defParams {
+				err = fmt.Errorf("invalid default parameter '%s'", name)
+			} else {
+				err = fmt.Errorf("invalid client parameter '%s'", name)
+			}
+		}
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+// UpdateBalance adjusts the positive balance of a client by the given signed
+// amount, updates the balance meta info string and returns the old and new values.
+func (api *PrivateLightServerAPI) UpdateBalance(id enode.ID, value int64, meta string) (map[string]uint64, error) {
+	oldBalance, newBalance, err := api.server.clientPool.updateBalance(id, value, meta)
+	m := make(map[string]uint64)
+	m["old"] = oldBalance
+	m["new"] = newBalance
+	return m, err
+}
+
+// SetClientParams sets client parameters for all clients listed in the ids list
+// or all connected clients if the list is empty
+func (api *PrivateLightServerAPI) SetClientParams(ids []enode.ID, params map[string]interface{}) error {
+	return api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error {
+		if client != nil {
+			update, err := api.setParams(params, client, nil, nil)
+			if update {
+				client.updatePriceFactors()
+			}
+			return err
+		} else {
+			return fmt.Errorf("client %064x is not connected", id[:])
+		}
+	})
+}
+
+// SetDefaultParams sets the default parameters applicable to clients connected in the future
+func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{}) error {
+	update, err := api.setParams(params, nil, &api.defaultPosFactors, &api.defaultNegFactors)
+	if update {
+		api.server.clientPool.setDefaultFactors(api.defaultPosFactors, api.defaultNegFactors)
+	}
+	return err
+}
+
+// Benchmark runs a request performance benchmark with a given set of measurement setups
+// in multiple passes specified by passCount. The measurement time for each setup in each
+// pass is specified in milliseconds by length.
+//
+// Note: the measurement time of each pass is adjusted based on the previous
+// passes, so the total measurement time is controllable across multiple passes.
+func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) {
+	benchmarks := make([]requestBenchmark, len(setups))
+	for i, setup := range setups {
+		if t, ok := setup["type"].(string); ok {
+			getInt := func(field string, def int) int {
+				if value, ok := setup[field].(float64); ok {
+					return int(value)
+				}
+				return def
+			}
+			getBool := func(field string, def bool) bool {
+				if value, ok := setup[field].(bool); ok {
+					return value
+				}
+				return def
+			}
+			switch t {
+			case "header":
+				benchmarks[i] = &benchmarkBlockHeaders{
+					amount:  getInt("amount", 1),
+					skip:    getInt("skip", 1),
+					byHash:  getBool("byHash", false),
+					reverse: getBool("reverse", false),
+				}
+			case "body":
+				benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false}
+			case "receipts":
+				benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true}
+			case "proof":
+				benchmarks[i] = &benchmarkProofsOrCode{code: false}
+			case "code":
+				benchmarks[i] = &benchmarkProofsOrCode{code: true}
+			case "cht":
+				benchmarks[i] = &benchmarkHelperTrie{
+					bloom:    false,
+					reqCount: getInt("amount", 1),
+				}
+			case "bloom":
+				benchmarks[i] = &benchmarkHelperTrie{
+					bloom:    true,
+					reqCount: getInt("amount", 1),
+				}
+			case "txSend":
+				benchmarks[i] = &benchmarkTxSend{}
+			case "txStatus":
+				benchmarks[i] = &benchmarkTxStatus{}
+			default:
+				return nil, errUnknownBenchmarkType
+			}
+		} else {
+			return nil, errUnknownBenchmarkType
+		}
+	}
+	rs := api.server.handler.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
+	result := make([]map[string]interface{}, len(setups))
+	for i, r := range rs {
+		res := make(map[string]interface{})
+		if r.err == nil {
+			res["totalCount"] = r.totalCount
+			res["avgTime"] = r.avgTime
+			res["maxInSize"] = r.maxInSize
+			res["maxOutSize"] = r.maxOutSize
+		} else {
+			res["error"] = r.err.Error()
+		}
+		result[i] = res
+	}
+	return result, nil
+}
+
+// PrivateDebugAPI provides an API to debug LES light server functionality.
+type PrivateDebugAPI struct {
+	server *LesServer
+}
+
+// NewPrivateDebugAPI creates a new LES light server debug API.
+func NewPrivateDebugAPI(server *LesServer) *PrivateDebugAPI {
+	return &PrivateDebugAPI{
+		server: server,
+	}
+}
+
+// FreezeClient forces a temporary client freeze, which otherwise only happens when the server is overloaded
+func (api *PrivateDebugAPI) FreezeClient(id enode.ID) error {
+	return api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo, id enode.ID) error {
+		if c == nil {
+			return fmt.Errorf("client %064x is not connected", id[:])
+		}
+		c.peer.freezeClient()
+		return nil
+	})
+}
+
 // PrivateLightAPI provides an API to access the LES light server or light client.
 type PrivateLightAPI struct {
 	backend *lesCommons
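
The "next ID" marker returned by PriorityClientInfo makes paging straightforward.
A sketch of a pager written as if inside package les (the helper name and the
page size are illustrative, not part of the patch):

    // allPriorityClients pages through every positive-balance client.
    // Real result entries are non-empty maps; an empty map marks the
    // start ID of the next page.
    func allPriorityClients(api *PrivateLightServerAPI) map[enode.ID]map[string]interface{} {
        all := make(map[enode.ID]map[string]interface{})
        var start, stop enode.ID // zero stop: scan to the end of the ID space
        for {
            var next *enode.ID
            for id, info := range api.PriorityClientInfo(start, stop, 16) {
                if len(info) == 0 {
                    nid := id // sentinel entry: remember where to resume
                    next = &nid
                    continue
                }
                all[id] = info
            }
            if next == nil {
                return all
            }
            start = *next
        }
    }
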
diff --git a/les/balance.go b/les/balance.go
index 2813db01c..6d1b20c01 100644
--- a/les/balance.go
+++ b/les/balance.go
@@ -160,6 +160,14 @@ func (bt *balanceTracker) timeUntil(priority int64) (time.Duration, bool) {
 	return time.Duration(dt), true
 }
 
+// setCapacity updates the capacity value used for priority calculation
+func (bt *balanceTracker) setCapacity(capacity uint64) {
+	bt.lock.Lock()
+	defer bt.lock.Unlock()
+
+	bt.capacity = capacity
+}
+
 // getPriority returns the actual priority based on the current balance
 func (bt *balanceTracker) getPriority(now mclock.AbsTime) int64 {
 	bt.lock.Lock()
diff --git a/les/clientpool.go b/les/clientpool.go
index 0b4d1b961..0169dc706 100644
--- a/les/clientpool.go
+++ b/les/clientpool.go
@@ -17,7 +17,9 @@
 package les
 
 import (
+	"bytes"
 	"encoding/binary"
+	"fmt"
 	"io"
 	"math"
 	"sync"
@@ -83,15 +85,16 @@ type clientPool struct {
 	connectedMap   map[enode.ID]*clientInfo
 	connectedQueue *prque.LazyQueue
 
-	posFactors, negFactors priceFactors
+	defaultPosFactors, defaultNegFactors priceFactors
 
-	connLimit      int            // The maximum number of connections that clientpool can support
-	capLimit       uint64         // The maximum cumulative capacity that clientpool can support
-	connectedCap   uint64         // The sum of the capacity of the current clientpool connected
-	freeClientCap  uint64         // The capacity value of each free client
-	startTime      mclock.AbsTime // The timestamp at which the clientpool started running
-	cumulativeTime int64          // The cumulative running time of clientpool at the start point.
-	disableBias    bool           // Disable connection bias(used in testing)
+	connLimit         int            // The maximum number of connections that clientpool can support
+	capLimit          uint64         // The maximum cumulative capacity that clientpool can support
+	connectedCap      uint64         // The sum of the capacity of the current clientpool connected
+	priorityConnected uint64         // The sum of the capacity of currently connected priority clients
+	freeClientCap     uint64         // The capacity value of each free client
+	startTime         mclock.AbsTime // The timestamp at which the clientpool started running
+	cumulativeTime    int64          // The cumulative running time of clientpool at the start point.
+	disableBias       bool           // Disable connection bias (used in testing)
 }
 
 // clientPeer represents a client in the pool.
@@ -103,18 +106,22 @@ type clientPeer interface {
 	ID() enode.ID
 	freeClientId() string
 	updateCapacity(uint64)
+	freezeClient()
 }
 
 // clientInfo represents a connected client
 type clientInfo struct {
-	address        string
-	id             enode.ID
-	capacity       uint64
-	priority       bool
-	pool           *clientPool
-	peer           clientPeer
-	queueIndex     int // position in connectedQueue
-	balanceTracker balanceTracker
+	address                string
+	id                     enode.ID
+	connectedAt            mclock.AbsTime
+	capacity               uint64
+	priority               bool
+	pool                   *clientPool
+	peer                   clientPeer
+	queueIndex             int // position in connectedQueue
+	balanceTracker         balanceTracker
+	posFactors, negFactors priceFactors
+	balanceMetaInfo        string
 }
 
 // connSetIndex callback updates clientInfo item index in connectedQueue
@@ -223,20 +230,26 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool {
 	)
 	pb := f.ndb.getOrNewPB(id)
 	posBalance = pb.value
-	e := &clientInfo{pool: f, peer: peer, address: freeID, queueIndex: -1, id: id, priority: posBalance != 0}
 
 	nb := f.ndb.getOrNewNB(freeID)
 	if nb.logValue != 0 {
-		negBalance = uint64(math.Exp(float64(nb.logValue-f.logOffset(now)) / fixedPointMultiplier))
-		negBalance *= uint64(time.Second)
+		negBalance = uint64(math.Exp(float64(nb.logValue-f.logOffset(now))/fixedPointMultiplier) * float64(time.Second))
+	}
+	e := &clientInfo{
+		pool:            f,
+		peer:            peer,
+		address:         freeID,
+		queueIndex:      -1,
+		id:              id,
+		connectedAt:     now,
+		priority:        posBalance != 0,
+		posFactors:      f.defaultPosFactors,
+		negFactors:      f.defaultNegFactors,
+		balanceMetaInfo: pb.meta,
 	}
 	// If the client is a free client, assign with a low free capacity,
 	// Otherwise assign with the given value(priority client)
-	if !e.priority {
-		capacity = f.freeClientCap
-	}
-	// Ensure the capacity will never lower than the free capacity.
-	if capacity < f.freeClientCap {
+	if !e.priority || capacity == 0 {
 		capacity = f.freeClientCap
 	}
 	e.capacity = capacity
@@ -244,7 +257,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool {
 	// Starts a balance tracker
 	e.balanceTracker.init(f.clock, capacity)
 	e.balanceTracker.setBalance(posBalance, negBalance)
-	f.setClientPriceFactors(e)
+	e.updatePriceFactors()
 
 	// If the number of clients already connected in the clientpool exceeds its
 	// capacity, evict some clients with lowest priority.
@@ -283,6 +296,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool {
 			f.dropClient(c, now, true)
 		}
 	}
+
 	// Register new client to connection queue.
 	f.connectedMap[id] = e
 	f.connectedQueue.Push(e)
@@ -291,6 +305,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) bool {
 	// If the current client is a paid client, monitor the status of client,
 	// downgrade it to normal client if positive balance is used up.
 	if e.priority {
+		f.priorityConnected += capacity
 		e.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) })
 	}
 	// If the capacity of client is not the default value(free capacity), notify
@@ -324,6 +339,38 @@ func (f *clientPool) disconnect(p clientPeer) {
 	f.dropClient(e, f.clock.Now(), false)
 }
 
+// forClients iterates through a list of clients, calling the callback for each one.
+// If a client is not connected then clientInfo is nil. If the specified list is empty
+// then the callback is called for all connected clients.
+func (f *clientPool) forClients(ids []enode.ID, callback func(*clientInfo, enode.ID) error) error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	if len(ids) > 0 {
+		for _, id := range ids {
+			if err := callback(f.connectedMap[id], id); err != nil {
+				return err
+			}
+		}
+	} else {
+		for _, c := range f.connectedMap {
+			if err := callback(c, c.id); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// setDefaultFactors sets the default price factors applied to subsequently connected clients
+func (f *clientPool) setDefaultFactors(posFactors, negFactors priceFactors) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	f.defaultPosFactors = posFactors
+	f.defaultNegFactors = negFactors
+}
+
 // dropClient removes a client from the connected queue and finalizes its balance.
 // If kick is true then it also initiates the disconnection.
 func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) {
@@ -334,6 +381,9 @@ func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) {
 	f.connectedQueue.Remove(e.queueIndex)
 	delete(f.connectedMap, e.id)
 	f.connectedCap -= e.capacity
+	if e.priority {
+		f.priorityConnected -= e.capacity
+	}
 	totalConnectedGauge.Update(int64(f.connectedCap))
 	if kick {
 		clientKickedMeter.Mark(1)
@@ -345,6 +395,15 @@ func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) {
 	}
 }
 
+// capacityInfo returns the total capacity allowance, the total capacity of connected
+// clients and the total capacity of connected and prioritized clients
+func (f *clientPool) capacityInfo() (uint64, uint64, uint64) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	return f.capLimit, f.connectedCap, f.priorityConnected
+}
+
 // finalizeBalance stops the balance tracker, retrieves the final balances and
 // stores them in posBalanceQueue and negBalanceQueue
 func (f *clientPool) finalizeBalance(c *clientInfo, now mclock.AbsTime) {
@@ -374,14 +433,20 @@ func (f *clientPool) balanceExhausted(id enode.ID) {
 	if c == nil || !c.priority {
 		return
 	}
+	if c.priority {
+		f.priorityConnected -= c.capacity
+	}
 	c.priority = false
 	if c.capacity != f.freeClientCap {
 		f.connectedCap += f.freeClientCap - c.capacity
 		totalConnectedGauge.Update(int64(f.connectedCap))
 		c.capacity = f.freeClientCap
+		c.balanceTracker.setCapacity(c.capacity)
 		c.peer.updateCapacity(c.capacity)
 	}
-	f.ndb.delPB(id)
+	pb := f.ndb.getOrNewPB(id)
+	pb.value = 0
+	f.ndb.setPB(id, pb)
 }
 
 // setConnLimit sets the maximum number and total capacity of connected clients,
@@ -400,6 +465,56 @@ func (f *clientPool) setLimits(totalConn int, totalCap uint64) {
 	}
 }
 
+// setCapacity sets the assigned capacity of a connected client
+func (f *clientPool) setCapacity(c *clientInfo, capacity uint64) error {
+	if f.connectedMap[c.id] != c {
+		return fmt.Errorf("client %064x is not connected", c.id[:])
+	}
+	if c.capacity == capacity {
+		return nil
+	}
+	if !c.priority {
+		return errNoPriority
+	}
+	oldCapacity := c.capacity
+	c.capacity = capacity
+	f.connectedCap += capacity - oldCapacity
+	c.balanceTracker.setCapacity(capacity)
+	f.connectedQueue.Update(c.queueIndex)
+	if f.connectedCap > f.capLimit {
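+		// Free capacity by popping the lowest-priority clients; if the resized
+		// client itself would be popped, revert the resize instead of kicking.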
+		var kickList []*clientInfo
+		kick := true
+		f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
+			client := data.(*clientInfo)
+			kickList = append(kickList, client)
+			f.connectedCap -= client.capacity
+			if client == c {
+				kick = false
+			}
+			return kick && (f.connectedCap > f.capLimit)
+		})
+		if kick {
+			now := mclock.Now()
+			for _, c := range kickList {
+				f.dropClient(c, now, true)
+			}
+		} else {
+			c.capacity = oldCapacity
+			c.balanceTracker.setCapacity(oldCapacity)
+			for _, c := range kickList {
+				f.connectedCap += c.capacity
+				f.connectedQueue.Push(c)
+			}
+			return errNoPriority
+		}
+	}
+	totalConnectedGauge.Update(int64(f.connectedCap))
+	f.priorityConnected += capacity - oldCapacity
+	c.updatePriceFactors()
+	c.peer.updateCapacity(c.capacity)
+	return nil
+}
+
 // requestCost feeds request cost after serving a request from the given peer.
 func (f *clientPool) requestCost(p *peer, cost uint64) {
 	f.lock.Lock()
@@ -424,83 +539,86 @@ func (f *clientPool) logOffset(now mclock.AbsTime) int64 {
 	return f.cumulativeTime + cumulativeTime
 }
 
-// setPriceFactors changes pricing factors for both positive and negative balances.
-// Applies to connected clients and also future connections.
-func (f *clientPool) setPriceFactors(posFactors, negFactors priceFactors) {
+// updatePriceFactors sets the pricing factors for an individual connected client
+func (c *clientInfo) updatePriceFactors() {
+	c.balanceTracker.setFactors(true, c.negFactors.timeFactor+float64(c.capacity)*c.negFactors.capacityFactor/1000000, c.negFactors.requestFactor)
+	c.balanceTracker.setFactors(false, c.posFactors.timeFactor+float64(c.capacity)*c.posFactors.capacityFactor/1000000, c.posFactors.requestFactor)
+}
+
+// getPosBalance retrieves a single positive balance entry from cache or the database
+func (f *clientPool) getPosBalance(id enode.ID) posBalance {
 	f.lock.Lock()
 	defer f.lock.Unlock()
 
-	f.posFactors, f.negFactors = posFactors, negFactors
-	for _, c := range f.connectedMap {
-		f.setClientPriceFactors(c)
-	}
-}
-
-// setClientPriceFactors sets the pricing factors for an individual connected client
-func (f *clientPool) setClientPriceFactors(c *clientInfo) {
-	c.balanceTracker.setFactors(true, f.negFactors.timeFactor+float64(c.capacity)*f.negFactors.capacityFactor/1000000, f.negFactors.requestFactor)
-	c.balanceTracker.setFactors(false, f.posFactors.timeFactor+float64(c.capacity)*f.posFactors.capacityFactor/1000000, f.posFactors.requestFactor)
+	return f.ndb.getOrNewPB(id)
 }
 
-// addBalance updates the positive balance of a client.
-// If setTotal is false then the given amount is added to the balance.
-// If setTotal is true then amount represents the total amount ever added to the
-// given ID and positive balance is increased by (amount-lastTotal) while lastTotal
-// is updated to amount. This method also allows removing positive balance.
-func (f *clientPool) addBalance(id enode.ID, amount uint64, setTotal bool) {
+// updateBalance adjusts the positive balance of a client by the given signed
+// amount and updates the balance meta info string.
+func (f *clientPool) updateBalance(id enode.ID, amount int64, meta string) (uint64, uint64, error) {
 	f.lock.Lock()
 	defer f.lock.Unlock()
 
 	pb := f.ndb.getOrNewPB(id)
+	var negBalance uint64
 	c := f.connectedMap[id]
 	if c != nil {
-		posBalance, negBalance := c.balanceTracker.getBalance(f.clock.Now())
-		pb.value = posBalance
-		defer func() {
-			c.balanceTracker.setBalance(pb.value, negBalance)
-			if !c.priority && pb.value > 0 {
-				// The capacity should be adjusted based on the requirement,
-				// but we have no idea about the new capacity, need a second
-				// call to udpate it.
-				c.priority = true
-				c.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) })
-			}
-		}()
+		pb.value, negBalance = c.balanceTracker.getBalance(f.clock.Now())
 	}
-	if setTotal {
-		if pb.value+amount > pb.lastTotal {
-			pb.value += amount - pb.lastTotal
-		} else {
-			pb.value = 0
+	oldBalance := pb.value
+	if amount > 0 {
+		if amount > maxBalance || pb.value > maxBalance-uint64(amount) {
+			return oldBalance, oldBalance, errBalanceOverflow
 		}
-		pb.lastTotal = amount
+		pb.value += uint64(amount)
 	} else {
-		pb.value += amount
-		pb.lastTotal += amount
+		if uint64(-amount) > pb.value {
+			pb.value = 0
+		} else {
+			pb.value -= uint64(-amount)
+		}
 	}
+	pb.meta = meta
 	f.ndb.setPB(id, pb)
+	if c != nil {
+		c.balanceTracker.setBalance(pb.value, negBalance)
+		if !c.priority && pb.value > 0 {
+			// The capacity should be adjusted based on the requirement,
+			// but the new capacity is not known at this point; a second
+			// call is needed to update it.
+			c.priority = true
+			f.priorityConnected += c.capacity
+			c.balanceTracker.addCallback(balanceCallbackZero, 0, func() { f.balanceExhausted(id) })
+		}
+		// If the balance is set to zero, reverting to non-priority status
+		// is handled by the balanceExhausted callback.
+		c.balanceMetaInfo = meta
+	}
+	return oldBalance, pb.value, nil
 }
 
 // posBalance represents a recently accessed positive balance entry
 type posBalance struct {
-	value, lastTotal uint64
+	value uint64
+	meta  string
 }
 
 // EncodeRLP implements rlp.Encoder
 func (e *posBalance) EncodeRLP(w io.Writer) error {
-	return rlp.Encode(w, []interface{}{e.value, e.lastTotal})
+	return rlp.Encode(w, []interface{}{e.value, e.meta})
 }
 
 // DecodeRLP implements rlp.Decoder
 func (e *posBalance) DecodeRLP(s *rlp.Stream) error {
 	var entry struct {
-		Value, LastTotal uint64
+		Value uint64
+		Meta  string
 	}
 	if err := s.Decode(&entry); err != nil {
 		return err
 	}
 	e.value = entry.Value
-	e.lastTotal = entry.LastTotal
+	e.meta = entry.Meta
 	return nil
 }
 
@@ -526,7 +644,10 @@ func (e *negBalance) DecodeRLP(s *rlp.Stream) error {
 
 const (
 	// nodeDBVersion is the version identifier of the node data in db
-	nodeDBVersion = 0
+	//
+	// Changelog:
+	// * Replace `lastTotal` with `meta` in positive balance: version 0=>1
+	nodeDBVersion = 1
 
 	// dbCleanupCycle is the cycle of db for useless data cleanup
 	dbCleanupCycle = time.Hour
@@ -614,6 +735,10 @@ func (db *nodeDB) getOrNewPB(id enode.ID) posBalance {
 }
 
 func (db *nodeDB) setPB(id enode.ID, b posBalance) {
+	if b.value == 0 && len(b.meta) == 0 {
+		db.delPB(id)
+		return
+	}
 	key := db.key(id.Bytes(), false)
 	enc, err := rlp.EncodeToBytes(&(b))
 	if err != nil {
@@ -630,6 +755,37 @@ func (db *nodeDB) delPB(id enode.ID) {
 	db.pcache.Remove(string(key))
 }
 
+// getPosBalanceIDs returns a lexicographically ordered list of the IDs of
+// clients with a positive balance
+func (db *nodeDB) getPosBalanceIDs(start, stop enode.ID, maxCount int) (result []enode.ID) {
+	if maxCount <= 0 {
+		return
+	}
+	it := db.db.NewIteratorWithStart(db.key(start.Bytes(), false))
+	defer it.Release()
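+	// Decrement stop by one (big-endian) so the exclusive stop bound becomes
+	// an inclusive key limit; a zero stop wraps to the maximum ID, covering
+	// the entire ID space.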
+	for i := len(stop[:]) - 1; i >= 0; i-- {
+		stop[i]--
+		if stop[i] != 255 {
+			break
+		}
+	}
+	stopKey := db.key(stop.Bytes(), false)
+	keyLen := len(stopKey)
+
+	for it.Next() {
+		var id enode.ID
+		if len(it.Key()) != keyLen || bytes.Compare(it.Key(), stopKey) == 1 {
+			return
+		}
+		copy(id[:], it.Key()[keyLen-len(id):])
+		result = append(result, id)
+		if len(result) == maxCount {
+			return
+		}
+	}
+	return
+}
+
 func (db *nodeDB) getOrNewNB(id string) negBalance {
 	key := db.key([]byte(id), true)
 	item, exist := db.ncache.Get(string(key))
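
The signed-amount handling in updateBalance clamps at zero on the way down and
guards against overflow on the way up. A standalone sketch of just that
arithmetic (the function name is illustrative):

    package main

    import (
        "errors"
        "fmt"
        "math"
    )

    const maxBalance = math.MaxInt64

    var errBalanceOverflow = errors.New("balance overflow")

    // apply mirrors the balance arithmetic of clientPool.updateBalance:
    // positive amounts are added with an overflow check, negative amounts
    // are subtracted and clamped at zero.
    func apply(balance uint64, amount int64) (uint64, error) {
        if amount > 0 {
            if amount > maxBalance || balance > maxBalance-uint64(amount) {
                return balance, errBalanceOverflow
            }
            return balance + uint64(amount), nil
        }
        if uint64(-amount) > balance {
            return 0, nil
        }
        return balance - uint64(-amount), nil
    }

    func main() {
        fmt.Println(apply(100, 50))       // 150 <nil>
        fmt.Println(apply(100, -200))     // 0 <nil> (clamped)
        fmt.Println(apply(maxBalance, 1)) // overflow error
    }
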
diff --git a/les/clientpool_test.go b/les/clientpool_test.go
index 53973696c..8b990e1dc 100644
--- a/les/clientpool_test.go
+++ b/les/clientpool_test.go
@@ -76,6 +76,8 @@ type poolTestPeerWithCap struct {
 
 func (i *poolTestPeerWithCap) updateCapacity(cap uint64) { i.cap = cap }
 
+func (i poolTestPeer) freezeClient() {}
+
 func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomDisconnect bool) {
 	rand.Seed(time.Now().UnixNano())
 	var (
@@ -91,7 +93,7 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD
 	)
 	pool.disableBias = true
 	pool.setLimits(connLimit, uint64(connLimit))
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	// pool should accept new peers up to its connected limit
 	for i := 0; i < connLimit; i++ {
@@ -107,9 +109,9 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD
 
 		if tickCounter == testClientPoolTicks/4 {
 			// give a positive balance to some of the peers
-			amount := uint64(testClientPoolTicks / 2 * 1000000000) // enough for half of the simulation period
+			amount := testClientPoolTicks / 2 * int64(time.Second) // enough for half of the simulation period
 			for i := 0; i < paidCount; i++ {
-				pool.addBalance(poolTestPeer(i).ID(), amount, false)
+				pool.updateBalance(poolTestPeer(i).ID(), amount, "")
 			}
 		}
 
@@ -173,10 +175,10 @@ func TestConnectPaidClient(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, nil)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	// Add balance for an external client and mark it as paid client
-	pool.addBalance(poolTestPeer(0).ID(), 1000, false)
+	pool.updateBalance(poolTestPeer(0).ID(), 1000, "")
 
 	if !pool.connect(poolTestPeer(0), 10) {
 		t.Fatalf("Failed to connect paid client")
@@ -191,10 +193,10 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, nil)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	// Add balance for an external client and mark it as paid client
-	pool.addBalance(poolTestPeer(0).ID(), 1000, false)
+	pool.updateBalance(poolTestPeer(0).ID(), 1000, "")
 
 	// Connect a fat paid client to pool, should reject it.
 	if pool.connect(poolTestPeer(0), 100) {
@@ -211,18 +213,18 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	for i := 0; i < 10; i++ {
-		pool.addBalance(poolTestPeer(i).ID(), 1000000000, false)
+		pool.updateBalance(poolTestPeer(i).ID(), 1000000000, "")
 		pool.connect(poolTestPeer(i), 1)
 	}
-	pool.addBalance(poolTestPeer(11).ID(), 1000, false) // Add low balance to new paid client
+	pool.updateBalance(poolTestPeer(11).ID(), 1000, "") // Add low balance to new paid client
 	if pool.connect(poolTestPeer(11), 1) {
 		t.Fatalf("Low balance paid client should be rejected")
 	}
 	clock.Run(time.Second)
-	pool.addBalance(poolTestPeer(12).ID(), 1000000000*60*3, false) // Add high balance to new paid client
+	pool.updateBalance(poolTestPeer(12).ID(), 1000000000*60*3, "") // Add high balance to new paid client
 	if !pool.connect(poolTestPeer(12), 1) {
 		t.Fatalf("High balance paid client should be accpected")
 	}
@@ -238,10 +240,10 @@ func TestPaidClientKickedOut(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	for i := 0; i < 10; i++ {
-		pool.addBalance(poolTestPeer(i).ID(), 1000000000, false) // 1 second allowance
+		pool.updateBalance(poolTestPeer(i).ID(), 1000000000, "") // 1 second allowance
 		pool.connect(poolTestPeer(i), 1)
 		clock.Run(time.Millisecond)
 	}
@@ -268,7 +270,7 @@ func TestConnectFreeClient(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, nil)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 	if !pool.connect(poolTestPeer(0), 10) {
 		t.Fatalf("Failed to connect free client")
 	}
@@ -283,7 +285,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	for i := 0; i < 10; i++ {
 		pool.connect(poolTestPeer(i), 1)
@@ -312,7 +314,7 @@ func TestFreeClientKickedOut(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	for i := 0; i < 10; i++ {
 		pool.connect(poolTestPeer(i), 1)
@@ -347,9 +349,9 @@ func TestPositiveBalanceCalculation(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
-	pool.addBalance(poolTestPeer(0).ID(), uint64(time.Minute*3), false)
+	pool.updateBalance(poolTestPeer(0).ID(), int64(time.Minute*3), "")
 	pool.connect(poolTestPeer(0), 10)
 	clock.Run(time.Minute)
 
@@ -370,12 +372,12 @@ func TestDowngradePriorityClient(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	p := &poolTestPeerWithCap{
 		poolTestPeer: poolTestPeer(0),
 	}
-	pool.addBalance(p.ID(), uint64(time.Minute), false)
+	pool.updateBalance(p.ID(), int64(time.Minute), "")
 	pool.connect(p, 10)
 	if p.cap != 10 {
 		t.Fatalf("The capcacity of priority peer hasn't been updated, got: %d", p.cap)
@@ -391,7 +393,7 @@ func TestDowngradePriorityClient(t *testing.T) {
 		t.Fatalf("Positive balance mismatch, want %v, got %v", 0, pb.value)
 	}
 
-	pool.addBalance(poolTestPeer(0).ID(), uint64(time.Minute), false)
+	pool.updateBalance(poolTestPeer(0).ID(), int64(time.Minute), "")
 	pb = pool.ndb.getOrNewPB(poolTestPeer(0).ID())
 	if pb.value != uint64(time.Minute) {
 		t.Fatalf("Positive balance mismatch, want %v, got %v", uint64(time.Minute), pb.value)
@@ -408,7 +410,7 @@ func TestNegativeBalanceCalculation(t *testing.T) {
 	pool := newClientPool(db, 1, &clock, removeFn)
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
-	pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
+	pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
 
 	for i := 0; i < 10; i++ {
 		pool.connect(poolTestPeer(i), 1)
@@ -442,8 +444,8 @@ func TestNodeDB(t *testing.T) {
 	ndb := newNodeDB(rawdb.NewMemoryDatabase(), mclock.System{})
 	defer ndb.close()
 
-	if !bytes.Equal(ndb.verbuf[:], []byte{0x00, 0x00}) {
-		t.Fatalf("version buffer mismatch, want %v, got %v", []byte{0x00, 0x00}, ndb.verbuf)
+	if !bytes.Equal(ndb.verbuf[:], []byte{0x00, nodeDBVersion}) {
+		t.Fatalf("version buffer mismatch, want %v, got %v", []byte{0x00, nodeDBVersion}, ndb.verbuf)
 	}
 	var cases = []struct {
 		id       enode.ID
@@ -451,8 +453,8 @@ func TestNodeDB(t *testing.T) {
 		balance  interface{}
 		positive bool
 	}{
-		{enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 100, lastTotal: 200}, true},
-		{enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 200, lastTotal: 300}, true},
+		{enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 100}, true},
+		{enode.ID{0x00, 0x01, 0x02}, "", posBalance{value: 200}, true},
 		{enode.ID{}, "127.0.0.1", negBalance{logValue: 10}, false},
 		{enode.ID{}, "127.0.0.1", negBalance{logValue: 20}, false},
 	}
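
The new setPB short-circuit (entries with a zero value and empty meta are
deleted) is observable through getPosBalanceIDs. A possible test along these
lines (a sketch using the existing test helpers, not part of the patch):

    func TestPosBalanceDeletion(t *testing.T) {
        ndb := newNodeDB(rawdb.NewMemoryDatabase(), mclock.System{})
        defer ndb.close()

        id := enode.ID{0x01}
        ndb.setPB(id, posBalance{value: 100, meta: "x"})
        if ids := ndb.getPosBalanceIDs(enode.ID{}, enode.ID{}, 10); len(ids) != 1 {
            t.Fatalf("want 1 positive balance entry, got %d", len(ids))
        }
        // Storing a zero value with empty meta removes the entry entirely.
        ndb.setPB(id, posBalance{})
        if ids := ndb.getPosBalanceIDs(enode.ID{}, enode.ID{}, 10); len(ids) != 0 {
            t.Fatalf("want entry deleted, got %d entries", len(ids))
        }
    }
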
diff --git a/les/server.go b/les/server.go
index 997a24191..e68903dd8 100644
--- a/les/server.go
+++ b/les/server.go
@@ -50,9 +50,9 @@ type LesServer struct {
 	servingQueue *servingQueue
 	clientPool   *clientPool
 
-	freeCapacity uint64 // The minimal client capacity used for free client.
-	threadsIdle  int    // Request serving threads count when system is idle.
-	threadsBusy  int    // Request serving threads count when system is busy(block insertion).
+	minCapacity, maxCapacity, freeCapacity uint64 // Minimal client capacity, total capacity ceiling and per-free-client capacity
+	threadsIdle                            int // Request serving threads count when system is idle.
+	threadsBusy                            int // Request serving threads count when system is busy(block insertion).
 }
 
 func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
@@ -88,7 +88,8 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
 		threadsIdle:  threads,
 	}
 	srv.handler = newServerHandler(srv, e.BlockChain(), e.ChainDb(), e.TxPool(), e.Synced)
-	srv.costTracker, srv.freeCapacity = newCostTracker(e.ChainDb(), config)
+	srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
+	srv.freeCapacity = srv.minCapacity
 
 	// Set up checkpoint oracle.
 	oracle := config.CheckpointOracle
@@ -108,13 +109,13 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
 	// to send requests most of the time. Our goal is to serve as many clients as
 	// possible while the actually used server capacity does not exceed the limits
 	totalRecharge := srv.costTracker.totalRecharge()
-	maxCapacity := srv.freeCapacity * uint64(srv.config.LightPeers)
-	if totalRecharge > maxCapacity {
-		maxCapacity = totalRecharge
+	srv.maxCapacity = srv.freeCapacity * uint64(srv.config.LightPeers)
+	if totalRecharge > srv.maxCapacity {
+		srv.maxCapacity = totalRecharge
 	}
-	srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
+	srv.fcManager.SetCapacityLimits(srv.freeCapacity, srv.maxCapacity, srv.freeCapacity*2)
 	srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
-	srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})
+	srv.clientPool.setDefaultFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})
 
 	checkpoint := srv.latestLocalCheckpoint()
 	if !checkpoint.Empty() {
@@ -133,6 +134,18 @@ func (s *LesServer) APIs() []rpc.API {
 			Service:   NewPrivateLightAPI(&s.lesCommons),
 			Public:    false,
 		},
+		{
+			Namespace: "les",
+			Version:   "1.0",
+			Service:   NewPrivateLightServerAPI(s),
+			Public:    false,
+		},
+		{
+			Namespace: "debug",
+			Version:   "1.0",
+			Service:   NewPrivateDebugAPI(s),
+			Public:    false,
+		},
 	}
 }
 
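The capacity ceiling exposed as maxCapacity is the larger of the peer-derived
limit and the cost tracker's total recharge rate. A worked example with
illustrative numbers only:

    package main

    import "fmt"

    func main() {
        freeCapacity := uint64(16000)    // assumed per-client free capacity
        lightPeers := uint64(100)        // configured LightPeers
        totalRecharge := uint64(3000000) // assumed cost tracker recharge rate

        maxCapacity := freeCapacity * lightPeers // peer-derived ceiling: 1600000
        if totalRecharge > maxCapacity {
            maxCapacity = totalRecharge // the recharge rate wins here
        }
        fmt.Println(maxCapacity) // 3000000
    }
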
-- 
GitLab