Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-02-19 00:05:12 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-02-19 00:05:12 +0300
commit025055e91d4360ee9f5133fc23d05d8f51e21ad8 (patch)
tree5f1bbbf48dd989116a551a19df823817e8beaddb
parent5631d0a80b2013359bc2a1ccd3c211b4801c8695 (diff)
Support of basic interaction with Postgres database
New task and job to run Postgres database related tests. Basic helper functions to make SQL operations easy to use. Refactoring of sub-commands dependent to SQL. Part of: https://gitlab.com/gitlab-org/gitaly/issues/2166
-rw-r--r--changelogs/unreleased/ps-sql-transactions-support.yml5
-rw-r--r--cmd/praefect/main.go15
-rw-r--r--cmd/praefect/subcmd.go (renamed from cmd/praefect/subcommand.go)34
-rw-r--r--internal/praefect/datastore/glsql/doc.go36
-rw-r--r--internal/praefect/datastore/glsql/postgres.go70
-rw-r--r--internal/praefect/datastore/glsql/postgres_test.go258
-rw-r--r--internal/praefect/datastore/glsql/testing.go8
-rw-r--r--internal/praefect/datastore/postgres.go (renamed from internal/praefect/datastore/db.go)28
8 files changed, 427 insertions, 27 deletions
diff --git a/changelogs/unreleased/ps-sql-transactions-support.yml b/changelogs/unreleased/ps-sql-transactions-support.yml
new file mode 100644
index 000000000..1b6b93016
--- /dev/null
+++ b/changelogs/unreleased/ps-sql-transactions-support.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect SQL: support of transactions'
+merge_request: 1815
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index acf615f5a..81ea610e7 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -43,6 +43,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -225,7 +226,19 @@ func getStarterConfigs(socketPath, listenAddr string) ([]starter.Config, error)
// Test Postgres connection, for diagnostic purposes only while we roll
// out Postgres support. https://gitlab.com/gitlab-org/gitaly/issues/1755
func testSQLConnection(logger *logrus.Entry, conf config.Config) {
- if err := datastore.CheckPostgresVersion(conf); err != nil {
+ db, err := glsql.OpenDB(conf.DB)
+ if err != nil {
+ logger.WithError(err).Error("SQL connection open failed")
+ return
+ }
+
+ defer func() {
+ if err := db.Close(); err != nil {
+ logger.WithError(err).Error("SQL connection close failed")
+ }
+ }()
+
+ if err := datastore.CheckPostgresVersion(db); err != nil {
logger.WithError(err).Error("SQL connection check failed")
} else {
logger.Info("SQL connection check successful")
diff --git a/cmd/praefect/subcommand.go b/cmd/praefect/subcmd.go
index d59c6419f..0834a04ae 100644
--- a/cmd/praefect/subcommand.go
+++ b/cmd/praefect/subcmd.go
@@ -1,12 +1,14 @@
package main
import (
+ "database/sql"
"fmt"
"os"
"os/signal"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
const invocationPrefix = progname + " -config CONFIG_TOML"
@@ -39,7 +41,13 @@ func subCommand(conf config.Config, arg0 string, argRest []string) int {
func sqlPing(conf config.Config) int {
const subCmd = progname + " sql-ping"
- if err := datastore.CheckPostgresVersion(conf); err != nil {
+ db, clean, code := openDB(conf.DB)
+ if code != 0 {
+ return code
+ }
+ defer clean()
+
+ if err := datastore.CheckPostgresVersion(db); err != nil {
printfErr("%s: fail: %v\n", subCmd, err)
return 1
}
@@ -51,7 +59,13 @@ func sqlPing(conf config.Config) int {
func sqlMigrate(conf config.Config) int {
const subCmd = progname + " sql-migrate"
- n, err := datastore.Migrate(conf)
+ db, clean, code := openDB(conf.DB)
+ if code != 0 {
+ return code
+ }
+ defer clean()
+
+ n, err := glsql.Migrate(db)
if err != nil {
printfErr("%s: fail: %v\n", subCmd, err)
return 1
@@ -61,6 +75,22 @@ func sqlMigrate(conf config.Config) int {
return 0
}
+func openDB(conf config.DB) (*sql.DB, func(), int) {
+ db, err := glsql.OpenDB(conf)
+ if err != nil {
+ printfErr("sql open: %v\n", err)
+ return nil, nil, 1
+ }
+
+ clean := func() {
+ if err := db.Close(); err != nil {
+ printfErr("sql close: %v\n", err)
+ }
+ }
+
+ return db, clean, 0
+}
+
func printfErr(format string, a ...interface{}) (int, error) {
return fmt.Fprintf(os.Stderr, format, a...)
}
diff --git a/internal/praefect/datastore/glsql/doc.go b/internal/praefect/datastore/glsql/doc.go
index 9983661ff..2ea28a54e 100644
--- a/internal/praefect/datastore/glsql/doc.go
+++ b/internal/praefect/datastore/glsql/doc.go
@@ -35,4 +35,40 @@
// this build tag to them. The example how to do this could be found in
// internal/praefect/datastore/glsql/postgres_test.go file.
+// To simplify usage of transactions the TxQuery interface was introduced.
+// The main idea is to write code that won't be overwhelmed with transaction
+// management and to use simple approach with OK/NOT OK check while running
+// SQL queries using transaction. Let's take a look at the usage example:
+//
+// let's imagine we have this method and db is *sql.DB on the repository struct:
+// func (r *repository) Save(ctx context.Context, u User) (err error) {
+// // initialization of our new transactional scope
+// txq := NewTxQuery(ctx, nil, r.db)
+// // call for Done is required otherwise transaction will remain open
+// // err must be not a nil value. In this case it is a reference to the
+// // returned named parameter. It will be filled with error returned from Exec
+// // func call if any and no other Exec function call will be triggered.
+// defer txq.Done(&err)
+// // the first operation is attempt to insert a new row
+// // in case of failure the error would be propagated into &err passed to Done method
+// // and it will return false that will be a signal that operation failed or was not
+// // triggered at all because there was already a failed operation on this transaction.
+// userAdded := txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+// _, err := tx.Exec(ctx, "INSERT INTO user(name) VALUES ($1)", u.Name)
+// return err
+// })
+// // we can use checks for early return, but if there was an error on the previous operation
+// // the next one won't be executed.
+// if !userAdded {
+// return
+// }
+// txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+// _, err := tx.Exec(ctx, "UPDATE stats SET user_count = user_count + 1")
+// return err
+// })
+// }
+//
+// NOTE: because we use [pgbouncer](https://www.pgbouncer.org/) with transaction pooling
+// it is [not allowed to use prepared statements](https://www.pgbouncer.org/faq.html).
+
package glsql
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index dcd81cbca..34250241b 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -2,11 +2,13 @@
package glsql
import (
+ "context"
"database/sql"
// Blank import to enable integration of github.com/lib/pq into database/sql
_ "github.com/lib/pq"
migrate "github.com/rubenv/sql-migrate"
+ "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/migrations"
)
@@ -30,3 +32,71 @@ func Migrate(db *sql.DB) (int, error) {
migrationSource := &migrate.MemoryMigrationSource{Migrations: migrations.All()}
return migrate.Exec(db, "postgres", migrationSource, migrate.Up)
}
+
+// TxQuery runs operations inside transaction and commits|rollbacks on Done.
+type TxQuery interface {
+ // Exec calls op function with provided ctx.
+ // Returns true on success and false in case operation failed or wasn't called because of previously failed op.
+ Exec(ctx context.Context, op func(context.Context, *sql.Tx) error) bool
+ // Done must be called after work is finished to complete transaction.
+ // errPtr must not be nil.
+ // COMMIT will be executed if no errors happen during TxQuery usage.
+ // Otherwise it will be ROLLBACK operation.
+ Done(errPtr *error)
+}
+
+// NewTxQuery creates entity that allows to run queries in scope of a transaction.
+// It always returns non-nil value.
+func NewTxQuery(ctx context.Context, logger logrus.FieldLogger, db *sql.DB) TxQuery {
+ tx, err := db.BeginTx(ctx, nil)
+ return &txQuery{
+ tx: tx,
+ err: err,
+ logger: logger,
+ }
+}
+
+type txQuery struct {
+ tx *sql.Tx
+ err error
+ logger logrus.FieldLogger
+}
+
+// Exec calls op function with provided ctx.
+// Returns true on success and false in case operation failed or wasn't called because of previously failed op.
+func (txq *txQuery) Exec(ctx context.Context, op func(context.Context, *sql.Tx) error) bool {
+ if txq.err != nil {
+ return false
+ }
+
+ txq.err = op(ctx, txq.tx)
+ return txq.err == nil
+}
+
+// Done must be called after work is finished to complete transaction.
+// errPtr must not be nil.
+// COMMIT will be executed if no errors happen during txQuery usage.
+// Otherwise it will be ROLLBACK operation.
+func (txq *txQuery) Done(errPtr *error) {
+ if txq.err == nil {
+ txq.err = txq.tx.Commit()
+ if txq.err != nil {
+ txq.log(txq.err, "commit failed")
+ }
+ } else {
+ // Don't overwrite txq.err because it's already non-nil
+ if err := txq.tx.Rollback(); err != nil {
+ txq.log(err, "rollback failed")
+ }
+ }
+
+ if *errPtr == nil {
+ *errPtr = txq.err
+ }
+}
+
+func (txq *txQuery) log(err error, msg string) {
+ if txq.logger != nil {
+ txq.logger.WithError(err).Error(msg)
+ }
+}
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index ff056dabf..586375c65 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -3,12 +3,19 @@
package glsql
import (
+ "bytes"
+ "context"
+ "database/sql"
+ "errors"
"os"
"strconv"
+ "strings"
"testing"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
func TestOpenDB(t *testing.T) {
@@ -37,3 +44,254 @@ func TestOpenDB(t *testing.T) {
require.NoError(t, db.Close())
})
}
+
+func TestTxQuery_MultipleOperationsSuccess(t *testing.T) {
+ db := GetDB(t)
+ defer createBasicTable(t, db, "work_unit_test")()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ const actions = 3
+ txq := NewTxQuery(context.TODO(), nil, db.DB)
+
+ defer func() {
+ var err error
+ txq.Done(&err)
+ require.NoError(t, err)
+
+ db.RequireRowsInTable(t, "work_unit_test", actions)
+ }()
+
+ for i := 0; i < actions; i++ {
+ require.True(
+ t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test VALUES (DEFAULT)")
+ return err
+ }),
+ "expects row to be inserted",
+ )
+ }
+}
+
+func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) {
+ db := GetDB(t)
+ defer createBasicTable(t, db, "work_unit_test")()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ txq := NewTxQuery(ctx, nil, db.DB)
+
+ defer func() {
+ var err error
+ txq.Done(&err)
+ require.EqualError(t, err, `pq: syntax error at or near "BAD"`, "expects error because of the incorrect SQL statement")
+
+ db.RequireRowsInTable(t, "work_unit_test", 0)
+ }()
+
+ require.True(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)")
+ return err
+ }),
+ "expects row to be inserted",
+ )
+
+ require.False(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "BAD OPERATION")
+ return err
+ }),
+ "the SQL statement is not valid, expects to be reported as failed",
+ )
+
+ require.False(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ t.Fatal("this func should not be called")
+ return nil
+ }),
+ "because of previously failed SQL operation next statement expected not to be run",
+ )
+}
+
+func TestTxQuery_ContextHandled(t *testing.T) {
+ db := GetDB(t)
+
+ defer createBasicTable(t, db, "work_unit_test")()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ txq := NewTxQuery(ctx, nil, db.DB)
+
+ defer func() {
+ var err error
+ txq.Done(&err)
+ require.EqualError(t, err, "context canceled")
+
+ db.RequireRowsInTable(t, "work_unit_test", 0)
+ }()
+
+ require.True(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)")
+ return err
+ }),
+ "expects row to be inserted",
+ )
+
+ cancel() // explicit context cancellation to simulate situation when it is expired or cancelled
+
+ require.False(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)")
+ return err
+ }),
+ "expects failed operation because of cancelled context",
+ )
+}
+
+func TestTxQuery_FailedToCommit(t *testing.T) {
+ db := GetDB(t)
+ defer createBasicTable(t, db, "work_unit_test")()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ txq := NewTxQuery(ctx, nil, db.DB)
+
+ defer func() {
+ var err error
+ txq.Done(&err)
+ require.EqualError(t, err, sql.ErrTxDone.Error(), "expects failed COMMIT because of previously executed COMMIT statement")
+
+ db.RequireRowsInTable(t, "work_unit_test", 1)
+ }()
+
+ require.True(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)")
+ return err
+ }),
+ "expects row to be inserted",
+ )
+
+ require.True(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ require.NoError(t, tx.Commit()) // COMMIT to get error on the next attempt to COMMIT from Done method
+ return nil
+ }),
+ "expects COMMIT without issues",
+ )
+}
+
+func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) {
+ db := GetDB(t)
+ defer createBasicTable(t, db, "work_unit_test")()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ outBuffer := &bytes.Buffer{}
+ logger := logrus.New()
+ logger.Out = outBuffer
+ logger.Level = logrus.ErrorLevel
+ logger.Formatter = &logrus.JSONFormatter{
+ DisableTimestamp: true,
+ PrettyPrint: false,
+ }
+
+ txq := NewTxQuery(ctx, logger, db.DB)
+
+ defer func() {
+ var err error
+ txq.Done(&err)
+ require.EqualError(t, err, "some unexpected error")
+ require.Equal(t,
+ `{"error":"sql: transaction has already been committed or rolled back","level":"error","msg":"rollback failed"}`,
+ strings.TrimSpace(outBuffer.String()),
+ "failed COMMIT/ROLLBACK operation must be logged in case of another error during transaction usage",
+ )
+
+ db.RequireRowsInTable(t, "work_unit_test", 1)
+ }()
+
+ require.True(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)")
+ return err
+ }),
+ "expects row to be inserted",
+ )
+
+ require.False(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ require.NoError(t, tx.Commit(), "expects successful COMMIT") // COMMIT to get error on the next attempt to COMMIT
+ return errors.New("some unexpected error")
+ }),
+ "expects failed operation because of explicit error returned",
+ )
+}
+
+func TestTxQuery_FailedToCommitWithFailedOperation(t *testing.T) {
+ db := GetDB(t)
+ defer createBasicTable(t, db, "work_unit_test")()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ outBuffer := &bytes.Buffer{}
+ logger := logrus.New()
+ logger.Out = outBuffer
+ logger.Level = logrus.ErrorLevel
+ logger.Formatter = &logrus.JSONFormatter{
+ DisableTimestamp: true,
+ PrettyPrint: false,
+ }
+
+ txq := NewTxQuery(ctx, logger, db.DB)
+
+ defer func() {
+ err := errors.New("some processing error")
+ txq.Done(&err)
+ require.EqualError(t, err, "some processing error")
+ require.Equal(
+ t,
+ `{"error":"sql: transaction has already been committed or rolled back","level":"error","msg":"commit failed"}`,
+ strings.TrimSpace(outBuffer.String()),
+ "failed COMMIT/ROLLBACK operation must be logged in case of another error during transaction usage",
+ )
+
+ db.RequireRowsInTable(t, "work_unit_test", 1)
+ }()
+
+ require.True(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)")
+ return err
+ }),
+ "expects row to be inserted",
+ )
+
+ require.True(t,
+ txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error {
+ require.NoError(t, tx.Commit()) // COMMIT to get error on the next attempt to COMMIT
+ return nil
+ }),
+ "expects COMMIT without issues",
+ )
+}
+
+func createBasicTable(t *testing.T, db DB, tname string) func() {
+ t.Helper()
+
+ _, err := db.Exec("CREATE TABLE " + tname + "(id BIGSERIAL PRIMARY KEY, col TEXT)")
+ require.NoError(t, err)
+ return func() {
+ _, err := db.Exec("DROP TABLE IF EXISTS " + tname)
+ require.NoError(t, err)
+ }
+}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 794366162..eba642c65 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -40,6 +40,14 @@ func (db DB) Truncate(t testing.TB, tables ...string) {
require.NoError(t, err, "database truncation failed: %s", tables)
}
+func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
+ t.Helper()
+
+ var count int
+ require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM "+tname).Scan(&count))
+ require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
+}
+
// Close removes schema if it was used and releases connection pool.
func (db DB) Close() error {
if err := db.DB.Close(); err != nil {
diff --git a/internal/praefect/datastore/db.go b/internal/praefect/datastore/postgres.go
index 3ec1e2368..0cb05e34a 100644
--- a/internal/praefect/datastore/db.go
+++ b/internal/praefect/datastore/postgres.go
@@ -6,23 +6,16 @@ import (
"fmt"
"time"
- // Blank import to enable integration of github.com/lib/pq into database/sql
- _ "github.com/lib/pq"
migrate "github.com/rubenv/sql-migrate"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/migrations"
)
// CheckPostgresVersion checks the server version of the Postgres DB
// specified in conf. This is a diagnostic for the Praefect Postgres
// rollout. https://gitlab.com/gitlab-org/gitaly/issues/1755
-func CheckPostgresVersion(conf config.Config) error {
- db, err := openDB(conf)
- if err != nil {
- return fmt.Errorf("sql open: %v", err)
- }
- defer db.Close()
-
+func CheckPostgresVersion(db *sql.DB) error {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
@@ -39,24 +32,11 @@ func CheckPostgresVersion(conf config.Config) error {
return nil
}
-func openDB(conf config.Config) (*sql.DB, error) { return sql.Open("postgres", conf.DB.ToPQString()) }
-
const sqlMigrateDialect = "postgres"
-// Migrate will apply all pending SQL migrations
-func Migrate(conf config.Config) (int, error) {
- db, err := openDB(conf)
- if err != nil {
- return 0, fmt.Errorf("sql open: %v", err)
- }
- defer db.Close()
-
- return migrate.Exec(db, sqlMigrateDialect, migrationSource(), migrate.Up)
-}
-
// MigrateDownPlan does a dry run for rolling back at most max migrations.
func MigrateDownPlan(conf config.Config, max int) ([]string, error) {
- db, err := openDB(conf)
+ db, err := glsql.OpenDB(conf.DB)
if err != nil {
return nil, fmt.Errorf("sql open: %v", err)
}
@@ -77,7 +57,7 @@ func MigrateDownPlan(conf config.Config, max int) ([]string, error) {
// MigrateDown rolls back at most max migrations.
func MigrateDown(conf config.Config, max int) (int, error) {
- db, err := openDB(conf)
+ db, err := glsql.OpenDB(conf.DB)
if err != nil {
return 0, fmt.Errorf("sql open: %v", err)
}