diff --git a/contrib/extension/subscription/client.go b/contrib/extension/subscription/client.go index ac5a38e1911cf3e5338aed613dd31bf18f9041b1..f329e005d42ff50d6f629d7991a203e5172d7150 100644 --- a/contrib/extension/subscription/client.go +++ b/contrib/extension/subscription/client.go @@ -76,6 +76,10 @@ func (c *WrapClient) Subscribe(ctx context.Context, namespace string, channel an return nil, ErrSubscriptionNotFound } + // FIXME(garet): we can lose subscription messages here. if they send a notification and the server handles it + // before we finish adding it to the subs, the message will be lost. Ping will almost always be much much longer + // than us adding the sub so it probably doesn't matter. but it fails the unit tests :( + // now create a client sub sub := &clientSub{ engine: c, diff --git a/contrib/extension/subscription/client_test.go b/contrib/extension/subscription/client_test.go index 4d6c688984ff4002834216bf8a9a34e4e3a38722..cc7a8eb778e005f8d513e1d23e1e3ff027bcdb15 100644 --- a/contrib/extension/subscription/client_test.go +++ b/contrib/extension/subscription/client_test.go @@ -28,11 +28,14 @@ func TestSubscription(t *testing.T) { return } - for i := 0; i < count; i++ { - if err := notifier.Notify(i); err != nil { - panic(err) + go func() { + time.Sleep(10 * time.Millisecond) + for i := 0; i < count; i++ { + if err := notifier.Notify(i); err != nil { + panic(err) + } } - } + }() }) srv := server.NewServer(r) @@ -80,11 +83,14 @@ func TestUnsubscribeNoRead(t *testing.T) { return } - for i := 0; i < 10; i++ { - if err := notifier.Notify(i); err != nil { - panic(err) + go func() { + time.Sleep(10 * time.Millisecond) + for i := 0; i < 10; i++ { + if err := notifier.Notify(i); err != nil { + panic(err) + } } - } + }() }) srv := server.NewServer(r) @@ -136,6 +142,7 @@ func TestWrapClient(t *testing.T) { return } go func() { + time.Sleep(10 * time.Millisecond) idx := 0 for { select {