good morning!!!!

Skip to content
Snippets Groups Projects
server.go 6.51 KiB
Newer Older
a's avatar
a committed
package server
a's avatar
rpc
a committed

import (
a's avatar
a committed
	"bytes"
a's avatar
rpc
a committed
	"context"
a's avatar
wg  
a committed
	"sync"
a's avatar
rpc
a committed
	"sync/atomic"

a's avatar
a committed
	"gfx.cafe/open/jrpc/pkg/codec"

a's avatar
a committed
	"gfx.cafe/util/go/bufpool"

a's avatar
rpc
a committed
	mapset "github.com/deckarep/golang-set"
a's avatar
a committed
	"github.com/go-faster/jx"
	"github.com/goccy/go-json"
a's avatar
rpc
a committed
)

// Server is an RPC server.
type Server struct {
a's avatar
a committed
	services codec.Handler
a's avatar
rpc
a committed
	run      int32
	codecs   mapset.Set
a's avatar
a committed
	Tracing  Tracing
}

type Tracing struct {
a's avatar
a committed
	ErrorLogger func(remote codec.ReaderWriter, err error)
a's avatar
rpc
a committed
}

// NewServer creates a new server instance with no registered handlers.
a's avatar
a committed
func NewServer(r codec.Handler) *Server {
a's avatar
rpc
a committed
	server := &Server{
		codecs: mapset.NewSet(),
		run:    1,
	}
a's avatar
a committed
	server.services = r
a's avatar
rpc
a committed
	return server
}

a's avatar
a committed
func (s *Server) printError(remote codec.ReaderWriter, err error) {
a's avatar
a committed
	if err != nil {
a's avatar
ok  
a committed
		return
	}
	if s.Tracing.ErrorLogger != nil {
		s.Tracing.ErrorLogger(remote, err)
a's avatar
a committed
	}
}

a's avatar
a committed
func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, responder *callResponder) error {
a's avatar
a committed
	incoming, batch, err := remote.ReadBatch(ctx)
a's avatar
a committed
	if err != nil {
		remote.Flush()
		s.printError(remote, err)
		return err
	}
	env := &callEnv{
		batch: batch,
	}
a's avatar
a committed

a's avatar
a committed
	// check for empty batch
a's avatar
a committed
	if batch && len(incoming) == 0 {
a's avatar
a committed
		// if it is empty batch, send the empty batch warning
		responder.toSend <- &callEnv{
			responses: []*callRespWriter{{
a's avatar
a committed
				pkt: &codec.Message{
					ID:    codec.NewNullIDPtr(),
					Error: codec.NewInvalidRequestError("empty batch"),
				},
a's avatar
a committed
			}},
			batch: false,
		}
		return nil
	}

	// populate the envelope
a's avatar
a committed
	for _, v := range incoming {
a's avatar
a committed
		rw := &callRespWriter{
a's avatar
a committed
			pkt: &codec.Message{
				ID: codec.NewNullIDPtr(),
			},
			msg: &codec.Message{
				ID: codec.NewNullIDPtr(),
			},
a's avatar
a committed
			notifications: responder.toNotify,
			header:        remote.PeerInfo().HTTP.Headers,
		}
a's avatar
a committed
		if v != nil {
			rw.msg = v
			if v.ID != nil {
				rw.pkt.ID = v.ID
			}
a's avatar
a committed
		}
a's avatar
a committed
		env.responses = append(env.responses, rw)
a's avatar
a committed
	}

	// create a waitgroup
	wg := sync.WaitGroup{}
a's avatar
ok  
a committed
	wg.Add(len(env.responses))
a's avatar
a committed
	for _, vRef := range env.responses {
		v := vRef
a's avatar
a committed
		// early respond to nil requests
a's avatar
ok  
a committed
		if v.msg == nil || len(v.msg.Method) == 0 {
a's avatar
a committed
			v.pkt.Error = codec.NewInvalidRequestError("invalid request")
a's avatar
a committed
			wg.Done()
			continue
		}
a's avatar
ok  
a committed
		if v.msg.ID == nil || v.msg.ID.IsNull() {
			// it's a notification, so we mark skip and we don't write anything for it
			v.skip = true
			wg.Done()
			continue
		}
a's avatar
a committed
		go func() {
			defer wg.Done()
a's avatar
a committed
			r := codec.NewRequestFromMessage(
a's avatar
a committed
				ctx,
a's avatar
a committed
				v.msg,
			)
			r.Peer = remote.PeerInfo()
			s.services.ServeRPC(v, r)
a's avatar
a committed
		}()
	}
	wg.Wait()
	responder.toSend <- env
	return nil
}

a's avatar
rpc
a committed
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed.
a's avatar
a committed
func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) {
a's avatar
a committed
	defer remote.Close()
a's avatar
rpc
a committed

	// Don't serve if server is stopped.
	if atomic.LoadInt32(&s.run) == 0 {
		return
	}
	// Add the codec to the set so it can be closed by Stop.
a's avatar
a committed
	s.codecs.Add(remote)
	defer s.codecs.Remove(remote)

	responder := &callResponder{
		toSend:   make(chan *callEnv, 8),
		toNotify: make(chan *notifyEnv, 8),
		remote:   remote,
	}

	ctx, cn := context.WithCancel(pctx)
	defer cn()
a's avatar
a committed
	ctx = ContextWithPeerInfo(ctx, remote.PeerInfo())
a's avatar
a committed
	go func() {
		defer cn()
		err := responder.run(ctx)
		if err != nil {
			s.printError(remote, err)
		}
a's avatar
ok  
a committed
	}()

a's avatar
a committed
	for {
a's avatar
a committed
		select {
		case <-ctx.Done():
			remote.Close()
		default:
		}
a's avatar
a committed
		err := s.codecLoop(ctx, remote, responder)
a's avatar
a committed
		if err != nil {
			s.printError(remote, err)
			return
		}
	}
}

