good morning!!!!

Skip to content
Snippets Groups Projects
Commit b5363d09 authored by José Carlos Nieto's avatar José Carlos Nieto
Browse files

Merge branch 'issue-110' into issue-103

parents 4f1016e9 50f0f327
No related branches found
No related tags found
No related merge requests found
...@@ -49,6 +49,7 @@ script: ...@@ -49,6 +49,7 @@ script:
- go test -v . - go test -v .
- BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/mysql - BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/mysql
- BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/postgresql - BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/postgresql
- BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/ql # - BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/ql # Temporarily skipping, see https://github.com/cznic/ql/issues/107
- BENCHTIME=2s make test -C $GOPATH/src/upper.io/db/ql
- BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/sqlite - BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/sqlite
- BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/mongo - BENCHTIME=2s make bench -C $GOPATH/src/upper.io/db/mongo
BENCHTIME ?= 10s #BENCHTIME ?= 10s
BENCHTIME := 1s
build: build:
go build && go install go build && go install
...@@ -17,4 +18,4 @@ test: reset-db ...@@ -17,4 +18,4 @@ test: reset-db
$(MAKE) -C _example $(MAKE) -C _example
bench: reset-db bench: reset-db
go test -v -test.bench=. -test.benchtime=$(BENCHTIME) -benchmem go test -v -bench=. -benchtime=$(BENCHTIME) -benchmem -test.parallel=1 -short -cpu 1
...@@ -475,15 +475,6 @@ func BenchmarkSQLSelect(b *testing.B) { ...@@ -475,15 +475,6 @@ func BenchmarkSQLSelect(b *testing.B) {
var err error var err error
var sess db.Database var sess db.Database
connectAndAddFakeRows()
/*
// This is failing for some reason, I suspect that QL's Close() removes
// some cached value that the next Open() tries to use.
if sess, err = connectAndAddFakeRows(); err != nil {
b.Fatal(err)
}
*/
if sess, err = db.Open(Adapter, settings); err != nil { if sess, err = db.Open(Adapter, settings); err != nil {
b.Fatal(err) b.Fatal(err)
} }
...@@ -509,18 +500,10 @@ func BenchmarkSQLPreparedSelect(b *testing.B) { ...@@ -509,18 +500,10 @@ func BenchmarkSQLPreparedSelect(b *testing.B) {
var err error var err error
var sess db.Database var sess db.Database
connectAndAddFakeRows()
/*
// This is failing for some reason, I suspect that QL's Close() removes
// some cached value that the next Open() tries to use.
if sess, err = connectAndAddFakeRows(); err != nil { if sess, err = connectAndAddFakeRows(); err != nil {
b.Fatal(err) b.Fatal(err)
} }
*/
if sess, err = db.Open(Adapter, settings); err != nil {
b.Fatal(err)
}
defer sess.Close() defer sess.Close()
driver := sess.Driver().(*sqlx.DB) driver := sess.Driver().(*sqlx.DB)
......
...@@ -23,9 +23,11 @@ package ql ...@@ -23,9 +23,11 @@ package ql
import ( import (
"database/sql" "database/sql"
"errors"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
_ "github.com/cznic/ql/driver" // QL driver _ "github.com/cznic/ql/driver" // QL driver
...@@ -58,6 +60,16 @@ type cachedStatement struct { ...@@ -58,6 +60,16 @@ type cachedStatement struct {
query string query string
} }
var (
fileOpenCount int32
waitForFdMu sync.Mutex
errTooManyOpenFiles = errors.New(`Too many open database files.`)
)
const (
maxOpenFiles = 5
)
var ( var (
_ = db.Database(&database{}) _ = db.Database(&database{})
_ = db.Tx(&tx{}) _ = db.Tx(&tx{})
...@@ -135,7 +147,22 @@ func (d *database) Open() error { ...@@ -135,7 +147,22 @@ func (d *database) Open() error {
d.connURL = conn d.connURL = conn
} }
if d.session, err = sqlx.Open(`ql`, d.connURL.String()); err != nil { openFn := func(d **database) (err error) {
openFiles := atomic.LoadInt32(&fileOpenCount)
if openFiles > maxOpenFiles {
return errTooManyOpenFiles
}
(*d).session, err = sqlx.Open(`ql`, (*d).connURL.String())
if err == nil {
atomic.AddInt32(&fileOpenCount, 1)
}
return
}
if err := waitForFreeFd(func() error { return openFn(&d) }); err != nil {
return err return err
} }
...@@ -179,7 +206,14 @@ func (d *database) Ping() error { ...@@ -179,7 +206,14 @@ func (d *database) Ping() error {
// Close terminates the current database session. // Close terminates the current database session.
func (d *database) Close() error { func (d *database) Close() error {
if d.session != nil { if d.session != nil {
return d.session.Close() if err := d.session.Close(); err != nil {
panic(err.Error())
}
d.session = nil
if atomic.AddInt32(&fileOpenCount, -1) < 0 {
return errors.New(`Close() without Open()?`)
}
return nil
} }
return nil return nil
} }
...@@ -340,12 +374,16 @@ func (d *database) Transaction() (db.Tx, error) { ...@@ -340,12 +374,16 @@ func (d *database) Transaction() (db.Tx, error) {
return nil, err return nil, err
} }
if sqlTx, err = clone.session.Beginx(); err != nil { openFn := func(sqlTx **sqlx.Tx) (err error) {
*sqlTx, err = clone.session.Beginx()
return
}
if err := waitForFreeFd(func() error { return openFn(&sqlTx) }); err != nil {
return nil, err return nil, err
} }
clone.tx = sqltx.New(sqlTx) clone.tx = sqltx.New(sqlTx)
return &tx{Tx: clone.tx, database: clone}, nil return &tx{Tx: clone.tx, database: clone}, nil
} }
...@@ -556,3 +594,37 @@ func (d *database) tableColumns(tableName string) ([]string, error) { ...@@ -556,3 +594,37 @@ func (d *database) tableColumns(tableName string) ([]string, error) {
return d.schema.TableInfo[tableName].Columns, nil return d.schema.TableInfo[tableName].Columns, nil
} }
// waitForFreeFd tries to execute the openFn function, if openFn
// returns an error, then waitForFreeFd will keep trying until openFn
// returns nil. Maximum waiting time is 5s after having acquired the lock.
func waitForFreeFd(openFn func() error) error {
// This lock ensures first-come, first-served and prevents opening too many
// file descriptors.
waitForFdMu.Lock()
defer waitForFdMu.Unlock()
// Minimum waiting time.
waitTime := time.Millisecond * 10
// Waitig 5 seconds for a successful connection.
for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; {
if err := openFn(); err != nil {
if err == errTooManyOpenFiles {
// Sleep and try again if, and only if, the server replied with a "too
// many clients" error.
time.Sleep(waitTime)
if waitTime < time.Millisecond*500 {
// Wait a bit more next time.
waitTime = waitTime * 2
}
continue
}
// Return any other error immediately.
return err
}
return nil
}
return db.ErrGivingUpTryingToConnect
}
...@@ -34,6 +34,7 @@ import ( ...@@ -34,6 +34,7 @@ import (
//"errors" //"errors"
"math/rand" "math/rand"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
...@@ -1267,3 +1268,55 @@ func TestDataTypes(t *testing.T) { ...@@ -1267,3 +1268,55 @@ func TestDataTypes(t *testing.T) {
} }
} }
*/ */
// TestExhaustConnections simulates a "too many connections" situation
// triggered by opening more transactions than available connections.
// upper.io/db deals with this problem by waiting a bit more for the connection
// to be established.
func TestExhaustConnections(t *testing.T) {
var err error
var sess db.Database
var wg sync.WaitGroup
originalFileOpenCount := fileOpenCount
if sess, err = db.Open(Adapter, settings); err != nil {
t.Fatal(err)
}
// Putting less stress into QL. See https://github.com/cznic/ql/issues/107
for i := 0; i < 5; i++ {
wg.Add(1)
t.Logf("Tx %d: Pending", i)
go func(t *testing.T, wg *sync.WaitGroup, i int) {
var tx db.Tx
defer wg.Done()
start := time.Now()
// Requesting a new transaction session.
if tx, err = sess.Transaction(); err != nil {
panic(err.Error())
}
t.Logf("Tx %d: OK (waiting time: %v)", i, time.Now().Sub(start))
// We have to use a shorten time here.
time.Sleep(time.Millisecond * 500)
if err := tx.Rollback(); err != nil {
panic(err.Error())
}
t.Logf("Tx %d: Done", i)
}(t, &wg, i)
}
wg.Wait()
sess.Close()
if fileOpenCount != originalFileOpenCount {
t.Fatalf("File open count must be %d.", originalFileOpenCount)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment