good morning!!!!

Skip to content
Snippets Groups Projects

fix subscription deadlock

Files

@@ -50,7 +50,11 @@ func (c *WrapClient) Middleware(h jsonrpc.Handler) jsonrpc.Handler {
clientSub, ok := c.subs[params.ID]
c.mu.Unlock()
if ok {
clientSub.onmsg <- params.Result
// this could deadlock if we waited on onmsg and the sub was done
select {
case clientSub.onmsg <- params.Result:
case <-clientSub.subdone:
}
}
})
}
@@ -84,24 +88,29 @@ func (c *WrapClient) Subscribe(ctx context.Context, namespace string, channel an
namespace: namespace,
id: result,
channel: chanVal,
// BUG: a worse is better solution... it means that when this fills, you might receive subscriptions in an undefined error
onmsg: make(chan json.RawMessage, 32),
subdone: make(chan struct{}),
readErr: make(chan error),
onmsg: make(chan json.RawMessage),
subdone: make(chan struct{}),
readErr: make(chan error),
}
// will get the type of the event
etype := chanVal.Type().Elem()
go func() {
defer func() {
// close if possible
if sub.done.CompareAndSwap(false, true) {
close(sub.subdone)
}
// we're done reading
close(sub.readErr)
}()
for {
select {
case <-sub.subdone:
// sub is done, so close readErr
close(sub.readErr)
return
case params, ok := <-sub.onmsg:
if !ok {
close(sub.readErr)
return
}
val := reflect.New(etype)
@@ -111,9 +120,23 @@ func (c *WrapClient) Subscribe(ctx context.Context, namespace string, channel an
return
}
// and now send the elem
sub.channel.Send(val.Elem())
// this could deadlock if the client stopped waiting on the chan and unsubscribed
reflect.Select([]reflect.SelectCase{
{
Dir: reflect.SelectSend,
Chan: sub.channel,
Send: val.Elem(),
},
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
},
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(sub.subdone),
},
})
case <-ctx.Done():
close(sub.readErr)
return
}
}
Loading