a's avatar
a committed
// Stop marks the server as stopped, so that ServeCodec no longer serves new
// codecs, and closes every codec that is currently being served. Only the
// first call has an effect; later calls return immediately.
func (s *Server) Stop() {
	if !atomic.CompareAndSwapInt32(&s.run, 1, 0) {
		return
	}
	s.codecs.Each(func(c any) bool {
		c.(codec.ReaderWriter).Close()
		// Each stops iterating as soon as the callback returns true, so
		// return false to make sure every codec is visited and closed.
		return false
	})
}

a's avatar
a committed
type callResponder struct {
	toSend   chan *callEnv
	toNotify chan *notifyEnv
a's avatar
a committed
	remote   codec.ReaderWriter
a's avatar
a committed
}

func (c *callResponder) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case env := <-c.toSend:
			err := c.send(ctx, env)
			if err != nil {
				return err
			}
		case env := <-c.toNotify:
			err := c.notify(ctx, env)
			if err != nil {
				return err
			}
		}
a's avatar
a committed
		if c.remote != nil {
			c.remote.Flush()
		}
a's avatar
a committed
	}
}
a's avatar
a committed

// notifyEnv describes one notification queued for the responder.
type notifyEnv struct {
	// method is copied into the Method field of the outgoing message.
	method string

	// dat is the payload; it is JSON-encoded into the message result.
	dat any

	// extra is copied into the ExtraFields of the outgoing message.
	extra []codec.RequestField
}

a's avatar
a committed
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error {
a's avatar
a committed
	enc := jx.GetEncoder()
	if cap(enc.Bytes()) < 4096 {
		enc.SetBytes(make([]byte, 0, 4096))
	}
	enc.ResetWriter(c.remote)
	defer jx.PutEncoder(enc)
	//enc := jx.NewStreamingEncoder(c.remote, 4096)
a's avatar
a committed
	msg := &codec.Message{}
	var err error
	//  allocate a temp buffer for this packet
a's avatar
a committed
	buf := bufpool.GetStd()
	defer bufpool.PutStd(buf)
a's avatar
a committed
	err = json.NewEncoder(buf).Encode(env.dat)
a's avatar
a committed
	if err != nil {
a's avatar
a committed
		msg.Error = err
a's avatar
a committed
	} else {
a's avatar
a committed
		msg.Result = buf.Bytes()
a's avatar
a committed
	}
a's avatar
a committed
	msg.ExtraFields = env.extra
	// add the method
	msg.Method = env.method
	err = codec.MarshalMessage(msg, enc)
a's avatar
a committed
	if err != nil {
		return err
	}
a's avatar
a committed
	return enc.Close()
}

type callEnv struct {
	responses []*callRespWriter
	batch     bool
a's avatar
a committed
}

a's avatar
a committed
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
a's avatar
ok  
a committed
	// notification gets nothing
a's avatar
a committed
	// if all msgs in batch are notification, we trigger an allSkip and write nothing
a's avatar
ok  
a committed
	if env.batch {
		allSkip := true
		for _, v := range env.responses {
			if v.skip != true {
				allSkip = false
			}
		}
		if allSkip {
			return nil
		}
	}
a's avatar
a committed
	// create the streaming encoder
a's avatar
a committed
	enc := jx.GetEncoder()
	if cap(enc.Bytes()) < 4096 {
		enc.SetBytes(make([]byte, 0, 4096))
	}
	enc.ResetWriter(c.remote)
	defer jx.PutEncoder(enc)
a's avatar
a committed
	if env.batch {
		enc.ArrStart()
	}
	for _, v := range env.responses {
a's avatar
a committed
		msg := v.pkt
		// if we are a batch AND we are supposed to skip, then continue
		// this means that for a non-batch notification, we do not skip!
a's avatar
ok  
a committed
		if env.batch && v.skip {
a's avatar
a committed
			continue
		}
a's avatar
a committed
		// if there is no error, we try to marshal the result
		if msg.Error == nil {
			buf := bufpool.GetStd()
			defer bufpool.PutStd(buf)
			je := json.NewEncoder(buf)
			err = je.EncodeWithOption(v.dat)
a's avatar
a committed
			if err != nil {
a's avatar
a committed
				msg.Error = err
			} else {
				msg.Result = buf.Bytes()
				msg.Result = bytes.TrimSuffix(msg.Result, []byte{'\n'})
a's avatar
a committed
			}
a's avatar
a committed
		}
		// then marshal the whole message into the stream
		err := codec.MarshalMessage(msg, enc)
		if err != nil {
			return err
		}
a's avatar
a committed
	}
	if env.batch {
		enc.ArrEnd()
	}
a's avatar
a committed
	err = enc.Close()
a's avatar
a committed
	if err != nil {
		return err
	}
	return nil
}