diff --git a/go.mod b/go.mod index 7e96ddf8611db4900c970c11705389900cedd2aa..5b9c52983b6015b4363acd208754a4df43bc7675 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf + gfx.cafe/util/go/lambda v0.0.0-20220906200602-98a6b35a1b42 git.tuxpa.in/a/zlog v1.32.0 github.com/BurntSushi/toml v1.2.0 github.com/auxten/postgresql-parser v1.0.1 @@ -37,6 +38,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect + golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index f5afeaa3ba04be4b69fae5329a1531108175ce9b..a66211542b6ffec50500ead79b713358089d615f 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,9 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf h1:ya4IK1D+Kq0DrFdrrZ7tjmp3BgoO4v5sCAeUytR6j1U= gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf/go.mod h1:+DiyiCOBGS9O9Ce4ewHQO3Y59h66WSWAbgZZ2O2AYYw= +gfx.cafe/util/go/lambda v0.0.0-20220906200602-98a6b35a1b42 h1:8mKA+jVj7l3sM/s6CjqF/5DhAIWftIhit0XqkswzkAg= +gfx.cafe/util/go/lambda v0.0.0-20220906200602-98a6b35a1b42/go.mod h1:+qAj+4kl6uABj/3RqR84AwHnm+vYyYoPwLJdTR3GgRc= +git.tuxpa.in/a/lambda v0.0.0-20220903040836-72d2bd6dc070 h1:CuEIvPmhTWKmMMoJnsP2xOoWRFOFB7cZABONrSQaiDc= git.tuxpa.in/a/zlog v1.32.0 h1:KKXbRF1x8kJDSzUoGz/pivo+4TVY6xT5sVtdFZ6traY= git.tuxpa.in/a/zlog v1.32.0/go.mod h1:vUa2Qhu6DLPLqmfRy99FiPqaY2eb6/KQjtMekW3UNnA= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= @@ -108,8 +111,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -253,6 +256,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -330,7 +335,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df h1:5Pf6pFKu98ODmgnpvkJ3kFUOQGGLIzLIkbzUHp47618= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= diff --git a/lib/gat/client.go b/lib/gat/client.go index ba91d62d4d33ac315fd6ce5fe67551350a8e0fcb..98d548f86bc5e8f73c0a4dfba130979351bff885 100644 --- a/lib/gat/client.go +++ b/lib/gat/client.go @@ -7,37 +7,20 @@ import ( "crypto/rand" "crypto/tls" "fmt" - "gfx.cafe/gfx/pggat/lib/gat/protocol" - "gfx.cafe/gfx/pggat/lib/util/maps" "io" "math/big" "net" "reflect" + "gfx.cafe/gfx/pggat/lib/gat/protocol" + "gfx.cafe/gfx/pggat/lib/util/maps" + "gfx.cafe/gfx/pggat/lib/config" "git.tuxpa.in/a/zlog" "git.tuxpa.in/a/zlog/log" "github.com/ethereum/go-ethereum/common/math" ) -type ClientConnectionType interface { -} - -var _ []ClientConnectionType = []ClientConnectionType{ - &StartupConnection{}, - &TLSConnection{}, - &CancelQueryConnection{}, -} - -type StartupConnection struct { -} - -type TLSConnection struct { -} - -type CancelQueryConnection struct { -} - type ClientKey [2]int type ClientInfo struct { @@ -63,7 +46,6 @@ type Client struct { pid int32 secret_key int32 - csm map[ClientKey]ClientInfo parameters map[string]string stats any // TODO: Reporter admin bool @@ -84,7 +66,6 @@ type Client struct { func NewClient( conf *config.Global, conn net.Conn, - csm map[ClientKey]ClientInfo, admin_only bool, ) *Client { c := &Client{ @@ -92,7 +73,6 @@ func NewClient( r: bufio.NewReader(conn), wr: conn, addr: conn.RemoteAddr(), - csm: csm, conf: conf, } c.log = log.With(). @@ -265,7 +245,7 @@ func (c *Client) Accept(ctx context.Context) error { shard := pool.Shards["0"] serv := shard.Servers[0] - c.server, err = DialServer(context.TODO(), fmt.Sprintf("%s:%d", serv.Host(), serv.Port()), &user, shard.Database, c.csm, nil) + c.server, err = DialServer(context.TODO(), fmt.Sprintf("%s:%d", serv.Host(), serv.Port()), &user, shard.Database, nil) if err != nil { return err } diff --git a/lib/gat/gatling.go b/lib/gat/gatling.go index eaa015a36b811dcc94de7ca75a32b2678ad1f22f..5f8d637e30c2f339f8facc1e891e56a5fd0a4278 100644 --- a/lib/gat/gatling.go +++ b/lib/gat/gatling.go @@ -3,33 +3,119 @@ package gat import ( "context" "fmt" - "git.tuxpa.in/a/zlog/log" "net" + "sync" + + "git.tuxpa.in/a/zlog/log" "gfx.cafe/gfx/pggat/lib/config" + "gfx.cafe/gfx/pggat/lib/gat/protocol" ) type Gatling struct { - c *config.Global + c *config.Global + mu sync.RWMutex + + rout *QueryRouter + + csm map[ClientKey]*ClientInfo + clients map[string]*Client - csm map[ClientKey]ClientInfo chConfig chan *config.Global + + servers map[string]*Server + pools map[string]*ConnectionPool } func NewGatling() *Gatling { - return &Gatling{ - csm: map[ClientKey]ClientInfo{}, + g := &Gatling{ + csm: map[ClientKey]*ClientInfo{}, chConfig: make(chan *config.Global, 1), + servers: map[string]*Server{}, + clients: map[string]*Client{}, + pools: map[string]*ConnectionPool{}, + rout: &QueryRouter{}, } + go g.watchConfigs() + return g } -func (g *Gatling) ApplyConfig(c *config.Global) error { - if g.c == nil { - g.c = c - } else { - // TODO: dynamic config reload - g.c = c +func (g *Gatling) watchConfigs() { + for { + c := <-g.chConfig + err := g.ensureConfig(c) + if err != nil { + log.Println("failed to parse config", err) + } + } +} + +func (g *Gatling) GetClient(s string) (*Client, error) { + g.mu.RLock() + defer g.mu.RUnlock() + srv, ok := g.clients[s] + if !ok { + return nil, fmt.Errorf("client '%s' not found", s) } + return srv, nil +} +func (g *Gatling) GetPool(s string) (*ConnectionPool, error) { + g.mu.RLock() + defer g.mu.RUnlock() + srv, ok := g.pools[s] + if !ok { + return nil, fmt.Errorf("pool '%s' not found", s) + } + return srv, nil +} + +func (g *Gatling) GetServer(s string) (*Server, error) { + g.mu.RLock() + defer g.mu.RUnlock() + srv, ok := g.servers[s] + if !ok { + return nil, fmt.Errorf("server '%s' not found", s) + } + return srv, nil +} + +func (g *Gatling) ensureConfig(c *config.Global) error { + g.mu.Lock() + defer g.mu.Unlock() + + if err := g.ensureGeneral(c); err != nil { + return err + } + if err := g.ensureAdmin(c); err != nil { + return err + } + if err := g.ensureServers(c); err != nil { + return err + } + if err := g.ensurePools(c); err != nil { + return err + } + + return nil +} + +// TODO: all other settings +func (g *Gatling) ensureGeneral(c *config.Global) error { + return nil +} + +// TODO: should configure the admin things, metrics, etc +func (g *Gatling) ensureAdmin(c *config.Global) error { + return nil +} + +// TODO: should connect to and load servers from config +func (g *Gatling) ensureServers(c *config.Global) error { + return nil +} + +// TODO: should connect to & load pools from config +func (g *Gatling) ensurePools(c *config.Global) error { return nil } @@ -55,7 +141,7 @@ func (g *Gatling) ListenAndServe(ctx context.Context) error { // TODO: TLS func (g *Gatling) handleConnection(ctx context.Context, c net.Conn) error { - cl := NewClient(g.c, c, g.csm, false) + cl := NewClient(g.c, c, false) err := cl.Accept(ctx) if err != nil { log.Println(err.Error()) @@ -75,3 +161,42 @@ func (g *Gatling) handleConnection(ctx context.Context, c net.Conn) error { } return nil } + +type QueryRequest struct { + ctx context.Context + raw protocol.Packet + c *Client +} + +func (g *Gatling) handleQuery(ctx context.Context, c *Client, raw protocol.Packet) error { + // 1. analyze query using the query router + role, err := g.rout.InferRole(raw) + if err != nil { + return err + } + pool, err := g.GetPool(g.selectPool(c, role)) + if err != nil { + return err + } + // check config, select a pool + _ = pool + // TODO: we need to add some more information to the connectionpools, like current load, selectors, etc + // perhaps we should just put the server connections in ServerPool and make that responsible for all of that + srv, err := g.GetServer("some_output") + if err != nil { + return err + } + // write the packet or maybe send in a channel to the server + _ = srv + + // send the result back to the client + _ = c + return nil +} + +func (g *Gatling) selectPool(c *Client, role config.ServerRole) string { + g.mu.RLock() + defer g.mu.RUnlock() + // do some filtering and figure out which pool you want to connect this client to, knowing their rold + return "some_pool" +} diff --git a/lib/gat/query_router.go b/lib/gat/query_router.go index 5b527b71d7228077a060526b23fb90d5d9f52015..41eb86c416e720310c46308a91d19cdcb4e4fb31 100644 --- a/lib/gat/query_router.go +++ b/lib/gat/query_router.go @@ -2,29 +2,29 @@ package gat import ( "fmt" - "git.tuxpa.in/a/zlog/log" "regexp" "strings" + "git.tuxpa.in/a/zlog/log" + "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat/protocol" + "gfx.cafe/util/go/lambda" "github.com/auxten/postgresql-parser/pkg/sql/parser" "github.com/auxten/postgresql-parser/pkg/sql/sem/tree" "github.com/auxten/postgresql-parser/pkg/walk" ) -var compiler = regexp.MustCompile - -var CustomSqlRegex = []*regexp.Regexp{ - compiler("(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$"), - compiler("(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$"), - compiler("(?i)^ *SHOW SHARD *;? *$"), - compiler("(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$"), - compiler("(?i)^ *SHOW SERVER ROLE *;? *$"), - compiler("(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$"), - compiler("(?i)^ *SHOW PRIMARY READS *;? *$"), -} +var CustomSqlRegex = lambda.MapV([]string{ + "(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$", + "(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$", + "(?i)^ *SHOW SHARD *;? *$", + "(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$", + "(?i)^ *SHOW SERVER ROLE *;? *$", + "(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$", + "(?i)^ *SHOW PRIMARY READS *;? *$", +}, regexp.MustCompile) type Command interface { } @@ -33,7 +33,7 @@ var _ []Command = []Command{ &CommandSetShardingKey{}, &CommandSetShard{}, &CommandShowShard{}, - &CommandSetServerRole{}, + CommandSetServerRole{}, &CommandShowServerRole{}, &CommandSetPrimaryReads{}, &CommandShowPrimaryReads{}, @@ -49,8 +49,6 @@ type CommandShowPrimaryReads struct{} type QueryRouter struct { active_shard int - active_role config.ServerRole - query_parser_enabled bool primary_reads_enabled bool pool_settings PoolSettings } @@ -61,6 +59,7 @@ func (r *QueryRouter) UpdatePoolSettings(pool_settings PoolSettings) { } // / Try to parse a command and execute it. +// TODO: needs to just provide the execution function and so gatling can then plug in the client, server, etc func (r *QueryRouter) try_execute_command(pkt *protocol.Query) (Command, string) { // Only simple protocol supported for commands. // TODO: read msg len @@ -201,17 +200,20 @@ func (r *QueryRouter) try_execute_command(pkt *protocol.Query) (Command, string) // / Try to infer which server to connect to based on the contents of the query. // TODO: implement -func (r *QueryRouter) InferRole(pkt protocol.Packet) error { +func (r *QueryRouter) InferRole(pkt protocol.Packet) (config.ServerRole, error) { var query string + var active_role config.ServerRole switch c := pkt.(type) { case *protocol.Query: - r.active_role = config.SERVERROLE_REPLICA + active_role = config.SERVERROLE_REPLICA case *protocol.Parse: query = c.Fields.Query query = strings.ReplaceAll(query, "$", "") default: - return fmt.Errorf("unknown packet %v", pkt) + return config.SERVERROLE_REPLICA, fmt.Errorf("unknown packet %v", pkt) } + // by default it will hit a replica + active_role = config.SERVERROLE_REPLICA // ok now parse the query wk := &walk.AstWalker{ Fn: func(ctx, node any) (stop bool) { @@ -220,7 +222,7 @@ func (r *QueryRouter) InferRole(pkt protocol.Packet) error { *tree.BeginTransaction, *tree.CommitTransaction, *tree.RollbackTransaction, *tree.SetTransaction, *tree.ShowTransactionStatus, *tree.Delete, *tree.Insert: // - r.active_role = config.SERVERROLE_PRIMARY + active_role = config.SERVERROLE_PRIMARY return true default: _ = n @@ -228,21 +230,15 @@ func (r *QueryRouter) InferRole(pkt protocol.Packet) error { return false }, } - r.active_role = config.SERVERROLE_REPLICA stmts, err := parser.Parse(query) if err != nil { - return err + return config.SERVERROLE_REPLICA, err } _, err = wk.Walk(stmts, nil) if err != nil { - return err + return config.SERVERROLE_REPLICA, err } - return nil -} - -// / Get the current desired server role we should be talking to. -func (r *QueryRouter) Role() config.ServerRole { - return r.active_role + return active_role, nil } // / Get desired shard we should be talking to. @@ -253,7 +249,3 @@ func (r *QueryRouter) Shard() int { func (r *QueryRouter) SetShard(shard int) { r.active_shard = shard } - -func (r *QueryRouter) QueryParserEnabled() bool { - return r.query_parser_enabled -} diff --git a/lib/gat/server.go b/lib/gat/server.go index 024b6dcf8ddaced4e4152b609b0e128334af81b0..2ac8c13e7dba69b6f0d59d81906a1625a865ad8f 100644 --- a/lib/gat/server.go +++ b/lib/gat/server.go @@ -5,13 +5,14 @@ import ( "bytes" "encoding/binary" "fmt" - "gfx.cafe/gfx/pggat/lib/gat/protocol" - "gfx.cafe/gfx/pggat/lib/util/slices" - "gfx.cafe/util/go/bufpool" "io" "net" "time" + "gfx.cafe/gfx/pggat/lib/gat/protocol" + "gfx.cafe/gfx/pggat/lib/util/slices" + "gfx.cafe/util/go/bufpool" + "gfx.cafe/gfx/pggat/lib/auth/sasl" "gfx.cafe/gfx/pggat/lib/auth/scram" "gfx.cafe/gfx/pggat/lib/config" @@ -33,15 +34,11 @@ type Server struct { process_id int32 secret_key int32 - bad bool - + bad bool in_txn bool - csm map[ClientKey]ClientInfo - - connected_at time.Time - - stats any // TODO: stats + connected_at time.Time + stats any // TODO: stats application_name string last_activity time.Time @@ -54,7 +51,7 @@ type Server struct { var ENDIAN = binary.BigEndian -func DialServer(ctx context.Context, addr string, user *config.User, db string, csm map[ClientKey]ClientInfo, stats any) (*Server, error) { +func DialServer(ctx context.Context, addr string, user *config.User, db string, stats any) (*Server, error) { s := &Server{} var err error s.conn, err = net.Dial("tcp", addr) @@ -192,7 +189,11 @@ func (s *Server) connect(ctx context.Context) error { } } -// TODO: implement drop - we should rename to close. +func (s *Server) Close(ctx context.Context) error { + <-ctx.Done() + return nil +} + //impl Drop for Server { // /// Try to do a clean shut down. Best effort because // /// the socket is in non-blocking mode, so it may not be ready