good morning!!!!

Skip to content
Snippets Groups Projects
pool.go 9.27 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package pool

import (
Garet Halliday's avatar
Garet Halliday committed
	"errors"
Garet Halliday's avatar
Garet Halliday committed
	"sync"
	"sync/atomic"
Garet Halliday's avatar
Garet Halliday committed
	"time"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"github.com/google/uuid"
Garet Halliday's avatar
Garet Halliday committed
	"tuxpa.in/a/zlog/log"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/auth"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/bouncer/backends/v0"
	"pggat/lib/bouncer/bouncers/v2"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/fed"
Garet Halliday's avatar
Garet Halliday committed
	packets "pggat/lib/fed/packets/v3.0"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/gat/metrics"
	"pggat/lib/gat/pool/recipe"
	"pggat/lib/util/slices"
	"pggat/lib/util/strutil"
Garet Halliday's avatar
Garet Halliday committed
)

type Pool struct {
	options Options

	closed chan struct{}

Garet Halliday's avatar
Garet Halliday committed
	scalingUp atomic.Bool
	recipes         map[string]*recipe.Recipe
	clients         map[uuid.UUID]*Client
Garet Halliday's avatar
Garet Halliday committed
	clientsByKey    map[[8]byte]*Client
	servers         map[uuid.UUID]*Server
	serversByRecipe map[string][]*Server
	mu              sync.RWMutex
Garet Halliday's avatar
Garet Halliday committed
}

func NewPool(options Options) *Pool {
Garet Halliday's avatar
Garet Halliday committed
	p := &Pool{
		closed:  make(chan struct{}),
Garet Halliday's avatar
Garet Halliday committed
		options: options,
	}
Garet Halliday's avatar
Garet Halliday committed

	if options.ServerIdleTimeout != 0 {
		go p.idleLoop()
	}

	return p
}

func (T *Pool) idlest() (server *Server, at time.Time) {
	T.mu.RLock()
	defer T.mu.RUnlock()

	for _, s := range T.servers {
		state, _, since := s.GetState()
Garet Halliday's avatar
Garet Halliday committed
		if state != metrics.ConnStateIdle {
Garet Halliday's avatar
Garet Halliday committed
			continue
		}

		if at == (time.Time{}) || since.Before(at) {
			server = s
			at = since
		}
	}

	return
}

