From a67c16b6575a97b2464a26fc3ef7369da423753f Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Fri, 4 Aug 2023 04:03:22 -0500 Subject: [PATCH] magic --- contrib/codecs/http/client.go | 4 ++++ contrib/codecs/inproc/client.go | 10 ++++++++++ contrib/codecs/rdwr/client.go | 5 +++++ contrib/codecs/redis/client.go | 4 ++++ pkg/codec/jrpc.go | 8 +++++++- 5 files changed, 30 insertions(+), 1 deletion(-) diff --git a/contrib/codecs/http/client.go b/contrib/codecs/http/client.go index 137816a..a9fb41a 100644 --- a/contrib/codecs/http/client.go +++ b/contrib/codecs/http/client.go @@ -66,6 +66,10 @@ func (c *Client) SetHeader(key string, value string) { c.headers.Set(key, value) } +func (c *Client) Closed() <-chan struct{} { + return make(chan struct{}) +} + func (c *Client) Do(ctx context.Context, result any, method string, params any) error { req, err := codec.NewRequest(ctx, codec.NewId(c.id.Add(1)), method, params) if err != nil { diff --git a/contrib/codecs/inproc/client.go b/contrib/codecs/inproc/client.go index f473890..88c325e 100644 --- a/contrib/codecs/inproc/client.go +++ b/contrib/codecs/inproc/client.go @@ -14,6 +14,9 @@ type Client struct { p *clientutil.IdReply c *Codec + ctx context.Context + cn context.CancelFunc + m codec.Middlewares handler codec.Handler mu sync.Mutex @@ -34,12 +37,19 @@ func NewClient(c *Codec, handler codec.Handler) *Client { c: c, handler: handler, } + cl.ctx, cl.cn = context.WithCancel(context.Background()) go cl.listen() + return cl } +func (c *Client) Closed() <-chan struct{} { + return c.ctx.Done() +} + func (c *Client) listen() error { var msg json.RawMessage + defer c.cn() for { err := json.NewDecoder(c.c.rd).Decode(&msg) if err != nil { diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index 01d100a..0637c34 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -47,6 +47,10 @@ func (c *Client) SetHandlerPeer(pi codec.PeerInfo) { c.handlerPeer = pi } +func (c *Client) Closed() <-chan struct{} { + return c.ctx.Done() +} + func (c *Client) Mount(h codec.Middleware) { c.mu.Lock() defer c.mu.Unlock() @@ -58,6 +62,7 @@ func (c *Client) Mount(h codec.Middleware) { func (c *Client) listen() error { var msg json.RawMessage + defer c.cn() for { err := json.NewDecoder(c.rd).Decode(&msg) if err != nil { diff --git a/contrib/codecs/redis/client.go b/contrib/codecs/redis/client.go index af4b122..8969d63 100644 --- a/contrib/codecs/redis/client.go +++ b/contrib/codecs/redis/client.go @@ -54,6 +54,10 @@ func NewClient(c redis.UniversalClient, domain string) *Client { return cl } +func (c *Client) Closed() <-chan struct{} { + return c.ctx.Done() +} + func (c *Client) SetHandlerPeer(pi codec.PeerInfo) { c.handlerPeer = pi } diff --git a/pkg/codec/jrpc.go b/pkg/codec/jrpc.go index b36d4df..6f9bd22 100644 --- a/pkg/codec/jrpc.go +++ b/pkg/codec/jrpc.go @@ -33,12 +33,18 @@ type Mounter interface { Mount(Middleware) } +type Closeder interface { + Closed() <-chan struct{} +} + type Conn interface { Doer BatchCaller - io.Closer Mounter + + io.Closer + Closeder } type StreamingConn interface { -- GitLab