From fd6f57b71de459bf1ea79718704943ff2ab53b9a Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Mon, 4 Dec 2023 16:37:09 -0600
Subject: [PATCH] params not result

---
 contrib/codecs/websocket/codec.go             |  3 +-
 contrib/extension/subscription/client_test.go | 65 +++++++++++++++++++
 pkg/jsonrpc/message.go                        |  9 +++
 pkg/server/server.go                          |  2 +-
 4 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/contrib/codecs/websocket/codec.go b/contrib/codecs/websocket/codec.go
index 325c445..68c0517 100644
--- a/contrib/codecs/websocket/codec.go
+++ b/contrib/codecs/websocket/codec.go
@@ -5,13 +5,12 @@ import (
 	"encoding/json"
 	"io"
 	"net/http"
+	_ "net/http/pprof"
 	"sync"
 	"time"
 
 	"gfx.cafe/open/websocket"
 
-	_ "net/http/pprof"
-
 	"gfx.cafe/open/jrpc/pkg/jjson"
 	"gfx.cafe/open/jrpc/pkg/jsonrpc"
 	"gfx.cafe/open/jrpc/pkg/serverutil"
diff --git a/contrib/extension/subscription/client_test.go b/contrib/extension/subscription/client_test.go
index 9e7a58e..4bf7f10 100644
--- a/contrib/extension/subscription/client_test.go
+++ b/contrib/extension/subscription/client_test.go
@@ -5,6 +5,7 @@ import (
 	"log"
 	"net"
 	"net/http"
+	_ "net/http/pprof"
 	"testing"
 	"time"
 
@@ -15,6 +16,70 @@ import (
 	"gfx.cafe/open/jrpc/pkg/server"
 )
 
+func TestSubscription(t *testing.T) {
+	go func() {
+		t.Error(http.ListenAndServe(":6060", nil))
+	}()
+
+	const count = 100
+
+	engine := NewEngine()
+	r := jmux.NewRouter()
+	r.Use(engine.Middleware())
+	r.HandleFunc("test/subscribe", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
+		notifier, ok := NotifierFromContext(r.Context())
+		if !ok {
+			_ = w.Send(nil, ErrNotificationsUnsupported)
+			return
+		}
+
+		for i := 0; i < count; i++ {
+			if err := notifier.Notify(i); err != nil {
+				panic(err)
+			}
+		}
+	})
+
+	srv := server.NewServer(r)
+	handler := codecs.WebsocketHandler(srv, []string{"*"})
+	httpSrv := http.Server{
+		Addr:    ":8855",
+		Handler: handler,
+	}
+	listener, err := net.Listen("tcp", ":8855")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	go func() {
+		if err := httpSrv.Serve(listener); err != nil {
+			t.Error(err)
+			return
+		}
+	}()
+
+	cl, err := UpgradeConn(jrpc.Dial("ws://localhost:8855"))
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	ch := make(chan int, count)
+	sub, err := cl.Subscribe(context.Background(), "test", ch, nil)
+	defer func() {
+		if err = sub.Unsubscribe(); err != nil {
+			t.Error(err)
+		}
+	}()
+
+	for i := 0; i < count; i++ {
+		v := <-ch
+		if v != i {
+			t.Errorf("expected %d but got %d", i, v)
+		}
+	}
+}
+
 func TestWrapClient(t *testing.T) {
 	engine := NewEngine()
 	r := jmux.NewRouter()
diff --git a/pkg/jsonrpc/message.go b/pkg/jsonrpc/message.go
index f6b96a3..d23de9c 100644
--- a/pkg/jsonrpc/message.go
+++ b/pkg/jsonrpc/message.go
@@ -104,6 +104,15 @@ func (m *MessageWriter) Result() (io.Writer, error) {
 	return &ResultWriter{w: m.w}, nil
 }
 
+// Params returns a writer that writes to a params field
+func (m *MessageWriter) Params() (io.Writer, error) {
+	_, err := m.w.Write([]byte(`,"params":`))
+	if err != nil {
+		return nil, err
+	}
+	return &ResultWriter{w: m.w}, nil
+}
+
 type BatchWriter struct {
 	w          io.Writer
 	mu         *semaphore.Weighted
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 0fbbf71..04aedf1 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -325,7 +325,7 @@ func (c *callResponder) notify(env *notifyEnv, s *jsonrpc.MessageWriter) (err er
 		return err
 	}
 	// if there is no error, we try to marshal the result
-	wr, err := s.Result()
+	wr, err := s.Params()
 	if err != nil {
 		return err
 	}
-- 
GitLab