diff --git a/argreflect/json.go b/argreflect/json.go index 0f58d9071e2e7e87aac43e11eb3d5ea8a2de66ac..6d12c20b2da428bea56f73701eaffee82d3e17cf 100644 --- a/argreflect/json.go +++ b/argreflect/json.go @@ -5,8 +5,12 @@ import ( "errors" "fmt" "reflect" + + "gfx.cafe/open/jrpc/codec/websocket/wsjson" ) +var jzon = wsjson.JZON + // parsePositionalArguments tries to parse the given args to an array of values with the // given types. It returns the parsed values or an error when the args could not be // parsed. Missing optional arguments are returned as reflect.Zero values. diff --git a/clientutil/idreply.go b/clientutil/idreply.go new file mode 100644 index 0000000000000000000000000000000000000000..6aac64666d08398d47d7a627915ddc8bd09cb8d2 --- /dev/null +++ b/clientutil/idreply.go @@ -0,0 +1,58 @@ +package clientutil + +import ( + "context" + "encoding/json" + "sync" + "sync/atomic" +) + +type IdReply struct { + id atomic.Int64 + + chs map[int]chan msgOrError + mu sync.Mutex +} + +type msgOrError struct { + msg json.RawMessage + err error +} + +func NewIdReply() *IdReply { + return &IdReply{ + chs: make(map[int]chan msgOrError, 1), + } +} + +func (i *IdReply) NextId() int { + return int(i.id.Add(1)) +} + +func (i *IdReply) makeOrTake(id int) chan msgOrError { + i.mu.Lock() + defer i.mu.Unlock() + if val, ok := i.chs[id]; ok { + delete(i.chs, id) + return val + } + o := make(chan msgOrError) + i.chs[id] = o + return o +} + +func (i *IdReply) Resolve(id int, msg json.RawMessage, err error) { + i.makeOrTake(id) <- msgOrError{ + err: err, + msg: msg, + } +} + +func (i *IdReply) Ask(ctx context.Context, id int) (json.RawMessage, error) { + select { + case resp := <-i.makeOrTake(id): + return resp.msg, resp.err + case <-ctx.Done(): + return nil, ctx.Err() + } +} diff --git a/codec/errors.go b/codec/errors.go index a74c4950ea879dada7309066e279b775e17262d9..52e12187ea9fdb1d409e807030dc89301901986a 100644 --- a/codec/errors.go +++ b/codec/errors.go @@ -3,7 +3,6 @@ package codec import ( "encoding/json" "fmt" - "io" "github.com/go-faster/jx" ) @@ -36,14 +35,14 @@ type DataError interface { ErrorData() any // returns the error data } -func EncodeError(i io.Writer, err error) error { - enc := jx.GetEncoder() - defer jx.PutEncoder(enc) +func EncodeError(enc *jx.Encoder, err error) error { enc.Obj(func(e *jx.Encoder) { switch er := err.(type) { case Error: - e.Field("code", func(e *jx.Encoder) { e.Int(er.ErrorCode()) }) - e.Field("message", func(e *jx.Encoder) { e.Str(er.Error()) }) + e.FieldStart("code") + e.Int(er.ErrorCode()) + e.FieldStart("message") + e.Str(er.Error()) case DataError: data, err := json.Marshal(er.ErrorData()) if err != nil { @@ -114,6 +113,12 @@ func (e *ErrorMethodNotFound) Error() string { return fmt.Sprintf("the method %s does not exist/is not available", e.method) } +func NewMethodNotFoundError(method string) *ErrorMethodNotFound { + return &ErrorMethodNotFound{ + method: method, + } +} + type ErrorSubscriptionNotFound struct{ namespace, subscription string } func (e *ErrorSubscriptionNotFound) ErrorCode() int { return -32601 } diff --git a/codec/inproc/client.go b/codec/inproc/client.go new file mode 100644 index 0000000000000000000000000000000000000000..e0bb7c38098938d5db72e1a44c0b9e36397f5f8e --- /dev/null +++ b/codec/inproc/client.go @@ -0,0 +1,131 @@ +package inproc + +import ( + "bytes" + "context" + "encoding/json" + "sync" + + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/clientutil" + "gfx.cafe/open/jrpc/codec" +) + +type Client struct { + p *clientutil.IdReply + c *Codec + + handler jrpc.Handler +} + +func NewClient(c *Codec, handler jrpc.Handler) *Client { + cl := &Client{ + p: clientutil.NewIdReply(), + c: c, + handler: handler, + } + go cl.listen() + return cl +} + +func (c *Client) listen() error { + var msg json.RawMessage + for { + err := json.NewDecoder(c.c.rd).Decode(&msg) + if err != nil { + return err + } + msgs, _ := codec.ParseMessage(msg) + for _, v := range msgs { + id := v.ID.Number() + if id == 0 { + //if c.handler != nil { + // c.handler.ServeRPC(w, r) + //} + continue + } + c.p.Resolve(id, v.Result, v.Error) + } + } + +} + +func (c *Client) Do(ctx context.Context, result any, method string, params any) error { + id := c.p.NextId() + req := jrpc.NewRequestInt(ctx, id, method, params) + fwd, err := json.Marshal(req) + if err != nil { + return err + } + c.c.msgs <- fwd + ans, err := c.p.Ask(ctx, id) + if err != nil { + return err + } + if result != nil { + err = json.Unmarshal(ans, result) + if err != nil { + return err + } + } + return nil +} + +func (c *Client) BatchCall(ctx context.Context, b ...*jrpc.BatchElem) error { + buf := new(bytes.Buffer) + enc := json.NewEncoder(buf) + reqs := make([]*jrpc.Request, 0) + ids := make([]int, 0, len(b)) + for _, v := range b { + id := c.p.NextId() + req := jrpc.NewRequestInt(ctx, id, v.Method, v.Params) + ids = append(ids, id) + reqs = append(reqs, req) + } + err := enc.Encode(reqs) + if err != nil { + return err + } + c.c.msgs <- buf.Bytes() + // TODO: wait for response + wg := sync.WaitGroup{} + wg.Add(len(ids)) + for i := range ids { + idx := i + go func() { + ans, err := c.p.Ask(ctx, ids[idx]) + if err != nil { + b[idx].Error = err + return + } + if b[idx].Result != nil { + err = json.Unmarshal(ans, b[idx].Result) + if err != nil { + b[idx].Error = err + return + } + } + defer wg.Done() + }() + } + wg.Wait() + + return err +} + +func (c *Client) SetHeader(key string, value string) { +} + +func (c *Client) Close() error { + return c.c.Close() +} + +func (c *Client) Notify(ctx context.Context, method string, params any) error { + req := jrpc.NewRequest(ctx, "", method, params) + fwd, err := json.Marshal(req) + if err != nil { + return err + } + c.c.msgs <- fwd + return nil +} diff --git a/codec/inproc/inproc.go b/codec/inproc/inproc.go index e9449329d2b3621f60ab2b740117668b5f8d9b2c..0e5665a8156eb47090ead6cf7e641e2de433ef94 100644 --- a/codec/inproc/inproc.go +++ b/codec/inproc/inproc.go @@ -1,33 +1,78 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package jrpc +package inproc import ( + "bufio" "context" - "net" + "encoding/json" + "io" + + "gfx.cafe/open/jrpc/codec" ) -// DialInProc attaches an in-process connection to the given RPC server. -func DialInProc(handler *Server) *Client { - initctx := context.Background() - c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { - p1, p2 := net.Pipe() - go handler.ServeCodec(NewCodec(p1)) - return NewCodec(p2), nil - }) - return c +type Codec struct { + done chan any + + rd io.Reader + wr io.Writer + msgs chan json.RawMessage +} + +func NewCodec() *Codec { + rd, wr := io.Pipe() + return &Codec{ + done: make(chan interface{}), + rd: bufio.NewReader(rd), + wr: wr, + msgs: make(chan json.RawMessage, 8), + } } + +// gets the peer info +func (c *Codec) PeerInfo() codec.PeerInfo { + return codec.PeerInfo{ + Transport: "ipc", + RemoteAddr: "", + HTTP: codec.HttpInfo{}, + } +} + +// json.RawMessage can be an array of requests. if it is, then it is a batch request +func (c *Codec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) { + select { + case ans := <-c.msgs: + return ans, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// closes the connection +func (c *Codec) Close() error { + close(c.done) + return nil +} + +func (c *Codec) Write(p []byte) (n int, err error) { + return c.wr.Write(p) +} + +// Closed returns a channel which is closed when the connection is closed. +func (c *Codec) Closed() <-chan any { + return c.done +} + +// RemoteAddr returns the peer address of the connection. +func (c *Codec) RemoteAddr() string { + return "" +} + +// DialInProc attaches an in-process connection to the given RPC server. +//func DialInProc(handler *Server) *Client { +// initctx := context.Background() +// c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { +// p1, p2 := net.Pipe() +// go handler.ServeCodec(NewCodec(p1)) +// return NewCodec(p2), nil +// }) +// return c +//} diff --git a/codec/inproc/inproc_test.go b/codec/inproc/inproc_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d8fcda65a98e20eb76de9b3340efa0a7d95f306a --- /dev/null +++ b/codec/inproc/inproc_test.go @@ -0,0 +1,28 @@ +package inproc_test + +import ( + "context" + "testing" + + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/codec/inproc" + "gfx.cafe/open/jrpc/jmux" + "github.com/stretchr/testify/require" +) + +func TestInprocSetup(t *testing.T) { + mux := jmux.NewMux() + srv := jrpc.NewServer(mux) + + ctx := context.Background() + + clientCodec := inproc.NewCodec() + client := inproc.NewClient(clientCodec, nil) + go func() { + srv.ServeCodec(ctx, clientCodec) + }() + + var res any + err := client.Do(ctx, res, "hi_there", []any{}) + require.ErrorContains(t, err, "does not exist") +} diff --git a/codec/wire.go b/codec/wire.go index ea2d5e82c44a35cdd4c42e17a2216d33e88a95c8..fae9404dba1d2d8f0d0e77636adba7e5c9abf92f 100644 --- a/codec/wire.go +++ b/codec/wire.go @@ -68,9 +68,25 @@ func NewStringID(v string) ID { return *NewStringIDPtr(v) } // NewStringID returns a new string request ID. func NewNullID() ID { return *NewNullIDPtr() } -func NewNumberIDPtr(v int64) *ID { return &ID{number: v} } -func NewStringIDPtr(v string) *ID { return &ID{name: v} } -func NewNullIDPtr() *ID { return &ID{null: true} } +func NewNumberIDPtr(v int64) *ID { return &ID{number: v} } +func NewStringIDPtr(v string) *ID { + if v == "" { + return nil + } + return &ID{name: v} +} +func NewNullIDPtr() *ID { return &ID{null: true} } + +func (id *ID) Number() int { + if id == nil { + return 0 + } + if id.number == 0 { + ans, _ := strconv.Atoi(id.name) + return ans + } + return int(id.number) +} // Format writes the ID to the formatter. // diff --git a/conn.go b/conn.go index 61d2d6cb6f553a3cd6c8842053080448ae785e2c..11c74464213012462fbc9fb0f52eeceeab71393f 100644 --- a/conn.go +++ b/conn.go @@ -4,7 +4,7 @@ import "context" type Conn interface { Do(ctx context.Context, result any, method string, params any) error - BatchCall(ctx context.Context, b ...BatchElem) error + BatchCall(ctx context.Context, b ...*BatchElem) error SetHeader(key, value string) Close() error } @@ -12,13 +12,14 @@ type Conn interface { type StreamingConn interface { Conn - Notify(ctx context.Context, method string, args ...any) error + Notify(ctx context.Context, method string, params any) error + Handler } // BatchElem is an element in a batch request. type BatchElem struct { Method string - Args any + Params any // The result is unmarshaled into this field. Result must be set to a // non-nil pointer value of the desired type, otherwise the response will be // discarded. diff --git a/go.mod b/go.mod index a038fdb8638e5e5b4584c536bbc4641db03b1e69..cd02b566fd6e793342d37c140f7252c70620cb7f 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/goccy/go-json v0.10.0 github.com/iancoleman/strcase v0.2.0 github.com/json-iterator/go v1.1.12 + github.com/stretchr/testify v1.8.2 golang.org/x/sync v0.1.0 gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce nhooyr.io/websocket v1.8.7 @@ -41,6 +42,7 @@ require ( github.com/mattn/go-isatty v0.0.18 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/zerolog v1.29.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect @@ -53,4 +55,5 @@ require ( golang.org/x/term v0.5.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4cb13c261a9275e435a0ca316dafd68c5c612a87..6ecb1c9ce4c3715bec4182865759edb4caceb575 100644 --- a/go.sum +++ b/go.sum @@ -320,6 +320,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -329,6 +330,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= diff --git a/handler.go b/handler.go deleted file mode 100644 index 0832b345d6474bf50a7cb8cb5fe34e8ec83c2d09..0000000000000000000000000000000000000000 --- a/handler.go +++ /dev/null @@ -1,146 +0,0 @@ -package jrpc - -import ( - "context" - "encoding/json" - "sync" - - "gfx.cafe/open/jrpc/codec" -) - -type requestOp struct { - ids []json.RawMessage - err error - resp chan json.RawMessage // receives up to len(ids) responses -} - -type handler struct { - reg Handler - respWait map[string]*requestOp // active client requests - callWG sync.WaitGroup // pending call goroutines - rootCtx context.Context // canceled by close() - cancelRoot func() // cancel function for rootCtx - conn codec.Writer // where responses will be sent - - peer codec.PeerInfo -} - -type callProc struct { - ctx context.Context -} - -func newHandler(connCtx context.Context, conn codec.Writer, reg Handler) *handler { - rootCtx, cancelRoot := context.WithCancel(connCtx) - h := &handler{ - peer: PeerInfoFromContext(connCtx), - reg: reg, - conn: conn, - respWait: make(map[string]*requestOp), - rootCtx: rootCtx, - cancelRoot: cancelRoot, - } - return h -} - -// handleBatch executes all messages in a batch and returns the responses. -func (h *handler) handleBatch(msgs []json.RawMessage) { - // Emit error response for empty batches: - if len(msgs) == 0 { - h.startCallProc(func(cp *callProc) { - h.conn.WriteJSON(cp.ctx, codec.ErrorMessage(codec.NewInvalidRequestError("empty batch"))) - }) - return - } - // Handle non-call messages first: - calls := make([]json.RawMessage, 0, len(msgs)) - for _, msg := range msgs { - //TODO: filter immediate handled - //if handled := h.handleImmediate(msg); !handled { - calls = append(calls, msg) - //} - } - if len(calls) == 0 { - return - } - // TODO: implement this - // Process calls on a goroutine because they may block indefinitely: - h.startCallProc(func(cp *callProc) { - answers := make([]*json.RawMessage, 0, len(msgs)) - for _, msg := range calls { - _ = msg - } - if len(answers) > 0 { - h.conn.WriteJSON(cp.ctx, answers) - } - }) -} - -// handleMsg handles a single message. -func (h *handler) handleMsg(msg *codec.Message) { - // TODO: implement this - h.startCallProc(func(cp *callProc) { - // r := NewMsgRequest(cp.ctx, h.peer, *msg) - //r := &Request{} - //answer := h.handleCallMsg(cp, r) - //if answer != nil { - // h.conn.WriteJSON(cp.ctx, answer) - //} - }) -} - -// close cancels all requests except for inflightReq and waits for -// call goroutines to shut down. -func (h *handler) close(err error, inflightReq *requestOp) { - h.cancelAllRequests(err, inflightReq) - h.callWG.Wait() - h.cancelRoot() -} - -// addRequestOp registers a request operation. -func (h *handler) addRequestOp(op *requestOp) { - for _, id := range op.ids { - h.respWait[string(id)] = op - } -} - -// removeRequestOps stops waiting for the given request IDs. -func (h *handler) removeRequestOp(op *requestOp) { - for _, id := range op.ids { - delete(h.respWait, string(id)) - } -} - -// cancelAllRequests unblocks and removes pending requests and active subscriptions. -func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { - didClose := make(map[*requestOp]bool) - if inflightReq != nil { - didClose[inflightReq] = true - } - - for id, op := range h.respWait { - // Remove the op so that later calls will not close op.resp again. - delete(h.respWait, id) - if !didClose[op] { - op.err = err - close(op.resp) - didClose[op] = true - } - } -} - -// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. -func (h *handler) startCallProc(fn func(*callProc)) { - h.callWG.Add(1) - go func() { - ctx, cancel := context.WithCancel(h.rootCtx) - defer h.callWG.Done() - defer cancel() - fn(&callProc{ctx: ctx}) - }() -} - -func (h *handler) handleCall(cp *callProc, r *Request) *Response { - mw := NewReaderResponseWriterMsg(r) - h.reg.ServeRPC(mw, r) - return mw.Response() -} diff --git a/jmux/router_chain.go b/jmux/router_chain.go index 78d8413ed8a3b6494f645ea9f79038cd235dcfff..0eb1d3bf850cad68dd531c0c3e74df14b0933a83 100644 --- a/jmux/router_chain.go +++ b/jmux/router_chain.go @@ -1,37 +1,39 @@ package jmux +import "gfx.cafe/open/jrpc" + // Chain returns a Middlewares type from a slice of middleware handlers. -func Chain(middlewares ...func(Handler) Handler) Middlewares { +func Chain(middlewares ...func(jrpc.Handler) jrpc.Handler) Middlewares { return Middlewares(middlewares) } // Handler builds and returns a Handler from the chain of middlewares, // with `h Handler` as the final handler. -func (mws Middlewares) Handler(h Handler) Handler { +func (mws Middlewares) Handler(h jrpc.Handler) jrpc.Handler { return &ChainHandler{h, chain(mws, h), mws} } // HandlerFunc builds and returns a Handler from the chain of middlewares, // with `h Handler` as the final handler. -func (mws Middlewares) HandlerFunc(h HandlerFunc) Handler { +func (mws Middlewares) HandlerFunc(h jrpc.HandlerFunc) jrpc.Handler { return &ChainHandler{h, chain(mws, h), mws} } // ChainHandler is a Handler with support for handler composition and // execution. type ChainHandler struct { - Endpoint Handler - chain Handler + Endpoint jrpc.Handler + chain jrpc.Handler Middlewares Middlewares } -func (c *ChainHandler) ServeRPC(w ResponseWriter, r *Request) { +func (c *ChainHandler) ServeRPC(w jrpc.ResponseWriter, r *jrpc.Request) { c.chain.ServeRPC(w, r) } // chain builds a Handler composed of an inline middleware stack and endpoint // handler in the order they are passed. -func chain(middlewares []func(Handler) Handler, endpoint Handler) Handler { +func chain(middlewares []func(jrpc.Handler) jrpc.Handler, endpoint jrpc.Handler) jrpc.Handler { // Return ahead of time if there aren't any middlewares for the chain if len(middlewares) == 0 { return endpoint diff --git a/jmux/router_tree.go b/jmux/router_tree.go index 59a4374ef937135c4e5483fa3c0773101194e680..f5031e42365d08c38b4fb456c2b11888e021f136 100644 --- a/jmux/router_tree.go +++ b/jmux/router_tree.go @@ -8,6 +8,8 @@ import ( "regexp" "sort" "strings" + + "gfx.cafe/open/jrpc" ) type nodeTyp uint8 @@ -48,7 +50,7 @@ type node struct { type endpoint struct { // endpoint handler - handler Handler + handler jrpc.Handler // pattern is the routing pattern for handler nodes pattern string @@ -57,7 +59,7 @@ type endpoint struct { paramKeys []string } -func (n *node) InsertRoute(pattern string, handler Handler) *node { +func (n *node) InsertRoute(pattern string, handler jrpc.Handler) *node { var parent *node search := pattern for { @@ -261,7 +263,7 @@ func (n *node) getEdge(ntyp nodeTyp, label, tail byte, prefix string) *node { return nil } -func (n *node) setEndpoint(handler Handler, pattern string) { +func (n *node) setEndpoint(handler jrpc.Handler, pattern string) { paramKeys := patParamKeys(pattern) n.endpoint = &endpoint{ handler: handler, @@ -270,7 +272,7 @@ func (n *node) setEndpoint(handler Handler, pattern string) { } } -func (n *node) FindRoute(rctx *Context, path string) (*node, *endpoint, Handler) { +func (n *node) FindRoute(rctx *Context, path string) (*node, *endpoint, jrpc.Handler) { // Reset the context routing pattern and params rctx.routePattern = "" rctx.routeParams.Keys = rctx.routeParams.Keys[:0] @@ -681,21 +683,21 @@ func (ns nodes) findEdge(label byte) *node { // Route describes the details of a routing handler. type Route struct { SubRoutes Routes - Handler Handler + Handler jrpc.Handler Pattern string } // WalkFunc is the type of the function called for each method and route visited by Walk. -type WalkFunc func(route string, handler Handler, middlewares ...func(Handler) Handler) error +type WalkFunc func(route string, handler jrpc.Handler, middlewares ...func(jrpc.Handler) jrpc.Handler) error // Walk walks any router tree that implements Routes interface. func Walk(r Routes, walkFn WalkFunc) error { return walk(r, walkFn, "") } -func walk(r Routes, walkFn WalkFunc, parentRoute string, parentMw ...func(Handler) Handler) error { +func walk(r Routes, walkFn WalkFunc, parentRoute string, parentMw ...func(jrpc.Handler) jrpc.Handler) error { for _, route := range r.Routes() { - mws := make([]func(Handler) Handler, len(parentMw)) + mws := make([]func(jrpc.Handler) jrpc.Handler, len(parentMw)) copy(mws, parentMw) mws = append(mws, r.Middlewares()...) diff --git a/request.go b/request.go index c0fa655a0d1dfaa0bb66576ff9090c0d53654c8b..e72a1ea31ee8eefeafe714e3594563b73cff722b 100644 --- a/request.go +++ b/request.go @@ -11,13 +11,34 @@ import ( var jpool = jsoniter.NewIterator(jsoniter.ConfigCompatibleWithStandardLibrary).Pool() type Request struct { + RequestMarshaling + + ctx context.Context +} + +func (r *Request) UnmarshalJSON(xs []byte) error { + return json.Unmarshal(xs, &r.RequestMarshaling) +} + +func (r *Request) MarshalJSON() ([]byte, error) { + return json.Marshal(r.RequestMarshaling) +} + +type RequestMarshaling struct { Version codec.Version `json:"jsonrpc"` ID *codec.ID `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params"` Peer codec.PeerInfo `json:"-"` +} - ctx context.Context +func NewRequestInt(ctx context.Context, id int, method string, params any) *Request { + r := &Request{ctx: ctx} + pms, _ := json.Marshal(params) + r.ID = codec.NewNumberIDPtr(int64(id)) + r.Method = method + r.Params = pms + return r } func NewRequest(ctx context.Context, id string, method string, params any) *Request { diff --git a/server.go b/server.go index 3f1d270ad55ae6d4f630e7f23b28986fe77ebf18..456086cc974de572aba3207d29c869b952ae801b 100644 --- a/server.go +++ b/server.go @@ -111,13 +111,15 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { } else { defer wg.Done() } - s.services.ServeRPC(vv, &Request{ - ctx: ctx, - ID: v.msg.ID, - Version: v.msg.Version, - Method: v.msg.Method, - Params: v.msg.Params, - Peer: remote.PeerInfo(), + s.services.ServeRPC(v, &Request{ + ctx: ctx, + RequestMarshaling: RequestMarshaling{ + ID: v.msg.ID, + Version: v.msg.Version, + Method: v.msg.Method, + Params: v.msg.Params, + Peer: remote.PeerInfo(), + }, }) }() } @@ -168,7 +170,7 @@ func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { return err } } else { - enc.FieldStart("data") + enc.FieldStart("result") enc.Raw(buf.Bytes()) } enc.ObjEnd() @@ -183,13 +185,14 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) error { buf := bufpool.GetStd() defer bufpool.PutStd(buf) enc := jx.GetEncoder() - enc.ResetWriter(c.remote) + enc.Reset() + //enc.ResetWriter(c.remote) defer jx.PutEncoder(enc) if env.batch { enc.ArrStart() } for _, v := range env.responses { - if v.skip { + if v.msg.ID == nil { continue } buf.Reset() @@ -198,23 +201,30 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) error { enc.Str("2.0") enc.FieldStart("id") enc.Raw(v.msg.ID.RawMessage()) - err := v.dat(buf) + err := v.err + if err == nil && v.dat != nil { + err = v.dat(buf) + if err != nil { + enc.FieldStart("result") + enc.Raw(buf.Bytes()) + } + } else { + err = codec.NewMethodNotFoundError(v.msg.Method) + } if err != nil { enc.FieldStart("error") err := codec.EncodeError(enc, err) if err != nil { return err } - } else { - enc.FieldStart("data") - enc.Raw(buf.Bytes()) } enc.ObjEnd() } if env.batch { enc.ArrEnd() } - err := enc.Close() + //err := enc.Close() + _, err := enc.WriteTo(c.remote) if err != nil { return err } diff --git a/session.go b/session.go deleted file mode 100644 index 66cdccc0d83a425b60492c10e74e643323a0fe83..0000000000000000000000000000000000000000 --- a/session.go +++ /dev/null @@ -1,37 +0,0 @@ -package jrpc - -import ( - "context" - - "gfx.cafe/open/jrpc/codec" -) - -type session struct { -} - -func (s *session) MaxProcs() int { - return 8 -} - -func (s *session) handle(ctx context.Context, stream codec.ReaderWriter, service Handler) error { - msg, err := stream.ReadBatch(ctx) - if err != nil { - //TODO: deal with this error - return err - } - messages, batch := codec.ParseMessage(msg) - totalReplies := 0 - for _, v := range messages { - if v.ID != nil && !v.ID.IsNull() { - totalReplies = totalReplies + 1 - } - } - for _, msg := range messages { - s.handleMessage(err) - } - return nil -} - -func (s *session) handleMessage(ctx context.Context, stream codec.ReaderWriter, service Handler, msg *codec.Message) func() { - return nil -}