diff --git a/benchmark/suite_test.go b/benchmark/suite_test.go index cd3767cff13231eb1f3d8a0157223a341c2c1ddd..14d2a7e586bf217a21a8f9ba6fee5cfd86983d90 100644 --- a/benchmark/suite_test.go +++ b/benchmark/suite_test.go @@ -9,7 +9,6 @@ import ( "gfx.cafe/open/jrpc/contrib/codecs/websocket" "gfx.cafe/open/jrpc/pkg/jrpctest" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" ) func TestBenchmarkSuite(t *testing.T) { @@ -21,7 +20,7 @@ func TestBenchmarkSuite(t *testing.T) { } ctx := context.Background() - makeTest("SingleClient", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("SingleClient", func(t *testing.T, h jsonrpc.Handler, client jsonrpc.Conn) { err := client.Do(ctx, nil, "test_ping", nil) if err != nil { t.Error(err) @@ -37,7 +36,7 @@ func runBenchmarkSuite(b *testing.B, sm jrpctest.ServerMaker) { executeBench(b, fm) }) } - makeBench("SingleClient", func(b *testing.B, server *server.Server, client jsonrpc.Conn) { + makeBench("SingleClient", func(b *testing.B, h jsonrpc.Handler, client jsonrpc.Conn) { for i := 0; i < b.N; i++ { err := client.Do(ctx, nil, "test_ping", nil) if err != nil { diff --git a/contrib/codecs/codecs.go b/contrib/codecs/codecs.go index e974765efeeba8942ffe7b4b6ebd6e986534db85..2badaf5addec2df4fc50d62286d51b87f2d69802 100644 --- a/contrib/codecs/codecs.go +++ b/contrib/codecs/codecs.go @@ -5,7 +5,7 @@ import ( "gfx.cafe/open/jrpc/contrib/codecs/http" "gfx.cafe/open/jrpc/contrib/codecs/websocket" - "gfx.cafe/open/jrpc/pkg/server" + "gfx.cafe/open/jrpc/pkg/jsonrpc" gohttp "net/http" "net/url" @@ -14,7 +14,7 @@ import ( var WebsocketHandler = websocket.WebsocketHandler var HttpHandler = http.HttpHandler -var HttpWebsocketHandler = func(srv *server.Server, origins []string) gohttp.Handler { +var HttpWebsocketHandler = func(srv jsonrpc.Handler, origins []string) gohttp.Handler { cwss := WebsocketHandler(srv, origins) chttp := HttpHandler(srv) return gohttp.HandlerFunc(func(w gohttp.ResponseWriter, r *gohttp.Request) { @@ -31,7 +31,7 @@ var HttpWebsocketHandler = func(srv *server.Server, origins []string) gohttp.Han //} // ListenAndServe -func ListenAndServe(u string, srv *server.Server, opts map[string]any) error { +func ListenAndServe(u string, srv jsonrpc.Handler, opts map[string]any) error { pu, err := url.Parse(u) if err != nil { return err diff --git a/contrib/codecs/http/handler.go b/contrib/codecs/http/handler.go index 2a7d04271d1c79d2eda41fb457e4e0e9a32088c6..153bc6f4a75a840f224b527bced4f9be48e598ea 100644 --- a/contrib/codecs/http/handler.go +++ b/contrib/codecs/http/handler.go @@ -13,12 +13,12 @@ import ( ) func JrpcToHttp(h jsonrpc.Handler) http.Handler { - return HttpHandler(server.NewServer(h)) + return HttpHandler(h) } -func HttpHandler(s *server.Server) http.Handler { +func HttpHandler(h jsonrpc.Handler) http.Handler { return h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if s == nil { + if h == nil { http.Error(w, "no server set", http.StatusInternalServerError) return } @@ -27,7 +27,7 @@ func HttpHandler(s *server.Server) http.Handler { return } w.Header().Set("content-type", contentType) - err = s.ServeCodec(r.Context(), c) + err = server.ServeCodec(r.Context(), c, h) if err != nil && !errors.Is(err, context.Canceled) { // slog.Error("codec err", "err", err) http.Error(w, "Internal Error", http.StatusInternalServerError) diff --git a/contrib/codecs/http/http_test.go b/contrib/codecs/http/http_test.go index bcf9a29c44bf7240f16012094dc843992119a430..a62313c246ac75299850e8b93e9039bd7f5f7156 100644 --- a/contrib/codecs/http/http_test.go +++ b/contrib/codecs/http/http_test.go @@ -27,7 +27,6 @@ import ( "gfx.cafe/open/jrpc" "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" "gfx.cafe/open/jrpc/pkg/jrpctest" @@ -53,7 +52,7 @@ func confirmStatusCode(t *testing.T, got, want int) { func confirmHTTPRequestYieldsStatusCode(t *testing.T, method, contentType, body string, expectedStatusCode int) { t.Helper() - s := server.NewServer(jmux.NewMux()) + s := jmux.NewMux() ts := httptest.NewServer(HttpHandler(s)) defer ts.Close() @@ -77,7 +76,7 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) { // This checks that maxRequestContentLength is not applied to the response of a request. func TestHTTPRespBodyUnlimited(t *testing.T) { - s := jrpctest.NewServer() + s := jrpctest.NewRouter() ts := httptest.NewServer(HttpHandler(s)) defer ts.Close() @@ -136,7 +135,7 @@ func TestHTTPErrorResponse(t *testing.T) { } func TestClientHTTP(t *testing.T) { - s := jrpctest.NewServer() + s := jrpctest.NewRouter() ts := httptest.NewServer(HttpHandler(s)) defer ts.Close() c, err := DialHTTP(ts.URL) diff --git a/contrib/codecs/http/testing.go b/contrib/codecs/http/testing.go index 27df7c49495ecd9767c5510ad863d1ec1a88994e..6bd5f19f7006fb7f17962bbce2f674e249589f05 100644 --- a/contrib/codecs/http/testing.go +++ b/contrib/codecs/http/testing.go @@ -5,11 +5,10 @@ import ( "gfx.cafe/open/jrpc/pkg/jrpctest" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" ) -func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) { - s := jrpctest.NewServer() +func ServerMaker() (jsonrpc.Handler, jrpctest.ClientMaker, func()) { + s := jrpctest.NewRouter() hsrv := httptest.NewServer(HttpHandler(s)) return s, func() jsonrpc.Conn { conn, err := DialHTTP(hsrv.URL) diff --git a/contrib/codecs/init.go b/contrib/codecs/init.go index df52372b4b3efe0d857f84d37e1f2c0e212fb7c8..8dd652e89d0ad066f1b0af7f0644dfd77c89ca15 100644 --- a/contrib/codecs/init.go +++ b/contrib/codecs/init.go @@ -11,14 +11,13 @@ import ( "gfx.cafe/open/jrpc/contrib/codecs/rdwr" "gfx.cafe/open/jrpc/contrib/codecs/websocket" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" ) func init() { - RegisterHandler(func(bind *url.URL, srv *server.Server, opts map[string]any) error { + RegisterHandler(func(bind *url.URL, srv jsonrpc.Handler, opts map[string]any) error { return gohttp.ListenAndServe(bind.Host, HttpHandler(srv)) }, "http") - RegisterHandler(func(bind *url.URL, srv *server.Server, opts map[string]any) error { + RegisterHandler(func(bind *url.URL, srv jsonrpc.Handler, opts map[string]any) error { origins := []string{} if val, ok := opts["origins"]; ok { if t, ok := val.([]string); ok { @@ -28,7 +27,7 @@ func init() { return gohttp.ListenAndServe(bind.Host, HttpWebsocketHandler(srv, origins)) }, "http+ws") - RegisterHandler(func(bind *url.URL, srv *server.Server, opts map[string]any) error { + RegisterHandler(func(bind *url.URL, srv jsonrpc.Handler, opts map[string]any) error { tcpAddr, err := net.ResolveTCPAddr("tcp", bind.Host) if err != nil { return err diff --git a/contrib/codecs/rdwr/rdwr_test.go b/contrib/codecs/rdwr/rdwr_test.go index db0e2fa7c0dc9c2316d8a8e8dde1608c91f8dc82..f2bc3530e990fb82935bf55552c3658677bbe332 100644 --- a/contrib/codecs/rdwr/rdwr_test.go +++ b/contrib/codecs/rdwr/rdwr_test.go @@ -15,7 +15,6 @@ import ( func TestRDWRSetup(t *testing.T) { mux := jmux.NewMux() - srv := server.NewServer(mux) ctx := context.Background() @@ -25,7 +24,7 @@ func TestRDWRSetup(t *testing.T) { clientCodec := rdwr.NewCodec(rd_s, wr_c) client := rdwr.NewClient(rd_c, wr_s) go func() { - err := srv.ServeCodec(ctx, clientCodec) + err := server.ServeCodec(ctx, clientCodec, mux) assert.NoError(t, err) }() diff --git a/contrib/codecs/rdwr/testing.go b/contrib/codecs/rdwr/testing.go index 60e9da19bdd6b567c3c07285650e054b80782658..fd67b520115ba3d2b54a91db6e29a03642f66b28 100644 --- a/contrib/codecs/rdwr/testing.go +++ b/contrib/codecs/rdwr/testing.go @@ -9,15 +9,18 @@ import ( "gfx.cafe/open/jrpc/pkg/server" ) -func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) { +func ServerMaker() (jsonrpc.Handler, jrpctest.ClientMaker, func()) { rd_s, wr_s := io.Pipe() rd_c, wr_c := io.Pipe() - s := jrpctest.NewServer() + s := jrpctest.NewRouter() clientCodec := NewCodec(rd_c, wr_s) + ctx, cn := context.WithCancel(context.Background()) go func() { - s.ServeCodec(context.Background(), clientCodec) + server.ServeCodec(ctx, clientCodec, s) }() return s, func() jsonrpc.Conn { - return NewClient(rd_s, wr_c) - }, func() {} + return NewClient(rd_s, wr_c) + }, func() { + cn() + } } diff --git a/contrib/codecs/registry.go b/contrib/codecs/registry.go index 954c0a7d4906d11c32d33bcafdaa21af3a4e3430..a225ca8d390058f1539572efc6ae6d5c1869476b 100644 --- a/contrib/codecs/registry.go +++ b/contrib/codecs/registry.go @@ -5,10 +5,9 @@ import ( "net/url" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" ) -type handlerFunc = func(bind *url.URL, srv *server.Server, opts map[string]any) error +type handlerFunc = func(bind *url.URL, h jsonrpc.Handler, opts map[string]any) error var handlerFuncs map[string]handlerFunc = map[string]handlerFunc{} diff --git a/contrib/codecs/websocket/handler.go b/contrib/codecs/websocket/handler.go index b3ef650245119d2d68ebb56319340aea6bffbe16..08e051c1b5b5569a98e5d97fdf6ea75f022c7daa 100644 --- a/contrib/codecs/websocket/handler.go +++ b/contrib/codecs/websocket/handler.go @@ -5,15 +5,16 @@ import ( "gfx.cafe/open/websocket" + "gfx.cafe/open/jrpc/pkg/jsonrpc" "gfx.cafe/open/jrpc/pkg/server" ) type Server struct { - Server *server.Server + Handler jsonrpc.Handler } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if s.Server == nil { + if s.Handler == nil { http.Error(w, "no server set", http.StatusInternalServerError) return } @@ -23,7 +24,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } c := newWebsocketCodec(r.Context(), conn, "", r) - err = s.Server.ServeCodec(r.Context(), c) + err = server.ServeCodec(r.Context(), c, s.Handler) if err != nil { //slog.Error("codec err", "error", err) } @@ -33,7 +34,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // // allowedOrigins should be a comma-separated list of allowed origin URLs. // To allow connections with any origin, pass "*". -func WebsocketHandler(s *server.Server, allowedOrigins []string) http.Handler { +func WebsocketHandler(s jsonrpc.Handler, allowedOrigins []string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ OriginPatterns: allowedOrigins, @@ -44,7 +45,7 @@ func WebsocketHandler(s *server.Server, allowedOrigins []string) http.Handler { return } codec := newWebsocketCodec(r.Context(), conn, r.Host, r) - err = s.ServeCodec(r.Context(), codec) + err = server.ServeCodec(r.Context(), codec, s) if err != nil { // slog.Error("codec err", "error", err) } diff --git a/contrib/codecs/websocket/testing.go b/contrib/codecs/websocket/testing.go index ffef9c70988f70f05d9437534f06e4327c571b2c..7de4f04fb2b77d65ad2a1218d81b3b995309b7c7 100644 --- a/contrib/codecs/websocket/testing.go +++ b/contrib/codecs/websocket/testing.go @@ -6,12 +6,11 @@ import ( "gfx.cafe/open/jrpc/pkg/jrpctest" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" ) -func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) { - s := jrpctest.NewServer() - hsrv := httptest.NewServer(&Server{Server: s}) +func ServerMaker() (jsonrpc.Handler, jrpctest.ClientMaker, func()) { + s := jrpctest.NewRouter() + hsrv := httptest.NewServer(&Server{Handler: s}) return s, func() jsonrpc.Conn { conn, err := DialWebsocket(context.Background(), hsrv.URL, "") if err != nil { diff --git a/contrib/codecs/websocket/websocket_test.go b/contrib/codecs/websocket/websocket_test.go index 04981d07bba2780539faaa64e52a127c886ea0e3..7959b96488bae71f9d366057d0c199830073be26 100644 --- a/contrib/codecs/websocket/websocket_test.go +++ b/contrib/codecs/websocket/websocket_test.go @@ -9,7 +9,6 @@ import ( "gfx.cafe/open/jrpc/contrib/codecs/websocket" "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/pkg/jrpctest" - "gfx.cafe/open/jrpc/pkg/server" ) func TestWebsocketClientHeaders(t *testing.T) { @@ -35,7 +34,7 @@ func TestWebsocketOriginCheck(t *testing.T) { t.Parallel() var ( - srv = jrpctest.NewServer() + srv = jrpctest.NewRouter() httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"http://example.com"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) @@ -64,7 +63,7 @@ func TestWebsocketLargeCall(t *testing.T) { t.Parallel() var ( - srv = jrpctest.NewServer() + srv = jrpctest.NewRouter() httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"*"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) @@ -98,7 +97,7 @@ func TestWebsocketLargeCall(t *testing.T) { func TestClientWebsocketLargeMessage(t *testing.T) { mux := jmux.NewMux() var ( - srv = server.NewServer(mux) + srv = mux httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, nil)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) diff --git a/contrib/extension/subscription/client_test.go b/contrib/extension/subscription/client_test.go index f697a201e26588342267bbdafa6d9b9bc281254d..ab3f77760e559e046e73a9271eab694c033ddc50 100644 --- a/contrib/extension/subscription/client_test.go +++ b/contrib/extension/subscription/client_test.go @@ -13,7 +13,6 @@ import ( "gfx.cafe/open/jrpc/contrib/codecs" "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" ) func newRouter(t *testing.T) jmux.Router { @@ -59,8 +58,7 @@ func newRouter(t *testing.T) jmux.Router { func newServer(t *testing.T) (Conn, func()) { r := newRouter(t) - srv := server.NewServer(r) - handler := codecs.WebsocketHandler(srv, []string{"*"}) + handler := codecs.WebsocketHandler(r, []string{"*"}) httpSrv := httptest.NewServer(handler) wsURL := "ws:" + strings.TrimPrefix(httpSrv.URL, "http:") @@ -73,7 +71,6 @@ func newServer(t *testing.T) (Conn, func()) { return cl, func() { _ = cl.Close() httpSrv.Close() - srv.Shutdown(context.Background()) } } diff --git a/example/echo/main.go b/example/echo/main.go index 69af46e23469b1c90bdd41eaeedf9e9475d09e94..c18914b65190cadf5bad6ea84f5dd5fbdcf957cc 100644 --- a/example/echo/main.go +++ b/example/echo/main.go @@ -7,13 +7,11 @@ import ( "gfx.cafe/open/jrpc/contrib/codecs" "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" ) func main() { r := jmux.NewRouter() - srv := server.NewServer(r) r.HandleFunc("echo", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) { w.Send(r.Params, nil) @@ -27,7 +25,7 @@ func main() { r.RegisterStruct("server", server) log.Println("running on 8855") - log.Println(http.ListenAndServe(":8855", codecs.HttpHandler(srv))) + log.Println(http.ListenAndServe(":8855", codecs.HttpHandler(r))) } type EchoServer struct { diff --git a/example/proxy/main.go b/example/proxy/main.go index 41822c1589bda7773bcfeb11018add8e788a58bd..5db4069ebdff53cbb5969f41f6b2da1295046cad 100644 --- a/example/proxy/main.go +++ b/example/proxy/main.go @@ -9,7 +9,6 @@ import ( "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/contrib/middleware" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" "gfx.cafe/open/jrpc" ) @@ -31,8 +30,7 @@ func main() { log.Println("running on 8855") - srv := server.NewServer(r) - log.Println(http.ListenAndServe(":8855", codecs.HttpHandler(srv))) + log.Println(http.ListenAndServe(":8855", codecs.HttpHandler(r))) } // http://localhost:8855/?method=eth_blockNumber diff --git a/example/subscription/main.go b/example/subscription/main.go index 38014a69f6a3b7a521e01706d6090842d092d1b3..57827d4fbf9e6968ab8b851c82bc9d9a576ab34b 100644 --- a/example/subscription/main.go +++ b/example/subscription/main.go @@ -11,7 +11,6 @@ import ( "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/contrib/middleware" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" "gfx.cafe/open/jrpc" ) @@ -20,7 +19,6 @@ func main() { engine := subscription.NewEngine() r := jmux.NewRouter() r.Use(middleware.Logger) - srv := server.NewServer(r) r.HandleFunc("echo", func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) { w.Send(r.Params, nil) }) @@ -55,7 +53,7 @@ func main() { }() log.Println("running on 8855") - handler := codecs.HttpWebsocketHandler(srv, []string{"*"}) + handler := codecs.HttpWebsocketHandler(r, []string{"*"}) err := http.ListenAndServe(":8855", handler) if err != nil { log.Println(err) diff --git a/exports.go b/exports.go index 127529104b55f1cfdd0b2e14e36d0b47e83ad62c..c0d8a0d19093123493fb28f16bde149cb46e622e 100644 --- a/exports.go +++ b/exports.go @@ -27,16 +27,12 @@ type ( // Request is the request object Request = jsonrpc.Request // Server is a jrpc server - Server = server.Server // Middleware is a middleware Middleware = func(Handler) Handler ) var ( - // NewServer creates a jrpc server - NewServer = server.NewServer - // DialContext is to dial a conn with context DialContext = codecs.DialContext // Dial is to dial a conn with context.Background() diff --git a/pkg/jrpctest/server.go b/pkg/jrpctest/server.go index c5fdd0ac8bcfaefd320fc783ea76ce6bffa63746..5b31d4ee8cf82f6bc12f6801a5309245fee4b3d2 100644 --- a/pkg/jrpctest/server.go +++ b/pkg/jrpctest/server.go @@ -5,13 +5,8 @@ import ( jmux2 "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/contrib/middleware" - "gfx.cafe/open/jrpc/pkg/server" ) -func NewServer() *server.Server { - server := server.NewServer(NewRouter()) - return server -} func NewRouter() *jmux2.Mux { mux := jmux2.NewRouter() mux.Use(middleware.LegacyUnderscoreReplacer) diff --git a/pkg/jrpctest/suites.go b/pkg/jrpctest/suites.go index b175b6fad3cd050371762f4bb25a6ee31506463e..1eb173743f43f39297eef99d86d085423b46a0b1 100644 --- a/pkg/jrpctest/suites.go +++ b/pkg/jrpctest/suites.go @@ -10,27 +10,25 @@ import ( "time" "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) type ClientMaker func() jsonrpc.Conn -type ServerMaker func() (*server.Server, ClientMaker, func()) +type ServerMaker func() (jsonrpc.Handler, ClientMaker, func()) type BasicTestSuiteArgs struct { ServerMaker ServerMaker } -type TestContext func(t *testing.T, server *server.Server, client jsonrpc.Conn) -type BenchContext func(t *testing.B, server *server.Server, client jsonrpc.Conn) +type TestContext func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) +type BenchContext func(t *testing.B, server jsonrpc.Handler, client jsonrpc.Conn) func TestExecutor(sm ServerMaker) func(t *testing.T, c TestContext) { return func(t *testing.T, c TestContext) { server, dialer, cn := sm() defer cn() - defer server.Shutdown(context.Background()) client := dialer() defer client.Close() c(t, server, client) @@ -40,7 +38,6 @@ func BenchExecutor(sm ServerMaker) func(t *testing.B, c BenchContext) { return func(t *testing.B, c BenchContext) { server, dialer, cn := sm() defer cn() - defer server.Shutdown(context.Background()) client := dialer() defer client.Close() c(t, server, client) @@ -60,7 +57,7 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { } t.Parallel() - makeTest("Request", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("Request", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { var resp EchoResult err := client.Do(nil, &resp, "test_echo", []any{"hello", 10, &EchoArgs{"world"}}) require.NoError(t, err) @@ -69,7 +66,7 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { } }) - makeTest("ResponseType", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("ResponseType", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { err := jsonrpc.CallInto(nil, client, nil, "test_echo", "hello", 10, &EchoArgs{"world"}) assert.NoErrorf(t, err, "passing nil as result should be ok") var resultVar EchoResult @@ -78,7 +75,7 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { assert.Error(t, err, "passing var as nil gives error") }) - makeTest("ResposeType2", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("ResposeType2", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { if err := jsonrpc.CallInto(nil, client, nil, "test_echo", "hello", 10, &EchoArgs{"world"}); err != nil { t.Errorf("Passing nil as result should be fine, but got an error: %v", err) } @@ -90,7 +87,7 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { } }) - makeTest("ErrorReturnType", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("ErrorReturnType", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { var resp any err := jsonrpc.CallInto(nil, client, &resp, "test_returnError") require.Error(t, err) @@ -108,7 +105,7 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { t.Fatalf("wrong error data %#v, want %#v", e.ErrorData(), testError{}.ErrorData()) } }) - makeTest("Notify", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("Notify", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { if c, ok := client.(jsonrpc.Conn); ok { if err := c.Notify(context.Background(), "test_echo", []any{"hello", 10, &EchoArgs{"world"}}); err != nil { t.Fatal(err) @@ -116,7 +113,7 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { } }) - makeTest("context cancel", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("context cancel", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { maxContextCancelTimeout := 300 * time.Millisecond // The actual test starts here. var ( @@ -164,7 +161,7 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { wg.Wait() }) - makeTest("big", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("big", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { var ( wg sync.WaitGroup nreqs = 2 @@ -188,6 +185,6 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { wg.Wait() }) - makeTest("", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { + makeTest("", func(t *testing.T, server jsonrpc.Handler, client jsonrpc.Conn) { }) } diff --git a/pkg/server/server.go b/pkg/server/server.go index 092875c6f4ba4f79e2ea266980ed9b55e856845d..86b1eaf1c2c73543d8111cc80d42a5a18d40cb00 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -3,7 +3,6 @@ package server import ( "context" "encoding/json" - "net/http" "sync" "github.com/mailgun/multibuf" @@ -14,43 +13,25 @@ import ( "gfx.cafe/open/jrpc/pkg/serverutil" ) -// Server is an RPC server. -// it is in charge of calling the handler on the message object, the json encoding of responses, and dealing with batch semantics. -// a server can be used to listenandserve multiple codecs at a time -type Server struct { - services jsonrpc.Handler - - lctx context.Context - cn context.CancelFunc -} - -// NewServer creates a new server instance with no registered handlers. -func NewServer(r jsonrpc.Handler) *Server { - server := &Server{services: r} - server.lctx, server.cn = context.WithCancel(context.Background()) - return server -} - // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes // the response back using the given codec. It will block until the codec is closed. // the codec will return if either of these conditions are met // 1. every request read from ReadBatch until ReadBatch returns context.Canceled is processed. // 2. there is a server related error (failed encoding, broken conn) that was received while processing/reading messages. -func (s *Server) ServeCodec(ctx context.Context, remote jsonrpc.ReaderWriter) error { - select { - case <-s.lctx.Done(): - return http.ErrServerClosed - default: - } +func ServeCodec(ctx context.Context, remote jsonrpc.ReaderWriter, handler jsonrpc.Handler) error { // close the remote after handling it defer remote.Close() stream := jsonrpc.NewStream(remote) // add a cancel to the context so we can cancel all the child tasks on return - ctx = ContextWithPeerInfo(ctx, remote.PeerInfo()) - ctx = ContextWithMessageStream(ctx, stream) + ctx = ContextWithMessageStream(ContextWithPeerInfo( + ctx, + remote.PeerInfo(), + ), stream, + ) + egg, ctx := errgroup.WithContext(ctx) ctx, cn := context.WithCancel(ctx) defer cn() - egg, ctx := errgroup.WithContext(ctx) + errCh := make(chan error, 1) batches := make(chan serverutil.Bundle, 1) go func() { @@ -90,7 +71,7 @@ func (s *Server) ServeCodec(ctx context.Context, remote jsonrpc.ReaderWriter) er stream: stream, } egg.Go(func() error { - return s.serve(ctx, incoming, responder) + return serve(ctx, incoming, responder, handler) }) } egg.Wait() @@ -102,25 +83,22 @@ func (s *Server) ServeCodec(ctx context.Context, remote jsonrpc.ReaderWriter) er } } -func (s *Server) Shutdown(ctx context.Context) error { - s.cn() - return nil -} - -func (s *Server) serve(ctx context.Context, +func serve(ctx context.Context, incoming []*jsonrpc.Message, r *callResponder, + handler jsonrpc.Handler, ) error { if r.batch { - return s.serveBatch(ctx, incoming, r) + return serveBatch(ctx, incoming, r, handler) } else { - return s.serveSingle(ctx, incoming[0], r) + return serveSingle(ctx, incoming[0], r, handler) } } -func (s *Server) serveSingle(ctx context.Context, +func serveSingle(ctx context.Context, incoming *jsonrpc.Message, r *callResponder, + handler jsonrpc.Handler, ) error { rw := &streamingRespWriter{ ctx: ctx, @@ -144,37 +122,17 @@ func (s *Server) serveSingle(ctx context.Context, return err } } - s.services.ServeRPC(rw, req) + handler.ServeRPC(rw, req) if rw.sendCalled == false && rw.id != nil { rw.Send(jsonrpc.Null, nil) } return nil } -func produceOutputMessage(inputMessage *jsonrpc.Message) (out *jsonrpc.Message, err error) { - // a nil incoming message means return an invalid request. - if inputMessage == nil { - inputMessage = &jsonrpc.Message{ID: jsonrpc.NewNullIDPtr()} - err = jsonrpc.NewInvalidRequestError("invalid request") - } - out = inputMessage - out.Error = nil - // zero length method is always invalid request - if len(out.Method) == 0 { - // assume if the method is not there AND the id is not there that it's an invalid REQUEST not notification - // this makes sure we add 1 to totalRequests - if out.ID == nil { - out.ID = jsonrpc.NewNullIDPtr() - } - err = jsonrpc.NewInvalidRequestError("invalid request") - } - - return -} - -func (s *Server) serveBatch(ctx context.Context, +func serveBatch(ctx context.Context, incoming []*jsonrpc.Message, r *callResponder, + handler jsonrpc.Handler, ) error { // check for empty batch if r.batch && len(incoming) == 0 { @@ -240,7 +198,7 @@ func (s *Server) serveBatch(ctx context.Context, req.Peer = r.peerinfo go func() { defer returnWg.Done() - s.services.ServeRPC(rw, req) + handler.ServeRPC(rw, req) if rw.sendCalled == false && rw.id != nil { rw.Send(jsonrpc.Null, nil) } @@ -279,6 +237,27 @@ func (s *Server) serveBatch(ctx context.Context, return nil } +func produceOutputMessage(inputMessage *jsonrpc.Message) (out *jsonrpc.Message, err error) { + // a nil incoming message means return an invalid request. + if inputMessage == nil { + inputMessage = &jsonrpc.Message{ID: jsonrpc.NewNullIDPtr()} + err = jsonrpc.NewInvalidRequestError("invalid request") + } + out = inputMessage + out.Error = nil + // zero length method is always invalid request + if len(out.Method) == 0 { + // assume if the method is not there AND the id is not there that it's an invalid REQUEST not notification + // this makes sure we add 1 to totalRequests + if out.ID == nil { + out.ID = jsonrpc.NewNullIDPtr() + } + err = jsonrpc.NewInvalidRequestError("invalid request") + } + + return +} + type callResponder struct { peerinfo jsonrpc.PeerInfo stream *jsonrpc.MessageStream diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 33295ee73eec5a19d659eb3c7e34e725850c753f..a4565240bac6a3636bd8d48b8cf87963c50b0f7e 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -10,6 +10,7 @@ import ( "gfx.cafe/open/jrpc/contrib/codecs/rdwr" "gfx.cafe/open/jrpc/pkg/jrpctest" + "gfx.cafe/open/jrpc/pkg/server" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -20,9 +21,9 @@ func TestGoEthereumTestScripts(t *testing.T) { // create a net pipe rd, wr := net.Pipe() readbuf := bufio.NewReader(rd) - srv := jrpctest.NewServer() + srv := jrpctest.NewRouter() c := rdwr.NewCodec(wr, wr) - go srv.ServeCodec(context.TODO(), c) + go server.ServeCodec(context.TODO(), c, srv) for _, act := range tf.Action { switch act.Direction { case jrpctest.DirectionRecv: