good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit a67c16b6 authored by a's avatar a
Browse files

magic

parent 3d4ce91b
No related branches found
No related tags found
No related merge requests found
Pipeline #24786 failed
...@@ -66,6 +66,10 @@ func (c *Client) SetHeader(key string, value string) { ...@@ -66,6 +66,10 @@ func (c *Client) SetHeader(key string, value string) {
c.headers.Set(key, value) c.headers.Set(key, value)
} }
func (c *Client) Closed() <-chan struct{} {
return make(chan struct{})
}
func (c *Client) Do(ctx context.Context, result any, method string, params any) error { func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
req, err := codec.NewRequest(ctx, codec.NewId(c.id.Add(1)), method, params) req, err := codec.NewRequest(ctx, codec.NewId(c.id.Add(1)), method, params)
if err != nil { if err != nil {
......
...@@ -14,6 +14,9 @@ type Client struct { ...@@ -14,6 +14,9 @@ type Client struct {
p *clientutil.IdReply p *clientutil.IdReply
c *Codec c *Codec
ctx context.Context
cn context.CancelFunc
m codec.Middlewares m codec.Middlewares
handler codec.Handler handler codec.Handler
mu sync.Mutex mu sync.Mutex
...@@ -34,12 +37,19 @@ func NewClient(c *Codec, handler codec.Handler) *Client { ...@@ -34,12 +37,19 @@ func NewClient(c *Codec, handler codec.Handler) *Client {
c: c, c: c,
handler: handler, handler: handler,
} }
cl.ctx, cl.cn = context.WithCancel(context.Background())
go cl.listen() go cl.listen()
return cl return cl
} }
func (c *Client) Closed() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) listen() error { func (c *Client) listen() error {
var msg json.RawMessage var msg json.RawMessage
defer c.cn()
for { for {
err := json.NewDecoder(c.c.rd).Decode(&msg) err := json.NewDecoder(c.c.rd).Decode(&msg)
if err != nil { if err != nil {
......
...@@ -47,6 +47,10 @@ func (c *Client) SetHandlerPeer(pi codec.PeerInfo) { ...@@ -47,6 +47,10 @@ func (c *Client) SetHandlerPeer(pi codec.PeerInfo) {
c.handlerPeer = pi c.handlerPeer = pi
} }
func (c *Client) Closed() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) Mount(h codec.Middleware) { func (c *Client) Mount(h codec.Middleware) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
...@@ -58,6 +62,7 @@ func (c *Client) Mount(h codec.Middleware) { ...@@ -58,6 +62,7 @@ func (c *Client) Mount(h codec.Middleware) {
func (c *Client) listen() error { func (c *Client) listen() error {
var msg json.RawMessage var msg json.RawMessage
defer c.cn()
for { for {
err := json.NewDecoder(c.rd).Decode(&msg) err := json.NewDecoder(c.rd).Decode(&msg)
if err != nil { if err != nil {
......
...@@ -54,6 +54,10 @@ func NewClient(c redis.UniversalClient, domain string) *Client { ...@@ -54,6 +54,10 @@ func NewClient(c redis.UniversalClient, domain string) *Client {
return cl return cl
} }
func (c *Client) Closed() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) SetHandlerPeer(pi codec.PeerInfo) { func (c *Client) SetHandlerPeer(pi codec.PeerInfo) {
c.handlerPeer = pi c.handlerPeer = pi
} }
......
...@@ -33,12 +33,18 @@ type Mounter interface { ...@@ -33,12 +33,18 @@ type Mounter interface {
Mount(Middleware) Mount(Middleware)
} }
type Closeder interface {
Closed() <-chan struct{}
}
type Conn interface { type Conn interface {
Doer Doer
BatchCaller BatchCaller
io.Closer
Mounter Mounter
io.Closer
Closeder
} }
type StreamingConn interface { type StreamingConn interface {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment