From 59a31983829ee542682b842e5b9b12b4cdac193d Mon Sep 17 00:00:00 2001
From: gary rong <garyrong0905@gmail.com>
Date: Thu, 4 Jul 2019 02:23:06 +0800
Subject: [PATCH] les: remove half-finished priority pool APIs (#19780)

* les: remove half-finish APIs

* les: remove half-finish APIs
---
 les/api.go                 | 456 +------------------------------------
 les/costtracker.go         | 135 ++++++-----
 les/csvlogger/csvlogger.go | 227 ------------------
 les/freeclient.go          |  47 ++--
 les/freeclient_test.go     |   4 +-
 les/handler.go             |  15 +-
 les/helper_test.go         |   2 +-
 les/metrics.go             |  57 ++---
 les/server.go              |  70 ++----
 les/servingqueue.go        |  30 +--
 10 files changed, 153 insertions(+), 890 deletions(-)
 delete mode 100644 les/csvlogger/csvlogger.go

diff --git a/les/api.go b/les/api.go
index b53512196..95e1b009e 100644
--- a/les/api.go
+++ b/les/api.go
@@ -17,462 +17,16 @@
 package les
 
 import (
-	"context"
 	"errors"
-	"fmt"
-	"sync"
-	"time"
 
 	"github.com/ethereum/go-ethereum/common/hexutil"
-	"github.com/ethereum/go-ethereum/common/mclock"
-	"github.com/ethereum/go-ethereum/les/csvlogger"
-	"github.com/ethereum/go-ethereum/p2p/enode"
-	"github.com/ethereum/go-ethereum/rpc"
 )
 
 var (
-	ErrMinCap               = errors.New("capacity too small")
-	ErrTotalCap             = errors.New("total capacity exceeded")
-	ErrUnknownBenchmarkType = errors.New("unknown benchmark type")
-	ErrNoCheckpoint         = errors.New("no local checkpoint provided")
-	ErrNotActivated         = errors.New("checkpoint registrar is not activated")
-
-	dropCapacityDelay = time.Second // delay applied to decreasing capacity changes
-)
-
-// PrivateLightServerAPI provides an API to access the LES light server.
-// It offers only methods that operate on public data that is freely available to anyone.
-type PrivateLightServerAPI struct {
-	server *LesServer
-}
-
-// NewPrivateLightServerAPI creates a new LES light server API.
-func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
-	return &PrivateLightServerAPI{
-		server: server,
-	}
-}
-
-// TotalCapacity queries total available capacity for all clients
-func (api *PrivateLightServerAPI) TotalCapacity() hexutil.Uint64 {
-	return hexutil.Uint64(api.server.priorityClientPool.totalCapacity())
-}
-
-// SubscribeTotalCapacity subscribes to changed total capacity events.
-// If onlyUnderrun is true then notification is sent only if the total capacity
-// drops under the total capacity of connected priority clients.
-//
-// Note: actually applying decreasing total capacity values is delayed while the
-// notification is sent instantly. This allows lowering the capacity of a priority client
-// or choosing which one to drop before the system drops some of them automatically.
-func (api *PrivateLightServerAPI) SubscribeTotalCapacity(ctx context.Context, onlyUnderrun bool) (*rpc.Subscription, error) {
-	notifier, supported := rpc.NotifierFromContext(ctx)
-	if !supported {
-		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
-	}
-	rpcSub := notifier.CreateSubscription()
-	api.server.priorityClientPool.subscribeTotalCapacity(&tcSubscription{notifier, rpcSub, onlyUnderrun})
-	return rpcSub, nil
-}
-
-type (
-	// tcSubscription represents a total capacity subscription
-	tcSubscription struct {
-		notifier     *rpc.Notifier
-		rpcSub       *rpc.Subscription
-		onlyUnderrun bool
-	}
-	tcSubs map[*tcSubscription]struct{}
+	errNoCheckpoint = errors.New("no local checkpoint provided")
+	errNotActivated = errors.New("checkpoint registrar is not activated")
 )
 
