diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index bc97e446d684ffdc26ebbec946d55dd3af2faccb..e9559519805b2406517fbe71cefdb5a2e6a760c4 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -4,10 +4,9 @@ import ( "log" "net/http" _ "net/http/pprof" - "time" "pggat2/lib/gat" - "pggat2/lib/gat/pools/session" + "pggat2/lib/gat/configs/pgbouncer" ) func main() { @@ -17,74 +16,14 @@ func main() { log.Println("Starting pggat...") - pooler := gat.NewPooler() - - // create user - postgres := gat.NewUser("pw") - pooler.AddUser("postgres", postgres) - - // create pool - /* - rawPool := transaction.NewPool() - pool := gat.NewPool(rawPool, 15*time.Second) - postgres.AddPool("uniswap", pool) - pool.AddRecipe("localhost", gat.TCPRecipe{ - Database: "uniswap", - Address: "localhost:5432", - User: "postgres", - Password: "password", - MinConnections: 0, - MaxConnections: 5, - }) - */ - { - rawPool := session.NewPool(false) - pool := gat.NewPool(rawPool, 15*time.Second) - postgres.AddPool("postgres", pool) - pool.AddRecipe("localhost", gat.TCPRecipe{ - Database: "postgres", - Address: "localhost:5432", - User: "postgres", - Password: "password", - MinConnections: 0, - MaxConnections: 5, - }) + conf, err := pgbouncer.Load() + if err != nil { + panic(err) } - rawPool := session.NewPool(false) - pool := gat.NewPool(rawPool, 15*time.Second) - postgres.AddPool("regression", pool) - pool.AddRecipe("localhost", gat.TCPRecipe{ - Database: "regression", - Address: "localhost:5432", - User: "postgres", - Password: "password", - MinConnections: 0, - MaxConnections: 5, - }) - - /*go func() { - var metrics rob.Metrics - for { - time.Sleep(1 * time.Second) - rawPool.ReadSchedulerMetrics(&metrics) - log.Println(metrics.String()) - } - }() - */ - go func() { - var metrics session.Metrics - - for { - time.Sleep(1 * time.Second) - rawPool.ReadMetrics(&metrics) - log.Println(metrics.String()) - } - }() - - log.Println("Listening on :6432") + pooler := gat.NewPooler() - err := pooler.ListenAndServe(":6432") + err = conf.ListenAndServe(pooler) if err != nil { panic(err) } diff --git a/lib/gat/configs/pgbouncer/config.go b/lib/gat/configs/pgbouncer/config.go index 008470791dace63dc004c54f53787425d79cfc36..aa464449a8a561704e0661e5e810e32d921174b1 100644 --- a/lib/gat/configs/pgbouncer/config.go +++ b/lib/gat/configs/pgbouncer/config.go @@ -1,5 +1,17 @@ package pgbouncer +import ( + "errors" + "net" + "strconv" + "time" + + "pggat2/lib/gat" + "pggat2/lib/gat/pools/session" + "pggat2/lib/gat/pools/transaction" + "pggat2/lib/util/ini" +) + type PoolMode string const ( @@ -54,7 +66,7 @@ type PgBouncer struct { LogFile string `ini:"logfile"` PidFile string `ini:"pidfile"` ListenAddr string `ini:"listen_addr"` - ListenPort string `ini:"listen_port"` + ListenPort int `ini:"listen_port"` UnixSocketDir string `ini:"unix_socket_dir"` UnixSocketMode string `ini:"unix_socket_mode"` UnixSocketGroup string `ini:"unix_socket_group"` @@ -175,3 +187,147 @@ type Config struct { Users map[string]User `ini:"users"` Peers map[string]Peer `ini:"peers"` } + +var Default = Config{ + PgBouncer: PgBouncer{ + ListenPort: 6432, + UnixSocketDir: "/tmp", + UnixSocketMode: "0777", + PoolMode: PoolModeSession, + MaxClientConn: 100, + DefaultPoolSize: 20, + ReservePoolTimeout: 5.0, + TrackExtraParameters: []string{ + "IntervalStyle", + }, + ServiceName: "pgbouncer", + StatsPeriod: 60, + AuthQuery: "SELECT usename, passwd FROM pg_shadow WHERE usename=$1", + SyslogIdent: "pgbouncer", + SyslogFacility: "daemon", + LogConnections: 1, + LogDisconnections: 1, + LogPoolerErrors: 1, + LogStats: 1, + ServerResetQuery: "DISCARD ALL", + ServerCheckDelay: 30.0, + ServerCheckQuery: "select 1", + ServerLifetime: 3600.0, + ServerIdleTimeout: 600.0, + ServerConnectTimeout: 15.0, + ServerLoginRetry: 15.0, + ClientLoginTimeout: 60.0, + AutodbIdleTimeout: 3600.0, + DnsMaxTtl: 15.0, + DnsNxdomainTtl: 15.0, + ClientTLSSSLMode: SSLModeDisable, + ClientTLSProtocols: []TLSProtocol{ + TLSProtocolSecure, + }, + ClientTLSCiphers: []TLSCipher{ + "fast", + }, + ClientTLSECDHCurve: "auto", + ServerTLSSSLMode: SSLModePrefer, + ServerTLSProtocols: []TLSProtocol{ + TLSProtocolSecure, + }, + ServerTLSCiphers: []TLSCipher{ + "fast", + }, + QueryWaitTimeout: 120.0, + CancelWaitTimeout: 10.0, + SuspendTimeout: 10.0, + PktBuf: 4096, + MaxPacketSize: 2147483647, + ListenBacklog: 128, + SbufLoopcnt: 5, + TcpDeferAccept: 1, + TcpKeepalive: 1, + }, +} + +func Load() (Config, error) { + conf, err := ini.ReadFile("pgbouncer.ini") + if err != nil { + return Config{}, err + } + + var c = Default + err = ini.Unmarshal(conf, &c) + return c, err +} + +func (T *Config) ListenAndServe(pooler *gat.Pooler) error { + for name, user := range T.Users { + u := gat.NewUser("pw") // TODO(garet) passwords + pooler.AddUser(name, u) + + for dbname, db := range T.Databases { + if db.User != "" && db.User != name { + continue + } + + var poolMode PoolMode + if user.PoolMode != "" { + poolMode = user.PoolMode + } else if db.PoolMode != "" { + poolMode = db.PoolMode + } else { + poolMode = T.PgBouncer.PoolMode + } + + var raw gat.RawPool + switch poolMode { + case PoolModeSession: + raw = session.NewPool(T.PgBouncer.ServerRoundRobin != 0) + case PoolModeTransaction: + raw = transaction.NewPool() + default: + return errors.New("unsupported pool mode") + } + + p := gat.NewPool(raw, time.Duration(T.PgBouncer.ServerIdleTimeout*float64(time.Second))) + u.AddPool(dbname, p) + + startupParameters := make(map[string]string) + if db.ClientEncoding != "" { + startupParameters["client_encoding"] = db.ClientEncoding + } + if db.Datestyle != "" { + startupParameters["datestyle"] = db.Datestyle + } + if db.Timezone != "" { + startupParameters["timezone"] = db.Timezone + } + + if db.Host == "" { + // connect over unix socket + // TODO(garet) + } else { + // connect over tcp + recipe := gat.TCPRecipe{ + Database: db.DBName, + Address: db.Host, + User: name, + Password: "pw", + MinConnections: db.MinPoolSize, + MaxConnections: db.MaxDBConnections, + StartupParameters: startupParameters, + } + if recipe.MinConnections == 0 { + recipe.MinConnections = T.PgBouncer.MinPoolSize + } + if recipe.MaxConnections == 0 { + recipe.MaxConnections = T.PgBouncer.MaxDBConnections + } + + p.AddRecipe("pgbouncer", recipe) + } + } + } + + return pooler.ListenAndServe( + net.JoinHostPort(T.PgBouncer.ListenAddr, strconv.Itoa(T.PgBouncer.ListenPort)), + ) +} diff --git a/lib/gat/pool.go b/lib/gat/pool.go index a3757c86fbac4b137197da21fbb47deb972cacd1..d72bf5b8632e9a76a7ac8b5fc7fba79e47e8b57d 100644 --- a/lib/gat/pool.go +++ b/lib/gat/pool.go @@ -7,7 +7,6 @@ import ( "github.com/google/uuid" - "pggat2/lib/bouncer/backends/v0" "pggat2/lib/util/maps" "pggat2/lib/util/maths" "pggat2/lib/zap" @@ -36,26 +35,6 @@ type PoolRecipe struct { r Recipe } -func (T *PoolRecipe) connect() (zap.ReadWriter, map[string]string, error) { - rw, err := T.r.Connect() - if err != nil { - return nil, nil, err - } - - startupParameters := T.r.GetStartupParameters() - parameterStatus := make(map[string]string, len(startupParameters)) - for k, v := range startupParameters { - parameterStatus[k] = v - } - - err = backends.Accept(rw, T.r.GetUser(), T.r.GetPassword(), T.r.GetDatabase(), parameterStatus) - if err != nil { - return nil, nil, err - } - - return rw, parameterStatus, nil -} - type Pool struct { recipes maps.RWLocked[string, *PoolRecipe] @@ -121,9 +100,13 @@ func (T *Pool) _tryAddServers(recipe *PoolRecipe, amount int) (remaining int) { } recipe.servers = recipe.servers[:j] - max := maths.Min(recipe.r.GetMaxConnections()-j, amount) + var max = amount + maxConnections := recipe.r.GetMaxConnections() + if maxConnections != 0 { + max = maths.Min(maxConnections-j, max) + } for i := 0; i < max; i++ { - conn, ps, err := recipe.connect() + conn, ps, err := recipe.r.Connect() if err != nil { log.Printf("error connecting to server: %v", err) continue diff --git a/lib/gat/recipe.go b/lib/gat/recipe.go index 55b4232cb6d0362ce2e4762591b79a011d2f17d1..1a3398902ea0e876c3be5594ed119e8db5a6f284 100644 --- a/lib/gat/recipe.go +++ b/lib/gat/recipe.go @@ -3,19 +3,17 @@ package gat import ( "net" + "pggat2/lib/bouncer/backends/v0" + "pggat2/lib/util/maps" "pggat2/lib/zap" ) type Recipe interface { - Connect() (zap.ReadWriter, error) - - GetDatabase() string - GetUser() string - GetPassword() string - - GetStartupParameters() map[string]string + Connect() (zap.ReadWriter, map[string]string, error) GetMinConnections() int + // GetMaxConnections returns the maximum amount of connections for this db + // Return 0 for unlimited connections GetMaxConnections() int } @@ -27,15 +25,25 @@ type TCPRecipe struct { MinConnections int MaxConnections int + + StartupParameters map[string]string } -func (T TCPRecipe) Connect() (zap.ReadWriter, error) { +func (T TCPRecipe) Connect() (zap.ReadWriter, map[string]string, error) { conn, err := net.Dial("tcp", T.Address) if err != nil { - return nil, err + return nil, nil, err } rw := zap.WrapIOReadWriter(conn) - return rw, nil + + parameterStatus := maps.Clone(T.StartupParameters) + + err = backends.Accept(rw, T.User, T.Password, T.Database, T.StartupParameters) + if err != nil { + return nil, nil, err + } + + return rw, parameterStatus, nil } func (T TCPRecipe) GetDatabase() string { @@ -51,7 +59,7 @@ func (T TCPRecipe) GetPassword() string { } func (T TCPRecipe) GetStartupParameters() map[string]string { - return nil + return T.StartupParameters } func (T TCPRecipe) GetMinConnections() int { diff --git a/lib/util/maps/clone.go b/lib/util/maps/clone.go new file mode 100644 index 0000000000000000000000000000000000000000..d74c7429cff93dda0a5e395a2687d695950970b1 --- /dev/null +++ b/lib/util/maps/clone.go @@ -0,0 +1,9 @@ +package maps + +func Clone[K comparable, V any](value map[K]V) map[K]V { + m := make(map[K]V, len(value)) + for k, v := range value { + m[k] = v + } + return m +}