diff --git a/cmd/devp2p/main.go b/cmd/devp2p/main.go
index 6064d17c7070ca51146be98aee3f2642ad969adb..9eebd9b1375baa94844eb231ea2cb378647e23d9 100644
--- a/cmd/devp2p/main.go
+++ b/cmd/devp2p/main.go
@@ -63,6 +63,7 @@ func init() {
 		discv5Command,
 		dnsCommand,
 		nodesetCommand,
+		rlpxCommand,
 	}
 }
 
diff --git a/cmd/devp2p/rlpxcmd.go b/cmd/devp2p/rlpxcmd.go
new file mode 100644
index 0000000000000000000000000000000000000000..14eb5989d1e38893c948afb83e3dc3d6ccbb3af0
--- /dev/null
+++ b/cmd/devp2p/rlpxcmd.go
@@ -0,0 +1,94 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+
+package main
+
+import (
+	"fmt"
+	"net"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/rlpx"
+	"github.com/ethereum/go-ethereum/rlp"
+	"gopkg.in/urfave/cli.v1"
+)
+
+var (
+	rlpxCommand = cli.Command{
+		Name:  "rlpx",
+		Usage: "RLPx Commands",
+		Subcommands: []cli.Command{
+			rlpxPingCommand,
+		},
+	}
+	rlpxPingCommand = cli.Command{
+		Name:      "ping",
+		Usage:     "Perform a RLPx handshake",
+		ArgsUsage: "<node>",
+		Action:    rlpxPing,
+	}
+)
+
+func rlpxPing(ctx *cli.Context) error {
+	n := getNodeArg(ctx)
+
+	fd, err := net.Dial("tcp", fmt.Sprintf("%v:%d", n.IP(), n.TCP()))
+	if err != nil {
+		return err
+	}
+	conn := rlpx.NewConn(fd, n.Pubkey())
+
+	ourKey, _ := crypto.GenerateKey()
+	_, err = conn.Handshake(ourKey)
+	if err != nil {
+		return err
+	}
+
+	code, data, _, err := conn.Read()
+	if err != nil {
+		return err
+	}
+	switch code {
+	case 0:
+		var h devp2pHandshake
+		if err := rlp.DecodeBytes(data, &h); err != nil {
+			return fmt.Errorf("invalid handshake: %v", err)
+		}
+		fmt.Printf("%+v\n", h)
+	case 1:
+		var msg []p2p.DiscReason
+		if rlp.DecodeBytes(data, &msg); len(msg) == 0 {
+			return fmt.Errorf("invalid disconnect message")
+		}
+		return fmt.Errorf("received disconnect message: %v", msg[0])
+	default:
+		return fmt.Errorf("invalid message code %d, expected handshake (code zero)", code)
+	}
+	return nil
+}
+
+// devp2pHandshake is the RLP structure of the devp2p protocol handshake.
+type devp2pHandshake struct {
+	Version    uint64
+	Name       string
+	Caps       []p2p.Cap
+	ListenPort uint64
+	ID         hexutil.Bytes // secp256k1 public key
+	// Ignore additional fields (for forward compatibility).
+	Rest []rlp.RawValue `rlp:"tail"`
+}
diff --git a/p2p/message_test.go b/p2p/message_test.go
index a01f7555616217b9266d87441995c5b281cd20ec..e575c5d96e66a7393b050ad9ca6b89373c3c5b84 100644
--- a/p2p/message_test.go
+++ b/p2p/message_test.go
@@ -18,11 +18,9 @@ package p2p
 
 import (
 	"bytes"
-	"encoding/hex"
 	"fmt"
 	"io"
 	"runtime"
-	"strings"
 	"testing"
 	"time"
 )
@@ -141,12 +139,3 @@ func TestEOFSignal(t *testing.T) {
 	default:
 	}
 }