-// send sends a changed total capacity event to the subscribers
-func (s tcSubs) send(tc uint64, underrun bool) {
-	for sub := range s {
-		select {
-		case <-sub.rpcSub.Err():
-			delete(s, sub)
-		case <-sub.notifier.Closed():
-			delete(s, sub)
-		default:
-			if underrun || !sub.onlyUnderrun {
-				sub.notifier.Notify(sub.rpcSub.ID, tc)
-			}
-		}
-	}
-}
-
-// MinimumCapacity queries minimum assignable capacity for a single client
-func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 {
-	return hexutil.Uint64(api.server.minCapacity)
-}
-
-// FreeClientCapacity queries the capacity provided for free clients
-func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 {
-	return hexutil.Uint64(api.server.freeClientCap)
-}
-
-// SetClientCapacity sets the priority capacity assigned to a given client.
-// If the assigned capacity is bigger than zero then connection is always
-// guaranteed. The sum of capacity assigned to priority clients can not exceed
-// the total available capacity.
-//
-// Note: assigned capacity can be changed while the client is connected with
-// immediate effect.
-func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error {
-	if cap != 0 && cap < api.server.minCapacity {
-		return ErrMinCap
-	}
-	return api.server.priorityClientPool.setClientCapacity(id, cap)
-}
-
-// GetClientCapacity returns the capacity assigned to a given client
-func (api *PrivateLightServerAPI) GetClientCapacity(id enode.ID) hexutil.Uint64 {
-	api.server.priorityClientPool.lock.Lock()
-	defer api.server.priorityClientPool.lock.Unlock()
-
-	return hexutil.Uint64(api.server.priorityClientPool.clients[id].cap)
-}
-
-// clientPool is implemented by both the free and priority client pools
-type clientPool interface {
-	peerSetNotify
-	setLimits(count int, totalCap uint64)
-}
-
-// priorityClientPool stores information about prioritized clients
-type priorityClientPool struct {
-	lock                             sync.Mutex
-	child                            clientPool
-	ps                               *peerSet
-	clients                          map[enode.ID]priorityClientInfo
-	totalCap, totalCapAnnounced      uint64
-	totalConnectedCap, freeClientCap uint64
-	maxPeers, priorityCount          int
-	logger                           *csvlogger.Logger
-	logTotalPriConn                  *csvlogger.Channel
-
-	subs            tcSubs
-	updateSchedule  []scheduledUpdate
-	scheduleCounter uint64
-}
-
-// scheduledUpdate represents a delayed total capacity update
-type scheduledUpdate struct {
-	time         mclock.AbsTime
-	totalCap, id uint64
-}
-
-// priorityClientInfo entries exist for all prioritized clients and currently connected non-priority clients
-type priorityClientInfo struct {
-	cap       uint64 // zero for non-priority clients
-	connected bool
-	peer      *peer
-}
-
-// newPriorityClientPool creates a new priority client pool
-func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, metricsLogger, eventLogger *csvlogger.Logger) *priorityClientPool {
-	return &priorityClientPool{
-		clients:         make(map[enode.ID]priorityClientInfo),
-		freeClientCap:   freeClientCap,
-		ps:              ps,
-		child:           child,
-		logger:          eventLogger,
-		logTotalPriConn: metricsLogger.NewChannel("totalPriConn", 0),
-	}
-}
-
-// registerPeer is called when a new client is connected. If the client has no
-// priority assigned then it is passed to the child pool which may either keep it
-// or disconnect it.
-//
-// Note: priorityClientPool also stores a record about free clients while they are
-// connected in order to be able to assign priority to them later.
-func (v *priorityClientPool) registerPeer(p *peer) {
-	v.lock.Lock()
-	defer v.lock.Unlock()
-
-	id := p.ID()
-	c := v.clients[id]
-	v.logger.Event(fmt.Sprintf("priorityClientPool: registerPeer  cap=%d  connected=%v, %x", c.cap, c.connected, id.Bytes()))
-	if c.connected {
-		return
-	}
-	if c.cap == 0 && v.child != nil {
-		v.child.registerPeer(p)
-	}
-	if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap {
-		v.logger.Event(fmt.Sprintf("priorityClientPool: rejected, %x", id.Bytes()))
-		go v.ps.Unregister(p.id)
-		return
-	}
-
-	c.connected = true
-	c.peer = p
-	v.clients[id] = c
-	if c.cap != 0 {
-		v.priorityCount++
-		v.totalConnectedCap += c.cap
-		v.logger.Event(fmt.Sprintf("priorityClientPool: accepted with %d capacity, %x", c.cap, id.Bytes()))
-		v.logTotalPriConn.Update(float64(v.totalConnectedCap))
-		if v.child != nil {
-			v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
-		}
-		p.updateCapacity(c.cap)
-	}
-}
-
-// unregisterPeer is called when a client is disconnected. If the client has no
-// priority assigned then it is also removed from the child pool.
-func (v *priorityClientPool) unregisterPeer(p *peer) {
-	v.lock.Lock()
-	defer v.lock.Unlock()
-
-	id := p.ID()
-	c := v.clients[id]
-	v.logger.Event(fmt.Sprintf("priorityClientPool: unregisterPeer  cap=%d  connected=%v, %x", c.cap, c.connected, id.Bytes()))
-	if !c.connected {
-		return
-	}
-	if c.cap != 0 {
-		c.connected = false
-		v.clients[id] = c
-		v.priorityCount--
-		v.totalConnectedCap -= c.cap
-		v.logTotalPriConn.Update(float64(v.totalConnectedCap))
-		if v.child != nil {
-			v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
-		}
-	} else {
-		if v.child != nil {
-			v.child.unregisterPeer(p)
-		}
-		delete(v.clients, id)
-	}
-}
-
-// setLimits updates the allowed peer count and total capacity of the priority
-// client pool. Since the free client pool is a child of the priority pool the
-// remaining peer count and capacity is assigned to the free pool by calling its
-// own setLimits function.
-//
-// Note: a decreasing change of the total capacity is applied with a delay.
-func (v *priorityClientPool) setLimits(count int, totalCap uint64) {
-	v.lock.Lock()
-	defer v.lock.Unlock()
-
-	v.totalCapAnnounced = totalCap
-	if totalCap > v.totalCap {
-		v.setLimitsNow(count, totalCap)
-		v.subs.send(totalCap, false)
-		return
-	}
-	v.setLimitsNow(count, v.totalCap)
-	if totalCap < v.totalCap {
-		v.subs.send(totalCap, totalCap < v.totalConnectedCap)
-		for i, s := range v.updateSchedule {
-			if totalCap >= s.totalCap {
-				s.totalCap = totalCap
-				v.updateSchedule = v.updateSchedule[:i+1]
-				return
-			}
-		}
-		v.updateSchedule = append(v.updateSchedule, scheduledUpdate{time: mclock.Now() + mclock.AbsTime(dropCapacityDelay), totalCap: totalCap})
-		if len(v.updateSchedule) == 1 {
-			v.scheduleCounter++
-			id := v.scheduleCounter
-			v.updateSchedule[0].id = id
-			time.AfterFunc(dropCapacityDelay, func() { v.checkUpdate(id) })
-		}
-	} else {
-		v.updateSchedule = nil
-	}
-}
-
-// checkUpdate performs the next scheduled update if possible and schedules
-// the one after that
-func (v *priorityClientPool) checkUpdate(id uint64) {
-	v.lock.Lock()
-	defer v.lock.Unlock()
-
-	if len(v.updateSchedule) == 0 || v.updateSchedule[0].id != id {
-		return
-	}
-	v.setLimitsNow(v.maxPeers, v.updateSchedule[0].totalCap)
-	v.updateSchedule = v.updateSchedule[1:]
-	if len(v.updateSchedule) != 0 {
-		v.scheduleCounter++
-		id := v.scheduleCounter
-		v.updateSchedule[0].id = id
-		dt := time.Duration(v.updateSchedule[0].time - mclock.Now())
-		time.AfterFunc(dt, func() { v.checkUpdate(id) })
-	}
-}
-
-// setLimits updates the allowed peer count and total capacity immediately
-func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) {
-	if v.priorityCount > count || v.totalConnectedCap > totalCap {
-		for id, c := range v.clients {
-			if c.connected {
-				v.logger.Event(fmt.Sprintf("priorityClientPool: setLimitsNow kicked out, %x", id.Bytes()))
-				c.connected = false
-				v.totalConnectedCap -= c.cap
-				v.logTotalPriConn.Update(float64(v.totalConnectedCap))
-				v.priorityCount--
-				v.clients[id] = c
-				go v.ps.Unregister(c.peer.id)
-				if v.priorityCount <= count && v.totalConnectedCap <= totalCap {
-					break
-				}
-			}
-		}
-	}
-	v.maxPeers = count
-	v.totalCap = totalCap
-	if v.child != nil {
-		v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
-	}
-}
-
-// totalCapacity queries total available capacity for all clients
-func (v *priorityClientPool) totalCapacity() uint64 {
-	v.lock.Lock()
-	defer v.lock.Unlock()
-
-	return v.totalCapAnnounced
-}
-
-// subscribeTotalCapacity subscribes to changed total capacity events
-func (v *priorityClientPool) subscribeTotalCapacity(sub *tcSubscription) {
-	v.lock.Lock()
-	defer v.lock.Unlock()
-
-	v.subs[sub] = struct{}{}
-}
-
-// setClientCapacity sets the priority capacity assigned to a given client
-func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error {
-	v.lock.Lock()
-	defer v.lock.Unlock()
-
-	c := v.clients[id]
-	if c.cap == cap {
-		return nil
-	}
-	if c.connected {
-		if v.totalConnectedCap+cap > v.totalCap+c.cap {
-			return ErrTotalCap
-		}
-		if c.cap == 0 {
-			if v.child != nil {
-				v.child.unregisterPeer(c.peer)
-			}
-			v.priorityCount++
-		}
-		if cap == 0 {
-			v.priorityCount--
-		}
-		v.totalConnectedCap += cap - c.cap
-		v.logTotalPriConn.Update(float64(v.totalConnectedCap))
-		if v.child != nil {
-			v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap)
-		}
-		if cap == 0 {
-			if v.child != nil {
-				v.child.registerPeer(c.peer)
-			}
-			c.peer.updateCapacity(v.freeClientCap)
-		} else {
-			c.peer.updateCapacity(cap)
-		}
-	}
-	if cap != 0 || c.connected {
-		c.cap = cap
-		v.clients[id] = c
-	} else {
-		delete(v.clients, id)
-	}
-	if c.connected {
-		v.logger.Event(fmt.Sprintf("priorityClientPool: changed capacity to %d, %x", cap, id.Bytes()))
-	}
-	return nil
-}
-
-// Benchmark runs a request performance benchmark with a given set of measurement setups
-// in multiple passes specified by passCount. The measurement time for each setup in each
-// pass is specified in milliseconds by length.
-//
-// Note: measurement time is adjusted for each pass depending on the previous ones.
-// Therefore a controlled total measurement time is achievable in multiple passes.
-func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) {
-	benchmarks := make([]requestBenchmark, len(setups))
-	for i, setup := range setups {
-		if t, ok := setup["type"].(string); ok {
-			getInt := func(field string, def int) int {
-				if value, ok := setup[field].(float64); ok {
-					return int(value)
-				}
-				return def
-			}
-			getBool := func(field string, def bool) bool {
-				if value, ok := setup[field].(bool); ok {
-					return value
-				}
-				return def
-			}
-			switch t {
-			case "header":
-				benchmarks[i] = &benchmarkBlockHeaders{
-					amount:  getInt("amount", 1),
-					skip:    getInt("skip", 1),
-					byHash:  getBool("byHash", false),
-					reverse: getBool("reverse", false),
-				}
-			case "body":
-				benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false}
-			case "receipts":
-				benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true}
-			case "proof":
-				benchmarks[i] = &benchmarkProofsOrCode{code: false}
-			case "code":
-				benchmarks[i] = &benchmarkProofsOrCode{code: true}
-			case "cht":
-				benchmarks[i] = &benchmarkHelperTrie{
-					bloom:    false,
-					reqCount: getInt("amount", 1),
-				}
-			case "bloom":
-				benchmarks[i] = &benchmarkHelperTrie{
-					bloom:    true,
-					reqCount: getInt("amount", 1),
-				}
-			case "txSend":
-				benchmarks[i] = &benchmarkTxSend{}
-			case "txStatus":
-				benchmarks[i] = &benchmarkTxStatus{}
-			default:
-				return nil, ErrUnknownBenchmarkType
-			}
-		} else {
-			return nil, ErrUnknownBenchmarkType
-		}
-	}
-	rs := api.server.protocolManager.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
-	result := make([]map[string]interface{}, len(setups))
-	for i, r := range rs {
-		res := make(map[string]interface{})
-		if r.err == nil {
-			res["totalCount"] = r.totalCount
-			res["avgTime"] = r.avgTime
-			res["maxInSize"] = r.maxInSize
-			res["maxOutSize"] = r.maxOutSize
-		} else {
-			res["error"] = r.err.Error()
-		}
-		result[i] = res
-	}
-	return result, nil
-}
-
 // PrivateLightAPI provides an API to access the LES light server or light client.
 type PrivateLightAPI struct {
 	backend *lesCommons
@@ -498,7 +52,7 @@ func (api *PrivateLightAPI) LatestCheckpoint() ([4]string, error) {
 	var res [4]string
 	cp := api.backend.latestLocalCheckpoint()
 	if cp.Empty() {
-		return res, ErrNoCheckpoint
+		return res, errNoCheckpoint
 	}
 	res[0] = hexutil.EncodeUint64(cp.SectionIndex)
 	res[1], res[2], res[3] = cp.SectionHead.Hex(), cp.CHTRoot.Hex(), cp.BloomRoot.Hex()
@@ -515,7 +69,7 @@ func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {
 	var res [3]string
 	cp := api.backend.getLocalCheckpoint(index)
 	if cp.Empty() {
-		return res, ErrNoCheckpoint
+		return res, errNoCheckpoint
 	}
 	res[0], res[1], res[2] = cp.SectionHead.Hex(), cp.CHTRoot.Hex(), cp.BloomRoot.Hex()
 	return res, nil
@@ -524,7 +78,7 @@ func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {
 // GetCheckpointContractAddress returns the contract contract address in hex format.
 func (api *PrivateLightAPI) GetCheckpointContractAddress() (string, error) {
 	if api.reg == nil {
-		return "", ErrNotActivated
+		return "", errNotActivated
 	}
 	return api.reg.config.Address.Hex(), nil
 }
diff --git a/les/costtracker.go b/les/costtracker.go
index e463c9f8b..2d9c95af7 100644
--- a/les/costtracker.go
+++ b/les/costtracker.go
@@ -18,7 +18,6 @@ package les
 
 import (
 	"encoding/binary"
-	"fmt"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -27,7 +26,6 @@ import (
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/eth"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/les/csvlogger"
 	"github.com/ethereum/go-ethereum/les/flowcontrol"
 	"github.com/ethereum/go-ethereum/log"
 )
@@ -96,40 +94,50 @@ const (
 // as the number of cost units per nanosecond of serving time in a single thread.
 // It is based on statistics collected during serving requests in high-load periods
 // and practically acts as a one-dimension request price scaling factor over the
-// pre-defined cost estimate table. Instead of scaling the cost values, the real
-// value of cost units is changed by applying the factor to the serving times. This
-// is more convenient because the changes in the cost factor can be applied immediately
-// without always notifying the clients about the changed cost tables.
+// pre-defined cost estimate table.
+//
+// The reason for dynamically maintaining the global factor on the server side is:
+// the estimated time cost of the request is fixed(hardcoded) but the configuration
+// of the machine running the server is really different. Therefore, the request serving
+// time in different machine will vary greatly. And also, the request serving time
+// in same machine may vary greatly with different request pressure.
+//
+// In order to more effectively limit resources, we apply the global factor to serving
+// time to make the result as close as possible to the estimated time cost no matter
+// the server is slow or fast. And also we scale the totalRecharge with global factor
+// so that fast server can serve more requests than estimation and slow server can
+// reduce request pressure.
+//
+// Instead of scaling the cost values, the real value of cost units is changed by
+// applying the factor to the serving times. This is more convenient because the
+// changes in the cost factor can be applied immediately without always notifying
+// the clients about the changed cost tables.
 type costTracker struct {
 	db     ethdb.Database
 	stopCh chan chan struct{}
 
-	inSizeFactor, outSizeFactor float64
-	gf, utilTarget              float64
-	minBufLimit                 uint64
+	inSizeFactor  float64
+	outSizeFactor float64
+	factor        float64
+	utilTarget    float64
+	minBufLimit   uint64
 
-	gfUpdateCh      chan gfUpdate
 	gfLock          sync.RWMutex
+	reqInfoCh       chan reqInfo
 	totalRechargeCh chan uint64
 
-	stats                                                     map[uint64][]uint64
-	logger                                                    *csvlogger.Logger
-	logRecentTime, logRecentAvg, logTotalRecharge, logRelCost *csvlogger.Channel
+	stats map[uint64][]uint64 // Used for testing purpose.
 }
 
 // newCostTracker creates a cost tracker and loads the cost factor statistics from the database.
 // It also returns the minimum capacity that can be assigned to any peer.
-func newCostTracker(db ethdb.Database, config *eth.Config, logger *csvlogger.Logger) (*costTracker, uint64) {
+func newCostTracker(db ethdb.Database, config *eth.Config) (*costTracker, uint64) {
 	utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100
 	ct := &costTracker{
-		db:               db,
-		stopCh:           make(chan chan struct{}),
-		utilTarget:       utilTarget,
-		logger:           logger,
-		logRelCost:       logger.NewMinMaxChannel("relativeCost", true),
-		logRecentTime:    logger.NewMinMaxChannel("recentTime", true),
-		logRecentAvg:     logger.NewMinMaxChannel("recentAvg", true),
-		logTotalRecharge: logger.NewChannel("totalRecharge", 0.01),
+		db:         db,
+		stopCh:     make(chan chan struct{}),
+		reqInfoCh:  make(chan reqInfo, 100),
+		utilTarget: utilTarget,
 	}
 	if config.LightBandwidthIn > 0 {
 		ct.inSizeFactor = utilTarget / float64(config.LightBandwidthIn)
@@ -204,8 +212,15 @@ func (ct *costTracker) makeCostList(globalFactor float64) RequestCostList {
 	return list
 }
 
-type gfUpdate struct {
-	avgTimeCost, servingTime float64
+// reqInfo contains the estimated time cost and the actual request serving time
+// which acts as a feed source to update factor maintained by costTracker.
+type reqInfo struct {
+	// avgTimeCost is the estimated time cost corresponding to maxCostTable.
+	avgTimeCost float64
+
+	// servingTime is the CPU time corresponding to the actual processing of
+	// the request.
+	servingTime float64
 }
 
 // gfLoop starts an event loop which updates the global cost factor which is
@@ -218,43 +233,48 @@ type gfUpdate struct {
 // total allowed serving time per second but nominated in cost units, should
 // also be scaled with the cost factor and is also updated by this loop.
 func (ct *costTracker) gfLoop() {
-	var gfLog, recentTime, recentAvg float64
-	lastUpdate := mclock.Now()
-	expUpdate := lastUpdate
+	var (
+		factor, totalRecharge        float64
+		gfLog, recentTime, recentAvg float64
+
+		lastUpdate, expUpdate = mclock.Now(), mclock.Now()
+	)
 
+	// Load historical cost factor statistics from the database.
 	data, _ := ct.db.Get([]byte(gfDbKey))
 	if len(data) == 8 {
 		gfLog = math.Float64frombits(binary.BigEndian.Uint64(data[:]))
 	}
-	gf := math.Exp(gfLog)
-	ct.gf = gf
-	totalRecharge := ct.utilTarget * gf
-	ct.gfUpdateCh = make(chan gfUpdate, 100)
-	threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / 1000000
+	ct.factor = math.Exp(gfLog)
+	factor, totalRecharge = ct.factor, ct.utilTarget*ct.factor
+
+	// In order to perform factor data statistics under the high request pressure,
+	// we only adjust factor when recent factor usage beyond the threshold.
+	threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / flowcontrol.FixedPointMultiplier
 
 	go func() {
 		saveCostFactor := func() {
 			var data [8]byte
 			binary.BigEndian.PutUint64(data[:], math.Float64bits(gfLog))
 			ct.db.Put([]byte(gfDbKey), data[:])
-			log.Debug("global cost factor saved", "value", gf)
+			log.Debug("global cost factor saved", "value", factor)
 		}
 		saveTicker := time.NewTicker(time.Minute * 10)
 
 		for {
 			select {
-			case r := <-ct.gfUpdateCh:
+			case r := <-ct.reqInfoCh:
+				requestServedMeter.Mark(int64(r.servingTime))
+				requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
+				requestServedTimer.Update(time.Duration(r.servingTime))
+				relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime))
+
 				now := mclock.Now()
-				if ct.logRelCost != nil && r.avgTimeCost > 1e-20 {
-					ct.logRelCost.Update(r.servingTime * gf / r.avgTimeCost)
-				}
-				if r.servingTime > 1000000000 {
-					ct.logger.Event(fmt.Sprintf("Very long servingTime = %f  avgTimeCost = %f  costFactor = %f", r.servingTime, r.avgTimeCost, gf))
-				}
 				dt := float64(now - expUpdate)
 				expUpdate = now
 				exp := math.Exp(-dt / float64(gfUsageTC))
-				// calculate gf correction until now, based on previous values
+
+				// calculate factor correction until now, based on previous values
 				var gfCorr float64
 				max := recentTime
 				if recentAvg > max {
@@ -268,27 +288,28 @@ func (ct *costTracker) gfLoop() {
 					} else {
 						gfCorr = math.Log(max/threshold) * float64(gfUsageTC)
 					}
-					// calculate log(gf) correction with the right direction and time constant
+					// calculate log(factor) correction with the right direction and time constant
 					if recentTime > recentAvg {
-						// drop gf if actual serving times are larger than average estimates
+						// drop factor if actual serving times are larger than average estimates
 						gfCorr /= -float64(gfDropTC)
 					} else {
-						// raise gf if actual serving times are smaller than average estimates
+						// raise factor if actual serving times are smaller than average estimates
 						gfCorr /= float64(gfRaiseTC)
 					}
 				}
 				// update recent cost values with current request
 				recentTime = recentTime*exp + r.servingTime
-				recentAvg = recentAvg*exp + r.avgTimeCost/gf
+				recentAvg = recentAvg*exp + r.avgTimeCost/factor
 
 				if gfCorr != 0 {
+					// Apply the correction to factor
 					gfLog += gfCorr
-					gf = math.Exp(gfLog)
+					factor = math.Exp(gfLog)
+					// Notify outside modules the new factor and totalRecharge.
 					if time.Duration(now-lastUpdate) > time.Second {
-						totalRecharge = ct.utilTarget * gf
-						lastUpdate = now
+						totalRecharge, lastUpdate = ct.utilTarget*factor, now
 						ct.gfLock.Lock()
-						ct.gf = gf
+						ct.factor = factor
 						ch := ct.totalRechargeCh
 						ct.gfLock.Unlock()
 						if ch != nil {
@@ -297,12 +318,12 @@ func (ct *costTracker) gfLoop() {
 							default:
 							}
 						}
-						log.Debug("global cost factor updated", "gf", gf)
+						log.Debug("global cost factor updated", "factor", factor)
 					}
 				}
-				ct.logRecentTime.Update(recentTime)
-				ct.logRecentAvg.Update(recentAvg)
-				ct.logTotalRecharge.Update(totalRecharge)
+				recentServedGauge.Update(int64(recentTime))
+				recentEstimatedGauge.Update(int64(recentAvg))
+				totalRechargeGauge.Update(int64(totalRecharge))
 
 			case <-saveTicker.C:
 				saveCostFactor()
@@ -321,7 +342,7 @@ func (ct *costTracker) globalFactor() float64 {
 	ct.gfLock.RLock()
 	defer ct.gfLock.RUnlock()
 
-	return ct.gf
+	return ct.factor
 }
 
 // totalRecharge returns the current total recharge parameter which is used by
@@ -330,7 +351,7 @@ func (ct *costTracker) totalRecharge() uint64 {
 	ct.gfLock.RLock()
 	defer ct.gfLock.RUnlock()
 
-	return uint64(ct.gf * ct.utilTarget)
+	return uint64(ct.factor * ct.utilTarget)
 }
 
 // subscribeTotalRecharge returns all future updates to the total recharge value
@@ -340,7 +361,7 @@ func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 {
 	defer ct.gfLock.Unlock()
 
 	ct.totalRechargeCh = ch
-	return uint64(ct.gf * ct.utilTarget)
+	return uint64(ct.factor * ct.utilTarget)
 }
 
 // updateStats updates the global cost factor and (if enabled) the real cost vs.
@@ -349,7 +370,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
 	avg := reqAvgTimeCost[code]
 	avgTimeCost := avg.baseCost + amount*avg.reqCost
 	select {
-	case ct.gfUpdateCh <- gfUpdate{float64(avgTimeCost), float64(servingTime)}:
+	case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime)}:
 	default:
 	}
 	if makeCostStats {
diff --git a/les/csvlogger/csvlogger.go b/les/csvlogger/csvlogger.go
deleted file mode 100644
index 9a4093cb9..000000000
--- a/les/csvlogger/csvlogger.go
+++ /dev/null
@@ -1,227 +0,0 @@
-// Copyright 2019 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package csvlogger
-
-import (
-	"fmt"
-	"os"
-	"sync"
-	"time"
-
-	"github.com/ethereum/go-ethereum/common/mclock"
-	"github.com/ethereum/go-ethereum/log"
-)
-
-// Logger is a metrics/events logger that writes logged values and events into a comma separated file
-type Logger struct {
-	file            *os.File
-	started         mclock.AbsTime
-	channels        []*Channel
-	period          time.Duration
-	stopCh, stopped chan struct{}
-	storeCh         chan string
-	eventHeader     string
-}
-
-// NewLogger creates a new Logger
-func NewLogger(fileName string, updatePeriod time.Duration, eventHeader string) *Logger {
-	if fileName == "" {
-		return nil
-	}
-	f, err := os.Create(fileName)
-	if err != nil {
-		log.Error("Error creating log file", "name", fileName, "error", err)
-		return nil
-	}
-	return &Logger{
-		file:        f,
-		period:      updatePeriod,
-		stopCh:      make(chan struct{}),
-		storeCh:     make(chan string, 1),
-		eventHeader: eventHeader,
-	}
-}
-
-// NewChannel creates a new value logger channel that writes values in a single
-// column. If the relative change of the value is bigger than the given threshold
-// then a new line is added immediately (threshold can also be 0).
-func (l *Logger) NewChannel(name string, threshold float64) *Channel {
-	if l == nil {
-		return nil
-	}
-	c := &Channel{
-		logger:    l,
-		name:      name,
-		threshold: threshold,
-	}
-	l.channels = append(l.channels, c)
-	return c
-}
-
-// NewMinMaxChannel creates a new value logger channel that writes the minimum and
-// maximum of the tracked value in two columns. It never triggers adding a new line.
-// If zeroDefault is true then 0 is written to both min and max columns if no update
-// was given during the last period. If it is false then the last update will appear
-// in both columns.
-func (l *Logger) NewMinMaxChannel(name string, zeroDefault bool) *Channel {
-	if l == nil {
-		return nil
-	}
-	c := &Channel{
-		logger:        l,
-		name:          name,
-		minmax:        true,
-		mmZeroDefault: zeroDefault,
-	}
-	l.channels = append(l.channels, c)
-	return c
-}
-
-func (l *Logger) store(event string) {
-	s := fmt.Sprintf("%g", float64(mclock.Now()-l.started)/1000000000)
-	for _, ch := range l.channels {
-		s += ", " + ch.store()
-	}
-	if event != "" {
-		s += ", " + event
-	}
-	l.file.WriteString(s + "\n")
-}
-
-// Start writes the header line and starts the logger
-func (l *Logger) Start() {
-	if l == nil {
-		return
-	}
-	l.started = mclock.Now()
-	s := "Time"
-	for _, ch := range l.channels {
-		s += ", " + ch.header()
-	}
-	if l.eventHeader != "" {
-		s += ", " + l.eventHeader
-	}
-	l.file.WriteString(s + "\n")
-	go func() {
-		timer := time.NewTimer(l.period)
-		for {
-			select {
-			case <-timer.C:
-				l.store("")
-				timer.Reset(l.period)
-			case event := <-l.storeCh:
-				l.store(event)
-				if !timer.Stop() {
-					<-timer.C
-				}
-				timer.Reset(l.period)
-			case <-l.stopCh:
-				close(l.stopped)
-				return
-			}
-		}
-	}()
-}
-
-// Stop stops the logger and closes the file
-func (l *Logger) Stop() {
-	if l == nil {
-		return
-	}
-	l.stopped = make(chan struct{})
-	close(l.stopCh)
-	<-l.stopped
-	l.file.Close()
-}
-
-// Event immediately adds a new line and adds the given event string in the last column
-func (l *Logger) Event(event string) {
-	if l == nil {
-		return
-	}
-	select {
-	case l.storeCh <- event:
-	case <-l.stopCh:
-	}
-}
-
-// Channel represents a logger channel tracking a single value
-type Channel struct {
-	logger                                             *Logger
-	lock                                               sync.Mutex
-	name                                               string
-	threshold, storeMin, storeMax, lastValue, min, max float64
-	minmax, mmSet, mmZeroDefault                       bool
-}
-
-// Update updates the tracked value
-func (lc *Channel) Update(value float64) {
-	if lc == nil {
-		return
-	}
-	lc.lock.Lock()
-	defer lc.lock.Unlock()
-
-	lc.lastValue = value
-	if lc.minmax {
-		if value > lc.max || !lc.mmSet {
-			lc.max = value
-		}
-		if value < lc.min || !lc.mmSet {
-			lc.min = value
-		}
-		lc.mmSet = true
-	} else {
-		if value < lc.storeMin || value > lc.storeMax {
-			select {
-			case lc.logger.storeCh <- "":
-			default:
-			}
-		}
-	}
-}
-
-func (lc *Channel) store() (s string) {
-	lc.lock.Lock()
-	defer lc.lock.Unlock()
-
-	if lc.minmax {
-		s = fmt.Sprintf("%g, %g", lc.min, lc.max)
-		lc.mmSet = false
-		if lc.mmZeroDefault {
-			lc.min = 0
-		} else {
-			lc.min = lc.lastValue
-		}
-		lc.max = lc.min
-	} else {
-		s = fmt.Sprintf("%g", lc.lastValue)
-		lc.storeMin = lc.lastValue * (1 - lc.threshold)
-		lc.storeMax = lc.lastValue * (1 + lc.threshold)
-		if lc.lastValue < 0 {
-			lc.storeMin, lc.storeMax = lc.storeMax, lc.storeMin
-		}
-	}
-	return
-}
-
-func (lc *Channel) header() string {
-	if lc.minmax {
-		return lc.name + " (min), " + lc.name + " (max)"
-	}
-	return lc.name
-}
diff --git a/les/freeclient.go b/les/freeclient.go
index f434ea0b9..934b88153 100644
--- a/les/freeclient.go
+++ b/les/freeclient.go
@@ -26,7 +26,6 @@ import (
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/common/prque"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/les/csvlogger"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/rlp"
 )
@@ -53,8 +52,7 @@ type freeClientPool struct {
 
 	connectedLimit, totalLimit int
 	freeClientCap              uint64
-	logger                     *csvlogger.Logger
-	logTotalFreeConn           *csvlogger.Channel
+	connectedCap               uint64
 
 	addressMap            map[string]*freeClientPoolEntry
 	connPool, disconnPool *prque.Prque
@@ -69,18 +67,16 @@ const (
 )
 
 // newFreeClientPool creates a new free client pool
-func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string), metricsLogger, eventLogger *csvlogger.Logger) *freeClientPool {
+func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool {
 	pool := &freeClientPool{
-		db:               db,
-		clock:            clock,
-		addressMap:       make(map[string]*freeClientPoolEntry),
-		connPool:         prque.New(poolSetIndex),
-		disconnPool:      prque.New(poolSetIndex),
-		freeClientCap:    freeClientCap,
-		totalLimit:       totalLimit,
-		logger:           eventLogger,
-		logTotalFreeConn: metricsLogger.NewChannel("totalFreeConn", 0),
-		removePeer:       removePeer,
+		db:            db,
+		clock:         clock,
+		addressMap:    make(map[string]*freeClientPoolEntry),
+		connPool:      prque.New(poolSetIndex),
+		disconnPool:   prque.New(poolSetIndex),
+		freeClientCap: freeClientCap,
+		totalLimit:    totalLimit,
+		removePeer:    removePeer,
 	}
 	pool.loadFromDb()
 	return pool
@@ -126,10 +122,7 @@ func (f *freeClientPool) connect(address, id string) bool {
 	if f.closed {
 		return false
 	}
-
-	f.logger.Event("freeClientPool: connecting from " + address + ", " + id)
 	if f.connectedLimit == 0 {
-		f.logger.Event("freeClientPool: rejected, " + id)
 		log.Debug("Client rejected", "address", address)
 		return false
 	}
@@ -141,7 +134,6 @@ func (f *freeClientPool) connect(address, id string) bool {
 		f.addressMap[address] = e
 	} else {
 		if e.connected {
-			f.logger.Event("freeClientPool: already connected, " + id)
 			log.Debug("Client already connected", "address", address)
 			return false
 		}
@@ -154,12 +146,13 @@ func (f *freeClientPool) connect(address, id string) bool {
 		if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
 			// kick it out and accept the new client
 			f.dropClient(i, now)
-			f.logger.Event("freeClientPool: kicked out, " + i.id)
+			clientKickedMeter.Mark(1)
+			f.connectedCap -= f.freeClientCap
 		} else {
 			// keep the old client and reject the new one
 			f.connPool.Push(i, i.linUsage)
-			f.logger.Event("freeClientPool: rejected, " + id)
 			log.Debug("Client rejected", "address", address)
+			clientRejectedMeter.Mark(1)
 			return false
 		}
 	}
@@ -167,11 +160,12 @@ func (f *freeClientPool) connect(address, id string) bool {
 	e.connected = true
 	e.id = id
 	f.connPool.Push(e, e.linUsage)
-	f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
 	if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
 		f.disconnPool.Pop()
 	}
-	f.logger.Event("freeClientPool: accepted, " + id)
+	f.connectedCap += f.freeClientCap
+	totalConnectedGauge.Update(int64(f.connectedCap))
+	clientConnectedMeter.Mark(1)
 	log.Debug("Client accepted", "address", address)
 	return true
 }
@@ -203,13 +197,12 @@ func (f *freeClientPool) disconnect(address string) {
 		log.Debug("Client already disconnected", "address", address)
 		return
 	}
-
 	f.connPool.Remove(e.index)
-	f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
 	f.calcLogUsage(e, now)
 	e.connected = false
 	f.disconnPool.Push(e, -e.logUsage)
-	f.logger.Event("freeClientPool: disconnected, " + e.id)
+	f.connectedCap -= f.freeClientCap
+	totalConnectedGauge.Update(int64(f.connectedCap))
 	log.Debug("Client disconnected", "address", address)
 }
 
@@ -227,15 +220,15 @@ func (f *freeClientPool) setLimits(count int, totalCap uint64) {
 	for f.connPool.Size() > f.connectedLimit {
 		i := f.connPool.PopItem().(*freeClientPoolEntry)
 		f.dropClient(i, now)
-		f.logger.Event("freeClientPool: setLimits kicked out, " + i.id)
+		f.connectedCap -= f.freeClientCap
 	}
+	totalConnectedGauge.Update(int64(f.connectedCap))
 }
 
 // dropClient disconnects a client and also moves it from the connected to the
 // disconnected pool
 func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) {
 	f.connPool.Remove(i.index)
-	f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
 	f.calcLogUsage(i, now)
 	i.connected = false
 	f.disconnPool.Push(i, -i.logUsage)
diff --git a/les/freeclient_test.go b/les/freeclient_test.go
index 5a58a6c1c..191822264 100644
--- a/les/freeclient_test.go
+++ b/les/freeclient_test.go
@@ -61,7 +61,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
 			}
 			disconnCh <- i
 		}
-		pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil)
+		pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
 	)
 	pool.setLimits(connLimit, uint64(connLimit))
 
@@ -130,7 +130,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
 
 	// close and restart pool
 	pool.stop()
-	pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil)
+	pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
 	pool.setLimits(connLimit, uint64(connLimit))
 
 	// try connecting all known peers (connLimit should be filled up)
diff --git a/les/handler.go b/les/handler.go
index c902db65a..d9d07f014 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -35,7 +35,6 @@ import (
 	"github.com/ethereum/go-ethereum/eth/downloader"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
-	"github.com/ethereum/go-ethereum/les/csvlogger"
 	"github.com/ethereum/go-ethereum/light"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p"
@@ -124,7 +123,6 @@ type ProtocolManager struct {
 
 	wg       *sync.WaitGroup
 	eventMux *event.TypeMux
-	logger   *csvlogger.Logger
 
 	// Callbacks
 	synced func() bool
@@ -262,11 +260,12 @@ func (pm *ProtocolManager) handle(p *peer) error {
 	// Ignore maxPeers if this is a trusted peer
 	// In server mode we try to check into the client pool after handshake
 	if pm.client && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
-		pm.logger.Event("Rejected (too many peers), " + p.id)
+		clientRejectedMeter.Mark(1)
 		return p2p.DiscTooManyPeers
 	}
 	// Reject light clients if server is not synced.
 	if !pm.client && !pm.synced() {
+		clientRejectedMeter.Mark(1)
 		return p2p.DiscRequested
 	}
 	p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
@@ -281,7 +280,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
 	)
 	if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil {
 		p.Log().Debug("Light Ethereum handshake failed", "err", err)
-		pm.logger.Event("Handshake error: " + err.Error() + ", " + p.id)
+		clientErrorMeter.Mark(1)
 		return err
 	}
 	if p.fcClient != nil {
@@ -294,14 +293,14 @@ func (pm *ProtocolManager) handle(p *peer) error {
 
 	// Register the peer locally
 	if err := pm.peers.Register(p); err != nil {
+		clientErrorMeter.Mark(1)
 		p.Log().Error("Light Ethereum peer registration failed", "err", err)
-		pm.logger.Event("Peer registration error: " + err.Error() + ", " + p.id)
 		return err
 	}
-	pm.logger.Event("Connection established, " + p.id)
+	connectedAt := time.Now()
 	defer func() {
-		pm.logger.Event("Closed connection, " + p.id)
 		pm.removePeer(p.id)
+		connectionTimer.UpdateSince(connectedAt)
 	}()
 
 	// Register the peer in the downloader. If the downloader considers it banned, we disconnect
@@ -317,11 +316,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
 			pm.serverPool.registered(p.poolEntry)
 		}
 	}
-
 	// main loop. handle incoming messages.
 	for {
 		if err := pm.handleMsg(p); err != nil {
-			pm.logger.Event("Message handling error: " + err.Error() + ", " + p.id)
 			p.Log().Debug("Light Ethereum message handling failed", "err", err)
 			if p.fcServer != nil {
 				p.fcServer.DumpLogs()
diff --git a/les/helper_test.go b/les/helper_test.go
index 035865b08..fd5236a99 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -231,7 +231,7 @@ func newTestProtocolManager(lightSync bool, blocks int, odr *LesOdr, indexers []
 	if !lightSync {
 		srv := &LesServer{lesCommons: lesCommons{protocolManager: pm, chainDb: db}}
 		pm.server = srv
-		pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1, nil)
+		pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1)
 		pm.servingQueue.setThreads(4)
 
 		srv.defParams = flowcontrol.ServerParams{
diff --git a/les/metrics.go b/les/metrics.go
index c282a62a1..4c6737a4e 100644
--- a/les/metrics.go
+++ b/les/metrics.go
@@ -22,46 +22,31 @@ import (
 )
 
 var (
-	/*	propTxnInPacketsMeter     = metrics.NewMeter("eth/prop/txns/in/packets")
-		propTxnInTrafficMeter     = metrics.NewMeter("eth/prop/txns/in/traffic")
-		propTxnOutPacketsMeter    = metrics.NewMeter("eth/prop/txns/out/packets")
-		propTxnOutTrafficMeter    = metrics.NewMeter("eth/prop/txns/out/traffic")
-		propHashInPacketsMeter    = metrics.NewMeter("eth/prop/hashes/in/packets")
-		propHashInTrafficMeter    = metrics.NewMeter("eth/prop/hashes/in/traffic")
-		propHashOutPacketsMeter   = metrics.NewMeter("eth/prop/hashes/out/packets")
-		propHashOutTrafficMeter   = metrics.NewMeter("eth/prop/hashes/out/traffic")
-		propBlockInPacketsMeter   = metrics.NewMeter("eth/prop/blocks/in/packets")
-		propBlockInTrafficMeter   = metrics.NewMeter("eth/prop/blocks/in/traffic")
-		propBlockOutPacketsMeter  = metrics.NewMeter("eth/prop/blocks/out/packets")
-		propBlockOutTrafficMeter  = metrics.NewMeter("eth/prop/blocks/out/traffic")
-		reqHashInPacketsMeter     = metrics.NewMeter("eth/req/hashes/in/packets")
-		reqHashInTrafficMeter     = metrics.NewMeter("eth/req/hashes/in/traffic")
-		reqHashOutPacketsMeter    = metrics.NewMeter("eth/req/hashes/out/packets")
-		reqHashOutTrafficMeter    = metrics.NewMeter("eth/req/hashes/out/traffic")
-		reqBlockInPacketsMeter    = metrics.NewMeter("eth/req/blocks/in/packets")
-		reqBlockInTrafficMeter    = metrics.NewMeter("eth/req/blocks/in/traffic")
-		reqBlockOutPacketsMeter   = metrics.NewMeter("eth/req/blocks/out/packets")
-		reqBlockOutTrafficMeter   = metrics.NewMeter("eth/req/blocks/out/traffic")
-		reqHeaderInPacketsMeter   = metrics.NewMeter("eth/req/headers/in/packets")
-		reqHeaderInTrafficMeter   = metrics.NewMeter("eth/req/headers/in/traffic")
-		reqHeaderOutPacketsMeter  = metrics.NewMeter("eth/req/headers/out/packets")
-		reqHeaderOutTrafficMeter  = metrics.NewMeter("eth/req/headers/out/traffic")
-		reqBodyInPacketsMeter     = metrics.NewMeter("eth/req/bodies/in/packets")
-		reqBodyInTrafficMeter     = metrics.NewMeter("eth/req/bodies/in/traffic")
-		reqBodyOutPacketsMeter    = metrics.NewMeter("eth/req/bodies/out/packets")
-		reqBodyOutTrafficMeter    = metrics.NewMeter("eth/req/bodies/out/traffic")
-		reqStateInPacketsMeter    = metrics.NewMeter("eth/req/states/in/packets")
-		reqStateInTrafficMeter    = metrics.NewMeter("eth/req/states/in/traffic")
-		reqStateOutPacketsMeter   = metrics.NewMeter("eth/req/states/out/packets")
-		reqStateOutTrafficMeter   = metrics.NewMeter("eth/req/states/out/traffic")
-		reqReceiptInPacketsMeter  = metrics.NewMeter("eth/req/receipts/in/packets")
-		reqReceiptInTrafficMeter  = metrics.NewMeter("eth/req/receipts/in/traffic")
-		reqReceiptOutPacketsMeter = metrics.NewMeter("eth/req/receipts/out/packets")
-		reqReceiptOutTrafficMeter = metrics.NewMeter("eth/req/receipts/out/traffic")*/
 	miscInPacketsMeter  = metrics.NewRegisteredMeter("les/misc/in/packets", nil)
 	miscInTrafficMeter  = metrics.NewRegisteredMeter("les/misc/in/traffic", nil)
 	miscOutPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets", nil)
 	miscOutTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic", nil)
+
+	connectionTimer = metrics.NewRegisteredTimer("les/connectionTime", nil)
+
+	totalConnectedGauge   = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
+	totalCapacityGauge    = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
+	totalRechargeGauge    = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
+	blockProcessingTimer  = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
+	requestServedTimer    = metrics.NewRegisteredTimer("les/server/requestServed", nil)
+	requestServedMeter    = metrics.NewRegisteredMeter("les/server/totalRequestServed", nil)
+	requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/totalRequestEstimated", nil)
+	relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/relativeCost", nil, metrics.NewExpDecaySample(1028, 0.015))
+	recentServedGauge     = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
+	recentEstimatedGauge  = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
+	sqServedGauge         = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
+	sqQueuedGauge         = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil)
+	clientConnectedMeter  = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil)
+	clientRejectedMeter   = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil)
+	clientKickedMeter     = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil)
+	// clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil)
+	clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil)
+	clientErrorMeter  = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil)
 )
 
 // meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
diff --git a/les/server.go b/les/server.go
index 08d973416..86570aa54 100644
--- a/les/server.go
+++ b/les/server.go
@@ -28,7 +28,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/eth"
-	"github.com/ethereum/go-ethereum/les/csvlogger"
 	"github.com/ethereum/go-ethereum/les/flowcontrol"
 	"github.com/ethereum/go-ethereum/light"
 	"github.com/ethereum/go-ethereum/log"
@@ -40,15 +39,6 @@ import (
 
 const bufLimitRatio = 6000 // fixed bufLimit/MRR ratio
 
-const (
-	logFileName          = ""    // csv log file name (disabled if empty)
-	logClientPoolMetrics = true  // log client pool metrics
-	logClientPoolEvents  = false // detailed client pool event logging
-	logRequestServing    = true  // log request serving metrics and events
-	logBlockProcEvents   = true  // log block processing events
-	logProtocolHandler   = true  // log protocol handler events
-)
-
 type LesServer struct {
 	lesCommons
 
@@ -62,26 +52,15 @@ type LesServer struct {
 	privateKey   *ecdsa.PrivateKey
 	quitSync     chan struct{}
 	onlyAnnounce bool
-	csvLogger    *csvlogger.Logger
-	logTotalCap  *csvlogger.Channel
 
 	thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode
 
 	maxPeers                   int
 	minCapacity, freeClientCap uint64
 	freeClientPool             *freeClientPool
-	priorityClientPool         *priorityClientPool
 }
 
 func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
-	var csvLogger *csvlogger.Logger
-	if logFileName != "" {
-		csvLogger = csvlogger.NewLogger(logFileName, time.Second*10, "event, peerId")
-	}
-	requestLogger := csvLogger
-	if !logRequestServing {
-		requestLogger = nil
-	}
 	lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
 	for i, pv := range AdvertiseProtocolVersions {
 		lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv)
@@ -99,10 +78,8 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
 		quitSync:     quitSync,
 		lesTopics:    lesTopics,
 		onlyAnnounce: config.OnlyAnnounce,
-		csvLogger:    csvLogger,
-		logTotalCap:  requestLogger.NewChannel("totalCapacity", 0.01),
 	}
-	srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config, requestLogger)
+	srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
 
 	logger := log.New()
 	srv.thcNormal = config.LightServ * 4 / 100
@@ -131,10 +108,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
 		return nil, err
 	}
 	srv.protocolManager = pm
-	if logProtocolHandler {
-		pm.logger = csvLogger
-	}
-	pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100, requestLogger)
+	pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100)
 	pm.server = srv
 
 	return srv, nil
@@ -142,12 +116,6 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
 
 func (s *LesServer) APIs() []rpc.API {
 	return []rpc.API{
-		{
-			Namespace: "les",
-			Version:   "1.0",
-			Service:   NewPrivateLightServerAPI(s),
-			Public:    false,
-		},
 		{
 			Namespace: "les",
 			Version:   "1.0",
@@ -163,11 +131,10 @@ func (s *LesServer) APIs() []rpc.API {
 func (s *LesServer) startEventLoop() {
 	s.protocolManager.wg.Add(1)
 
-	blockProcLogger := s.csvLogger
-	if !logBlockProcEvents {
-		blockProcLogger = nil
-	}
-	var processing, procLast bool
+	var (
+		processing, procLast bool
+		procStarted          time.Time
+	)
 	blockProcFeed := make(chan bool, 100)
 	s.protocolManager.blockchain.(*core.BlockChain).SubscribeBlockProcessingEvent(blockProcFeed)
 	totalRechargeCh := make(chan uint64, 100)
@@ -176,13 +143,13 @@ func (s *LesServer) startEventLoop() {
 	updateRecharge := func() {
 		if processing {
 			if !procLast {
-				blockProcLogger.Event("block processing started")
+				procStarted = time.Now()
 			}
 			s.protocolManager.servingQueue.setThreads(s.thcBlockProcessing)
 			s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
 		} else {
 			if procLast {
-				blockProcLogger.Event("block processing finished")
+				blockProcessingTimer.UpdateSince(procStarted)
 			}
 			s.protocolManager.servingQueue.setThreads(s.thcNormal)
 			s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 16, totalRecharge / 2}, {totalRecharge / 2, totalRecharge / 2}, {totalRecharge, totalRecharge}})
@@ -191,7 +158,7 @@ func (s *LesServer) startEventLoop() {
 	}
 	updateRecharge()
 	totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
-	s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
+	s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
 
 	var maxFreePeers uint64
 	go func() {
@@ -202,13 +169,13 @@ func (s *LesServer) startEventLoop() {
 			case totalRecharge = <-totalRechargeCh:
 				updateRecharge()
 			case totalCapacity = <-totalCapacityCh:
-				s.logTotalCap.Update(float64(totalCapacity))
+				totalCapacityGauge.Update(int64(totalCapacity))
 				newFreePeers := totalCapacity / s.freeClientCap
 				if newFreePeers < maxFreePeers && newFreePeers < uint64(s.maxPeers) {
 					log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers)
 				}
 				maxFreePeers = newFreePeers
-				s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
+				s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
 			case <-s.protocolManager.quitSync:
 				s.protocolManager.wg.Done()
 				return
@@ -243,19 +210,9 @@ func (s *LesServer) Start(srvr *p2p.Server) {
 		maxCapacity = totalRecharge
 	}
 	s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2)
-	poolMetricsLogger := s.csvLogger
-	if !logClientPoolMetrics {
-		poolMetricsLogger = nil
-	}
-	poolEventLogger := s.csvLogger
-	if !logClientPoolEvents {
-		poolEventLogger = nil
-	}
-	s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }, poolMetricsLogger, poolEventLogger)
-	s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool, poolMetricsLogger, poolEventLogger)
+	s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) })
+	s.protocolManager.peers.notify(s.freeClientPool)
 
-	s.protocolManager.peers.notify(s.priorityClientPool)
-	s.csvLogger.Start()
 	s.startEventLoop()
 	s.protocolManager.Start(s.config.LightPeers)
 	if srvr.DiscV5 != nil {
@@ -296,7 +253,6 @@ func (s *LesServer) Stop() {
 	s.freeClientPool.stop()
 	s.costTracker.stop()
 	s.protocolManager.Stop()
-	s.csvLogger.Stop()
 }
 
 // todo(rjl493456442) separate client and server implementation.
diff --git a/les/servingqueue.go b/les/servingqueue.go
index 26656ec01..a9e8369fe 100644
--- a/les/servingqueue.go
+++ b/les/servingqueue.go
@@ -17,14 +17,12 @@
 package les
 
 import (
-	"fmt"
 	"sort"
 	"sync"
 	"sync/atomic"
 
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/common/prque"
-	"github.com/ethereum/go-ethereum/les/csvlogger"
 )
 
 // servingQueue allows running tasks in a limited number of threads and puts the
@@ -44,10 +42,6 @@ type servingQueue struct {
 	queue       *prque.Prque // priority queue for waiting or suspended tasks
 	best        *servingTask // the highest priority task (not included in the queue)
 	suspendBias int64        // priority bias against suspending an already running task
-
-	logger        *csvlogger.Logger
-	logRecentTime *csvlogger.Channel
-	logQueuedTime *csvlogger.Channel
 }
 
 // servingTask represents a request serving task. Tasks can be implemented to
@@ -127,7 +121,7 @@ func (t *servingTask) waitOrStop() bool {
 }
 
 // newServingQueue returns a new servingQueue
-func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue {
+func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
 	sq := &servingQueue{
 		queue:          prque.New(nil),
 		suspendBias:    suspendBias,
@@ -140,9 +134,6 @@ func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Lo
 		burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
 		burstDecRate:   utilTarget,
 		lastUpdate:     mclock.Now(),
-		logger:         logger,
-		logRecentTime:  logger.NewMinMaxChannel("recentTime", false),
-		logQueuedTime:  logger.NewMinMaxChannel("queuedTime", false),
 	}
 	sq.wg.Add(2)
 	go sq.queueLoop()
@@ -246,16 +237,13 @@ func (sq *servingQueue) freezePeers() {
 	}
 	sort.Sort(peerList)
 	drop := true
-	sq.logger.Event("freezing peers")
 	for _, tasks := range peerList {
 		if drop {
 			tasks.peer.freezeClient()
 			tasks.peer.fcClient.Freeze()
 			sq.queuedTime -= tasks.sumTime
-			if sq.logQueuedTime != nil {
-				sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
-			}
-			sq.logger.Event(fmt.Sprintf("frozen peer  sumTime=%d, %v", tasks.sumTime, tasks.peer.id))
+			sqQueuedGauge.Update(int64(sq.queuedTime))
+			clientFreezeMeter.Mark(1)
 			drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
 			for _, task := range tasks.list {
 				task.tokenCh <- nil
@@ -299,10 +287,8 @@ func (sq *servingQueue) addTask(task *servingTask) {
 	}
 	sq.updateRecentTime()
 	sq.queuedTime += task.expTime
-	if sq.logQueuedTime != nil {
-		sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
-		sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
-	}
+	sqServedGauge.Update(int64(sq.recentTime))
+	sqQueuedGauge.Update(int64(sq.queuedTime))
 	if sq.recentTime+sq.queuedTime > sq.burstLimit {
 		sq.freezePeers()
 	}
@@ -322,10 +308,8 @@ func (sq *servingQueue) queueLoop() {
 				sq.updateRecentTime()
 				sq.queuedTime -= expTime
 				sq.recentTime += expTime
-				if sq.logQueuedTime != nil {
-					sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
-					sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
-				}
+				sqServedGauge.Update(int64(sq.recentTime))
+				sqQueuedGauge.Update(int64(sq.queuedTime))
 				if sq.queue.Size() == 0 {
 					sq.best = nil
 				} else {
-- 
GitLab