good morning!!!!

Skip to content
Snippets Groups Projects
Commit a1ad14cf authored by Garet Halliday's avatar Garet Halliday
Browse files

better listener iface

parent e2a000d3
No related branches found
No related tags found
No related merge requests found
......@@ -13,5 +13,5 @@ type AcceptedConn struct {
type Listener interface {
Module
Accept() []<-chan AcceptedConn
Listen(chan<- AcceptedConn)
}
......@@ -15,7 +15,7 @@ type Module struct {
Config
listener net.Listener
accepted chan gat.AcceptedConn
accepted chan<- gat.AcceptedConn
}
func (*Module) GatModule() {}
......@@ -33,9 +33,6 @@ func (T *Module) Start() error {
}
log.Printf("listening on %v", T.listener.Addr())
T.accepted = make(chan gat.AcceptedConn)
go T.acceptLoop()
return nil
}
......@@ -86,10 +83,9 @@ func (T *Module) acceptLoop() {
}
}
func (T *Module) Accept() []<-chan gat.AcceptedConn {
return []<-chan gat.AcceptedConn{
T.accepted,
}
func (T *Module) Listen(ch chan<- gat.AcceptedConn) {
T.accepted = ch
go T.acceptLoop()
}
var _ gat.Module = (*Module)(nil)
......
......@@ -348,13 +348,11 @@ func (T *Module) ReadMetrics(metrics *metrics.Pools) {
})
}
func (T *Module) Accept() []<-chan gat.AcceptedConn {
var accept []<-chan gat.AcceptedConn
func (T *Module) Listen(ch chan<- gat.AcceptedConn) {
if T.PgBouncer.ListenAddr != "" {
accept = append(accept, T.tcpListener.Accept()...)
T.tcpListener.Listen(ch)
}
accept = append(accept, T.unixListener.Accept()...)
return accept
T.unixListener.Listen(ch)
}
func (T *Module) GatModule() {}
......
......@@ -9,7 +9,6 @@ import (
"gfx.cafe/gfx/pggat/lib/bouncer/frontends/v0"
"gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/gat/metrics"
"gfx.cafe/gfx/pggat/lib/util/chans"
"gfx.cafe/gfx/pggat/lib/util/maps"
)
......@@ -126,23 +125,23 @@ func (T *Server) Start() error {
}
func (T *Server) acceptLoop() {
var accept []<-chan AcceptedConn
accept := make(chan AcceptedConn)
for _, listener := range T.listeners {
accept = append(accept, listener.Accept()...)
listener.Listen(accept)
}
acceptor := chans.NewMultiRecv(accept, T.done)
for {
accepted, ok := acceptor.Recv()
if !ok {
break
select {
case accepted := <-accept:
go func() {
if err := T.serve(accepted.Conn, accepted.Params); err != nil && !errors.Is(err, io.EOF) {
log.Printf("failed to serve client: %v", err)
}
}()
case <-T.done:
return
}
go func() {
if err := T.serve(accepted.Conn, accepted.Params); err != nil && !errors.Is(err, io.EOF) {
log.Printf("failed to serve client: %v", err)
}
}()
}
}
......
package chans
import (
"reflect"
"gfx.cafe/gfx/pggat/lib/util/slices"
)
type MultiRecv[T any] struct {
cases []reflect.SelectCase
}
func NewMultiRecv[T any](cases []<-chan T, done <-chan struct{}) *MultiRecv[T] {
c := make([]reflect.SelectCase, 0, len(cases)+1)
c = append(c, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(done),
})
for _, ch := range cases {
c = append(c, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
return &MultiRecv[T]{
cases: c,
}
}
func (c *MultiRecv[T]) Recv() (T, bool) {
for {
idx, value, ok := reflect.Select(c.cases)
if !ok {
if idx == 0 {
// done triggered
return *new(T), false
}
c.cases = slices.DeleteIndex(c.cases, idx)
continue
}
return value.Interface().(T), true
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment