diff --git a/benchmark_test.go b/benchmark/benchmark_test.go similarity index 100% rename from benchmark_test.go rename to benchmark/benchmark_test.go diff --git a/codec/errors.go b/codec/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..a175fab7c25e4dea3fb046882ba32774f3cf4ab3 --- /dev/null +++ b/codec/errors.go @@ -0,0 +1,132 @@ +package codec + +import ( + "fmt" +) + +// HTTPError is returned by client operations when the HTTP status code of the +// response is not a 2xx status. +type HTTPError struct { + StatusCode int + Status string + Body []byte +} + +func (err HTTPError) Error() string { + if len(err.Body) == 0 { + return err.Status + } + return fmt.Sprintf("%v: %s", err.Status, err.Body) +} + +// Error wraps RPC errors, which contain an error code in addition to the message. +type Error interface { + Error() string // returns the message + ErrorCode() int // returns the code +} + +// A DataError contains some data in addition to the error message. +type DataError interface { + Error() string // returns the message + ErrorData() any // returns the error data +} + +type JrpcErr struct { + Data any +} + +func (j *JrpcErr) ErrorData() any { + return j.Data +} + +func (j *JrpcErr) Error() string { + return "Jrpc Error" +} + +func (j *JrpcErr) ErrorCode() int { + return ErrorCodeJrpc +} + +func WrapJrpcErr(err error) error { + if err == nil { + return nil + } + return fmt.Errorf("%w: %w", &JrpcErr{}, err) +} + +func MakeJrpcErr(s string) error { + return fmt.Errorf("%w: %s", &JrpcErr{}, s) +} + +// Error types defined below are the built-in JSON-RPC errors. + +var ( + _ Error = new(ErrorMethodNotFound) + _ Error = new(ErrorSubscriptionNotFound) + _ Error = new(ErrorParse) + _ Error = new(ErrorInvalidRequest) + _ Error = new(ErrorInvalidMessage) + _ Error = new(ErrorInvalidParams) +) + +const ErrorCodeDefault = -32000 + +const ErrorCodeApplication = -32080 + +const ErrorCodeJrpc = -42000 + +type ErrorMethodNotFound struct{ method string } + +func (e *ErrorMethodNotFound) ErrorCode() int { return -32601 } + +func (e *ErrorMethodNotFound) Error() string { + return fmt.Sprintf("the method %s does not exist/is not available", e.method) +} + +type ErrorSubscriptionNotFound struct{ namespace, subscription string } + +func (e *ErrorSubscriptionNotFound) ErrorCode() int { return -32601 } + +func (e *ErrorSubscriptionNotFound) Error() string { + return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace) +} + +// Invalid JSON was received by the server. +type ErrorParse struct{ message string } + +func (e *ErrorParse) ErrorCode() int { return -32700 } + +func (e *ErrorParse) Error() string { return e.message } + +func NewInvalidRequestError(message string) *ErrorInvalidRequest { + return &ErrorInvalidRequest{ + message: message, + } +} + +// received message isn't a valid request +type ErrorInvalidRequest struct{ message string } + +func (e *ErrorInvalidRequest) ErrorCode() int { return -32600 } + +func (e *ErrorInvalidRequest) Error() string { return e.message } + +// received message is invalid +type ErrorInvalidMessage struct{ message string } + +func (e *ErrorInvalidMessage) ErrorCode() int { return -32700 } + +func (e *ErrorInvalidMessage) Error() string { return e.message } + +func NewInvalidParamsError(message string) *ErrorInvalidMessage { + return &ErrorInvalidMessage{ + message: message, + } +} + +// unable to decode supplied params, or an invalid number of parameters +type ErrorInvalidParams struct{ message string } + +func (e *ErrorInvalidParams) ErrorCode() int { return -32602 } + +func (e *ErrorInvalidParams) Error() string { return e.message } diff --git a/client.go b/codec/http/client.go similarity index 100% rename from client.go rename to codec/http/client.go diff --git a/client_test.go b/codec/http/client_test.go similarity index 100% rename from client_test.go rename to codec/http/client_test.go diff --git a/http.go b/codec/http/http.go similarity index 98% rename from http.go rename to codec/http/http.go index 359e643faa502c6504e280c8349fd98b56d2fce2..acc542079d24144d3317f3ab53e77f5a9e5add57 100644 --- a/http.go +++ b/codec/http/http.go @@ -27,6 +27,7 @@ import ( "net/url" "time" + "gfx.cafe/open/jrpc/codec" "gfx.cafe/util/go/bufpool" json "github.com/goccy/go-json" @@ -82,15 +83,15 @@ type httpServerConn struct { io.Reader io.Writer - jc ServerCodec + jc codec.ReaderWriter r *http.Request w http.ResponseWriter - pi PeerInfo + pi codec.PeerInfo } -func newHTTPServerConn(r *http.Request, w http.ResponseWriter, pi PeerInfo) ServerCodec { +func newHTTPServerConn(r *http.Request, w http.ResponseWriter, pi codec.PeerInfo) codec.ReaderWriter { c := &httpServerConn{Writer: w, r: r, pi: pi} // if the request is a GET request, and the body is empty, we turn the request into fake json rpc request, see below // https://www.jsonrpc.org/historical/json-rpc-over-http.html#encoded-parameters diff --git a/http_client.go b/codec/http/http_client.go similarity index 100% rename from http_client.go rename to codec/http/http_client.go diff --git a/http_test.go b/codec/http/http_test.go similarity index 100% rename from http_test.go rename to codec/http/http_test.go diff --git a/inproc.go b/codec/inproc/inproc.go similarity index 100% rename from inproc.go rename to codec/inproc/inproc.go diff --git a/ipc.go b/codec/ipc/ipc.go similarity index 100% rename from ipc.go rename to codec/ipc/ipc.go diff --git a/ipc_js.go b/codec/ipc/ipc_js.go similarity index 100% rename from ipc_js.go rename to codec/ipc/ipc_js.go diff --git a/ipc_unix.go b/codec/ipc/ipc_unix.go similarity index 100% rename from ipc_unix.go rename to codec/ipc/ipc_unix.go diff --git a/ipc_windows.go b/codec/ipc/ipc_windows.go similarity index 100% rename from ipc_windows.go rename to codec/ipc/ipc_windows.go diff --git a/json.go b/codec/json.go similarity index 52% rename from json.go rename to codec/json.go index a015cb9d6576b7e5220222542e275346e51fe683..48a25d28e06efa1b6c72b98dd633d2a0e3630868 100644 --- a/json.go +++ b/codec/json.go @@ -1,130 +1,103 @@ -package jrpc +package codec import ( "bytes" + "encoding/json" "strconv" - "strings" - "time" - stdjson "encoding/json" - - "gfx.cafe/open/jrpc/wsjson" - "github.com/goccy/go-json" + "gfx.cafe/open/jrpc/codec/websocket/wsjson" ) var jzon = wsjson.JZON -const ( - defaultWriteTimeout = 10 * time.Second // used if context has no deadline -) - -var null = json.RawMessage("null") +var Null = json.RawMessage("null") // A value of this type can a JSON-RPC request, notification, successful response or // error response. Which one it is depends on the fields. -type jsonrpcMessage struct { - Version version `json:"jsonrpc,omitempty"` +type Message struct { + Version Version `json:"jsonrpc,omitempty"` ID *ID `json:"id,omitempty"` Method string `json:"method,omitempty"` Params json.RawMessage `json:"params,omitempty"` Result json.RawMessage `json:"result,omitempty"` - Error *jsonError `json:"error,omitempty"` + Error *JsonError `json:"error,omitempty"` } -func MakeCall(id int, method string, params []any) *JsonRpcMessage { - return &JsonRpcMessage{ +func MakeCall(id int, method string, params []any) *Message { + return &Message{ ID: NewNumberIDPtr(int64(id)), } } -type JsonRpcMessage = jsonrpcMessage - -func (msg *jsonrpcMessage) isNotification() bool { +func (msg *Message) isNotification() bool { return msg.ID == nil && len(msg.Method) > 0 } -func (msg *jsonrpcMessage) isCall() bool { + +func (msg *Message) isCall() bool { return msg.hasValidID() && len(msg.Method) > 0 } -func (msg *jsonrpcMessage) isResponse() bool { + +func (msg *Message) isResponse() bool { return msg.hasValidID() && len(msg.Method) == 0 && msg.Params == nil && (msg.Result != nil || msg.Error != nil) } -func (msg *jsonrpcMessage) toResponse() *Response { - return &Response{ - ID: msg.ID, - Result: msg.Result, - Error: msg.Error, - } -} -func (msg *jsonrpcMessage) hasValidID() bool { - return msg.ID != nil && !msg.ID.null -} -func (msg *jsonrpcMessage) isSubscribe() bool { - return strings.HasSuffix(msg.Method, subscribeMethodSuffix) -} -func (msg *jsonrpcMessage) isUnsubscribe() bool { - return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix) -} -func (msg *jsonrpcMessage) namespace() string { - elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2) - return elem[0] +func (msg *Message) hasValidID() bool { + return msg.ID != nil && !msg.ID.null } -func (msg *jsonrpcMessage) String() string { +func (msg *Message) String() string { b, _ := json.Marshal(msg) return string(b) } -func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { - resp := errorMessage(err) +func (msg *Message) ErrorResponse(err error) *Message { + resp := ErrorMessage(err) if resp.ID != nil { resp.ID = msg.ID } return resp } -func (msg *jsonrpcMessage) response(result any) *jsonrpcMessage { +func (msg *Message) response(result any) *Message { // do a funny marshaling enc, err := jzon.Marshal(result) if err != nil { - return msg.errorResponse(err) + return msg.ErrorResponse(err) } if len(enc) == 0 { enc = []byte("null") } - return &jsonrpcMessage{ID: msg.ID, Result: enc} + return &Message{ID: msg.ID, Result: enc} } // encapsulate json rpc error into struct -type jsonError struct { +type JsonError struct { Code int `json:"code"` Message string `json:"message"` Data any `json:"data,omitempty"` } -type JsonError = jsonError - -func (err *jsonError) Error() string { +func (err *JsonError) Error() string { if err.Message == "" { return "json-rpc error " + strconv.Itoa(err.Code) } return err.Message } -func (err *jsonError) ErrorCode() int { +func (err *JsonError) ErrorCode() int { return err.Code } -func (err *jsonError) ErrorData() any { +func (err *JsonError) ErrorData() any { return err.Data } // error message produces json rpc message with error message -func errorMessage(err error) *jsonrpcMessage { - msg := &jsonrpcMessage{ +func ErrorMessage(err error) *Message { + msg := &Message{ ID: NewNullIDPtr(), - Error: &jsonError{ - Code: defaultErrorCode, + Error: &JsonError{ + Code: ErrorCodeDefault, Message: err.Error(), }} ec, ok := err.(Error) @@ -139,7 +112,7 @@ func errorMessage(err error) *jsonrpcMessage { } // isBatch returns true when the first non-whitespace characters is '[' -func isBatch(raw json.RawMessage) bool { +func IsBatchMessage(raw json.RawMessage) bool { for _, c := range raw { // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) switch c { @@ -154,21 +127,21 @@ func isBatch(raw json.RawMessage) bool { // parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error // checks in this function because the raw message has already been syntax-checked when it // is called. Any non-JSON-RPC messages in the input return the zero value of -// jsonrpcMessage. -func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) { - if !isBatch(raw) { - msgs := []*jsonrpcMessage{{}} - json.Unmarshal(raw, &msgs[0]) +// Message. +func ParseMessage(raw json.RawMessage) ([]*Message, bool) { + if !IsBatchMessage(raw) { + msgs := []*Message{{}} + jzon.Unmarshal(raw, &msgs[0]) return msgs, false } // TODO: // for some reason other json decoders are incompatible with our test suite // pretty sure its how we handle EOFs and stuff - dec := stdjson.NewDecoder(bytes.NewReader(raw)) + dec := json.NewDecoder(bytes.NewReader(raw)) dec.Token() // skip '[' - var msgs []*jsonrpcMessage + var msgs []*Message for dec.More() { - msgs = append(msgs, new(jsonrpcMessage)) + msgs = append(msgs, new(Message)) dec.Decode(&msgs[len(msgs)-1]) } return msgs, true diff --git a/codec/peer.go b/codec/peer.go new file mode 100644 index 0000000000000000000000000000000000000000..2e9389a3618d8f92b0ea22d2984d1979139209a9 --- /dev/null +++ b/codec/peer.go @@ -0,0 +1,31 @@ +package codec + +import "net/http" + +// PeerInfo contains information about the remote end of the network connection. +// +// This is available within RPC method handlers through the context. Call +// PeerInfoFromContext to get information about the client connection related to +// the current method call. +type PeerInfo struct { + // Transport is name of the protocol used by the client. + // This can be "http", "ws" or "ipc". + Transport string + + // Address of client. This will usually contain the IP address and port. + RemoteAddr string + + // Addditional information for HTTP and WebSocket connections. + HTTP HttpInfo +} + +type HttpInfo struct { + // Protocol version, i.e. "HTTP/1.1". This is not set for WebSocket. + Version string + // Header values sent by the client. + UserAgent string + Origin string + Host string + + Headers http.Header +} diff --git a/server_test.go b/codec/stdio/server_test.go similarity index 100% rename from server_test.go rename to codec/stdio/server_test.go diff --git a/stdio.go b/codec/stdio/stdio.go similarity index 100% rename from stdio.go rename to codec/stdio/stdio.go diff --git a/codec/transport.go b/codec/transport.go new file mode 100644 index 0000000000000000000000000000000000000000..5155b6fa4f7019d155c3228925228c27e23d3350 --- /dev/null +++ b/codec/transport.go @@ -0,0 +1,33 @@ +package codec + +import ( + "context" + "encoding/json" +) + +type ReaderWriter interface { + Reader + Writer +} + +// Reader can write JSON messages to its underlying connection +// Implementations must be safe for concurrent use +type Reader interface { + // gets the peer info + PeerInfo() PeerInfo + // json.RawMessage can be an array of requests. if it is, then it is a batch request + ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) + // closes the connection + Close() error +} + +// Writer can write JSON messages to its underlying connection. +// Implementations must be safe for concurrent use. +type Writer interface { + // write json blob to stream + WriteJSON(context.Context, any) error + // Closed returns a channel which is closed when the connection is closed. + Closed() <-chan any + // RemoteAddr returns the peer address of the connection. + RemoteAddr() string +} diff --git a/codec/websocket/client.go b/codec/websocket/client.go new file mode 100644 index 0000000000000000000000000000000000000000..2f16f1d41c434d20d67ba04b3e72ed4bc8d0198a --- /dev/null +++ b/codec/websocket/client.go @@ -0,0 +1,53 @@ +package websocket + +import ( + "context" + "sync" + + "gfx.cafe/open/jrpc" + "nhooyr.io/websocket" +) + +type Client struct { + conn *websocket.Conn + reconnectFunc reconnectFunc + + mu sync.RWMutex +} + +type reconnectFunc func(ctx context.Context) (*websocket.Conn, error) + +func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { + conn, err := connect(initctx) + if err != nil { + return nil, err + } + c := &Client{} + c.conn = conn + c.reconnectFunc = connect + return c, nil +} + +func (c *Client) Do(ctx context.Context, result any, method string, params any) error { + panic("not implemented") // TODO: Implement +} + +func (c *Client) BatchCall(ctx context.Context, b ...jrpc.BatchElem) error { + panic("not implemented") // TODO: Implement +} + +func (c *Client) SetHeader(key string, value string) { + panic("not implemented") // TODO: Implement +} + +func (c *Client) Close() error { + panic("not implemented") // TODO: Implement +} + +func (c *Client) Notify(ctx context.Context, method string, args ...any) error { + panic("not implemented") // TODO: Implement +} + +func (c *Client) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (*jrpc.ClientSubscription, error) { + panic("not implemented") // TODO: Implement +} diff --git a/client_example_test.go b/codec/websocket/client_example_test.go similarity index 100% rename from client_example_test.go rename to codec/websocket/client_example_test.go diff --git a/codec/websocket/codec.go b/codec/websocket/codec.go new file mode 100644 index 0000000000000000000000000000000000000000..708bc8cb5dd4392e386ecaefe121a43791aa892b --- /dev/null +++ b/codec/websocket/codec.go @@ -0,0 +1 @@ +package websocket diff --git a/codec/websocket/const.go b/codec/websocket/const.go new file mode 100644 index 0000000000000000000000000000000000000000..83897f72dcafad3c9b4571e5fe12558741f8c27d --- /dev/null +++ b/codec/websocket/const.go @@ -0,0 +1,14 @@ +package websocket + +import "time" + +const ( + MaxRequestContentLength = 1024 * 1024 * 5 + ContentType = "application/json" + WsReadBuffer = 1024 + WsWriteBuffer = 1024 + WsPingInterval = 60 * time.Second + WsPingWriteTimeout = 5 * time.Second + WsPongTimeout = 30 * time.Second + WsMessageSizeLimit = 128 * 1024 * 1024 +) diff --git a/codec/websocket/dial.go b/codec/websocket/dial.go new file mode 100644 index 0000000000000000000000000000000000000000..3f50e5d6269bdd78deef5409c6ecce197542b0b7 --- /dev/null +++ b/codec/websocket/dial.go @@ -0,0 +1,45 @@ +package websocket + +import ( + "context" + + "nhooyr.io/websocket" +) + +// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { + endpoint, header, err := WsClientHeaders(endpoint, origin) + if err != nil { + return nil, err + } + dialer := &websocket.DialOptions{ + CompressionMode: websocket.CompressionContextTakeover, + CompressionThreshold: 4096, + HTTPHeader: header, + } + return DialWebsocketWithDialer(ctx, endpoint, origin, dialer) +} + +// that is listening on the given endpoint using the provided dialer. +func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, opts *websocket.DialOptions) (*Client, error) { + endpoint, header, err := WsClientHeaders(endpoint, origin) + if err != nil { + return nil, err + } + opts.HTTPHeader = header + return newClient(ctx, func(cctx context.Context) (*websocket.Conn, error) { + conn, resp, err := websocket.Dial(cctx, endpoint, opts) + if err != nil { + hErr := WsHandshakeError{err: err} + if resp != nil { + hErr.status = resp.Status + } + return nil, hErr + } + return conn, err + }) +} diff --git a/websocket.go b/codec/websocket/websocket.go similarity index 60% rename from websocket.go rename to codec/websocket/websocket.go index 69c98db39c4e6ab1cb9da78cada4a1951a74462b..89701e59d6bf3dc54d35162e633f4eeeebb10b21 100644 --- a/websocket.go +++ b/codec/websocket/websocket.go @@ -14,34 +14,28 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. -package jrpc +package websocket import ( "context" "encoding/base64" + "encoding/json" "net/http" "net/url" "time" - "gfx.cafe/open/jrpc/wsjson" - "tuxpa.in/a/zlog/log" + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/codec" + "gfx.cafe/open/jrpc/codec/websocket/wsjson" "nhooyr.io/websocket" -) - -const ( - wsReadBuffer = 1024 - wsWriteBuffer = 1024 - wsPingInterval = 60 * time.Second - wsPingWriteTimeout = 5 * time.Second - wsPongTimeout = 30 * time.Second - wsMessageSizeLimit = 128 * 1024 * 1024 + "tuxpa.in/a/zlog/log" ) // WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. // // allowedOrigins should be a comma-separated list of allowed origin URLs. // To allow connections with any origin, pass "*". -func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { +func WebsocketHandler(s *jrpc.Server, allowedOrigins []string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ OriginPatterns: allowedOrigins, @@ -57,6 +51,10 @@ func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { }) } +func NewHandshakeError(err error, status string) error { + return &wsHandshakeError{err, status} +} + type wsHandshakeError struct { err error status string @@ -70,46 +68,7 @@ func (e wsHandshakeError) Error() string { return s } -// that is listening on the given endpoint using the provided dialer. -func DialWebsocketWithDialer(ctx context.Context, endpoint, origin string, opts *websocket.DialOptions) (*Client, error) { - endpoint, header, err := wsClientHeaders(endpoint, origin) - if err != nil { - return nil, err - } - opts.HTTPHeader = header - return newClient(ctx, func(cctx context.Context) (ServerCodec, error) { - conn, resp, err := websocket.Dial(cctx, endpoint, opts) - if err != nil { - hErr := wsHandshakeError{err: err} - if resp != nil { - hErr.status = resp.Status - } - return nil, hErr - } - out := newWebsocketCodec(resp.Request.Context(), conn, endpoint, header) - return out, err - }) -} - -// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server -// that is listening on the given endpoint. -// -// The context is used for the initial connection establishment. It does not -// affect subsequent interactions with the client. -func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { - endpoint, header, err := wsClientHeaders(endpoint, origin) - if err != nil { - return nil, err - } - dialer := &websocket.DialOptions{ - CompressionMode: websocket.CompressionContextTakeover, - CompressionThreshold: 4096, - HTTPHeader: header, - } - return DialWebsocketWithDialer(ctx, endpoint, origin, dialer) -} - -func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { +func WsClientHeaders(endpoint, origin string) (string, http.Header, error) { endpointURL, err := url.Parse(endpoint) if err != nil { return endpoint, nil, err @@ -127,11 +86,32 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { } type websocketCodec struct { - *jsonCodec conn *websocket.Conn - info PeerInfo + info codec.PeerInfo pingReset chan struct{} + + closed chan any +} + +// if there is more than one message, it is a batch request +func (w *websocketCodec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) { + w.conn.SetReadLimit(WsMessageSizeLimit) + err = wsjson.Read(ctx, w.conn, &msgs) + if err != nil { + return nil, err + } + return msgs, nil +} + +// Closed returns a channel which is closed when the connection is closed. +func (w *websocketCodec) Closed() <-chan any { + return w.closed +} + +// RemoteAddr returns the peer address of the connection. +func (w *websocketCodec) RemoteAddr() string { + return w.info.RemoteAddr } func heartbeat(ctx context.Context, c *websocket.Conn, d time.Duration) { @@ -143,32 +123,22 @@ func heartbeat(ctx context.Context, c *websocket.Conn, d time.Duration) { return case <-t.C: } - err := c.Ping(ctx) if err != nil { return } - t.Reset(time.Minute) } } -func newWebsocketCodec(ctx context.Context, c *websocket.Conn, host string, req http.Header) ServerCodec { - jsonWriter := func(v any) error { - return wsjson.Write(context.Background(), c, v) - } - jsonReader := func(v any) error { - c.SetReadLimit(wsMessageSizeLimit) - return wsjson.Read(context.Background(), c, v) - } - conn := websocket.NetConn(ctx, c, websocket.MessageText) +func newWebsocketCodec(ctx context.Context, c *websocket.Conn, host string, req http.Header) codec.ReaderWriter { wc := &websocketCodec{ - jsonCodec: NewFuncCodec(conn, jsonWriter, jsonReader, func() error { return nil }).(*jsonCodec), conn: c, pingReset: make(chan struct{}, 1), - info: PeerInfo{ + info: codec.PeerInfo{ Transport: "ws", }, + closed: make(chan any), } // Fill in connection details. wc.info.HTTP.Host = host @@ -185,22 +155,22 @@ func newWebsocketCodec(ctx context.Context, c *websocket.Conn, host string, req wc.info.HTTP.UserAgent = req.Get("User-Agent") wc.info.HTTP.Headers = req // Start pinger. - go heartbeat(ctx, c, wsPingInterval) + go heartbeat(ctx, c, WsPingInterval) return wc } func (wc *websocketCodec) Close() error { - wc.jsonCodec.Close() wc.conn.CloseRead(context.Background()) + close(wc.closed) return nil } -func (wc *websocketCodec) PeerInfo() PeerInfo { +func (wc *websocketCodec) PeerInfo() codec.PeerInfo { return wc.info } func (wc *websocketCodec) WriteJSON(ctx context.Context, v any) error { - err := wc.jsonCodec.WriteJSON(ctx, v) + err := wsjson.Write(ctx, wc.conn, v) if err == nil { // Notify pingLoop to delay the next idle ping. select { diff --git a/websocket_test.go b/codec/websocket/websocket_test.go similarity index 72% rename from websocket_test.go rename to codec/websocket/websocket_test.go index 8d0c2009370cbea783df0578e97be423192d342b..0e6b8dd1605d35594cace95a1b89d433c544a48f 100644 --- a/websocket_test.go +++ b/codec/websocket/websocket_test.go @@ -1,20 +1,4 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package jrpc +package websocket_test import ( "context" @@ -22,12 +6,18 @@ import ( "net/http/httptest" "strings" "testing" + + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/codec" + "gfx.cafe/open/jrpc/codec/websocket" + "gfx.cafe/open/jrpc/jmux" + "gfx.cafe/open/jrpc/jrpctest" ) func TestWebsocketClientHeaders(t *testing.T) { t.Parallel() - endpoint, header, err := wsClientHeaders("wss://testuser:test-PASS_01@example.com:1234", "https://example.com") + endpoint, header, err := websocket.WsClientHeaders("wss://testuser:test-PASS_01@example.com:1234", "https://example.com") if err != nil { t.Fatalf("wsGetConfig failed: %s", err) } @@ -47,25 +37,25 @@ func TestWebsocketOriginCheck(t *testing.T) { t.Parallel() var ( - srv = newTestServer() - httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"http://example.com"})) + srv = jrpctest.NewTestServer() + httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"http://example.com"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() defer httpsrv.Close() - client, err := DialWebsocket(context.Background(), wsURL, "http://ekzample.com") + client, err := websocket.DialWebsocket(context.Background(), wsURL, "http://ekzample.com") if err == nil { client.Close() t.Fatal("no error for wrong origin") } - wantErr := wsHandshakeError{errors.New("403"), "403 Forbidden"} + wantErr := websocket.NewHandshakeError(errors.New("403"), "403 Forbidden") if !strings.Contains(err.Error(), wantErr.Error()) { t.Fatalf("wrong error for wrong origin: got: '%q', want: '%s'", err, wantErr) } // Connections without origin header should work. - client, err = DialWebsocket(context.Background(), wsURL, "") + client, err = websocket.DialWebsocket(context.Background(), wsURL, "") if err != nil { t.Fatalf("error for empty origin: %v", err) } @@ -77,23 +67,23 @@ func TestWebsocketLargeCall(t *testing.T) { t.Parallel() var ( - srv = newTestServer() - httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"})) + srv = jrpctest.NewTestServer() + httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, []string{"*"})) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() defer httpsrv.Close() - client, err := DialWebsocket(context.Background(), wsURL, "") + client, err := websocket.DialWebsocket(context.Background(), wsURL, "") if err != nil { t.Fatalf("can't dial: %v", err) } defer client.Close() // This call sends slightly less than the limit and should work. - var result echoResult - arg := strings.Repeat("x", maxRequestContentLength-200) - if err := client.Call(nil, &result, "test_echo", arg, 1); err != nil { + var result jrpctest.EchoResult + arg := strings.Repeat("x", websocket.MaxRequestContentLength-200) + if err := client.Do(nil, &result, "test_echo", []any{arg, 1}); err != nil { t.Fatalf("valid call didn't work: %v", err) } if result.String != arg { @@ -101,8 +91,8 @@ func TestWebsocketLargeCall(t *testing.T) { } // This call sends twice the allowed size and shouldn't work. - arg = strings.Repeat("x", maxRequestContentLength*2) - err = client.Call(nil, &result, "test_echo", arg) + arg = strings.Repeat("x", websocket.MaxRequestContentLength*2) + err = client.Do(nil, &result, "test_echo", []any{arg}) if err == nil { t.Fatal("no error for too large call") } @@ -110,22 +100,22 @@ func TestWebsocketLargeCall(t *testing.T) { func TestWebsocketPeerInfo(t *testing.T) { var ( - s = newTestServer() - ts = httptest.NewServer(s.WebsocketHandler([]string{"origin.example.com"})) + s = jrpctest.NewTestServer() + ts = httptest.NewServer(websocket.WebsocketHandler(s, []string{"origin.example.com"})) tsurl = "ws:" + strings.TrimPrefix(ts.URL, "http:") ) defer s.Stop() defer ts.Close() ctx := context.Background() - c, err := DialWebsocket(ctx, tsurl, "http://origin.example.com") + c, err := websocket.DialWebsocket(ctx, tsurl, "http://origin.example.com") if err != nil { t.Fatal(err) } // Request peer information. - var connInfo PeerInfo - if err := c.Call(nil, &connInfo, "test_peerInfo"); err != nil { + var connInfo codec.PeerInfo + if err := c.Do(nil, &connInfo, "test_peerInfo", []any{}); err != nil { t.Fatal(err) } @@ -145,24 +135,25 @@ func TestWebsocketPeerInfo(t *testing.T) { // This checks that the websocket transport can deal with large messages. func TestClientWebsocketLargeMessage(t *testing.T) { + mux := jmux.NewMux() var ( - srv = NewServer() - httpsrv = httptest.NewServer(srv.WebsocketHandler(nil)) + srv = jrpc.NewServer(mux) + httpsrv = httptest.NewServer(websocket.WebsocketHandler(srv, nil)) wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") ) defer srv.Stop() defer httpsrv.Close() - respLength := wsMessageSizeLimit - 50 - srv.Router().RegisterStruct("test", largeRespService{respLength}) + respLength := websocket.WsMessageSizeLimit - 50 + mux.RegisterStruct("test", jrpctest.LargeRespService{Length: respLength}) - c, err := DialWebsocket(context.Background(), wsURL, "") + c, err := websocket.DialWebsocket(context.Background(), wsURL, "") if err != nil { t.Fatal(err) } var r string - if err := c.Call(nil, &r, "test_largeResp"); err != nil { + if err := c.Do(nil, &r, "test_largeResp", nil); err != nil { t.Fatal("call failed:", err) } if len(r) != respLength { diff --git a/wsjson/writer_test.go b/codec/websocket/wsjson/writer_test.go similarity index 100% rename from wsjson/writer_test.go rename to codec/websocket/wsjson/writer_test.go diff --git a/wsjson/wsjson.go b/codec/websocket/wsjson/wsjson.go similarity index 100% rename from wsjson/wsjson.go rename to codec/websocket/wsjson/wsjson.go diff --git a/json_wire.go b/codec/wire.go similarity index 86% rename from json_wire.go rename to codec/wire.go index 6df978cd9caee613b7ab3b8be976dadab98edf8c..ea2d5e82c44a35cdd4c42e17a2216d33e88a95c8 100644 --- a/json_wire.go +++ b/codec/wire.go @@ -1,4 +1,4 @@ -package jrpc +package codec import ( "fmt" @@ -8,31 +8,31 @@ import ( ) // Version represents a JSON-RPC version. -const Version = "2.0" +const VersionString = "2.0" // version is a special 0 sized struct that encodes as the jsonrpc version tag. // // It will fail during decode if it is not the correct version tag in the stream. -type version struct{} +type Version struct{} // compile time check whether the version implements a json.Marshaler and json.Unmarshaler interfaces. var ( - _ json.Marshaler = (*version)(nil) - _ json.Unmarshaler = (*version)(nil) + _ json.Marshaler = (*Version)(nil) + _ json.Unmarshaler = (*Version)(nil) ) // MarshalJSON implements json.Marshaler. -func (version) MarshalJSON() ([]byte, error) { - return []byte(`"` + Version + `"`), nil +func (Version) MarshalJSON() ([]byte, error) { + return []byte(`"` + VersionString + `"`), nil } // UnmarshalJSON implements json.Unmarshaler. -func (version) UnmarshalJSON(data []byte) error { +func (Version) UnmarshalJSON(data []byte) error { version := "" if err := json.Unmarshal(data, &version); err != nil { return fmt.Errorf("failed to Unmarshal: %w", err) } - if version != Version { + if version != VersionString { return fmt.Errorf("invalid RPC version %v", version) } return nil @@ -90,6 +90,12 @@ func (id *ID) Format(f fmt.State, r rune) { fmt.Fprintf(f, numF, id.number) } } +func (id *ID) IsNull() bool { + if id == nil { + return true + } + return id.null +} // get the raw message func (id *ID) RawMessage() json.RawMessage { @@ -97,10 +103,10 @@ func (id *ID) RawMessage() json.RawMessage { return nil } if id == nil { - return null + return Null } if id.null { - return null + return Null } if id.name != "" { return json.RawMessage(`"` + id.name + `"`) diff --git a/conn.go b/conn.go index 0e3166238317d37e374beaf5bf1dc2deb647665d..61d2d6cb6f553a3cd6c8842053080448ae785e2c 100644 --- a/conn.go +++ b/conn.go @@ -2,8 +2,6 @@ package jrpc import "context" -var _ Conn = (*Client)(nil) - type Conn interface { Do(ctx context.Context, result any, method string, params any) error BatchCall(ctx context.Context, b ...BatchElem) error @@ -11,11 +9,10 @@ type Conn interface { Close() error } -type SubscriptionConn interface { +type StreamingConn interface { Conn Notify(ctx context.Context, method string, args ...any) error - Subscribe(ctx context.Context, namespace string, channel any, args ...any) (*ClientSubscription, error) } // BatchElem is an element in a batch request. diff --git a/errors.go b/errors.go deleted file mode 100644 index 3291846d0d76deb5b9dd062eb83733fd44c48fdb..0000000000000000000000000000000000000000 --- a/errors.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package jrpc - -import ( - "fmt" -) - -// HTTPError is returned by client operations when the HTTP status code of the -// response is not a 2xx status. -type HTTPError struct { - StatusCode int - Status string - Body []byte -} - -func (err HTTPError) Error() string { - if len(err.Body) == 0 { - return err.Status - } - return fmt.Sprintf("%v: %s", err.Status, err.Body) -} - -// Error wraps RPC errors, which contain an error code in addition to the message. -type Error interface { - Error() string // returns the message - ErrorCode() int // returns the code -} - -// A DataError contains some data in addition to the error message. -type DataError interface { - Error() string // returns the message - ErrorData() any // returns the error data -} - -type JrpcErr struct { - Data any -} - -func (j *JrpcErr) ErrorData() any { - return j.Data -} - -func (j *JrpcErr) Error() string { - return "Jrpc Error" -} - -func (j *JrpcErr) ErrorCode() int { - return jrpcErrorCode -} - -func WrapJrpcErr(err error) error { - if err == nil { - return nil - } - return fmt.Errorf("%w: %w", &JrpcErr{}, err) -} - -func MakeJrpcErr(s string) error { - return fmt.Errorf("%w: %s", &JrpcErr{}, s) -} - -// Error types defined below are the built-in JSON-RPC errors. - -var ( - _ Error = new(methodNotFoundError) - _ Error = new(subscriptionNotFoundError) - _ Error = new(parseError) - _ Error = new(invalidRequestError) - _ Error = new(invalidMessageError) - _ Error = new(invalidParamsError) -) - -const defaultErrorCode = -32000 - -const applicationErrorCode = -32080 - -const jrpcErrorCode = -42000 - -type methodNotFoundError struct{ method string } - -func (e *methodNotFoundError) ErrorCode() int { return -32601 } - -func (e *methodNotFoundError) Error() string { - return fmt.Sprintf("the method %s does not exist/is not available", e.method) -} - -type subscriptionNotFoundError struct{ namespace, subscription string } - -func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 } - -func (e *subscriptionNotFoundError) Error() string { - return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace) -} - -// Invalid JSON was received by the server. -type parseError struct{ message string } - -func (e *parseError) ErrorCode() int { return -32700 } - -func (e *parseError) Error() string { return e.message } - -// received message isn't a valid request -type invalidRequestError struct{ message string } - -func (e *invalidRequestError) ErrorCode() int { return -32600 } - -func (e *invalidRequestError) Error() string { return e.message } - -// received message is invalid -type invalidMessageError struct{ message string } - -func (e *invalidMessageError) ErrorCode() int { return -32700 } - -func (e *invalidMessageError) Error() string { return e.message } - -func NewInvalidParamsError(message string) *invalidMessageError { - return &invalidMessageError{ - message: message, - } -} - -// unable to decode supplied params, or an invalid number of parameters -type invalidParamsError struct{ message string } - -func (e *invalidParamsError) ErrorCode() int { return -32602 } - -func (e *invalidParamsError) Error() string { return e.message } diff --git a/go.mod b/go.mod index 19f26af42f7f7e97eaed7a6b3024175cb25afafc..585b33da8127170ef5ef828229ca3282f002667b 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce nhooyr.io/websocket v1.8.7 sigs.k8s.io/yaml v1.3.0 - tuxpa.in/a/zlog v1.60.0 + tuxpa.in/a/zlog v1.61.0 ) require ( @@ -37,7 +37,7 @@ require ( github.com/markbates/oncer v1.0.0 // indirect github.com/markbates/safe v1.0.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-isatty v0.0.18 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/rs/zerolog v1.29.0 // indirect @@ -47,7 +47,7 @@ require ( github.com/tklauser/numcpus v0.6.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect golang.org/x/crypto v0.6.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/sys v0.7.0 // indirect golang.org/x/term v0.5.0 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index e23766a528ece2824122b66b284fa61140afba07..44d54d869097fd5f6c975dff574efa5462f97b43 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,7 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -286,6 +287,8 @@ github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peK github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= +github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -559,6 +562,9 @@ golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -773,3 +779,5 @@ tuxpa.in/a/zlog v1.50.0 h1:4XVCBpOT/jtPoJ346Sp6jomLfcLWXI7MMazl+QjHPNw= tuxpa.in/a/zlog v1.50.0/go.mod h1:kwk2puLClsPPQhqtjG2OEV0Y4sjN0BuSRn7AYtACzjI= tuxpa.in/a/zlog v1.60.0 h1:bU4wJk6nwvaFsKIvKKxgGM0uO+Z2kaE8LzgRiQ4NCRw= tuxpa.in/a/zlog v1.60.0/go.mod h1:1t8SX1a4zLy+p6ylGn6m1ZXnssTPr/2ErdPjjSP+C2k= +tuxpa.in/a/zlog v1.61.0 h1:7wrS6G4QwpnOmgHRQknrr7IgiMXrfGpekkU0PjM9FhE= +tuxpa.in/a/zlog v1.61.0/go.mod h1:CNpMe8laDHLSypx/DyxfX1S0oyxUydeo3aGTEbtRBhg= diff --git a/handler.go b/handler.go index 50a74b049bc6e95429f8aab116f7e0544b9da0d8..e72baebaca67d878af2b8b4b707699c49852e68f 100644 --- a/handler.go +++ b/handler.go @@ -19,64 +19,41 @@ package jrpc import ( "context" "encoding/json" - "strings" "sync" - "time" + "gfx.cafe/open/jrpc/codec" "tuxpa.in/a/zlog" ) -// handler handles JSON-RPC messages. There is one handler per connection. Note that -// handler is not safe for concurrent use. Message handling never blocks indefinitely -// because RPCs are processed on background goroutines launched by handler. -// -// The entry points for incoming messages are: -// -// h.handleMsg(message) -// h.handleBatch(message) -// -// Outgoing calls use the requestOp struct. Register the request before sending it -// on the connection: -// -// op := &requestOp{ids: ...} -// h.addRequestOp(op) -// -// Now send the request, then wait for the reply to be delivered through handleMsg: -// -// if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. -// } +type requestOp struct { + ids []json.RawMessage + err error + resp chan json.RawMessage // receives up to len(ids) responses +} + type handler struct { reg Handler respWait map[string]*requestOp // active client requests callWG sync.WaitGroup // pending call goroutines rootCtx context.Context // canceled by close() cancelRoot func() // cancel function for rootCtx - conn JsonWriter // where responses will be sent + conn codec.Writer // where responses will be sent log *zlog.Logger - subLock sync.RWMutex - clientSubs map[string]*ClientSubscription // active client subscriptions - serverSubs map[SubID]*Subscription - unsubscribeCb func(ctx context.Context, id SubID) - - peer PeerInfo + peer codec.PeerInfo } type callProc struct { - ctx context.Context - notifiers []*Notifier + ctx context.Context } -func newHandler(connCtx context.Context, conn JsonWriter, reg Handler) *handler { +func newHandler(connCtx context.Context, conn codec.Writer, reg Handler) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ peer: PeerInfoFromContext(connCtx), reg: reg, conn: conn, respWait: make(map[string]*requestOp), - clientSubs: map[string]*ClientSubscription{}, - serverSubs: map[SubID]*Subscription{}, rootCtx: rootCtx, cancelRoot: cancelRoot, log: zlog.Ctx(connCtx), @@ -89,58 +66,48 @@ func newHandler(connCtx context.Context, conn JsonWriter, reg Handler) *handler } // handleBatch executes all messages in a batch and returns the responses. -func (h *handler) handleBatch(msgs []*jsonrpcMessage) { +func (h *handler) handleBatch(msgs []json.RawMessage) { // Emit error response for empty batches: if len(msgs) == 0 { h.startCallProc(func(cp *callProc) { - h.conn.WriteJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) + h.conn.WriteJSON(cp.ctx, codec.ErrorMessage(codec.NewInvalidRequestError("empty batch"))) }) return } // Handle non-call messages first: - calls := make([]*jsonrpcMessage, 0, len(msgs)) + calls := make([]json.RawMessage, 0, len(msgs)) for _, msg := range msgs { - if handled := h.handleImmediate(msg); !handled { - calls = append(calls, msg) - } + //TODO: filter immediate handled + //if handled := h.handleImmediate(msg); !handled { + calls = append(calls, msg) + //} } if len(calls) == 0 { return } + // TODO: implement this // Process calls on a goroutine because they may block indefinitely: h.startCallProc(func(cp *callProc) { - answers := make([]*jsonrpcMessage, 0, len(msgs)) + answers := make([]*json.RawMessage, 0, len(msgs)) for _, msg := range calls { - r := NewMsgRequest(cp.ctx, h.peer, *msg) - if answer := h.handleCallMsg(cp, r); answer != nil { - answers = append(answers, answer.Msg()) - } + h.log.Info().Any("payload", msg).Msg("received call") } - h.addSubscriptions(cp.notifiers) if len(answers) > 0 { h.conn.WriteJSON(cp.ctx, answers) } - for _, n := range cp.notifiers { - n.activate() - } }) } // handleMsg handles a single message. -func (h *handler) handleMsg(msg *jsonrpcMessage) { - if ok := h.handleImmediate(msg); ok { - return - } +func (h *handler) handleMsg(msg *codec.Message) { + // TODO: implement this h.startCallProc(func(cp *callProc) { - r := NewMsgRequest(cp.ctx, h.peer, *msg) - answer := h.handleCallMsg(cp, r) - h.addSubscriptions(cp.notifiers) - if answer != nil { - h.conn.WriteJSON(cp.ctx, answer) - } - for _, n := range cp.notifiers { - n.activate() - } + // r := NewMsgRequest(cp.ctx, h.peer, *msg) + //r := &Request{} + //answer := h.handleCallMsg(cp, r) + //if answer != nil { + // h.conn.WriteJSON(cp.ctx, answer) + //} }) } @@ -150,7 +117,6 @@ func (h *handler) close(err error, inflightReq *requestOp) { h.cancelAllRequests(err, inflightReq) h.callWG.Wait() h.cancelRoot() - h.cancelServerSubscriptions(err) } // addRequestOp registers a request operation. @@ -196,107 +162,8 @@ func (h *handler) startCallProc(fn func(*callProc)) { }() } -// handleImmediate executes non-call messages. It returns false if the message is a -// call or requires a reply. -func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { - start := time.Now() - switch { - case msg.isNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { - h.handleSubscriptionResult(msg) - return true - } - return false - case msg.isResponse(): - h.handleResponse(msg.toResponse()) - h.log.Trace().Str("reqid", string(msg.ID.RawMessage())).Dur("duration", time.Since(start)).Msg("Handled RPC response") - return true - default: - return false - } -} - -func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { - var result subscriptionResult - if err := json.Unmarshal(msg.Params, &result); err != nil { - h.log.Trace().Msg("Dropping invalid subscription message") - return - } - if h.clientSubs[result.ID] != nil { - h.clientSubs[result.ID].deliver(result.Result) - } -} - -func (h *handler) handleResponse(msg *Response) { - op := h.respWait[string(msg.ID.RawMessage())] - if op == nil { - h.log.Debug().Str("reqid", string(msg.ID.RawMessage())).Msg("Unsolicited RPC response") - return - } - delete(h.respWait, string(msg.ID.RawMessage())) - if op.sub == nil { - // not a sub, so just send the msg back - op.resp <- msg.Msg() - return - } - // For subscription responses, start the subscription if the server - // indicates success. EthSubscribe gets unblocked in either case through - // the op.resp channel. - defer close(op.resp) - if msg.Error != nil { - op.err = msg.Error - return - } - if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { - go op.sub.start() - h.clientSubs[op.sub.subid] = op.sub - } -} - -// handleCallMsg executes a call message and returns the answer. -// TODO: export prometheus metrics maybe? also fix logging -func (h *handler) handleCallMsg(ctx *callProc, r *Request) *Response { - switch { - case r.isNotification(): - go h.handleCall(ctx, r) - return nil - case r.isCall(): - resp := h.handleCall(ctx, r) - return resp - case r.hasValidID(): - return r.errorResponse(&invalidRequestError{"invalid request"}) - default: - res := r.errorResponse(&invalidRequestError{"invalid request"}) - res.ID = NewNullIDPtr() - return res - } -} - func (h *handler) handleCall(cp *callProc, r *Request) *Response { mw := NewReaderResponseWriterMsg(r) - //if r.isSubscribe() { - // return h.handleSubscribe(cp, r) - //} - //if r.isUnsubscribe() { - // var ans SubID - // err := r.ParamArray(&ans) - // if err != nil { - // mw.Send(nil, &invalidParamsError{message: "subscription not found"}) - // return mw.Response() - // } - // val, err := h.unsubscribe(r.ctx, ans) - // if err != nil { - // mw.Send(nil, err) - // } - // mw.Send(val, nil) - // return mw.Response() - //} - // no method found - //if !callb { - // mw.Send(nil, &methodNotFoundError{method: r.Method}) - // return mw.Response() - //} - // now actually run the handler h.reg.ServeRPC(mw, r) return mw.Response() } diff --git a/jrpc.go b/jrpc.go index 20b6734255ed0c6e03887356b3df56b8467a4254..65785b6b93a938a8cfc68c1eb2de927832013d0e 100644 --- a/jrpc.go +++ b/jrpc.go @@ -1,31 +1,9 @@ package jrpc import ( - "context" "net/http" ) -// ServerCodec implements reading, parsing and writing RPC messages for the server side of -// a RPC session. Implementations must be go-routine safe since the codec can be called in -// multiple go-routines concurrently. -type ServerCodec interface { - PeerInfo() PeerInfo - ReadBatch() (msgs []*jsonrpcMessage, isBatch bool, err error) - Close() error - - JsonWriter -} - -// jsonWriter can write JSON messages to its underlying connection. -// Implementations must be safe for concurrent use. -type JsonWriter interface { - WriteJSON(context.Context, any) error - // Closed returns a channel which is closed when the connection is closed. - Closed() <-chan any - // RemoteAddr returns the peer address of the connection. - RemoteAddr() string -} - // http.handler, but for jrpc type Handler interface { ServeRPC(w ResponseWriter, r *Request) diff --git a/jrpctest/server.go b/jrpctest/server.go new file mode 100644 index 0000000000000000000000000000000000000000..e99447731a1b660c259f2691598a5adb4f6eba10 --- /dev/null +++ b/jrpctest/server.go @@ -0,0 +1,33 @@ +package jrpctest + +import ( + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/jmux" +) + +func NewTestServer() *jrpc.Server { + mux := jmux.NewRouter() + server := jrpc.NewServer(mux) + mux.HandleFunc("testservice_subscribe", func(w jrpc.ResponseWriter, r *jrpc.Request) { + sub, err := jrpc.UpgradeToSubscription(w, r) + w.Send(sub, err) + if err != nil { + return + } + idx := 0 + for { + err := w.Notify(idx) + if err != nil { + return + } + idx = idx + 1 + } + }) + if err := mux.RegisterStruct("test", new(testService)); err != nil { + panic(err) + } + if err := mux.RegisterStruct("nftest", new(notificationTestService)); err != nil { + panic(err) + } + return server +} diff --git a/jrpctest/services.go b/jrpctest/services.go new file mode 100644 index 0000000000000000000000000000000000000000..008d8d498e501cbc6122938dedcf6b2d2e680005 --- /dev/null +++ b/jrpctest/services.go @@ -0,0 +1,168 @@ +package jrpctest + +import ( + "context" + "errors" + "strings" + "time" + + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/codec" +) + +type testService struct{} + +type echoArgs struct { + S string +} + +type EchoResult struct { + String string + Int int + Args *echoArgs +} + +type testError struct{} + +func (testError) Error() string { return "testError" } +func (testError) ErrorCode() int { return 444 } +func (testError) ErrorData() any { return "testError data" } + +func (s *testService) NoArgsRets() {} + +func (s *testService) EchoAny(n any) any { + return n +} + +func (s *testService) Echo(str string, i int, args *echoArgs) EchoResult { + return EchoResult{str, i, args} +} + +func (s *testService) EchoWithCtx(ctx context.Context, str string, i int, args *echoArgs) EchoResult { + return EchoResult{str, i, args} +} + +func (s *testService) PeerInfo(ctx context.Context) codec.PeerInfo { + return jrpc.PeerInfoFromContext(ctx) +} + +func (s *testService) Sleep(ctx context.Context, duration time.Duration) { + time.Sleep(duration) +} + +func (s *testService) Block(ctx context.Context) error { + <-ctx.Done() + return errors.New("context canceled in testservice_block") +} + +func (s *testService) Rets() (string, error) { + return "", nil +} + +//lint:ignore ST1008 returns error first on purpose. +func (s *testService) InvalidRets1() (error, string) { + return nil, "" +} + +func (s *testService) InvalidRets2() (string, string) { + return "", "" +} + +func (s *testService) InvalidRets3() (string, string, error) { + return "", "", nil +} + +func (s *testService) ReturnError() error { + return testError{} +} + +func (s *testService) CallMeBack(ctx context.Context, method string, args []any) (any, error) { + c, ok := jrpc.ClientFromContext(ctx) + if !ok { + return nil, errors.New("no client") + } + var result any + err := c.Call(nil, &result, method, args...) + return result, err +} + +func (s *testService) CallMeBackLater(ctx context.Context, method string, args []any) error { + c, ok := jrpc.ClientFromContext(ctx) + if !ok { + return errors.New("no client") + } + go func() { + <-ctx.Done() + var result any + c.Call(nil, &result, method, args...) + }() + return nil +} + +type notificationTestService struct { + unsubscribed chan string + gotHangSubscriptionReq chan struct{} + unblockHangSubscription chan struct{} +} + +func (s *notificationTestService) Echo(i int) int { + return i +} + +func (s *notificationTestService) Unsubscribe(subid string) { + if s.unsubscribed != nil { + s.unsubscribed <- subid + } +} + +//func (s *notificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { +// notifier, supported := jrpc.NotifierFromContext(ctx) +// if !supported { +// return nil, jrpc.ErrNotificationsUnsupported +// } +// +// // By explicitly creating an subscription we make sure that the subscription id is send +// // back to the client before the first subscription.Notify is called. Otherwise the +// // events might be send before the response for the *_subscribe method. +// subscription := notifier.CreateSubscription() +// go func() { +// for i := 0; i < n; i++ { +// if err := notifier.Notify(subscription.ID, val+i); err != nil { +// return +// } +// } +// select { +// case <-notifier.Closed(): +// case <-subscription.Err(): +// } +// if s.unsubscribed != nil { +// s.unsubscribed <- string(subscription.ID) +// } +// }() +// return subscription, nil +//} +// +//// HangSubscription blocks on s.unblockHangSubscription before sending anything. +//func (s *notificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) { +// notifier, supported := NotifierFromContext(ctx) +// if !supported { +// return nil, ErrNotificationsUnsupported +// } +// s.gotHangSubscriptionReq <- struct{}{} +// <-s.unblockHangSubscription +// subscription := notifier.CreateSubscription() +// +// go func() { +// notifier.Notify(subscription.ID, val) +// }() +// return subscription, nil +//} + +// largeRespService generates arbitrary-size JSON responses. +type LargeRespService struct { + Length int +} + +func (x LargeRespService) LargeResp() string { + return strings.Repeat("x", x.Length) +} diff --git a/json_codec.go b/json_codec.go deleted file mode 100644 index c9666a9c544be62c7fc1192a45a1106995364d79..0000000000000000000000000000000000000000 --- a/json_codec.go +++ /dev/null @@ -1,129 +0,0 @@ -package jrpc - -import ( - "context" - "encoding/json" - stdjson "encoding/json" - "io" - "sync" - "time" -) - -// DeadlineConn is a subset of the methods of net.Conn which are sufficient for creating a jsonCodec -type DeadlineConn interface { - io.ReadWriteCloser - SetWriteDeadline(time.Time) error -} - -// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has -// support for parsing arguments and serializing (result) objects. -type jsonCodec struct { - remote string - closer sync.Once // close closed channel once - closeFunc func() error - closeCh chan any // closed on Close - decode func(v any) error // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode func(v any) error // encoder to allow multiple transports - conn DeadlineConn -} - -// NewFuncCodec creates a codec which uses the given functions to read and write. If conn -// implements ConnRemoteAddr, log messages will use it to include the remote address of -// the connection. -func NewFuncCodec( - conn DeadlineConn, - encode, decode func(v any) error, - closeFunc func() error, -) ServerCodec { - codec := &jsonCodec{ - closeFunc: closeFunc, - closeCh: make(chan any), - encode: encode, - decode: decode, - conn: conn, - } - if ra, ok := conn.(interface{ RemoteAddr() string }); ok { - codec.remote = ra.RemoteAddr() - } - return codec -} - -// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log -// messages will use it to include the remote address of the connection. -func NewCodec(conn DeadlineConn) ServerCodec { - encr := func(v any) error { - enc := jzon.BorrowStream(conn) - defer jzon.ReturnStream(enc) - enc.WriteVal(v) - enc.WriteRaw("\n") - enc.Flush() - if enc.Error != nil { - return enc.Error - } - return nil - } - // TODO: - // for some reason other json decoders are incompatible with our test suite - // pretty sure its how we handle EOFs and stuff - dec := stdjson.NewDecoder(conn) - dec.UseNumber() - return NewFuncCodec(conn, encr, dec.Decode, func() error { - return nil - }) -} - -func (c *jsonCodec) PeerInfo() PeerInfo { - // This returns "ipc" because all other built-in transports have a separate codec type. - return PeerInfo{Transport: "ipc", RemoteAddr: c.remote} -} - -func (c *jsonCodec) RemoteAddr() string { - return c.remote -} - -func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err error) { - // Decode the next JSON object in the input stream. - // This verifies basic syntax, etc. - var rawmsg json.RawMessage - if err := c.decode(&rawmsg); err != nil { - return nil, false, err - } - messages, batch = parseMessage(rawmsg) - for i, msg := range messages { - if msg == nil { - // Message is JSON 'null'. Replace with zero value so it - // will be treated like any other invalid message. - messages[i] = new(jsonrpcMessage) - } - } - return messages, batch, nil -} - -func (c *jsonCodec) WriteJSON(ctx context.Context, v any) error { - c.encMu.Lock() - defer c.encMu.Unlock() - - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(defaultWriteTimeout) - } - c.conn.SetWriteDeadline(deadline) - return c.encode(v) -} - -func (c *jsonCodec) Close() error { - c.closer.Do(func() { - close(c.closeCh) - if c.closeFunc != nil { - c.closeFunc() - } - c.conn.Close() - }) - return nil -} - -// Closed returns a channel which will be closed when Close is called -func (c *jsonCodec) Closed() <-chan any { - return c.closeCh -} diff --git a/request.go b/request.go index a2c31bfc5718a2647987cefa659a9127b65dc2c0..c0fa655a0d1dfaa0bb66576ff9090c0d53654c8b 100644 --- a/request.go +++ b/request.go @@ -2,82 +2,54 @@ package jrpc import ( "context" - "strings" + "gfx.cafe/open/jrpc/codec" json "github.com/goccy/go-json" jsoniter "github.com/json-iterator/go" ) +var jpool = jsoniter.NewIterator(jsoniter.ConfigCompatibleWithStandardLibrary).Pool() + type Request struct { - Version version `json:"jsonrpc"` - ID *ID `json:"id,omitempty"` + Version codec.Version `json:"jsonrpc"` + ID *codec.ID `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params"` - - Peer PeerInfo `json:"-"` + Peer codec.PeerInfo `json:"-"` ctx context.Context } -func NewMsgRequest(ctx context.Context, peer PeerInfo, msg jsonrpcMessage) *Request { - r := &Request{ctx: ctx} - r.ID = msg.ID - r.Method = msg.Method - r.Params = msg.Params - r.Peer = peer - r.ctx = ctx - return r -} - func NewRequest(ctx context.Context, id string, method string, params any) *Request { r := &Request{ctx: ctx} pms, _ := json.Marshal(params) - r.ID = NewStringIDPtr(id) + r.ID = codec.NewStringIDPtr(id) r.Method = method r.Params = pms return r } -func (r *Request) ParamSlice() []any { - var params []any - json.Unmarshal(r.Params, ¶ms) - return params -} - -func (r *Request) makeError(err error) *jsonrpcMessage { +func (r *Request) makeError(err error) *codec.Message { m := r.Msg() - return m.errorResponse(err) + return m.ErrorResponse(err) } -// DEPRECATED -// TODO: use our router to do this? jrpc.Namespace(string) (string, string) maybe? -func (r *Request) namespace() string { - elem := strings.SplitN(r.Method, serviceMethodSeparator, 2) - return elem[0] -} func (r *Request) errorResponse(err error) *Response { mw := NewReaderResponseWriterMsg(r) mw.Send(nil, err) return mw.Response() } -func (r *Request) isSubscribe() bool { - return strings.HasSuffix(r.Method, subscribeMethodSuffix) -} -func (r *Request) isUnsubscribe() bool { - return strings.HasSuffix(r.Method, unsubscribeMethodSuffix) -} func (r *Request) isNotification() bool { return r.ID == nil && len(r.Method) > 0 } + func (r *Request) isCall() bool { return r.hasValidID() && len(r.Method) > 0 } -func (r *Request) isResponse() bool { - return false -} + func (r *Request) hasValidID() bool { - return r.ID != nil && !r.ID.null + return r.ID != nil && !r.ID.IsNull() } func (r *Request) ParamArray(a ...any) error { @@ -104,8 +76,8 @@ func (r *Request) Context() context.Context { return r.ctx } -func (r *Request) Msg() jsonrpcMessage { - return jsonrpcMessage{ +func (r *Request) Msg() codec.Message { + return codec.Message{ ID: r.ID, Method: r.Method, Params: r.Params, @@ -131,8 +103,6 @@ func (r *Request) WithContext(ctx context.Context) *Request { return r2 } -var jpool = jsoniter.NewIterator(jsoniter.ConfigCompatibleWithStandardLibrary).Pool() - func (r *Request) Iter(fn func(j *jsoniter.Iterator) error) error { it := jpool.BorrowIterator(r.Params) defer jpool.ReturnIterator(it) diff --git a/response.go b/response.go index 839e51e63e4b27713711b3375ad5e3b02ee3db4c..97b3a9940c9246c239402568fe5686fe7dcfe712 100644 --- a/response.go +++ b/response.go @@ -2,53 +2,39 @@ package jrpc import ( "encoding/json" - "errors" "net/http" + + "gfx.cafe/open/jrpc/codec" ) type Response struct { - Version version `json:"jsonrpc,omitempty"` - ID *ID `json:"id,omitempty"` - Result json.RawMessage `json:"result,omitempty"` - Error *jsonError `json:"error,omitempty"` + Version codec.Version `json:"jsonrpc,omitempty"` + ID *codec.ID `json:"id,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *codec.JsonError `json:"error,omitempty"` } -func (r *Response) Msg() *jsonrpcMessage { - out := &jsonrpcMessage{} +func (r *Response) Msg() *codec.Message { + out := &codec.Message{} if r.ID != nil { out.ID = r.ID } if r.Error != nil { out.Error = r.Error - return out + } else { + out.Result = r.Result } - out.Result = r.Result return out } type ResponseWriterMsg struct { r *Request resp *Response - n *Notifier - s *Subscription - - //TODO: add options - // currently there are no useful options so i havent added any - // find a use case to add - options options } type options struct { } -func UpgradeToSubscription(w ResponseWriter, r *Request) (*Subscription, error) { - not, ok := NotifierFromContext(r.ctx) - if !ok || not == nil { - return nil, errors.New("subscription not supported") - } - return not.CreateSubscription(), nil -} - func NewReaderResponseWriterMsg(r *Request) *ResponseWriterMsg { rw := &ResponseWriterMsg{ r: r, @@ -56,12 +42,11 @@ func NewReaderResponseWriterMsg(r *Request) *ResponseWriterMsg { ID: r.ID, }, } - rw.n, _ = NotifierFromContext(r.ctx) return rw } func (w *ResponseWriterMsg) Header() http.Header { - wh := w.r.Peer.HTTP.WriteHeaders + wh := w.r.Peer.HTTP.Headers if wh == nil { wh = http.Header{} } @@ -73,32 +58,27 @@ func (w *ResponseWriterMsg) Option(k string, v any) { func (w *ResponseWriterMsg) Send(args any, e error) (err error) { if e != nil { - if c, ok := e.(*jsonError); ok { + if c, ok := e.(*codec.JsonError); ok { w.resp.Error = c } else { - w.resp.Error = &jsonError{ - Code: applicationErrorCode, + w.resp.Error = &codec.JsonError{ + Code: codec.ErrorCodeApplication, Message: e.Error(), } } - ec, ok := e.(Error) + ec, ok := e.(codec.Error) if ok { w.resp.Error.Code = ec.ErrorCode() } - de, ok := e.(DataError) + de, ok := e.(codec.DataError) if ok { w.resp.Error.Data = de.ErrorData() } return nil } - switch c := args.(type) { - case *Subscription: - w.s = c - default: - } - w.resp.Result, err = jzon.Marshal(args) + w.resp.Result, err = json.Marshal(args) if err != nil { - w.resp.Error = &jsonError{ + w.resp.Error = &codec.JsonError{ Code: -32603, Message: err.Error(), } @@ -107,24 +87,15 @@ func (w *ResponseWriterMsg) Send(args any, e error) (err error) { return nil } +// TODO: implement func (w *ResponseWriterMsg) Notify(args any) (err error) { - if w.n == nil { - w.n, _ = NotifierFromContext(w.r.ctx) - } - if w.s == nil || w.n == nil { - return ErrSubscriptionNotFound - } - bts, _ := json.Marshal(args) - err = w.n.send(w.s, bts) - if err != nil { - return err - } return nil } func (w *ResponseWriterMsg) Response() *Response { return w.resp } -func (w *ResponseWriterMsg) Msg() *jsonrpcMessage { + +func (w *ResponseWriterMsg) Msg() *codec.Message { return w.resp.Msg() } diff --git a/server.go b/server.go index 9959e1f2bf325fed652df2abeb6879854f21c5eb..b6d5e15a6655a65ca43e3db5482b9922a6abd6a1 100644 --- a/server.go +++ b/server.go @@ -2,21 +2,13 @@ package jrpc import ( "context" - "io" - "net/http" "sync/atomic" + "gfx.cafe/open/jrpc/codec" mapset "github.com/deckarep/golang-set" "tuxpa.in/a/zlog/log" ) -const ( - MetadataApi = "rpc" - EngineApi = "engine" -) - -// CodecOption specifies which type of messages a codec supports. - // Server is an RPC server. type Server struct { services Handler @@ -39,48 +31,21 @@ func NewServer(r Handler) *Server { // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes // the response back using the given codec. It will block until the codec is closed or the // server is stopped. In either case the codec is closed. -func (s *Server) ServeCodec(codec ServerCodec) { +func (s *Server) ServeCodec(codec codec.ReaderWriter) { defer codec.Close() // Don't serve if server is stopped. if atomic.LoadInt32(&s.run) == 0 { return } - // Add the codec to the set so it can be closed by Stop. s.codecs.Add(codec) defer s.codecs.Remove(codec) - c := initClient(codec, s.services) - <-codec.Closed() - c.Close() -} - -// serveSingleRequest reads and processes a single RPC request from the given codec. This -// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in -// this mode. -func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { - // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { - return - } - // create a new handler for this context - h := newHandler(ctx, codec, s.services) - defer h.close(io.EOF, nil) - - // read the HTTP body - reqs, batch, err := codec.ReadBatch() - if err != nil { - if err != io.EOF { - codec.WriteJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) - } - return - } - if batch { - h.handleBatch(reqs) - } else { - h.handleMsg(reqs[0]) - } + // TODO: handle this + // c := initClient(codec, s.services) + // <-codec.Closed() + // c.Close() } // Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending @@ -90,55 +55,19 @@ func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { log.Debug().Msg("RPC server shutting down") s.codecs.Each(func(c any) bool { - c.(ServerCodec).Close() + c.(codec.ReaderWriter).Close() return true }) } } -// Deprecated: RPCService gives meta information about the server. -// e.g. gives information about the loaded modules. -type RPCService struct { - server *Server -} - -// PeerInfo contains information about the remote end of the network connection. -// -// This is available within RPC method handlers through the context. Call -// PeerInfoFromContext to get information about the client connection related to -// the current method call. -type PeerInfo struct { - // Transport is name of the protocol used by the client. - // This can be "http", "ws" or "ipc". - Transport string - - // Address of client. This will usually contain the IP address and port. - RemoteAddr string - - // Addditional information for HTTP and WebSocket connections. - HTTP HttpInfo -} - -type HttpInfo struct { - // Protocol version, i.e. "HTTP/1.1". This is not set for WebSocket. - Version string - // Header values sent by the client. - UserAgent string - Origin string - Host string - - Headers http.Header - - WriteHeaders http.Header -} - type peerInfoContextKey struct{} // PeerInfoFromContext returns information about the client's network connection. // Use this with the context passed to RPC method handler functions. // // The zero value is returned if no connection info is present in ctx. -func PeerInfoFromContext(ctx context.Context) PeerInfo { - info, _ := ctx.Value(peerInfoContextKey{}).(PeerInfo) +func PeerInfoFromContext(ctx context.Context) codec.PeerInfo { + info, _ := ctx.Value(peerInfoContextKey{}).(codec.PeerInfo) return info } diff --git a/subscription/conn.go b/subscription/conn.go new file mode 100644 index 0000000000000000000000000000000000000000..3346cdc902e9abfab9564dcb2adf84c941783a5e --- /dev/null +++ b/subscription/conn.go @@ -0,0 +1,13 @@ +package subscription + +import ( + "context" + + "gfx.cafe/open/jrpc" +) + +type SubscriptionConn interface { + jrpc.StreamingConn + + Subscribe(ctx context.Context, namespace string, channel any, args ...any) (*ClientSubscription, error) +} diff --git a/subscription.go b/subscription/subscription.go similarity index 99% rename from subscription.go rename to subscription/subscription.go index 527e6b0b46b33a2606423a02e2a584d8f3cba3a1..a3b2b66bacade40e967287503873309b2d39580b 100644 --- a/subscription.go +++ b/subscription/subscription.go @@ -1,4 +1,4 @@ -package jrpc +package subscription import ( "container/list" diff --git a/testservice_test.go b/testservice_test.go deleted file mode 100644 index bc1f919a415a7960ea91f8319e1ec36573a28f5b..0000000000000000000000000000000000000000 --- a/testservice_test.go +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package jrpc - -import ( - "context" - "errors" - "log" - "strings" - "time" -) - -func newTestServer() *Server { - server := NewServer() - server.Router().HandleFunc("testservice_subscribe", func(w ResponseWriter, r *Request) { - log.Println(r.Params) - sub, err := UpgradeToSubscription(w, r) - w.Send(sub, err) - if err != nil { - return - } - idx := 0 - for { - err := w.Notify(idx) - if err != nil { - return - } - idx = idx + 1 - } - }) - if err := server.Router().RegisterStruct("test", new(testService)); err != nil { - panic(err) - } - if err := server.Router().RegisterStruct("nftest", new(notificationTestService)); err != nil { - panic(err) - } - return server -} - -type testService struct{} - -type echoArgs struct { - S string -} - -type echoResult struct { - String string - Int int - Args *echoArgs -} - -type testError struct{} - -func (testError) Error() string { return "testError" } -func (testError) ErrorCode() int { return 444 } -func (testError) ErrorData() any { return "testError data" } - -func (s *testService) NoArgsRets() {} - -func (s *testService) EchoAny(n any) any { - return n -} - -func (s *testService) Echo(str string, i int, args *echoArgs) echoResult { - return echoResult{str, i, args} -} - -func (s *testService) EchoWithCtx(ctx context.Context, str string, i int, args *echoArgs) echoResult { - return echoResult{str, i, args} -} - -func (s *testService) PeerInfo(ctx context.Context) PeerInfo { - return PeerInfoFromContext(ctx) -} - -func (s *testService) Sleep(ctx context.Context, duration time.Duration) { - time.Sleep(duration) -} - -func (s *testService) Block(ctx context.Context) error { - <-ctx.Done() - return errors.New("context canceled in testservice_block") -} - -func (s *testService) Rets() (string, error) { - return "", nil -} - -//lint:ignore ST1008 returns error first on purpose. -func (s *testService) InvalidRets1() (error, string) { - return nil, "" -} - -func (s *testService) InvalidRets2() (string, string) { - return "", "" -} - -func (s *testService) InvalidRets3() (string, string, error) { - return "", "", nil -} - -func (s *testService) ReturnError() error { - return testError{} -} - -func (s *testService) CallMeBack(ctx context.Context, method string, args []any) (any, error) { - c, ok := ClientFromContext(ctx) - if !ok { - return nil, errors.New("no client") - } - var result any - err := c.Call(nil, &result, method, args...) - return result, err -} - -func (s *testService) CallMeBackLater(ctx context.Context, method string, args []any) error { - c, ok := ClientFromContext(ctx) - if !ok { - return errors.New("no client") - } - go func() { - <-ctx.Done() - var result any - c.Call(nil, &result, method, args...) - }() - return nil -} - -type notificationTestService struct { - unsubscribed chan string - gotHangSubscriptionReq chan struct{} - unblockHangSubscription chan struct{} -} - -func (s *notificationTestService) Echo(i int) int { - return i -} - -func (s *notificationTestService) Unsubscribe(subid string) { - if s.unsubscribed != nil { - s.unsubscribed <- subid - } -} - -func (s *notificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { - notifier, supported := NotifierFromContext(ctx) - if !supported { - return nil, ErrNotificationsUnsupported - } - - // By explicitly creating an subscription we make sure that the subscription id is send - // back to the client before the first subscription.Notify is called. Otherwise the - // events might be send before the response for the *_subscribe method. - subscription := notifier.CreateSubscription() - go func() { - for i := 0; i < n; i++ { - if err := notifier.Notify(subscription.ID, val+i); err != nil { - return - } - } - select { - case <-notifier.Closed(): - case <-subscription.Err(): - } - if s.unsubscribed != nil { - s.unsubscribed <- string(subscription.ID) - } - }() - return subscription, nil -} - -// HangSubscription blocks on s.unblockHangSubscription before sending anything. -func (s *notificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) { - notifier, supported := NotifierFromContext(ctx) - if !supported { - return nil, ErrNotificationsUnsupported - } - s.gotHangSubscriptionReq <- struct{}{} - <-s.unblockHangSubscription - subscription := notifier.CreateSubscription() - - go func() { - notifier.Notify(subscription.ID, val) - }() - return subscription, nil -} - -// largeRespService generates arbitrary-size JSON responses. -type largeRespService struct { - length int -} - -func (x largeRespService) LargeResp() string { - return strings.Repeat("x", x.length) -} diff --git a/websocket_server.go b/websocket_server.go deleted file mode 100644 index e66c73324e64d64b37c6c5a360136356fbc9cbfd..0000000000000000000000000000000000000000 --- a/websocket_server.go +++ /dev/null @@ -1,38 +0,0 @@ -package jrpc - -import ( - "net/http" - "strings" -) - -type WebsocketServer struct { - s *Server -} - -func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if IsWebsocket(r) { - s.s.WebsocketHandler([]string{"*"}).ServeHTTP(w, r) - return - } - s.s.ServeHTTP(w, r) -} - -func IsWebsocket(r *http.Request) bool { - return strings.EqualFold(r.Header.Get("Upgrade"), "websocket") && - strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") -} - -func (s *Server) ServeHTTPWithWss(cb func(w http.ResponseWriter, r *http.Request) bool) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if cb != nil { - if cb(w, r) { - return - } - } - if IsWebsocket(r) { - s.WebsocketHandler([]string{"*"}).ServeHTTP(w, r) - return - } - s.ServeHTTP(w, r) - }) -}