diff --git a/lib/gat/gatcaddyfile/pool.go b/lib/gat/gatcaddyfile/pool.go index 5cdebddae04b6c269258afd196c2a64d1d31ede5..4babf8ce34e71d46c2f5ec53a98c6c83cd8bb9ff 100644 --- a/lib/gat/gatcaddyfile/pool.go +++ b/lib/gat/gatcaddyfile/pool.go @@ -12,6 +12,7 @@ import ( "gfx.cafe/gfx/pggat/lib/util/strutil" ) +const defaultClientAcquireTimeout = caddy.Duration(time.Minute) const defaultServerIdleTimeout = caddy.Duration(5 * time.Minute) const defaultServerReconnectInitialTime = caddy.Duration(5 * time.Second) const defaultServerReconnectMaxTime = caddy.Duration(1 * time.Minute) @@ -29,6 +30,7 @@ func defaultTrackedParameters() []strutil.CIString { } func defaultPoolConfig(base basic.Config) basic.Config { + base.ClientAcquireTimeout = defaultClientAcquireTimeout base.ServerIdleTimeout = defaultServerIdleTimeout base.ServerReconnectInitialTime = defaultServerReconnectInitialTime base.ServerReconnectMaxTime = defaultServerReconnectMaxTime @@ -187,6 +189,7 @@ func init() { RegisterDirective(Pool, "hybrid", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) { module := hybrid.Factory{ Config: hybrid.Config{ + ClientAcquireTimeout: defaultClientAcquireTimeout, ServerIdleTimeout: defaultServerIdleTimeout, ServerReconnectInitialTime: defaultServerReconnectInitialTime, ServerReconnectMaxTime: defaultServerReconnectMaxTime, diff --git a/lib/gat/handlers/pool/errors.go b/lib/gat/handlers/pool/errors.go index 409df5fa637e5b8639e4d8dd65ce3587de65af0e..0d909c208889accc643e89df51d5366c466c64af 100644 --- a/lib/gat/handlers/pool/errors.go +++ b/lib/gat/handlers/pool/errors.go @@ -3,5 +3,5 @@ package pool import "errors" var ( - ErrClosed = errors.New("pools closed") + ErrFailedToAcquirePeer = errors.New("failed to acquire peer (try increasing client_acquire_timeout?)") ) diff --git a/lib/gat/handlers/pool/pools/basic/config.go b/lib/gat/handlers/pool/pools/basic/config.go index 0af680c37e5322f2ede3bb17a0739322b09f6653..6eb560721bec93c72533494505a431f2f9b3aaea 100644 --- a/lib/gat/handlers/pool/pools/basic/config.go +++ b/lib/gat/handlers/pool/pools/basic/config.go @@ -48,6 +48,9 @@ type Config struct { ServerResetQuery string `json:"server_reset_query,omitempty"` + // ClientAcquireTimeout defines how long a client may be in AWAITING_SERVER state before it is disconnected + ClientAcquireTimeout caddy.Duration `json:"client_acquire_timeout,omitempty"` + // ServerIdleTimeout defines how long a server may be idle before it is disconnected ServerIdleTimeout caddy.Duration `json:"server_idle_timeout,omitempty"` @@ -74,6 +77,7 @@ func (T Config) Spool() spool.Config { UsePS: T.ParameterStatusSync == ParameterStatusSyncDynamic, UseEQP: T.ExtendedQuerySync, ResetQuery: T.ServerResetQuery, + AcquireTimeout: time.Duration(T.ClientAcquireTimeout), IdleTimeout: time.Duration(T.ServerIdleTimeout), ReconnectInitialTime: time.Duration(T.ServerReconnectInitialTime), ReconnectMaxTime: time.Duration(T.ServerReconnectMaxTime), diff --git a/lib/gat/handlers/pool/pools/basic/pool.go b/lib/gat/handlers/pool/pools/basic/pool.go index d3023407259a83876a0a78c6f0a7c11fc2d4fd8c..bfbb6c10f34b88fa9f7b607605ccb7ca4deece13 100644 --- a/lib/gat/handlers/pool/pools/basic/pool.go +++ b/lib/gat/handlers/pool/pools/basic/pool.go @@ -196,7 +196,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { server = T.servers.Acquire(client.ID) if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) @@ -234,7 +234,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { server = T.servers.Acquire(client.ID) if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) diff --git a/lib/gat/handlers/pool/pools/hybrid/config.go b/lib/gat/handlers/pool/pools/hybrid/config.go index 0b53ce2f2fe80204274a47f49c4a724dd87599d5..c9c53cb84532c460203dfcc6962068e059af513e 100644 --- a/lib/gat/handlers/pool/pools/hybrid/config.go +++ b/lib/gat/handlers/pool/pools/hybrid/config.go @@ -14,6 +14,8 @@ import ( ) type Config struct { + ClientAcquireTimeout caddy.Duration `json:"client_acquire_timeout,omitempty"` + ServerIdleTimeout caddy.Duration `json:"server_idle_timeout,omitempty"` ServerReconnectInitialTime caddy.Duration `json:"server_reconnect_initial_time,omitempty"` @@ -32,6 +34,7 @@ func (T Config) Spool() spool.Config { PoolerFactory: new(rob.Factory), UsePS: true, UseEQP: true, + AcquireTimeout: time.Duration(T.ClientAcquireTimeout), IdleTimeout: time.Duration(T.ServerIdleTimeout), ReconnectInitialTime: time.Duration(T.ServerReconnectInitialTime), ReconnectMaxTime: time.Duration(T.ServerReconnectMaxTime), diff --git a/lib/gat/handlers/pool/pools/hybrid/pool.go b/lib/gat/handlers/pool/pools/hybrid/pool.go index 3e28d8643d2f986e8a488c13a15f57f4a7994f9e..705a48f286a1b17bcbeaf612ff36088a86cd8b03 100644 --- a/lib/gat/handlers/pool/pools/hybrid/pool.go +++ b/lib/gat/handlers/pool/pools/hybrid/pool.go @@ -166,7 +166,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { if !T.replica.Empty() { replica = T.replica.Acquire(client.ID) if replica == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, replica) @@ -181,7 +181,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { primary = T.primary.Acquire(client.ID) if primary == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, primary) @@ -224,7 +224,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { if !T.replica.Empty() { replica = T.replica.Acquire(client.ID) if replica == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, replica) @@ -258,7 +258,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { // acquire primary primary = T.primary.Acquire(client.ID) if primary == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } serverErr = T.PairPrimary(client, psi, eqpi, primary) @@ -286,7 +286,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error { // acquire primary primary = T.primary.Acquire(client.ID) if primary == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, primary) @@ -361,7 +361,7 @@ func (T *Pool) serveOnly(conn *fed.Conn, write bool) error { server = T.replica.Acquire(client.ID) } if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) @@ -405,7 +405,7 @@ func (T *Pool) serveOnly(conn *fed.Conn, write bool) error { server = T.replica.Acquire(client.ID) } if server == nil { - return pool.ErrClosed + return pool.ErrFailedToAcquirePeer } err, serverErr = T.Pair(client, server) diff --git a/lib/gat/handlers/pool/spool/config.go b/lib/gat/handlers/pool/spool/config.go index 9ff5f22350475f92b65e9dafc2bf80846e17c813..ec58e1c6da3a6bc2a758ad4ba4187469e26c2502 100644 --- a/lib/gat/handlers/pool/spool/config.go +++ b/lib/gat/handlers/pool/spool/config.go @@ -18,6 +18,8 @@ type Config struct { ResetQuery string + AcquireTimeout time.Duration + IdleTimeout time.Duration ReconnectInitialTime time.Duration diff --git a/lib/gat/handlers/pool/spool/pool.go b/lib/gat/handlers/pool/spool/pool.go index 54f7d4742b6dec0ac69b9834417fac58769f14b1..0487fca34376013928c0385a9ba9ecdc0c801ffd 100644 --- a/lib/gat/handlers/pool/spool/pool.go +++ b/lib/gat/handlers/pool/spool/pool.go @@ -252,7 +252,7 @@ func (T *Pool) RemoveClient(client uuid.UUID) { func (T *Pool) Acquire(client uuid.UUID) *Server { for { - serverID := T.pooler.Acquire(client, 0) // TODO(garet) timeout in config + serverID := T.pooler.Acquire(client, T.config.AcquireTimeout) if serverID == uuid.Nil { return nil }