diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-02-19 00:05:12 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-02-19 00:05:12 +0300 |
commit | 025055e91d4360ee9f5133fc23d05d8f51e21ad8 (patch) | |
tree | 5f1bbbf48dd989116a551a19df823817e8beaddb | |
parent | 5631d0a80b2013359bc2a1ccd3c211b4801c8695 (diff) |
Support of basic interaction with Postgres database
New task and job to run Postgres database related tests.
Basic helper functions to make SQL operations easy to use.
Refactoring of sub-commands dependent to SQL.
Part of: https://gitlab.com/gitlab-org/gitaly/issues/2166
-rw-r--r-- | changelogs/unreleased/ps-sql-transactions-support.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 15 | ||||
-rw-r--r-- | cmd/praefect/subcmd.go (renamed from cmd/praefect/subcommand.go) | 34 | ||||
-rw-r--r-- | internal/praefect/datastore/glsql/doc.go | 36 | ||||
-rw-r--r-- | internal/praefect/datastore/glsql/postgres.go | 70 | ||||
-rw-r--r-- | internal/praefect/datastore/glsql/postgres_test.go | 258 | ||||
-rw-r--r-- | internal/praefect/datastore/glsql/testing.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/postgres.go (renamed from internal/praefect/datastore/db.go) | 28 |
8 files changed, 427 insertions, 27 deletions
diff --git a/changelogs/unreleased/ps-sql-transactions-support.yml b/changelogs/unreleased/ps-sql-transactions-support.yml new file mode 100644 index 000000000..1b6b93016 --- /dev/null +++ b/changelogs/unreleased/ps-sql-transactions-support.yml @@ -0,0 +1,5 @@ +--- +title: 'Praefect SQL: support of transactions' +merge_request: 1815 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index acf615f5a..81ea610e7 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -43,6 +43,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -225,7 +226,19 @@ func getStarterConfigs(socketPath, listenAddr string) ([]starter.Config, error) // Test Postgres connection, for diagnostic purposes only while we roll // out Postgres support. https://gitlab.com/gitlab-org/gitaly/issues/1755 func testSQLConnection(logger *logrus.Entry, conf config.Config) { - if err := datastore.CheckPostgresVersion(conf); err != nil { + db, err := glsql.OpenDB(conf.DB) + if err != nil { + logger.WithError(err).Error("SQL connection open failed") + return + } + + defer func() { + if err := db.Close(); err != nil { + logger.WithError(err).Error("SQL connection close failed") + } + }() + + if err := datastore.CheckPostgresVersion(db); err != nil { logger.WithError(err).Error("SQL connection check failed") } else { logger.Info("SQL connection check successful") diff --git a/cmd/praefect/subcommand.go b/cmd/praefect/subcmd.go index d59c6419f..0834a04ae 100644 --- a/cmd/praefect/subcommand.go +++ b/cmd/praefect/subcmd.go @@ -1,12 +1,14 @@ package main import ( + "database/sql" "fmt" "os" "os/signal" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" ) const invocationPrefix = progname + " -config CONFIG_TOML" @@ -39,7 +41,13 @@ func subCommand(conf config.Config, arg0 string, argRest []string) int { func sqlPing(conf config.Config) int { const subCmd = progname + " sql-ping" - if err := datastore.CheckPostgresVersion(conf); err != nil { + db, clean, code := openDB(conf.DB) + if code != 0 { + return code + } + defer clean() + + if err := datastore.CheckPostgresVersion(db); err != nil { printfErr("%s: fail: %v\n", subCmd, err) return 1 } @@ -51,7 +59,13 @@ func sqlPing(conf config.Config) int { func sqlMigrate(conf config.Config) int { const subCmd = progname + " sql-migrate" - n, err := datastore.Migrate(conf) + db, clean, code := openDB(conf.DB) + if code != 0 { + return code + } + defer clean() + + n, err := glsql.Migrate(db) if err != nil { printfErr("%s: fail: %v\n", subCmd, err) return 1 @@ -61,6 +75,22 @@ func sqlMigrate(conf config.Config) int { return 0 } +func openDB(conf config.DB) (*sql.DB, func(), int) { + db, err := glsql.OpenDB(conf) + if err != nil { + printfErr("sql open: %v\n", err) + return nil, nil, 1 + } + + clean := func() { + if err := db.Close(); err != nil { + printfErr("sql close: %v\n", err) + } + } + + return db, clean, 0 +} + func printfErr(format string, a ...interface{}) (int, error) { return fmt.Fprintf(os.Stderr, format, a...) } diff --git a/internal/praefect/datastore/glsql/doc.go b/internal/praefect/datastore/glsql/doc.go index 9983661ff..2ea28a54e 100644 --- a/internal/praefect/datastore/glsql/doc.go +++ b/internal/praefect/datastore/glsql/doc.go @@ -35,4 +35,40 @@ // this build tag to them. The example how to do this could be found in // internal/praefect/datastore/glsql/postgres_test.go file. +// To simplify usage of transactions the TxQuery interface was introduced. +// The main idea is to write code that won't be overwhelmed with transaction +// management and to use simple approach with OK/NOT OK check while running +// SQL queries using transaction. Let's take a look at the usage example: +// +// let's imagine we have this method and db is *sql.DB on the repository struct: +// func (r *repository) Save(ctx context.Context, u User) (err error) { +// // initialization of our new transactional scope +// txq := NewTxQuery(ctx, nil, r.db) +// // call for Done is required otherwise transaction will remain open +// // err must be not a nil value. In this case it is a reference to the +// // returned named parameter. It will be filled with error returned from Exec +// // func call if any and no other Exec function call will be triggered. +// defer txq.Done(&err) +// // the first operation is attempt to insert a new row +// // in case of failure the error would be propagated into &err passed to Done method +// // and it will return false that will be a signal that operation failed or was not +// // triggered at all because there was already a failed operation on this transaction. +// userAdded := txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { +// _, err := tx.Exec(ctx, "INSERT INTO user(name) VALUES ($1)", u.Name) +// return err +// }) +// // we can use checks for early return, but if there was an error on the previous operation +// // the next one won't be executed. +// if !userAdded { +// return +// } +// txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { +// _, err := tx.Exec(ctx, "UPDATE stats SET user_count = user_count + 1") +// return err +// }) +// } +// +// NOTE: because we use [pgbouncer](https://www.pgbouncer.org/) with transaction pooling +// it is [not allowed to use prepared statements](https://www.pgbouncer.org/faq.html). + package glsql diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go index dcd81cbca..34250241b 100644 --- a/internal/praefect/datastore/glsql/postgres.go +++ b/internal/praefect/datastore/glsql/postgres.go @@ -2,11 +2,13 @@ package glsql import ( + "context" "database/sql" // Blank import to enable integration of github.com/lib/pq into database/sql _ "github.com/lib/pq" migrate "github.com/rubenv/sql-migrate" + "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/migrations" ) @@ -30,3 +32,71 @@ func Migrate(db *sql.DB) (int, error) { migrationSource := &migrate.MemoryMigrationSource{Migrations: migrations.All()} return migrate.Exec(db, "postgres", migrationSource, migrate.Up) } + +// TxQuery runs operations inside transaction and commits|rollbacks on Done. +type TxQuery interface { + // Exec calls op function with provided ctx. + // Returns true on success and false in case operation failed or wasn't called because of previously failed op. + Exec(ctx context.Context, op func(context.Context, *sql.Tx) error) bool + // Done must be called after work is finished to complete transaction. + // errPtr must not be nil. + // COMMIT will be executed if no errors happen during TxQuery usage. + // Otherwise it will be ROLLBACK operation. + Done(errPtr *error) +} + +// NewTxQuery creates entity that allows to run queries in scope of a transaction. +// It always returns non-nil value. +func NewTxQuery(ctx context.Context, logger logrus.FieldLogger, db *sql.DB) TxQuery { + tx, err := db.BeginTx(ctx, nil) + return &txQuery{ + tx: tx, + err: err, + logger: logger, + } +} + +type txQuery struct { + tx *sql.Tx + err error + logger logrus.FieldLogger +} + +// Exec calls op function with provided ctx. +// Returns true on success and false in case operation failed or wasn't called because of previously failed op. +func (txq *txQuery) Exec(ctx context.Context, op func(context.Context, *sql.Tx) error) bool { + if txq.err != nil { + return false + } + + txq.err = op(ctx, txq.tx) + return txq.err == nil +} + +// Done must be called after work is finished to complete transaction. +// errPtr must not be nil. +// COMMIT will be executed if no errors happen during txQuery usage. +// Otherwise it will be ROLLBACK operation. +func (txq *txQuery) Done(errPtr *error) { + if txq.err == nil { + txq.err = txq.tx.Commit() + if txq.err != nil { + txq.log(txq.err, "commit failed") + } + } else { + // Don't overwrite txq.err because it's already non-nil + if err := txq.tx.Rollback(); err != nil { + txq.log(err, "rollback failed") + } + } + + if *errPtr == nil { + *errPtr = txq.err + } +} + +func (txq *txQuery) log(err error, msg string) { + if txq.logger != nil { + txq.logger.WithError(err).Error(msg) + } +} diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go index ff056dabf..586375c65 100644 --- a/internal/praefect/datastore/glsql/postgres_test.go +++ b/internal/praefect/datastore/glsql/postgres_test.go @@ -3,12 +3,19 @@ package glsql import ( + "bytes" + "context" + "database/sql" + "errors" "os" "strconv" + "strings" "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) func TestOpenDB(t *testing.T) { @@ -37,3 +44,254 @@ func TestOpenDB(t *testing.T) { require.NoError(t, db.Close()) }) } + +func TestTxQuery_MultipleOperationsSuccess(t *testing.T) { + db := GetDB(t) + defer createBasicTable(t, db, "work_unit_test")() + + ctx, cancel := testhelper.Context() + defer cancel() + + const actions = 3 + txq := NewTxQuery(context.TODO(), nil, db.DB) + + defer func() { + var err error + txq.Done(&err) + require.NoError(t, err) + + db.RequireRowsInTable(t, "work_unit_test", actions) + }() + + for i := 0; i < actions; i++ { + require.True( + t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test VALUES (DEFAULT)") + return err + }), + "expects row to be inserted", + ) + } +} + +func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) { + db := GetDB(t) + defer createBasicTable(t, db, "work_unit_test")() + + ctx, cancel := testhelper.Context() + defer cancel() + + txq := NewTxQuery(ctx, nil, db.DB) + + defer func() { + var err error + txq.Done(&err) + require.EqualError(t, err, `pq: syntax error at or near "BAD"`, "expects error because of the incorrect SQL statement") + + db.RequireRowsInTable(t, "work_unit_test", 0) + }() + + require.True(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)") + return err + }), + "expects row to be inserted", + ) + + require.False(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "BAD OPERATION") + return err + }), + "the SQL statement is not valid, expects to be reported as failed", + ) + + require.False(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + t.Fatal("this func should not be called") + return nil + }), + "because of previously failed SQL operation next statement expected not to be run", + ) +} + +func TestTxQuery_ContextHandled(t *testing.T) { + db := GetDB(t) + + defer createBasicTable(t, db, "work_unit_test")() + + ctx, cancel := testhelper.Context() + defer cancel() + + txq := NewTxQuery(ctx, nil, db.DB) + + defer func() { + var err error + txq.Done(&err) + require.EqualError(t, err, "context canceled") + + db.RequireRowsInTable(t, "work_unit_test", 0) + }() + + require.True(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)") + return err + }), + "expects row to be inserted", + ) + + cancel() // explicit context cancellation to simulate situation when it is expired or cancelled + + require.False(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)") + return err + }), + "expects failed operation because of cancelled context", + ) +} + +func TestTxQuery_FailedToCommit(t *testing.T) { + db := GetDB(t) + defer createBasicTable(t, db, "work_unit_test")() + + ctx, cancel := testhelper.Context() + defer cancel() + + txq := NewTxQuery(ctx, nil, db.DB) + + defer func() { + var err error + txq.Done(&err) + require.EqualError(t, err, sql.ErrTxDone.Error(), "expects failed COMMIT because of previously executed COMMIT statement") + + db.RequireRowsInTable(t, "work_unit_test", 1) + }() + + require.True(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)") + return err + }), + "expects row to be inserted", + ) + + require.True(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + require.NoError(t, tx.Commit()) // COMMIT to get error on the next attempt to COMMIT from Done method + return nil + }), + "expects COMMIT without issues", + ) +} + +func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) { + db := GetDB(t) + defer createBasicTable(t, db, "work_unit_test")() + + ctx, cancel := testhelper.Context() + defer cancel() + + outBuffer := &bytes.Buffer{} + logger := logrus.New() + logger.Out = outBuffer + logger.Level = logrus.ErrorLevel + logger.Formatter = &logrus.JSONFormatter{ + DisableTimestamp: true, + PrettyPrint: false, + } + + txq := NewTxQuery(ctx, logger, db.DB) + + defer func() { + var err error + txq.Done(&err) + require.EqualError(t, err, "some unexpected error") + require.Equal(t, + `{"error":"sql: transaction has already been committed or rolled back","level":"error","msg":"rollback failed"}`, + strings.TrimSpace(outBuffer.String()), + "failed COMMIT/ROLLBACK operation must be logged in case of another error during transaction usage", + ) + + db.RequireRowsInTable(t, "work_unit_test", 1) + }() + + require.True(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)") + return err + }), + "expects row to be inserted", + ) + + require.False(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + require.NoError(t, tx.Commit(), "expects successful COMMIT") // COMMIT to get error on the next attempt to COMMIT + return errors.New("some unexpected error") + }), + "expects failed operation because of explicit error returned", + ) +} + +func TestTxQuery_FailedToCommitWithFailedOperation(t *testing.T) { + db := GetDB(t) + defer createBasicTable(t, db, "work_unit_test")() + + ctx, cancel := testhelper.Context() + defer cancel() + + outBuffer := &bytes.Buffer{} + logger := logrus.New() + logger.Out = outBuffer + logger.Level = logrus.ErrorLevel + logger.Formatter = &logrus.JSONFormatter{ + DisableTimestamp: true, + PrettyPrint: false, + } + + txq := NewTxQuery(ctx, logger, db.DB) + + defer func() { + err := errors.New("some processing error") + txq.Done(&err) + require.EqualError(t, err, "some processing error") + require.Equal( + t, + `{"error":"sql: transaction has already been committed or rolled back","level":"error","msg":"commit failed"}`, + strings.TrimSpace(outBuffer.String()), + "failed COMMIT/ROLLBACK operation must be logged in case of another error during transaction usage", + ) + + db.RequireRowsInTable(t, "work_unit_test", 1) + }() + + require.True(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "INSERT INTO work_unit_test(id) VALUES (DEFAULT)") + return err + }), + "expects row to be inserted", + ) + + require.True(t, + txq.Exec(ctx, func(ctx context.Context, tx *sql.Tx) error { + require.NoError(t, tx.Commit()) // COMMIT to get error on the next attempt to COMMIT + return nil + }), + "expects COMMIT without issues", + ) +} + +func createBasicTable(t *testing.T, db DB, tname string) func() { + t.Helper() + + _, err := db.Exec("CREATE TABLE " + tname + "(id BIGSERIAL PRIMARY KEY, col TEXT)") + require.NoError(t, err) + return func() { + _, err := db.Exec("DROP TABLE IF EXISTS " + tname) + require.NoError(t, err) + } +} diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go index 794366162..eba642c65 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -40,6 +40,14 @@ func (db DB) Truncate(t testing.TB, tables ...string) { require.NoError(t, err, "database truncation failed: %s", tables) } +func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) { + t.Helper() + + var count int + require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM "+tname).Scan(&count)) + require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n) +} + // Close removes schema if it was used and releases connection pool. func (db DB) Close() error { if err := db.DB.Close(); err != nil { diff --git a/internal/praefect/datastore/db.go b/internal/praefect/datastore/postgres.go index 3ec1e2368..0cb05e34a 100644 --- a/internal/praefect/datastore/db.go +++ b/internal/praefect/datastore/postgres.go @@ -6,23 +6,16 @@ import ( "fmt" "time" - // Blank import to enable integration of github.com/lib/pq into database/sql - _ "github.com/lib/pq" migrate "github.com/rubenv/sql-migrate" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/migrations" ) // CheckPostgresVersion checks the server version of the Postgres DB // specified in conf. This is a diagnostic for the Praefect Postgres // rollout. https://gitlab.com/gitlab-org/gitaly/issues/1755 -func CheckPostgresVersion(conf config.Config) error { - db, err := openDB(conf) - if err != nil { - return fmt.Errorf("sql open: %v", err) - } - defer db.Close() - +func CheckPostgresVersion(db *sql.DB) error { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() @@ -39,24 +32,11 @@ func CheckPostgresVersion(conf config.Config) error { return nil } -func openDB(conf config.Config) (*sql.DB, error) { return sql.Open("postgres", conf.DB.ToPQString()) } - const sqlMigrateDialect = "postgres" -// Migrate will apply all pending SQL migrations -func Migrate(conf config.Config) (int, error) { - db, err := openDB(conf) - if err != nil { - return 0, fmt.Errorf("sql open: %v", err) - } - defer db.Close() - - return migrate.Exec(db, sqlMigrateDialect, migrationSource(), migrate.Up) -} - // MigrateDownPlan does a dry run for rolling back at most max migrations. func MigrateDownPlan(conf config.Config, max int) ([]string, error) { - db, err := openDB(conf) + db, err := glsql.OpenDB(conf.DB) if err != nil { return nil, fmt.Errorf("sql open: %v", err) } @@ -77,7 +57,7 @@ func MigrateDownPlan(conf config.Config, max int) ([]string, error) { // MigrateDown rolls back at most max migrations. func MigrateDown(conf config.Config, max int) (int, error) { - db, err := openDB(conf) + db, err := glsql.OpenDB(conf.DB) if err != nil { return 0, fmt.Errorf("sql open: %v", err) } |