From dc450c04069e6ca312e94fd98bd848cd276c5a67 Mon Sep 17 00:00:00 2001
From: a <a@a.a>
Date: Fri, 30 Sep 2022 20:51:52 -0500
Subject: [PATCH] lol

---
 config_data.yml                               |  2 +-
 go.mod                                        |  3 +-
 go.sum                                        |  6 ++
 lib/gat/database/query_router/query_router.go |  7 +-
 lib/gat/pool/session/pool.go                  |  5 +-
 lib/gat/pool/transaction/pool.go              | 34 ++++++----
 lib/gat/pool/transaction/worker.go            | 68 +++++++++----------
 lib/gat/pool/transaction/worker_pool.go       | 34 ++++++++++
 test/docker-compose.yml                       |  2 -
 9 files changed, 100 insertions(+), 61 deletions(-)
 create mode 100644 lib/gat/pool/transaction/worker_pool.go

diff --git a/config_data.yml b/config_data.yml
index 7047c51f..c1d8134c 100644
--- a/config_data.yml
+++ b/config_data.yml
@@ -33,7 +33,7 @@ pools:
             port: 5432
             role: primary
             username: ENV$PSQL_DB_USER_RW
-            password: ENV$PSQL_DB_PASS_RW
+            password: postgres
           - host: ENV$PSQL_REP_DB_HOST
             port: 5432
             role: replica
diff --git a/go.mod b/go.mod
index 71544486..46a4b5a6 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
 	gfx.cafe/ghalliday1/pg3p v0.0.19
 	gfx.cafe/ghalliday1/pgparser v0.0.9
 	gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf
-	gfx.cafe/util/go/generic v0.0.0-20220917152604-80373e5a2c51
+	gfx.cafe/util/go/generic v0.0.0-20221001013022-0560a0526470
 	gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9
 	git.tuxpa.in/a/zlog v1.32.0
 	github.com/BurntSushi/toml v1.2.0
