From bb7dee78d8200932a1527b83b8335c66220fc376 Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Mon, 14 Nov 2022 17:48:17 -0600 Subject: [PATCH] more renaming --- json.go | 141 +++------------------------------------- json_codec.go | 129 ++++++++++++++++++++++++++++++++++++ wire.go => json_wire.go | 0 3 files changed, 138 insertions(+), 132 deletions(-) create mode 100644 json_codec.go rename wire.go => json_wire.go (100%) diff --git a/json.go b/json.go index 062db5c..77f48e8 100644 --- a/json.go +++ b/json.go @@ -2,14 +2,11 @@ package jrpc import ( "bytes" - "context" "errors" "fmt" - "io" "reflect" "strconv" "strings" - "sync" "time" stdjson "encoding/json" @@ -82,7 +79,6 @@ func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { } return resp } - func (msg *jsonrpcMessage) response(result any) *jsonrpcMessage { // do a funny marshaling enc, err := jzon.Marshal(result) @@ -138,123 +134,17 @@ func errorMessage(err error) *jsonrpcMessage { return msg } -// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec. -type Conn interface { - io.ReadWriteCloser - SetWriteDeadline(time.Time) error -} - -// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has -// support for parsing arguments and serializing (result) objects. -type jsonCodec struct { - remote string - closer sync.Once // close closed channel once - closeFunc func() error - closeCh chan any // closed on Close - decode func(v any) error // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode func(v any) error // encoder to allow multiple transports - conn Conn -} - -// NewFuncCodec creates a codec which uses the given functions to read and write. If conn -// implements ConnRemoteAddr, log messages will use it to include the remote address of -// the connection. -func NewFuncCodec( - conn Conn, - encode, decode func(v any) error, - closeFunc func() error, -) ServerCodec { - codec := &jsonCodec{ - closeFunc: closeFunc, - closeCh: make(chan any), - encode: encode, - decode: decode, - conn: conn, - } - if ra, ok := conn.(interface{ RemoteAddr() string }); ok { - codec.remote = ra.RemoteAddr() - } - return codec -} - -// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log -// messages will use it to include the remote address of the connection. -func NewCodec(conn Conn) ServerCodec { - encr := func(v any) error { - enc := jzon.BorrowStream(conn) - defer jzon.ReturnStream(enc) - enc.WriteVal(v) - enc.WriteRaw("\n") - enc.Flush() - if enc.Error != nil { - return enc.Error - } - return nil - } - // TODO: - // for some reason other json decoders are incompatible with our test suite - // pretty sure its how we handle EOFs and stuff - dec := stdjson.NewDecoder(conn) - dec.UseNumber() - return NewFuncCodec(conn, encr, dec.Decode, func() error { - return nil - }) -} - -func (c *jsonCodec) PeerInfo() PeerInfo { - // This returns "ipc" because all other built-in transports have a separate codec type. - return PeerInfo{Transport: "ipc", RemoteAddr: c.remote} -} - -func (c *jsonCodec) RemoteAddr() string { - return c.remote -} - -func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err error) { - // Decode the next JSON object in the input stream. - // This verifies basic syntax, etc. - var rawmsg json.RawMessage - if err := c.decode(&rawmsg); err != nil { - return nil, false, err - } - messages, batch = parseMessage(rawmsg) - for i, msg := range messages { - if msg == nil { - // Message is JSON 'null'. Replace with zero value so it - // will be treated like any other invalid message. - messages[i] = new(jsonrpcMessage) +// isBatch returns true when the first non-whitespace characters is '[' +func isBatch(raw json.RawMessage) bool { + for _, c := range raw { + // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) + switch c { + case 0x20, 0x09, 0x0a, 0x0d: + continue } + return c == '[' } - return messages, batch, nil -} - -func (c *jsonCodec) WriteJSON(ctx context.Context, v any) error { - c.encMu.Lock() - defer c.encMu.Unlock() - - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(defaultWriteTimeout) - } - c.conn.SetWriteDeadline(deadline) - return c.encode(v) -} - -func (c *jsonCodec) Close() error { - c.closer.Do(func() { - close(c.closeCh) - if c.closeFunc != nil { - c.closeFunc() - } - c.conn.Close() - }) - return nil -} - -// Closed returns a channel which will be closed when Close is called -func (c *jsonCodec) Closed() <-chan any { - return c.closeCh + return false } // parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error @@ -280,19 +170,6 @@ func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) { return msgs, true } -// isBatch returns true when the first non-whitespace characters is '[' -func isBatch(raw json.RawMessage) bool { - for _, c := range raw { - // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) - switch c { - case 0x20, 0x09, 0x0a, 0x0d: - continue - } - return c == '[' - } - return false -} - // parsePositionalArguments tries to parse the given args to an array of values with the // given types. It returns the parsed values or an error when the args could not be // parsed. Missing optional arguments are returned as reflect.Zero values. diff --git a/json_codec.go b/json_codec.go new file mode 100644 index 0000000..1c4fda4 --- /dev/null +++ b/json_codec.go @@ -0,0 +1,129 @@ +package jrpc + +import ( + "context" + "encoding/json" + stdjson "encoding/json" + "io" + "sync" + "time" +) + +// Conn is a subset of the methods of net.Conn which are sufficient for creating a jsonCodec +type Conn interface { + io.ReadWriteCloser + SetWriteDeadline(time.Time) error +} + +// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has +// support for parsing arguments and serializing (result) objects. +type jsonCodec struct { + remote string + closer sync.Once // close closed channel once + closeFunc func() error + closeCh chan any // closed on Close + decode func(v any) error // decoder to allow multiple transports + encMu sync.Mutex // guards the encoder + encode func(v any) error // encoder to allow multiple transports + conn Conn +} + +// NewFuncCodec creates a codec which uses the given functions to read and write. If conn +// implements ConnRemoteAddr, log messages will use it to include the remote address of +// the connection. +func NewFuncCodec( + conn Conn, + encode, decode func(v any) error, + closeFunc func() error, +) ServerCodec { + codec := &jsonCodec{ + closeFunc: closeFunc, + closeCh: make(chan any), + encode: encode, + decode: decode, + conn: conn, + } + if ra, ok := conn.(interface{ RemoteAddr() string }); ok { + codec.remote = ra.RemoteAddr() + } + return codec +} + +// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log +// messages will use it to include the remote address of the connection. +func NewCodec(conn Conn) ServerCodec { + encr := func(v any) error { + enc := jzon.BorrowStream(conn) + defer jzon.ReturnStream(enc) + enc.WriteVal(v) + enc.WriteRaw("\n") + enc.Flush() + if enc.Error != nil { + return enc.Error + } + return nil + } + // TODO: + // for some reason other json decoders are incompatible with our test suite + // pretty sure its how we handle EOFs and stuff + dec := stdjson.NewDecoder(conn) + dec.UseNumber() + return NewFuncCodec(conn, encr, dec.Decode, func() error { + return nil + }) +} + +func (c *jsonCodec) PeerInfo() PeerInfo { + // This returns "ipc" because all other built-in transports have a separate codec type. + return PeerInfo{Transport: "ipc", RemoteAddr: c.remote} +} + +func (c *jsonCodec) RemoteAddr() string { + return c.remote +} + +func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err error) { + // Decode the next JSON object in the input stream. + // This verifies basic syntax, etc. + var rawmsg json.RawMessage + if err := c.decode(&rawmsg); err != nil { + return nil, false, err + } + messages, batch = parseMessage(rawmsg) + for i, msg := range messages { + if msg == nil { + // Message is JSON 'null'. Replace with zero value so it + // will be treated like any other invalid message. + messages[i] = new(jsonrpcMessage) + } + } + return messages, batch, nil +} + +func (c *jsonCodec) WriteJSON(ctx context.Context, v any) error { + c.encMu.Lock() + defer c.encMu.Unlock() + + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultWriteTimeout) + } + c.conn.SetWriteDeadline(deadline) + return c.encode(v) +} + +func (c *jsonCodec) Close() error { + c.closer.Do(func() { + close(c.closeCh) + if c.closeFunc != nil { + c.closeFunc() + } + c.conn.Close() + }) + return nil +} + +// Closed returns a channel which will be closed when Close is called +func (c *jsonCodec) Closed() <-chan any { + return c.closeCh +} diff --git a/wire.go b/json_wire.go similarity index 100% rename from wire.go rename to json_wire.go -- GitLab