func (T *Pool) idleLoop() {
	for {
		select {
		case <-T.closed:
			return
		default:
		}

Garet Halliday's avatar
Garet Halliday committed
		var wait time.Duration

		now := time.Now()
		var idlest *Server
		var idle time.Time
		for idlest, idle = T.idlest(); idlest != nil && now.Sub(idle) > T.options.ServerIdleTimeout; idlest, idle = T.idlest() {
			T.removeServer(idlest)
		}

		if idlest == nil {
			wait = T.options.ServerIdleTimeout
		} else {
			wait = idle.Add(T.options.ServerIdleTimeout).Sub(now)
		}

		time.Sleep(wait)
	}
Garet Halliday's avatar
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) GetCredentials() auth.Credentials {
	return T.options.Credentials
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
Garet Halliday's avatar
Garet Halliday committed
	func() {
		T.mu.Lock()
		defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		if T.recipes == nil {
			T.recipes = make(map[string]*recipe.Recipe)
		}
		T.recipes[name] = r
	}()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	count := r.AllocateInitial()
	for i := 0; i < count; i++ {
Garet Halliday's avatar
Garet Halliday committed
		if err := T.scaleUpL1(name, r); err != nil {
			log.Printf("failed to dial server: %v", err)
			for j := i; j < count; j++ {
				r.Free()
			}
			break
		}
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) RemoveRecipe(name string) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeRecipe(name string) {
	r, ok := T.recipes[name]
	if !ok {
Garet Halliday's avatar
Garet Halliday committed
		return
	}
Garet Halliday's avatar
Garet Halliday committed
	delete(T.recipes, name)
Garet Halliday's avatar
Garet Halliday committed

	servers := T.serversByRecipe[name]
	delete(T.serversByRecipe, name)

	for _, server := range servers {
		r.Free()
		T.removeServerL1(server)
	}
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) scaleUp() {
Garet Halliday's avatar
Garet Halliday committed
	if T.scalingUp.Swap(true) {
		// another person is trying to scale up this pool already
		return
	}
	defer T.scalingUp.Store(false)
Garet Halliday's avatar
Garet Halliday committed
	backoff := T.options.ServerReconnectInitialTime
Garet Halliday's avatar
Garet Halliday committed
		select {
		case <-T.closed:
			return
		default:
		}

Garet Halliday's avatar
Garet Halliday committed
		name, r := func() (string, *recipe.Recipe) {
			T.mu.RLock()
			defer T.mu.RUnlock()

			for name, r := range T.recipes {
				if r.Allocate() {
					return name, r
				}
			}

			if len(T.servers) > 0 {
				// don't retry this, there are other servers available
				backoff = 0
Garet Halliday's avatar
Garet Halliday committed
			return "", nil
		}()

		if r != nil {
			err := T.scaleUpL1(name, r)
			if err == nil {
				return
			}

			log.Printf("failed to dial server: %v", err)
Garet Halliday's avatar
Garet Halliday committed
		if backoff == 0 {
			// no backoff
			return
		}

Garet Halliday's avatar
Garet Halliday committed
		log.Printf("failed to dial server. trying again in %v", backoff)

Garet Halliday's avatar
Garet Halliday committed
		time.Sleep(backoff)
Garet Halliday's avatar
Garet Halliday committed
		backoff *= 2
		if T.options.ServerReconnectMaxTime != 0 && backoff > T.options.ServerReconnectMaxTime {
			backoff = T.options.ServerReconnectMaxTime
		}
	}
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) error {
	conn, params, err := r.Dial()
	if err != nil {
		// failed to dial
		r.Free()
Garet Halliday's avatar
Garet Halliday committed
		return err
	}

	T.mu.Lock()
	defer T.mu.Unlock()
	if T.recipes[name] != r {
		// recipe was removed
		r.Free()
Garet Halliday's avatar
Garet Halliday committed
		return errors.New("recipe was removed")
	}

	id := T.options.Pooler.NewServer()
	server := NewServer(
		T.options,
		id,
		name,
		conn,
		params.InitialParameters,
		params.BackendKey,
	)

	if T.servers == nil {
		T.servers = make(map[uuid.UUID]*Server)
	}
	T.servers[id] = server

	if T.serversByRecipe == nil {
		T.serversByRecipe = make(map[string][]*Server)
	}
	T.serversByRecipe[name] = append(T.serversByRecipe[name], server)
Garet Halliday's avatar
Garet Halliday committed
	return nil
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeServer(server *Server) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

	T.removeServerL1(server)
}

func (T *Pool) removeServerL1(server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	delete(T.servers, server.GetID())
	T.options.Pooler.DeleteServer(server.GetID())
	_ = server.GetConn().Close()
	if T.serversByRecipe != nil {
		T.serversByRecipe[server.GetRecipe()] = slices.Remove(T.serversByRecipe[server.GetRecipe()], server)
	}
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) acquireServer(client *Client) *Server {
Garet Halliday's avatar
Garet Halliday committed
	client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	for {
		serverID := T.options.Pooler.Acquire(client.GetID(), SyncModeNonBlocking)
		if serverID == uuid.Nil {
			// TODO(garet) can this be run on same thread and only create a goroutine if scaling is possible?
			go T.scaleUp()
			serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking)
		}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		T.mu.RLock()
		server, ok := T.servers[serverID]
		T.mu.RUnlock()
		if !ok {
Garet Halliday's avatar
Garet Halliday committed
			T.options.Pooler.Release(serverID)
Garet Halliday's avatar
Garet Halliday committed
			continue
		}
		return server
	}
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) releaseServer(server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	if T.options.ServerResetQuery != "" {
Garet Halliday's avatar
Garet Halliday committed
		server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil)

Garet Halliday's avatar
Garet Halliday committed
		err := backends.QueryString(new(backends.Context), server.GetReadWriter(), T.options.ServerResetQuery)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
			T.removeServer(server)
			return
Garet Halliday's avatar
Garet Halliday committed
	server.SetState(metrics.ConnStateIdle, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed

	T.options.Pooler.Release(server.GetID())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Serve(
Garet Halliday's avatar
Garet Halliday committed
	conn fed.Conn,
	initialParameters map[strutil.CIString]string,
	backendKey [8]byte,
Garet Halliday's avatar
Garet Halliday committed
) error {
	defer func() {
Garet Halliday's avatar
Garet Halliday committed
		_ = conn.Close()
Garet Halliday's avatar
Garet Halliday committed
	}()

Garet Halliday's avatar
Garet Halliday committed
	client := NewClient(
		T.options,
		conn,
		initialParameters,
		backendKey,
Garet Halliday's avatar
Garet Halliday committed
	return T.serve(client, false)
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
// ServeBot is for clients that don't need initial parameters, cancelling queries, and are ready now. Use Serve for
// real clients
func (T *Pool) ServeBot(
	conn fed.Conn,
) error {
	defer func() {
		_ = conn.Close()
	}()

	client := NewClient(
		T.options,
		conn,
		nil,
		[8]byte{},
	)

	return T.serve(client, true)
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) serve(client *Client, initialized bool) error {
Garet Halliday's avatar
Garet Halliday committed
	T.addClient(client)
	defer T.removeClient(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	var err error
	var serverErr error

Garet Halliday's avatar
Garet Halliday committed
	var server *Server
Garet Halliday's avatar
Garet Halliday committed
	defer func() {
		if server != nil {
			if serverErr != nil {
				T.removeServer(server)
			} else {
Garet Halliday's avatar
Garet Halliday committed
				T.releaseServer(server)
Garet Halliday's avatar
Garet Halliday committed
			}
Garet Halliday's avatar
Garet Halliday committed
			server = nil
Garet Halliday's avatar
Garet Halliday committed
		}
	}()

Garet Halliday's avatar
Garet Halliday committed
	if !initialized {
Garet Halliday's avatar
Garet Halliday committed
		server = T.acquireServer(client)

Garet Halliday's avatar
Garet Halliday committed
		err, serverErr = Pair(T.options, client, server)
Garet Halliday's avatar
Garet Halliday committed
		if serverErr != nil {
			return serverErr
		}
		if err != nil {
			return err
		}

		p := packets.ReadyForQuery('I')
		err = client.GetConn().WritePacket(p.IntoPacket())
		if err != nil {
			return err
		}
	}
Garet Halliday's avatar
Garet Halliday committed

	for {
Garet Halliday's avatar
Garet Halliday committed
		if server != nil && T.options.ReleaseAfterTransaction {
			client.SetState(metrics.ConnStateIdle, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed
			T.releaseServer(server)
Garet Halliday's avatar
Garet Halliday committed
			server = nil
		}

Garet Halliday's avatar
Garet Halliday committed
		var packet fed.Packet
		packet, err = client.GetConn().ReadPacket(true)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
			return err
		}

Garet Halliday's avatar
Garet Halliday committed
		if server == nil {
			server = T.acquireServer(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
			err, serverErr = Pair(T.options, client, server)
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
Garet Halliday committed
		if err == nil && serverErr == nil {
Garet Halliday's avatar
Garet Halliday committed
			err, serverErr = bouncers.Bounce(client.GetReadWriter(), server.GetReadWriter(), packet)
Garet Halliday's avatar
Garet Halliday committed
		}
		if serverErr != nil {
			return serverErr
		} else {
Garet Halliday's avatar
Garet Halliday committed
			TransactionComplete(client, server)
Garet Halliday's avatar
Garet Halliday committed
		}

		if err != nil {
Garet Halliday's avatar
Garet Halliday committed
			return err
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) addClient(client *Client) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	if T.clients == nil {
		T.clients = make(map[uuid.UUID]*Client)
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
	T.clients[client.GetID()] = client
Garet Halliday's avatar
Garet Halliday committed
	if T.clientsByKey == nil {
		T.clientsByKey = make(map[[8]byte]*Client)
	}
	T.clientsByKey[client.GetBackendKey()] = client
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeClient(client *Client) {
	T.mu.Lock()
	defer T.mu.Unlock()

	T.removeClientL1(client)
}

func (T *Pool) removeClientL1(client *Client) {
	_ = client.conn.Close()
Garet Halliday's avatar
Garet Halliday committed
	delete(T.clients, client.GetID())
Garet Halliday's avatar
Garet Halliday committed
	delete(T.clientsByKey, client.GetBackendKey())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Cancel(key [8]byte) error {
Garet Halliday's avatar
Garet Halliday committed
	T.mu.RLock()
	defer T.mu.RUnlock()

	client, ok := T.clientsByKey[key]
	if !ok {
		return nil
	}

	state, peer, _ := client.GetState()
Garet Halliday's avatar
Garet Halliday committed
	if state != metrics.ConnStateActive {
Garet Halliday's avatar
Garet Halliday committed
		return nil
	}

	server, ok := T.servers[peer]
	if !ok {
		return nil
	}

	// prevent state from changing by RLocking the server
	server.mu.RLock()
	defer server.mu.RUnlock()

	// make sure peer is still set
	if server.peer != peer {
		return nil
	}

	r, ok := T.recipes[server.recipe]
Garet Halliday's avatar
Garet Halliday committed
	if !ok {
		return nil
	}

	return r.Cancel(server.backendKey)
Garet Halliday's avatar
a  
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) ReadMetrics(m *metrics.Pool) {
	T.mu.RLock()
	defer T.mu.RUnlock()

	if len(T.clients) != 0 && m.Clients == nil {
		m.Clients = make(map[uuid.UUID]metrics.Conn)
	}
	if len(T.servers) != 0 && m.Servers == nil {
		m.Servers = make(map[uuid.UUID]metrics.Conn)
	}

	for id, client := range T.clients {
		var mc metrics.Conn
		client.ReadMetrics(&mc)
		m.Clients[id] = mc
	}

	for id, server := range T.servers {
		var mc metrics.Conn
		server.ReadMetrics(&mc)
		m.Servers[id] = mc
	}

func (T *Pool) Close() {
	close(T.closed)

	T.mu.Lock()
	defer T.mu.Unlock()

	// remove clients
	for _, client := range T.clients {
		T.removeClient(client)
	}

	// remove recipes
	for name := range T.recipes {
		T.removeRecipe(name)
	}
}