From 0d7e58b5297987a8d516a8acefa7796147cd3583 Mon Sep 17 00:00:00 2001 From: a <a@a.a> Date: Fri, 23 Sep 2022 07:56:04 -0500 Subject: [PATCH] experimentally writing --- client.go | 3 +++ handler.go | 34 ++++++++++++---------------------- protocol.go | 47 +++++++++++++++++++++++------------------------ 3 files changed, 38 insertions(+), 46 deletions(-) diff --git a/client.go b/client.go index a0ac965..08c9f58 100644 --- a/client.go +++ b/client.go @@ -84,6 +84,9 @@ type Client struct { } func (c *Client) Router() Router { + if c.r == nil { + c.r = NewMux() + } return c.r } diff --git a/handler.go b/handler.go index c395455..c719e6f 100644 --- a/handler.go +++ b/handler.go @@ -19,7 +19,6 @@ package jrpc import ( "context" "sync" - "time" "git.tuxpa.in/a/zlog" ) @@ -75,9 +74,6 @@ func newHandler(connCtx context.Context, conn jsonWriter, reg Router) *handler { cl := h.log.With().Str("conn", conn.remoteAddr()).Logger() h.log = &cl } - if h.peer.RemoteAddr == "" { - h.log.Error().Msg("CONNECTION WITHOUT REMOTE IP DETECTED. PLEASE MAKE SURE YOU KNOW WHAT YOU ARE DOING, OTHERWISE, THIS COULD BE A SECURITY ISSUE") - } return h } @@ -90,7 +86,6 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { }) return } - // Handle non-call messages first: calls := make([]*jsonrpcMessage, 0, len(msgs)) for _, msg := range msgs { @@ -209,11 +204,9 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) { // handleCallMsg executes a call message and returns the answer. // TODO: export prometheus metrics maybe? also fix logging func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { - // start := NewTimer() switch { case msg.isNotification(): - h.handleCall(ctx, msg) - // h.log.Debug().Str("method", msg.Method).Dur("duration", start.Since()).Msg("Served") + go h.handleCall(ctx, msg) return nil case msg.isCall(): resp := h.handleCall(ctx, msg) @@ -231,24 +224,21 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage if !callb { return msg.errorResponse(&methodNotFoundError{method: msg.Method}) } - - start := time.Now() - // TODO: there's a copy here - // TODO: add buffer pool here req := &Request{ctx: cp.ctx, msg: *msg, peer: h.peer} mw := NewReaderResponseWriterMsg(req) h.reg.ServeRPC(mw, req) - { - // Collect the statistics for RPC calls if metrics is enabled. - // We only care about pure rpc call. Filter out subscription. - rpcRequestGauge.Inc(1) - if mw.msg.Error != nil { - failedRequestGauge.Inc(1) - } else { - successfulRequestGauge.Inc(1) + if mw.notifications != nil { + for { + val, more := <-mw.notifications + if !more { + break + } + err := h.conn.writeJSON(cp.ctx, val) + if err != nil { + close(mw.notifications) + return msg.errorResponse(err) + } } - rpcServingTimer.UpdateSince(start) - updateServeTimeHistogram(msg.Method, mw.msg.Error == nil, time.Since(start)) } return mw.msg } diff --git a/protocol.go b/protocol.go index 40a4580..e5d36dc 100644 --- a/protocol.go +++ b/protocol.go @@ -3,7 +3,6 @@ package jrpc import ( "context" "encoding/json" - "io" ) type HandlerFunc func(w ResponseWriter, r *Request) @@ -14,6 +13,7 @@ type Handler interface { type ResponseWriter interface { Send(v any, err error) error + Notify(v any) error } func (fn HandlerFunc) ServeRPC(w ResponseWriter, r *Request) { @@ -100,35 +100,22 @@ func (r *Request) Msg() jsonrpcMessage { return r.msg } -type ResponseWriterIo struct { - r *Request - w io.Writer -} - -func NewReaderResponseWriterIo(r *Request, w io.Writer) ResponseWriter { - return &ResponseWriterIo{ - w: w, - r: r, - } -} - -func (w *ResponseWriterIo) Send(args any, e error) (err error) { - enc := jzon.NewEncoder(w.w) - if e != nil { - return enc.Encode(errorMessage(e)) - } - return enc.Encode(args) -} - type ResponseWriterMsg struct { - r *Request - msg *jsonrpcMessage + r *Request + msg *jsonrpcMessage + notifications chan *jsonrpcMessage } func NewReaderResponseWriterMsg(r *Request) *ResponseWriterMsg { - return &ResponseWriterMsg{ + rw := &ResponseWriterMsg{ r: r, } + switch r.Peer().Transport { + case "http": + default: + rw.notifications = make(chan *jsonrpcMessage, 128) + } + return rw } func (w *ResponseWriterMsg) Send(args any, e error) (err error) { @@ -138,6 +125,18 @@ func (w *ResponseWriterMsg) Send(args any, e error) (err error) { return nil } w.msg = cm.response(args) + close(w.notifications) + return nil +} + +func (w *ResponseWriterMsg) Notify(args any) (err error) { + if w.notifications == nil { + return nil + } + cm := w.r.Msg() + nf := cm.response(args) + nf.ID = nil + w.notifications <- nf return nil } -- GitLab