From b2f43268c68cd30b46fdd166272456cf542eb751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Nieto?= <jose.carlos@menteslibres.net> Date: Tue, 6 Oct 2015 06:46:19 -0500 Subject: [PATCH] Moving database shared logic to an internal package. --- internal/sqladapter/collection.go | 21 +- internal/sqladapter/database.go | 309 ++++++++++++++++++++ postgresql/collection.go | 2 +- postgresql/database.go | 451 +++++++----------------------- postgresql/database_test.go | 21 -- postgresql/tx.go | 3 + 6 files changed, 425 insertions(+), 382 deletions(-) diff --git a/internal/sqladapter/collection.go b/internal/sqladapter/collection.go index bd13ec55..e86e3cb4 100644 --- a/internal/sqladapter/collection.go +++ b/internal/sqladapter/collection.go @@ -5,23 +5,30 @@ import ( "upper.io/db/util/sqlutil/result" ) -type Collection struct { +type Collection interface { + Name() string + Exists() bool + Find(conds ...interface{}) db.Result + Database() Database +} + +type BaseCollection struct { database Database tableName string } // NewCollection returns a collection with basic methods. -func NewCollection(d Database, tableName string) *Collection { - return &Collection{database: d, tableName: tableName} +func NewCollection(d Database, tableName string) Collection { + return &BaseCollection{database: d, tableName: tableName} } // Name returns the name of the table. -func (c *Collection) Name() string { +func (c *BaseCollection) Name() string { return c.tableName } // Exists returns true if the collection exists. -func (c *Collection) Exists() bool { +func (c *BaseCollection) Exists() bool { if err := c.Database().TableExists(c.Name()); err != nil { return false } @@ -29,11 +36,11 @@ func (c *Collection) Exists() bool { } // Find creates a result set with the given conditions. -func (c *Collection) Find(conds ...interface{}) db.Result { +func (c *BaseCollection) Find(conds ...interface{}) db.Result { return result.NewResult(c.Database().Builder(), c.Name(), conds) } // Database returns the database session that backs the collection. -func (c *Collection) Database() Database { +func (c *BaseCollection) Database() Database { return c.database } diff --git a/internal/sqladapter/database.go b/internal/sqladapter/database.go index b12869da..5ac62d88 100644 --- a/internal/sqladapter/database.go +++ b/internal/sqladapter/database.go @@ -1,11 +1,320 @@ package sqladapter import ( + "database/sql" + "sync" + "time" + + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" // PostgreSQL driver. + "upper.io/cache" "upper.io/db" + "upper.io/db/builder" + "upper.io/db/util/adapter" + "upper.io/db/util/schema" + "upper.io/db/util/sqlgen" + "upper.io/db/util/sqlutil" + "upper.io/db/util/sqlutil/tx" ) +type PartialDatabase interface { + PopulateSchema() error + TableExists(name string) error + TablePrimaryKey(name string) ([]string, error) + NewTable(name string) db.Collection + CompileAndReplacePlaceholders(stmt *sqlgen.Statement) (query string) + Err(in error) (out error) +} + type Database interface { db.Database TableExists(name string) error TablePrimaryKey(name string) ([]string, error) } + +type BaseDatabase struct { + partial PartialDatabase + sess *sqlx.DB + + connURL db.ConnectionURL + tx *sqltx.Tx + schema *schema.DatabaseSchema + cachedStatements *cache.Cache + collections map[string]db.Collection + collectionsMu sync.Mutex + builder db.QueryBuilder + + template *sqlgen.Template +} + +type cachedStatement struct { + *sqlx.Stmt + query string +} + +func NewDatabase(partial PartialDatabase, connURL db.ConnectionURL, template *sqlgen.Template) *BaseDatabase { + d := &BaseDatabase{ + partial: partial, + connURL: connURL, + template: template, + } + + d.builder = builder.NewBuilder(d, d.template) + d.cachedStatements = cache.NewCache() + + return d +} + +func (d *BaseDatabase) Session() *sqlx.DB { + return d.sess +} + +func (d *BaseDatabase) Template() *sqlgen.Template { + return d.template +} + +func (d *BaseDatabase) BindTx(tx *sqlx.Tx) { + d.tx = sqltx.New(tx) +} + +func (d *BaseDatabase) Tx() *sqltx.Tx { + return d.tx +} + +func (d *BaseDatabase) NewSchema() { + d.schema = schema.NewDatabaseSchema() +} + +func (d *BaseDatabase) Schema() *schema.DatabaseSchema { + return d.schema +} + +func (d *BaseDatabase) Bind(sess *sqlx.DB) error { + d.sess = sess + return d.populate() +} + +func (d *BaseDatabase) populate() error { + + d.collections = make(map[string]db.Collection) + + if d.schema == nil { + if err := d.partial.PopulateSchema(); err != nil { + return err + } + } + + return nil +} + +func (d *BaseDatabase) Clone(partial PartialDatabase) *BaseDatabase { + clone := NewDatabase(partial, d.connURL, d.template) + clone.schema = d.schema + return clone +} + +// Ping checks whether a connection to the database is still alive by pinging +// it, establishing a connection if necessary. +func (d *BaseDatabase) Ping() error { + return d.sess.Ping() +} + +// Close terminates the current database session. +func (d *BaseDatabase) Close() error { + if d.sess != nil { + if d.tx != nil && !d.tx.Done() { + d.tx.Rollback() + } + d.cachedStatements.Clear() + return d.sess.Close() + } + return nil +} + +// C returns a collection interface. +func (d *BaseDatabase) C(name string) db.Collection { + if c, ok := d.collections[name]; ok { + return c + } + + c, err := d.Collection(name) + if err != nil { + return &adapter.NonExistentCollection{Err: err} + } + return c +} + +// Collection returns the table that matches the given name. +func (d *BaseDatabase) Collection(name string) (db.Collection, error) { + if d.tx != nil { + if d.tx.Done() { + return nil, sql.ErrTxDone + } + } + + if err := d.partial.TableExists(name); err != nil { + return nil, err + } + + col := d.partial.NewTable(name) + + d.collectionsMu.Lock() + d.collections[name] = col + d.collectionsMu.Unlock() + + return col, nil +} + +func (d *BaseDatabase) ConnectionURL() db.ConnectionURL { + return d.connURL +} + +// Name returns the name of the database. +func (d *BaseDatabase) Name() string { + return d.schema.Name +} + +// Exec compiles and executes a statement that does not return any rows. +func (d *BaseDatabase) Exec(stmt *sqlgen.Statement, args ...interface{}) (sql.Result, error) { + var query string + var p *sqlx.Stmt + var err error + + if db.Debug { + var start, end int64 + start = time.Now().UnixNano() + + defer func() { + end = time.Now().UnixNano() + sqlutil.Log(query, args, err, start, end) + }() + } + + if p, query, err = d.prepareStatement(stmt); err != nil { + return nil, err + } + + return p.Exec(args...) +} + +// Query compiles and executes a statement that returns rows. +func (d *BaseDatabase) Query(stmt *sqlgen.Statement, args ...interface{}) (*sqlx.Rows, error) { + var query string + var p *sqlx.Stmt + var err error + + if db.Debug { + var start, end int64 + start = time.Now().UnixNano() + + defer func() { + end = time.Now().UnixNano() + sqlutil.Log(query, args, err, start, end) + }() + } + + if p, query, err = d.prepareStatement(stmt); err != nil { + return nil, err + } + + return p.Queryx(args...) +} + +// QueryRow compiles and executes a statement that returns at most one row. +func (d *BaseDatabase) QueryRow(stmt *sqlgen.Statement, args ...interface{}) (*sqlx.Row, error) { + var query string + var p *sqlx.Stmt + var err error + + if db.Debug { + var start, end int64 + start = time.Now().UnixNano() + + defer func() { + end = time.Now().UnixNano() + sqlutil.Log(query, args, err, start, end) + }() + } + + if p, query, err = d.prepareStatement(stmt); err != nil { + return nil, err + } + + return p.QueryRowx(args...), nil +} + +// Builder returns a custom query builder. +func (d *BaseDatabase) Builder() db.QueryBuilder { + return d.builder +} + +// Driver returns the underlying *sqlx.DB instance. +func (d *BaseDatabase) Driver() interface{} { + return d.sess +} + +func (d *BaseDatabase) prepareStatement(stmt *sqlgen.Statement) (p *sqlx.Stmt, query string, err error) { + if d.sess == nil { + return nil, "", db.ErrNotConnected + } + + pc, ok := d.cachedStatements.ReadRaw(stmt) + + if ok { + ps := pc.(*cachedStatement) + p = ps.Stmt + query = ps.query + } else { + query = d.partial.CompileAndReplacePlaceholders(stmt) + + if d.tx != nil { + p, err = d.tx.Preparex(query) + } else { + p, err = d.sess.Preparex(query) + } + + if err != nil { + return nil, query, err + } + + d.cachedStatements.Write(stmt, &cachedStatement{p, query}) + } + + return p, query, nil +} + +var waitForConnMu sync.Mutex + +// waitForConnection tries to execute the connectFn function, if connectFn +// returns an error, then waitForConnection will keep trying until connectFn +// returns nil. Maximum waiting time is 5s after having acquired the lock. +func (d *BaseDatabase) WaitForConnection(connectFn func() error) error { + // This lock ensures first-come, first-served and prevents opening too many + // file descriptors. + waitForConnMu.Lock() + defer waitForConnMu.Unlock() + + // Minimum waiting time. + waitTime := time.Millisecond * 10 + + // Waitig 5 seconds for a successful connection. + for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; { + if err := connectFn(); err != nil { + if d.partial.Err(err) == db.ErrTooManyClients { + // Sleep and try again if, and only if, the server replied with a "too + // many clients" error. + time.Sleep(waitTime) + if waitTime < time.Millisecond*500 { + // Wait a bit more next time. + waitTime = waitTime * 2 + } + continue + } + // Return any other error immediately. + return err + } + return nil + } + + return db.ErrGivingUpTryingToConnect +} diff --git a/postgresql/collection.go b/postgresql/collection.go index 14378bdf..50a8c304 100644 --- a/postgresql/collection.go +++ b/postgresql/collection.go @@ -33,7 +33,7 @@ import ( ) type table struct { - *sqladapter.Collection + sqladapter.Collection } var _ = db.Collection(&table{}) diff --git a/postgresql/database.go b/postgresql/database.go index 2a786349..91360c59 100644 --- a/postgresql/database.go +++ b/postgresql/database.go @@ -22,84 +22,24 @@ package postgresql import ( - "database/sql" "strconv" "strings" - "sync" - "time" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" // PostgreSQL driver. - "upper.io/cache" "upper.io/db" - "upper.io/db/builder" - "upper.io/db/util/adapter" - "upper.io/db/util/schema" + "upper.io/db/internal/sqladapter" "upper.io/db/util/sqlgen" - "upper.io/db/util/sqlutil" - "upper.io/db/util/sqlutil/tx" ) type database struct { - connURL db.ConnectionURL - session *sqlx.DB - tx *sqltx.Tx - schema *schema.DatabaseSchema - cachedStatements *cache.Cache - collections map[string]*table - collectionsMu sync.Mutex - builder db.QueryBuilder + *sqladapter.BaseDatabase } -type cachedStatement struct { - *sqlx.Stmt - query string -} - -var waitForConnMu sync.Mutex - -var ( - _ = db.Database(&database{}) - _ = db.Tx(&tx{}) -) - -type columnSchemaT struct { - Name string `db:"column_name"` - DataType string `db:"data_type"` -} - -func (d *database) prepareStatement(stmt *sqlgen.Statement) (p *sqlx.Stmt, query string, err error) { - if d.session == nil { - return nil, "", db.ErrNotConnected - } - - pc, ok := d.cachedStatements.ReadRaw(stmt) - - if ok { - ps := pc.(*cachedStatement) - p = ps.Stmt - query = ps.query - } else { - query = compileAndReplacePlaceholders(stmt) +var _ = db.Database(&database{}) - if d.tx != nil { - p, err = d.tx.Preparex(query) - } else { - p, err = d.session.Preparex(query) - } - - if err != nil { - return nil, query, err - } - - d.cachedStatements.Write(stmt, &cachedStatement{p, query}) - } - - return p, query, nil -} - -func compileAndReplacePlaceholders(stmt *sqlgen.Statement) (query string) { - buf := stmt.Compile(template.Template) +func (d *database) CompileAndReplacePlaceholders(stmt *sqlgen.Statement) (query string) { + buf := stmt.Compile(d.Template()) j := 1 for i := range buf { @@ -114,188 +54,96 @@ func compileAndReplacePlaceholders(stmt *sqlgen.Statement) (query string) { return query } -// Driver returns the underlying *sqlx.DB instance. -func (d *database) Driver() interface{} { - return d.session +func (d *database) Err(err error) error { + s := err.Error() + if strings.Contains(s, `too many clients`) || strings.Contains(s, `remaining connection slots are reserved`) { + return db.ErrTooManyClients + } + return err } // Open attempts to connect to the database server using already stored settings. func (d *database) Open() error { - var err error - - // Before db.ConnectionURL we used a unified db.Settings struct. This - // condition checks for that type and provides backwards compatibility. - if settings, ok := d.connURL.(db.Settings); ok { - - conn := ConnectionURL{ - User: settings.User, - Password: settings.Password, - Address: db.HostPort(settings.Host, uint(settings.Port)), - Database: settings.Database, - Options: map[string]string{ - "sslmode": "disable", - }, - } - - d.connURL = conn - } + var sess *sqlx.DB - connFn := func(d **database) (err error) { - (*d).session, err = sqlx.Open(`postgres`, (*d).connURL.String()) + connFn := func(sess **sqlx.DB) (err error) { + *sess, err = sqlx.Open("postgres", d.ConnectionURL().String()) return } - if err := waitForConnection(func() error { return connFn(&d) }); err != nil { + if err := d.WaitForConnection(func() error { return connFn(&sess) }); err != nil { return err } - d.builder = builder.NewBuilder(d, template.Template) - - d.cachedStatements = cache.NewCache() - - d.collections = make(map[string]*table) + return d.Bind(sess) +} - if d.schema == nil { - if err = d.populateSchema(); err != nil { - return err - } +func (d *database) Setup(connURL db.ConnectionURL) error { + if d.BaseDatabase != nil { + d.Close() } - - return nil + d.BaseDatabase = sqladapter.NewDatabase(d, connURL, template.Template) + return d.Open() } -// Clone returns a cloned db.Database session, this is typically used for -// transactions. -func (d *database) Clone() (db.Database, error) { - return d.clone() +// Use changes the active database. +func (d *database) Use(name string) (err error) { + var conn ConnectionURL + if conn, err = ParseURL(d.ConnectionURL().String()); err != nil { + return err + } + conn.Database = name + return d.Setup(conn) } func (d *database) clone() (*database, error) { - clone := &database{ - schema: d.schema, - } - if err := clone.Setup(d.connURL); err != nil { + clone := &database{} + clone.BaseDatabase = d.BaseDatabase.Clone(clone) + if err := clone.Open(); err != nil { return nil, err } return clone, nil } -// Ping checks whether a connection to the database is still alive by pinging -// it, establishing a connection if necessary. -func (d *database) Ping() error { - return d.session.Ping() -} - -// Close terminates the current database session. -func (d *database) Close() error { - if d.session != nil { - if d.tx != nil && !d.tx.Done() { - d.tx.Rollback() - } - d.cachedStatements.Clear() - return d.session.Close() - } - return nil +func (d *database) Clone() (db.Database, error) { + return d.clone() } -// C returns a collection interface. -func (d *database) C(name string) db.Collection { - if c, ok := d.collections[name]; ok { - return c - } - - c, err := d.Collection(name) - if err != nil { - return &adapter.NonExistentCollection{Err: err} - } - return c -} - -// Collection returns the table that matches the given name. -func (d *database) Collection(name string) (db.Collection, error) { - if d.tx != nil { - if d.tx.Done() { - return nil, sql.ErrTxDone - } - } - - if err := d.TableExists(name); err != nil { - return nil, err - } - - col := newTable(d, name) - - d.collectionsMu.Lock() - d.collections[name] = col - d.collectionsMu.Unlock() - - return col, nil +func (d *database) NewTable(name string) db.Collection { + return newTable(d, name) } // Collections returns a list of non-system tables from the database. func (d *database) Collections() (collections []string, err error) { - tablesInSchema := len(d.schema.Tables) - - // Is schema already populated? - if tablesInSchema > 0 { - // Pulling table names from schema. - return d.schema.Tables, nil - } - - // Querying table names. - q := d.Builder().Select("table_name"). - From("information_schema.tables"). - Where("table_schema = ?", "public") - - var row struct { - TableName string `db:"table_name"` - } - - iter := q.Iterator() - for iter.Next(&row) { - d.schema.AddTable(row.TableName) - collections = append(collections, row.TableName) - } - - return collections, nil -} + if len(d.Schema().Tables) == 0 { + q := d.Builder().Select("table_name"). + From("information_schema.tables"). + Where("table_schema = ?", "public") -// Use changes the active database. -func (d *database) Use(name string) (err error) { - var conn ConnectionURL + var row struct { + TableName string `db:"table_name"` + } - if conn, err = ParseURL(d.connURL.String()); err != nil { - return err + iter := q.Iterator() + for iter.Next(&row) { + d.Schema().AddTable(row.TableName) + } } - conn.Database = name - - d.connURL = conn - - d.schema = nil - - return d.Open() + return d.Schema().Tables, nil } // Drop removes all tables from the current database. func (d *database) Drop() error { - _, err := d.Query(&sqlgen.Statement{ + stmt := &sqlgen.Statement{ Type: sqlgen.DropDatabase, - Database: sqlgen.DatabaseWithName(d.schema.Name), - }) - return err -} - -// Setup stores database settings. -func (d *database) Setup(connURL db.ConnectionURL) error { - d.connURL = connURL - return d.Open() -} - -// Name returns the name of the database. -func (d *database) Name() string { - return d.schema.Name + Database: sqlgen.DatabaseWithName(d.Schema().Name), + } + if _, err := d.Builder().Exec(stmt); err != nil { + return err + } + return nil } // Transaction starts a transaction block and returns a db.Tx struct that can @@ -310,93 +158,25 @@ func (d *database) Transaction() (db.Tx, error) { } connFn := func(sqlTx **sqlx.Tx) (err error) { - *sqlTx, err = clone.session.Beginx() + *sqlTx, err = clone.Session().Beginx() return } - if err := waitForConnection(func() error { return connFn(&sqlTx) }); err != nil { - return nil, err - } - - clone.tx = sqltx.New(sqlTx) - return &tx{Tx: clone.tx, database: clone}, nil -} - -// Exec compiles and executes a statement that does not return any rows. -func (d *database) Exec(stmt *sqlgen.Statement, args ...interface{}) (sql.Result, error) { - var query string - var p *sqlx.Stmt - var err error - - if db.Debug { - var start, end int64 - start = time.Now().UnixNano() - - defer func() { - end = time.Now().UnixNano() - sqlutil.Log(query, args, err, start, end) - }() - } - - if p, query, err = d.prepareStatement(stmt); err != nil { - return nil, err - } - - return p.Exec(args...) -} - -// Query compiles and executes a statement that returns rows. -func (d *database) Query(stmt *sqlgen.Statement, args ...interface{}) (*sqlx.Rows, error) { - var query string - var p *sqlx.Stmt - var err error - - if db.Debug { - var start, end int64 - start = time.Now().UnixNano() - - defer func() { - end = time.Now().UnixNano() - sqlutil.Log(query, args, err, start, end) - }() - } - - if p, query, err = d.prepareStatement(stmt); err != nil { + if err := d.WaitForConnection(func() error { return connFn(&sqlTx) }); err != nil { return nil, err } - return p.Queryx(args...) -} - -// QueryRow compiles and executes a statement that returns at most one row. -func (d *database) QueryRow(stmt *sqlgen.Statement, args ...interface{}) (*sqlx.Row, error) { - var query string - var p *sqlx.Stmt - var err error - - if db.Debug { - var start, end int64 - start = time.Now().UnixNano() - - defer func() { - end = time.Now().UnixNano() - sqlutil.Log(query, args, err, start, end) - }() - } - - if p, query, err = d.prepareStatement(stmt); err != nil { - return nil, err - } + clone.BindTx(sqlTx) - return p.QueryRowx(args...), nil + return &tx{Tx: clone.Tx(), database: clone}, nil } -// populateSchema looks up for the table info in the database and populates its +// PopulateSchema looks up for the table info in the database and populates its // schema for internal use. -func (d *database) populateSchema() (err error) { +func (d *database) PopulateSchema() (err error) { var collections []string - d.schema = schema.NewDatabaseSchema() + d.NewSchema() // Get database name. q := d.Builder().Select(db.Raw{"CURRENT_DATABASE() AS name"}) @@ -409,7 +189,7 @@ func (d *database) populateSchema() (err error) { return err } - d.schema.Name = row.Name + d.Schema().Name = row.Name if collections, err = d.Collections(); err != nil { return err @@ -425,56 +205,60 @@ func (d *database) populateSchema() (err error) { } func (d *database) TableExists(name string) error { - if !d.schema.HasTable(name) { - q := d.Builder().Select("table_name"). - From("information_schema.tables"). - Where("table_catalog = ? AND table_name = ?", d.schema.Name, name) + if d.Schema().HasTable(name) { + return nil + } - var row map[string]string + q := d.Builder().Select("table_name"). + From("information_schema.tables"). + Where("table_catalog = ? AND table_name = ?", d.Schema().Name, name) - if err := q.Iterator().One(&row); err != nil { - return db.ErrCollectionDoesNotExist - } + var row map[string]string + + if err := q.Iterator().One(&row); err != nil { + return db.ErrCollectionDoesNotExist } return nil } func (d *database) TableColumns(tableName string) ([]string, error) { + s := d.Schema() - tableSchema := d.schema.Table(tableName) + if len(s.Table(tableName).Columns) == 0 { - if len(tableSchema.Columns) > 0 { - return tableSchema.Columns, nil - } + q := d.Builder().Select("column_name"). + From("information_schema.columns"). + Where("table_catalog = ? AND table_name = ?", d.Schema().Name, tableName) - q := d.Builder().Select("column_name", "data_type"). - From("information_schema.columns"). - Where("table_catalog = ? AND table_name = ?", d.schema.Name, tableName) - - var rows []columnSchemaT + var rows []struct { + Name string `db:"column_name"` + } - if err := q.Iterator().All(&rows); err != nil { - return nil, err - } + if err := q.Iterator().All(&rows); err != nil { + return nil, err + } - d.schema.TableInfo[tableName].Columns = make([]string, 0, len(rows)) + s.TableInfo[tableName].Columns = make([]string, 0, len(rows)) - for i := range rows { - d.schema.TableInfo[tableName].Columns = append(d.schema.TableInfo[tableName].Columns, rows[i].Name) + for i := range rows { + s.TableInfo[tableName].Columns = append(s.TableInfo[tableName].Columns, rows[i].Name) + } } - return d.schema.TableInfo[tableName].Columns, nil + return s.Table(tableName).Columns, nil } func (d *database) TablePrimaryKey(tableName string) ([]string, error) { - tableSchema := d.schema.Table(tableName) + s := d.Schema() + + ts := s.Table(tableName) - if len(tableSchema.PrimaryKey) != 0 { - return tableSchema.PrimaryKey, nil + if len(ts.PrimaryKey) != 0 { + return ts.PrimaryKey, nil } - tableSchema.PrimaryKey = make([]string, 0, 1) + ts.PrimaryKey = make([]string, 0, 1) q := d.Builder().Select("pg_attribute.attname AS pkey"). From("pg_index", "pg_class", "pg_attribute"). @@ -493,47 +277,8 @@ func (d *database) TablePrimaryKey(tableName string) ([]string, error) { } for iter.Next(&row) { - tableSchema.PrimaryKey = append(tableSchema.PrimaryKey, row.Key) - } - - return tableSchema.PrimaryKey, nil -} - -// Builder returns a custom query builder. -func (d *database) Builder() db.QueryBuilder { - return d.builder -} - -// waitForConnection tries to execute the connectFn function, if connectFn -// returns an error, then waitForConnection will keep trying until connectFn -// returns nil. Maximum waiting time is 5s after having acquired the lock. -func waitForConnection(connectFn func() error) error { - // This lock ensures first-come, first-served and prevents opening too many - // file descriptors. - waitForConnMu.Lock() - defer waitForConnMu.Unlock() - - // Minimum waiting time. - waitTime := time.Millisecond * 10 - - // Waitig 5 seconds for a successful connection. - for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; { - if err := connectFn(); err != nil { - if strings.Contains(err.Error(), `too many clients`) || strings.Contains(err.Error(), `remaining connection slots are reserved`) { - // Sleep and try again if, and only if, the server replied with a "too - // many clients" error. - time.Sleep(waitTime) - if waitTime < time.Millisecond*500 { - // Wait a bit more next time. - waitTime = waitTime * 2 - } - continue - } - // Return any other error immediately. - return err - } - return nil + ts.PrimaryKey = append(ts.PrimaryKey, row.Key) } - return db.ErrGivingUpTryingToConnect + return ts.PrimaryKey, nil } diff --git a/postgresql/database_test.go b/postgresql/database_test.go index 9586da20..bb678809 100644 --- a/postgresql/database_test.go +++ b/postgresql/database_test.go @@ -223,27 +223,6 @@ func SkipTestOpenWithWrongData(t *testing.T) { } } -// Old settings must be compatible. -func TestOldSettings(t *testing.T) { - var err error - var sess db.Database - - oldSettings := db.Settings{ - Database: databaseName, - User: username, - Password: password, - Host: host, - } - - // Opening database. - if sess, err = db.Open(Adapter, oldSettings); err != nil { - t.Fatal(err) - } - - // Closing database. - sess.Close() -} - // Test Use func TestUse(t *testing.T) { var err error diff --git a/postgresql/tx.go b/postgresql/tx.go index 5392c922..89560de7 100644 --- a/postgresql/tx.go +++ b/postgresql/tx.go @@ -22,6 +22,7 @@ package postgresql import ( + "upper.io/db" "upper.io/db/util/sqlutil/tx" ) @@ -30,6 +31,8 @@ type tx struct { *database } +var _ = db.Tx(&tx{}) + // Driver returns the current transaction session. func (t *tx) Driver() interface{} { if t != nil && t.Tx != nil { -- GitLab