From efc5f2bad30f7de5fd77a53f27ecf416901cea2d Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Thu, 5 Oct 2023 09:04:08 -0500 Subject: [PATCH] add flush --- pkg/server/server.go | 57 ++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index 6b83d31..123651c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -39,12 +39,10 @@ func NewServer(r codec.Handler) *Server { return server } -func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, responder *callResponder) error { - incoming, batch, err := remote.ReadBatch(ctx) - if err != nil { - remote.Flush() - return err - } +func (s *Server) serveBatch(ctx context.Context, + incoming []*codec.Message, + batch bool, + remote codec.ReaderWriter, responder *callResponder) error { env := &callEnv{ batch: batch, } @@ -119,10 +117,9 @@ func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, respo // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes // the response back using the given codec. It will block until the codec is closed or the -// server is stopped. In either case the codec is closed. +// server is stopped. In either case the codec is closed when this function returns. func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) error { defer remote.Close() - // Don't serve if server is stopped. if atomic.LoadInt32(&s.run) == 0 { return fmt.Errorf("Server stopped") @@ -130,24 +127,42 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) err // Add the codec to the set so it can be closed by Stop. s.codecs.Add(remote) defer s.codecs.Remove(remote) - responder := &callResponder{ remote: remote, } - - ctx, cn := context.WithCancel(pctx) + // add a cancel to the context so we can cancel all the child tasks on return + ctx, cn := context.WithCancel(ContextWithPeerInfo(pctx, remote.PeerInfo())) defer cn() - ctx = ContextWithPeerInfo(ctx, remote.PeerInfo()) - for { - select { - case <-ctx.Done(): - remote.Close() - default: - } - err := s.codecLoop(ctx, remote, responder) - if err != nil { - return err + + errch := make(chan error) + go func() { + for { + // read messages from the stream synchronously + incoming, batch, err := remote.ReadBatch(ctx) + if err != nil { + errch <- err + return + } + // process each in a goroutine + go func() { + // the only reason this should error is if + err = s.serveBatch(ctx, incoming, batch, remote, responder) + if err != nil { + errch <- err + return + } + }() } + }() + // exit on either the first error, or the context closing. + select { + case <-ctx.Done(): + return nil + case err := <-errch: + // perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc. + // the connection is already dead, so at this point there are no rules, so this is okay to do i think + remote.Flush() + return err } } -- GitLab