diff --git a/lib/gat/modes/pgbouncer/pools.go b/lib/gat/modes/pgbouncer/pools.go index 636d621483b187776e3a782e756d8c059a1e2966..663301485b8b20b430eca503e3f560dadd8d12a1 100644 --- a/lib/gat/modes/pgbouncer/pools.go +++ b/lib/gat/modes/pgbouncer/pools.go @@ -15,6 +15,7 @@ import ( "pggat2/lib/bouncer/frontends/v0" "pggat2/lib/gat" "pggat2/lib/gat/pool" + "pggat2/lib/gat/pool/dialer" "pggat2/lib/gat/pool/pools/session" "pggat2/lib/gat/pool/pools/transaction" "pggat2/lib/gsql" @@ -172,7 +173,7 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { Database: database, }, p) - var dialer pool.Dialer + var d dialer.Dialer dbCreds := creds if db.Password != "" { @@ -204,7 +205,7 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { dir = dir + ".s.PGSQL." + strconv.Itoa(port) - dialer = pool.NetDialer{ + d = dialer.Net{ Network: "unix", Address: dir, AcceptOptions: acceptOptions, @@ -218,7 +219,7 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { } // connect over tcp - dialer = pool.NetDialer{ + d = dialer.Net{ Network: "tcp", Address: address, AcceptOptions: acceptOptions, @@ -226,7 +227,7 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { } recipe := pool.Recipe{ - Dialer: dialer, + Dialer: d, MinConnections: db.MinPoolSize, MaxConnections: db.MaxDBConnections, } diff --git a/lib/gat/pool/dialer/dialer.go b/lib/gat/pool/dialer/dialer.go new file mode 100644 index 0000000000000000000000000000000000000000..b50d794767858fa1bf1f4ec254eb130c3d69ab54 --- /dev/null +++ b/lib/gat/pool/dialer/dialer.go @@ -0,0 +1,11 @@ +package dialer + +import ( + "pggat2/lib/bouncer/backends/v0" + "pggat2/lib/fed" +) + +type Dialer interface { + Dial() (fed.Conn, backends.AcceptParams, error) + Cancel(cancelKey [8]byte) error +} diff --git a/lib/gat/pool/dialer.go b/lib/gat/pool/dialer/net.go similarity index 69% rename from lib/gat/pool/dialer.go rename to lib/gat/pool/dialer/net.go index 4d5a35fa030158afbd2837df7e7975909aad1297..0d24e894b3e662d12df7ef2ad17fbd6bc840fa8d 100644 --- a/lib/gat/pool/dialer.go +++ b/lib/gat/pool/dialer/net.go @@ -1,4 +1,4 @@ -package pool +package dialer import ( "net" @@ -7,19 +7,14 @@ import ( "pggat2/lib/fed" ) -type Dialer interface { - Dial() (fed.Conn, backends.AcceptParams, error) - Cancel(cancelKey [8]byte) error -} - -type NetDialer struct { +type Net struct { Network string Address string AcceptOptions backends.AcceptOptions } -func (T NetDialer) Dial() (fed.Conn, backends.AcceptParams, error) { +func (T Net) Dial() (fed.Conn, backends.AcceptParams, error) { c, err := net.Dial(T.Network, T.Address) if err != nil { return nil, backends.AcceptParams{}, err @@ -33,7 +28,7 @@ func (T NetDialer) Dial() (fed.Conn, backends.AcceptParams, error) { return conn, params, nil } -func (T NetDialer) Cancel(cancelKey [8]byte) error { +func (T Net) Cancel(cancelKey [8]byte) error { c, err := net.Dial(T.Network, T.Address) if err != nil { return err diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 430476c8ff8d0efc91f8321eaa1ad07b61ad2bf1..2ccf1ab46fac5de179099e316b3b9ee63dddac57 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -1,10 +1,11 @@ package pool import ( - "pggat2/lib/util/maps" - "sync/atomic" + "sync" "time" + "pggat2/lib/util/maps" + "github.com/google/uuid" "tuxpa.in/a/zlog/log" @@ -25,14 +26,91 @@ import ( type poolRecipe struct { recipe Recipe - count atomic.Int64 + + deleted bool + servers map[uuid.UUID]*Server + mu sync.RWMutex +} + +func (T *poolRecipe) AddServer(serverID uuid.UUID, server *Server) bool { + T.mu.Lock() + defer T.mu.Unlock() + + if T.deleted { + return false + } + + if T.recipe.MaxConnections != 0 && len(T.servers)+1 > T.recipe.MaxConnections { + return false + } + + if T.servers == nil { + T.servers = make(map[uuid.UUID]*Server) + } + T.servers[serverID] = server + return true +} + +func (T *poolRecipe) GetServer(serverID uuid.UUID) *Server { + T.mu.RLock() + defer T.mu.RUnlock() + + if T.deleted { + return nil + } + + return T.servers[serverID] +} + +func (T *poolRecipe) DeleteServer(serverID uuid.UUID) *Server { + T.mu.RLock() + defer T.mu.RUnlock() + + if T.deleted { + return nil + } + + server := T.servers[serverID] + delete(T.servers, serverID) + return server +} + +func (T *poolRecipe) Size() int { + T.mu.RLock() + defer T.mu.RUnlock() + + return len(T.servers) +} + +func (T *poolRecipe) RangeRLock(fn func(serverID uuid.UUID, server *Server) bool) bool { + T.mu.RLock() + defer T.mu.RUnlock() + + for serverID, server := range T.servers { + if !fn(serverID, server) { + return false + } + } + + return true +} + +func (T *poolRecipe) Delete(fn func(serverID uuid.UUID, server *Server)) { + T.mu.Lock() + defer T.mu.Unlock() + + T.deleted = true + for serverID, server := range T.servers { + fn(serverID, server) + delete(T.servers, serverID) + } } type Pool struct { options Options recipes maps.RWLocked[string, *poolRecipe] - servers maps.RWLocked[uuid.UUID, *Server] + servers maps.RWLocked[uuid.UUID, *poolRecipe] clients maps.RWLocked[uuid.UUID, *Client] } @@ -48,19 +126,30 @@ func NewPool(options Options) *Pool { return p } +func (T *Pool) GetServer(serverID uuid.UUID) *Server { + recipe, _ := T.servers.Load(serverID) + if recipe == nil { + return nil + } + return recipe.GetServer(serverID) +} + func (T *Pool) idlest() (idlest uuid.UUID, idle time.Time) { - T.servers.Range(func(serverID uuid.UUID, server *Server) bool { - peer, since := server.GetConnection() - if peer != uuid.Nil { - return true - } + T.recipes.Range(func(_ string, recipe *poolRecipe) bool { + recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool { + peer, since := server.GetConnection() + if peer != uuid.Nil { + return true + } - if idle != (time.Time{}) && since.After(idle) { - return true - } + if idle != (time.Time{}) && since.After(idle) { + return true + } - idlest = serverID - idle = since + idlest = serverID + idle = since + return true + }) return true }) @@ -92,12 +181,7 @@ func (T *Pool) GetCredentials() auth.Credentials { return T.options.Credentials } -func (T *Pool) _scaleUpRecipe(name string) { - r, ok := T.recipes.Load(name) - if !ok { - return - } - +func (T *Pool) scaleUpRecipe(r *poolRecipe) { server, params, err := r.recipe.Dialer.Dial() if err != nil { log.Printf("failed to dial server: %v", err) @@ -127,58 +211,62 @@ func (T *Pool) _scaleUpRecipe(name string) { ) } - r.count.Add(1) - serverID := uuid.New() - T.servers.Store(serverID, NewServer( + serverID := T.options.Pooler.NewServer() + ok := r.AddServer(serverID, NewServer( server, params.BackendKey, params.InitialParameters, - name, + psServer, eqpServer, )) - T.options.Pooler.AddServer(serverID) + if !ok { + _ = server.Close() + T.options.Pooler.DeleteServer(serverID) + return + } + T.servers.Store(serverID, r) } func (T *Pool) AddRecipe(name string, recipe Recipe) { - _, hasOld := T.recipes.Swap(name, &poolRecipe{ + r := &poolRecipe{ recipe: recipe, - }) - if hasOld { - T.servers.Range(func(serverID uuid.UUID, server *Server) bool { - if server.GetRecipe() == name { - _ = server.GetConn().Close() - T.options.Pooler.RemoveServer(serverID) - T.servers.Delete(serverID) - } - return true + } + old, _ := T.recipes.Swap(name, r) + if old != nil { + old.Delete(func(serverID uuid.UUID, server *Server) { + _ = server.GetConn().Close() + T.options.Pooler.DeleteServer(serverID) + T.servers.Delete(serverID) }) } for i := 0; i < recipe.MinConnections; i++ { - T._scaleUpRecipe(name) + T.scaleUpRecipe(r) } } func (T *Pool) RemoveRecipe(name string) { - T.recipes.Delete(name) + old, _ := T.recipes.LoadAndDelete(name) + + if old == nil { + return + } // close all servers with this recipe - T.servers.Range(func(serverID uuid.UUID, server *Server) bool { - if server.GetRecipe() == name { - _ = server.GetConn().Close() - T.options.Pooler.RemoveServer(serverID) - T.servers.Delete(serverID) - } - return true + old.Delete(func(serverID uuid.UUID, server *Server) { + _ = server.GetConn().Close() + T.options.Pooler.DeleteServer(serverID) + T.servers.Delete(serverID) }) } func (T *Pool) ScaleUp() { - T.recipes.Range(func(name string, r *poolRecipe) bool { - if r.recipe.MaxConnections == 0 || int(r.count.Load()) < r.recipe.MaxConnections { - T._scaleUpRecipe(name) + T.recipes.Range(func(_ string, r *poolRecipe) bool { + // this can race, but it will just dial an extra server and disconnect it in worst case + if r.recipe.MaxConnections == 0 || r.Size() < r.recipe.MaxConnections { + T.scaleUpRecipe(r) return false } @@ -333,19 +421,18 @@ func (T *Pool) Serve( } func (T *Pool) addClient(client fed.Conn, key [8]byte) uuid.UUID { - clientID := uuid.New() + clientID := T.options.Pooler.NewClient() T.clients.Store(clientID, NewClient( client, key, )) - T.options.Pooler.AddClient(clientID) return clientID } func (T *Pool) removeClient(clientID uuid.UUID) { T.clients.Delete(clientID) - T.options.Pooler.RemoveClient(clientID) + T.options.Pooler.DeleteClient(clientID) } func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Server) { @@ -355,7 +442,7 @@ func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Se serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking) } - server, _ = T.servers.Load(serverID) + server = T.GetServer(serverID) client, _ := T.clients.Load(clientID) if server != nil { server.SetPeer(clientID) @@ -367,7 +454,7 @@ func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Se } func (T *Pool) releaseServer(serverID uuid.UUID) { - server, _ := T.servers.Load(serverID) + server := T.GetServer(serverID) if server == nil { return } @@ -396,16 +483,16 @@ func (T *Pool) transactionComplete(serverID uuid.UUID) { } func (T *Pool) removeServer(serverID uuid.UUID) { - server, _ := T.servers.LoadAndDelete(serverID) + recipe, _ := T.servers.LoadAndDelete(serverID) + if recipe == nil { + return + } + server := recipe.DeleteServer(serverID) + T.options.Pooler.DeleteServer(serverID) if server == nil { return } _ = server.GetConn().Close() - T.options.Pooler.RemoveServer(serverID) - r, _ := T.recipes.Load(server.GetRecipe()) - if r != nil { - r.count.Add(-1) - } } func (T *Pool) Cancel(key [8]byte) error { @@ -423,24 +510,21 @@ func (T *Pool) Cancel(key [8]byte) error { } // get peer - var recipe string + var r *poolRecipe var serverKey [8]byte - if T.servers.Range(func(_ uuid.UUID, server *Server) bool { - if server.GetPeer() == clientID { - recipe = server.GetRecipe() - serverKey = server.GetBackendKey() - return false - } - return true + if T.recipes.Range(func(_ string, recipe *poolRecipe) bool { + return recipe.RangeRLock(func(_ uuid.UUID, server *Server) bool { + if server.GetPeer() == clientID { + r = recipe + serverKey = server.GetBackendKey() + return false + } + return true + }) }) { return nil } - r, _ := T.recipes.Load(recipe) - if r == nil { - return nil - } - return r.recipe.Dialer.Cancel(serverKey) } @@ -448,10 +532,13 @@ func (T *Pool) ReadMetrics(metrics *Metrics) { maps.Clear(metrics.Servers) maps.Clear(metrics.Clients) - T.servers.Range(func(serverID uuid.UUID, server *Server) bool { - var m ServerMetrics - server.ReadMetrics(&m) - metrics.Servers[serverID] = m + T.recipes.Range(func(_ string, recipe *poolRecipe) bool { + recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool { + var m ServerMetrics + server.ReadMetrics(&m) + metrics.Servers[serverID] = m + return true + }) return true }) diff --git a/lib/gat/pool/pooler.go b/lib/gat/pool/pooler.go index 384942673013a114ad25446e7d0d24508431a37f..9ed1d4c464c515af9ed1b1cb9bf182b274277efe 100644 --- a/lib/gat/pool/pooler.go +++ b/lib/gat/pool/pooler.go @@ -14,11 +14,11 @@ const ( ) type Pooler interface { - AddClient(client uuid.UUID) - RemoveClient(client uuid.UUID) + NewClient() uuid.UUID + DeleteClient(client uuid.UUID) - AddServer(server uuid.UUID) - RemoveServer(server uuid.UUID) + NewServer() uuid.UUID + DeleteServer(server uuid.UUID) // Acquire a peer with SyncMode Acquire(client uuid.UUID, sync SyncMode) uuid.UUID diff --git a/lib/gat/pool/pools/session/pooler.go b/lib/gat/pool/pools/session/pooler.go index c8fffbc2dd28c43dfcae454094ef1b7f0eccf1a9..fdf3321514d2585e0141241bb8771fbec1ae7318 100644 --- a/lib/gat/pool/pools/session/pooler.go +++ b/lib/gat/pool/pools/session/pooler.go @@ -16,15 +16,17 @@ type Pooler struct { mu sync.Mutex } -func (*Pooler) AddClient(_ uuid.UUID) { - // nothing to do +func (*Pooler) NewClient() uuid.UUID { + return uuid.New() } -func (*Pooler) RemoveClient(_ uuid.UUID) { +func (*Pooler) DeleteClient(_ uuid.UUID) { // nothing to do } -func (T *Pooler) AddServer(server uuid.UUID) { +func (T *Pooler) NewServer() uuid.UUID { + server := uuid.New() + T.mu.Lock() defer T.mu.Unlock() @@ -38,9 +40,11 @@ func (T *Pooler) AddServer(server uuid.UUID) { if T.ready != nil { T.ready.Signal() } + + return server } -func (T *Pooler) RemoveServer(server uuid.UUID) { +func (T *Pooler) DeleteServer(server uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() diff --git a/lib/gat/pool/pools/transaction/pooler.go b/lib/gat/pool/pools/transaction/pooler.go index 33882560f657117a40e6263c9521df2ffe6ce4cc..2f09e64a28affde174120ba6f1b4f7ad5b1931e1 100644 --- a/lib/gat/pool/pools/transaction/pooler.go +++ b/lib/gat/pool/pools/transaction/pooler.go @@ -12,20 +12,20 @@ type Pooler struct { s schedulers.Scheduler } -func (T *Pooler) AddClient(client uuid.UUID) { - T.s.AddUser(client) +func (T *Pooler) NewClient() uuid.UUID { + return T.s.NewUser() } -func (T *Pooler) RemoveClient(client uuid.UUID) { - T.s.RemoveUser(client) +func (T *Pooler) DeleteClient(client uuid.UUID) { + T.s.DeleteUser(client) } -func (T *Pooler) AddServer(server uuid.UUID) { - T.s.AddWorker(server) +func (T *Pooler) NewServer() uuid.UUID { + return T.s.NewWorker() } -func (T *Pooler) RemoveServer(server uuid.UUID) { - T.s.RemoveWorker(server) +func (T *Pooler) DeleteServer(server uuid.UUID) { + T.s.DeleteWorker(server) } func (T *Pooler) Acquire(client uuid.UUID, sync pool.SyncMode) uuid.UUID { diff --git a/lib/gat/pool/recipe.go b/lib/gat/pool/recipe.go index e12c8a24bfdb592e8dc6d281f91c2b6717f30a91..d7d987032dfa3aa6920c96e80dbc1ff01ae90702 100644 --- a/lib/gat/pool/recipe.go +++ b/lib/gat/pool/recipe.go @@ -1,7 +1,9 @@ package pool +import "pggat2/lib/gat/pool/dialer" + type Recipe struct { - Dialer Dialer + Dialer dialer.Dialer MinConnections int // MaxConnections is the max number of active server connections for this recipe. // 0 = unlimited diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go index 64fcce88508aa73a231d03bf907b3b7b78c1b23d..7fa6c8cf0ae25396ba0e9ed44a5f622a448d8fb0 100644 --- a/lib/gat/pool/server.go +++ b/lib/gat/pool/server.go @@ -1,20 +1,21 @@ package pool import ( + "sync" + "time" + "github.com/google/uuid" + "pggat2/lib/fed" "pggat2/lib/middleware/middlewares/eqp" "pggat2/lib/middleware/middlewares/ps" "pggat2/lib/util/strutil" - "sync" - "time" ) type Server struct { conn fed.Conn backendKey [8]byte initialParameters map[strutil.CIString]string - recipe string psServer *ps.Server eqpServer *eqp.Server @@ -27,7 +28,6 @@ func NewServer( conn fed.Conn, backendKey [8]byte, initialParameters map[strutil.CIString]string, - recipe string, psServer *ps.Server, eqpServer *eqp.Server, @@ -36,7 +36,6 @@ func NewServer( conn: conn, backendKey: backendKey, initialParameters: initialParameters, - recipe: recipe, psServer: psServer, eqpServer: eqpServer, @@ -57,10 +56,6 @@ func (T *Server) GetInitialParameters() map[strutil.CIString]string { return T.initialParameters } -func (T *Server) GetRecipe() string { - return T.recipe -} - func (T *Server) GetPSServer() *ps.Server { return T.psServer } diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go index ae057f611d0d9b771c95962c9fd9d1aa2ffa2ccd..34d65a772db8d469d60952f51f697b8e10076c6c 100644 --- a/lib/rob/scheduler.go +++ b/lib/rob/scheduler.go @@ -16,11 +16,11 @@ const ( ) type Scheduler interface { - AddWorker(worker uuid.UUID) - RemoveWorker(worker uuid.UUID) + NewWorker() uuid.UUID + DeleteWorker(worker uuid.UUID) - AddUser(user uuid.UUID) - RemoveUser(user uuid.UUID) + NewUser() uuid.UUID + DeleteUser(user uuid.UUID) // Acquire will acquire a worker with the desired SyncMode Acquire(user uuid.UUID, sync SyncMode) uuid.UUID diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index e7dc0d57f0aea07869ac3f7fe64d3e51c681db76..712c51b167e9e8e445081d3aface66d09e94eb6f 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -25,7 +25,9 @@ type Scheduler struct { mu sync.RWMutex } -func (T *Scheduler) AddWorker(worker uuid.UUID) { +func (T *Scheduler) NewWorker() uuid.UUID { + worker := uuid.New() + s := sink.NewSink(worker) if func() bool { @@ -47,15 +49,16 @@ func (T *Scheduler) AddWorker(worker uuid.UUID) { T.backlog = T.backlog[:0] return true }() { - return + return worker } T.mu.RLock() defer T.mu.RUnlock() T.stealFor(worker) + return worker } -func (T *Scheduler) RemoveWorker(worker uuid.UUID) { +func (T *Scheduler) DeleteWorker(worker uuid.UUID) { var s *sink.Sink var ok bool func() { @@ -76,11 +79,11 @@ func (T *Scheduler) RemoveWorker(worker uuid.UUID) { } } -func (*Scheduler) AddUser(_ uuid.UUID) { - // nothing to do, users are added lazily +func (*Scheduler) NewUser() uuid.UUID { + return uuid.New() } -func (T *Scheduler) RemoveUser(user uuid.UUID) { +func (T *Scheduler) DeleteUser(user uuid.UUID) { T.affinity.Delete(user) T.mu.RLock() diff --git a/lib/rob/schedulers/v2/scheduler_test.go b/lib/rob/schedulers/v2/scheduler_test.go index ef547a9bc2811eada9e340688e1063dc15d1327e..2ba0039df07af30a72fc010bf4db0742195d3dc9 100644 --- a/lib/rob/schedulers/v2/scheduler_test.go +++ b/lib/rob/schedulers/v2/scheduler_test.go @@ -35,14 +35,11 @@ func (T *ShareTable) Get(user int) int { } func testSink(sched *Scheduler) uuid.UUID { - id := uuid.New() - sched.AddWorker(id) - return id + return sched.NewWorker() } func testSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duration) { - source := uuid.New() - sched.AddUser(source) + source := sched.NewUser() for { sink := sched.Acquire(source, rob.SyncModeTryNonBlocking) start := time.Now() @@ -54,8 +51,7 @@ func testSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duration) { } func testMultiSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duration, num int) { - source := uuid.New() - sched.AddUser(source) + source := sched.NewUser() for i := 0; i < num; i++ { go func() { for { @@ -73,9 +69,8 @@ func testMultiSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duratio func testStarver(sched *Scheduler, tab *ShareTable, id int, dur time.Duration) { for { func() { - source := uuid.New() - sched.AddUser(source) - defer sched.RemoveUser(source) + source := sched.NewUser() + defer sched.DeleteUser(source) sink := sched.Acquire(source, rob.SyncModeTryNonBlocking) defer sched.Release(sink) @@ -376,7 +371,7 @@ func TestScheduler_RemoveSinkOuter(t *testing.T) { time.Sleep(10 * time.Second) - sched.RemoveWorker(toRemove) + sched.DeleteWorker(toRemove) time.Sleep(10 * time.Second) diff --git a/lib/util/maps/rwlocked.go b/lib/util/maps/rwlocked.go index 2be6f705e59704a04965cd17c23670779a33a627..5ec43d220be5fcf49ad54fd061afcad8695c5364 100644 --- a/lib/util/maps/rwlocked.go +++ b/lib/util/maps/rwlocked.go @@ -75,3 +75,19 @@ func (T *RWLocked[K, V]) Range(fn func(key K, value V) bool) bool { T.mu.RUnlock() return true } + +func (T *RWLocked[K, V]) Clear() { + T.mu.Lock() + defer T.mu.Unlock() + + for k := range T.inner { + delete(T.inner, k) + } +} + +func (T *RWLocked[K, V]) Len() int { + T.mu.RLock() + defer T.mu.RUnlock() + + return len(T.inner) +}