good morning!!!!
Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
B
bor
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Harbor Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
open
bor
Commits
2a7411bc
Commit
2a7411bc
authored
Jun 16, 2015
by
Péter Szilágyi
Browse files
Options
Downloads
Patches
Plain Diff
eth/fetcher: fix premature queue cleanup, general polishes
parent
497a7f17
Branches
Branches containing commit
Tags
Tags containing commit
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
eth/fetcher/fetcher.go
+47
-35
47 additions, 35 deletions
eth/fetcher/fetcher.go
with
47 additions
and
35 deletions
eth/fetcher/fetcher.go
+
47
−
35
View file @
2a7411bc
...
@@ -61,6 +61,10 @@ type Fetcher struct {
...
@@ -61,6 +61,10 @@ type Fetcher struct {
filter
chan
chan
[]
*
types
.
Block
filter
chan
chan
[]
*
types
.
Block
quit
chan
struct
{}
quit
chan
struct
{}
// Announce states
announced
map
[
common
.
Hash
][]
*
announce
// Announced blocks, scheduled for fetching
fetching
map
[
common
.
Hash
]
*
announce
// Announced blocks, currently fetching
// Block cache
// Block cache
queue
*
prque
.
Prque
// Queue containing the import operations (block number sorted)
queue
*
prque
.
Prque
// Queue containing the import operations (block number sorted)
queued
map
[
common
.
Hash
]
struct
{}
// Presence set of already queued blocks (to dedup imports)
queued
map
[
common
.
Hash
]
struct
{}
// Presence set of already queued blocks (to dedup imports)
...
@@ -78,6 +82,8 @@ func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHei
...
@@ -78,6 +82,8 @@ func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHei
insert
:
make
(
chan
*
inject
),
insert
:
make
(
chan
*
inject
),
filter
:
make
(
chan
chan
[]
*
types
.
Block
),
filter
:
make
(
chan
chan
[]
*
types
.
Block
),
quit
:
make
(
chan
struct
{}),
quit
:
make
(
chan
struct
{}),
announced
:
make
(
map
[
common
.
Hash
][]
*
announce
),
fetching
:
make
(
map
[
common
.
Hash
]
*
announce
),
queue
:
prque
.
New
(),
queue
:
prque
.
New
(),
queued
:
make
(
map
[
common
.
Hash
]
struct
{}),
queued
:
make
(
map
[
common
.
Hash
]
struct
{}),
hasBlock
:
hasBlock
,
hasBlock
:
hasBlock
,
...
@@ -158,19 +164,16 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
...
@@ -158,19 +164,16 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
// Loop is the main fetcher loop, checking and processing various notification
// Loop is the main fetcher loop, checking and processing various notification
// events.
// events.
func
(
f
*
Fetcher
)
loop
()
{
func
(
f
*
Fetcher
)
loop
()
{
announced
:=
make
(
map
[
common
.
Hash
][]
*
announce
)
fetching
:=
make
(
map
[
common
.
Hash
]
*
announce
)
// Iterate the block fetching until a quit is requested
// Iterate the block fetching until a quit is requested
fetch
:=
time
.
NewTimer
(
0
)
fetch
:=
time
.
NewTimer
(
0
)
done
:=
make
(
chan
common
.
Hash
)
done
:=
make
(
chan
common
.
Hash
)
for
{
for
{
// Clean up any expired block fetches
// Clean up any expired block fetches
for
hash
,
announce
:=
range
fetching
{
for
hash
,
announce
:=
range
f
.
fetching
{
if
time
.
Since
(
announce
.
time
)
>
fetchTimeout
{
if
time
.
Since
(
announce
.
time
)
>
fetchTimeout
{
delete
(
announced
,
hash
)
delete
(
f
.
announced
,
hash
)
delete
(
fetching
,
hash
)
delete
(
f
.
fetching
,
hash
)
}
}
}
}
// Import any queued blocks that could potentially fit
// Import any queued blocks that could potentially fit
...
@@ -184,17 +187,16 @@ func (f *Fetcher) loop() {
...
@@ -184,17 +187,16 @@ func (f *Fetcher) loop() {
}
}
// Otherwise if not known yet, try and import
// Otherwise if not known yet, try and import
hash
:=
op
.
block
.
Hash
()
hash
:=
op
.
block
.
Hash
()
delete
(
f
.
queued
,
hash
)
if
f
.
hasBlock
(
hash
)
{
if
f
.
hasBlock
(
hash
)
{
continue
continue
}
}
// Block may just fit, try to import it
// Block may just fit, try to import it
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: importing block %x"
,
op
.
origin
,
hash
.
Bytes
()[
:
4
])
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: importing block
#%d [
%x
]
"
,
op
.
origin
,
op
.
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
])
go
func
()
{
go
func
()
{
defer
func
()
{
done
<-
hash
}()
defer
func
()
{
done
<-
hash
}()
if
err
:=
f
.
importBlock
(
op
.
origin
,
op
.
block
);
err
!=
nil
{
if
err
:=
f
.
importBlock
(
op
.
origin
,
op
.
block
);
err
!=
nil
{
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: block %x import failed: %v"
,
op
.
origin
,
hash
.
Bytes
()[
:
4
],
err
)
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: block
#%d [
%x
]
import failed: %v"
,
op
.
origin
,
op
.
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
err
)
return
return
}
}
}()
}()
...
@@ -208,14 +210,13 @@ func (f *Fetcher) loop() {
...
@@ -208,14 +210,13 @@ func (f *Fetcher) loop() {
case
notification
:=
<-
f
.
notify
:
case
notification
:=
<-
f
.
notify
:
// A block was announced, schedule if it's not yet downloading
// A block was announced, schedule if it's not yet downloading
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: scheduling %x"
,
notification
.
origin
,
notification
.
hash
[
:
4
])
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: scheduling %x"
,
notification
.
origin
,
notification
.
hash
[
:
4
])
if
_
,
ok
:=
fetching
[
notification
.
hash
];
ok
{
if
_
,
ok
:=
f
.
fetching
[
notification
.
hash
];
ok
{
break
break
}
}
if
len
(
announced
)
==
0
{
f
.
announced
[
notification
.
hash
]
=
append
(
f
.
announced
[
notification
.
hash
],
notification
)
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Scheduling fetch in %v, at %v"
,
arriveTimeout
-
time
.
Since
(
notification
.
time
),
notification
.
time
.
Add
(
arriveTimeout
))
if
len
(
f
.
announced
)
==
1
{
f
etch
.
Reset
(
arriveTimeout
-
time
.
Since
(
notification
.
time
)
)
f
.
reschedule
(
fetch
)
}
}
announced
[
notification
.
hash
]
=
append
(
announced
[
notification
.
hash
],
notification
)
case
op
:=
<-
f
.
insert
:
case
op
:=
<-
f
.
insert
:
// A direct block insertion was requested, try and fill any pending gaps
// A direct block insertion was requested, try and fill any pending gaps
...
@@ -223,39 +224,31 @@ func (f *Fetcher) loop() {
...
@@ -223,39 +224,31 @@ func (f *Fetcher) loop() {
case
hash
:=
<-
done
:
case
hash
:=
<-
done
:
// A pending import finished, remove all traces of the notification
// A pending import finished, remove all traces of the notification
delete
(
announced
,
hash
)
delete
(
f
.
announced
,
hash
)
delete
(
fetching
,
hash
)
delete
(
f
.
fetching
,
hash
)
delete
(
f
.
queued
,
hash
)
case
<-
fetch
.
C
:
case
<-
fetch
.
C
:
// At least one block's timer ran out, check for needing retrieval
// At least one block's timer ran out, check for needing retrieval
request
:=
make
(
map
[
string
][]
common
.
Hash
)
request
:=
make
(
map
[
string
][]
common
.
Hash
)
for
hash
,
announces
:=
range
announced
{
for
hash
,
announces
:=
range
f
.
announced
{
if
time
.
Since
(
announces
[
0
]
.
time
)
>
arriveTimeout
{
if
time
.
Since
(
announces
[
0
]
.
time
)
>
arriveTimeout
{
announce
:=
announces
[
rand
.
Intn
(
len
(
announces
))]
announce
:=
announces
[
rand
.
Intn
(
len
(
announces
))]
if
!
f
.
hasBlock
(
hash
)
{
if
!
f
.
hasBlock
(
hash
)
{
request
[
announce
.
origin
]
=
append
(
request
[
announce
.
origin
],
hash
)
request
[
announce
.
origin
]
=
append
(
request
[
announce
.
origin
],
hash
)
fetching
[
hash
]
=
announce
f
.
fetching
[
hash
]
=
announce
}
}
delete
(
announced
,
hash
)
delete
(
f
.
announced
,
hash
)
}
}
}
}
// Send out all block requests
// Send out all block requests
for
peer
,
hashes
:=
range
request
{
for
peer
,
hashes
:=
range
request
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: explicitly fetching %d blocks"
,
peer
,
len
(
hashes
))
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: explicitly fetching %d blocks"
,
peer
,
len
(
hashes
))
go
fetching
[
hashes
[
0
]]
.
fetch
(
hashes
)
go
f
.
fetching
[
hashes
[
0
]]
.
fetch
(
hashes
)
}
}
// Schedule the next fetch if blocks are still pending
// Schedule the next fetch if blocks are still pending
if
len
(
announced
)
>
0
{
f
.
reschedule
(
fetch
)
nearest
:=
time
.
Now
()
for
_
,
announces
:=
range
announced
{
if
nearest
.
After
(
announces
[
0
]
.
time
)
{
nearest
=
announces
[
0
]
.
time
}
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Rescheduling fetch in %v, at %v"
,
arriveTimeout
-
time
.
Since
(
nearest
),
nearest
.
Add
(
arriveTimeout
))
fetch
.
Reset
(
arriveTimeout
-
time
.
Since
(
nearest
))
}
case
filter
:=
<-
f
.
filter
:
case
filter
:=
<-
f
.
filter
:
// Blocks arrived, extract any explicit fetches, return all else
// Blocks arrived, extract any explicit fetches, return all else
...
@@ -271,12 +264,12 @@ func (f *Fetcher) loop() {
...
@@ -271,12 +264,12 @@ func (f *Fetcher) loop() {
hash
:=
block
.
Hash
()
hash
:=
block
.
Hash
()
// Filter explicitly requested blocks from hash announcements
// Filter explicitly requested blocks from hash announcements
if
_
,
ok
:=
fetching
[
hash
];
ok
{
if
_
,
ok
:=
f
.
fetching
[
hash
];
ok
{
// Discard if already imported by other means
// Discard if already imported by other means
if
!
f
.
hasBlock
(
hash
)
{
if
!
f
.
hasBlock
(
hash
)
{
explicit
=
append
(
explicit
,
block
)
explicit
=
append
(
explicit
,
block
)
}
else
{
}
else
{
delete
(
fetching
,
hash
)
delete
(
f
.
fetching
,
hash
)
}
}
}
else
{
}
else
{
download
=
append
(
download
,
block
)
download
=
append
(
download
,
block
)
...
@@ -290,7 +283,7 @@ func (f *Fetcher) loop() {
...
@@ -290,7 +283,7 @@ func (f *Fetcher) loop() {
}
}
// Schedule the retrieved blocks for ordered import
// Schedule the retrieved blocks for ordered import
for
_
,
block
:=
range
explicit
{
for
_
,
block
:=
range
explicit
{
if
announce
:=
fetching
[
block
.
Hash
()];
announce
!=
nil
{
if
announce
:=
f
.
fetching
[
block
.
Hash
()];
announce
!=
nil
{
f
.
enqueue
(
announce
.
origin
,
block
)
f
.
enqueue
(
announce
.
origin
,
block
)
}
}
}
}
...
@@ -298,21 +291,40 @@ func (f *Fetcher) loop() {
...
@@ -298,21 +291,40 @@ func (f *Fetcher) loop() {
}
}
}
}
// reschedule resets the specified fetch timer to the next announce timeout.
func
(
f
*
Fetcher
)
reschedule
(
fetch
*
time
.
Timer
)
{
// Short circuit if no blocks are announced
if
len
(
f
.
announced
)
==
0
{
return
}
// Otherwise find the earliest expiring announcement
earliest
:=
time
.
Now
()
for
_
,
announces
:=
range
f
.
announced
{
if
earliest
.
After
(
announces
[
0
]
.
time
)
{
earliest
=
announces
[
0
]
.
time
}
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Scheduling next fetch in %v"
,
arriveTimeout
-
time
.
Since
(
earliest
))
fetch
.
Reset
(
arriveTimeout
-
time
.
Since
(
earliest
))
}
// enqueue schedules a new future import operation, if the block to be imported
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
// has not yet been seen.
func
(
f
*
Fetcher
)
enqueue
(
peer
string
,
block
*
types
.
Block
)
{
func
(
f
*
Fetcher
)
enqueue
(
peer
string
,
block
*
types
.
Block
)
{
hash
:=
block
.
Hash
()
// Make sure the block isn't in some weird place
// Make sure the block isn't in some weird place
if
math
.
Abs
(
float64
(
f
.
chainHeight
())
-
float64
(
block
.
NumberU64
()))
>
maxQueueDist
{
if
math
.
Abs
(
float64
(
f
.
chainHeight
())
-
float64
(
block
.
NumberU64
()))
>
maxQueueDist
{
glog
.
Infof
(
"Peer %s: discarded block #%d [%x] too far from head"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
])
return
return
}
}
// Schedule the block for future importing
// Schedule the block for future importing
hash
:=
block
.
Hash
()
if
_
,
ok
:=
f
.
queued
[
hash
];
!
ok
{
if
_
,
ok
:=
f
.
queued
[
hash
];
!
ok
{
f
.
queued
[
hash
]
=
struct
{}{}
f
.
queued
[
hash
]
=
struct
{}{}
f
.
queue
.
Push
(
&
inject
{
origin
:
peer
,
block
:
block
},
-
float32
(
block
.
NumberU64
()))
f
.
queue
.
Push
(
&
inject
{
origin
:
peer
,
block
:
block
},
-
float32
(
block
.
NumberU64
()))
if
glog
.
V
(
logger
.
Detail
)
{
if
glog
.
V
(
logger
.
Detail
)
{
glog
.
Infof
(
"Peer %s: queued block %x, total %v"
,
peer
,
hash
.
Bytes
()[
:
4
],
f
.
queue
.
Size
())
glog
.
Infof
(
"Peer %s: queued block
#%d [
%x
]
, total %v"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
f
.
queue
.
Size
())
}
}
}
}
}
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment