Newer
Older
"gfx.cafe/gfx/pggat/lib/gat/metrics"
"gfx.cafe/gfx/pggat/lib/perror"
Listen []ListenerConfig `json:"listen,omitempty"`
Routes []RouteConfig `json:"routes,omitempty"`
routes []*Route
cancellableHandlers []CancellableHandler
metricsHandlers []MetricsHandler
log *zap.Logger
T.listen = make([]*Listener, 0, len(T.Listen))
for _, config := range T.Listen {
listener := &Listener{
ListenerConfig: config,
}
if err := listener.Provision(ctx); err != nil {
return err
T.routes = make([]*Route, 0, len(T.Routes))
for _, config := range T.Routes {
route := &Route{
RouteConfig: config,
if cancellableHandler, ok := route.handle.(CancellableHandler); ok {
T.cancellableHandlers = append(T.cancellableHandlers, cancellableHandler)
}
if metricsHandler, ok := route.handle.(MetricsHandler); ok {
T.metricsHandlers = append(T.metricsHandlers, metricsHandler)
}
// Start brings up every listener in T.listen, in order, and spawns one
// goroutine per listener that keeps calling acceptFrom until it reports false.
//
// If a listener fails to start, its error is returned immediately. Listeners
// started before the failure are left running, and the rest are never started.
func (T *Server) Start() error {
	for _, l := range T.listen {
		err := l.Start()
		if err != nil {
			return err
		}

		// The listener is handed over as an argument so each accept loop is
		// bound to its own listener.
		go func(src *Listener) {
			for T.acceptFrom(src) {
			}
		}(l)
	}
	return nil
}
// Stop asks every listener in T.listen to stop.
//
// All listeners are visited even when one of them fails, so a single failing
// listener no longer leaves the ones after it open. The first error
// encountered is returned (later errors are discarded); nil is returned when
// every listener stopped cleanly.
func (T *Server) Stop() error {
	var firstErr error
	for _, listen := range T.listen {
		if err := listen.Stop(); err != nil && firstErr == nil {
			// Remember the failure but keep going: the remaining listeners
			// still need to be shut down.
			firstErr = err
		}
	}
	return firstErr
}
for _, cancellableHandler := range T.cancellableHandlers {
cancellableHandler.Cancel(key)
}
}
// ReadMetrics lets each registered metrics handler report into m.Handler.
// Handlers are visited in the order they appear in T.metricsHandlers.
func (T *Server) ReadMetrics(m *metrics.Server) {
	handlers := T.metricsHandlers
	for i := range handlers {
		handlers[i].ReadMetrics(&m.Handler)
	}
}
func (T *Server) Serve(conn *fed.Conn) {
for _, route := range T.routes {
if route.match != nil && !route.match.Matches(conn) {
continue
}
err := route.handle.Handle(conn)
if err != nil {
if errors.Is(err, io.EOF) {
// normal closure
return
}
errResp := perror.ToPacket(perror.Wrap(err))
_ = conn.WritePacket(errResp)
perror.FATAL,
perror.InvalidPassword,
fmt.Sprintf(`Database "%s" not found`, conn.Database),
),
T.log.Warn("database not found", zap.String("user", conn.User), zap.String("database", conn.Database))
// accept handles a single freshly accepted connection from listener.
//
// It runs the frontend handshake via frontends.Accept and then either forwards
// a cancel request to T.Cancel or hands the connection to T.Serve. The
// connection is always closed when accept returns.
func (T *Server) accept(listener *Listener, conn *fed.Conn) {
// Close the connection on every exit path; the close error is deliberately
// ignored.
defer func() {
_ = conn.Close()
}()
// tlsConfig stays nil unless the listener has an ssl component configured.
var tlsConfig *tls.Config
if listener.ssl != nil {
tlsConfig = listener.ssl.ServerTLSConfig()
}
var isCanceling bool
var err error
cancelKey, isCanceling, err = frontends.Accept(conn, tlsConfig)
if err != nil {
// A failed handshake is logged and the connection is dropped.
T.log.Warn("error accepting client", zap.Error(err))
return
}
if isCanceling {
// The client connected only to issue a cancel; forward the key and stop.
T.Cancel(cancelKey)
return
}
T.Serve(conn)
}
// acceptFrom accepts one connection from listener and serves it on a new
// goroutine via T.accept.
//
// The return value tells the caller whether to keep accepting: false once the
// listener is recognised as closed, true otherwise (including after any other
// accept error, which is logged as a warning).
func (T *Server) acceptFrom(listener *Listener) bool {
	conn, err := listener.accept()
	if err == nil {
		go T.accept(listener, conn)
		return true
	}

	if errors.Is(err, net.ErrClosed) {
		return false
	}

	// Some closed-listener errors only surface as a *net.OpError whose inner
	// error is not exported, so the inner message text is compared instead.
	// NOTE(review): confirm this literal matches the text the listener
	// actually reports; otherwise this branch never fires.
	opErr, isOpErr := err.(*net.OpError)
	if isOpErr && opErr.Err.Error() == "listener 'closed' 😉" {
		return false
	}

	T.log.Warn("error accepting client", zap.Error(err))
	return true
}