diff --git a/README.md b/README.md index 0b1c69c5e17ad88f6a8f97c3c2178f7e69d59def..cdfbd30e82f0bae82c549b2ffc7d80a574404f65 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ i'll lyk when its done | **Feature** | **Status** | Gat Status | **Comments** | |--------------------------------|-----------------------------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| | Transaction pooling | :white_check_mark: | :white_check_mark: | Identical to PgBouncer. | -| Session pooling | :white_check_mark: | :white_check_mark: | Identical to PgBouncer. | +| Session pooling | :white_check_mark: | no (do we want?) | Identical to PgBouncer. | | `COPY` support | :white_check_mark: | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. | | Query cancellation | :white_check_mark: | :white_check_mark: | Supported both in transaction and session pooling modes. | | Load balancing of read queries | :white_check_mark: | :white_check_mark: | Using random between replicas. Primary is included when `primary_reads_enabled` is enabled (default). | diff --git a/lib/gat/gatling/pool/conn_pool/conn_pool.go b/lib/gat/gatling/pool/conn_pool/conn_pool.go index 02c283504a06ad14015804c2105843f961bf7c6a..f29d4391c3fe197390e01e7a53d94f13a6b9ccb2 100644 --- a/lib/gat/gatling/pool/conn_pool/conn_pool.go +++ b/lib/gat/gatling/pool/conn_pool/conn_pool.go @@ -5,27 +5,23 @@ import ( "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/protocol" - "reflect" "runtime" - "sync" + "sync/atomic" "time" ) type ConnectionPool struct { // the pool connection - c *config.Pool - user *config.User - pool gat.Pool - shards []*config.Shard + c atomic.Pointer[config.Pool] + user *config.User + pool gat.Pool + workerCount atomic.Int64 - workers []*worker // see: https://github.com/golang/go/blob/master/src/runtime/chan.go#L33 // channels are a thread safe ring buffer implemented via a linked list of goroutines. // the idea is that goroutines are cheap, and we can afford to have one per pending request. // there is no real reason to implement a complicated worker pool pattern when well, if we're okay with having a 2-4kb overhead per request, then this is fine. trading space for code complexity workerPool chan *worker - // the lock for config related things - mu sync.RWMutex } func NewConnectionPool(pool gat.Pool, conf *config.Pool, user *config.User) *ConnectionPool { @@ -51,38 +47,20 @@ func (c *ConnectionPool) getWorker() *worker { case w := <-c.workerPool: return w default: - c.mu.Lock() - if len(c.workers) < c.user.PoolSize { + if c.workerCount.Add(1)-1 < int64(c.user.PoolSize) { next := &worker{ w: c, } - c.workers = append(c.workers, next) - c.mu.Unlock() return next } else { - c.mu.Unlock() - return <-c.workerPool + w := <-c.workerPool + return w } } } func (c *ConnectionPool) EnsureConfig(conf *config.Pool) { - c.mu.Lock() - defer c.mu.Unlock() - c.c = conf - for i, s := range conf.Shards { - for i >= len(c.shards) { - c.shards = append(c.shards, s) - } - sc := s - if !reflect.DeepEqual(c.shards[i], &sc) { - c.shards[i] = sc - // disconnect all conns using shard i, switch to new conf - for _, w := range c.workers { - w.invalidateShard(i) - } - } - } + c.c.Store(conf) } func (c *ConnectionPool) GetUser() *config.User { @@ -93,16 +71,6 @@ func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus { return c.getWorker().GetServerInfo() } -func (c *ConnectionPool) GetShards() []gat.Shard { - var shards []gat.Shard - c.mu.RLock() - defer c.mu.RUnlock() - for _, w := range c.workers { - shards = append(shards, w.shards...) - } - return shards -} - func (c *ConnectionPool) Describe(ctx context.Context, client gat.Client, d *protocol.Describe) error { return c.getWorker().HandleDescribe(ctx, client, d) } diff --git a/lib/gat/gatling/pool/conn_pool/shard/shard.go b/lib/gat/gatling/pool/conn_pool/shard/shard.go index e54cad49cb58fcf40be009079b5671474331a9c0..a3bb64ad60fe467efe3c8057c94a3978cbee270b 100644 --- a/lib/gat/gatling/pool/conn_pool/shard/shard.go +++ b/lib/gat/gatling/pool/conn_pool/shard/shard.go @@ -6,31 +6,41 @@ import ( "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/gatling/pool/conn_pool/shard/server" "math/rand" - "sync" + "reflect" ) type Shard struct { primary gat.Connection replicas []gat.Connection - mu sync.Mutex + user *config.User + conf *config.Shard } func FromConfig(user *config.User, conf *config.Shard) *Shard { - out := &Shard{} - for _, s := range conf.Servers { - srv, err := server.Dial(context.TODO(), s.Host, s.Port, user, conf.Database, s.Username, s.Password) + out := &Shard{ + user: user, + conf: conf, + } + out.init() + return out +} + +func (s *Shard) init() { + s.primary = nil + s.replicas = nil + for _, serv := range s.conf.Servers { + srv, err := server.Dial(context.TODO(), serv.Host, serv.Port, s.user, s.conf.Database, serv.Username, serv.Password) if err != nil { continue } - switch s.Role { + switch serv.Role { case config.SERVERROLE_PRIMARY: - out.primary = srv + s.primary = srv default: - out.replicas = append(out.replicas, srv) + s.replicas = append(s.replicas, srv) } } - return out } func (s *Shard) Choose(role config.ServerRole) gat.Connection { @@ -56,4 +66,11 @@ func (s *Shard) GetReplicas() []gat.Connection { return s.replicas } +func (s *Shard) EnsureConfig(c *config.Shard) { + if !reflect.DeepEqual(s.conf, c) { + s.conf = c + s.init() + } +} + var _ gat.Shard = (*Shard)(nil) diff --git a/lib/gat/gatling/pool/conn_pool/worker.go b/lib/gat/gatling/pool/conn_pool/worker.go index a1125b98e0ba4aa44f7e50d385871192bad3a88b..8f2a4b64b678f1361b8fd5fc9c9c1722e30d7db6 100644 --- a/lib/gat/gatling/pool/conn_pool/worker.go +++ b/lib/gat/gatling/pool/conn_pool/worker.go @@ -16,7 +16,8 @@ import ( // it wraps a pointer to the connection pool. type worker struct { // the parent connectino pool - w *ConnectionPool + w *ConnectionPool + rev int shards []gat.Shard @@ -30,7 +31,8 @@ func (w *worker) ret() { // attempt to connect to a new shard with this worker func (w *worker) fetchShard(n int) bool { - if n < 0 || n >= len(w.w.shards) { + conf := w.w.c.Load() + if n < 0 || n >= len(conf.Shards) { return false } @@ -38,7 +40,7 @@ func (w *worker) fetchShard(n int) bool { w.shards = append(w.shards, nil) } - w.shards[n] = shard.FromConfig(w.w.user, w.w.shards[n]) + w.shards[n] = shard.FromConfig(w.w.user, conf.Shards[n]) return true } @@ -53,8 +55,11 @@ func (w *worker) anyShard() gat.Shard { w.mu.Lock() defer w.mu.Unlock() - for _, s := range w.shards { + conf := w.w.c.Load() + + for idx, s := range w.shards { if s != nil { + s.EnsureConfig(conf.Shards[idx]) return s } } @@ -90,12 +95,12 @@ func (w *worker) chooseShardTransaction(client gat.Client, payload string) gat.S func (w *worker) GetServerInfo() []*protocol.ParameterStatus { defer w.ret() - shard := w.anyShard() - if shard == nil { + s := w.anyShard() + if s == nil { return nil } - primary := shard.GetPrimary() + primary := s.GetPrimary() if primary == nil { return nil } diff --git a/lib/gat/interfaces.go b/lib/gat/interfaces.go index 6c00bc7524fafd4e753b867c6756d27c8568f26f..e8752ca995b5071321106d139b2ceb60a83aedfb 100644 --- a/lib/gat/interfaces.go +++ b/lib/gat/interfaces.go @@ -71,8 +71,6 @@ type ConnectionPool interface { GetPool() Pool - GetShards() []Shard - EnsureConfig(c *config.Pool) // extended queries @@ -89,6 +87,8 @@ type Shard interface { GetPrimary() Connection GetReplicas() []Connection Choose(role config.ServerRole) Connection + + EnsureConfig(c *config.Shard) } type ConnectionState string