diff --git a/chain.go b/chain.go index 15a20b596f4e51554be756ed068b9ab5ab2ad847..e33941166ce5f279be5954a8b1c2a46971642723 100644 --- a/chain.go +++ b/chain.go @@ -27,7 +27,6 @@ type ChainHandler struct { func (c *ChainHandler) ServeRPC(w ResponseWriter, r *Request) { c.chain.ServeRPC(w, r) - return } // chain builds a Handler composed of an inline middleware stack and endpoint @@ -37,12 +36,10 @@ func chain(middlewares []func(Handler) Handler, endpoint Handler) Handler { if len(middlewares) == 0 { return endpoint } - // Wrap the end handler with the middleware chain h := middlewares[len(middlewares)-1](endpoint) for i := len(middlewares) - 2; i >= 0; i-- { h = middlewares[i](h) } - return h } diff --git a/client.go b/client.go index 22fc7c2a182d550496681f39502262ee39a67c8a..a0ac96510b88fdaf392c9b9d815d608fcf1464d0 100644 --- a/client.go +++ b/client.go @@ -23,7 +23,6 @@ import ( "fmt" "net/url" "reflect" - "sync" "sync/atomic" "time" @@ -44,20 +43,6 @@ const ( subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls ) -const ( - // Subscriptions are removed when the subscriber cannot keep up. - // - // This can be worked around by supplying a channel with sufficiently sized buffer, - // but this can be inconvenient and hard to explain in the docs. Another issue with - // buffered channels is that the buffer is static even though it might not be needed - // most of the time. - // - // The approach taken here is to maintain a per-subscription linked list buffer - // shrinks on demand. If the buffer reaches the size below, the subscription is - // dropped. - maxClientSubscriptionBuffer = 20000 -) - // BatchElem is an element in a batch request. type BatchElem struct { Method string @@ -81,8 +66,6 @@ type Client struct { // This function, if non-nil, is called when the connection is lost. reconnectFunc reconnectFunc - reconnectMu sync.Mutex - // writeConn is used for writing to the connection on the caller's goroutine. It should // only be accessed outside of dispatch, with the write lock held. The write lock is // taken by sending on reqInit and released by sending on reqSent. diff --git a/client_test.go b/client_test.go index 72b77a3144a5ba051fd39735480d0d7e887fd5f0..9a5e5f4b392e5c5d7e7e0543cd0e2567c5be29a3 100644 --- a/client_test.go +++ b/client_test.go @@ -103,7 +103,6 @@ func TestClientBatchRequest(t *testing.T) { defer server.Stop() client := DialInProc(server) defer client.Close() - batch := []BatchElem{ { Method: "test_echo", diff --git a/handler.go b/handler.go index 31aa5724a2c00f724ee7c52846e13266a7c84834..c395455c01a8d1d3917c1b43f3a9ed79f83724af 100644 --- a/handler.go +++ b/handler.go @@ -18,8 +18,6 @@ package jrpc import ( "context" - "encoding/json" - "strconv" "sync" "time" @@ -219,20 +217,6 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess return nil case msg.isCall(): resp := h.handleCall(ctx, msg) - // var ctx []any - // log2 := h.log.With() - // log2.Str("reqid", string(msg.ID)).Dur("duration", start.Since()) - if resp.Error != nil { - // log2.Str("err", resp.Error.Message) - // if resp.Error.Data != nil { - // log2.Interface("errdata", resp.Error.Data) - // } - // sl := log2.Logger() - // sl.Warn().Str("method", msg.Method).Interface("ctx", ctx).Msg("Served") - } else { - // sl := log2.Logger() - // sl.Debug().Str("method", msg.Method).Interface("ctx", ctx).Msg("Served") - } return resp case msg.hasValidID(): return msg.errorResponse(&invalidRequestError{"invalid request"}) @@ -268,12 +252,3 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage } return mw.msg } - -type idForLog struct{ json.RawMessage } - -func (id idForLog) String() string { - if s, err := strconv.Unquote(string(id.RawMessage)); err == nil { - return s - } - return string(id.RawMessage) -} diff --git a/json.go b/json.go index a8aa4c6e7cc937d7fde76c95856b3889a34bc62c..41780432e38c1a1281e54788630457c5452d139c 100644 --- a/json.go +++ b/json.go @@ -25,7 +25,6 @@ import ( "io" "reflect" "strconv" - "strings" "sync" "time" @@ -36,12 +35,6 @@ import ( var jzon = wsjson.JZON const ( - vsn = "2.0" - serviceMethodSeparator = "_" - subscribeMethodSuffix = "_subscribe" - unsubscribeMethodSuffix = "_unsubscribe" - notificationMethodSuffix = "_subscription" - defaultWriteTimeout = 10 * time.Second // used if context has no deadline ) @@ -82,19 +75,6 @@ func (msg *jsonrpcMessage) hasValidID() bool { return msg.ID != nil && !msg.ID.null } -func (msg *jsonrpcMessage) isSubscribe() bool { - return strings.HasSuffix(msg.Method, subscribeMethodSuffix) -} - -func (msg *jsonrpcMessage) isUnsubscribe() bool { - return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix) -} - -func (msg *jsonrpcMessage) namespace() string { - elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2) - return elem[0] -} - func (msg *jsonrpcMessage) String() string { b, _ := jzon.Marshal(msg) return string(b) diff --git a/mux.go b/mux.go index 9186b95d21337b4574e51fa63684a81b569a7227..bbb0db7c20bdc7359c5c8bcef73a1040ea57ea4d 100644 --- a/mux.go +++ b/mux.go @@ -106,7 +106,6 @@ func (mx *Mux) ServeRPC(w ResponseWriter, r *Request) { // Serve the request and once its done, put the request context back in the sync pool mx.handler.ServeRPC(w, r) mx.pool.Put(rctx) - return } // Use appends a middleware handler to the Mux middleware stack. @@ -405,11 +404,9 @@ func (mx *Mux) updateRouteHandler() { // methodNotAllowedHandler is a helper function to respond with a 405, // method not allowed. func methodNotAllowedHandler(w ResponseWriter, r *Request) { - w.Send(nil, errors.New("Forbidden")) - return + w.Send(nil, errors.New("forbidden")) } func NotFound(w ResponseWriter, r *Request) { - w.Send(nil, errors.New("Not Found")) - return + w.Send(nil, errors.New("not found")) } diff --git a/service.go b/service.go index a5defae77f586a2ab3876cb3cc26b5b1fa174ea2..058b70ed1886e81a7414113d06ca6fa5169ebd09 100644 --- a/service.go +++ b/service.go @@ -18,7 +18,6 @@ package jrpc import ( "context" - "errors" "reflect" "runtime" "unicode" @@ -29,7 +28,6 @@ import ( var ( contextType = reflect.TypeOf((*context.Context)(nil)).Elem() errorType = reflect.TypeOf((*error)(nil)).Elem() - stringType = reflect.TypeOf("") ) // A helper function that mimics the behavior of the handlers in the go-ethereum rpc package @@ -119,7 +117,6 @@ func (e *callback) ServeRPC(w ResponseWriter, r *Request) { return } w.Send(results[0].Interface(), nil) - return } // newCallback turns fn (a function) into a callback object. It returns nil if the function @@ -171,42 +168,6 @@ func (c *callback) makeArgTypes() { } } -// call invokes the callback. -func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res any, errRes error) { - // Create the argument slice. - fullargs := make([]reflect.Value, 0, 2+len(args)) - if c.rcvr.IsValid() { - fullargs = append(fullargs, c.rcvr) - } - if c.hasCtx { - fullargs = append(fullargs, reflect.ValueOf(ctx)) - } - fullargs = append(fullargs, args...) - - // Catch panic while running the callback. - defer func() { - if err := recover(); err != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - log.Error().Str("method", method).Interface("err", err).Hex("buf", buf).Msg("crashed") - errRes = errors.New("method handler crashed") - } - }() - // Run the callback. - // fn is a pointer to a function. I'm not sure if I like this... - results := c.fn.Call(fullargs) - if len(results) == 0 { - return nil, nil - } - if c.errPos >= 0 && !results[c.errPos].IsNil() { - // Method has returned non-nil error value. - err := results[c.errPos].Interface().(error) - return reflect.Value{}, err - } - return results[0].Interface(), nil -} - // Does t satisfy the error interface? func isErrorType(t reflect.Type) bool { for t.Kind() == reflect.Ptr { diff --git a/testservice_test.go b/testservice_test.go index fc33805ca0f4689400cba4d7c49644244a34f0f1..e7c93f14e961a8565b944ef93155e7af3e6dc9a0 100644 --- a/testservice_test.go +++ b/testservice_test.go @@ -124,9 +124,7 @@ func (s *testService) CallMeBackLater(ctx context.Context, method string, args [ } type notificationTestService struct { - unsubscribed chan string - gotHangSubscriptionReq chan struct{} - unblockHangSubscription chan struct{} + unsubscribed chan string } func (s *notificationTestService) Echo(i int) int { diff --git a/timer.go b/timer.go index 2a544a85ddc53046cdaf2e7b3e8ac016d20dff58..689d95564f6dd8f6deb3bc1716fb715766a2e23a 100644 --- a/timer.go +++ b/timer.go @@ -15,9 +15,9 @@ func NewTimer() *Timer { } func (t *Timer) Since(...any) time.Duration { - return time.Now().Sub(t.s) + return time.Since(t.s) } func (t *Timer) Until() time.Duration { - return t.s.Sub(time.Now()) + return time.Until(t.s) } diff --git a/tree.go b/tree.go index 4619e098c24634568a909f2b81f945fa86691e5f..dbc4f472553dc6a78a9bd969510e74bfa5686670 100644 --- a/tree.go +++ b/tree.go @@ -19,8 +19,6 @@ const ( ntCatchAll // /api/v1/* ) -type endpoints map[string]*endpoint - type node struct { // subroutes on the leaf node subroutes Routes diff --git a/websocket.go b/websocket.go index b3b21b83b881f97de98a316caf7af37feec01dc5..a54aa04c74b9964ba3a6b53e939f6d53edb00ae2 100644 --- a/websocket.go +++ b/websocket.go @@ -21,7 +21,6 @@ import ( "encoding/base64" "net/http" "net/url" - "sync" "time" "gfx.cafe/open/jrpc/wsjson" @@ -38,8 +37,6 @@ const ( wsMessageSizeLimit = 32 * 1024 * 1024 ) -var wsBufferPool = new(sync.Pool) - // WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. // // allowedOrigins should be a comma-separated list of allowed origin URLs. @@ -134,7 +131,6 @@ type websocketCodec struct { conn *websocket.Conn info PeerInfo - wg sync.WaitGroup pingReset chan struct{} } diff --git a/websocket_test.go b/websocket_test.go index ed2550220b719f2713ba5b0f314f5cb5f2e37f92..5893226d4e2e99b4dc1a4122483bd97309f96f18 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -19,10 +19,8 @@ package jrpc import ( "context" "errors" - "io" "net/http/httptest" "strings" - "sync/atomic" "testing" ) @@ -274,31 +272,3 @@ func TestClientWebsocketLargeMessage(t *testing.T) { /// } /// } ///} - -// severableReadWriteCloser wraps an io.ReadWriteCloser and provides a Sever() method to drop writes and read empty. -type severableReadWriteCloser struct { - io.ReadWriteCloser - severed int32 // atomic -} - -func (s *severableReadWriteCloser) Sever() { - atomic.StoreInt32(&s.severed, 1) -} - -func (s *severableReadWriteCloser) Read(p []byte) (n int, err error) { - if atomic.LoadInt32(&s.severed) > 0 { - return 0, nil - } - return s.ReadWriteCloser.Read(p) -} - -func (s *severableReadWriteCloser) Write(p []byte) (n int, err error) { - if atomic.LoadInt32(&s.severed) > 0 { - return len(p), nil - } - return s.ReadWriteCloser.Write(p) -} - -func (s *severableReadWriteCloser) Close() error { - return s.ReadWriteCloser.Close() -} diff --git a/wire.go b/wire.go index 39216c86b7bff3bd5a91806391b59576f8a1a616..bfc334357e20f70a161a0ebcbb1837071ed16506 100644 --- a/wire.go +++ b/wire.go @@ -72,19 +72,19 @@ func NewNullIDPtr() *ID { return &ID{null: true} } // // If the rune is q the representation is non ambiguous, // string forms are quoted, number forms are preceded by a #. -func (id ID) Format(f fmt.State, r rune) { +func (id *ID) Format(f fmt.State, r rune) { numF, strF := `%d`, `%s` if r == 'q' { numF, strF = `#%d`, `%q` } + id.null = false switch { case id.name != "": fmt.Fprintf(f, strF, id.name) default: fmt.Fprintf(f, numF, id.number) } - id.null = false } // get the raw message