diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index 3a0d542cf99a8e5f7361ee3f1c0a4e1611269aec..c7a479227eb5ac37b1aee05c1458a4cb0d06df78 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -3,10 +3,13 @@ package main import ( "io" "net" + "sync" "pggat2/lib/backend/backends/v0" "pggat2/lib/pnet" "pggat2/lib/pnet/packet" + "pggat2/lib/router" + "pggat2/lib/router/routers/v0" ) type testPacket struct { @@ -65,6 +68,25 @@ func (T *LogWriter) Write() packet.Out { var _ pnet.Writer = (*LogWriter)(nil) +func makeTestServer(r router.Router, wg *sync.WaitGroup) { + conn, err := net.Dial("tcp", "localhost:5432") + if err != nil { + panic(err) + } + server := backends.NewServer(conn) + if server == nil { + panic("failed to connect to server") + } + go func() { + handler := r.NewHandler(true) + for { + peer := handler.Next() + server.Handle(peer) + wg.Done() + } + }() +} + func main() { /* frontend, err := frontends.NewListener() @@ -76,14 +98,12 @@ func main() { panic(err) } */ - conn, err := net.Dial("tcp", "localhost:5432") - if err != nil { - panic(err) - } - server := backends.NewServer(conn) - if server == nil { - panic("failed to connect to server") - } + r := routers.MakeRouter() + var wg sync.WaitGroup + makeTestServer(&r, &wg) + // makeTestServer(&r, &wg) + + src := r.NewSource() readWriter := pnet.JoinedReadWriter{ Reader: &TestReader{ packets: []testPacket{ @@ -103,19 +123,10 @@ func main() { }, Writer: &LogWriter{}, } - perr := server.Transaction(readWriter) - if perr != nil { - panic(perr) - } - perr = server.Transaction(readWriter) - if perr != nil { - panic(perr) - } - perr = server.Transaction(readWriter) - if perr != nil { - panic(perr) - } + wg.Add(3) + src.Handle(readWriter, true) + src.Handle(readWriter, true) + src.Handle(readWriter, true) + wg.Wait() // log.Println(server) - _ = server - _ = conn.Close() } diff --git a/lib/backend/backends/v0/server.go b/lib/backend/backends/v0/server.go index 1526d42abd4f1b174079cf56335762877b9a4c78..109e8d1acd9506b36a65047609f9981b6a5b8945 100644 --- a/lib/backend/backends/v0/server.go +++ b/lib/backend/backends/v0/server.go @@ -486,8 +486,7 @@ func (T *Server) functionCall(peer pnet.ReadWriter, in packet.In) perror.Error { return nil } -// Transaction handles a transaction from peer, returning when the transaction is complete -func (T *Server) Transaction(peer pnet.ReadWriter) perror.Error { +func (T *Server) handle(peer pnet.ReadWriter) perror.Error { in, err := peer.Read() if err != nil { return perror.Wrap(err) @@ -506,4 +505,14 @@ func (T *Server) Transaction(peer pnet.ReadWriter) perror.Error { } } +// Handle handles a transaction from peer, returning when the transaction is complete +func (T *Server) Handle(peer pnet.ReadWriter) { + err := T.handle(peer) + if err != nil { + out := T.Write() + packets.WriteErrorResponse(out, err) + _ = out.Send() + } +} + var _ backend.Server = (*Server)(nil) diff --git a/lib/backend/server.go b/lib/backend/server.go index fccde9fdad560b9621d6d6b6798edc61f37d9699..43eeae04ba2f3ecd6266b281474e0086f25930a7 100644 --- a/lib/backend/server.go +++ b/lib/backend/server.go @@ -1,6 +1,8 @@ package backend -import "pggat2/lib/pnet" +import ( + "pggat2/lib/pnet" +) type Server interface { pnet.ReadWriter diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 9f61c98afd7f1b2bbcbbf930da490ff03ce4fb45..c28f49e73926258e0ad209c9f71662f86bd40984 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -13,20 +13,25 @@ type Scheduler struct { pool pool.Pool } -func NewScheduler() *Scheduler { - return &Scheduler{ +func MakeScheduler() Scheduler { + return Scheduler{ pool: pool.MakePool(), } } +func NewScheduler() *Scheduler { + scheduler := MakeScheduler() + return &scheduler +} + func (T *Scheduler) NewSink(fulfills rob.Constraints) rob.Sink { id := uuid.New() q := T.pool.NewQueue(id, fulfills) - return sink.NewSink(id, &T.pool, q) + return sink.MakeSink(id, &T.pool, q) } func (T *Scheduler) NewSource() rob.Source { - return source.NewSource(&T.pool) + return source.MakeSource(&T.pool) } var _ rob.Scheduler = (*Scheduler)(nil) diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go index 01d3cb6d7669ead5bb13f478d86228998f015d1d..b786a88f6ddbf87eedcc2adb64c3b4f0b951d5de 100644 --- a/lib/rob/schedulers/v2/sink/sink.go +++ b/lib/rob/schedulers/v2/sink/sink.go @@ -14,15 +14,15 @@ type Sink struct { queue *queue.Queue } -func NewSink(id uuid.UUID, p *pool.Pool, q *queue.Queue) *Sink { - return &Sink{ +func MakeSink(id uuid.UUID, p *pool.Pool, q *queue.Queue) Sink { + return Sink{ id: id, pool: p, queue: q, } } -func (T *Sink) findWork() { +func (T Sink) findWork() { T.pool.StealFor(T.id) // see if we stole some work if T.queue.HasWork() { @@ -32,7 +32,7 @@ func (T *Sink) findWork() { <-T.queue.Ready() } -func (T *Sink) Read() any { +func (T Sink) Read() any { for { v, ok := T.queue.Read() if ok { @@ -42,4 +42,4 @@ func (T *Sink) Read() any { } } -var _ rob.Sink = (*Sink)(nil) +var _ rob.Sink = Sink{} diff --git a/lib/rob/schedulers/v2/source/source.go b/lib/rob/schedulers/v2/source/source.go index bf73a1a722e30106d8446172258a1ea9304d49a8..ac70054c3831d0e1dc8d474728ac719a9e4b9a82 100644 --- a/lib/rob/schedulers/v2/source/source.go +++ b/lib/rob/schedulers/v2/source/source.go @@ -13,14 +13,14 @@ type Source struct { pool *pool.Pool } -func NewSource(p *pool.Pool) *Source { - return &Source{ +func MakeSource(p *pool.Pool) Source { + return Source{ uuid: uuid.New(), pool: p, } } -func (T *Source) Schedule(work any, constraints rob.Constraints) { +func (T Source) Schedule(work any, constraints rob.Constraints) { T.pool.Schedule(job.Job{ Source: T.uuid, Work: work, @@ -28,4 +28,4 @@ func (T *Source) Schedule(work any, constraints rob.Constraints) { }) } -var _ rob.Source = (*Source)(nil) +var _ rob.Source = Source{} diff --git a/lib/router/handler.go b/lib/router/handler.go new file mode 100644 index 0000000000000000000000000000000000000000..dfff7210ae9265083f2a21c541297bd2d1a55fdc --- /dev/null +++ b/lib/router/handler.go @@ -0,0 +1,9 @@ +package router + +import ( + "pggat2/lib/pnet" +) + +type Handler interface { + Next() pnet.ReadWriter +} diff --git a/lib/router/router.go b/lib/router/router.go index f9865429ba9c78384e137f7ef6b6509aac408e73..adc58c805caee9a558c96d541d930882a83f2382 100644 --- a/lib/router/router.go +++ b/lib/router/router.go @@ -1,10 +1,6 @@ package router -import ( - "pggat2/lib/perror" - "pggat2/lib/pnet" -) - type Router interface { - Transaction(peer pnet.ReadWriter) perror.Error + NewHandler(write bool) Handler + NewSource() Source } diff --git a/lib/router/routers/v0/constraints.go b/lib/router/routers/v0/constraints.go new file mode 100644 index 0000000000000000000000000000000000000000..ef93ace40109b6eb5991c9b4da9f49bb6112c3b3 --- /dev/null +++ b/lib/router/routers/v0/constraints.go @@ -0,0 +1,18 @@ +package routers + +import "pggat2/lib/rob" + +const ( + writeConstraint rob.Constraints = 1 << iota +) + +func constraints(write bool) rob.Constraints { + var c rob.Constraints + if write { + c = rob.Constraints.All( + c, + writeConstraint, + ) + } + return c +} diff --git a/lib/router/routers/v0/handler.go b/lib/router/routers/v0/handler.go new file mode 100644 index 0000000000000000000000000000000000000000..85820681f5715e205d93d4d6036a10e66dda0ce0 --- /dev/null +++ b/lib/router/routers/v0/handler.go @@ -0,0 +1,17 @@ +package routers + +import ( + "pggat2/lib/pnet" + "pggat2/lib/rob" + "pggat2/lib/router" +) + +type Handler struct { + sink rob.Sink +} + +func (T Handler) Next() pnet.ReadWriter { + return T.sink.Read().(pnet.ReadWriter) +} + +var _ router.Handler = Handler{} diff --git a/lib/router/routers/v0/router.go b/lib/router/routers/v0/router.go new file mode 100644 index 0000000000000000000000000000000000000000..d8ce48531e7e7e6fe6134c3b10a88edf9b090db4 --- /dev/null +++ b/lib/router/routers/v0/router.go @@ -0,0 +1,37 @@ +package routers + +import ( + "pggat2/lib/rob/schedulers/v2" + "pggat2/lib/router" +) + +type Router struct { + scheduler schedulers.Scheduler +} + +func MakeRouter() Router { + return Router{ + scheduler: schedulers.MakeScheduler(), + } +} + +func NewRouter() *Router { + r := MakeRouter() + return &r +} + +func (r *Router) NewHandler(write bool) router.Handler { + sink := r.scheduler.NewSink(constraints(write)) + return Handler{ + sink: sink, + } +} + +func (r *Router) NewSource() router.Source { + source := r.scheduler.NewSource() + return Source{ + source: source, + } +} + +var _ router.Router = (*Router)(nil) diff --git a/lib/router/routers/v0/source.go b/lib/router/routers/v0/source.go new file mode 100644 index 0000000000000000000000000000000000000000..3ef4a261c9052457f1b212d247188ff73a9db721 --- /dev/null +++ b/lib/router/routers/v0/source.go @@ -0,0 +1,17 @@ +package routers + +import ( + "pggat2/lib/pnet" + "pggat2/lib/rob" + "pggat2/lib/router" +) + +type Source struct { + source rob.Source +} + +func (T Source) Handle(peer pnet.ReadWriter, write bool) { + T.source.Schedule(peer, constraints(write)) +} + +var _ router.Source = Source{} diff --git a/lib/router/source.go b/lib/router/source.go new file mode 100644 index 0000000000000000000000000000000000000000..e542e51d8be9acaf0637a0477483a532083ccf8b --- /dev/null +++ b/lib/router/source.go @@ -0,0 +1,7 @@ +package router + +import "pggat2/lib/pnet" + +type Source interface { + Handle(peer pnet.ReadWriter, write bool) +}