good morning!!!!

Skip to content
Snippets Groups Projects
pool.go 8.11 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package pool

import (
Garet Halliday's avatar
Garet Halliday committed
	"log"
Garet Halliday's avatar
Garet Halliday committed
	"sync"
Garet Halliday's avatar
Garet Halliday committed
	"time"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"github.com/google/uuid"

Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/auth"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/bouncer/backends/v0"
	"pggat/lib/bouncer/bouncers/v2"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/fed"
Garet Halliday's avatar
Garet Halliday committed
	packets "pggat/lib/fed/packets/v3.0"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/gat/metrics"
	"pggat/lib/gat/pool/recipe"
	"pggat/lib/util/slices"
	"pggat/lib/util/strutil"
Garet Halliday's avatar
Garet Halliday committed
)

type Pool struct {
	options Options

	closed chan struct{}

	recipes         map[string]*recipe.Recipe
	clients         map[uuid.UUID]*Client
Garet Halliday's avatar
Garet Halliday committed
	clientsByKey    map[[8]byte]*Client
	servers         map[uuid.UUID]*Server
	serversByRecipe map[string][]*Server
	mu              sync.RWMutex
Garet Halliday's avatar
Garet Halliday committed
}

func NewPool(options Options) *Pool {
Garet Halliday's avatar
Garet Halliday committed
	p := &Pool{
		closed:  make(chan struct{}),
Garet Halliday's avatar
Garet Halliday committed
		options: options,
	}
Garet Halliday's avatar
Garet Halliday committed

	if options.ServerIdleTimeout != 0 {
		go p.idleLoop()
	}

	return p
}

func (T *Pool) idlest() (server *Server, at time.Time) {
	T.mu.RLock()
	defer T.mu.RUnlock()

	for _, s := range T.servers {
		state, _, since := s.GetState()
Garet Halliday's avatar
Garet Halliday committed
		if state != metrics.ConnStateIdle {
Garet Halliday's avatar
Garet Halliday committed
			continue
		}

		if at == (time.Time{}) || since.Before(at) {
			server = s
			at = since
		}
	}

	return
}

func (T *Pool) idleLoop() {
	for {
		select {
		case <-T.closed:
			return
		default:
		}

Garet Halliday's avatar
Garet Halliday committed
		var wait time.Duration

		now := time.Now()
		var idlest *Server
		var idle time.Time
		for idlest, idle = T.idlest(); idlest != nil && now.Sub(idle) > T.options.ServerIdleTimeout; idlest, idle = T.idlest() {
			T.removeServer(idlest)
		}

		if idlest == nil {
			wait = T.options.ServerIdleTimeout
		} else {
			wait = idle.Add(T.options.ServerIdleTimeout).Sub(now)
		}

		time.Sleep(wait)
	}
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
// GetCredentials returns the credentials this pool was configured with.
func (T *Pool) GetCredentials() auth.Credentials {
	creds := T.options.Credentials
	return creds
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
Garet Halliday's avatar
Garet Halliday committed
	func() {
		T.mu.Lock()
		defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		if T.recipes == nil {
			T.recipes = make(map[string]*recipe.Recipe)
		}
		T.recipes[name] = r
	}()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	count := r.AllocateInitial()
	for i := 0; i < count; i++ {
		T.scaleUpL1(name, r)
	}
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) RemoveRecipe(name string) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeRecipe(name string) {
	r, ok := T.recipes[name]
	if !ok {
Garet Halliday's avatar
Garet Halliday committed
		return
	}
Garet Halliday's avatar
Garet Halliday committed
	delete(T.recipes, name)
Garet Halliday's avatar
Garet Halliday committed

	servers := T.serversByRecipe[name]
	delete(T.serversByRecipe, name)

	for _, server := range servers {
		r.Free()
		T.removeServerL1(server)
	}
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) scaleUp() {
	name, r := func() (string, *recipe.Recipe) {
		T.mu.RLock()
		defer T.mu.RUnlock()
		for name, r := range T.recipes {
			if r.Allocate() {
				return name, r
			}
		}

		return "", nil
	}()
	if r == nil {
		// no recipe to scale
		return
	}

Garet Halliday's avatar
Garet Halliday committed
	T.scaleUpL1(name, r)
}

func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) {
	conn, params, err := r.Dial()
	if err != nil {
Garet Halliday's avatar
Garet Halliday committed
		log.Print("failed to dial server: ", err)
		// failed to dial
		r.Free()
		return
	}

	T.mu.Lock()
	defer T.mu.Unlock()
	if T.recipes[name] != r {
		// recipe was removed
		r.Free()
		return
	}

	id := T.options.Pooler.NewServer()
	server := NewServer(
		T.options,
		id,
		name,
		conn,
		params.InitialParameters,
		params.BackendKey,
	)

	if T.servers == nil {
		T.servers = make(map[uuid.UUID]*Server)
	}
	T.servers[id] = server

	if T.serversByRecipe == nil {
		T.serversByRecipe = make(map[string][]*Server)
	}
	T.serversByRecipe[name] = append(T.serversByRecipe[name], server)
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeServer(server *Server) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

	T.removeServerL1(server)
}

