good morning!!!!

Skip to content
Snippets Groups Projects
pool.go 9.56 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package pool

import (
Garet Halliday's avatar
a  
Garet Halliday committed
	"pggat2/lib/util/maps"
Garet Halliday's avatar
Garet Halliday committed
	"sync/atomic"
	"time"
Garet Halliday's avatar
Garet Halliday committed

	"github.com/google/uuid"
	"tuxpa.in/a/zlog/log"

	"pggat2/lib/auth"
	"pggat2/lib/bouncer/backends/v0"
	"pggat2/lib/bouncer/bouncers/v2"
	"pggat2/lib/bouncer/frontends/v0"
Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/fed"
	packets "pggat2/lib/fed/packets/v3.0"
Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/middleware"
	"pggat2/lib/middleware/interceptor"
	"pggat2/lib/middleware/middlewares/eqp"
	"pggat2/lib/middleware/middlewares/ps"
	"pggat2/lib/middleware/middlewares/unterminate"
	"pggat2/lib/util/slices"
	"pggat2/lib/util/strutil"
)

type poolRecipe struct {
	recipe Recipe
Garet Halliday's avatar
Garet Halliday committed
	count  atomic.Int64
Garet Halliday's avatar
Garet Halliday committed
}

type Pool struct {
	options Options

Garet Halliday's avatar
a  
Garet Halliday committed
	recipes maps.RWLocked[string, *poolRecipe]
	servers maps.RWLocked[uuid.UUID, *Server]
	clients maps.RWLocked[uuid.UUID, *Client]
Garet Halliday's avatar
Garet Halliday committed
}

func NewPool(options Options) *Pool {
	p := &Pool{
Garet Halliday's avatar
Garet Halliday committed
		options: options,
	}

	if options.ServerIdleTimeout != 0 {
Garet Halliday's avatar
Garet Halliday committed
		go p.idleTimeoutLoop()
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) idlest() (idlest uuid.UUID, idle time.Time) {
Garet Halliday's avatar
a  
Garet Halliday committed
	T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
		peer, since := server.GetConnection()
		if peer != uuid.Nil {
			return true
		}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
a  
Garet Halliday committed
		if idle != (time.Time{}) && since.After(idle) {
			return true
		}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
a  
Garet Halliday committed
		idlest = serverID
		idle = since
		return true
	})
Garet Halliday's avatar
Garet Halliday committed

	return
}

// idleTimeoutLoop runs forever. Each pass removes every unpaired server
// whose idle timestamp is older than options.ServerIdleTimeout, then sleeps
// until the next remaining candidate would expire (or for one full timeout
// when there is no candidate).
func (T *Pool) idleTimeoutLoop() {
	for {
		// now is sampled once per pass and reused for every comparison below.
		now := time.Now()

		id, since := T.idlest()
		for id != uuid.Nil && now.Sub(since) > T.options.ServerIdleTimeout {
			T.removeServer(id)
			id, since = T.idlest()
		}

		// Default to a full timeout; shorten it when a candidate remains.
		wait := T.options.ServerIdleTimeout
		if id != uuid.Nil {
			wait = since.Add(T.options.ServerIdleTimeout).Sub(now)
		}

		time.Sleep(wait)
	}
}

Garet Halliday's avatar
Garet Halliday committed
// GetCredentials returns the credentials stored in the pool's options.
func (T *Pool) GetCredentials() auth.Credentials {
	return T.options.Credentials
}

func (T *Pool) _scaleUpRecipe(name string) {
Garet Halliday's avatar
a  
Garet Halliday committed
	r, ok := T.recipes.Load(name)
	if !ok {
		return
	}
Garet Halliday's avatar
Garet Halliday committed

	server, params, err := r.recipe.Dialer.Dial()
	if err != nil {
		log.Printf("failed to dial server: %v", err)
		return
Garet Halliday's avatar
Garet Halliday committed
	}

	var middlewares []middleware.Middleware

	var psServer *ps.Server
	if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
		// add ps middleware
		psServer = ps.NewServer(params.InitialParameters)
		middlewares = append(middlewares, psServer)
	}

	var eqpServer *eqp.Server
	if T.options.ExtendedQuerySync {
		// add eqp middleware
		eqpServer = eqp.NewServer()
		middlewares = append(middlewares, eqpServer)
	}

Garet Halliday's avatar
Garet Halliday committed
	if len(middlewares) > 0 {
		server = interceptor.NewInterceptor(
			server,
			middlewares...,
		)
	}

Garet Halliday's avatar
a  
Garet Halliday committed
	r.count.Add(1)
	serverID := uuid.New()
	T.servers.Store(serverID, NewServer(
		server,
		params.BackendKey,
		params.InitialParameters,
		name,
		psServer,
		eqpServer,
	))
Garet Halliday's avatar
Garet Halliday committed
	T.options.Pooler.AddServer(serverID)
}

func (T *Pool) AddRecipe(name string, recipe Recipe) {
Garet Halliday's avatar
a  
Garet Halliday committed
	_, hasOld := T.recipes.Swap(name, &poolRecipe{
Garet Halliday's avatar
Garet Halliday committed
		recipe: recipe,
Garet Halliday's avatar
a  
Garet Halliday committed
	})
	if hasOld {
		T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
			if server.GetRecipe() == name {
				_ = server.GetConn().Close()
				T.options.Pooler.RemoveServer(serverID)
				T.servers.Delete(serverID)
			}
			return true
		})
Garet Halliday's avatar
Garet Halliday committed
	}

	for i := 0; i < recipe.MinConnections; i++ {
		T._scaleUpRecipe(name)
	}
}

func (T *Pool) RemoveRecipe(name string) {
Garet Halliday's avatar
a  
Garet Halliday committed
	T.recipes.Delete(name)
Garet Halliday's avatar
Garet Halliday committed

	// close all servers with this recipe
Garet Halliday's avatar
a  
Garet Halliday committed

	T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
		if server.GetRecipe() == name {
			_ = server.GetConn().Close()
			T.options.Pooler.RemoveServer(serverID)
			T.servers.Delete(serverID)
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	})
Garet Halliday's avatar
a  
Garet Halliday committed
func (T *Pool) ScaleUp() {
	T.recipes.Range(func(name string, r *poolRecipe) bool {
Garet Halliday's avatar
Garet Halliday committed
		if r.recipe.MaxConnections == 0 || int(r.count.Load()) < r.recipe.MaxConnections {
Garet Halliday's avatar
Garet Halliday committed
			T._scaleUpRecipe(name)
Garet Halliday's avatar
a  
Garet Halliday committed
			return false
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	})
Garet Halliday's avatar
a  
Garet Halliday committed
func syncInitialParameters(
	trackedParameters []strutil.CIString,
Garet Halliday's avatar
Garet Halliday committed
	client fed.Conn,
Garet Halliday's avatar
Garet Halliday committed
	clientParams map[strutil.CIString]string,
Garet Halliday's avatar
Garet Halliday committed
	server fed.Conn,
Garet Halliday's avatar
Garet Halliday committed
	serverParams map[strutil.CIString]string,
) (clientErr, serverErr error) {
	for key, value := range clientParams {
Garet Halliday's avatar
a  
Garet Halliday committed
		setServer := slices.Contains(trackedParameters, key)
Garet Halliday's avatar
Garet Halliday committed

		// skip already set params
		if serverParams[key] == value {
			setServer = false
		} else if !setServer {
			value = serverParams[key]
		}

		p := packets.ParameterStatus{
			Key:   key.String(),
			Value: serverParams[key],
		}
		clientErr = client.WritePacket(p.IntoPacket())
		if clientErr != nil {
			return
		}

		if !setServer {
			continue
		}

		serverErr = backends.SetParameter(new(backends.Context), server, key, value)
		if serverErr != nil {
			return
		}
	}

	for key, value := range serverParams {
		if _, ok := clientParams[key]; ok {
			continue
		}

		// Don't need to run reset on server because it will reset it to the initial value

		// send to client
		p := packets.ParameterStatus{
			Key:   key.String(),
			Value: value,
		}
		clientErr = client.WritePacket(p.IntoPacket())
		if clientErr != nil {
			return
Garet Halliday's avatar
Garet Halliday committed
		}
	}

	return
}

func (T *Pool) Serve(
Garet Halliday's avatar
Garet Halliday committed
	client fed.Conn,
Garet Halliday's avatar
Garet Halliday committed
	accept frontends.AcceptParams,
	auth frontends.AuthenticateParams,
) error {
	defer func() {
		_ = client.Close()
	}()

	middlewares := []middleware.Middleware{
		unterminate.Unterminate,
	}

	var psClient *ps.Client
	if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
		// add ps middleware
		psClient = ps.NewClient(accept.InitialParameters)
		middlewares = append(middlewares, psClient)
	}

	var eqpClient *eqp.Client
	if T.options.ExtendedQuerySync {
		// add eqp middleware
		eqpClient = eqp.NewClient()
		middlewares = append(middlewares, eqpClient)
	}

	client = interceptor.NewInterceptor(
		client,
		middlewares...,
	)

Garet Halliday's avatar
Garet Halliday committed
	clientID := T.addClient(client, auth.BackendKey)
Garet Halliday's avatar
Garet Halliday committed
	defer T.removeClient(clientID)
Garet Halliday's avatar
Garet Halliday committed

	var serverID uuid.UUID
Garet Halliday's avatar
a  
Garet Halliday committed
	var server *Server
Garet Halliday's avatar
Garet Halliday committed

	defer func() {
		if serverID != uuid.Nil {
			T.releaseServer(serverID)
		}
	}()

	for {
		packet, err := client.ReadPacket(true)
		if err != nil {
			return err
		}

		var clientErr, serverErr error
		if serverID == uuid.Nil {
			serverID, server = T.acquireServer(clientID)

			switch T.options.ParameterStatusSync {
			case ParameterStatusSyncDynamic:
Garet Halliday's avatar
a  
Garet Halliday committed
				clientErr, serverErr = ps.Sync(T.options.TrackedParameters, client, psClient, server.GetConn(), server.GetPSServer())
Garet Halliday's avatar
Garet Halliday committed
			case ParameterStatusSyncInitial:
Garet Halliday's avatar
a  
Garet Halliday committed
				clientErr, serverErr = syncInitialParameters(T.options.TrackedParameters, client, accept.InitialParameters, server.GetConn(), server.GetInitialParameters())
Garet Halliday's avatar
Garet Halliday committed
			}

			if T.options.ExtendedQuerySync {
Garet Halliday's avatar
a  
Garet Halliday committed
				server.GetEQPServer().SetClient(eqpClient)
Garet Halliday's avatar
Garet Halliday committed
			}
		}
		if clientErr == nil && serverErr == nil {
Garet Halliday's avatar
a  
Garet Halliday committed
			clientErr, serverErr = bouncers.Bounce(client, server.GetConn(), packet)
Garet Halliday's avatar
Garet Halliday committed
		}
		if serverErr != nil {
			T.removeServer(serverID)
			serverID = uuid.Nil
Garet Halliday's avatar
Garet Halliday committed
			server = nil
Garet Halliday's avatar
Garet Halliday committed
			return serverErr
		} else {
			if T.options.Pooler.ReleaseAfterTransaction() {
				T.releaseServer(serverID)
				serverID = uuid.Nil
Garet Halliday's avatar
Garet Halliday committed
				server = nil
Garet Halliday's avatar
a  
Garet Halliday committed
			} else {
				T.transactionComplete(serverID)
Garet Halliday's avatar
Garet Halliday committed
			}
		}

		if clientErr != nil {
			return clientErr
		}
	}
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) addClient(client fed.Conn, key [8]byte) uuid.UUID {
Garet Halliday's avatar
Garet Halliday committed
	clientID := uuid.New()

Garet Halliday's avatar
a  
Garet Halliday committed
	T.clients.Store(clientID, NewClient(
		client,
		key,
	))
Garet Halliday's avatar
Garet Halliday committed
	T.options.Pooler.AddClient(clientID)
	return clientID
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeClient(clientID uuid.UUID) {
Garet Halliday's avatar
a  
Garet Halliday committed
	T.clients.Delete(clientID)
Garet Halliday's avatar
Garet Halliday committed
	T.options.Pooler.RemoveClient(clientID)
}

Garet Halliday's avatar
a  
Garet Halliday committed
func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	serverID = T.options.Pooler.Acquire(clientID, SyncModeNonBlocking)
Garet Halliday's avatar
Garet Halliday committed
	if serverID == uuid.Nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		go T.ScaleUp()
Garet Halliday's avatar
Garet Halliday committed
		serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking)
Garet Halliday's avatar
a  
Garet Halliday committed
	server, _ = T.servers.Load(serverID)
	client, _ := T.clients.Load(clientID)
Garet Halliday's avatar
Garet Halliday committed
	if server != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		server.SetPeer(clientID)
Garet Halliday's avatar
Garet Halliday committed
	}
	if client != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		client.SetPeer(serverID)
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
	return
}

