diff --git a/lib/gat/gatling/conn_pool/conn_pool.go b/lib/gat/gatling/conn_pool/conn_pool.go deleted file mode 100644 index cfc26a38950d918f0820b6e1b4bed3b62fff209e..0000000000000000000000000000000000000000 --- a/lib/gat/gatling/conn_pool/conn_pool.go +++ /dev/null @@ -1,220 +0,0 @@ -package conn_pool - -import ( - "context" - "log" - "math/rand" - "reflect" - "runtime" - "sync" - - "gfx.cafe/gfx/pggat/lib/config" - "gfx.cafe/gfx/pggat/lib/gat" - "gfx.cafe/gfx/pggat/lib/gat/gatling/conn_pool/server" - "gfx.cafe/gfx/pggat/lib/gat/protocol" -) - -type connections struct { - primary *server.Server - replicas []*server.Server - - mu sync.Mutex -} - -func (s *connections) choose(role config.ServerRole) *server.Server { - switch role { - case config.SERVERROLE_PRIMARY: - return s.primary - case config.SERVERROLE_REPLICA: - if len(s.replicas) == 0 { - // fallback to primary - return s.primary - } - return s.replicas[rand.Intn(len(s.replicas))] - default: - return nil - } -} - -func (s *connections) Primary() gat.Connection { - return s.primary -} - -func (s *connections) Replicas() []gat.Connection { - out := make([]gat.Connection, len(s.replicas)) - for idx, v := range s.replicas { - out[idx] = v - } - return out -} - -var _ gat.Shard = (*connections)(nil) - -type shard struct { - conf *config.Shard - conns []*connections - - mu sync.Mutex -} - -type ConnectionPool struct { - // the pool connection - c *config.Pool - user *config.User - pool gat.Pool - shards []shard - - // see: https://github.com/golang/go/blob/master/src/runtime/chan.go#L33 - // channels are a thread safe ring buffer implemented via a linked list of goroutines. - // the idea is that goroutines are cheap, and we can afford to have one per pending request. - // there is no real reason to implement a complicated worker pool pattern when well, if we're okay with having a 2-4kb overhead per request, then this is fine. trading space for code complexity - workerPool chan *worker - // the lock for config related things - mu sync.RWMutex -} - -func NewConnectionPool(pool gat.Pool, conf *config.Pool, user *config.User) *ConnectionPool { - p := &ConnectionPool{ - user: user, - pool: pool, - workerPool: make(chan *worker, 1+runtime.NumCPU()*4), - } - p.EnsureConfig(conf) - for i := 0; i < user.PoolSize; i++ { - p.add_pool() - } - return p -} - -func (c *ConnectionPool) add_pool() { - select { - case c.workerPool <- &worker{ - w: c, - }: - default: - } -} - -func (c *ConnectionPool) EnsureConfig(conf *config.Pool) { - c.mu.Lock() - defer c.mu.Unlock() - c.c = conf - for i, s := range conf.Shards { - for i >= len(c.shards) { - c.shards = append(c.shards, shard{}) - } - sc := s - if !reflect.DeepEqual(c.shards[i].conf, &sc) { - // disconnect all conns, switch to new conf - c.shards[i].conns = nil - c.shards[i].conf = sc - } - } -} - -func (c *ConnectionPool) chooseShard() *shard { - c.mu.RLock() - defer c.mu.RUnlock() - - if len(c.shards) == 0 { - return nil - } - - // TODO better choose func for sharding, this is not deterministic - return &c.shards[rand.Intn(len(c.shards))] -} - -// chooseConnections locks and returns connections for you to use -func (c *ConnectionPool) chooseConnections() *connections { - s := c.chooseShard() - if s == nil { - log.Println("no available shard for query :(") - return nil - } - // lock the shard - s.mu.Lock() - defer s.mu.Unlock() - // TODO ideally this would choose the server based on load, capabilities, etc. 
for now we just trylock - for _, srv := range s.conns { - if srv.mu.TryLock() { - return srv - } - } - // there are no conns available in the shard, let's make a new connection - // connect to servers in shard config - srvs := &connections{} - for _, srvConf := range s.conf.Servers { - srv, err := server.Dial( - context.Background(), - srvConf.Host, - srvConf.Port, - c.user, s.conf.Database, - srvConf.Username, srvConf.Password, - ) - if err != nil { - log.Println("failed to connect to server", err) - continue - } - switch srvConf.Role { - case config.SERVERROLE_PRIMARY: - srvs.primary = srv - case config.SERVERROLE_REPLICA: - srvs.replicas = append(srvs.replicas, srv) - } - } - if srvs.primary == nil { - return nil - } - srvs.mu.Lock() - s.conns = append(s.conns, srvs) - return srvs -} - -func (c *ConnectionPool) GetUser() *config.User { - return c.user -} - -func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus { - srv := c.chooseConnections() - if srv == nil { - return nil - } - defer srv.mu.Unlock() - return srv.primary.GetServerInfo() -} - -func (c *ConnectionPool) Shards() []gat.Shard { - c.mu.RLock() - defer c.mu.RUnlock() - out := make([]gat.Shard, len(c.shards)) - idx := 0 - for i := range c.shards { - for _, v := range c.shards[i].conns { - out[idx] = v - idx += 1 - } - } - return out -} - -func (c *ConnectionPool) Describe(ctx context.Context, client gat.Client, d *protocol.Describe) error { - return (<-c.workerPool).HandleDescribe(ctx, client, d) -} - -func (c *ConnectionPool) Execute(ctx context.Context, client gat.Client, e *protocol.Execute) error { - return (<-c.workerPool).HandleExecute(ctx, client, e) -} - -func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, q string) error { - return (<-c.workerPool).HandleSimpleQuery(ctx, client, q) -} - -func (c *ConnectionPool) Transaction(ctx context.Context, client gat.Client, q string) error { - return (<-c.workerPool).HandleTransaction(ctx, client, q) -} - -func (c *ConnectionPool) CallFunction(ctx context.Context, client gat.Client, f *protocol.FunctionCall) error { - return (<-c.workerPool).HandleFunction(ctx, client, f) -} - -var _ gat.ConnectionPool = (*ConnectionPool)(nil) diff --git a/lib/gat/gatling/pool/conn_pool/conn_pool.go b/lib/gat/gatling/pool/conn_pool/conn_pool.go new file mode 100644 index 0000000000000000000000000000000000000000..af35f4e9e5196b9c298bee64af8e67d506db2ca3 --- /dev/null +++ b/lib/gat/gatling/pool/conn_pool/conn_pool.go @@ -0,0 +1,102 @@ +package conn_pool + +import ( + "context" + "reflect" + "runtime" + "sync" + + "gfx.cafe/gfx/pggat/lib/config" + "gfx.cafe/gfx/pggat/lib/gat" + "gfx.cafe/gfx/pggat/lib/gat/protocol" +) + +type ConnectionPool struct { + // the pool config + c *config.Pool + user *config.User + pool gat.Pool + shards []*config.Shard + + // see: https://github.com/golang/go/blob/master/src/runtime/chan.go#L33 + // channels are a thread safe ring buffer implemented via a linked list of goroutines. + // the idea is that goroutines are cheap, and we can afford to have one per pending request. + // there is no real reason to implement a complicated worker pool pattern; if we're okay with having a 2-4kb overhead per request, then this is fine: trading space for code complexity + workerPool chan *worker + // the lock for config related things + mu sync.RWMutex +} + +func NewConnectionPool(pool gat.Pool, conf *config.Pool, user *config.User) *ConnectionPool { + p := &ConnectionPool{ + user: user, + pool: pool, + workerPool: make(chan *worker, 1+runtime.NumCPU()*4), + } + p.EnsureConfig(conf) + for i := 0; i < user.PoolSize; i++ { + p.addWorker() + } + return p +} + +func (c *ConnectionPool) addWorker() { + select { + case c.workerPool <- &worker{ + w: c, + }: + default: + } +} + +func (c *ConnectionPool) EnsureConfig(conf *config.Pool) { + c.mu.Lock() + defer c.mu.Unlock() + c.c = conf + for i, s := range conf.Shards { + for i >= len(c.shards) { + c.shards = append(c.shards, s) + } + sc := s + if !reflect.DeepEqual(c.shards[i], sc) { + // disconnect all conns, switch to new conf + // TODO notify workers that they need to update that shard + c.shards[i] = sc + } + } +} + +func (c *ConnectionPool) GetUser() *config.User { + return c.user +} + +func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus { + return (<-c.workerPool).GetServerInfo() +} + +func (c *ConnectionPool) Shards() []gat.Shard { + // TODO go through each worker + return nil +} + +func (c *ConnectionPool) Describe(ctx context.Context, client gat.Client, d *protocol.Describe) error { + return (<-c.workerPool).HandleDescribe(ctx, client, d) +} + +func (c *ConnectionPool) Execute(ctx context.Context, client gat.Client, e *protocol.Execute) error { + return (<-c.workerPool).HandleExecute(ctx, client, e) +} + +func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, q string) error { + return (<-c.workerPool).HandleSimpleQuery(ctx, client, q) +} + +func (c *ConnectionPool) Transaction(ctx context.Context, client gat.Client, q string) error { + return (<-c.workerPool).HandleTransaction(ctx, client, q) +} + +func (c *ConnectionPool) CallFunction(ctx context.Context, client gat.Client, f *protocol.FunctionCall) error { + return (<-c.workerPool).HandleFunction(ctx, client, f) +} + +var _ gat.ConnectionPool = (*ConnectionPool)(nil) diff --git a/lib/gat/gatling/conn_pool/server/server.go b/lib/gat/gatling/pool/conn_pool/shard/server/server.go similarity index 100% rename from lib/gat/gatling/conn_pool/server/server.go rename to lib/gat/gatling/pool/conn_pool/shard/server/server.go diff --git a/lib/gat/gatling/conn_pool/server/server_test.go b/lib/gat/gatling/pool/conn_pool/shard/server/server_test.go similarity index 100% rename from lib/gat/gatling/conn_pool/server/server_test.go rename to lib/gat/gatling/pool/conn_pool/shard/server/server_test.go diff --git a/lib/gat/gatling/pool/conn_pool/shard/shard.go b/lib/gat/gatling/pool/conn_pool/shard/shard.go new file mode 100644 index 0000000000000000000000000000000000000000..7a301e76687e6d18371e47545452766bb4a73218 --- /dev/null +++ b/lib/gat/gatling/pool/conn_pool/shard/shard.go @@ -0,0 +1,59 @@ +package shard + +import ( + "context" + "gfx.cafe/gfx/pggat/lib/config" + "gfx.cafe/gfx/pggat/lib/gat" + "gfx.cafe/gfx/pggat/lib/gat/gatling/pool/conn_pool/shard/server" + "math/rand" + "sync" +) + +type Shard struct { + primary gat.Connection + replicas []gat.Connection + + mu sync.Mutex +} + +func FromConfig(user *config.User, conf *config.Shard) *Shard { + out := &Shard{} + for _, s := range conf.Servers { + srv, err := server.Dial(context.TODO(), s.Host, s.Port, user, conf.Database, s.Username, s.Password) + if err != nil { + continue + } + switch s.Role { + case config.SERVERROLE_PRIMARY: +
out.primary = srv + default: + out.replicas = append(out.replicas, srv) + } + } + return out +} + +func (s *Shard) Choose(role config.ServerRole) gat.Connection { + switch role { + case config.SERVERROLE_PRIMARY: + return s.primary + case config.SERVERROLE_REPLICA: + if len(s.replicas) == 0 { + return s.primary + } + + return s.replicas[rand.Intn(len(s.replicas))] + default: + return nil + } +} + +func (s *Shard) Primary() gat.Connection { + return s.primary +} + +func (s *Shard) Replicas() []gat.Connection { + return s.replicas +} + +var _ gat.Shard = (*Shard)(nil) diff --git a/lib/gat/gatling/conn_pool/worker.go b/lib/gat/gatling/pool/conn_pool/worker.go similarity index 74% rename from lib/gat/gatling/conn_pool/worker.go rename to lib/gat/gatling/pool/conn_pool/worker.go index 7b07cdfb79c4a618811cd3c290c101f8ff4fa9d9..ee6f0ee0f71ed660043b4cc67572cf3ba195bfc3 100644 --- a/lib/gat/gatling/conn_pool/worker.go +++ b/lib/gat/gatling/pool/conn_pool/worker.go @@ -4,20 +4,19 @@ import ( "context" "fmt" "gfx.cafe/gfx/pggat/lib/config" - "gfx.cafe/gfx/pggat/lib/gat/protocol/pg_error" - "log" - "gfx.cafe/gfx/pggat/lib/gat" + "gfx.cafe/gfx/pggat/lib/gat/gatling/pool/conn_pool/shard" "gfx.cafe/gfx/pggat/lib/gat/protocol" + "gfx.cafe/gfx/pggat/lib/gat/protocol/pg_error" ) -type _wp ConnectionPool - // a single-use worker with an embedded connection pool. // it wraps a pointer to the connection pool. type worker struct { // the parent connection pool w *ConnectionPool + + shards []gat.Shard } // return worker to pool @@ -25,6 +24,70 @@ func (w *worker) ret() { w.w.workerPool <- w } +// attempt to connect to a new shard with this worker +func (w *worker) fetchShard(n int) bool { + if n < 0 || n >= len(w.w.shards) { + return false + } + + for len(w.shards) <= n { + w.shards = append(w.shards, nil) + } + w.shards[n] = shard.FromConfig(w.w.user, w.w.shards[n]) + return true +} + +func (w *worker) anyShard() gat.Shard { + for _, s := range w.shards { + if s != nil { + return s + } + } + + // we need to fetch a shard + if w.fetchShard(0) { + return w.shards[0] + } + + return nil +} + +func (w *worker) chooseShardDescribe(client gat.Client, payload *protocol.Describe) gat.Shard { + return w.anyShard() // TODO +} + +func (w *worker) chooseShardExecute(client gat.Client, payload *protocol.Execute) gat.Shard { + return w.anyShard() // TODO +} + +func (w *worker) chooseShardFn(client gat.Client, fn *protocol.FunctionCall) gat.Shard { + return w.anyShard() // TODO +} + +func (w *worker) chooseShardSimpleQuery(client gat.Client, payload string) gat.Shard { + return w.anyShard() // TODO +} + +func (w *worker) chooseShardTransaction(client gat.Client, payload string) gat.Shard { + return w.anyShard() // TODO +} + +func (w *worker) GetServerInfo() []*protocol.ParameterStatus { + defer w.ret() + + shard := w.anyShard() + if shard == nil { + return nil + } + + primary := shard.Primary() + if primary == nil { + return nil + } + + return primary.GetServerInfo() +} + func (w *worker) HandleDescribe(ctx context.Context, c gat.Client, d *protocol.Describe) error { defer w.ret() @@ -66,7 +129,6 @@ func (w *worker) HandleExecute(ctx context.Context, c gat.Client, e *protocol.Ex } func (w *worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol.FunctionCall) error { - log.Println("worker selected for fn") defer w.ret() errch := make(chan error) @@ -139,16 +201,14 @@ func (w *worker) unsetCurrentBinding(client gat.Client, server gat.Connection) { } func (w *worker) z_actually_do_describe(ctx context.Context,
client gat.Client, payload *protocol.Describe) error { - c := w.w - srv := c.chooseConnections() + srv := w.chooseShardDescribe(client, payload) if srv == nil { return fmt.Errorf("describe('%+v') fail: no server", payload) } - defer srv.mu.Unlock() // describe the portal // we can use a replica because we are just describing what this query will return, query content doesn't matter // because nothing is actually executed yet - target := srv.choose(config.SERVERROLE_REPLICA) + target := srv.Choose(config.SERVERROLE_REPLICA) if target == nil { return fmt.Errorf("describe('%+v') fail: no server", payload) } @@ -157,12 +217,10 @@ func (w *worker) z_actually_do_describe(ctx context.Context, client gat.Client, return target.Describe(client, payload) } func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, payload *protocol.Execute) error { - c := w.w - srv := c.chooseConnections() + srv := w.chooseShardExecute(client, payload) if srv == nil { return fmt.Errorf("describe('%+v') fail: no server", payload) } - defer srv.mu.Unlock() // get the query text portal := client.GetPortal(payload.Fields.Name) @@ -183,11 +241,11 @@ func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, p } } - which, err := c.pool.GetRouter().InferRole(ps.Fields.Query) + which, err := w.w.pool.GetRouter().InferRole(ps.Fields.Query) if err != nil { return err } - target := srv.choose(which) + target := srv.Choose(which) w.setCurrentBinding(client, target) defer w.unsetCurrentBinding(client, target) if target == nil { @@ -196,41 +254,36 @@ func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, p return target.Execute(client, payload) } func (w *worker) z_actually_do_fn(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error { - c := w.w - srv := c.chooseConnections() + srv := w.chooseShardFn(client, payload) if srv == nil { return fmt.Errorf("fn('%+v') fail: no server", payload) } - defer srv.mu.Unlock() // call the function - target := srv.primary + target := srv.Primary() if target == nil { return fmt.Errorf("fn('%+v') fail: no target ", payload) } w.setCurrentBinding(client, target) defer w.unsetCurrentBinding(client, target) - err := srv.primary.CallFunction(client, payload) + err := target.CallFunction(client, payload) if err != nil { return fmt.Errorf("fn('%+v') fail: %w ", payload, err) } return nil } func (w *worker) z_actually_do_simple_query(ctx context.Context, client gat.Client, payload string) error { - c := w.w // choose a server - srv := c.chooseConnections() + srv := w.chooseShardSimpleQuery(client, payload) if srv == nil { return fmt.Errorf("call to query '%s' failed", payload) } - // note that the server comes locked.
you MUST unlock it - defer srv.mu.Unlock() // run the query on the server - which, err := c.pool.GetRouter().InferRole(payload) + which, err := w.w.pool.GetRouter().InferRole(payload) if err != nil { return fmt.Errorf("error parsing '%s': %w", payload, err) } // configures the server to run with a specific role - target := srv.choose(which) + target := srv.Choose(which) if target == nil { return fmt.Errorf("call to query '%s' failed", payload) } @@ -244,21 +297,18 @@ func (w *worker) z_actually_do_transaction(ctx context.Context, client gat.Client, payload string) error { - c := w.w // choose a server - srv := c.chooseConnections() + srv := w.chooseShardTransaction(client, payload) if srv == nil { return fmt.Errorf("call to transaction '%s' failed", payload) } - // note that the server comes locked. you MUST unlock it - defer srv.mu.Unlock() // run the query on the server - which, err := c.pool.GetRouter().InferRole(payload) + which, err := w.w.pool.GetRouter().InferRole(payload) if err != nil { return fmt.Errorf("error parsing '%s': %w", payload, err) } // configures the server to run with a specific role - target := srv.choose(which) + target := srv.Choose(which) if target == nil { return fmt.Errorf("call to transaction '%s' failed", payload) } diff --git a/lib/gat/gatling/pool/pool.go b/lib/gat/gatling/pool/pool.go index 4484ca313b690100b5371689196aee59c14e68fc..dcb419f384b68446ad12bc3cff97b28d75694b90 100644 --- a/lib/gat/gatling/pool/pool.go +++ b/lib/gat/gatling/pool/pool.go @@ -2,12 +2,12 @@ package pool import ( "fmt" + "gfx.cafe/gfx/pggat/lib/gat/gatling/pool/conn_pool" + "gfx.cafe/gfx/pggat/lib/gat/gatling/pool/query_router" "sync" "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" - "gfx.cafe/gfx/pggat/lib/gat/gatling/conn_pool" - "gfx.cafe/gfx/pggat/lib/gat/gatling/query_router" ) type Pool struct { @@ -15,6 +15,8 @@ type Pool struct { users map[string]config.User connPools map[string]gat.ConnectionPool + stats *Stats + router query_router.QueryRouter mu sync.RWMutex @@ -23,6 +25,7 @@ type Pool struct { func NewPool(conf *config.Pool) *Pool { pool := &Pool{ connPools: make(map[string]gat.ConnectionPool), + stats: newStats(), } pool.EnsureConfig(conf) return pool @@ -84,7 +87,7 @@ func (p *Pool) ConnectionPools() []gat.ConnectionPool { } func (p *Pool) Stats() gat.PoolStats { - return nil // TODO + return p.stats } var _ gat.Pool = (*Pool)(nil) diff --git a/lib/gat/gatling/query_router/query_router.go b/lib/gat/gatling/pool/query_router/query_router.go similarity index 100% rename from lib/gat/gatling/query_router/query_router.go rename to lib/gat/gatling/pool/query_router/query_router.go diff --git a/lib/gat/gatling/query_router/query_router_test.go b/lib/gat/gatling/pool/query_router/query_router_test.go similarity index 100% rename from lib/gat/gatling/query_router/query_router_test.go rename to lib/gat/gatling/pool/query_router/query_router_test.go diff --git a/lib/gat/gatling/pool/stats.go b/lib/gat/gatling/pool/stats.go new file mode 100644 index 0000000000000000000000000000000000000000..0c16b76ab53a213aac99cf950ecea294cecdf7ef --- /dev/null +++ b/lib/gat/gatling/pool/stats.go @@ -0,0 +1,91 @@ +package pool + +import ( + "gfx.cafe/gfx/pggat/lib/gat" + "time" +) + +type Stats struct { + start time.Time + + xactCount int + queryCount int + waitCount int + received int + sent int + xactTime int + queryTime int + waitTime int +} + +func newStats() *Stats { + return &Stats{ +
start: time.Now(), + } +} + +func (s *Stats) TotalXactCount() int { + return s.xactCount +} + +func (s *Stats) TotalQueryCount() int { + return s.queryCount +} + +func (s *Stats) TotalReceived() int { + return s.received +} + +func (s *Stats) TotalSent() int { + return s.sent +} + +func (s *Stats) TotalXactTime() int { + return s.xactTime +} + +func (s *Stats) TotalQueryTime() int { + return s.queryTime +} + +func (s *Stats) TotalWaitTime() int { + return s.waitTime +} + +func (s *Stats) totalTime() time.Duration { + return time.Now().Sub(s.start) +} + +func (s *Stats) AvgXactCount() float64 { + seconds := s.totalTime().Seconds() + return float64(s.xactCount) / seconds +} + +func (s *Stats) AvgQueryCount() float64 { + seconds := s.totalTime().Seconds() + return float64(s.queryCount) / seconds +} + +func (s *Stats) AvgRecv() float64 { + seconds := s.totalTime().Seconds() + return float64(s.received) / seconds +} + +func (s *Stats) AvgSent() float64 { + seconds := s.totalTime().Seconds() + return float64(s.sent) / seconds +} + +func (s *Stats) AvgXactTime() float64 { + return float64(s.xactTime) / float64(s.xactCount) +} + +func (s *Stats) AvgQueryTime() float64 { + return float64(s.queryTime) / float64(s.queryCount) +} + +func (s *Stats) AvgWaitTime() float64 { + return float64(s.waitTime) / float64(s.waitCount) +} + +var _ gat.PoolStats = (*Stats)(nil) diff --git a/lib/gat/gatling/stats/stats.go b/lib/gat/gatling/stats/stats.go deleted file mode 100644 index 1e8a0cbb3ce5d09241254fdf08f1d429b00bf429..0000000000000000000000000000000000000000 --- a/lib/gat/gatling/stats/stats.go +++ /dev/null @@ -1,570 +0,0 @@ -package stats - -//TODO: metrics -// let's do this last. we can use the go package for prometheus, its way better than anything we could do -// this would be a good project to teach promethus basics to a junior -// -//use arc_swap::ArcSwap; -///// Statistics and reporting. -//use log::{error, info, trace}; -//use once_cell::sync::Lazy; -//use parking_lot::Mutex; -//use std::collections::HashMap; -//use tokio::sync::mpsc::error::TrySendError; -//use tokio::sync::mpsc::{channel, Receiver, Sender}; -// -//use crate::pool::get_number_of_addresses; -// -//pub static REPORTER: Lazy<ArcSwap<Reporter>> = -// Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); -// -///// Latest stats updated every second; used in SHOW STATS and other admin commands. -//static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> = -// Lazy::new(|| Mutex::new(HashMap::new())); -// -///// Statistics period used for average calculations. -///// 15 seconds. -//static STAT_PERIOD: u64 = 15000; -// -///// The names for the events reported -///// to the statistics collector. -//#[derive(Debug, Clone, Copy)] -//enum EventName { -// CheckoutTime, -// SimpleQuery, -// Transaction, -// DataSent, -// DataReceived, -// ClientWaiting, -// ClientActive, -// ClientIdle, -// ClientDisconnecting, -// ServerActive, -// ServerIdle, -// ServerTested, -// ServerLogin, -// ServerDisconnecting, -// UpdateStats, -// UpdateAverages, -//} -// -///// Event data sent to the collector -///// from clients and servers. -//#[derive(Debug, Clone)] -//pub struct Event { -// /// The name of the event being reported. -// name: EventName, -// -// /// The value being reported. Meaning differs based on event name. -// value: i64, -// -// /// The client or server connection reporting the event. -// process_id: i32, -// -// /// The server the client is connected to. -// address_id: usize, -//} -// -///// The statistics reporter. 
An instance is given -///// to each possible source of statistics, -///// e.g. clients, servers, connection pool. -//#[derive(Clone, Debug)] -//pub struct Reporter { -// tx: Sender<Event>, -//} -// -//impl Default for Reporter { -// fn default() -> Reporter { -// let (tx, _rx) = channel(5); -// Reporter { tx } -// } -//} -// -//impl Reporter { -// /// Create a new Reporter instance. -// pub fn new(tx: Sender<Event>) -> Reporter { -// Reporter { tx: tx } -// } -// -// /// Send statistics to the task keeping track of stats. -// fn send(&self, event: Event) { -// let name = event.name; -// let result = self.tx.try_send(event); -// -// match result { -// Ok(_) => trace!( -// "{:?} event reported successfully, capacity: {}", -// name, -// self.tx.capacity() -// ), -// -// Err(err) => match err { -// TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name), -// TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name), -// }, -// }; -// } -// -// /// Report a query executed by a client against -// /// a server identified by the `address_id`. -// pub fn query(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::SimpleQuery, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event); -// } -// -// /// Report a transaction executed by a client against -// /// a server identified by the `address_id`. -// pub fn transaction(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::Transaction, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Report data sent to a server identified by `address_id`. -// /// The `amount` is measured in bytes. -// pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::DataSent, -// value: amount as i64, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Report data received from a server identified by `address_id`. -// /// The `amount` is measured in bytes. -// pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::DataReceived, -// value: amount as i64, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Time spent waiting to get a healthy connection from the pool -// /// for a server identified by `address_id`. -// /// Measured in milliseconds. -// pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::CheckoutTime, -// value: ms as i64, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a client identified by `process_id` waiting for a connection -// /// to a server identified by `address_id`. -// pub fn client_waiting(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ClientWaiting, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a client identified by `process_id` is done waiting for a connection -// /// to a server identified by `address_id` and is about to query the server. 
-// pub fn client_active(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ClientActive, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a client identified by `process_id` is done querying the server -// /// identified by `address_id` and is no longer active. -// pub fn client_idle(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ClientIdle, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a client identified by `process_id` is disconecting from the pooler. -// /// The last server it was connected to is identified by `address_id`. -// pub fn client_disconnecting(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ClientDisconnecting, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a server connection identified by `process_id` for -// /// a configured server identified by `address_id` is actively used -// /// by a client. -// pub fn server_active(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ServerActive, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a server connection identified by `process_id` for -// /// a configured server identified by `address_id` is no longer -// /// actively used by a client and is now idle. -// pub fn server_idle(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ServerIdle, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a server connection identified by `process_id` for -// /// a configured server identified by `address_id` is attempting -// /// to login. -// pub fn server_login(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ServerLogin, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a server connection identified by `process_id` for -// /// a configured server identified by `address_id` is being -// /// tested before being given to a client. -// pub fn server_tested(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ServerTested, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -// -// /// Reports a server connection identified by `process_id` is disconecting from the pooler. -// /// The configured server it was connected to is identified by `address_id`. -// pub fn server_disconnecting(&self, process_id: i32, address_id: usize) { -// let event = Event { -// name: EventName::ServerDisconnecting, -// value: 1, -// process_id: process_id, -// address_id: address_id, -// }; -// -// self.send(event) -// } -//} -// -///// The statistics collector which is receiving statistics -///// from clients, servers, and the connection pool. There is -///// only one collector (kind of like a singleton). -///// The collector can trigger events on its own, e.g. -///// it updates aggregates every second and averages every -///// 15 seconds. 
-//pub struct Collector { -// rx: Receiver<Event>, -// tx: Sender<Event>, -//} -// -//impl Collector { -// /// Create a new collector instance. There should only be one instance -// /// at a time. This is ensured by mpsc which allows only one receiver. -// pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector { -// Collector { rx, tx } -// } -// -// /// The statistics collection handler. It will collect statistics -// /// for `address_id`s starting at 0 up to `addresses`. -// pub async fn collect(&mut self) { -// info!("Events reporter started"); -// -// let stats_template = HashMap::from([ -// ("total_query_count", 0), -// ("total_query_time", 0), -// ("total_received", 0), -// ("total_sent", 0), -// ("total_xact_count", 0), -// ("total_xact_time", 0), -// ("total_wait_time", 0), -// ("avg_query_count", 0), -// ("avg_query_time", 0), -// ("avg_recv", 0), -// ("avg_sent", 0), -// ("avg_xact_count", 0), -// ("avg_xact_time", 0), -// ("avg_wait_time", 0), -// ("maxwait_us", 0), -// ("maxwait", 0), -// ("cl_waiting", 0), -// ("cl_active", 0), -// ("cl_idle", 0), -// ("sv_idle", 0), -// ("sv_active", 0), -// ("sv_login", 0), -// ("sv_tested", 0), -// ]); -// -// let mut stats = HashMap::new(); -// -// // Stats saved after each iteration of the flush event. Used in calculation -// // of averages in the last flush period. -// let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new(); -// -// // Track which state the client and server are at any given time. -// let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new(); -// -// // Flush stats to StatsD and calculate averages every 15 seconds. -// let tx = self.tx.clone(); -// tokio::task::spawn(async move { -// let mut interval = -// tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15)); -// loop { -// interval.tick().await; -// let address_count = get_number_of_addresses(); -// for address_id in 0..address_count { -// let _ = tx.try_send(Event { -// name: EventName::UpdateStats, -// value: 0, -// process_id: -1, -// address_id: address_id, -// }); -// } -// } -// }); -// -// let tx = self.tx.clone(); -// tokio::task::spawn(async move { -// let mut interval = -// tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD)); -// loop { -// interval.tick().await; -// let address_count = get_number_of_addresses(); -// for address_id in 0..address_count { -// let _ = tx.try_send(Event { -// name: EventName::UpdateAverages, -// value: 0, -// process_id: -1, -// address_id: address_id, -// }); -// } -// } -// }); -// -// // The collector loop -// loop { -// let stat = match self.rx.recv().await { -// Some(stat) => stat, -// None => { -// info!("Events collector is shutting down"); -// return; -// } -// }; -// -// let stats = stats -// .entry(stat.address_id) -// .or_insert(stats_template.clone()); -// let client_server_states = client_server_states -// .entry(stat.address_id) -// .or_insert(HashMap::new()); -// let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new()); -// -// // Some are counters, some are gauges... 
-// match stat.name { -// EventName::SimpleQuery => { -// let counter = stats.entry("total_query_count").or_insert(0); -// *counter += stat.value; -// } -// -// EventName::Transaction => { -// let counter = stats.entry("total_xact_count").or_insert(0); -// *counter += stat.value; -// } -// -// EventName::DataSent => { -// let counter = stats.entry("total_sent").or_insert(0); -// *counter += stat.value; -// } -// -// EventName::DataReceived => { -// let counter = stats.entry("total_received").or_insert(0); -// *counter += stat.value; -// } -// -// EventName::CheckoutTime => { -// let counter = stats.entry("total_wait_time").or_insert(0); -// *counter += stat.value; -// -// let counter = stats.entry("maxwait_us").or_insert(0); -// let mic_part = stat.value % 1_000_000; -// -// // Report max time here -// if mic_part > *counter { -// *counter = mic_part; -// } -// -// let counter = stats.entry("maxwait").or_insert(0); -// let seconds = *counter / 1_000_000; -// -// if seconds > *counter { -// *counter = seconds; -// } -// } -// -// EventName::ClientActive -// | EventName::ClientWaiting -// | EventName::ClientIdle -// | EventName::ServerActive -// | EventName::ServerIdle -// | EventName::ServerTested -// | EventName::ServerLogin => { -// client_server_states.insert(stat.process_id, stat.name); -// } -// -// EventName::ClientDisconnecting | EventName::ServerDisconnecting => { -// client_server_states.remove(&stat.process_id); -// } -// -// EventName::UpdateStats => { -// // Calculate connection states -// for (_, state) in client_server_states.iter() { -// match state { -// EventName::ClientActive => { -// let counter = stats.entry("cl_active").or_insert(0); -// *counter += 1; -// } -// -// EventName::ClientWaiting => { -// let counter = stats.entry("cl_waiting").or_insert(0); -// *counter += 1; -// } -// -// EventName::ServerIdle => { -// let counter = stats.entry("sv_idle").or_insert(0); -// *counter += 1; -// } -// -// EventName::ServerActive => { -// let counter = stats.entry("sv_active").or_insert(0); -// *counter += 1; -// } -// -// EventName::ServerTested => { -// let counter = stats.entry("sv_tested").or_insert(0); -// *counter += 1; -// } -// -// EventName::ServerLogin => { -// let counter = stats.entry("sv_login").or_insert(0); -// *counter += 1; -// } -// -// EventName::ClientIdle => { -// let counter = stats.entry("cl_idle").or_insert(0); -// *counter += 1; -// } -// -// _ => unreachable!(), -// }; -// } -// -// // Update latest stats used in SHOW STATS -// let mut guard = LATEST_STATS.lock(); -// for (key, value) in stats.iter() { -// let entry = guard.entry(stat.address_id).or_insert(HashMap::new()); -// entry.insert(key.to_string(), value.clone()); -// } -// -// // These are re-calculated every iteration of the loop, so we don't want to add values -// // from the last iteration. 
-// for stat in &[ -// "cl_active", -// "cl_waiting", -// "cl_idle", -// "sv_idle", -// "sv_active", -// "sv_tested", -// "sv_login", -// "maxwait", -// "maxwait_us", -// ] { -// stats.insert(stat, 0); -// } -// } -// -// EventName::UpdateAverages => { -// // Calculate averages -// for stat in &[ -// "avg_query_count", -// "avg_query_time", -// "avg_recv", -// "avg_sent", -// "avg_xact_time", -// "avg_xact_count", -// "avg_wait_time", -// ] { -// let total_name = match stat { -// &"avg_recv" => "total_received".to_string(), // Because PgBouncer is saving bytes -// _ => stat.replace("avg_", "total_"), -// }; -// -// let old_value = old_stats.entry(total_name.clone()).or_insert(0); -// let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); -// let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second -// -// stats.insert(stat, avg); -// *old_value = new_value; -// } -// } -// }; -// } -// } -//} -// -///// Get a snapshot of statistics. Updated once a second -///// by the `Collector`. -//pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> { -// LATEST_STATS.lock().clone() -//} -// -///// Get the statistics reporter used to update stats across the pools/clients. -//pub fn get_reporter() -> Reporter { -// (*(*REPORTER.load())).clone() -//} diff --git a/lib/gat/interfaces.go b/lib/gat/interfaces.go index 1646cabf44d9f1c74e224a6c8d276ddc408753cc..d989ba25797651e1daf3bde9bad89dc4dde4971a 100644 --- a/lib/gat/interfaces.go +++ b/lib/gat/interfaces.go @@ -67,20 +67,20 @@ type PoolStats interface { TotalQueryTime() int // Total time spent waiting (in microseconds) TotalWaitTime() int - // Average amount of transactions per second (in microseconds) - AvgXactCount() int + // Average amount of transactions per second + AvgXactCount() float64 // Average amount of queries per second - AvgQueryCount() + AvgQueryCount() float64 // Average bytes received per second - AvgRecv() + AvgRecv() float64 // Average bytes sent per second - AvgSent() + AvgSent() float64 // Average time transactions take (in microseconds) - AvgXactTime() + AvgXactTime() float64 // Average time queries take (in microseconds) - AvgQueryTime() + AvgQueryTime() float64 // Average time waiting for work (in microseconds) - AvgWaitTime() + AvgWaitTime() float64 } type QueryRouter interface { @@ -108,9 +108,12 @@ type ConnectionPool interface { type Shard interface { Primary() Connection Replicas() []Connection + Choose(role config.ServerRole) Connection } type Connection interface { + GetServerInfo() []*protocol.ParameterStatus + GetDatabase() string State() string Address() string @@ -126,6 +129,13 @@ type Connection interface { RemotePid() int TLS() string + // actions + Describe(client Client, payload *protocol.Describe) error + Execute(client Client, payload *protocol.Execute) error + CallFunction(client Client, payload *protocol.FunctionCall) error + SimpleQuery(ctx context.Context, client Client, payload string) error + Transaction(ctx context.Context, client Client, payload string) error + // Cancel the current running query Cancel() error }
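The workerPool channel in the new conn_pool.go leans on a buffered channel as a thread-safe free list: receiving from the channel checks a worker out, sending returns it, and each handler's `defer w.ret()` guarantees the return. A minimal, self-contained sketch of that acquire/release pattern (the `worker` type and pool size here are illustrative stand-ins, not from the repo):

```go
package main

import (
	"fmt"
	"sync"
)

// worker stands in for the pool's per-request handler.
type worker struct{ id int }

func main() {
	// A buffered channel acts as a free list: receive to check a worker
	// out, send to return it. The channel provides all the locking.
	pool := make(chan *worker, 4)
	for i := 0; i < cap(pool); i++ {
		pool <- &worker{id: i}
	}

	var wg sync.WaitGroup
	for req := 0; req < 8; req++ {
		wg.Add(1)
		go func(req int) {
			defer wg.Done()
			w := <-pool                  // blocks until a worker is free
			defer func() { pool <- w }() // always return the worker
			fmt.Printf("request %d handled by worker %d\n", req, w.id)
		}(req)
	}
	wg.Wait()
}
```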
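One subtlety in EnsureConfig's use of reflect.DeepEqual: both sides of the comparison must have the same pointer depth, because DeepEqual treats values of distinct types as never deeply equal. Comparing a `*config.Shard` against a `**config.Shard` (e.g. by taking `&sc` when `sc` is already a pointer) is always false, which would silently reset every shard on each config pass. A standalone demonstration of the rule (`Shard` here is a hypothetical stand-in for `config.Shard`):

```go
package main

import (
	"fmt"
	"reflect"
)

// Shard is a hypothetical stand-in for config.Shard.
type Shard struct{ Database string }

func main() {
	a := &Shard{Database: "db0"}
	b := &Shard{Database: "db0"}

	// Values of distinct types are never deeply equal: *Shard vs **Shard
	// is false even though the underlying contents are identical.
	fmt.Println(reflect.DeepEqual(a, &b)) // false

	// Comparing like with like dereferences the pointers and compares
	// the structs field by field.
	fmt.Println(reflect.DeepEqual(a, b)) // true
}
```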
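The new pool Stats computes lifetime averages: the per-second figures divide a counter by the pool's whole uptime, while the per-operation figures divide by an operation count, which yields NaN before the first transaction or query completes (both operands are converted to float64, so 0/0 is NaN rather than a panic). A guarded sketch of both shapes, assuming microsecond totals as in the interface comments; a windowed average in the style of the deleted Rust collector's 15-second STAT_PERIOD would additionally need a periodic snapshot:

```go
package main

import (
	"fmt"
	"time"
)

// perOp guards the zero-count case that otherwise produces NaN.
func perOp(totalMicros, count int) float64 {
	if count == 0 {
		return 0
	}
	return float64(totalMicros) / float64(count)
}

// perSecond averages a counter over the pool's lifetime so far.
func perSecond(total int, start time.Time) float64 {
	return float64(total) / time.Since(start).Seconds()
}

func main() {
	start := time.Now().Add(-10 * time.Second) // pretend the pool is 10s old

	fmt.Println(perOp(4200, 0))       // 0, not NaN
	fmt.Println(perOp(4200, 3))       // 1400 microseconds per transaction
	fmt.Println(perSecond(30, start)) // ~3 queries per second
}
```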