good morning!!!!

Skip to content
Snippets Groups Projects
client_test.go 3.75 KiB
Newer Older
Garet Halliday's avatar
fix
Garet Halliday committed
package subscription

import (
	"context"
Garet Halliday's avatar
Garet Halliday committed
	"net/http/httptest"
Garet Halliday's avatar
Garet Halliday committed
	_ "net/http/pprof"
Garet Halliday's avatar
Garet Halliday committed
	"strings"
Garet Halliday's avatar
fix
Garet Halliday committed
	"testing"
	"time"

	"gfx.cafe/open/jrpc"
	"gfx.cafe/open/jrpc/contrib/codecs"
	"gfx.cafe/open/jrpc/contrib/jmux"
	"gfx.cafe/open/jrpc/pkg/jsonrpc"
	"gfx.cafe/open/jrpc/pkg/server"
)

Garet Halliday's avatar
Garet Halliday committed
func TestSubscription(t *testing.T) {
	const count = 100

	engine := NewEngine()
	r := jmux.NewRouter()
	r.Use(engine.Middleware())
	r.HandleFunc("test/subscribe", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
		notifier, ok := NotifierFromContext(r.Context())
		if !ok {
			_ = w.Send(nil, ErrNotificationsUnsupported)
			return
		}

		for i := 0; i < count; i++ {
			if err := notifier.Notify(i); err != nil {
				panic(err)
			}
		}
	})

	srv := server.NewServer(r)
	handler := codecs.WebsocketHandler(srv, []string{"*"})
Garet Halliday's avatar
Garet Halliday committed
	httpSrv := httptest.NewServer(handler)
Garet Halliday's avatar
Garet Halliday committed
	wsURL := "ws:" + strings.TrimPrefix(httpSrv.URL, "http:")
	cl, err := UpgradeConn(jrpc.Dial(wsURL))
Garet Halliday's avatar
Garet Halliday committed
	if err != nil {
		t.Error(err)
		return
	}

	ch := make(chan int, count)
	sub, err := cl.Subscribe(context.Background(), "test", ch, nil)
	defer func() {
		if err = sub.Unsubscribe(); err != nil {
			t.Error(err)
		}
	}()

	for i := 0; i < count; i++ {
		v := <-ch
		if v != i {
			t.Errorf("expected %d but got %d", i, v)
		}
	}
}

// TestUnsubscribeNoRead verifies that Unsubscribe returns without error even
// when the client never reads from its subscription channel.
//
// The server emits ten notifications while the client's channel is unbuffered
// and has no reader; after a one-second pause the test unsubscribes and
// expects a clean result.
func TestUnsubscribeNoRead(t *testing.T) {
	engine := NewEngine()
	r := jmux.NewRouter()
	r.Use(engine.Middleware())
	r.HandleFunc("test/subscribe", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
		notifier, ok := NotifierFromContext(r.Context())
		if !ok {
			// Best-effort error reply; nothing useful can be done if it fails.
			_ = w.Send(nil, ErrNotificationsUnsupported)
			return
		}

		for i := 0; i < 10; i++ {
			if err := notifier.Notify(i); err != nil {
				panic(err)
			}
		}
	})

	srv := server.NewServer(r)
	handler := codecs.WebsocketHandler(srv, []string{"*"})
	httpSrv := httptest.NewServer(handler)
	// Release the listener when the test ends.
	defer httpSrv.Close()

	// httptest hands out an http:// URL; swap the scheme to dial a websocket.
	wsURL := "ws:" + strings.TrimPrefix(httpSrv.URL, "http:")
	cl, err := UpgradeConn(jrpc.Dial(wsURL))
	if err != nil {
		t.Fatal(err)
	}

	// Unbuffered and never read: pending notifications have nowhere to go.
	ch := make(chan int)
	sub, err := cl.Subscribe(context.Background(), "test", ch, nil)
	if err != nil {
		// Previously this error was silently overwritten by the Unsubscribe
		// result, and a failed Subscribe would crash on sub below.
		t.Fatal(err)
	}

	// Give the server time to push notifications at the unread channel before
	// tearing the subscription down.
	time.Sleep(time.Second)
	if err := sub.Unsubscribe(); err != nil {
		t.Error(err)
	}
}

Garet Halliday's avatar
fix
Garet Halliday committed
func TestWrapClient(t *testing.T) {
	engine := NewEngine()
	r := jmux.NewRouter()
	r.Use(engine.Middleware())
	r.HandleFunc("echo", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
		_ = w.Send(r.Params, nil)
	})
	// extremely fast subscription to fill buffers to get a higher chance that we receive another message while trying
	// to unsubscribe
	r.HandleFunc("test/subscribe", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
		notifier, ok := NotifierFromContext(r.Context())
		if !ok {
			_ = w.Send(nil, ErrNotificationsUnsupported)
			return
		}
		go func() {
			idx := 0
			for {
				select {
				case <-r.Context().Done():
					return
				case <-notifier.Err():
					return
				default:
				}
				_ = notifier.Notify(idx)
				idx += 1
			}
		}()
	})
	srv := server.NewServer(r)
	handler := codecs.WebsocketHandler(srv, []string{"*"})
Garet Halliday's avatar
Garet Halliday committed
	httpSrv := httptest.NewServer(handler)
Garet Halliday's avatar
fix
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	wsURL := "ws:" + strings.TrimPrefix(httpSrv.URL, "http:")
	cl, err := UpgradeConn(jrpc.Dial(wsURL))
Garet Halliday's avatar
fix
Garet Halliday committed
	if err != nil {
		t.Error(err)
		return
	}

	for i := 0; i < 10; i++ {
		var res string
		if err = cl.Do(context.Background(), &res, "echo", "test"); err != nil {
			t.Error(err)
			return
		}
		if res != "test" {
			t.Errorf(`expected "test" but got %#v`, res)
			return
		}

		ch := make(chan int, 1)
		sub, err := cl.Subscribe(context.Background(), "test", ch, nil)
		if err != nil {
			t.Error(err)
			return
		}

		go func() {
Garet Halliday's avatar
Garet Halliday committed
			time.Sleep(20 * time.Millisecond)
Garet Halliday's avatar
fix
Garet Halliday committed
			_ = sub.Unsubscribe()
		}()

		func() {
			for {
				select {
				case err, ok := <-sub.Err():
					if ok {
						t.Errorf("sub errored: %v", err)
					}
					return
Garet Halliday's avatar
Garet Halliday committed
				case <-ch:
Garet Halliday's avatar
fix
Garet Halliday committed
				}
			}
		}()
	}
}