From 0ac9bbba6cd259bc9895e786121cdcb6e7c5d9d2 Mon Sep 17 00:00:00 2001
From: Felföldi Zsolt <zsfelfoldi@gmail.com>
Date: Tue, 17 Sep 2019 15:28:41 +0200
Subject: [PATCH] les: multiple server bugfixes (#20079)

* les: detailed relative cost metrics

* les: filter txpool relative request statistic

* les: initialize price factors

* les: increased connected bias to lower churn rate

* les: fixed clientPool.setLimits

* core: do not use mutex in GetAncestor

* les: bump factor db version again

* les: add metrics

* les, light: minor fixes
---
 core/blockchain.go    |  3 ---
 core/headerchain.go   |  7 +++++--
 les/clientpool.go     | 16 +++++++++-------
 les/costtracker.go    | 43 ++++++++++++++++++++++++++++++++++++++++---
 les/metrics.go        | 30 ++++++++++++++++++++++++------
 les/server.go         |  1 +
 les/server_handler.go |  8 ++++++++
 light/lightchain.go   |  3 ---
 8 files changed, 87 insertions(+), 24 deletions(-)
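
A minimal illustrative sketch of the relative-cost metric added in
les/costtracker.go (hypothetical numbers; the helper name relCostPercent is
not part of this change): a recorded value of 100 means a request took exactly
as long as its flow-control estimate (avgTimeCost / factor), lower values mean
it was served faster than estimated.

package main

import "fmt"

// relCostPercent mirrors the formula used in gfLoop:
// factor * servingTime * 100 / avgTimeCost.
func relCostPercent(factor, servingTime, avgTimeCost float64) int64 {
	return int64(factor * servingTime * 100 / avgTimeCost)
}

func main() {
	// Hypothetical request: avgTimeCost of 2ms, actually served in 1.5ms,
	// with a current global factor of 1.2.
	fmt.Println(relCostPercent(1.2, 1.5e6, 2e6)) // prints 90
}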

diff --git a/core/blockchain.go b/core/blockchain.go
index 833de3bc7..1bf31b239 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -2151,9 +2151,6 @@ func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
 //
 // Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
 func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
-	bc.chainmu.RLock()
-	defer bc.chainmu.RUnlock()
-
 	return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
 }
 
diff --git a/core/headerchain.go b/core/headerchain.go
index 034858f65..a2faa3303 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -349,8 +349,11 @@ func (hc *HeaderChain) GetAncestor(hash common.Hash, number, ancestor uint64, ma
 	}
 	for ancestor != 0 {
 		if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
-			number -= ancestor
-			return rawdb.ReadCanonicalHash(hc.chainDb, number), number
+			ancestorHash := rawdb.ReadCanonicalHash(hc.chainDb, number-ancestor)
+			if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
+				number -= ancestor
+				return ancestorHash, number
+			}
 		}
 		if *maxNonCanonical == 0 {
 			return common.Hash{}, 0
diff --git a/les/clientpool.go b/les/clientpool.go
index cff5f41ed..6773aab55 100644
--- a/les/clientpool.go
+++ b/les/clientpool.go
@@ -33,7 +33,7 @@ import (
 const (
 	negBalanceExpTC      = time.Hour        // time constant for exponentially reducing negative balance
 	fixedPointMultiplier = 0x1000000        // constant to convert logarithms to fixed point format
-	connectedBias        = time.Minute      // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
+	connectedBias        = time.Minute * 5  // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
 	lazyQueueRefresh     = time.Second * 10 // refresh period of the connected queue
 )
 
@@ -366,12 +366,14 @@ func (f *clientPool) setLimits(count int, totalCap uint64) {
 
 	f.countLimit = count
 	f.capacityLimit = totalCap
-	now := mclock.Now()
-	f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
-		c := data.(*clientInfo)
-		f.dropClient(c, now, true)
-		return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
-	})
+	if f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit {
+		now := mclock.Now()
+		f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
+			c := data.(*clientInfo)
+			f.dropClient(c, now, true)
+			return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
+		})
+	}
 }
 
 // requestCost feeds request cost after serving a request from the given peer.
diff --git a/les/costtracker.go b/les/costtracker.go
index d1f5b54ca..81da04566 100644
--- a/les/costtracker.go
+++ b/les/costtracker.go
@@ -28,6 +28,7 @@ import (
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/les/flowcontrol"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 )
 
 const makeCostStats = false // make request cost statistics during operation
@@ -87,7 +88,7 @@ const (
 	gfUsageTC        = time.Second
 	gfRaiseTC        = time.Second * 200
 	gfDropTC         = time.Second * 50
-	gfDbKey          = "_globalCostFactorV3"
+	gfDbKey          = "_globalCostFactorV6"
 )
 
 // costTracker is responsible for calculating costs and cost estimates on the
@@ -226,6 +227,9 @@ type reqInfo struct {
 	// servingTime is the CPU time corresponding to the actual processing of
 	// the request.
 	servingTime float64
+
+	// msgCode indicates the type of request.
+	msgCode uint64
 }
 
 // gfLoop starts an event loop which updates the global cost factor which is
@@ -269,11 +273,43 @@ func (ct *costTracker) gfLoop() {
 		for {
 			select {
 			case r := <-ct.reqInfoCh:
+				relCost := int64(factor * r.servingTime * 100 / r.avgTimeCost) // Convert the value to a percentage form
+
+				// Record per-request-type metrics if expensive metrics are enabled
+				if metrics.EnabledExpensive {
+					switch r.msgCode {
+					case GetBlockHeadersMsg:
+						relativeCostHeaderHistogram.Update(relCost)
+					case GetBlockBodiesMsg:
+						relativeCostBodyHistogram.Update(relCost)
+					case GetReceiptsMsg:
+						relativeCostReceiptHistogram.Update(relCost)
+					case GetCodeMsg:
+						relativeCostCodeHistogram.Update(relCost)
+					case GetProofsV2Msg:
+						relativeCostProofHistogram.Update(relCost)
+					case GetHelperTrieProofsMsg:
+						relativeCostHelperProofHistogram.Update(relCost)
+					case SendTxV2Msg:
+						relativeCostSendTxHistogram.Update(relCost)
+					case GetTxStatusMsg:
+						relativeCostTxStatusHistogram.Update(relCost)
+					}
+				}
+				// SendTxV2 and GetTxStatus requests are two special cases.
+				// All other requests only put pressure on the database, so their
+				// latency is relatively stable, while these two involve a txpool
+				// query, which is usually unstable.
+				//
+				// TODO(rjl493456442) fix this.
+				if r.msgCode == SendTxV2Msg || r.msgCode == GetTxStatusMsg {
+					continue
+				}
 				requestServedMeter.Mark(int64(r.servingTime))
 				requestServedTimer.Update(time.Duration(r.servingTime))
 				requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
 				requestEstimatedTimer.Update(time.Duration(r.avgTimeCost / factor))
-				relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime))
+				relativeCostHistogram.Update(relCost)
 
 				now := mclock.Now()
 				dt := float64(now - expUpdate)
@@ -324,6 +360,7 @@ func (ct *costTracker) gfLoop() {
 							default:
 							}
 						}
+						globalFactorGauge.Update(int64(1000 * factor))
 						log.Debug("global cost factor updated", "factor", factor)
 					}
 				}
@@ -375,7 +412,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
 	avg := reqAvgTimeCost[code]
 	avgTimeCost := avg.baseCost + amount*avg.reqCost
 	select {
-	case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime)}:
+	case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime), code}:
 	default:
 	}
 	if makeCostStats {
diff --git a/les/metrics.go b/les/metrics.go
index 797631b8e..9ef8c3651 100644
--- a/les/metrics.go
+++ b/les/metrics.go
@@ -60,6 +60,15 @@ var (
 	miscOutTxStatusPacketsMeter   = metrics.NewRegisteredMeter("les/misc/out/packets/txStatus", nil)
 	miscOutTxStatusTrafficMeter   = metrics.NewRegisteredMeter("les/misc/out/traffic/txStatus", nil)
 
+	miscServingTimeHeaderTimer     = metrics.NewRegisteredTimer("les/misc/serve/header", nil)
+	miscServingTimeBodyTimer       = metrics.NewRegisteredTimer("les/misc/serve/body", nil)
+	miscServingTimeCodeTimer       = metrics.NewRegisteredTimer("les/misc/serve/code", nil)
+	miscServingTimeReceiptTimer    = metrics.NewRegisteredTimer("les/misc/serve/receipt", nil)
+	miscServingTimeTrieProofTimer  = metrics.NewRegisteredTimer("les/misc/serve/proof", nil)
+	miscServingTimeHelperTrieTimer = metrics.NewRegisteredTimer("les/misc/serve/helperTrie", nil)
+	miscServingTimeTxTimer         = metrics.NewRegisteredTimer("les/misc/serve/txs", nil)
+	miscServingTimeTxStatusTimer   = metrics.NewRegisteredTimer("les/misc/serve/txStatus", nil)
+
 	connectionTimer       = metrics.NewRegisteredTimer("les/connection/duration", nil)
 	serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
 	clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
@@ -69,12 +78,21 @@ var (
 	totalConnectedGauge  = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
 	blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
 
-	requestServedMeter    = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
-	requestServedTimer    = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
-	requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
-	requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
-	relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))
-
+	requestServedMeter               = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
+	requestServedTimer               = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
+	requestEstimatedMeter            = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
+	requestEstimatedTimer            = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
+	relativeCostHistogram            = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostHeaderHistogram      = metrics.NewRegisteredHistogram("les/server/req/relative/header", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostBodyHistogram        = metrics.NewRegisteredHistogram("les/server/req/relative/body", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostReceiptHistogram     = metrics.NewRegisteredHistogram("les/server/req/relative/receipt", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostCodeHistogram        = metrics.NewRegisteredHistogram("les/server/req/relative/code", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostProofHistogram       = metrics.NewRegisteredHistogram("les/server/req/relative/proof", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostHelperProofHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/helperTrie", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostSendTxHistogram      = metrics.NewRegisteredHistogram("les/server/req/relative/txs", nil, metrics.NewExpDecaySample(1028, 0.015))
+	relativeCostTxStatusHistogram    = metrics.NewRegisteredHistogram("les/server/req/relative/txStatus", nil, metrics.NewExpDecaySample(1028, 0.015))
+
+	globalFactorGauge    = metrics.NewRegisteredGauge("les/server/globalFactor", nil)
 	recentServedGauge    = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
 	recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
 	sqServedGauge        = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
diff --git a/les/server.go b/les/server.go
index 8e790323f..592858cb9 100644
--- a/les/server.go
+++ b/les/server.go
@@ -113,6 +113,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
 	}
 	srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
 	srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
+	srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})
 
 	checkpoint := srv.latestLocalCheckpoint()
 	if !checkpoint.Empty() {
diff --git a/les/server_handler.go b/les/server_handler.go
index 79c0a08a9..16249ef1b 100644
--- a/les/server_handler.go
+++ b/les/server_handler.go
@@ -268,6 +268,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInHeaderPacketsMeter.Mark(1)
 			miscInHeaderTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeHeaderTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID uint64
@@ -380,6 +381,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInBodyPacketsMeter.Mark(1)
 			miscInBodyTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeBodyTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID  uint64
@@ -428,6 +430,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInCodePacketsMeter.Mark(1)
 			miscInCodeTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeCodeTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID uint64
@@ -499,6 +502,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInReceiptPacketsMeter.Mark(1)
 			miscInReceiptTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeReceiptTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID  uint64
@@ -555,6 +559,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInTrieProofPacketsMeter.Mark(1)
 			miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeTrieProofTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID uint64
@@ -657,6 +662,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInHelperTriePacketsMeter.Mark(1)
 			miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeHelperTrieTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID uint64
@@ -731,6 +737,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInTxsPacketsMeter.Mark(1)
 			miscInTxsTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeTxTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID uint64
@@ -779,6 +786,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
 		if metrics.EnabledExpensive {
 			miscInTxStatusPacketsMeter.Mark(1)
 			miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
+			defer func(start time.Time) { miscServingTimeTxStatusTimer.UpdateSince(start) }(time.Now())
 		}
 		var req struct {
 			ReqID  uint64
diff --git a/light/lightchain.go b/light/lightchain.go
index 7f64d1c28..9529c2e1b 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -438,9 +438,6 @@ func (lc *LightChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
 //
 // Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
 func (lc *LightChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
-	lc.chainmu.RLock()
-	defer lc.chainmu.RUnlock()
-
 	return lc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
 }
 
-- 
GitLab