good morning!!!!

Skip to content
Snippets Groups Projects
conn.go 5.28 KiB
Newer Older
// +build !js

package websocket
Anmol Sethi's avatar
Anmol Sethi committed

import (
Anmol Sethi's avatar
Anmol Sethi committed
	"bufio"
Anmol Sethi's avatar
Anmol Sethi committed
	"context"
Anmol Sethi's avatar
Anmol Sethi committed
	"errors"
Anmol Sethi's avatar
Anmol Sethi committed
	"fmt"
	"io"
Anmol Sethi's avatar
Anmol Sethi committed
	"runtime"
Anmol Sethi's avatar
Anmol Sethi committed
	"strconv"
Anmol Sethi's avatar
Anmol Sethi committed
	"sync"
	"sync/atomic"
)

// MessageType represents the type of a WebSocket message.
// See https://tools.ietf.org/html/rfc6455#section-5.6
type MessageType int

// MessageType constants.
//
// The values start at 1, so the zero value of MessageType is
// not a valid message type.
const (
	// MessageText is for UTF-8 encoded text messages like JSON.
	// Its numeric value is 1.
	MessageText MessageType = iota + 1
	// MessageBinary is for binary messages like Protobufs.
	// Its numeric value is 2.
	MessageBinary
)

// Conn represents a WebSocket connection.
// All methods may be called concurrently except for Reader and Read.
Anmol Sethi's avatar
Anmol Sethi committed
//
Anmol Sethi's avatar
Anmol Sethi committed
// You must always read from the connection. Otherwise control
Anmol Sethi's avatar
Anmol Sethi committed
// frames will not be handled. See Reader and CloseRead.
Anmol Sethi's avatar
Anmol Sethi committed
//
Anmol Sethi's avatar
Anmol Sethi committed
// Be sure to call Close on the connection when you
// are finished with it to release associated resources.
// On any error from any method, the connection is closed
// with an appropriate reason.
Anmol Sethi's avatar
Anmol Sethi committed
type Conn struct {
	subprotocol string
Anmol Sethi's avatar
Anmol Sethi committed
	rwc         io.ReadWriteCloser
	client      bool
	copts       *compressionOptions
Anmol Sethi's avatar
Anmol Sethi committed
	br          *bufio.Reader
	bw          *bufio.Writer
Anmol Sethi's avatar
Anmol Sethi committed

Anmol Sethi's avatar
Anmol Sethi committed
	readTimeout  chan context.Context
	writeTimeout chan context.Context

	// Read state.
Anmol Sethi's avatar
Anmol Sethi committed
	readMu            *mu
	readControlBuf    [maxControlPayload]byte
	msgReader         *msgReader
	readCloseFrameErr error
Anmol Sethi's avatar
Anmol Sethi committed

	// Write state.
	msgWriter    *msgWriter
Anmol Sethi's avatar
Anmol Sethi committed
	writeFrameMu *mu
Anmol Sethi's avatar
Anmol Sethi committed
	writeBuf     []byte
	writeHeader  header
Anmol Sethi's avatar
Anmol Sethi committed
	closed     chan struct{}
	closeMu    sync.Mutex
	closeErr   error
Anmol Sethi's avatar
Anmol Sethi committed
	wroteClose bool
Anmol Sethi's avatar
Anmol Sethi committed

Anmol Sethi's avatar
Anmol Sethi committed
	pingCounter   int32
Anmol Sethi's avatar
Anmol Sethi committed
	activePingsMu sync.Mutex
	activePings   map[string]chan<- struct{}
Anmol Sethi's avatar
Anmol Sethi committed
type connConfig struct {
	subprotocol string
	rwc         io.ReadWriteCloser
	client      bool
	copts       *compressionOptions
Anmol Sethi's avatar
Anmol Sethi committed

Anmol Sethi's avatar
Anmol Sethi committed
	br *bufio.Reader
Anmol Sethi's avatar
Anmol Sethi committed
	bw *bufio.Writer
Anmol Sethi's avatar
Anmol Sethi committed
}
Anmol Sethi's avatar
Anmol Sethi committed
func newConn(cfg connConfig) *Conn {
	c := &Conn{
		subprotocol: cfg.subprotocol,
		rwc:         cfg.rwc,
		client:      cfg.client,
		copts:       cfg.copts,
Anmol Sethi's avatar
Anmol Sethi committed

		br: cfg.br,
		bw: cfg.bw,

Anmol Sethi's avatar
Anmol Sethi committed
		readTimeout:  make(chan context.Context),
Anmol Sethi's avatar
Anmol Sethi committed
		writeTimeout: make(chan context.Context),

Anmol Sethi's avatar
Anmol Sethi committed
		closed:      make(chan struct{}),
Anmol Sethi's avatar
Anmol Sethi committed
		activePings: make(map[string]chan<- struct{}),
Anmol Sethi's avatar
Anmol Sethi committed
	c.readMu = newMu(c)
	c.writeFrameMu = newMu(c)

Anmol Sethi's avatar
Anmol Sethi committed
	c.msgReader = newMsgReader(c)
Anmol Sethi's avatar
Anmol Sethi committed
	c.msgWriter = newMsgWriter(c)
	if c.client {
		c.writeBuf = extractBufioWriterBuf(c.bw, c.rwc)
	}
Anmol Sethi's avatar
Anmol Sethi committed

Anmol Sethi's avatar
Anmol Sethi committed
	runtime.SetFinalizer(c, func(c *Conn) {
Anmol Sethi's avatar
Anmol Sethi committed
		c.close(errors.New("connection garbage collected"))
Anmol Sethi's avatar
Anmol Sethi committed

	return c
Anmol Sethi's avatar
Anmol Sethi committed
}

Anmol Sethi's avatar
Anmol Sethi committed
// Subprotocol returns the negotiated subprotocol.
// An empty string means the default protocol.
func (c *Conn) Subprotocol() string {
	return c.subprotocol
Anmol Sethi's avatar
Anmol Sethi committed
}

Anmol Sethi's avatar
Anmol Sethi committed
func (c *Conn) close(err error) {
Anmol Sethi's avatar
Anmol Sethi committed
	c.closeMu.Lock()
	defer c.closeMu.Unlock()
Anmol Sethi's avatar
Anmol Sethi committed
	if c.isClosed() {
		return
	}
	close(c.closed)
	runtime.SetFinalizer(c, nil)
Anmol Sethi's avatar
Anmol Sethi committed
	c.setCloseErrLocked(err)
Anmol Sethi's avatar
Anmol Sethi committed
	// Have to close after c.closed is closed to ensure any goroutine that wakes up
	// from the connection being closed also sees that c.closed is closed and returns
	// closeErr.
	c.rwc.Close()
Anmol Sethi's avatar
Anmol Sethi committed
	go func() {
Anmol Sethi's avatar
Anmol Sethi committed
		if c.client {
			c.writeFrameMu.Lock(context.Background())
			putBufioWriter(c.bw)
		}
		c.msgWriter.close()

		if c.client {
			c.readMu.Lock(context.Background())
			putBufioReader(c.br)
			c.readMu.Unlock()
		}
		c.msgReader.close()
Anmol Sethi's avatar
Anmol Sethi committed
	}()
func (c *Conn) timeoutLoop() {
	readCtx := context.Background()
	writeCtx := context.Background()
Anmol Sethi's avatar
Anmol Sethi committed
	for {
		select {
		case <-c.closed:
			return
Anmol Sethi's avatar
Anmol Sethi committed

Anmol Sethi's avatar
Anmol Sethi committed
		case writeCtx = <-c.writeTimeout:
		case readCtx = <-c.readTimeout:
Anmol Sethi's avatar
Anmol Sethi committed

			c.setCloseErr(fmt.Errorf("read timed out: %w", readCtx.Err()))
Anmol Sethi's avatar
Anmol Sethi committed
			go c.writeError(StatusPolicyViolation, errors.New("timed out"))
Anmol Sethi's avatar
Anmol Sethi committed
			c.close(fmt.Errorf("write timed out: %w", writeCtx.Err()))
Anmol Sethi's avatar
Anmol Sethi committed

Anmol Sethi's avatar
Anmol Sethi committed
func (c *Conn) flate() bool {
Anmol Sethi's avatar
Anmol Sethi committed
	return c.copts != nil
Anmol Sethi's avatar
Anmol Sethi committed
}

// Ping sends a ping to the peer and waits for a pong.
// Use this to measure latency or ensure the peer is responsive.
Anmol Sethi's avatar
Anmol Sethi committed
// Ping must be called concurrently with Reader as it does
// not read from the connection but instead waits for a Reader call
// to read the pong.
Anmol Sethi's avatar
Anmol Sethi committed
//
// TCP Keepalives should suffice for most use cases.
Anmol Sethi's avatar
Anmol Sethi committed
func (c *Conn) Ping(ctx context.Context) error {
Anmol Sethi's avatar
Anmol Sethi committed
	p := atomic.AddInt32(&c.pingCounter, 1)
Anmol Sethi's avatar
Anmol Sethi committed
	err := c.ping(ctx, strconv.Itoa(int(p)))
Anmol Sethi's avatar
Anmol Sethi committed
	if err != nil {
Anmol Sethi's avatar
Anmol Sethi committed
		return fmt.Errorf("failed to ping: %w", err)
Anmol Sethi's avatar
Anmol Sethi committed
	}
	return nil
}

func (c *Conn) ping(ctx context.Context, p string) error {
Anmol Sethi's avatar
Anmol Sethi committed
	pong := make(chan struct{})
Anmol Sethi's avatar
Anmol Sethi committed
	c.activePingsMu.Lock()
	c.activePings[p] = pong
	c.activePingsMu.Unlock()
Anmol Sethi's avatar
Anmol Sethi committed

	defer func() {
Anmol Sethi's avatar
Anmol Sethi committed
		c.activePingsMu.Lock()
		delete(c.activePings, p)
		c.activePingsMu.Unlock()
Anmol Sethi's avatar
Anmol Sethi committed
	}()
Anmol Sethi's avatar
Anmol Sethi committed

Anmol Sethi's avatar
Anmol Sethi committed
	err := c.writeControl(ctx, opPing, []byte(p))
Anmol Sethi's avatar
Anmol Sethi committed
	if err != nil {
		return err
	}

	select {
Anmol Sethi's avatar
Anmol Sethi committed
	case <-c.closed:
		return c.closeErr
Anmol Sethi's avatar
Anmol Sethi committed
	case <-ctx.Done():
Anmol Sethi's avatar
Anmol Sethi committed
		err := fmt.Errorf("failed to wait for pong: %w", ctx.Err())
Anmol Sethi's avatar
Anmol Sethi committed
		c.close(err)
Anmol Sethi's avatar
Anmol Sethi committed
		return err
Anmol Sethi's avatar
Anmol Sethi committed
	case <-pong:
		return nil
	}
}
Anmol Sethi's avatar
Anmol Sethi committed
type mu struct {
Anmol Sethi's avatar
Anmol Sethi committed
	c  *Conn
	ch chan struct{}
Anmol Sethi's avatar
Anmol Sethi committed
func newMu(c *Conn) *mu {
	return &mu{
		c:  c,
		ch: make(chan struct{}, 1),
	}
Anmol Sethi's avatar
Anmol Sethi committed
func (m *mu) Lock(ctx context.Context) error {
	select {
Anmol Sethi's avatar
Anmol Sethi committed
	case <-m.c.closed:
		return m.c.closeErr
Anmol Sethi's avatar
Anmol Sethi committed
	case <-ctx.Done():
Anmol Sethi's avatar
Anmol Sethi committed
		err := fmt.Errorf("failed to acquire lock: %w", ctx.Err())
		m.c.close(err)
		return err
Anmol Sethi's avatar
Anmol Sethi committed
	case m.ch <- struct{}{}:
		return nil
Anmol Sethi's avatar
Anmol Sethi committed
}
Anmol Sethi's avatar
Anmol Sethi committed
func (m *mu) TryLock() bool {
	select {
	case m.ch <- struct{}{}:
		return true
	default:
		return false
Anmol Sethi's avatar
Anmol Sethi committed
}
Anmol Sethi's avatar
Anmol Sethi committed
func (m *mu) Unlock() {
Anmol Sethi's avatar
Anmol Sethi committed
	select {
	case <-m.ch:
	default:
	}