good morning!!!!

Skip to content
Snippets Groups Projects
app.go 4.98 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package gat

import (
Garet Halliday's avatar
Garet Halliday committed
	"crypto/tls"
	"errors"
	"fmt"
Garet Halliday's avatar
Garet Halliday committed
	"io"
Garet Halliday's avatar
Garet Halliday committed
	"net"
Garet Halliday's avatar
Garet Halliday committed
	"time"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"github.com/caddyserver/caddy/v2"
Garet Halliday's avatar
Garet Halliday committed
	"go.uber.org/zap"
Garet Halliday's avatar
Garet Halliday committed

	"gfx.cafe/gfx/pggat/lib/bouncer/frontends/v0"
	"gfx.cafe/gfx/pggat/lib/fed"
	packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/gat/metrics"
Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/middleware/interceptor"
	"gfx.cafe/gfx/pggat/lib/middleware/middlewares/unterminate"
	"gfx.cafe/gfx/pggat/lib/perror"
Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/util/dur"
	"gfx.cafe/gfx/pggat/lib/util/maps"
Garet Halliday's avatar
Garet Halliday committed
	"gfx.cafe/gfx/pggat/lib/util/slices"
Garet Halliday's avatar
Garet Halliday committed
)

// Config is the JSON-configurable portion of the pggat App.
type Config struct {
	// StatLogPeriod is the interval between periodic metrics log lines.
	// A zero value means Start does not launch the stat logging loop.
	StatLogPeriod dur.Duration     `json:"stat_log_period"`
	// Listen holds one configuration per listener that Provision builds
	// and Start opens.
	Listen        []ListenerConfig `json:"listen"`
	// Servers holds one configuration per server. Accepted connections are
	// offered to the provisioned servers in this order.
	Servers       []ServerConfig   `json:"servers"`
}

func init() {
	caddy.RegisterModule((*App)(nil))
}

type App struct {
	Config

	listen  []*Listener
	servers []*Server

	keys maps.RWLocked[[8]byte, *Pool]
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	closed chan struct{}

Garet Halliday's avatar
Garet Halliday committed
	log *zap.Logger
Garet Halliday's avatar
Garet Halliday committed
}

// CaddyModule describes this module to Caddy: its ID is "pggat" and its
// constructor returns a fresh, zero-valued App.
func (T *App) CaddyModule() caddy.ModuleInfo {
	newApp := func() caddy.Module {
		return &App{}
	}

	return caddy.ModuleInfo{
		ID:  "pggat",
		New: newApp,
	}
}

func (T *App) Provision(ctx caddy.Context) error {
Garet Halliday's avatar
Garet Halliday committed
	T.log = ctx.Logger()

Garet Halliday's avatar
Garet Halliday committed
	T.listen = make([]*Listener, 0, len(T.Listen))
	for _, config := range T.Listen {
		listener := &Listener{
			ListenerConfig: config,
		}
		if err := listener.Provision(ctx); err != nil {
			return err
		}
		T.listen = append(T.listen, listener)
	}

	T.servers = make([]*Server, 0, len(T.Servers))
	for _, config := range T.Servers {
		server := &Server{
			ServerConfig: config,
		}
		if err := server.Provision(ctx); err != nil {
			return err
		}
		T.servers = append(T.servers, server)
	}

	return nil
}

Garet Halliday's avatar
Garet Halliday committed
// cancel forwards a cancel request to the pool registered under key.
// Unknown keys are ignored, and the pool's Cancel result is discarded.
func (T *App) cancel(key [8]byte) {
	if pool, _ := T.keys.Load(key); pool != nil {
		_ = pool.Cancel(key)
	}
}

func (T *App) serve(server *Server, conn fed.Conn) {
	initialParameters := conn.InitialParameters()
	for key := range initialParameters {
		if !slices.Contains(server.AllowedStartupParameters, key) {
			errResp := packets.ErrorResponse{
				Error: perror.New(
					perror.FATAL,
					perror.FeatureNotSupported,
					fmt.Sprintf(`Startup parameter "%s" is not allowed`, key),
				),
			}
			_ = conn.WritePacket(errResp.IntoPacket(nil))
			return
		}
	}

	p := server.lookup(conn)
	if p == nil {
Garet Halliday's avatar
Garet Halliday committed
		T.log.Warn("database not found", zap.String("user", conn.User()), zap.String("database", conn.Database()))
Garet Halliday's avatar
Garet Halliday committed
		return
	}

	backendKey, err := frontends.Authenticate(conn, p.Credentials())
	if err != nil {
Garet Halliday's avatar
Garet Halliday committed
		T.log.Warn("error authenticating client", zap.Error(err))
Garet Halliday's avatar
Garet Halliday committed
		return
	}

	T.keys.Store(backendKey, p)
	defer T.keys.Delete(backendKey)

Garet Halliday's avatar
Garet Halliday committed
	if err2 := p.Serve(conn, backendKey); err2 != nil && !errors.Is(err2, io.EOF) {
		T.log.Warn("error serving client", zap.Error(err2))
Garet Halliday's avatar
Garet Halliday committed
		return
	}
}

