From c074cc6b4ec6a9179ea411c395086bd6e02d1b51 Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Sat, 3 Jun 2023 01:15:07 -0500 Subject: [PATCH] http --- pkg/codec/codecs/codecs.go | 5 + pkg/codec/{ => codecs}/http/client.go | 27 +-- pkg/codec/{ => codecs}/http/client_test.go | 18 +- pkg/codec/codecs/http/codec.go | 142 +++++++++++++ pkg/codec/{ => codecs}/http/http_test.go | 2 +- pkg/codec/{ => codecs}/inproc/client.go | 0 pkg/codec/{ => codecs}/inproc/inproc.go | 0 pkg/codec/{ => codecs}/inproc/inproc_test.go | 6 +- pkg/codec/{ => codecs}/ipc/ipc.go | 0 pkg/codec/{ => codecs}/ipc/ipc_js.go | 0 pkg/codec/{ => codecs}/ipc/ipc_unix.go | 0 pkg/codec/{ => codecs}/ipc/ipc_windows.go | 0 pkg/codec/{ => codecs}/stdio/server_test.go | 0 pkg/codec/{ => codecs}/stdio/stdio.go | 0 pkg/codec/{ => codecs}/websocket/client.go | 0 .../websocket/client_example_test.go | 0 pkg/codec/{ => codecs}/websocket/codec.go | 0 pkg/codec/{ => codecs}/websocket/const.go | 0 pkg/codec/{ => codecs}/websocket/dial.go | 0 pkg/codec/{ => codecs}/websocket/websocket.go | 2 +- .../{ => codecs}/websocket/websocket_test.go | 30 +-- .../websocket/wsjson/writer_test.go | 0 .../{ => codecs}/websocket/wsjson/wsjson.go | 0 pkg/codec/http/http.go | 194 ------------------ pkg/codec/http/http_client.go | 111 ---------- pkg/codec/json.go | 3 +- pkg/handlers/argreflect/json.go | 2 +- 27 files changed, 177 insertions(+), 365 deletions(-) create mode 100644 pkg/codec/codecs/codecs.go rename pkg/codec/{ => codecs}/http/client.go (76%) rename pkg/codec/{ => codecs}/http/client_test.go (93%) create mode 100644 pkg/codec/codecs/http/codec.go rename pkg/codec/{ => codecs}/http/http_test.go (99%) rename pkg/codec/{ => codecs}/inproc/client.go (100%) rename pkg/codec/{ => codecs}/inproc/inproc.go (100%) rename pkg/codec/{ => codecs}/inproc/inproc_test.go (77%) rename pkg/codec/{ => codecs}/ipc/ipc.go (100%) rename pkg/codec/{ => codecs}/ipc/ipc_js.go (100%) rename pkg/codec/{ => codecs}/ipc/ipc_unix.go (100%) rename pkg/codec/{ => codecs}/ipc/ipc_windows.go (100%) rename pkg/codec/{ => codecs}/stdio/server_test.go (100%) rename pkg/codec/{ => codecs}/stdio/stdio.go (100%) rename pkg/codec/{ => codecs}/websocket/client.go (100%) rename pkg/codec/{ => codecs}/websocket/client_example_test.go (100%) rename pkg/codec/{ => codecs}/websocket/codec.go (100%) rename pkg/codec/{ => codecs}/websocket/const.go (100%) rename pkg/codec/{ => codecs}/websocket/dial.go (100%) rename pkg/codec/{ => codecs}/websocket/websocket.go (98%) rename pkg/codec/{ => codecs}/websocket/websocket_test.go (85%) rename pkg/codec/{ => codecs}/websocket/wsjson/writer_test.go (100%) rename pkg/codec/{ => codecs}/websocket/wsjson/wsjson.go (100%) delete mode 100644 pkg/codec/http/http.go delete mode 100644 pkg/codec/http/http_client.go diff --git a/pkg/codec/codecs/codecs.go b/pkg/codec/codecs/codecs.go new file mode 100644 index 0000000..983a7bb --- /dev/null +++ b/pkg/codec/codecs/codecs.go @@ -0,0 +1,5 @@ +package codecs + +import "gfx.cafe/open/jrpc/pkg/codec/codecs/inproc" + +var NewInProc = inproc.NewCodec diff --git a/pkg/codec/http/client.go b/pkg/codec/codecs/http/client.go similarity index 76% rename from pkg/codec/http/client.go rename to pkg/codec/codecs/http/client.go index b7714e8..4cc3c8f 100644 --- a/pkg/codec/http/client.go +++ b/pkg/codec/codecs/http/client.go @@ -1,32 +1,17 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package jrpc +package http import ( "bytes" "context" "encoding/json" "errors" - "gfx.cafe/open/jrpc/pkg/clientutil" - "gfx.cafe/open/jrpc/pkg/codec" "net/http" "sync/atomic" "time" + "gfx.cafe/open/jrpc/pkg/clientutil" + "gfx.cafe/open/jrpc/pkg/codec" + "gfx.cafe/open/jrpc" ) @@ -44,6 +29,8 @@ const ( subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls ) +var _ jrpc.Conn = (*Client)(nil) + // Client represents a connection to an RPC server. type Client struct { remote string @@ -86,7 +73,7 @@ func (c *Client) Do(ctx context.Context, result any, method string, params any) return nil } -func (c *Client) Notify(ctx context.Context, result any, method string, params any) error { +func (c *Client) Notify(ctx context.Context, method string, params any) error { req := jrpc.NewRequestInt(ctx, int(c.id.Add(1)), method, params) dat, err := req.MarshalJSON() if err != nil { diff --git a/pkg/codec/http/client_test.go b/pkg/codec/codecs/http/client_test.go similarity index 93% rename from pkg/codec/http/client_test.go rename to pkg/codec/codecs/http/client_test.go index 44cca23..ac8c090 100644 --- a/pkg/codec/http/client_test.go +++ b/pkg/codec/codecs/http/client_test.go @@ -1,20 +1,4 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package jrpc +package http import ( "context" diff --git a/pkg/codec/codecs/http/codec.go b/pkg/codec/codecs/http/codec.go new file mode 100644 index 0000000..62fba6c --- /dev/null +++ b/pkg/codec/codecs/http/codec.go @@ -0,0 +1,142 @@ +package http + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "io" + "net/http" + "net/url" + + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/pkg/codec" +) + +type Codec struct { + ctx context.Context + cn func() + + r *http.Request + w http.ResponseWriter + msgs chan json.RawMessage + errs chan error +} + +func NewCodec(r *http.Request, w http.ResponseWriter) *Codec { + ctx, cn := context.WithCancel(r.Context()) + c := &Codec{ + ctx: ctx, + cn: cn, + r: r, + w: w, + msgs: make(chan json.RawMessage, 1), + errs: make(chan error, 1), + } + go c.doRead() + return c +} + +// gets the peer info +func (c *Codec) PeerInfo() codec.PeerInfo { + ci := codec.PeerInfo{ + Transport: "http", + RemoteAddr: c.r.RemoteAddr, + HTTP: codec.HttpInfo{ + Version: c.r.Proto, + UserAgent: c.r.UserAgent(), + Host: c.r.Host, + Headers: c.r.Header.Clone(), + }, + } + ci.HTTP.Origin = c.r.Header.Get("X-Real-Ip") + if ci.HTTP.Origin == "" { + ci.HTTP.Origin = c.r.Header.Get("X-Forwarded-For") + } + if ci.HTTP.Origin == "" { + ci.HTTP.Origin = c.r.Header.Get("Origin") + } + if ci.HTTP.Origin == "" { + ci.HTTP.Origin = c.r.RemoteAddr + } + return ci +} + +func (r *Codec) doReadGet() (msgs json.RawMessage, err error) { + method_up := r.r.URL.Query().Get("method") + params, _ := url.QueryUnescape(r.r.URL.Query().Get("params")) + param := []byte(params) + if pb, err := base64.URLEncoding.DecodeString(params); err == nil { + param = pb + } + id := r.r.URL.Query().Get("id") + if id == "" { + id = "1" + } + req := jrpc.NewRequest(r.ctx, id, method_up, json.RawMessage(param)) + return req.MarshalJSON() +} + +var ErrInvalidContentType = errors.New("invalid content type") + +func (c *Codec) doRead() { + contentMatches := true + types := c.r.Header.Values("content-type") + for _, v := range types { + // TODO: check content type + _ = v + } + if !contentMatches { + c.errs <- ErrInvalidContentType + return + } + var data json.RawMessage + var err error + // TODO: implement eventsource + switch c.r.Method { + case http.MethodGet: + data, err = c.doReadGet() + return + case http.MethodPost: + data, err = io.ReadAll(c.r.Body) + } + if err != nil { + c.errs <- err + return + } + c.msgs <- data +} + +// json.RawMessage can be an array of requests. if it is, then it is a batch request +func (c *Codec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) { + select { + case ans := <-c.msgs: + return ans, nil + case err := <-c.errs: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + case <-c.ctx.Done(): + return nil, c.ctx.Err() + } +} + +// closes the connection +func (c *Codec) Close() error { + c.cn() + return nil +} + +func (c *Codec) Write(p []byte) (n int, err error) { + return c.w.Write(p) +} + +// Closed returns a channel which is closed when the connection is closed. +func (c *Codec) Closed() <-chan struct{} { + return c.ctx.Done() +} + +// RemoteAddr returns the peer address of the connection. +func (c *Codec) RemoteAddr() string { + return "" +} diff --git a/pkg/codec/http/http_test.go b/pkg/codec/codecs/http/http_test.go similarity index 99% rename from pkg/codec/http/http_test.go rename to pkg/codec/codecs/http/http_test.go index a23d99a..41fbb6d 100644 --- a/pkg/codec/http/http_test.go +++ b/pkg/codec/codecs/http/http_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. -package jrpc +package http import ( "net/http" diff --git a/pkg/codec/inproc/client.go b/pkg/codec/codecs/inproc/client.go similarity index 100% rename from pkg/codec/inproc/client.go rename to pkg/codec/codecs/inproc/client.go diff --git a/pkg/codec/inproc/inproc.go b/pkg/codec/codecs/inproc/inproc.go similarity index 100% rename from pkg/codec/inproc/inproc.go rename to pkg/codec/codecs/inproc/inproc.go diff --git a/pkg/codec/inproc/inproc_test.go b/pkg/codec/codecs/inproc/inproc_test.go similarity index 77% rename from pkg/codec/inproc/inproc_test.go rename to pkg/codec/codecs/inproc/inproc_test.go index 2ad22de..c49ca17 100644 --- a/pkg/codec/inproc/inproc_test.go +++ b/pkg/codec/codecs/inproc/inproc_test.go @@ -2,7 +2,7 @@ package inproc_test import ( "context" - inproc2 "gfx.cafe/open/jrpc/pkg/codec/inproc" + "gfx.cafe/open/jrpc/pkg/codec/codecs/inproc" "gfx.cafe/open/jrpc/pkg/jmux" "testing" @@ -16,8 +16,8 @@ func TestInprocSetup(t *testing.T) { ctx := context.Background() - clientCodec := inproc2.NewCodec() - client := inproc2.NewClient(clientCodec, nil) + clientCodec := inproc.NewCodec() + client := inproc.NewClient(clientCodec, nil) go func() { srv.ServeCodec(ctx, clientCodec) }() diff --git a/pkg/codec/ipc/ipc.go b/pkg/codec/codecs/ipc/ipc.go similarity index 100% rename from pkg/codec/ipc/ipc.go rename to pkg/codec/codecs/ipc/ipc.go diff --git a/pkg/codec/ipc/ipc_js.go b/pkg/codec/codecs/ipc/ipc_js.go similarity index 100% rename from pkg/codec/ipc/ipc_js.go rename to pkg/codec/codecs/ipc/ipc_js.go diff --git a/pkg/codec/ipc/ipc_unix.go b/pkg/codec/codecs/ipc/ipc_unix.go similarity index 100% rename from pkg/codec/ipc/ipc_unix.go rename to pkg/codec/codecs/ipc/ipc_unix.go diff --git a/pkg/codec/ipc/ipc_windows.go b/pkg/codec/codecs/ipc/ipc_windows.go similarity index 100% rename from pkg/codec/ipc/ipc_windows.go rename to pkg/codec/codecs/ipc/ipc_windows.go diff --git a/pkg/codec/stdio/server_test.go b/pkg/codec/codecs/stdio/server_test.go similarity index 100% rename from pkg/codec/stdio/server_test.go rename to pkg/codec/codecs/stdio/server_test.go diff --git a/pkg/codec/stdio/stdio.go b/pkg/codec/codecs/stdio/stdio.go similarity index 100% rename from pkg/codec/stdio/stdio.go rename to pkg/codec/codecs/stdio/stdio.go diff --git a/pkg/codec/websocket/client.go b/pkg/codec/codecs/websocket/client.go similarity index 100% rename from pkg/codec/websocket/client.go rename to pkg/codec/codecs/websocket/client.go diff --git a/pkg/codec/websocket/client_example_test.go b/pkg/codec/codecs/websocket/client_example_test.go similarity index 100% rename from pkg/codec/websocket/client_example_test.go rename to pkg/codec/codecs/websocket/client_example_test.go diff --git a/pkg/codec/websocket/codec.go b/pkg/codec/codecs/websocket/codec.go similarity index 100% rename from pkg/codec/websocket/codec.go rename to pkg/codec/codecs/websocket/codec.go diff --git a/pkg/codec/websocket/const.go b/pkg/codec/codecs/websocket/const.go similarity index 100% rename from pkg/codec/websocket/const.go rename to pkg/codec/codecs/websocket/const.go diff --git a/pkg/codec/websocket/dial.go b/pkg/codec/codecs/websocket/dial.go similarity index 100% rename from pkg/codec/websocket/dial.go rename to pkg/codec/codecs/websocket/dial.go diff --git a/pkg/codec/websocket/websocket.go b/pkg/codec/codecs/websocket/websocket.go similarity index 98% rename from pkg/codec/websocket/websocket.go rename to pkg/codec/codecs/websocket/websocket.go index 2a9daa6..7ad5c2a 100644 --- a/pkg/codec/websocket/websocket.go +++ b/pkg/codec/codecs/websocket/websocket.go @@ -5,7 +5,7 @@ import ( "encoding/base64" "encoding/json" codec2 "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/codec/websocket/wsjson" + "gfx.cafe/open/jrpc/pkg/codec/codecs/websocket/wsjson" "net/http" "net/url" "time" diff --git a/pkg/codec/websocket/websocket_test.go b/pkg/codec/codecs/websocket/websocket_test.go similarity index 85% rename from pkg/codec/websocket/websocket_test.go rename to pkg/codec/codecs/websocket/websocket_test.go index a1cf593..2aec7d1 100644 --- a/pkg/codec/websocket/websocket_test.go +++ b/pkg/codec/codecs/websocket/websocket_test.go @@ -4,7 +4,7 @@ import ( "context" "errors" "gfx.cafe/open/jrpc/pkg/codec" - websocket2 "gfx.cafe/open/jrpc/pkg/codec/websocket" + "gfx.cafe/open/jrpc/pkg/codec/codecs/websocket" "gfx.cafe/open/jrpc/pkg/jmux" jrpctest2 "gfx.cafe/open/jrpc/pkg/jrpctest" "net/http/httptest" @@ -17,7 +17,7 @@ import ( func TestWebsocketClientHeaders(t *testing.T) { t.Parallel() - endpoint, header, err := websocket2.WsClientHeaders("wss://testuser:test-PASS_01@example.com:1234", "https://example.com") + endpoint, header, err := websocket.WsClientHeaders("wss://testuser:test-PASS_01@example.com:1234", "https://example.com") if err != nil { t.Fatalf("wsGetConfig failed: %s", err) } @@ -38,24 +38,24 @@ func TestWebsocketOriginCheck(t *testing.T) { var ( srv = jrpctest2.NewTestServer() - httpsrv = httptest.NewServer(websocket2.WebsocketHandler(srv, []string{"http://example.com"})) + httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"http://example.com"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() defer httpsrv.Close() - client, err := websocket2.DialWebsocket(context.Background(), wsURL, "http://ekzample.com") + client, err := websocket.DialWebsocket(context.Background(), wsURL, "http://ekzample.com") if err == nil { client.Close() t.Fatal("no error for wrong origin") } - wantErr := websocket2.NewHandshakeError(errors.New("403"), "403 Forbidden") + wantErr := websocket.NewHandshakeError(errors.New("403"), "403 Forbidden") if !strings.Contains(err.Error(), wantErr.Error()) { t.Fatalf("wrong error for wrong origin: got: '%q', want: '%s'", err, wantErr) } // Connections without origin header should work. - client, err = websocket2.DialWebsocket(context.Background(), wsURL, "") + client, err = websocket.DialWebsocket(context.Background(), wsURL, "") if err != nil { t.Fatalf("error for empty origin: %v", err) } @@ -68,13 +68,13 @@ func TestWebsocketLargeCall(t *testing.T) { var ( srv = jrpctest2.NewTestServer() - httpsrv = httptest.NewServer(websocket2.WebsocketHandler(srv, []string{"*"})) + httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"*"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() defer httpsrv.Close() - client, err := websocket2.DialWebsocket(context.Background(), wsURL, "") + client, err := websocket.DialWebsocket(context.Background(), wsURL, "") if err != nil { t.Fatalf("can't dial: %v", err) } @@ -82,7 +82,7 @@ func TestWebsocketLargeCall(t *testing.T) { // This call sends slightly less than the limit and should work. var result jrpctest2.EchoResult - arg := strings.Repeat("x", websocket2.MaxRequestContentLength-200) + arg := strings.Repeat("x", websocket.MaxRequestContentLength-200) if err := client.Do(nil, &result, "test_echo", []any{arg, 1}); err != nil { t.Fatalf("valid call didn't work: %v", err) } @@ -91,7 +91,7 @@ func TestWebsocketLargeCall(t *testing.T) { } // This call sends twice the allowed size and shouldn't work. - arg = strings.Repeat("x", websocket2.MaxRequestContentLength*2) + arg = strings.Repeat("x", websocket.MaxRequestContentLength*2) err = client.Do(nil, &result, "test_echo", []any{arg}) if err == nil { t.Fatal("no error for too large call") @@ -101,14 +101,14 @@ func TestWebsocketLargeCall(t *testing.T) { func TestWebsocketPeerInfo(t *testing.T) { var ( s = jrpctest2.NewTestServer() - ts = httptest.NewServer(websocket2.WebsocketHandler(s, []string{"origin.example.com"})) + ts = httptest.NewServer(websocket.WebsocketHandler(s, []string{"origin.example.com"})) tsurl = "ws:" + strings.TrimPrefix(ts.URL, "http:") ) defer s.Stop() defer ts.Close() ctx := context.Background() - c, err := websocket2.DialWebsocket(ctx, tsurl, "http://origin.example.com") + c, err := websocket.DialWebsocket(ctx, tsurl, "http://origin.example.com") if err != nil { t.Fatal(err) } @@ -138,16 +138,16 @@ func TestClientWebsocketLargeMessage(t *testing.T) { mux := jmux.NewMux() var ( srv = jrpc.NewServer(mux) - httpsrv = httptest.NewServer(websocket2.WebsocketHandler(srv, nil)) + httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, nil)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() defer httpsrv.Close() - respLength := websocket2.WsMessageSizeLimit - 50 + respLength := websocket.WsMessageSizeLimit - 50 mux.RegisterStruct("test", jrpctest2.LargeRespService{Length: respLength}) - c, err := websocket2.DialWebsocket(context.Background(), wsURL, "") + c, err := websocket.DialWebsocket(context.Background(), wsURL, "") if err != nil { t.Fatal(err) } diff --git a/pkg/codec/websocket/wsjson/writer_test.go b/pkg/codec/codecs/websocket/wsjson/writer_test.go similarity index 100% rename from pkg/codec/websocket/wsjson/writer_test.go rename to pkg/codec/codecs/websocket/wsjson/writer_test.go diff --git a/pkg/codec/websocket/wsjson/wsjson.go b/pkg/codec/codecs/websocket/wsjson/wsjson.go similarity index 100% rename from pkg/codec/websocket/wsjson/wsjson.go rename to pkg/codec/codecs/websocket/wsjson/wsjson.go diff --git a/pkg/codec/http/http.go b/pkg/codec/http/http.go deleted file mode 100644 index fbf1c26..0000000 --- a/pkg/codec/http/http.go +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package jrpc - -import ( - "bytes" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "gfx.cafe/open/jrpc/pkg/codec" - "io" - "net/http" - "net/url" - "time" - - "gfx.cafe/open/jrpc" - "gfx.cafe/util/go/bufpool" -) - -const ( - maxRequestContentLength = 1024 * 1024 * 5 - contentType = "application/json" -) - -// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 -var acceptedContentTypes = []string{ - // https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 - contentType, "application/json-rpc", "application/jsonrequest", - // these are added because they make sense, fight me! - "application/jsonrpc2", "application/json-rpc2", "application/jrpc", -} - -// HTTPTimeouts represents the configuration params for the HTTP RPC server. -type HTTPTimeouts struct { - // ReadTimeout is the maximum duration for reading the entire - // request, including the body. - // - // Because ReadTimeout does not let Handlers make per-request - // decisions on each request body's acceptable deadline or - // upload rate, most users will prefer to use - // ReadHeaderTimeout. It is valid to use them both. - ReadTimeout time.Duration - - // WriteTimeout is the maximum duration before timing out - // writes of the response. It is reset whenever a new - // request's header is read. Like ReadTimeout, it does not - // let Handlers make decisions on a per-request basis. - WriteTimeout time.Duration - - // IdleTimeout is the maximum amount of time to wait for the - // next request when keep-alives are enabled. If IdleTimeout - // is zero, the value of ReadTimeout is used. If both are - // zero, ReadHeaderTimeout is used. - IdleTimeout time.Duration -} - -// DefaultHTTPTimeouts represents the default timeout values used if further -// configuration is not provided. -var DefaultHTTPTimeouts = HTTPTimeouts{ - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 120 * time.Second, -} - -// httpServerConn turns a HTTP connection into a Conn. -type requestCodec struct { - r *http.Request - w http.ResponseWriter - - ctx context.Context - cn func() - - requestBuffer *bytes.Buffer - pi codec.PeerInfo -} - -func NewRequestCodec(r *http.Request, w http.ResponseWriter) *requestCodec { - // Create request-scoped context. - connInfo := codec.PeerInfo{ - Transport: "http", - RemoteAddr: r.RemoteAddr, - HTTP: codec.HttpInfo{ - Version: r.Proto, - UserAgent: r.UserAgent(), - Host: r.Host, - Headers: r.Header.Clone(), - }, - } - connInfo.HTTP.Version = r.Proto - connInfo.HTTP.Host = r.Host - connInfo.HTTP.Origin = r.Header.Get("X-Real-Ip") - if connInfo.HTTP.Origin == "" { - connInfo.HTTP.Origin = r.Header.Get("X-Forwarded-For") - } - if connInfo.HTTP.Origin == "" { - connInfo.HTTP.Origin = r.Header.Get("Origin") - } - if connInfo.HTTP.Origin == "" { - connInfo.HTTP.Origin = r.RemoteAddr - } - // the headers used - connInfo.HTTP.Headers = r.Header - buf := bufpool.GetStd() - - ctx, cn := context.WithCancel(r.Context()) - - return &requestCodec{ - ctx: ctx, - cn: cn, - r: r, - w: w, - pi: connInfo, - requestBuffer: buf, - } - -} - -// gets the peer info -func (r *requestCodec) PeerInfo() codec.PeerInfo { - return r.pi -} - -// json.RawMessage can be an array of requests. if it is, then it is a batch request -func (r *requestCodec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) { - if r.r.Method == http.MethodGet { - return r.readBatchGet(ctx) - } - if r.r.Method == http.MethodPost { - return r.readBatch(ctx) - } - return nil, fmt.Errorf("invalid request") -} - -func (r *requestCodec) readBatchGet(ctx context.Context) (msgs json.RawMessage, err error) { - method_up := r.r.URL.Query().Get("method") - params, _ := url.QueryUnescape(r.r.URL.Query().Get("params")) - param := []byte(params) - if pb, err := base64.URLEncoding.DecodeString(params); err == nil { - param = pb - } - req := jrpc.NewRequestInt(ctx, 1, method_up, json.RawMessage(param)) - return req.MarshalJSON() -} - -func (r *requestCodec) readBatch(ctx context.Context) (msgs json.RawMessage, err error) { - rd := io.LimitReader(r.r.Body, maxRequestContentLength) - _, err = io.Copy(r.requestBuffer, rd) - if err != nil { - return nil, err - } - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-r.ctx.Done(): - return nil, r.ctx.Err() - } - return json.RawMessage(r.requestBuffer.Bytes()), nil -} - -// closes the connection -func (r *requestCodec) Close() error { - r.cn() - bufpool.PutStd(r.requestBuffer) - return nil -} - -func (r *requestCodec) Write(p []byte) (n int, err error) { - return r.w.Write(p) -} - -// Closed returns a channel which is closed when the connection is closed. -func (r *requestCodec) Closed() <-chan struct{} { - return r.r.Context().Done() -} - -// RemoteAddr returns the peer address of the connection. -func (r *requestCodec) RemoteAddr() string { - return r.pi.RemoteAddr -} diff --git a/pkg/codec/http/http_client.go b/pkg/codec/http/http_client.go deleted file mode 100644 index f418615..0000000 --- a/pkg/codec/http/http_client.go +++ /dev/null @@ -1,111 +0,0 @@ -package jrpc - -import ( - "context" - "io" - "net/http" - "net/url" - "sync" - - "github.com/goccy/go-json" -) - -// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP -// using the provided HTTP Client. -func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { - // Sanity check URL so we don't end up with a client that will fail every request. - _, err := url.Parse(endpoint) - if err != nil { - return nil, err - } - - initctx := context.Background() - headers := make(http.Header, 2) - headers.Set("accept", contentType) - headers.Set("content-type", contentType) - return newClient(initctx, func(context.Context) (ServerCodec, error) { - hc := &httpConn{ - client: client, - headers: headers, - url: endpoint, - closeCh: make(chan any), - } - return hc, nil - }) -} - -// DialHTTP creates a new RPC client that connects to an RPC server over HTTP. -func DialHTTP(endpoint string) (*Client, error) { - return DialHTTPWithClient(endpoint, new(http.Client)) -} - -func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg any) error { - hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msg) - if err != nil { - return err - } - defer respBody.Close() - - var respmsg jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { - return err - } - op.resp <- &respmsg - return nil -} - -func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { - hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msgs) - if err != nil { - return err - } - defer respBody.Close() - var respmsgs []jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { - return err - } - for i := 0; i < len(respmsgs); i++ { - op.resp <- &respmsgs[i] - } - return nil -} - -type httpConn struct { - client *http.Client - url string - closeOnce sync.Once - closeCh chan any - mu sync.Mutex // protects headers - headers http.Header -} - -// httpConn implements ServerCodec, but it is treated specially by Client -// and some methods don't work. The panic() stubs here exist to ensure -// this special treatment is correct. -func (hc *httpConn) WriteJSON(context.Context, any) error { - panic("WriteJSON called on httpConn") -} - -func (hc *httpConn) PeerInfo() PeerInfo { - panic("PeerInfo called on httpConn") -} - -func (hc *httpConn) RemoteAddr() string { - return hc.url -} - -func (hc *httpConn) ReadBatch() ([]*jsonrpcMessage, bool, error) { - <-hc.closeCh - return nil, false, io.EOF -} - -func (hc *httpConn) Close() error { - hc.closeOnce.Do(func() { close(hc.closeCh) }) - return nil -} - -func (hc *httpConn) Closed() <-chan any { - return hc.closeCh -} diff --git a/pkg/codec/json.go b/pkg/codec/json.go index bea9754..1441247 100644 --- a/pkg/codec/json.go +++ b/pkg/codec/json.go @@ -3,9 +3,8 @@ package codec import ( "bytes" "encoding/json" + "gfx.cafe/open/jrpc/pkg/codec/codecs/websocket/wsjson" "strconv" - - "gfx.cafe/open/jrpc/pkg/codec/websocket/wsjson" ) var jzon = wsjson.JZON diff --git a/pkg/handlers/argreflect/json.go b/pkg/handlers/argreflect/json.go index 10f04aa..1e9dc3d 100644 --- a/pkg/handlers/argreflect/json.go +++ b/pkg/handlers/argreflect/json.go @@ -4,7 +4,7 @@ import ( "encoding/json" "errors" "fmt" - "gfx.cafe/open/jrpc/pkg/codec/websocket/wsjson" + "gfx.cafe/open/jrpc/pkg/codec/codecs/websocket/wsjson" "reflect" ) -- GitLab