diff --git a/p2p/message.go b/p2p/message.go
index d3b8b74d475a934e349c81b7e044a6bc8c7f94ed..f5418ff473d5c7869e07d084b2c3746d78be6a13 100644
--- a/p2p/message.go
+++ b/p2p/message.go
@@ -3,9 +3,11 @@ package p2p
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"io"
 	"io/ioutil"
 	"math/big"
+	"sync/atomic"
 
 	"github.com/ethereum/go-ethereum/ethutil"
 	"github.com/ethereum/go-ethereum/rlp"
@@ -153,3 +155,78 @@ func (r *postrack) ReadByte() (byte, error) {
 	}
 	return b, err
 }
+
+// MsgPipe creates a message pipe. Reads on one end are matched
+// with writes on the other. The pipe is full-duplex, both ends
+// implement MsgReadWriter.
+func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
+	var (
+		c1, c2  = make(chan Msg), make(chan Msg)
+		closing = make(chan struct{})
+		closed  = new(int32)
+		rw1     = &MsgPipeRW{c1, c2, closing, closed}
+		rw2     = &MsgPipeRW{c2, c1, closing, closed}
+	)
+	return rw1, rw2
+}
+
+// ErrPipeClosed is returned from pipe operations after the
+// pipe has been closed.
+var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe")
+
+// MsgPipeRW is an endpoint of a MsgReadWriter pipe.
+type MsgPipeRW struct {
+	w       chan<- Msg
+	r       <-chan Msg
+	closing chan struct{}
+	closed  *int32
+}
+
+// WriteMsg sends a messsage on the pipe.
+// It blocks until the receiver has consumed the message payload.
+func (p *MsgPipeRW) WriteMsg(msg Msg) error {
+	if atomic.LoadInt32(p.closed) == 0 {
+		consumed := make(chan struct{}, 1)
+		msg.Payload = &eofSignal{msg.Payload, int64(msg.Size), consumed}
+		select {
+		case p.w <- msg:
+			if msg.Size > 0 {
+				// wait for payload read or discard
+				<-consumed
+			}
+			return nil
+		case <-p.closing:
+		}
+	}
+	return ErrPipeClosed
+}
+
+// EncodeMsg is a convenient shorthand for sending an RLP-encoded message.
+func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error {
+	return p.WriteMsg(NewMsg(code, data...))
+}
+
+// ReadMsg returns a message sent on the other end of the pipe.
+func (p *MsgPipeRW) ReadMsg() (Msg, error) {
+	if atomic.LoadInt32(p.closed) == 0 {
+		select {
+		case msg := <-p.r:
+			return msg, nil
+		case <-p.closing:
+		}
+	}
+	return Msg{}, ErrPipeClosed
+}
+
+// Close unblocks any pending ReadMsg and WriteMsg calls on both ends
+// of the pipe. They will return ErrPipeClosed. Note that Close does
+// not interrupt any reads from a message payload.
+func (p *MsgPipeRW) Close() error {
+	if atomic.AddInt32(p.closed, 1) != 1 {
+		// someone else is already closing
+		atomic.StoreInt32(p.closed, 1) // avoid overflow
+		return nil
+	}
+	close(p.closing)
+	return nil
+}
diff --git a/p2p/message_test.go b/p2p/message_test.go
index 7b39b061db034fa3388462763f7fc71a68b6ab7e..0fbcfeef0f42ab65e583a8e23263c0f3a979a0ff 100644
--- a/p2p/message_test.go
+++ b/p2p/message_test.go
@@ -2,8 +2,11 @@ package p2p
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
+	"runtime"
 	"testing"
+	"time"
 
 	"github.com/ethereum/go-ethereum/ethutil"
 )
@@ -68,3 +71,63 @@ func TestDecodeRealMsg(t *testing.T) {
 		t.Errorf("incorrect code %d, want %d", msg.Code, 0)
 	}
 }
+
+func ExampleMsgPipe() {
+	rw1, rw2 := MsgPipe()
+	go func() {
+		rw1.EncodeMsg(8, []byte{0, 0})
+		rw1.EncodeMsg(5, []byte{1, 1})
+		rw1.Close()
+	}()
+
+	for {
+		msg, err := rw2.ReadMsg()
+		if err != nil {
+			break
+		}
+		var data [1][]byte
+		msg.Decode(&data)
+		fmt.Printf("msg: %d, %x\n", msg.Code, data[0])
+	}
+	// Output:
+	// msg: 8, 0000
+	// msg: 5, 0101
+}
+
+func TestMsgPipeUnblockWrite(t *testing.T) {
+loop:
+	for i := 0; i < 100; i++ {
+		rw1, rw2 := MsgPipe()
+		done := make(chan struct{})
+		go func() {
+			if err := rw1.EncodeMsg(1); err == nil {
+				t.Error("EncodeMsg returned nil error")
+			} else if err != ErrPipeClosed {
+				t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed)
+			}
+			close(done)
+		}()
+
+		// this call should ensure that EncodeMsg is waiting to
+		// deliver sometimes. if this isn't done, Close is likely to
+		// be executed before EncodeMsg starts and then we won't test
+		// all the cases.
+		runtime.Gosched()
+
+		rw2.Close()
+		select {
+		case <-done:
+		case <-time.After(200 * time.Millisecond):
+			t.Errorf("write didn't unblock")
+			break loop
+		}
+	}
+}
+
+// This test should panic if concurrent close isn't implemented correctly.
+func TestMsgPipeConcurrentClose(t *testing.T) {
+	rw1, _ := MsgPipe()
+	for i := 0; i < 10; i++ {
+		go rw1.Close()
+	}
+}