From 11ce44af7d57ab18d0052beff35f4f415a2d8e76 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Sun, 17 Sep 2023 19:25:51 -0500 Subject: [PATCH] fix --- lib/gat/pool/client.go | 3 -- lib/gat/pool/conn.go | 3 +- lib/gat/pool/pool.go | 57 ++++++++++++++---------- lib/gat/pool/pooler.go | 4 +- lib/gat/pool/pools/session/pooler.go | 10 +---- lib/gat/pool/pools/transaction/pooler.go | 8 ++-- lib/gat/pool/server.go | 4 -- lib/rob/scheduler.go | 4 +- lib/rob/schedulers/v2/scheduler.go | 12 ++--- 9 files changed, 48 insertions(+), 57 deletions(-) diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go index 5a91d3f6..d15496df 100644 --- a/lib/gat/pool/client.go +++ b/lib/gat/pool/client.go @@ -1,8 +1,6 @@ package pool import ( - "github.com/google/uuid" - "pggat/lib/fed" "pggat/lib/middleware" "pggat/lib/middleware/interceptor" @@ -50,7 +48,6 @@ func NewClient( return &Client{ Conn: MakeConn( - uuid.New(), conn, initialParameters, backendKey, diff --git a/lib/gat/pool/conn.go b/lib/gat/pool/conn.go index 1a9e6b0f..b314ab46 100644 --- a/lib/gat/pool/conn.go +++ b/lib/gat/pool/conn.go @@ -38,13 +38,12 @@ type Conn struct { } func MakeConn( - id uuid.UUID, conn fed.Conn, initialParameters map[strutil.CIString]string, backendKey [8]byte, ) Conn { return Conn{ - id: id, + id: uuid.New(), conn: conn, rw: conn, initialParameters: initialParameters, diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 161cdeb4..d1e56823 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -212,33 +212,40 @@ func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) error { return err } - T.mu.Lock() - defer T.mu.Unlock() - if T.recipes[name] != r { - // recipe was removed - r.Free() - return errors.New("recipe was removed") - } + server, err := func() (*Server, error) { + T.mu.Lock() + defer T.mu.Unlock() + if T.recipes[name] != r { + // recipe was removed + r.Free() + return nil, errors.New("recipe was removed") + } - id := T.options.Pooler.NewServer() - server := NewServer( - T.options, - id, - name, - conn, - params.InitialParameters, - params.BackendKey, - ) + server := NewServer( + T.options, + name, + conn, + params.InitialParameters, + params.BackendKey, + ) - if T.servers == nil { - T.servers = make(map[uuid.UUID]*Server) - } - T.servers[id] = server + if T.servers == nil { + T.servers = make(map[uuid.UUID]*Server) + } + T.servers[server.GetID()] = server - if T.serversByRecipe == nil { - T.serversByRecipe = make(map[string][]*Server) + if T.serversByRecipe == nil { + T.serversByRecipe = make(map[string][]*Server) + } + T.serversByRecipe[name] = append(T.serversByRecipe[name], server) + return server, nil + }() + + if err != nil { + return err } - T.serversByRecipe[name] = append(T.serversByRecipe[name], server) + + T.options.Pooler.AddServer(server.GetID()) return nil } @@ -273,7 +280,7 @@ func (T *Pool) acquireServer(client *Client) *Server { server, ok := T.servers[serverID] T.mu.RUnlock() if !ok { - T.options.Pooler.Release(serverID) + T.options.Pooler.DeleteServer(serverID) continue } return server @@ -416,6 +423,7 @@ func (T *Pool) addClient(client *Client) { T.clientsByKey = make(map[[8]byte]*Client) } T.clientsByKey[client.GetBackendKey()] = client + T.options.Pooler.AddClient(client.GetID()) } func (T *Pool) removeClient(client *Client) { @@ -426,6 +434,7 @@ func (T *Pool) removeClient(client *Client) { } func (T *Pool) removeClientL1(client *Client) { + T.options.Pooler.DeleteClient(client.GetID()) _ = client.conn.Close() delete(T.clients, client.GetID()) delete(T.clientsByKey, client.GetBackendKey()) diff --git a/lib/gat/pool/pooler.go b/lib/gat/pool/pooler.go index a8d9d506..2133ce11 100644 --- a/lib/gat/pool/pooler.go +++ b/lib/gat/pool/pooler.go @@ -10,10 +10,10 @@ const ( ) type Pooler interface { - NewClient() uuid.UUID + AddClient(id uuid.UUID) DeleteClient(client uuid.UUID) - NewServer() uuid.UUID + AddServer(id uuid.UUID) DeleteServer(server uuid.UUID) Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID) diff --git a/lib/gat/pool/pools/session/pooler.go b/lib/gat/pool/pools/session/pooler.go index a8c8cc12..f322a828 100644 --- a/lib/gat/pool/pools/session/pooler.go +++ b/lib/gat/pool/pools/session/pooler.go @@ -16,17 +16,13 @@ type Pooler struct { mu sync.Mutex } -func (*Pooler) NewClient() uuid.UUID { - return uuid.New() -} +func (*Pooler) AddClient(_ uuid.UUID) {} func (*Pooler) DeleteClient(_ uuid.UUID) { // nothing to do } -func (T *Pooler) NewServer() uuid.UUID { - server := uuid.New() - +func (T *Pooler) AddServer(server uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() @@ -40,8 +36,6 @@ func (T *Pooler) NewServer() uuid.UUID { if T.ready != nil { T.ready.Signal() } - - return server } func (T *Pooler) DeleteServer(server uuid.UUID) { diff --git a/lib/gat/pool/pools/transaction/pooler.go b/lib/gat/pool/pools/transaction/pooler.go index c5c0f1ae..5774a26d 100644 --- a/lib/gat/pool/pools/transaction/pooler.go +++ b/lib/gat/pool/pools/transaction/pooler.go @@ -12,16 +12,16 @@ type Pooler struct { s schedulers.Scheduler } -func (T *Pooler) NewClient() uuid.UUID { - return T.s.NewUser() +func (T *Pooler) AddClient(client uuid.UUID) { + T.s.AddUser(client) } func (T *Pooler) DeleteClient(client uuid.UUID) { T.s.DeleteUser(client) } -func (T *Pooler) NewServer() uuid.UUID { - return T.s.NewWorker() +func (T *Pooler) AddServer(server uuid.UUID) { + T.s.AddWorker(server) } func (T *Pooler) DeleteServer(server uuid.UUID) { diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go index a051085d..1aa43342 100644 --- a/lib/gat/pool/server.go +++ b/lib/gat/pool/server.go @@ -1,8 +1,6 @@ package pool import ( - "github.com/google/uuid" - "pggat/lib/fed" "pggat/lib/middleware" "pggat/lib/middleware/interceptor" @@ -22,7 +20,6 @@ type Server struct { func NewServer( options Options, - id uuid.UUID, recipe string, conn fed.Conn, initialParameters map[strutil.CIString]string, @@ -53,7 +50,6 @@ func NewServer( return &Server{ Conn: MakeConn( - id, conn, initialParameters, backendKey, diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go index 34d65a77..1cef414e 100644 --- a/lib/rob/scheduler.go +++ b/lib/rob/scheduler.go @@ -16,10 +16,10 @@ const ( ) type Scheduler interface { - NewWorker() uuid.UUID + AddWorker(id uuid.UUID) DeleteWorker(worker uuid.UUID) - NewUser() uuid.UUID + AddUser(id uuid.UUID) DeleteUser(user uuid.UUID) // Acquire will acquire a worker with the desired SyncMode diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index e4a4c4fe..47ebde77 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -23,9 +23,7 @@ type Scheduler struct { mu sync.RWMutex } -func (T *Scheduler) NewWorker() uuid.UUID { - worker := uuid.New() - +func (T *Scheduler) AddWorker(worker uuid.UUID) { s := sink.NewSink(worker) T.mu.Lock() @@ -39,11 +37,11 @@ func (T *Scheduler) NewWorker() uuid.UUID { if len(T.backlog) > 0 { s.Enqueue(T.backlog...) T.backlog = T.backlog[:0] - return worker + return } T.stealFor(worker) - return worker + return } func (T *Scheduler) DeleteWorker(worker uuid.UUID) { @@ -67,9 +65,7 @@ func (T *Scheduler) DeleteWorker(worker uuid.UUID) { } } -func (*Scheduler) NewUser() uuid.UUID { - return uuid.New() -} +func (*Scheduler) AddUser(_ uuid.UUID) {} func (T *Scheduler) DeleteUser(user uuid.UUID) { T.affinity.Delete(user) -- GitLab