good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 8ba87a2f authored by a's avatar a
Browse files

a

parent c6e8b132
No related branches found
No related tags found
1 merge request!19Simplify
Pipeline #28672 failed with stage
in 54 minutes and 13 seconds
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"io" "io"
) )
// ReaderWriter represents a single stream
// this stream can be used to send/receive an arbitrary amount of requests and notifications
type ReaderWriter interface { type ReaderWriter interface {
Reader Reader
Writer Writer
......
...@@ -17,7 +17,7 @@ type callRespWriter struct { ...@@ -17,7 +17,7 @@ type callRespWriter struct {
skip bool skip bool
header http.Header header http.Header
notifications chan *notifyEnv notifications func(env *notifyEnv) error
} }
func (c *callRespWriter) Send(v any, err error) error { func (c *callRespWriter) Send(v any, err error) error {
...@@ -39,10 +39,9 @@ func (c *callRespWriter) Header() http.Header { ...@@ -39,10 +39,9 @@ func (c *callRespWriter) Header() http.Header {
} }
func (c *callRespWriter) Notify(method string, v any) error { func (c *callRespWriter) Notify(method string, v any) error {
c.notifications <- &notifyEnv{ return c.notifications(&notifyEnv{
method: method, method: method,
dat: v, dat: v,
extra: c.pkt.ExtraFields, extra: c.pkt.ExtraFields,
} })
return nil
} }
...@@ -3,6 +3,7 @@ package server ...@@ -3,6 +3,7 @@ package server
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
...@@ -16,6 +17,8 @@ import ( ...@@ -16,6 +17,8 @@ import (
) )
// Server is an RPC server. // Server is an RPC server.
// it is in charge of calling the handler on the message object, the json encoding of responses, and dealing with batch semantics.
// a server can be used to listenandserve multiple codecs at a time
type Server struct { type Server struct {
services codec.Handler services codec.Handler
run int32 run int32
...@@ -24,7 +27,6 @@ type Server struct { ...@@ -24,7 +27,6 @@ type Server struct {
} }
type Tracing struct { type Tracing struct {
ErrorLogger func(remote codec.ReaderWriter, err error)
} }
// NewServer creates a new server instance with no registered handlers. // NewServer creates a new server instance with no registered handlers.
...@@ -37,20 +39,10 @@ func NewServer(r codec.Handler) *Server { ...@@ -37,20 +39,10 @@ func NewServer(r codec.Handler) *Server {
return server return server
} }
func (s *Server) printError(remote codec.ReaderWriter, err error) {
if err != nil {
return
}
if s.Tracing.ErrorLogger != nil {
s.Tracing.ErrorLogger(remote, err)
}
}
func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, responder *callResponder) error { func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, responder *callResponder) error {
incoming, batch, err := remote.ReadBatch(ctx) incoming, batch, err := remote.ReadBatch(ctx)
if err != nil { if err != nil {
remote.Flush() remote.Flush()
s.printError(remote, err)
return err return err
} }
env := &callEnv{ env := &callEnv{
...@@ -59,8 +51,8 @@ func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, respo ...@@ -59,8 +51,8 @@ func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, respo
// check for empty batch // check for empty batch
if batch && len(incoming) == 0 { if batch && len(incoming) == 0 {
// if it is empty batch, send the empty batch warning // if it is empty batch, send the empty batch error and immediately return
responder.toSend <- &callEnv{ return responder.send(ctx, &callEnv{
responses: []*callRespWriter{{ responses: []*callRespWriter{{
pkt: &codec.Message{ pkt: &codec.Message{
ID: codec.NewNullIDPtr(), ID: codec.NewNullIDPtr(),
...@@ -68,94 +60,84 @@ func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, respo ...@@ -68,94 +60,84 @@ func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, respo
}, },
}}, }},
batch: false, batch: false,
} })
return nil
} }
// populate the envelope // populate the envelope we are about to send. this is synchronous pre-prpcessing
for _, v := range incoming { for _, v := range incoming {
// create the response writer
rw := &callRespWriter{ rw := &callRespWriter{
pkt: &codec.Message{ notifications: func(env *notifyEnv) error { return responder.notify(ctx, env) },
ID: codec.NewNullIDPtr(),
},
msg: &codec.Message{
ID: codec.NewNullIDPtr(),
},
notifications: responder.toNotify,
header: remote.PeerInfo().HTTP.Headers, header: remote.PeerInfo().HTTP.Headers,
} }
if v != nil {
rw.msg = v
if v.ID != nil {
rw.pkt.ID = v.ID
}
}
env.responses = append(env.responses, rw) env.responses = append(env.responses, rw)
// a nil incoming message means an empty response
if v == nil {
rw.msg = &codec.Message{ID: codec.NewNullIDPtr()}
rw.pkt = &codec.Message{ID: codec.NewNullIDPtr()}
continue
}
rw.msg = v
if v.ID == nil {
rw.pkt = &codec.Message{ID: codec.NewNullIDPtr()}
continue
}
rw.pkt = &codec.Message{ID: v.ID}
} }
// create a waitgroup // create a waitgroup
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(env.responses)) wg.Add(len(env.responses))
// for each item in the envelope
peerInfo := remote.PeerInfo()
for _, vRef := range env.responses { for _, vRef := range env.responses {
v := vRef v := vRef
// early respond to nil requests // process each request in its own goroutine
if v.msg == nil || len(v.msg.Method) == 0 {
v.pkt.Error = codec.NewInvalidRequestError("invalid request")
wg.Done()
continue
}
if v.msg.ID == nil || v.msg.ID.IsNull() {
// it's a notification, so we mark skip and we don't write anything for it
v.skip = true
wg.Done()
continue
}
go func() { go func() {
defer wg.Done() defer wg.Done()
// early respond to nil requests
if v.msg == nil || len(v.msg.Method) == 0 {
v.pkt.Error = codec.NewInvalidRequestError("invalid request")
return
}
if v.msg.ID == nil || v.msg.ID.IsNull() {
// it's a notification, so we mark skip and we don't write anything for it
v.skip = true
return
}
r := codec.NewRequestFromMessage( r := codec.NewRequestFromMessage(
ctx, ctx,
v.msg, v.msg,
) )
r.Peer = remote.PeerInfo() r.Peer = peerInfo
s.services.ServeRPC(v, r) s.services.ServeRPC(v, r)
}() }()
} }
wg.Wait() wg.Wait()
responder.toSend <- env return responder.send(ctx, env)
return nil
} }
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the // the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed. // server is stopped. In either case the codec is closed.
func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) error {
defer remote.Close() defer remote.Close()
// Don't serve if server is stopped. // Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 { if atomic.LoadInt32(&s.run) == 0 {
return return fmt.Errorf("Server stopped")
} }
// Add the codec to the set so it can be closed by Stop. // Add the codec to the set so it can be closed by Stop.
s.codecs.Add(remote) s.codecs.Add(remote)
defer s.codecs.Remove(remote) defer s.codecs.Remove(remote)
responder := &callResponder{ responder := &callResponder{
toSend: make(chan *callEnv, 8), remote: remote,
toNotify: make(chan *notifyEnv, 8),
remote: remote,
} }
ctx, cn := context.WithCancel(pctx) ctx, cn := context.WithCancel(pctx)
defer cn() defer cn()
ctx = ContextWithPeerInfo(ctx, remote.PeerInfo()) ctx = ContextWithPeerInfo(ctx, remote.PeerInfo())
go func() {
defer cn()
err := responder.run(ctx)
if err != nil {
s.printError(remote, err)
}
}()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
...@@ -164,8 +146,7 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { ...@@ -164,8 +146,7 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) {
} }
err := s.codecLoop(ctx, remote, responder) err := s.codecLoop(ctx, remote, responder)
if err != nil { if err != nil {
s.printError(remote, err) return err
return
} }
} }
} }
...@@ -183,31 +164,7 @@ func (s *Server) Stop() { ...@@ -183,31 +164,7 @@ func (s *Server) Stop() {
} }
type callResponder struct { type callResponder struct {
toSend chan *callEnv remote codec.ReaderWriter
toNotify chan *notifyEnv
remote codec.ReaderWriter
}
func (c *callResponder) run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case env := <-c.toSend:
err := c.send(ctx, env)
if err != nil {
return err
}
case env := <-c.toNotify:
err := c.notify(ctx, env)
if err != nil {
return err
}
}
if c.remote != nil {
c.remote.Flush()
}
}
} }
type notifyEnv struct { type notifyEnv struct {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment