good morning!!!!

Skip to content
Snippets Groups Projects
server.go 3.87 KiB
Newer Older
a's avatar
a committed
package server
a's avatar
rpc
a committed

import (
	"context"
a's avatar
a  
a committed
	"errors"
a's avatar
rpc
a committed

a's avatar
a committed
	"golang.org/x/sync/errgroup"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/open/jrpc/pkg/jsonrpc"
a's avatar
a committed
	"gfx.cafe/open/jrpc/pkg/serverutil"
a's avatar
rpc
a committed
)

// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
a's avatar
a committed
// the response back using the given codec. It will block until the codec is closed.
// the codec will return if either of these conditions are met
// 1. every request read from ReadBatch until ReadBatch returns context.Canceled is processed.
// 2. there is a server related error (failed encoding, broken conn) that was received while processing/reading messages.
a's avatar
a  
a committed
func ServeCodec(ctx context.Context, remote jsonrpc.ReaderWriter, handler jsonrpc.Handler) error {
a's avatar
ok  
a committed
	// close the remote after handling it
	defer remote.Close()
a's avatar
a committed
	stream := jsonrpc.NewStream(remote)
	// add a cancel to the context so we can cancel all the child tasks on return
a's avatar
a  
a committed
	ctx = ContextWithMessageStream(ContextWithPeerInfo(
		ctx,
		remote.PeerInfo(),
	), stream,
	)
	egg, ctx := errgroup.WithContext(ctx)
a's avatar
a committed
	ctx, cn := context.WithCancel(ctx)
	defer cn()
a's avatar
a  
a committed

a's avatar
ok  
a committed
	errCh := make(chan error, 1)
a's avatar
a committed
	batches := make(chan serverutil.Bundle, 1)
	go func() {
a's avatar
ok  
a committed
		defer func() {
			close(batches)
		}()
		for {
			// read messages from the stream synchronously
			incoming, batch, err := remote.ReadBatch(ctx)
			if err != nil {
a's avatar
a  
a committed
				if errors.Is(err, jsonrpc.ErrNoMoreBatches) || errors.Is(err, context.Canceled) {
a's avatar
a  
a committed
					return
				}
a's avatar
a committed
				select {
				case errCh <- err:
				default:
a's avatar
a committed
				return
			}
a's avatar
a committed
			select {
			case batches <- serverutil.Bundle{
a's avatar
a committed
				Messages: incoming,
				Batch:    batch,
a's avatar
a committed
			}:
			case <-ctx.Done():
				return
a's avatar
a committed
			}
a's avatar
a committed
	// this errgroup controls the max concurrent requests per codec
a's avatar
ok  
a committed
	for batch := range batches {
		incoming, batch := batch.Messages, batch.Batch
		responder := &callResponder{
			peerinfo: remote.PeerInfo(),
			batch:    batch,
			stream:   stream,
a's avatar
a committed
		}
a's avatar
ok  
a committed
		egg.Go(func() error {
a's avatar
a  
a committed
			return serve(ctx, incoming, responder, handler)
a's avatar
ok  
a committed
		})
	}
a's avatar
a  
a committed
	err := egg.Wait()
	if err != nil {
		return err
	}
a's avatar
a committed
	select {
a's avatar
a committed
	case <-ctx.Done():
		return nil
a's avatar
a committed
	case err := <-errCh:
		return err
a's avatar
ok  
a committed
}

a's avatar
a  
a committed
type callResponder struct {
	peerinfo jsonrpc.PeerInfo
	stream   *jsonrpc.MessageStream
	batch    bool
}

a's avatar
a  
a committed
func serve(ctx context.Context,
a's avatar
a committed
	incoming []*jsonrpc.Message,
a's avatar
a committed
	r *callResponder,
a's avatar
a  
a committed
	handler jsonrpc.Handler,
a's avatar
a committed
) error {
	if r.batch {
a's avatar
a  
a committed
		return serveBatch(ctx, incoming, r, handler)
a's avatar
a committed
	} else {
a's avatar
a  
a committed
		return serveSingle(ctx, incoming[0], r, handler)
a's avatar
a committed
	}
}

a's avatar
a  
a committed
func serveSingle(ctx context.Context,
a's avatar
a committed
	incoming *jsonrpc.Message,
a's avatar
a committed
	r *callResponder,
a's avatar
a  
a committed
	handler jsonrpc.Handler,
a's avatar
a committed
) error {
a's avatar
a  
a committed
	om, omerr := produceOutputMessage(incoming)
a's avatar
a committed
	rw := &streamingRespWriter{
a's avatar
a committed
		ctx:          ctx,
		sendStream:   r.stream,
		notifyStream: r.stream,
a's avatar
a  
a committed
		id:           om.ID,
		err:          omerr,
a's avatar
a committed
	}
a's avatar
a committed
	req := jsonrpc.NewRawRequest(
a's avatar
a committed
		ctx,
a's avatar
a committed
		rw.id,
		incoming.Method,
		incoming.Params,
a's avatar
a committed
	)
a's avatar
a committed
	req.Peer = r.peerinfo
	if rw.id == nil {
a's avatar
a committed
		// all notification, so immediately flush a response
		err := r.stream.Flush(ctx)
a's avatar
a committed
		if err != nil {
			return err
		}
	}
a's avatar
a  
a committed
	handler.ServeRPC(rw, req)
a's avatar
a committed
	if rw.sendCalled == false && rw.id != nil {
a's avatar
a committed
		rw.Send(jsonrpc.Null, nil)
a's avatar
a committed
	}
	return nil
}

a's avatar
a  
a committed
func produceOutputMessage(inputMessage *jsonrpc.Message) (out *jsonrpc.Message, err error) {
	// a nil incoming message means return an invalid request.
	if inputMessage == nil {
		inputMessage = &jsonrpc.Message{ID: jsonrpc.NewNullIDPtr()}
		err = jsonrpc.NewInvalidRequestError("invalid request")
	}
	out = inputMessage
	out.Error = nil
a's avatar
a  
a committed
	// NOTE: in the past, a zero length method was an invalid request
	// now that is no longer the case
	//// zero length method is always invalid request
a's avatar
a  
a committed
	if len(out.Method) == 0 {
a's avatar
a  
a committed
		// assume if the method is not there AND the id is not there that it's a REQUEST not notification
a's avatar
a  
a committed
		// this makes sure we add 1 to totalRequests
		if out.ID == nil {
			out.ID = jsonrpc.NewNullIDPtr()
		}
a's avatar
a  
a committed
		//	err = jsonrpc.NewInvalidRequestError("invalid request")
a's avatar
a  
a committed
	}

	return
}