diff --git a/lib/fed/conn.go b/lib/fed/conn.go index b6bc0016b2e728e3224ba1fb58ed2fdacec28e31..a32336e7bf480c26947e76bc6681c46334bfe8b7 100644 --- a/lib/fed/conn.go +++ b/lib/fed/conn.go @@ -2,12 +2,18 @@ package fed import ( "crypto/tls" + "io" "net" "gfx.cafe/gfx/pggat/lib/util/decorator" "gfx.cafe/gfx/pggat/lib/util/strutil" ) +type Listener interface { + Accept(fn func(*Conn)) error + io.Closer +} + type Conn struct { noCopy decorator.NoCopy diff --git a/lib/fed/listeners/netconnlistener/listener.go b/lib/fed/listeners/netconnlistener/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..0d6c0c55759bf2b482d96d8b1431c7e2c85da80d --- /dev/null +++ b/lib/fed/listeners/netconnlistener/listener.go @@ -0,0 +1,27 @@ +package netconnlistener + +import ( + "net" + + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/fed/codecs/netconncodec" +) + +type Listener struct { + Listener net.Listener +} + +func (listener *Listener) Accept(fn func(*fed.Conn)) error { + raw, err := listener.Listener.Accept() + if err != nil { + return err + } + fedConn := fed.NewConn(netconncodec.NewCodec(raw)) + go func() { + fn(fedConn) + }() + return nil +} +func (l *Listener) Close() error { + return l.Listener.Close() +} diff --git a/lib/gat/listen.go b/lib/gat/listen.go index fb47fd97442a9143d5b824f64b6fe525dbbf495d..ad806bf3b1a80c11d2f76be60a7ac8514ad4685a 100644 --- a/lib/gat/listen.go +++ b/lib/gat/listen.go @@ -15,7 +15,7 @@ import ( "go.uber.org/zap" "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/fed/codecs/netconncodec" + "gfx.cafe/gfx/pggat/lib/fed/listeners/netconnlistener" ) type ListenerConfig struct { @@ -30,20 +30,12 @@ type Listener struct { networkAddress caddy.NetworkAddress ssl SSLServer - listener net.Listener + listener fed.Listener open atomic.Int64 log *zap.Logger } -func (T *Listener) accept() (*fed.Conn, error) { - raw, err := T.listener.Accept() - if err != nil { - return nil, err - } - return fed.NewConn(netconncodec.NewCodec(raw)), nil -} - func (T *Listener) Provision(ctx caddy.Context) error { T.log = ctx.Logger() @@ -86,19 +78,22 @@ func (T *Listener) Provision(ctx caddy.Context) error { } func (T *Listener) Start() error { - if T.networkAddress.Network == "unix" { - if err := os.MkdirAll(filepath.Dir(T.networkAddress.Host), 0o660); err != nil { + addr := T.networkAddress + if addr.Network == "unix" { + if err := os.MkdirAll(filepath.Dir(addr.Host), 0o660); err != nil { return err } } - listener, err := T.networkAddress.Listen(context.Background(), 0, net.ListenConfig{}) + listener, err := addr.Listen(context.Background(), 0, net.ListenConfig{}) if err != nil { return err } - T.listener = listener.(net.Listener) - - T.log.Info("listening", zap.String("address", T.listener.Addr().String())) - + if netListener, ok := listener.(net.Listener); ok { + T.listener = &netconnlistener.Listener{Listener: netListener} + } else if fedListener, ok := listener.(fed.Listener); ok { + T.listener = fedListener + } + T.log.Info("listening", zap.String("address", T.networkAddress.String())) return nil } diff --git a/lib/gat/server.go b/lib/gat/server.go index 3af2afadfa3ebc68e9b4f90285ea4782a7c0fa47..ad68e5f30dfc1d7b215149cdb8bf8681148d5f92 100644 --- a/lib/gat/server.go +++ b/lib/gat/server.go @@ -184,7 +184,9 @@ func (T *Server) accept(listener *Listener, conn *fed.Conn) { } func (T *Server) acceptFrom(listener *Listener) bool { - conn, err := listener.accept() + err := listener.listener.Accept(func(c *fed.Conn) { + T.accept(listener, c) + }) if err != nil { if errors.Is(err, net.ErrClosed) { return false @@ -198,8 +200,6 @@ func (T *Server) acceptFrom(listener *Listener) bool { T.log.Warn("error accepting client", zap.Error(err)) return true } - - go T.accept(listener, conn) return true } diff --git a/lib/gat/standard/standard.go b/lib/gat/standard/standard.go index be606cfcbce1ff4f2b3e8d216a00fe2eb4d9a9f2..f1c51ab53c35c56f48ac14295467cf399ca237d6 100644 --- a/lib/gat/standard/standard.go +++ b/lib/gat/standard/standard.go @@ -2,6 +2,7 @@ package standard import ( // base server + _ "gfx.cafe/gfx/pggat/lib/gat" // matchers @@ -47,4 +48,7 @@ import ( // pools _ "gfx.cafe/gfx/pggat/lib/gat/handlers/pool/pools/basic" _ "gfx.cafe/gfx/pggat/lib/gat/handlers/pool/pools/hybrid" + + // listeners + _ "gfx.cafe/gfx/pggat/lib/fed/listeners/netconnlistener" ) diff --git a/test/tester_test.go b/test/tester_test.go index 9ffc5bd823bdb6a3dfbda239f31d7bd7d470d1f2..0639377177371ecac3ed248cb46341a89515b307 100644 --- a/test/tester_test.go +++ b/test/tester_test.go @@ -13,6 +13,7 @@ import ( "github.com/caddyserver/caddy/v2/caddyconfig" "gfx.cafe/gfx/pggat/lib/auth/credentials" + "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/gatcaddyfile" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool" @@ -22,6 +23,8 @@ import ( "gfx.cafe/gfx/pggat/lib/util/strutil" "gfx.cafe/gfx/pggat/test" "gfx.cafe/gfx/pggat/test/tests" + + _ "gfx.cafe/gfx/pggat/lib/fed/listeners/netconnlistener" ) func wrapConfig(conf basic.Config) basic.Config { @@ -97,6 +100,7 @@ func createServer(parent dialer, pools map[string]caddy.Module) (server gat.Serv Dialer: pool.Dialer{ Address: parent.Address, Username: parent.Username, + SSLMode: bouncer.SSLModeDisable, RawPassword: parent.Password, Database: parent.Database, }, @@ -164,9 +168,10 @@ func TestTester(t *testing.T) { control := pool.Dialer{ Address: "localhost:5432", Username: "postgres", + SSLMode: bouncer.SSLModeDisable, Credentials: credentials.Cleartext{ Username: "postgres", - Password: "password", + Password: "postgres", }, Database: "postgres", } @@ -176,7 +181,7 @@ func TestTester(t *testing.T) { parent, err := daisyChain(&config, dialer{ Address: "localhost:5432", Username: "postgres", - Password: "password", + Password: "postgres", Database: "postgres", }, 16) if err != nil {