diff --git a/client.go b/client.go index 97adec9afb87248d622c0b055eb0dbdf5e0cdc4b..6463ef51bd8a8159538663c83a432d13884dfb3f 100644 --- a/client.go +++ b/client.go @@ -69,7 +69,7 @@ type Client struct { // writeConn is used for writing to the connection on the caller's goroutine. It should // only be accessed outside of dispatch, with the write lock held. The write lock is // taken by sending on reqInit and released by sending on reqSent. - writeConn jsonWriter + writeConn JsonWriter // for dispatch close chan struct{} @@ -109,7 +109,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { func (cc *clientConn) close(err error, inflightReq *requestOp) { cc.handler.close(err, inflightReq) - cc.codec.close() + cc.codec.Close() } type readOp struct { @@ -233,15 +233,16 @@ func (c *Client) SupportedModules() (map[string]string, error) { } // Close closes the client, aborting any in-flight requests. -func (c *Client) Close() { +func (c *Client) Close() error { if c.isHTTP { - return + return nil } select { case c.close <- struct{}{}: <-c.didClose case <-c.didClose: } + return nil } // SetHeader adds a custom HTTP header to the client's requests. @@ -517,7 +518,7 @@ func (c *Client) reconnect(ctx context.Context) error { c.writeConn = newconn return nil case <-c.didClose: - newconn.close() + newconn.Close() return ErrClientQuit } } diff --git a/handler.go b/handler.go index 7154be94ab9b5adc4d46f65a36e9dd62c40fd9be..8536c36668e5d1251993f4ad8e90a7e5ff2dfba7 100644 --- a/handler.go +++ b/handler.go @@ -54,7 +54,7 @@ type handler struct { callWG sync.WaitGroup // pending call goroutines rootCtx context.Context // canceled by close() cancelRoot func() // cancel function for rootCtx - conn jsonWriter // where responses will be sent + conn JsonWriter // where responses will be sent log *zlog.Logger subLock sync.RWMutex @@ -70,7 +70,7 @@ type callProc struct { notifiers []*Notifier } -func newHandler(connCtx context.Context, conn jsonWriter, reg Router) *handler { +func newHandler(connCtx context.Context, conn JsonWriter, reg Router) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ peer: PeerInfoFromContext(connCtx), diff --git a/http.go b/http.go index 3045457139b1fc721684314bfa7821c2f4f3840e..1e5ae1ae0a3f87f833b4a3562fa6114a2cf6e435 100644 --- a/http.go +++ b/http.go @@ -77,11 +77,12 @@ func (hc *httpConn) ReadBatch() ([]*jsonrpcMessage, bool, error) { return nil, false, io.EOF } -func (hc *httpConn) close() { +func (hc *httpConn) Close() error { hc.closeOnce.Do(func() { close(hc.closeCh) }) + return nil } -func (hc *httpConn) closed() <-chan any { +func (hc *httpConn) Closed() <-chan any { return hc.closeCh } @@ -275,18 +276,15 @@ func (c *httpServerConn) WriteJSON(ctx context.Context, v any) error { return c.jc.WriteJSON(ctx, v) } -func (c *httpServerConn) close() { - c.jc.close() +func (c *httpServerConn) Close() error { + return nil } // Closed returns a channel which will be closed when Close is called -func (c *httpServerConn) closed() <-chan any { - return c.jc.closed() +func (c *httpServerConn) Closed() <-chan any { + return c.jc.Closed() } -// Close does nothing and always returns nil. -func (t *httpServerConn) Close() error { return nil } - // RemoteAddr returns the peer address of the underlying connection. func (t *httpServerConn) RemoteAddr() string { return t.PeerInfo().RemoteAddr @@ -373,7 +371,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("content-type", contentType) codec := newHTTPServerConn(r, w, connInfo) - defer codec.close() + defer codec.Close() s.serveSingleRequest(ctx, codec) } diff --git a/json.go b/json.go index 20e0b10a04244ed6363a33a82b0371af16bdf200..0ea97ce2e77d2b86cbed535f228cc8a215efa247 100644 --- a/json.go +++ b/json.go @@ -255,15 +255,16 @@ func (c *jsonCodec) WriteJSON(ctx context.Context, v any) error { return c.encode(v) } -func (c *jsonCodec) close() { +func (c *jsonCodec) Close() error { c.closer.Do(func() { close(c.closeCh) c.conn.Close() }) + return nil } // Closed returns a channel which will be closed when Close is called -func (c *jsonCodec) closed() <-chan any { +func (c *jsonCodec) Closed() <-chan any { return c.closeCh } diff --git a/server.go b/server.go index b80012275dd0b1e0a43c9236c9662dbc25b4312a..da3eeef2e79947fa380aaf5ed23ac07b8d4bc953 100644 --- a/server.go +++ b/server.go @@ -50,7 +50,7 @@ func (s *Server) Router() Router { // the response back using the given codec. It will block until the codec is closed or the // server is stopped. In either case the codec is closed. func (s *Server) ServeCodec(codec ServerCodec) { - defer codec.close() + defer codec.Close() // Don't serve if server is stopped. if atomic.LoadInt32(&s.run) == 0 { @@ -62,7 +62,7 @@ func (s *Server) ServeCodec(codec ServerCodec) { defer s.codecs.Remove(codec) c := initClient(codec, s.services) - <-codec.closed() + <-codec.Closed() c.Close() } @@ -100,7 +100,7 @@ func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { log.Debug().Msg("RPC server shutting down") s.codecs.Each(func(c any) bool { - c.(ServerCodec).close() + c.(ServerCodec).Close() return true }) } diff --git a/subscription.go b/subscription.go index 686d408e851ae0f03572c1fc122e8302b646a080..313cb2617f70e33b18409885c5eff6fcff5f11f9 100644 --- a/subscription.go +++ b/subscription.go @@ -128,7 +128,7 @@ func (n *Notifier) Notify(id SubID, data interface{}) error { // Closed returns a channel that is closed when the RPC connection is closed. // Deprecated: use subscription error channel func (n *Notifier) Closed() <-chan interface{} { - return n.h.conn.closed() + return n.h.conn.Closed() } // takeSubscription returns the subscription (if one has been created). No subscription can diff --git a/types.go b/types.go index 6f8bee586728fcf8d7a1dbf7b9e2a19926bbb9f5..1f7070d411a4fcf41ed7c5f5db8586aa9e794d9d 100644 --- a/types.go +++ b/types.go @@ -30,17 +30,17 @@ import ( type ServerCodec interface { PeerInfo() PeerInfo ReadBatch() (msgs []*jsonrpcMessage, isBatch bool, err error) - close() + Close() error - jsonWriter + JsonWriter } // jsonWriter can write JSON messages to its underlying connection. // Implementations must be safe for concurrent use. -type jsonWriter interface { +type JsonWriter interface { WriteJSON(context.Context, any) error // Closed returns a channel which is closed when the connection is closed. - closed() <-chan any + Closed() <-chan any // RemoteAddr returns the peer address of the connection. RemoteAddr() string } diff --git a/websocket.go b/websocket.go index f064764ed136192cdb3df2dc0c6b6db7f1adcd87..7c9b049c0754e286c1cde6d600f71dc3d816b492 100644 --- a/websocket.go +++ b/websocket.go @@ -189,9 +189,10 @@ func newWebsocketCodec(ctx context.Context, c *websocket.Conn, host string, req return wc } -func (wc *websocketCodec) close() { - wc.jsonCodec.close() +func (wc *websocketCodec) Close() error { + wc.jsonCodec.Close() wc.conn.CloseRead(context.Background()) + return nil } func (wc *websocketCodec) PeerInfo() PeerInfo {