diff --git a/contrib/extension/subscription/engine.go b/contrib/extension/subscription/engine.go index e8d1aa65999fbb2e83d400c67d1906817f255338..369c9ae8520a92b09bb921289180711b3e53f4e5 100644 --- a/contrib/extension/subscription/engine.go +++ b/contrib/extension/subscription/engine.go @@ -55,10 +55,9 @@ func (e *Engine) Middleware() func(jsonrpc.Handler) jsonrpc.Handler { e.mu.Lock() e.subscriptions[n.id] = n e.mu.Unlock() - // now send the subscription id back - w.Send(n.id, nil) // then inject the notifier r = r.WithContext(context.WithValue(r.Context(), notifierKey{}, n)) + // start serving the request/sub h.ServeRPC(w, r) case strings.HasSuffix(r.Method, serviceMethodSeparator+unsubscribeMethodSuffix): // read the subscription id to close diff --git a/contrib/extension/subscription/subscription.go b/contrib/extension/subscription/subscription.go index f970f2420606eaea17aba11d23df30150751d04b..bd9159a880559413f0eea07a0ad5f9f5fc470c7d 100644 --- a/contrib/extension/subscription/subscription.go +++ b/contrib/extension/subscription/subscription.go @@ -35,7 +35,7 @@ var ( // ErrNotificationNotFound is returned when the notification for the given id is not found ErrSubscriptionNotFound = errors.New("subscription not found") // ErrNotificationNotFound is returned when the notification for the given id is not found - ErrSubscriptionClosed = errors.New("subscription not found") + ErrSubscriptionClosed = errors.New("subscription closed. not found") ) var globalInc = atomic.Int64{} @@ -110,6 +110,11 @@ func (n *Notifier) Err() <-chan error { func (n *Notifier) send(data json.RawMessage) error { params, _ := jjson.Marshal(&subscriptionResult{ID: string(n.id), Result: data}) + // try to send the id back. this will just fail with errAlreadySent if its already been sent. + // so it is safe-ish to just ignore this error + // technically we should check for jsonrpc.ErrSendAlreadyCalled and then error earlier otherwise... but is that really right? + _ = n.h.Send(n.id, nil) + err := n.h.Notify( n.namespace+ serviceMethodSeparator+