diff --git a/README.md b/README.md index cdfbd30e82f0bae82c549b2ffc7d80a574404f65..6437612f51515a0654ea38357606d10656b2a2f2 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ i'll lyk when its done | `COPY` support | :white_check_mark: | :white_check_mark: | Both `COPY TO` and `COPY FROM` are supported. | | Query cancellation | :white_check_mark: | :white_check_mark: | Supported both in transaction and session pooling modes. | | Load balancing of read queries | :white_check_mark: | :white_check_mark: | Using random between replicas. Primary is included when `primary_reads_enabled` is enabled (default). | -| Sharding | :white_check_mark: | no | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. | +| Sharding | :white_check_mark: | :white_check_mark: | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. | | Failover | :white_check_mark: | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. | | Statistics | :white_check_mark: | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. | | Live configuration reloading | :white_check_mark: | kind of | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. | diff --git a/go.mod b/go.mod index a2619c25dd6e8900eeab224602d851b900cfe271..6779a323b2c3443d4217a64942e5d36725f77f5c 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,13 @@ go 1.19 require ( gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf + gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9 gfx.cafe/util/go/lambda v0.0.0-20220906200602-98a6b35a1b42 git.tuxpa.in/a/zlog v1.32.0 github.com/BurntSushi/toml v1.2.0 github.com/auxten/postgresql-parser v1.0.1 - github.com/ethereum/go-ethereum v1.10.23 github.com/iancoleman/strcase v0.2.0 + github.com/looplab/fsm v0.3.0 github.com/xdg-go/scram v1.1.1 golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b gopkg.in/yaml.v2 v2.4.0 @@ -17,7 +18,6 @@ require ( ) require ( - gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9 // indirect github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 // indirect github.com/cockroachdb/apd v1.1.1-0.20181017181144-bced77f817b4 // indirect github.com/cockroachdb/errors v1.8.2 // indirect @@ -33,13 +33,12 @@ require ( github.com/kr/pretty v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/lib/pq v1.9.0 // indirect - github.com/looplab/fsm v0.3.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/sirupsen/logrus v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/theckman/go-fsm v0.0.2 // indirect + github.com/stretchr/testify v1.7.2 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect diff --git a/go.sum b/go.sum index 5c5c65ead0e3f3110fd772aebe66d45edce88795..ad65268b1261139e16a7f64617521287ae2977e2 100644 --- a/go.sum +++ b/go.sum @@ -62,8 +62,6 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= -github.com/ethereum/go-ethereum v1.10.23 h1:Xk8XAT4/UuqcjMLIMF+7imjkg32kfVFKoeyQDaO2yWM= -github.com/ethereum/go-ethereum v1.10.23/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= @@ -230,8 +228,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= -github.com/theckman/go-fsm v0.0.2 h1:KdFn95Si2ATAGWzExxCONuwZiY3caSPITlMBTw4Y3VI= -github.com/theckman/go-fsm v0.0.2/go.mod h1:hN13NqBn5Mf9MbGIw1ToT3dMtRa36//yr75uRI2vha8= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= @@ -260,7 +257,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= diff --git a/lib/gat/admin/admin.go b/lib/gat/admin/admin.go index c2d659d7146da8027e0d12c8150d03b6039ad3af..015aa388f3056cedffc827c9514df5ec8e770d2d 100644 --- a/lib/gat/admin/admin.go +++ b/lib/gat/admin/admin.go @@ -6,7 +6,7 @@ import ( "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/parse" - "strings" + "gfx.cafe/gfx/pggat/lib/util/cmux" "time" ) @@ -77,6 +77,106 @@ func getAdminUser(g gat.Gat) *config.User { type Pool struct { gat gat.Gat connPool *ConnectionPool + + r cmux.Mux[gat.Client, error] +} + +func NewPool(g gat.Gat) *Pool { + out := &Pool{ + gat: g, + } + out.connPool = &ConnectionPool{ + pool: out, + } + out.r = cmux.NewMapMux[gat.Client, error]() + out.r.Register([]string{"show", "stats_totals"}, func(client gat.Client, _ []string) error { + return out.showStats(client, true, false) + }) + out.r.Register([]string{"show", "stats_averages"}, func(client gat.Client, _ []string) error { + return out.showStats(client, false, true) + }) + out.r.Register([]string{"show", "stats"}, func(client gat.Client, _ []string) error { + return out.showStats(client, true, true) + }) + out.r.Register([]string{"show", "totals"}, func(client gat.Client, _ []string) error { + return out.showTotals(client) + }) + out.r.Register([]string{"show", "servers"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "clients"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "pools"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "lists"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "users"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "databases"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "fds"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "sockets"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "active_sockets"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "config"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "mem"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "dns_hosts"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "dns_zones"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"show", "version"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"pause"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"disable"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"enable"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"reconnect"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"kill"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"suspend"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"resume"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"shutdown"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"reload"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"wait_close"}, func(_ gat.Client, _ []string) error { + return nil + }) + out.r.Register([]string{"set"}, func(_ gat.Client, _ []string) error { + return nil + }) + return out } func (p *Pool) showStats(client gat.Client, totals, averages bool) error { @@ -423,16 +523,6 @@ func (p *Pool) showTotals(client gat.Client) error { return client.Send(row) } -func NewPool(g gat.Gat) *Pool { - out := &Pool{ - gat: g, - } - out.connPool = &ConnectionPool{ - pool: out, - } - return out -} - func (p *Pool) GetUser(name string) *config.User { u := getAdminUser(p.gat) if name != u.Name { @@ -507,54 +597,15 @@ func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, que if err != nil { return err } + if len(parsed) == 0 { + return client.Send(new(protocol.EmptyQueryResponse)) + } for _, cmd := range parsed { - switch strings.ToLower(cmd.Command) { - case "show": - if len(cmd.Arguments) < 1 { - return errors.New("usage: show [item]") - } - - switch strings.ToLower(cmd.Arguments[0]) { - case "stats": - err = c.pool.showStats(client, true, true) - case "stats_totals": - err = c.pool.showStats(client, true, false) - case "stats_averages": - err = c.pool.showStats(client, false, true) - case "totals": - err = c.pool.showTotals(client) - case "servers": - case "clients": - case "pools": - case "lists": - case "users": - case "databases": - case "fds": - case "sockets", "active_sockets": - case "config": - case "mem": - case "dns_hosts": - case "dns_zones": - case "version": - - default: - return errors.New("unknown command") - } - case "pause": - case "disable": - case "enable": - case "reconnect": - case "kill": - case "suspend": - case "resume": - case "shutdown": - case "reload": - case "wait_close": - case "set": - default: + var matched bool + err, matched = c.pool.r.Call(client, append([]string{cmd.Command}, cmd.Arguments...)) + if !matched { return errors.New("unknown command") } - if err != nil { return err } diff --git a/lib/gat/gatling/client/client.go b/lib/gat/gatling/client/client.go index 8b06cc5ad4da10ee733f8a0624453166effd5c11..017879132e9da461e9b190ccb65d216fd62c0be7 100644 --- a/lib/gat/gatling/client/client.go +++ b/lib/gat/gatling/client/client.go @@ -81,6 +81,10 @@ type Client struct { poolName string username string + shardingKey string + preferredShard int + hasPreferredShard bool + gatling gat.Gat currentConn gat.Connection statements map[string]*protocol.Parse @@ -128,6 +132,27 @@ func (c *Client) GetConnectionPool() gat.ConnectionPool { return c.server } +func (c *Client) SetRequestedShard(shard int) { + c.preferredShard = shard + c.hasPreferredShard = true +} + +func (c *Client) UnsetRequestedShard() { + c.hasPreferredShard = false +} + +func (c *Client) GetRequestedShard() (int, bool) { + return c.preferredShard, c.hasPreferredShard +} + +func (c *Client) SetShardingKey(key string) { + c.shardingKey = key +} + +func (c *Client) GetShardingKey() string { + return c.shardingKey +} + func NewClient( gatling gat.Gat, conf *config.Global, @@ -496,64 +521,63 @@ func (c *Client) handle_query(ctx context.Context, q *protocol.Query) error { // we can handle empty queries here if len(parsed) == 0 { - err = c.Send(&protocol.EmptyQueryResponse{}) - if err != nil { - return err - } - ready := new(protocol.ReadyForQuery) - ready.Fields.Status = 'I' - return c.Send(ready) + return c.Send(&protocol.EmptyQueryResponse{}) } - prev := 0 - transaction := false + transaction := -1 for idx, cmd := range parsed { - switch strings.ToUpper(cmd.Command) { - case "START": - if len(cmd.Arguments) < 1 || strings.ToUpper(cmd.Arguments[0]) != "TRANSACTION" { - break - } - fallthrough - case "BEGIN": - // begin transaction - if prev != cmd.Index { - query := q.Fields.Query[prev:cmd.Index] - c.startRequest() - err = c.handle_simple_query(ctx, query) - prev = cmd.Index - if err != nil { - return err + var next int + if idx+1 >= len(parsed) { + next = len(q.Fields.Query) + } else { + next = parsed[idx+1].Index + } + + cmdUpper := strings.ToUpper(cmd.Command) + + // not in transaction + if transaction == -1 { + switch cmdUpper { + case "START": + if len(cmd.Arguments) < 1 || strings.ToUpper(cmd.Arguments[0]) != "TRANSACTION" { + break } + fallthrough + case "BEGIN": + transaction = cmd.Index } - transaction = true - case "END": - // end transaction block - var query string - if idx+1 >= len(parsed) { - query = q.Fields.Query[prev:] - } else { - query = q.Fields.Query[prev:parsed[idx+1].Index] + } + + if transaction == -1 { + // this is a simple query + c.startRequest() + err = c.handle_simple_query(ctx, q.Fields.Query[cmd.Index:next]) + if err != nil { + return err } - if query != "" { + } else { + // this command is part of a transaction + switch cmdUpper { + case "END": c.startRequest() - err = c.handle_transaction(ctx, query) - prev = cmd.Index + err = c.handle_transaction(ctx, q.Fields.Query[transaction:next]) if err != nil { return err } + transaction = -1 } - transaction = false - } } - query := q.Fields.Query[prev:] - c.startRequest() - if transaction { - err = c.handle_transaction(ctx, query) - } else { - err = c.handle_simple_query(ctx, query) + + if transaction != -1 { + c.startRequest() + err = c.handle_transaction(ctx, q.Fields.Query[transaction:]) + if err != nil { + return err + } } - return err + + return nil } func (c *Client) handle_simple_query(ctx context.Context, q string) error { diff --git a/lib/gat/gatling/pool/conn_pool/conn_pool.go b/lib/gat/gatling/pool/conn_pool/conn_pool.go index f29d4391c3fe197390e01e7a53d94f13a6b9ccb2..8f9536c85ff9bc1139c8746e27b5fc5759c61915 100644 --- a/lib/gat/gatling/pool/conn_pool/conn_pool.go +++ b/lib/gat/gatling/pool/conn_pool/conn_pool.go @@ -80,6 +80,14 @@ func (c *ConnectionPool) Execute(ctx context.Context, client gat.Client, e *prot } func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, q string) error { + // see if the pool router can handle it + handled, err := c.pool.GetRouter().TryHandle(client, q) + if err != nil { + return err + } + if handled { + return nil + } return c.getWorker().HandleSimpleQuery(ctx, client, q) } diff --git a/lib/gat/gatling/pool/conn_pool/worker.go b/lib/gat/gatling/pool/conn_pool/worker.go index 8f2a4b64b678f1361b8fd5fc9c9c1722e30d7db6..31036b45b76e458170dff9d41111101b83db83b9 100644 --- a/lib/gat/gatling/pool/conn_pool/worker.go +++ b/lib/gat/gatling/pool/conn_pool/worker.go @@ -8,6 +8,7 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/gatling/pool/conn_pool/shard" "gfx.cafe/gfx/pggat/lib/gat/protocol" "gfx.cafe/gfx/pggat/lib/gat/protocol/pg_error" + "math/rand" "sync" "time" ) @@ -51,51 +52,41 @@ func (w *worker) invalidateShard(n int) { w.shards[n] = nil } -func (w *worker) anyShard() gat.Shard { +func (w *worker) chooseShard(client gat.Client) gat.Shard { w.mu.Lock() defer w.mu.Unlock() conf := w.w.c.Load() - for idx, s := range w.shards { - if s != nil { - s.EnsureConfig(conf.Shards[idx]) - return s + preferred := rand.Intn(len(conf.Shards)) + if client != nil { + if p, ok := client.GetRequestedShard(); ok { + preferred = p % len(conf.Shards) } + + key := client.GetShardingKey() + if key != "" { + // do sharding function on key TODO + } + } + + if preferred < len(w.shards) && w.shards[preferred] != nil { + w.shards[preferred].EnsureConfig(conf.Shards[preferred]) + return w.shards[preferred] } // we need to fetch a shard - if w.fetchShard(0) { - return w.shards[0] + if w.fetchShard(preferred) { + return w.shards[preferred] } return nil } -func (w *worker) chooseShardDescribe(client gat.Client, payload *protocol.Describe) gat.Shard { - return w.anyShard() // TODO -} - -func (w *worker) chooseShardExecute(client gat.Client, payload *protocol.Execute) gat.Shard { - return w.anyShard() // TODO -} - -func (w *worker) chooseShardFn(client gat.Client, fn *protocol.FunctionCall) gat.Shard { - return w.anyShard() // TODO -} - -func (w *worker) chooseShardSimpleQuery(client gat.Client, payload string) gat.Shard { - return w.anyShard() // TODO -} - -func (w *worker) chooseShardTransaction(client gat.Client, payload string) gat.Shard { - return w.anyShard() // TODO -} - func (w *worker) GetServerInfo() []*protocol.ParameterStatus { defer w.ret() - s := w.anyShard() + s := w.chooseShard(nil) if s == nil { return nil } @@ -231,7 +222,7 @@ func (w *worker) unsetCurrentBinding(client gat.Client, server gat.Connection) { } func (w *worker) z_actually_do_describe(ctx context.Context, client gat.Client, payload *protocol.Describe) error { - srv := w.chooseShardDescribe(client, payload) + srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("describe('%+v') fail: no server", payload) } @@ -247,7 +238,7 @@ func (w *worker) z_actually_do_describe(ctx context.Context, client gat.Client, return target.Describe(client, payload) } func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, payload *protocol.Execute) error { - srv := w.chooseShardExecute(client, payload) + srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("describe('%+v') fail: no server", payload) } @@ -284,7 +275,7 @@ func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, p return target.Execute(client, payload) } func (w *worker) z_actually_do_fn(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error { - srv := w.chooseShardFn(client, payload) + srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("fn('%+v') fail: no server", payload) } @@ -303,7 +294,7 @@ func (w *worker) z_actually_do_fn(ctx context.Context, client gat.Client, payloa } func (w *worker) z_actually_do_simple_query(ctx context.Context, client gat.Client, payload string) error { // chose a server - srv := w.chooseShardSimpleQuery(client, payload) + srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("call to query '%s' failed", payload) } @@ -328,7 +319,7 @@ func (w *worker) z_actually_do_simple_query(ctx context.Context, client gat.Clie } func (w *worker) z_actually_do_transaction(ctx context.Context, client gat.Client, payload string) error { // chose a server - srv := w.chooseShardTransaction(client, payload) + srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("call to transaction '%s' failed", payload) } diff --git a/lib/gat/gatling/pool/pool.go b/lib/gat/gatling/pool/pool.go index cc8d33f79b9bcaf2c63edf3a2a8202d882df783a..29e4932177907e8e9430e331a02c29ab75a53b1d 100644 --- a/lib/gat/gatling/pool/pool.go +++ b/lib/gat/gatling/pool/pool.go @@ -16,7 +16,7 @@ type Pool struct { stats *gat.PoolStats - router query_router.QueryRouter + router *query_router.QueryRouter mu sync.RWMutex } @@ -25,6 +25,7 @@ func NewPool(conf *config.Pool) *Pool { pool := &Pool{ connPools: make(map[string]gat.ConnectionPool), stats: gat.NewPoolStats(), + router: query_router.DefaultRouter, } pool.EnsureConfig(conf) return pool @@ -60,7 +61,7 @@ func (p *Pool) GetUser(name string) *config.User { } func (p *Pool) GetRouter() gat.QueryRouter { - return &p.router + return p.router } func (p *Pool) WithUser(name string) gat.ConnectionPool { diff --git a/lib/gat/gatling/pool/query_router/query_router.go b/lib/gat/gatling/pool/query_router/query_router.go index 1bc0bf0fea41cf9fd3baa4382640d0121e153aa6..a8596c18afd7dcb022dfd9abac91c3d110bf531b 100644 --- a/lib/gat/gatling/pool/query_router/query_router.go +++ b/lib/gat/gatling/pool/query_router/query_router.go @@ -1,199 +1,73 @@ package query_router import ( - "regexp" - - "git.tuxpa.in/a/zlog/log" - + "errors" "gfx.cafe/gfx/pggat/lib/config" + "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/protocol" - "gfx.cafe/util/go/lambda" + "gfx.cafe/gfx/pggat/lib/parse" + "gfx.cafe/gfx/pggat/lib/util/cmux" + "strconv" + "unicode" + "unicode/utf8" ) -var CustomSqlRegex = lambda.MapV([]string{ - "(?i)^ *SET SHARDING KEY TO '?([0-9]+)'? *;? *$", - "(?i)^ *SET SHARD TO '?([0-9]+|ANY)'? *;? *$", - "(?i)^ *SHOW SHARD *;? *$", - "(?i)^ *SET SERVER ROLE TO '(PRIMARY|REPLICA|ANY|AUTO|DEFAULT)' *;? *$", - "(?i)^ *SHOW SERVER ROLE *;? *$", - "(?i)^ *SET PRIMARY READS TO '?(on|off|default)'? *;? *$", - "(?i)^ *SHOW PRIMARY READS *;? *$", -}, regexp.MustCompile) - -type Command interface { -} - -var _ []Command = []Command{ - &CommandSetShardingKey{}, - &CommandSetShard{}, - &CommandShowShard{}, - &CommandSetServerRole{}, - &CommandShowServerRole{}, - &CommandSetPrimaryReads{}, - &CommandShowPrimaryReads{}, -} - -type CommandSetShardingKey struct{} -type CommandSetShard struct{} -type CommandShowShard struct{} -type CommandSetServerRole struct{} -type CommandShowServerRole struct{} -type CommandSetPrimaryReads struct{} -type CommandShowPrimaryReads struct{} - type QueryRouter struct { - active_shard int - primary_reads_enabled bool - //pool_settings pool.PoolSettings + router cmux.Mux[gat.Client, error] } -/* TODO -// / Pool settings can change because of a config reload. -func (r *QueryRouter) UpdatePoolSettings(pool_settings pool.PoolSettings) { - r.pool_settings = pool_settings -} - -*/ - -// / Try to parse a command and execute it. -// TODO: needs to just provide the execution function and so gatling can then plug in the client, server, etc -func (r *QueryRouter) try_execute_command(pkt *protocol.Query) (Command, string) { - // Only simple protocol supported for commands. - // TODO: read msg len - // msglen := buf.get_i32() - custom := false - for _, v := range CustomSqlRegex { - if v.MatchString(pkt.Fields.Query) { - custom = true - break +var DefaultRouter = func() *QueryRouter { + r := cmux.NewMapMux[gat.Client, error]() + r.Register([]string{"set", "sharding", "key", "to"}, func(_ gat.Client, args []string) error { + return nil + }) + r.Register([]string{"set", "shard", "to"}, func(client gat.Client, args []string) error { + if len(args) == 0 { + return errors.New("expected at least one argument") } - } - // This is not a custom query, try to infer which - // server it'll go to if the query parser is enabled. - if !custom { - log.Println("regular query, not a command") - return nil, "" - } - - // TODO: command matching - //command := switch matches[0] { - // 0 => Command::SetShardingKey, - // 1 => Command::SetShard, - // 2 => Command::ShowShard, - // 3 => Command::SetServerRole, - // 4 => Command::ShowServerRole, - // 5 => Command::SetPrimaryReads, - // 6 => Command::ShowPrimaryReads, - // _ => unreachable!(), - //} - - //mut value := switch command { - // Command::SetShardingKey - // | Command::SetShard - // | Command::SetServerRole - // | Command::SetPrimaryReads => { - // // Capture value. I know this re-runs the regex engine, but I haven't - // // figured out a better way just yet. I think I can write a single Regex - // // that switches all 5 custom SQL patterns, but maybe that's not very legible? - // // - // // I think this is faster than running the Regex engine 5 times. - // switch regex_list[matches[0]].captures(&query) { - // Some(captures) => switch captures.get(1) { - // Some(value) => value.as_str().to_string(), - // None => return None, - // }, - // None => return None, - // } - // } - - // Command::ShowShard => self.shard().to_string(), - // Command::ShowServerRole => switch self.active_role { - // Some(Role::GetPrimary) => string("primary"), - // Some(Role::Replica) => string("replica"), - // None => { - // if self.query_parser_enabled { - // string("auto") - // } else { - // string("any") - // } - // } - // }, - - // Command::ShowPrimaryReads => switch self.primary_reads_enabled { - // true => string("on"), - // false => string("off"), - // }, - //} - - //switch command { - // Command::SetShardingKey => { - // sharder := Sharder::new( - // self.pool_settings.shards, - // self.pool_settings.sharding_function, - // ) - // shard := sharder.shard(value.parse::<i64>().unwrap()) - // self.active_shard := Some(shard) - // value := shard.to_string() - // } - - // Command::SetShard => { - // self.active_shard := switch value.to_ascii_uppercase().as_ref() { - // "ANY" => Some(rand::random::<usize>() % self.pool_settings.shards), - // _ => Some(value.parse::<usize>().unwrap()), - // } - // } - - // Command::SetServerRole => { - // self.active_role := switch value.to_ascii_lowercase().as_ref() { - // "primary" => { - // self.query_parser_enabled := false - // Some(Role::GetPrimary) - // } - - // "replica" => { - // self.query_parser_enabled := false - // Some(Role::Replica) - // } - // "any" => { - // self.query_parser_enabled := false - // None - // } - - // "auto" => { - // self.query_parser_enabled := true - // None - // } - - // "default" => { - // self.active_role := self.pool_settings.default_role - // self.query_parser_enabled := self.query_parser_enabled - // self.active_role - // } - - // _ => unreachable!(), - // } - // } + v := args[0] + r, l := utf8.DecodeRuneInString(v) + if !unicode.IsNumber(r) { + if len(v)-l <= l { + return errors.New("malformed input") + } + v = v[l : len(v)-l] + } - // Command::SetPrimaryReads => { - // if value == "on" { - // log.Println("Setting primary reads to on") - // self.primary_reads_enabled := true - // } else if value == "off" { - // log.Println("Setting primary reads to off") - // self.primary_reads_enabled := false - // } else if value == "default" { - // log.Println("Setting primary reads to default") - // self.primary_reads_enabled := self.pool_settings.primary_reads_enabled - // } - // } + if v == "any" { + client.UnsetRequestedShard() + return nil + } - // _ => (), - //} + num, err := strconv.Atoi(v) + if err != nil { + return err + } - //Some((command, value)) - return nil, "" -} + client.SetRequestedShard(num) + + return nil + }) + r.Register([]string{"show", "shard"}, func(_ gat.Client, args []string) error { + return nil + }) + r.Register([]string{"set", "server", "role", "to"}, func(_ gat.Client, args []string) error { + return nil + }) + r.Register([]string{"show", "server", "role"}, func(_ gat.Client, args []string) error { + return nil + }) + r.Register([]string{"set", "primary", "reads", "to"}, func(_ gat.Client, args []string) error { + return nil + }) + r.Register([]string{"show", "primary", "reads"}, func(_ gat.Client, args []string) error { + return nil + }) + return &QueryRouter{ + router: r, + } +}() // Try to infer the server role to try to connect to // based on the contents of the query. @@ -231,11 +105,20 @@ func (r *QueryRouter) InferRole(query string) (config.ServerRole, error) { return active_role, nil } -// / Get desired shard we should be talking to. -func (r *QueryRouter) Shard() int { - return r.active_shard -} - -func (r *QueryRouter) SetShard(shard int) { - r.active_shard = shard +func (r *QueryRouter) TryHandle(client gat.Client, query string) (handled bool, err error) { + var parsed []parse.Command + parsed, err = parse.Parse(query) + if err != nil { + return + } + if len(parsed) == 0 { + // send empty query + err = client.Send(new(protocol.EmptyQueryResponse)) + return true, err + } + if len(parsed) != 1 { + return + } + err, handled = r.router.Call(client, append([]string{parsed[0].Command}, parsed[0].Arguments...)) + return } diff --git a/lib/gat/gatling/pool/query_router/query_router_test.go b/lib/gat/gatling/pool/query_router/query_router_test.go index 31b3fd4b244af6c963d689ac326743f137ed33f8..a68247b611d67af69b09168419f96e67dd8e9097 100644 --- a/lib/gat/gatling/pool/query_router/query_router_test.go +++ b/lib/gat/gatling/pool/query_router/query_router_test.go @@ -8,7 +8,7 @@ import ( // TODO: adapt tests func TestQueryRouterInterRoleReplica(t *testing.T) { - qr := &QueryRouter{} + qr := DefaultRouter role, err := qr.InferRole(`UPDATE items SET name = 'pumpkin' WHERE id = 5`) if err != nil { t.Fatal(err) diff --git a/lib/gat/interfaces.go b/lib/gat/interfaces.go index e8752ca995b5071321106d139b2ceb60a83aedfb..35a3ec7d0a66e2c62d597e9b6629ed043463687d 100644 --- a/lib/gat/interfaces.go +++ b/lib/gat/interfaces.go @@ -35,6 +35,13 @@ type Client interface { GetRequestTime() time.Time GetRemotePid() int + // sharding + SetRequestedShard(shard int) + UnsetRequestedShard() + GetRequestedShard() (int, bool) + SetShardingKey(key string) + GetShardingKey() string + Send(pkt protocol.Packet) error Flush() error Recv() <-chan protocol.Packet @@ -63,6 +70,8 @@ type Pool interface { type QueryRouter interface { InferRole(query string) (config.ServerRole, error) + // TryHandle the client's query string. If we handled it, return true + TryHandle(client Client, query string) (bool, error) } type ConnectionPool interface { diff --git a/lib/util/cmux/cmux.go b/lib/util/cmux/cmux.go index ad330e6d5679d429ac5fe117b3cc9087b8bece75..403803892b8aa99e4bd8a20f46218e0d6456c3b0 100644 --- a/lib/util/cmux/cmux.go +++ b/lib/util/cmux/cmux.go @@ -7,33 +7,80 @@ import ( "github.com/looplab/fsm" ) -type Mux[T any] interface { - Register([]string, func([]string) T) - Call([]string) T +type Mux[IN, OUT any] interface { + Register([]string, func(IN, []string) OUT) + Call(IN, []string) (OUT, bool) } -type funcSet[T any] struct { +type MapMux[IN, OUT any] struct { + sub map[string]*MapMux[IN, OUT] + fn func(IN, []string) OUT +} + +func NewMapMux[IN, OUT any]() *MapMux[IN, OUT] { + return &MapMux[IN, OUT]{ + sub: make(map[string]*MapMux[IN, OUT]), + } +} + +func (m *MapMux[IN, OUT]) Register(path []string, fn func(IN, []string) OUT) { + mux := m + for { + if len(path) == 0 { + mux.fn = fn + return + } + + var ok bool + if _, ok = mux.sub[path[0]]; !ok { + mux.sub[path[0]] = NewMapMux[IN, OUT]() + } + mux = mux.sub[path[0]] + path = path[1:] + } +} + +func (m *MapMux[IN, OUT]) Call(arg IN, path []string) (o OUT, exists bool) { + mux := m + for { + if len(path) != 0 { + if sub, ok := mux.sub[path[0]]; ok { + mux = sub + path = path[1:] + continue + } + } + + if mux.fn != nil { + o = mux.fn(arg, path) + exists = true + } + return + } +} + +type funcSet[IN, OUT any] struct { Ref []string - Call func([]string) T + Call func(IN, []string) OUT } -type FsmMux[T any] struct { +type FsmMux[IN, OUT any] struct { f *fsm.FSM - funcs map[string]funcSet[T] + funcs map[string]funcSet[IN, OUT] sync.RWMutex } -func (f *FsmMux[T]) Register(path []string, fn func([]string) T) { +func (f *FsmMux[IN, OUT]) Register(path []string, fn func(IN, []string) OUT) { execkey := strings.Join(path, "|") - f.funcs[execkey] = funcSet[T]{ + f.funcs[execkey] = funcSet[IN, OUT]{ Ref: path, Call: fn, } f.construct() } -func (f *FsmMux[T]) construct() { +func (f *FsmMux[IN, OUT]) construct() { evts := fsm.Events{} cbs := fsm.Callbacks{} for _, fset := range f.funcs { @@ -67,8 +114,8 @@ func (f *FsmMux[T]) construct() { f.f = fsm.NewFSM("_", evts, cbs) } -func (f *FsmMux[T]) Call(k []string) T { - fn := f.funcs[""].Call +func (f *FsmMux[IN, OUT]) Call(arg IN, k []string) (r OUT, matched bool) { + var fn func(IN, []string) OUT args := k path := k lp := len(path) @@ -97,17 +144,16 @@ func (f *FsmMux[T]) Call(k []string) T { } f.Unlock() } - return fn(args) + if fn != nil { + r = fn(arg, args) + matched = true + } + return } -func NewFsmMux[T any]() Mux[T] { - o := &FsmMux[T]{ - funcs: map[string]funcSet[T]{ - "": { - Ref: []string{}, - Call: func([]string) T { return *new(T) }, - }, - }, +func NewFsmMux[IN, OUT any]() Mux[IN, OUT] { + o := &FsmMux[IN, OUT]{ + funcs: map[string]funcSet[IN, OUT]{}, } return o } diff --git a/lib/util/cmux/cmux_test.go b/lib/util/cmux/cmux_test.go index 047957370f8e1cee76bf1e93ba95b0a99f302ed0..e0ff58884f94532c21cc9d78df4a61b192a6eff1 100644 --- a/lib/util/cmux/cmux_test.go +++ b/lib/util/cmux/cmux_test.go @@ -6,18 +6,18 @@ import ( ) func TestFsm(t *testing.T) { - m := NewFsmMux[error]() + m := NewFsmMux[any, error]() - m.Register([]string{"set", "shard", "to"}, func(s []string) error { + m.Register([]string{"set", "shard", "to"}, func(_ any, s []string) error { log.Println(s) return nil }) - m.Register([]string{"set", "sharding", "key", "to"}, func(s []string) error { + m.Register([]string{"set", "sharding", "key", "to"}, func(_ any, s []string) error { log.Println(s) return nil }) - m.Call([]string{"set", "shard", "to", "doggo", "wow", "this", "works"}) - m.Call([]string{"set", "sharding", "key", "to", "doggo", "wow", "this", "works2"}) + m.Call(nil, []string{"set", "shard", "to", "doggo", "wow", "this", "works"}) + m.Call(nil, []string{"set", "sharding", "key", "to", "doggo", "wow", "this", "works2"}) }