diff --git a/contrib/codecs/http/client.go b/contrib/codecs/http/client.go index 137816a15aede372acdfb3ce736d34971340f70d..a9fb41aa8e4b82192ba3534f6a7631517e401a9d 100644 --- a/contrib/codecs/http/client.go +++ b/contrib/codecs/http/client.go @@ -66,6 +66,10 @@ func (c *Client) SetHeader(key string, value string) { c.headers.Set(key, value) } +func (c *Client) Closed() <-chan struct{} { + return make(chan struct{}) +} + func (c *Client) Do(ctx context.Context, result any, method string, params any) error { req, err := codec.NewRequest(ctx, codec.NewId(c.id.Add(1)), method, params) if err != nil { diff --git a/contrib/codecs/inproc/client.go b/contrib/codecs/inproc/client.go index f4738905773a42895865c689d1881c789fd060c5..88c325e8cd28d0c19bf7b373d9597e18b598f021 100644 --- a/contrib/codecs/inproc/client.go +++ b/contrib/codecs/inproc/client.go @@ -14,6 +14,9 @@ type Client struct { p *clientutil.IdReply c *Codec + ctx context.Context + cn context.CancelFunc + m codec.Middlewares handler codec.Handler mu sync.Mutex @@ -34,12 +37,19 @@ func NewClient(c *Codec, handler codec.Handler) *Client { c: c, handler: handler, } + cl.ctx, cl.cn = context.WithCancel(context.Background()) go cl.listen() + return cl } +func (c *Client) Closed() <-chan struct{} { + return c.ctx.Done() +} + func (c *Client) listen() error { var msg json.RawMessage + defer c.cn() for { err := json.NewDecoder(c.c.rd).Decode(&msg) if err != nil { diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index 01d100ab7ce339d2004271ee4ea479461f1670db..0637c34373006092911937de5d4ec8fc61c01718 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -47,6 +47,10 @@ func (c *Client) SetHandlerPeer(pi codec.PeerInfo) { c.handlerPeer = pi } +func (c *Client) Closed() <-chan struct{} { + return c.ctx.Done() +} + func (c *Client) Mount(h codec.Middleware) { c.mu.Lock() defer c.mu.Unlock() @@ -58,6 +62,7 @@ func (c *Client) Mount(h codec.Middleware) { func (c *Client) listen() error { var msg json.RawMessage + defer c.cn() for { err := json.NewDecoder(c.rd).Decode(&msg) if err != nil { diff --git a/contrib/codecs/redis/client.go b/contrib/codecs/redis/client.go index af4b12253d61b96f0fa0f3d8f6d3c084c8ef2d1f..8969d63d427601eee990324be6a1abb878aaf68a 100644 --- a/contrib/codecs/redis/client.go +++ b/contrib/codecs/redis/client.go @@ -54,6 +54,10 @@ func NewClient(c redis.UniversalClient, domain string) *Client { return cl } +func (c *Client) Closed() <-chan struct{} { + return c.ctx.Done() +} + func (c *Client) SetHandlerPeer(pi codec.PeerInfo) { c.handlerPeer = pi } diff --git a/pkg/codec/jrpc.go b/pkg/codec/jrpc.go index b36d4df4daa8075002a7ba0d11057c099ede2cd5..6f9bd22ba9549157703b90db829cdb5b27fbf97e 100644 --- a/pkg/codec/jrpc.go +++ b/pkg/codec/jrpc.go @@ -33,12 +33,18 @@ type Mounter interface { Mount(Middleware) } +type Closeder interface { + Closed() <-chan struct{} +} + type Conn interface { Doer BatchCaller - io.Closer Mounter + + io.Closer + Closeder } type StreamingConn interface {