good morning!!!!

Skip to content
Snippets Groups Projects
Commit aed060a4 authored by Jeffrey Wilcke's avatar Jeffrey Wilcke
Browse files

Updated the wire protocol

parent df7967c5
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ import (
const (
// The size of the output buffer for writing messages
outputBufferSize = 50
outputBufferSize = 50
)
type Peer struct {
......@@ -26,7 +26,7 @@ type Peer struct {
// Determines whether it's an inbound or outbound peer
inbound bool
// Flag for checking the peer's connectivity state
connected int32
connected int32
disconnect int32
// Last known message send
lastSend time.Time
......@@ -90,8 +90,8 @@ func (p *Peer) writeMessage(msg *ethwire.InOutMsg) {
}
if !p.versionKnown {
switch msg.MsgType {
case "verack": // Ok
switch msg.Type {
case ethwire.MsgHandshakeTy: // Ok
default: // Anything but ack is allowed
return
}
......@@ -108,6 +108,8 @@ func (p *Peer) writeMessage(msg *ethwire.InOutMsg) {
// Outbound message handler. Outbound messages are handled here
func (p *Peer) HandleOutbound() {
// The ping timer. Makes sure that every 2 minutes a ping is send to the peer
tickleTimer := time.NewTimer(2 * time.Minute)
out:
for {
select {
......@@ -116,6 +118,10 @@ out:
p.writeMessage(msg)
p.lastSend = time.Now()
case <-tickleTimer.C:
p.writeMessage(&ethwire.InOutMsg{Type: ethwire.MsgPingTy})
// Break out of the for loop if a quit message is posted
case <-p.quit:
break out
......@@ -126,7 +132,7 @@ clean:
// This loop is for draining the output queue and anybody waiting for us
for {
select {
case <- p.outputQueue:
case <-p.outputQueue:
// TODO
default:
break clean
......@@ -148,23 +154,32 @@ out:
}
if Debug {
log.Printf("Received %s\n", msg.MsgType)
log.Printf("Received %s\n", msg.Type.String())
}
// TODO Hash data and check if for existence (= ignore)
switch msg.MsgType {
case "verack":
switch msg.Type {
case ethwire.MsgHandshakeTy:
// Version message
p.handleVersionAck(msg)
case "block":
err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(msg.Data))
p.handleHandshake(msg)
case ethwire.MsgBlockTy:
err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(ethutil.Encode(msg.Data)))
if err != nil {
log.Println(err)
}
case "blockmine":
d, _ := ethutil.Decode(msg.Data, 0)
log.Printf("block mined %s\n", d)
case ethwire.MsgTxTy:
case ethwire.MsgInvTy:
case ethwire.MsgGetPeersTy:
case ethwire.MsgPeersTy:
case ethwire.MsgPingTy:
case ethwire.MsgPongTy:
/*
case "blockmine":
d, _ := ethutil.Decode(msg.Data, 0)
log.Printf("block mined %s\n", d)
*/
}
}
......@@ -173,7 +188,7 @@ out:
func (p *Peer) Start() {
if !p.inbound {
err := p.pushVersionAck()
err := p.pushHandshake()
if err != nil {
log.Printf("Peer can't send outbound version ack", err)
......@@ -200,17 +215,21 @@ func (p *Peer) Stop() {
log.Println("Peer shutdown")
}
func (p *Peer) pushVersionAck() error {
msg := ethwire.NewMessage("verack", p.server.Nonce, []byte("01"))
func (p *Peer) pushHandshake() error {
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, ethutil.Encode([]interface{}{
1, 0, p.server.Nonce,
}))
p.QueueMessage(msg)
return nil
}
func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) {
// Detect self connect
if msg.Nonce == p.server.Nonce {
func (p *Peer) handleHandshake(msg *ethwire.InOutMsg) {
c := ethutil.Conv(msg.Data)
// [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID]
if c.Get(2).AsUint() == p.server.Nonce {
//if msg.Nonce == p.server.Nonce {
log.Println("Peer connected to self, disconnecting")
p.Stop()
......@@ -222,7 +241,7 @@ func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) {
// If this is an inbound connection send an ack back
if p.inbound {
err := p.pushVersionAck()
err := p.pushHandshake()
if err != nil {
log.Println("Peer can't send ack back")
......
......@@ -7,8 +7,8 @@ import (
"github.com/ethereum/ethwire-go"
"log"
"net"
"time"
"sync/atomic"
"time"
)
func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
......@@ -20,7 +20,6 @@ func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
}
}
type Server struct {
// Channel for shutting down the server
shutdownChan chan bool
......@@ -75,14 +74,14 @@ func (s *Server) ConnectToPeer(addr string) error {
return nil
}
func (s *Server) Broadcast(msgType string, data []byte) {
func (s *Server) Broadcast(msgType ethwire.MsgType, data []byte) {
eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.QueueMessage(ethwire.NewMessage(msgType, 0, data))
p.QueueMessage(ethwire.NewMessage(msgType, data))
})
}
const (
processReapingTimeout = 10 // TODO increase
processReapingTimeout = 10 // TODO increase
)
func (s *Server) ReapDeadPeers() {
......@@ -139,13 +138,13 @@ func (s *Server) Start() {
// TMP
/*
go func() {
for {
s.Broadcast("block", s.blockManager.bc.GenesisBlock().MarshalRlp())
go func() {
for {
s.Broadcast("block", s.blockManager.bc.GenesisBlock().RlpEncode())
time.Sleep(1000 * time.Millisecond)
}
}()
time.Sleep(1000 * time.Millisecond)
}
}()
*/
}
......@@ -154,7 +153,7 @@ func (s *Server) Stop() {
defer s.db.Close()
eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.Stop()
p.Stop()
})
s.shutdownChan <- true
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment