good morning!!!!

Skip to content
Snippets Groups Projects
pool.go 8.69 KiB
Newer Older
Garet Halliday's avatar
Garet Halliday committed
package pool

import (
Garet Halliday's avatar
Garet Halliday committed
	"errors"
Garet Halliday's avatar
Garet Halliday committed
	"sync"
Garet Halliday's avatar
a  
Garet Halliday committed
	"sync/atomic"
Garet Halliday's avatar
Garet Halliday committed
	"time"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"github.com/google/uuid"
Garet Halliday's avatar
Garet Halliday committed
	"tuxpa.in/a/zlog/log"
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/auth"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/bouncer/backends/v0"
	"pggat/lib/bouncer/bouncers/v2"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/fed"
Garet Halliday's avatar
Garet Halliday committed
	packets "pggat/lib/fed/packets/v3.0"
Garet Halliday's avatar
Garet Halliday committed
	"pggat/lib/gat/metrics"
	"pggat/lib/gat/pool/recipe"
	"pggat/lib/util/slices"
	"pggat/lib/util/strutil"
Garet Halliday's avatar
Garet Halliday committed
)

type Pool struct {
	options Options
	pooler  Pooler
Garet Halliday's avatar
Garet Halliday committed

	closed chan struct{}

Garet Halliday's avatar
a  
Garet Halliday committed
	pendingCount atomic.Int64
	pending      chan struct{}

	recipes         map[string]*recipe.Recipe
	clients         map[uuid.UUID]*Client
Garet Halliday's avatar
Garet Halliday committed
	clientsByKey    map[[8]byte]*Client
	servers         map[uuid.UUID]*Server
	serversByRecipe map[string][]*Server
	mu              sync.RWMutex
Garet Halliday's avatar
Garet Halliday committed
}

func NewPool(options Options) *Pool {
	if options.NewPooler == nil {
		panic("expected new pooler func")
	}
	pooler := options.NewPooler()
	if pooler == nil {
		panic("expected pooler")
	}

Garet Halliday's avatar
Garet Halliday committed
	p := &Pool{
		options: options,
		pooler:  pooler,

		closed:  make(chan struct{}),
Garet Halliday's avatar
a  
Garet Halliday committed
		pending: make(chan struct{}, 1),
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
a  
Garet Halliday committed
	s := NewScaler(p)
	go s.Run()
Garet Halliday's avatar
Garet Halliday committed

	return p
}

func (T *Pool) idlest() (server *Server, at time.Time) {
	T.mu.RLock()
	defer T.mu.RUnlock()

	for _, s := range T.servers {
		state, _, since := s.GetState()
Garet Halliday's avatar
Garet Halliday committed
		if state != metrics.ConnStateIdle {
Garet Halliday's avatar
Garet Halliday committed
			continue
		}

		if at == (time.Time{}) || since.Before(at) {
			server = s
			at = since
		}
	}

	return
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) GetCredentials() auth.Credentials {
	return T.options.Credentials
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
Garet Halliday's avatar
Garet Halliday committed
	func() {
		T.mu.Lock()
		defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		if T.recipes == nil {
			T.recipes = make(map[string]*recipe.Recipe)
		}
		T.recipes[name] = r
	}()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	count := r.AllocateInitial()
	for i := 0; i < count; i++ {
Garet Halliday's avatar
Garet Halliday committed
		if err := T.scaleUpL1(name, r); err != nil {
			log.Printf("failed to dial server: %v", err)
			for j := i; j < count; j++ {
				r.Free()
			}
			break
		}
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) RemoveRecipe(name string) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	T.removeRecipe(name)
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeRecipe(name string) {
	r, ok := T.recipes[name]
	if !ok {
Garet Halliday's avatar
Garet Halliday committed
		return
	}
Garet Halliday's avatar
Garet Halliday committed
	delete(T.recipes, name)
Garet Halliday's avatar
Garet Halliday committed

	servers := T.serversByRecipe[name]
	delete(T.serversByRecipe, name)

	for _, server := range servers {
		r.Free()
		T.removeServerL1(server)
	}
Garet Halliday's avatar
a  
Garet Halliday committed
func (T *Pool) scaleUpL0() (string, *recipe.Recipe) {
	T.mu.RLock()
	defer T.mu.RUnlock()
Garet Halliday's avatar
a  
Garet Halliday committed
	for name, r := range T.recipes {
		if r.Allocate() {
			return name, r
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
a  
Garet Halliday committed
	}
Garet Halliday's avatar
a  
Garet Halliday committed
	if len(T.servers) > 0 {
		return "", nil
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
a  
Garet Halliday committed
	return "", nil
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) error {
	conn, params, err := r.Dial()
	if err != nil {
		// failed to dial
		r.Free()
Garet Halliday's avatar
Garet Halliday committed
		return err
Garet Halliday's avatar
Garet Halliday committed
	server, err := func() (*Server, error) {
		T.mu.Lock()
		defer T.mu.Unlock()
		if T.recipes[name] != r {
			// recipe was removed
			r.Free()
			return nil, errors.New("recipe was removed")
		}
Garet Halliday's avatar
Garet Halliday committed
		server := NewServer(
			T.options,
			name,
			conn,
			params.InitialParameters,
			params.BackendKey,
		)
Garet Halliday's avatar
Garet Halliday committed
		if T.servers == nil {
			T.servers = make(map[uuid.UUID]*Server)
		}
		T.servers[server.GetID()] = server
Garet Halliday's avatar
Garet Halliday committed
		if T.serversByRecipe == nil {
			T.serversByRecipe = make(map[string][]*Server)
		}
		T.serversByRecipe[name] = append(T.serversByRecipe[name], server)
		return server, nil
	}()

	if err != nil {
		return err
Garet Halliday's avatar
Garet Halliday committed

	T.pooler.AddServer(server.GetID())
Garet Halliday's avatar
Garet Halliday committed
	return nil
Garet Halliday's avatar
a  
Garet Halliday committed
func (T *Pool) scaleUp() bool {
	name, r := T.scaleUpL0()
	if r == nil {
		return false
	}

	err := T.scaleUpL1(name, r)
	if err != nil {
		log.Printf("failed to dial server: %v", err)
		return false
	}

	return true
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeServer(server *Server) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

	T.removeServerL1(server)
}

func (T *Pool) removeServerL1(server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	delete(T.servers, server.GetID())
	T.pooler.DeleteServer(server.GetID())
Garet Halliday's avatar
Garet Halliday committed
	_ = server.GetConn().Close()
	if T.serversByRecipe != nil {
Garet Halliday's avatar
Garet Halliday committed
		T.serversByRecipe[server.GetRecipe()] = slices.Delete(T.serversByRecipe[server.GetRecipe()], server)
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) acquireServer(client *Client) *Server {
Garet Halliday's avatar
Garet Halliday committed
	client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	for {
		serverID := T.pooler.Acquire(client.GetID(), SyncModeNonBlocking)
Garet Halliday's avatar
Garet Halliday committed
		if serverID == uuid.Nil {
Garet Halliday's avatar
a  
Garet Halliday committed
			T.pendingCount.Add(1)
			select {
			case T.pending <- struct{}{}:
			default:
			}
			serverID = T.pooler.Acquire(client.GetID(), SyncModeBlocking)
Garet Halliday's avatar
a  
Garet Halliday committed
			T.pendingCount.Add(-1)
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
		T.mu.RLock()
		server, ok := T.servers[serverID]
		T.mu.RUnlock()
		if !ok {
			T.pooler.DeleteServer(serverID)
Garet Halliday's avatar
Garet Halliday committed
			continue
		}
		return server
	}
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) releaseServer(server *Server) {
Garet Halliday's avatar
Garet Halliday committed
	if T.options.ServerResetQuery != "" {
Garet Halliday's avatar
Garet Halliday committed
		server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil)

Garet Halliday's avatar
Garet Halliday committed
		ctx := backends.Context{
			Server: server.GetReadWriter(),
		}
		err := backends.QueryString(&ctx, T.options.ServerResetQuery)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
			T.removeServer(server)
			return
Garet Halliday's avatar
Garet Halliday committed
	server.SetState(metrics.ConnStateIdle, uuid.Nil)
	T.pooler.Release(server.GetID())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Serve(
Garet Halliday's avatar
Garet Halliday committed
	conn fed.Conn,
	initialParameters map[strutil.CIString]string,
	backendKey [8]byte,
Garet Halliday's avatar
Garet Halliday committed
) error {
	defer func() {
Garet Halliday's avatar
Garet Halliday committed
		_ = conn.Close()
Garet Halliday's avatar
Garet Halliday committed
	}()

Garet Halliday's avatar
Garet Halliday committed
	client := NewClient(
		T.options,
		conn,
		initialParameters,
		backendKey,
Garet Halliday's avatar
Garet Halliday committed
	return T.serve(client, false)
Garet Halliday's avatar
Garet Halliday committed
}
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
// ServeBot is for clients that don't need initial parameters, cancelling queries, and are ready now. Use Serve for
// real clients
func (T *Pool) ServeBot(
	conn fed.Conn,
) error {
	defer func() {
		_ = conn.Close()
	}()

	client := NewClient(
		T.options,
		conn,
		nil,
		[8]byte{},
	)

	return T.serve(client, true)
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) serve(client *Client, initialized bool) error {
Garet Halliday's avatar
Garet Halliday committed
	T.addClient(client)
	defer T.removeClient(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	var err error
	var serverErr error

Garet Halliday's avatar
Garet Halliday committed
	var server *Server
Garet Halliday's avatar
Garet Halliday committed
	defer func() {
		if server != nil {
			if serverErr != nil {
				T.removeServer(server)
			} else {
Garet Halliday's avatar
Garet Halliday committed
				T.releaseServer(server)
Garet Halliday's avatar
Garet Halliday committed
			}
Garet Halliday's avatar
Garet Halliday committed
			server = nil
Garet Halliday's avatar
Garet Halliday committed
		}
	}()

Garet Halliday's avatar
Garet Halliday committed
	var packet fed.Packet

Garet Halliday's avatar
Garet Halliday committed
	if !initialized {
Garet Halliday's avatar
Garet Halliday committed
		server = T.acquireServer(client)

Garet Halliday's avatar
Garet Halliday committed
		err, serverErr = Pair(T.options, client, server)
Garet Halliday's avatar
Garet Halliday committed
		if serverErr != nil {
			return serverErr
		}
		if err != nil {
			return err
		}

		p := packets.ReadyForQuery('I')
Garet Halliday's avatar
Garet Halliday committed
		packet = p.IntoPacket(packet)
		err = client.GetConn().WritePacket(packet)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
			return err
		}
	}
Garet Halliday's avatar
Garet Halliday committed

	for {
Garet Halliday's avatar
Garet Halliday committed
		if server != nil && T.options.ReleaseAfterTransaction {
			client.SetState(metrics.ConnStateIdle, uuid.Nil)
Garet Halliday's avatar
Garet Halliday committed
			T.releaseServer(server)
Garet Halliday's avatar
Garet Halliday committed
			server = nil
		}

Garet Halliday's avatar
Garet Halliday committed
		packet, err = client.GetConn().ReadPacket(true, packet)
Garet Halliday's avatar
Garet Halliday committed
		if err != nil {
			return err
		}

Garet Halliday's avatar
Garet Halliday committed
		if server == nil {
			server = T.acquireServer(client)
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
			err, serverErr = Pair(T.options, client, server)
Garet Halliday's avatar
Garet Halliday committed
		}
Garet Halliday's avatar
Garet Halliday committed
		if err == nil && serverErr == nil {
Garet Halliday's avatar
Garet Halliday committed
			packet, err, serverErr = bouncers.Bounce(client.GetReadWriter(), server.GetReadWriter(), packet)
Garet Halliday's avatar
Garet Halliday committed
		}
		if serverErr != nil {
			return serverErr
		} else {
Garet Halliday's avatar
Garet Halliday committed
			TransactionComplete(client, server)
Garet Halliday's avatar
Garet Halliday committed
		}

		if err != nil {
Garet Halliday's avatar
Garet Halliday committed
			return err
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) addClient(client *Client) {
	T.mu.Lock()
	defer T.mu.Unlock()
Garet Halliday's avatar
Garet Halliday committed

Garet Halliday's avatar
Garet Halliday committed
	if T.clients == nil {
		T.clients = make(map[uuid.UUID]*Client)
Garet Halliday's avatar
Garet Halliday committed
	}
Garet Halliday's avatar
Garet Halliday committed
	T.clients[client.GetID()] = client
Garet Halliday's avatar
Garet Halliday committed
	if T.clientsByKey == nil {
		T.clientsByKey = make(map[[8]byte]*Client)
	}
	T.clientsByKey[client.GetBackendKey()] = client
	T.pooler.AddClient(client.GetID())
Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) removeClient(client *Client) {
	T.mu.Lock()
	defer T.mu.Unlock()

	T.removeClientL1(client)
}

func (T *Pool) removeClientL1(client *Client) {
	T.pooler.DeleteClient(client.GetID())
	_ = client.conn.Close()
Garet Halliday's avatar
Garet Halliday committed
	delete(T.clients, client.GetID())
Garet Halliday's avatar
Garet Halliday committed
	delete(T.clientsByKey, client.GetBackendKey())
Garet Halliday's avatar
Garet Halliday committed
}

func (T *Pool) Cancel(key [8]byte) error {
Garet Halliday's avatar
Garet Halliday committed
	T.mu.RLock()
	defer T.mu.RUnlock()

	client, ok := T.clientsByKey[key]
	if !ok {
		return nil
	}

	state, peer, _ := client.GetState()
Garet Halliday's avatar
Garet Halliday committed
	if state != metrics.ConnStateActive {
Garet Halliday's avatar
Garet Halliday committed
		return nil
	}

	server, ok := T.servers[peer]
	if !ok {
		return nil
	}

	// prevent state from changing by RLocking the server
	server.mu.RLock()
	defer server.mu.RUnlock()

	// make sure peer is still set
	if server.peer != peer {
		return nil
	}

	r, ok := T.recipes[server.recipe]
Garet Halliday's avatar
Garet Halliday committed
	if !ok {
		return nil
	}

	return r.Cancel(server.backendKey)
Garet Halliday's avatar
a  
Garet Halliday committed
}

Garet Halliday's avatar
Garet Halliday committed
func (T *Pool) ReadMetrics(m *metrics.Pool) {
	T.mu.RLock()
	defer T.mu.RUnlock()

	if len(T.clients) != 0 && m.Clients == nil {
		m.Clients = make(map[uuid.UUID]metrics.Conn)
	}
	if len(T.servers) != 0 && m.Servers == nil {
		m.Servers = make(map[uuid.UUID]metrics.Conn)
	}

	for id, client := range T.clients {
		var mc metrics.Conn
		client.ReadMetrics(&mc)
		m.Clients[id] = mc
	}

	for id, server := range T.servers {
		var mc metrics.Conn
		server.ReadMetrics(&mc)
		m.Servers[id] = mc
	}

func (T *Pool) Close() {
	close(T.closed)

	T.mu.Lock()
	defer T.mu.Unlock()

	// remove clients
	for _, client := range T.clients {
		T.removeClient(client)
	}

	// remove recipes
	for name := range T.recipes {
		T.removeRecipe(name)
	}
}