From 771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb Mon Sep 17 00:00:00 2001
From: zelig <viktor.tron@gmail.com>
Date: Thu, 23 Oct 2014 16:57:54 +0100
Subject: [PATCH] initial commit of p2p package

---
 p2p/client_identity.go         |  63 +++++
 p2p/client_identity_test.go    |  30 ++
 p2p/connection.go              | 275 +++++++++++++++++++
 p2p/connection_test.go         | 222 +++++++++++++++
 p2p/message.go                 |  75 +++++
 p2p/message_test.go            |  38 +++
 p2p/messenger.go               | 220 +++++++++++++++
 p2p/messenger_test.go          | 146 ++++++++++
 p2p/natpmp.go                  |  55 ++++
 p2p/natupnp.go                 | 335 +++++++++++++++++++++++
 p2p/network.go                 | 196 +++++++++++++
 p2p/peer.go                    |  83 ++++++
 p2p/peer_error.go              |  76 ++++++
 p2p/peer_error_handler.go      | 101 +++++++
 p2p/peer_error_handler_test.go |  34 +++
 p2p/peer_test.go               |  96 +++++++
 p2p/protocol.go                | 278 +++++++++++++++++++
 p2p/server.go                  | 484 +++++++++++++++++++++++++++++++++
 p2p/server_test.go             | 208 ++++++++++++++
 19 files changed, 3015 insertions(+)
 create mode 100644 p2p/client_identity.go
 create mode 100644 p2p/client_identity_test.go
 create mode 100644 p2p/connection.go
 create mode 100644 p2p/connection_test.go
 create mode 100644 p2p/message.go
 create mode 100644 p2p/message_test.go
 create mode 100644 p2p/messenger.go
 create mode 100644 p2p/messenger_test.go
 create mode 100644 p2p/natpmp.go
 create mode 100644 p2p/natupnp.go
 create mode 100644 p2p/network.go
 create mode 100644 p2p/peer.go
 create mode 100644 p2p/peer_error.go
 create mode 100644 p2p/peer_error_handler.go
 create mode 100644 p2p/peer_error_handler_test.go
 create mode 100644 p2p/peer_test.go
 create mode 100644 p2p/protocol.go
 create mode 100644 p2p/server.go
 create mode 100644 p2p/server_test.go

