diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index f6a7f6a422cc126f6d084069fe7bfb99da50ad5a..546d5ca13905edb36a1e2aef6c2166d5edbe8f38 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -8,6 +8,7 @@ import ( "tuxpa.in/a/zlog/log" + "pggat/lib/gat/modes/digitalocean_discovery" "pggat/lib/gat/modes/pgbouncer" "pggat/lib/gat/modes/zalando" "pggat/lib/gat/modes/zalando_operator_discovery" @@ -49,8 +50,23 @@ func main() { return } + if os.Getenv("PGGAT_DO_API_KEY") != "" { + log.Printf("running in digitalocean discovery mode") + + conf, err := digitalocean_discovery.Load() + if err != nil { + panic(err) + } + + err = conf.ListenAndServe() + if err != nil { + panic(err) + } + return + } + if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" { - log.Printf("Running in zalando operator discovery mode") + log.Printf("running in zalando operator discovery mode") conf, err := zalando_operator_discovery.Load() if err != nil { panic(err) diff --git a/lib/gat/modes/digitalocean_discovery/config.go b/lib/gat/modes/digitalocean_discovery/config.go index efaabc787a83c07e5fdfa4e01f971b8110ecc59b..1550d857678c761bdb4c5f45cb6e2c6141b5efee 100644 --- a/lib/gat/modes/digitalocean_discovery/config.go +++ b/lib/gat/modes/digitalocean_discovery/config.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strconv" + "time" "gfx.cafe/util/go/gun" "tuxpa.in/a/zlog/log" @@ -17,6 +18,7 @@ import ( "pggat/lib/bouncer/backends/v0" "pggat/lib/bouncer/frontends/v0" "pggat/lib/gat" + "pggat/lib/gat/metrics" "pggat/lib/gat/pool" "pggat/lib/gat/pool/dialer" "pggat/lib/gat/pool/pools/transaction" @@ -65,13 +67,48 @@ func (T *Config) ListenAndServe() error { return err } - var m gat.PoolsMap + var pools gat.PoolsMap + + go func() { + var m metrics.Pools + for { + m.Clear() + time.Sleep(1 * time.Minute) + pools.ReadMetrics(&m) + log.Print(m.String()) + } + }() for _, cluster := range r.Databases { if cluster.Engine != "pg" { continue } + replicaDest, err := url.Parse("https://api.digitalocean.com/v2/databases/" + cluster.ID.String() + "/replicas") + if err != nil { + return err + } + + replicaReq := http.Request{ + Method: http.MethodGet, + URL: replicaDest, + Header: http.Header{ + "Content-Type": []string{"application/json"}, + "Authorization": []string{"Bearer " + T.APIKey}, + }, + } + + replicaResp, err := http.DefaultClient.Do(&replicaReq) + if err != nil { + return err + } + + var replicaR ListReplicasResponse + err = json.NewDecoder(replicaResp.Body).Decode(&replicaR) + if err != nil { + return err + } + for _, user := range cluster.Users { creds := credentials.Cleartext{ Username: user.Name, @@ -80,7 +117,10 @@ func (T *Config) ListenAndServe() error { for _, dbname := range cluster.DBNames { p := pool.NewPool(transaction.Apply(pool.Options{ - Credentials: creds, + Credentials: creds, + ServerReconnectInitialTime: 5 * time.Second, + ServerReconnectMaxTime: 5 * time.Second, + ServerIdleTimeout: 5 * time.Minute, TrackedParameters: []strutil.CIString{ strutil.MakeCIString("client_encoding"), strutil.MakeCIString("datestyle"), @@ -104,7 +144,44 @@ func (T *Config) ListenAndServe() error { }, })) - m.Add(user.Name, dbname, p) + pools.Add(user.Name, dbname, p) + + if len(replicaR.Replicas) > 0 { + creds2 := creds + creds2.Username = user.Name + "_ro" + p2 := pool.NewPool(transaction.Apply(pool.Options{ + Credentials: creds2, + ServerReconnectInitialTime: 5 * time.Second, + ServerReconnectMaxTime: 5 * time.Second, + ServerIdleTimeout: 5 * time.Minute, + TrackedParameters: []strutil.CIString{ + strutil.MakeCIString("client_encoding"), + strutil.MakeCIString("datestyle"), + strutil.MakeCIString("timezone"), + strutil.MakeCIString("standard_conforming_strings"), + strutil.MakeCIString("application_name"), + }, + })) + + for _, replica := range replicaR.Replicas { + p2.AddRecipe("do", recipe.NewRecipe(recipe.Options{ + Dialer: dialer.Net{ + Network: "tcp", + Address: net.JoinHostPort(replica.Connection.Host, strconv.Itoa(replica.Connection.Port)), + AcceptOptions: backends.AcceptOptions{ + SSLMode: bouncer.SSLModeRequire, + SSLConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + Credentials: creds, + Database: dbname, + }, + }, + })) + } + + pools.Add(user.Name+"_ro", dbname, p2) + } } } } @@ -123,7 +200,7 @@ func (T *Config) ListenAndServe() error { strutil.MakeCIString("extra_float_digits"), strutil.MakeCIString("options"), }, - }, &m) + }, &pools) }) return b.Wait() diff --git a/lib/gat/modes/digitalocean_discovery/database.go b/lib/gat/modes/digitalocean_discovery/database.go index af84cebe37a8e0042b28889698a9b5ce459d26c5..6063a162f0496a437bd9fffe2670cb73647ff22d 100644 --- a/lib/gat/modes/digitalocean_discovery/database.go +++ b/lib/gat/modes/digitalocean_discovery/database.go @@ -52,3 +52,7 @@ type Database struct { type ListClustersResponse struct { Databases []Database `json:"databases"` } + +type ListReplicasResponse struct { + Replicas []Database `json:"replicas"` +} diff --git a/lib/gat/modes/zalando_operator_discovery/server.go b/lib/gat/modes/zalando_operator_discovery/server.go index b473d060a5fc82ba90369a91b212e1b4883e76a5..f9ecdecbe5f4e92235ab8d7358c53123089a872b 100644 --- a/lib/gat/modes/zalando_operator_discovery/server.go +++ b/lib/gat/modes/zalando_operator_discovery/server.go @@ -171,6 +171,7 @@ func (T *Server) addPool(name string, userCreds, serverCreds auth.Credentials, d Credentials: userCreds, ServerReconnectInitialTime: 5 * time.Second, ServerReconnectMaxTime: 5 * time.Second, + ServerIdleTimeout: 5 * time.Minute, TrackedParameters: []strutil.CIString{ strutil.MakeCIString("client_encoding"), strutil.MakeCIString("datestyle"),