diff --git a/config_data.yml b/config_data.yml index 7047c51f4a7f48420a67ed4de19c52c7a591e083..c1d8134cba63e9408356814c06538a37e8a0d075 100644 --- a/config_data.yml +++ b/config_data.yml @@ -33,7 +33,7 @@ pools: port: 5432 role: primary username: ENV$PSQL_DB_USER_RW - password: ENV$PSQL_DB_PASS_RW + password: postgres - host: ENV$PSQL_REP_DB_HOST port: 5432 role: replica diff --git a/go.mod b/go.mod index 6383d9ec2b7be84c4a8325c8ea0097b11733258b..7a5770763e6ecb71d2436e5b374f9585e1b20c9a 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( gfx.cafe/ghalliday1/pg3p v0.0.19 gfx.cafe/ghalliday1/pgparser v0.0.9 gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf - gfx.cafe/util/go/generic v0.0.0-20220917152604-80373e5a2c51 + gfx.cafe/util/go/generic v0.0.0-20221001013022-0560a0526470 gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9 git.tuxpa.in/a/zlog v1.32.0 github.com/BurntSushi/toml v1.2.0 diff --git a/go.sum b/go.sum index d2926fd71aeb98fd162464c050b55c633202c670..d32e23f584615cbb90e933a50bba306b16e40de9 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,10 @@ gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf h1:ya4IK1D+Kq0DrFdrr gfx.cafe/util/go/bufpool v0.0.0-20220906091724-3a24b7f40ccf/go.mod h1:+DiyiCOBGS9O9Ce4ewHQO3Y59h66WSWAbgZZ2O2AYYw= gfx.cafe/util/go/generic v0.0.0-20220917152604-80373e5a2c51 h1:4FEqqQR2Ruae4quQQn7lqLtLvQaH+dHmehpPHhiXm+Q= gfx.cafe/util/go/generic v0.0.0-20220917152604-80373e5a2c51/go.mod h1:0mgIMiX8+M1l4VHMaOqBaydZaztbFgblQAaEDOZpUhQ= +gfx.cafe/util/go/generic v0.0.0-20221001012907-2fe841bb39cc h1:M3dtZ5NDUG5LOpB3s0sFnCK9mgNvau0nRacV2h/ui54= +gfx.cafe/util/go/generic v0.0.0-20221001012907-2fe841bb39cc/go.mod h1:0mgIMiX8+M1l4VHMaOqBaydZaztbFgblQAaEDOZpUhQ= +gfx.cafe/util/go/generic v0.0.0-20221001013022-0560a0526470 h1:PwdR5V3IO06cdDGfmYVa9AYIPmRbLIweKJqZt2hIWuo= +gfx.cafe/util/go/generic v0.0.0-20221001013022-0560a0526470/go.mod h1:0mgIMiX8+M1l4VHMaOqBaydZaztbFgblQAaEDOZpUhQ= gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9 h1:Z0RRPRbz5Bt7yu6v+RzLCAKm0YNcrwNGLMs9UVs8NsU= gfx.cafe/util/go/graceful v0.0.0-20220913082111-9770431e98e9/go.mod h1:sO44FAgBZXic9FwJaJHX1mI9vt1e3CRe0X/3bwnMRho= git.tuxpa.in/a/zlog v1.32.0 h1:KKXbRF1x8kJDSzUoGz/pivo+4TVY6xT5sVtdFZ6traY= @@ -304,6 +308,8 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108 github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M= +github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= diff --git a/lib/gat/database/query_router/query_router.go b/lib/gat/database/query_router/query_router.go index 760eeb3c0bd76dbb6813258511b6e4265ccc16ec..d1b298b2ca6f03aa64ed7fea85299fb0bb0d8e73 100644 --- a/lib/gat/database/query_router/query_router.go +++ b/lib/gat/database/query_router/query_router.go @@ -2,20 +2,22 @@ package query_router import ( "errors" + "strconv" + "unicode" + "unicode/utf8" + "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/util/cmux" "gfx.cafe/ghalliday1/pg3p" "gfx.cafe/ghalliday1/pg3p/lex" - "strconv" - "unicode" - "unicode/utf8" + "gfx.cafe/util/go/generic" ) type QueryRouter struct { - router cmux.Mux[gat.Client, error] - parser *pg3p.Parser - c *config.Pool + router cmux.Mux[gat.Client, error] + parsers generic.HookPool[*pg3p.Parser] + c *config.Pool } var defaultMux = func() *cmux.MapMux[gat.Client, error] { @@ -72,8 +74,10 @@ var defaultMux = func() *cmux.MapMux[gat.Client, error] { func DefaultRouter(c *config.Pool) *QueryRouter { return &QueryRouter{ router: defaultMux, - parser: pg3p.NewParser(), - c: c, + parsers: generic.HookPool[*pg3p.Parser]{ + New: pg3p.NewParser, + }, + c: c, } } @@ -86,7 +90,9 @@ func (r *QueryRouter) InferRole(query string) (config.ServerRole, error) { return config.SERVERROLE_PRIMARY, nil } // parse the query - tokens := r.parser.Lex(query) + parser := r.parsers.Get() + defer r.parsers.Put(parser) + tokens := parser.Lex(query) depth := 0 for _, token := range tokens { switch token.Token { diff --git a/lib/gat/pool/session/pool.go b/lib/gat/pool/session/pool.go index f58a256b3e745cc353957a3cab1e2a3fa1c741f6..b5fcc5870d608ff51e18e61b56d37b587fb2058b 100644 --- a/lib/gat/pool/session/pool.go +++ b/lib/gat/pool/session/pool.go @@ -2,12 +2,13 @@ package session import ( "context" + "runtime" + "sync/atomic" + "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/protocol" "gfx.cafe/util/go/generic" - "runtime" - "sync/atomic" ) type Pool struct { diff --git a/lib/gat/pool/transaction/pool.go b/lib/gat/pool/transaction/pool.go index a41d331d9dbd57bbcf3b32c52da7dec04711690b..391199eda7a61c25d63f69cddfaf2dd3bb696ff7 100644 --- a/lib/gat/pool/transaction/pool.go +++ b/lib/gat/pool/transaction/pool.go @@ -2,7 +2,6 @@ package transaction import ( "context" - "runtime" "sync/atomic" "time" @@ -21,11 +20,7 @@ type Pool struct { dialer gat.Dialer - // see: https://github.com/golang/go/blob/master/src/runtime/chan.go#L33 - // channels are a thread safe ring buffer implemented via a linked list of goroutines. - // the idea is that goroutines are cheap, and we can afford to have one per pending request. - // there is no real reason to implement a complicated worker database pattern when well, if we're okay with having a 2-4kb overhead per request, then this is fine. trading space for code complexity - workerPool chan *worker + workerPool WorkerPool[*Worker] } func New(database gat.Database, dialer gat.Dialer, conf *config.Pool, user *config.User) *Pool { @@ -33,37 +28,45 @@ func New(database gat.Database, dialer gat.Dialer, conf *config.Pool, user *conf user: user, database: database, dialer: dialer, - workerPool: make(chan *worker, 1+runtime.NumCPU()*4), + workerPool: NewChannelPool[*Worker](user.PoolSize), } p.EnsureConfig(conf) return p } +func (c *Pool) WithWorkerPool(w WorkerPool[*Worker]) { + c.workerPool = w +} + func (c *Pool) GetDatabase() gat.Database { return c.database } -func (c *Pool) getWorker() *worker { +func (c *Pool) getWorker() *Worker { start := time.Now() defer func() { metrics.RecordWaitTime(c.database.GetName(), c.user.Name, time.Since(start)) }() - select { - case w := <-c.workerPool: + w, ok := c.workerPool.TryGet() + if ok { return w - default: + } else { if c.workerCount.Add(1)-1 < int64(c.user.PoolSize) { - next := &worker{ + next := &Worker{ w: c, } return next } else { - w := <-c.workerPool + w := c.workerPool.Get() return w } } } +func (c *Pool) returnWorker(w *Worker) { + c.workerPool.Put(w) +} + func (c *Pool) EnsureConfig(conf *config.Pool) { c.c.Store(conf) } diff --git a/lib/gat/pool/transaction/worker.go b/lib/gat/pool/transaction/worker.go index 10a869313ca82eae821a6780aa8bcceb2f23debe..57bcc4a0706b9604e1b944b0bb25ec75ced8ac2f 100644 --- a/lib/gat/pool/transaction/worker.go +++ b/lib/gat/pool/transaction/worker.go @@ -15,25 +15,18 @@ import ( "gfx.cafe/gfx/pggat/lib/metrics" ) -// a single use worker with an embedded connection database. +// a single use Worker with an embedded connection database. // it wraps a pointer to the connection database. -type worker struct { +type Worker struct { // the parent connectino database - w *Pool - rev int + w *Pool shards []*shard.Shard - - mu sync.Mutex -} - -// ret urn worker to database -func (w *worker) ret() { - w.w.workerPool <- w + mu sync.Mutex } -// attempt to connect to a new shard with this worker -func (w *worker) fetchShard(client gat.Client, n int) bool { +// attempt to connect to a new shard with this Worker +func (w *Worker) fetchShard(client gat.Client, n int) bool { conf := w.w.c.Load() if n < 0 || n >= len(conf.Shards) { return false @@ -47,14 +40,14 @@ func (w *worker) fetchShard(client gat.Client, n int) bool { return true } -func (w *worker) invalidateShard(n int) { +func (w *Worker) invalidateShard(n int) { w.mu.Lock() defer w.mu.Unlock() w.shards[n] = nil } -func (w *worker) chooseShard(client gat.Client) *shard.Shard { +func (w *Worker) chooseShard(client gat.Client) *shard.Shard { w.mu.Lock() defer w.mu.Unlock() @@ -85,8 +78,8 @@ func (w *worker) chooseShard(client gat.Client) *shard.Shard { return nil } -func (w *worker) GetServerInfo(client gat.Client) []*protocol.ParameterStatus { - defer w.ret() +func (w *Worker) GetServerInfo(client gat.Client) []*protocol.ParameterStatus { + defer w.w.returnWorker(w) s := w.chooseShard(client) if s == nil { @@ -101,8 +94,8 @@ func (w *worker) GetServerInfo(client gat.Client) []*protocol.ParameterStatus { return primary.GetServerInfo() } -func (w *worker) HandleDescribe(ctx context.Context, c gat.Client, d *protocol.Describe) error { - defer w.ret() +func (w *Worker) HandleDescribe(ctx context.Context, c gat.Client, d *protocol.Describe) error { + defer w.w.returnWorker(w) if w.w.user.StatementTimeout != 0 { var done context.CancelFunc @@ -134,8 +127,8 @@ func (w *worker) HandleDescribe(ctx context.Context, c gat.Client, d *protocol.D } } -func (w *worker) HandleExecute(ctx context.Context, c gat.Client, e *protocol.Execute) error { - defer w.ret() +func (w *Worker) HandleExecute(ctx context.Context, c gat.Client, e *protocol.Execute) error { + defer w.w.returnWorker(w) if w.w.user.StatementTimeout != 0 { var done context.CancelFunc @@ -167,8 +160,8 @@ func (w *worker) HandleExecute(ctx context.Context, c gat.Client, e *protocol.Ex } } -func (w *worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol.FunctionCall) error { - defer w.ret() +func (w *Worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol.FunctionCall) error { + defer w.w.returnWorker(w) if w.w.user.StatementTimeout != 0 { var done context.CancelFunc @@ -200,8 +193,8 @@ func (w *worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol. } } -func (w *worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query string) error { - defer w.ret() +func (w *Worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query string) error { + defer w.w.returnWorker(w) if w.w.user.StatementTimeout != 0 { var done context.CancelFunc @@ -234,8 +227,8 @@ func (w *worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query stri } } -func (w *worker) HandleTransaction(ctx context.Context, c gat.Client, query string) error { - defer w.ret() +func (w *Worker) HandleTransaction(ctx context.Context, c gat.Client, query string) error { + defer w.w.returnWorker(w) if w.w.user.StatementTimeout != 0 { var done context.CancelFunc @@ -268,17 +261,17 @@ func (w *worker) HandleTransaction(ctx context.Context, c gat.Client, query stri } } -func (w *worker) setCurrentBinding(client gat.Client, server gat.Connection) { +func (w *Worker) setCurrentBinding(client gat.Client, server gat.Connection) { client.SetCurrentConn(server) server.SetClient(client) } -func (w *worker) unsetCurrentBinding(client gat.Client, server gat.Connection) { +func (w *Worker) unsetCurrentBinding(client gat.Client, server gat.Connection) { client.SetCurrentConn(nil) server.SetClient(nil) } -func (w *worker) z_actually_do_describe(ctx context.Context, client gat.Client, payload *protocol.Describe) error { +func (w *Worker) z_actually_do_describe(ctx context.Context, client gat.Client, payload *protocol.Describe) error { srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("describe('%+v') fail: no server", payload) @@ -296,7 +289,7 @@ func (w *worker) z_actually_do_describe(ctx context.Context, client gat.Client, defer w.unsetCurrentBinding(client, target) return target.Describe(ctx, client, payload) } -func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, payload *protocol.Execute) error { +func (w *Worker) z_actually_do_execute(ctx context.Context, client gat.Client, payload *protocol.Execute) error { srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("describe('%+v') fail: no server", payload) @@ -313,7 +306,7 @@ func (w *worker) z_actually_do_execute(ctx context.Context, client gat.Client, p } return target.Execute(ctx, client, payload) } -func (w *worker) z_actually_do_fn(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error { +func (w *Worker) z_actually_do_fn(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error { srv := w.chooseShard(client) if srv == nil { return fmt.Errorf("fn('%+v') fail: no server", payload) @@ -334,7 +327,7 @@ func (w *worker) z_actually_do_fn(ctx context.Context, client gat.Client, payloa } return nil } -func (w *worker) z_actually_do_simple_query(ctx context.Context, client gat.Client, payload string) error { +func (w *Worker) z_actually_do_simple_query(ctx context.Context, client gat.Client, payload string) error { // chose a server srv := w.chooseShard(client) if srv == nil { @@ -362,7 +355,7 @@ func (w *worker) z_actually_do_simple_query(ctx context.Context, client gat.Clie } return nil } -func (w *worker) z_actually_do_transaction(ctx context.Context, client gat.Client, payload string) error { +func (w *Worker) z_actually_do_transaction(ctx context.Context, client gat.Client, payload string) error { // chose a server srv := w.chooseShard(client) if srv == nil { diff --git a/lib/gat/pool/transaction/worker_pool.go b/lib/gat/pool/transaction/worker_pool.go new file mode 100644 index 0000000000000000000000000000000000000000..19a026aec9ffd59774eb7e53c794fd12c971813a --- /dev/null +++ b/lib/gat/pool/transaction/worker_pool.go @@ -0,0 +1,34 @@ +package transaction + +type WorkerPool[T any] interface { + TryGet() (T, bool) + Get() T + Put(T) +} + +type ChannelPool[T any] struct { + ch chan T +} + +func NewChannelPool[T any](size int) *ChannelPool[T] { + return &ChannelPool[T]{ + ch: make(chan T, size*10), + } +} + +func (c *ChannelPool[T]) Get() T { + return <-c.ch +} + +func (c *ChannelPool[T]) TryGet() (T, bool) { + select { + case out := <-c.ch: + return out, true + default: + return *new(T), false + } +} + +func (c *ChannelPool[T]) Put(t T) { + c.ch <- t +} diff --git a/lib/metrics/gat.go b/lib/metrics/gat.go index cd597e729c7c6e3a1990d5002b4b665139ce9ec7..976120e20fde241b4ad2612ba78a5c6c3eb3c936 100644 --- a/lib/metrics/gat.go +++ b/lib/metrics/gat.go @@ -48,6 +48,8 @@ func RecordAcceptConnectionStatus(err error) { if err != nil { g.ConnectionErrorCounter.WithLabelValues(err.Error()).Inc() } + log.Println("TEST") + g.ConnectionCounter.Inc() } diff --git a/test/docker-compose.yml b/test/docker-compose.yml index b4b23dd15c2b9a0fc8af1f9a940613ad2be370b6..a49d935c632c9846bc1ec037f5036c340274896d 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -6,8 +6,6 @@ services: restart: always environment: POSTGRES_PASSWORD: example - ports: - - 5432:5432 adminer: image: adminer restart: always