diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index 8f44a7e0444630ef697d911043f0d122230f1bfc..c063337b5c8e343d91b7b16164fa9dcf2166e25e 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -7,7 +7,8 @@ import ( "time" "pggat2/lib/gat" - "pggat2/lib/gat/pools/session" + "pggat2/lib/gat/pools/transaction" + "pggat2/lib/rob" ) func main() { @@ -24,7 +25,7 @@ func main() { pooler.AddUser("postgres", postgres) // create pool - rawPool := session.NewPool() + rawPool := transaction.NewPool() pool := gat.NewPool(rawPool) postgres.AddPool("uniswap", pool) pool.AddRecipe("localhost", gat.TCPRecipe{ @@ -36,26 +37,26 @@ func main() { MaxConnections: 5, }) + go func() { + var metrics rob.Metrics + + for { + time.Sleep(1 * time.Second) + rawPool.ReadSchedulerMetrics(&metrics) + log.Println(metrics.String()) + } + }() /* go func() { - var metrics rob.Metrics + var metrics session.Metrics for { time.Sleep(1 * time.Second) - rawPool.ReadSchedulerMetrics(&metrics) + rawPool.ReadMetrics(&metrics) log.Println(metrics.String()) } }() */ - go func() { - var metrics session.Metrics - - for { - time.Sleep(1 * time.Second) - rawPool.ReadMetrics(&metrics) - log.Println(metrics.String()) - } - }() log.Println("Listening on :6432") diff --git a/lib/gat/pools/transaction/pool.go b/lib/gat/pools/transaction/pool.go index adb2fe43b819a6acf3ae4375fbc776cd15d37c3c..05ef912541f8f9a0e006310e8b811ea02742fb5a 100644 --- a/lib/gat/pools/transaction/pool.go +++ b/lib/gat/pools/transaction/pool.go @@ -89,12 +89,28 @@ func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) { func (T *Pool) ScaleDown(amount int) (remaining int) { remaining = amount - // TODO(garet) + + for i := 0; i < amount; i++ { + id, _ := T.s.GetIdleWorker() + if id == uuid.Nil { + break + } + worker := T.s.RemoveWorker(id) + if worker == nil { + i-- + continue + } + conn := worker.(*Conn) + _ = conn.rw.Close() + remaining-- + } + return } func (T *Pool) IdleSince() time.Time { - return time.Time{} + _, idle := T.s.GetIdleWorker() + return idle } func (T *Pool) ReadSchedulerMetrics(metrics *rob.Metrics) { diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go index 22f6c2c9fb3cd534ea5fe8f6b0813fba5f9a7ab8..be8926eab0289c66d9ef385903d3f0b7870c28ba 100644 --- a/lib/rob/scheduler.go +++ b/lib/rob/scheduler.go @@ -1,10 +1,15 @@ package rob -import "github.com/google/uuid" +import ( + "time" + + "github.com/google/uuid" +) type Scheduler interface { AddWorker(constraints Constraints, worker Worker) uuid.UUID GetWorker(id uuid.UUID) Worker + GetIdleWorker() (uuid.UUID, time.Time) RemoveWorker(id uuid.UUID) Worker WorkerCount() int diff --git a/lib/rob/schedulers/v1/pool/pool.go b/lib/rob/schedulers/v1/pool/pool.go index 42200abb70000f8c32cb651804856cfd79b2e959..997e8634443bb12966301529baa63dffffc6e3b1 100644 --- a/lib/rob/schedulers/v1/pool/pool.go +++ b/lib/rob/schedulers/v1/pool/pool.go @@ -2,6 +2,7 @@ package pool import ( "sync" + "time" "github.com/google/uuid" @@ -137,6 +138,21 @@ func (T *Pool) GetWorker(id uuid.UUID) rob.Worker { return s.GetWorker() } +func (T *Pool) GetIdleWorker() (id uuid.UUID, idleStart time.Time) { + T.mu.RLock() + defer T.mu.RUnlock() + + for i, s := range T.sinks { + start := s.IdleStart() + if idleStart == (time.Time{}) || start.Before(idleStart) { + idleStart = start + id = i + } + } + + return +} + func (T *Pool) RemoveWorker(id uuid.UUID) rob.Worker { T.mu.Lock() s, ok := T.sinks[id] diff --git a/lib/rob/schedulers/v1/pool/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go index d5d3686624a2790d379db58d58b4e91a7ed2be40..e92e6ac20dfc89f30f4fb9e1b662b15a950d76fd 100644 --- a/lib/rob/schedulers/v1/pool/sink/sink.go +++ b/lib/rob/schedulers/v1/pool/sink/sink.go @@ -50,6 +50,16 @@ func NewSink(id uuid.UUID, constraints rob.Constraints, worker rob.Worker) *Sink } } +func (T *Sink) IdleStart() time.Time { + T.mu.Lock() + defer T.mu.Unlock() + if T.active != uuid.Nil { + return time.Time{} + } + + return T.start +} + func (T *Sink) GetWorker() rob.Worker { return T.worker } diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go index 608704d859702e49dc769f1539c9c6099700e458..ed32e815d96d7ef4c309329d08906fdaed078dac 100644 --- a/lib/rob/schedulers/v1/scheduler.go +++ b/lib/rob/schedulers/v1/scheduler.go @@ -1,6 +1,8 @@ package schedulers import ( + "time" + "github.com/google/uuid" "pggat2/lib/rob" @@ -31,6 +33,10 @@ func (T *Scheduler) GetWorker(id uuid.UUID) rob.Worker { return T.pool.GetWorker(id) } +func (T *Scheduler) GetIdleWorker() (uuid.UUID, time.Time) { + return T.pool.GetIdleWorker() +} + func (T *Scheduler) RemoveWorker(id uuid.UUID) rob.Worker { return T.pool.RemoveWorker(id) }