func (T *Pool) releaseServer(serverID uuid.UUID) {
Garet Halliday's avatar
a  
Garet Halliday committed
	server, _ := T.servers.Load(serverID)
Garet Halliday's avatar
Garet Halliday committed
	if server == nil {
		return
	}

Garet Halliday's avatar
a  
Garet Halliday committed
	clientID := server.SetPeer(uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	if clientID != uuid.Nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		client, _ := T.clients.Load(clientID)
Garet Halliday's avatar
Garet Halliday committed
		if client != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
			client.SetPeer(uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed
		}
	}

Garet Halliday's avatar
Garet Halliday committed
	if T.options.ServerResetQuery != "" {
Garet Halliday's avatar
a  
Garet Halliday committed
		err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
			T.removeServer(serverID)
Garet Halliday's avatar
Garet Halliday committed
			return
		}
	}
	T.options.Pooler.Release(serverID)
}

Garet Halliday's avatar
a  
Garet Halliday committed
func (T *Pool) transactionComplete(serverID uuid.UUID) {

Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) removeServer(serverID uuid.UUID) {
Garet Halliday's avatar
a  
Garet Halliday committed
	server, _ := T.servers.LoadAndDelete(serverID)
	if server == nil {
		return
	}
	_ = server.GetConn().Close()
	T.options.Pooler.RemoveServer(serverID)
	r, _ := T.recipes.Load(server.GetRecipe())
	if r != nil {
		r.count.Add(-1)
	}
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Cancel(key [8]byte) error {
Garet Halliday's avatar
a  
Garet Halliday committed
	var clientID uuid.UUID
	T.clients.Range(func(id uuid.UUID, client *Client) bool {
		if client.GetBackendKey() == key {
			clientID = id
			return false
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	})
Garet Halliday's avatar
a  
Garet Halliday committed
	if clientID == uuid.Nil {
		return nil
	}
Garet Halliday's avatar
a  
Garet Halliday committed
	// get peer
	var recipe string
	var serverKey [8]byte
	if T.servers.Range(func(_ uuid.UUID, server *Server) bool {
		if server.GetPeer() == clientID {
			recipe = server.GetRecipe()
			serverKey = server.GetBackendKey()
			return false
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	}) {
		return nil
	}
Garet Halliday's avatar
a  
Garet Halliday committed
	r, _ := T.recipes.Load(recipe)
	if r == nil {
Garet Halliday's avatar
Garet Halliday committed
		return nil
	}

Garet Halliday's avatar
a  
Garet Halliday committed
	return r.recipe.Dialer.Cancel(serverKey)
}

func (T *Pool) ReadMetrics(metrics *Metrics) {
	maps.Clear(metrics.Servers)
	maps.Clear(metrics.Clients)

	T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
		var m ServerMetrics
		server.ReadMetrics(&m)
		metrics.Servers[serverID] = m
		return true
	})

	T.clients.Range(func(clientID uuid.UUID, client *Client) bool {
		var m ClientMetrics
		client.ReadMetrics(&m)
		metrics.Clients[clientID] = m
		return true
	})