Newer
Older
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed.
// the codec will return if either of these conditions are met
// 1. every request read from ReadBatch until ReadBatch returns context.Canceled is processed.
// 2. there is a server related error (failed encoding, broken conn) that was received while processing/reading messages.
func ServeCodec(ctx context.Context, remote jsonrpc.ReaderWriter, handler jsonrpc.Handler) error {
// add a cancel to the context so we can cancel all the child tasks on return
ctx = ContextWithMessageStream(ContextWithPeerInfo(
ctx,
remote.PeerInfo(),
), stream,
)
egg, ctx := errgroup.WithContext(ctx)
for {
// read messages from the stream synchronously
incoming, batch, err := remote.ReadBatch(ctx)
if err != nil {
for batch := range batches {
incoming, batch := batch.Messages, batch.Batch
responder := &callResponder{
peerinfo: remote.PeerInfo(),
batch: batch,
stream: stream,
type callResponder struct {
peerinfo jsonrpc.PeerInfo
stream *jsonrpc.MessageStream
batch bool
}
// all notification, so immediately flush a response
err := r.stream.Flush(ctx)
func produceOutputMessage(inputMessage *jsonrpc.Message) (out *jsonrpc.Message, err error) {
// a nil incoming message means return an invalid request.
if inputMessage == nil {
inputMessage = &jsonrpc.Message{ID: jsonrpc.NewNullIDPtr()}
err = jsonrpc.NewInvalidRequestError("invalid request")
}
out = inputMessage
out.Error = nil
// NOTE: in the past, a zero length method was an invalid request
// now that is no longer the case
//// zero length method is always invalid request
// assume if the method is not there AND the id is not there that it's a REQUEST not notification
// this makes sure we add 1 to totalRequests
if out.ID == nil {
out.ID = jsonrpc.NewNullIDPtr()
}