From ee6692451630e9cf12424cebe694c3b600d25f7a Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Tue, 21 May 2024 17:14:41 -0500 Subject: [PATCH] cool --- lib/gat/gatcaddyfile/pool.go | 3 +++ lib/gat/handlers/pool/errors.go | 2 +- lib/gat/handlers/pool/pools/basic/config.go | 4 ++++ lib/gat/handlers/pool/pools/basic/pool.go | 4 ++-- lib/gat/handlers/pool/pools/hybrid/config.go | 3 +++ lib/gat/handlers/pool/pools/hybrid/pool.go | 14 +++++++------- lib/gat/handlers/pool/spool/config.go | 2 ++ lib/gat/handlers/pool/spool/pool.go | 2 +- 8 files changed, 23 insertions(+), 11 deletions(-) diff --git a/lib/gat/gatcaddyfile/pool.go b/lib/gat/gatcaddyfile/pool.go index 5cdebdda..4babf8ce 100644 --- a/lib/gat/gatcaddyfile/pool.go +++ b/lib/gat/gatcaddyfile/pool.go @@ -12,6 +12,7 @@ import ( "gfx.cafe/gfx/pggat/lib/util/strutil" ) +const defaultClientAcquireTimeout = caddy.Duration(time.Minute) const defaultServerIdleTimeout = caddy.Duration(5 * time.Minute) const defaultServerReconnectInitialTime = caddy.Duration(5 * time.Second) const defaultServerReconnectMaxTime = caddy.Duration(1 * time.Minute) @@ -29,6 +30,7 @@ func defaultTrackedParameters() []strutil.CIString { } func defaultPoolConfig(base basic.Config) basic.Config { + base.ClientAcquireTimeout = defaultClientAcquireTimeout base.ServerIdleTimeout = defaultServerIdleTimeout base.ServerReconnectInitialTime = defaultServerReconnectInitialTime base.ServerReconnectMaxTime = defaultServerReconnectMaxTime @@ -187,6 +189,7 @@ func init() { RegisterDirective(Pool, "hybrid", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) { module := hybrid.Factory{ Config: hybrid.Config{ + ClientAcquireTimeout: defaultClientAcquireTimeout, ServerIdleTimeout: defaultServerIdleTimeout, ServerReconnectInitialTime: defaultServerReconnectInitialTime, ServerReconnectMaxTime: defaultServerReconnectMaxTime, diff --git a/lib/gat/handlers/pool/errors.go b/lib/gat/handlers/pool/errors.go index 409df5fa..0d909c20 100644 --- a/lib/gat/handlers/pool/errors.go +++ b/lib/gat/handlers/pool/errors.go @@ -3,5 +3,5 @@ package pool import "errors" var ( - ErrClosed = errors.New("pools closed") + ErrFailedToAcquirePeer = errors.New("failed to acquire peer (try increasing client_acquire_timeout?)") ) diff --git a/lib/gat/handlers/pool/pools/basic/config.go b/lib/gat/handlers/pool/pools/basic/config.go index 0af680c3..6eb56072 100644 --- a/lib/gat/handlers/pool/pools/basic/config.go +++ b/lib/gat/handlers/pool/pools/basic/config.go @@ -48,6 +48,9 @@ type Config struct { ServerResetQuery string `json:"server_reset_query,omitempty"` + // ClientAcquireTimeout defines how long a client may be in AWAITING_SERVER state before it is disconnected + ClientAcquireTimeout caddy.Duration `json:"client_acquire_timeout,omitempty"` + // ServerIdleTimeout defines how long a server may be idle before it is disconnected ServerIdleTimeout caddy.Duration `json:"server_idle_timeout,omitempty"` @@ -74,6 +77,7 @@ func (T Config) Spool() spool.Config { UsePS: T.ParameterStatusSync == ParameterStatusSyncDynamic, UseEQP: T.ExtendedQuerySync, ResetQuery: T.ServerResetQuery, + AcquireTimeout: time.Duration(T.ClientAcquireTimeout), IdleTimeout: time.Duration(T.ServerIdleTimeout), ReconnectInitialTime: time.Duration(T.ServerReconnectInitialTime), ReconnectMaxTime: time.Duration(T.ServerReconnectMaxTime), diff --git a/lib/gat/handlers/pool/pools/basic/pool.go b/lib/gat/handlers/pool/pools/basic/pool.go index d3023407..bfbb6c10 100644 --- a/lib/gat/handlers/pool/pools/basic/pool.go +++ b/lib/gat/handlers/pool/pools/basic/pool.go @@ -196,7 +196,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { server = T.servers.Acquire(client.ID) if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) @@ -234,7 +234,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { server = T.servers.Acquire(client.ID) if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) diff --git a/lib/gat/handlers/pool/pools/hybrid/config.go b/lib/gat/handlers/pool/pools/hybrid/config.go index 0b53ce2f..c9c53cb8 100644 --- a/lib/gat/handlers/pool/pools/hybrid/config.go +++ b/lib/gat/handlers/pool/pools/hybrid/config.go @@ -14,6 +14,8 @@ import ( ) type Config struct { + ClientAcquireTimeout caddy.Duration `json:"client_acquire_timeout,omitempty"` + ServerIdleTimeout caddy.Duration `json:"server_idle_timeout,omitempty"` ServerReconnectInitialTime caddy.Duration `json:"server_reconnect_initial_time,omitempty"` @@ -32,6 +34,7 @@ func (T Config) Spool() spool.Config { PoolerFactory: new(rob.Factory), UsePS: true, UseEQP: true, + AcquireTimeout: time.Duration(T.ClientAcquireTimeout), IdleTimeout: time.Duration(T.ServerIdleTimeout), ReconnectInitialTime: time.Duration(T.ServerReconnectInitialTime), ReconnectMaxTime: time.Duration(T.ServerReconnectMaxTime), diff --git a/lib/gat/handlers/pool/pools/hybrid/pool.go b/lib/gat/handlers/pool/pools/hybrid/pool.go index 3e28d864..705a48f2 100644 --- a/lib/gat/handlers/pool/pools/hybrid/pool.go +++ b/lib/gat/handlers/pool/pools/hybrid/pool.go @@ -166,7 +166,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { if !T.replica.Empty() { replica = T.replica.Acquire(client.ID) if replica == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, replica) @@ -181,7 +181,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { primary = T.primary.Acquire(client.ID) if primary == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, primary) @@ -224,7 +224,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { if !T.replica.Empty() { replica = T.replica.Acquire(client.ID) if replica == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, replica) @@ -258,7 +258,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { // acquire primary primary = T.primary.Acquire(client.ID) if primary == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } serverErr = T.PairPrimary(client, psi, eqpi, primary) @@ -286,7 +286,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { // acquire primary primary = T.primary.Acquire(client.ID) if primary == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, primary) @@ -361,7 +361,7 @@ func (T *Pool) serveOnly(conn *fed.Conn, write bool) error { server = T.replica.Acquire(client.ID) } if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) @@ -405,7 +405,7 @@ func (T *Pool) serveOnly(conn *fed.Conn, write bool) error { server = T.replica.Acquire(client.ID) } if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) diff --git a/lib/gat/handlers/pool/spool/config.go b/lib/gat/handlers/pool/spool/config.go index 9ff5f223..ec58e1c6 100644 --- a/lib/gat/handlers/pool/spool/config.go +++ b/lib/gat/handlers/pool/spool/config.go @@ -18,6 +18,8 @@ type Config struct { ResetQuery string + AcquireTimeout time.Duration + IdleTimeout time.Duration ReconnectInitialTime time.Duration diff --git a/lib/gat/handlers/pool/spool/pool.go b/lib/gat/handlers/pool/spool/pool.go index 54f7d474..0487fca3 100644 --- a/lib/gat/handlers/pool/spool/pool.go +++ b/lib/gat/handlers/pool/spool/pool.go @@ -252,7 +252,7 @@ func (T *Pool) RemoveClient(client uuid.UUID) { func (T *Pool) Acquire(client uuid.UUID) *Server { for { - serverID := T.pooler.Acquire(client, 0) // TODO(garet) timeout in config + serverID := T.pooler.Acquire(client, T.config.AcquireTimeout) if serverID == uuid.Nil { return nil } -- GitLab