diff --git a/lib/fed/packets/v3.0/saslinitialresponse.go b/lib/fed/packets/v3.0/saslinitialresponse.go index 9a246659fc5e432f5bb944f4313b4957aa65605e..27fbbfa94d3fe267407986dfa7680c98aca70875 100644 --- a/lib/fed/packets/v3.0/saslinitialresponse.go +++ b/lib/fed/packets/v3.0/saslinitialresponse.go @@ -21,7 +21,7 @@ func (T *SASLInitialResponse) ReadFromPacket(packet fed.Packet) bool { p = p.ReadInt32(&initialResponseSize) T.InitialResponse = slices.Resize(T.InitialResponse, int(initialResponseSize)) - p = p.ReadBytes(T.InitialResponse[:]) + p = p.ReadBytes(T.InitialResponse) return true } diff --git a/lib/gat/modes/pgbouncer/pools.go b/lib/gat/modes/pgbouncer/pools.go index f4dc5224a2db40b002721e1ea9d16c0054751ead..636d621483b187776e3a782e756d8c059a1e2966 100644 --- a/lib/gat/modes/pgbouncer/pools.go +++ b/lib/gat/modes/pgbouncer/pools.go @@ -120,7 +120,6 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { if result.Password != nil { password = *result.Password - ok = true } } diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index a159a25188dab03b98d7a8cd36d7285c616e6730..83ce95780f6e6f33060f3b1bf05e260557af4bea 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -380,10 +380,10 @@ func (T *Pool) removeClient(clientID uuid.UUID) { } func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *poolServer) { - serverID = T.options.Pooler.AcquireConcurrent(clientID) + serverID = T.options.Pooler.Acquire(clientID, SyncModeNonBlocking) if serverID == uuid.Nil { go T.scaleUp() - serverID = T.options.Pooler.AcquireAsync(clientID) + serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking) } T.mu.Lock() diff --git a/lib/gat/pool/pooler.go b/lib/gat/pool/pooler.go index 27e39a7e9f86b19be63cac445f0ae51a3fcb7b0f..384942673013a114ad25446e7d0d24508431a37f 100644 --- a/lib/gat/pool/pooler.go +++ b/lib/gat/pool/pooler.go @@ -1,6 +1,17 @@ package pool -import "github.com/google/uuid" +import ( + "github.com/google/uuid" +) + +type SyncMode int + +const ( + // SyncModeNonBlocking will obtain a server without blocking + SyncModeNonBlocking SyncMode = iota + // SyncModeBlocking will obtain a server by stalling + SyncModeBlocking +) type Pooler interface { AddClient(client uuid.UUID) @@ -9,12 +20,8 @@ type Pooler interface { AddServer(server uuid.UUID) RemoveServer(server uuid.UUID) - // AcquireConcurrent tries to acquire a peer for the client without stalling. - // Returns uuid.Nil if no peer can be acquired - AcquireConcurrent(client uuid.UUID) uuid.UUID - - // AcquireAsync will stall until a peer is available. - AcquireAsync(client uuid.UUID) uuid.UUID + // Acquire a peer with SyncMode + Acquire(client uuid.UUID, sync SyncMode) uuid.UUID // ReleaseAfterTransaction queries whether servers should be immediately released after a transaction is completed. ReleaseAfterTransaction() bool diff --git a/lib/gat/pool/pools/session/pooler.go b/lib/gat/pool/pools/session/pooler.go index 7d8bb9daed7d424e4582edcb25baa2406140d8e1..c8fffbc2dd28c43dfcae454094ef1b7f0eccf1a9 100644 --- a/lib/gat/pool/pools/session/pooler.go +++ b/lib/gat/pool/pools/session/pooler.go @@ -50,7 +50,7 @@ func (T *Pooler) RemoveServer(server uuid.UUID) { delete(T.servers, server) } -func (T *Pooler) AcquireConcurrent(_ uuid.UUID) uuid.UUID { +func (T *Pooler) TryAcquire() uuid.UUID { T.mu.Lock() defer T.mu.Unlock() @@ -63,7 +63,7 @@ func (T *Pooler) AcquireConcurrent(_ uuid.UUID) uuid.UUID { return server } -func (T *Pooler) AcquireAsync(_ uuid.UUID) uuid.UUID { +func (T *Pooler) AcquireBlocking() uuid.UUID { T.mu.Lock() defer T.mu.Unlock() @@ -79,6 +79,17 @@ func (T *Pooler) AcquireAsync(_ uuid.UUID) uuid.UUID { return server } +func (T *Pooler) Acquire(_ uuid.UUID, mode pool.SyncMode) uuid.UUID { + switch mode { + case pool.SyncModeBlocking: + return T.TryAcquire() + case pool.SyncModeNonBlocking: + return T.AcquireBlocking() + default: + return uuid.Nil + } +} + func (*Pooler) ReleaseAfterTransaction() bool { // servers are released when the client is removed return false diff --git a/lib/gat/pool/pools/transaction/pooler.go b/lib/gat/pool/pools/transaction/pooler.go index 00350f40a204d649e5bc3986d832257f5f68bdae..33882560f657117a40e6263c9521df2ffe6ce4cc 100644 --- a/lib/gat/pool/pools/transaction/pooler.go +++ b/lib/gat/pool/pools/transaction/pooler.go @@ -4,6 +4,7 @@ import ( "github.com/google/uuid" "pggat2/lib/gat/pool" + "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v2" ) @@ -27,12 +28,15 @@ func (T *Pooler) RemoveServer(server uuid.UUID) { T.s.RemoveWorker(server) } -func (T *Pooler) AcquireConcurrent(client uuid.UUID) uuid.UUID { - return T.s.AcquireConcurrent(client) -} - -func (T *Pooler) AcquireAsync(client uuid.UUID) uuid.UUID { - return T.s.AcquireAsync(client) +func (T *Pooler) Acquire(client uuid.UUID, sync pool.SyncMode) uuid.UUID { + switch sync { + case pool.SyncModeNonBlocking: + return T.s.Acquire(client, rob.SyncModeNonBlocking) + case pool.SyncModeBlocking: + return T.s.Acquire(client, rob.SyncModeBlocking) + default: + return uuid.Nil + } } func (*Pooler) ReleaseAfterTransaction() bool { diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go index 282adf21ee02dac42ca5ab794b218f16a564cd6a..ae057f611d0d9b771c95962c9fd9d1aa2ffa2ccd 100644 --- a/lib/rob/scheduler.go +++ b/lib/rob/scheduler.go @@ -4,6 +4,17 @@ import ( "github.com/google/uuid" ) +type SyncMode int + +const ( + // SyncModeNonBlocking will attempt to acquire a worker without blocking + SyncModeNonBlocking SyncMode = iota + // SyncModeBlocking will block to acquire a worker + SyncModeBlocking + // SyncModeTryNonBlocking will attempt to acquire without blocking first, then fallback to blocking if none were available + SyncModeTryNonBlocking +) + type Scheduler interface { AddWorker(worker uuid.UUID) RemoveWorker(worker uuid.UUID) @@ -11,21 +22,10 @@ type Scheduler interface { AddUser(user uuid.UUID) RemoveUser(user uuid.UUID) - // AcquireConcurrent tries to acquire a peer for the user without stalling. - // Returns uuid.Nil if no peer can be acquired - AcquireConcurrent(user uuid.UUID) uuid.UUID - // AcquireAsync will stall until a peer is available - AcquireAsync(user uuid.UUID) uuid.UUID + // Acquire will acquire a worker with the desired SyncMode + Acquire(user uuid.UUID, sync SyncMode) uuid.UUID // Release will release a worker. // This should be called after acquire unless the worker is removed with RemoveWorker Release(worker uuid.UUID) } - -func Acquire(scheduler Scheduler, user uuid.UUID) uuid.UUID { - if s := scheduler.AcquireConcurrent(user); s != uuid.Nil { - return s - } - - return scheduler.AcquireAsync(user) -} diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 76ac15a97be452268a4188e94b9071640e391748..e7dc0d57f0aea07869ac3f7fe64d3e51c681db76 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -90,7 +90,7 @@ func (T *Scheduler) RemoveUser(user uuid.UUID) { } } -func (T *Scheduler) Acquire(j job.Concurrent) uuid.UUID { +func (T *Scheduler) TryAcquire(j job.Concurrent) uuid.UUID { affinity, _ := T.affinity.Load(j.User) // these can be unlocked and locked a bunch here because it is less bad if ExecuteConcurrent misses a sink @@ -123,12 +123,6 @@ func (T *Scheduler) Acquire(j job.Concurrent) uuid.UUID { return uuid.Nil } -func (T *Scheduler) AcquireConcurrent(user uuid.UUID) uuid.UUID { - return T.Acquire(job.Concurrent{ - User: user, - }) -} - func (T *Scheduler) Enqueue(j job.Stalled) { affinity, _ := T.affinity.Load(j.User) @@ -157,21 +151,35 @@ func (T *Scheduler) Enqueue(j job.Stalled) { T.backlog = append(T.backlog, j) } -func (T *Scheduler) AcquireAsync(user uuid.UUID) uuid.UUID { - ready, ok := T.ready.Get() - if !ok { - ready = make(chan uuid.UUID, 1) - } - defer T.ready.Put(ready) - - j := job.Stalled{ - Concurrent: job.Concurrent{ +func (T *Scheduler) Acquire(user uuid.UUID, mode rob.SyncMode) uuid.UUID { + switch mode { + case rob.SyncModeNonBlocking: + return T.TryAcquire(job.Concurrent{ User: user, - }, - Ready: ready, + }) + case rob.SyncModeBlocking: + ready, ok := T.ready.Get() + if !ok { + ready = make(chan uuid.UUID, 1) + } + defer T.ready.Put(ready) + + j := job.Stalled{ + Concurrent: job.Concurrent{ + User: user, + }, + Ready: ready, + } + T.Enqueue(j) + return <-ready + case rob.SyncModeTryNonBlocking: + if id := T.Acquire(user, rob.SyncModeNonBlocking); id != uuid.Nil { + return id + } + return T.Acquire(user, rob.SyncModeBlocking) + default: + return uuid.Nil } - T.Enqueue(j) - return <-ready } func (T *Scheduler) Release(worker uuid.UUID) { diff --git a/lib/rob/schedulers/v2/scheduler_test.go b/lib/rob/schedulers/v2/scheduler_test.go index f21e39d5790d84edec9c92404ea36eb57bd30173..ef547a9bc2811eada9e340688e1063dc15d1327e 100644 --- a/lib/rob/schedulers/v2/scheduler_test.go +++ b/lib/rob/schedulers/v2/scheduler_test.go @@ -44,7 +44,7 @@ func testSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duration) { source := uuid.New() sched.AddUser(source) for { - sink := rob.Acquire(sched, source) + sink := sched.Acquire(source, rob.SyncModeTryNonBlocking) start := time.Now() for time.Since(start) < dur { } @@ -59,7 +59,7 @@ func testMultiSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duratio for i := 0; i < num; i++ { go func() { for { - sink := rob.Acquire(sched, source) + sink := sched.Acquire(source, rob.SyncModeTryNonBlocking) start := time.Now() for time.Since(start) < dur { } @@ -77,7 +77,7 @@ func testStarver(sched *Scheduler, tab *ShareTable, id int, dur time.Duration) { sched.AddUser(source) defer sched.RemoveUser(source) - sink := rob.Acquire(sched, source) + sink := sched.Acquire(source, rob.SyncModeTryNonBlocking) defer sched.Release(sink) start := time.Now() for time.Since(start) < dur { diff --git a/lib/util/ring/ring_test.go b/lib/util/ring/ring_test.go index 8c4d80c30428cf4a7e70e90a16ca9d774c3c266c..84149fcce67c5f619364f9e6bf33a28723e87092 100644 --- a/lib/util/ring/ring_test.go +++ b/lib/util/ring/ring_test.go @@ -263,27 +263,13 @@ func BenchmarkFIFO_Ring(b *testing.B) { } for i := 0; i < b.N; i++ { - ring.PushBack(1) - ring.PushBack(2) - ring.PushBack(3) - ring.PushBack(4) - ring.PushBack(5) - ring.PushBack(6) - ring.PushBack(7) - ring.PushBack(8) - ring.PushBack(9) - ring.PushBack(10) - - assert(1) - assert(2) - assert(3) - assert(4) - assert(5) - assert(6) - assert(7) - assert(8) - assert(9) - assert(10) + for j := 0; j < 10; j++ { + ring.PushBack(j) + } + + for j := 0; j < 10; j++ { + assert(j) + } } } @@ -300,37 +286,14 @@ func BenchmarkFIFO_StdRing(b *testing.B) { } for i := 0; i < b.N; i++ { - ring.Value = 1 - ring = ring.Next() - ring.Value = 2 - ring = ring.Next() - ring.Value = 3 - ring = ring.Next() - ring.Value = 4 - ring = ring.Next() - ring.Value = 5 - ring = ring.Next() - ring.Value = 6 - ring = ring.Next() - ring.Value = 7 - ring = ring.Next() - ring.Value = 8 - ring = ring.Next() - ring.Value = 9 - ring = ring.Next() - ring.Value = 10 - ring = ring.Next() - - assert(1) - assert(2) - assert(3) - assert(4) - assert(5) - assert(6) - assert(7) - assert(8) - assert(9) - assert(10) + for j := 0; j < 10; j++ { + ring.Value = j + ring = ring.Next() + } + + for j := 0; j < 10; j++ { + assert(j) + } } } @@ -352,28 +315,14 @@ func BenchmarkFIFO_Slice(b *testing.B) { for i := 0; i < b.N; i++ { // pushing is easy for slices - slice = append(slice, 1) - slice = append(slice, 2) - slice = append(slice, 3) - slice = append(slice, 4) - slice = append(slice, 5) - slice = append(slice, 6) - slice = append(slice, 7) - slice = append(slice, 8) - slice = append(slice, 9) - slice = append(slice, 10) + for j := 0; j < 10; j++ { + slice = append(slice, j) + } // popping is a bit more complicated - assert(1) - assert(2) - assert(3) - assert(4) - assert(5) - assert(6) - assert(7) - assert(8) - assert(9) - assert(10) + for j := 0; j < 10; j++ { + assert(j) + } } } @@ -394,28 +343,14 @@ func BenchmarkFIFO_Slice2(b *testing.B) { for i := 0; i < b.N; i++ { // pushing is easy for slices - slice = append(slice, 1) - slice = append(slice, 2) - slice = append(slice, 3) - slice = append(slice, 4) - slice = append(slice, 5) - slice = append(slice, 6) - slice = append(slice, 7) - slice = append(slice, 8) - slice = append(slice, 9) - slice = append(slice, 10) + for j := 0; j < 10; j++ { + slice = append(slice, j) + } // popping is a bit more complicated - assert(1) - assert(2) - assert(3) - assert(4) - assert(5) - assert(6) - assert(7) - assert(8) - assert(9) - assert(10) + for j := 0; j < 10; j++ { + assert(j) + } } } @@ -435,26 +370,12 @@ func BenchmarkFIFO_Channel(b *testing.B) { for i := 0; i < b.N; i++ { // channel is the easiest interface by far - channel <- 1 - channel <- 2 - channel <- 3 - channel <- 4 - channel <- 5 - channel <- 6 - channel <- 7 - channel <- 8 - channel <- 9 - channel <- 10 - - assert(1) - assert(2) - assert(3) - assert(4) - assert(5) - assert(6) - assert(7) - assert(8) - assert(9) - assert(10) + for j := 0; j < 10; j++ { + channel <- j + } + + for j := 0; j < 10; j++ { + assert(j) + } } }