-
-func unhex(str string) []byte {
-	r := strings.NewReplacer("\t", "", " ", "", "\n", "")
-	b, err := hex.DecodeString(r.Replace(str))
-	if err != nil {
-		panic(fmt.Sprintf("invalid hex string: %q", str))
-	}
-	return b
-}
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
index e40deb98f018fc18e7864d9046864441c0576bab..4308bbd2eb4b7251f8da9e09f0161588fb1aa147 100644
--- a/p2p/peer_test.go
+++ b/p2p/peer_test.go
@@ -86,9 +86,15 @@ func newNode(id enode.ID, addr string) *enode.Node {
 }
 
 func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) {
-	fd1, fd2 := net.Pipe()
-	c1 := &conn{fd: fd1, node: newNode(randomID(), ""), transport: newTestTransport(&newkey().PublicKey, fd1)}
-	c2 := &conn{fd: fd2, node: newNode(randomID(), ""), transport: newTestTransport(&newkey().PublicKey, fd2)}
+	var (
+		fd1, fd2   = net.Pipe()
+		key1, key2 = newkey(), newkey()
+		t1         = newTestTransport(&key2.PublicKey, fd1, nil)
+		t2         = newTestTransport(&key1.PublicKey, fd2, &key1.PublicKey)
+	)
+
+	c1 := &conn{fd: fd1, node: newNode(uintID(1), ""), transport: t1}
+	c2 := &conn{fd: fd2, node: newNode(uintID(2), ""), transport: t2}
 	for _, p := range protos {
 		c1.caps = append(c1.caps, p.cap())
 		c2.caps = append(c2.caps, p.cap())
@@ -173,9 +179,12 @@ func TestPeerPing(t *testing.T) {
 	}
 }
 
+// This test checks that a disconnect message sent by a peer is returned
+// as the error from Peer.run.
 func TestPeerDisconnect(t *testing.T) {
 	closer, rw, _, disc := testPeer(nil)
 	defer closer()
+
 	if err := SendItems(rw, discMsg, DiscQuitting); err != nil {
 		t.Fatal(err)
 	}
diff --git a/p2p/rlpx.go b/p2p/rlpx/rlpx.go
similarity index 63%
rename from p2p/rlpx.go
rename to p2p/rlpx/rlpx.go
index 4d903a08a0f64467c7433b7b37dcc46991518741..2021bf08be11cc62692cca07327b865b03df4a82 100644
--- a/p2p/rlpx.go
+++ b/p2p/rlpx/rlpx.go
@@ -14,7 +14,8 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-package p2p
+// Package rlpx implements the RLPx transport protocol.
+package rlpx
 
 import (
 	"bytes"
@@ -29,169 +30,312 @@ import (
 	"fmt"
 	"hash"
 	"io"
-	"io/ioutil"
 	mrand "math/rand"
 	"net"
-	"sync"
 	"time"
 
-	"github.com/ethereum/go-ethereum/common/bitutil"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/crypto/ecies"
-	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/golang/snappy"
 	"golang.org/x/crypto/sha3"
 )
 
-const (
-	maxUint24 = ^uint32(0) >> 8
-
-	sskLen = 16                     // ecies.MaxSharedKeyLength(pubKey) / 2
-	sigLen = crypto.SignatureLength // elliptic S256
-	pubLen = 64                     // 512 bit pubkey in uncompressed representation without format byte
-	shaLen = 32                     // hash length (for nonce etc)
-
-	authMsgLen  = sigLen + shaLen + pubLen + shaLen + 1
-	authRespLen = pubLen + shaLen + 1
-
-	eciesOverhead = 65 /* pubkey */ + 16 /* IV */ + 32 /* MAC */
-
-	encAuthMsgLen  = authMsgLen + eciesOverhead  // size of encrypted pre-EIP-8 initiator handshake
-	encAuthRespLen = authRespLen + eciesOverhead // size of encrypted pre-EIP-8 handshake reply
+// Conn is an RLPx network connection. It wraps a low-level network connection. The
+// underlying connection should not be used for other activity when it is wrapped by Conn.
+//
+// Before sending messages, a handshake must be performed by calling the Handshake method.
+// This type is not generally safe for concurrent use, but reading and writing of messages
+// may happen concurrently after the handshake.
+type Conn struct {
+	dialDest  *ecdsa.PublicKey
+	conn      net.Conn
+	handshake *handshakeState
+	snappy    bool
+}
 
-	// total timeout for encryption handshake and protocol
-	// handshake in both directions.
-	handshakeTimeout = 5 * time.Second
+type handshakeState struct {
+	enc cipher.Stream
+	dec cipher.Stream
 
-	// This is the timeout for sending the disconnect reason.
-	// This is shorter than the usual timeout because we don't want
-	// to wait if the connection is known to be bad anyway.
-	discWriteTimeout = 1 * time.Second
-)
-
-// errPlainMessageTooLarge is returned if a decompressed message length exceeds
-// the allowed 24 bits (i.e. length >= 16MB).
-var errPlainMessageTooLarge = errors.New("message length >= 16MB")
+	macCipher  cipher.Block
+	egressMAC  hash.Hash
+	ingressMAC hash.Hash
+}
 
-// rlpx is the transport protocol used by actual (non-test) connections.
-// It wraps the frame encoder with locks and read/write deadlines.
-type rlpx struct {
-	fd net.Conn
+// NewConn wraps the given network connection. If dialDest is non-nil, the connection
+// behaves as the initiator during the handshake.
+func NewConn(conn net.Conn, dialDest *ecdsa.PublicKey) *Conn {
+	return &Conn{
+		dialDest: dialDest,
+		conn:     conn,
+	}
+}
 
-	rmu, wmu sync.Mutex
-	rw       *rlpxFrameRW
+// SetSnappy enables or disables snappy compression of messages. This is usually called
+// after the devp2p Hello message exchange when the negotiated version indicates that
+// compression is available on both ends of the connection.
+func (c *Conn) SetSnappy(snappy bool) {
+	c.snappy = snappy
 }
 
-func newRLPX(fd net.Conn) transport {
-	fd.SetDeadline(time.Now().Add(handshakeTimeout))
-	return &rlpx{fd: fd}
+// SetReadDeadline sets the deadline for all future read operations.
+func (c *Conn) SetReadDeadline(time time.Time) error {
+	return c.conn.SetReadDeadline(time)
 }
 
-func (t *rlpx) ReadMsg() (Msg, error) {
-	t.rmu.Lock()
-	defer t.rmu.Unlock()
-	t.fd.SetReadDeadline(time.Now().Add(frameReadTimeout))
-	return t.rw.ReadMsg()
+// SetWriteDeadline sets the deadline for all future write operations.
+func (c *Conn) SetWriteDeadline(time time.Time) error {
+	return c.conn.SetWriteDeadline(time)
 }
 
-func (t *rlpx) WriteMsg(msg Msg) error {
-	t.wmu.Lock()
-	defer t.wmu.Unlock()
-	t.fd.SetWriteDeadline(time.Now().Add(frameWriteTimeout))
-	return t.rw.WriteMsg(msg)
+// SetDeadline sets the deadline for all future read and write operations.
+func (c *Conn) SetDeadline(time time.Time) error {
+	return c.conn.SetDeadline(time)
 }
 
-func (t *rlpx) close(err error) {
-	t.wmu.Lock()
-	defer t.wmu.Unlock()
-	// Tell the remote end why we're disconnecting if possible.
-	if t.rw != nil {
-		if r, ok := err.(DiscReason); ok && r != DiscNetworkError {
-			// rlpx tries to send DiscReason to disconnected peer
-			// if the connection is net.Pipe (in-memory simulation)
-			// it hangs forever, since net.Pipe does not implement
-			// a write deadline. Because of this only try to send
-			// the disconnect reason message if there is no error.
-			if err := t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout)); err == nil {
-				SendItems(t.rw, discMsg, r)
-			}
+// Read reads a message from the connection.
+func (c *Conn) Read() (code uint64, data []byte, wireSize int, err error) {
+	if c.handshake == nil {
+		panic("can't ReadMsg before handshake")
+	}
+
+	frame, err := c.handshake.readFrame(c.conn)
+	if err != nil {
+		return 0, nil, 0, err
+	}
+	code, data, err = rlp.SplitUint64(frame)
+	if err != nil {
+		return 0, nil, 0, fmt.Errorf("invalid message code: %v", err)
+	}
+	wireSize = len(data)
+
+	// If snappy is enabled, verify and decompress message.
+	if c.snappy {
+		var actualSize int
+		actualSize, err = snappy.DecodedLen(data)
+		if err != nil {
+			return code, nil, 0, err
+		}
+		if actualSize > maxUint24 {
+			return code, nil, 0, errPlainMessageTooLarge
 		}
+		data, err = snappy.Decode(nil, data)
 	}
-	t.fd.Close()
+	return code, data, wireSize, err
 }
 
-func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
-	// Writing our handshake happens concurrently, we prefer
-	// returning the handshake read error. If the remote side
-	// disconnects us early with a valid reason, we should return it
-	// as the error so it can be tracked elsewhere.
-	werr := make(chan error, 1)
-	go func() { werr <- Send(t.rw, handshakeMsg, our) }()
-	if their, err = readProtocolHandshake(t.rw); err != nil {
-		<-werr // make sure the write terminates too
+func (h *handshakeState) readFrame(conn io.Reader) ([]byte, error) {
+	// read the header
+	headbuf := make([]byte, 32)
+	if _, err := io.ReadFull(conn, headbuf); err != nil {
 		return nil, err
 	}
-	if err := <-werr; err != nil {
-		return nil, fmt.Errorf("write error: %v", err)
+
+	// verify header mac
+	shouldMAC := updateMAC(h.ingressMAC, h.macCipher, headbuf[:16])
+	if !hmac.Equal(shouldMAC, headbuf[16:]) {
+		return nil, errors.New("bad header MAC")
 	}
-	// If the protocol version supports Snappy encoding, upgrade immediately
-	t.rw.snappy = their.Version >= snappyProtocolVersion
+	h.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted
+	fsize := readInt24(headbuf)
+	// ignore protocol type for now
 
-	return their, nil
-}
+	// read the frame content
+	var rsize = fsize // frame size rounded up to 16 byte boundary
+	if padding := fsize % 16; padding > 0 {
+		rsize += 16 - padding
+	}
+	framebuf := make([]byte, rsize)
+	if _, err := io.ReadFull(conn, framebuf); err != nil {
+		return nil, err
+	}
 
-func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) {
-	msg, err := rw.ReadMsg()
-	if err != nil {
+	// read and validate frame MAC. we can re-use headbuf for that.
+	h.ingressMAC.Write(framebuf)
+	fmacseed := h.ingressMAC.Sum(nil)
+	if _, err := io.ReadFull(conn, headbuf[:16]); err != nil {
 		return nil, err
 	}
-	if msg.Size > baseProtocolMaxMsgSize {
-		return nil, fmt.Errorf("message too big")
+	shouldMAC = updateMAC(h.ingressMAC, h.macCipher, fmacseed)
+	if !hmac.Equal(shouldMAC, headbuf[:16]) {
+		return nil, errors.New("bad frame MAC")
+	}
+
+	// decrypt frame content
+	h.dec.XORKeyStream(framebuf, framebuf)
+	return framebuf[:fsize], nil
+}
+
+// Write writes a message to the connection.
+//
+// Write returns the written size of the message data. This may be less than or equal to
+// len(data) depending on whether snappy compression is enabled.
+func (c *Conn) Write(code uint64, data []byte) (uint32, error) {
+	if c.handshake == nil {
+		panic("can't WriteMsg before handshake")
 	}
-	if msg.Code == discMsg {
-		// Disconnect before protocol handshake is valid according to the
-		// spec and we send it ourself if the post-handshake checks fail.
-		// We can't return the reason directly, though, because it is echoed
-		// back otherwise. Wrap it in a string instead.
-		var reason [1]DiscReason
-		rlp.Decode(msg.Payload, &reason)
-		return nil, reason[0]
+	if len(data) > maxUint24 {
+		return 0, errPlainMessageTooLarge
 	}
-	if msg.Code != handshakeMsg {
-		return nil, fmt.Errorf("expected handshake, got %x", msg.Code)
+	if c.snappy {
+		data = snappy.Encode(nil, data)
 	}
-	var hs protoHandshake
-	if err := msg.Decode(&hs); err != nil {
-		return nil, err
+
+	wireSize := uint32(len(data))
+	err := c.handshake.writeFrame(c.conn, code, data)
+	return wireSize, err
+}
+
+func (h *handshakeState) writeFrame(conn io.Writer, code uint64, data []byte) error {
+	ptype, _ := rlp.EncodeToBytes(code)
+
+	// write header
+	headbuf := make([]byte, 32)
+	fsize := len(ptype) + len(data)
+	if fsize > maxUint24 {
+		return errPlainMessageTooLarge
+	}
+	putInt24(uint32(fsize), headbuf)
+	copy(headbuf[3:], zeroHeader)
+	h.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted
+
+	// write header MAC
+	copy(headbuf[16:], updateMAC(h.egressMAC, h.macCipher, headbuf[:16]))
+	if _, err := conn.Write(headbuf); err != nil {
+		return err
+	}
+
+	// write encrypted frame, updating the egress MAC hash with
+	// the data written to conn.
+	tee := cipher.StreamWriter{S: h.enc, W: io.MultiWriter(conn, h.egressMAC)}
+	if _, err := tee.Write(ptype); err != nil {
+		return err
+	}
+	if _, err := tee.Write(data); err != nil {
+		return err
 	}
-	if len(hs.ID) != 64 || !bitutil.TestBytes(hs.ID) {
-		return nil, DiscInvalidIdentity
+	if padding := fsize % 16; padding > 0 {
+		if _, err := tee.Write(zero16[:16-padding]); err != nil {
+			return err
+		}
 	}
-	return &hs, nil
+
+	// write frame MAC. egress MAC hash is up to date because
+	// frame content was written to it as well.
+	fmacseed := h.egressMAC.Sum(nil)
+	mac := updateMAC(h.egressMAC, h.macCipher, fmacseed)
+	_, err := conn.Write(mac)
+	return err
+}
+
+func readInt24(b []byte) uint32 {
+	return uint32(b[2]) | uint32(b[1])<<8 | uint32(b[0])<<16
 }
 
-// doEncHandshake runs the protocol handshake using authenticated
-// messages. the protocol handshake is the first authenticated message
-// and also verifies whether the encryption handshake 'worked' and the
-// remote side actually provided the right public key.
-func (t *rlpx) doEncHandshake(prv *ecdsa.PrivateKey, dial *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
+func putInt24(v uint32, b []byte) {
+	b[0] = byte(v >> 16)
+	b[1] = byte(v >> 8)
+	b[2] = byte(v)
+}
+
+// updateMAC reseeds the given hash with encrypted seed.
+// it returns the first 16 bytes of the hash sum after seeding.
+func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte {
+	aesbuf := make([]byte, aes.BlockSize)
+	block.Encrypt(aesbuf, mac.Sum(nil))
+	for i := range aesbuf {
+		aesbuf[i] ^= seed[i]
+	}
+	mac.Write(aesbuf)
+	return mac.Sum(nil)[:16]
+}
+
+// Handshake performs the handshake. This must be called before any data is written
+// or read from the connection.
+func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
 	var (
-		sec secrets
+		sec Secrets
 		err error
 	)
-	if dial == nil {
-		sec, err = receiverEncHandshake(t.fd, prv)
+	if c.dialDest != nil {
+		sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest)
 	} else {
-		sec, err = initiatorEncHandshake(t.fd, prv, dial)
+		sec, err = receiverEncHandshake(c.conn, prv)
 	}
 	if err != nil {
 		return nil, err
 	}
-	t.wmu.Lock()
-	t.rw = newRLPXFrameRW(t.fd, sec)
-	t.wmu.Unlock()
-	return sec.Remote.ExportECDSA(), nil
+	c.InitWithSecrets(sec)
+	return sec.remote, err
+}
+
+// InitWithSecrets injects connection secrets as if a handshake had
+// been performed. This cannot be called after the handshake.
+func (c *Conn) InitWithSecrets(sec Secrets) {
+	if c.handshake != nil {
+		panic("can't handshake twice")
+	}
+	macc, err := aes.NewCipher(sec.MAC)
+	if err != nil {
+		panic("invalid MAC secret: " + err.Error())
+	}
+	encc, err := aes.NewCipher(sec.AES)
+	if err != nil {
+		panic("invalid AES secret: " + err.Error())
+	}
+	// we use an all-zeroes IV for AES because the key used
+	// for encryption is ephemeral.
+	iv := make([]byte, encc.BlockSize())
+	c.handshake = &handshakeState{
+		enc:        cipher.NewCTR(encc, iv),
+		dec:        cipher.NewCTR(encc, iv),
+		macCipher:  macc,
+		egressMAC:  sec.EgressMAC,
+		ingressMAC: sec.IngressMAC,
+	}
+}
+
+// Close closes the underlying network connection.
+func (c *Conn) Close() error {
+	return c.conn.Close()
+}
+
+// Constants for the handshake.
+const (
+	maxUint24 = int(^uint32(0) >> 8)
+
+	sskLen = 16                     // ecies.MaxSharedKeyLength(pubKey) / 2
+	sigLen = crypto.SignatureLength // elliptic S256
+	pubLen = 64                     // 512 bit pubkey in uncompressed representation without format byte
+	shaLen = 32                     // hash length (for nonce etc)
+
+	authMsgLen  = sigLen + shaLen + pubLen + shaLen + 1
+	authRespLen = pubLen + shaLen + 1
+
+	eciesOverhead = 65 /* pubkey */ + 16 /* IV */ + 32 /* MAC */
+
+	encAuthMsgLen  = authMsgLen + eciesOverhead  // size of encrypted pre-EIP-8 initiator handshake
+	encAuthRespLen = authRespLen + eciesOverhead // size of encrypted pre-EIP-8 handshake reply
+)
+
+var (
+	// this is used in place of actual frame header data.
+	// TODO: replace this when Msg contains the protocol type code.
+	zeroHeader = []byte{0xC2, 0x80, 0x80}
+	// sixteen zero bytes
+	zero16 = make([]byte, 16)
+
+	// errPlainMessageTooLarge is returned if a decompressed message length exceeds
+	// the allowed 24 bits (i.e. length >= 16MB).
+	errPlainMessageTooLarge = errors.New("message length >= 16MB")
+)
+
+// Secrets represents the connection secrets which are negotiated during the handshake.
+type Secrets struct {
+	AES, MAC              []byte
+	EgressMAC, IngressMAC hash.Hash
+	remote                *ecdsa.PublicKey
 }
 
 // encHandshake contains the state of the encryption handshake.
@@ -203,15 +347,6 @@ type encHandshake struct {
 	remoteRandomPub      *ecies.PublicKey  // ecdhe-random-pubk
 }
 
-// secrets represents the connection secrets
-// which are negotiated during the encryption handshake.
-type secrets struct {
-	Remote                *ecies.PublicKey
-	AES, MAC              []byte
-	EgressMAC, IngressMAC hash.Hash
-	Token                 []byte
-}
-
 // RLPx v4 handshake auth (defined in EIP-8).
 type authMsgV4 struct {
 	gotPlain bool // whether read packet had plain format.
@@ -235,19 +370,85 @@ type authRespV4 struct {
 	Rest []rlp.RawValue `rlp:"tail"`
 }
 
+// receiverEncHandshake negotiates a session token on conn.
+// it should be called on the listening side of the connection.
+//
+// prv is the local client's private key.
+func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
+	authMsg := new(authMsgV4)
+	authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
+	if err != nil {
+		return s, err
+	}
+	h := new(encHandshake)
+	if err := h.handleAuthMsg(authMsg, prv); err != nil {
+		return s, err
+	}
+
+	authRespMsg, err := h.makeAuthResp()
+	if err != nil {
+		return s, err
+	}
+	var authRespPacket []byte
+	if authMsg.gotPlain {
+		authRespPacket, err = authRespMsg.sealPlain(h)
+	} else {
+		authRespPacket, err = sealEIP8(authRespMsg, h)
+	}
+	if err != nil {
+		return s, err
+	}
+	if _, err = conn.Write(authRespPacket); err != nil {
+		return s, err
+	}
+	return h.secrets(authPacket, authRespPacket)
+}
+
+func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error {
+	// Import the remote identity.
+	rpub, err := importPublicKey(msg.InitiatorPubkey[:])
+	if err != nil {
+		return err
+	}
+	h.initNonce = msg.Nonce[:]
+	h.remote = rpub
+
+	// Generate random keypair for ECDH.
+	// If a private key is already set, use it instead of generating one (for testing).
+	if h.randomPrivKey == nil {
+		h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Check the signature.
+	token, err := h.staticSharedSecret(prv)
+	if err != nil {
+		return err
+	}
+	signedMsg := xor(token, h.initNonce)
+	remoteRandomPub, err := crypto.Ecrecover(signedMsg, msg.Signature[:])
+	if err != nil {
+		return err
+	}
+	h.remoteRandomPub, _ = importPublicKey(remoteRandomPub)
+	return nil
+}
+
 // secrets is called after the handshake is completed.
 // It extracts the connection secrets from the handshake values.
-func (h *encHandshake) secrets(auth, authResp []byte) (secrets, error) {
+func (h *encHandshake) secrets(auth, authResp []byte) (Secrets, error) {
 	ecdheSecret, err := h.randomPrivKey.GenerateShared(h.remoteRandomPub, sskLen, sskLen)
 	if err != nil {
-		return secrets{}, err
+		return Secrets{}, err
 	}
 
 	// derive base secrets from ephemeral key agreement
 	sharedSecret := crypto.Keccak256(ecdheSecret, crypto.Keccak256(h.respNonce, h.initNonce))
 	aesSecret := crypto.Keccak256(ecdheSecret, sharedSecret)
-	s := secrets{
-		Remote: h.remote,
+	s := Secrets{
+		remote: h.remote.ExportECDSA(),
 		AES:    aesSecret,
 		MAC:    crypto.Keccak256(ecdheSecret, aesSecret),
 	}
@@ -278,7 +479,7 @@ func (h *encHandshake) staticSharedSecret(prv *ecdsa.PrivateKey) ([]byte, error)
 // it should be called on the dialing side of the connection.
 //
 // prv is the local client's private key.
-func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s secrets, err error) {
+func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {
 	h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
 	authMsg, err := h.makeAuthMsg(prv)
 	if err != nil {
@@ -288,6 +489,7 @@ func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ec
 	if err != nil {
 		return s, err
 	}
+
 	if _, err = conn.Write(authPacket); err != nil {
 		return s, err
 	}
@@ -342,72 +544,6 @@ func (h *encHandshake) handleAuthResp(msg *authRespV4) (err error) {
 	return err
 }
 
-// receiverEncHandshake negotiates a session token on conn.
-// it should be called on the listening side of the connection.
-//
-// prv is the local client's private key.
-func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s secrets, err error) {
-	authMsg := new(authMsgV4)
-	authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
-	if err != nil {
-		return s, err
-	}
-	h := new(encHandshake)
-	if err := h.handleAuthMsg(authMsg, prv); err != nil {
-		return s, err
-	}
-
-	authRespMsg, err := h.makeAuthResp()
-	if err != nil {
-		return s, err
-	}
-	var authRespPacket []byte
-	if authMsg.gotPlain {
-		authRespPacket, err = authRespMsg.sealPlain(h)
-	} else {
-		authRespPacket, err = sealEIP8(authRespMsg, h)
-	}
-	if err != nil {
-		return s, err
-	}
-	if _, err = conn.Write(authRespPacket); err != nil {
-		return s, err
-	}
-	return h.secrets(authPacket, authRespPacket)
-}
-
-func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error {
-	// Import the remote identity.
-	rpub, err := importPublicKey(msg.InitiatorPubkey[:])
-	if err != nil {
-		return err
-	}
-	h.initNonce = msg.Nonce[:]
-	h.remote = rpub
-
-	// Generate random keypair for ECDH.
-	// If a private key is already set, use it instead of generating one (for testing).
-	if h.randomPrivKey == nil {
-		h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
-		if err != nil {
-			return err
-		}
-	}
-
-	// Check the signature.
-	token, err := h.staticSharedSecret(prv)
-	if err != nil {
-		return err
-	}
-	signedMsg := xor(token, h.initNonce)
-	remoteRandomPub, err := crypto.Ecrecover(signedMsg, msg.Signature[:])
-	if err != nil {
-		return err
-	}
-	h.remoteRandomPub, _ = importPublicKey(remoteRandomPub)
-	return nil
-}
-
 func (h *encHandshake) makeAuthResp() (msg *authRespV4, err error) {
 	// Generate random nonce.
 	h.respNonce = make([]byte, shaLen)
@@ -531,201 +667,3 @@ func xor(one, other []byte) (xor []byte) {
 	}
 	return xor
 }
-
-var (
-	// this is used in place of actual frame header data.
-	// TODO: replace this when Msg contains the protocol type code.
-	zeroHeader = []byte{0xC2, 0x80, 0x80}
-	// sixteen zero bytes
-	zero16 = make([]byte, 16)
-)
-
-// rlpxFrameRW implements a simplified version of RLPx framing.
-// chunked messages are not supported and all headers are equal to
-// zeroHeader.
-//
-// rlpxFrameRW is not safe for concurrent use from multiple goroutines.
-type rlpxFrameRW struct {
-	conn io.ReadWriter
-	enc  cipher.Stream
-	dec  cipher.Stream
-
-	macCipher  cipher.Block
-	egressMAC  hash.Hash
-	ingressMAC hash.Hash
-
-	snappy bool
-}
-
-func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW {
-	macc, err := aes.NewCipher(s.MAC)
-	if err != nil {
-		panic("invalid MAC secret: " + err.Error())
-	}
-	encc, err := aes.NewCipher(s.AES)
-	if err != nil {
-		panic("invalid AES secret: " + err.Error())
-	}
-	// we use an all-zeroes IV for AES because the key used
-	// for encryption is ephemeral.
-	iv := make([]byte, encc.BlockSize())
-	return &rlpxFrameRW{
-		conn:       conn,
-		enc:        cipher.NewCTR(encc, iv),
-		dec:        cipher.NewCTR(encc, iv),
-		macCipher:  macc,
-		egressMAC:  s.EgressMAC,
-		ingressMAC: s.IngressMAC,
-	}
-}
-
-func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
-	ptype, _ := rlp.EncodeToBytes(msg.Code)
-
-	// if snappy is enabled, compress message now
-	if rw.snappy {
-		if msg.Size > maxUint24 {
-			return errPlainMessageTooLarge
-		}
-		payload, _ := ioutil.ReadAll(msg.Payload)
-		payload = snappy.Encode(nil, payload)
-
-		msg.Payload = bytes.NewReader(payload)
-		msg.Size = uint32(len(payload))
-	}
-	msg.meterSize = msg.Size
-	if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
-		m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode)
-		metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
-		metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1)
-	}
-	// write header
-	headbuf := make([]byte, 32)
-	fsize := uint32(len(ptype)) + msg.Size
-	if fsize > maxUint24 {
-		return errors.New("message size overflows uint24")
-	}
-	putInt24(fsize, headbuf) // TODO: check overflow
-	copy(headbuf[3:], zeroHeader)
-	rw.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted
-
-	// write header MAC
-	copy(headbuf[16:], updateMAC(rw.egressMAC, rw.macCipher, headbuf[:16]))
-	if _, err := rw.conn.Write(headbuf); err != nil {
-		return err
-	}
-
-	// write encrypted frame, updating the egress MAC hash with
-	// the data written to conn.
-	tee := cipher.StreamWriter{S: rw.enc, W: io.MultiWriter(rw.conn, rw.egressMAC)}
-	if _, err := tee.Write(ptype); err != nil {
-		return err
-	}
-	if _, err := io.Copy(tee, msg.Payload); err != nil {
-		return err
-	}
-	if padding := fsize % 16; padding > 0 {
-		if _, err := tee.Write(zero16[:16-padding]); err != nil {
-			return err
-		}
-	}
-
-	// write frame MAC. egress MAC hash is up to date because
-	// frame content was written to it as well.
-	fmacseed := rw.egressMAC.Sum(nil)
-	mac := updateMAC(rw.egressMAC, rw.macCipher, fmacseed)
-	_, err := rw.conn.Write(mac)
-	return err
-}
-
-func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) {
-	// read the header
-	headbuf := make([]byte, 32)
-	if _, err := io.ReadFull(rw.conn, headbuf); err != nil {
-		return msg, err
-	}
-	// verify header mac
-	shouldMAC := updateMAC(rw.ingressMAC, rw.macCipher, headbuf[:16])
-	if !hmac.Equal(shouldMAC, headbuf[16:]) {
-		return msg, errors.New("bad header MAC")
-	}
-	rw.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted
-	fsize := readInt24(headbuf)
-	// ignore protocol type for now
-
-	// read the frame content
-	var rsize = fsize // frame size rounded up to 16 byte boundary
-	if padding := fsize % 16; padding > 0 {
-		rsize += 16 - padding
-	}
-	framebuf := make([]byte, rsize)
-	if _, err := io.ReadFull(rw.conn, framebuf); err != nil {
-		return msg, err
-	}
-
-	// read and validate frame MAC. we can re-use headbuf for that.
-	rw.ingressMAC.Write(framebuf)
-	fmacseed := rw.ingressMAC.Sum(nil)
-	if _, err := io.ReadFull(rw.conn, headbuf[:16]); err != nil {
-		return msg, err
-	}
-	shouldMAC = updateMAC(rw.ingressMAC, rw.macCipher, fmacseed)
-	if !hmac.Equal(shouldMAC, headbuf[:16]) {
-		return msg, errors.New("bad frame MAC")
-	}
-
-	// decrypt frame content
-	rw.dec.XORKeyStream(framebuf, framebuf)
-
-	// decode message code
-	content := bytes.NewReader(framebuf[:fsize])
-	if err := rlp.Decode(content, &msg.Code); err != nil {
-		return msg, err
-	}
-	msg.Size = uint32(content.Len())
-	msg.meterSize = msg.Size
-	msg.Payload = content
-
-	// if snappy is enabled, verify and decompress message
-	if rw.snappy {
-		payload, err := ioutil.ReadAll(msg.Payload)
-		if err != nil {
-			return msg, err
-		}
-		size, err := snappy.DecodedLen(payload)
-		if err != nil {
-			return msg, err
-		}
-		if size > int(maxUint24) {
-			return msg, errPlainMessageTooLarge
-		}
-		payload, err = snappy.Decode(nil, payload)
-		if err != nil {
-			return msg, err
-		}
-		msg.Size, msg.Payload = uint32(size), bytes.NewReader(payload)
-	}
-	return msg, nil
-}
-
-// updateMAC reseeds the given hash with encrypted seed.
-// it returns the first 16 bytes of the hash sum after seeding.
-func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte {
-	aesbuf := make([]byte, aes.BlockSize)
-	block.Encrypt(aesbuf, mac.Sum(nil))
-	for i := range aesbuf {
-		aesbuf[i] ^= seed[i]
-	}
-	mac.Write(aesbuf)
-	return mac.Sum(nil)[:16]
-}
-
-func readInt24(b []byte) uint32 {
-	return uint32(b[2]) | uint32(b[1])<<8 | uint32(b[0])<<16
-}
-
-func putInt24(v uint32, b []byte) {
-	b[0] = byte(v >> 16)
-	b[1] = byte(v >> 8)
-	b[2] = byte(v)
-}
diff --git a/p2p/rlpx_test.go b/p2p/rlpx/rlpx_test.go
similarity index 58%
rename from p2p/rlpx_test.go
rename to p2p/rlpx/rlpx_test.go
index 3f686fe09f46b33e4e0ba3d4ecf230e60321b0cf..127a0181645494b1ecffcaf181a73cc6629a8756 100644
--- a/p2p/rlpx_test.go
+++ b/p2p/rlpx/rlpx_test.go
@@ -1,4 +1,4 @@
-// Copyright 2015 The go-ethereum Authors
+// Copyright 2020 The go-ethereum Authors
 // This file is part of the go-ethereum library.
 //
 // The go-ethereum library is free software: you can redistribute it and/or modify
@@ -14,298 +14,145 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-package p2p
+package rlpx
 
 import (
 	"bytes"
 	"crypto/ecdsa"
-	"crypto/rand"
-	"errors"
+	"encoding/hex"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"net"
 	"reflect"
 	"strings"
-	"sync"
 	"testing"
-	"time"
 
 	"github.com/davecgh/go-spew/spew"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/crypto/ecies"
-	"github.com/ethereum/go-ethereum/p2p/simulations/pipes"
 	"github.com/ethereum/go-ethereum/rlp"
-	"golang.org/x/crypto/sha3"
+	"github.com/stretchr/testify/assert"
 )
 
-func TestSharedSecret(t *testing.T) {
-	prv0, _ := crypto.GenerateKey() // = ecdsa.GenerateKey(crypto.S256(), rand.Reader)
-	pub0 := &prv0.PublicKey
-	prv1, _ := crypto.GenerateKey()
-	pub1 := &prv1.PublicKey
-
-	ss0, err := ecies.ImportECDSA(prv0).GenerateShared(ecies.ImportECDSAPublic(pub1), sskLen, sskLen)
-	if err != nil {
-		return
-	}
-	ss1, err := ecies.ImportECDSA(prv1).GenerateShared(ecies.ImportECDSAPublic(pub0), sskLen, sskLen)
-	if err != nil {
-		return
-	}
-	t.Logf("Secret:\n%v %x\n%v %x", len(ss0), ss0, len(ss0), ss1)
-	if !bytes.Equal(ss0, ss1) {
-		t.Errorf("don't match :(")
-	}
+type message struct {
+	code uint64
+	data []byte
+	err  error
 }
 
-func TestEncHandshake(t *testing.T) {
-	for i := 0; i < 10; i++ {
-		start := time.Now()
-		if err := testEncHandshake(nil); err != nil {
-			t.Fatalf("i=%d %v", i, err)
-		}
-		t.Logf("(without token) %d %v\n", i+1, time.Since(start))
-	}
-	for i := 0; i < 10; i++ {
-		tok := make([]byte, shaLen)
-		rand.Reader.Read(tok)
-		start := time.Now()
-		if err := testEncHandshake(tok); err != nil {
-			t.Fatalf("i=%d %v", i, err)
-		}
-		t.Logf("(with token) %d %v\n", i+1, time.Since(start))
-	}
+func TestHandshake(t *testing.T) {
+	p1, p2 := createPeers(t)
+	p1.Close()
+	p2.Close()
 }
 
-func testEncHandshake(token []byte) error {
-	type result struct {
-		side   string
-		pubkey *ecdsa.PublicKey
-		err    error
-	}
-	var (
-		prv0, _  = crypto.GenerateKey()
-		prv1, _  = crypto.GenerateKey()
-		fd0, fd1 = net.Pipe()
-		c0, c1   = newRLPX(fd0).(*rlpx), newRLPX(fd1).(*rlpx)
-		output   = make(chan result)
-	)
-
-	go func() {
-		r := result{side: "initiator"}
-		defer func() { output <- r }()
-		defer fd0.Close()
+// This test checks that messages can be sent and received through WriteMsg/ReadMsg.
+func TestReadWriteMsg(t *testing.T) {
+	peer1, peer2 := createPeers(t)
+	defer peer1.Close()
+	defer peer2.Close()
 
-		r.pubkey, r.err = c0.doEncHandshake(prv0, &prv1.PublicKey)
-		if r.err != nil {
-			return
-		}
-		if !reflect.DeepEqual(r.pubkey, &prv1.PublicKey) {
-			r.err = fmt.Errorf("remote pubkey mismatch: got %v, want: %v", r.pubkey, &prv1.PublicKey)
-		}
-	}()
-	go func() {
-		r := result{side: "receiver"}
-		defer func() { output <- r }()
-		defer fd1.Close()
-
-		r.pubkey, r.err = c1.doEncHandshake(prv1, nil)
-		if r.err != nil {
-			return
-		}
-		if !reflect.DeepEqual(r.pubkey, &prv0.PublicKey) {
-			r.err = fmt.Errorf("remote ID mismatch: got %v, want: %v", r.pubkey, &prv0.PublicKey)
-		}
-	}()
+	testCode := uint64(23)
+	testData := []byte("test")
+	checkMsgReadWrite(t, peer1, peer2, testCode, testData)
 
-	// wait for results from both sides
-	r1, r2 := <-output, <-output
-	if r1.err != nil {
-		return fmt.Errorf("%s side error: %v", r1.side, r1.err)
-	}
-	if r2.err != nil {
-		return fmt.Errorf("%s side error: %v", r2.side, r2.err)
-	}
-
-	// compare derived secrets
-	if !reflect.DeepEqual(c0.rw.egressMAC, c1.rw.ingressMAC) {
-		return fmt.Errorf("egress mac mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.egressMAC, c1.rw.ingressMAC)
-	}
-	if !reflect.DeepEqual(c0.rw.ingressMAC, c1.rw.egressMAC) {
-		return fmt.Errorf("ingress mac mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.ingressMAC, c1.rw.egressMAC)
-	}
-	if !reflect.DeepEqual(c0.rw.enc, c1.rw.enc) {
-		return fmt.Errorf("enc cipher mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.enc, c1.rw.enc)
-	}
-	if !reflect.DeepEqual(c0.rw.dec, c1.rw.dec) {
-		return fmt.Errorf("dec cipher mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.dec, c1.rw.dec)
-	}
-	return nil
+	t.Log("enabling snappy")
+	peer1.SetSnappy(true)
+	peer2.SetSnappy(true)
+	checkMsgReadWrite(t, peer1, peer2, testCode, testData)
 }
 
-func TestProtocolHandshake(t *testing.T) {
-	var (
-		prv0, _ = crypto.GenerateKey()
-		pub0    = crypto.FromECDSAPub(&prv0.PublicKey)[1:]
-		hs0     = &protoHandshake{Version: 3, ID: pub0, Caps: []Cap{{"a", 0}, {"b", 2}}}
-
-		prv1, _ = crypto.GenerateKey()
-		pub1    = crypto.FromECDSAPub(&prv1.PublicKey)[1:]
-		hs1     = &protoHandshake{Version: 3, ID: pub1, Caps: []Cap{{"c", 1}, {"d", 3}}}
-
-		wg sync.WaitGroup
-	)
+func checkMsgReadWrite(t *testing.T, p1, p2 *Conn, msgCode uint64, msgData []byte) {
+	// Set up the reader.
+	ch := make(chan message, 1)
+	go func() {
+		var msg message
+		msg.code, msg.data, _, msg.err = p1.Read()
+		ch <- msg
+	}()
 
-	fd0, fd1, err := pipes.TCPPipe()
+	// Write the message.
+	_, err := p2.Write(msgCode, msgData)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	wg.Add(2)
-	go func() {
-		defer wg.Done()
-		defer fd0.Close()
-		rlpx := newRLPX(fd0)
-		rpubkey, err := rlpx.doEncHandshake(prv0, &prv1.PublicKey)
-		if err != nil {
-			t.Errorf("dial side enc handshake failed: %v", err)
-			return
-		}
-		if !reflect.DeepEqual(rpubkey, &prv1.PublicKey) {
-			t.Errorf("dial side remote pubkey mismatch: got %v, want %v", rpubkey, &prv1.PublicKey)
-			return
-		}
+	// Check it was received correctly.
+	msg := <-ch
+	assert.Equal(t, msgCode, msg.code, "wrong message code returned from ReadMsg")
+	assert.Equal(t, msgData, msg.data, "wrong message data returned from ReadMsg")
+}
 
-		phs, err := rlpx.doProtoHandshake(hs0)
-		if err != nil {
-			t.Errorf("dial side proto handshake error: %v", err)
-			return
-		}
-		phs.Rest = nil
-		if !reflect.DeepEqual(phs, hs1) {
-			t.Errorf("dial side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs1))
-			return
-		}
-		rlpx.close(DiscQuitting)
-	}()
-	go func() {
-		defer wg.Done()
-		defer fd1.Close()
-		rlpx := newRLPX(fd1)
-		rpubkey, err := rlpx.doEncHandshake(prv1, nil)
-		if err != nil {
-			t.Errorf("listen side enc handshake failed: %v", err)
-			return
-		}
-		if !reflect.DeepEqual(rpubkey, &prv0.PublicKey) {
-			t.Errorf("listen side remote pubkey mismatch: got %v, want %v", rpubkey, &prv0.PublicKey)
-			return
-		}
+func createPeers(t *testing.T) (peer1, peer2 *Conn) {
+	conn1, conn2 := net.Pipe()
+	key1, key2 := newkey(), newkey()
+	peer1 = NewConn(conn1, &key2.PublicKey) // dialer
+	peer2 = NewConn(conn2, nil)             // listener
+	doHandshake(t, peer1, peer2, key1, key2)
+	return peer1, peer2
+}
 
-		phs, err := rlpx.doProtoHandshake(hs1)
+func doHandshake(t *testing.T, peer1, peer2 *Conn, key1, key2 *ecdsa.PrivateKey) {
+	keyChan := make(chan *ecdsa.PublicKey, 1)
+	go func() {
+		pubKey, err := peer2.Handshake(key2)
 		if err != nil {
-			t.Errorf("listen side proto handshake error: %v", err)
-			return
-		}
-		phs.Rest = nil
-		if !reflect.DeepEqual(phs, hs0) {
-			t.Errorf("listen side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs0))
-			return
-		}
-
-		if err := ExpectMsg(rlpx, discMsg, []DiscReason{DiscQuitting}); err != nil {
-			t.Errorf("error receiving disconnect: %v", err)
+			t.Errorf("peer2 could not do handshake: %v", err)
 		}
+		keyChan <- pubKey
 	}()
-	wg.Wait()
-}
 
-func TestProtocolHandshakeErrors(t *testing.T) {
-	tests := []struct {
-		code uint64
-		msg  interface{}
-		err  error
-	}{
-		{
-			code: discMsg,
-			msg:  []DiscReason{DiscQuitting},
-			err:  DiscQuitting,
-		},
-		{
-			code: 0x989898,
-			msg:  []byte{1},
-			err:  errors.New("expected handshake, got 989898"),
-		},
-		{
-			code: handshakeMsg,
-			msg:  make([]byte, baseProtocolMaxMsgSize+2),
-			err:  errors.New("message too big"),
-		},
-		{
-			code: handshakeMsg,
-			msg:  []byte{1, 2, 3},
-			err:  newPeerError(errInvalidMsg, "(code 0) (size 4) rlp: expected input list for p2p.protoHandshake"),
-		},
-		{
-			code: handshakeMsg,
-			msg:  &protoHandshake{Version: 3},
-			err:  DiscInvalidIdentity,
-		},
+	pubKey2, err := peer1.Handshake(key1)
+	if err != nil {
+		t.Errorf("peer1 could not do handshake: %v", err)
 	}
+	pubKey1 := <-keyChan
 
-	for i, test := range tests {
-		p1, p2 := MsgPipe()
-		go Send(p1, test.code, test.msg)
-		_, err := readProtocolHandshake(p2)
-		if !reflect.DeepEqual(err, test.err) {
-			t.Errorf("test %d: error mismatch: got %q, want %q", i, err, test.err)
-		}
+	// Confirm the handshake was successful.
+	if !reflect.DeepEqual(pubKey1, &key1.PublicKey) || !reflect.DeepEqual(pubKey2, &key2.PublicKey) {
+		t.Fatal("unsuccessful handshake")
 	}
 }
 
-func TestRLPXFrameFake(t *testing.T) {
-	buf := new(bytes.Buffer)
+// This test checks the frame data of written messages.
+func TestFrameReadWrite(t *testing.T) {
+	conn := NewConn(nil, nil)
 	hash := fakeHash([]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})
-	rw := newRLPXFrameRW(buf, secrets{
+	conn.InitWithSecrets(Secrets{
 		AES:        crypto.Keccak256(),
 		MAC:        crypto.Keccak256(),
 		IngressMAC: hash,
 		EgressMAC:  hash,
 	})
+	h := conn.handshake
 
 	golden := unhex(`
-00828ddae471818bb0bfa6b551d1cb42
-01010101010101010101010101010101
-ba628a4ba590cb43f7848f41c4382885
-01010101010101010101010101010101
-`)
-
-	// Check WriteMsg. This puts a message into the buffer.
-	if err := Send(rw, 8, []uint{1, 2, 3, 4}); err != nil {
+		00828ddae471818bb0bfa6b551d1cb42
+		01010101010101010101010101010101
+		ba628a4ba590cb43f7848f41c4382885
+		01010101010101010101010101010101
+	`)
+	msgCode := uint64(8)
+	msg := []uint{1, 2, 3, 4}
+	msgEnc, _ := rlp.EncodeToBytes(msg)
+
+	// Check writeFrame. The frame that's written should be equal to the test vector.
+	buf := new(bytes.Buffer)
+	if err := h.writeFrame(buf, msgCode, msgEnc); err != nil {
 		t.Fatalf("WriteMsg error: %v", err)
 	}
-	written := buf.Bytes()
-	if !bytes.Equal(written, golden) {
-		t.Fatalf("output mismatch:\n  got:  %x\n  want: %x", written, golden)
+	if !bytes.Equal(buf.Bytes(), golden) {
+		t.Fatalf("output mismatch:\n  got:  %x\n  want: %x", buf.Bytes(), golden)
 	}
 
-	// Check ReadMsg. It reads the message encoded by WriteMsg, which
-	// is equivalent to the golden message above.
-	msg, err := rw.ReadMsg()
+	// Check readFrame on the test vector.
+	content, err := h.readFrame(bytes.NewReader(golden))
 	if err != nil {
 		t.Fatalf("ReadMsg error: %v", err)
 	}
-	if msg.Size != 5 {
-		t.Errorf("msg size mismatch: got %d, want %d", msg.Size, 5)
-	}
-	if msg.Code != 8 {
-		t.Errorf("msg code mismatch: got %d, want %d", msg.Code, 8)
-	}
-	payload, _ := ioutil.ReadAll(msg.Payload)
-	wantPayload := unhex("C401020304")
-	if !bytes.Equal(payload, wantPayload) {
-		t.Errorf("msg payload mismatch:\ngot  %x\nwant %x", payload, wantPayload)
+	wantContent := unhex("08C401020304")
+	if !bytes.Equal(content, wantContent) {
+		t.Errorf("frame content mismatch:\ngot  %x\nwant %x", content, wantContent)
 	}
 }
 
@@ -314,66 +161,8 @@ type fakeHash []byte
 func (fakeHash) Write(p []byte) (int, error) { return len(p), nil }
 func (fakeHash) Reset()                      {}
 func (fakeHash) BlockSize() int              { return 0 }
-
-func (h fakeHash) Size() int           { return len(h) }
-func (h fakeHash) Sum(b []byte) []byte { return append(b, h...) }
-
-func TestRLPXFrameRW(t *testing.T) {
-	var (
-		aesSecret      = make([]byte, 16)
-		macSecret      = make([]byte, 16)
-		egressMACinit  = make([]byte, 32)
-		ingressMACinit = make([]byte, 32)
-	)
-	for _, s := range [][]byte{aesSecret, macSecret, egressMACinit, ingressMACinit} {
-		rand.Read(s)
-	}
-	conn := new(bytes.Buffer)
-
-	s1 := secrets{
-		AES:        aesSecret,
-		MAC:        macSecret,
-		EgressMAC:  sha3.NewLegacyKeccak256(),
-		IngressMAC: sha3.NewLegacyKeccak256(),
-	}
-	s1.EgressMAC.Write(egressMACinit)
-	s1.IngressMAC.Write(ingressMACinit)
-	rw1 := newRLPXFrameRW(conn, s1)
-
-	s2 := secrets{
-		AES:        aesSecret,
-		MAC:        macSecret,
-		EgressMAC:  sha3.NewLegacyKeccak256(),
-		IngressMAC: sha3.NewLegacyKeccak256(),
-	}
-	s2.EgressMAC.Write(ingressMACinit)
-	s2.IngressMAC.Write(egressMACinit)
-	rw2 := newRLPXFrameRW(conn, s2)
-
-	// send some messages
-	for i := 0; i < 10; i++ {
-		// write message into conn buffer
-		wmsg := []interface{}{"foo", "bar", strings.Repeat("test", i)}
-		err := Send(rw1, uint64(i), wmsg)
-		if err != nil {
-			t.Fatalf("WriteMsg error (i=%d): %v", i, err)
-		}
-
-		// read message that rw1 just wrote
-		msg, err := rw2.ReadMsg()
-		if err != nil {
-			t.Fatalf("ReadMsg error (i=%d): %v", i, err)
-		}
-		if msg.Code != uint64(i) {
-			t.Fatalf("msg code mismatch: got %d, want %d", msg.Code, i)
-		}
-		payload, _ := ioutil.ReadAll(msg.Payload)
-		wantPayload, _ := rlp.EncodeToBytes(wmsg)
-		if !bytes.Equal(payload, wantPayload) {
-			t.Fatalf("msg payload mismatch:\ngot  %x\nwant %x", payload, wantPayload)
-		}
-	}
-}
+func (h fakeHash) Size() int                 { return len(h) }
+func (h fakeHash) Sum(b []byte) []byte       { return append(b, h...) }
 
 type handshakeAuthTest struct {
 	input       string
@@ -598,3 +387,20 @@ func TestHandshakeForwardCompatibility(t *testing.T) {
 		t.Errorf("ingress-mac('foo') mismatch:\ngot %x\nwant %x", fooIngressHash, wantFooIngressHash)
 	}
 }
+
+func unhex(str string) []byte {
+	r := strings.NewReplacer("\t", "", " ", "", "\n", "")
+	b, err := hex.DecodeString(r.Replace(str))
+	if err != nil {
+		panic(fmt.Sprintf("invalid hex string: %q", str))
+	}
+	return b
+}
+
+func newkey() *ecdsa.PrivateKey {
+	key, err := crypto.GenerateKey()
+	if err != nil {
+		panic("couldn't generate key: " + err.Error())
+	}
+	return key
+}
diff --git a/p2p/server.go b/p2p/server.go
index 1fe5f3978923c5279d87857f9baabbf74a795cc7..a343f4320aca8997ebb8970832eecd482a37e6e3 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -166,7 +166,7 @@ type Server struct {
 
 	// Hooks for testing. These are useful because we can inhibit
 	// the whole protocol stack.
-	newTransport func(net.Conn) transport
+	newTransport func(net.Conn, *ecdsa.PublicKey) transport
 	newPeerHook  func(*Peer)
 	listenFunc   func(network, addr string) (net.Listener, error)
 
@@ -231,7 +231,7 @@ type conn struct {
 
 type transport interface {
 	// The two handshakes.
-	doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error)
+	doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error)
 	doProtoHandshake(our *protoHandshake) (*protoHandshake, error)
 	// The MsgReadWriter can only be used after the encryption
 	// handshake has completed. The code uses conn.id to track this
@@ -914,7 +914,13 @@ func (srv *Server) checkInboundConn(fd net.Conn, remoteIP net.IP) error {
 // as a peer. It returns when the connection has been added as a peer
 // or the handshakes have failed.
 func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
-	c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
+	c := &conn{fd: fd, flags: flags, cont: make(chan error)}
+	if dialDest == nil {
+		c.transport = srv.newTransport(fd, nil)
+	} else {
+		c.transport = srv.newTransport(fd, dialDest.Pubkey())
+	}
+
 	err := srv.setupConn(c, flags, dialDest)
 	if err != nil {
 		c.close(err)
@@ -943,16 +949,12 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
 	}
 
 	// Run the RLPx handshake.
-	remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey)
+	remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
 	if err != nil {
 		srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
 		return err
 	}
 	if dialDest != nil {
-		// For dialed connections, check that the remote public key matches.
-		if dialPubkey.X.Cmp(remotePubkey.X) != 0 || dialPubkey.Y.Cmp(remotePubkey.Y) != 0 {
-			return DiscUnexpectedIdentity
-		}
 		c.node = dialDest
 	} else {
 		c.node = nodeFromConn(remotePubkey, c.fd)
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 7dc344a67df5ddcc3942582afe00b6236df0fe1f..a5b3190aede23072ad4b83d6c8b50c8c4428426e 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -18,6 +18,7 @@ package p2p
 
 import (
 	"crypto/ecdsa"
+	"crypto/sha256"
 	"errors"
 	"io"
 	"math/rand"
@@ -31,28 +32,27 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/enr"
-	"golang.org/x/crypto/sha3"
+	"github.com/ethereum/go-ethereum/p2p/rlpx"
 )
 
 type testTransport struct {
-	rpub *ecdsa.PublicKey
-	*rlpx
-
+	*rlpxTransport
+	rpub     *ecdsa.PublicKey
 	closeErr error
 }
 
-func newTestTransport(rpub *ecdsa.PublicKey, fd net.Conn) transport {
-	wrapped := newRLPX(fd).(*rlpx)
-	wrapped.rw = newRLPXFrameRW(fd, secrets{
-		MAC:        zero16,
-		AES:        zero16,
-		IngressMAC: sha3.NewLegacyKeccak256(),
-		EgressMAC:  sha3.NewLegacyKeccak256(),
+func newTestTransport(rpub *ecdsa.PublicKey, fd net.Conn, dialDest *ecdsa.PublicKey) transport {
+	wrapped := newRLPX(fd, dialDest).(*rlpxTransport)
+	wrapped.conn.InitWithSecrets(rlpx.Secrets{
+		AES:        make([]byte, 16),
+		MAC:        make([]byte, 16),
+		EgressMAC:  sha256.New(),
+		IngressMAC: sha256.New(),
 	})
-	return &testTransport{rpub: rpub, rlpx: wrapped}
+	return &testTransport{rpub: rpub, rlpxTransport: wrapped}
 }
 
-func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
+func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
 	return c.rpub, nil
 }
 
@@ -62,7 +62,7 @@ func (c *testTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake,
 }
 
 func (c *testTransport) close(err error) {
-	c.rlpx.fd.Close()
+	c.conn.Close()
 	c.closeErr = err
 }
 
@@ -76,9 +76,11 @@ func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *
 		Logger:      testlog.Logger(t, log.LvlTrace),
 	}
 	server := &Server{
-		Config:       config,
-		newPeerHook:  pf,
-		newTransport: func(fd net.Conn) transport { return newTestTransport(remoteKey, fd) },
+		Config:      config,
+		newPeerHook: pf,
+		newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport {
+			return newTestTransport(remoteKey, fd, dialDest)
+		},
 	}
 	if err := server.Start(); err != nil {
 		t.Fatalf("Could not start server: %v", err)
@@ -253,7 +255,7 @@ func TestServerAtCap(t *testing.T) {
 
 	newconn := func(id enode.ID) *conn {
 		fd, _ := net.Pipe()
-		tx := newTestTransport(&trustedNode.PublicKey, fd)
+		tx := newTestTransport(&trustedNode.PublicKey, fd, nil)
 		node := enode.SignNull(new(enr.Record), id)
 		return &conn{fd: fd, transport: tx, flags: inboundConn, node: node, cont: make(chan error)}
 	}
@@ -321,7 +323,7 @@ func TestServerPeerLimits(t *testing.T) {
 			Protocols:   []Protocol{discard},
 			Logger:      testlog.Logger(t, log.LvlTrace),
 		},
-		newTransport: func(fd net.Conn) transport { return tp },
+		newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return tp },
 	}
 	if err := srv.Start(); err != nil {
 		t.Fatalf("couldn't start server: %v", err)
@@ -390,13 +392,6 @@ func TestServerSetupConn(t *testing.T) {
 			wantCalls:    "doEncHandshake,close,",
 			wantCloseErr: errors.New("read error"),
 		},
-		{
-			tt:           &setupTransport{pubkey: clientpub},
-			dialDest:     enode.NewV4(&newkey().PublicKey, nil, 0, 0),
-			flags:        dynDialedConn,
-			wantCalls:    "doEncHandshake,close,",
-			wantCloseErr: DiscUnexpectedIdentity,
-		},
 		{
 			tt:           &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}},
 			dialDest:     enode.NewV4(clientpub, nil, 0, 0),
@@ -437,7 +432,7 @@ func TestServerSetupConn(t *testing.T) {
 			}
 			srv := &Server{
 				Config:       cfg,
-				newTransport: func(fd net.Conn) transport { return test.tt },
+				newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return test.tt },
 				log:          cfg.Logger,
 			}
 			if !test.dontstart {
@@ -468,7 +463,7 @@ type setupTransport struct {
 	closeErr error
 }
 
-func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
+func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
 	c.calls += "doEncHandshake,"
 	return c.pubkey, c.encHandshakeErr
 }
@@ -522,9 +517,9 @@ func TestServerInboundThrottle(t *testing.T) {
 			Protocols:   []Protocol{discard},
 			Logger:      testlog.Logger(t, log.LvlTrace),
 		},
-		newTransport: func(fd net.Conn) transport {
+		newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport {
 			newTransportCalled <- struct{}{}
-			return newRLPX(fd)
+			return newRLPX(fd, dialDest)
 		},
 		listenFunc: func(network, laddr string) (net.Listener, error) {
 			fakeAddr := &net.TCPAddr{IP: net.IP{95, 33, 21, 2}, Port: 4444}
diff --git a/p2p/transport.go b/p2p/transport.go
new file mode 100644
index 0000000000000000000000000000000000000000..3f1cd7d64f20211f4bfb73f29fb0b069002ca74e
--- /dev/null
+++ b/p2p/transport.go
@@ -0,0 +1,177 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package p2p
+
+import (
+	"bytes"
+	"crypto/ecdsa"
+	"fmt"
+	"io"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/bitutil"
+	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/ethereum/go-ethereum/p2p/rlpx"
+	"github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+	// total timeout for encryption handshake and protocol
+	// handshake in both directions.
+	handshakeTimeout = 5 * time.Second
+
+	// This is the timeout for sending the disconnect reason.
+	// This is shorter than the usual timeout because we don't want
+	// to wait if the connection is known to be bad anyway.
+	discWriteTimeout = 1 * time.Second
+)
+
+// rlpxTransport is the transport used by actual (non-test) connections.
+// It wraps an RLPx connection with locks and read/write deadlines.
+type rlpxTransport struct {
+	rmu, wmu sync.Mutex
+	wbuf     bytes.Buffer
+	conn     *rlpx.Conn
+}
+
+func newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
+	return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
+}
+
+func (t *rlpxTransport) ReadMsg() (Msg, error) {
+	t.rmu.Lock()
+	defer t.rmu.Unlock()
+
+	var msg Msg
+	t.conn.SetReadDeadline(time.Now().Add(frameReadTimeout))
+	code, data, wireSize, err := t.conn.Read()
+	if err == nil {
+		msg = Msg{
+			ReceivedAt: time.Now(),
+			Code:       code,
+			Size:       uint32(len(data)),
+			meterSize:  uint32(wireSize),
+			Payload:    bytes.NewReader(data),
+		}
+	}
+	return msg, err
+}
+
+func (t *rlpxTransport) WriteMsg(msg Msg) error {
+	t.wmu.Lock()
+	defer t.wmu.Unlock()
+
+	// Copy message data to write buffer.
+	t.wbuf.Reset()
+	if _, err := io.CopyN(&t.wbuf, msg.Payload, int64(msg.Size)); err != nil {
+		return err
+	}
+
+	// Write the message.
+	t.conn.SetWriteDeadline(time.Now().Add(frameWriteTimeout))
+	size, err := t.conn.Write(msg.Code, t.wbuf.Bytes())
+	if err != nil {
+		return err
+	}
+
+	// Set metrics.
+	msg.meterSize = size
+	if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
+		m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode)
+		metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
+		metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1)
+	}
+	return nil
+}
+
+func (t *rlpxTransport) close(err error) {
+	t.wmu.Lock()
+	defer t.wmu.Unlock()
+
+	// Tell the remote end why we're disconnecting if possible.
+	// We only bother doing this if the underlying connection supports
+	// setting a timeout tough.
+	if t.conn != nil {
+		if r, ok := err.(DiscReason); ok && r != DiscNetworkError {
+			deadline := time.Now().Add(discWriteTimeout)
+			if err := t.conn.SetWriteDeadline(deadline); err == nil {
+				// Connection supports write deadline.
+				t.wbuf.Reset()
+				rlp.Encode(&t.wbuf, []DiscReason{r})
+				t.conn.Write(discMsg, t.wbuf.Bytes())
+			}
+		}
+	}
+	t.conn.Close()
+}
+
+func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
+	t.conn.SetDeadline(time.Now().Add(handshakeTimeout))
+	return t.conn.Handshake(prv)
+}
+
+func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
+	// Writing our handshake happens concurrently, we prefer
+	// returning the handshake read error. If the remote side
+	// disconnects us early with a valid reason, we should return it
+	// as the error so it can be tracked elsewhere.
+	werr := make(chan error, 1)
+	go func() { werr <- Send(t, handshakeMsg, our) }()
+	if their, err = readProtocolHandshake(t); err != nil {
+		<-werr // make sure the write terminates too
+		return nil, err
+	}
+	if err := <-werr; err != nil {
+		return nil, fmt.Errorf("write error: %v", err)
+	}
+	// If the protocol version supports Snappy encoding, upgrade immediately
+	t.conn.SetSnappy(their.Version >= snappyProtocolVersion)
+
+	return their, nil
+}
+
+func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) {
+	msg, err := rw.ReadMsg()
+	if err != nil {
+		return nil, err
+	}
+	if msg.Size > baseProtocolMaxMsgSize {
+		return nil, fmt.Errorf("message too big")
+	}
+	if msg.Code == discMsg {
+		// Disconnect before protocol handshake is valid according to the
+		// spec and we send it ourself if the post-handshake checks fail.
+		// We can't return the reason directly, though, because it is echoed
+		// back otherwise. Wrap it in a string instead.
+		var reason [1]DiscReason
+		rlp.Decode(msg.Payload, &reason)
+		return nil, reason[0]
+	}
+	if msg.Code != handshakeMsg {
+		return nil, fmt.Errorf("expected handshake, got %x", msg.Code)
+	}
+	var hs protoHandshake
+	if err := msg.Decode(&hs); err != nil {
+		return nil, err
+	}
+	if len(hs.ID) != 64 || !bitutil.TestBytes(hs.ID) {
+		return nil, DiscInvalidIdentity
+	}
+	return &hs, nil
+}
diff --git a/p2p/transport_test.go b/p2p/transport_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..753ea30bf196ef09cdcb70c3ae868b23a9e5fbb0
--- /dev/null
+++ b/p2p/transport_test.go
@@ -0,0 +1,148 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package p2p
+
+import (
+	"errors"
+	"reflect"
+	"sync"
+	"testing"
+
+	"github.com/davecgh/go-spew/spew"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/p2p/simulations/pipes"
+)
+
+func TestProtocolHandshake(t *testing.T) {
+	var (
+		prv0, _ = crypto.GenerateKey()
+		pub0    = crypto.FromECDSAPub(&prv0.PublicKey)[1:]
+		hs0     = &protoHandshake{Version: 3, ID: pub0, Caps: []Cap{{"a", 0}, {"b", 2}}}
+
+		prv1, _ = crypto.GenerateKey()
+		pub1    = crypto.FromECDSAPub(&prv1.PublicKey)[1:]
+		hs1     = &protoHandshake{Version: 3, ID: pub1, Caps: []Cap{{"c", 1}, {"d", 3}}}
+
+		wg sync.WaitGroup
+	)
+
+	fd0, fd1, err := pipes.TCPPipe()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		defer fd0.Close()
+		frame := newRLPX(fd0, &prv1.PublicKey)
+		rpubkey, err := frame.doEncHandshake(prv0)
+		if err != nil {
+			t.Errorf("dial side enc handshake failed: %v", err)
+			return
+		}
+		if !reflect.DeepEqual(rpubkey, &prv1.PublicKey) {
+			t.Errorf("dial side remote pubkey mismatch: got %v, want %v", rpubkey, &prv1.PublicKey)
+			return
+		}
+
+		phs, err := frame.doProtoHandshake(hs0)
+		if err != nil {
+			t.Errorf("dial side proto handshake error: %v", err)
+			return
+		}
+		phs.Rest = nil
+		if !reflect.DeepEqual(phs, hs1) {
+			t.Errorf("dial side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs1))
+			return
+		}
+		frame.close(DiscQuitting)
+	}()
+	go func() {
+		defer wg.Done()
+		defer fd1.Close()
+		rlpx := newRLPX(fd1, nil)
+		rpubkey, err := rlpx.doEncHandshake(prv1)
+		if err != nil {
+			t.Errorf("listen side enc handshake failed: %v", err)
+			return
+		}
+		if !reflect.DeepEqual(rpubkey, &prv0.PublicKey) {
+			t.Errorf("listen side remote pubkey mismatch: got %v, want %v", rpubkey, &prv0.PublicKey)
+			return
+		}
+
+		phs, err := rlpx.doProtoHandshake(hs1)
+		if err != nil {
+			t.Errorf("listen side proto handshake error: %v", err)
+			return
+		}
+		phs.Rest = nil
+		if !reflect.DeepEqual(phs, hs0) {
+			t.Errorf("listen side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs0))
+			return
+		}
+
+		if err := ExpectMsg(rlpx, discMsg, []DiscReason{DiscQuitting}); err != nil {
+			t.Errorf("error receiving disconnect: %v", err)
+		}
+	}()
+	wg.Wait()
+}
+
+func TestProtocolHandshakeErrors(t *testing.T) {
+	tests := []struct {
+		code uint64
+		msg  interface{}
+		err  error
+	}{
+		{
+			code: discMsg,
+			msg:  []DiscReason{DiscQuitting},
+			err:  DiscQuitting,
+		},
+		{
+			code: 0x989898,
+			msg:  []byte{1},
+			err:  errors.New("expected handshake, got 989898"),
+		},
+		{
+			code: handshakeMsg,
+			msg:  make([]byte, baseProtocolMaxMsgSize+2),
+			err:  errors.New("message too big"),
+		},
+		{
+			code: handshakeMsg,
+			msg:  []byte{1, 2, 3},
+			err:  newPeerError(errInvalidMsg, "(code 0) (size 4) rlp: expected input list for p2p.protoHandshake"),
+		},
+		{
+			code: handshakeMsg,
+			msg:  &protoHandshake{Version: 3},
+			err:  DiscInvalidIdentity,
+		},
+	}
+
+	for i, test := range tests {
+		p1, p2 := MsgPipe()
+		go Send(p1, test.code, test.msg)
+		_, err := readProtocolHandshake(p2)
+		if !reflect.DeepEqual(err, test.err) {
+			t.Errorf("test %d: error mismatch: got %q, want %q", i, err, test.err)
+		}
+	}
+}