diff --git a/lib/gat/pool.go b/lib/gat/pool.go index d7a355dba10e0cd4c806d7ff888e28c71092d69c..c5fa7df00ac502fbc2260f010d50ae3607c9acac 100644 --- a/lib/gat/pool.go +++ b/lib/gat/pool.go @@ -4,4 +4,7 @@ import "pggat2/lib/zap" type Pool interface { Serve(client zap.ReadWriter) + + AddRecipe(name string, recipe Recipe) + RemoveRecipe(name string) } diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go index b435ac1807874f63db2573ffa8dd89442b658b1b..3aeef547cc7da5382ff49b38460f4e9360baaf3a 100644 --- a/lib/gat/pools/session/pool.go +++ b/lib/gat/pools/session/pool.go @@ -1,14 +1,82 @@ package session import ( + "net" + "sync" + + "pggat2/lib/bouncer/backends/v0" + "pggat2/lib/bouncer/bouncers/v2" "pggat2/lib/gat" "pggat2/lib/zap" ) type Pool struct { + // use slice lifo for better perf + queue []zap.ReadWriter + mu sync.RWMutex + + signal chan struct{} +} + +func NewPool() *Pool { + return &Pool{ + signal: make(chan struct{}), + } +} + +func (T *Pool) acquire() zap.ReadWriter { + for { + T.mu.Lock() + if len(T.queue) > 0 { + server := T.queue[len(T.queue)-1] + T.queue = T.queue[:len(T.queue)-1] + T.mu.Unlock() + return server + } + T.mu.Unlock() + <-T.signal + } +} + +func (T *Pool) release(server zap.ReadWriter) { + T.mu.Lock() + defer T.mu.Unlock() + T.queue = append(T.queue, server) + + select { + case T.signal <- struct{}{}: + default: + } } func (T *Pool) Serve(client zap.ReadWriter) { + server := T.acquire() + defer T.release(server) + for { + if err := client.Poll(); err != nil { + break + } + bouncers.Bounce(client, server) + } +} + +func (T *Pool) AddRecipe(name string, recipe gat.Recipe) { + for i := 0; i < recipe.MinConnections; i++ { + conn, err := net.Dial("tcp", recipe.Address) + if err != nil { + // TODO(garet) do something here + continue + } + rw := zap.CombinedReadWriter{ + Reader: zap.IOReader{Reader: conn}, + Writer: zap.IOWriter{Writer: conn}, + } + backends.Accept(rw, recipe.User, recipe.Password, recipe.Database) + T.release(rw) + } +} + +func (T *Pool) RemoveRecipe(name string) { // TODO implement me panic("implement me") } diff --git a/lib/gat/pools/transaction/pool.go b/lib/gat/pools/transaction/pool.go index 3a2ae380d36d2817a626e421b4965ec679d80a2c..c39672686e16debc4bd5aed70b45345b34c6a51c 100644 --- a/lib/gat/pools/transaction/pool.go +++ b/lib/gat/pools/transaction/pool.go @@ -28,7 +28,8 @@ func (T *Pool) AddRecipe(name string, recipe gat.Recipe) { for i := 0; i < recipe.MinConnections; i++ { conn, err := net.Dial("tcp", recipe.Address) if err != nil { - panic(err) + // TODO(garet) do something here + continue } rw := zap.CombinedReadWriter{ Reader: zap.IOReader{Reader: conn}, @@ -66,8 +67,7 @@ func (T *Pool) Serve(client zap.ReadWriter) { psc, ) for { - err := client.Poll() - if err != nil { + if err := client.Poll(); err != nil { break } source.Do(0, Work{