good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit cde0a5d6 authored by a's avatar a
Browse files

tests

parent 76c77c44
Branches
Tags
1 merge request!20mod
Pipeline #29766 failed
......@@ -25,6 +25,9 @@ func (s *Server) ServeSpoke(ctx context.Context, stream ServerSpoke) {
continue
}
cd := NewCodec(req, fn)
go s.Server.ServeCodec(ctx, cd)
go func() {
s.Server.ServeCodec(ctx, cd)
cd.Close()
}()
}
}
package http
import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
......@@ -26,7 +25,7 @@ type Codec struct {
r *http.Request
w http.ResponseWriter
wr *bufio.Writer
wr io.Writer
msgs chan *serverutil.Bundle
errCh chan httpError
......@@ -45,17 +44,12 @@ func NewCodec(w http.ResponseWriter, r *http.Request) *Codec {
}
func (c *Codec) Reset(w http.ResponseWriter, r *http.Request) {
ir := io.Writer(w)
c.wr = w
if w == nil {
ir = io.Discard
c.wr = io.Discard
}
c.r = r
c.w = w
if c.wr == nil {
c.wr = bufio.NewWriter(ir)
} else {
c.wr.Reset(ir)
}
c.msgs = make(chan *serverutil.Bundle, 1)
c.errCh = make(chan httpError, 1)
......@@ -210,7 +204,7 @@ func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) {
case ans := <-c.msgs:
return ans.Messages, ans.Batch, nil
case err := <-c.errCh:
http.Error(c.w, err.err.Error(), err.code)
// http.Error(c.w, err.err.Error(), err.code)
return nil, false, err.err
case <-ctx.Done():
return nil, false, ctx.Err()
......@@ -226,11 +220,12 @@ func (c *Codec) Close() error {
}
func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) {
defer c.cn()
_, err = c.wr.Write(msg)
if err != nil {
return err
}
return c.wr.Flush()
return nil
}
// Closed returns a channel which is closed when the connection is closed.
......
......@@ -77,7 +77,7 @@ func (c *Codec) Close() error {
func (c *Codec) Send(ctx context.Context, buf json.RawMessage) error {
c.wrLock.Lock()
defer c.wrLock.Unlock()
_, err := c.w.Write(buf)
_, err := c.w.Write(append(buf, '\n'))
return err
}
......
......@@ -159,6 +159,9 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) err
case <-ctx.Done():
return nil
case err := <-errch:
// perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc.
// the connection is already dead, so at this point there are no rules, so this is okay to do i think
remote.Send(context.Background(), nil)
return err
}
}
......@@ -237,7 +240,7 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
}
}
if allSkip {
return nil
return c.remote.Send(ctx, nil)
}
}
// create the streaming encoder
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment