good morning!!!!

Skip to content
Snippets Groups Projects
Commit 5e481e75 authored by José Carlos Nieto's avatar José Carlos Nieto
Browse files

PostgreSQL: Trying again after a small delay instead of giving up when no more...

PostgreSQL: Trying again after a small delay instead of giving up when no more connections are available.
parent 96bcc81a
No related branches found
No related tags found
No related merge requests found
...@@ -58,6 +58,8 @@ type cachedStatement struct { ...@@ -58,6 +58,8 @@ type cachedStatement struct {
query string query string
} }
var waitForConnMu sync.Mutex
var ( var (
_ = db.Database(&database{}) _ = db.Database(&database{})
_ = db.Tx(&tx{}) _ = db.Tx(&tx{})
...@@ -140,7 +142,12 @@ func (d *database) Open() error { ...@@ -140,7 +142,12 @@ func (d *database) Open() error {
d.connURL = conn d.connURL = conn
} }
if d.session, err = sqlx.Open(`postgres`, d.connURL.String()); err != nil { connFn := func(d **database) (err error) {
(*d).session, err = sqlx.Open(`postgres`, (*d).connURL.String())
return
}
if err := waitForConnection(func() error { return connFn(&d) }); err != nil {
return err return err
} }
...@@ -355,12 +362,16 @@ func (d *database) Transaction() (db.Tx, error) { ...@@ -355,12 +362,16 @@ func (d *database) Transaction() (db.Tx, error) {
return nil, err return nil, err
} }
if sqlTx, err = clone.session.Beginx(); err != nil { connFn := func(sqlTx **sqlx.Tx) (err error) {
*sqlTx, err = clone.session.Beginx()
return
}
if err := waitForConnection(func() error { return connFn(&sqlTx) }); err != nil {
return nil, err return nil, err
} }
clone.tx = sqltx.New(sqlTx) clone.tx = sqltx.New(sqlTx)
return &tx{Tx: clone.tx, database: clone}, nil return &tx{Tx: clone.tx, database: clone}, nil
} }
...@@ -622,3 +633,37 @@ func (d *database) getPrimaryKey(tableName string) ([]string, error) { ...@@ -622,3 +633,37 @@ func (d *database) getPrimaryKey(tableName string) ([]string, error) {
return tableSchema.PrimaryKey, nil return tableSchema.PrimaryKey, nil
} }
// waitForConnection tries to execute the connectFn function, if connectFn
// returns an error, then waitForConnection will keep trying until connectFn
// returns nil. Maximum waiting time is 5s after having acquired the lock.
func waitForConnection(connectFn func() error) error {
// This lock ensures first-come, first-served and prevents opening too many
// file descriptors.
waitForConnMu.Lock()
defer waitForConnMu.Unlock()
// Minimum waiting time.
waitTime := time.Millisecond * 10
// Waitig 5 seconds for a successful connection.
for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; {
if err := connectFn(); err != nil {
if strings.Contains(err.Error(), `too many clients`) {
// Sleep and try again if, and only if, the server replied with a "too
// many clients" error.
time.Sleep(waitTime)
if waitTime < time.Millisecond*500 {
// Wait a bit more next time.
waitTime = waitTime * 2
}
continue
}
// Return any other error immediately.
return err
}
return nil
}
return db.ErrGivingUpTryingToConnect
}
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
...@@ -1912,3 +1913,50 @@ func TestOptionTypeJsonbStruct(t *testing.T) { ...@@ -1912,3 +1913,50 @@ func TestOptionTypeJsonbStruct(t *testing.T) {
t.Fatalf("Expecting Num to be 123") t.Fatalf("Expecting Num to be 123")
} }
} }
// TestExhaustConnections simulates a "too many connections" situation
// triggered by opening more transactions than available connections.
// upper.io/db deals with this problem by waiting a bit more for the connection
// to be established.
func TestExhaustConnections(t *testing.T) {
var err error
var sess db.Database
var wg sync.WaitGroup
if sess, err = db.Open(Adapter, settings); err != nil {
t.Fatal(err)
}
// By default, PostgreSQL accepts 100 connections only.
for i := 0; i < 500; i++ {
wg.Add(1)
t.Logf("Tx %d: Pending", i)
go func(t *testing.T, wg *sync.WaitGroup, i int) {
var tx db.Tx
defer wg.Done()
start := time.Now()
// Requesting a new transaction session.
if tx, err = sess.Transaction(); err != nil {
t.Fatal(err)
}
t.Logf("Tx %d: OK (waiting time: %v)", i, time.Now().Sub(start))
// Let's suppose that we do some complex stuff and that the transaction
// lasts 3 seconds.
time.Sleep(time.Second * 3)
if err := tx.Rollback(); err != nil {
t.Fatal(err)
}
t.Logf("Tx %d: Done", i)
}(t, &wg, i)
}
wg.Wait()
sess.Close()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment