From 5e162efd47da18d5dbc8cdecf36397a4a6d3c866 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Mon, 15 May 2023 15:43:15 -0500 Subject: [PATCH] working --- cmd/cgat/main.go | 36 ++++++++++++------------- lib/rob/schedulers/v0/scheduler_test.go | 16 ++--------- 2 files changed, 19 insertions(+), 33 deletions(-) diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index b663dbff..82437d2d 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -4,6 +4,8 @@ import ( "net" "net/http" _ "net/http/pprof" + + "pggat2/lib/rob/schedulers/v0" "pggat2/lib/zap/onebuffer" "pggat2/lib/bouncer/backends/v0" @@ -17,27 +19,27 @@ import ( "pggat2/lib/zap/zio" ) -type job struct { - client zap.ReadWriter - done chan<- struct{} +type server struct { + rw zap.ReadWriter +} + +func (T server) Do(_ rob.Constraints, work any) { + client := work.(zap.ReadWriter) + bouncers.Bounce(client, T.rw) } +var _ rob.Worker = server{} + func testServer(r rob.Scheduler) { conn, err := net.Dial("tcp", "localhost:5432") if err != nil { panic(err) } - server := zio.MakeReadWriter(conn) - backends.Accept(&server) - sink := r.NewSink(0) - for { - j := sink.Read().(job) - bouncers.Bounce(j.client, &server) - select { - case j.done <- struct{}{}: - default: - } - } + rw := zio.MakeReadWriter(conn) + backends.Accept(&rw) + r.AddSink(0, server{ + rw: &rw, + }) } func main() { @@ -72,11 +74,7 @@ func main() { if err != nil { break } - source.Schedule(job{ - client: &mw, - done: done, - }, 0) - <-done + source.Do(0, &mw) } }() } diff --git a/lib/rob/schedulers/v0/scheduler_test.go b/lib/rob/schedulers/v0/scheduler_test.go index b2b0c60f..7d6608b2 100644 --- a/lib/rob/schedulers/v0/scheduler_test.go +++ b/lib/rob/schedulers/v0/scheduler_test.go @@ -215,16 +215,12 @@ func TestScheduler_StealBalanced(t *testing.T) { go testSource(sched, 1, 10*time.Millisecond, 0) go testSource(sched, 2, 10*time.Millisecond, 0) go testSource(sched, 3, 10*time.Millisecond, 0) - go testSource(sched, 4, 10*time.Millisecond, 0) - go testSource(sched, 5, 10*time.Millisecond, 0) time.Sleep(20 * time.Second) t0 := table.Get(0) t1 := table.Get(1) t2 := table.Get(2) t3 := table.Get(3) - t4 := table.Get(4) - t5 := table.Get(5) /* Expectations: @@ -235,10 +231,8 @@ func TestScheduler_StealBalanced(t *testing.T) { t.Log("share of 1:", t1) t.Log("share of 2:", t2) t.Log("share of 3:", t3) - t.Log("share of 4:", t4) - t.Log("share of 5:", t5) - if !similar(t0, t1, t2, t3, t4, t5) { + if !similar(t0, t1, t2, t3) { t.Error("expected all shares to be similar") } @@ -257,15 +251,11 @@ func TestScheduler_StealUnbalanced(t *testing.T) { go testSource(sched, 0, 10*time.Millisecond, 0) go testSource(sched, 1, 10*time.Millisecond, 0) go testSource(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 3, 10*time.Millisecond, 0) - go testSource(sched, 4, 10*time.Millisecond, 0) time.Sleep(20 * time.Second) t0 := table.Get(0) t1 := table.Get(1) t2 := table.Get(2) - t3 := table.Get(3) - t4 := table.Get(4) /* Expectations: @@ -275,10 +265,8 @@ func TestScheduler_StealUnbalanced(t *testing.T) { t.Log("share of 0:", t0) t.Log("share of 1:", t1) t.Log("share of 2:", t2) - t.Log("share of 3:", t3) - t.Log("share of 4:", t4) - if !similar(t0, t1, t2, t3, t4) { + if !similar(t0, t1, t2) { t.Error("expected all shares to be similar") } -- GitLab