good morning!!!!

Skip to content
Snippets Groups Projects
server.go 4.55 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package gat

import (
Garet Halliday's avatar
Garet Halliday committed
	"crypto/tls"
Garet Halliday's avatar
Garet Halliday committed
	"errors"
Garet Halliday's avatar
Garet Halliday committed
	"fmt"
Garet Halliday's avatar
Garet Halliday committed
	"io"
Garet Halliday's avatar
Garet Halliday committed
	"net"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"github.com/caddyserver/caddy/v2"
Garet Halliday's avatar
Garet Halliday committed
	"go.uber.org/zap"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/bouncer/frontends/v0"
Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/fed"
Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/gat/metrics"
a's avatar
a committed
	"gfx.cafe/gfx/pggat/lib/instrumentation/prom"
Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/perror"
Garet Halliday's avatar
Garet Halliday committed
)

Garet Halliday's avatar
Garet Halliday committed
type ServerConfig struct {
Garet Halliday's avatar
Garet Halliday committed
	Listen []ListenerConfig `json:"listen,omitempty"`
	Routes []RouteConfig    `json:"routes,omitempty"`
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
type Server struct {
	ServerConfig
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	listen              []*Listener
Garet Halliday's avatar
Garet Halliday committed
	routes              []*Route
	cancellableHandlers []CancellableHandler
	metricsHandlers     []MetricsHandler

	log *zap.Logger
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Server) Provision(ctx caddy.Context) error {
Garet Halliday's avatar
Garet Halliday committed
	T.log = ctx.Logger()

Garet Halliday's avatar
Garet Halliday committed
	T.listen = make([]*Listener, 0, len(T.Listen))
	for _, config := range T.Listen {
		listener := &Listener{
			ListenerConfig: config,
		}
		if err := listener.Provision(ctx); err != nil {
			return err
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
Garet Halliday committed
		T.listen = append(T.listen, listener)
Garet Halliday's avatar
Garet Halliday committed
	}

Garet Halliday's avatar
Garet Halliday committed
	T.routes = make([]*Route, 0, len(T.Routes))
	for _, config := range T.Routes {
		route := &Route{
			RouteConfig: config,
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
Garet Halliday committed
		if err := route.Provision(ctx); err != nil {
Garet Halliday's avatar
Garet Halliday committed
			return err
		}
Garet Halliday's avatar
Garet Halliday committed
		if cancellableHandler, ok := route.handle.(CancellableHandler); ok {
			T.cancellableHandlers = append(T.cancellableHandlers, cancellableHandler)
		}
		if metricsHandler, ok := route.handle.(MetricsHandler); ok {
			T.metricsHandlers = append(T.metricsHandlers, metricsHandler)
		}
Garet Halliday's avatar
Garet Halliday committed
		T.routes = append(T.routes, route)
Garet Halliday's avatar
Garet Halliday committed
	}

Garet Halliday's avatar
Garet Halliday committed
func (T *Server) Start() error {
	for _, listener := range T.listen {
		if err := listener.Start(); err != nil {
			return err
		}

		go func(listener *Listener) {
			for {
				if !T.acceptFrom(listener) {
					break
				}
			}
		}(listener)
	}

	return nil
}

func (T *Server) Stop() error {
	for _, listen := range T.listen {
		if err := listen.Stop(); err != nil {
			return err
		}
	}

	return nil
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Server) Cancel(key fed.BackendKey) {
Garet Halliday's avatar
Garet Halliday committed
	for _, cancellableHandler := range T.cancellableHandlers {
		cancellableHandler.Cancel(key)
	}
}

func (T *Server) ReadMetrics(m *metrics.Server) {
	for _, metricsHandler := range T.metricsHandlers {
		metricsHandler.ReadMetrics(&m.Handler)
	}
}

func (T *Server) Serve(conn *fed.Conn) {
a's avatar
a committed
	composed := Router(RouterFunc(func(conn *fed.Conn) error {
		// database not found
		errResp := perror.ToPacket(
			perror.New(
				perror.FATAL,
				perror.InvalidPassword,
				fmt.Sprintf(`Database "%s" not found`, conn.Database),
			),
		)
		_ = conn.WritePacket(errResp)
		T.log.Warn("database not found", zap.String("user", conn.User), zap.String("database", conn.Database))
		return nil
	}))
a's avatar
a committed
	for j := 0; j < len(T.routes); j++ {
a's avatar
a committed
		route := T.routes[j]
Garet Halliday's avatar
Garet Halliday committed
		if route.match != nil && !route.match.Matches(conn) {
			continue
		}
a's avatar
a committed
		if route.handle == nil {
			continue
		}
a's avatar
a committed
		composed = route.handle.Handle(composed)
	}
	err := composed.Route(conn)
	if err != nil {
		if errors.Is(err, io.EOF) {
			// normal closure
Garet Halliday's avatar
Garet Halliday committed
			return
a's avatar
a committed
		errResp := perror.ToPacket(perror.Wrap(err))
		_ = conn.WritePacket(errResp)
		return
	}
Garet Halliday's avatar
Garet Halliday committed
func (T *Server) accept(listener *Listener, conn *fed.Conn) {
	defer func() {
		_ = conn.Close()
	}()
a's avatar
a committed
	labels := prom.ListenerLabels{ListenAddr: listener.networkAddress.String()}
Garet Halliday's avatar
Garet Halliday committed

	var tlsConfig *tls.Config
	if listener.ssl != nil {
		tlsConfig = listener.ssl.ServerTLSConfig()
	}

Garet Halliday's avatar
Garet Halliday committed
	var cancelKey fed.BackendKey
Garet Halliday's avatar
Garet Halliday committed
	var isCanceling bool
	var err error
	cancelKey, isCanceling, err = frontends.Accept(conn, tlsConfig)
	if err != nil {
Garet Halliday's avatar
Garet Halliday committed
		if !errors.Is(err, io.EOF) {
			T.log.Warn("error accepting client", zap.Error(err))
		}
Garet Halliday's avatar
Garet Halliday committed
		return
	}

	if isCanceling {
		T.Cancel(cancelKey)
		return
	}

Garet Halliday's avatar
Garet Halliday committed
	count := listener.open.Add(1)
a's avatar
a committed
	prom.Listener.Client(labels).Inc()
	prom.Listener.Incoming(labels).Inc()
a's avatar
a committed
	defer func() {
		listener.open.Add(-1)
a's avatar
a committed
		prom.Listener.Client(labels).Dec()
a's avatar
a committed
	}()
Garet Halliday's avatar
Garet Halliday committed

	if listener.MaxConnections != 0 && int(count) > listener.MaxConnections {
		_ = conn.WritePacket(
			perror.ToPacket(perror.New(
				perror.FATAL,
				perror.TooManyConnections,
				"Too many connections, sorry",
			)),
		)
		return
	}
a's avatar
a committed
	prom.Listener.Accepted(labels).Inc()
Garet Halliday's avatar
Garet Halliday committed
	T.Serve(conn)
}

func (T *Server) acceptFrom(listener *Listener) bool {
a's avatar
a committed
	err := listener.listener.Accept(func(c *fed.Conn) {
		T.accept(listener, c)
	})
Garet Halliday's avatar
Garet Halliday committed
	if err != nil {
Garet Halliday's avatar
Garet Halliday committed
		if errors.Is(err, net.ErrClosed) {
			return false
		}
Garet Halliday's avatar
Garet Halliday committed
		if netErr, ok := err.(*net.OpError); ok {
			// why can't they just expose this error
			if netErr.Err.Error() == "listener 'closed' 😉" {
				return false
			}
Garet Halliday's avatar
Garet Halliday committed
		}
		T.log.Warn("error accepting client", zap.Error(err))
		return true
	}
	return true
}

Garet Halliday's avatar
Garet Halliday committed
var _ caddy.Provisioner = (*Server)(nil)