diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index ce495d6d84856a1750d1eed0cba94404135b465e..ef53348071260fa3211062506d6fb11361db9e69 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -9,6 +9,7 @@ import ( "tuxpa.in/a/zlog/log" "pggat/lib/gat" + "pggat/lib/gat/modules/digitalocean_discovery" "pggat/lib/gat/modules/pgbouncer" "pggat/lib/gat/modules/zalando" "pggat/lib/gat/modules/zalando_operator_discovery" @@ -41,17 +42,18 @@ func loadModule(mode string) (gat.Module, error) { } return zalando_operator_discovery.NewModule(conf) /*case "google_cloud_sql": - conf, err := cloud_sql_discovery.Load() - if err != nil { - return nil, err - } - return cloud_sql_discovery.NewModule(conf) + conf, err := cloud_sql_discovery.Load() + if err != nil { + return nil, err + } + return cloud_sql_discovery.NewModule(conf) + */ case "digitalocean_databases": conf, err := digitalocean_discovery.Load() if err != nil { return nil, err } - return digitalocean_discovery.NewModule(conf)*/ + return digitalocean_discovery.NewModule(conf) default: return nil, errors.New("Unknown PGGAT_RUN_MODE: " + mode) } diff --git a/lib/gat/modules/digitalocean_discovery/config.go b/lib/gat/modules/digitalocean_discovery/config.go index 0d317a90aa46bd0a8ed3033907c1cf1072f320bd..d228a3fff2a08b53da3e2c6b9c7b6baa058a8e16 100644 --- a/lib/gat/modules/digitalocean_discovery/config.go +++ b/lib/gat/modules/digitalocean_discovery/config.go @@ -1,37 +1,15 @@ package digitalocean_discovery import ( - "context" - "crypto/tls" "errors" - "net" - "strconv" - "time" "gfx.cafe/util/go/gun" - "github.com/digitalocean/godo" - "tuxpa.in/a/zlog/log" - - "pggat/lib/auth/credentials" - "pggat/lib/bouncer" - "pggat/lib/bouncer/backends/v0" - "pggat/lib/bouncer/frontends/v0" - "pggat/lib/gat" - "pggat/lib/gat/metrics" - "pggat/lib/gat/pool" - "pggat/lib/gat/pool/pools/session" - "pggat/lib/gat/pool/pools/transaction" - "pggat/lib/gat/pool/recipe" - "pggat/lib/util/flip" - "pggat/lib/util/strutil" ) type Config struct { - APIKey string `env:"PGGAT_DO_API_KEY"` - Private string `env:"PGGAT_DO_PRIVATE"` - PoolMode string `env:"PGGAT_POOL_MODE"` - TLSCrtFile string `env:"PGGAT_TLS_CRT_FILE" default:"/etc/ssl/certs/pgbouncer.crt"` - TLSKeyFile string `env:"PGGAT_TLS_KEY_FILE" default:"/etc/ssl/certs/pgbouncer.key"` + APIKey string `env:"PGGAT_DO_API_KEY"` + Private bool `env:"PGGAT_DO_PRIVATE"` + PoolMode string `env:"PGGAT_POOL_MODE"` } func Load() (Config, error) { @@ -43,159 +21,3 @@ func Load() (Config, error) { return conf, nil } - -func (T *Config) ListenAndServe() error { - // load certificate - var sslConfig *tls.Config - certificate, err := tls.LoadX509KeyPair(T.TLSCrtFile, T.TLSKeyFile) - if err == nil { - sslConfig = &tls.Config{ - Certificates: []tls.Certificate{ - certificate, - }, - } - } else { - log.Printf("failed to load certificate, ssl is disabled") - } - - client := godo.NewFromToken(T.APIKey) - clusters, _, err := client.Databases.List(context.Background(), nil) - - if err != nil { - return err - } - - var pools gat.PoolsMap - - go func() { - var m metrics.Pools - for { - m.Clear() - time.Sleep(1 * time.Minute) - pools.ReadMetrics(&m) - log.Print(m.String()) - } - }() - - for _, cluster := range clusters { - if cluster.EngineSlug != "pg" { - continue - } - - replicas, _, err := client.Databases.ListReplicas(context.Background(), cluster.ID, nil) - if err != nil { - return err - } - - for _, user := range cluster.Users { - creds := credentials.Cleartext{ - Username: user.Name, - Password: user.Password, - } - - for _, dbname := range cluster.DBNames { - poolOptions := pool.Options{ - Credentials: creds, - ServerReconnectInitialTime: 5 * time.Second, - ServerReconnectMaxTime: 5 * time.Second, - ServerIdleTimeout: 5 * time.Minute, - TrackedParameters: []strutil.CIString{ - strutil.MakeCIString("client_encoding"), - strutil.MakeCIString("datestyle"), - strutil.MakeCIString("timezone"), - strutil.MakeCIString("standard_conforming_strings"), - strutil.MakeCIString("application_name"), - }, - } - if T.PoolMode == "session" { - poolOptions.ServerResetQuery = "DISCARD ALL" - poolOptions = session.Apply(poolOptions) - } else { - poolOptions = transaction.Apply(poolOptions) - } - - p := pool.NewPool(poolOptions) - - acceptOptions := backends.AcceptOptions{ - SSLMode: bouncer.SSLModeRequire, - SSLConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - Username: user.Name, - Credentials: creds, - Database: dbname, - } - - var addr string - if T.Private != "" { - // private - addr = net.JoinHostPort(cluster.PrivateConnection.Host, strconv.Itoa(cluster.PrivateConnection.Port)) - } else { - addr = net.JoinHostPort(cluster.Connection.Host, strconv.Itoa(cluster.Connection.Port)) - } - - p.AddRecipe(cluster.Name, recipe.NewRecipe(recipe.Options{ - Dialer: recipe.Dialer{ - Network: "tcp", - Address: addr, - AcceptOptions: acceptOptions, - }, - })) - - pools.Add(user.Name, dbname, p) - log.Printf("registered database user=%s database=%s", user.Name, dbname) - - if len(replicas) > 0 { - // change pool credentials - creds2 := creds - creds2.Username = user.Name + "_ro" - poolOptions2 := poolOptions - poolOptions2.Credentials = creds2 - - p2 := pool.NewPool(poolOptions2) - - for _, replica := range replicas { - var replicaAddr string - if T.Private != "" { - // private - replicaAddr = net.JoinHostPort(replica.PrivateConnection.Host, strconv.Itoa(replica.PrivateConnection.Port)) - } else { - replicaAddr = net.JoinHostPort(replica.Connection.Host, strconv.Itoa(replica.Connection.Port)) - } - - p2.AddRecipe(replica.ID, recipe.NewRecipe(recipe.Options{ - Dialer: recipe.Dialer{ - Network: "tcp", - Address: replicaAddr, - AcceptOptions: acceptOptions, - }, - })) - } - - pools.Add(creds2.Username, dbname, p2) - log.Printf("registered database user=%s database=%s", creds2.Username, dbname) - } - } - } - } - - var b flip.Bank - - b.Queue(func() error { - log.Print("listening on :5432") - return gat.ListenAndServe("tcp", ":5432", frontends.AcceptOptions{ - SSLConfig: sslConfig, - AllowedStartupOptions: []strutil.CIString{ - strutil.MakeCIString("client_encoding"), - strutil.MakeCIString("datestyle"), - strutil.MakeCIString("timezone"), - strutil.MakeCIString("standard_conforming_strings"), - strutil.MakeCIString("application_name"), - strutil.MakeCIString("extra_float_digits"), - strutil.MakeCIString("options"), - }, - }, gat.NewKeyedPools(&pools)) - }) - - return b.Wait() -} diff --git a/lib/gat/modules/digitalocean_discovery/discoverer.go b/lib/gat/modules/digitalocean_discovery/discoverer.go new file mode 100644 index 0000000000000000000000000000000000000000..a334f4d5acb45916a84f3ce3c72f99c6c3b8fb15 --- /dev/null +++ b/lib/gat/modules/digitalocean_discovery/discoverer.go @@ -0,0 +1,99 @@ +package digitalocean_discovery + +import ( + "context" + "net" + "strconv" + + "github.com/digitalocean/godo" + + "pggat/lib/gat/modules/discovery" +) + +type Discoverer struct { + config Config + + do *godo.Client +} + +func NewDiscoverer(config Config) (*Discoverer, error) { + return &Discoverer{ + config: config, + do: godo.NewFromToken(config.APIKey), + }, nil +} + +func (T Discoverer) Clusters() ([]discovery.Cluster, error) { + clusters, _, err := T.do.Databases.List(context.Background(), nil) + if err != nil { + return nil, err + } + + res := make([]discovery.Cluster, 0, len(clusters)) + for _, cluster := range clusters { + if cluster.EngineSlug != "pg" { + continue + } + + var primaryAddr string + if T.config.Private { + primaryAddr = net.JoinHostPort(cluster.PrivateConnection.Host, strconv.Itoa(cluster.PrivateConnection.Port)) + } else { + primaryAddr = net.JoinHostPort(cluster.Connection.Host, strconv.Itoa(cluster.Connection.Port)) + } + + c := discovery.Cluster{ + ID: cluster.ID, + Primary: discovery.Endpoint{ + Network: "tcp", + Address: primaryAddr, + }, + Databases: cluster.DBNames, + Users: make([]discovery.User, 0, len(cluster.Users)), + } + + for _, user := range c.Users { + c.Users = append(c.Users, discovery.User{ + Username: user.Username, + Password: user.Password, + }) + } + + replicas, _, err := T.do.Databases.ListReplicas(context.Background(), cluster.ID, nil) + if err != nil { + return nil, err + } + + c.Replicas = make(map[string]discovery.Endpoint, len(replicas)) + for _, replica := range replicas { + var replicaAddr string + if T.config.Private { + replicaAddr = net.JoinHostPort(replica.PrivateConnection.Host, strconv.Itoa(replica.PrivateConnection.Port)) + } else { + replicaAddr = net.JoinHostPort(replica.Connection.Host, strconv.Itoa(replica.Connection.Port)) + } + c.Replicas[replica.ID] = discovery.Endpoint{ + Network: "tcp", + Address: replicaAddr, + } + } + + res = append(res, c) + } + + return res, nil +} + +func (T Discoverer) Added() <-chan discovery.Cluster { + return nil +} + +func (T Discoverer) Updated() <-chan discovery.Cluster { + return nil +} + +func (T Discoverer) Removed() <-chan string { + return nil +} + +var _ discovery.Discoverer = (*Discoverer)(nil) diff --git a/lib/gat/modules/digitalocean_discovery/module.go b/lib/gat/modules/digitalocean_discovery/module.go index 3a1f6a5cec5572a7d10cf1af1b4e1bb110aed5ef..0f6893882b659c9e490b4817f3188cae0a61efd5 100644 --- a/lib/gat/modules/digitalocean_discovery/module.go +++ b/lib/gat/modules/digitalocean_discovery/module.go @@ -1,9 +1,37 @@ package digitalocean_discovery import ( + "crypto/tls" + "time" + + "pggat/lib/bouncer" "pggat/lib/gat/modules/discovery" + "pggat/lib/util/strutil" ) func NewModule(config Config) (*discovery.Module, error) { - return discovery.NewModule(discovery.Config{}) + d, err := NewDiscoverer(config) + if err != nil { + return nil, err + } + + return discovery.NewModule(discovery.Config{ + ReconcilePeriod: 5 * time.Minute, + Discoverer: d, + ServerSSLMode: bouncer.SSLModeRequire, + ServerSSLConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + ServerReconnectInitialTime: 5 * time.Second, + ServerReconnectMaxTime: 5 * time.Second, + ServerIdleTimeout: 5 * time.Minute, + TrackedParameters: []strutil.CIString{ + strutil.MakeCIString("client_encoding"), + strutil.MakeCIString("datestyle"), + strutil.MakeCIString("timezone"), + strutil.MakeCIString("standard_conforming_strings"), + strutil.MakeCIString("application_name"), + }, + PoolMode: "transaction", // TODO(garet) + }) } diff --git a/lib/gat/modules/zalando_operator_discovery/config.go b/lib/gat/modules/zalando_operator_discovery/config.go index 6cafce6e951dbefa49889670f96358d700049542..c4a4945147b5a87353830c0b42ca0d3adfaff80b 100644 --- a/lib/gat/modules/zalando_operator_discovery/config.go +++ b/lib/gat/modules/zalando_operator_discovery/config.go @@ -9,8 +9,6 @@ type Config struct { Namespace string `env:"PGGAT_NAMESPACE" default:"default"` ConfigMapName string `env:"CONFIG_MAP_NAME"` OperatorConfigurationObject string `env:"POSTGRES_OPERATOR_CONFIGURATION_OBJECT"` - TLSCrtFile string `env:"PGGAT_TLS_CRT_FILE" default:"/etc/ssl/certs/pgbouncer.crt"` - TLSKeyFile string `env:"PGGAT_TLS_KEY_FILE" default:"/etc/ssl/certs/pgbouncer.key"` Rest *rest.Config } diff --git a/lib/gat/modules/zalando_operator_discovery/module.go b/lib/gat/modules/zalando_operator_discovery/module.go index 614611c0f0af9bc20172531728a7bff00a584319..e83bc178ab6b901d0ec20cfc06aefd6cd4f12c53 100644 --- a/lib/gat/modules/zalando_operator_discovery/module.go +++ b/lib/gat/modules/zalando_operator_discovery/module.go @@ -5,19 +5,14 @@ import ( "time" "pggat/lib/bouncer" - "pggat/lib/gat" "pggat/lib/gat/modules/discovery" "pggat/lib/util/strutil" ) -type Module struct { - *discovery.Module -} - -func NewModule(config Config) (Module, error) { +func NewModule(config Config) (*discovery.Module, error) { d, err := NewDiscoverer(config) if err != nil { - return Module{}, err + return nil, err } m, err := discovery.NewModule(discovery.Config{ ReconcilePeriod: 1 * time.Minute, @@ -40,32 +35,7 @@ func NewModule(config Config) (Module, error) { PoolMode: "transaction", // TODO(garet) pool mode from operator config }) if err != nil { - return Module{}, err + return nil, err } - return Module{ - Module: m, - }, nil + return m, nil } - -func (T Module) Endpoints() []gat.Endpoint { - return []gat.Endpoint{ - { - Network: "tcp", - Address: ":5432", - AcceptOptions: gat.FrontendAcceptOptions{ - AllowedStartupOptions: []strutil.CIString{ - strutil.MakeCIString("client_encoding"), - strutil.MakeCIString("datestyle"), - strutil.MakeCIString("timezone"), - strutil.MakeCIString("standard_conforming_strings"), - strutil.MakeCIString("application_name"), - strutil.MakeCIString("extra_float_digits"), - strutil.MakeCIString("options"), - }, - // TODO(garet) ssl config - }, - }, - } -} - -var _ gat.Listener = Module{}