good morning!!!!

Skip to content
Snippets Groups Projects
Commit 8b40a537 authored by Thomas Guinther's avatar Thomas Guinther
Browse files

updated config parser for critics and replication critic

parent 820ff1d7
No related branches found
No related tags found
No related merge requests found
package gatcaddyfile
import (
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool/critics/replication"
"time"
"github.com/caddyserver/caddy/v2"
......@@ -11,11 +12,19 @@ import (
)
func init() {
// Register a directive for the latency critic which measures query response
// time as the determining factor for load balancing between replicas
RegisterDirective(Critic, "latency", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
module := &latency.Critic{
Validity: caddy.Duration(5 * time.Minute),
}
// parse nested format
if d.NextBlock(d.Nesting()) {
return parseLatency(module, d, warnings)
}
// parse legacy format
if !d.NextArg() {
return nil, d.ArgErr()
}
......@@ -38,4 +47,98 @@ func init() {
return module, nil
})
// Register a directive handler for the replication critic which uses
// replication lag as a determining factor for load balancing between replicas
RegisterDirective(Critic, "replication", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
module := &replication.Critic{
Validity: caddy.Duration(2 * time.Minute),
}
if !d.NextBlock(d.Nesting()) {
return nil, d.ArgErr()
}
for {
if d.Val() == "}" {
break
}
directive := d.Val()
switch directive {
case "validity":
if !d.NextArg() {
return nil, d.ArgErr()
}
validity, err := time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.Validity = caddy.Duration(validity)
case "threshold":
if !d.NextArg() {
return nil, d.ArgErr()
}
threshold, err := time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.Threshold = caddy.Duration(threshold)
default:
return nil, d.ArgErr()
}
if !d.NextLine() {
return nil, d.EOFErr()
}
}
return module, nil
})
}
func parseLatency(critic *latency.Critic, d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
for {
if d.Val() == "}" {
break
}
directive := d.Val()
switch directive {
case "validity":
if !d.NextArg() {
return nil, d.ArgErr()
}
validity, err := time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
critic.Validity = caddy.Duration(validity)
case "threshold":
if !d.NextArg() {
return nil, d.ArgErr()
}
threshold, err := time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
critic.Threshold = caddy.Duration(threshold)
default:
return nil, d.ArgErr()
}
if !d.NextLine() {
return nil, d.EOFErr()
}
}
return critic, nil
}
package replication
import (
"context"
"gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
"gfx.cafe/gfx/pggat/lib/gsql"
"github.com/caddyserver/caddy/v2"
"time"
)
func init() {
caddy.RegisterModule((*Critic)(nil))
}
type Critic struct {
Threshold caddy.Duration `json:"threshold"`
Validity caddy.Duration `json:"validity"`
}
func (T *Critic) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
ID: "pggat.handlers.pool.critics.replication",
New: func() caddy.Module {
return new(Critic)
},
}
}
type replicationLagQueryResult struct {
Lag *float64 `sql:"0"`
}
const replicationLagQuery = `SELECT CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0 ELSE EXTRACT (epoch from (now() - pg_last_xact_replay_timestamp())) END AS lag;`
func (T *Critic) Taste(ctx context.Context, conn *fed.Conn) (int, time.Duration, error) {
var result replicationLagQueryResult
start := time.Now()
err := gsql.Query(ctx, conn, []any{&result}, replicationLagQuery)
if err != nil {
return 0, time.Duration(0), err
}
penalty := 0
if result.Lag != nil {
penalty = int(*result.Lag / time.Duration(T.Threshold).Seconds())
} else {
dur := time.Since(start)
penalty = int(dur / time.Duration(T.Threshold))
}
return penalty, time.Duration(T.Validity), nil
}
var _ pool.Critic = (*Critic)(nil)
var _ caddy.Module = (*Critic)(nil)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment