good morning!!!!

Skip to content
Snippets Groups Projects
Commit 0177bf8a authored by Thomas Guinther's avatar Thomas Guinther
Browse files

Merge branch 'tomg_critic' into tomg_tracing

parents 46e9dc40 ea1e895d
No related branches found
No related tags found
No related merge requests found
Pipeline #91249 passed with stages
in 14 minutes and 51 seconds
:5433 {
ssl self_signed
pool /base {
pool basic session {
penalize latency
penalize latency 500ms
penalize latency 500ms 3m
penalize latency {
}
penalize latency {
query_threshold 500ms
}
penalize latency {
validity 3m
}
penalize latency {
query_threshold 500ms
validity 3m
}
penalize query_latency
penalize query_latency 500ms
penalize query_latency 500ms 3m
penalize query_latency {
}
penalize query_latency {
query_threshold 500ms
}
penalize query_latency {
validity 3m
}
penalize query_latency {
query_threshold 500ms
validity 3m
}
}
address localhost:5432
username postgres
password postgres
database postgres
}
}
:5433 {
ssl self_signed
pool /base {
pool basic session {
penalize replication_latency {
}
penalize replication_latency {
replication_threshold 7m
}
penalize replication_latency {
query_threshold 500ms
}
penalize replication_latency {
replication_threshold 7m
query_threshold 500ms
}
penalize replication_latency {
validity 3m
}
penalize replication_latency {
replication_threshold 7m
validity 3m
}
penalize replication_latency {
query_threshold 500ms
validity 3m
}
penalize replication_latency {
replication_threshold 7m
query_threshold 500ms
validity 3m
}
}
address localhost:5432
username postgres
password postgres
database postgres
}
}
package gatcaddyfile
import (
"time"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool/critics/replication"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig"
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
......@@ -11,31 +10,164 @@ import (
)
func init() {
// Register a directive for the latency critic which measures query response
// time as the determining factor for load balancing between replicas
//
// Config Format
//
// * All fields are optional and will fall back to a suitable default
// * Duration values use caddy.Duration syntax
//
// penalize query_latency [query_threshold duration [validity duration]]
//
// -or-
//
// penalize query_latency {
// [query_threshold] {duration}
// [validity] {duration}
// }
//
// pool basic session {
// penalize query_latency # use the defaults
// penalize query_latency 500ms # set query threshold w/ default validity
// penalize query_latency 500ms 3m # set query threshold and validity
// penalize query_latency {
// query_threshold 300ms
// validity 5m
// }
// }
RegisterDirective(Critic, "query_latency", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
return parseQueryCritic(d, warnings)
})
// legacy directive for query_latency
RegisterDirective(Critic, "latency", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
module := &latency.Critic{
Validity: caddy.Duration(5 * time.Minute),
}
return parseQueryCritic(d, warnings)
})
if !d.NextArg() {
return nil, d.ArgErr()
// Register a directive handler for the replication critic which uses
// replication lag as a determining factor for load balancing between
// replicas
//
// Config format
//
// * All fields are optional and will fall back to a suitable default
// * Duration values use caddy.Duration syntax
//
// pool basic session {
// penalize replication_latency # valid declaration. Use default values
//
// penalize replication_latency {
// replication_threshold 3s
// query_threshold 300ms
// validity 5m
// }
// }
RegisterDirective(Critic, "replication_latency", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
module := replication.NewCritic()
for nesting := d.Nesting(); d.NextBlock(nesting); {
directive := d.Val()
switch directive {
case "validity":
if !d.NextArg() {
return nil, d.ArgErr()
}
validity, err := caddy.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.Validity = caddy.Duration(validity)
case "replication_threshold":
if !d.NextArg() {
return nil, d.ArgErr()
}
threshold, err := caddy.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.ReplicationThreshold = caddy.Duration(threshold)
case "query_threshold":
if !d.NextArg() {
return nil, d.ArgErr()
}
threshold, err := caddy.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.QueryThreshold = caddy.Duration(threshold)
default:
return nil, d.ArgErr()
}
}
threshold, err := caddy.ParseDuration(d.Val())
return module, nil
})
}
func parseQueryCritic(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
module := latency.NewCritic()
// use the defaults
if d.NextArg() {
// parse legacy format
dur, err := caddy.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.Threshold = caddy.Duration(threshold)
module.QueryThreshold = caddy.Duration(dur)
if d.NextArg() {
// optional validity
var validity time.Duration
validity, err = caddy.ParseDuration(d.Val())
dur, err = caddy.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.Validity = caddy.Duration(validity)
module.Validity = caddy.Duration(dur)
}
} else {
return parseLatency(module,d,warnings)
}
return module, nil
})
return module, nil
}
func parseLatency(critic *latency.Critic, d *caddyfile.Dispenser, _ *[]caddyconfig.Warning) (caddy.Module, error) {
for nesting := d.Nesting(); d.NextBlock(nesting); {
directive := d.Val()
switch directive {
case "validity":
if !d.NextArg() {
return nil, d.ArgErr()
}
validity, err := caddy.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
critic.Validity = caddy.Duration(validity)
case "query_threshold":
if !d.NextArg() {
return nil, d.ArgErr()
}
threshold, err := caddy.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
critic.QueryThreshold = caddy.Duration(threshold)
default:
return nil, d.ArgErr()
}
}
return critic, nil
}
......@@ -452,15 +452,7 @@ func init() {
}
module.Recipe.Dialer.RawPassword = d.Val()
} else {
if !d.NextBlock(d.Nesting()) {
return nil, d.ArgErr()
}
for {
if d.Val() == "}" {
break
}
for nesting := d.Nesting(); d.NextBlock(nesting); {
directive := d.Val()
switch directive {
case "pool":
......@@ -540,10 +532,6 @@ func init() {
default:
return nil, d.ArgErr()
}
if !d.NextLine() {
return nil, d.EOFErr()
}
}
}
......
......@@ -59,18 +59,10 @@ func init() {
return &module, nil
}
} else {
if !d.NextBlock(d.Nesting()) {
return nil, d.ArgErr()
}
module.TrackedParameters = nil
}
for {
if d.Val() == "}" {
break
}
for nesting := d.Nesting(); d.NextBlock(nesting); {
directive := d.Val()
switch directive {
case "pooler":
......@@ -182,12 +174,6 @@ func init() {
return nil, d.ArgErr()
}
module.TrackedParameters = append(module.TrackedParameters, strutil.MakeCIString(d.Val()))
case "penalize":
if !d.NextArg() {
return nil, d.ArgErr()
}
critic, err := UnmarshalDirectiveJSONModuleObject(
d,
Critic,
......@@ -202,14 +188,11 @@ func init() {
default:
return nil, d.ArgErr()
}
if !d.NextLine() {
return nil, d.EOFErr()
}
}
return &module, nil
})
RegisterDirective(Pool, "hybrid", func(d *caddyfile.Dispenser, warnings *[]caddyconfig.Warning) (caddy.Module, error) {
module := hybrid.Factory{
Config: hybrid.Config{
......@@ -221,76 +204,66 @@ func init() {
},
}
if d.NextBlock(d.Nesting()) {
module.TrackedParameters = nil
module.TrackedParameters = nil
for {
if d.Val() == "}" {
break
for nesting := d.Nesting(); d.NextBlock(nesting); {
directive := d.Val()
switch directive {
case "idle_timeout":
if !d.NextArg() {
return nil, d.ArgErr()
}
directive := d.Val()
switch directive {
case "idle_timeout":
if !d.NextArg() {
return nil, d.ArgErr()
}
val, err := time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
val, err := time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
module.ServerIdleTimeout = caddy.Duration(val)
case "reconnect":
if !d.NextArg() {
return nil, d.ArgErr()
}
module.ServerIdleTimeout = caddy.Duration(val)
case "reconnect":
if !d.NextArg() {
return nil, d.ArgErr()
}
initialTime, err := time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
initialTime, err := time.ParseDuration(d.Val())
maxTime := initialTime
if d.NextArg() {
maxTime, err = time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
}
maxTime := initialTime
if d.NextArg() {
maxTime, err = time.ParseDuration(d.Val())
if err != nil {
return nil, d.WrapErr(err)
}
}
module.ServerReconnectInitialTime = caddy.Duration(initialTime)
module.ServerReconnectMaxTime = caddy.Duration(maxTime)
case "track":
if !d.NextArg() {
return nil, d.ArgErr()
}
module.TrackedParameters = append(module.TrackedParameters, strutil.MakeCIString(d.Val()))
case "penalize":
if !d.NextArg() {
return nil, d.ArgErr()
}
critic, err := UnmarshalDirectiveJSONModuleObject(
d,
Critic,
"critic",
warnings,
)
if err != nil {
return nil, err
}
module.ServerReconnectInitialTime = caddy.Duration(initialTime)
module.ServerReconnectMaxTime = caddy.Duration(maxTime)
case "track":
if !d.NextArg() {
return nil, d.ArgErr()
}
module.RawCritics = append(module.RawCritics, critic)
default:
module.TrackedParameters = append(module.TrackedParameters, strutil.MakeCIString(d.Val()))
case "penalize":
if !d.NextArg() {
return nil, d.ArgErr()
}
if !d.NextLine() {
return nil, d.EOFErr()
critic, err := UnmarshalDirectiveJSONModuleObject(
d,
Critic,
"critic",
warnings,
)
if err != nil {
return nil, err
}
module.RawCritics = append(module.RawCritics, critic)
default:
return nil, d.ArgErr()
}
}
......
......@@ -15,9 +15,18 @@ func init() {
caddy.RegisterModule((*Critic)(nil))
}
// Critic describes a replication critic which measures a penalty due to
// query lag
type Critic struct {
Threshold caddy.Duration `json:"threshold"`
Validity caddy.Duration `json:"validity"`
QueryThreshold caddy.Duration `json:"query_threshold"`
Validity caddy.Duration `json:"validity"`
}
func NewCritic() *Critic {
return &Critic{
QueryThreshold: caddy.Duration(time.Millisecond * 300),
Validity: caddy.Duration(time.Minute * 5),
}
}
func (T *Critic) CaddyModule() caddy.ModuleInfo {
......@@ -36,7 +45,7 @@ func (T *Critic) Taste(ctx context.Context, conn *fed.Conn) (int, time.Duration,
return 0, time.Duration(T.Validity), err
}
dur := time.Since(start)
penalty := int(dur / time.Duration(T.Threshold))
penalty := int(dur / time.Duration(T.QueryThreshold))
return penalty, time.Duration(T.Validity), nil
}
......
package replication
import (
"context"
"gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
"gfx.cafe/gfx/pggat/lib/gsql"
"github.com/caddyserver/caddy/v2"
"time"
)
func init() {
caddy.RegisterModule((*Critic)(nil))
}
// Critic describes a replication critic which measures replication lag,
// with a fallback to query latency when there is no *measurable* lag
type Critic struct {
QueryThreshold caddy.Duration `json:"query_threshold"`
ReplicationThreshold caddy.Duration `json:"replication_threshold"`
Validity caddy.Duration `json:"validity"`
}
func NewCritic() *Critic {
return &Critic{
QueryThreshold: caddy.Duration(time.Millisecond * 300),
ReplicationThreshold: caddy.Duration(time.Second * 3),
Validity: caddy.Duration(time.Minute * 5),
}
}
func (T *Critic) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
ID: "pggat.handlers.pool.critics.replication",
New: func() caddy.Module {
return new(Critic)
},
}
}
type replicationLagQueryResult struct {
Lag *float64 `sql:"0"`
}
const replicationLagQuery = `SELECT CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0 ELSE EXTRACT (epoch from (now() - pg_last_xact_replay_timestamp())) END AS lag;`
func (T *Critic) Taste(ctx context.Context, conn *fed.Conn) (int, time.Duration, error) {
var result replicationLagQueryResult
start := time.Now()
err := gsql.Query(ctx, conn, []any{&result}, replicationLagQuery)
if err != nil {
return 0, time.Duration(0), err
}
penalty := 0
if (result.Lag != nil) && (*result.Lag > 0) {
penalty = int(*result.Lag / time.Duration(T.ReplicationThreshold).Seconds())
} else {
dur := time.Since(start)
penalty = int(dur / time.Duration(T.QueryThreshold))
}
return penalty, time.Duration(T.Validity), nil
}
var _ pool.Critic = (*Critic)(nil)
var _ caddy.Module = (*Critic)(nil)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment