good morning!!!!

Skip to content
Snippets Groups Projects
Commit 68b15cfb authored by José Carlos Nieto's avatar José Carlos Nieto
Browse files

MySQL: Trying again after a small delay instead of giving up when no more...

MySQL: Trying again after a small delay instead of giving up when no more connections are available.
parent 6c586b46
No related branches found
No related tags found
No related merge requests found
......@@ -57,6 +57,8 @@ type cachedStatement struct {
query string
}
var waitForConnMu sync.Mutex
var (
_ = db.Database(&database{})
_ = db.Tx(&tx{})
......@@ -129,7 +131,7 @@ func (d *database) Open() error {
conn.Options["charset"] = "utf8"
}
// Connection charset, UTF-8 by default.
// parseTime=true changes the output type of DATE and DATETIME values to time.Time instead of []byte / string
if conn.Options["parseTime"] == "" {
conn.Options["parseTime"] = "true"
}
......@@ -150,7 +152,12 @@ func (d *database) Open() error {
d.connURL = conn
}
if d.session, err = sqlx.Open(`mysql`, d.connURL.String()); err != nil {
connFn := func(d **database) (err error) {
(*d).session, err = sqlx.Open(`mysql`, (*d).connURL.String())
return
}
if err := waitForConnection(func() error { return connFn(&d) }); err != nil {
return err
}
......@@ -364,12 +371,16 @@ func (d *database) Transaction() (db.Tx, error) {
return nil, err
}
if sqlTx, err = clone.session.Beginx(); err != nil {
connFn := func(sqlTx **sqlx.Tx) (err error) {
*sqlTx, err = clone.session.Beginx()
return
}
if err := waitForConnection(func() error { return connFn(&sqlTx) }); err != nil {
return nil, err
}
clone.tx = sqltx.New(sqlTx)
return &tx{Tx: clone.tx, database: clone}, nil
}
......@@ -643,3 +654,37 @@ func (d *database) getPrimaryKey(tableName string) ([]string, error) {
return tableSchema.PrimaryKey, nil
}
// waitForConnection tries to execute the connectFn function, if connectFn
// returns an error, then waitForConnection will keep trying until connectFn
// returns nil. Maximum waiting time is 5s after having acquired the lock.
func waitForConnection(connectFn func() error) error {
// This lock ensures first-come, first-served and prevents opening too many
// file descriptors.
waitForConnMu.Lock()
defer waitForConnMu.Unlock()
// Minimum waiting time.
waitTime := time.Millisecond * 10
// Waitig 5 seconds for a successful connection.
for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; {
if err := connectFn(); err != nil {
if strings.Contains(err.Error(), `many connections`) {
// Sleep and try again if, and only if, the server replied with a "too
// many clients" error.
time.Sleep(waitTime)
if waitTime < time.Millisecond*500 {
// Wait a bit more next time.
waitTime = waitTime * 2
}
continue
}
// Return any other error immediately.
return err
}
return nil
}
return db.ErrGivingUpTryingToConnect
}
......@@ -30,6 +30,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"
......@@ -1443,3 +1444,49 @@ func TestDataTypes(t *testing.T) {
t.Fatalf("Struct is different.")
}
}
// TestExhaustConnections simulates a "too many connections" situation
// triggered by opening more transactions than available connections.
// upper.io/db deals with this problem by waiting a bit more for the connection
// to be established.
func TestExhaustConnections(t *testing.T) {
var err error
var sess db.Database
var wg sync.WaitGroup
if sess, err = db.Open(Adapter, settings); err != nil {
t.Fatal(err)
}
for i := 0; i < 500; i++ {
wg.Add(1)
t.Logf("Tx %d: Pending", i)
go func(t *testing.T, wg *sync.WaitGroup, i int) {
var tx db.Tx
defer wg.Done()
start := time.Now()
// Requesting a new transaction session.
if tx, err = sess.Transaction(); err != nil {
panic(err.Error())
}
t.Logf("Tx %d: OK (waiting time: %v)", i, time.Now().Sub(start))
// Let's suppose that we do some complex stuff and that the transaction
// lasts 3 seconds.
time.Sleep(time.Second * 3)
if err := tx.Rollback(); err != nil {
panic(err.Error())
}
t.Logf("Tx %d: Done", i)
}(t, &wg, i)
}
wg.Wait()
sess.Close()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment