diff --git a/ci/wasm.sh b/ci/wasm.sh index 0290f188df5f31132fbbd065235a84caeb8456bb..1497ba2407cc12ef2b38270343d6a778536cf99b 100755 --- a/ci/wasm.sh +++ b/ci/wasm.sh @@ -26,5 +26,7 @@ GOOS=js GOARCH=wasm go test -exec=wasmbrowsertest ./... -args "$WS_ECHO_SERVER_U if ! wait "$wsjstestPID"; then echo "wsjstest exited unsuccessfully" + echo "output:" + cat "$wsjstestOut" exit 1 fi diff --git a/conn.go b/conn.go index 3d7d574efcd29b34c65fe309399d7f152b259f54..2679edcc911584b59762ae04708d805bc90f4337 100644 --- a/conn.go +++ b/conn.go @@ -20,8 +20,7 @@ import ( ) // Conn represents a WebSocket connection. -// All methods may be called concurrently except for Reader, Read -// and SetReadLimit. +// All methods may be called concurrently except for Reader and Read. // // You must always read from the connection. Otherwise control // frames will not be handled. See the docs on Reader and CloseRead. @@ -56,7 +55,7 @@ type Conn struct { writeHeaderBuf []byte writeHeader *header // read limit for a message in bytes. - msgReadLimit int64 + msgReadLimit *atomicInt64 // Used to ensure a previous writer is not used after being closed. activeWriter atomic.Value @@ -70,7 +69,7 @@ type Conn struct { activeReader *messageReader // readFrameLock is acquired to read from bw. readFrameLock chan struct{} - readClosed int64 + readClosed *atomicInt64 readHeaderBuf []byte controlPayloadBuf []byte @@ -90,7 +89,8 @@ type Conn struct { func (c *Conn) init() { c.closed = make(chan struct{}) - c.msgReadLimit = 32768 + c.msgReadLimit = &atomicInt64{} + c.msgReadLimit.Store(32768) c.writeMsgLock = make(chan struct{}, 1) c.writeFrameLock = make(chan struct{}, 1) @@ -105,6 +105,7 @@ func (c *Conn) init() { c.writeHeaderBuf = makeWriteHeaderBuf() c.writeHeader = &header{} c.readHeaderBuf = makeReadHeaderBuf() + c.readClosed = &atomicInt64{} c.controlPayloadBuf = make([]byte, maxControlFramePayload) runtime.SetFinalizer(c, func(c *Conn) { @@ -341,7 +342,7 @@ func (c *Conn) handleControl(ctx context.Context, h header) error { // See https://github.com/nhooyr/websocket/issues/87#issue-451703332 // Most users should not need this. func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) { - if atomic.LoadInt64(&c.readClosed) == 1 { + if c.readClosed.Load() == 1 { return 0, nil, fmt.Errorf("websocket connection read closed") } @@ -391,7 +392,7 @@ func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) { c.readerMsgHeader = h c.readerFrameEOF = false c.readerMaskPos = 0 - c.readMsgLeft = c.msgReadLimit + c.readMsgLeft = c.msgReadLimit.Load() r := &messageReader{ c: c, diff --git a/conn_common.go b/conn_common.go index ae0fe55498610b3508dd175d034d535436afcb89..471461106b3f8f2d01175fbae25c493d43adc1d5 100644 --- a/conn_common.go +++ b/conn_common.go @@ -178,7 +178,7 @@ func (c *netConn) SetReadDeadline(t time.Time) error { // Use this when you do not want to read data messages from the connection anymore but will // want to write messages to it. func (c *Conn) CloseRead(ctx context.Context) context.Context { - atomic.StoreInt64(&c.readClosed, 1) + c.readClosed.Store(1) ctx, cancel := context.WithCancel(ctx) go func() { @@ -200,7 +200,7 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context { // // When the limit is hit, the connection will be closed with StatusMessageTooBig. func (c *Conn) SetReadLimit(n int64) { - c.msgReadLimit = n + c.msgReadLimit.Store(n) } func (c *Conn) setCloseErr(err error) { @@ -208,3 +208,24 @@ func (c *Conn) setCloseErr(err error) { c.closeErr = fmt.Errorf("websocket closed: %w", err) }) } + +// See https://github.com/nhooyr/websocket/issues/153 +type atomicInt64 struct { + v atomic.Value +} + +func (v *atomicInt64) Load() int64 { + i, ok := v.v.Load().(int64) + if !ok { + return 0 + } + return i +} + +func (v *atomicInt64) Store(i int64) { + v.v.Store(i) +} + +func (v *atomicInt64) String() string { + return fmt.Sprint(v.v.Load()) +} diff --git a/websocket_js.go b/websocket_js.go index 3822797b3f953d0a449d2ae0558ead13f2d3a270..2226d3a44e4e70c5544d292649fadd3b0d9a9c00 100644 --- a/websocket_js.go +++ b/websocket_js.go @@ -10,7 +10,6 @@ import ( "reflect" "runtime" "sync" - "sync/atomic" "syscall/js" "nhooyr.io/websocket/internal/bpool" @@ -21,9 +20,10 @@ import ( type Conn struct { ws wsjs.WebSocket - msgReadLimit int64 + // read limit for a message in bytes. + msgReadLimit *atomicInt64 - readClosed int64 + readClosed *atomicInt64 closeOnce sync.Once closed chan struct{} closeErrOnce sync.Once @@ -49,7 +49,11 @@ func (c *Conn) close(err error) { func (c *Conn) init() { c.closed = make(chan struct{}) c.readSignal = make(chan struct{}, 1) - c.msgReadLimit = 32768 + + c.msgReadLimit = &atomicInt64{} + c.msgReadLimit.Store(32768) + + c.readClosed = &atomicInt64{} c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) { cerr := CloseError{ @@ -89,7 +93,7 @@ func (c *Conn) closeWithInternal() { // Read attempts to read a message from the connection. // The maximum time spent waiting is bounded by the context. func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { - if atomic.LoadInt64(&c.readClosed) == 1 { + if c.readClosed.Load() == 1 { return 0, nil, fmt.Errorf("websocket connection read closed") } @@ -97,7 +101,7 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) { if err != nil { return 0, nil, fmt.Errorf("failed to read: %w", err) } - if int64(len(p)) > c.msgReadLimit { + if int64(len(p)) > c.msgReadLimit.Load() { c.Close(StatusMessageTooBig, fmt.Sprintf("read limited at %v bytes", c.msgReadLimit)) return 0, nil, c.closeErr }