From 9b08938e745757c17c29b7aa7e05a0ed08d287a3 Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Tue, 21 Nov 2023 22:40:39 -0600 Subject: [PATCH] no feature no bug --- contrib/client/pooling.go | 17 -- contrib/client/reconnecting.go | 13 -- contrib/codecs/_broker/broker.go | 40 ---- contrib/codecs/_broker/broker_inproc.go | 168 ----------------- contrib/codecs/_broker/client.go | 231 ----------------------- contrib/codecs/_broker/codec.go | 82 -------- contrib/codecs/_broker/codec_test.go | 28 --- contrib/codecs/_broker/server.go | 36 ---- contrib/codecs/http/client.go | 37 ---- contrib/codecs/rdwr/client.go | 51 ----- contrib/extension/subscription/client.go | 4 - contrib/middleware/log.go | 4 +- exports.go | 2 - pkg/clientutil/helper.go | 28 --- pkg/clientutil/helper_test.go | 73 ------- pkg/jrpctest/suites.go | 59 ------ pkg/jsonrpc/conn.go | 5 - pkg/jsonrpc/conn_dummy.go | 4 - pkg/jsonrpc/peer.go | 2 +- pkg/jsonrpc/reqresp.go | 53 +----- pkg/server/server.go | 12 +- 21 files changed, 16 insertions(+), 933 deletions(-) delete mode 100644 contrib/codecs/_broker/broker.go delete mode 100644 contrib/codecs/_broker/broker_inproc.go delete mode 100644 contrib/codecs/_broker/client.go delete mode 100644 contrib/codecs/_broker/codec.go delete mode 100644 contrib/codecs/_broker/codec_test.go delete mode 100644 contrib/codecs/_broker/server.go diff --git a/contrib/client/pooling.go b/contrib/client/pooling.go index cc9c210..bf7d808 100644 --- a/contrib/client/pooling.go +++ b/contrib/client/pooling.go @@ -79,23 +79,6 @@ func (r *Pooling) Do(ctx context.Context, result any, method string, params any) return <-errChan } -func (r *Pooling) BatchCall(ctx context.Context, b ...*jsonrpc.BatchElem) error { - if r.closed.Load() { - return net.ErrClosed - } - errChan := make(chan error) - go func() { - conn, err := r.getClient(ctx) - if err != nil { - errChan <- err - return - } - defer r.putClient(conn) - errChan <- conn.BatchCall(ctx, b...) - }() - return <-errChan -} - func (p *Pooling) Mount(m jsonrpc.Middleware) { p.middleware = append(p.middleware, m) } diff --git a/contrib/client/reconnecting.go b/contrib/client/reconnecting.go index e1b2662..fda3c55 100644 --- a/contrib/client/reconnecting.go +++ b/contrib/client/reconnecting.go @@ -73,19 +73,6 @@ func (r *Reconnecting) Do(ctx context.Context, result any, method string, params return <-errChan } -func (r *Reconnecting) BatchCall(ctx context.Context, b ...*jsonrpc.BatchElem) error { - errChan := make(chan error) - go func() { - conn, err := r.getClient(ctx) - if err != nil { - errChan <- err - return - } - errChan <- conn.BatchCall(ctx, b...) - }() - return <-errChan -} - func (r *Reconnecting) Mount(m jsonrpc.Middleware) { r.middleware = append(r.middleware, m) } diff --git a/contrib/codecs/_broker/broker.go b/contrib/codecs/_broker/broker.go deleted file mode 100644 index 01dfb5e..0000000 --- a/contrib/codecs/_broker/broker.go +++ /dev/null @@ -1,40 +0,0 @@ -package broker - -import ( - "context" - "encoding/json" - "io" -) - -type ServerSpoke interface { - ReadRequest(ctx context.Context) (json.RawMessage, Replier, error) -} - -type ClientSpoke interface { - WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error - Subscribe(ctx context.Context, clientId string) (Subscription, error) -} - -type Broker interface { - ServerSpoke - ClientSpoke -} - -type Replier interface { - Send(fn func(io.Writer) error) error -} - -type ReplierFunc func(fn func(io.Writer) error) error - -func (r ReplierFunc) Send(fn func(io.Writer) error) error { - return r(fn) -} - -type Subscription interface { - // channel that will close when done or error - Listen() <-chan json.RawMessage - // should close the channel and also stop listening - Close() error - // this hold errors - Err() error -} diff --git a/contrib/codecs/_broker/broker_inproc.go b/contrib/codecs/_broker/broker_inproc.go deleted file mode 100644 index c91308f..0000000 --- a/contrib/codecs/_broker/broker_inproc.go +++ /dev/null @@ -1,168 +0,0 @@ -package broker - -import ( - "context" - "encoding/json" - "strings" - "sync" - "sync/atomic" - - "github.com/go-faster/jx" -) - -type subscription struct { - topic string - ch chan json.RawMessage - err error - - closed atomic.Bool - mu sync.RWMutex -} - -// channel that will close when done or error -func (s *subscription) Listen() <-chan json.RawMessage { - return s.ch -} - -// should close the channel and also stop listening -func (s *subscription) Close() error { - s.closed.CompareAndSwap(false, true) - return nil -} - -// this hold errors -func (s *subscription) Err() error { - s.mu.RLock() - defer s.mu.RUnlock() - return s.err -} - -type frame struct { - topic string - data json.RawMessage -} - -type ChannelBroker struct { - mu sync.RWMutex - subs map[int]*subscription - subCount int - - onDroppedMessage func(string, []byte) - - msgs chan *frame - - domain string -} - -func NewChannelBroker() *ChannelBroker { - return &ChannelBroker{ - subs: map[int]*subscription{}, - msgs: make(chan *frame, 128), - } -} - -func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *ChannelBroker { - b.onDroppedMessage = fn - return b -} - -func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, Replier, error) { - select { - case <-ctx.Done(): - return nil, nil, ctx.Err() - case f := <-b.msgs: - return f.data, ReplierFunc(func(fn func(*jx.Encoder) error) error { - enc := &jx.Encoder{} - err := fn(enc) - if err != nil { - return err - } - return b.Publish(context.Background(), f.topic, json.RawMessage(enc.Bytes())) - }), nil - } -} - -func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json.RawMessage) error { - select { - case <-ctx.Done(): - return ctx.Err() - case b.msgs <- &frame{data: msg, topic: topic}: - } - return nil -} - -func (b *ChannelBroker) Publish(ctx context.Context, topic string, data []byte) error { - b.mu.RLock() - defer b.mu.RUnlock() - for _, v := range b.subs { - if v.closed.Load() { - continue - } - if childTopicMatchesParent(v.topic, topic) { - select { - case v.ch <- data: - default: - if b.onDroppedMessage != nil { - b.onDroppedMessage(topic, data) - } - } - } - } - return nil -} - -func (b *ChannelBroker) Subscribe(ctx context.Context, topic string) (Subscription, error) { - sub := &subscription{ - topic: topic, - ch: make(chan json.RawMessage, 16), - } - b.mu.Lock() - b.subCount = b.subCount + 1 - id := b.subCount - b.subs[id] = sub - b.mu.Unlock() - // gc after adding a new subscription - b.gc() - return sub, nil -} - -func (b *ChannelBroker) gc() { - b.mu.Lock() - defer b.mu.Unlock() - for k, v := range b.subs { - if v.closed.Load() { - delete(b.subs, k) - } - } -} - -// This is to see if a message with topic child should be matched with subscription parent -// so the child cannot contain wildcards * and >, but the parent can -func childTopicMatchesParent(parentString string, childString string) bool { - parent := strings.Split(parentString, ".") - child := strings.Split(childString, ".") - // if the length of the child is less than the length of the parent, its not possible for match - // for instance, if the parent topic is "one.two", and the child is "one", it will never match - if len(child) < len(parent) { - return false - } - // this is safe because length of child must be equal to or lower than parent, from previous - for idx, v := range parent { - // if parent is wildcard, match all, so continue - if v == "*" { - continue - } - // if the > wildcard is at the end, and we have exited since then, we are done - if v == ">" && len(parent)-1 == idx { - return true - } - // else make sure parent matches child. - if child[idx] != v { - return false - } - } - if len(child) == len(parent) { - return true - } - return false -} diff --git a/contrib/codecs/_broker/client.go b/contrib/codecs/_broker/client.go deleted file mode 100644 index 9c41206..0000000 --- a/contrib/codecs/_broker/client.go +++ /dev/null @@ -1,231 +0,0 @@ -package broker - -import ( - "bytes" - "context" - "encoding/json" - "sync" - - "gfx.cafe/open/jrpc/pkg/clientutil" - "gfx.cafe/open/jrpc/pkg/jsonrpc" - "github.com/rs/xid" -) - -type Client struct { - p *clientutil.IdReply - - c ClientSpoke - clientId string - - ctx context.Context - cn context.CancelFunc - - m jsonrpc.Middlewares - handler jsonrpc.Handler - mu sync.RWMutex - - handlerPeer jsonrpc.PeerInfo -} - -func NewClient(spoke ClientSpoke) *Client { - cl := &Client{ - c: spoke, - p: clientutil.NewIdReply(), - handlerPeer: jsonrpc.PeerInfo{ - Transport: "broker", - RemoteAddr: "", - }, - // this doesn't need to be secure bc... you have access to the redis instance lol - clientId: xid.New().String(), - handler: jsonrpc.HandlerFunc(func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {}), - } - cl.ctx, cl.cn = context.WithCancel(context.Background()) - go cl.listen() - return cl -} - -func (c *Client) Closed() <-chan struct{} { - return c.ctx.Done() -} - -func (c *Client) SetHandlerPeer(pi jsonrpc.PeerInfo) { - c.handlerPeer = pi -} - -func (c *Client) Mount(h jsonrpc.Middleware) { - c.mu.Lock() - defer c.mu.Unlock() - c.m = append(c.m, h) - c.handler = c.m.HandlerFunc(func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) { - // do nothing on no handler - }) -} - -func (c *Client) listen() error { - defer c.cn() - sub, err := c.c.Subscribe(c.ctx, c.clientId) - if err != nil { - return err - } - defer sub.Close() - for { - var incomingMsg json.RawMessage - select { - case incomingMsg = <-sub.Listen(): - case <-c.ctx.Done(): - return c.ctx.Err() - } - msgs, _ := jsonrpc.ParseMessage(incomingMsg) - for i := range msgs { - v := msgs[i] - if v == nil { - continue - } - id := v.ID - // messages without ids are notifications - if id == nil { - var handler jsonrpc.Handler - c.mu.RLock() - handler = c.handler - c.mu.RUnlock() - // writer should only be allowed to send notifications - // reader should contain the message above - // the context is the client context - req := jsonrpc.NewRawRequest(c.ctx, - nil, - v.Method, - v.Params, - ) - req.Peer = c.handlerPeer - handler.ServeRPC(nil, req) - continue - } - var err error - if v.Error != nil { - err = v.Error - } - c.p.Resolve(*id, v.Result, err) - } - } - -} - -func (c *Client) Do(ctx context.Context, result any, method string, params any) error { - id := c.p.NextId() - req, err := jsonrpc.NewRequest(ctx, jsonrpc.NewId(id), method, params) - if err != nil { - return err - } - fwd, err := json.Marshal(req) - if err != nil { - return err - } - err = c.writeContext(req.Context(), fwd) - if err != nil { - return err - } - ans, err := c.p.Ask(req.Context(), *id) - if err != nil { - return err - } - if result != nil { - err = json.Unmarshal(ans, result) - if err != nil { - return err - } - } - return nil -} - -func (c *Client) BatchCall(ctx context.Context, b ...*jsonrpc.BatchElem) error { - if ctx == nil { - ctx = context.Background() - } - buf := new(bytes.Buffer) - enc := json.NewEncoder(buf) - reqs := make([]*jsonrpc.Request, 0, len(b)) - ids := make([]*jsonrpc.ID, 0, len(b)) - for _, v := range b { - id := c.p.NextId() - req, err := jsonrpc.NewRequest(ctx, jsonrpc.NewId(id), v.Method, v.Params) - if err != nil { - return err - } - ids = append(ids, id) - reqs = append(reqs, req) - } - err := enc.Encode(reqs) - if err != nil { - return err - } - err = c.writeContext(ctx, buf.Bytes()) - if err != nil { - return err - } - // TODO: wait for response - wg := sync.WaitGroup{} - wg.Add(len(ids)) - for i := range ids { - idx := i - go func() { - defer wg.Done() - ans, err := c.p.Ask(reqs[idx].Context(), *ids[idx]) - if err != nil { - b[idx].Error = err - return - } - if b[idx].Result != nil { - err = json.Unmarshal(ans, b[idx].Result) - if err != nil { - b[idx].Error = err - return - } - } - }() - } - wg.Wait() - - return err -} -func (c *Client) Notify(ctx context.Context, method string, params any) error { - if ctx == nil { - ctx = context.Background() - } - req, err := jsonrpc.NewRequest(ctx, nil, method, params) - if err != nil { - return err - } - fwd, err := json.Marshal(req) - if err != nil { - return err - } - return c.writeContext(ctx, fwd) -} - -func (c *Client) SetHeader(key string, value string) { -} - -func (c *Client) Close() error { - c.cn() - return nil -} - -func (c *Client) writeContext(ctx context.Context, xs []byte) error { - errch := make(chan error) - go func() { - err := c.c.WriteRequest(ctx, c.clientId, xs) - select { - case errch <- err: - case <-ctx.Done(): - case <-c.ctx.Done(): - } - }() - select { - case err := <-errch: - return err - case <-c.ctx.Done(): - return c.ctx.Err() - case <-ctx.Done(): - return ctx.Err() - } -} diff --git a/contrib/codecs/_broker/codec.go b/contrib/codecs/_broker/codec.go deleted file mode 100644 index 7a59323..0000000 --- a/contrib/codecs/_broker/codec.go +++ /dev/null @@ -1,82 +0,0 @@ -package broker - -import ( - "bytes" - "context" - "encoding/json" - "sync/atomic" - - "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/serverutil" - "github.com/gogo/protobuf/io" -) - -var _ jsonrpc.ReaderWriter = (*Codec)(nil) - -type Codec struct { - ctx context.Context - cn func() - - wr bytes.Buffer - replier Replier - ansCh chan *serverutil.Bundle - closed atomic.Bool - closeCh chan struct{} - - i jsonrpc.PeerInfo -} - -type httpError struct { - code int - err error -} - -func NewCodec(req json.RawMessage, replier Replier) *Codec { - c := &Codec{ - replier: replier, - ansCh: make(chan *serverutil.Bundle, 1), - closeCh: make(chan struct{}), - } - c.ctx, c.cn = context.WithCancel(context.Background()) - bundle := serverutil.ParseBundle(req) - c.ansCh <- bundle - return c -} - -// gets the peer info -func (c *Codec) PeerInfo() jsonrpc.PeerInfo { - return c.i -} - -func (c *Codec) ReadBatch(ctx context.Context) ([]*jsonrpc.Message, bool, error) { - select { - case ans := <-c.ansCh: - return ans.Messages, ans.Batch, nil - case <-ctx.Done(): - return nil, false, ctx.Err() - case <-c.ctx.Done(): - return nil, false, c.ctx.Err() - } -} - -// closes the connection -func (c *Codec) Close() error { - if c.closed.CompareAndSwap(false, true) { - close(c.closeCh) - } - c.cn() - return nil -} - -func (c *Codec) Send(fn func(io.Writer) error) error { - return c.replier.Send(fn) -} - -func (c *Codec) Flush() error { - return c.replier.Send(fn) -} - -// Closed returns a channel which is closed when the connection is closed. -func (c *Codec) Closed() <-chan struct{} { - return c.closeCh -} diff --git a/contrib/codecs/_broker/codec_test.go b/contrib/codecs/_broker/codec_test.go deleted file mode 100644 index 077a492..0000000 --- a/contrib/codecs/_broker/codec_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package broker - -import ( - "context" - "testing" - - "gfx.cafe/open/jrpc/pkg/jsonrpc" - "gfx.cafe/open/jrpc/pkg/server" - - "gfx.cafe/open/jrpc/pkg/jrpctest" -) - -func TestBasicSuite(t *testing.T) { - ctx := context.Background() - jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ - ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) { - broker := NewChannelBroker() - s := jrpctest.NewServer() - spokeServer := (&Server{Server: s}) - go spokeServer.ServeSpoke(ctx, broker) - return s, func() jsonrpc.Conn { - conn := NewClient(broker) - return conn - }, func() { - } - }, - }) -} diff --git a/contrib/codecs/_broker/server.go b/contrib/codecs/_broker/server.go deleted file mode 100644 index b2c3f07..0000000 --- a/contrib/codecs/_broker/server.go +++ /dev/null @@ -1,36 +0,0 @@ -package broker - -import ( - "context" - - "gfx.cafe/open/jrpc/pkg/server" -) - -type Server struct { - Server *server.Server -} - -func (s *Server) ServeSpoke(ctx context.Context, stream ServerSpoke) { - for { - select { - case <-ctx.Done(): - return - default: - } - req, fn, err := stream.ReadRequest(ctx) - if err != nil { - continue - } - if req == nil { - continue - } - cd := NewCodec(req, fn) - go func() { - err := s.Server.ServeCodec(ctx, cd) - if err != nil { - // slog.Error("codec err", "err", err) - } - cd.Close() - }() - } -} diff --git a/contrib/codecs/http/client.go b/contrib/codecs/http/client.go index 9000516..62e297b 100644 --- a/contrib/codecs/http/client.go +++ b/contrib/codecs/http/client.go @@ -1,7 +1,6 @@ package http import ( - "bytes" "context" "crypto/tls" "encoding/json" @@ -137,42 +136,6 @@ func (c *Client) Notify(ctx context.Context, method string, params any) error { return err } -func (c *Client) BatchCall(ctx context.Context, b ...*jsonrpc.BatchElem) error { - reqs := make([]*jsonrpc.Request, len(b)) - ids := make(map[int]int, len(b)) - for idx, v := range b { - var rid *jsonrpc.ID - if v.IsNotification { - } else { - id := int(c.id.Add(1)) - ids[idx] = id - rid = jsonrpc.NewNumberIDPtr(int64(id)) - } - req, err := jsonrpc.NewRequest(ctx, rid, v.Method, v.Params) - if err != nil { - return err - } - reqs = append(reqs, req) - } - dat, err := json.Marshal(reqs) - if err != nil { - return err - } - resp, err := c.postBuf(ctx, bytes.NewBuffer(dat)) - if err != nil { - return err - } - defer resp.Body.Close() - - msgs := []*jsonrpc.Message{} - err = json.NewDecoder(resp.Body).Decode(&msgs) - if err != nil { - return err - } - clientutil.FillBatch(ids, msgs, b) - return nil -} - func (c *Client) Close() error { return nil } diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index 3cf36f6..9008596 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -137,57 +137,6 @@ func (c *Client) Do(ctx context.Context, result any, method string, params any) return nil } -func (c *Client) BatchCall(ctx context.Context, b ...*jsonrpc.BatchElem) error { - if ctx == nil { - ctx = context.Background() - } - buf := bufpool.GetStd() - defer bufpool.PutStd(buf) - enc := json.NewEncoder(buf) - reqs := make([]*jsonrpc.Request, 0, len(b)) - ids := make([]*jsonrpc.ID, 0, len(b)) - for _, v := range b { - id := c.p.NextId() - req, err := jsonrpc.NewRequest(ctx, jsonrpc.NewId(id), v.Method, v.Params) - if err != nil { - return err - } - ids = append(ids, id) - reqs = append(reqs, req) - } - err := enc.Encode(reqs) - if err != nil { - return err - } - err = c.writeContext(ctx, buf.Bytes()) - if err != nil { - return err - } - // TODO: wait for response - wg := sync.WaitGroup{} - wg.Add(len(ids)) - for i := range ids { - idx := i - go func() { - defer wg.Done() - ans, err := c.p.Ask(reqs[idx].Context(), *ids[idx]) - if err != nil { - b[idx].Error = err - return - } - if b[idx].Result != nil { - err = json.NewDecoder(ans).Decode(b[idx].Result) - if err != nil { - b[idx].Error = err - return - } - } - }() - } - wg.Wait() - - return err -} func (c *Client) Notify(ctx context.Context, method string, params any) error { if ctx == nil { ctx = context.Background() diff --git a/contrib/extension/subscription/client.go b/contrib/extension/subscription/client.go index a2494c4..1ed1c27 100644 --- a/contrib/extension/subscription/client.go +++ b/contrib/extension/subscription/client.go @@ -128,10 +128,6 @@ func (c *WrapClient) Do(ctx context.Context, result any, method string, params a return c.conn.Do(ctx, result, method, params) } -func (c *WrapClient) BatchCall(ctx context.Context, b ...*jsonrpc.BatchElem) error { - return c.conn.BatchCall(ctx, b...) -} - func (c *WrapClient) Close() error { return c.conn.Close() } diff --git a/contrib/middleware/log.go b/contrib/middleware/log.go index 4e4329d..9c24835 100644 --- a/contrib/middleware/log.go +++ b/contrib/middleware/log.go @@ -20,9 +20,9 @@ func NewLogger(logger *slog.Logger) func(next jsonrpc.Handler) jsonrpc.Handler { fn := func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) { start := time.Now() lg := logger.With( - "remote", r.Remote(), + "remote", r.Peer.RemoteAddr, "method", r.Method, - "params", string(r.Msg().Params), + "params", string(r.Params), ) if id := GetReqID(r.Context()); id != "" { lg = logger.With( diff --git a/exports.go b/exports.go index f88a121..1275291 100644 --- a/exports.go +++ b/exports.go @@ -30,8 +30,6 @@ type ( Server = server.Server // Middleware is a middleware Middleware = func(Handler) Handler - // BatchElem is an element of a batch request - BatchElem = jsonrpc.BatchElem ) var ( diff --git a/pkg/clientutil/helper.go b/pkg/clientutil/helper.go index 962e2f5..3fd99c5 100644 --- a/pkg/clientutil/helper.go +++ b/pkg/clientutil/helper.go @@ -1,9 +1,6 @@ package clientutil import ( - "encoding/json" - "fmt" - "gfx.cafe/util/go/generic" "gfx.cafe/open/jrpc/pkg/jsonrpc" @@ -25,28 +22,3 @@ func GetMessage() *jsonrpc.Message { func PutMessage(x *jsonrpc.Message) { msgPool.Put(x) } - -func FillBatch(ids map[int]int, msgs []*jsonrpc.Message, b []*jsonrpc.BatchElem) { - answers := make(map[int]*jsonrpc.Message, len(msgs)) - for _, v := range msgs { - answers[v.ID.Number()] = v - } - for idx, id := range ids { - ans, ok := answers[id] - if !ok { - b[idx].Error = fmt.Errorf("No response found") - continue - } - if ans.Error != nil { - b[idx].Error = ans.Error - continue - } - if b[idx].Result == nil { - continue - } - err := json.NewDecoder(ans.Result).Decode(b[idx].Result) - if err != nil { - b[idx].Error = err - } - } -} diff --git a/pkg/clientutil/helper_test.go b/pkg/clientutil/helper_test.go index 1519b26..cda8273 100644 --- a/pkg/clientutil/helper_test.go +++ b/pkg/clientutil/helper_test.go @@ -1,78 +1,5 @@ package clientutil -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "gfx.cafe/open/jrpc/pkg/jsonrpc" -) - func ptr[T any](v T) *T { return &v } - -func TestFillBatch(t *testing.T) { - msgs := []*jsonrpc.Message{ - { - ID: ptr(jsonrpc.ID(`"5"`)), - Result: jsonrpc.NewStringReader(`["test", "abc", "123"]`), - }, - { - ID: ptr(jsonrpc.ID(`"6"`)), - Result: jsonrpc.NewStringReader(`12345`), - }, - {}, - { - ID: ptr(jsonrpc.ID(`"7"`)), - Result: jsonrpc.NewStringReader(`"abcdefgh"`), - }, - } - ids := map[int]int{ - 0: 5, - 1: 6, - 3: 7, - } - b := []*jsonrpc.BatchElem{ - { - Result: new([]string), - }, - { - Result: new(int), - }, - {}, - { - Result: new(string), - }, - } - - FillBatch(ids, msgs, b) - - wantResult := []*jsonrpc.BatchElem{ - { - Result: &[]string{ - "test", - "abc", - "123", - }, - }, - { - Result: ptr(12345), - }, - {}, - { - Result: ptr("abcdefgh"), - }, - } - - require.EqualValues(t, len(b), len(wantResult)) - for i := range b { - expected := wantResult[i] - actual := b[i] - assert.EqualValuesf(t, expected.Method, actual.Method, "item %d", i) - assert.EqualValuesf(t, expected.Result, actual.Result, "item %d", i) - assert.EqualValuesf(t, expected.Params, actual.Params, "item %d", i) - assert.EqualValuesf(t, expected.Error, actual.Error, "item %d", i) - } -} diff --git a/pkg/jrpctest/suites.go b/pkg/jrpctest/suites.go index a36dacb..210c7d9 100644 --- a/pkg/jrpctest/suites.go +++ b/pkg/jrpctest/suites.go @@ -76,65 +76,6 @@ func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) { assert.Error(t, err, "passing var as nil gives error") }) - makeTest("BatchRequest", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { - batch := []*jsonrpc.BatchElem{ - { - Method: "test_echo", - Params: []any{"hello", 10, &EchoArgs{"world"}}, - Result: new(EchoResult), - }, - { - Method: "test_echo", - Params: []any{"hello2", 11, &EchoArgs{"world"}}, - Result: new(EchoResult), - }, - { - Method: "test_echo", - Params: []any{"hello3", 12, &EchoArgs{"world"}}, - IsNotification: true, - }, - { - Method: "no/such/method", - Params: []any{1, 2, 3}, - Result: new(int), - }, - } - if err := client.BatchCall(nil, batch...); err != nil { - t.Fatal(err) - } - wantResult := []*jsonrpc.BatchElem{ - { - Method: "test_echo", - Params: []any{"hello", 10, &EchoArgs{"world"}}, - Result: &EchoResult{"hello", 10, &EchoArgs{"world"}}, - }, - { - Method: "test_echo", - Params: []any{"hello2", 11, &EchoArgs{"world"}}, - Result: &EchoResult{"hello2", 11, &EchoArgs{"world"}}, - }, - { - Method: "test_echo", - Params: []any{"hello3", 12, &EchoArgs{"world"}}, - }, - { - Method: "no/such/method", - Params: []any{1, 2, 3}, - Result: new(int), - Error: &jsonrpc.JsonError{Code: -32601, Message: "the method no/such/method does not exist/is not available"}, - }, - } - require.EqualValues(t, len(batch), len(wantResult)) - for i := range batch { - a := batch[i] - b := wantResult[i] - assert.EqualValuesf(t, b.Method, a.Method, "item %d", i) - assert.EqualValuesf(t, b.Result, a.Result, "item %d", i) - assert.EqualValuesf(t, b.Params, a.Params, "item %d", i) - assert.EqualValuesf(t, b.Error, a.Error, "item %d", i) - } - }) - makeTest("ResposeType2", func(t *testing.T, server *server.Server, client jsonrpc.Conn) { if err := jsonrpc.CallInto(nil, client, nil, "test_echo", "hello", 10, &EchoArgs{"world"}); err != nil { t.Errorf("Passing nil as result should be fine, but got an error: %v", err) diff --git a/pkg/jsonrpc/conn.go b/pkg/jsonrpc/conn.go index 1c54ac4..ad0f7f2 100644 --- a/pkg/jsonrpc/conn.go +++ b/pkg/jsonrpc/conn.go @@ -8,7 +8,6 @@ import ( type Conn interface { Doer Notifier - BatchCaller Mounter @@ -20,10 +19,6 @@ type Doer interface { Do(ctx context.Context, result any, method string, params any) error } -type BatchCaller interface { - BatchCall(ctx context.Context, b ...*BatchElem) error -} - type Notifier interface { Notify(ctx context.Context, method string, params any) error } diff --git a/pkg/jsonrpc/conn_dummy.go b/pkg/jsonrpc/conn_dummy.go index 548f49d..ae832e9 100644 --- a/pkg/jsonrpc/conn_dummy.go +++ b/pkg/jsonrpc/conn_dummy.go @@ -18,10 +18,6 @@ func (d *DummyClient) Do(ctx context.Context, result any, method string, params panic("not implemented") // TODO: Implement } -func (d *DummyClient) BatchCall(ctx context.Context, b ...*BatchElem) error { - panic("not implemented") // TODO: Implement -} - func (d *DummyClient) Close() error { panic("not implemented") // TODO: Implement } diff --git a/pkg/jsonrpc/peer.go b/pkg/jsonrpc/peer.go index b0c2cf7..dcb2915 100644 --- a/pkg/jsonrpc/peer.go +++ b/pkg/jsonrpc/peer.go @@ -11,7 +11,7 @@ type PeerInfo struct { // Address of client. This will usually contain the IP address and port. RemoteAddr string - // Addditional information for HTTP and WebSocket connections. + // Additional information for HTTP and WebSocket connections. HTTP HttpInfo } diff --git a/pkg/jsonrpc/reqresp.go b/pkg/jsonrpc/reqresp.go index 5a4d12f..4959e25 100644 --- a/pkg/jsonrpc/reqresp.go +++ b/pkg/jsonrpc/reqresp.go @@ -12,43 +12,13 @@ type ResponseWriter interface { Notify(method string, v any) error } -// BatchElem is an element in a batch request. -type BatchElem struct { - Method string - Params any - - IsNotification bool - - // The result is unmarshaled into this field. Result must be set to a - // non-nil pointer value of the desired type, otherwise the response will be - // discarded. - Result any - // Error is set if the server returns an error for this request, or if - // unmarshaling into Result fails. It is not set for I/O errors. - Error error -} - type Request struct { - ctx context.Context - Peer PeerInfo `json:"-"` - - Message -} - -func (r *Request) UnmarshalJSON(xs []byte) error { - return r.Message.UnmarshalJSON(xs) -} + ID *ID `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Peer PeerInfo `json:"-"` -func (r Request) MarshalJSON() ([]byte, error) { - return r.Message.MarshalJSON() -} - -func NewRequestFromMessage(ctx context.Context, message *Message) (r *Request) { - if ctx == nil { - ctx = context.Background() - } - r = &Request{ctx: ctx, Message: *message} - return r + ctx context.Context } func NewRawRequest(ctx context.Context, id *ID, method string, params json.RawMessage) (r *Request) { @@ -75,18 +45,6 @@ func (r *Request) Context() context.Context { return r.ctx } -func (r *Request) Msg() Message { - return Message{ - ID: r.ID, - Method: r.Method, - Params: r.Params, - } -} - -func (r *Request) Remote() string { - return r.Peer.RemoteAddr -} - func (r *Request) WithContext(ctx context.Context) *Request { if ctx == nil { panic("nil context") @@ -98,7 +56,6 @@ func (r *Request) WithContext(ctx context.Context) *Request { r2.ID = r.ID r2.Method = r.Method r2.Params = r.Params - r2.Error = r.Error r2.Peer = r.Peer return r2 } diff --git a/pkg/server/server.go b/pkg/server/server.go index 33a241e..591860c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -101,9 +101,11 @@ func (s *Server) serveSingle(ctx context.Context, cr: r, } rw.msg, rw.err = produceOutputMessage(incoming) - req := jsonrpc.NewRequestFromMessage( + req := jsonrpc.NewRawRequest( ctx, - rw.msg, + rw.msg.ID, + rw.msg.Method, + rw.msg.Params, ) req.Peer = r.remote.PeerInfo() if rw.msg.ID == nil { @@ -210,9 +212,11 @@ func (s *Server) serveBatch(ctx context.Context, // TODO: stress test this. go func() { defer returnWg.Done() - req := jsonrpc.NewRequestFromMessage( + req := jsonrpc.NewRawRequest( ctx, - v.msg, + v.msg.ID, + v.msg.Method, + v.msg.Params, ) req.Peer = peerInfo s.services.ServeRPC(v, req) -- GitLab