good morning!!!!

Skip to content
Snippets Groups Projects
client_test.go 4.4 KiB
Newer Older
Garet Halliday's avatar
fix
Garet Halliday committed
package subscription

import (
	"context"
Garet Halliday's avatar
Garet Halliday committed
	"net/http/httptest"
Garet Halliday's avatar
Garet Halliday committed
	_ "net/http/pprof"
Garet Halliday's avatar
Garet Halliday committed
	"strings"
Garet Halliday's avatar
fix
Garet Halliday committed
	"testing"
	"time"

	"gfx.cafe/open/jrpc"
	"gfx.cafe/open/jrpc/contrib/codecs"
	"gfx.cafe/open/jrpc/contrib/jmux"
	"gfx.cafe/open/jrpc/pkg/jsonrpc"
	"gfx.cafe/open/jrpc/pkg/server"
)

Garet Halliday's avatar
Garet Halliday committed
func TestSubscription(t *testing.T) {
	const count = 100

	engine := NewEngine()
	r := jmux.NewRouter()
	r.Use(engine.Middleware())
	r.HandleFunc("test/subscribe", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
		notifier, ok := NotifierFromContext(r.Context())
		if !ok {
			_ = w.Send(nil, ErrNotificationsUnsupported)
			return
		}

		for i := 0; i < count; i++ {
			if err := notifier.Notify(i); err != nil {
				panic(err)
			}
		}
	})

	srv := server.NewServer(r)
	defer srv.Shutdown(context.Background())
Garet Halliday's avatar
Garet Halliday committed
	handler := codecs.WebsocketHandler(srv, []string{"*"})
Garet Halliday's avatar
Garet Halliday committed
	httpSrv := httptest.NewServer(handler)
	defer httpSrv.Close()
Garet Halliday's avatar
Garet Halliday committed
	wsURL := "ws:" + strings.TrimPrefix(httpSrv.URL, "http:")
	cl, err := UpgradeConn(jrpc.Dial(wsURL))
Garet Halliday's avatar
Garet Halliday committed
	if err != nil {
		t.Error(err)
		return
	}
	defer func() {
		if err = cl.Close(); err != nil {
			t.Error(err)
		}
	}()
Garet Halliday's avatar
Garet Halliday committed

	ch := make(chan int, count)
	sub, err := cl.Subscribe(context.Background(), "test", ch, nil)
	defer func() {
		if err = sub.Unsubscribe(); err != nil {
			t.Error(err)
		}
	}()

	for i := 0; i < count; i++ {
		v := <-ch
		if v != i {
			t.Errorf("expected %d but got %d", i, v)
		}
	}
}

func TestUnsubscribeNoRead(t *testing.T) {
	engine := NewEngine()
	r := jmux.NewRouter()
	r.Use(engine.Middleware())
	r.HandleFunc("test/subscribe", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
		notifier, ok := NotifierFromContext(r.Context())
		if !ok {
			_ = w.Send(nil, ErrNotificationsUnsupported)
			return
		}

		for i := 0; i < 10; i++ {
			if err := notifier.Notify(i); err != nil {
				panic(err)
			}
		}
	})

	srv := server.NewServer(r)
	defer srv.Shutdown(context.Background())
	handler := codecs.WebsocketHandler(srv, []string{"*"})
	httpSrv := httptest.NewServer(handler)
	defer httpSrv.Close()

	wsURL := "ws:" + strings.TrimPrefix(httpSrv.URL, "http:")
	cl, err := UpgradeConn(jrpc.Dial(wsURL))
	if err != nil {
		t.Error(err)
		return
	}
	defer func() {
		if err = cl.Close(); err != nil {
			t.Error(err)
		}
	}()

	ch := make(chan int)
	sub, err := cl.Subscribe(context.Background(), "test", ch, nil)
	time.Sleep(time.Second)
	if err = sub.Unsubscribe(); err != nil {
		t.Error(err)
		return
	}
}

Garet Halliday's avatar
fix
Garet Halliday committed
func TestWrapClient(t *testing.T) {
	engine := NewEngine()
	r := jmux.NewRouter()
	r.Use(engine.Middleware())
	r.HandleFunc("echo", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
Garet Halliday's avatar
Garet Halliday committed
		err := w.Send(r.Params, nil)
		if err != nil {
			t.Error(err)
		}
Garet Halliday's avatar
fix
Garet Halliday committed
	})
	// extremely fast subscription to fill buffers to get a higher chance that we receive another message while trying
	// to unsubscribe
	r.HandleFunc("test/subscribe", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
		notifier, ok := NotifierFromContext(r.Context())
		if !ok {
Garet Halliday's avatar
Garet Halliday committed
			err := w.Send(nil, ErrNotificationsUnsupported)
			if err != nil {
				t.Error(err)
			}
Garet Halliday's avatar
fix
Garet Halliday committed
			return
		}
		go func() {
			idx := 0
			for {
				select {
				case <-r.Context().Done():
					return
				case <-notifier.Err():
					return
				default:
				}
Garet Halliday's avatar
Garet Halliday committed
				err := notifier.Notify(idx)
				if err != nil {
					t.Error(err)
				}
Garet Halliday's avatar
fix
Garet Halliday committed
				idx += 1
			}
		}()
	})
	srv := server.NewServer(r)
	defer srv.Shutdown(context.Background())
Garet Halliday's avatar
fix
Garet Halliday committed
	handler := codecs.WebsocketHandler(srv, []string{"*"})
Garet Halliday's avatar
Garet Halliday committed
	httpSrv := httptest.NewServer(handler)
	defer httpSrv.Close()
Garet Halliday's avatar
fix
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	wsURL := "ws:" + strings.TrimPrefix(httpSrv.URL, "http:")
	cl, err := UpgradeConn(jrpc.Dial(wsURL))
Garet Halliday's avatar
fix
Garet Halliday committed
	if err != nil {
		t.Error(err)
		return
	}
	defer func() {
		if err = cl.Close(); err != nil {
			t.Error(err)
		}
	}()
Garet Halliday's avatar
fix
Garet Halliday committed

	for i := 0; i < 10; i++ {
		var res string
		if err = cl.Do(context.Background(), &res, "echo", "test"); err != nil {
			t.Error(err)
			return
		}
		if res != "test" {
			t.Errorf(`expected "test" but got %#v`, res)
			return
		}

Garet Halliday's avatar
Garet Halliday committed
		ch := make(chan int, 101)
		var sub ClientSubscription
		sub, err = cl.Subscribe(context.Background(), "test", ch, nil)
Garet Halliday's avatar
fix
Garet Halliday committed
		if err != nil {
			t.Error(err)
			return
		}

		func() {
			for {
				select {
				case err, ok := <-sub.Err():
					if ok {
						t.Errorf("sub errored: %v", err)
					}
					return
Garet Halliday's avatar
Garet Halliday committed
				case n, ok := <-ch:
					if !ok {
						return
					}
					if n == 100 {
						if err = sub.Unsubscribe(); err != nil {
							t.Error(err)
							return
						}
					}
Garet Halliday's avatar
fix
Garet Halliday committed
				}
			}
		}()
	}
}