diff --git a/p2p/client_identity.go b/p2p/client_identity.go
new file mode 100644
index 000000000..236b23106
--- /dev/null
+++ b/p2p/client_identity.go
@@ -0,0 +1,63 @@
+package p2p
+
+import (
+	"fmt"
+	"runtime"
+)
+
+// should be used in Peer handleHandshake, incorporate Caps, ProtocolVersion, Pubkey etc.
+type ClientIdentity interface {
+	String() string
+	Pubkey() []byte
+}
+
+type SimpleClientIdentity struct {
+	clientIdentifier string
+	version          string
+	customIdentifier string
+	os               string
+	implementation   string
+	pubkey           string
+}
+
+func NewSimpleClientIdentity(clientIdentifier string, version string, customIdentifier string, pubkey string) *SimpleClientIdentity {
+	clientIdentity := &SimpleClientIdentity{
+		clientIdentifier: clientIdentifier,
+		version:          version,
+		customIdentifier: customIdentifier,
+		os:               runtime.GOOS,
+		implementation:   runtime.Version(),
+		pubkey:           pubkey,
+	}
+
+	return clientIdentity
+}
+
+func (c *SimpleClientIdentity) init() {
+}
+
+func (c *SimpleClientIdentity) String() string {
+	var id string
+	if len(c.customIdentifier) > 0 {
+		id = "/" + c.customIdentifier
+	}
+
+	return fmt.Sprintf("%s/v%s%s/%s/%s",
+		c.clientIdentifier,
+		c.version,
+		id,
+		c.os,
+		c.implementation)
+}
+
+func (c *SimpleClientIdentity) Pubkey() []byte {
+	return []byte(c.pubkey)
+}
+
+func (c *SimpleClientIdentity) SetCustomIdentifier(customIdentifier string) {
+	c.customIdentifier = customIdentifier
+}
+
+func (c *SimpleClientIdentity) GetCustomIdentifier() string {
+	return c.customIdentifier
+}
diff --git a/p2p/client_identity_test.go b/p2p/client_identity_test.go
new file mode 100644
index 000000000..40b0e6f5e
--- /dev/null
+++ b/p2p/client_identity_test.go
@@ -0,0 +1,30 @@
+package p2p
+
+import (
+	"fmt"
+	"runtime"
+	"testing"
+)
+
+func TestClientIdentity(t *testing.T) {
+	clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test", "pubkey")
+	clientString := clientIdentity.String()
+	expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version())
+	if clientString != expected {
+		t.Errorf("Expected clientIdentity to be %v, got %v", expected, clientString)
+	}
+	customIdentifier := clientIdentity.GetCustomIdentifier()
+	if customIdentifier != "test" {
+		t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test', got %v", customIdentifier)
+	}
+	clientIdentity.SetCustomIdentifier("test2")
+	customIdentifier = clientIdentity.GetCustomIdentifier()
+	if customIdentifier != "test2" {
+		t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test2', got %v", customIdentifier)
+	}
+	clientString = clientIdentity.String()
+	expected = fmt.Sprintf("Ethereum(G)/v0.5.16/test2/%s/%s", runtime.GOOS, runtime.Version())
+	if clientString != expected {
+		t.Errorf("Expected clientIdentity to be %v, got %v", expected, clientString)
+	}
+}
diff --git a/p2p/connection.go b/p2p/connection.go
new file mode 100644
index 000000000..e999cbe55
--- /dev/null
+++ b/p2p/connection.go
@@ -0,0 +1,275 @@
+package p2p
+
+import (
+	"bytes"
+	// "fmt"
+	"net"
+	"time"
+
+	"github.com/ethereum/eth-go/ethutil"
+)
+
+type Connection struct {
+	conn net.Conn
+	// conn       NetworkConnection
+	timeout    time.Duration
+	in         chan []byte
+	out        chan []byte
+	err        chan *PeerError
+	closingIn  chan chan bool
+	closingOut chan chan bool
+}
+
+// const readBufferLength = 2 //for testing
+
+const readBufferLength = 1440
+const partialsQueueSize = 10
+const maxPendingQueueSize = 1
+const defaultTimeout = 500
+
+var magicToken = []byte{34, 64, 8, 145}
+
+func (self *Connection) Open() {
+	go self.startRead()
+	go self.startWrite()
+}
+
+func (self *Connection) Close() {
+	self.closeIn()
+	self.closeOut()
+}
+
+func (self *Connection) closeIn() {
+	errc := make(chan bool)
+	self.closingIn <- errc
+	<-errc
+}
+
+func (self *Connection) closeOut() {
+	errc := make(chan bool)
+	self.closingOut <- errc
+	<-errc
+}
+
+func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
+	return &Connection{
+		conn:       conn,
+		timeout:    defaultTimeout,
+		in:         make(chan []byte),
+		out:        make(chan []byte),
+		err:        errchan,
+		closingIn:  make(chan chan bool, 1),
+		closingOut: make(chan chan bool, 1),
+	}
+}
+
+func (self *Connection) Read() <-chan []byte {
+	return self.in
+}
+
+func (self *Connection) Write() chan<- []byte {
+	return self.out
+}
+
+func (self *Connection) Error() <-chan *PeerError {
+	return self.err
+}
+
+func (self *Connection) startRead() {
+	payloads := make(chan []byte)
+	done := make(chan *PeerError)
+	pending := [][]byte{}
+	var head []byte
+	var wait time.Duration // initally 0 (no delay)
+	read := time.After(wait * time.Millisecond)
+
+	for {
+		// if pending empty, nil channel blocks
+		var in chan []byte
+		if len(pending) > 0 {
+			in = self.in // enable send case
+			head = pending[0]
+		} else {
+			in = nil
+		}
+
+		select {
+		case <-read:
+			go self.read(payloads, done)
+		case err := <-done:
+			if err == nil { // no error but nothing to read
+				if len(pending) < maxPendingQueueSize {
+					wait = 100
+				} else if wait == 0 {
+					wait = 100
+				} else {
+					wait = 2 * wait
+				}
+			} else {
+				self.err <- err // report error
+				wait = 100
+			}
+			read = time.After(wait * time.Millisecond)
+		case payload := <-payloads:
+			pending = append(pending, payload)
+			if len(pending) < maxPendingQueueSize {
+				wait = 0
+			} else {
+				wait = 100
+			}
+			read = time.After(wait * time.Millisecond)
+		case in <- head:
+			pending = pending[1:]
+		case errc := <-self.closingIn:
+			errc <- true
+			close(self.in)
+			return
+		}
+
+	}
+}
+
+func (self *Connection) startWrite() {
+	pending := [][]byte{}
+	done := make(chan *PeerError)
+	writing := false
+	for {
+		if len(pending) > 0 && !writing {
+			writing = true
+			go self.write(pending[0], done)
+		}
+		select {
+		case payload := <-self.out:
+			pending = append(pending, payload)
+		case err := <-done:
+			if err == nil {
+				pending = pending[1:]
+				writing = false
+			} else {
+				self.err <- err // report error
+			}
+		case errc := <-self.closingOut:
+			errc <- true
+			close(self.out)
+			return
+		}
+	}
+}
+
+func pack(payload []byte) (packet []byte) {
+	length := ethutil.NumberToBytes(uint32(len(payload)), 32)
+	// return error if too long?
+	// Write magic token and payload length (first 8 bytes)
+	packet = append(magicToken, length...)
+	packet = append(packet, payload...)
+	return
+}
+
+func avoidPanic(done chan *PeerError) {
+	if rec := recover(); rec != nil {
+		err := NewPeerError(MiscError, " %v", rec)
+		logger.Debugln(err)
+		done <- err
+	}
+}
+
+func (self *Connection) write(payload []byte, done chan *PeerError) {
+	defer avoidPanic(done)
+	var err *PeerError
+	_, ok := self.conn.Write(pack(payload))
+	if ok != nil {
+		err = NewPeerError(WriteError, " %v", ok)
+		logger.Debugln(err)
+	}
+	done <- err
+}
+
+func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
+	//defer avoidPanic(done)
+
+	partials := make(chan []byte, partialsQueueSize)
+	errc := make(chan *PeerError)
+	go self.readPartials(partials, errc)
+
+	packet := []byte{}
+	length := 8
+	start := true
+	var err *PeerError
+out:
+	for {
+		// appends partials read via connection until packet is
+		// - either parseable (>=8bytes)
+		// - or complete (payload fully consumed)
+		for len(packet) < length {
+			partial, ok := <-partials
+			if !ok { // partials channel is closed
+				err = <-errc
+				if err == nil && len(packet) > 0 {
+					if start {
+						err = NewPeerError(PacketTooShort, "%v", packet)
+					} else {
+						err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
+					}
+				}
+				break out
+			}
+			packet = append(packet, partial...)
+		}
+		if start {
+			// at least 8 bytes read, can validate packet
+			if bytes.Compare(magicToken, packet[:4]) != 0 {
+				err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
+				break
+			}
+			length = int(ethutil.BytesToNumber(packet[4:8]))
+			packet = packet[8:]
+
+			if length > 0 {
+				start = false // now consuming payload
+			} else { //penalize peer but read on
+				self.err <- NewPeerError(EmptyPayload, "")
+				length = 8
+			}
+		} else {
+			// packet complete (payload fully consumed)
+			payloads <- packet[:length]
+			packet = packet[length:] // resclice packet
+			start = true
+			length = 8
+		}
+	}
+
+	// this stops partials read via the connection, should we?
+	//if err != nil {
+	//  select {
+	//    case errc <- err
+	//  default:
+	//}
+	done <- err
+}
+
+func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
+	defer close(partials)
+	for {
+		// Give buffering some time
+		self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
+		buffer := make([]byte, readBufferLength)
+		// read partial from connection
+		bytesRead, err := self.conn.Read(buffer)
+		if err == nil || err.Error() == "EOF" {
+			if bytesRead > 0 {
+				partials <- buffer[:bytesRead]
+			}
+			if err != nil && err.Error() == "EOF" {
+				break
+			}
+		} else {
+			// unexpected error, report to errc
+			err := NewPeerError(ReadError, " %v", err)
+			logger.Debugln(err)
+			errc <- err
+			return // will close partials channel
+		}
+	}
+	close(errc)
+}
diff --git a/p2p/connection_test.go b/p2p/connection_test.go
new file mode 100644
index 000000000..76ee8021c
--- /dev/null
+++ b/p2p/connection_test.go
@@ -0,0 +1,222 @@
+package p2p
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net"
+	"testing"
+	"time"
+)
+
+type TestNetworkConnection struct {
+	in      chan []byte
+	current []byte
+	Out     [][]byte
+	addr    net.Addr
+}
+
+func NewTestNetworkConnection(addr net.Addr) *TestNetworkConnection {
+	return &TestNetworkConnection{
+		in:      make(chan []byte),
+		current: []byte{},
+		Out:     [][]byte{},
+		addr:    addr,
+	}
+}
+
+func (self *TestNetworkConnection) In(latency time.Duration, packets ...[]byte) {
+	time.Sleep(latency)
+	for _, s := range packets {
+		self.in <- s
+	}
+}
+
+func (self *TestNetworkConnection) Read(buff []byte) (n int, err error) {
+	if len(self.current) == 0 {
+		select {
+		case self.current = <-self.in:
+		default:
+			return 0, io.EOF
+		}
+	}
+	length := len(self.current)
+	if length > len(buff) {
+		copy(buff[:], self.current[:len(buff)])
+		self.current = self.current[len(buff):]
+		return len(buff), nil
+	} else {
+		copy(buff[:length], self.current[:])
+		self.current = []byte{}
+		return length, io.EOF
+	}
+}
+
+func (self *TestNetworkConnection) Write(buff []byte) (n int, err error) {
+	self.Out = append(self.Out, buff)
+	fmt.Printf("net write %v\n%v\n", len(self.Out), buff)
+	return len(buff), nil
+}
+
+func (self *TestNetworkConnection) Close() (err error) {
+	return
+}
+
+func (self *TestNetworkConnection) LocalAddr() (addr net.Addr) {
+	return
+}
+
+func (self *TestNetworkConnection) RemoteAddr() (addr net.Addr) {
+	return self.addr
+}
+
+func (self *TestNetworkConnection) SetDeadline(t time.Time) (err error) {
+	return
+}
+
+func (self *TestNetworkConnection) SetReadDeadline(t time.Time) (err error) {
+	return
+}
+
+func (self *TestNetworkConnection) SetWriteDeadline(t time.Time) (err error) {
+	return
+}
+
+func setupConnection() (*Connection, *TestNetworkConnection) {
+	addr := &TestAddr{"test:30303"}
+	net := NewTestNetworkConnection(addr)
+	conn := NewConnection(net, NewPeerErrorChannel())
+	conn.Open()
+	return conn, net
+}
+
+func TestReadingNilPacket(t *testing.T) {
+	conn, net := setupConnection()
+	go net.In(0, []byte{})
+	// time.Sleep(10 * time.Millisecond)
+	select {
+	case packet := <-conn.Read():
+		t.Errorf("read %v", packet)
+	case err := <-conn.Error():
+		t.Errorf("incorrect error %v", err)
+	default:
+	}
+	conn.Close()
+}
+
+func TestReadingShortPacket(t *testing.T) {
+	conn, net := setupConnection()
+	go net.In(0, []byte{0})
+	select {
+	case packet := <-conn.Read():
+		t.Errorf("read %v", packet)
+	case err := <-conn.Error():
+		if err.Code != PacketTooShort {
+			t.Errorf("incorrect error %v, expected %v", err.Code, PacketTooShort)
+		}
+	}
+	conn.Close()
+}
+
+func TestReadingInvalidPacket(t *testing.T) {
+	conn, net := setupConnection()
+	go net.In(0, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+	select {
+	case packet := <-conn.Read():
+		t.Errorf("read %v", packet)
+	case err := <-conn.Error():
+		if err.Code != MagicTokenMismatch {
+			t.Errorf("incorrect error %v, expected %v", err.Code, MagicTokenMismatch)
+		}
+	}
+	conn.Close()
+}
+
+func TestReadingInvalidPayload(t *testing.T) {
+	conn, net := setupConnection()
+	go net.In(0, []byte{34, 64, 8, 145, 0, 0, 0, 2, 0})
+	select {
+	case packet := <-conn.Read():
+		t.Errorf("read %v", packet)
+	case err := <-conn.Error():
+		if err.Code != PayloadTooShort {
+			t.Errorf("incorrect error %v, expected %v", err.Code, PayloadTooShort)
+		}
+	}
+	conn.Close()
+}
+
+func TestReadingEmptyPayload(t *testing.T) {
+	conn, net := setupConnection()
+	go net.In(0, []byte{34, 64, 8, 145, 0, 0, 0, 0})
+	time.Sleep(10 * time.Millisecond)
+	select {
+	case packet := <-conn.Read():
+		t.Errorf("read %v", packet)
+	default:
+	}
+	select {
+	case err := <-conn.Error():
+		code := err.Code
+		if code != EmptyPayload {
+			t.Errorf("incorrect error, expected EmptyPayload, got %v", code)
+		}
+	default:
+		t.Errorf("no error, expected EmptyPayload")
+	}
+	conn.Close()
+}
+
+func TestReadingCompletePacket(t *testing.T) {
+	conn, net := setupConnection()
+	go net.In(0, []byte{34, 64, 8, 145, 0, 0, 0, 1, 1})
+	time.Sleep(10 * time.Millisecond)
+	select {
+	case packet := <-conn.Read():
+		if bytes.Compare(packet, []byte{1}) != 0 {
+			t.Errorf("incorrect payload read")
+		}
+	case err := <-conn.Error():
+		t.Errorf("incorrect error %v", err)
+	default:
+		t.Errorf("nothing read")
+	}
+	conn.Close()
+}
+
+func TestReadingTwoCompletePackets(t *testing.T) {
+	conn, net := setupConnection()
+	go net.In(0, []byte{34, 64, 8, 145, 0, 0, 0, 1, 0, 34, 64, 8, 145, 0, 0, 0, 1, 1})
+
+	for i := 0; i < 2; i++ {
+		time.Sleep(10 * time.Millisecond)
+		select {
+		case packet := <-conn.Read():
+			if bytes.Compare(packet, []byte{byte(i)}) != 0 {
+				t.Errorf("incorrect payload read")
+			}
+		case err := <-conn.Error():
+			t.Errorf("incorrect error %v", err)
+		default:
+			t.Errorf("nothing read")
+		}
+	}
+	conn.Close()
+}
+
+func TestWriting(t *testing.T) {
+	conn, net := setupConnection()
+	conn.Write() <- []byte{0}
+	time.Sleep(10 * time.Millisecond)
+	if len(net.Out) == 0 {
+		t.Errorf("no output")
+	} else {
+		out := net.Out[0]
+		if bytes.Compare(out, []byte{34, 64, 8, 145, 0, 0, 0, 1, 0}) != 0 {
+			t.Errorf("incorrect packet %v", out)
+		}
+	}
+	conn.Close()
+}
+
+// hello packet with client id ABC: 0x22 40 08 91 00 00 00 08 84 00 00 00 43414243
diff --git a/p2p/message.go b/p2p/message.go
new file mode 100644
index 000000000..4886eaa1f
--- /dev/null
+++ b/p2p/message.go
@@ -0,0 +1,75 @@
+package p2p
+
+import (
+	// "fmt"
+	"github.com/ethereum/eth-go/ethutil"
+)
+
+type MsgCode uint8
+
+type Msg struct {
+	code    MsgCode // this is the raw code as per adaptive msg code scheme
+	data    *ethutil.Value
+	encoded []byte
+}
+
+func (self *Msg) Code() MsgCode {
+	return self.code
+}
+
+func (self *Msg) Data() *ethutil.Value {
+	return self.data
+}
+
+func NewMsg(code MsgCode, params ...interface{}) (msg *Msg, err error) {
+
+	// // data := [][]interface{}{}
+	// data := []interface{}{}
+	// for _, value := range params {
+	// 	if encodable, ok := value.(ethutil.RlpEncodeDecode); ok {
+	// 		data = append(data, encodable.RlpValue())
+	// 	} else if raw, ok := value.([]interface{}); ok {
+	// 		data = append(data, raw)
+	// 	} else {
+	// 		// data = append(data, interface{}(raw))
+	// 		err = fmt.Errorf("Unable to encode object of type %T", value)
+	// 		return
+	// 	}
+	// }
+	return &Msg{
+		code: code,
+		data: ethutil.NewValue(interface{}(params)),
+	}, nil
+}
+
+func NewMsgFromBytes(encoded []byte) (msg *Msg, err error) {
+	value := ethutil.NewValueFromBytes(encoded)
+	// Type of message
+	code := value.Get(0).Uint()
+	// Actual data
+	data := value.SliceFrom(1)
+
+	msg = &Msg{
+		code: MsgCode(code),
+		data: data,
+		// data:    ethutil.NewValue(data),
+		encoded: encoded,
+	}
+	return
+}
+
+func (self *Msg) Decode(offset MsgCode) {
+	self.code = self.code - offset
+}
+
+// encode takes an offset argument to implement adaptive message coding
+// the encoded message is memoized to make msgs relayed to several peers more efficient
+func (self *Msg) Encode(offset MsgCode) (res []byte) {
+	if len(self.encoded) == 0 {
+		res = ethutil.NewValue(append([]interface{}{byte(self.code + offset)}, self.data.Slice()...)).Encode()
+		self.encoded = res
+	} else {
+		res = self.encoded
+	}
+	return
+}
diff --git a/p2p/message_test.go b/p2p/message_test.go
new file mode 100644
index 000000000..e9d46f2c3
--- /dev/null
+++ b/p2p/message_test.go
@@ -0,0 +1,38 @@
+package p2p
+
+import (
+	"testing"
+)
+
+func TestNewMsg(t *testing.T) {
+	msg, _ := NewMsg(3, 1, "000")
+	if msg.Code() != 3 {
+		t.Errorf("incorrect code %v", msg.Code())
+	}
+	data0 := msg.Data().Get(0).Uint()
+	data1 := string(msg.Data().Get(1).Bytes())
+	if data0 != 1 {
+		t.Errorf("incorrect data %v", data0)
+	}
+	if data1 != "000" {
+		t.Errorf("incorrect data %v", data1)
+	}
+}
+
+func TestEncodeDecodeMsg(t *testing.T) {
+	msg, _ := NewMsg(3, 1, "000")
+	encoded := msg.Encode(3)
+	msg, _ = NewMsgFromBytes(encoded)
+	msg.Decode(3)
+	if msg.Code() != 3 {
+		t.Errorf("incorrect code %v", msg.Code())
+	}
+	data0 := msg.Data().Get(0).Uint()
+	data1 := msg.Data().Get(1).Str()
+	if data0 != 1 {
+		t.Errorf("incorrect data %v", data0)
+	}
+	if data1 != "000" {
+		t.Errorf("incorrect data %v", data1)
+	}
+}
diff --git a/p2p/messenger.go b/p2p/messenger.go
new file mode 100644
index 000000000..d42ba1720
--- /dev/null
+++ b/p2p/messenger.go
@@ -0,0 +1,220 @@
+package p2p
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+const (
+	handlerTimeout = 1000
+)
+
+type Handlers map[string](func(p *Peer) Protocol)
+
+type Messenger struct {
+	conn          *Connection
+	peer          *Peer
+	handlers      Handlers
+	protocolLock  sync.RWMutex
+	protocols     []Protocol
+	offsets       []MsgCode // offsets for adaptive message idss
+	protocolTable map[string]int
+	quit          chan chan bool
+	err           chan *PeerError
+	pulse         chan bool
+}
+
+func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger {
+	baseProtocol := NewBaseProtocol(peer)
+	return &Messenger{
+		conn:          conn,
+		peer:          peer,
+		offsets:       []MsgCode{baseProtocol.Offset()},
+		handlers:      handlers,
+		protocols:     []Protocol{baseProtocol},
+		protocolTable: make(map[string]int),
+		err:           errchan,
+		pulse:         make(chan bool, 1),
+		quit:          make(chan chan bool, 1),
+	}
+}
+
+func (self *Messenger) Start() {
+	self.conn.Open()
+	go self.messenger()
+	self.protocolLock.RLock()
+	defer self.protocolLock.RUnlock()
+	self.protocols[0].Start()
+}
+
+func (self *Messenger) Stop() {
+	// close pulse to stop ping pong monitoring
+	close(self.pulse)
+	self.protocolLock.RLock()
+	defer self.protocolLock.RUnlock()
+	for _, protocol := range self.protocols {
+		protocol.Stop() // could be parallel
+	}
+	q := make(chan bool)
+	self.quit <- q
+	<-q
+	self.conn.Close()
+}
+
+func (self *Messenger) messenger() {
+	in := self.conn.Read()
+	for {
+		select {
+		case payload, ok := <-in:
+			//dispatches message to the protocol asynchronously
+			if ok {
+				go self.handle(payload)
+			} else {
+				return
+			}
+		case q := <-self.quit:
+			q <- true
+			return
+		}
+	}
+}
+
+// handles each message by dispatching to the appropriate protocol
+// using adaptive message codes
+// this function is started as a separate go routine for each message
+// it waits for the protocol response
+// then encodes and sends outgoing messages to the connection's write channel
+func (self *Messenger) handle(payload []byte) {
+	// send ping to heartbeat channel signalling time of last message
+	// select {
+	// case self.pulse <- true:
+	// default:
+	// }
+	self.pulse <- true
+	// initialise message from payload
+	msg, err := NewMsgFromBytes(payload)
+	if err != nil {
+		self.err <- NewPeerError(MiscError, " %v", err)
+		return
+	}
+	// retrieves protocol based on message Code
+	protocol, offset, peerErr := self.getProtocol(msg.Code())
+	if err != nil {
+		self.err <- peerErr
+		return
+	}
+	// reset message code based on adaptive offset
+	msg.Decode(offset)
+	// dispatches
+	response := make(chan *Msg)
+	go protocol.HandleIn(msg, response)
+	// protocol reponse timeout to prevent leaks
+	timer := time.After(handlerTimeout * time.Millisecond)
+	for {
+		select {
+		case outgoing, ok := <-response:
+			// we check if response channel is not closed
+			if ok {
+				self.conn.Write() <- outgoing.Encode(offset)
+			} else {
+				return
+			}
+		case <-timer:
+			return
+		}
+	}
+}
+
+// negotiated protocols
+// stores offsets needed for adaptive message id scheme
+
+// based on offsets set at handshake
+// get the right protocol to handle the message
+func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) {
+	self.protocolLock.RLock()
+	defer self.protocolLock.RUnlock()
+	base := MsgCode(0)
+	for index, offset := range self.offsets {
+		if code < offset {
+			return self.protocols[index], base, nil
+		}
+		base = offset
+	}
+	return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code)
+}
+
+func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) {
+	fmt.Printf("pingpong keepalive started at %v", time.Now())
+
+	timer := time.After(timeout)
+	pinged := false
+	for {
+		select {
+		case _, ok := <-self.pulse:
+			if ok {
+				pinged = false
+				timer = time.After(timeout)
+			} else {
+				// pulse is closed, stop monitoring
+				return
+			}
+		case <-timer:
+			if pinged {
+				fmt.Printf("timeout at %v", time.Now())
+				timeoutCallback()
+				return
+			} else {
+				fmt.Printf("pinged at %v", time.Now())
+				pingCallback()
+				timer = time.After(gracePeriod)
+				pinged = true
+			}
+		}
+	}
+}
+
+func (self *Messenger) AddProtocols(protocols []string) {
+	self.protocolLock.Lock()
+	defer self.protocolLock.Unlock()
+	i := len(self.offsets)
+	offset := self.offsets[i-1]
+	for _, name := range protocols {
+		protocolFunc, ok := self.handlers[name]
+		if ok {
+			protocol := protocolFunc(self.peer)
+			self.protocolTable[name] = i
+			i++
+			offset += protocol.Offset()
+			fmt.Println("offset ", name, offset)
+
+			self.offsets = append(self.offsets, offset)
+			self.protocols = append(self.protocols, protocol)
+			protocol.Start()
+		} else {
+			fmt.Println("no ", name)
+			// protocol not handled
+		}
+	}
+}
+
+func (self *Messenger) Write(protocol string, msg *Msg) error {
+	self.protocolLock.RLock()
+	defer self.protocolLock.RUnlock()
+	i := 0
+	offset := MsgCode(0)
+	if len(protocol) > 0 {
+		var ok bool
+		i, ok = self.protocolTable[protocol]
+		if !ok {
+			return fmt.Errorf("protocol %v not handled by peer", protocol)
+		}
+		offset = self.offsets[i-1]
+	}
+	handler := self.protocols[i]
+	// checking if protocol status/caps allows the message to be sent out
+	if handler.HandleOut(msg) {
+		self.conn.Write() <- msg.Encode(offset)
+	}
+	return nil
+}
diff --git a/p2p/messenger_test.go b/p2p/messenger_test.go
new file mode 100644
index 000000000..bc21d34ba
--- /dev/null
+++ b/p2p/messenger_test.go
@@ -0,0 +1,146 @@
+package p2p
+
+import (
+	// "fmt"
+	"bytes"
+	"github.com/ethereum/eth-go/ethutil"
+	"testing"
+	"time"
+)
+
+func setupMessenger(handlers Handlers) (*TestNetworkConnection, chan *PeerError, *Messenger) {
+	errchan := NewPeerErrorChannel()
+	addr := &TestAddr{"test:30303"}
+	net := NewTestNetworkConnection(addr)
+	conn := NewConnection(net, errchan)
+	mess := NewMessenger(nil, conn, errchan, handlers)
+	mess.Start()
+	return net, errchan, mess
+}
+
+type TestProtocol struct {
+	Msgs []*Msg
+}
+
+func (self *TestProtocol) Start() {
+}
+
+func (self *TestProtocol) Stop() {
+}
+
+func (self *TestProtocol) Offset() MsgCode {
+	return MsgCode(5)
+}
+
+func (self *TestProtocol) HandleIn(msg *Msg, response chan *Msg) {
+	self.Msgs = append(self.Msgs, msg)
+	close(response)
+}
+
+func (self *TestProtocol) HandleOut(msg *Msg) bool {
+	if msg.Code() > 3 {
+		return false
+	} else {
+		return true
+	}
+}
+
+func (self *TestProtocol) Name() string {
+	return "a"
+}
+
+func Packet(offset MsgCode, code MsgCode, params ...interface{}) []byte {
+	msg, _ := NewMsg(code, params...)
+	encoded := msg.Encode(offset)
+	packet := []byte{34, 64, 8, 145}
+	packet = append(packet, ethutil.NumberToBytes(uint32(len(encoded)), 32)...)
+	return append(packet, encoded...)
+}
+
+func TestRead(t *testing.T) {
+	handlers := make(Handlers)
+	testProtocol := &TestProtocol{Msgs: []*Msg{}}
+	handlers["a"] = func(p *Peer) Protocol { return testProtocol }
+	net, _, mess := setupMessenger(handlers)
+	mess.AddProtocols([]string{"a"})
+	defer mess.Stop()
+	wait := 1 * time.Millisecond
+	packet := Packet(16, 1, uint32(1), "000")
+	go net.In(0, packet)
+	time.Sleep(wait)
+	if len(testProtocol.Msgs) != 1 {
+		t.Errorf("msg not relayed to correct protocol")
+	} else {
+		if testProtocol.Msgs[0].Code() != 1 {
+			t.Errorf("incorrect msg code relayed to protocol")
+		}
+	}
+}
+
+func TestWrite(t *testing.T) {
+	handlers := make(Handlers)
+	testProtocol := &TestProtocol{Msgs: []*Msg{}}
+	handlers["a"] = func(p *Peer) Protocol { return testProtocol }
+	net, _, mess := setupMessenger(handlers)
+	mess.AddProtocols([]string{"a"})
+	defer mess.Stop()
+	wait := 1 * time.Millisecond
+	msg, _ := NewMsg(3, uint32(1), "000")
+	err := mess.Write("b", msg)
+	if err == nil {
+		t.Errorf("expect error for unknown protocol")
+	}
+	err = mess.Write("a", msg)
+	if err != nil {
+		t.Errorf("expect no error for known protocol: %v", err)
+	} else {
+		time.Sleep(wait)
+		if len(net.Out) != 1 {
+			t.Errorf("msg not written")
+		} else {
+			out := net.Out[0]
+			packet := Packet(16, 3, uint32(1), "000")
+			if bytes.Compare(out, packet) != 0 {
+				t.Errorf("incorrect packet %v", out)
+			}
+		}
+	}
+}
+
+func TestPulse(t *testing.T) {
+	net, _, mess := setupMessenger(make(Handlers))
+	defer mess.Stop()
+	ping := false
+	timeout := false
+	pingTimeout := 10 * time.Millisecond
+	gracePeriod := 200 * time.Millisecond
+	go mess.PingPong(pingTimeout, gracePeriod, func() { ping = true }, func() { timeout = true })
+	net.In(0, Packet(0, 1))
+	if ping {
+		t.Errorf("ping sent too early")
+	}
+	time.Sleep(pingTimeout + 100*time.Millisecond)
+	if !ping {
+		t.Errorf("no ping sent after timeout")
+	}
+	if timeout {
+		t.Errorf("timeout too early")
+	}
+	ping = false
+	net.In(0, Packet(0, 1))
+	time.Sleep(pingTimeout + 100*time.Millisecond)
+	if !ping {
+		t.Errorf("no ping sent after timeout")
+	}
+	if timeout {
+		t.Errorf("timeout too early")
+	}
+	ping = false
+	time.Sleep(gracePeriod)
+	if ping {
+		t.Errorf("ping called twice")
+	}
+	if !timeout {
+		t.Errorf("no timeout after grace period")
+	}
+}
diff --git a/p2p/natpmp.go b/p2p/natpmp.go
new file mode 100644
index 000000000..ff966d070
--- /dev/null
+++ b/p2p/natpmp.go
@@ -0,0 +1,55 @@
+package p2p
+
+import (
+	"fmt"
+	"net"
+
+	natpmp "github.com/jackpal/go-nat-pmp"
+)
+
+// Adapt the NAT-PMP protocol to the NAT interface
+
+// TODO:
+//  + Register for changes to the external address.
+//  + Re-register port mapping when router reboots.
+//  + A mechanism for keeping a port mapping registered.
+
+type natPMPClient struct {
+	client *natpmp.Client
+}
+
+func NewNatPMP(gateway net.IP) (nat NAT) {
+	return &natPMPClient{natpmp.NewClient(gateway)}
+}
+
+func (n *natPMPClient) GetExternalAddress() (addr net.IP, err error) {
+	response, err := n.client.GetExternalAddress()
+	if err != nil {
+		return
+	}
+	ip := response.ExternalIPAddress
+	addr = net.IPv4(ip[0], ip[1], ip[2], ip[3])
+	return
+}
+
+func (n *natPMPClient) AddPortMapping(protocol string, externalPort, internalPort int,
+	description string, timeout int) (mappedExternalPort int, err error) {
+	if timeout <= 0 {
+		err = fmt.Errorf("timeout must not be <= 0")
+		return
+	}
+	// Note order of port arguments is switched between our AddPortMapping and the client's AddPortMapping.
+	response, err := n.client.AddPortMapping(protocol, internalPort, externalPort, timeout)
+	if err != nil {
+		return
+	}
+	mappedExternalPort = int(response.MappedExternalPort)
+	return
+}
+
+func (n *natPMPClient) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) {
+	// To destroy a mapping, send an add-port with
+	// an internalPort of the internal port to destroy, an external port of zero and a time of zero.
+	_, err = n.client.AddPortMapping(protocol, internalPort, 0, 0)
+	return
+}
diff --git a/p2p/natupnp.go b/p2p/natupnp.go
new file mode 100644
index 000000000..fa9798d4d
--- /dev/null
+++ b/p2p/natupnp.go
@@ -0,0 +1,335 @@
+package p2p
+
+// Just enough UPnP to be able to forward ports
+//
+
+import (
+	"bytes"
+	"encoding/xml"
+	"errors"
+	"net"
+	"net/http"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+)
+
+type upnpNAT struct {
+	serviceURL string
+	ourIP      string
+}
+
+func upnpDiscover(attempts int) (nat NAT, err error) {
+	ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900")
+	if err != nil {
+		return
+	}
+	conn, err := net.ListenPacket("udp4", ":0")
+	if err != nil {
+		return
+	}
+	socket := conn.(*net.UDPConn)
+	defer socket.Close()
+
+	err = socket.SetDeadline(time.Now().Add(10 * time.Second))
+	if err != nil {
+		return
+	}
+
+	st := "ST: urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n"
+	buf := bytes.NewBufferString(
+		"M-SEARCH * HTTP/1.1\r\n" +
+			"HOST: 239.255.255.250:1900\r\n" +
+			st +
+			"MAN: \"ssdp:discover\"\r\n" +
+			"MX: 2\r\n\r\n")
+	message := buf.Bytes()
+	answerBytes := make([]byte, 1024)
+	for i := 0; i < attempts; i++ {
+		_, err = socket.WriteToUDP(message, ssdp)
+		if err != nil {
+			return
+		}
+		var n int
+		n, _, err = socket.ReadFromUDP(answerBytes)
+		if err != nil {
+			continue
+			// socket.Close()
+			// return
+		}
+		answer := string(answerBytes[0:n])
+		if strings.Index(answer, "\r\n"+st) < 0 {
+			continue
+		}
+		// HTTP header field names are case-insensitive.
+		// http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
+		locString := "\r\nlocation: "
+		answer = strings.ToLower(answer)
+		locIndex := strings.Index(answer, locString)
+		if locIndex < 0 {
+			continue
+		}
+		loc := answer[locIndex+len(locString):]
+		endIndex := strings.Index(loc, "\r\n")
+		if endIndex < 0 {
+			continue
+		}
+		locURL := loc[0:endIndex]
+		var serviceURL string
+		serviceURL, err = getServiceURL(locURL)
+		if err != nil {
+			return
+		}
+		var ourIP string
+		ourIP, err = getOurIP()
+		if err != nil {
+			return
+		}
+		nat = &upnpNAT{serviceURL: serviceURL, ourIP: ourIP}
+		return
+	}
+	err = errors.New("UPnP port discovery failed.")
+	return
+}
+
+// service represents the Service type in an UPnP xml description.
+// Only the parts we care about are present and thus the xml may have more
+// fields than present in the structure.
+type service struct {
+	ServiceType string `xml:"serviceType"`
+	ControlURL  string `xml:"controlURL"`
+}
+
+// deviceList represents the deviceList type in an UPnP xml description.
+// Only the parts we care about are present and thus the xml may have more
+// fields than present in the structure.
+type deviceList struct {
+	XMLName xml.Name `xml:"deviceList"`
+	Device  []device `xml:"device"`
+}
+
+// serviceList represents the serviceList type in an UPnP xml description.
+// Only the parts we care about are present and thus the xml may have more
+// fields than present in the structure.
+type serviceList struct {
+	XMLName xml.Name  `xml:"serviceList"`
+	Service []service `xml:"service"`
+}
+
+// device represents the device type in an UPnP xml description.
+// Only the parts we care about are present and thus the xml may have more
+// fields than present in the structure.
+type device struct {
+	XMLName     xml.Name    `xml:"device"`
+	DeviceType  string      `xml:"deviceType"`
+	DeviceList  deviceList  `xml:"deviceList"`
+	ServiceList serviceList `xml:"serviceList"`
+}
+
+// specVersion represents the specVersion in a UPnP xml description.
+// Only the parts we care about are present and thus the xml may have more
+// fields than present in the structure.
+type specVersion struct {
+	XMLName xml.Name `xml:"specVersion"`
+	Major   int      `xml:"major"`
+	Minor   int      `xml:"minor"`
+}
+
+// root represents the Root document for a UPnP xml description.
+// Only the parts we care about are present and thus the xml may have more
+// fields than present in the structure.
+type root struct {
+	XMLName     xml.Name `xml:"root"`
+	SpecVersion specVersion
+	Device      device
+}
+
+func getChildDevice(d *device, deviceType string) *device {
+	dl := d.DeviceList.Device
+	for i := 0; i < len(dl); i++ {
+		if dl[i].DeviceType == deviceType {
+			return &dl[i]
+		}
+	}
+	return nil
+}
+
+func getChildService(d *device, serviceType string) *service {
+	sl := d.ServiceList.Service
+	for i := 0; i < len(sl); i++ {
+		if sl[i].ServiceType == serviceType {
+			return &sl[i]
+		}
+	}
+	return nil
+}
+
+func getOurIP() (ip string, err error) {
+	hostname, err := os.Hostname()
+	if err != nil {
+		return
+	}
+	p, err := net.LookupIP(hostname)
+	if err != nil && len(p) > 0 {
+		return
+	}
+	return p[0].String(), nil
+}
+
+func getServiceURL(rootURL string) (url string, err error) {
+	r, err := http.Get(rootURL)
+	if err != nil {
+		return
+	}
+	defer r.Body.Close()
+	if r.StatusCode >= 400 {
+		err = errors.New(string(r.StatusCode))
+		return
+	}
+	var root root
+	err = xml.NewDecoder(r.Body).Decode(&root)
+
+	if err != nil {
+		return
+	}
+	a := &root.Device
+	if a.DeviceType != "urn:schemas-upnp-org:device:InternetGatewayDevice:1" {
+		err = errors.New("No InternetGatewayDevice")
+		return
+	}
+	b := getChildDevice(a, "urn:schemas-upnp-org:device:WANDevice:1")
+	if b == nil {
+		err = errors.New("No WANDevice")
+		return
+	}
+	c := getChildDevice(b, "urn:schemas-upnp-org:device:WANConnectionDevice:1")
+	if c == nil {
+		err = errors.New("No WANConnectionDevice")
+		return
+	}
+	d := getChildService(c, "urn:schemas-upnp-org:service:WANIPConnection:1")
+	if d == nil {
+		err = errors.New("No WANIPConnection")
+		return
+	}
+	url = combineURL(rootURL, d.ControlURL)
+	return
+}
+
+func combineURL(rootURL, subURL string) string {
+	protocolEnd := "://"
+	protoEndIndex := strings.Index(rootURL, protocolEnd)
+	a := rootURL[protoEndIndex+len(protocolEnd):]
+	rootIndex := strings.Index(a, "/")
+	return rootURL[0:protoEndIndex+len(protocolEnd)+rootIndex] + subURL
+}
+
+func soapRequest(url, function, message string) (r *http.Response, err error) {
+	fullMessage := "<?xml version=\"1.0\" ?>" +
+		"<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\r\n" +
+		"<s:Body>" + message + "</s:Body></s:Envelope>"
+
+	req, err := http.NewRequest("POST", url, strings.NewReader(fullMessage))
+	if err != nil {
+		return
+	}
+	req.Header.Set("Content-Type", "text/xml ; charset=\"utf-8\"")
+	req.Header.Set("User-Agent", "Darwin/10.0.0, UPnP/1.0, MiniUPnPc/1.3")
+	//req.Header.Set("Transfer-Encoding", "chunked")
+	req.Header.Set("SOAPAction", "\"urn:schemas-upnp-org:service:WANIPConnection:1#"+function+"\"")
+	req.Header.Set("Connection", "Close")
+	req.Header.Set("Cache-Control", "no-cache")
+	req.Header.Set("Pragma", "no-cache")
+
+	r, err = http.DefaultClient.Do(req)
+	if err != nil {
+		return
+	}
+
+	if r.Body != nil {
+		defer r.Body.Close()
+	}
+
+	if r.StatusCode >= 400 {
+		// log.Stderr(function, r.StatusCode)
+		err = errors.New("Error " + strconv.Itoa(r.StatusCode) + " for " + function)
+		r = nil
+		return
+	}
+	return
+}
+
+type statusInfo struct {
+	externalIpAddress string
+}
+
+func (n *upnpNAT) getStatusInfo() (info statusInfo, err error) {
+
+	message := "<u:GetStatusInfo xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" +
+		"</u:GetStatusInfo>"
+
+	var response *http.Response
+	response, err = soapRequest(n.serviceURL, "GetStatusInfo", message)
+	if err != nil {
+		return
+	}
+
+	// TODO: Write a soap reply parser. It has to eat the Body and envelope tags...
+
+	response.Body.Close()
+	return
+}
+
+func (n *upnpNAT) GetExternalAddress() (addr net.IP, err error) {
+	info, err := n.getStatusInfo()
+	if err != nil {
+		return
+	}
+	addr = net.ParseIP(info.externalIpAddress)
+	return
+}
+
+func (n *upnpNAT) AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) {
+	// A single concatenation would break ARM compilation.
+	message := "<u:AddPortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" +
+		"<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort)
+	message += "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>"
+	message += "<NewInternalPort>" + strconv.Itoa(internalPort) + "</NewInternalPort>" +
+		"<NewInternalClient>" + n.ourIP + "</NewInternalClient>" +
+		"<NewEnabled>1</NewEnabled><NewPortMappingDescription>"
+	message += description +
+		"</NewPortMappingDescription><NewLeaseDuration>" + strconv.Itoa(timeout) +
+		"</NewLeaseDuration></u:AddPortMapping>"
+
+	var response *http.Response
+	response, err = soapRequest(n.serviceURL, "AddPortMapping", message)
+	if err != nil {
+		return
+	}
+
+	// TODO: check response to see if the port was forwarded
+	// log.Println(message, response)
+	mappedExternalPort = externalPort
+	_ = response
+	return
+}
+
+func (n *upnpNAT) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) {
+
+	message := "<u:DeletePortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" +
+		"<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort) +
+		"</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>" +
+		"</u:DeletePortMapping>"
+
+	var response *http.Response
+	response, err = soapRequest(n.serviceURL, "DeletePortMapping", message)
+	if err != nil {
+		return
+	}
+
+	// TODO: check response to see if the port was deleted
+	// log.Println(message, response)
+	_ = response
+	return
+}
diff --git a/p2p/network.go b/p2p/network.go
new file mode 100644
index 000000000..820cef1a9
--- /dev/null
+++ b/p2p/network.go
@@ -0,0 +1,196 @@
+package p2p
+
+import (
+	"fmt"
+	"math/rand"
+	"net"
+	"strconv"
+	"time"
+)
+
+const (
+	DialerTimeout             = 180 //seconds
+	KeepAlivePeriod           = 60  //minutes
+	portMappingUpdateInterval = 900 // seconds = 15 mins
+	upnpDiscoverAttempts      = 3
+)
+
+// Dialer is not an interface in net, so we define one
+// *net.Dialer conforms to this
+type Dialer interface {
+	Dial(network, address string) (net.Conn, error)
+}
+
+type Network interface {
+	Start() error
+	Listener(net.Addr) (net.Listener, error)
+	Dialer(net.Addr) (Dialer, error)
+	NewAddr(string, int) (addr net.Addr, err error)
+	ParseAddr(string) (addr net.Addr, err error)
+}
+
+type NAT interface {
+	GetExternalAddress() (addr net.IP, err error)
+	AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error)
+	DeletePortMapping(protocol string, externalPort, internalPort int) (err error)
+}
+
+type TCPNetwork struct {
+	nat     NAT
+	natType NATType
+	quit    chan chan bool
+	ports   chan string
+}
+
+type NATType int
+
+const (
+	NONE = iota
+	UPNP
+	PMP
+)
+
+const (
+	portMappingTimeout = 1200 // 20 mins
+)
+
+func NewTCPNetwork(natType NATType) (net *TCPNetwork) {
+	return &TCPNetwork{
+		natType: natType,
+		ports:   make(chan string),
+	}
+}
+
+func (self *TCPNetwork) Dialer(addr net.Addr) (Dialer, error) {
+	return &net.Dialer{
+		Timeout: DialerTimeout * time.Second,
+		// KeepAlive: KeepAlivePeriod * time.Minute,
+		LocalAddr: addr,
+	}, nil
+}
+
+func (self *TCPNetwork) Listener(addr net.Addr) (net.Listener, error) {
+	if self.natType == UPNP {
+		_, port, _ := net.SplitHostPort(addr.String())
+		if self.quit == nil {
+			self.quit = make(chan chan bool)
+			go self.updatePortMappings()
+		}
+		self.ports <- port
+	}
+	return net.Listen(addr.Network(), addr.String())
+}
+
+func (self *TCPNetwork) Start() (err error) {
+	switch self.natType {
+	case NONE:
+	case UPNP:
+		nat, uerr := upnpDiscover(upnpDiscoverAttempts)
+		if uerr != nil {
+			err = fmt.Errorf("UPNP failed: ", uerr)
+		} else {
+			self.nat = nat
+		}
+	case PMP:
+		err = fmt.Errorf("PMP not implemented")
+	default:
+		err = fmt.Errorf("Invalid NAT type: %v", self.natType)
+	}
+	return
+}
+
+func (self *TCPNetwork) Stop() {
+	q := make(chan bool)
+	self.quit <- q
+	<-q
+}
+
+func (self *TCPNetwork) addPortMapping(lport int) (err error) {
+	_, err = self.nat.AddPortMapping("TCP", lport, lport, "p2p listen port", portMappingTimeout)
+	if err != nil {
+		logger.Errorf("unable to add port mapping on %v: %v", lport, err)
+	} else {
+		logger.Debugf("succesfully added port mapping on %v", lport)
+	}
+	return
+}
+
+func (self *TCPNetwork) updatePortMappings() {
+	timer := time.NewTimer(portMappingUpdateInterval * time.Second)
+	lports := []int{}
+out:
+	for {
+		select {
+		case port := <-self.ports:
+			int64lport, _ := strconv.ParseInt(port, 10, 16)
+			lport := int(int64lport)
+			if err := self.addPortMapping(lport); err != nil {
+				lports = append(lports, lport)
+			}
+		case <-timer.C:
+			for lport := range lports {
+				if err := self.addPortMapping(lport); err != nil {
+				}
+			}
+		case errc := <-self.quit:
+			errc <- true
+			break out
+		}
+	}
+
+	timer.Stop()
+	for lport := range lports {
+		if err := self.nat.DeletePortMapping("TCP", lport, lport); err != nil {
+			logger.Debugf("unable to remove port mapping on %v: %v", lport, err)
+		} else {
+			logger.Debugf("succesfully removed port mapping on %v", lport)
+		}
+	}
+}
+
+func (self *TCPNetwork) NewAddr(host string, port int) (net.Addr, error) {
+	ip, err := self.lookupIP(host)
+	if err == nil {
+		return &net.TCPAddr{
+			IP:   ip,
+			Port: port,
+		}, nil
+	}
+	return nil, err
+}
+
+func (self *TCPNetwork) ParseAddr(address string) (net.Addr, error) {
+	host, port, err := net.SplitHostPort(address)
+	if err == nil {
+		iport, _ := strconv.Atoi(port)
+		addr, e := self.NewAddr(host, iport)
+		return addr, e
+	}
+	return nil, err
+}
+
+func (*TCPNetwork) lookupIP(host string) (ip net.IP, err error) {
+	if ip = net.ParseIP(host); ip != nil {
+		return
+	}
+
+	var ips []net.IP
+	ips, err = net.LookupIP(host)
+	if err != nil {
+		logger.Warnln(err)
+		return
+	}
+	if len(ips) == 0 {
+		err = fmt.Errorf("No IP addresses available for %v", host)
+		logger.Warnln(err)
+		return
+	}
+	if len(ips) > 1 {
+		// Pick a random IP address, simulating round-robin DNS.
+		rand.Seed(time.Now().UTC().UnixNano())
+		ip = ips[rand.Intn(len(ips))]
+	} else {
+		ip = ips[0]
+	}
+	return
+}
diff --git a/p2p/peer.go b/p2p/peer.go
new file mode 100644
index 000000000..f4b68a007
--- /dev/null
+++ b/p2p/peer.go
@@ -0,0 +1,83 @@
+package p2p
+
+import (
+	"fmt"
+	"net"
+	"strconv"
+)
+
+type Peer struct {
+	// quit      chan chan bool
+	Inbound          bool // inbound (via listener) or outbound (via dialout)
+	Address          net.Addr
+	Host             []byte
+	Port             uint16
+	Pubkey           []byte
+	Id               string
+	Caps             []string
+	peerErrorChan    chan *PeerError
+	messenger        *Messenger
+	peerErrorHandler *PeerErrorHandler
+	server           *Server
+}
+
+func (self *Peer) Messenger() *Messenger {
+	return self.messenger
+}
+
+func (self *Peer) PeerErrorChan() chan *PeerError {
+	return self.peerErrorChan
+}
+
+func (self *Peer) Server() *Server {
+	return self.server
+}
+
+func NewPeer(conn net.Conn, address net.Addr, inbound bool, server *Server) *Peer {
+	peerErrorChan := NewPeerErrorChannel()
+	host, port, _ := net.SplitHostPort(address.String())
+	intport, _ := strconv.Atoi(port)
+	peer := &Peer{
+		Inbound:       inbound,
+		Address:       address,
+		Port:          uint16(intport),
+		Host:          net.ParseIP(host),
+		peerErrorChan: peerErrorChan,
+		server:        server,
+	}
+	connection := NewConnection(conn, peerErrorChan)
+	peer.messenger = NewMessenger(peer, connection, peerErrorChan, server.Handlers())
+	peer.peerErrorHandler = NewPeerErrorHandler(address, server.PeerDisconnect(), peerErrorChan, server.Blacklist())
+	return peer
+}
+
+func (self *Peer) String() string {
+	var kind string
+	if self.Inbound {
+		kind = "inbound"
+	} else {
+		kind = "outbound"
+	}
+	return fmt.Sprintf("%v:%v (%s) v%v %v", self.Host, self.Port, kind, self.Id, self.Caps)
+}
+
+func (self *Peer) Write(protocol string, msg *Msg) error {
+	return self.messenger.Write(protocol, msg)
+}
+
+func (self *Peer) Start() {
+	self.peerErrorHandler.Start()
+	self.messenger.Start()
+}
+
+func (self *Peer) Stop() {
+	self.peerErrorHandler.Stop()
+	self.messenger.Stop()
+	// q := make(chan bool)
+	// self.quit <- q
+	// <-q
+}
+
+func (p *Peer) Encode() []interface{} {
+	return []interface{}{p.Host, p.Port, p.Pubkey}
+}
diff --git a/p2p/peer_error.go b/p2p/peer_error.go
new file mode 100644
index 000000000..de921878a
--- /dev/null
+++ b/p2p/peer_error.go
@@ -0,0 +1,76 @@
+package p2p
+
+import (
+	"fmt"
+)
+
+type ErrorCode int
+
+const errorChanCapacity = 10
+
+const (
+	PacketTooShort = iota
+	PayloadTooShort
+	MagicTokenMismatch
+	EmptyPayload
+	ReadError
+	WriteError
+	MiscError
+	InvalidMsgCode
+	InvalidMsg
+	P2PVersionMismatch
+	PubkeyMissing
+	PubkeyInvalid
+	PubkeyForbidden
+	ProtocolBreach
+	PortMismatch
+	PingTimeout
+	InvalidGenesis
+	InvalidNetworkId
+	InvalidProtocolVersion
+)
+
+var errorToString = map[ErrorCode]string{
+	PacketTooShort:         "Packet too short",
+	PayloadTooShort:        "Payload too short",
+	MagicTokenMismatch:     "Magic token mismatch",
+	EmptyPayload:           "Empty payload",
+	ReadError:              "Read error",
+	WriteError:             "Write error",
+	MiscError:              "Misc error",
+	InvalidMsgCode:         "Invalid message code",
+	InvalidMsg:             "Invalid message",
+	P2PVersionMismatch:     "P2P Version Mismatch",
+	PubkeyMissing:          "Public key missing",
+	PubkeyInvalid:          "Public key invalid",
+	PubkeyForbidden:        "Public key forbidden",
+	ProtocolBreach:         "Protocol Breach",
+	PortMismatch:           "Port mismatch",
+	PingTimeout:            "Ping timeout",
+	InvalidGenesis:         "Invalid genesis block",
+	InvalidNetworkId:       "Invalid network id",
+	InvalidProtocolVersion: "Invalid protocol version",
+}
+
+type PeerError struct {
+	Code    ErrorCode
+	message string
+}
+
+func NewPeerError(code ErrorCode, format string, v ...interface{}) *PeerError {
+	desc, ok := errorToString[code]
+	if !ok {
+		panic("invalid error code")
+	}
+	format = desc + ": " + format
+	message := fmt.Sprintf(format, v...)
+	return &PeerError{code, message}
+}
+
+func (self *PeerError) Error() string {
+	return self.message
+}
+
+func NewPeerErrorChannel() chan *PeerError {
+	return make(chan *PeerError, errorChanCapacity)
+}
diff --git a/p2p/peer_error_handler.go b/p2p/peer_error_handler.go
new file mode 100644
index 000000000..ca6cae4db
--- /dev/null
+++ b/p2p/peer_error_handler.go
@@ -0,0 +1,101 @@
+package p2p
+
+import (
+	"net"
+)
+
+const (
+	severityThreshold = 10
+)
+
+type DisconnectRequest struct {
+	addr   net.Addr
+	reason DiscReason
+}
+
+type PeerErrorHandler struct {
+	quit           chan chan bool
+	address        net.Addr
+	peerDisconnect chan DisconnectRequest
+	severity       int
+	peerErrorChan  chan *PeerError
+	blacklist      Blacklist
+}
+
+func NewPeerErrorHandler(address net.Addr, peerDisconnect chan DisconnectRequest, peerErrorChan chan *PeerError, blacklist Blacklist) *PeerErrorHandler {
+	return &PeerErrorHandler{
+		quit:           make(chan chan bool),
+		address:        address,
+		peerDisconnect: peerDisconnect,
+		peerErrorChan:  peerErrorChan,
+		blacklist:      blacklist,
+	}
+}
+
+func (self *PeerErrorHandler) Start() {
+	go self.listen()
+}
+
+func (self *PeerErrorHandler) Stop() {
+	q := make(chan bool)
+	self.quit <- q
+	<-q
+}
+
+func (self *PeerErrorHandler) listen() {
+	for {
+		select {
+		case peerError, ok := <-self.peerErrorChan:
+			if ok {
+				logger.Debugf("error %v\n", peerError)
+				go self.handle(peerError)
+			} else {
+				return
+			}
+		case q := <-self.quit:
+			q <- true
+			return
+		}
+	}
+}
+
+func (self *PeerErrorHandler) handle(peerError *PeerError) {
+	reason := DiscReason(' ')
+	switch peerError.Code {
+	case P2PVersionMismatch:
+		reason = DiscIncompatibleVersion
+	case PubkeyMissing, PubkeyInvalid:
+		reason = DiscInvalidIdentity
+	case PubkeyForbidden:
+		reason = DiscUselessPeer
+	case InvalidMsgCode, PacketTooShort, PayloadTooShort, MagicTokenMismatch, EmptyPayload, ProtocolBreach:
+		reason = DiscProtocolError
+	case PingTimeout:
+		reason = DiscReadTimeout
+	case WriteError, MiscError:
+		reason = DiscNetworkError
+	case InvalidGenesis, InvalidNetworkId, InvalidProtocolVersion:
+		reason = DiscSubprotocolError
+	default:
+		self.severity += self.getSeverity(peerError)
+	}
+
+	if self.severity >= severityThreshold {
+		reason = DiscSubprotocolError
+	}
+	if reason != DiscReason(' ') {
+		self.peerDisconnect <- DisconnectRequest{
+			addr:   self.address,
+			reason: reason,
+		}
+	}
+}
+
+func (self *PeerErrorHandler) getSeverity(peerError *PeerError) int {
+	switch peerError.Code {
+	case ReadError:
+		return 4 //tolerate 3 :)
+	default:
+		return 1
+	}
+}
diff --git a/p2p/peer_error_handler_test.go b/p2p/peer_error_handler_test.go
new file mode 100644
index 000000000..790a7443b
--- /dev/null
+++ b/p2p/peer_error_handler_test.go
@@ -0,0 +1,34 @@
+package p2p
+
+import (
+	// "fmt"
+	"net"
+	"testing"
+	"time"
+)
+
+func TestPeerErrorHandler(t *testing.T) {
+	address := &net.TCPAddr{IP: net.IP([]byte{1, 2, 3, 4}), Port: 30303}
+	peerDisconnect := make(chan DisconnectRequest)
+	peerErrorChan := NewPeerErrorChannel()
+	peh := NewPeerErrorHandler(address, peerDisconnect, peerErrorChan, NewBlacklist())
+	peh.Start()
+	defer peh.Stop()
+	for i := 0; i < 11; i++ {
+		select {
+		case <-peerDisconnect:
+			t.Errorf("expected no disconnect request")
+		default:
+		}
+		peerErrorChan <- NewPeerError(MiscError, "")
+	}
+	time.Sleep(1 * time.Millisecond)
+	select {
+	case request := <-peerDisconnect:
+		if request.addr.String() != address.String() {
+			t.Errorf("incorrect address %v != %v", request.addr, address)
+		}
+	default:
+		t.Errorf("expected disconnect request")
+	}
+}
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
new file mode 100644
index 000000000..c37540bef
--- /dev/null
+++ b/p2p/peer_test.go
@@ -0,0 +1,96 @@
+package p2p
+
+import (
+	"bytes"
+	"fmt"
+	// "net"
+	"testing"
+	"time"
+)
+
+func TestPeer(t *testing.T) {
+	handlers := make(Handlers)
+	testProtocol := &TestProtocol{Msgs: []*Msg{}}
+	handlers["aaa"] = func(p *Peer) Protocol { return testProtocol }
+	handlers["ccc"] = func(p *Peer) Protocol { return testProtocol }
+	addr := &TestAddr{"test:30"}
+	conn := NewTestNetworkConnection(addr)
+	_, server := SetupTestServer(handlers)
+	server.Handshake()
+	peer := NewPeer(conn, addr, true, server)
+	// peer.Messenger().AddProtocols([]string{"aaa", "ccc"})
+	peer.Start()
+	defer peer.Stop()
+	time.Sleep(2 * time.Millisecond)
+	if len(conn.Out) != 1 {
+		t.Errorf("handshake not sent")
+	} else {
+		out := conn.Out[0]
+		packet := Packet(0, HandshakeMsg, P2PVersion, []byte(peer.server.identity.String()), []interface{}{peer.server.protocols}, peer.server.port, peer.server.identity.Pubkey()[1:])
+		if bytes.Compare(out, packet) != 0 {
+			t.Errorf("incorrect handshake packet %v != %v", out, packet)
+		}
+	}
+
+	packet := Packet(0, HandshakeMsg, P2PVersion, []byte("peer"), []interface{}{"bbb", "aaa", "ccc"}, 30, []byte("0000000000000000000000000000000000000000000000000000000000000000"))
+	conn.In(0, packet)
+	time.Sleep(10 * time.Millisecond)
+
+	pro, _ := peer.Messenger().protocols[0].(*BaseProtocol)
+	if pro.state != handshakeReceived {
+		t.Errorf("handshake not received")
+	}
+	if peer.Port != 30 {
+		t.Errorf("port incorrectly set")
+	}
+	if peer.Id != "peer" {
+		t.Errorf("id incorrectly set")
+	}
+	if string(peer.Pubkey) != "0000000000000000000000000000000000000000000000000000000000000000" {
+		t.Errorf("pubkey incorrectly set")
+	}
+	fmt.Println(peer.Caps)
+	if len(peer.Caps) != 3 || peer.Caps[0] != "aaa" || peer.Caps[1] != "bbb" || peer.Caps[2] != "ccc" {
+		t.Errorf("protocols incorrectly set")
+	}
+
+	msg, _ := NewMsg(3)
+	err := peer.Write("aaa", msg)
+	if err != nil {
+		t.Errorf("expect no error for known protocol: %v", err)
+	} else {
+		time.Sleep(1 * time.Millisecond)
+		if len(conn.Out) != 2 {
+			t.Errorf("msg not written")
+		} else {
+			out := conn.Out[1]
+			packet := Packet(16, 3)
+			if bytes.Compare(out, packet) != 0 {
+				t.Errorf("incorrect packet %v != %v", out, packet)
+			}
+		}
+	}
+
+	msg, _ = NewMsg(2)
+	err = peer.Write("ccc", msg)
+	if err != nil {
+		t.Errorf("expect no error for known protocol: %v", err)
+	} else {
+		time.Sleep(1 * time.Millisecond)
+		if len(conn.Out) != 3 {
+			t.Errorf("msg not written")
+		} else {
+			out := conn.Out[2]
+			packet := Packet(21, 2)
+			if bytes.Compare(out, packet) != 0 {
+				t.Errorf("incorrect packet %v != %v", out, packet)
+			}
+		}
+	}
+
+	err = peer.Write("bbb", msg)
+	time.Sleep(1 * time.Millisecond)
+	if err == nil {
+		t.Errorf("expect error for unknown protocol")
+	}
+}
diff --git a/p2p/protocol.go b/p2p/protocol.go
new file mode 100644
index 000000000..5d05ced7d
--- /dev/null
+++ b/p2p/protocol.go
@@ -0,0 +1,278 @@
+package p2p
+
+import (
+	"bytes"
+	"fmt"
+	"net"
+	"sort"
+	"sync"
+	"time"
+)
+
+type Protocol interface {
+	Start()
+	Stop()
+	HandleIn(*Msg, chan *Msg)
+	HandleOut(*Msg) bool
+	Offset() MsgCode
+	Name() string
+}
+
+const (
+	P2PVersion      = 0
+	pingTimeout     = 2
+	pingGracePeriod = 2
+)
+
+const (
+	HandshakeMsg = iota
+	DiscMsg
+	PingMsg
+	PongMsg
+	GetPeersMsg
+	PeersMsg
+	offset = 16
+)
+
+type ProtocolState uint8
+
+const (
+	nullState = iota
+	handshakeReceived
+)
+
+type DiscReason byte
+
+const (
+	// Values are given explicitly instead of by iota because these values are
+	// defined by the wire protocol spec; it is easier for humans to ensure
+	// correctness when values are explicit.
+	DiscRequested           = 0x00
+	DiscNetworkError        = 0x01
+	DiscProtocolError       = 0x02
+	DiscUselessPeer         = 0x03
+	DiscTooManyPeers        = 0x04
+	DiscAlreadyConnected    = 0x05
+	DiscIncompatibleVersion = 0x06
+	DiscInvalidIdentity     = 0x07
+	DiscQuitting            = 0x08
+	DiscUnexpectedIdentity  = 0x09
+	DiscSelf                = 0x0a
+	DiscReadTimeout         = 0x0b
+	DiscSubprotocolError    = 0x10
+)
+
+var discReasonToString = map[DiscReason]string{
+	DiscRequested:           "Disconnect requested",
+	DiscNetworkError:        "Network error",
+	DiscProtocolError:       "Breach of protocol",
+	DiscUselessPeer:         "Useless peer",
+	DiscTooManyPeers:        "Too many peers",
+	DiscAlreadyConnected:    "Already connected",
+	DiscIncompatibleVersion: "Incompatible P2P protocol version",
+	DiscInvalidIdentity:     "Invalid node identity",
+	DiscQuitting:            "Client quitting",
+	DiscUnexpectedIdentity:  "Unexpected identity",
+	DiscSelf:                "Connected to self",
+	DiscReadTimeout:         "Read timeout",
+	DiscSubprotocolError:    "Subprotocol error",
+}
+
+func (d DiscReason) String() string {
+	if len(discReasonToString) < int(d) {
+		return "Unknown"
+	}
+
+	return discReasonToString[d]
+}
+
+type BaseProtocol struct {
+	peer      *Peer
+	state     ProtocolState
+	stateLock sync.RWMutex
+}
+
+func NewBaseProtocol(peer *Peer) *BaseProtocol {
+	self := &BaseProtocol{
+		peer: peer,
+	}
+
+	return self
+}
+
+func (self *BaseProtocol) Start() {
+	if self.peer != nil {
+		self.peer.Write("", self.peer.Server().Handshake())
+		go self.peer.Messenger().PingPong(
+			pingTimeout*time.Second,
+			pingGracePeriod*time.Second,
+			self.Ping,
+			self.Timeout,
+		)
+	}
+}
+
+func (self *BaseProtocol) Stop() {
+}
+
+func (self *BaseProtocol) Ping() {
+	msg, _ := NewMsg(PingMsg)
+	self.peer.Write("", msg)
+}
+
+func (self *BaseProtocol) Timeout() {
+	self.peerError(PingTimeout, "")
+}
+
+func (self *BaseProtocol) Name() string {
+	return ""
+}
+
+func (self *BaseProtocol) Offset() MsgCode {
+	return offset
+}
+
+func (self *BaseProtocol) CheckState(state ProtocolState) bool {
+	self.stateLock.RLock()
+	self.stateLock.RUnlock()
+	if self.state != state {
+		return false
+	} else {
+		return true
+	}
+}
+
+func (self *BaseProtocol) HandleIn(msg *Msg, response chan *Msg) {
+	if msg.Code() == HandshakeMsg {
+		self.handleHandshake(msg)
+	} else {
+		if !self.CheckState(handshakeReceived) {
+			self.peerError(ProtocolBreach, "message code %v not allowed", msg.Code())
+			close(response)
+			return
+		}
+		switch msg.Code() {
+		case DiscMsg:
+			logger.Infof("Disconnect requested from peer %v, reason", DiscReason(msg.Data().Get(0).Uint()))
+			self.peer.Server().PeerDisconnect() <- DisconnectRequest{
+				addr:   self.peer.Address,
+				reason: DiscRequested,
+			}
+		case PingMsg:
+			out, _ := NewMsg(PongMsg)
+			response <- out
+		case PongMsg:
+		case GetPeersMsg:
+			// Peer asked for list of connected peers
+			if out, err := self.peer.Server().PeersMessage(); err != nil {
+				response <- out
+			}
+		case PeersMsg:
+			self.handlePeers(msg)
+		default:
+			self.peerError(InvalidMsgCode, "unknown message code %v", msg.Code())
+		}
+	}
+	close(response)
+}
+
+func (self *BaseProtocol) HandleOut(msg *Msg) (allowed bool) {
+	// somewhat overly paranoid
+	allowed = msg.Code() == HandshakeMsg || msg.Code() == DiscMsg || msg.Code() < self.Offset() && self.CheckState(handshakeReceived)
+	return
+}
+
+func (self *BaseProtocol) peerError(errorCode ErrorCode, format string, v ...interface{}) {
+	err := NewPeerError(errorCode, format, v...)
+	logger.Warnln(err)
+	fmt.Println(self.peer, err)
+	if self.peer != nil {
+		self.peer.PeerErrorChan() <- err
+	}
+}
+
+func (self *BaseProtocol) handlePeers(msg *Msg) {
+	it := msg.Data().NewIterator()
+	for it.Next() {
+		ip := net.IP(it.Value().Get(0).Bytes())
+		port := it.Value().Get(1).Uint()
+		address := &net.TCPAddr{IP: ip, Port: int(port)}
+		go self.peer.Server().PeerConnect(address)
+	}
+}
+
+func (self *BaseProtocol) handleHandshake(msg *Msg) {
+	self.stateLock.Lock()
+	defer self.stateLock.Unlock()
+	if self.state != nullState {
+		self.peerError(ProtocolBreach, "extra handshake")
+		return
+	}
+
+	c := msg.Data()
+
+	var (
+		p2pVersion = c.Get(0).Uint()
+		id         = c.Get(1).Str()
+		caps       = c.Get(2)
+		port       = c.Get(3).Uint()
+		pubkey     = c.Get(4).Bytes()
+	)
+	fmt.Printf("handshake received %v, %v, %v, %v, %v ", p2pVersion, id, caps, port, pubkey)
+
+	// Check correctness of p2p protocol version
+	if p2pVersion != P2PVersion {
+		self.peerError(P2PVersionMismatch, "Require protocol %d, received %d\n", P2PVersion, p2pVersion)
+		return
+	}
+
+	// Handle the pub key (validation, uniqueness)
+	if len(pubkey) == 0 {
+		self.peerError(PubkeyMissing, "not supplied in handshake.")
+		return
+	}
+
+	if len(pubkey) != 64 {
+		self.peerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
+		return
+	}
+
+	// Self connect detection
+	if bytes.Compare(self.peer.Server().ClientIdentity().Pubkey()[1:], pubkey) == 0 {
+		self.peerError(PubkeyForbidden, "not allowed to connect to self")
+		return
+	}
+
+	// register pubkey on server. this also sets the pubkey on the peer (need lock)
+	if err := self.peer.Server().RegisterPubkey(self.peer, pubkey); err != nil {
+		self.peerError(PubkeyForbidden, err.Error())
+		return
+	}
+
+	// check port
+	if self.peer.Inbound {
+		uint16port := uint16(port)
+		if self.peer.Port > 0 && self.peer.Port != uint16port {
+			self.peerError(PortMismatch, "port mismatch: %v != %v", self.peer.Port, port)
+			return
+		} else {
+			self.peer.Port = uint16port
+		}
+	}
+
+	capsIt := caps.NewIterator()
+	for capsIt.Next() {
+		cap := capsIt.Value().Str()
+		self.peer.Caps = append(self.peer.Caps, cap)
+	}
+	sort.Strings(self.peer.Caps)
+	self.peer.Messenger().AddProtocols(self.peer.Caps)
+
+	self.peer.Id = id
+
+	self.state = handshakeReceived
+
+	//p.ethereum.PushPeer(p)
+	// p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
+	return
+}
diff --git a/p2p/server.go b/p2p/server.go
new file mode 100644
index 000000000..a6bbd9260
--- /dev/null
+++ b/p2p/server.go
@@ -0,0 +1,484 @@
+package p2p
+
+import (
+	"bytes"
+	"fmt"
+	"net"
+	"sort"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/ethereum/eth-go/ethlog"
+)
+
+const (
+	outboundAddressPoolSize = 10
+	disconnectGracePeriod   = 2
+)
+
+type Blacklist interface {
+	Get([]byte) (bool, error)
+	Put([]byte) error
+	Delete([]byte) error
+	Exists(pubkey []byte) (ok bool)
+}
+
+type BlacklistMap struct {
+	blacklist map[string]bool
+	lock      sync.RWMutex
+}
+
+func NewBlacklist() *BlacklistMap {
+	return &BlacklistMap{
+		blacklist: make(map[string]bool),
+	}
+}
+
+func (self *BlacklistMap) Get(pubkey []byte) (bool, error) {
+	self.lock.RLock()
+	defer self.lock.RUnlock()
+	v, ok := self.blacklist[string(pubkey)]
+	var err error
+	if !ok {
+		err = fmt.Errorf("not found")
+	}
+	return v, err
+}
+
+func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) {
+	self.lock.RLock()
+	defer self.lock.RUnlock()
+	_, ok = self.blacklist[string(pubkey)]
+	return
+}
+
+func (self *BlacklistMap) Put(pubkey []byte) error {
+	self.lock.RLock()
+	defer self.lock.RUnlock()
+	self.blacklist[string(pubkey)] = true
+	return nil
+}
+
+func (self *BlacklistMap) Delete(pubkey []byte) error {
+	self.lock.RLock()
+	defer self.lock.RUnlock()
+	delete(self.blacklist, string(pubkey))
+	return nil
+}
+
+type Server struct {
+	network   Network
+	listening bool //needed?
+	dialing   bool //needed?
+	closed    bool
+	identity  ClientIdentity
+	addr      net.Addr
+	port      uint16
+	protocols []string
+
+	quit      chan chan bool
+	peersLock sync.RWMutex
+
+	maxPeers   int
+	peers      []*Peer
+	peerSlots  chan int
+	peersTable map[string]int
+	peersMsg   *Msg
+	peerCount  int
+
+	peerConnect    chan net.Addr
+	peerDisconnect chan DisconnectRequest
+	blacklist      Blacklist
+	handlers       Handlers
+}
+
+var logger = ethlog.NewLogger("P2P")
+
+func New(network Network, addr net.Addr, identity ClientIdentity, handlers Handlers, maxPeers int, blacklist Blacklist) *Server {
+	// get alphabetical list of protocol names from handlers map
+	protocols := []string{}
+	for protocol := range handlers {
+		protocols = append(protocols, protocol)
+	}
+	sort.Strings(protocols)
+
+	_, port, _ := net.SplitHostPort(addr.String())
+	intport, _ := strconv.Atoi(port)
+
+	self := &Server{
+		// NewSimpleClientIdentity(clientIdentifier, version, customIdentifier)
+		network:   network,
+		identity:  identity,
+		addr:      addr,
+		port:      uint16(intport),
+		protocols: protocols,
+
+		quit: make(chan chan bool),
+
+		maxPeers:   maxPeers,
+		peers:      make([]*Peer, maxPeers),
+		peerSlots:  make(chan int, maxPeers),
+		peersTable: make(map[string]int),
+
+		peerConnect:    make(chan net.Addr, outboundAddressPoolSize),
+		peerDisconnect: make(chan DisconnectRequest),
+		blacklist:      blacklist,
+
+		handlers: handlers,
+	}
+	for i := 0; i < maxPeers; i++ {
+		self.peerSlots <- i // fill up with indexes
+	}
+	return self
+}
+
+func (self *Server) NewAddr(host string, port int) (addr net.Addr, err error) {
+	addr, err = self.network.NewAddr(host, port)
+	return
+}
+
+func (self *Server) ParseAddr(address string) (addr net.Addr, err error) {
+	addr, err = self.network.ParseAddr(address)
+	return
+}
+
+func (self *Server) ClientIdentity() ClientIdentity {
+	return self.identity
+}
+
+func (self *Server) PeersMessage() (msg *Msg, err error) {
+	// TODO: memoize and reset when peers change
+	self.peersLock.RLock()
+	defer self.peersLock.RUnlock()
+	msg = self.peersMsg
+	if msg == nil {
+		var peerData []interface{}
+		for _, i := range self.peersTable {
+			peer := self.peers[i]
+			peerData = append(peerData, peer.Encode())
+		}
+		if len(peerData) == 0 {
+			err = fmt.Errorf("no peers")
+		} else {
+			msg, err = NewMsg(PeersMsg, peerData...)
+			self.peersMsg = msg //memoize
+		}
+	}
+	return
+}
+
+func (self *Server) Peers() (peers []*Peer) {
+	self.peersLock.RLock()
+	defer self.peersLock.RUnlock()
+	for _, peer := range self.peers {
+		if peer != nil {
+			peers = append(peers, peer)
+		}
+	}
+	return
+}
+
+func (self *Server) PeerCount() int {
+	self.peersLock.RLock()
+	defer self.peersLock.RUnlock()
+	return self.peerCount
+}
+
+var getPeersMsg, _ = NewMsg(GetPeersMsg)
+
+func (self *Server) PeerConnect(addr net.Addr) {
+	// TODO: should buffer, filter and uniq
+	// send GetPeersMsg if not blocking
+	select {
+	case self.peerConnect <- addr: // not enough peers
+		self.Broadcast("", getPeersMsg)
+	default: // we dont care
+	}
+}
+
+func (self *Server) PeerDisconnect() chan DisconnectRequest {
+	return self.peerDisconnect
+}
+
+func (self *Server) Blacklist() Blacklist {
+	return self.blacklist
+}
+
+func (self *Server) Handlers() Handlers {
+	return self.handlers
+}
+
+func (self *Server) Broadcast(protocol string, msg *Msg) {
+	self.peersLock.RLock()
+	defer self.peersLock.RUnlock()
+	for _, peer := range self.peers {
+		if peer != nil {
+			peer.Write(protocol, msg)
+		}
+	}
+}
+
+// Start the server
+func (self *Server) Start(listen bool, dial bool) {
+	self.network.Start()
+	if listen {
+		listener, err := self.network.Listener(self.addr)
+		if err != nil {
+			logger.Warnf("Error initializing listener: %v", err)
+			logger.Warnf("Connection listening disabled")
+			self.listening = false
+		} else {
+			self.listening = true
+			logger.Infoln("Listen on %v: ready and accepting connections", listener.Addr())
+			go self.inboundPeerHandler(listener)
+		}
+	}
+	if dial {
+		dialer, err := self.network.Dialer(self.addr)
+		if err != nil {
+			logger.Warnf("Error initializing dialer: %v", err)
+			logger.Warnf("Connection dialout disabled")
+			self.dialing = false
+		} else {
+			self.dialing = true
+			logger.Infoln("Dial peers watching outbound address pool")
+			go self.outboundPeerHandler(dialer)
+		}
+	}
+	logger.Infoln("server started")
+}
+
+func (self *Server) Stop() {
+	logger.Infoln("server stopping...")
+	// // quit one loop if dialing
+	if self.dialing {
+		logger.Infoln("stop dialout...")
+		dialq := make(chan bool)
+		self.quit <- dialq
+		<-dialq
+		fmt.Println("quit another")
+	}
+	// quit the other loop if listening
+	if self.listening {
+		logger.Infoln("stop listening...")
+		listenq := make(chan bool)
+		self.quit <- listenq
+		<-listenq
+		fmt.Println("quit one")
+	}
+
+	fmt.Println("quit waited")
+
+	logger.Infoln("stopping peers...")
+	peers := []net.Addr{}
+	self.peersLock.RLock()
+	self.closed = true
+	for _, peer := range self.peers {
+		if peer != nil {
+			peers = append(peers, peer.Address)
+		}
+	}
+	self.peersLock.RUnlock()
+	for _, address := range peers {
+		go self.removePeer(DisconnectRequest{
+			addr:   address,
+			reason: DiscQuitting,
+		})
+	}
+	// wait till they actually disconnect
+	// this is checked by draining the peerSlots (slots are released back if a peer is removed)
+	i := 0
+	fmt.Println("draining peers")
+
+FOR:
+	for {
+		select {
+		case slot := <-self.peerSlots:
+			i++
+			fmt.Printf("%v: found slot %v", i, slot)
+			if i == self.maxPeers {
+				break FOR
+			}
+		}
+	}
+	logger.Infoln("server stopped")
+}
+
+// main loop for adding connections via listening
+func (self *Server) inboundPeerHandler(listener net.Listener) {
+	for {
+		select {
+		case slot := <-self.peerSlots:
+			go self.connectInboundPeer(listener, slot)
+		case errc := <-self.quit:
+			listener.Close()
+			fmt.Println("quit listenloop")
+			errc <- true
+			return
+		}
+	}
+}
+
+// main loop for adding outbound peers based on peerConnect address pool
+// this same loop handles peer disconnect requests as well
+func (self *Server) outboundPeerHandler(dialer Dialer) {
+	// addressChan initially set to nil (only watches peerConnect if we need more peers)
+	var addressChan chan net.Addr
+	slots := self.peerSlots
+	var slot *int
+	for {
+		select {
+		case i := <-slots:
+			// we need a peer in slot i, slot reserved
+			slot = &i
+			// now we can watch for candidate peers in the next loop
+			addressChan = self.peerConnect
+			// do not consume more until candidate peer is found
+			slots = nil
+		case address := <-addressChan:
+			// candidate peer found, will dial out asyncronously
+			// if connection fails slot will be released
+			go self.connectOutboundPeer(dialer, address, *slot)
+			// we can watch if more peers needed in the next loop
+			slots = self.peerSlots
+			// until then we dont care about candidate peers
+			addressChan = nil
+		case request := <-self.peerDisconnect:
+			go self.removePeer(request)
+		case errc := <-self.quit:
+			if addressChan != nil && slot != nil {
+				self.peerSlots <- *slot
+			}
+			fmt.Println("quit dialloop")
+			errc <- true
+			return
+		}
+	}
+}
+
+// check if peer address already connected
+func (self *Server) connected(address net.Addr) (err error) {
+	self.peersLock.RLock()
+	defer self.peersLock.RUnlock()
+	// fmt.Printf("address: %v\n", address)
+	slot, found := self.peersTable[address.String()]
+	if found {
+		err = fmt.Errorf("already connected as peer %v (%v)", slot, address)
+	}
+	return
+}
+
+// connect to peer via listener.Accept()
+func (self *Server) connectInboundPeer(listener net.Listener, slot int) {
+	var address net.Addr
+	conn, err := listener.Accept()
+	if err == nil {
+		address = conn.RemoteAddr()
+		err = self.connected(address)
+		if err != nil {
+			conn.Close()
+		}
+	}
+	if err != nil {
+		logger.Debugln(err)
+		self.peerSlots <- slot
+	} else {
+		fmt.Printf("adding %v\n", address)
+		go self.addPeer(conn, address, true, slot)
+	}
+}
+
+// connect to peer via dial out
+func (self *Server) connectOutboundPeer(dialer Dialer, address net.Addr, slot int) {
+	var conn net.Conn
+	err := self.connected(address)
+	if err == nil {
+		conn, err = dialer.Dial(address.Network(), address.String())
+	}
+	if err != nil {
+		logger.Debugln(err)
+		self.peerSlots <- slot
+	} else {
+		go self.addPeer(conn, address, false, slot)
+	}
+}
+
+// creates the new peer object and inserts it into its slot
+func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) {
+	self.peersLock.Lock()
+	defer self.peersLock.Unlock()
+	if self.closed {
+		fmt.Println("oopsy, not no longer need peer")
+		conn.Close()           //oopsy our bad
+		self.peerSlots <- slot // release slot
+	} else {
+		peer := NewPeer(conn, address, inbound, self)
+		self.peers[slot] = peer
+		self.peersTable[address.String()] = slot
+		self.peerCount++
+		// reset peersmsg
+		self.peersMsg = nil
+		fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot)
+		peer.Start()
+	}
+}
+
+// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
+func (self *Server) removePeer(request DisconnectRequest) {
+	self.peersLock.Lock()
+
+	address := request.addr
+	slot := self.peersTable[address.String()]
+	peer := self.peers[slot]
+	fmt.Printf("removing peer %v %v (slot %v)\n", address, peer, slot)
+	if peer == nil {
+		logger.Debugf("already removed peer on %v", address)
+		self.peersLock.Unlock()
+		return
+	}
+	// remove from list and index
+	self.peerCount--
+	self.peers[slot] = nil
+	delete(self.peersTable, address.String())
+	// reset peersmsg
+	self.peersMsg = nil
+	fmt.Printf("removed peer %v (slot %v)\n", peer, slot)
+	self.peersLock.Unlock()
+
+	// sending disconnect message
+	disconnectMsg, _ := NewMsg(DiscMsg, request.reason)
+	peer.Write("", disconnectMsg)
+	// be nice and wait
+	time.Sleep(disconnectGracePeriod * time.Second)
+	// switch off peer and close connections etc.
+	fmt.Println("stopping peer")
+	peer.Stop()
+	fmt.Println("stopped peer")
+	// release slot to signal need for a new peer, last!
+	self.peerSlots <- slot
+}
+
+// fix handshake message to push to peers
+func (self *Server) Handshake() *Msg {
+	fmt.Println(self.identity.Pubkey()[1:])
+	msg, _ := NewMsg(HandshakeMsg, P2PVersion, []byte(self.identity.String()), []interface{}{self.protocols}, self.port, self.identity.Pubkey()[1:])
+	return msg
+}
+
+func (self *Server) RegisterPubkey(candidate *Peer, pubkey []byte) error {
+	// Check for blacklisting
+	if self.blacklist.Exists(pubkey) {
+		return fmt.Errorf("blacklisted")
+	}
+
+	self.peersLock.RLock()
+	defer self.peersLock.RUnlock()
+	for _, peer := range self.peers {
+		if peer != nil && peer != candidate && bytes.Compare(peer.Pubkey, pubkey) == 0 {
+			return fmt.Errorf("already connected")
+		}
+	}
+	candidate.Pubkey = pubkey
+	return nil
+}
diff --git a/p2p/server_test.go b/p2p/server_test.go
new file mode 100644
index 000000000..f749cc490
--- /dev/null
+++ b/p2p/server_test.go
@@ -0,0 +1,208 @@
+package p2p
+
+import (
+	"bytes"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+)
+
+type TestNetwork struct {
+	connections map[string]*TestNetworkConnection
+	dialer      Dialer
+	maxinbound  int
+}
+
+func NewTestNetwork(maxinbound int) *TestNetwork {
+	connections := make(map[string]*TestNetworkConnection)
+	return &TestNetwork{
+		connections: connections,
+		dialer:      &TestDialer{connections},
+		maxinbound:  maxinbound,
+	}
+}
+
+func (self *TestNetwork) Dialer(addr net.Addr) (Dialer, error) {
+	return self.dialer, nil
+}
+
+func (self *TestNetwork) Listener(addr net.Addr) (net.Listener, error) {
+	return &TestListener{
+		connections: self.connections,
+		addr:        addr,
+		max:         self.maxinbound,
+	}, nil
+}
+
+func (self *TestNetwork) Start() error {
+	return nil
+}
+
+func (self *TestNetwork) NewAddr(string, int) (addr net.Addr, err error) {
+	return
+}
+
+func (self *TestNetwork) ParseAddr(string) (addr net.Addr, err error) {
+	return
+}
+
+type TestAddr struct {
+	name string
+}
+
+func (self *TestAddr) String() string {
+	return self.name
+}
+
+func (*TestAddr) Network() string {
+	return "test"
+}
+
+type TestDialer struct {
+	connections map[string]*TestNetworkConnection
+}
+
+func (self *TestDialer) Dial(network string, addr string) (conn net.Conn, err error) {
+	address := &TestAddr{addr}
+	tconn := NewTestNetworkConnection(address)
+	self.connections[addr] = tconn
+	conn = net.Conn(tconn)
+	return
+}
+
+type TestListener struct {
+	connections map[string]*TestNetworkConnection
+	addr        net.Addr
+	max         int
+	i           int
+}
+
+func (self *TestListener) Accept() (conn net.Conn, err error) {
+	self.i++
+	if self.i > self.max {
+		err = fmt.Errorf("no more")
+	} else {
+		addr := &TestAddr{fmt.Sprintf("inboundpeer-%d", self.i)}
+		tconn := NewTestNetworkConnection(addr)
+		key := tconn.RemoteAddr().String()
+		self.connections[key] = tconn
+		conn = net.Conn(tconn)
+		fmt.Printf("accepted connection from: %v \n", addr)
+	}
+	return
+}
+
+func (self *TestListener) Close() error {
+	return nil
+}
+
+func (self *TestListener) Addr() net.Addr {
+	return self.addr
+}
+
+func SetupTestServer(handlers Handlers) (network *TestNetwork, server *Server) {
+	network = NewTestNetwork(1)
+	addr := &TestAddr{"test:30303"}
+	identity := NewSimpleClientIdentity("clientIdentifier", "version", "customIdentifier", "pubkey")
+	maxPeers := 2
+	if handlers == nil {
+		handlers = make(Handlers)
+	}
+	blackist := NewBlacklist()
+	server = New(network, addr, identity, handlers, maxPeers, blackist)
+	fmt.Println(server.identity.Pubkey())
+	return
+}
+
+func TestServerListener(t *testing.T) {
+	network, server := SetupTestServer(nil)
+	server.Start(true, false)
+	time.Sleep(10 * time.Millisecond)
+	server.Stop()
+	peer1, ok := network.connections["inboundpeer-1"]
+	if !ok {
+		t.Error("not found inbound peer 1")
+	} else {
+		fmt.Printf("out: %v\n", peer1.Out)
+		if len(peer1.Out) != 2 {
+			t.Errorf("not enough messages sent to peer 1: %v ", len(peer1.Out))
+		}
+	}
+
+}
+
+func TestServerDialer(t *testing.T) {
+	network, server := SetupTestServer(nil)
+	server.Start(false, true)
+	server.peerConnect <- &TestAddr{"outboundpeer-1"}
+	time.Sleep(10 * time.Millisecond)
+	server.Stop()
+	peer1, ok := network.connections["outboundpeer-1"]
+	if !ok {
+		t.Error("not found outbound peer 1")
+	} else {
+		fmt.Printf("out: %v\n", peer1.Out)
+		if len(peer1.Out) != 2 {
+			t.Errorf("not enough messages sent to peer 1: %v ", len(peer1.Out))
+		}
+	}
+}
+
+func TestServerBroadcast(t *testing.T) {
+	handlers := make(Handlers)
+	testProtocol := &TestProtocol{Msgs: []*Msg{}}
+	handlers["aaa"] = func(p *Peer) Protocol { return testProtocol }
+	network, server := SetupTestServer(handlers)
+	server.Start(true, true)
+	server.peerConnect <- &TestAddr{"outboundpeer-1"}
+	time.Sleep(10 * time.Millisecond)
+	msg, _ := NewMsg(0)
+	server.Broadcast("", msg)
+	packet := Packet(0, 0)
+	time.Sleep(10 * time.Millisecond)
+	server.Stop()
+	peer1, ok := network.connections["outboundpeer-1"]
+	if !ok {
+		t.Error("not found outbound peer 1")
+	} else {
+		fmt.Printf("out: %v\n", peer1.Out)
+		if len(peer1.Out) != 3 {
+			t.Errorf("not enough messages sent to peer 1: %v ", len(peer1.Out))
+		} else {
+			if bytes.Compare(peer1.Out[1], packet) != 0 {
+				t.Errorf("incorrect broadcast packet %v != %v", peer1.Out[1], packet)
+			}
+		}
+	}
+	peer2, ok := network.connections["inboundpeer-1"]
+	if !ok {
+		t.Error("not found inbound peer 2")
+	} else {
+		fmt.Printf("out: %v\n", peer2.Out)
+		if len(peer1.Out) != 3 {
+			t.Errorf("not enough messages sent to peer 2: %v ", len(peer2.Out))
+		} else {
+			if bytes.Compare(peer2.Out[1], packet) != 0 {
+				t.Errorf("incorrect broadcast packet %v != %v", peer2.Out[1], packet)
+			}
+		}
+	}
+}
+
+func TestServerPeersMessage(t *testing.T) {
+	handlers := make(Handlers)
+	_, server := SetupTestServer(handlers)
+	server.Start(true, true)
+	defer server.Stop()
+	server.peerConnect <- &TestAddr{"outboundpeer-1"}
+	time.Sleep(10 * time.Millisecond)
+	peersMsg, err := server.PeersMessage()
+	fmt.Println(peersMsg)
+	if err != nil {
+		t.Errorf("expect no error, got %v", err)
+	}
+	if c := server.PeerCount(); c != 2 {
+		t.Errorf("expect 2 peers, got %v", c)
+	}
+}
-- 
GitLab