good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 298f0e90 authored by a's avatar a
Browse files

a

parent 993b7773
No related branches found
No related tags found
No related merge requests found
Pipeline #61809 passed
...@@ -8,8 +8,15 @@ import ( ...@@ -8,8 +8,15 @@ import (
"github.com/go-faster/jx" "github.com/go-faster/jx"
) )
// Error types defined below are the built-in JSON-RPC errors. var (
ErrIllegalExtraField = errors.New("invalid extra field")
ErrSendAlreadyCalled = errors.New("send already called")
ErrHijackAlreadyCalled = errors.New("already hijacked")
ErrCantSendNotification = errors.New("can't send to a notification")
ErrNotSupported = errors.New("not supported")
)
// Error types defined below are the built-in JSON-RPC errors.
var ( var (
_ Error = new(ErrorMethodNotFound) _ Error = new(ErrorMethodNotFound)
_ Error = new(ErrorSubscriptionNotFound) _ Error = new(ErrorSubscriptionNotFound)
...@@ -25,12 +32,6 @@ const ( ...@@ -25,12 +32,6 @@ const (
ErrorCodeJrpc = -42000 ErrorCodeJrpc = -42000
) )
var (
ErrIllegalExtraField = errors.New("invalid extra field")
ErrSendAlreadyCalled = errors.New("send already called")
ErrCantSendNotification = errors.New("can't send to a notification")
)
// Error wraps RPC errors, which contain an error code in addition to the message. // Error wraps RPC errors, which contain an error code in addition to the message.
type Error interface { type Error interface {
Error() string // returns the message Error() string // returns the message
......
package jsonrpc
type Hijacker interface {
Hijack() (send MessageStreamer, notify MessageStreamer, err error)
}
...@@ -13,12 +13,6 @@ type ResponseWriter interface { ...@@ -13,12 +13,6 @@ type ResponseWriter interface {
Notify(method string, v any) error Notify(method string, v any) error
} }
type StreamingResponseWriter interface {
ResponseWriter
SendStream(func(MessageStreamer) error) error
NotifyStream(func(MessageStreamer) error) error
}
type Request struct { type Request struct {
ID *ID `json:"id,omitempty"` ID *ID `json:"id,omitempty"`
Method string `json:"method,omitempty"` Method string `json:"method,omitempty"`
......
package jsonrpc
import (
"fmt"
)
type ResponseController struct {
rw ResponseWriter
}
func NewResponseController(rw ResponseWriter) *ResponseController {
return &ResponseController{rw}
}
type rwUnwrapper interface {
Unwrap() ResponseWriter
}
func (c *ResponseController) Hijack() (sender MessageStreamer, notify MessageStreamer, err error) {
rw := c.rw
for {
switch t := rw.(type) {
case Hijacker:
return t.Hijack()
case rwUnwrapper:
rw = t.Unwrap()
default:
return nil, nil, errNotSupported()
}
}
}
func errNotSupported() error {
return fmt.Errorf("%w", ErrNotSupported)
}
...@@ -53,7 +53,6 @@ func serveBatch(ctx context.Context, ...@@ -53,7 +53,6 @@ func serveBatch(ctx context.Context,
returnWg := sync.WaitGroup{} returnWg := sync.WaitGroup{}
returnWg.Add(len(incoming)) returnWg.Add(len(incoming))
for _, v := range incoming { for _, v := range incoming {
canNext := make(chan struct{})
// create the response writer // create the response writer
om, omerr := produceOutputMessage(v) om, omerr := produceOutputMessage(v)
rw := &streamingRespWriter{ rw := &streamingRespWriter{
...@@ -65,9 +64,6 @@ func serveBatch(ctx context.Context, ...@@ -65,9 +64,6 @@ func serveBatch(ctx context.Context,
} }
if rw.id != nil { if rw.id != nil {
totalRequests += 1 totalRequests += 1
rw.done = func() {
close(canNext)
}
} }
req := jsonrpc.NewRawRequest( req := jsonrpc.NewRawRequest(
ctx, ctx,
...@@ -76,16 +72,14 @@ func serveBatch(ctx context.Context, ...@@ -76,16 +72,14 @@ func serveBatch(ctx context.Context,
om.Params, om.Params,
) )
req.Peer = r.peerinfo req.Peer = r.peerinfo
go func() { run := func() {
defer returnWg.Done() defer returnWg.Done()
handler.ServeRPC(rw, req) handler.ServeRPC(rw, req)
if rw.sendCalled == false && rw.id != nil { if rw.sendCalled == false && rw.id != nil {
rw.Send(jsonrpc.Null, nil) rw.Send(jsonrpc.Null, nil)
} }
}()
if rw.id != nil {
<-canNext
} }
run()
} }
err = ansBatch.Close() err = ansBatch.Close()
......
...@@ -19,41 +19,33 @@ type streamingRespWriter struct { ...@@ -19,41 +19,33 @@ type streamingRespWriter struct {
// the id to write the response with // the id to write the response with
id *jsonrpc.ID id *jsonrpc.ID
// a function that is called on the first call to send
// it's optional
done func()
// if set, will ensure that send will always send this error, instead of whatever send does // if set, will ensure that send will always send this error, instead of whatever send does
err error err error
// marks whether or not send was called. it may only be called once // marks whether or not send was called. it may only be called once
sendCalled bool sendCalled bool
// marks whether or not hijack was called
hijackCalled bool
} }
func (c *streamingRespWriter) SendStream(fn func(jsonrpc.MessageStreamer) error) error { func (c *streamingRespWriter) Hijack() (sender jsonrpc.MessageStreamer, notify jsonrpc.MessageStreamer, err error) {
if c.sendCalled { if c.hijackCalled {
return jsonrpc.ErrSendAlreadyCalled return nil, nil, jsonrpc.ErrHijackAlreadyCalled
} }
c.hijackCalled = true
c.sendCalled = true c.sendCalled = true
if c.done != nil { return c.sendStream, c.notifyStream, nil
defer c.done()
}
return fn(c.sendStream)
}
func (c *streamingRespWriter) NotifyStream(fn func(jsonrpc.MessageStreamer) error) error {
return fn(c.notifyStream)
} }
func (c *streamingRespWriter) Send(v any, e error) (err error) { func (c *streamingRespWriter) Send(v any, e error) (err error) {
if c.hijackCalled {
return jsonrpc.ErrHijackAlreadyCalled
}
if c.id == nil { if c.id == nil {
return jsonrpc.ErrCantSendNotification return jsonrpc.ErrCantSendNotification
} }
if c.sendCalled { if c.sendCalled {
return jsonrpc.ErrSendAlreadyCalled return jsonrpc.ErrSendAlreadyCalled
} }
if c.done != nil {
defer c.done()
}
c.sendCalled = true c.sendCalled = true
sentErr := c.err sentErr := c.err
// only override error if not already set // only override error if not already set
...@@ -82,6 +74,10 @@ func (c *streamingRespWriter) Send(v any, e error) (err error) { ...@@ -82,6 +74,10 @@ func (c *streamingRespWriter) Send(v any, e error) (err error) {
} }
func (c *streamingRespWriter) Notify(method string, v any) error { func (c *streamingRespWriter) Notify(method string, v any) error {
if c.hijackCalled {
return jsonrpc.ErrHijackAlreadyCalled
}
msg, err := c.notifyStream.NewMessage(c.ctx) msg, err := c.notifyStream.NewMessage(c.ctx)
if err != nil { if err != nil {
return err return err
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment