diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index d36b04ca05c5a218fec0a5a5f229a1d005a38ba2..4d035b7c8a361309296ec3b18e483364c09688e3 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -66,7 +66,9 @@ func (c *Client) Mount(h jsonrpc.Middleware) { func (c *Client) listen() error { var msg json.RawMessage - defer c.cn() + defer func() { + _ = c.Close() + }() dec := json.NewDecoder(bufio.NewReader(c.rd)) for { err := dec.Decode(&msg) @@ -157,7 +159,7 @@ func (c *Client) SetHeader(key string, value string) { func (c *Client) Close() error { c.cn() - return nil + return c.p.Close() } func (c *Client) writeContext(ctx context.Context, xs []byte) error { diff --git a/contrib/codecs/websocket/client.go b/contrib/codecs/websocket/client.go index 7aa5321889b9d3554147b2e1d3eae3ceabe5395f..4847adaaef167bd61b8120fdc94b12f31afb0e7c 100644 --- a/contrib/codecs/websocket/client.go +++ b/contrib/codecs/websocket/client.go @@ -29,8 +29,11 @@ func newClient(conn *websocket.Conn) (*Client, error) { } func (c *Client) Close() error { + if err := c.Client.Close(); err != nil { + return err + } if err := c.conn.Close(websocket.StatusNormalClosure, ""); err != nil { return err } - return c.Client.Close() + return nil } diff --git a/contrib/extension/subscription/client_test.go b/contrib/extension/subscription/client_test.go index b63d6e2fbf5ca5a9380927adbfb69cd622dedbc8..f697a201e26588342267bbdafa6d9b9bc281254d 100644 --- a/contrib/extension/subscription/client_test.go +++ b/contrib/extension/subscription/client_test.go @@ -49,10 +49,7 @@ func newRouter(t *testing.T) jmux.Router { return default: } - err := notifier.Notify(idx) - if err != nil { - t.Error(err) - } + _ = notifier.Notify(idx) } }() }) @@ -172,11 +169,9 @@ func TestCloseClient(t *testing.T) { return } - go func() { - if err := cl.Close(); err != nil { - t.Error(err) - } - }() + time.AfterFunc(10*time.Millisecond, func() { + _ = cl.Close() + }) for { select { diff --git a/pkg/clientutil/idreply.go b/pkg/clientutil/idreply.go index 46ebcc1504aa37e8c669a421027644087a02a633..167ce521046d656894ffcae672c926ddd5ab8cc0 100644 --- a/pkg/clientutil/idreply.go +++ b/pkg/clientutil/idreply.go @@ -3,6 +3,7 @@ package clientutil import ( "context" "io" + "net" "sync" "sync/atomic" @@ -12,6 +13,9 @@ import ( type IdReply struct { id atomic.Int64 + close atomic.Bool + closed chan struct{} + chs map[string]chan msgOrError mu sync.Mutex } @@ -23,7 +27,8 @@ type msgOrError struct { func NewIdReply() *IdReply { return &IdReply{ - chs: make(map[string]chan msgOrError, 1), + closed: make(chan struct{}), + chs: make(map[string]chan msgOrError, 1), } } @@ -94,5 +99,19 @@ func (i *IdReply) Ask(ctx context.Context, id []byte) (io.ReadCloser, error) { case <-ctx.Done(): i.remove(id) return nil, ctx.Err() + case <-i.closed: + return nil, net.ErrClosed + } +} + +func (i *IdReply) Closed() <-chan struct{} { + return i.closed +} + +func (i *IdReply) Close() error { + if i.close.Swap(true) { + return net.ErrClosed } + close(i.closed) + return nil }