good morning!!!!

Skip to content
Commits on Source (15)
......@@ -27,7 +27,7 @@ func BenchmarkClientHTTPEcho(b *testing.B) {
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, "test_echoAny", []any{1, 2, 3, 4, 56, 6, wantBack, wantBack, wantBack})
return client.Call(nil, nil, "test_echoAny", []any{1, 2, 3, 4, 56, 6, wantBack, wantBack, wantBack})
})
}
eg.Wait()
......@@ -48,7 +48,7 @@ func BenchmarkClientHTTPEchoEmpty(b *testing.B) {
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, "test_echoAny", 0)
return client.Call(nil, nil, "test_echoAny", 0)
})
}
eg.Wait()
......@@ -76,7 +76,7 @@ func BenchmarkClientWebsocketEcho(b *testing.B) {
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, "test_echoAny", payload)
return client.Call(nil, nil, "test_echoAny", payload)
})
}
eg.Wait()
......@@ -98,7 +98,7 @@ func BenchmarkClientWebsocketEchoEmpty(b *testing.B) {
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, "test_echoAny", 0)
return client.Call(nil, nil, "test_echoAny", 0)
})
}
eg.Wait()
......
......@@ -18,7 +18,7 @@ package jrpc
import (
"context"
"encoding/json"
gojson "encoding/json"
"errors"
"fmt"
"net/url"
......@@ -26,7 +26,7 @@ import (
"sync/atomic"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/goccy/go-json"
"tuxpa.in/a/zlog/log"
)
......@@ -57,6 +57,8 @@ type BatchElem struct {
Error error
}
var _ SubscriptionConn = (*Client)(nil)
// Client represents a connection to an RPC server.
type Client struct {
isHTTP bool // connection type: http, ws or ipc
......@@ -231,7 +233,7 @@ func (c *Client) SupportedModules() (map[string]string, error) {
var result map[string]string
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
err := c.CallContext(ctx, &result, "rpc_modules")
err := c.Call(ctx, &result, "rpc_modules")
return result, err
}
......@@ -286,7 +288,7 @@ func (c *Client) call(ctx context.Context, result any, msg *jsonrpcMessage) erro
case result == nil:
return nil
default:
return json.Unmarshal(resp.Result, &result)
return json.Unmarshal(resp.Result, result)
}
}
......@@ -295,29 +297,23 @@ func (c *Client) call(ctx context.Context, result any, msg *jsonrpcMessage) erro
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) Do(result any, method string, param any) error {
ctx := context.Background()
return c.DoContext(ctx, result, method, param)
}
func (c *Client) DoContext(ctx context.Context, result any, method string, param any) error {
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
}
msg, err := c.newMessageP(method, param)
msg, err := c.newMessageP(method, params)
if err != nil {
return err
}
if ctx == nil {
ctx = context.TODO()
}
return c.call(ctx, result, msg)
}
// Deprecated: use Do
func (c *Client) Call(result any, method string, args ...any) error {
return c.Do(result, method, args)
}
// Deprecated: use DoContext
func (c *Client) CallContext(ctx context.Context, result any, method string, args ...any) error {
return c.DoContext(ctx, result, method, args)
// Call calls Do, except accepts variadic parameters
func (c *Client) Call(ctx context.Context, result any, method string, args ...any) error {
return c.Do(ctx, result, method, args)
}
// BatchCall sends all given requests as a single batch and waits for the server
......@@ -327,25 +323,15 @@ func (c *Client) CallContext(ctx context.Context, result any, method string, arg
// a request is reported through the Error field of the corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *Client) BatchCall(b []BatchElem) error {
ctx := context.Background()
return c.BatchCallContext(ctx, b)
}
// BatchCallContext sends all given requests as a single batch and waits for the server
// to return a response for all of them. The wait duration is bounded by the
// context's deadline.
//
// In contrast to CallContext, BatchCallContext only returns errors that have occurred
// while sending the request. Any error specific to a request is reported through the
// Error field of the corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
func (c *Client) BatchCall(ctx context.Context, b ...BatchElem) error {
var (
msgs = make([]*jsonrpcMessage, len(b))
byID = make(map[string]int, len(b))
)
if ctx == nil {
ctx = context.TODO()
}
op := &requestOp{
ids: make([]json.RawMessage, len(b)),
resp: make(chan *jsonrpcMessage, len(b)),
......@@ -393,18 +379,15 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
}
func (c *Client) Notify(ctx context.Context, method string, args ...any) error {
return c.DoNotify(ctx, method, args)
}
// Notify sends a notification, i.e. a method call that doesn't expect a response.
func (c *Client) DoNotify(ctx context.Context, method string, args any) error {
op := new(requestOp)
msg, err := c.newMessageP(method, args)
if err != nil {
return err
}
if ctx == nil {
ctx = context.TODO()
}
msg.ID = nil
if c.isHTTP {
return c.sendHTTP(ctx, op, msg)
}
......@@ -451,7 +434,7 @@ func (c *Client) newMessageP(method string, paramIn any) (*jsonrpcMessage, error
msg := &jsonrpcMessage{ID: c.nextID(), Method: method}
if paramIn != nil { // prevent sending "params":null
var err error
if msg.Params, err = jsoniter.Marshal(paramIn); err != nil {
if msg.Params, err = json.Marshal(paramIn); err != nil {
return nil, err
}
}
......@@ -617,6 +600,9 @@ func (c *Client) read(codec ServerCodec) {
if _, ok := err.(*json.SyntaxError); ok {
codec.WriteJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}
if _, ok := err.(*gojson.SyntaxError); ok {
codec.WriteJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}
if err != nil {
c.readErr <- err
return
......
......@@ -30,13 +30,13 @@ import (
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"tuxpa.in/a/zlog"
"tuxpa.in/a/zlog/log"
"github.com/davecgh/go-spew/spew"
)
func init() {
zlog.SetGlobalLevel(zlog.FatalLevel)
zlog.SetGlobalLevel(zlog.ErrorLevel)
}
func TestClientRequest(t *testing.T) {
......@@ -46,7 +46,7 @@ func TestClientRequest(t *testing.T) {
defer client.Close()
var resp echoResult
if err := client.Call(&resp, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
if err := client.Call(nil, &resp, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(resp, echoResult{"hello", 10, &echoArgs{"world"}}) {
......@@ -60,12 +60,12 @@ func TestClientResponseType(t *testing.T) {
client := DialInProc(server)
defer client.Close()
if err := client.Call(nil, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
if err := client.Call(nil, nil, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
t.Errorf("Passing nil as result should be fine, but got an error: %v", err)
}
var resultVar echoResult
// Note: passing the var, not a ref
err := client.Call(resultVar, "test_echo", "hello", 10, &echoArgs{"world"})
err := client.Call(nil, resultVar, "test_echo", "hello", 10, &echoArgs{"world"})
if err == nil {
t.Error("Passing a var as result should be an error")
}
......@@ -79,7 +79,7 @@ func TestClientErrorData(t *testing.T) {
defer client.Close()
var resp any
err := client.Call(&resp, "test_returnError")
err := client.Call(nil, &resp, "test_returnError")
if err == nil {
t.Fatal("expected error")
}
......@@ -120,7 +120,7 @@ func TestClientBatchRequest(t *testing.T) {
Result: new(int),
},
}
if err := client.BatchCall(batch); err != nil {
if err := client.BatchCall(nil, batch...); err != nil {
t.Fatal(err)
}
wantResult := []BatchElem{
......@@ -162,7 +162,7 @@ func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
// This test checks that requests made through CallContext can be canceled by canceling
// This test checks that requests made through Call can be canceled by canceling
// the context.
func testClientCancel(transport string, t *testing.T) {
// These tests take a lot of time, run them all at once.
......@@ -234,7 +234,7 @@ func testClientCancel(transport string, t *testing.T) {
// Now perform a call with the context.
// The key thing here is that no call will ever complete successfully.
err := client.CallContext(ctx, nil, "test_block")
err := client.Call(ctx, nil, "test_block")
switch {
case err == nil:
_, hasDeadline := ctx.Deadline()
......@@ -307,7 +307,7 @@ func TestClientHTTP(t *testing.T) {
for i := range results {
i := i
go func() {
errc <- client.Call(&results[i], "test_echo", wantResult.String, wantResult.Int, wantResult.Args)
errc <- client.Call(nil, &results[i], "test_echo", wantResult.String, wantResult.Int, wantResult.Args)
}()
}
......@@ -357,7 +357,7 @@ func TestClientReconnect(t *testing.T) {
// Perform a call. This should work because the server is up.
var resp echoResult
if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil {
if err := client.Call(ctx, &resp, "test_echo", "", 1, nil); err != nil {
t.Fatal(err)
}
......@@ -368,7 +368,7 @@ func TestClientReconnect(t *testing.T) {
time.Sleep(2 * time.Second)
// Try calling again. It shouldn't work.
if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil {
if err := client.Call(ctx, &resp, "test_echo", "", 2, nil); err == nil {
t.Error("successful call while the server is down")
t.Logf("resp: %#v", resp)
}
......@@ -385,7 +385,7 @@ func TestClientReconnect(t *testing.T) {
go func() {
<-start
var resp echoResult
errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil)
errors <- client.Call(ctx, &resp, "test_echo", "", 3, nil)
}()
}
close(start)
......
package jrpc
import "context"
var _ Conn = (*Client)(nil)
type Conn interface {
Do(ctx context.Context, result any, method string, params any) error
BatchCall(ctx context.Context, b ...BatchElem) error
SetHeader(key, value string)
Close() error
}
type SubscriptionConn interface {
Conn
Notify(ctx context.Context, method string, args ...any) error
Subscribe(ctx context.Context, namespace string, channel any, args ...any) (*ClientSubscription, error)
}
......@@ -16,7 +16,9 @@
package jrpc
import "fmt"
import (
"fmt"
)
// HTTPError is returned by client operations when the HTTP status code of the
// response is not a 2xx status.
......@@ -45,6 +47,33 @@ type DataError interface {
ErrorData() any // returns the error data
}
type JrpcErr struct {
Data any
}
func (j *JrpcErr) ErrorData() any {
return j.Data
}
func (j *JrpcErr) Error() string {
return "Jrpc Error"
}
func (j *JrpcErr) ErrorCode() int {
return jrpcErrorCode
}
func WrapJrpcErr(err error) error {
if err == nil {
return nil
}
return fmt.Errorf("%w: %w", &JrpcErr{}, err)
}
func MakeJrpcErr(s string) error {
return fmt.Errorf("%w: %s", &JrpcErr{}, s)
}
// Error types defined below are the built-in JSON-RPC errors.
var (
......@@ -60,6 +89,8 @@ const defaultErrorCode = -32000
const applicationErrorCode = -32080
const jrpcErrorCode = -42000
type methodNotFoundError struct{ method string }
func (e *methodNotFoundError) ErrorCode() int { return -32601 }
......
......@@ -3,12 +3,12 @@ module gfx.cafe/open/jrpc
go 1.18
require (
gfx.cafe/util/go/bufpool v0.0.0-20221221002138-cc76846e2979
gfx.cafe/util/go/frand v0.0.0-20221221002138-cc76846e2979
gfx.cafe/util/go/bufpool v0.0.0-20230121041905-80dafb1e973e
gfx.cafe/util/go/frand v0.0.0-20230121041905-80dafb1e973e
github.com/alecthomas/kong v0.7.1
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set v1.8.0
github.com/ethereum/go-ethereum v1.10.26
github.com/ethereum/go-ethereum v1.11.1
github.com/gobuffalo/packr/v2 v2.8.3
github.com/goccy/go-json v0.10.0
github.com/iancoleman/strcase v0.2.0
......@@ -22,6 +22,7 @@ require (
require (
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/gin-gonic/gin v1.7.7 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-stack/stack v1.8.1 // indirect
......@@ -30,24 +31,24 @@ require (
github.com/google/go-cmp v0.5.8 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/markbates/errx v1.1.0 // indirect
github.com/markbates/oncer v1.0.0 // indirect
github.com/markbates/safe v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/rs/zerolog v1.28.0 // indirect
github.com/rs/zerolog v1.29.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/term v0.3.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
......
......@@ -41,10 +41,14 @@ gfx.cafe/util/go/bufpool v0.0.0-20221003014715-6000e93338e0 h1:QFUk3cN0Nd9wjcz5A
gfx.cafe/util/go/bufpool v0.0.0-20221003014715-6000e93338e0/go.mod h1:+DiyiCOBGS9O9Ce4ewHQO3Y59h66WSWAbgZZ2O2AYYw=
gfx.cafe/util/go/bufpool v0.0.0-20221221002138-cc76846e2979 h1:IrfGF9CX1mvuK6myvE0Y3qBzzMdv4nuCUlBpUnv/JSc=
gfx.cafe/util/go/bufpool v0.0.0-20221221002138-cc76846e2979/go.mod h1:+DiyiCOBGS9O9Ce4ewHQO3Y59h66WSWAbgZZ2O2AYYw=
gfx.cafe/util/go/bufpool v0.0.0-20230121041905-80dafb1e973e h1:cx35whzZb3wcLhmOUOiqz0N4f6o9ZWVnRe386rW9R5c=
gfx.cafe/util/go/bufpool v0.0.0-20230121041905-80dafb1e973e/go.mod h1:+DiyiCOBGS9O9Ce4ewHQO3Y59h66WSWAbgZZ2O2AYYw=
gfx.cafe/util/go/frand v0.0.0-20221022080059-c522fd4b624d h1:TtdfrlzSxGVz28pYOSxDpa0W1SUVOzycH6ix7OHOTNU=
gfx.cafe/util/go/frand v0.0.0-20221022080059-c522fd4b624d/go.mod h1:poXnpKa5SFxaZCQl+1Wqvpg8F/3EpLUqsvEZedEmf9U=
gfx.cafe/util/go/frand v0.0.0-20221221002138-cc76846e2979 h1:FVV5buGRB1ya+HrXXcBx/s7PUa8zjYRizPbQyXgkVUs=
gfx.cafe/util/go/frand v0.0.0-20221221002138-cc76846e2979/go.mod h1:LNHxMJl0WnIr5+OChYxlVopxk+j7qxZv0XvWCzB6uGE=
gfx.cafe/util/go/frand v0.0.0-20230121041905-80dafb1e973e h1:A62zlsu3HkEAVRIb+cCpRIpSTmd047+ABV1KC2RsI2U=
gfx.cafe/util/go/frand v0.0.0-20230121041905-80dafb1e973e/go.mod h1:LNHxMJl0WnIr5+OChYxlVopxk+j7qxZv0XvWCzB6uGE=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY=
......@@ -80,6 +84,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4=
github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo=
github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI=
github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
......@@ -91,6 +97,8 @@ github.com/ethereum/go-ethereum v1.10.25 h1:5dFrKJDnYf8L6/5o42abCE6a9yJm9cs4EJVR
github.com/ethereum/go-ethereum v1.10.25/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/ethereum/go-ethereum v1.10.26 h1:i/7d9RBBwiXCEuyduBQzJw/mKmnvzsN14jqBmytw72s=
github.com/ethereum/go-ethereum v1.10.26/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/ethereum/go-ethereum v1.11.1 h1:EMymmWFzpS7G9l9NvVN8G73cgdUIqDPNRf2YTSGBXlk=
github.com/ethereum/go-ethereum v1.11.1/go.mod h1:DuefStAgaxoaYGLR0FueVcVbehmn5n9QUcVrMCuOvuc=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
......@@ -248,6 +256,8 @@ github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0=
github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
......@@ -274,6 +284,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
......@@ -307,6 +319,8 @@ github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6po
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY=
github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/rs/zerolog v1.29.0 h1:Zes4hju04hjbvkVkOhdl2HpZa+0PmVwigmo8XoORE5w=
github.com/rs/zerolog v1.29.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
......@@ -383,6 +397,8 @@ golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2 h1:x8vtB3zMecnlqZIwJNUUpw
golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.4.0 h1:UVQgzMY87xqpKNgb+kDsll2Igd33HszWHFLmpaRMq/8=
golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
......@@ -541,12 +557,16 @@ golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
......
......@@ -320,11 +320,11 @@ func (h *handler) handleSubscribe(cp *callProc, r *Request) *Response {
// Install notifier in context so the subscription handler can find it.
n := &Notifier{h: h, namespace: namespace, idgen: randomIDGenerator()}
cp.notifiers = append(cp.notifiers, n)
req := r.WithContext(cp.ctx)
// now actually run the handler
req = req.WithContext(
context.WithValue(req.ctx, notifierKey{}, n),
req := r.WithContext(
context.WithValue(r.ctx, notifierKey{}, n),
)
mw = NewReaderResponseWriterMsg(req)
h.reg.ServeRPC(mw, req)
return mw.Response()
}
......
package jrpc
func Do[T any](c *Client, method string, args any) (T, error) {
import "context"
func Do[T any](ctx context.Context, c Conn, method string, args any) (*T, error) {
var t T
err := c.Do(ctx, &t, method, args)
if err != nil {
return nil, err
}
return &t, nil
}
func Call[T any](ctx context.Context, c Conn, method string, args ...any) (*T, error) {
var t T
err := c.Do(t, method, args)
err := c.Do(ctx, &t, method, args)
if err != nil {
return t, err
return nil, err
}
return t, nil
return &t, nil
}
......@@ -118,7 +118,7 @@ func TestHTTPRespBodyUnlimited(t *testing.T) {
defer c.Close()
var r string
if err := c.Call(&r, "test_largeResp"); err != nil {
if err := c.Call(nil, &r, "test_largeResp"); err != nil {
t.Fatal(err)
}
if len(r) != respLength {
......@@ -140,7 +140,7 @@ func TestHTTPErrorResponse(t *testing.T) {
}
var r string
err = c.Call(&r, "test_method")
err = c.Call(nil, &r, "test_method")
if err == nil {
t.Fatal("error was expected")
}
......@@ -180,7 +180,7 @@ func TestHTTPPeerInfo(t *testing.T) {
// Request peer information.
var info PeerInfo
if err := c.Call(&info, "test_peerInfo"); err != nil {
if err := c.Call(nil, &info, "test_peerInfo"); err != nil {
t.Fatal(err)
}
......
......@@ -2,15 +2,15 @@ package jrpc
import (
"context"
"encoding/json"
stdjson "encoding/json"
"io"
"sync"
"time"
"github.com/goccy/go-json"
)
// Conn is a subset of the methods of net.Conn which are sufficient for creating a jsonCodec
type Conn interface {
// DeadlineConn is a subset of the methods of net.Conn which are sufficient for creating a jsonCodec
type DeadlineConn interface {
io.ReadWriteCloser
SetWriteDeadline(time.Time) error
}
......@@ -25,14 +25,14 @@ type jsonCodec struct {
decode func(v any) error // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encode func(v any) error // encoder to allow multiple transports
conn Conn
conn DeadlineConn
}
// NewFuncCodec creates a codec which uses the given functions to read and write. If conn
// implements ConnRemoteAddr, log messages will use it to include the remote address of
// the connection.
func NewFuncCodec(
conn Conn,
conn DeadlineConn,
encode, decode func(v any) error,
closeFunc func() error,
) ServerCodec {
......@@ -51,7 +51,7 @@ func NewFuncCodec(
// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
// messages will use it to include the remote address of the connection.
func NewCodec(conn Conn) ServerCodec {
func NewCodec(conn DeadlineConn) ServerCodec {
encr := func(v any) error {
enc := jzon.BorrowStream(conn)
defer jzon.ReturnStream(enc)
......@@ -66,7 +66,7 @@ func NewCodec(conn Conn) ServerCodec {
// TODO:
// for some reason other json decoders are incompatible with our test suite
// pretty sure its how we handle EOFs and stuff
dec := stdjson.NewDecoder(conn)
dec := json.NewDecoder(conn)
dec.UseNumber()
return NewFuncCodec(conn, encr, dec.Decode, func() error {
return nil
......
......@@ -18,6 +18,7 @@ package jrpc
import (
"context"
"fmt"
"reflect"
"runtime"
"unicode"
......@@ -102,14 +103,18 @@ func (e *callback) ServeRPC(w ResponseWriter, r *Request) {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Error().Str("method", r.Method).Interface("err", err).Hex("buf", buf).Msg("crashed")
log.Error().Str("method", r.Method).Interface("err", fmt.Sprintf("%s", err)).Stack().Msg("reflect handler crashed")
// errRes := errors.New("method handler crashed: " + fmt.Sprint(err))
w.Send(nil, nil)
w.Send(nil, fmt.Errorf("recover: %s", err))
return
}
}()
// Run the callback.
results := e.fn.Call(fullargs)
if len(results) == 0 {
w.Send(nil, nil)
return
}
if e.errPos >= 0 && !results[e.errPos].IsNil() {
// Method has returned non-nil error value.
err := results[e.errPos].Interface().(error)
......
......@@ -108,6 +108,9 @@ func (w *ResponseWriterMsg) Send(args any, e error) (err error) {
}
func (w *ResponseWriterMsg) Notify(args any) (err error) {
if w.n == nil {
w.n, _ = NotifierFromContext(w.r.ctx)
}
if w.s == nil || w.n == nil {
return ErrSubscriptionNotFound
}
......
......@@ -6,7 +6,6 @@ import (
"net/http"
"sync/atomic"
"tuxpa.in/a/zlog/log"
mapset "github.com/deckarep/golang-set"
)
......@@ -98,7 +97,6 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
// subscriptions.
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
log.Debug().Msg("RPC server shutting down")
s.codecs.Each(func(c any) bool {
c.(ServerCodec).Close()
return true
......
......@@ -310,5 +310,5 @@ func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, e
func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{}
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
return sub.client.Call(context.Background(), &result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
}
// This test checks that an error is written for invalid JSON requests.
--> 'f
<-- {"jsonrpc":"2.0","id":null,"error":{"code":-32700,"message":"invalid character '\\'' looking for beginning of value"}}
<-- {"jsonrpc":"2.0","id":null,"error":{"code":-32700,"message":"json: invalid character \n as bool(false)"}}
......@@ -123,7 +123,7 @@ func (s *testService) CallMeBack(ctx context.Context, method string, args []any)
return nil, errors.New("no client")
}
var result any
err := c.Call(&result, method, args...)
err := c.Call(nil, &result, method, args...)
return result, err
}
......@@ -135,7 +135,7 @@ func (s *testService) CallMeBackLater(ctx context.Context, method string, args [
go func() {
<-ctx.Done()
var result any
c.Call(&result, method, args...)
c.Call(nil, &result, method, args...)
}()
return nil
}
......
......@@ -10,24 +10,26 @@ type WebsocketServer struct {
}
func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if isWebsocket(r) {
if IsWebsocket(r) {
s.s.WebsocketHandler([]string{"*"}).ServeHTTP(w, r)
return
}
s.s.ServeHTTP(w, r)
}
func isWebsocket(r *http.Request) bool {
func IsWebsocket(r *http.Request) bool {
return strings.EqualFold(r.Header.Get("Upgrade"), "websocket") &&
strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade")
}
func (s *Server) ServeHTTPWithWss(cb func(r *http.Request)) http.Handler {
func (s *Server) ServeHTTPWithWss(cb func(w http.ResponseWriter, r *http.Request) bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if isWebsocket(r) {
if cb != nil {
cb(r)
if cb != nil {
if cb(w, r) {
return
}
}
if IsWebsocket(r) {
s.WebsocketHandler([]string{"*"}).ServeHTTP(w, r)
return
}
......
......@@ -93,7 +93,7 @@ func TestWebsocketLargeCall(t *testing.T) {
// This call sends slightly less than the limit and should work.
var result echoResult
arg := strings.Repeat("x", maxRequestContentLength-200)
if err := client.Call(&result, "test_echo", arg, 1); err != nil {
if err := client.Call(nil, &result, "test_echo", arg, 1); err != nil {
t.Fatalf("valid call didn't work: %v", err)
}
if result.String != arg {
......@@ -102,7 +102,7 @@ func TestWebsocketLargeCall(t *testing.T) {
// This call sends twice the allowed size and shouldn't work.
arg = strings.Repeat("x", maxRequestContentLength*2)
err = client.Call(&result, "test_echo", arg)
err = client.Call(nil, &result, "test_echo", arg)
if err == nil {
t.Fatal("no error for too large call")
}
......@@ -125,7 +125,7 @@ func TestWebsocketPeerInfo(t *testing.T) {
// Request peer information.
var connInfo PeerInfo
if err := c.Call(&connInfo, "test_peerInfo"); err != nil {
if err := c.Call(nil, &connInfo, "test_peerInfo"); err != nil {
t.Fatal(err)
}
......@@ -162,7 +162,7 @@ func TestClientWebsocketLargeMessage(t *testing.T) {
}
var r string
if err := c.Call(&r, "test_largeResp"); err != nil {
if err := c.Call(nil, &r, "test_largeResp"); err != nil {
t.Fatal("call failed:", err)
}
if len(r) != respLength {
......