From e79959c92c1137bbd196a310b430d66b535942b1 Mon Sep 17 00:00:00 2001
From: Garet Halliday <ghalliday@gfxlabs.io>
Date: Wed, 7 Sep 2022 14:10:55 -0500
Subject: [PATCH] worker threads

---
 lib/gat/gatling/conn_pool/conn_pool.go | 45 +++++++++++++++++++-------
 1 file changed, 34 insertions(+), 11 deletions(-)

diff --git a/lib/gat/gatling/conn_pool/conn_pool.go b/lib/gat/gatling/conn_pool/conn_pool.go
index b6a9c437..97291d63 100644
--- a/lib/gat/gatling/conn_pool/conn_pool.go
+++ b/lib/gat/gatling/conn_pool/conn_pool.go
@@ -8,21 +8,36 @@ import (
 	"gfx.cafe/gfx/pggat/lib/gat/gatling/server"
 	"gfx.cafe/gfx/pggat/lib/gat/protocol"
 	"log"
+	"sync"
 )
 
+type query struct {
+	query string
+	rep   chan<- protocol.Packet
+}
+
 type ConnectionPool struct {
 	c       *config.Pool
 	user    *config.User
 	pool    gat.Pool
 	servers []*server.Server
+	queries chan query
+
+	sync.Mutex
 }
 
+const workerCount = 2
+
 func NewConnectionPool(pool gat.Pool, conf *config.Pool, user *config.User) *ConnectionPool {
 	p := &ConnectionPool{
-		user: user,
-		pool: pool,
+		user:    user,
+		pool:    pool,
+		queries: make(chan query),
 	}
 	p.EnsureConfig(conf)
+	for i := 0; i < workerCount; i++ {
+		go p.worker()
+	}
 	return p
 }
 
@@ -40,6 +55,18 @@ func (c *ConnectionPool) EnsureConfig(conf *config.Pool) {
 	}
 }
 
+func (c *ConnectionPool) worker() {
+	for {
+		q := <-c.queries
+		// TODO ideally this would choose the server based on load, capabilities, etc
+		err := c.servers[0].Query(q.query, q.rep)
+		if err != nil {
+			log.Println(err)
+		}
+		close(q.rep)
+	}
+}
+
 func (c *ConnectionPool) GetUser() *config.User {
 	return c.user
 }
@@ -51,17 +78,13 @@ func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus {
 	return nil
 }
 
-func (c *ConnectionPool) Query(ctx context.Context, query string) (<-chan protocol.Packet, error) {
+func (c *ConnectionPool) Query(ctx context.Context, q string) (<-chan protocol.Packet, error) {
 	rep := make(chan protocol.Packet)
 
-	// TODO ideally, this would look at loads, capabilities, etc and choose the server accordingly
-	go func() {
-		err := c.servers[0].Query(query, rep)
-		if err != nil {
-			log.Println(err)
-		}
-		close(rep)
-	}()
+	c.queries <- query{
+		query: q,
+		rep:   rep,
+	}
 
 	return rep, nil
 }
-- 
GitLab