@@ -38,6 +38,7 @@ require (
 	github.com/lib/pq v1.9.0 // indirect
 	github.com/mattn/go-colorable v0.1.12 // indirect
 	github.com/mattn/go-isatty v0.0.14 // indirect
+	github.com/panjf2000/ants v1.3.0 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/sirupsen/logrus v1.6.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
diff --git a/go.sum b/go.sum
index f78b0455..7b60079b 100644
--- a/go.sum
+++ b/go.sum
@@ -8,6 +8,10 @@ gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf h1:ya4IK1D+Kq0DrFdrr
 gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf/go.mod h1:+DiyiCOBGS9O9Ce4ewHQO3Y59h66WSWAbgZZ2O2AYYw=
 gfx.cafe/util/go/generic v0.0.0-20220917152604-80373e5a2c51 h1:4FEqqQR2Ruae4quQQn7lqLtLvQaH+dHmehpPHhiXm+Q=
 gfx.cafe/util/go/generic v0.0.0-20220917152604-80373e5a2c51/go.mod h1:0mgIMiX8+M1l4VHMaOqBaydZaztbFgblQAaEDOZpUhQ=
+gfx.cafe/util/go/generic v0.0.0-20221001012907-2fe841bb39cc h1:M3dtZ5NDUG5LOpB3s0sFnCK9mgNvau0nRacV2h/ui54=
+gfx.cafe/util/go/generic v0.0.0-20221001012907-2fe841bb39cc/go.mod h1:0mgIMiX8+M1l4VHMaOqBaydZaztbFgblQAaEDOZpUhQ=
+gfx.cafe/util/go/generic v0.0.0-20221001013022-0560a0526470 h1:PwdR5V3IO06cdDGfmYVa9AYIPmRbLIweKJqZt2hIWuo=
+gfx.cafe/util/go/generic v0.0.0-20221001013022-0560a0526470/go.mod h1:0mgIMiX8+M1l4VHMaOqBaydZaztbFgblQAaEDOZpUhQ=
 gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9 h1:Z0RRPRbz5Bt7yu6v+RzLCAKm0YNcrwNGLMs9UVs8NsU=
 gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9/go.mod h1:sO44FAgBZXic9FwJaJHX1mI9vt1e3CRe0X/3bwnMRho=
 git.tuxpa.in/a/zlog v1.32.0 h1:KKXbRF1x8kJDSzUoGz/pivo+4TVY6xT5sVtdFZ6traY=
@@ -196,6 +200,8 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108
 github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0=
 github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
+github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
 github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
diff --git a/lib/gat/database/query_router/query_router.go b/lib/gat/database/query_router/query_router.go
index 894255f0..d1b298b2 100644
--- a/lib/gat/database/query_router/query_router.go
+++ b/lib/gat/database/query_router/query_router.go
@@ -2,15 +2,16 @@ package query_router
 
 import (
 	"errors"
+	"strconv"
+	"unicode"
+	"unicode/utf8"
+
 	"gfx.cafe/gfx/pggat/lib/config"
 	"gfx.cafe/gfx/pggat/lib/gat"
 	"gfx.cafe/gfx/pggat/lib/util/cmux"
 	"gfx.cafe/ghalliday1/pg3p"
 	"gfx.cafe/ghalliday1/pg3p/lex"
 	"gfx.cafe/util/go/generic"
-	"strconv"
-	"unicode"
-	"unicode/utf8"
 )
 
 type QueryRouter struct {
diff --git a/lib/gat/pool/session/pool.go b/lib/gat/pool/session/pool.go
index f58a256b..b5fcc587 100644
--- a/lib/gat/pool/session/pool.go
+++ b/lib/gat/pool/session/pool.go
@@ -2,12 +2,13 @@ package session
 
 import (
 	"context"
+	"runtime"
+	"sync/atomic"
+
 	"gfx.cafe/gfx/pggat/lib/config"
 	"gfx.cafe/gfx/pggat/lib/gat"
 	"gfx.cafe/gfx/pggat/lib/gat/protocol"
 	"gfx.cafe/util/go/generic"
-	"runtime"
-	"sync/atomic"
 )
 
 type Pool struct {
diff --git a/lib/gat/pool/transaction/pool.go b/lib/gat/pool/transaction/pool.go
index f3fe5776..47ae5fea 100644
--- a/lib/gat/pool/transaction/pool.go
+++ b/lib/gat/pool/transaction/pool.go
@@ -2,12 +2,12 @@ package transaction
 
 import (
 	"context"
+	"sync/atomic"
+	"time"
+
 	"gfx.cafe/gfx/pggat/lib/config"
 	"gfx.cafe/gfx/pggat/lib/gat"
 	"gfx.cafe/gfx/pggat/lib/gat/protocol"
-	"runtime"
-	"sync/atomic"
-	"time"
 )
 
 type Pool struct {
@@ -19,11 +19,7 @@ type Pool struct {
 
 	dialer gat.Dialer
 
-	// see: https://github.com/golang/go/blob/master/src/runtime/chan.go#L33
-	// channels are a thread safe ring buffer implemented via a linked list of goroutines.
-	// the idea is that goroutines are cheap, and we can afford to have one per pending request.
-	// there is no real reason to implement a complicated worker database pattern when well, if we're okay with having a 2-4kb overhead per request, then this is fine. trading space for code complexity
-	workerPool chan *worker
+	workerPool WorkerPool[*Worker]
 }
 
 func New(database gat.Database, dialer gat.Dialer, conf *config.Pool, user *config.User) *Pool {
@@ -31,37 +27,45 @@ func New(database gat.Database, dialer gat.Dialer, conf *config.Pool, user *conf
 		user:       user,
 		database:   database,
 		dialer:     dialer,
-		workerPool: make(chan *worker, 1+runtime.NumCPU()*4),
+		workerPool: NewChannelPool[*Worker](user.PoolSize),
 	}
 	p.EnsureConfig(conf)
 	return p
 }
 
+func (c *Pool) WithWorkerPool(w WorkerPool[*Worker]) {
+	c.workerPool = w
+}
+
 func (c *Pool) GetDatabase() gat.Database {
 	return c.database
 }
 
-func (c *Pool) getWorker() *worker {
+func (c *Pool) getWorker() *Worker {
 	start := time.Now()
 	defer func() {
 		c.database.GetStats().AddWaitTime(time.Now().Sub(start).Microseconds())
 	}()
-	select {
-	case w := <-c.workerPool:
+	w, ok := c.workerPool.TryGet()
+	if ok {
 		return w
-	default:
+	} else {
 		if c.workerCount.Add(1)-1 < int64(c.user.PoolSize) {
-			next := &worker{
+			next := &Worker{
 				w: c,
 			}
 			return next
 		} else {
-			w := <-c.workerPool
+			w := c.workerPool.Get()
 			return w
 		}
 	}
 }
 
+func (c *Pool) returnWorker(w *Worker) {
+	c.workerPool.Put(w)
+}
+
 func (c *Pool) EnsureConfig(conf *config.Pool) {
 	c.c.Store(conf)
 }
diff --git a/lib/gat/pool/transaction/worker.go b/lib/gat/pool/transaction/worker.go
index 617939ee..ec9c2c43 100644
--- a/lib/gat/pool/transaction/worker.go
+++ b/lib/gat/pool/transaction/worker.go
@@ -4,34 +4,28 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
 	"gfx.cafe/gfx/pggat/lib/config"
 	"gfx.cafe/gfx/pggat/lib/gat"
 	"gfx.cafe/gfx/pggat/lib/gat/pool/transaction/shard"
 	"gfx.cafe/gfx/pggat/lib/gat/protocol"
-	"math/rand"
-	"sync"
-	"time"
 )
 
-// a single use worker with an embedded connection database.
+// a single use Worker with an embedded connection database.
 // it wraps a pointer to the connection database.
-type worker struct {
+type Worker struct {
 	// the parent connectino database
-	w   *Pool
-	rev int
+	w *Pool
 
 	shards []*shard.Shard
-
-	mu sync.Mutex
-}
-
-// ret urn worker to database
-func (w *worker) ret() {
-	w.w.workerPool <- w
+	mu     sync.Mutex
 }
 
-// attempt to connect to a new shard with this worker
-func (w *worker) fetchShard(client gat.Client, n int) bool {
+// attempt to connect to a new shard with this Worker
+func (w *Worker) fetchShard(client gat.Client, n int) bool {
 	conf := w.w.c.Load()
 	if n < 0 || n >= len(conf.Shards) {
 		return false
@@ -45,14 +39,14 @@ func (w *worker) fetchShard(client gat.Client, n int) bool {
 	return true
 }
 
-func (w *worker) invalidateShard(n int) {
+func (w *Worker) invalidateShard(n int) {
 	w.mu.Lock()
 	defer w.mu.Unlock()
 
 	w.shards[n] = nil
 }
 
-func (w *worker) chooseShard(client gat.Client) *shard.Shard {
+func (w *Worker) chooseShard(client gat.Client) *shard.Shard {
 	w.mu.Lock()
 	defer w.mu.Unlock()
 
@@ -83,8 +77,8 @@ func (w *worker) chooseShard(client gat.Client) *shard.Shard {
 	return nil
 }
 
-func (w *worker) GetServerInfo(client gat.Client) []*protocol.ParameterStatus {
-	defer w.ret()
+func (w *Worker) GetServerInfo(client gat.Client) []*protocol.ParameterStatus {
+	defer w.w.returnWorker(w)
 
 	s := w.chooseShard(client)
 	if s == nil {
@@ -99,8 +93,8 @@ func (w *worker) GetServerInfo(client gat.Client) []*protocol.ParameterStatus {
 	return primary.GetServerInfo()
 }
 
-func (w *worker) HandleDescribe(ctx context.Context, c gat.Client, d *protocol.Describe) error {
-	defer w.ret()
+func (w *Worker) HandleDescribe(ctx context.Context, c gat.Client, d *protocol.Describe) error {
+	defer w.w.returnWorker(w)
 
 	if w.w.user.StatementTimeout != 0 {
 		var done context.CancelFunc
@@ -125,8 +119,8 @@ func (w *worker) HandleDescribe(ctx context.Context, c gat.Client, d *protocol.D
 	}
 }
 
-func (w *worker) HandleExecute(ctx context.Context, c gat.Client, e *protocol.Execute) error {
-	defer w.ret()
+func (w *Worker) HandleExecute(ctx context.Context, c gat.Client, e *protocol.Execute) error {
+	defer w.w.returnWorker(w)
 
 	if w.w.user.StatementTimeout != 0 {
 		var done context.CancelFunc
@@ -151,8 +145,8 @@ func (w *worker) HandleExecute(ctx context.Context, c gat.Client, e *protocol.Ex
 	}
 }
 
-func (w *worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol.FunctionCall) error {
-	defer w.ret()
+func (w *Worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol.FunctionCall) error {
+	defer w.w.returnWorker(w)
 
 	if w.w.user.StatementTimeout != 0 {
 		var done context.CancelFunc
@@ -177,8 +171,8 @@ func (w *worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol.
 	}
 }
 
-func (w *worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query string) error {
-	defer w.ret()
+func (w *Worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query string) error {
+	defer w.w.returnWorker(w)
 
 	if w.w.user.StatementTimeout != 0 {
 		var done context.CancelFunc
@@ -209,8 +203,8 @@ func (w *worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query stri
 	}
 }
 
-func (w *worker) HandleTransaction(ctx context.Context, c gat.Client, query string) error {
-	defer w.ret()
+func (w *Worker) HandleTransaction(ctx context.Context, c gat.Client, query string) error {
+	defer w.w.returnWorker(w)
 
 	if w.w.user.StatementTimeout != 0 {
 		var done context.CancelFunc
@@ -241,17 +235,17 @@ func (w *worker) HandleTransaction(ctx context.Context, c gat.Client, query stri
 	}
 }
 
-func (w *worker) setCurrentBinding(client gat.Client, server gat.Connection) {
+func (w *Worker) setCurrentBinding(client gat.Client, server gat.Connection) {
 	client.SetCurrentConn(server)
 	server.SetClient(client)
 }
 
-func (w *worker) unsetCurrentBinding(client gat.Client, server gat.Connection) {
+func (w *Worker) unsetCurrentBinding(client gat.Client, server gat.Connection) {
 	client.SetCurrentConn(nil)
 	server.SetClient(nil)
 }
 
-func (w *worker) z_actually_do_describe(ctx context.Context, client gat.Client, payload *protocol.Describe) error {
+func (w *Worker) z_actually_do_describe(ctx context.Context, client gat.Client, payload *protocol.Describe) error {
 	srv := w.chooseShard(client)
 	if srv == nil {
 		return fmt.Errorf("describe('%+v') fail: no server", payload)
@@ -269,7 +263,7 @@ func (w *worker) z_actually_do_describe(ctx context.Context, client gat.Client,
 	defer w.unsetCurrentBinding(client, target)
 	return target.Describe(ctx, client, payload)
 }
-func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, payload *protocol.Execute) error {
+func (w *Worker) z_actually_do_execute(ctx context.Context, client gat.Client, payload *protocol.Execute) error {
 	srv := w.chooseShard(client)
 	if srv == nil {
 		return fmt.Errorf("describe('%+v') fail: no server", payload)
@@ -286,7 +280,7 @@ func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, p
 	}
 	return target.Execute(ctx, client, payload)
 }
-func (w *worker) z_actually_do_fn(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error {
+func (w *Worker) z_actually_do_fn(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error {
 	srv := w.chooseShard(client)
 	if srv == nil {
 		return fmt.Errorf("fn('%+v') fail: no server", payload)
@@ -307,7 +301,7 @@ func (w *worker) z_actually_do_fn(ctx context.Context, client gat.Client, payloa
 	}
 	return nil
 }
-func (w *worker) z_actually_do_simple_query(ctx context.Context, client gat.Client, payload string) error {
+func (w *Worker) z_actually_do_simple_query(ctx context.Context, client gat.Client, payload string) error {
 	// chose a server
 	srv := w.chooseShard(client)
 	if srv == nil {
@@ -335,7 +329,7 @@ func (w *worker) z_actually_do_simple_query(ctx context.Context, client gat.Clie
 	}
 	return nil
 }
-func (w *worker) z_actually_do_transaction(ctx context.Context, client gat.Client, payload string) error {
+func (w *Worker) z_actually_do_transaction(ctx context.Context, client gat.Client, payload string) error {
 	// chose a server
 	srv := w.chooseShard(client)
 	if srv == nil {
diff --git a/lib/gat/pool/transaction/worker_pool.go b/lib/gat/pool/transaction/worker_pool.go
new file mode 100644
index 00000000..19a026ae
--- /dev/null
+++ b/lib/gat/pool/transaction/worker_pool.go
@@ -0,0 +1,34 @@
+package transaction
+
+type WorkerPool[T any] interface {
+	TryGet() (T, bool)
+	Get() T
+	Put(T)
+}
+
+type ChannelPool[T any] struct {
+	ch chan T
+}
+
+func NewChannelPool[T any](size int) *ChannelPool[T] {
+	return &ChannelPool[T]{
+		ch: make(chan T, size*10),
+	}
+}
+
+func (c *ChannelPool[T]) Get() T {
+	return <-c.ch
+}
+
+func (c *ChannelPool[T]) TryGet() (T, bool) {
+	select {
+	case out := <-c.ch:
+		return out, true
+	default:
+		return *new(T), false
+	}
+}
+
+func (c *ChannelPool[T]) Put(t T) {
+	c.ch <- t
+}
diff --git a/test/docker-compose.yml b/test/docker-compose.yml
index 3b8a6e23..a9597da1 100644
--- a/test/docker-compose.yml
+++ b/test/docker-compose.yml
@@ -6,8 +6,6 @@ services:
     restart: always
     environment:
       POSTGRES_PASSWORD: example
-    ports:
-      - 5432:5432
   adminer:
     image: adminer
     restart: always
-- 
GitLab