Commit 710c7c63 authored by Garet Halliday

make conn_pool thread safe

parent d923bf7e
@@ -8,6 +8,10 @@ import (
 	"gfx.cafe/gfx/pggat/lib/gat/gatling/server"
 	"gfx.cafe/gfx/pggat/lib/gat/protocol"
 	"log"
+	"math/rand"
+	"reflect"
+	"strconv"
+	"sync"
 )

 type query struct {
@@ -15,15 +19,29 @@ type query struct {
 	rep chan<- protocol.Packet
 }

+type servers struct {
+	primary  *server.Server
+	replicas []*server.Server
+
+	sync.Mutex
+}
+
+type shard struct {
+	conf    *config.Shard
+	servers []*servers
+
+	sync.Mutex
+}
+
 type ConnectionPool struct {
 	c       *config.Pool
 	user    *config.User
 	pool    gat.Pool
-	servers []*server.Server
+	shards  []shard
 	queries chan query
-}

-const workerCount = 2
+	sync.RWMutex
+}

 func NewConnectionPool(pool gat.Pool, conf *config.Pool, user *config.User) *ConnectionPool {
 	p := &ConnectionPool{
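
Note: ConnectionPool now embeds sync.RWMutex directly, so locking reads as c.Lock()/c.RLock() in the methods below. A minimal sketch of that embedding pattern, with illustrative names rather than the pggat types:

    package main

    import (
        "fmt"
        "sync"
    )

    // pool embeds sync.RWMutex, so Lock/RLock become methods on pool itself.
    type pool struct {
        sync.RWMutex
        shards []string
    }

    func (p *pool) set(s []string) {
        p.Lock() // exclusive: blocks readers and other writers
        defer p.Unlock()
        p.shards = s
    }

    func (p *pool) count() int {
        p.RLock() // shared: many readers may hold this concurrently
        defer p.RUnlock()
        return len(p.shards)
    }

    func main() {
        p := &pool{}
        p.set([]string{"0", "1"})
        fmt.Println(p.count()) // 2
    }
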
@@ -32,33 +50,98 @@ func NewConnectionPool(pool gat.Pool, conf *config.Pool, user *config.User) *ConnectionPool {
 		queries: make(chan query),
 	}
 	p.EnsureConfig(conf)
-	for i := 0; i < workerCount; i++ {
+	for i := 0; i < user.PoolSize; i++ {
 		go p.worker()
 	}
 	return p
 }
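
Note: the worker count was previously fixed at workerCount = 2; sizing by user.PoolSize means at most PoolSize queries are in flight per user pool. A self-contained sketch of the same fan-in shape, with stand-in types rather than the pggat ones:

    package main

    import "fmt"

    type query struct {
        sql string
        rep chan string
    }

    // worker drains the shared channel; N workers give N-way concurrency.
    func worker(queries <-chan query) {
        for q := range queries {
            q.rep <- "result of " + q.sql // stand-in for srv.primary.Query
        }
    }

    func main() {
        queries := make(chan query)
        for i := 0; i < 4; i++ { // 4 plays the role of user.PoolSize
            go worker(queries)
        }
        rep := make(chan string)
        queries <- query{sql: "select 1", rep: rep}
        fmt.Println(<-rep)
    }
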
 func (c *ConnectionPool) EnsureConfig(conf *config.Pool) {
+	c.Lock()
+	defer c.Unlock()
 	c.c = conf
-	if len(c.servers) == 0 {
-		// connect to a server
-		shard := c.c.Shards["0"]
-		srv := shard.Servers[0] // TODO choose a better way
-		s, err := server.Dial(context.Background(), fmt.Sprintf("%s:%d", srv.Host(), srv.Port()), c.user, shard.Database, nil)
+	for idx, s := range conf.Shards {
+		i, err := strconv.Atoi(idx)
 		if err != nil {
-			log.Println("error connecting to server", err)
+			log.Printf("expected shard name to be a number, found '%s'", idx)
+			continue
 		}
-		c.servers = append(c.servers, s)
+		for i >= len(c.shards) {
+			c.shards = append(c.shards, shard{})
+		}
+		sc := s
+		if !reflect.DeepEqual(c.shards[i].conf, &sc) {
+			// disconnect all connections, switch to new conf
+			c.shards[i].servers = nil
+			c.shards[i].conf = &sc
+		}
 	}
 }
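
Note: EnsureConfig maps numeric shard names onto slice indices, grows the slice until each index exists, and drops a shard's cached connections whenever its config changes. A runnable sketch of just that bookkeeping, with illustrative stand-in types:

    package main

    import (
        "fmt"
        "reflect"
        "strconv"
    )

    type shardConf struct{ Database string }

    type shard struct {
        conf    *shardConf
        servers []string // stand-in for live connections
    }

    func ensure(shards []shard, confs map[string]shardConf) []shard {
        for name, sc := range confs {
            i, err := strconv.Atoi(name)
            if err != nil {
                fmt.Printf("expected shard name to be a number, found %q\n", name)
                continue
            }
            for i >= len(shards) { // grow the slice until index i exists
                shards = append(shards, shard{})
            }
            sc := sc
            if !reflect.DeepEqual(shards[i].conf, &sc) {
                shards[i].servers = nil // config changed: drop stale connections
                shards[i].conf = &sc
            }
        }
        return shards
    }

    func main() {
        s := ensure(nil, map[string]shardConf{"0": {Database: "app"}, "2": {Database: "logs"}})
        fmt.Println(len(s)) // 3: index 1 stays a zero-value placeholder
    }
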
+func (c *ConnectionPool) chooseShard(query string) *shard {
+	c.RLock()
+	defer c.RUnlock()
+	if len(c.shards) == 0 {
+		return nil
+	}
+	// TODO better choose func for sharding, this is not deterministic
+	return &c.shards[rand.Intn(len(c.shards))]
+}
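
Note: as the TODO says, rand.Intn is not deterministic, so the same statement can land on a different shard on every call. A common deterministic alternative (illustrative only, not part of this commit) hashes a shard key; here the raw key string stands in for one properly extracted by a query router:

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    // deterministicShard maps the same key to the same shard index every time.
    func deterministicShard(key string, n int) int {
        h := fnv.New32a()
        h.Write([]byte(key)) // FNV-1a: cheap, stable, non-cryptographic
        return int(h.Sum32() % uint32(n))
    }

    func main() {
        fmt.Println(deterministicShard("user:42", 3)) // same input, same shard
    }
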
+// chooseServer locks and returns a server for you to use
+func (c *ConnectionPool) chooseServer(query string) *servers {
+	s := c.chooseShard(query)
+	if s == nil {
+		log.Println("no available shard for query!")
+		return nil
+	}
+	s.Lock()
+	defer s.Unlock()
+	// TODO ideally this would choose the server based on load, capabilities, etc
+	// TODO protect this server from being used by other workers while we use it
+	// TODO use c.pool.query_router to route queries
+	for _, srv := range s.servers {
+		if srv.TryLock() {
+			return srv
+		}
+	}
+	// connect to primary server
+	primary, err := server.Dial(context.Background(), fmt.Sprintf("%s:%d", s.conf.Servers[0].Host(), s.conf.Servers[0].Port()), c.user, s.conf.Database, nil)
+	if err != nil {
+		log.Println("failed to connect to server", err)
+		return nil
+	}
+	srv := &servers{
+		primary: primary,
+	}
+	srv.Lock()
+	s.servers = append(s.servers, srv)
+	return srv
+}
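
Note: chooseServer uses the embedded mutex of each servers value as a checkout flag: TryLock (added in Go 1.18) claims an idle connection without blocking, a freshly dialed connection is returned already locked, and the caller must Unlock when finished, as worker does below. A standalone sketch of that checkout contract (conn/checkout are illustrative names):

    package main

    import (
        "fmt"
        "sync"
    )

    // conn plays the role of the servers struct: the embedded
    // sync.Mutex marks the connection as checked out.
    type conn struct {
        sync.Mutex
        id int
    }

    // checkout returns the first idle conn, already locked, or nil.
    func checkout(conns []*conn) *conn {
        for _, c := range conns {
            if c.TryLock() { // non-blocking acquire
                return c
            }
        }
        return nil
    }

    func main() {
        conns := []*conn{{id: 1}, {id: 2}}
        a := checkout(conns)
        b := checkout(conns)
        fmt.Println(a.id, b.id)         // 1 2
        fmt.Println(checkout(conns))    // <nil>: both busy
        a.Unlock()                      // caller releases when done
        fmt.Println(checkout(conns).id) // 1 is idle again
    }
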
 func (c *ConnectionPool) worker() {
 	for {
 		q := <-c.queries
-		// TODO ideally this would choose the server based on load, capabilities, etc
-		// TODO protect this server from being used by other workers while we use it
-		// TODO use c.pool.query_router to route queries
-		err := c.servers[0].Query(q.query, q.rep)
+		srv := c.chooseServer(q.query)
+		if srv == nil {
+			log.Printf("call to query '%s' failed", q.query)
+			continue
+		}
+		// run the query
+		err := srv.primary.Query(q.query, q.rep)
+		srv.Unlock()
 		if err != nil {
 			log.Println(err)
 		}
@@ -71,10 +154,12 @@ func (c *ConnectionPool) GetUser() *config.User {
 }

 func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus {
-	if len(c.servers) > 0 {
-		return c.servers[0].GetServerInfo()
+	srv := c.chooseServer("")
+	if srv == nil {
+		return nil
 	}
-	return nil
+	defer srv.Unlock()
+	return srv.primary.GetServerInfo()
 }

 func (c *ConnectionPool) Query(ctx context.Context, q string) (<-chan protocol.Packet, error) {