Newer
Older
)
// MessageType represents the type of a WebSocket message.
// It is an integer-backed enumeration; see the MessageType constants
// for the defined values.
// See RFC 6455 section 5.6: https://www.rfc-editor.org/rfc/rfc6455#section-5.6
type MessageType int
// MessageType constants.
const (
// MessageText is for UTF-8 encoded text messages like JSON.
MessageText MessageType = iota + 1
// MessageBinary is for binary messages like protobufs.
// All methods may be called concurrently except for Reader and Read.
// You must always read from the connection. Otherwise control
// frames will not be handled. See Reader and CloseRead.
// Close the connection when you are finished with it to release associated resources.
// On any error from any method, the connection is closed
// with an appropriate reason.
rwc io.ReadWriteCloser
client bool
copts *compressionOptions
readTimeout chan context.Context
writeTimeout chan context.Context
// Read state.
readMu *mu
readControlBuf [maxControlPayload]byte
msgReader *msgReader
readCloseFrameErr error
activePingsMu sync.Mutex
activePings map[string]chan<- struct{}
// connConfig groups a subprotocol, the underlying io.ReadWriteCloser,
// a client flag and compression options.
// NOTE(review): presumably this is the input used to construct a Conn — confirm against the constructor.
type connConfig struct {
subprotocol string
rwc io.ReadWriteCloser
client bool
copts *compressionOptions
c := &Conn{
subprotocol: cfg.subprotocol,
rwc: cfg.rwc,
client: cfg.client,
copts: cfg.copts,
c.msgWriter = newMsgWriter(c)
if c.client {
c.writeBuf = extractBufioWriterBuf(c.bw, c.rwc)
}
go c.timeoutLoop()
// Subprotocol returns the negotiated subprotocol, as recorded in the
// connection's subprotocol field.
// An empty string means the default protocol.
func (c *Conn) Subprotocol() string {
return c.subprotocol
if c.isClosed() {
return
}
close(c.closed)
runtime.SetFinalizer(c, nil)
// Have to close after c.closed is closed to ensure any goroutine that wakes up
// from the connection being closed also sees that c.closed is closed and returns
// closeErr.
c.rwc.Close()
if c.client {
c.writeFrameMu.Lock(context.Background())
putBufioWriter(c.bw)
}
c.msgWriter.close()
if c.client {
c.readMu.Lock(context.Background())
putBufioReader(c.br)
c.readMu.Unlock()
}
c.msgReader.close()
func (c *Conn) timeoutLoop() {
readCtx := context.Background()
writeCtx := context.Background()
case writeCtx = <-c.writeTimeout:
case readCtx = <-c.readTimeout:
case <-readCtx.Done():
c.setCloseErr(fmt.Errorf("read timed out: %w", readCtx.Err()))
go c.writeError(StatusPolicyViolation, errors.New("timed out"))
case <-writeCtx.Done():
}
// Ping sends a ping to the peer and waits for a pong.
// Use this to measure latency or ensure the peer is responsive.
// Ping must be called concurrently with Reader as it does
// not read from the connection but instead waits for a Reader call
// to read the pong.
// TCP Keepalives should suffice for most use cases.
func (c *Conn) ping(ctx context.Context, p string) error {
c.activePingsMu.Lock()
c.activePings[p] = pong
c.activePingsMu.Unlock()
c.activePingsMu.Lock()
delete(c.activePings, p)
c.activePingsMu.Unlock()
err := fmt.Errorf("failed to wait for pong: %w", ctx.Err())
// newMu returns a new mu bound to the connection c.
// Its ch channel is created with a buffer of exactly one element.
func newMu(c *Conn) *mu {
return &mu{
c: c,
ch: make(chan struct{}, 1),
}
err := fmt.Errorf("failed to acquire lock: %w", ctx.Err())
m.c.close(err)
return err
func (m *mu) TryLock() bool {
select {
case m.ch <- struct{}{}:
return true
default:
return false