good morning!!!!

Skip to content
Snippets Groups Projects
Commit 84690bfb authored by Jeffrey Wilcke's avatar Jeffrey Wilcke
Browse files

Changed the block fetching code and hash distribution

parent d3a0bb4f
Branches
Tags
No related merge requests found
......@@ -3,17 +3,21 @@ package eth
import (
"bytes"
"container/list"
"fmt"
"math"
"math/big"
"sync"
"time"
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
)
var poollogger = ethlog.NewLogger("[BPOOL]")
type block struct {
from *Peer
peer *Peer
block *ethchain.Block
reqAt time.Time
......@@ -30,6 +34,8 @@ type BlockPool struct {
td *big.Int
quit chan bool
ChainLength, BlocksProcessed int
}
func NewBlockPool(eth *Ethereum) *BlockPool {
......@@ -53,9 +59,9 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool {
return self.eth.BlockChain().GetBlock(hash) != nil
}
func (self *BlockPool) AddHash(hash []byte) {
func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
if self.pool[string(hash)] == nil {
self.pool[string(hash)] = &block{nil, nil, time.Now(), 0}
self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0}
self.hashPool = append([][]byte{hash}, self.hashPool...)
}
......@@ -66,10 +72,12 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) {
if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) {
self.hashPool = append(self.hashPool, b.Hash())
self.pool[hash] = &block{peer, b, time.Now(), 0}
self.pool[hash] = &block{peer, peer, b, time.Now(), 0}
} else if self.pool[hash] != nil {
self.pool[hash].block = b
}
self.BlocksProcessed++
}
func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block {
......@@ -94,18 +102,24 @@ func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks
return blocks
}
func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) {
var blocks ethchain.Blocks
func (self *BlockPool) Blocks() (blocks ethchain.Blocks) {
for _, item := range self.pool {
if item.block != nil {
blocks = append(blocks, item.block)
}
}
return
}
func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmount int) {
blocks := self.Blocks()
ethchain.BlockBy(ethchain.Number).Sort(blocks)
for _, block := range blocks {
if self.eth.BlockChain().HasBlock(block.PrevHash) {
procAmount++
f(block)
hash := block.Hash()
......@@ -114,31 +128,58 @@ func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) {
}
}
return
}
func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) {
self.mut.Lock()
defer self.mut.Unlock()
func (self *BlockPool) DistributeHashes() {
var (
peerLen = self.eth.peers.Len()
amount = 200 * peerLen
dist = make(map[*Peer][][]byte)
)
num := int(math.Min(float64(amount), float64(len(self.pool))))
j := 0
for i := 0; i < len(self.hashPool) && j < num; i++ {
hash := string(self.hashPool[i])
item := self.pool[hash]
if item != nil && item.block == nil &&
(item.peer == nil ||
((time.Since(item.reqAt) > 5*time.Second && item.peer != peer) && self.eth.peers.Len() > 1) || // multiple peers
(time.Since(item.reqAt) > 5*time.Second && self.eth.peers.Len() == 1) /* single peer*/) {
self.pool[hash].peer = peer
self.pool[hash].reqAt = time.Now()
self.pool[hash].requested++
for i, j := 0, 0; i < len(self.hashPool) && j < num; i++ {
hash := self.hashPool[i]
item := self.pool[string(hash)]
if item != nil && item.block == nil {
var peer *Peer
lastFetchFailed := time.Since(item.reqAt) > 5*time.Second
// Handle failed requests
if lastFetchFailed && item.requested > 0 && item.peer != nil {
if item.requested < 100 {
// Select peer the hash was retrieved off
peer = item.from
} else {
// Remove it
self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash)
delete(self.pool, string(hash))
}
} else if lastFetchFailed || item.peer == nil {
// Find a suitable, available peer
eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
if peer == nil && len(dist[p]) < amount/peerLen {
peer = p
}
})
}
if peer != nil {
item.reqAt = time.Now()
item.peer = peer
item.requested++
hashes = append(hashes, self.hashPool[i])
j++
dist[peer] = append(dist[peer], hash)
}
}
}
return
for peer, hashes := range dist {
peer.FetchBlocks(hashes)
}
}
func (self *BlockPool) Start() {
......@@ -158,7 +199,8 @@ out:
case <-self.quit:
break out
case <-serviceTimer.C:
// Clean up hashes that can't be fetched
// Check if we're catching up. If not distribute the hashes to
// the peers and download the blockchain
done := true
eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
if p.statusKnown && p.FetchingHashes() {
......@@ -166,29 +208,27 @@ out:
}
})
if done {
eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
if p.statusKnown {
hashes := self.Take(100, p)
if len(hashes) > 0 {
p.FetchBlocks(hashes)
if len(hashes) == 1 {
fmt.Printf("last hash = %x\n", hashes[0])
} else {
fmt.Println("Requesting", len(hashes), "of", p)
}
if done && len(self.hashPool) > 0 {
self.DistributeHashes()
}
}
})
if self.ChainLength < len(self.hashPool) {
self.ChainLength = len(self.hashPool)
}
case <-procTimer.C:
var err error
self.CheckLinkAndProcess(func(block *ethchain.Block) {
err = self.eth.StateManager().Process(block, false)
// XXX We can optimize this lifting this on to a new goroutine.
// We'd need to make sure that the pools are properly protected by a mutex
amount := self.ProcessCanonical(func(block *ethchain.Block) {
err := self.eth.StateManager().Process(block, false)
if err != nil {
poollogger.Infoln(err)
}
})
if err != nil {
peerlogger.Infoln(err)
// Do not propagate to the network on catchups
if amount == 1 {
block := self.eth.BlockChain().CurrentBlock
self.eth.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val})
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment