diff --git a/lib/fed/conn.go b/lib/fed/conn.go index b6bc0016b2e728e3224ba1fb58ed2fdacec28e31..0acb2710963e6fc2846268222e90f0862f0656c6 100644 --- a/lib/fed/conn.go +++ b/lib/fed/conn.go @@ -2,12 +2,18 @@ package fed import ( "crypto/tls" + "io" "net" "gfx.cafe/gfx/pggat/lib/util/decorator" "gfx.cafe/gfx/pggat/lib/util/strutil" ) +type Listener interface { + Accept(func(*Conn)) error + io.Closer +} + type Conn struct { noCopy decorator.NoCopy diff --git a/lib/fed/listeners/netconnlistener/listener.go b/lib/fed/listeners/netconnlistener/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..e2156e40bb02d97d0069c6e7849c3f5bd407ef3c --- /dev/null +++ b/lib/fed/listeners/netconnlistener/listener.go @@ -0,0 +1,27 @@ +package netconnlistener + +import ( + "net" + + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/fed/codecs/netconncodec" +) + +type Listener struct { + Listener net.Listener +} + +func (listener *Listener) Accept(fn func(*fed.Conn)) error { + raw, err := listener.Listener.Accept() + if err != nil { + return err + } + fedConn := fed.NewConn(netconncodec.NewCodec(raw)) + go func() { + fn(fedConn) + }() + return nil +} +func (l *Listener) Close() error { + return l.Close() +} diff --git a/lib/gat/listen.go b/lib/gat/listen.go index fb47fd97442a9143d5b824f64b6fe525dbbf495d..dd9dbdbc84adb3fef2ec52d81f642b9126c6867b 100644 --- a/lib/gat/listen.go +++ b/lib/gat/listen.go @@ -15,7 +15,7 @@ import ( "go.uber.org/zap" "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/fed/codecs/netconncodec" + "gfx.cafe/gfx/pggat/lib/fed/listeners/netconnlistener" ) type ListenerConfig struct { @@ -30,20 +30,12 @@ type Listener struct { networkAddress caddy.NetworkAddress ssl SSLServer - listener net.Listener + listener fed.Listener open atomic.Int64 log *zap.Logger } -func (T *Listener) accept() (*fed.Conn, error) { - raw, err := T.listener.Accept() - if err != nil { - return nil, err - } - return fed.NewConn(netconncodec.NewCodec(raw)), nil -} - func (T *Listener) Provision(ctx caddy.Context) error { T.log = ctx.Logger() @@ -95,9 +87,10 @@ func (T *Listener) Start() error { if err != nil { return err } - T.listener = listener.(net.Listener) + ncn := &netconnlistener.Listener{Listener: listener.(net.Listener)} + T.listener = ncn - T.log.Info("listening", zap.String("address", T.listener.Addr().String())) + T.log.Info("listening", zap.String("address", ncn.Listener.Addr().String())) return nil } diff --git a/lib/gat/server.go b/lib/gat/server.go index 3af2afadfa3ebc68e9b4f90285ea4782a7c0fa47..ad68e5f30dfc1d7b215149cdb8bf8681148d5f92 100644 --- a/lib/gat/server.go +++ b/lib/gat/server.go @@ -184,7 +184,9 @@ func (T *Server) accept(listener *Listener, conn *fed.Conn) { } func (T *Server) acceptFrom(listener *Listener) bool { - conn, err := listener.accept() + err := listener.listener.Accept(func(c *fed.Conn) { + T.accept(listener, c) + }) if err != nil { if errors.Is(err, net.ErrClosed) { return false @@ -198,8 +200,6 @@ func (T *Server) acceptFrom(listener *Listener) bool { T.log.Warn("error accepting client", zap.Error(err)) return true } - - go T.accept(listener, conn) return true }