good morning!!!!

Skip to content
Snippets Groups Projects
server.go 6.56 KiB
Newer Older
a's avatar
a committed
package server
a's avatar
rpc
a committed

import (
	"context"
a's avatar
a committed
	"io"
	"net/http"
a's avatar
wg  
a committed
	"sync"
a's avatar
rpc
a committed
	"sync/atomic"

a's avatar
a committed
	"gfx.cafe/open/jrpc/pkg/codec"

a's avatar
a committed
	"gfx.cafe/util/go/bufpool"

a's avatar
rpc
a committed
	mapset "github.com/deckarep/golang-set"
a's avatar
a committed
	"github.com/go-faster/jx"
	"github.com/goccy/go-json"
a's avatar
rpc
a committed
)

// Server is an RPC server.
type Server struct {
a's avatar
a committed
	services codec.Handler
a's avatar
rpc
a committed
	run      int32
	codecs   mapset.Set
a's avatar
a committed
	Tracing  Tracing
}

type Tracing struct {
a's avatar
a committed
	ErrorLogger func(remote codec.ReaderWriter, err error)
a's avatar
rpc
a committed
}

// NewServer creates a new server instance with no registered handlers.
a's avatar
a committed
func NewServer(r codec.Handler) *Server {
a's avatar
rpc
a committed
	server := &Server{
		codecs: mapset.NewSet(),
		run:    1,
	}
a's avatar
a committed
	server.services = r
a's avatar
rpc
a committed
	// Register the default service providing meta information about the RPC service such
	// as the services and methods it offers.
	return server
}

a's avatar
a committed
func (s *Server) printError(remote codec.ReaderWriter, err error) {
a's avatar
a committed
	if err != nil {
a's avatar
ok  
a committed
		return
	}
	if s.Tracing.ErrorLogger != nil {
		s.Tracing.ErrorLogger(remote, err)
a's avatar
a committed
	}
}

a's avatar
rpc
a committed
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed.
a's avatar
a committed
func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) {
a's avatar
a committed
	defer remote.Close()
a's avatar
rpc
a committed

	// Don't serve if server is stopped.
	if atomic.LoadInt32(&s.run) == 0 {
		return
	}
	// Add the codec to the set so it can be closed by Stop.
a's avatar
a committed
	s.codecs.Add(remote)
	defer s.codecs.Remove(remote)

	responder := &callResponder{
		toSend:   make(chan *callEnv, 8),
		toNotify: make(chan *notifyEnv, 8),
		remote:   remote,
	}

	ctx, cn := context.WithCancel(pctx)
	defer cn()
a's avatar
a committed
	ctx = ContextWithPeerInfo(ctx, remote.PeerInfo())
a's avatar
a committed
	go func() {
		defer cn()
		err := responder.run(ctx)
		if err != nil {
			s.printError(remote, err)
		}
		// lose
		err = remote.Close()
		if err != nil {
			s.printError(remote, err)
		}
	}()

a's avatar
ok  
a committed
	go func() {
		select {
		case <-ctx.Done():
			remote.Close()
		}
	}()

a's avatar
a committed
	for {
		msgs, err := remote.ReadBatch(ctx)
		if err != nil {
a's avatar
a committed
			remote.Flush()
a's avatar
a committed
			s.printError(remote, err)
			return
		}
a's avatar
a committed
		msg, batch := codec.ParseMessage(msgs)
a's avatar
a committed
		env := &callEnv{
			batch: batch,
		}
		for _, v := range msg {
a's avatar
wg  
a committed
			rw := &callRespWriter{
				msg:           v,
				notifications: responder.toNotify,
				header:        remote.PeerInfo().HTTP.Headers,
			}
			env.responses = append(env.responses, rw)
		}
		wg := sync.WaitGroup{}
		wg.Add(len(msg))
		for _, vv := range env.responses {
			v := vv
			go func() {
				if v.msg.ID == nil {
					wg.Done()
				} else {
					defer wg.Done()
				}
a's avatar
a committed
				s.services.ServeRPC(v, codec.NewRequestFromRaw(
					ctx,
					&codec.RequestMarshaling{
a's avatar
a committed
						ID:      v.msg.ID,
						Version: v.msg.Version,
						Method:  v.msg.Method,
						Params:  v.msg.Params,
						Peer:    remote.PeerInfo(),
a's avatar
a committed
					}))
a's avatar
wg  
a committed
			}()
a's avatar
a committed
		}
a's avatar
wg  
a committed
		wg.Wait()
a's avatar
a committed
		responder.toSend <- env
	}
}

type callResponder struct {
	toSend   chan *callEnv
	toNotify chan *notifyEnv
a's avatar
a committed
	remote   codec.ReaderWriter
a's avatar
a committed
}

func (c *callResponder) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case env := <-c.toSend:
			err := c.send(ctx, env)
			if err != nil {
				return err
			}
		case env := <-c.toNotify:
			err := c.notify(ctx, env)
			if err != nil {
				return err
			}
		}
a's avatar
a committed
		if c.remote != nil {
			c.remote.Flush()
		}
a's avatar
a committed
	}
}
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error {
	buf := bufpool.GetStd()
	defer bufpool.PutStd(buf)
	enc := jx.GetEncoder()
	enc.ResetWriter(c.remote)
	defer jx.PutEncoder(enc)
	buf.Reset()
	enc.ObjStart()
	enc.FieldStart("jsonrpc")
	enc.Str("2.0")
	err := env.dat(buf)
	if err != nil {
		enc.FieldStart("error")
a's avatar
a committed
		err := codec.EncodeError(enc, err)
a's avatar
a committed
		if err != nil {
			return err
		}
	} else {
a's avatar
a committed
		enc.FieldStart("result")
a's avatar
a committed
		enc.Raw(buf.Bytes())
	}
	enc.ObjEnd()
	err = enc.Close()
	if err != nil {
		return err
	}
	return nil
}

func (c *callResponder) send(ctx context.Context, env *callEnv) error {
	buf := bufpool.GetStd()
	defer bufpool.PutStd(buf)
	enc := jx.GetEncoder()
a's avatar
a committed
	enc.Reset()
	//enc.ResetWriter(c.remote)
a's avatar
a committed
	defer jx.PutEncoder(enc)
	if env.batch {
		enc.ArrStart()
	}
	for _, v := range env.responses {
a's avatar
a committed
		if v.msg.ID == nil {
a's avatar
a committed
			continue
		}
		enc.ObjStart()
		enc.FieldStart("jsonrpc")
		enc.Str("2.0")
		enc.FieldStart("id")
a's avatar
wg  
a committed
		enc.Raw(v.msg.ID.RawMessage())
a's avatar
a committed
		err := v.err
a's avatar
a committed
		if err == nil {
			if v.dat != nil {
				buf.Reset()
				err = v.dat(buf)
				if err == nil {
					enc.FieldStart("result")
					enc.Raw(buf.Bytes())
				}
			} else {
				err = codec.NewMethodNotFoundError(v.msg.Method)
a's avatar
a committed
			}
		}
a's avatar
a committed
		if err != nil {
			enc.FieldStart("error")
a's avatar
a committed
			err := codec.EncodeError(enc, err)
a's avatar
a committed
			if err != nil {
				return err
			}
		}
		enc.ObjEnd()
	}
	if env.batch {
		enc.ArrEnd()
	}
a's avatar
a committed
	//err := enc.Close()
	_, err := enc.WriteTo(c.remote)
a's avatar
a committed
	if err != nil {
		return err
	}
	return nil
}
a's avatar
rpc
a committed

a's avatar
a committed
// callEnv bundles the response writers created for one incoming message or batch.
type callEnv struct {
	// responses holds one writer per message, in the order the messages were parsed.
	responses []*callRespWriter
	// batch is the batch flag reported by codec.ParseMessage; when set the responses
	// are sent as a JSON array.
	batch bool
}

// notifyEnv is a notification queued for delivery to the remote.
type notifyEnv struct {
	// method is the method name passed to Notify.
	method string
	// dat writes the serialised payload of the notification to the given writer.
	dat func(io.Writer) error
}

a's avatar
a committed
var _ codec.ResponseWriter = (*callRespWriter)(nil)

a's avatar
a committed
type callRespWriter struct {
a's avatar
a committed
	msg    *codec.Message
a's avatar
a committed
	dat    func(io.Writer) error
	err    error
	skip   bool
	header http.Header

	notifications chan *notifyEnv
}

// Send records the outcome of the call: a non-nil err is stored as the error of the
// response, otherwise v is kept behind a closure that JSON-encodes it on demand.
// Nothing is written at this point and Send always returns nil.
func (c *callRespWriter) Send(v any, err error) error {
	if err == nil {
		c.dat = func(w io.Writer) error {
			return json.NewEncoder(w).Encode(v)
		}
		return nil
	}
	c.err = err
	return nil
}

// Option accepts an option key and value and ignores both; this writer does not
// support any options for now.
func (c *callRespWriter) Option(k string, v any) {}

// Header returns the header value stored on the writer when it was created.
func (c *callRespWriter) Header() http.Header {
	return c.header
}

a's avatar
a committed
func (c *callRespWriter) Notify(method string, v any) error {
a's avatar
a committed
	c.notifications <- &notifyEnv{
a's avatar
a committed
		method: method,
a's avatar
a committed
		dat: func(w io.Writer) error {
			return json.NewEncoder(w).Encode(v)
		},
	}
	return nil
a's avatar
rpc
a committed
}

// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
// requests to finish, then closes all codecs which will cancel pending requests and
// subscriptions.
func (s *Server) Stop() {
	if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
		s.codecs.Each(func(c any) bool {
a's avatar
a committed
			c.(codec.ReaderWriter).Close()
a's avatar
rpc
a committed
			return true
		})
	}
}

// peerInfoContextKey is the unexported context key under which ContextWithPeerInfo
// stores the peer information and PeerInfoFromContext looks it up.
type peerInfoContextKey struct{}

// PeerInfoFromContext returns information about the client's network connection.
// Use this with the context passed to RPC method handler functions.
//
// The zero value is returned if no connection info is present in ctx.
a's avatar
a committed
func PeerInfoFromContext(ctx context.Context) codec.PeerInfo {
	info, _ := ctx.Value(peerInfoContextKey{}).(codec.PeerInfo)
a's avatar
rpc
a committed
	return info
}
a's avatar
a committed
// ContextWithPeerInfo returns a context derived from ctx that carries the peer
// information c, which can be read back with PeerInfoFromContext.
func ContextWithPeerInfo(ctx context.Context, c codec.PeerInfo) context.Context {
	return context.WithValue(ctx, peerInfoContextKey{}, c)
}