diff --git a/lib/gat/configs/pgbouncer/config.go b/lib/gat/configs/pgbouncer/config.go index 98ca3fc78da6d5efae2b58e4fcd91ce6b098c429..1fb11220bf87ff730aa4776e6449f78f2db8b1bc 100644 --- a/lib/gat/configs/pgbouncer/config.go +++ b/lib/gat/configs/pgbouncer/config.go @@ -374,13 +374,13 @@ func (T *Config) ListenAndServe(pooler *gat.Pooler) error { listen := net.JoinHostPort(listenAddr, strconv.Itoa(T.PgBouncer.ListenPort)) - log.Println("listening on", listen) - listener, err := net.Listen("tcp", listen) if err != nil { return err } + log.Println("listening on", listen) + return pooler.ListenAndServe(listener) }) } diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go index 08f81ee16e85ebe6455b991b040467a7a79dc4c6..404a8ff73a7a15e198d9b58c5e7bb64b3e45e3a1 100644 --- a/lib/gat/pools/session/pool.go +++ b/lib/gat/pools/session/pool.go @@ -12,7 +12,6 @@ import ( "pggat2/lib/util/chans" "pggat2/lib/util/maps" "pggat2/lib/util/ring" - "pggat2/lib/util/strings" "pggat2/lib/zap" packets "pggat2/lib/zap/packets/v3.0" ) @@ -90,10 +89,9 @@ func (T *Pool) release(conn Conn) { T._release(conn.id) } -func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter, startupParameters map[string]string) { +func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter, _ map[string]string) { defer func() { _ = client.Close() - }() connOk := true @@ -110,28 +108,9 @@ func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter, startupParameters pkts := zap.NewPackets() defer pkts.Done() for key, value := range conn.initialParameters { - if _, ok := startupParameters[key]; ok { - continue - } - packet := zap.NewPacket() - packets.WriteParameterStatus(packet, key, value) - pkts.Append(packet) - } - - for key, value := range startupParameters { packet := zap.NewPacket() packets.WriteParameterStatus(packet, key, value) pkts.Append(packet) - - if current, ok := conn.initialParameters[key]; ok && current == value { - continue - } - - err := backends.QueryString(&backends.Context{}, conn.rw, "SET "+key+" = '"+strings.Escape(value, "'")+"'") - if err != nil { - connOk = false - return true - } } err := client.WriteV(pkts) diff --git a/lib/rob/schedulers/v1/pool/pool.go b/lib/rob/schedulers/v1/pool/pool.go index 9052832126514ce92fadac406ce5c7b8993d9b97..12e09ba12197f998dbcbf79950fda60f066ee5bd 100644 --- a/lib/rob/schedulers/v1/pool/pool.go +++ b/lib/rob/schedulers/v1/pool/pool.go @@ -112,18 +112,20 @@ func (T *Pool) AddWorker(constraints rob.Constraints, worker rob.Worker) uuid.UU id := uuid.New() s := sink.NewSink(id, constraints, worker) - T.mu.Lock() - // if mu is locked, we don't need to lock bmu, because we are the only accessor - T.sinks[id] = s - i := 0 - for _, v := range T.backlog { - if ok := s.ExecuteStalled(v); !ok { - T.backlog[i] = v - i++ + func() { + T.mu.Lock() + defer T.mu.Unlock() + // if mu is locked, we don't need to lock bmu, because we are the only accessor + T.sinks[id] = s + i := 0 + for _, v := range T.backlog { + if ok := s.ExecuteStalled(v); !ok { + T.backlog[i] = v + i++ + } } - } - T.backlog = T.backlog[:i] - T.mu.Unlock() + T.backlog = T.backlog[:i] + }() T.stealFor(id) @@ -156,14 +158,17 @@ func (T *Pool) GetIdleWorker() (id uuid.UUID, idleStart time.Time) { } func (T *Pool) RemoveWorker(id uuid.UUID) rob.Worker { - T.mu.Lock() - s, ok := T.sinks[id] + var s *sink.Sink + var ok bool + func() { + T.mu.Lock() + defer T.mu.Unlock() + s, ok = T.sinks[id] + delete(T.sinks, id) + }() if !ok { - T.mu.Unlock() return nil } - delete(T.sinks, id) - T.mu.Unlock() // now we need to reschedule all the work that was scheduled to s (stalled only). jobs := s.StealAll() @@ -207,9 +212,12 @@ func (T *Pool) stealFor(id uuid.UUID) { } func (T *Pool) Execute(id uuid.UUID, ctx *rob.Context, work any) { - T.mu.RLock() - s := T.sinks[id] - T.mu.RUnlock() + var s *sink.Sink + func() { + T.mu.RLock() + defer T.mu.RUnlock() + s = T.sinks[id] + }() hasMore := s.Execute(ctx, work) if ctx.Removed { @@ -236,14 +244,16 @@ func (T *Pool) ReadMetrics(metrics *rob.Metrics) { T.mu.RLock() defer T.mu.RUnlock() - T.bmu.Lock() - for _, j := range T.backlog { - metrics.Jobs[j.ID] = rob.JobMetrics{ - Created: j.Created, - Backlogged: true, + func() { + T.bmu.Lock() + defer T.bmu.Unlock() + for _, j := range T.backlog { + metrics.Jobs[j.ID] = rob.JobMetrics{ + Created: j.Created, + Backlogged: true, + } } - } - T.bmu.Unlock() + }() for _, worker := range T.sinks { worker.ReadMetrics(metrics) diff --git a/lib/rob/schedulers/v1/pool/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go index e92e6ac20dfc89f30f4fb9e1b662b15a950d76fd..682b36703a3413d582a906da07da8143a0fc26c6 100644 --- a/lib/rob/schedulers/v1/pool/sink/sink.go +++ b/lib/rob/schedulers/v1/pool/sink/sink.go @@ -83,17 +83,22 @@ func (T *Sink) ExecuteConcurrent(j job.Concurrent) (ok, hasMore bool) { return false, false } - T.mu.Lock() + var wasInUse bool - if T.active != uuid.Nil { - // this Sink is in use - T.mu.Unlock() - return false, false - } + func() { + T.mu.Lock() + defer T.mu.Unlock() - T.setActive(j.Source) + if T.active != uuid.Nil { + wasInUse = true + return + } - T.mu.Unlock() + T.setActive(j.Source) + }() + if wasInUse { + return false, false + } return true, T.Execute(j.Context, j.Work) } @@ -242,15 +247,16 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID { pending, _ := T.pending[source] delete(T.pending, source) - T.mu.Unlock() + func() { + T.mu.Unlock() + defer T.mu.Lock() - rhs.ExecuteStalled(j) - - for j, ok = pending.PopFront(); ok; j, ok = pending.PopFront() { rhs.ExecuteStalled(j) - } - T.mu.Lock() + for j, ok = pending.PopFront(); ok; j, ok = pending.PopFront() { + rhs.ExecuteStalled(j) + } + }() if pending != nil { if _, ok = T.pending[source]; !ok { diff --git a/lib/util/maps/rwlocked.go b/lib/util/maps/rwlocked.go index 7c8276c87257039c1fd203de08abf2fccf1efd4f..2be6f705e59704a04965cd17c23670779a33a627 100644 --- a/lib/util/maps/rwlocked.go +++ b/lib/util/maps/rwlocked.go @@ -63,6 +63,7 @@ func (T *RWLocked[K, V]) Swap(key K, value V) (previous V, loaded bool) { } func (T *RWLocked[K, V]) Range(fn func(key K, value V) bool) bool { + // this is ok because if fn panics the map will be unlocked T.mu.RLock() for k, v := range T.inner { T.mu.RUnlock() diff --git a/pgbouncer.ini b/pgbouncer.ini index 85f013d2d5270ee758ee6cac27c96267e2ff7ba5..793026571ec05377406670f32d5adb556b8970e4 100644 --- a/pgbouncer.ini +++ b/pgbouncer.ini @@ -1,5 +1,5 @@ [pgbouncer] -pool_mode = transaction +pool_mode = session auth_file = userlist.txt listen_addr = * @@ -7,5 +7,5 @@ listen_addr = * postgres = [databases] -uniswap = host=localhost +regression = host=localhost postgres = host=localhost