From 1c55610410dc26fa1a59ba364eabd1f5e2f22e06 Mon Sep 17 00:00:00 2001 From: Garet Halliday <ghalliday@gfxlabs.io> Date: Wed, 14 Sep 2022 15:57:21 -0500 Subject: [PATCH] admin database --- README.md | 2 +- lib/config/config.go | 6 +- lib/gat/{gatling => }/admin/admin.go | 254 +++++++++++++++++++- lib/gat/{gatling => }/admin/admin_test.go | 0 lib/gat/gatling/gatling.go | 14 ++ lib/gat/gatling/pool/conn_pool/conn_pool.go | 15 +- lib/gat/gatling/pool/conn_pool/worker.go | 14 ++ lib/gat/gatling/pool/pool.go | 2 +- lib/gat/gatling/sharding/sharding.go | 122 ---------- lib/gat/gatling/sharding/sharding_test.go | 61 ----- lib/gat/interfaces.go | 5 + lib/gat/readme.md | 2 +- 12 files changed, 300 insertions(+), 197 deletions(-) rename lib/gat/{gatling => }/admin/admin.go (73%) rename lib/gat/{gatling => }/admin/admin_test.go (100%) delete mode 100644 lib/gat/gatling/sharding/sharding.go delete mode 100644 lib/gat/gatling/sharding/sharding_test.go diff --git a/README.md b/README.md index d18211e8..566844bd 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ i'll lyk when its done | Statistics | :white_check_mark: | no | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. | | Live configuration reloading | :white_check_mark: | kind of | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. | | Client authentication | :white_check_mark: :wrench: | same as them | MD5 password authentication is supported, SCRAM is on the roadmap; one user is used to connect to Postgres with both SCRAM and MD5 supported. | -| Admin database | :white_check_mark: | no | The admin database, similar to PgBouncer's, allows to query for statistics and reload the configuration. | +| Admin database | :white_check_mark: | :white_check_mark: | The admin database, similar to PgBouncer's, allows to query for statistics and reload the configuration. | # original README diff --git a/lib/config/config.go b/lib/config/config.go index 978a80a0..0a21f9d9 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -26,9 +26,9 @@ const ( type UserRole string const ( - USERROLE_ADMIN ServerRole = "admin" - USERROLE_WRITER ServerRole = "writer" - USERROLE_READER ServerRole = "reader" + USERROLE_ADMIN UserRole = "admin" + USERROLE_WRITER UserRole = "writer" + USERROLE_READER UserRole = "reader" ) func Load(path string) (*Global, error) { diff --git a/lib/gat/gatling/admin/admin.go b/lib/gat/admin/admin.go similarity index 73% rename from lib/gat/gatling/admin/admin.go rename to lib/gat/admin/admin.go index fb5103b6..3296ced7 100644 --- a/lib/gat/gatling/admin/admin.go +++ b/lib/gat/admin/admin.go @@ -1,12 +1,20 @@ package admin import ( - "gfx.cafe/gfx/pggat/lib/gat/protocol" + "fmt" + "gfx.cafe/gfx/pggat/lib/config" + "gfx.cafe/gfx/pggat/lib/gat" ) -const SERVER_VERSION = "0.0.1" +// The admin database, implemented through the gat.Pool interface, allowing it to be added to any existing Gat -func AdminServerInfo() []*protocol.ParameterStatus { +import ( + "context" + "gfx.cafe/gfx/pggat/lib/gat/protocol" + "time" +) + +func getServerInfo(g gat.Gat) []*protocol.ParameterStatus { return []*protocol.ParameterStatus{ { Fields: protocol.FieldsParameterStatus{ @@ -35,7 +43,7 @@ func AdminServerInfo() []*protocol.ParameterStatus { { Fields: protocol.FieldsParameterStatus{ Parameter: "server_version", - Value: SERVER_VERSION, + Value: g.Version(), }, }, { @@ -47,6 +55,244 @@ func AdminServerInfo() []*protocol.ParameterStatus { } } +type Pool struct { + gat gat.Gat + connPool *ConnectionPool +} + +func NewPool(g gat.Gat) *Pool { + out := &Pool{ + gat: g, + } + out.connPool = &ConnectionPool{ + pool: out, + } + return out +} + +func (p *Pool) GetUser(name string) (*config.User, error) { + conf := p.gat.Config() + if name != conf.General.AdminUsername { + return nil, fmt.Errorf("%w: %s", gat.UserNotFound, name) + } + return &config.User{ + Name: conf.General.AdminUsername, + Password: conf.General.AdminPassword, + + Role: config.USERROLE_ADMIN, + PoolSize: 1, + StatementTimeout: 0, + }, nil +} + +func (p *Pool) GetRouter() gat.QueryRouter { + return nil +} + +func (p *Pool) WithUser(name string) (gat.ConnectionPool, error) { + conf := p.gat.Config() + if name != conf.General.AdminUsername { + return nil, fmt.Errorf("%w: %s", gat.UserNotFound, name) + } + return p.connPool, nil +} + +func (p *Pool) ConnectionPools() []gat.ConnectionPool { + return []gat.ConnectionPool{ + p.connPool, + } +} + +func (p *Pool) Stats() gat.PoolStats { + return nil // TODO +} + +func (p *Pool) EnsureConfig(c *config.Pool) { + // TODO +} + +var _ gat.Pool = (*Pool)(nil) + +type ConnectionPool struct { + pool *Pool +} + +func (c *ConnectionPool) GetUser() *config.User { + //TODO implement me + panic("implement me") +} + +func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus { + return getServerInfo(c.pool.gat) +} + +func (c *ConnectionPool) Shards() []gat.Shard { + //TODO implement me + panic("implement me") +} + +func (c *ConnectionPool) EnsureConfig(conf *config.Pool) { + //TODO implement me + panic("implement me") +} + +func (c *ConnectionPool) Describe(ctx context.Context, client gat.Client, describe *protocol.Describe) error { + //TODO implement me + panic("implement me") +} + +func (c *ConnectionPool) Execute(ctx context.Context, client gat.Client, execute *protocol.Execute) error { + //TODO implement me + panic("implement me") +} + +func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, query string) error { + //TODO implement me + panic("implement me") +} + +func (c *ConnectionPool) Transaction(ctx context.Context, client gat.Client, query string) error { + //TODO implement me + panic("implement me") +} + +func (c *ConnectionPool) CallFunction(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error { + //TODO implement me + panic("implement me") +} + +var _ gat.ConnectionPool = (*ConnectionPool)(nil) + +type Shard struct { +} + +func (s *Shard) Primary() gat.Connection { + //TODO implement me + panic("implement me") +} + +func (s *Shard) Replicas() []gat.Connection { + //TODO implement me + panic("implement me") +} + +func (s *Shard) Choose(role config.ServerRole) gat.Connection { + //TODO implement me + panic("implement me") +} + +var _ gat.Shard = (*Shard)(nil) + +type Connection struct { + pool *Pool +} + +func (c *Connection) GetServerInfo() []*protocol.ParameterStatus { + return getServerInfo(c.pool.gat) +} + +func (c *Connection) GetDatabase() string { + //TODO implement me + panic("implement me") +} + +func (c *Connection) State() string { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Address() string { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Port() int { + //TODO implement me + panic("implement me") +} + +func (c *Connection) LocalAddr() string { + //TODO implement me + panic("implement me") +} + +func (c *Connection) LocalPort() int { + //TODO implement me + panic("implement me") +} + +func (c *Connection) ConnectTime() time.Time { + //TODO implement me + panic("implement me") +} + +func (c *Connection) RequestTime() time.Time { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Wait() time.Duration { + //TODO implement me + panic("implement me") +} + +func (c *Connection) CloseNeeded() bool { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Client() gat.Client { + //TODO implement me + panic("implement me") +} + +func (c *Connection) SetClient(client gat.Client) { + //TODO implement me + panic("implement me") +} + +func (c *Connection) RemotePid() int { + //TODO implement me + panic("implement me") +} + +func (c *Connection) TLS() string { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Describe(client gat.Client, payload *protocol.Describe) error { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Execute(client gat.Client, payload *protocol.Execute) error { + //TODO implement me + panic("implement me") +} + +func (c *Connection) CallFunction(client gat.Client, payload *protocol.FunctionCall) error { + //TODO implement me + panic("implement me") +} + +func (c *Connection) SimpleQuery(ctx context.Context, client gat.Client, payload string) error { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Transaction(ctx context.Context, client gat.Client, payload string) error { + //TODO implement me + panic("implement me") +} + +func (c *Connection) Cancel() error { + //TODO implement me + panic("implement me") +} + +var _ gat.Connection = (*Connection)(nil) + ///// Handle admin client. //func handle_admin<T>( // stream: &mut T, diff --git a/lib/gat/gatling/admin/admin_test.go b/lib/gat/admin/admin_test.go similarity index 100% rename from lib/gat/gatling/admin/admin_test.go rename to lib/gat/admin/admin_test.go diff --git a/lib/gat/gatling/gatling.go b/lib/gat/gatling/gatling.go index 36c80aae..9531c4c9 100644 --- a/lib/gat/gatling/gatling.go +++ b/lib/gat/gatling/gatling.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "gfx.cafe/gfx/pggat/lib/gat/admin" "io" "net" "sync" @@ -36,6 +37,11 @@ func NewGatling(conf *config.Global) *Gatling { pools: make(map[string]gat.Pool), clients: make(map[gat.ClientID]gat.Client), } + // add admin pool + adminPool := admin.NewPool(g) + g.pools["pgbouncer"] = adminPool + g.pools["pggat"] = adminPool + err := g.ensureConfig(conf) if err != nil { log.Println("failed to parse config", err) @@ -54,6 +60,14 @@ func (g *Gatling) watchConfigs() { } } +func (g *Gatling) Version() string { + return "pggat Gatling 0.0.1" +} + +func (g *Gatling) Config() *config.Global { + return g.c +} + func (g *Gatling) GetPool(name string) (gat.Pool, error) { g.mu.RLock() defer g.mu.RUnlock() diff --git a/lib/gat/gatling/pool/conn_pool/conn_pool.go b/lib/gat/gatling/pool/conn_pool/conn_pool.go index 0347019d..2cb82695 100644 --- a/lib/gat/gatling/pool/conn_pool/conn_pool.go +++ b/lib/gat/gatling/pool/conn_pool/conn_pool.go @@ -67,9 +67,11 @@ func (c *ConnectionPool) EnsureConfig(conf *config.Pool) { } sc := s if !reflect.DeepEqual(c.shards[i], &sc) { - // disconnect all conns, switch to new conf - // TODO notify workers that they need to update that shard c.shards[i] = sc + // disconnect all conns using shard i, switch to new conf + for _, w := range c.workers { + w.invalidateShard(i) + } } } } @@ -83,8 +85,13 @@ func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus { } func (c *ConnectionPool) Shards() []gat.Shard { - // TODO go through each worker - return nil + var shards []gat.Shard + c.mu.Lock() + defer c.mu.Unlock() + for _, w := range c.workers { + shards = append(shards, w.shards...) + } + return shards } func (c *ConnectionPool) Describe(ctx context.Context, client gat.Client, d *protocol.Describe) error { diff --git a/lib/gat/gatling/pool/conn_pool/worker.go b/lib/gat/gatling/pool/conn_pool/worker.go index ee6f0ee0..592033c7 100644 --- a/lib/gat/gatling/pool/conn_pool/worker.go +++ b/lib/gat/gatling/pool/conn_pool/worker.go @@ -8,6 +8,7 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/gatling/pool/conn_pool/shard" "gfx.cafe/gfx/pggat/lib/gat/protocol" "gfx.cafe/gfx/pggat/lib/gat/protocol/pg_error" + "sync" ) // a single use worker with an embedded connection pool. @@ -17,6 +18,8 @@ type worker struct { w *ConnectionPool shards []gat.Shard + + mu sync.Mutex } // ret urn worker to pool @@ -33,11 +36,22 @@ func (w *worker) fetchShard(n int) bool { for len(w.shards) <= n { w.shards = append(w.shards, nil) } + w.shards[n] = shard.FromConfig(w.w.user, w.w.shards[n]) return true } +func (w *worker) invalidateShard(n int) { + w.mu.Lock() + defer w.mu.Unlock() + + w.shards[n] = nil +} + func (w *worker) anyShard() gat.Shard { + w.mu.Lock() + defer w.mu.Unlock() + for _, s := range w.shards { if s != nil { return s diff --git a/lib/gat/gatling/pool/pool.go b/lib/gat/gatling/pool/pool.go index dcb419f3..85a1d867 100644 --- a/lib/gat/gatling/pool/pool.go +++ b/lib/gat/gatling/pool/pool.go @@ -55,7 +55,7 @@ func (p *Pool) GetUser(name string) (*config.User, error) { defer p.mu.RUnlock() user, ok := p.users[name] if !ok { - return nil, fmt.Errorf("user '%s' not found", name) + return nil, fmt.Errorf("%w: %s", gat.UserNotFound, name) } return &user, nil } diff --git a/lib/gat/gatling/sharding/sharding.go b/lib/gat/gatling/sharding/sharding.go deleted file mode 100644 index 8a471a2e..00000000 --- a/lib/gat/gatling/sharding/sharding.go +++ /dev/null @@ -1,122 +0,0 @@ -package sharding - -const PARTITION_HASH_SEED = 0x7A5B22367996DCFD - -type ShardFunc func(int64) int - -type Sharder struct { - shards int - fn ShardFunc -} - -func NewSharder(shards int, fn ShardFunc) *Sharder { - return &Sharder{ - shards: shards, - fn: fn, - } -} - -//TODO: implement hash functions -// -// fn pg_bigint_hash(&self, key: i64) -> usize { -// let mut lohalf = key as u32; -// let hihalf = (key >> 32) as u32; -// lohalf ^= if key >= 0 { hihalf } else { !hihalf }; -// Self::combine(0, Self::pg_u32_hash(lohalf)) as usize % self.shards -// } - -// /// Example of a hashing function based on SHA1. -// fn sha1(&self, key: i64) -> usize { -// let mut hasher = Sha1::new(); - -// hasher.update(&key.to_string().as_bytes()); - -// let result = hasher.finalize(); - -// // Convert the SHA1 hash into hex so we can parse it as a large integer. -// let hex = format!("{:x}", result); - -// // Parse the last 8 bytes as an integer (8 bytes = bigint). -// let key = i64::from_str_radix(&hex[hex.len() - 8..], 16).unwrap() as usize; - -// key % self.shards -// } - -// #[inline] -// fn rot(x: u32, k: u32) -> u32 { -// (x << k) | (x >> (32 - k)) -// } - -// #[inline] -// fn mix(mut a: u32, mut b: u32, mut c: u32) -> (u32, u32, u32) { -// a = a.wrapping_sub(c); -// a ^= Self::rot(c, 4); -// c = c.wrapping_add(b); - -// b = b.wrapping_sub(a); -// b ^= Self::rot(a, 6); -// a = a.wrapping_add(c); - -// c = c.wrapping_sub(b); -// c ^= Self::rot(b, 8); -// b = b.wrapping_add(a); - -// a = a.wrapping_sub(c); -// a ^= Self::rot(c, 16); -// c = c.wrapping_add(b); - -// b = b.wrapping_sub(a); -// b ^= Self::rot(a, 19); -// a = a.wrapping_add(c); - -// c = c.wrapping_sub(b); -// c ^= Self::rot(b, 4); -// b = b.wrapping_add(a); - -// (a, b, c) -// } - -// #[inline] -// fn _final(mut a: u32, mut b: u32, mut c: u32) -> (u32, u32, u32) { -// c ^= b; -// c = c.wrapping_sub(Self::rot(b, 14)); -// a ^= c; -// a = a.wrapping_sub(Self::rot(c, 11)); -// b ^= a; -// b = b.wrapping_sub(Self::rot(a, 25)); -// c ^= b; -// c = c.wrapping_sub(Self::rot(b, 16)); -// a ^= c; -// a = a.wrapping_sub(Self::rot(c, 4)); -// b ^= a; -// b = b.wrapping_sub(Self::rot(a, 14)); -// c ^= b; -// c = c.wrapping_sub(Self::rot(b, 24)); -// (a, b, c) -// } - -// #[inline] -// fn combine(mut a: u64, b: u64) -> u64 { -// a ^= b -// .wrapping_add(0x49a0f4dd15e5a8e3 as u64) -// .wrapping_add(a << 54) -// .wrapping_add(a >> 7); -// a -// } - -// #[inline] -// fn pg_u32_hash(k: u32) -> u64 { -// let mut a: u32 = 0x9e3779b9 as u32 + std::mem::size_of::<u32>() as u32 + 3923095 as u32; -// let mut b = a; -// let c = a; - -// a = a.wrapping_add((PARTITION_HASH_SEED >> 32) as u32); -// b = b.wrapping_add(PARTITION_HASH_SEED as u32); -// let (mut a, b, c) = Self::mix(a, b, c); - -// a = a.wrapping_add(k); - -// let (_a, b, c) = Self::_final(a, b, c); - -// ((b as u64) << 32) | (c as u64) -// } diff --git a/lib/gat/gatling/sharding/sharding_test.go b/lib/gat/gatling/sharding/sharding_test.go deleted file mode 100644 index 356dd2cd..00000000 --- a/lib/gat/gatling/sharding/sharding_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package sharding - -//TODO: convert test - -//#[cfg(test)] -//mod test { -// use super::*; -// -// // See tests/sharding/partition_hash_test_setup.sql -// // The output of those SELECT statements will match this test, -// // confirming that we implemented Postgres BIGINT hashing correctly. -// #[test] -// fn test_pg_bigint_hash() { -// let sharder = Sharder::new(5, ShardingFunction::PgBigintHash); -// -// let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53]; -// -// for v in shard_0 { -// assert_eq!(sharder.shard(v), 0); -// } -// -// let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54]; -// -// for v in shard_1 { -// assert_eq!(sharder.shard(v), 1); -// } -// -// let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35]; -// -// for v in shard_2 { -// assert_eq!(sharder.shard(v), 2); -// } -// -// let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43]; -// -// for v in shard_3 { -// assert_eq!(sharder.shard(v), 3); -// } -// -// let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45]; -// -// for v in shard_4 { -// assert_eq!(sharder.shard(v), 4); -// } -// } -// -// #[test] -// fn test_sha1_hash() { -// let sharder = Sharder::new(12, ShardingFunction::Sha1); -// let ids = vec![ -// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, -// ]; -// let shards = vec![ -// 4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3, -// ]; -// -// for (i, id) in ids.iter().enumerate() { -// assert_eq!(sharder.shard(*id), shards[i]); -// } -// } -//} diff --git a/lib/gat/interfaces.go b/lib/gat/interfaces.go index d989ba25..efe2e3a5 100644 --- a/lib/gat/interfaces.go +++ b/lib/gat/interfaces.go @@ -2,6 +2,7 @@ package gat import ( "context" + "errors" "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat/protocol" "time" @@ -34,12 +35,16 @@ type Client interface { } type Gat interface { + Version() string + Config() *config.Global GetPool(name string) (Pool, error) Pools() []Pool GetClient(id ClientID) (Client, error) Clients() []Client } +var UserNotFound = errors.New("user not found") + type Pool interface { GetUser(name string) (*config.User, error) GetRouter() QueryRouter diff --git a/lib/gat/readme.md b/lib/gat/readme.md index 992aa5ce..7ea18a8f 100644 --- a/lib/gat/readme.md +++ b/lib/gat/readme.md @@ -1,5 +1,5 @@ ## business logic - +gatling is the main proxy and router, but other pool routers may be added in the future following the same interfaces protocol is code generated! every file in there. don't edit it! -- GitLab