diff --git a/.gitignore b/.gitignore index c5f700c6b97a5a6e7a5e0942a4afefd26d01ba4d..7b69cd4dede7547aba08e593214fbf8ee2e30115 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ .bin .log .db -.exe +*.exe .tmp diff --git a/lib/gat/gatling/client/client.go b/lib/gat/gatling/client/client.go index 4eb65d83529f3f7e3e1b0448444cb8a5bee632a6..b0f1243064e76018f49d2803a5a804e8a61aaa44 100644 --- a/lib/gat/gatling/client/client.go +++ b/lib/gat/gatling/client/client.go @@ -30,8 +30,7 @@ type Client struct { r *bufio.Reader wr io.Writer - bufwr *bufio.Writer - recv chan protocol.Packet + recv chan protocol.Packet buf bytes.Buffer @@ -78,7 +77,6 @@ func NewClient( conn: conn, r: bufio.NewReader(conn), wr: conn, - bufwr: bufio.NewWriter(conn), recv: make(chan protocol.Packet), addr: conn.RemoteAddr(), pid: int32(pid.Int64()), @@ -154,7 +152,6 @@ func (c *Client) Accept(ctx context.Context) error { c.conn = tls.Server(c.conn, cfg) c.r = bufio.NewReader(c.conn) c.wr = c.conn - c.bufwr.Reset(c.wr) err = startup.Read(c.r) if err != nil { return err @@ -496,12 +493,8 @@ func (c *Client) GetPortal(name string) *protocol.Bind { func (c *Client) Send(pkt protocol.Packet) error { //log.Printf("sent packet(%s) %+v", reflect.TypeOf(pkt), pkt) - _, err := pkt.Write(c.bufwr) - if err != nil { - c.bufwr.Reset(c.wr) - return err - } - return c.bufwr.Flush() + _, err := pkt.Write(c.wr) + return err } func (c *Client) Recv() <-chan protocol.Packet { diff --git a/lib/gat/gatling/conn_pool/server/server.go b/lib/gat/gatling/conn_pool/server/server.go index 89ee97f14cb33c35917723b49129fcb91282b090..c30630878f5c905550972adc0af226a48e4449fb 100644 --- a/lib/gat/gatling/conn_pool/server/server.go +++ b/lib/gat/gatling/conn_pool/server/server.go @@ -29,7 +29,6 @@ type Server struct { conn net.Conn r *bufio.Reader wr io.Writer - bufwr *bufio.Writer server_info []*protocol.ParameterStatus @@ -73,10 +72,13 @@ func Dial(ctx context.Context, if err != nil { return nil, err } + err = s.conn.(*net.TCPConn).SetNoDelay(false) + if err != nil { + return nil, err + } s.remote = s.conn.RemoteAddr() s.r = bufio.NewReader(s.conn) s.wr = s.conn - s.bufwr = bufio.NewWriter(s.wr) s.user = *user s.db = db @@ -237,19 +239,9 @@ func (s *Server) forwardTo(client gat.Client, predicate func(pkt protocol.Packet } } -func (s *Server) writeNoFlush(pkt protocol.Packet) error { - //log.Printf("send backend packet(%s) %+v", reflect.TypeOf(pkt), pkt) - _, err := pkt.Write(s.bufwr) - return err -} - func (s *Server) writePacket(pkt protocol.Packet) error { - err := s.writeNoFlush(pkt) - if err != nil { - s.bufwr.Reset(s.wr) - return err - } - return s.bufwr.Flush() + _, err := pkt.Write(s.wr) + return err } func (s *Server) readPacket() (protocol.Packet, error) { @@ -281,9 +273,7 @@ func (s *Server) ensurePreparedStatement(client gat.Client, name string) error { s.bound_prepared_statments[name] = stmt // send prepared statement to server - _ = s.writeNoFlush(stmt) - - return nil + return s.writePacket(stmt) } func (s *Server) ensurePortal(client gat.Client, name string) error { @@ -310,9 +300,7 @@ func (s *Server) ensurePortal(client gat.Client, name string) error { } s.bound_portals[name] = portal - _ = s.writeNoFlush(portal) - - return nil + return s.writePacket(portal) } func (s *Server) destructPreparedStatement(name string) { @@ -362,8 +350,11 @@ func (s *Server) Describe(client gat.Client, d *protocol.Describe) error { } // now we actually execute the thing the client wants - _ = s.writeNoFlush(d) - err := s.writePacket(new(protocol.Sync)) + err := s.writePacket(d) + if err != nil { + return err + } + err = s.writePacket(new(protocol.Sync)) if err != nil { return err } @@ -387,7 +378,10 @@ func (s *Server) Execute(client gat.Client, e *protocol.Execute) error { return err } - _ = s.writeNoFlush(e) + err = s.writePacket(e) + if err != nil { + return err + } err = s.writePacket(new(protocol.Sync)) if err != nil { return err diff --git a/lib/gat/gatling/gatling.go b/lib/gat/gatling/gatling.go index 1f6358a462cff50ce7cb651bb1fbf98d8232f768..d969933937ade0a80d7e744979ab38cca28feca3 100644 --- a/lib/gat/gatling/gatling.go +++ b/lib/gat/gatling/gatling.go @@ -123,9 +123,10 @@ func (g *Gatling) ListenAndServe(ctx context.Context) error { return err } go func() { - err := g.handleConnection(ctx, c) + err = g.handleConnection(ctx, c) if err != nil { log.Println("disconnected:", err) + return } }() } @@ -133,6 +134,11 @@ func (g *Gatling) ListenAndServe(ctx context.Context) error { // TODO: TLS func (g *Gatling) handleConnection(ctx context.Context, c net.Conn) error { + err := c.(*net.TCPConn).SetNoDelay(false) + if err != nil { + return err + } + cl := client.NewClient(g, g.c, c, false) func() { @@ -146,7 +152,7 @@ func (g *Gatling) handleConnection(ctx context.Context, c net.Conn) error { delete(g.clients, cl.Id()) }() - err := cl.Accept(ctx) + err = cl.Accept(ctx) if err != nil { log.Println("err in connection:", err.Error()) _ = cl.Send(pg_error.IntoPacket(err))