good morning!!!!

Skip to content
Snippets Groups Projects
pool.go 11.6 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package pool

import (
Garet Halliday's avatar
Garet Halliday committed
	"sync"
Garet Halliday's avatar
Garet Halliday committed
	"time"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/util/maps"

Garet Halliday's avatar
Garet Halliday committed
	"github.com/google/uuid"
	"tuxpa.in/a/zlog/log"

	"pggat2/lib/auth"
	"pggat2/lib/bouncer/backends/v0"
	"pggat2/lib/bouncer/bouncers/v2"
	"pggat2/lib/bouncer/frontends/v0"
Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/fed"
	packets "pggat2/lib/fed/packets/v3.0"
Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/middleware"
	"pggat2/lib/middleware/interceptor"
	"pggat2/lib/middleware/middlewares/eqp"
	"pggat2/lib/middleware/middlewares/ps"
	"pggat2/lib/middleware/middlewares/unterminate"
	"pggat2/lib/util/slices"
	"pggat2/lib/util/strutil"
)

type poolRecipe struct {
	recipe Recipe
Garet Halliday's avatar
Garet Halliday committed

	deleted bool
	servers map[uuid.UUID]*Server
	mu      sync.RWMutex
}

// AddServer registers server under serverID and reports whether it was
// stored. Nothing is stored (and false is returned) when the recipe has been
// deleted, or when recipe.MaxConnections is non-zero and already reached.
func (T *poolRecipe) AddServer(serverID uuid.UUID, server *Server) bool {
	T.mu.Lock()
	defer T.mu.Unlock()

	// A MaxConnections of zero means "no limit".
	full := T.recipe.MaxConnections != 0 && len(T.servers) >= T.recipe.MaxConnections
	if T.deleted || full {
		return false
	}

	// The map is created on first use.
	if T.servers == nil {
		T.servers = map[uuid.UUID]*Server{}
	}
	T.servers[serverID] = server
	return true
}

// GetServer returns the server stored under serverID, or nil when the recipe
// has been deleted or holds no server with that ID.
func (T *poolRecipe) GetServer(serverID uuid.UUID) *Server {
	T.mu.RLock()
	defer T.mu.RUnlock()

	if !T.deleted {
		if found, ok := T.servers[serverID]; ok {
			return found
		}
	}
	return nil
}

// DeleteServer removes the server stored under serverID and returns it.
// It returns nil when the recipe has been deleted or holds no such server.
func (T *poolRecipe) DeleteServer(serverID uuid.UUID) *Server {
	// The map is mutated below, so the write lock is required; a read lock
	// would let this delete race with concurrent readers and other deleters.
	T.mu.Lock()
	defer T.mu.Unlock()

	if T.deleted {
		return nil
	}

	server := T.servers[serverID]
	delete(T.servers, serverID)
	return server
}

// Size reports how many servers are currently stored in the recipe.
func (T *poolRecipe) Size() int {
	T.mu.RLock()
	count := len(T.servers)
	T.mu.RUnlock()

	return count
}

// RangeRLock calls fn for each stored server while holding the read lock.
// Iteration stops as soon as fn returns false. The result is true when every
// call returned true (including when there are no servers), false otherwise.
func (T *poolRecipe) RangeRLock(fn func(serverID uuid.UUID, server *Server) bool) bool {
	T.mu.RLock()
	defer T.mu.RUnlock()

	completed := true
	for id, srv := range T.servers {
		if completed = fn(id, srv); !completed {
			break
		}
	}
	return completed
}

func (T *poolRecipe) Delete(fn func(serverID uuid.UUID, server *Server)) {
	T.mu.Lock()
	defer T.mu.Unlock()

	T.deleted = true
	for serverID, server := range T.servers {
		fn(serverID, server)
		delete(T.servers, serverID)
	}
Garet Halliday's avatar
Garet Halliday committed
}

type Pool struct {
	options Options

Garet Halliday's avatar
a  
Garet Halliday committed
	recipes maps.RWLocked[string, *poolRecipe]
Garet Halliday's avatar
Garet Halliday committed
	servers maps.RWLocked[uuid.UUID, *poolRecipe]
Garet Halliday's avatar
a  
Garet Halliday committed
	clients maps.RWLocked[uuid.UUID, *Client]
Garet Halliday's avatar
Garet Halliday committed
}

