diff --git a/lib/gat/gatcaddyfile/critic.go b/lib/gat/gatcaddyfile/critic.go
index db699383c14ab622e7dd56bf3e58f621b5c7272c..b77589a869cc29ffc9702d08baae3220787248d4 100644
--- a/lib/gat/gatcaddyfile/critic.go
+++ b/lib/gat/gatcaddyfile/critic.go
@@ -1,6 +1,7 @@
 package gatcaddyfile
 
 import (
+	"gfx.cafe/gfx/pggat/lib/gat/handlers/pool/critics/replication"
 	"time"
 
 	"github.com/caddyserver/caddy/v2"
@@ -11,11 +12,19 @@ import (
 )
 
 func init() {
+	// Register a directive for the latency critic which measures query response
+	// time as the determining factor for load balancing between replicas
 	RegisterDirective(Critic, "latency", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
 		module := &latency.Critic{
 			Validity: caddy.Duration(5 * time.Minute),
 		}
 
+		// parse nested format
+		if d.NextBlock(d.Nesting()) {
+			return parseLatency(module, d, warnings)
+		}
+
+		// parse legacy format
 		if !d.NextArg() {
 			return nil, d.ArgErr()
 		}
@@ -38,4 +47,69 @@ func init() {
 
 		return module, nil
 	})
+
+	// Register a directive handler for the replication critic which uses
+	// replication lag as a determining factor for load balancing between replicas
+	RegisterDirective(Critic, "replication", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
+		module := &replication.Critic{
+			Validity: caddy.Duration(2 * time.Minute),
+		}
+
+		// the replication critic has no legacy format; a block is required
+		if !d.NextBlock(d.Nesting()) {
+			return nil, d.ArgErr()
+		}
+
+		if err := parseCriticBlock(d, &module.Validity, &module.Threshold); err != nil {
+			return nil, err
+		}
+
+		return module, nil
+	})
+}
+
+// parseLatency parses the nested (block) form of the latency critic into
+// critic. The opening brace must already have been consumed by NextBlock.
+// warnings is currently unused.
+func parseLatency(critic *latency.Critic, d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
+	if err := parseCriticBlock(d, &critic.Validity, &critic.Threshold); err != nil {
+		return nil, err
+	}
+
+	return critic, nil
+}
+
+// parseCriticBlock reads "validity <duration>" and "threshold <duration>"
+// lines, one option per line, until the closing brace of the block. The
+// dispenser must be positioned on the first token inside the block. Parsed
+// values are stored through validity and threshold; options that do not
+// appear leave their target untouched.
+func parseCriticBlock(d *caddyfile.Dispenser, validity, threshold *caddy.Duration) error {
+	for d.Val() != "}" {
+		var target *caddy.Duration
+		switch d.Val() {
+		case "validity":
+			target = validity
+		case "threshold":
+			target = threshold
+		default:
+			return d.Errf("unrecognized critic option %q", d.Val())
+		}
+
+		if !d.NextArg() {
+			return d.ArgErr()
+		}
+
+		parsed, err := time.ParseDuration(d.Val())
+		if err != nil {
+			return d.WrapErr(err)
+		}
+		*target = caddy.Duration(parsed)
+
+		if !d.NextLine() {
+			return d.EOFErr()
+		}
+	}
+
+	return nil
 }
diff --git a/lib/gat/handlers/pool/critics/replication/critic.go b/lib/gat/handlers/pool/critics/replication/critic.go
new file mode 100644
index 0000000000000000000000000000000000000000..4b6fd58e2058b1d6efdd858d144551307d3fcf5b
--- /dev/null
+++ b/lib/gat/handlers/pool/critics/replication/critic.go
@@ -0,0 +1,82 @@
+package replication
+
+import (
+	"context"
+	"time"
+
+	"gfx.cafe/gfx/pggat/lib/fed"
+	"gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
+	"gfx.cafe/gfx/pggat/lib/gsql"
+	"github.com/caddyserver/caddy/v2"
+)
+
+func init() {
+	caddy.RegisterModule((*Critic)(nil))
+}
+
+// Critic is a pool critic that penalizes a server in proportion to its
+// replication lag.
+type Critic struct {
+	// Threshold is the amount of lag that is worth one penalty point. A
+	// non-positive Threshold makes Taste report a penalty of 0.
+	Threshold caddy.Duration `json:"threshold"`
+
+	// Validity is the duration Taste returns alongside every penalty.
+	Validity caddy.Duration `json:"validity"`
+}
+
+// CaddyModule returns the Caddy module information for the critic.
+func (T *Critic) CaddyModule() caddy.ModuleInfo {
+	return caddy.ModuleInfo{
+		ID: "pggat.handlers.pool.critics.replication",
+		New: func() caddy.Module {
+			return new(Critic)
+		},
+	}
+}
+
+// replicationLagQueryResult receives the single column selected by
+// replicationLagQuery. Lag is a pointer; Taste treats nil as "no lag reported".
+type replicationLagQueryResult struct {
+	Lag *float64 `sql:"0"`
+}
+
+// replicationLagQuery selects the replication lag in seconds: 0 when the last
+// received and last replayed WAL positions are equal, otherwise the time
+// elapsed since the last replayed transaction.
+const replicationLagQuery = `SELECT CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0 ELSE EXTRACT (epoch from (now() - pg_last_xact_replay_timestamp())) END AS lag;`
+
+// Taste measures the replication lag of the server behind conn and turns it
+// into a penalty of one point per Threshold of lag.
+//
+// When no lag value is reported (Lag is nil), the time the query itself took
+// is scored against Threshold instead. A non-positive Threshold would make
+// both divisions invalid (the integer one panics), so it yields a penalty of 0.
+//
+// It returns the penalty, the configured Validity and any query error.
+func (T *Critic) Taste(ctx context.Context, conn *fed.Conn) (int, time.Duration, error) {
+	var result replicationLagQueryResult
+
+	start := time.Now()
+	if err := gsql.Query(ctx, conn, []any{&result}, replicationLagQuery); err != nil {
+		return 0, 0, err
+	}
+	elapsed := time.Since(start)
+
+	validity := time.Duration(T.Validity)
+	threshold := time.Duration(T.Threshold)
+	if threshold <= 0 {
+		// no usable threshold configured: nothing to scale the lag by
+		return 0, validity, nil
+	}
+
+	if result.Lag == nil {
+		// no lag reported: fall back to the query round-trip time
+		return int(elapsed / threshold), validity, nil
+	}
+
+	return int(*result.Lag / threshold.Seconds()), validity, nil
+}
+
+var _ pool.Critic = (*Critic)(nil)
+var _ caddy.Module = (*Critic)(nil)