diff --git a/README.md b/README.md
index 06263e4c15a174394e370da3d5a1e647fe9dd5e6..9c44e4e1445f99cf27aee868cf2b16af6fd69641 100644
--- a/README.md
+++ b/README.md
@@ -22,12 +22,11 @@ go get nhooyr.io/websocket@v0.2.0
 - Zero dependencies outside of the stdlib for the core library
 - JSON and ProtoBuf helpers in the wsjson and wspb subpackages
 - High performance
-- Concurrent writes
+- Concurrent reads and writes out of the box
 
 ## Roadmap
 
 - [ ] WebSockets over HTTP/2 [#4](https://github.com/nhooyr/websocket/issues/4)
-- [ ] Deflate extension support [#5](https://github.com/nhooyr/websocket/issues/5)
 
 ## Examples
 
@@ -86,11 +85,11 @@ c.Close(websocket.StatusNormalClosure, "")
 - A minimal API is easier to maintain due to less docs, tests and bugs
 - A minimal API is also easier to use and learn
 - Context based cancellation is more ergonomic and robust than setting deadlines
-- No ping support because TCP keep alives work fine for HTTP/1.1 and they do not make
-  sense with HTTP/2 (see [#1](https://github.com/nhooyr/websocket/issues/1))
 - net.Conn is never exposed as WebSocket over HTTP/2 will not have a net.Conn.
 - Using net/http's Client for dialing means we do not have to reinvent dialing hooks
   and configurations like other WebSocket libraries
+- We do not support the compression extension because Go's compress/flate library is very memory intensive
+  and browsers do not handle WebSocket compression intelligently. See [#5](https://github.com/nhooyr/websocket/issues/5)
 
 ## Comparison
 
@@ -122,8 +121,8 @@ also uses net/http's Client and ResponseWriter directly for WebSocket handshakes
 gorilla/websocket writes its handshakes to the underlying net.Conn which means
 it has to reinvent hooks for TLS and proxies and prevents support of HTTP/2.
 
-Some more advantages of nhooyr/websocket are that it supports concurrent writes and makes it
-very easy to close the connection with a status code and reason.
+Some more advantages of nhooyr/websocket are that it supports concurrent reads
+and writes, and makes it very easy to close the connection with a status code and reason.
 
 In terms of performance, the only difference is nhooyr/websocket is forced to use one extra
 goroutine for context.Context support. Otherwise, they perform identically.
diff --git a/accept.go b/accept.go
index 2cf1dc017a11a56473e7da7c8184cc127a637109..a80f70aa97aeaf0815088cb72f1a4b3246c160a9 100644
--- a/accept.go
+++ b/accept.go
@@ -1,8 +1,10 @@
 package websocket
 
 import (
+	"bytes"
 	"crypto/sha1"
 	"encoding/base64"
+	"io"
 	"net/http"
 	"net/textproto"
 	"net/url"
@@ -75,8 +77,12 @@ func verifyClientRequest(w http.ResponseWriter, r *http.Request) error {
 
 // Accept accepts a WebSocket handshake from a client and upgrades the
 // the connection to WebSocket.
+//
 // Accept will reject the handshake if the Origin domain is not the same as the Host unless
 // the InsecureSkipVerify option is set.
+//
+// The returned connection will be bound by r.Context(). Use c.Context() to change
+// the bounding context.
 func Accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn, error) {
 	c, err := accept(w, r, opts)
 	if err != nil {
@@ -125,6 +131,10 @@ func accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn,
 		return nil, err
 	}
 
+	// https://github.com/golang/go/issues/32314
+	b, _ := brw.Reader.Peek(brw.Reader.Buffered())
+	brw.Reader.Reset(io.MultiReader(bytes.NewReader(b), netConn))
+
 	c := &Conn{
 		subprotocol: w.Header().Get("Sec-WebSocket-Protocol"),
 		br:          brw.Reader,
@@ -132,6 +142,7 @@ func accept(w http.ResponseWriter, r *http.Request, opts AcceptOptions) (*Conn,
 		closer:      netConn,
 	}
 	c.init()
+	c.Context(r.Context())
 
 	return c, nil
 }
diff --git a/ci/.codecov.yml b/ci/.codecov.yml
new file mode 100644
index 0000000000000000000000000000000000000000..7d614ef3af467bf98254851a701c114a8a5da03c
--- /dev/null
+++ b/ci/.codecov.yml
@@ -0,0 +1,9 @@
+coverage:
+  status:
+    # Prevent small changes in coverage from failing CI.
+    project:
+      default:
+        threshold: 5
+    patch:
+      default:
+        threshold: 5
diff --git a/ci/lint/entrypoint.sh b/ci/lint/entrypoint.sh
index c539495ee30efb24850810379991a1fd4d4248fe..09c3168322beecaa4a665a3f3e0d7180bea42432 100755
--- a/ci/lint/entrypoint.sh
+++ b/ci/lint/entrypoint.sh
@@ -7,5 +7,5 @@ source ci/lib.sh || exit 1
 	shellcheck ./**/*.sh
 )
 
-go vet -composites=false ./...
+go vet -composites=false -lostcancel=false ./...
 go run golang.org/x/lint/golint -set_exit_status ./...
diff --git a/dial.go b/dial.go
index 3c7e71db902ddea97977c8193296a361012dd9bc..53acd32ce6207c4d19d1d9c2e53087dd754c2d17 100644
--- a/dial.go
+++ b/dial.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
+	"sync"
 
 	"golang.org/x/xerrors"
 )
@@ -112,8 +113,8 @@ func dial(ctx context.Context, u string, opts DialOptions) (_ *Conn, _ *http.Res
 
 	c := &Conn{
 		subprotocol: resp.Header.Get("Sec-WebSocket-Protocol"),
-		br:          bufio.NewReader(rwc),
-		bw:          bufio.NewWriter(rwc),
+		br:          getBufioReader(rwc),
+		bw:          getBufioWriter(rwc),
 		closer:      rwc,
 		client:      true,
 	}
@@ -140,3 +141,38 @@ func verifyServerResponse(resp *http.Response) error {
 
 	return nil
 }
+
+// The pools below are only used for client connections: on the server, http.Hijacker
+// already hands us a bufio.Reader/Writer, so layering a pool on top would gain nothing.
+
+var bufioReaderPool = sync.Pool{
+	New: func() interface{} {
+		return bufio.NewReader(nil)
+	},
+}
+
+func getBufioReader(r io.Reader) *bufio.Reader {
+	br := bufioReaderPool.Get().(*bufio.Reader)
+	br.Reset(r)
+	return br
+}
+
+func returnBufioReader(br *bufio.Reader) {
+	bufioReaderPool.Put(br)
+}
+
+var bufioWriterPool = sync.Pool{
+	New: func() interface{} {
+		return bufio.NewWriter(nil)
+	},
+}
+
+func getBufioWriter(w io.Writer) *bufio.Writer {
+	bw := bufioWriterPool.Get().(*bufio.Writer)
+	bw.Reset(w)
+	return bw
+}
+
+func returnBufioWriter(bw *bufio.Writer) {
+	bufioWriterPool.Put(bw)
+}
diff --git a/example_echo_test.go b/example_echo_test.go
index ab0e8e70c9e7c3828b518a45f34754508c54d116..405c7a4167f66b1e1c1094335d137f6dc391f394 100644
--- a/example_echo_test.go
+++ b/example_echo_test.go
@@ -51,6 +51,7 @@ func Example_echo() {
 
 	// Now we dial the server, send the messages and echo the responses.
 	err = client("ws://" + l.Addr().String())
+	time.Sleep(time.Second)
 	if err != nil {
 		log.Fatalf("client failed: %v", err)
 	}
@@ -66,6 +67,8 @@ func Example_echo() {
 // It ensures the client speaks the echo subprotocol and
 // only allows one message every 100ms with a 10 message burst.
 func echoServer(w http.ResponseWriter, r *http.Request) error {
+	log.Printf("serving %v", r.RemoteAddr)
+
 	c, err := websocket.Accept(w, r, websocket.AcceptOptions{
 		Subprotocols: []string{"echo"},
 	})
@@ -83,7 +86,7 @@ func echoServer(w http.ResponseWriter, r *http.Request) error {
 	for {
 		err = echo(r.Context(), c, l)
 		if err != nil {
-			return xerrors.Errorf("failed to echo: %w", err)
+			return xerrors.Errorf("failed to echo with %v: %w", r.RemoteAddr, err)
 		}
 	}
 }
@@ -91,7 +94,6 @@ func echoServer(w http.ResponseWriter, r *http.Request) error {
 // echo reads from the websocket connection and then writes
 // the received message back to it.
 // The entire function has 10s to complete.
-// The received message is limited to 32768 bytes.
 func echo(ctx context.Context, c *websocket.Conn, l *rate.Limiter) error {
 	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
 	defer cancel()
@@ -105,7 +107,6 @@ func echo(ctx context.Context, c *websocket.Conn, l *rate.Limiter) error {
 	if err != nil {
 		return err
 	}
-	r = io.LimitReader(r, 32768)
 
 	w, err := c.Writer(ctx, typ)
 	if err != nil {
diff --git a/export_test.go b/export_test.go
deleted file mode 100644
index d180e119cac2fe896220300d3c4f71afcd13f63e..0000000000000000000000000000000000000000
--- a/export_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package websocket
-
-import (
-	"context"
-)
-
-// Write writes p as a single data frame to the connection. This is an optimization
-// method for when the entire message is in memory and does not need to be streamed
-// to the peer via Writer.
-//
-// This prevents the allocation of the Writer.
-// Furthermore Writer always has to write an additional fin frame when Close is
-// called on the writer which can result in worse performance if the full message
-// exceeds the buffer size which is 4096 right now as then an extra syscall
-// will be necessary to complete the message.
-func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error {
-	return c.writeSingleFrame(ctx, opcode(typ), p)
-}
diff --git a/go.mod b/go.mod
index f747eecfdd5a4dcd61f7a8c01bff5a2a495a568d..cc9a865d6146ac1d7a1990ac0c148f7f9cd42668 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,6 @@ require (
 	golang.org/x/text v0.3.2 // indirect
 	golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
 	golang.org/x/tools v0.0.0-20190429184909-35c670923e21
-	golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18
+	golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522
 	mvdan.cc/sh v2.6.4+incompatible
 )
diff --git a/go.sum b/go.sum
index 63aaa2a54432983b48ed3cfc17e133694b2017e6..7965958d1d33cea476ce0692b038ba951336d207 100644
--- a/go.sum
+++ b/go.sum
@@ -28,7 +28,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190429184909-35c670923e21 h1:Kjcw+D2LTzLmxOHrMK9uvYP/NigJ0EdwMgzt6EU+Ghs=
 golang.org/x/tools v0.0.0-20190429184909-35c670923e21/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18 h1:1AGvnywFL1aB5KLRxyLseWJI6aSYPo3oF7HSpXdWQdU=
-golang.org/x/xerrors v0.0.0-20190315151331-d61658bd2e18/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 h1:bhOzK9QyoD0ogCnFro1m2mz41+Ib0oOhfJnBp5MR4K4=
+golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 mvdan.cc/sh v2.6.4+incompatible h1:eD6tDeh0pw+/TOTI1BBEryZ02rD2nMcFsgcvde7jffM=
 mvdan.cc/sh v2.6.4+incompatible/go.mod h1:IeeQbZq+x2SUGBensq/jge5lLQbS3XT2ktyp3wrt4x8=
diff --git a/statuscode.go b/statuscode.go
index d4223745d3e7a33ecae74a49d8c81ff5faa1b240..c7b20367046bebea6d757ff0169cbdb833d7dd94 100644
--- a/statuscode.go
+++ b/statuscode.go
@@ -49,7 +49,7 @@ type CloseError struct {
 }
 
 func (ce CloseError) Error() string {
-	return fmt.Sprintf("websocket closed with status = %v and reason = %q", ce.Code, ce.Reason)
+	return fmt.Sprintf("status = %v and reason = %q", ce.Code, ce.Reason)
 }
 
 func parseClosePayload(p []byte) (CloseError, error) {
diff --git a/websocket.go b/websocket.go
index 912508d5635321679d8c2459a66035fac11ab9fb..db2e82e70c78f9347f64e33ec967210727d3d800 100644
--- a/websocket.go
+++ b/websocket.go
@@ -5,7 +5,11 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"io/ioutil"
+	"math/rand"
+	"os"
 	"runtime"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -13,13 +17,9 @@ import (
 	"golang.org/x/xerrors"
 )
 
-type frame struct {
-	opcode  opcode
-	payload []byte
-}
-
 // Conn represents a WebSocket connection.
-// All methods except Reader can be used concurrently.
+// All methods may be called concurrently.
+//
 // Please be sure to call Close on the connection when you
 // are finished with it to release resources.
 type Conn struct {
@@ -29,35 +29,52 @@ type Conn struct {
 	closer      io.Closer
 	client      bool
 
+	msgReadLimit int64
+
 	closeOnce sync.Once
 	closeErr  error
 	closed    chan struct{}
 
-	// Writers should send on write to begin sending
-	// a message and then follow that up with some data
-	// on writeBytes.
-	// Send on control to write a control message.
-	// writeDone will be sent back when the message is written
-	// Send on writeFlush to flush the message and wait for a
-	// ping on writeDone.
-	// writeDone will be closed if the data message write errors.
-	write      chan MessageType
-	control    chan frame
-	fastWrite  chan frame
-	writeBytes chan []byte
-	writeDone  chan struct{}
-	writeFlush chan struct{}
-
-	// Readers should receive on read to begin reading a message.
-	// Then send a byte slice to readBytes to read into it.
-	// The n of bytes read will be sent on readDone once the read into a slice is complete.
-	// readDone will be closed if the read fails.
-	// activeReader will be set to 0 on io.EOF.
-	activeReader int64
-	inMsg        bool
-	read         chan opcode
-	readBytes    chan []byte
-	readDone     chan int
+	writeDataLock  chan struct{}
+	writeFrameLock chan struct{}
+
+	readMsgLock   chan struct{}
+	readMsg       chan header
+	readMsgDone   chan struct{}
+	readFrameLock chan struct{}
+
+	setReadTimeout  chan context.Context
+	setWriteTimeout chan context.Context
+	setConnContext  chan context.Context
+	getConnContext  chan context.Context
+
+	pingListenerMu sync.Mutex
+	pingListener   map[string]chan<- struct{}
+}
+
+// Context returns a context derived from parent that will be cancelled
+// when the connection is closed or broken.
+// If the parent context is cancelled, the connection will be closed.
+//
+// This is an experimental API that may be removed in the future.
+// Please let me know how you feel about it in https://github.com/nhooyr/websocket/issues/79
+func (c *Conn) Context(parent context.Context) context.Context {
+	select {
+	case <-c.closed: // Already closed: hand back a context that is cancelled from the start.
+		ctx, cancel := context.WithCancel(parent)
+		cancel()
+		return ctx
+	case c.setConnContext <- parent: // Pass parent to timeoutLoop, which watches it from now on.
+	}
+
+	select {
+	case <-c.closed: // Closed while waiting for the reply: same cancelled-context fallback.
+		ctx, cancel := context.WithCancel(parent)
+		cancel()
+		return ctx
+	case ctx := <-c.getConnContext: // timeoutLoop replies with the context it derived from parent.
+		return ctx
+	}
 }
 
 func (c *Conn) close(err error) {
@@ -72,6 +89,17 @@ func (c *Conn) close(err error) {
 		c.closeErr = xerrors.Errorf("websocket closed: %w", cerr)
 
 		close(c.closed)
+
+		// Take both frame locks: each send blocks until any goroutine that is
+		// in the middle of reading or writing a frame has released its lock.
+		c.readFrameLock <- struct{}{}
+		c.writeFrameLock <- struct{}{}
+
+		// See comment in dial.go
+		if c.client {
+			returnBufioReader(c.br)
+			returnBufioWriter(c.bw)
+		}
 	})
 }
 
@@ -84,124 +112,119 @@ func (c *Conn) Subprotocol() string {
 func (c *Conn) init() {
 	c.closed = make(chan struct{})
 
-	c.write = make(chan MessageType)
-	c.control = make(chan frame)
-	c.fastWrite = make(chan frame)
-	c.writeBytes = make(chan []byte)
-	c.writeDone = make(chan struct{})
-	c.writeFlush = make(chan struct{})
+	c.msgReadLimit = 32768
 
-	c.read = make(chan opcode)
-	c.readBytes = make(chan []byte)
-	c.readDone = make(chan int)
+	c.writeDataLock = make(chan struct{}, 1)
+	c.writeFrameLock = make(chan struct{}, 1)
+
+	c.readMsg = make(chan header)
+	c.readMsgDone = make(chan struct{})
+	c.readMsgLock = make(chan struct{}, 1)
+	c.readFrameLock = make(chan struct{}, 1)
+
+	c.setReadTimeout = make(chan context.Context)
+	c.setWriteTimeout = make(chan context.Context)
+	c.setConnContext = make(chan context.Context)
+	c.getConnContext = make(chan context.Context)
+
+	c.pingListener = make(map[string]chan<- struct{})
 
 	runtime.SetFinalizer(c, func(c *Conn) {
 		c.close(xerrors.New("connection garbage collected"))
 	})
 
-	go c.writeLoop()
+	go c.timeoutLoop()
 	go c.readLoop()
 }
 
 // We never mask inside here because our mask key is always 0,0,0,0.
 // See comment on secWebSocketKey.
-func (c *Conn) writeFrame(h header, p []byte) {
-	b2 := marshalHeader(h)
-	_, err := c.bw.Write(b2)
+func (c *Conn) writeFrame(ctx context.Context, h header, p []byte) (err error) {
+	err = c.acquireLock(ctx, c.writeFrameLock)
 	if err != nil {
-		c.close(xerrors.Errorf("failed to write to connection: %w", err))
-		return
+		return err
 	}
+	defer c.releaseLock(c.writeFrameLock)
 
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-c.closed:
+		return c.closeErr
+	case c.setWriteTimeout <- ctx:
+	}
+	defer func() {
+		// We have to remove the write timeout, even if ctx is cancelled.
+		select {
+		case <-c.closed:
+			return
+		case c.setWriteTimeout <- context.Background():
+		}
+	}()
+
+	defer func() {
+		if err != nil {
+			// We need to always release the lock first before closing the connection to ensure
+			// the lock can be acquired inside close.
+			c.releaseLock(c.writeFrameLock)
+			c.close(err)
+		}
+	}()
+
+	h.masked = c.client
+	h.payloadLength = int64(len(p))
+
+	b2 := marshalHeader(h)
+	_, err = c.bw.Write(b2)
+	if err != nil {
+		return xerrors.Errorf("failed to write to connection: %w", err)
+	}
 	_, err = c.bw.Write(p)
 	if err != nil {
-		c.close(xerrors.Errorf("failed to write to connection: %w", err))
-		return
+		return xerrors.Errorf("failed to write to connection: %w", err)
+
 	}
 
 	if h.fin {
 		err := c.bw.Flush()
 		if err != nil {
-			c.close(xerrors.Errorf("failed to write to connection: %w", err))
-			return
+			return xerrors.Errorf("failed to write to connection: %w", err)
 		}
 	}
-}
 
-func (c *Conn) writeLoopFastWrite(frame frame) {
-	h := header{
-		fin:           true,
-		opcode:        frame.opcode,
-		payloadLength: int64(len(frame.payload)),
-		masked:        c.client,
-	}
-	c.writeFrame(h, frame.payload)
-	select {
-	case <-c.closed:
-	case c.writeDone <- struct{}{}:
-	}
+	return nil
 }
 
-func (c *Conn) writeLoop() {
-	defer close(c.writeDone)
+func (c *Conn) timeoutLoop() {
+	readCtx := context.Background()
+	writeCtx := context.Background()
+	parentCtx := context.Background()
+	cancelCtx := func() {}
+	defer func() {
+		// We do not defer cancelCtx directly because its value may change.
+		cancelCtx()
+	}()
 
-messageLoop:
 	for {
-		var dataType MessageType
 		select {
 		case <-c.closed:
 			return
-		case control := <-c.control:
-			c.writeLoopFastWrite(control)
-			continue
-		case frame := <-c.fastWrite:
-			c.writeLoopFastWrite(frame)
-			continue
-		case dataType = <-c.write:
-		}
-
-		var firstSent bool
-		for {
+		case writeCtx = <-c.setWriteTimeout:
+		case readCtx = <-c.setReadTimeout:
+		case <-readCtx.Done():
+			c.close(xerrors.Errorf("data read timed out: %w", readCtx.Err()))
+		case <-writeCtx.Done():
+			c.close(xerrors.Errorf("data write timed out: %w", writeCtx.Err()))
+		case <-parentCtx.Done():
+			c.close(xerrors.Errorf("parent context cancelled: %w", parentCtx.Err()))
+			return
+		case parentCtx = <-c.setConnContext:
+			var ctx context.Context
+			ctx, cancelCtx = context.WithCancel(parentCtx)
 			select {
 			case <-c.closed:
 				return
-			case control := <-c.control:
-				c.writeLoopFastWrite(control)
-			case b := <-c.writeBytes:
-				h := header{
-					fin:           false,
-					opcode:        opcode(dataType),
-					payloadLength: int64(len(b)),
-					masked:        c.client,
-				}
-
-				if firstSent {
-					h.opcode = opContinuation
-				}
-				firstSent = true
-
-				c.writeFrame(h, b)
-
-				select {
-				case <-c.closed:
-					return
-				case c.writeDone <- struct{}{}:
-				}
-			case <-c.writeFlush:
-				h := header{
-					fin:           true,
-					opcode:        opcode(dataType),
-					payloadLength: 0,
-					masked:        c.client,
-				}
-
-				if firstSent {
-					h.opcode = opContinuation
-				}
-
-				c.writeFrame(h, nil)
-
-				continue messageLoop
+			case c.getConnContext <- ctx:
 			}
 		}
 	}
@@ -233,6 +256,12 @@ func (c *Conn) handleControl(h header) {
 	case opPing:
 		c.writePong(b)
 	case opPong:
+		c.pingListenerMu.Lock()
+		listener, ok := c.pingListener[string(b)]
+		c.pingListenerMu.Unlock()
+		if ok {
+			close(listener)
+		}
 	case opClose:
 		ce, err := parseClosePayload(b)
 		if err != nil {
@@ -249,19 +278,20 @@ func (c *Conn) handleControl(h header) {
 	}
 }
 
-func (c *Conn) readLoop() {
-	defer close(c.readDone)
-
+func (c *Conn) readTillData() (header, error) {
 	for {
-		h, err := readHeader(c.br)
+		h, err := c.readHeader()
 		if err != nil {
-			c.close(xerrors.Errorf("failed to read header: %w", err))
-			return
+			return header{}, err
 		}
 
 		if h.rsv1 || h.rsv2 || h.rsv3 {
-			c.Close(StatusProtocolError, fmt.Sprintf("received header with rsv bits set: %v:%v:%v", h.rsv1, h.rsv2, h.rsv3))
-			return
+			ce := CloseError{
+				Code:   StatusProtocolError,
+				Reason: fmt.Sprintf("received header with rsv bits set: %v:%v:%v", h.rsv1, h.rsv2, h.rsv3),
+			}
+			c.Close(ce.Code, ce.Reason)
+			return header{}, ce
 		}
 
 		if h.opcode.controlOp() {
@@ -270,89 +300,75 @@ func (c *Conn) readLoop() {
 		}
 
 		switch h.opcode {
-		case opBinary, opText:
-			if c.inMsg {
-				c.Close(StatusProtocolError, "cannot read new data frame when previous frame is not finished")
-				return
-			}
-
-			select {
-			case <-c.closed:
-				return
-			case c.read <- h.opcode:
-				c.inMsg = true
-			}
-		case opContinuation:
-			if !c.inMsg {
-				c.Close(StatusProtocolError, "continuation frame not after data or text frame")
-				return
-			}
+		case opBinary, opText, opContinuation:
+			return h, nil
 		default:
-			c.Close(StatusProtocolError, fmt.Sprintf("unknown opcode %v", h.opcode))
-			return
+			ce := CloseError{
+				Code:   StatusProtocolError,
+				Reason: fmt.Sprintf("unknown opcode %v", h.opcode),
+			}
+			c.Close(ce.Code, ce.Reason)
+			return header{}, ce
 		}
+	}
+}
+
+func (c *Conn) readHeader() (header, error) {
+	err := c.acquireLock(context.Background(), c.readFrameLock)
+	if err != nil {
+		return header{}, err
+	}
+	defer c.releaseLock(c.readFrameLock)
 
-		err = c.dataReadLoop(h)
+	h, err := readHeader(c.br)
+	if err != nil {
+		return header{}, xerrors.Errorf("failed to read header: %w", err)
+	}
+
+	return h, nil
+}
+
+func (c *Conn) readLoop() {
+	for {
+		h, err := c.readTillData()
 		if err != nil {
-			c.close(xerrors.Errorf("failed to read from connection: %w", err))
+			c.close(err)
 			return
 		}
-	}
-}
 
-func (c *Conn) dataReadLoop(h header) error {
-	maskPos := 0
-	left := h.payloadLength
-	firstReadDone := false
-	for left > 0 || !firstReadDone {
 		select {
 		case <-c.closed:
-			return c.closeErr
-		case b := <-c.readBytes:
-			if int64(len(b)) > left {
-				b = b[:left]
-			}
-
-			_, err := io.ReadFull(c.br, b)
-			if err != nil {
-				return xerrors.Errorf("failed to read from connection: %w", err)
-			}
-			left -= int64(len(b))
-
-			if h.masked {
-				maskPos = fastXOR(h.maskKey, maskPos, b)
-			}
-
-			// Must set this before we signal the read is done.
-			// The reader will use this to return io.EOF and
-			// c.Read will use it to check if the reader has been completed.
-			if left == 0 && h.fin {
-				atomic.StoreInt64(&c.activeReader, 0)
-				c.inMsg = false
-			}
+			return
+		case c.readMsg <- h:
+		}
 
-			select {
-			case <-c.closed:
-				return c.closeErr
-			case c.readDone <- len(b):
-				firstReadDone = true
-			}
+		select {
+		case <-c.closed:
+			return
+		case <-c.readMsgDone:
 		}
 	}
-
-	return nil
 }
 
 func (c *Conn) writePong(p []byte) error {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 	defer cancel()
 
-	err := c.writeSingleFrame(ctx, opPong, p)
+	err := c.writeMessage(ctx, opPong, p)
 	return err
 }
 
 // Close closes the WebSocket connection with the given status code and reason.
+//
 // It will write a WebSocket close frame with a timeout of 5 seconds.
+// The connection can only be closed once. Additional calls to Close
+// are no-ops.
+//
+// The maximum length of reason must be 125 bytes otherwise an internal
+// error will be sent to the peer. For this reason, you should avoid
+// sending a dynamic reason.
+//
+// Close will unblock all goroutines interacting with the connection.
 func (c *Conn) Close(code StatusCode, reason string) error {
 	err := c.exportedClose(code, reason)
 	if err != nil {
@@ -372,24 +388,21 @@ func (c *Conn) exportedClose(code StatusCode, reason string) error {
 	// Definitely worth seeing what popular browsers do later.
 	p, err := ce.bytes()
 	if err != nil {
+		fmt.Fprintf(os.Stderr, "websocket: failed to marshal close frame: %v\n", err)
 		ce = CloseError{
 			Code: StatusInternalError,
 		}
 		p, _ = ce.bytes()
 	}
 
-	cerr := c.writeClose(p, ce)
-	if err != nil {
-		return err
-	}
-	return cerr
+	return c.writeClose(p, ce)
 }
 
 func (c *Conn) writeClose(p []byte, cerr CloseError) error {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 	defer cancel()
 
-	err := c.writeSingleFrame(ctx, opClose, p)
+	err := c.writeMessage(ctx, opClose, p)
 
 	c.close(cerr)
 
@@ -404,33 +417,44 @@ func (c *Conn) writeClose(p []byte, cerr CloseError) error {
 	return nil
 }
 
-func (c *Conn) writeSingleFrame(ctx context.Context, opcode opcode, p []byte) error {
-	ch := c.fastWrite
-	if opcode.controlOp() {
-		ch = c.control
-	}
+func (c *Conn) acquireLock(ctx context.Context, lock chan struct{}) error {
 	select {
-	case <-c.closed:
-		return c.closeErr
-	case ch <- frame{
-		opcode:  opcode,
-		payload: p,
-	}:
 	case <-ctx.Done():
-		c.close(xerrors.Errorf("control frame write timed out: %w", ctx.Err()))
 		return ctx.Err()
-	}
-
-	select {
 	case <-c.closed:
 		return c.closeErr
-	case <-c.writeDone:
+	case lock <- struct{}{}:
 		return nil
-	case <-ctx.Done():
-		return ctx.Err()
 	}
 }
 
+func (c *Conn) releaseLock(lock chan struct{}) {
+	// Non-blocking receive: does nothing when lock is empty, so repeated releases are safe.
+	select {
+	case <-lock:
+	default:
+	}
+}
+
+func (c *Conn) writeMessage(ctx context.Context, opcode opcode, p []byte) error {
+	if !opcode.controlOp() { // Only data messages take writeDataLock; control frames bypass it.
+		err := c.acquireLock(ctx, c.writeDataLock)
+		if err != nil {
+			return err
+		}
+		defer c.releaseLock(c.writeDataLock)
+	}
+	// Send p as one final frame. Wrap failures with %w (not %v) so callers can still unwrap them.
+	err := c.writeFrame(ctx, header{
+		fin:    true,
+		opcode: opcode,
+	}, p)
+	if err != nil {
+		return xerrors.Errorf("failed to write frame: %w", err)
+	}
+	return nil
+}
+
 // Writer returns a writer bounded by the context that will write
 // a WebSocket message of type dataType to the connection.
 //
@@ -447,27 +471,57 @@ func (c *Conn) Writer(ctx context.Context, typ MessageType) (io.WriteCloser, err
 }
 
 func (c *Conn) writer(ctx context.Context, typ MessageType) (io.WriteCloser, error) {
-	select {
-	case <-c.closed:
-		return nil, c.closeErr
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	case c.write <- typ:
-		return messageWriter{
-			ctx: ctx,
-			c:   c,
-		}, nil
+	err := c.acquireLock(ctx, c.writeDataLock)
+	if err != nil {
+		return nil, err
 	}
+	return &messageWriter{
+		ctx:    ctx,
+		opcode: opcode(typ),
+		c:      c,
+	}, nil
+}
+
+// Read is a convenience method to read a single message from the connection.
+//
+// See the Reader method if you want to be able to reuse buffers or want to stream a message.
+//
+// This is an experimental API, please let me know how you feel about it in
+// https://github.com/nhooyr/websocket/issues/62
+func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
+	typ, r, err := c.Reader(ctx)
+	if err != nil {
+		return 0, nil, err
+	}
+
+	b, err := ioutil.ReadAll(r)
+	if err != nil {
+		return typ, b, err // b carries whatever was read before the error
+	}
+
+	return typ, b, nil
+}
+
+// Write is a convenience method to write a message to the connection.
+//
+// See the Writer method if you want to stream a message.
+//
+// This is an experimental API, please let me know how you feel about it in
+// https://github.com/nhooyr/websocket/issues/62
+func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error {
+	return c.writeMessage(ctx, opcode(typ), p)
 }
 
 // messageWriter enables writing to a WebSocket connection.
 type messageWriter struct {
-	ctx context.Context
-	c   *Conn
+	ctx    context.Context
+	opcode opcode
+	c      *Conn
+	closed bool
 }
 
 // Write writes the given bytes to the WebSocket connection.
-func (w messageWriter) Write(p []byte) (int, error) {
+func (w *messageWriter) Write(p []byte) (int, error) {
 	n, err := w.write(p)
 	if err != nil {
 		return n, xerrors.Errorf("failed to write: %w", err)
@@ -475,31 +529,23 @@ func (w messageWriter) Write(p []byte) (int, error) {
 	return n, nil
 }
 
-func (w messageWriter) write(p []byte) (int, error) {
-	select {
-	case <-w.c.closed:
-		return 0, w.c.closeErr
-	case w.c.writeBytes <- p:
-		select {
-		case <-w.ctx.Done():
-			w.c.close(xerrors.Errorf("data write timed out: %w", w.ctx.Err()))
-			// Wait for writeLoop to complete so we know p is done with.
-			<-w.c.writeDone
-			return 0, w.ctx.Err()
-		case _, ok := <-w.c.writeDone:
-			if !ok {
-				return 0, w.c.closeErr
-			}
-			return len(p), nil
-		}
-	case <-w.ctx.Done():
-		return 0, w.ctx.Err()
+func (w *messageWriter) write(p []byte) (int, error) {
+	if w.closed {
+		return 0, xerrors.Errorf("cannot use closed writer")
+	}
+	err := w.c.writeFrame(w.ctx, header{
+		opcode: w.opcode,
+	}, p)
+	if err != nil {
+		return 0, err
 	}
+	w.opcode = opContinuation
+	return len(p), nil
 }
 
 // Close flushes the frame to the connection.
 // This must be called for every messageWriter.
-func (w messageWriter) Close() error {
+func (w *messageWriter) Close() error {
 	err := w.close()
 	if err != nil {
 		return xerrors.Errorf("failed to close writer: %w", err)
@@ -507,15 +553,22 @@ func (w messageWriter) Close() error {
 	return nil
 }
 
-func (w messageWriter) close() error {
-	select {
-	case <-w.c.closed:
-		return w.c.closeErr
-	case <-w.ctx.Done():
-		return w.ctx.Err()
-	case w.c.writeFlush <- struct{}{}:
-		return nil
+func (w *messageWriter) close() error {
+	if w.closed {
+		return xerrors.Errorf("cannot use closed writer")
+	}
+	w.closed = true
+
+	err := w.c.writeFrame(w.ctx, header{
+		fin:    true,
+		opcode: w.opcode,
+	}, nil)
+	if err != nil {
+		return err
 	}
+
+	w.c.releaseLock(w.c.writeDataLock)
+	return nil
 }
 
 // Reader will wait until there is a WebSocket data message to read from the connection.
@@ -524,8 +577,7 @@ func (w messageWriter) close() error {
 //
 // Your application must keep reading messages for the Conn to automatically respond to ping
 // and close frames and not become stuck waiting for a data message to be read.
-// Please ensure to read the full message from io.Reader. If you do not read till
-// io.EOF, the connection will break unless the next read would have yielded io.EOF.
+// Please ensure to read the full message from io.Reader.
 //
 // You can only read a single message at a time so do not call this method
 // concurrently.
@@ -534,53 +586,53 @@ func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) {
 	if err != nil {
 		return 0, nil, xerrors.Errorf("failed to get reader: %w", err)
 	}
-	return typ, r, nil
+	return typ, io.LimitReader(r, c.msgReadLimit), nil
 }
 
-func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) {
-	if !atomic.CompareAndSwapInt64(&c.activeReader, 0, 1) {
-		// If the next read yields io.EOF we are good to go.
-		r := messageReader{
-			ctx: ctx,
-			c:   c,
-		}
-		_, err := r.Read(nil)
-		if err == nil {
-			return 0, nil, xerrors.New("previous message not fully read")
-		}
-		if !xerrors.Is(err, io.EOF) {
-			return 0, nil, xerrors.Errorf("failed to check if last message at io.EOF: %w", err)
-		}
-
-		atomic.StoreInt64(&c.activeReader, 1)
+func (c *Conn) reader(ctx context.Context) (_ MessageType, _ io.Reader, err error) {
+	err = c.acquireLock(ctx, c.readMsgLock)
+	if err != nil {
+		return 0, nil, err
 	}
 
 	select {
 	case <-c.closed:
 		return 0, nil, c.closeErr
-	case opcode := <-c.read:
-		return MessageType(opcode), messageReader{
+	case <-ctx.Done():
+		return 0, nil, ctx.Err()
+	case h := <-c.readMsg:
+		if h.opcode == opContinuation {
+			ce := CloseError{
+				Code:   StatusProtocolError,
+				Reason: "continuation frame not after data or text frame",
+			}
+			c.Close(ce.Code, ce.Reason)
+			return 0, nil, ce
+		}
+		return MessageType(h.opcode), &messageReader{
 			ctx: ctx,
+			h:   &h,
 			c:   c,
 		}, nil
-	case <-ctx.Done():
-		return 0, nil, ctx.Err()
 	}
 }
 
 // messageReader enables reading a data frame from the WebSocket connection.
 type messageReader struct {
-	ctx context.Context
-	c   *Conn
+	ctx     context.Context
+	maskPos int
+	h       *header
+	c       *Conn
+	eofed   bool
 }
 
 // Read reads as many bytes as possible into p.
-func (r messageReader) Read(p []byte) (int, error) {
+func (r *messageReader) Read(p []byte) (int, error) {
 	n, err := r.read(p)
 	if err != nil {
 		// Have to return io.EOF directly for now, we cannot wrap as xerrors
 		// isn't used in stdlib.
-		if err == io.EOF {
+		if xerrors.Is(err, io.EOF) {
 			return n, io.EOF
 		}
 		return n, xerrors.Errorf("failed to read: %w", err)
@@ -588,31 +640,129 @@ func (r messageReader) Read(p []byte) (int, error) {
 	return n, nil
 }
 
-func (r messageReader) read(p []byte) (_ int, err error) {
-	if atomic.LoadInt64(&r.c.activeReader) == 0 {
-		return 0, io.EOF
+func (r *messageReader) read(p []byte) (int, error) {
+	if r.eofed {
+		return 0, xerrors.Errorf("cannot use EOFed reader")
+	}
+
+	if r.h == nil {
+		select {
+		case <-r.c.closed:
+			return 0, r.c.closeErr
+		case h := <-r.c.readMsg:
+			if h.opcode != opContinuation {
+				ce := CloseError{
+					Code:   StatusProtocolError,
+					Reason: "cannot read new data frame when previous frame is not finished",
+				}
+				r.c.Close(ce.Code, ce.Reason)
+				return 0, ce
+			}
+			r.h = &h
+		}
+	}
+
+	if int64(len(p)) > r.h.payloadLength {
+		p = p[:r.h.payloadLength]
 	}
 
 	select {
 	case <-r.c.closed:
 		return 0, r.c.closeErr
-	case r.c.readBytes <- p:
+	case r.c.setReadTimeout <- r.ctx:
+	}
+
+	err := r.c.acquireLock(r.ctx, r.c.readFrameLock)
+	if err != nil {
+		return 0, err
+	}
+	n, err := io.ReadFull(r.c.br, p)
+	r.c.releaseLock(r.c.readFrameLock)
+
+	select {
+	case <-r.c.closed:
+		return 0, r.c.closeErr
+	case r.c.setReadTimeout <- context.Background():
+	}
+
+	r.h.payloadLength -= int64(n)
+	if r.h.masked {
+		r.maskPos = fastXOR(r.h.maskKey, r.maskPos, p)
+	}
+
+	if err != nil {
+		r.c.close(xerrors.Errorf("failed to read control frame payload: %w", err))
+		return n, r.c.closeErr
+	}
+
+	if r.h.payloadLength == 0 {
 		select {
-		case <-r.ctx.Done():
-			r.c.close(xerrors.Errorf("data read timed out: %w", r.ctx.Err()))
-			// Wait for readLoop to complete so we know p is done.
-			<-r.c.readDone
-			return 0, r.ctx.Err()
-		case n, ok := <-r.c.readDone:
-			if !ok {
-				return 0, r.c.closeErr
-			}
-			if atomic.LoadInt64(&r.c.activeReader) == 0 {
-				return n, io.EOF
-			}
-			return n, nil
+		case <-r.c.closed:
+			return n, r.c.closeErr
+		case r.c.readMsgDone <- struct{}{}:
+		}
+		if r.h.fin {
+			r.eofed = true
+			r.c.releaseLock(r.c.readMsgLock)
+			return n, io.EOF
 		}
-	case <-r.ctx.Done():
-		return 0, r.ctx.Err()
+		r.maskPos = 0
+		r.h = nil
+	}
+
+	return n, nil
+}
+
+// SetReadLimit sets the max number of bytes to read for a single message.
+// It applies to the Reader and Read methods.
+//
+// By default, the connection has a message read limit of 32768 bytes.
+func (c *Conn) SetReadLimit(n int64) {
+	atomic.StoreInt64(&c.msgReadLimit, n)
+}
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+}
+
+// Ping sends a ping to the peer and waits for a pong.
+// Use this to measure latency or ensure the peer is responsive.
+//
+// This API is experimental and subject to change.
+// Please provide feedback in https://github.com/nhooyr/websocket/issues/1.
+func (c *Conn) Ping(ctx context.Context) error {
+	err := c.ping(ctx)
+	if err != nil {
+		return xerrors.Errorf("failed to ping: %w", err)
+	}
+	return nil
+}
+
+func (c *Conn) ping(ctx context.Context) error {
+	id := rand.Uint64()
+	p := strconv.FormatUint(id, 10)
+
+	pong := make(chan struct{})
+
+	c.pingListenerMu.Lock()
+	c.pingListener[p] = pong
+	c.pingListenerMu.Unlock()
+
+	defer func() {
+		c.pingListenerMu.Lock()
+		delete(c.pingListener, p)
+		c.pingListenerMu.Unlock()
+	}()
+
+	err := c.writeMessage(ctx, opPing, []byte(p))
+	if err != nil {
+		return err
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-pong:
+		return nil
 	}
 }
diff --git a/websocket_test.go b/websocket_test.go
index 8d18c738bb73c1ca7d940aed309b50ba19269de9..f1905c30fbb93ecdd813deff9a592ef3cc1642e2 100644
--- a/websocket_test.go
+++ b/websocket_test.go
@@ -293,10 +293,6 @@ func TestHandshake(t *testing.T) {
 				if err != nil {
 					return err
 				}
-				err = write()
-				if err != nil {
-					return err
-				}
 
 				c.Close(websocket.StatusNormalClosure, "")
 				return nil
@@ -329,11 +325,6 @@ func TestHandshake(t *testing.T) {
 				if err != nil {
 					return err
 				}
-				// Read twice to ensure the un EOFed previous reader works correctly.
-				err = read()
-				if err != nil {
-					return err
-				}
 
 				c.Close(websocket.StatusNormalClosure, "")
 				return nil
@@ -385,6 +376,39 @@ func TestHandshake(t *testing.T) {
 				return nil
 			},
 		},
+		{
+			name: "ping",
+			server: func(w http.ResponseWriter, r *http.Request) error {
+				c, err := websocket.Accept(w, r, websocket.AcceptOptions{})
+				if err != nil {
+					return err
+				}
+				defer c.Close(websocket.StatusInternalError, "")
+
+				err = c.Ping(r.Context())
+				if err != nil {
+					return err
+				}
+
+				c.Close(websocket.StatusNormalClosure, "")
+				return nil
+			},
+			client: func(ctx context.Context, u string) error {
+				c, _, err := websocket.Dial(ctx, u, websocket.DialOptions{})
+				if err != nil {
+					return err
+				}
+				defer c.Close(websocket.StatusInternalError, "")
+
+				err = c.Ping(ctx)
+				if err != nil {
+					return err
+				}
+
+				c.Close(websocket.StatusNormalClosure, "")
+				return nil
+			},
+		},
 	}
 
 	for _, tc := range testCases {
@@ -498,6 +522,8 @@ func TestAutobahnServer(t *testing.T) {
 func echoLoop(ctx context.Context, c *websocket.Conn) {
 	defer c.Close(websocket.StatusInternalError, "")
 
+	c.SetReadLimit(1 << 40)
+
 	ctx, cancel := context.WithTimeout(ctx, time.Minute)
 	defer cancel()
 
@@ -766,6 +792,11 @@ func benchConn(b *testing.B, echo, stream bool, size int) {
 			if err != nil {
 				b.Fatal(err)
 			}
+
+			_, err = r.Read([]byte{0})
+			if !xerrors.Is(err, io.EOF) {
+				b.Fatalf("more data in reader than needed")
+			}
 		}
 	}
 	b.StopTimer()
diff --git a/wsjson/wsjson.go b/wsjson/wsjson.go
index 9dd61bd012523f4938b9dd0e6a5086113b1bdd3e..d85700bc55c458a7f4ba44bf10806201800e4f4f 100644
--- a/wsjson/wsjson.go
+++ b/wsjson/wsjson.go
@@ -12,8 +12,6 @@ import (
 )
 
 // Read reads a json message from c into v.
-// For security reasons, it will not read messages
-// larger than 32768 bytes.
 func Read(ctx context.Context, c *websocket.Conn, v interface{}) error {
 	err := read(ctx, c, v)
 	if err != nil {
@@ -33,14 +31,23 @@ func read(ctx context.Context, c *websocket.Conn, v interface{}) error {
 		return xerrors.Errorf("unexpected frame type for json (expected %v): %v", websocket.MessageText, typ)
 	}
 
-	r = io.LimitReader(r, 32768)
-
 	d := json.NewDecoder(r)
 	err = d.Decode(v)
 	if err != nil {
 		return xerrors.Errorf("failed to decode json: %w", err)
 	}
 
+	// Have to ensure we read till EOF.
+	// Unfortunate but necessary evil for now. Can improve later.
+	// The code to do this automatically gets complicated fast because
+	// we support concurrent reading.
+	// So the Reader has to synchronize with Read somehow.
+	// Maybe it's best to bring back the old readLoop?
+	_, err = r.Read([]byte{0})
+	if !xerrors.Is(err, io.EOF) {
+		return xerrors.Errorf("more data than needed in reader")
+	}
+
 	return nil
 }
 
diff --git a/wspb/wspb.go b/wspb/wspb.go
index 90a0d0462c9689779ef633e3f0b53e1e810306ae..edffede18b98ea905e6ac23c9c287c46fcd421e5 100644
--- a/wspb/wspb.go
+++ b/wspb/wspb.go
@@ -3,7 +3,6 @@ package wspb
 
 import (
 	"context"
-	"io"
 	"io/ioutil"
 
 	"github.com/golang/protobuf/proto"
@@ -13,8 +12,6 @@ import (
 )
 
 // Read reads a protobuf message from c into v.
-// For security reasons, it will not read messages
-// larger than 32768 bytes.
 func Read(ctx context.Context, c *websocket.Conn, v proto.Message) error {
 	err := read(ctx, c, v)
 	if err != nil {
@@ -34,8 +31,6 @@ func read(ctx context.Context, c *websocket.Conn, v proto.Message) error {
 		return xerrors.Errorf("unexpected frame type for protobuf (expected %v): %v", websocket.MessageBinary, typ)
 	}
 
-	r = io.LimitReader(r, 32768)
-
 	b, err := ioutil.ReadAll(r)
 	if err != nil {
 		return xerrors.Errorf("failed to read message: %w", err)
@@ -64,19 +59,5 @@ func write(ctx context.Context, c *websocket.Conn, v proto.Message) error {
 		return xerrors.Errorf("failed to marshal protobuf: %w", err)
 	}
 
-	w, err := c.Writer(ctx, websocket.MessageBinary)
-	if err != nil {
-		return err
-	}
-
-	_, err = w.Write(b)
-	if err != nil {
-		return err
-	}
-
-	err = w.Close()
-	if err != nil {
-		return err
-	}
-	return nil
+	return c.Write(ctx, websocket.MessageBinary, b)
 }