author     Pavlo Strokov <pstrokov@gitlab.com>    2020-03-10 20:00:05 +0300
committer  Pavlo Strokov <pstrokov@gitlab.com>    2020-03-10 20:00:05 +0300
commit     aa910b0b6fbc3f005a9924dee731123dfca2e48d
tree       07168948db8eba1b6833860b047d5fbba66b3c62
parent     ca611acce3b696423c6693fa7f5833703a08975f
parent     c5965df04cf581c2e11fa0dba9bbf56fe40e4282
Merge branch 'ps-persistent-queue' into 'master'
Praefect: Move replication queue to database
See merge request gitlab-org/gitaly!1865
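
For orientation before the diff itself: the change replaces the in-memory replication queue with a Postgres-backed one built around three tables (queue, lock, job lock) and a three-step Enqueue/Dequeue/Acknowledge cycle. Below is a minimal consumer sketch, not part of the change: it assumes it compiles inside the datastore package (the queue's qc field is unexported), and processOnce, the storage names, and the repository path are made up for illustration.

    package datastore

    import (
        "context"
        "database/sql"
        "log"
    )

    // processOnce pushes one event through the full queue lifecycle.
    func processOnce(ctx context.Context, db *sql.DB) {
        queue := PostgresReplicationEventQueue{qc: db} // *sql.DB satisfies glsql.Querier

        // Enqueue persists the event and creates (or reuses) the lock row
        // identified by "<target>|<relative path>".
        event, err := queue.Enqueue(ctx, ReplicationEvent{
            Job: ReplicationJob{
                Change:            UpdateRepo.String(),
                RelativePath:      "/project/path-1",
                TargetNodeStorage: "gitaly-1",
                SourceNodeStorage: "gitaly-0",
            },
        })
        if err != nil {
            log.Fatal(err)
        }

        // Dequeue marks up to 10 pending events for this storage as
        // 'in_progress' and acquires their locks.
        events, err := queue.Dequeue(ctx, event.Job.TargetNodeStorage, 10)
        if err != nil {
            log.Fatal(err)
        }

        // ... perform the actual replication here ...

        // Acknowledge records the terminal state and releases locks that
        // have no other in-flight jobs attached.
        ids := make([]uint64, 0, len(events))
        for _, e := range events {
            ids = append(ids, e.ID)
        }
        if _, err := queue.Acknowledge(ctx, "completed", ids); err != nil {
            log.Fatal(err)
        }
    }

Dequeue flips matching 'ready'/'failed' events to 'in_progress' and acquires their per-(target, repository) locks, so a second consumer calling Dequeue for the same repository gets nothing until Acknowledge releases the lock.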
 _support/Makefile.template                                          |    2
 changelogs/unreleased/ps-persistent-queue.yml                       |    5
 internal/praefect/datastore/datastore.go                            |   16
 internal/praefect/datastore/glsql/init_test.go                      |    2
 internal/praefect/datastore/glsql/postgres.go                       |  120
 internal/praefect/datastore/glsql/postgres_test.go                  |   99
 internal/praefect/datastore/glsql/testing.go                        |   36
 internal/praefect/datastore/glsql/testing_test.go                   |    2
 internal/praefect/datastore/init_test.go                            |   22
 internal/praefect/datastore/migrations/20200224220728_job_queue.go  |   46
 internal/praefect/datastore/queue.go                                |  265
 internal/praefect/datastore/queue_test.go                           |  622

 12 files changed, 1218 insertions(+), 19 deletions(-)
diff --git a/_support/Makefile.template b/_support/Makefile.template
index 541b69a4d..172f3dabe 100644
--- a/_support/Makefile.template
+++ b/_support/Makefile.template
@@ -151,7 +151,7 @@ rspec-gitlab-shell: {{ .GitlabShellDir }}/config.yml assemble-go prepare-tests
 
 .PHONY: test-postgres
 test-postgres: prepare-tests
-	@cd {{ .SourceDir }} && go test -tags postgres gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/...
+	@cd {{ .SourceDir }} && go test -tags postgres -count=1 gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/...
 
 .PHONY: verify
 verify: check-mod-tidy check-formatting notice-up-to-date check-proto rubocop
diff --git a/changelogs/unreleased/ps-persistent-queue.yml b/changelogs/unreleased/ps-persistent-queue.yml
new file mode 100644
index 000000000..48fbe24de
--- /dev/null
+++ b/changelogs/unreleased/ps-persistent-queue.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: Move replication queue to database'
+merge_request: 1865
+author:
+type: added
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 5f950e19e..a16743a13 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -55,7 +55,21 @@ const (
     RenameRepo
 )
 
-// Params represent additional set of parameters required for replication job.
+func (ct ChangeType) String() string {
+    switch ct {
+    case UpdateRepo:
+        return "update"
+    case DeleteRepo:
+        return "delete"
+    case RenameRepo:
+        return "rename"
+    default:
+        return "UNDEFINED"
+    }
+}
+
+// Params represent additional information required to process event after fetching it from storage.
+// It must be JSON encodable/decodable to persist it without problems.
 type Params map[string]interface{}
 
 // ReplJob is an instance of a queued replication job. A replication job is
diff --git a/internal/praefect/datastore/glsql/init_test.go b/internal/praefect/datastore/glsql/init_test.go
index 34b024451..8423b6205 100644
--- a/internal/praefect/datastore/glsql/init_test.go
+++ b/internal/praefect/datastore/glsql/init_test.go
@@ -16,3 +16,5 @@ func TestMain(m *testing.M) {
     }
     os.Exit(code)
 }
+
+func getDB(t testing.TB) DB { return GetDB(t, "glsql") }
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 34250241b..455282af6 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -4,6 +4,8 @@ package glsql
 import (
     "context"
     "database/sql"
+    "strconv"
+    "strings"
 
     // Blank import to enable integration of github.com/lib/pq into database/sql
     _ "github.com/lib/pq"
@@ -33,6 +35,12 @@ func Migrate(db *sql.DB) (int, error) {
     return migrate.Exec(db, "postgres", migrationSource, migrate.Up)
 }
 
+// Querier is an abstraction on *sql.DB and *sql.Tx that allows to use their methods without awareness about actual type.
+type Querier interface {
+    QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
+    QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+}
+
 // TxQuery runs operations inside transaction and commits|rollbacks on Done.
 type TxQuery interface {
     // Exec calls op function with provided ctx.
@@ -100,3 +108,115 @@ func (txq *txQuery) log(err error, msg string) {
         txq.logger.WithError(err).Error(msg)
     }
 }
+
+// Uint64sToInterfaces converts list of uint64 values to the list of empty interfaces.
+func Uint64sToInterfaces(vs ...uint64) []interface{} {
+    if vs == nil {
+        return nil
+    }
+
+    rs := make([]interface{}, len(vs))
+    for i, v := range vs {
+        rs[i] = v
+    }
+    return rs
+}
+
+// GeneratePlaceholders returns string with 'count' placeholders starting from 'start' index.
+// 1 will be used if provided value for 'start' is less than 1.
+// 1 will be used if provided value for 'count' is less than 1.
+func GeneratePlaceholders(start, count int) string {
+    if start < 1 {
+        start = 1
+    }
+
+    if count <= 1 {
+        return "$" + strconv.Itoa(start)
+    }
+
+    var builder = strings.Builder{}
+    for i := start; i < start+count; i++ {
+        if i != start {
+            builder.WriteString(",")
+        }
+        builder.WriteString("$")
+        builder.WriteString(strconv.Itoa(i))
+    }
+    return builder.String()
+}
+
+// NewParamsAssembler returns a new ParamsAssembler.
+func NewParamsAssembler() *ParamsAssembler {
+    return &ParamsAssembler{}
+}
+
+// ParamsAssembler helps to assemble parameters of the query together, providing placeholders that must be used in the query.
+type ParamsAssembler []interface{}
+
+// AddParams receives n params, assembles them with the other params and returns the generated placeholders as a result.
+func (pm *ParamsAssembler) AddParams(params []interface{}) string {
+    start := len(*pm)
+    *pm = append(*pm, params...)
+    return GeneratePlaceholders(start+1, len(params))
+}
+
+// AddParam receives a param, assembles it with the other params and returns the generated placeholder as a result.
+func (pm *ParamsAssembler) AddParam(param interface{}) string {
+    return pm.AddParams([]interface{}{param})
+}
+
+// Params returns list of previously assembled parameters.
+func (pm *ParamsAssembler) Params() []interface{} {
+    if pm != nil {
+        return *pm
+    }
+    return nil
+}
+
+// DestProvider returns list of pointers that will be used to scan values into.
+type DestProvider interface {
+    // To returns list of pointers.
+    // It is not an idempotent operation and each call will return a new list.
+    To() []interface{}
+}
+
+// ScanAll reads all data from 'rows' into holders provided by 'in'.
+// It will also 'Close' the source after completion.
+func ScanAll(rows *sql.Rows, in DestProvider) (err error) {
+    defer func() {
+        if cErr := rows.Close(); cErr != nil && err == nil {
+            err = cErr
+        }
+    }()
+
+    for rows.Next() {
+        if err = rows.Scan(in.To()...); err != nil {
+            return err
+        }
+    }
+    err = rows.Err()
+    return err
+}
+
+// Uint64Provider allows to use it with ScanAll function to read all rows into it and return the result as a slice.
+type Uint64Provider []*uint64
+
+// Values returns list of values read from *sql.Rows
+func (p *Uint64Provider) Values() []uint64 {
+    if len(*p) == 0 {
+        return nil
+    }
+
+    r := make([]uint64, len(*p))
+    for i, v := range *p {
+        r[i] = *v
+    }
+    return r
+}
+
+// To returns a list of pointers that will be used as a destination for scan operation.
+func (p *Uint64Provider) To() []interface{} {
+    var d = new(uint64)
+    *p = append(*p, d)
+    return []interface{}{d}
+}
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 54fcba8ae..56a2585e3 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -48,7 +48,7 @@ func TestOpenDB(t *testing.T) {
 }
 
 func TestTxQuery_MultipleOperationsSuccess(t *testing.T) {
-    db := GetDB(t)
+    db := getDB(t)
     defer createBasicTable(t, db, "work_unit_test")()
 
     ctx, cancel := testhelper.Context()
@@ -78,7 +78,7 @@ func TestTxQuery_MultipleOperationsSuccess(t *testing.T) {
 }
 
 func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) {
-    db := GetDB(t)
+    db := getDB(t)
     defer createBasicTable(t, db, "work_unit_test")()
 
     ctx, cancel := testhelper.Context()
@@ -120,7 +120,7 @@ func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) {
 }
 
 func TestTxQuery_ContextHandled(t *testing.T) {
-    db := GetDB(t)
+    db := getDB(t)
 
     defer createBasicTable(t, db, "work_unit_test")()
 
@@ -157,7 +157,7 @@ func TestTxQuery_ContextHandled(t *testing.T) {
 }
 
 func TestTxQuery_FailedToCommit(t *testing.T) {
-    db := GetDB(t)
+    db := getDB(t)
     defer createBasicTable(t, db, "work_unit_test")()
 
     ctx, cancel := testhelper.Context()
@@ -191,7 +191,7 @@ func TestTxQuery_FailedToCommit(t *testing.T) {
 }
 
 func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) {
-    db := GetDB(t)
+    db := getDB(t)
     defer createBasicTable(t, db, "work_unit_test")()
 
     ctx, cancel := testhelper.Context()
@@ -239,7 +239,7 @@ func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) {
 }
 
 func TestTxQuery_FailedToCommitWithFailedOperation(t *testing.T) {
-    db := GetDB(t)
+    db := getDB(t)
     defer createBasicTable(t, db, "work_unit_test")()
 
     ctx, cancel := testhelper.Context()
@@ -297,3 +297,90 @@ func createBasicTable(t *testing.T, db DB, tname string) func() {
         require.NoError(t, err)
     }
 }
+
+func TestUint64sToInterfaces(t *testing.T) {
+    for _, tc := range []struct {
+        From []uint64
+        Exp  []interface{}
+    }{
+        {From: nil, Exp: nil},
+        {From: []uint64{1}, Exp: []interface{}{uint64(1)}},
+        {From: []uint64{2, 3, 0}, Exp: []interface{}{uint64(2), uint64(3), uint64(0)}},
+    } {
+        t.Run("", func(t *testing.T) {
+            require.Equal(t, tc.Exp, Uint64sToInterfaces(tc.From...))
+        })
+    }
+}
+
+func TestUint64Provider(t *testing.T) {
+    var provider Uint64Provider
+
+    dst1 := provider.To()
+    require.Equal(t, []interface{}{new(uint64)}, dst1, "must be a single value holder")
+    val1 := dst1[0].(*uint64)
+    *val1 = uint64(100)
+
+    dst2 := provider.To()
+    require.Equal(t, []interface{}{new(uint64)}, dst2, "must be a single value holder")
+    val2 := dst2[0].(*uint64)
+    *val2 = uint64(200)
+
+    require.Equal(t, []uint64{100, 200}, provider.Values())
+
+    dst3 := provider.To()
+    val3 := dst3[0].(*uint64)
+    *val3 = uint64(300)
+
+    require.Equal(t, []uint64{100, 200, 300}, provider.Values())
+}
+
+func TestParamsAssembler(t *testing.T) {
+    assembler := NewParamsAssembler()
+
+    require.Equal(t, "$1", assembler.AddParam(1))
+    require.Equal(t, []interface{}{1}, assembler.Params())
+
+    require.Equal(t, "$2", assembler.AddParam('a'))
+    require.Equal(t, []interface{}{1, 'a'}, assembler.Params())
+
+    require.Equal(t, "$3,$4", assembler.AddParams([]interface{}{"b", uint64(4)}))
+    require.Equal(t, []interface{}{1, 'a', "b", uint64(4)}, assembler.Params())
+}
+
+func TestGeneratePlaceholders(t *testing.T) {
+    for _, tc := range []struct {
+        Start, Count int
+        Exp          string
+    }{
+        {Start: -1, Count: -1, Exp: "$1"},
+        {Start: 0, Count: -1, Exp: "$1"},
+        {Start: 0, Count: 0, Exp: "$1"},
+        {Start: 1, Count: 0, Exp: "$1"},
+        {Start: 1, Count: 1, Exp: "$1"},
+        {Start: 5, Count: 3, Exp: "$5,$6,$7"},
+        {Start: 5, Count: -1, Exp: "$5"},
+    } {
+        t.Run("", func(t *testing.T) {
+            require.Equal(t, tc.Exp, GeneratePlaceholders(tc.Start, tc.Count))
+        })
+    }
+}
+
+func TestScanAll(t *testing.T) {
+    db := getDB(t)
+
+    var ids Uint64Provider
+    notEmptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id)")
+    require.NoError(t, err)
+
+    require.NoError(t, ScanAll(notEmptyRows, &ids))
+    require.Equal(t, []uint64{1, 200, 300500}, ids.Values())
+
+    var nothing Uint64Provider
+    emptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id) WHERE id < 0")
+    require.NoError(t, err)
+
+    require.NoError(t, ScanAll(emptyRows, &nothing))
+    require.Equal(t, ([]uint64)(nil), nothing.Values())
+}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index f05fd7051..eceddf891 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -31,7 +31,7 @@ type DB struct {
 func (db DB) Truncate(t testing.TB, tables ...string) {
     t.Helper()
 
-    tmpl := strings.Repeat("TRUNCATE TABLE %q RESTART IDENTITY;\n", len(tables))
+    tmpl := strings.Repeat("TRUNCATE TABLE %q RESTART IDENTITY CASCADE;\n", len(tables))
     params := make([]interface{}, len(tables))
     for i, table := range tables {
         params[i] = table
@@ -49,6 +49,14 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
     require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
 }
 
+func (db DB) TruncateAll(t testing.TB) {
+    db.Truncate(t,
+        "gitaly_replication_queue_job_lock",
+        "gitaly_replication_queue",
+        "gitaly_replication_queue_lock",
+    )
+}
+
 // Close removes schema if it was used and releases connection pool.
 func (db DB) Close() error {
     if err := db.DB.Close(); err != nil {
@@ -60,25 +68,29 @@ func (db DB) Close() error {
 // GetDB returns a wrapper around the database connection pool.
 // Must be used only for testing.
 // The new database 'gitaly_test' will be re-created for each package that uses this function.
-// The best place to call it is in individual testing functions
+// Each call will also truncate all tables with their identities restarted if any.
+// The best place to call it is in individual testing functions.
 // It uses env vars:
 //   PGHOST - required, URL/socket/dir
 //   PGPORT - required, binding port
 //   PGUSER - optional, user - `$ whoami` would be used if not provided
-func GetDB(t testing.TB) DB {
+func GetDB(t testing.TB, database string) DB {
     t.Helper()
 
     testDBInitOnce.Do(func() {
-        sqlDB := initGitalyTestDB(t)
+        sqlDB := initGitalyTestDB(t, database)
 
         _, mErr := Migrate(sqlDB)
         require.NoError(t, mErr, "failed to run database migration")
 
         testDB = DB{DB: sqlDB}
     })
+
+    testDB.TruncateAll(t)
+
     return testDB
 }
 
-func initGitalyTestDB(t testing.TB) *sql.DB {
+func initGitalyTestDB(t testing.TB, database string) *sql.DB {
     t.Helper()
 
     getEnvFromGDK(t)
@@ -104,17 +116,21 @@ func initGitalyTestDB(t testing.TB) *sql.DB {
     require.NoError(t, oErr, "failed to connect to 'postgres' database")
     defer func() { require.NoError(t, postgresDB.Close()) }()
 
-    _, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS gitaly_test")
+    rows, tErr := postgresDB.Query("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = '" + database + "'")
+    require.NoError(t, tErr)
+    require.NoError(t, rows.Close())
+
+    _, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS " + database)
     require.NoError(t, dErr, "failed to drop 'gitaly_test' database")
 
-    _, cErr := postgresDB.Exec("CREATE DATABASE gitaly_test WITH ENCODING 'UTF8'")
-    require.NoError(t, cErr, "failed to create 'gitaly_test' database")
+    _, cErr := postgresDB.Exec("CREATE DATABASE " + database + " WITH ENCODING 'UTF8'")
+    require.NoErrorf(t, cErr, "failed to create %q database", database)
 
     require.NoError(t, postgresDB.Close(), "error on closing connection to 'postgres' database")
 
     // connect to the testing database
-    dbCfg.DBName = "gitaly_test"
+    dbCfg.DBName = database
     gitalyTestDB, err := OpenDB(dbCfg)
-    require.NoError(t, err, "failed to connect to 'gitaly_test' database")
+    require.NoErrorf(t, err, "failed to connect to %q database", database)
 
     return gitalyTestDB
 }
diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/praefect/datastore/glsql/testing_test.go
index ad576d1ba..6d1035bba 100644
--- a/internal/praefect/datastore/glsql/testing_test.go
+++ b/internal/praefect/datastore/glsql/testing_test.go
@@ -9,7 +9,7 @@ import (
 )
 
 func TestDB_Truncate(t *testing.T) {
-    db := GetDB(t)
+    db := getDB(t)
 
     _, err := db.Exec("CREATE TABLE truncate_tbl(id BIGSERIAL PRIMARY KEY)")
     require.NoError(t, err)
diff --git a/internal/praefect/datastore/init_test.go b/internal/praefect/datastore/init_test.go
new file mode 100644
index 000000000..bfbffa166
--- /dev/null
+++ b/internal/praefect/datastore/init_test.go
@@ -0,0 +1,22 @@
+// +build postgres
+
+package datastore
+
+import (
+    "log"
+    "os"
+    "testing"
+
+    "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+func TestMain(m *testing.M) {
+    code := m.Run()
+    // Clean closes connection to database once all tests are done
+    if err := glsql.Clean(); err != nil {
+        log.Fatalln(err, "database disconnection failure")
+    }
+    os.Exit(code)
+}
+
+func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t, "datastore") }
diff --git a/internal/praefect/datastore/migrations/20200224220728_job_queue.go b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
new file mode 100644
index 000000000..e63116354
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
@@ -0,0 +1,46 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+    m := &migrate.Migration{
+        Id: "20200224220728_job_queue",
+        Up: []string{`
+CREATE TYPE GITALY_REPLICATION_JOB_STATE AS ENUM('ready', 'in_progress', 'completed', 'cancelled', 'failed')
+`, `
+CREATE TABLE gitaly_replication_queue_lock
+(
+    id TEXT PRIMARY KEY
+    , acquired BOOLEAN NOT NULL DEFAULT FALSE
+)
+`, `
+CREATE TABLE gitaly_replication_queue
+(
+    id BIGSERIAL PRIMARY KEY
+    , state GITALY_REPLICATION_JOB_STATE NOT NULL DEFAULT 'ready'
+    , created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+    , updated_at TIMESTAMP WITHOUT TIME ZONE
+    , attempt INTEGER NOT NULL DEFAULT 3
+    , lock_id TEXT
+    , job JSONB
+)`, `
+CREATE TABLE gitaly_replication_queue_job_lock
+(
+    job_id BIGINT REFERENCES gitaly_replication_queue(id)
+    , lock_id TEXT REFERENCES gitaly_replication_queue_lock(id)
+    , triggered_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+    , CONSTRAINT gitaly_replication_queue_job_lock_pk PRIMARY KEY (job_id, lock_id)
+)`,
+        },
+        Down: []string{`
+DROP TABLE IF EXISTS gitaly_replication_queue_job_lock CASCADE
+`, `
+DROP TABLE IF EXISTS gitaly_replication_queue CASCADE
+`, `
+DROP TABLE IF EXISTS gitaly_replication_queue_lock CASCADE
+`,
+        },
+    }
+
+    allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
new file mode 100644
index 000000000..db0c0edf4
--- /dev/null
+++ b/internal/praefect/datastore/queue.go
@@ -0,0 +1,265 @@
+package datastore
+
+import (
+    "context"
+    "database/sql"
+    "database/sql/driver"
+    "encoding/json"
+    "fmt"
+    "time"
+
+    "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+// ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back.
+type ReplicationEventQueue interface {
+    // Enqueue puts provided event into the persistent queue.
+    Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
+    // Dequeue retrieves events from the persistent queue using provided limitations and filters.
+    Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error)
+}
+
+// ReplicationJob is a persistent representation of the replication job.
+type ReplicationJob struct {
+    Change            string `json:"change"`
+    RelativePath      string `json:"relative_path"`
+    TargetNodeStorage string `json:"target_node_storage"`
+    SourceNodeStorage string `json:"source_node_storage"`
+    Params            Params `json:"params"`
+}
+
+func (job *ReplicationJob) Scan(value interface{}) error {
+    if value == nil {
+        return nil
+    }
+
+    d, ok := value.([]byte)
+    if !ok {
+        return fmt.Errorf("unexpected type received: %T", value)
+    }
+
+    return json.Unmarshal(d, job)
+}
+
+func (job ReplicationJob) Value() (driver.Value, error) {
+    return json.Marshal(job)
+}
+
+// ReplicationEvent is a persistent representation of the replication event.
+type ReplicationEvent struct {
+    ID        uint64
+    State     string
+    Attempt   int
+    LockID    string
+    CreatedAt time.Time
+    UpdatedAt *time.Time
+    Job       ReplicationJob
+}
+
+// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
+func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) {
+    var mapping []interface{}
+    for _, column := range columns {
+        switch column {
+        case "id":
+            mapping = append(mapping, &event.ID)
+        case "state":
+            mapping = append(mapping, &event.State)
+        case "created_at":
+            mapping = append(mapping, &event.CreatedAt)
+        case "updated_at":
+            mapping = append(mapping, &event.UpdatedAt)
+        case "attempt":
+            mapping = append(mapping, &event.Attempt)
+        case "lock_id":
+            mapping = append(mapping, &event.LockID)
+        case "job":
+            mapping = append(mapping, &event.Job)
+        default:
+            return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column)
+        }
+    }
+    return mapping, nil
+}
+
+// Scan fills receiver fields with values fetched from database based on the set of columns/column aliases.
+func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error {
+    mappings, err := event.Mapping(columns)
+    if err != nil {
+        return err
+    }
+    return rows.Scan(mappings...)
+}
+
+// ScanReplicationEvents reads all rows and converts them into structs filling all the fields according to fetched columns/column aliases.
+func ScanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) {
+    columns, err := rows.Columns()
+    if err != nil {
+        return events, err
+    }
+
+    defer func() {
+        if cErr := rows.Close(); cErr != nil && err == nil {
+            err = cErr
+        }
+    }()
+
+    for rows.Next() {
+        var event ReplicationEvent
+        if err = event.Scan(columns, rows); err != nil {
+            return events, err
+        }
+        events = append(events, event)
+    }
+
+    return events, rows.Err()
+}
+
+// interface implementation protection
+var _ ReplicationEventQueue = PostgresReplicationEventQueue{}
+
+// PostgresReplicationEventQueue is a Postgres implementation of persistent queue.
+type PostgresReplicationEventQueue struct {
+    qc glsql.Querier
+}
+
+func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
+    query := `
+WITH insert_lock AS (
+    INSERT INTO gitaly_replication_queue_lock(id)
+    VALUES ($1 || '|' || $2)
+    ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
+    RETURNING id
+)
+INSERT INTO gitaly_replication_queue(lock_id, job)
+SELECT insert_lock.id, $3
+FROM insert_lock
+RETURNING id, state, created_at, updated_at, lock_id, attempt, job`
+    // this will always return a single row result (because of lock uniqueness) or an error
+    rows, err := rq.qc.QueryContext(ctx, query, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job)
+    if err != nil {
+        return ReplicationEvent{}, err
+    }
+
+    events, err := ScanReplicationEvents(rows)
+    if err != nil {
+        return ReplicationEvent{}, err
+    }
+
+    return events[0], nil
+}
+
+func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) {
+    query := `
+WITH to_lock AS (
+    SELECT id
+    FROM gitaly_replication_queue_lock AS repo_lock
+    WHERE repo_lock.acquired = FALSE AND repo_lock.id IN (
+        SELECT lock_id
+        FROM gitaly_replication_queue
+        WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+        ORDER BY created_at
+        LIMIT $2 FOR UPDATE
+    )
+    FOR UPDATE SKIP LOCKED
+)
+, jobs AS (
+    UPDATE gitaly_replication_queue AS queue
+    SET attempt = queue.attempt - 1,
+        state = 'in_progress',
+        updated_at = NOW() AT TIME ZONE 'UTC'
+    FROM to_lock
+    WHERE queue.lock_id IN (SELECT id FROM to_lock)
+        AND state NOT IN ('in_progress', 'cancelled', 'completed')
+        AND queue.id IN (
+            SELECT id
+            FROM gitaly_replication_queue
+            WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+            ORDER BY created_at
+            LIMIT $2
+        )
+    RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job
+)
+, track_job_lock AS (
+    INSERT INTO gitaly_replication_queue_job_lock (job_id, lock_id, triggered_at)
+    SELECT jobs.id, jobs.lock_id, NOW() AT TIME ZONE 'UTC' FROM jobs
+    RETURNING lock_id
+)
+, do_lock AS (
+    UPDATE gitaly_replication_queue_lock
+    SET acquired = TRUE
+    WHERE id IN (SELECT lock_id FROM track_job_lock)
+)
+SELECT id, state, created_at, updated_at, lock_id, attempt, job
+FROM jobs
+ORDER BY id
+`
+    rows, err := rq.qc.QueryContext(ctx, query, nodeStorage, count)
+    if err != nil {
+        return nil, err
+    }
+
+    return ScanReplicationEvents(rows)
+}
+
+// Acknowledge updates previously dequeued events with the new state, releasing resources acquired for them.
+// It only updates events that are in 'in_progress' state.
+// It returns the list of ids that were actually acknowledged.
+func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state string, ids []uint64) ([]uint64, error) {
+    if len(ids) == 0 {
+        return nil, nil
+    }
+
+    params := glsql.NewParamsAssembler()
+    query := `
+WITH existing AS (
+    SELECT id, lock_id
+    FROM gitaly_replication_queue
+    WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `)
+        AND state = 'in_progress'
+    FOR UPDATE
+)
+, to_release AS (
+    UPDATE gitaly_replication_queue AS queue
+    SET state = ` + params.AddParam(state) + `
+    FROM existing
+    WHERE existing.id = queue.id
+    RETURNING queue.id, queue.lock_id
+)
+, removed_job_lock AS (
+    DELETE FROM gitaly_replication_queue_job_lock AS job_lock
+    USING to_release AS job_failed
+    WHERE job_lock.job_id = job_failed.id AND job_lock.lock_id = job_failed.lock_id
+    RETURNING job_failed.lock_id
+)
+, release AS (
+    UPDATE gitaly_replication_queue_lock
+    SET acquired = FALSE
+    WHERE id IN (
+        SELECT existing.lock_id
+        FROM (
+            SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id
+        ) AS removed
+        JOIN (
+            SELECT lock_id, COUNT(*) AS amount
+            FROM gitaly_replication_queue_job_lock
+            WHERE lock_id IN (SELECT lock_id FROM removed_job_lock)
+            GROUP BY lock_id
+        ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
+    )
+)
+SELECT id
+FROM existing
+`
+    rows, err := rq.qc.QueryContext(ctx, query, params.Params()...)
+    if err != nil {
+        return nil, err
+    }
+
+    var acknowledged glsql.Uint64Provider
+    if err := glsql.ScanAll(rows, &acknowledged); err != nil {
+        return nil, err
+    }
+
+    return acknowledged.Values(), nil
+}
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
new file mode 100644
index 000000000..3294ec68a
--- /dev/null
+++ b/internal/praefect/datastore/queue_test.go
@@ -0,0 +1,622 @@
+// +build postgres
+
+package datastore
+
+import (
+    "context"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+    "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+    "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
+    db := getDB(t)
+
+    ctx, cancel := testhelper.Context()
+    defer cancel()
+
+    queue := PostgresReplicationEventQueue{db.DB}
+
+    eventType := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            UpdateRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    actualEvent, err := queue.Enqueue(ctx, eventType) // initial event
+    require.NoError(t, err)
+    actualEvent.CreatedAt = time.Time{} // reset to the default because it is not possible to know it beforehand for the expected value
+
+    expLock := LockRow{ID: "gitaly-1|/project/path-1", Acquired: false}
+
+    expEvent := ReplicationEvent{
+        ID:      1,
+        State:   "ready",
+        Attempt: 3,
+        LockID:  "gitaly-1|/project/path-1",
+        Job: ReplicationJob{
+            Change:            "update",
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    require.Equal(t, expEvent, actualEvent)
+    requireEvents(t, ctx, db, []ReplicationEvent{expEvent})
+    requireLocks(t, ctx, db, []LockRow{expLock}) // expected a new lock for the new event
+    db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+}
+
+func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
+    db := getDB(t)
+
+    ctx, cancel := testhelper.Context()
+    defer cancel()
+
+    queue := PostgresReplicationEventQueue{db.DB}
+
+    eventType1 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            UpdateRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    eventType2 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            RenameRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-2",
+            SourceNodeStorage: "",
+            Params:            Params{"RelativePath": "/project/path-1-renamed"},
+        },
+    }
+
+    eventType3 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            UpdateRepo.String(),
+            RelativePath:      "/project/path-2",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    event1, err := queue.Enqueue(ctx, eventType1) // initial event
+    require.NoError(t, err)
+
+    expLock1 := LockRow{ID: "gitaly-1|/project/path-1", Acquired: false}
+    expLock2 := LockRow{ID: "gitaly-2|/project/path-1", Acquired: false}
+    expLock3 := LockRow{ID: "gitaly-1|/project/path-2", Acquired: false}
+
+    expEvent1 := ReplicationEvent{
+        ID:      event1.ID,
+        State:   "ready",
+        Attempt: 3,
+        LockID:  "gitaly-1|/project/path-1",
+        Job: ReplicationJob{
+            Change:            "update",
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    requireEvents(t, ctx, db, []ReplicationEvent{expEvent1})
+    requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for the new event
+    db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+
+    event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event
+    require.NoError(t, err)
+
+    expEvent2 := ReplicationEvent{
+        ID:      event2.ID,
+        State:   "ready",
+        Attempt: 3,
+        LockID:  "gitaly-1|/project/path-1",
+        Job: ReplicationJob{
+            Change:            "update",
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
+    requireLocks(t, ctx, db, []LockRow{expLock1}) // still expected the same single lock for the repeated event
+
+    event3, err := queue.Enqueue(ctx, eventType2) // event for another target
+    require.NoError(t, err)
+
+    expEvent3 := ReplicationEvent{
+        ID:      event3.ID,
+        State:   "ready",
+        Attempt: 3,
+        LockID:  "gitaly-2|/project/path-1",
+        Job: ReplicationJob{
+            Change:            "rename",
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-2",
+            SourceNodeStorage: "",
+            Params:            Params{"RelativePath": "/project/path-1-renamed"},
+        },
+    }
+
+    requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
+    requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // a new lock for the event with another target
+
+    event4, err := queue.Enqueue(ctx, eventType3) // event for another repo
+    require.NoError(t, err)
+
+    expEvent4 := ReplicationEvent{
+        ID:      event4.ID,
+        State:   "ready",
+        Attempt: 3,
+        LockID:  "gitaly-1|/project/path-2",
+        Job: ReplicationJob{
+            Change:            "update",
+            RelativePath:      "/project/path-2",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4})
+    requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // a new lock for the same target but another repo
+
+    db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0) // as there were no fetches it must be empty
+}
+
+func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
+    db := getDB(t)
+
+    ctx, cancel := testhelper.Context()
+    defer cancel()
+
+    queue := PostgresReplicationEventQueue{db.DB}
+
+    event := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            UpdateRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    event, err := queue.Enqueue(ctx, event)
+    require.NoError(t, err, "failed to fill in event queue")
+
+    noEvents, err := queue.Dequeue(ctx, "not existing storage", 5)
+    require.NoError(t, err)
+    require.Len(t, noEvents, 0, "there must be no events dequeued for a non-existing storage")
+
+    expectedEvent := event
+    expectedEvent.State = "in_progress"
+    expectedEvent.Attempt = 2
+
+    expectedLock := LockRow{ID: event.LockID, Acquired: true} // as we dequeue events we acquire a lock for processing
+
+    expectedJobLock := JobLockRow{JobID: event.ID, LockID: event.LockID} // a separate table tracks whether the job is being processed
+
+    actual, err := queue.Dequeue(ctx, event.Job.TargetNodeStorage, 5)
+    require.NoError(t, err)
+
+    for i := range actual {
+        actual[i].UpdatedAt = nil // it is not possible to determine the updated_at value as it is generated on UPDATE in the database
+    }
+    require.Equal(t, []ReplicationEvent{expectedEvent}, actual)
+
+    // there is only one single lock for all fetched events
+    requireLocks(t, ctx, db, []LockRow{expectedLock})
+    requireJobLocks(t, ctx, db, []JobLockRow{expectedJobLock})
+}
+
+// expected results are listed as literals on purpose to be more explicit about what is going on with the data
+func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
+    db := getDB(t)
+
+    ctx, cancel := testhelper.Context()
+    defer cancel()
+
+    queue := PostgresReplicationEventQueue{db.DB}
+
+    eventType1 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            UpdateRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    eventType2 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            DeleteRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "",
+            Params:            nil,
+        },
+    }
+
+    eventType3 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            RenameRepo.String(),
+            RelativePath:      "/project/path-2",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            Params{"RelativePath": "/project/path-2-renamed"},
+        },
+    }
+
+    events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3} // events to fill the queue with
+    for i := range events {
+        var err error
+        events[i], err = queue.Enqueue(ctx, events[i])
+        require.NoError(t, err, "failed to fill in event queue")
+    }
+
+    // first request to dequeue
+    const limitFirstN = 3 // limit on the number of jobs we are going to dequeue
+
+    expectedEvents1 := make([]ReplicationEvent, limitFirstN)
+    expectedJobLocks1 := make([]JobLockRow, limitFirstN)
+    for i := range expectedEvents1 {
+        expectedEvents1[i] = events[i]
+        expectedEvents1[i].State = "in_progress"
+        expectedEvents1[i].Attempt = 2
+
+        expectedJobLocks1[i].JobID = expectedEvents1[i].ID
+        expectedJobLocks1[i].LockID = "gitaly-1|/project/path-1"
+    }
+
+    // we expect only the first two types of events by limiting the count to 3
+    dequeuedEvents1, err := queue.Dequeue(ctx, "gitaly-1", limitFirstN)
+    require.NoError(t, err)
+    for i := range dequeuedEvents1 {
+        dequeuedEvents1[i].UpdatedAt = nil // it is not possible to determine the updated_at value as it is generated on UPDATE in the database
+    }
+    require.Equal(t, expectedEvents1, dequeuedEvents1)
+
+    requireLocks(t, ctx, db, []LockRow{
+        // there is only one single lock for all fetched events because of their 'repo' and 'target' combination
+        {ID: "gitaly-1|/project/path-1", Acquired: true},
+        {ID: "gitaly-1|/project/path-2", Acquired: false},
+    })
+    requireJobLocks(t, ctx, db, expectedJobLocks1)
+
+    // second request to dequeue
+
+    // only the last event must be fetched from the queue
+    expectedEvents2 := []ReplicationEvent{events[len(events)-1]}
+    expectedEvents2[0].State = "in_progress"
+    expectedEvents2[0].Attempt = 2
+
+    expectedJobLocks2 := []JobLockRow{{JobID: 5, LockID: "gitaly-1|/project/path-2"}}
+
+    dequeuedEvents2, err := queue.Dequeue(ctx, "gitaly-1", 100500)
+    require.NoError(t, err)
+    require.Len(t, dequeuedEvents2, 1, "only one event must be fetched from the queue")
+
+    dequeuedEvents2[0].UpdatedAt = nil // it is not possible to determine the updated_at value as it is generated on UPDATE in the database
+    require.Equal(t, expectedEvents2, dequeuedEvents2)
+
+    requireLocks(t, ctx, db, []LockRow{
+        {ID: "gitaly-1|/project/path-1", Acquired: true},
+        {ID: "gitaly-1|/project/path-2", Acquired: true},
+    })
+    requireJobLocks(t, ctx, db, append(expectedJobLocks1, expectedJobLocks2...))
+
+    // this event wasn't consumed by the first dequeue because of the limit,
+    // and it wasn't consumed by the second dequeue because a lock was already acquired for this type of event
+    remainingEvents := []ReplicationEvent{events[3]}
+    expectedEvents := append(append(expectedEvents1, remainingEvents...), expectedEvents2...)
+    requireEvents(t, ctx, db, expectedEvents)
+}
+
+func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
+    db := getDB(t)
+
+    ctx, cancel := testhelper.Context()
+    defer cancel()
+
+    queue := PostgresReplicationEventQueue{db.DB}
+
+    event := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            UpdateRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    event, err := queue.Enqueue(ctx, event)
+    require.NoError(t, err, "failed to fill in event queue")
+
+    actual, err := queue.Dequeue(ctx, event.Job.TargetNodeStorage, 100)
+    require.NoError(t, err)
+
+    // as we dequeue events we acquire a lock for processing
+    requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: true}})
+    requireJobLocks(t, ctx, db, []JobLockRow{{JobID: event.ID, LockID: event.LockID}})
+
+    acknowledged, err := queue.Acknowledge(ctx, "completed", []uint64{actual[0].ID, 100500})
+    require.NoError(t, err)
+    require.Equal(t, []uint64{actual[0].ID}, acknowledged)
+
+    event.State = "completed"
+    event.Attempt = 2
+    requireEvents(t, ctx, db, []ReplicationEvent{event})
+    // the lock must be released as the event was acknowledged and there are no other events left protected under this lock
+    requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: false}})
+    // all tracking bindings between the lock and the acknowledged event must be removed
+    requireJobLocks(t, ctx, db, nil)
+}
+
+func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
+    db := getDB(t)
+
+    ctx, cancel := testhelper.Context()
+    defer cancel()
+
+    queue := PostgresReplicationEventQueue{db.DB}
+
+    eventType1 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            UpdateRepo.String(),
+            RelativePath:      "/project/path-1",
+            TargetNodeStorage: "gitaly-1",
+            SourceNodeStorage: "gitaly-0",
+            Params:            nil,
+        },
+    }
+
+    eventType2 := ReplicationEvent{
+        Job: ReplicationJob{
+            Change:            DeleteRepo.String(),
+            RelativePath:      "/project/path-2",
"gitaly-1", + SourceNodeStorage: "", + Params: nil, + }, + } + + eventType3 := ReplicationEvent{ + Job: ReplicationJob{ + Change: UpdateRepo.String(), + RelativePath: "/project/path-3", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + Params: nil, + }, + } + + eventType4 := ReplicationEvent{ + Job: ReplicationJob{ + Change: UpdateRepo.String(), + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: "gitaly-0", + Params: nil, + }, + } + + events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue + for i := range events { + var err error + events[i], err = queue.Enqueue(ctx, events[i]) + require.NoError(t, err, "failed to fill in event queue") + } + + // we expect only first two types of events by limiting count to 3 + dequeuedEvents1, err := queue.Dequeue(ctx, "gitaly-1", 3) + require.NoError(t, err) + require.Len(t, dequeuedEvents1, 3) + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: true}, + {ID: "gitaly-1|/project/path-2", Acquired: true}, + {ID: "gitaly-1|/project/path-3", Acquired: false}, + {ID: "gitaly-2|/project/path-1", Acquired: false}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 1, LockID: "gitaly-1|/project/path-1"}, + {JobID: 2, LockID: "gitaly-1|/project/path-1"}, + {JobID: 3, LockID: "gitaly-1|/project/path-2"}, + }) + + // release lock for events of second type + acknowledge1, err := queue.Acknowledge(ctx, "failed", []uint64{3}) + require.NoError(t, err) + require.Equal(t, []uint64{3}, acknowledge1) + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: true}, + {ID: "gitaly-1|/project/path-2", Acquired: false}, + {ID: "gitaly-1|/project/path-3", Acquired: false}, + {ID: "gitaly-2|/project/path-1", Acquired: false}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 1, LockID: "gitaly-1|/project/path-1"}, + {JobID: 2, LockID: "gitaly-1|/project/path-1"}, + }) + + dequeuedEvents2, err := queue.Dequeue(ctx, "gitaly-1", 3) + require.NoError(t, err) + require.Len(t, dequeuedEvents2, 2, "expected: event of type 3 and of type 2 ('failed' will be fetched for retry)") + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: true}, + {ID: "gitaly-1|/project/path-2", Acquired: true}, + {ID: "gitaly-1|/project/path-3", Acquired: true}, + {ID: "gitaly-2|/project/path-1", Acquired: false}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 1, LockID: "gitaly-1|/project/path-1"}, + {JobID: 2, LockID: "gitaly-1|/project/path-1"}, + {JobID: 5, LockID: "gitaly-1|/project/path-3"}, + {JobID: 3, LockID: "gitaly-1|/project/path-2"}, + }) + + acknowledge2, err := queue.Acknowledge(ctx, "completed", []uint64{1, 3}) + require.NoError(t, err) + require.Equal(t, []uint64{1, 3}, acknowledge2) + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: true}, + {ID: "gitaly-1|/project/path-2", Acquired: false}, + {ID: "gitaly-1|/project/path-3", Acquired: true}, + {ID: "gitaly-2|/project/path-1", Acquired: false}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 2, LockID: "gitaly-1|/project/path-1"}, + {JobID: 5, LockID: "gitaly-1|/project/path-3"}, + }) + + dequeuedEvents3, err := queue.Dequeue(ctx, "gitaly-2", 3) + require.NoError(t, err) + require.Len(t, dequeuedEvents3, 1, "expected: event of type 4") + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: true}, + {ID: 
"gitaly-1|/project/path-2", Acquired: false}, + {ID: "gitaly-1|/project/path-3", Acquired: true}, + {ID: "gitaly-2|/project/path-1", Acquired: true}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 2, LockID: "gitaly-1|/project/path-1"}, + {JobID: 5, LockID: "gitaly-1|/project/path-3"}, + {JobID: 7, LockID: "gitaly-2|/project/path-1"}, + }) + + acknowledged3, err := queue.Acknowledge(ctx, "completed", []uint64{2, 5, 7}) + require.NoError(t, err) + require.Equal(t, []uint64{2, 5, 7}, acknowledged3) + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: false}, + {ID: "gitaly-1|/project/path-2", Acquired: false}, + {ID: "gitaly-1|/project/path-3", Acquired: false}, + {ID: "gitaly-2|/project/path-1", Acquired: false}, + }) + requireJobLocks(t, ctx, db, nil) + + dequeuedEvents4, err := queue.Dequeue(ctx, "gitaly-1", 100500) + require.NoError(t, err) + require.Len(t, dequeuedEvents4, 2, "expected: event of type 4") + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: true}, + {ID: "gitaly-1|/project/path-2", Acquired: true}, + {ID: "gitaly-1|/project/path-3", Acquired: false}, + {ID: "gitaly-2|/project/path-1", Acquired: false}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 4, LockID: "gitaly-1|/project/path-1"}, + {JobID: 6, LockID: "gitaly-1|/project/path-2"}, + }) + + newEvent, err := queue.Enqueue(ctx, eventType1) + require.NoError(t, err) + + acknowledge4, err := queue.Acknowledge(ctx, "completed", []uint64{newEvent.ID}) + require.NoError(t, err) + require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged + var newEventState string + require.NoError(t, db.QueryRow("SELECT state FROM gitaly_replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState)) + require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)") + requireLocks(t, ctx, db, []LockRow{ + {ID: "gitaly-1|/project/path-1", Acquired: true}, + {ID: "gitaly-1|/project/path-2", Acquired: true}, + {ID: "gitaly-1|/project/path-3", Acquired: false}, + {ID: "gitaly-2|/project/path-1", Acquired: false}, + }) + requireJobLocks(t, ctx, db, []JobLockRow{ + {JobID: 4, LockID: "gitaly-1|/project/path-1"}, + {JobID: 6, LockID: "gitaly-1|/project/path-2"}, + }) +} + +func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) { + t.Helper() + + // as it is not possible to expect exact time of entity creation/update we do not fetch it from database + // and we do not take it into account from expected values. + exp := make([]ReplicationEvent, len(expected)) // make a copy to avoid side effects for passed values + for i, e := range expected { + exp[i] = e + // set to default values as they would not be initialized from database + exp[i].CreatedAt = time.Time{} + exp[i].UpdatedAt = nil + } + + sqlStmt := `SELECT id, state, attempt, lock_id, job FROM gitaly_replication_queue ORDER BY id` + rows, err := db.QueryContext(ctx, sqlStmt) + require.NoError(t, err) + + actual, err := ScanReplicationEvents(rows) + require.NoError(t, err) + require.Equal(t, exp, actual) +} + +// LockRow exists only for testing purposes and represents entries from gitaly_replication_queue_lock table. 
+type LockRow struct {
+    ID       string
+    Acquired bool
+}
+
+func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []LockRow) {
+    t.Helper()
+
+    sqlStmt := `SELECT id, acquired FROM gitaly_replication_queue_lock`
+    rows, err := db.QueryContext(ctx, sqlStmt)
+    require.NoError(t, err)
+    defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
+
+    var actual []LockRow
+    for rows.Next() {
+        var entry LockRow
+        require.NoError(t, rows.Scan(&entry.ID, &entry.Acquired), "failed to scan entry")
+        actual = append(actual, entry)
+    }
+    require.NoError(t, rows.Err(), "completion of result loop scan")
+    require.ElementsMatch(t, expected, actual)
+}
+
+// JobLockRow exists only for testing purposes and represents entries from gitaly_replication_queue_job_lock table.
+type JobLockRow struct {
+    JobID       uint64
+    LockID      string
+    TriggeredAt time.Time
+}
+
+func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) {
+    t.Helper()
+
+    sqlStmt := `SELECT job_id, lock_id FROM gitaly_replication_queue_job_lock ORDER BY triggered_at`
+    rows, err := db.QueryContext(ctx, sqlStmt)
+    require.NoError(t, err)
+    defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
+
+    var actual []JobLockRow
+    for rows.Next() {
+        var entry JobLockRow
+        require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID), "failed to scan entry")
+        actual = append(actual, entry)
+    }
+    require.NoError(t, rows.Err(), "completion of result loop scan")
+    require.ElementsMatch(t, expected, actual)
+}
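
As a footnote on the query-building style used by Acknowledge above: the glsql helpers keep the placeholder text and the argument list in sync. Below is a standalone sketch of that pattern, with an illustrative query and values rather than anything taken from this change.

    package main

    import (
        "fmt"

        "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
    )

    func main() {
        params := glsql.NewParamsAssembler()

        // AddParams appends the values and returns the matching placeholder
        // list, so the SQL text and the args slice cannot drift apart.
        in := params.AddParams(glsql.Uint64sToInterfaces(1, 2, 3)) // "$1,$2,$3"
        state := params.AddParam("completed")                      // "$4"

        query := `UPDATE gitaly_replication_queue SET state = ` + state + ` WHERE id IN (` + in + `)`

        fmt.Println(query)           // ... SET state = $4 WHERE id IN ($1,$2,$3)
        fmt.Println(params.Params()) // [1 2 3 completed]
    }

Note that placeholder numbers follow append order, not their position in the final SQL, which is why Acknowledge can splice the id list into the middle of its CTE chain without renumbering anything.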