func (T *Pool) removeServerL1(server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	delete(T.servers, server.GetID())
	T.options.Pooler.DeleteServer(server.GetID())
	_ = server.GetConn().Close()
	if T.serversByRecipe != nil {
		T.serversByRecipe[server.GetRecipe()] = slices.Remove(T.serversByRecipe[server.GetRecipe()], server)
	}
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) acquireServer(client *Client) *Server {
Garet Halliday's avatar
Garet Halliday committed
	client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	serverID := T.options.Pooler.Acquire(client.GetID(), SyncModeNonBlocking)
	if serverID == uuid.Nil {
		// TODO(garet) can this be run on same thread and only create a goroutine if scaling is possible?
		go T.scaleUp()
		serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking)
Garet Halliday's avatar
Garet Halliday committed
	T.mu.RLock()
	defer T.mu.RUnlock()
	return T.servers[serverID]
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) releaseServer(server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed
	if T.options.ServerResetQuery != "" {
		err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
		if err != nil {
			T.removeServer(server)
			return
Garet Halliday's avatar
Garet Halliday committed
	server.SetState(metrics.ConnStateIdle, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed

	T.options.Pooler.Release(server.GetID())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Serve(
Garet Halliday's avatar
Garet Halliday committed
	conn fed.Conn,
	initialParameters map[strutil.CIString]string,
	backendKey [8]byte,
Garet Halliday's avatar
Garet Halliday committed
) error {
	defer func() {
Garet Halliday's avatar
Garet Halliday committed
		_ = conn.Close()
Garet Halliday's avatar
Garet Halliday committed
	}()

Garet Halliday's avatar
Garet Halliday committed
	client := NewClient(
		T.options,
		conn,
		initialParameters,
		backendKey,
Garet Halliday's avatar
Garet Halliday committed
	return T.serve(client, false)
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
// ServeBot serves a client that needs no initial parameters, never cancels
// queries and is ready immediately. Real clients should go through Serve.
// conn is closed when ServeBot returns.
func (T *Pool) ServeBot(
	conn fed.Conn,
) error {
	defer func() { _ = conn.Close() }()

	// Bots have no backend key, so they are registered under the zero key.
	var noKey [8]byte
	bot := NewClient(T.options, conn, nil, noKey)

	return T.serve(bot, true)
}

func (T *Pool) serve(client *Client, initialize bool) error {
Garet Halliday's avatar
Garet Halliday committed
	T.addClient(client)
	defer T.removeClient(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	var server *Server
Garet Halliday's avatar
Garet Halliday committed
	if !initialize {
		server = T.acquireServer(client)

		err, serverErr := Pair(T.options, client, server)
		if serverErr != nil {
			T.removeServer(server)
			return serverErr
		}
		if err != nil {
			T.releaseServer(server)
			return err
		}

		p := packets.ReadyForQuery('I')
		err = client.GetConn().WritePacket(p.IntoPacket())
		if err != nil {
			T.releaseServer(server)
			return err
		}
	}
Garet Halliday's avatar
Garet Halliday committed

	for {
Garet Halliday's avatar
Garet Halliday committed
		if server != nil && T.options.ReleaseAfterTransaction {
			client.SetState(metrics.ConnStateIdle, uuid.Nil)
			go T.releaseServer(server) // TODO(garet) does this need to be a goroutine
			server = nil
		}

Garet Halliday's avatar
Garet Halliday committed
		packet, err := client.GetConn().ReadPacket(true)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
Garet Halliday's avatar
Garet Halliday committed
			if server != nil {
				T.releaseServer(server)
			}
Garet Halliday's avatar
Garet Halliday committed
			return err
		}

Garet Halliday's avatar
Garet Halliday committed
		var serverErr error
		if server == nil {
			server = T.acquireServer(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
			err, serverErr = Pair(T.options, client, server)
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
Garet Halliday committed
		if err == nil && serverErr == nil {
			err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet)
Garet Halliday's avatar
Garet Halliday committed
		}
		if serverErr != nil {
Garet Halliday's avatar
Garet Halliday committed
			T.removeServer(server)
Garet Halliday's avatar
Garet Halliday committed
			return serverErr
		} else {
Garet Halliday's avatar
Garet Halliday committed
			TransactionComplete(client, server)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		}

		if err != nil {
Garet Halliday's avatar
Garet Halliday committed
			T.releaseServer(server)
Garet Halliday's avatar
Garet Halliday committed
			return err
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) addClient(client *Client) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	if T.clients == nil {
		T.clients = make(map[uuid.UUID]*Client)
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
	T.clients[client.GetID()] = client
Garet Halliday's avatar
Garet Halliday committed
	if T.clientsByKey == nil {
		T.clientsByKey = make(map[[8]byte]*Client)
	}
	T.clientsByKey[client.GetBackendKey()] = client
Garet Halliday's avatar
Garet Halliday committed
// removeClient is the locking wrapper around removeClientL1: it holds the
// write lock for the duration of the removal.
func (T *Pool) removeClient(client *Client) {
	T.mu.Lock()
	T.removeClientL1(client)
	T.mu.Unlock()
}

func (T *Pool) removeClientL1(client *Client) {
	_ = client.conn.Close()
Garet Halliday's avatar
Garet Halliday committed
	delete(T.clients, client.GetID())
Garet Halliday's avatar
Garet Halliday committed
	delete(T.clientsByKey, client.GetBackendKey())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Cancel(key [8]byte) error {
Garet Halliday's avatar
Garet Halliday committed
	T.mu.RLock()
	defer T.mu.RUnlock()

	client, ok := T.clientsByKey[key]
	if !ok {
		return nil
	}

	state, peer, _ := client.GetState()
Garet Halliday's avatar
Garet Halliday committed
	if state != metrics.ConnStateActive {
Garet Halliday's avatar
Garet Halliday committed
		return nil
	}

	server, ok := T.servers[peer]
	if !ok {
		return nil
	}

	r, ok := T.recipes[server.GetRecipe()]
	if !ok {
		return nil
	}

	return r.Cancel(server.GetBackendKey())
Garet Halliday's avatar
a  
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) ReadMetrics(m *metrics.Pool) {
	T.mu.RLock()
	defer T.mu.RUnlock()

	if len(T.clients) != 0 && m.Clients == nil {
		m.Clients = make(map[uuid.UUID]metrics.Conn)
	}
	if len(T.servers) != 0 && m.Servers == nil {
		m.Servers = make(map[uuid.UUID]metrics.Conn)
	}

	for id, client := range T.clients {
		var mc metrics.Conn
		client.ReadMetrics(&mc)
		m.Clients[id] = mc
	}

	for id, server := range T.servers {
		var mc metrics.Conn
		server.ReadMetrics(&mc)
		m.Servers[id] = mc
	}

// Close shuts the pool down: it signals the idle loop to stop, closes and
// unregisters every client, and removes every recipe together with its
// servers. Calling Close more than once is safe.
func (T *Pool) Close() {
	T.mu.Lock()
	defer T.mu.Unlock()

	// Closing an already-closed channel panics, so only close it once.
	select {
	case <-T.closed:
	default:
		close(T.closed)
	}

	// remove clients
	//
	// T.mu is already held and sync.RWMutex is not reentrant, so the
	// non-locking variant must be used here; removeClient would deadlock.
	for _, client := range T.clients {
		T.removeClientL1(client)
	}

	// remove recipes
	for name := range T.recipes {
		T.removeRecipe(name)
	}
}