func NewPool(options Options) *Pool {
	p := &Pool{
Garet Halliday's avatar
Garet Halliday committed
		options: options,
	}

	if options.ServerIdleTimeout != 0 {
Garet Halliday's avatar
Garet Halliday committed
		go p.idleTimeoutLoop()
Garet Halliday's avatar
Garet Halliday committed
// GetServer resolves serverID to its Server by first finding the recipe that
// owns it. It returns nil when no recipe is registered for the ID, or when
// that recipe no longer has the server.
func (T *Pool) GetServer(serverID uuid.UUID) *Server {
	if owner, _ := T.servers.Load(serverID); owner != nil {
		return owner.GetServer(serverID)
	}
	return nil
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) idlest() (idlest uuid.UUID, idle time.Time) {
Garet Halliday's avatar
Garet Halliday committed
	T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
		recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool {
			peer, since := server.GetConnection()
			if peer != uuid.Nil {
				return true
			}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
			if idle != (time.Time{}) && since.After(idle) {
				return true
			}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
			idlest = serverID
			idle = since
			return true
		})
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	})
Garet Halliday's avatar
Garet Halliday committed

	return
}

// idleTimeoutLoop runs forever. Each pass removes every server that idlest
// reports as having been idle for longer than options.ServerIdleTimeout
// (measured against a single timestamp taken at the start of the pass), then
// sleeps until the next remaining candidate would expire, or for a full
// timeout when there is no candidate.
func (T *Pool) idleTimeoutLoop() {
	for {
		now := time.Now()

		serverID, since := T.idlest()
		for serverID != uuid.Nil && now.Sub(since) > T.options.ServerIdleTimeout {
			T.removeServer(serverID)
			serverID, since = T.idlest()
		}

		// Default to a full timeout; shorten it when a candidate remains.
		wait := T.options.ServerIdleTimeout
		if serverID != uuid.Nil {
			wait = since.Add(T.options.ServerIdleTimeout).Sub(now)
		}

		time.Sleep(wait)
	}
}

Garet Halliday's avatar
Garet Halliday committed
// GetCredentials returns the credentials the pool was configured with.
func (T *Pool) GetCredentials() auth.Credentials {
	creds := T.options.Credentials
	return creds
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) scaleUpRecipe(r *poolRecipe) {
Garet Halliday's avatar
Garet Halliday committed
	server, params, err := r.recipe.Dialer.Dial()
	if err != nil {
		log.Printf("failed to dial server: %v", err)
		return
Garet Halliday's avatar
Garet Halliday committed
	}

	var middlewares []middleware.Middleware

	var psServer *ps.Server
	if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
		// add ps middleware
		psServer = ps.NewServer(params.InitialParameters)
		middlewares = append(middlewares, psServer)
	}

	var eqpServer *eqp.Server
	if T.options.ExtendedQuerySync {
		// add eqp middleware
		eqpServer = eqp.NewServer()
		middlewares = append(middlewares, eqpServer)
	}

Garet Halliday's avatar
Garet Halliday committed
	if len(middlewares) > 0 {
		server = interceptor.NewInterceptor(
			server,
			middlewares...,
		)
	}

Garet Halliday's avatar
Garet Halliday committed
	serverID := T.options.Pooler.NewServer()
	ok := r.AddServer(serverID, NewServer(
Garet Halliday's avatar
a  
Garet Halliday committed
		server,
		params.BackendKey,
		params.InitialParameters,
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
a  
Garet Halliday committed
		psServer,
		eqpServer,
	))
Garet Halliday's avatar
Garet Halliday committed
	if !ok {
		_ = server.Close()
		T.options.Pooler.DeleteServer(serverID)
		return
	}
	T.servers.Store(serverID, r)
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) AddRecipe(name string, recipe Recipe) {
Garet Halliday's avatar
Garet Halliday committed
	r := &poolRecipe{
Garet Halliday's avatar
Garet Halliday committed
		recipe: recipe,
Garet Halliday's avatar
Garet Halliday committed
	}
	old, _ := T.recipes.Swap(name, r)
	if old != nil {
		old.Delete(func(serverID uuid.UUID, server *Server) {
			_ = server.GetConn().Close()
			T.options.Pooler.DeleteServer(serverID)
			T.servers.Delete(serverID)
Garet Halliday's avatar
a  
Garet Halliday committed
		})
Garet Halliday's avatar
Garet Halliday committed
	}

	for i := 0; i < recipe.MinConnections; i++ {
Garet Halliday's avatar
Garet Halliday committed
		T.scaleUpRecipe(r)
Garet Halliday's avatar
Garet Halliday committed
	}
}

func (T *Pool) RemoveRecipe(name string) {
Garet Halliday's avatar
Garet Halliday committed
	old, _ := T.recipes.LoadAndDelete(name)

	if old == nil {
		return
	}
Garet Halliday's avatar
Garet Halliday committed

	// close all servers with this recipe
Garet Halliday's avatar
a  
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	old.Delete(func(serverID uuid.UUID, server *Server) {
		_ = server.GetConn().Close()
		T.options.Pooler.DeleteServer(serverID)
		T.servers.Delete(serverID)
Garet Halliday's avatar
a  
Garet Halliday committed
	})
Garet Halliday's avatar
a  
Garet Halliday committed
func (T *Pool) ScaleUp() {
Garet Halliday's avatar
Garet Halliday committed
	T.recipes.Range(func(_ string, r *poolRecipe) bool {
		// this can race, but it will just dial an extra server and disconnect it in worst case
		if r.recipe.MaxConnections == 0 || r.Size() < r.recipe.MaxConnections {
			T.scaleUpRecipe(r)
Garet Halliday's avatar
a  
Garet Halliday committed
			return false
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	})
Garet Halliday's avatar
a  
Garet Halliday committed
func syncInitialParameters(
	trackedParameters []strutil.CIString,
Garet Halliday's avatar
Garet Halliday committed
	client fed.Conn,
Garet Halliday's avatar
Garet Halliday committed
	clientParams map[strutil.CIString]string,
Garet Halliday's avatar
Garet Halliday committed
	server fed.Conn,
Garet Halliday's avatar
Garet Halliday committed
	serverParams map[strutil.CIString]string,
) (clientErr, serverErr error) {
	for key, value := range clientParams {
Garet Halliday's avatar
a  
Garet Halliday committed
		setServer := slices.Contains(trackedParameters, key)
Garet Halliday's avatar
Garet Halliday committed

		// skip already set params
		if serverParams[key] == value {
			setServer = false
		} else if !setServer {
			value = serverParams[key]
		}

		p := packets.ParameterStatus{
			Key:   key.String(),
			Value: serverParams[key],
		}
		clientErr = client.WritePacket(p.IntoPacket())
		if clientErr != nil {
			return
		}

		if !setServer {
			continue
		}

		serverErr = backends.SetParameter(new(backends.Context), server, key, value)
		if serverErr != nil {
			return
		}
	}

	for key, value := range serverParams {
		if _, ok := clientParams[key]; ok {
			continue
		}

		// Don't need to run reset on server because it will reset it to the initial value

		// send to client
		p := packets.ParameterStatus{
			Key:   key.String(),
			Value: value,
		}
		clientErr = client.WritePacket(p.IntoPacket())
		if clientErr != nil {
			return
Garet Halliday's avatar
Garet Halliday committed
		}
	}

	return
}

func (T *Pool) Serve(
Garet Halliday's avatar
Garet Halliday committed
	client fed.Conn,
Garet Halliday's avatar
Garet Halliday committed
	accept frontends.AcceptParams,
	auth frontends.AuthenticateParams,
) error {
	defer func() {
		_ = client.Close()
	}()

	middlewares := []middleware.Middleware{
		unterminate.Unterminate,
	}

	var psClient *ps.Client
	if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
		// add ps middleware
		psClient = ps.NewClient(accept.InitialParameters)
		middlewares = append(middlewares, psClient)
	}

	var eqpClient *eqp.Client
	if T.options.ExtendedQuerySync {
		// add eqp middleware
		eqpClient = eqp.NewClient()
		middlewares = append(middlewares, eqpClient)
	}

	client = interceptor.NewInterceptor(
		client,
		middlewares...,
	)

Garet Halliday's avatar
Garet Halliday committed
	clientID := T.addClient(client, auth.BackendKey)
Garet Halliday's avatar
Garet Halliday committed
	defer T.removeClient(clientID)
Garet Halliday's avatar
Garet Halliday committed

	var serverID uuid.UUID
Garet Halliday's avatar
a  
Garet Halliday committed
	var server *Server
Garet Halliday's avatar
Garet Halliday committed

	defer func() {
		if serverID != uuid.Nil {
			T.releaseServer(serverID)
		}
	}()

	for {
		packet, err := client.ReadPacket(true)
		if err != nil {
			return err
		}

		var clientErr, serverErr error
		if serverID == uuid.Nil {
			serverID, server = T.acquireServer(clientID)

			switch T.options.ParameterStatusSync {
			case ParameterStatusSyncDynamic:
Garet Halliday's avatar
a  
Garet Halliday committed
				clientErr, serverErr = ps.Sync(T.options.TrackedParameters, client, psClient, server.GetConn(), server.GetPSServer())
Garet Halliday's avatar
Garet Halliday committed
			case ParameterStatusSyncInitial:
Garet Halliday's avatar
a  
Garet Halliday committed
				clientErr, serverErr = syncInitialParameters(T.options.TrackedParameters, client, accept.InitialParameters, server.GetConn(), server.GetInitialParameters())
Garet Halliday's avatar
Garet Halliday committed
			}

			if T.options.ExtendedQuerySync {
Garet Halliday's avatar
a  
Garet Halliday committed
				server.GetEQPServer().SetClient(eqpClient)
Garet Halliday's avatar
Garet Halliday committed
			}
		}
		if clientErr == nil && serverErr == nil {
Garet Halliday's avatar
a  
Garet Halliday committed
			clientErr, serverErr = bouncers.Bounce(client, server.GetConn(), packet)
Garet Halliday's avatar
Garet Halliday committed
		}
		if serverErr != nil {
			T.removeServer(serverID)
			serverID = uuid.Nil
Garet Halliday's avatar
Garet Halliday committed
			server = nil
Garet Halliday's avatar
Garet Halliday committed
			return serverErr
		} else {
Garet Halliday's avatar
Garet Halliday committed
			T.transactionComplete(clientID, serverID)
Garet Halliday's avatar
Garet Halliday committed
			if T.options.Pooler.ReleaseAfterTransaction() {
				T.releaseServer(serverID)
				serverID = uuid.Nil
Garet Halliday's avatar
Garet Halliday committed
				server = nil
Garet Halliday's avatar
Garet Halliday committed
			}
		}

		if clientErr != nil {
			return clientErr
		}
	}
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) addClient(client fed.Conn, key [8]byte) uuid.UUID {
Garet Halliday's avatar
Garet Halliday committed
	clientID := T.options.Pooler.NewClient()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
a  
Garet Halliday committed
	T.clients.Store(clientID, NewClient(
		client,
		key,
	))
Garet Halliday's avatar
Garet Halliday committed
	return clientID
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeClient(clientID uuid.UUID) {
Garet Halliday's avatar
a  
Garet Halliday committed
	T.clients.Delete(clientID)
Garet Halliday's avatar
Garet Halliday committed
	T.options.Pooler.DeleteClient(clientID)
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
a  
Garet Halliday committed
func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	client, _ := T.clients.Load(clientID)
	if client != nil {
		client.SetPeer(Stalling)
	}

Garet Halliday's avatar
Garet Halliday committed
	serverID = T.options.Pooler.Acquire(clientID, SyncModeNonBlocking)
Garet Halliday's avatar
Garet Halliday committed
	if serverID == uuid.Nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		go T.ScaleUp()
Garet Halliday's avatar
Garet Halliday committed
		serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking)
Garet Halliday's avatar
Garet Halliday committed
	server = T.GetServer(serverID)
Garet Halliday's avatar
Garet Halliday committed
	if server != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		server.SetPeer(clientID)
Garet Halliday's avatar
Garet Halliday committed
	}
	if client != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		client.SetPeer(serverID)
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
	return
}

func (T *Pool) releaseServer(serverID uuid.UUID) {
Garet Halliday's avatar
Garet Halliday committed
	server := T.GetServer(serverID)
Garet Halliday's avatar
Garet Halliday committed
	if server == nil {
		return
	}

Garet Halliday's avatar
Garet Halliday committed
	clientID := server.SetPeer(Stalling)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	if clientID != uuid.Nil {
Garet Halliday's avatar
a  
Garet Halliday committed
		client, _ := T.clients.Load(clientID)
Garet Halliday's avatar
Garet Halliday committed
		if client != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
			client.SetPeer(uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed
		}
	}

Garet Halliday's avatar
Garet Halliday committed
	if T.options.ServerResetQuery != "" {
Garet Halliday's avatar
a  
Garet Halliday committed
		err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
Garet Halliday's avatar
a  
Garet Halliday committed
			T.removeServer(serverID)
Garet Halliday's avatar
Garet Halliday committed
			return
		}
	}
Garet Halliday's avatar
Garet Halliday committed

	server.SetPeer(uuid.Nil)

Garet Halliday's avatar
Garet Halliday committed
	T.options.Pooler.Release(serverID)
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) transactionComplete(clientID, serverID uuid.UUID) {
	func() {
		server := T.GetServer(serverID)
		if server == nil {
			return
		}
Garet Halliday's avatar
a  
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		server.TransactionComplete()
	}()

	client, _ := T.clients.Load(clientID)
	if client == nil {
		return
	}

	client.TransactionComplete()
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) removeServer(serverID uuid.UUID) {
Garet Halliday's avatar
Garet Halliday committed
	recipe, _ := T.servers.LoadAndDelete(serverID)
	if recipe == nil {
		return
	}
	server := recipe.DeleteServer(serverID)
	T.options.Pooler.DeleteServer(serverID)
Garet Halliday's avatar
a  
Garet Halliday committed
	if server == nil {
		return
	}
	_ = server.GetConn().Close()
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Cancel(key [8]byte) error {
Garet Halliday's avatar
a  
Garet Halliday committed
	var clientID uuid.UUID
	T.clients.Range(func(id uuid.UUID, client *Client) bool {
		if client.GetBackendKey() == key {
			clientID = id
			return false
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	})
Garet Halliday's avatar
a  
Garet Halliday committed
	if clientID == uuid.Nil {
		return nil
	}
Garet Halliday's avatar
a  
Garet Halliday committed
	// get peer
Garet Halliday's avatar
Garet Halliday committed
	var r *poolRecipe
Garet Halliday's avatar
a  
Garet Halliday committed
	var serverKey [8]byte
Garet Halliday's avatar
Garet Halliday committed
	if T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
		return recipe.RangeRLock(func(_ uuid.UUID, server *Server) bool {
			if server.GetPeer() == clientID {
				r = recipe
				serverKey = server.GetBackendKey()
				return false
			}
			return true
		})
Garet Halliday's avatar
a  
Garet Halliday committed
	}) {
		return nil
	}
Garet Halliday's avatar
a  
Garet Halliday committed
	return r.recipe.Dialer.Cancel(serverKey)
}

func (T *Pool) ReadMetrics(metrics *Metrics) {
Garet Halliday's avatar
Garet Halliday committed
	if metrics.Servers == nil {
		metrics.Servers = make(map[uuid.UUID]ItemMetrics)
	}
	if metrics.Clients == nil {
		metrics.Clients = make(map[uuid.UUID]ItemMetrics)
	}
Garet Halliday's avatar
a  
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
		recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool {
Garet Halliday's avatar
Garet Halliday committed
			var m ItemMetrics
Garet Halliday's avatar
Garet Halliday committed
			server.ReadMetrics(&m)
			metrics.Servers[serverID] = m
			return true
		})
Garet Halliday's avatar
a  
Garet Halliday committed
		return true
	})

	T.clients.Range(func(clientID uuid.UUID, client *Client) bool {
Garet Halliday's avatar
Garet Halliday committed
		var m ItemMetrics
Garet Halliday's avatar
a  
Garet Halliday committed
		client.ReadMetrics(&m)
		metrics.Clients[clientID] = m
		return true
	})