func (T *App) accept(listener *Listener, conn *fed.NetConn) {
	defer func() {
		_ = conn.Close()
	}()

	var tlsConfig *tls.Config
	if listener.ssl != nil {
		tlsConfig = listener.ssl.ServerTLSConfig()
	}

	cancelKey, isCanceling, _, user, database, initialParameters, err := frontends.Accept(conn, tlsConfig)
	if err != nil {
Garet Halliday's avatar
Garet Halliday committed
		T.log.Warn("error accepting client", zap.Error(err))
Garet Halliday's avatar
Garet Halliday committed
		return
	}

	if isCanceling {
		T.cancel(cancelKey)
		return
	}

	conn.SetUser(user)
	conn.SetDatabase(database)
	conn.SetInitialParameters(initialParameters)

	for _, server := range T.servers {
		if server.match == nil || server.match.Matches(conn) {
			T.serve(server, interceptor.NewInterceptor(conn, unterminate.Unterminate))
			return
		}
	}

Garet Halliday's avatar
Garet Halliday committed
	T.log.Warn("server not found", zap.String("user", conn.User()), zap.String("database", conn.Database()))
Garet Halliday's avatar
Garet Halliday committed

	errResp := packets.ErrorResponse{
		Error: perror.New(
			perror.FATAL,
			perror.InternalError,
			"No server is available to handle your request",
		),
	}
	_ = conn.WritePacket(errResp.IntoPacket(nil))
}

func (T *App) acceptFrom(listener *Listener) bool {
	conn, err := listener.accept()
	if err != nil {
		if errors.Is(err, net.ErrClosed) {
			return false
		}
Garet Halliday's avatar
Garet Halliday committed
		T.log.Warn("error accepting client", zap.Error(err))
Garet Halliday's avatar
Garet Halliday committed
		return true
	}

	go T.accept(listener, conn)
	return true
}

Garet Halliday's avatar
Garet Halliday committed
// statLogLoop logs aggregated pool metrics once per StatLogPeriod until
// T.closed is closed.
//
// On every tick it reads metrics from each route of each server into a
// shared accumulator, logs the result at info level and clears the
// accumulator. A non-positive period disables the loop with a warning
// instead of panicking.
func (T *App) statLogLoop() {
	period := T.StatLogPeriod.Duration()
	if period <= 0 {
		// time.NewTicker panics on non-positive durations; the caller only
		// filters out zero, so a negative configured value would otherwise
		// crash the process from this goroutine.
		T.log.Warn("stat log period must be positive; stat logging disabled")
		return
	}

	t := time.NewTicker(period)
	defer t.Stop()

	var stats metrics.Server
	for {
		select {
		case <-t.C:
			for _, server := range T.servers {
				for _, route := range server.routes {
					route.provide.ReadMetrics(&stats.Pools)
				}
			}
			T.log.Info(stats.String())
			stats.Clear()
		case <-T.closed:
			return
		}
	}
}

Garet Halliday's avatar
Garet Halliday committed
func (T *App) Start() error {
Garet Halliday's avatar
Garet Halliday committed
	T.closed = make(chan struct{})
	if T.StatLogPeriod != 0 {
		go T.statLogLoop()
	}

Garet Halliday's avatar
Garet Halliday committed
	// start listeners
	for _, listener := range T.listen {
		if err := listener.Start(); err != nil {
			return err
		}
Garet Halliday's avatar
Garet Halliday committed

		go func(listener *Listener) {
			for {
				if !T.acceptFrom(listener) {
					break
				}
			}
		}(listener)
Garet Halliday's avatar
Garet Halliday committed
	}

	return nil
}

func (T *App) Stop() error {
Garet Halliday's avatar
Garet Halliday committed
	close(T.closed)

Garet Halliday's avatar
Garet Halliday committed
	// stop listeners
	for _, listener := range T.listen {
		if err := listener.Stop(); err != nil {
			return err
		}
	}

	return nil
}

// Compile-time checks that *App implements the Caddy interfaces it is used as.
var (
	_ caddy.Module      = (*App)(nil)
	_ caddy.Provisioner = (*App)(nil)
	_ caddy.App         = (*App)(nil)
)