good morning!!!!

Skip to content
Snippets Groups Projects
Commit cbda52a6 authored by José Carlos Nieto's avatar José Carlos Nieto
Browse files

SQLite3: Adding artificial limit to prevent panicking when trying to open too many database files.

parent 68b15cfb
Branches
Tags
No related merge requests found
...@@ -23,9 +23,11 @@ package sqlite ...@@ -23,9 +23,11 @@ package sqlite
import ( import (
"database/sql" "database/sql"
"errors"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
...@@ -62,6 +64,18 @@ type cachedStatement struct { ...@@ -62,6 +64,18 @@ type cachedStatement struct {
query string query string
} }
var (
fileOpenCount int32
waitForFdMu sync.Mutex
errTooManyOpenFiles = errors.New(`Too many open database files.`)
)
const (
// If we try to open lots of sessions cgo will panic without a warning, this
// artificial limit was added to prevent that panic.
maxOpenFiles = 100
)
var ( var (
_ = db.Database(&database{}) _ = db.Database(&database{})
_ = db.Tx(&tx{}) _ = db.Tx(&tx{})
...@@ -130,7 +144,22 @@ func (d *database) Open() error { ...@@ -130,7 +144,22 @@ func (d *database) Open() error {
d.connURL = conn d.connURL = conn
} }
if d.session, err = sqlx.Open(`sqlite3`, d.connURL.String()); err != nil { openFn := func(d **database) (err error) {
openFiles := atomic.LoadInt32(&fileOpenCount)
if openFiles > maxOpenFiles {
return errTooManyOpenFiles
}
(*d).session, err = sqlx.Open(`sqlite3`, (*d).connURL.String())
if err == nil {
atomic.AddInt32(&fileOpenCount, 1)
}
return
}
if err := waitForFreeFd(func() error { return openFn(&d) }); err != nil {
return err return err
} }
...@@ -174,7 +203,14 @@ func (d *database) Ping() error { ...@@ -174,7 +203,14 @@ func (d *database) Ping() error {
// Close terminates the current database session. // Close terminates the current database session.
func (d *database) Close() error { func (d *database) Close() error {
if d.session != nil { if d.session != nil {
return d.session.Close() if err := d.session.Close(); err != nil {
panic(err.Error())
}
d.session = nil
if atomic.AddInt32(&fileOpenCount, -1) < 0 {
return errors.New(`Close() without Open()?`)
}
return nil
} }
return nil return nil
} }
...@@ -347,12 +383,16 @@ func (d *database) Transaction() (db.Tx, error) { ...@@ -347,12 +383,16 @@ func (d *database) Transaction() (db.Tx, error) {
return nil, err return nil, err
} }
if sqlTx, err = clone.session.Beginx(); err != nil { openFn := func(sqlTx **sqlx.Tx) (err error) {
*sqlTx, err = clone.session.Beginx()
return
}
if err := waitForFreeFd(func() error { return openFn(&sqlTx) }); err != nil {
return nil, err return nil, err
} }
clone.tx = sqltx.New(sqlTx) clone.tx = sqltx.New(sqlTx)
return &tx{Tx: clone.tx, database: clone}, nil return &tx{Tx: clone.tx, database: clone}, nil
} }
...@@ -585,3 +625,37 @@ func (d *database) doRawQuery(query string, args ...interface{}) (*sqlx.Rows, er ...@@ -585,3 +625,37 @@ func (d *database) doRawQuery(query string, args ...interface{}) (*sqlx.Rows, er
return rows, err return rows, err
} }
// waitForFreeFd tries to execute the openFn function, if openFn
// returns an error, then waitForFreeFd will keep trying until openFn
// returns nil. Maximum waiting time is 5s after having acquired the lock.
func waitForFreeFd(openFn func() error) error {
// This lock ensures first-come, first-served and prevents opening too many
// file descriptors.
waitForFdMu.Lock()
defer waitForFdMu.Unlock()
// Minimum waiting time.
waitTime := time.Millisecond * 10
// Waitig 5 seconds for a successful connection.
for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; {
if err := openFn(); err != nil {
if err == errTooManyOpenFiles {
// Sleep and try again if, and only if, the server replied with a "too
// many clients" error.
time.Sleep(waitTime)
if waitTime < time.Millisecond*500 {
// Wait a bit more next time.
waitTime = waitTime * 2
}
continue
}
// Return any other error immediately.
return err
}
return nil
}
return db.ErrGivingUpTryingToConnect
}
...@@ -36,6 +36,7 @@ import ( ...@@ -36,6 +36,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
...@@ -1388,3 +1389,56 @@ func TestDataTypes(t *testing.T) { ...@@ -1388,3 +1389,56 @@ func TestDataTypes(t *testing.T) {
t.Fatalf("Struct is different.") t.Fatalf("Struct is different.")
} }
} }
// TestExhaustConnections simulates a "too many connections" situation
// triggered by opening more transactions than available connections.
// upper.io/db deals with this problem by waiting a bit more for the connection
// to be established.
func TestExhaustConnections(t *testing.T) {
var err error
var sess db.Database
var wg sync.WaitGroup
originalFileOpenCount := fileOpenCount
if sess, err = db.Open(Adapter, settings); err != nil {
t.Fatal(err)
}
// SQLite does not require any kind of connection blocking
for i := 0; i < 500; i++ {
wg.Add(1)
t.Logf("Tx %d: Pending", i)
go func(t *testing.T, wg *sync.WaitGroup, i int) {
var tx db.Tx
defer wg.Done()
start := time.Now()
// Requesting a new transaction session.
if tx, err = sess.Transaction(); err != nil {
panic(err.Error())
}
t.Logf("Tx %d: OK (waiting time: %v)", i, time.Now().Sub(start))
// Let's suppose that we do some complex stuff and that the transaction
// lasts 3 seconds.
time.Sleep(time.Second * 3)
if err := tx.Rollback(); err != nil {
panic(err.Error())
}
t.Logf("Tx %d: Done", i)
}(t, &wg, i)
}
wg.Wait()
sess.Close()
if fileOpenCount != originalFileOpenCount {
t.Fatalf("File open count must be %d.", originalFileOpenCount)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment