good morning!!!!
Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
B
bor
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package Registry
Container Registry
Harbor Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
open
bor
Commits
ee445a2b
Commit
ee445a2b
authored
8 years ago
by
Péter Szilágyi
Committed by
GitHub
8 years ago
Browse files
Options
Downloads
Plain Diff
Merge pull request #3425 from karalabe/netstats-time-fixup
netstats: time and block history
parents
4f9ccdd7
b2c226cb
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
ethstats/ethstats.go
+197
-38
197 additions, 38 deletions
ethstats/ethstats.go
with
197 additions
and
38 deletions
ethstats/ethstats.go
+
197
−
38
View file @
ee445a2b
...
...
@@ -25,6 +25,7 @@ import (
"regexp"
"runtime"
"strconv"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
...
...
@@ -41,6 +42,10 @@ import (
"golang.org/x/net/websocket"
)
// historyUpdateRange is the number of blocks a node should report upon login or
// history request.
const
historyUpdateRange
=
50
// Service implements an Ethereum netstats reporting daemon that pushes local
// chain statistics up to a monitoring server.
type
Service
struct
{
...
...
@@ -53,6 +58,9 @@ type Service struct {
node
string
// Name of the node to display on the monitoring page
pass
string
// Password to authorize access to the monitoring page
host
string
// Remote address of the monitoring service
pongCh
chan
struct
{}
// Pong notifications are fed into this channel
histCh
chan
[]
uint64
// History request block numbers are fed into this channel
}
// New returns a monitoring service ready for stats reporting.
...
...
@@ -65,11 +73,13 @@ func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Servic
}
// Assemble and return the stats service
return
&
Service
{
eth
:
ethServ
,
les
:
lesServ
,
node
:
parts
[
1
],
pass
:
parts
[
3
],
host
:
parts
[
4
],
eth
:
ethServ
,
les
:
lesServ
,
node
:
parts
[
1
],
pass
:
parts
[
3
],
host
:
parts
[
4
],
pongCh
:
make
(
chan
struct
{}),
histCh
:
make
(
chan
[]
uint64
,
1
),
},
nil
}
...
...
@@ -115,7 +125,11 @@ func (s *Service) loop() {
// Loop reporting until termination
for
{
// Establish a websocket connection to the server and authenticate the node
conn
,
err
:=
websocket
.
Dial
(
fmt
.
Sprintf
(
"wss://%s/api"
,
s
.
host
),
""
,
"http://localhost/"
)
url
:=
fmt
.
Sprintf
(
"%s/api"
,
s
.
host
)
if
!
strings
.
Contains
(
url
,
"://"
)
{
url
=
"wss://"
+
url
}
conn
,
err
:=
websocket
.
Dial
(
url
,
""
,
"http://localhost/"
)
if
err
!=
nil
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Stats server unreachable: %v"
,
err
)
time
.
Sleep
(
10
*
time
.
Second
)
...
...
@@ -130,22 +144,34 @@ func (s *Service) loop() {
time
.
Sleep
(
10
*
time
.
Second
)
continue
}
if
err
=
s
.
report
(
in
,
out
);
err
!=
nil
{
go
s
.
readLoop
(
conn
,
in
)
// Send the initial stats so our node looks decent from the get go
if
err
=
s
.
report
(
out
);
err
!=
nil
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Initial stats report failed: %v"
,
err
)
conn
.
Close
()
continue
}
if
err
=
s
.
reportHistory
(
out
,
nil
);
err
!=
nil
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"History report failed: %v"
,
err
)
conn
.
Close
()
continue
}
// Keep sending status updates until the connection breaks
fullReport
:=
time
.
NewTicker
(
15
*
time
.
Second
)
for
err
==
nil
{
select
{
case
<-
fullReport
.
C
:
if
err
=
s
.
report
(
in
,
out
);
err
!=
nil
{
if
err
=
s
.
report
(
out
);
err
!=
nil
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Full stats report failed: %v"
,
err
)
}
case
head
:=
<-
headSub
.
Chan
()
:
if
head
==
nil
{
// node stopped
case
list
:=
<-
s
.
histCh
:
if
err
=
s
.
reportHistory
(
out
,
list
);
err
!=
nil
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Block history report failed: %v"
,
err
)
}
case
head
,
ok
:=
<-
headSub
.
Chan
()
:
if
!
ok
{
// node stopped
conn
.
Close
()
return
}
...
...
@@ -155,8 +181,8 @@ func (s *Service) loop() {
if
err
=
s
.
reportPending
(
out
);
err
!=
nil
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Post-block transaction stats report failed: %v"
,
err
)
}
case
ev
:=
<-
txSub
.
Chan
()
:
if
ev
==
nil
{
// node stopped
case
_
,
ok
:=
<-
txSub
.
Chan
()
:
if
!
ok
{
// node stopped
conn
.
Close
()
return
}
...
...
@@ -178,6 +204,76 @@ func (s *Service) loop() {
}
}
// readLoop loops as long as the connection is alive and retrieves data packets
// from the network socket. If any of them match an active request, it forwards
// it, if they themselves are requests it initiates a reply, and lastly it drops
// unknown packets.
func
(
s
*
Service
)
readLoop
(
conn
*
websocket
.
Conn
,
in
*
json
.
Decoder
)
{
// If the read loop exists, close the connection
defer
conn
.
Close
()
for
{
// Retrieve the next generic network packet and bail out on error
var
msg
map
[
string
][]
interface
{}
if
err
:=
in
.
Decode
(
&
msg
);
err
!=
nil
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Failed to decode stats server message: %v"
,
err
)
return
}
if
len
(
msg
[
"emit"
])
==
0
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Stats server sent non-broadcast: %v"
,
msg
)
return
}
command
,
ok
:=
msg
[
"emit"
][
0
]
.
(
string
)
if
!
ok
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Invalid stats server message type: %v"
,
msg
[
"emit"
][
0
])
return
}
// If the message is a ping reply, deliver (someone must be listening!)
if
len
(
msg
[
"emit"
])
==
2
&&
command
==
"node-pong"
{
select
{
case
s
.
pongCh
<-
struct
{}{}
:
// Pong delivered, continue listening
continue
default
:
// Ping routine dead, abort
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Stats server pinger seems to have died"
)
return
}
}
// If the message is a history request, forward to the event processor
if
len
(
msg
[
"emit"
])
==
2
&&
command
==
"history"
{
// Make sure the request is valid and doesn't crash us
request
,
ok
:=
msg
[
"emit"
][
1
]
.
(
map
[
string
]
interface
{})
if
!
ok
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Invalid history request: %v"
,
msg
[
"emit"
][
1
])
return
}
list
,
ok
:=
request
[
"list"
]
.
([]
interface
{})
if
!
ok
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Invalid history block list: %v"
,
request
[
"list"
])
return
}
// Convert the block number list to an integer list
numbers
:=
make
([]
uint64
,
len
(
list
))
for
i
,
num
:=
range
list
{
n
,
ok
:=
num
.
(
float64
)
if
!
ok
{
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Invalid history block number: %v"
,
num
)
return
}
numbers
[
i
]
=
uint64
(
n
)
}
select
{
case
s
.
histCh
<-
numbers
:
continue
default
:
}
}
// Report anything else and continue
glog
.
V
(
logger
.
Info
)
.
Infof
(
"Unknown stats message: %v"
,
msg
)
}
}
// nodeInfo is the collection of metainformation about a node that is displayed
// on the monitoring page.
type
nodeInfo
struct
{
...
...
@@ -190,6 +286,7 @@ type nodeInfo struct {
Os
string
`json:"os"`
OsVer
string
`json:"os_v"`
Client
string
`json:"client"`
History
bool
`json:"canUpdateHistory"`
}
// authMsg is the authentication infos needed to login to a monitoring server.
...
...
@@ -224,6 +321,7 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
Os
:
runtime
.
GOOS
,
OsVer
:
runtime
.
GOARCH
,
Client
:
"0.1.1"
,
History
:
true
,
},
Secret
:
s
.
pass
,
}
...
...
@@ -244,8 +342,8 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
// report collects all possible data to report and send it to the stats server.
// This should only be used on reconnects or rarely to avoid overloading the
// server. Use the individual methods for reporting subscribed events.
func
(
s
*
Service
)
report
(
in
*
json
.
Decoder
,
out
*
json
.
Encoder
)
error
{
if
err
:=
s
.
reportLatency
(
in
,
out
);
err
!=
nil
{
func
(
s
*
Service
)
report
(
out
*
json
.
Encoder
)
error
{
if
err
:=
s
.
reportLatency
(
out
);
err
!=
nil
{
return
err
}
if
err
:=
s
.
reportBlock
(
out
,
nil
);
err
!=
nil
{
...
...
@@ -262,7 +360,7 @@ func (s *Service) report(in *json.Decoder, out *json.Encoder) error {
// reportLatency sends a ping request to the server, measures the RTT time and
// finally sends a latency update.
func
(
s
*
Service
)
reportLatency
(
in
*
json
.
Decoder
,
out
*
json
.
Encoder
)
error
{
func
(
s
*
Service
)
reportLatency
(
out
*
json
.
Encoder
)
error
{
// Send the current time to the ethstats server
start
:=
time
.
Now
()
...
...
@@ -276,9 +374,12 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
return
err
}
// Wait for the pong request to arrive back
var
pong
map
[
string
][]
interface
{}
if
err
:=
in
.
Decode
(
&
pong
);
err
!=
nil
||
len
(
pong
[
"emit"
])
!=
2
||
pong
[
"emit"
][
0
]
.
(
string
)
!=
"node-pong"
{
return
errors
.
New
(
"unexpected ping reply"
)
select
{
case
<-
s
.
pongCh
:
// Pong delivered, report the latency
case
<-
time
.
After
(
3
*
time
.
Second
)
:
// Ping timeout, abort
return
errors
.
New
(
"ping timed out"
)
}
// Send back the measured latency
latency
:=
map
[
string
][]
interface
{}{
...
...
@@ -297,6 +398,7 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
type
blockStats
struct
{
Number
*
big
.
Int
`json:"number"`
Hash
common
.
Hash
`json:"hash"`
Timestamp
*
big
.
Int
`json:"timestamp"`
Miner
common
.
Address
`json:"miner"`
GasUsed
*
big
.
Int
`json:"gasUsed"`
GasLimit
*
big
.
Int
`json:"gasLimit"`
...
...
@@ -330,9 +432,26 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
// reportBlock retrieves the current chain head and repors it to the stats server.
func
(
s
*
Service
)
reportBlock
(
out
*
json
.
Encoder
,
block
*
types
.
Block
)
error
{
// Gather the head block infos from the local blockchain
// Assemble the block stats report and send it to the server
stats
:=
map
[
string
]
interface
{}{
"id"
:
s
.
node
,
"block"
:
s
.
assembleBlockStats
(
block
),
}
report
:=
map
[
string
][]
interface
{}{
"emit"
:
[]
interface
{}{
"block"
,
stats
},
}
if
err
:=
out
.
Encode
(
report
);
err
!=
nil
{
return
err
}
return
nil
}
// assembleBlockStats retrieves any required metadata to report a single block
// and assembles the block stats. If block is nil, the current head is processed.
func
(
s
*
Service
)
assembleBlockStats
(
block
*
types
.
Block
)
*
blockStats
{
// Gather the block infos from the local blockchain
var
(
head
*
types
.
Header
head
er
*
types
.
Header
td
*
big
.
Int
txs
[]
*
types
.
Transaction
uncles
[]
*
types
.
Header
...
...
@@ -342,37 +461,77 @@ func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
if
block
==
nil
{
block
=
s
.
eth
.
BlockChain
()
.
CurrentBlock
()
}
head
=
block
.
Header
()
td
=
s
.
eth
.
BlockChain
()
.
GetTd
(
head
.
Hash
(),
head
.
Number
.
Uint64
())
head
er
=
block
.
Header
()
td
=
s
.
eth
.
BlockChain
()
.
GetTd
(
head
er
.
Hash
(),
head
er
.
Number
.
Uint64
())
txs
=
block
.
Transactions
()
uncles
=
block
.
Uncles
()
}
else
{
// Light nodes would need on-demand lookups for transactions/uncles, skip
if
block
!=
nil
{
head
=
block
.
Header
()
header
=
block
.
Header
()
}
else
{
header
=
s
.
les
.
BlockChain
()
.
CurrentHeader
()
}
td
=
s
.
les
.
BlockChain
()
.
GetTd
(
header
.
Hash
(),
header
.
Number
.
Uint64
())
}
// Assemble and return the block stats
return
&
blockStats
{
Number
:
header
.
Number
,
Hash
:
header
.
Hash
(),
Timestamp
:
header
.
Time
,
Miner
:
header
.
Coinbase
,
GasUsed
:
new
(
big
.
Int
)
.
Set
(
header
.
GasUsed
),
GasLimit
:
new
(
big
.
Int
)
.
Set
(
header
.
GasLimit
),
Diff
:
header
.
Difficulty
.
String
(),
TotalDiff
:
td
.
String
(),
Txs
:
txs
,
Uncles
:
uncles
,
}
}
// reportHistory retrieves the most recent batch of blocks and reports it to the
// stats server.
func
(
s
*
Service
)
reportHistory
(
out
*
json
.
Encoder
,
list
[]
uint64
)
error
{
// Figure out the indexes that need reporting
indexes
:=
make
([]
uint64
,
0
,
historyUpdateRange
)
if
len
(
list
)
>
0
{
// Specific indexes requested, send them back in particular
for
_
,
idx
:=
range
list
{
indexes
=
append
(
indexes
,
idx
)
}
}
else
{
// No indexes requested, send back the top ones
var
head
*
types
.
Header
if
s
.
eth
!=
nil
{
head
=
s
.
eth
.
BlockChain
()
.
CurrentHeader
()
}
else
{
head
=
s
.
les
.
BlockChain
()
.
CurrentHeader
()
}
td
=
s
.
les
.
BlockChain
()
.
GetTd
(
head
.
Hash
(),
head
.
Number
.
Uint64
())
start
:=
head
.
Number
.
Int64
()
-
historyUpdateRange
if
start
<
0
{
start
=
0
}
for
i
:=
uint64
(
start
);
i
<=
head
.
Number
.
Uint64
();
i
++
{
indexes
=
append
(
indexes
,
i
)
}
}
// Assemble the block stats report and send it to the server
// Gather the batch of blocks to report
history
:=
make
([]
*
blockStats
,
len
(
indexes
))
for
i
,
number
:=
range
indexes
{
if
s
.
eth
!=
nil
{
history
[
i
]
=
s
.
assembleBlockStats
(
s
.
eth
.
BlockChain
()
.
GetBlockByNumber
(
number
))
}
else
{
history
[
i
]
=
s
.
assembleBlockStats
(
types
.
NewBlockWithHeader
(
s
.
les
.
BlockChain
()
.
GetHeaderByNumber
(
number
)))
}
}
// Assemble the history report and send it to the server
stats
:=
map
[
string
]
interface
{}{
"id"
:
s
.
node
,
"block"
:
&
blockStats
{
Number
:
head
.
Number
,
Hash
:
head
.
Hash
(),
Miner
:
head
.
Coinbase
,
GasUsed
:
new
(
big
.
Int
)
.
Set
(
head
.
GasUsed
),
GasLimit
:
new
(
big
.
Int
)
.
Set
(
head
.
GasLimit
),
Diff
:
head
.
Difficulty
.
String
(),
TotalDiff
:
td
.
String
(),
Txs
:
txs
,
Uncles
:
uncles
,
},
"id"
:
s
.
node
,
"history"
:
history
,
}
report
:=
map
[
string
][]
interface
{}{
"emit"
:
[]
interface
{}{
"
block
"
,
stats
},
"emit"
:
[]
interface
{}{
"
history
"
,
stats
},
}
if
err
:=
out
.
Encode
(
report
);
err
!=
nil
{
return
err
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment