Newer
Older
"time"
)
// NetConn converts a *websocket.Conn into a net.Conn.
// It's for tunneling arbitrary protocols over WebSockets.
// Few users of the library will need this but it's tricky to implement
// correctly and so provided in the library.
// See https://github.com/nhooyr/websocket/issues/100.
//
// Every Write to the net.Conn will correspond to a message write of
// the given type on *websocket.Conn.
//
// The passed ctx bounds the lifetime of the net.Conn. If cancelled,
// all reads and writes on the net.Conn will be cancelled.
//
// If a message is read that is not of the correct type, the connection
// will be closed with StatusUnsupportedData and an error will be returned.
// Close will close the *websocket.Conn with StatusNormalClosure.
// When a deadline is hit and there is an active read or write goroutine, the
// connection will be closed. This is different from most net.Conn implementations
// where only the reading/writing goroutines are interrupted but the connection
// is kept alive.
// The Addr methods will return a mock net.Addr that returns "websocket" for Network
// and "websocket/unknown-addr" for String.
// A received StatusNormalClosure or StatusGoingAway close frame will be translated to
// io.EOF when reading.
//
// Furthermore, the ReadLimit is set to -1 to disable it.
func NetConn(ctx context.Context, c *Conn, msgType MessageType) net.Conn {
readMu: newMu(c),
writeMu: newMu(c),
nc.writeCtx, nc.writeCancel = context.WithCancel(ctx)
nc.readCtx, nc.readCancel = context.WithCancel(ctx)
nc.writeTimer = time.AfterFunc(math.MaxInt64, func() {
if !nc.writeMu.tryLock() {
// If the lock cannot be acquired, then there is an
// active write goroutine and so we should cancel the context.
return
}
defer nc.writeMu.unlock()
// Prevents future writes from writing until the deadline is reset.
atomic.StoreInt64(&nc.writeExpired, 1)
})
if !nc.writeTimer.Stop() {
<-nc.writeTimer.C
}
nc.readTimer = time.AfterFunc(math.MaxInt64, func() {
if !nc.readMu.tryLock() {
// If the lock cannot be acquired, then there is an
// active read goroutine and so we should cancel the context.
return
}
defer nc.readMu.unlock()
// Prevents future reads from reading until the deadline is reset.
atomic.StoreInt64(&nc.readExpired, 1)
})
if !nc.readTimer.Stop() {
<-nc.readTimer.C
}
writeMu *mu
writeExpired int64
writeCtx context.Context
readMu *mu
readExpired int64
readCtx context.Context
readEOFed bool
reader io.Reader
func (nc *netConn) Close() error {
return nc.c.Close(StatusNormalClosure, "")
func (nc *netConn) Write(p []byte) (int, error) {
nc.writeMu.forceLock()
defer nc.writeMu.unlock()
if atomic.LoadInt64(&nc.writeExpired) == 1 {
return 0, fmt.Errorf("failed to write: %w", context.DeadlineExceeded)
}
err := nc.c.Write(nc.writeCtx, nc.msgType, p)
if err != nil {
return 0, err
}
return len(p), nil
}
func (nc *netConn) Read(p []byte) (int, error) {
nc.readMu.forceLock()
defer nc.readMu.unlock()
if atomic.LoadInt64(&nc.readExpired) == 1 {
return 0, fmt.Errorf("failed to read: %w", context.DeadlineExceeded)
}
if nc.reader == nil {
typ, r, err := nc.c.Reader(nc.readCtx)
switch CloseStatus(err) {
case StatusNormalClosure, StatusGoingAway:
if typ != nc.msgType {
err := fmt.Errorf("unexpected frame type read (expected %v): %v", nc.msgType, typ)
nc.c.Close(StatusUnsupportedData, err.Error())
n, err := nc.reader.Read(p)
func (a websocketAddr) Network() string {
return "websocket"
func (a websocketAddr) String() string {
return "websocket/unknown-addr"
func (nc *netConn) RemoteAddr() net.Addr {
func (nc *netConn) LocalAddr() net.Addr {
func (nc *netConn) SetDeadline(t time.Time) error {
nc.SetWriteDeadline(t)
nc.SetReadDeadline(t)
func (nc *netConn) SetWriteDeadline(t time.Time) error {
atomic.StoreInt64(&nc.writeExpired, 0)
nc.writeTimer.Reset(t.Sub(time.Now()))
func (nc *netConn) SetReadDeadline(t time.Time) error {
atomic.StoreInt64(&nc.readExpired, 0)
nc.readTimer.Reset(t.Sub(time.Now()))