diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index aed9645e151ee8ac2afc24de99d13d73f5c4bf7a..ce495d6d84856a1750d1eed0cba94404135b465e 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -9,8 +9,6 @@ import ( "tuxpa.in/a/zlog/log" "pggat/lib/gat" - "pggat/lib/gat/modules/cloud_sql_discovery" - "pggat/lib/gat/modules/digitalocean_discovery" "pggat/lib/gat/modules/pgbouncer" "pggat/lib/gat/modules/zalando" "pggat/lib/gat/modules/zalando_operator_discovery" @@ -42,7 +40,7 @@ func loadModule(mode string) (gat.Module, error) { return nil, err } return zalando_operator_discovery.NewModule(conf) - case "google_cloud_sql": + /*case "google_cloud_sql": conf, err := cloud_sql_discovery.Load() if err != nil { return nil, err @@ -53,7 +51,7 @@ func loadModule(mode string) (gat.Module, error) { if err != nil { return nil, err } - return digitalocean_discovery.NewModule(conf) + return digitalocean_discovery.NewModule(conf)*/ default: return nil, errors.New("Unknown PGGAT_RUN_MODE: " + mode) } diff --git a/lib/fed/packet.go b/lib/fed/packet.go index f50496535871c5cfab50fcf8bd85ae2da2a242ad..8c60da3c875ffe5034dee2c9153249ae9c0ad0a6 100644 --- a/lib/fed/packet.go +++ b/lib/fed/packet.go @@ -146,10 +146,6 @@ func (T Packet) ReadBytes(v []byte) PacketFragment { return T.Payload().ReadBytes(v) } -func (T Packet) Done() { - // TODO(garet) -} - type PacketFragment []byte func (T PacketFragment) ReadUint8(v *uint8) PacketFragment { diff --git a/lib/gat/modules/zalando_operator_discovery/config.go b/lib/gat/modules/zalando_operator_discovery/config.go index 67db2c76fc9861c0a698a9b2d3643a27d8e9c9d0..6cafce6e951dbefa49889670f96358d700049542 100644 --- a/lib/gat/modules/zalando_operator_discovery/config.go +++ b/lib/gat/modules/zalando_operator_discovery/config.go @@ -26,11 +26,3 @@ func Load() (Config, error) { } return config, nil } - -func (T *Config) ListenAndServe() error { - server, err := NewServer(T) - if err != nil { - return err - } - return server.ListenAndServe() -} diff --git a/lib/gat/modules/zalando_operator_discovery/discoverer.go b/lib/gat/modules/zalando_operator_discovery/discoverer.go new file mode 100644 index 0000000000000000000000000000000000000000..cedb91bedd97215934506654da310acc150c2c9b --- /dev/null +++ b/lib/gat/modules/zalando_operator_discovery/discoverer.go @@ -0,0 +1,149 @@ +package zalando_operator_discovery + +import ( + "context" + "fmt" + "strings" + + acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "pggat/lib/gat/modules/discovery" +) + +type Discoverer struct { + config Config + + op *config.Config + + k8s k8sutil.KubernetesClient +} + +func NewDiscoverer(conf Config) (*Discoverer, error) { + k8s, err := k8sutil.NewFromConfig(conf.Rest) + if err != nil { + return nil, err + } + + var op *config.Config + if conf.ConfigMapName != "" { + operatorConfig, err := k8s.ConfigMaps(conf.Namespace).Get(context.Background(), conf.ConfigMapName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + op = config.NewFromMap(operatorConfig.Data) + } else if conf.OperatorConfigurationObject != "" { + operatorConfig, err := k8s.OperatorConfigurations(conf.Namespace).Get(context.Background(), conf.OperatorConfigurationObject, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + op = new(config.Config) + + // why did they do this to me + op.ClusterDomain = util.Coalesce(operatorConfig.Configuration.Kubernetes.ClusterDomain, "cluster.local") + + op.SecretNameTemplate = operatorConfig.Configuration.Kubernetes.SecretNameTemplate + + op.ConnectionPooler.NumberOfInstances = util.CoalesceInt32( + operatorConfig.Configuration.ConnectionPooler.NumberOfInstances, + k8sutil.Int32ToPointer(2)) + + op.ConnectionPooler.Mode = util.Coalesce( + operatorConfig.Configuration.ConnectionPooler.Mode, + constants.ConnectionPoolerDefaultMode) + + op.ConnectionPooler.MaxDBConnections = util.CoalesceInt32( + operatorConfig.Configuration.ConnectionPooler.MaxDBConnections, + k8sutil.Int32ToPointer(constants.ConnectionPoolerMaxDBConnections)) + } else { + // defaults + op = config.NewFromMap(make(map[string]string)) + } + + return &Discoverer{ + config: conf, + op: op, + k8s: k8s, + }, nil +} + +func (T *Discoverer) Clusters() ([]discovery.Cluster, error) { + clusters, err := T.k8s.Postgresqls(T.config.Namespace).List(context.Background(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + + res := make([]discovery.Cluster, 0, len(clusters.Items)) + for _, cluster := range clusters.Items { + c := discovery.Cluster{ + ID: string(cluster.UID), + Primary: discovery.Endpoint{ + Network: "tcp", + Address: fmt.Sprintf("%s.%s.svc.%s:5432", cluster.Name, T.config.Namespace, T.op.ClusterDomain), + }, + Databases: make([]string, 0, len(cluster.Spec.Databases)), + Users: make([]discovery.User, 0, len(cluster.Spec.Users)), + } + if cluster.Spec.NumberOfInstances > 1 { + c.Replicas = make(map[string]discovery.Endpoint, 1) + c.Replicas["repl"] = discovery.Endpoint{ + Network: "tcp", + Address: fmt.Sprintf("%s-repl.%s.svc.%s:5432", cluster.Name, T.config.Namespace, T.op.ClusterDomain), + } + } + + for user := range cluster.Spec.Users { + secretName := T.op.SecretNameTemplate.Format( + "username", strings.Replace(user, "_", "-", -1), + "cluster", cluster.Name, + "tprkind", acidv1.PostgresCRDResourceKind, + "tprgroup", acidzalando.GroupName, + ) + + // get secret + secret, err := T.k8s.Secrets(T.config.Namespace).Get(context.Background(), secretName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + password, ok := secret.Data["password"] + if !ok { + return nil, fmt.Errorf("no password in secret: %s", secretName) + } + + c.Users = append(c.Users, discovery.User{ + Username: user, + Password: string(password), + }) + } + + for database := range cluster.Spec.Databases { + c.Databases = append(c.Databases, database) + } + + res = append(res, c) + } + + return res, nil +} + +func (T *Discoverer) Added() <-chan discovery.Cluster { + return nil // TODO(garet) +} + +func (T *Discoverer) Updated() <-chan discovery.Cluster { + return nil // TODO(garet) +} + +func (T *Discoverer) Removed() <-chan string { + return nil // TODO(garet) +} + +var _ discovery.Discoverer = (*Discoverer)(nil) diff --git a/lib/gat/modules/zalando_operator_discovery/module.go b/lib/gat/modules/zalando_operator_discovery/module.go index b3bfcb14e8cf455ded17f33bd7b6b821e977cb90..614611c0f0af9bc20172531728a7bff00a584319 100644 --- a/lib/gat/modules/zalando_operator_discovery/module.go +++ b/lib/gat/modules/zalando_operator_discovery/module.go @@ -1,9 +1,71 @@ package zalando_operator_discovery import ( + "crypto/tls" + "time" + + "pggat/lib/bouncer" + "pggat/lib/gat" "pggat/lib/gat/modules/discovery" + "pggat/lib/util/strutil" ) -func NewModule(config Config) (*discovery.Module, error) { - return discovery.NewModule(discovery.Config{}) +type Module struct { + *discovery.Module +} + +func NewModule(config Config) (Module, error) { + d, err := NewDiscoverer(config) + if err != nil { + return Module{}, err + } + m, err := discovery.NewModule(discovery.Config{ + ReconcilePeriod: 1 * time.Minute, + Discoverer: d, + ServerSSLMode: bouncer.SSLModePrefer, + ServerSSLConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + ServerReconnectInitialTime: 5 * time.Second, + ServerReconnectMaxTime: 5 * time.Second, + ServerIdleTimeout: 5 * time.Minute, + // ServerResetQuery: "discard all", + TrackedParameters: []strutil.CIString{ + strutil.MakeCIString("client_encoding"), + strutil.MakeCIString("datestyle"), + strutil.MakeCIString("timezone"), + strutil.MakeCIString("standard_conforming_strings"), + strutil.MakeCIString("application_name"), + }, + PoolMode: "transaction", // TODO(garet) pool mode from operator config + }) + if err != nil { + return Module{}, err + } + return Module{ + Module: m, + }, nil } + +func (T Module) Endpoints() []gat.Endpoint { + return []gat.Endpoint{ + { + Network: "tcp", + Address: ":5432", + AcceptOptions: gat.FrontendAcceptOptions{ + AllowedStartupOptions: []strutil.CIString{ + strutil.MakeCIString("client_encoding"), + strutil.MakeCIString("datestyle"), + strutil.MakeCIString("timezone"), + strutil.MakeCIString("standard_conforming_strings"), + strutil.MakeCIString("application_name"), + strutil.MakeCIString("extra_float_digits"), + strutil.MakeCIString("options"), + }, + // TODO(garet) ssl config + }, + }, + } +} + +var _ gat.Listener = Module{} diff --git a/lib/gat/modules/zalando_operator_discovery/server.go b/lib/gat/modules/zalando_operator_discovery/server.go deleted file mode 100644 index 84eab85ed13bce9d1707fe9c27dce066cab6039a..0000000000000000000000000000000000000000 --- a/lib/gat/modules/zalando_operator_discovery/server.go +++ /dev/null @@ -1,368 +0,0 @@ -package zalando_operator_discovery - -import ( - "context" - "crypto/tls" - "fmt" - "strings" - "time" - - acidzalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do" - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1" - "github.com/zalando/postgres-operator/pkg/util" - "github.com/zalando/postgres-operator/pkg/util/config" - "github.com/zalando/postgres-operator/pkg/util/constants" - "github.com/zalando/postgres-operator/pkg/util/k8sutil" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" - "tuxpa.in/a/zlog/log" - - "pggat/lib/auth" - "pggat/lib/auth/credentials" - "pggat/lib/bouncer" - "pggat/lib/bouncer/backends/v0" - "pggat/lib/bouncer/frontends/v0" - "pggat/lib/gat" - "pggat/lib/gat/metrics" - "pggat/lib/gat/pool" - "pggat/lib/gat/pool/pools/session" - "pggat/lib/gat/pool/pools/transaction" - "pggat/lib/gat/pool/recipe" - "pggat/lib/util/flip" - "pggat/lib/util/strutil" -) - -type mapKey struct { - User string - Database string -} - -type toAddDetails struct { - SecretUser string - Name string -} - -type Server struct { - config *Config - - opConfig *config.Config - - k8s k8sutil.KubernetesClient - - postgresqlInformer cache.SharedIndexInformer - - pools gat.PoolsMap -} - -func NewServer(config *Config) (*Server, error) { - srv := &Server{ - config: config, - } - if err := srv.init(); err != nil { - return nil, err - } - return srv, nil -} - -func (T *Server) init() error { - var err error - T.k8s, err = k8sutil.NewFromConfig(T.config.Rest) - if err != nil { - return err - } - - if T.config.ConfigMapName != "" { - operatorConfig, err := T.k8s.ConfigMaps(T.config.Namespace).Get(context.Background(), T.config.ConfigMapName, metav1.GetOptions{}) - if err != nil { - return err - } - - T.opConfig = config.NewFromMap(operatorConfig.Data) - } else if T.config.OperatorConfigurationObject != "" { - operatorConfig, err := T.k8s.OperatorConfigurations(T.config.Namespace).Get(context.Background(), T.config.OperatorConfigurationObject, metav1.GetOptions{}) - if err != nil { - return err - } - - T.opConfig = new(config.Config) - - // why did they do this to me - T.opConfig.ClusterDomain = util.Coalesce(operatorConfig.Configuration.Kubernetes.ClusterDomain, "cluster.local") - - T.opConfig.SecretNameTemplate = operatorConfig.Configuration.Kubernetes.SecretNameTemplate - - T.opConfig.ConnectionPooler.NumberOfInstances = util.CoalesceInt32( - operatorConfig.Configuration.ConnectionPooler.NumberOfInstances, - k8sutil.Int32ToPointer(2)) - - T.opConfig.ConnectionPooler.Mode = util.Coalesce( - operatorConfig.Configuration.ConnectionPooler.Mode, - constants.ConnectionPoolerDefaultMode) - - T.opConfig.ConnectionPooler.MaxDBConnections = util.CoalesceInt32( - operatorConfig.Configuration.ConnectionPooler.MaxDBConnections, - k8sutil.Int32ToPointer(constants.ConnectionPoolerMaxDBConnections)) - } else { - // defaults - T.opConfig = config.NewFromMap(make(map[string]string)) - } - - T.postgresqlInformer = acidv1informer.NewPostgresqlInformer( - T.k8s.AcidV1ClientSet, - T.config.Namespace, - constants.QueueResyncPeriodTPR, - cache.Indexers{}) - - _, err = T.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - psql, ok := obj.(*acidv1.Postgresql) - if !ok { - return - } - T.addPostgresql(psql) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldPsql, ok := oldObj.(*acidv1.Postgresql) - if !ok { - return - } - newPsql, ok := newObj.(*acidv1.Postgresql) - if !ok { - return - } - T.updatePostgresql(oldPsql, newPsql) - }, - DeleteFunc: func(obj interface{}) { - psql, ok := obj.(*acidv1.Postgresql) - if !ok { - return - } - T.deletePostgresql(psql) - }, - }) - if err != nil { - return err - } - - return nil -} - -func (T *Server) addPostgresql(psql *acidv1.Postgresql) { - T.updatePostgresql(nil, psql) -} - -func (T *Server) addPool(name string, userCreds, serverCreds auth.Credentials, userUser, serverUser, database string) { - d := recipe.Dialer{ - Network: "tcp", - Address: fmt.Sprintf("%s.%s.svc.%s:5432", name, T.config.Namespace, T.opConfig.ClusterDomain), - AcceptOptions: backends.AcceptOptions{ - SSLMode: bouncer.SSLModePrefer, - SSLConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - Username: serverUser, - Credentials: serverCreds, - Database: database, - }, - } - - poolOptions := pool.Options{ - Credentials: userCreds, - ServerReconnectInitialTime: 5 * time.Second, - ServerReconnectMaxTime: 5 * time.Second, - ServerIdleTimeout: 5 * time.Minute, - TrackedParameters: []strutil.CIString{ - strutil.MakeCIString("client_encoding"), - strutil.MakeCIString("datestyle"), - strutil.MakeCIString("timezone"), - strutil.MakeCIString("standard_conforming_strings"), - strutil.MakeCIString("application_name"), - }, - } - switch T.opConfig.Mode { - case "transaction": - poolOptions = transaction.Apply(poolOptions) - case "session": - poolOptions.ServerResetQuery = "discard all" - poolOptions = session.Apply(poolOptions) - default: - log.Printf(`unknown pool mode "%s"`, T.opConfig.Mode) - return - } - p := pool.NewPool(poolOptions) - - var maxConnections int - if T.opConfig.MaxDBConnections != nil { - maxConnections = int(*T.opConfig.MaxDBConnections) - } - - recipeOptions := recipe.Options{ - Dialer: d, - MaxConnections: maxConnections, - } - r := recipe.NewRecipe(recipeOptions) - - p.AddRecipe("service", r) - - T.pools.Add(userUser, database, p) -} - -func (T *Server) updatePostgresql(oldPsql *acidv1.Postgresql, newPsql *acidv1.Postgresql) { - toRemove := make(map[mapKey]struct{}) - toAdd := make(map[mapKey]toAddDetails) - - if oldPsql != nil { - for user := range oldPsql.Spec.Users { - for database := range oldPsql.Spec.Databases { - toRemove[mapKey{ - User: user, - Database: database, - }] = struct{}{} - - if oldPsql.Spec.NumberOfInstances > 1 { - // there are replicas, delete them - toRemove[mapKey{ - User: user + "_ro", - Database: database, - }] = struct{}{} - } - } - } - } - if newPsql != nil { - for user := range newPsql.Spec.Users { - for database := range newPsql.Spec.Databases { - key := mapKey{ - User: user, - Database: database, - } - if _, ok := toRemove[key]; ok { - delete(toRemove, key) - } else { - toAdd[key] = toAddDetails{ - SecretUser: user, - Name: newPsql.Name, - } - } - - if newPsql.Spec.NumberOfInstances > 1 { - key = mapKey{ - User: user + "_ro", - Database: database, - } - if _, ok := toRemove[key]; ok { - delete(toRemove, key) - } else { - toAdd[key] = toAddDetails{ - SecretUser: user, - Name: newPsql.Name + "-repl", - } - } - } - } - } - } - - for pair := range toRemove { - p := T.pools.Remove(pair.User, pair.Database) - if p != nil { - p.Close() - } - log.Print("removed pool username=", pair.User, " database=", pair.Database) - } - - credentialsCache := make(map[string]credentials.Cleartext) - - for pair, details := range toAdd { - creds, ok := credentialsCache[details.SecretUser] - if !ok { - secretName := T.opConfig.SecretNameTemplate.Format( - "username", strings.Replace(details.SecretUser, "_", "-", -1), - "cluster", newPsql.Name, - "tprkind", acidv1.PostgresCRDResourceKind, - "tprgroup", acidzalando.GroupName, - ) - - secret, err := T.k8s.Secrets(T.config.Namespace).Get(context.Background(), secretName, metav1.GetOptions{}) - if err != nil { - log.Printf("error getting secret: %v", err) - return - } - - password, ok := secret.Data["password"] - if !ok { - log.Println("failed to get password in secret :(") - return - } - - creds = credentials.Cleartext{ - Username: details.SecretUser, - Password: string(password), - } - } - userCreds := credentials.Cleartext{ - Username: pair.User, - Password: creds.Password, - } - T.addPool(details.Name, userCreds, creds, pair.User, details.SecretUser, pair.Database) - log.Print("added pool username=", pair.User, " database=", pair.Database) - } -} - -func (T *Server) deletePostgresql(psql *acidv1.Postgresql) { - T.updatePostgresql(psql, nil) -} - -func (T *Server) ListenAndServe() error { - go func() { - var m metrics.Pools - for { - m.Clear() - time.Sleep(1 * time.Minute) - T.pools.ReadMetrics(&m) - log.Print(m.String()) - } - }() - - // load certificate - var sslConfig *tls.Config - certificate, err := tls.LoadX509KeyPair(T.config.TLSCrtFile, T.config.TLSKeyFile) - if err == nil { - sslConfig = &tls.Config{ - Certificates: []tls.Certificate{ - certificate, - }, - } - } else { - log.Printf("failed to load certificate, ssl is disabled") - } - - var bank flip.Bank - - bank.Queue(func() error { - T.postgresqlInformer.Run(make(chan struct{})) - return nil - }) - - bank.Queue(func() error { - listen := ":5432" // TODO(garet) use port - - log.Printf("listening on %s", listen) - - return gat.ListenAndServe("tcp", listen, frontends.AcceptOptions{ - AllowedStartupOptions: []strutil.CIString{ - strutil.MakeCIString("client_encoding"), - strutil.MakeCIString("datestyle"), - strutil.MakeCIString("timezone"), - strutil.MakeCIString("standard_conforming_strings"), - strutil.MakeCIString("application_name"), - strutil.MakeCIString("extra_float_digits"), - strutil.MakeCIString("options"), - }, - SSLConfig: sslConfig, - }, gat.NewKeyedPools(&T.pools)) - }) - - return bank.Wait() -}