diff --git a/go.mod b/go.mod index 1144fd2bb3f0b0daa982391502c9440dd8da0828..fb269eef8729a88364c88775b2321b5a09f0bc7a 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/BurntSushi/toml v1.2.0 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20220911224424-aa1f1f12a846 github.com/auxten/postgresql-parser v1.0.1 + github.com/google/uuid v1.1.2 github.com/iancoleman/strcase v0.2.0 github.com/jackc/pgx/v5 v5.2.0 github.com/looplab/fsm v0.3.0 diff --git a/go.sum b/go.sum index ed85b7e29463796ca93c609ff0df72f3d5d60247..b1d67f113c6d4175f84fc64fc4d2f6456ce1a870 100644 --- a/go.sum +++ b/go.sum @@ -205,6 +205,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/lib/gat2/balancer/balancer.go b/lib/gat2/balancer/balancer.go deleted file mode 100644 index 9eaa440d5e4337beab1db87030bb10cb70447c2f..0000000000000000000000000000000000000000 --- a/lib/gat2/balancer/balancer.go +++ /dev/null @@ -1,6 +0,0 @@ -package balancer - -// Balancer is the frontend that listens for clients and accepts them, routing them to the correct pool -type Balancer interface { - Run() error -} diff --git a/lib/gat2/client/acceptor.go b/lib/gat2/client/acceptor.go deleted file mode 100644 index 0eeddadce038e42abb16fa7b2151ddcb7d00280d..0000000000000000000000000000000000000000 --- a/lib/gat2/client/acceptor.go +++ /dev/null @@ -1,13 +0,0 @@ -package client - -import "net" - -type Acceptor interface { - Accept(net.Conn) (Client, error) -} - -type AcceptorFunc func(net.Conn) (Client, error) - -func (T AcceptorFunc) Accept(conn net.Conn) (Client, error) { - return T(conn) -} diff --git a/lib/gat2/client/client.go b/lib/gat2/client/client.go deleted file mode 100644 index f64d0522a3f3bdcc19a3f818ef07a48b80765083..0000000000000000000000000000000000000000 --- a/lib/gat2/client/client.go +++ /dev/null @@ -1,5 +0,0 @@ -package client - -type Client interface { - Id() int -} diff --git a/lib/gat2/frontend/frontend.go b/lib/gat2/frontend/frontend.go deleted file mode 100644 index 8d5bc4a4dce44d6b12d91562a8a5e262bae1fa00..0000000000000000000000000000000000000000 --- a/lib/gat2/frontend/frontend.go +++ /dev/null @@ -1,7 +0,0 @@ -package frontend - -// Frontend handles -type Frontend interface { - // Run the frontend, awaiting new conns and - Run() error -} diff --git a/lib/gat2/pool/pool.go b/lib/gat2/pool/pool.go deleted file mode 100644 index b52b1ef0f09e494c087dbb8328cd4d0eb0f794ca..0000000000000000000000000000000000000000 --- a/lib/gat2/pool/pool.go +++ /dev/null @@ -1,4 +0,0 @@ -package pool - -type Pool interface { -} diff --git a/lib/gat2/pool/pools/session/v1/pool.go b/lib/gat2/pool/pools/session/v1/pool.go deleted file mode 100644 index dcd447f39416ef2ec02bd7a3d341bbe2b495e7bf..0000000000000000000000000000000000000000 --- a/lib/gat2/pool/pools/session/v1/pool.go +++ /dev/null @@ -1,7 +0,0 @@ -package session - -import "gfx.cafe/gfx/pggat/lib/gat2/pool" - -type Pool struct{} - -var _ pool.Pool = (*Pool)(nil) diff --git a/lib/gat2/pool/pools/transaction/v1/pool.go b/lib/gat2/pool/pools/transaction/v1/pool.go deleted file mode 100644 index a059718eb1abaa90cddf6d21c142da43876bfeeb..0000000000000000000000000000000000000000 --- a/lib/gat2/pool/pools/transaction/v1/pool.go +++ /dev/null @@ -1,7 +0,0 @@ -package v1 - -import "gfx.cafe/gfx/pggat/lib/gat2/pool" - -type Pool struct{} - -var _ pool.Pool = (*Pool)(nil) diff --git a/lib/gat2/pools/session.go b/lib/gat2/pools/session.go new file mode 100644 index 0000000000000000000000000000000000000000..9573007bec42c8462418d0396153c99230db1d1d --- /dev/null +++ b/lib/gat2/pools/session.go @@ -0,0 +1,97 @@ +package pools + +import ( + "sync" + + "github.com/google/uuid" + + "gfx.cafe/gfx/pggat/lib/gat2" + "gfx.cafe/gfx/pggat/lib/util/iter" +) + +type Session struct { + id uuid.UUID + + free []gat2.Sink + inuse map[uuid.UUID]gat2.Sink + mu sync.RWMutex +} + +func NewSession(sinks []gat2.Sink) *Session { + return &Session{ + id: uuid.New(), + + free: sinks, + inuse: make(map[uuid.UUID]gat2.Sink), + } +} + +func (T *Session) ID() uuid.UUID { + return T.id +} + +func (T *Session) bySource(src gat2.Source) gat2.Sink { + T.mu.RLock() + defer T.mu.RUnlock() + sink, _ := T.inuse[src.ID()] + return sink +} + +func (T *Session) cleanup(src gat2.Source) { + id := src.ID() + + T.mu.Lock() + defer T.mu.Unlock() + + sink := T.inuse[id] + delete(T.inuse, id) + T.free = append(T.free, sink) +} + +func (T *Session) assign(src gat2.Source) gat2.Sink { + T.mu.Lock() + defer T.mu.Unlock() + + if len(T.free) > 0 { + // just grab free + sink := T.free[len(T.free)-1] + T.free = T.free[:len(T.free)-1] + T.inuse[src.ID()] = sink + + return sink + } + + return nil +} + +func (T *Session) Route(w gat2.Work) iter.Iter[chan<- gat2.Work] { + src := w.Source() + sink := T.bySource(src) + if sink != nil { + return sink.Route(w) + } + sink = T.assign(src) + if sink != nil { + return sink.Route(w) + } + return iter.Empty[chan<- gat2.Work]() +} + +func (T *Session) KillSource(source gat2.Source) { + id := source.ID() + + T.mu.Lock() + defer T.mu.Unlock() + + sink, ok := T.inuse[id] + if !ok { + return + } + delete(T.inuse, id) + sink.KillSource(source) + + // return sink to free pool + T.free = append(T.free, sink) +} + +var _ gat2.Sink = (*Session)(nil) diff --git a/lib/gat2/pools/transaction.go b/lib/gat2/pools/transaction.go new file mode 100644 index 0000000000000000000000000000000000000000..f10818c3d312b1c04a8ec268f8bb2f107a3023c5 --- /dev/null +++ b/lib/gat2/pools/transaction.go @@ -0,0 +1,43 @@ +package pools + +import ( + "github.com/google/uuid" + + "gfx.cafe/gfx/pggat/lib/gat2" + "gfx.cafe/gfx/pggat/lib/util/iter" +) + +type Transaction struct { + id uuid.UUID + sinks []gat2.Sink +} + +func NewTransaction(sinks []gat2.Sink) *Transaction { + return &Transaction{ + id: uuid.New(), + sinks: sinks, + } +} + +func (T *Transaction) ID() uuid.UUID { + return T.id +} + +func (T *Transaction) Route(w gat2.Work) iter.Iter[chan<- gat2.Work] { + return iter.Flatten( + iter.Map( + iter.Slice(T.sinks), + func(s gat2.Sink) iter.Iter[chan<- gat2.Work] { + return s.Route(w) + }, + ), + ) +} + +func (T *Transaction) KillSource(source gat2.Source) { + for _, sink := range T.sinks { + sink.KillSource(source) + } +} + +var _ gat2.Sink = (*Transaction)(nil) diff --git a/lib/gat2/request/request.go b/lib/gat2/request/request.go deleted file mode 100644 index d8dabfa73798bc743ff5bb2602999201242af4d6..0000000000000000000000000000000000000000 --- a/lib/gat2/request/request.go +++ /dev/null @@ -1,7 +0,0 @@ -package request - -import "gfx.cafe/gfx/pggat/lib/gat2/source" - -type Request interface { - Source() source.Source -} diff --git a/lib/gat2/router.go b/lib/gat2/router.go new file mode 100644 index 0000000000000000000000000000000000000000..b572efe556748d22d49ad31415a1c02d35977013 --- /dev/null +++ b/lib/gat2/router.go @@ -0,0 +1,4 @@ +package gat2 + +type Router interface { +} diff --git a/lib/gat2/routers/v0/router.go b/lib/gat2/routers/v0/router.go new file mode 100644 index 0000000000000000000000000000000000000000..5d4f245afe0f843cdf611d1b028ec5dbcaf9efdb --- /dev/null +++ b/lib/gat2/routers/v0/router.go @@ -0,0 +1,104 @@ +package routers + +import ( + "sync" + + "gfx.cafe/gfx/pggat/lib/gat2" + "gfx.cafe/gfx/pggat/lib/util/iter" + "gfx.cafe/gfx/pggat/lib/util/race" +) + +type Router struct { + sinks []gat2.Sink + sources []gat2.Source + mu sync.RWMutex +} + +func NewRouter(sinks []gat2.Sink, sources []gat2.Source) *Router { + return &Router{ + sinks: sinks, + sources: sources, + } +} + +func (T *Router) removesrc(idx int) gat2.Source { + T.mu.Lock() + defer T.mu.Unlock() + source := T.sources[idx] + for i := idx; i < len(T.sources)-1; i++ { + T.sources[i] = T.sources[i+1] + } + T.sources = T.sources[:len(T.sources)-1] + return source +} + +// srcdead should be called to clean up resources related to a source when the source dies +func (T *Router) srcdead(idx int) { + source := T.removesrc(idx) + + for _, sink := range T.sinks { + sink.KillSource(source) + } +} + +// srcrecv is basically a huge select statement on all clients.Out() +func (T *Router) srcrecv() (work gat2.Work, idx int, ok bool) { + T.mu.RLock() + defer T.mu.RUnlock() + if len(T.sources) == 0 { + return nil, -1, true + } + + // receive work + return race.Recv( + iter.Map( + iter.Slice(T.sources), + func(source gat2.Source) <-chan gat2.Work { + return source.Out() + }, + ), + ) +} + +// recv receives the next unit of work from the sources +func (T *Router) recv() gat2.Work { + for { + work, idx, ok := T.srcrecv() + if ok { + return work + } + + // T.sources[idx] died, remove it + T.srcdead(idx) + } +} + +// send tries to get a unit of work done +func (T *Router) send(work gat2.Work) { + // send work + race.Send( + iter.Flatten( + iter.Map( + iter.Slice(T.sinks), + func(sink gat2.Sink) iter.Iter[chan<- gat2.Work] { + return sink.Route(work) + }, + ), + ), + work, + ) +} + +func (T *Router) route() { + work := T.recv() + if work == nil { + return + } + T.send(work) +} + +func (T *Router) Run() { + for { + T.route() + } +} diff --git a/lib/gat2/routers/v0/router_test.go b/lib/gat2/routers/v0/router_test.go new file mode 100644 index 0000000000000000000000000000000000000000..26440c84e4e3c70c267b508a058d7e340de19663 --- /dev/null +++ b/lib/gat2/routers/v0/router_test.go @@ -0,0 +1,123 @@ +package routers + +import ( + "log" + "testing" + "time" + + "github.com/google/uuid" + + "gfx.cafe/gfx/pggat/lib/gat2" + "gfx.cafe/gfx/pggat/lib/gat2/pools" + "gfx.cafe/gfx/pggat/lib/util/iter" +) + +type DummyWork struct { + id uuid.UUID + src gat2.Source +} + +func NewDummyWork(src gat2.Source) *DummyWork { + return &DummyWork{ + id: uuid.New(), + src: src, + } +} + +func (T *DummyWork) ID() uuid.UUID { + return T.id +} + +func (T *DummyWork) Source() gat2.Source { + return T.src +} + +var _ gat2.Work = (*DummyWork)(nil) + +type DummySink struct { + id uuid.UUID + in chan gat2.Work +} + +func NewDummySink() *DummySink { + s := &DummySink{ + id: uuid.New(), + in: make(chan gat2.Work), + } + go func() { + for { + w := <-s.in + log.Println("received work", w.ID()) + } + }() + return s +} + +func (T *DummySink) ID() uuid.UUID { + return T.id +} + +func (T *DummySink) Route(_ gat2.Work) iter.Iter[chan<- gat2.Work] { + return iter.Single[chan<- gat2.Work](T.in) +} + +func (T *DummySink) KillSource(_ gat2.Source) {} + +var _ gat2.Sink = (*DummySink)(nil) + +type DummySource struct { + id uuid.UUID + out chan gat2.Work +} + +func NewDummySource() *DummySource { + src := &DummySource{ + id: uuid.New(), + out: make(chan gat2.Work), + } + return src +} + +func (T *DummySource) QueueWork() { + go func() { + T.out <- NewDummyWork(T) + }() +} + +func (T *DummySource) ID() uuid.UUID { + return T.id +} + +func (T *DummySource) Out() <-chan gat2.Work { + return T.out +} + +func (T *DummySource) Close() { + close(T.out) +} + +var _ gat2.Source = (*DummySource)(nil) + +func TestRouter(t *testing.T) { + s1 := NewDummySource() + s2 := NewDummySource() + router := NewRouter( + []gat2.Sink{ + pools.NewSession([]gat2.Sink{ + NewDummySink(), + }), + }, + []gat2.Source{ + s1, + s2, + }, + ) + + s1.QueueWork() + router.route() + s1.Close() + s2.QueueWork() + router.route() + + <-time.After(1 * time.Second) +} diff --git a/lib/gat2/server/server.go b/lib/gat2/server/server.go deleted file mode 100644 index 4fc64a61205a9561c6abec2068dba8c7ecdf1614..0000000000000000000000000000000000000000 --- a/lib/gat2/server/server.go +++ /dev/null @@ -1,4 +0,0 @@ -package server - -type Server interface { -} diff --git a/lib/gat2/sink.go b/lib/gat2/sink.go new file mode 100644 index 0000000000000000000000000000000000000000..f4b486b1d04b4b64467df0bbee8aa8f9c92802a2 --- /dev/null +++ b/lib/gat2/sink.go @@ -0,0 +1,19 @@ +package gat2 + +import ( + "github.com/google/uuid" + + "gfx.cafe/gfx/pggat/lib/util/iter" +) + +type Sink interface { + ID() uuid.UUID + + // Route will return an iter.Iter of channels equipped to handle the Work. + // If Sink dies, it should return iter.Empty. If one of the underlying channels dies, they should remain open + // but not accept work. + Route(Work) iter.Iter[chan<- Work] + + // KillSource will be called when a source dies. This can be used to free resources related to the Source + KillSource(Source) +} diff --git a/lib/gat2/sink/pools/request/pool.go b/lib/gat2/sink/pools/request/pool.go deleted file mode 100644 index f8d2e153e26d4506b92b3cde7865406e6414489d..0000000000000000000000000000000000000000 --- a/lib/gat2/sink/pools/request/pool.go +++ /dev/null @@ -1,57 +0,0 @@ -package request - -import ( - "gfx.cafe/gfx/pggat/lib/gat2/request" - "gfx.cafe/gfx/pggat/lib/gat2/sink" - "gfx.cafe/gfx/pggat/lib/util/race" -) - -type Pool struct { - in chan request.Request - sinks []sink.Sink -} - -func NewPool(sinks []sink.Sink) *Pool { - pool := &Pool{ - in: make(chan request.Request), - sinks: sinks, - } - go pool.run() - return pool -} - -func (T *Pool) handle(req request.Request) { - for _, s := range T.sinks { - select { - case s.In() <- req: - return - default: - } - } - if len(T.sinks) == 0 { - // TODO(garet) this should just error - panic("no free pools") - } - // choose a random sink to wait for - ok := race.Send(func(i int) (chan<- request.Request, bool) { - if i >= len(T.sinks) { - return nil, false - } - return T.sinks[i].In(), true - }, req) - if !ok { - panic("failed to send req to pool") - } -} - -func (T *Pool) run() { - for { - T.handle(<-T.in) - } -} - -func (T *Pool) In() chan<- request.Request { - return T.in -} - -var _ sink.Sink = (*Pool)(nil) diff --git a/lib/gat2/sink/pools/session/pool.go b/lib/gat2/sink/pools/session/pool.go deleted file mode 100644 index 1a658ccbb97f9ccbab40feb1421701a4ae1e962b..0000000000000000000000000000000000000000 --- a/lib/gat2/sink/pools/session/pool.go +++ /dev/null @@ -1,74 +0,0 @@ -package session - -import ( - "gfx.cafe/gfx/pggat/lib/gat2/request" - "gfx.cafe/gfx/pggat/lib/gat2/sink" - "gfx.cafe/gfx/pggat/lib/gat2/source" -) - -type Pool struct { - in chan request.Request - free []sink.Sink - inuse map[source.Source]sink.Sink -} - -func NewPool(sinks []sink.Sink) *Pool { - pool := &Pool{ - in: make(chan request.Request), - free: sinks, - inuse: make(map[source.Source]sink.Sink), - } - go pool.run() - return pool -} - -func (T *Pool) gc() { - for src, s := range T.inuse { - select { - case <-src.Closed(): - delete(T.inuse, src) - T.free = append(T.free, s) - default: - } - } -} - -func (T *Pool) usePool(src source.Source) sink.Sink { - s, ok := T.inuse[src] - if ok { - return s - } - - // collect no longer in use pools - T.gc() - - if len(T.free) == 0 { - // TODO(garet) this should just error - panic("no free pools") - } - - // steal from free - s = T.free[len(T.free)-1] - T.free = T.free[:len(T.free)-1] - - // assign to inuse - T.inuse[src] = s - return s -} - -func (T *Pool) handle(req request.Request) { - src := req.Source() - T.usePool(src).In() <- req -} - -func (T *Pool) run() { - for { - T.handle(<-T.in) - } -} - -func (T *Pool) In() chan<- request.Request { - return T.in -} - -var _ sink.Sink = (*Pool)(nil) diff --git a/lib/gat2/sink/sink.go b/lib/gat2/sink/sink.go deleted file mode 100644 index 71882e6b3928001d79246293416e795f365fe723..0000000000000000000000000000000000000000 --- a/lib/gat2/sink/sink.go +++ /dev/null @@ -1,11 +0,0 @@ -package sink - -import ( - "gfx.cafe/gfx/pggat/lib/gat2/request" -) - -// Sink is usually a server or pool. This object should have some way to fulfil requests. -type Sink interface { - // In receives pending requests and fulfills them - In() chan<- request.Request -} diff --git a/lib/gat2/source.go b/lib/gat2/source.go new file mode 100644 index 0000000000000000000000000000000000000000..81a00615d33491f4530b6aa1197566dcb3f0dbe7 --- /dev/null +++ b/lib/gat2/source.go @@ -0,0 +1,11 @@ +package gat2 + +import ( + "github.com/google/uuid" +) + +type Source interface { + ID() uuid.UUID + + Out() <-chan Work +} diff --git a/lib/gat2/source/source.go b/lib/gat2/source/source.go deleted file mode 100644 index 9533165312ae36ea97d2877e03c84e0b93b648d6..0000000000000000000000000000000000000000 --- a/lib/gat2/source/source.go +++ /dev/null @@ -1,13 +0,0 @@ -package source - -import ( - "gfx.cafe/gfx/pggat/lib/gat2/request" -) - -// Source is usually a client. This object should generate requests to be fulfilled by sinks. -type Source interface { - // Out sends pending requests to be fulfilled by a sink - Out() <-chan request.Request - - Closed() <-chan struct{} -} diff --git a/lib/gat2/source/sources/client/source.go b/lib/gat2/source/sources/client/source.go deleted file mode 100644 index 0dafbd30f81770ec3b80a57d46f8806299158989..0000000000000000000000000000000000000000 --- a/lib/gat2/source/sources/client/source.go +++ /dev/null @@ -1,28 +0,0 @@ -package client - -import ( - "gfx.cafe/gfx/pggat/lib/gat2/request" - "gfx.cafe/gfx/pggat/lib/gat2/source" -) - -type Client struct { - out chan request.Request - closed chan struct{} -} - -func NewClient() *Client { - return &Client{ - out: make(chan request.Request), - closed: make(chan struct{}), - } -} - -func (T *Client) Out() <-chan request.Request { - return T.out -} - -func (T *Client) Closed() <-chan struct{} { - return T.closed -} - -var _ source.Source = (*Client)(nil) diff --git a/lib/gat2/work.go b/lib/gat2/work.go new file mode 100644 index 0000000000000000000000000000000000000000..0a65226b7d61ed2763c0edd3a3b08526de7b72e2 --- /dev/null +++ b/lib/gat2/work.go @@ -0,0 +1,11 @@ +package gat2 + +import ( + "github.com/google/uuid" +) + +type Work interface { + ID() uuid.UUID + + Source() Source +} diff --git a/lib/util/iter/empty.go b/lib/util/iter/empty.go new file mode 100644 index 0000000000000000000000000000000000000000..fa209b4c70a03494d57ce920346eba988a7fa405 --- /dev/null +++ b/lib/util/iter/empty.go @@ -0,0 +1,7 @@ +package iter + +func Empty[T any]() Iter[T] { + return func() (T, bool) { + return *new(T), false + } +} diff --git a/lib/util/iter/flatten.go b/lib/util/iter/flatten.go new file mode 100644 index 0000000000000000000000000000000000000000..3d5b83af44dbb6757a0a50ee00afc2807702b967 --- /dev/null +++ b/lib/util/iter/flatten.go @@ -0,0 +1,18 @@ +package iter + +func Flatten[T any](iter Iter[Iter[T]]) Iter[T] { + i := Empty[T]() + return func() (T, bool) { + for { + v, ok := i() + if ok { + return v, true + } + i, ok = iter() + if !ok { + break + } + } + return *new(T), false + } +} diff --git a/lib/util/iter/iter_test.go b/lib/util/iter/iter_test.go index 2c7c82c8992bc12ded5a14c3ce26d800225873af..4c5de2ce1fe96cfb123af600fc7441f501f00150 100644 --- a/lib/util/iter/iter_test.go +++ b/lib/util/iter/iter_test.go @@ -87,3 +87,44 @@ func TestForEach(t *testing.T) { }) end(t, iter) } + +func TestEmpty(t *testing.T) { + iter := Empty[int]() + + end(t, iter) +} + +func TestFlatten(t *testing.T) { + slice := [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}} + iter := Slice(slice) + iter2 := Map(iter, func(v []int) Iter[int] { + return Slice(v) + }) + iter3 := Flatten(iter2) + + expect(t, iter3, 1) + expect(t, iter3, 2) + expect(t, iter3, 3) + expect(t, iter3, 4) + expect(t, iter3, 5) + expect(t, iter3, 6) + expect(t, iter3, 7) + expect(t, iter3, 8) + expect(t, iter3, 9) + end(t, iter3) +} + +func BenchmarkFlatten(b *testing.B) { + b.ReportAllocs() + slice := [][]int{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}} + + for i := 0; i < b.N; i++ { + iter := Slice(slice) + iter2 := Map(iter, func(v []int) Iter[int] { + return Slice(v) + }) + iter3 := Flatten(iter2) + + ForEach(iter3, func(i int) {}) + } +} diff --git a/lib/util/iter/single.go b/lib/util/iter/single.go new file mode 100644 index 0000000000000000000000000000000000000000..fcc4dc7d4d75230cdba3dc90f6e94fe08a0f7d2e --- /dev/null +++ b/lib/util/iter/single.go @@ -0,0 +1,12 @@ +package iter + +func Single[T any](value T) Iter[T] { + ok := true + return func() (T, bool) { + if ok { + ok = false + return value, true + } + return *new(T), false + } +} diff --git a/lib/util/race/recv.go b/lib/util/race/recv.go index 13c6616c8d901e7a066ed7483e99317a03ca9b25..5149b502c3116d116732027008e865494e26818b 100644 --- a/lib/util/race/recv.go +++ b/lib/util/race/recv.go @@ -6,7 +6,7 @@ import ( "gfx.cafe/gfx/pggat/lib/util/iter" ) -func Recv[T any](next iter.Iter[<-chan T]) (T, bool) { +func Recv[T any](next iter.Iter[<-chan T]) (T, int, bool) { cases := casePool.Get()[:0] defer func() { casePool.Put(cases) @@ -17,9 +17,9 @@ func Recv[T any](next iter.Iter[<-chan T]) (T, bool) { Chan: reflect.ValueOf(ch), }) }) - _, value, ok := reflect.Select(cases) + idx, value, ok := reflect.Select(cases) if !ok { - return *new(T), false + return *new(T), idx, false } - return value.Interface().(T), true + return value.Interface().(T), idx, true } diff --git a/lib/util/race/send.go b/lib/util/race/send.go index 861373114cad58420639b46edf9882f8c2e9e7fb..8c3b28af8d84d33cdc88d95a8f2a2751a665d7e0 100644 --- a/lib/util/race/send.go +++ b/lib/util/race/send.go @@ -6,7 +6,7 @@ import ( "gfx.cafe/gfx/pggat/lib/util/iter" ) -func Send[T any](next iter.Iter[chan<- T], value T) bool { +func Send[T any](next iter.Iter[chan<- T], value T) { reflectValue := reflect.ValueOf(value) cases := casePool.Get()[:0] defer func() { @@ -19,6 +19,5 @@ func Send[T any](next iter.Iter[chan<- T], value T) bool { Send: reflectValue, }) }) - _, _, ok := reflect.Select(cases) - return ok + reflect.Select(cases) }