good morning!!!!

Skip to content
Snippets Groups Projects
Commit 8e96d902 authored by Garet Halliday's avatar Garet Halliday
Browse files

replicas

parent 325d4806
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,7 @@ import (
"tuxpa.in/a/zlog/log"
"pggat/lib/gat/modes/digitalocean_discovery"
"pggat/lib/gat/modes/pgbouncer"
"pggat/lib/gat/modes/zalando"
"pggat/lib/gat/modes/zalando_operator_discovery"
......@@ -49,8 +50,23 @@ func main() {
return
}
if os.Getenv("PGGAT_DO_API_KEY") != "" {
log.Printf("running in digitalocean discovery mode")
conf, err := digitalocean_discovery.Load()
if err != nil {
panic(err)
}
err = conf.ListenAndServe()
if err != nil {
panic(err)
}
return
}
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
log.Printf("Running in zalando operator discovery mode")
log.Printf("running in zalando operator discovery mode")
conf, err := zalando_operator_discovery.Load()
if err != nil {
panic(err)
......
......@@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strconv"
"time"
"gfx.cafe/util/go/gun"
"tuxpa.in/a/zlog/log"
......@@ -17,6 +18,7 @@ import (
"pggat/lib/bouncer/backends/v0"
"pggat/lib/bouncer/frontends/v0"
"pggat/lib/gat"
"pggat/lib/gat/metrics"
"pggat/lib/gat/pool"
"pggat/lib/gat/pool/dialer"
"pggat/lib/gat/pool/pools/transaction"
......@@ -65,13 +67,48 @@ func (T *Config) ListenAndServe() error {
return err
}
var m gat.PoolsMap
var pools gat.PoolsMap
go func() {
var m metrics.Pools
for {
m.Clear()
time.Sleep(1 * time.Minute)
pools.ReadMetrics(&m)
log.Print(m.String())
}
}()
for _, cluster := range r.Databases {
if cluster.Engine != "pg" {
continue
}
replicaDest, err := url.Parse("https://api.digitalocean.com/v2/databases/" + cluster.ID.String() + "/replicas")
if err != nil {
return err
}
replicaReq := http.Request{
Method: http.MethodGet,
URL: replicaDest,
Header: http.Header{
"Content-Type": []string{"application/json"},
"Authorization": []string{"Bearer " + T.APIKey},
},
}
replicaResp, err := http.DefaultClient.Do(&replicaReq)
if err != nil {
return err
}
var replicaR ListReplicasResponse
err = json.NewDecoder(replicaResp.Body).Decode(&replicaR)
if err != nil {
return err
}
for _, user := range cluster.Users {
creds := credentials.Cleartext{
Username: user.Name,
......@@ -80,7 +117,10 @@ func (T *Config) ListenAndServe() error {
for _, dbname := range cluster.DBNames {
p := pool.NewPool(transaction.Apply(pool.Options{
Credentials: creds,
Credentials: creds,
ServerReconnectInitialTime: 5 * time.Second,
ServerReconnectMaxTime: 5 * time.Second,
ServerIdleTimeout: 5 * time.Minute,
TrackedParameters: []strutil.CIString{
strutil.MakeCIString("client_encoding"),
strutil.MakeCIString("datestyle"),
......@@ -104,7 +144,44 @@ func (T *Config) ListenAndServe() error {
},
}))
m.Add(user.Name, dbname, p)
pools.Add(user.Name, dbname, p)
if len(replicaR.Replicas) > 0 {
creds2 := creds
creds2.Username = user.Name + "_ro"
p2 := pool.NewPool(transaction.Apply(pool.Options{
Credentials: creds2,
ServerReconnectInitialTime: 5 * time.Second,
ServerReconnectMaxTime: 5 * time.Second,
ServerIdleTimeout: 5 * time.Minute,
TrackedParameters: []strutil.CIString{
strutil.MakeCIString("client_encoding"),
strutil.MakeCIString("datestyle"),
strutil.MakeCIString("timezone"),
strutil.MakeCIString("standard_conforming_strings"),
strutil.MakeCIString("application_name"),
},
}))
for _, replica := range replicaR.Replicas {
p2.AddRecipe("do", recipe.NewRecipe(recipe.Options{
Dialer: dialer.Net{
Network: "tcp",
Address: net.JoinHostPort(replica.Connection.Host, strconv.Itoa(replica.Connection.Port)),
AcceptOptions: backends.AcceptOptions{
SSLMode: bouncer.SSLModeRequire,
SSLConfig: &tls.Config{
InsecureSkipVerify: true,
},
Credentials: creds,
Database: dbname,
},
},
}))
}
pools.Add(user.Name+"_ro", dbname, p2)
}
}
}
}
......@@ -123,7 +200,7 @@ func (T *Config) ListenAndServe() error {
strutil.MakeCIString("extra_float_digits"),
strutil.MakeCIString("options"),
},
}, &m)
}, &pools)
})
return b.Wait()
......
......@@ -52,3 +52,7 @@ type Database struct {
type ListClustersResponse struct {
Databases []Database `json:"databases"`
}
type ListReplicasResponse struct {
Replicas []Database `json:"replicas"`
}
......@@ -171,6 +171,7 @@ func (T *Server) addPool(name string, userCreds, serverCreds auth.Credentials, d
Credentials: userCreds,
ServerReconnectInitialTime: 5 * time.Second,
ServerReconnectMaxTime: 5 * time.Second,
ServerIdleTimeout: 5 * time.Minute,
TrackedParameters: []strutil.CIString{
strutil.MakeCIString("client_encoding"),
strutil.MakeCIString("datestyle"),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment