good morning!!!!

Skip to content
Snippets Groups Projects
pool.go 3.73 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package pool

import (
Garet Halliday's avatar
Garet Halliday committed
	"sync"

Garet Halliday's avatar
Garet Halliday committed
	"github.com/google/uuid"

	"pggat2/lib/auth"
	"pggat2/lib/bouncer/backends/v0"
	"pggat2/lib/bouncer/bouncers/v2"
Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/fed"
Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/gat/pool/metrics"
	"pggat2/lib/gat/pool/recipe"
Garet Halliday's avatar
Garet Halliday committed
	"pggat2/lib/util/strutil"
)

type Pool struct {
	options Options

Garet Halliday's avatar
Garet Halliday committed
	recipes map[string]*recipe.Recipe
	clients map[uuid.UUID]*Client
	servers map[uuid.UUID]*Server
	mu      sync.RWMutex
Garet Halliday's avatar
Garet Halliday committed
}

func NewPool(options Options) *Pool {
Garet Halliday's avatar
Garet Halliday committed
	return &Pool{
Garet Halliday's avatar
Garet Halliday committed
		options: options,
	}
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) GetCredentials() auth.Credentials {
	return T.options.Credentials
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	if T.recipes == nil {
		T.recipes = make(map[string]*recipe.Recipe)
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
	T.recipes[name] = r
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	// TODO(garet) allocate servers until at the min
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) RemoveRecipe(name string) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeRecipe(name string) {
	r, ok := T.recipes[name]
	if !ok {
Garet Halliday's avatar
Garet Halliday committed
		return
	}
Garet Halliday's avatar
Garet Halliday committed
	delete(T.recipes, name)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	// TODO(garet) deallocate all servers created by recipe
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) scaleUp() {
	// TODO(garet)
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeServer(server *Server) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	delete(T.servers, server.GetID())
	T.options.Pooler.DeleteServer(server.GetID())
	_ = server.GetConn().Close()
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) acquireServer(client *Client) *Server {
	client.SetState(StateAwaitingServer, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	serverID := T.options.Pooler.Acquire(client.GetID(), SyncModeNonBlocking)
	if serverID == uuid.Nil {
		// TODO(garet) can this be run on same thread and only create a goroutine if scaling is possible?
		go T.scaleUp()
		serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking)
Garet Halliday's avatar
Garet Halliday committed
	}

Garet Halliday's avatar
Garet Halliday committed
	T.mu.RLock()
	defer T.mu.RUnlock()
	return T.servers[serverID]
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) releaseServer(server *Server) {
	server.SetState(StateRunningResetQuery, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed
	if T.options.ServerResetQuery != "" {
		err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
		if err != nil {
			T.removeServer(server)
			return
Garet Halliday's avatar
Garet Halliday committed
	server.SetState(StateIdle, uuid.Nil)

	T.options.Pooler.Release(server.GetID())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Serve(
Garet Halliday's avatar
Garet Halliday committed
	conn fed.Conn,
	initialParameters map[strutil.CIString]string,
	backendKey [8]byte,
Garet Halliday's avatar
Garet Halliday committed
) error {
	defer func() {
Garet Halliday's avatar
Garet Halliday committed
		_ = conn.Close()
Garet Halliday's avatar
Garet Halliday committed
	}()

Garet Halliday's avatar
Garet Halliday committed
	client := NewClient(
		T.options,
		conn,
		initialParameters,
		backendKey,
Garet Halliday's avatar
Garet Halliday committed
	return T.serve(client)
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) serve(client *Client) error {
	T.addClient(client)
	defer T.removeClient(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	var server *Server
Garet Halliday's avatar
Garet Halliday committed

	for {
Garet Halliday's avatar
Garet Halliday committed
		packet, err := client.GetConn().ReadPacket(true)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
Garet Halliday's avatar
Garet Halliday committed
			if server != nil {
				T.releaseServer(server)
			}
Garet Halliday's avatar
Garet Halliday committed
			return err
		}

Garet Halliday's avatar
Garet Halliday committed
		var serverErr error
		if server == nil {
			server = T.acquireServer(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
			err, serverErr = Pair(T.options, client, server)
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
Garet Halliday committed
		if err == nil && serverErr == nil {
			err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet)
Garet Halliday's avatar
Garet Halliday committed
		}
		if serverErr != nil {
Garet Halliday's avatar
Garet Halliday committed
			T.removeServer(server)
Garet Halliday's avatar
Garet Halliday committed
			return serverErr
		} else {
Garet Halliday's avatar
Garet Halliday committed
			TransactionComplete(client, server)
			if T.options.ReleaseAfterTransaction {
				client.SetState(StateIdle, uuid.Nil)
				go T.releaseServer(server) // TODO(garet) does this need to be a goroutine
Garet Halliday's avatar
Garet Halliday committed
				server = nil
Garet Halliday's avatar
Garet Halliday committed
			}
		}

		if err != nil {
Garet Halliday's avatar
Garet Halliday committed
			if server != nil {
				T.releaseServer(server)
			}
			return err
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) addClient(client *Client) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	if T.clients == nil {
		T.clients = make(map[uuid.UUID]*Client)
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
	T.clients[client.GetID()] = client
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeClient(client *Client) {
	T.mu.Lock()
	defer T.mu.Unlock()

	delete(T.clients, client.GetID())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Cancel(key [8]byte) error {
Garet Halliday's avatar
a  
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) ReadMetrics(metrics *metrics.Pool) {
Garet Halliday's avatar
a  
Garet Halliday committed