diff --git a/lib/gsql/client.go b/lib/gsql/client.go index 07df6795345361bcbaa5db6b9823253a13ba1213..ff9a15e6f35f254d02463ee72d2a2da136359127 100644 --- a/lib/gsql/client.go +++ b/lib/gsql/client.go @@ -23,8 +23,8 @@ type Client struct { closed bool mu sync.Mutex - readQueue chan struct{} - writeQueue chan struct{} + readC *sync.Cond + writeC *sync.Cond } func (T *Client) Do(result ResultWriter, packets ...fed.Packet) { @@ -36,14 +36,8 @@ func (T *Client) Do(result ResultWriter, packets ...fed.Packet) { packets: packets, }) - if T.readQueue != nil { - for { - select { - case T.readQueue <- struct{}{}: - default: - return - } - } + if T.readC != nil { + T.readC.Broadcast() } } @@ -66,13 +60,8 @@ func (T *Client) ReadPacket(typed bool) (fed.Packet, error) { T.read.PushBack(packet) } T.write = b.result - outer: - for { - select { - case T.writeQueue <- struct{}{}: - default: - break outer - } + if T.writeC != nil { + T.writeC.Broadcast() } continue } @@ -81,17 +70,10 @@ func (T *Client) ReadPacket(typed bool) (fed.Packet, error) { return nil, io.EOF } - func() { - if T.readQueue == nil { - T.readQueue = make(chan struct{}) - } - q := T.readQueue - - T.mu.Unlock() - defer T.mu.Lock() - - <-q - }() + if T.readC == nil { + T.readC = sync.NewCond(&T.mu) + } + T.readC.Wait() } if (p.Type() == 0 && typed) || (p.Type() != 0 && !typed) { @@ -110,17 +92,10 @@ func (T *Client) WritePacket(packet fed.Packet) error { return io.EOF } - func() { - if T.writeQueue == nil { - T.writeQueue = make(chan struct{}) - } - q := T.writeQueue - - T.mu.Unlock() - defer T.mu.Lock() - - <-q - }() + if T.writeC == nil { + T.writeC = sync.NewCond(&T.mu) + } + T.writeC.Wait() } if err := T.write.WritePacket(packet); err != nil { @@ -140,11 +115,11 @@ func (T *Client) Close() error { T.closed = true - if T.writeQueue != nil { - close(T.writeQueue) + if T.writeC != nil { + T.writeC.Broadcast() } - if T.readQueue != nil { - close(T.readQueue) + if T.readC != nil { + T.readC.Broadcast() } return nil }