good morning!!!!

Skip to content
Snippets Groups Projects
server.go 4.19 KiB
Newer Older
a's avatar
rpc
a committed
package jrpc

import (
	"context"
	"io"
	"net/http"
a's avatar
rpc
a committed
	"sync/atomic"

a's avatar
a committed
	"tuxpa.in/a/zlog/log"
a's avatar
rpc
a committed
	mapset "github.com/deckarep/golang-set"
)

a's avatar
rpc
a committed

// CodecOption specifies which type of messages a codec supports.

// Server is an RPC server.
type Server struct {
	services Router
	run      int32
	codecs   mapset.Set
}

// NewServer creates a new server instance with no registered handlers.
func NewServer(r ...Router) *Server {
	server := &Server{
		codecs: mapset.NewSet(),
		run:    1,
	}
	if len(r) > 0 {
		server.services = r[0]
	} else {
		server.services = NewRouter()
	}
	// Register the default service providing meta information about the RPC service such
	// as the services and methods it offers.
	rpcService := &RPCService{server}
	server.services.RegisterStruct(MetadataApi, rpcService)
a's avatar
rpc
a committed
	return server
}

func (s *Server) Router() Router {
	return s.services
}

a's avatar
rpc
a committed
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed.
func (s *Server) ServeCodec(codec ServerCodec) {
a's avatar
a committed
	defer codec.Close()
a's avatar
rpc
a committed

	// Don't serve if server is stopped.
	if atomic.LoadInt32(&s.run) == 0 {
		return
	}

	// Add the codec to the set so it can be closed by Stop.
	s.codecs.Add(codec)
	defer s.codecs.Remove(codec)

	c := initClient(codec, s.services)
a's avatar
a committed
	<-codec.Closed()
a's avatar
rpc
a committed
	c.Close()
}

// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
	// Don't serve if server is stopped.
	if atomic.LoadInt32(&s.run) == 0 {
		return
	}
	// create a new handler for this context
	h := newHandler(ctx, codec, s.services)
	defer h.close(io.EOF, nil)

	// read the HTTP body
a's avatar
a committed
	reqs, batch, err := codec.ReadBatch()
a's avatar
rpc
a committed
	if err != nil {
		if err != io.EOF {
a's avatar
a committed
			codec.WriteJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
a's avatar
rpc
a committed
		}
		return
	}
	if batch {
		h.handleBatch(reqs)
	} else {
		h.handleMsg(reqs[0])
	}
}

// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
// requests to finish, then closes all codecs which will cancel pending requests and
// subscriptions.
func (s *Server) Stop() {
	if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
		log.Debug().Msg("RPC server shutting down")
		s.codecs.Each(func(c any) bool {
a's avatar
a committed
			c.(ServerCodec).Close()
a's avatar
rpc
a committed
			return true
		})
	}
}

a's avatar
ok  
a committed
// Deprecated: RPCService gives meta information about the server.
a's avatar
rpc
a committed
// e.g. gives information about the loaded modules.
type RPCService struct {
	server *Server
}

a's avatar
ok  
a committed
// Deprecated: Modules returns the list of RPC services with their version number
a's avatar
rpc
a committed
func (s *RPCService) Modules() map[string]string {
	modules := make(map[string]string)
	for _, route := range s.server.services.Routes() {
		modules[route.Pattern] = "1.0"
	}
	return modules
}

// PeerInfo contains information about the remote end of the network connection.
//
// This is available within RPC method handlers through the context. Call
// PeerInfoFromContext to get information about the client connection related to
// the current method call.
type PeerInfo struct {
	// Transport is name of the protocol used by the client.
	// This can be "http", "ws" or "ipc".
	Transport string

	// Address of client. This will usually contain the IP address and port.
	RemoteAddr string

	// Addditional information for HTTP and WebSocket connections.
a's avatar
a committed
	HTTP HttpInfo
}

type HttpInfo struct {
	// Protocol version, i.e. "HTTP/1.1". This is not set for WebSocket.
	Version string
	// Header values sent by the client.
	UserAgent string
	Origin    string
	Host      string

	Headers http.Header

	WriteHeaders http.Header
a's avatar
rpc
a committed
}

type peerInfoContextKey struct{}

// PeerInfoFromContext returns information about the client's network connection.
// Use this with the context passed to RPC method handler functions.
//
// The zero value is returned if no connection info is present in ctx.
func PeerInfoFromContext(ctx context.Context) PeerInfo {
	info, _ := ctx.Value(peerInfoContextKey{}).(PeerInfo)
	return info
}