Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-03-10 20:00:05 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-03-10 20:00:05 +0300
commitc5965df04cf581c2e11fa0dba9bbf56fe40e4282 (patch)
tree23cc2a74c4e9724da2f16b83174d2a6c78633187
parent38f68c610c0b8394aa2aacac5b3a1460293e0383 (diff)
Praefect: Move replication queue to database
Implementation of the queue for replication jobs using a Postgres database. It contains three tables: one for the replication jobs themselves, one to create locks for a storage to support a single job run, and the last one to track lock acquisition when there are multiple jobs. It also includes changes to the glsql package that depends on the database independently. Now each call to GetDB will truncate existing tables with identity restart and cascade removal. The 'test-postgres' task now has a '-count=1' flag to prevent caching. Part of: https://gitlab.com/gitlab-org/gitaly/issues/2166
-rw-r--r--_support/Makefile.template2
-rw-r--r--changelogs/unreleased/ps-persistent-queue.yml5
-rw-r--r--internal/praefect/datastore/datastore.go16
-rw-r--r--internal/praefect/datastore/glsql/init_test.go2
-rw-r--r--internal/praefect/datastore/glsql/postgres.go120
-rw-r--r--internal/praefect/datastore/glsql/postgres_test.go99
-rw-r--r--internal/praefect/datastore/glsql/testing.go36
-rw-r--r--internal/praefect/datastore/glsql/testing_test.go2
-rw-r--r--internal/praefect/datastore/init_test.go22
-rw-r--r--internal/praefect/datastore/migrations/20200224220728_job_queue.go46
-rw-r--r--internal/praefect/datastore/queue.go265
-rw-r--r--internal/praefect/datastore/queue_test.go622
12 files changed, 1218 insertions, 19 deletions
diff --git a/_support/Makefile.template b/_support/Makefile.template
index e75179beb..059b5adea 100644
--- a/_support/Makefile.template
+++ b/_support/Makefile.template
@@ -151,7 +151,7 @@ rspec-gitlab-shell: {{ .GitlabShellDir }}/config.yml assemble-go prepare-tests
.PHONY: test-postgres
test-postgres: prepare-tests
- @cd {{ .SourceDir }} && go test -tags postgres gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/...
+ @cd {{ .SourceDir }} && go test -tags postgres -count=1 gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/...
.PHONY: verify
verify: check-mod-tidy check-formatting notice-up-to-date check-proto rubocop
diff --git a/changelogs/unreleased/ps-persistent-queue.yml b/changelogs/unreleased/ps-persistent-queue.yml
new file mode 100644
index 000000000..48fbe24de
--- /dev/null
+++ b/changelogs/unreleased/ps-persistent-queue.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: Move replication queue to database'
+merge_request: 1865
+author:
+type: added
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 5f950e19e..a16743a13 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -55,7 +55,21 @@ const (
RenameRepo
)
-// Params represent additional set of parameters required for replication job.
+func (ct ChangeType) String() string {
+ switch ct {
+ case UpdateRepo:
+ return "update"
+ case DeleteRepo:
+ return "delete"
+ case RenameRepo:
+ return "rename"
+ default:
+ return "UNDEFINED"
+ }
+}
+
+// Params represents the additional information required to process an event after fetching it from storage.
+// It must be JSON encodable/decodable to persist it without problems.
type Params map[string]interface{}
// ReplJob is an instance of a queued replication job. A replication job is
diff --git a/internal/praefect/datastore/glsql/init_test.go b/internal/praefect/datastore/glsql/init_test.go
index 34b024451..8423b6205 100644
--- a/internal/praefect/datastore/glsql/init_test.go
+++ b/internal/praefect/datastore/glsql/init_test.go
@@ -16,3 +16,5 @@ func TestMain(m *testing.M) {
}
os.Exit(code)
}
+
+func getDB(t testing.TB) DB { return GetDB(t, "glsql") }
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 34250241b..455282af6 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -4,6 +4,8 @@ package glsql
import (
"context"
"database/sql"
+ "strconv"
+ "strings"
// Blank import to enable integration of github.com/lib/pq into database/sql
_ "github.com/lib/pq"
@@ -33,6 +35,12 @@ func Migrate(db *sql.DB) (int, error) {
return migrate.Exec(db, "postgres", migrationSource, migrate.Up)
}
+// Querier is an abstraction on *sql.DB and *sql.Tx that allows to use their methods without awareness about actual type.
+type Querier interface {
+ QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
+ QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+}
+
// TxQuery runs operations inside transaction and commits|rollbacks on Done.
type TxQuery interface {
// Exec calls op function with provided ctx.
@@ -100,3 +108,115 @@ func (txq *txQuery) log(err error, msg string) {
txq.logger.WithError(err).Error(msg)
}
}
+
+// Uint64sToInterfaces converts list of uint64 values to the list of empty interfaces.
+func Uint64sToInterfaces(vs ...uint64) []interface{} {
+ if vs == nil {
+ return nil
+ }
+
+ rs := make([]interface{}, len(vs))
+ for i, v := range vs {
+ rs[i] = v
+ }
+ return rs
+}
+
+// GeneratePlaceholders returns string with 'count' placeholders starting from 'start' index.
+// 1 will be used if the provided value for 'start' is less than 1.
+// 1 will be used if the provided value for 'count' is less than 1.
+func GeneratePlaceholders(start, count int) string {
+ if start < 1 {
+ start = 1
+ }
+
+ if count <= 1 {
+ return "$" + strconv.Itoa(start)
+ }
+
+ var builder = strings.Builder{}
+ for i := start; i < start+count; i++ {
+ if i != start {
+ builder.WriteString(",")
+ }
+ builder.WriteString("$")
+ builder.WriteString(strconv.Itoa(i))
+ }
+ return builder.String()
+}
+
+// NewParamsAssembler returns a new ParamsAssembler ready for use.
+func NewParamsAssembler() *ParamsAssembler {
+ return &ParamsAssembler{}
+}
+
+// ParamsAssembler helps to assemble parameters of the query together providing placeholders that must be used in query.
+type ParamsAssembler []interface{}
+
+// AddParams receives n params, assembles them with the other params and returns the generated placeholders as a result.
+func (pm *ParamsAssembler) AddParams(params []interface{}) string {
+ start := len(*pm)
+ *pm = append(*pm, params...)
+ return GeneratePlaceholders(start+1, len(params))
+}
+
+// AddParam receives a param, assembles it with the other params and returns the generated placeholder as a result.
+func (pm *ParamsAssembler) AddParam(param interface{}) string {
+ return pm.AddParams([]interface{}{param})
+}
+
+// Params returns list of previously assembled parameters.
+func (pm *ParamsAssembler) Params() []interface{} {
+ if pm != nil {
+ return *pm
+ }
+ return nil
+}
+
+// DestProvider returns list of pointers that will be used to scan values into.
+type DestProvider interface {
+ // To returns list of pointers.
+ // It is not an idempotent operation and each call will return a new list.
+ To() []interface{}
+}
+
+// ScanAll reads all data from 'rows' into holders provided by 'in'.
+// It will also 'Close' source after completion.
+func ScanAll(rows *sql.Rows, in DestProvider) (err error) {
+ defer func() {
+ if cErr := rows.Close(); cErr != nil && err == nil {
+ err = cErr
+ }
+ }()
+
+ for rows.Next() {
+ if err = rows.Scan(in.To()...); err != nil {
+ return err
+ }
+ }
+ err = rows.Err()
+ return err
+}
+
+// Uint64Provider can be used with the ScanAll function to read all rows into it and return the result as a slice.
+type Uint64Provider []*uint64
+
+// Values returns list of values read from *sql.Rows
+func (p *Uint64Provider) Values() []uint64 {
+ if len(*p) == 0 {
+ return nil
+ }
+
+ r := make([]uint64, len(*p))
+ for i, v := range *p {
+ r[i] = *v
+ }
+ return r
+}
+
+// To returns a list of pointers that will be used as a destination for scan operation.
+func (p *Uint64Provider) To() []interface{} {
+ var d = new(uint64)
+ *p = append(*p, d)
+ return []interface{}{d}
+}
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 586375c65..7ad1049e0 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -46,7 +46,7 @@ func TestOpenDB(t *testing.T) {
}
func TestTxQuery_MultipleOperationsSuccess(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -76,7 +76,7 @@ func TestTxQuery_MultipleOperationsSuccess(t *testing.T) {
}
func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -118,7 +118,7 @@ func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) {
}
func TestTxQuery_ContextHandled(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
@@ -155,7 +155,7 @@ func TestTxQuery_ContextHandled(t *testing.T) {
}
func TestTxQuery_FailedToCommit(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -189,7 +189,7 @@ func TestTxQuery_FailedToCommit(t *testing.T) {
}
func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -237,7 +237,7 @@ func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) {
}
func TestTxQuery_FailedToCommitWithFailedOperation(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -295,3 +295,90 @@ func createBasicTable(t *testing.T, db DB, tname string) func() {
require.NoError(t, err)
}
}
+
+func TestUint64sToInterfaces(t *testing.T) {
+ for _, tc := range []struct {
+ From []uint64
+ Exp []interface{}
+ }{
+ {From: nil, Exp: nil},
+ {From: []uint64{1}, Exp: []interface{}{uint64(1)}},
+ {From: []uint64{2, 3, 0}, Exp: []interface{}{uint64(2), uint64(3), uint64(0)}},
+ } {
+ t.Run("", func(t *testing.T) {
+ require.Equal(t, tc.Exp, Uint64sToInterfaces(tc.From...))
+ })
+ }
+}
+
+func TestUint64Provider(t *testing.T) {
+ var provider Uint64Provider
+
+ dst1 := provider.To()
+ require.Equal(t, []interface{}{new(uint64)}, dst1, "must be a single value holder")
+ val1 := dst1[0].(*uint64)
+ *val1 = uint64(100)
+
+ dst2 := provider.To()
+ require.Equal(t, []interface{}{new(uint64)}, dst2, "must be a single value holder")
+ val2 := dst2[0].(*uint64)
+ *val2 = uint64(200)
+
+ require.Equal(t, []uint64{100, 200}, provider.Values())
+
+ dst3 := provider.To()
+ val3 := dst3[0].(*uint64)
+ *val3 = uint64(300)
+
+ require.Equal(t, []uint64{100, 200, 300}, provider.Values())
+}
+
+func TestParamsAssembler(t *testing.T) {
+ assembler := NewParamsAssembler()
+
+ require.Equal(t, "$1", assembler.AddParam(1))
+ require.Equal(t, []interface{}{1}, assembler.Params())
+
+ require.Equal(t, "$2", assembler.AddParam('a'))
+ require.Equal(t, []interface{}{1, 'a'}, assembler.Params())
+
+ require.Equal(t, "$3,$4", assembler.AddParams([]interface{}{"b", uint64(4)}))
+ require.Equal(t, []interface{}{1, 'a', "b", uint64(4)}, assembler.Params())
+}
+
+func TestGeneratePlaceholders(t *testing.T) {
+ for _, tc := range []struct {
+ Start, Count int
+ Exp string
+ }{
+ {Start: -1, Count: -1, Exp: "$1"},
+ {Start: 0, Count: -1, Exp: "$1"},
+ {Start: 0, Count: 0, Exp: "$1"},
+ {Start: 1, Count: 0, Exp: "$1"},
+ {Start: 1, Count: 1, Exp: "$1"},
+ {Start: 5, Count: 3, Exp: "$5,$6,$7"},
+ {Start: 5, Count: -1, Exp: "$5"},
+ } {
+ t.Run("", func(t *testing.T) {
+ require.Equal(t, tc.Exp, GeneratePlaceholders(tc.Start, tc.Count))
+ })
+ }
+}
+
+func TestScanAll(t *testing.T) {
+ db := getDB(t)
+
+ var ids Uint64Provider
+ notEmptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id)")
+ require.NoError(t, err)
+
+ require.NoError(t, ScanAll(notEmptyRows, &ids))
+ require.Equal(t, []uint64{1, 200, 300500}, ids.Values())
+
+ var nothing Uint64Provider
+ emptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id) WHERE id < 0")
+ require.NoError(t, err)
+
+ require.NoError(t, ScanAll(emptyRows, &nothing))
+ require.Equal(t, ([]uint64)(nil), nothing.Values())
+}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index eba642c65..893f3c6bc 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -30,7 +30,7 @@ type DB struct {
func (db DB) Truncate(t testing.TB, tables ...string) {
t.Helper()
- tmpl := strings.Repeat("TRUNCATE TABLE %q RESTART IDENTITY;\n", len(tables))
+ tmpl := strings.Repeat("TRUNCATE TABLE %q RESTART IDENTITY CASCADE;\n", len(tables))
params := make([]interface{}, len(tables))
for i, table := range tables {
params[i] = table
@@ -48,6 +48,14 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
}
+func (db DB) TruncateAll(t testing.TB) {
+ db.Truncate(t,
+ "gitaly_replication_queue_job_lock",
+ "gitaly_replication_queue",
+ "gitaly_replication_queue_lock",
+ )
+}
+
// Close removes schema if it was used and releases connection pool.
func (db DB) Close() error {
if err := db.DB.Close(); err != nil {
@@ -59,25 +67,29 @@ func (db DB) Close() error {
// GetDB returns a wrapper around the database connection pool.
// Must be used only for testing.
// The new database 'gitaly_test' will be re-created for each package that uses this function.
-// The best place to call it is in individual testing functions
+// Each call will also truncate all tables with their identities restarted if any.
+// The best place to call it is in individual testing functions.
// It uses env vars:
// PGHOST - required, URL/socket/dir
// PGPORT - required, binding port
// PGUSER - optional, user - `$ whoami` would be used if not provided
-func GetDB(t testing.TB) DB {
+func GetDB(t testing.TB, database string) DB {
t.Helper()
testDBInitOnce.Do(func() {
- sqlDB := initGitalyTestDB(t)
+ sqlDB := initGitalyTestDB(t, database)
_, mErr := Migrate(sqlDB)
require.NoError(t, mErr, "failed to run database migration")
testDB = DB{DB: sqlDB}
})
+
+ testDB.TruncateAll(t)
+
return testDB
}
-func initGitalyTestDB(t testing.TB) *sql.DB {
+func initGitalyTestDB(t testing.TB, database string) *sql.DB {
t.Helper()
host, hostFound := os.LookupEnv("PGHOST")
@@ -101,17 +113,21 @@ func initGitalyTestDB(t testing.TB) *sql.DB {
require.NoError(t, oErr, "failed to connect to 'postgres' database")
defer func() { require.NoError(t, postgresDB.Close()) }()
- _, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS gitaly_test")
+ rows, tErr := postgresDB.Query("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = '" + database + "'")
+ require.NoError(t, tErr)
+ require.NoError(t, rows.Close())
+
+ _, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS " + database)
require.NoError(t, dErr, "failed to drop 'gitaly_test' database")
- _, cErr := postgresDB.Exec("CREATE DATABASE gitaly_test WITH ENCODING 'UTF8'")
- require.NoError(t, cErr, "failed to create 'gitaly_test' database")
+ _, cErr := postgresDB.Exec("CREATE DATABASE " + database + " WITH ENCODING 'UTF8'")
+ require.NoErrorf(t, cErr, "failed to create %q database", database)
require.NoError(t, postgresDB.Close(), "error on closing connection to 'postgres' database")
// connect to the testing database
- dbCfg.DBName = "gitaly_test"
+ dbCfg.DBName = database
gitalyTestDB, err := OpenDB(dbCfg)
- require.NoError(t, err, "failed to connect to 'gitaly_test' database")
+ require.NoErrorf(t, err, "failed to connect to %q database", database)
return gitalyTestDB
}
diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/praefect/datastore/glsql/testing_test.go
index ad576d1ba..6d1035bba 100644
--- a/internal/praefect/datastore/glsql/testing_test.go
+++ b/internal/praefect/datastore/glsql/testing_test.go
@@ -9,7 +9,7 @@ import (
)
func TestDB_Truncate(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
_, err := db.Exec("CREATE TABLE truncate_tbl(id BIGSERIAL PRIMARY KEY)")
require.NoError(t, err)
diff --git a/internal/praefect/datastore/init_test.go b/internal/praefect/datastore/init_test.go
new file mode 100644
index 000000000..bfbffa166
--- /dev/null
+++ b/internal/praefect/datastore/init_test.go
@@ -0,0 +1,22 @@
+// +build postgres
+
+package datastore
+
+import (
+ "log"
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+func TestMain(m *testing.M) {
+ code := m.Run()
+ // Clean closes connection to database once all tests are done
+ if err := glsql.Clean(); err != nil {
+ log.Fatalln(err, "database disconnection failure")
+ }
+ os.Exit(code)
+}
+
+func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t, "datastore") }
diff --git a/internal/praefect/datastore/migrations/20200224220728_job_queue.go b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
new file mode 100644
index 000000000..e63116354
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
@@ -0,0 +1,46 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200224220728_job_queue",
+ Up: []string{`
+CREATE TYPE GITALY_REPLICATION_JOB_STATE AS ENUM('ready', 'in_progress', 'completed', 'cancelled', 'failed')
+`, `
+CREATE TABLE gitaly_replication_queue_lock
+(
+ id TEXT PRIMARY KEY
+ , acquired BOOLEAN NOT NULL DEFAULT FALSE
+)
+`, `
+CREATE TABLE gitaly_replication_queue
+(
+ id BIGSERIAL PRIMARY KEY
+ , state GITALY_REPLICATION_JOB_STATE NOT NULL DEFAULT 'ready'
+ , created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+ , updated_at TIMESTAMP WITHOUT TIME ZONE
+ , attempt INTEGER NOT NULL DEFAULT 3
+ , lock_id TEXT
+ , job JSONB
+)`, `
+CREATE TABLE gitaly_replication_queue_job_lock
+(
+ job_id BIGINT REFERENCES gitaly_replication_queue(id)
+ , lock_id TEXT REFERENCES gitaly_replication_queue_lock(id)
+ , triggered_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+ , CONSTRAINT gitaly_replication_queue_job_lock_pk PRIMARY KEY (job_id, lock_id)
+)`,
+ },
+ Down: []string{`
+DROP TABLE IF EXISTS gitaly_replication_queue_job_lock CASCADE
+`, `
+DROP TABLE IF EXISTS gitaly_replication_queue CASCADE
+`, `
+DROP TABLE IF EXISTS gitaly_replication_queue_lock CASCADE
+`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
new file mode 100644
index 000000000..db0c0edf4
--- /dev/null
+++ b/internal/praefect/datastore/queue.go
@@ -0,0 +1,265 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "database/sql/driver"
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+// ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back.
+type ReplicationEventQueue interface {
+ // Enqueue puts provided event into the persistent queue.
+ Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
+ // Dequeue retrieves events from the persistent queue using provided limitations and filters.
+ Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error)
+}
+
+// ReplicationJob is a persistent representation of the replication job.
+type ReplicationJob struct {
+ Change string `json:"change"`
+ RelativePath string `json:"relative_path"`
+ TargetNodeStorage string `json:"target_node_storage"`
+ SourceNodeStorage string `json:"source_node_storage"`
+ Params Params `json:"params"`
+}
+
+func (job *ReplicationJob) Scan(value interface{}) error {
+ if value == nil {
+ return nil
+ }
+
+ d, ok := value.([]byte)
+ if !ok {
+ return fmt.Errorf("unexpected type received: %T", value)
+ }
+
+ return json.Unmarshal(d, job)
+}
+
+func (job ReplicationJob) Value() (driver.Value, error) {
+ return json.Marshal(job)
+}
+
+// ReplicationEvent is a persistent representation of the replication event.
+type ReplicationEvent struct {
+ ID uint64
+ State string
+ Attempt int
+ LockID string
+ CreatedAt time.Time
+ UpdatedAt *time.Time
+ Job ReplicationJob
+}
+
+// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
+func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) {
+ var mapping []interface{}
+ for _, column := range columns {
+ switch column {
+ case "id":
+ mapping = append(mapping, &event.ID)
+ case "state":
+ mapping = append(mapping, &event.State)
+ case "created_at":
+ mapping = append(mapping, &event.CreatedAt)
+ case "updated_at":
+ mapping = append(mapping, &event.UpdatedAt)
+ case "attempt":
+ mapping = append(mapping, &event.Attempt)
+ case "lock_id":
+ mapping = append(mapping, &event.LockID)
+ case "job":
+ mapping = append(mapping, &event.Job)
+ default:
+ return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column)
+ }
+ }
+ return mapping, nil
+}
+
+// Scan fills receive fields with values fetched from database based on the set of columns/column aliases.
+func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error {
+ mappings, err := event.Mapping(columns)
+ if err != nil {
+ return err
+ }
+ return rows.Scan(mappings...)
+}
+
+// ScanReplicationEvents reads all rows and convert them into structs filling all the fields according to fetched columns/column aliases.
+func ScanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) {
+ columns, err := rows.Columns()
+ if err != nil {
+ return events, err
+ }
+
+ defer func() {
+ if cErr := rows.Close(); cErr != nil && err == nil {
+ err = cErr
+ }
+ }()
+
+ for rows.Next() {
+ var event ReplicationEvent
+ if err = event.Scan(columns, rows); err != nil {
+ return events, err
+ }
+ events = append(events, event)
+ }
+
+ return events, rows.Err()
+}
+
+// interface implementation protection
+var _ ReplicationEventQueue = PostgresReplicationEventQueue{}
+
+// PostgresReplicationEventQueue is a Postgres implementation of persistent queue.
+type PostgresReplicationEventQueue struct {
+ qc glsql.Querier
+}
+
+func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
+ query := `
+WITH insert_lock AS (
+ INSERT INTO gitaly_replication_queue_lock(id)
+ VALUES ($1 || '|' || $2)
+ ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
+ RETURNING id
+)
+INSERT INTO gitaly_replication_queue(lock_id, job)
+SELECT insert_lock.id, $3
+FROM insert_lock
+RETURNING id, state, created_at, updated_at, lock_id, attempt, job`
+ // this will always return a single row result (because of lock uniqueness) or an error
+ rows, err := rq.qc.QueryContext(ctx, query, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job)
+ if err != nil {
+ return ReplicationEvent{}, err
+ }
+
+ events, err := ScanReplicationEvents(rows)
+ if err != nil {
+ return ReplicationEvent{}, err
+ }
+
+ return events[0], nil
+}
+
+func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) {
+ query := `
+WITH to_lock AS (
+ SELECT id
+ FROM gitaly_replication_queue_lock AS repo_lock
+ WHERE repo_lock.acquired = FALSE AND repo_lock.id IN (
+ SELECT lock_id
+ FROM gitaly_replication_queue
+ WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+ ORDER BY created_at
+ LIMIT $2 FOR UPDATE
+ )
+ FOR UPDATE SKIP LOCKED
+)
+, jobs AS (
+ UPDATE gitaly_replication_queue AS queue
+ SET attempt = queue.attempt - 1,
+ state = 'in_progress',
+ updated_at = NOW() AT TIME ZONE 'UTC'
+ FROM to_lock
+ WHERE queue.lock_id IN (SELECT id FROM to_lock)
+ AND state NOT IN ('in_progress', 'cancelled', 'completed')
+ AND queue.id IN (
+ SELECT id
+ FROM gitaly_replication_queue
+ WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+ ORDER BY created_at
+ LIMIT $2
+ )
+ RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job
+)
+, track_job_lock AS (
+ INSERT INTO gitaly_replication_queue_job_lock (job_id, lock_id, triggered_at)
+ SELECT jobs.id, jobs.lock_id, NOW() AT TIME ZONE 'UTC' FROM jobs
+ RETURNING lock_id
+)
+, do_lock AS (
+ UPDATE gitaly_replication_queue_lock
+ SET acquired = TRUE
+ WHERE id IN (SELECT lock_id FROM track_job_lock)
+)
+SELECT id, state, created_at, updated_at, lock_id, attempt, job
+FROM jobs
+ORDER BY id
+`
+ rows, err := rq.qc.QueryContext(ctx, query, nodeStorage, count)
+ if err != nil {
+ return nil, err
+ }
+
+ return ScanReplicationEvents(rows)
+}
+
+// Acknowledge updates previously dequeued events with new state releasing resources acquired for it.
+// It only updates events that are in 'in_progress' state.
+// It returns list of ids that was actually acknowledged.
+func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state string, ids []uint64) ([]uint64, error) {
+ if len(ids) == 0 {
+ return nil, nil
+ }
+
+ params := glsql.NewParamsAssembler()
+ query := `
+WITH existing AS (
+ SELECT id, lock_id
+ FROM gitaly_replication_queue
+ WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `)
+ AND state = 'in_progress'
+ FOR UPDATE
+)
+, to_release AS (
+ UPDATE gitaly_replication_queue AS queue
+ SET state = ` + params.AddParam(state) + `
+ FROM existing
+ WHERE existing.id = queue.id
+ RETURNING queue.id, queue.lock_id
+)
+, removed_job_lock AS (
+ DELETE FROM gitaly_replication_queue_job_lock AS job_lock
+ USING to_release AS job_failed
+ WHERE job_lock.job_id = job_failed.id AND job_lock.lock_id = job_failed.lock_id
+ RETURNING job_failed.lock_id
+)
+, release AS (
+ UPDATE gitaly_replication_queue_lock
+ SET acquired = FALSE
+ WHERE id IN (
+ SELECT existing.lock_id
+ FROM (
+ SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id
+ ) AS removed
+ JOIN (
+ SELECT lock_id, COUNT(*) AS amount
+ FROM gitaly_replication_queue_job_lock
+ WHERE lock_id IN (SELECT lock_id FROM removed_job_lock)
+ GROUP BY lock_id
+ ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
+ )
+)
+SELECT id
+FROM existing
+`
+ rows, err := rq.qc.QueryContext(ctx, query, params.Params()...)
+ if err != nil {
+ return nil, err
+ }
+
+ var acknowledged glsql.Uint64Provider
+ if err := glsql.ScanAll(rows, &acknowledged); err != nil {
+ return nil, err
+ }
+
+ return acknowledged.Values(), nil
+}
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
new file mode 100644
index 000000000..3294ec68a
--- /dev/null
+++ b/internal/praefect/datastore/queue_test.go
@@ -0,0 +1,622 @@
+// +build postgres
+
+package datastore
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+// TestPostgresReplicationEventQueue_Enqueue verifies that enqueueing a single event persists it
+// in 'ready' state with 3 attempts, creates a matching non-acquired lock row, and leaves the
+// job-lock tracking table empty (no dequeue has happened yet).
+func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ // NOTE(review): 'eventType' holds a concrete event, not a type - consider renaming.
+ eventType := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ actualEvent, err := queue.Enqueue(ctx, eventType) // initial event
+ require.NoError(t, err)
+ actualEvent.CreatedAt = time.Time{} // reset to the zero value: creation time is generated by the database and cannot be known beforehand
+
+ expLock := LockRow{ID: "gitaly-1|/project/path-1", Acquired: false}
+
+ expEvent := ReplicationEvent{
+ ID: 1,
+ State: "ready",
+ Attempt: 3, // events start with 3 attempts remaining
+ LockID: "gitaly-1|/project/path-1", // lock id format: "<target storage>|<relative path>"
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ require.Equal(t, expEvent, actualEvent)
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent})
+ requireLocks(t, ctx, db, []LockRow{expLock}) // expected a new lock for new event
+ db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+}
+
+// TestPostgresReplicationEventQueue_EnqueueMultiple verifies the lock-row bookkeeping on enqueue:
+// a repeated event reuses the existing lock, while a different target storage or a different
+// repository each produce a new lock row.
+func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: RenameRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "",
+ Params: Params{"RelativePath": "/project/path-1-renamed"},
+ },
+ }
+
+ eventType3 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ event1, err := queue.Enqueue(ctx, eventType1) // initial event
+ require.NoError(t, err)
+
+ expLock1 := LockRow{ID: "gitaly-1|/project/path-1", Acquired: false}
+ expLock2 := LockRow{ID: "gitaly-2|/project/path-1", Acquired: false}
+ expLock3 := LockRow{ID: "gitaly-1|/project/path-2", Acquired: false}
+
+ expEvent1 := ReplicationEvent{
+ ID: event1.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1})
+ requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event
+ db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+
+ event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event
+ require.NoError(t, err)
+
+ expEvent2 := ReplicationEvent{
+ ID: event2.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
+ requireLocks(t, ctx, db, []LockRow{expLock1}) // still the same single lock for the repeated event
+
+ event3, err := queue.Enqueue(ctx, eventType2) // event for another target
+ require.NoError(t, err)
+
+ expEvent3 := ReplicationEvent{
+ ID: event3.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-2|/project/path-1",
+ Job: ReplicationJob{
+ Change: "rename",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "",
+ Params: Params{"RelativePath": "/project/path-1-renamed"},
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
+ requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // a new lock appears for the event with another target
+
+ event4, err := queue.Enqueue(ctx, eventType3) // event for another repo
+ require.NoError(t, err)
+
+ expEvent4 := ReplicationEvent{
+ ID: event4.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-1|/project/path-2",
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4})
+ requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo
+
+ db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0) // no dequeue happened, so the job-lock table must be empty
+}
+
+// TestPostgresReplicationEventQueue_Dequeue verifies that dequeueing moves an event into
+// 'in_progress', decrements its attempt counter, acquires the storage lock, and records a
+// job-lock binding; dequeueing for an unknown storage returns nothing.
+func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ event := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ event, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err, "failed to fill in event queue")
+
+ noEvents, err := queue.Dequeue(ctx, "not existing storage", 5)
+ require.NoError(t, err)
+ require.Len(t, noEvents, 0, "there must be no events dequeued for not existing storage")
+
+ expectedEvent := event
+ expectedEvent.State = "in_progress"
+ expectedEvent.Attempt = 2 // one attempt is consumed by the dequeue (3 -> 2)
+
+ expectedLock := LockRow{ID: event.LockID, Acquired: true} // as we deque events we acquire lock for processing
+
+ expectedJobLock := JobLockRow{JobID: event.ID, LockID: event.LockID} // and there is a track if job is under processing in separate table
+
+ actual, err := queue.Dequeue(ctx, event.Job.TargetNodeStorage, 5)
+ require.NoError(t, err)
+
+ for i := range actual {
+ actual[i].UpdatedAt = nil // it is not possible to determine update_at value as it is generated on UPDATE in database
+ }
+ require.Equal(t, []ReplicationEvent{expectedEvent}, actual)
+
+ // there is only one single lock for all fetched events
+ requireLocks(t, ctx, db, []LockRow{expectedLock})
+ requireJobLocks(t, ctx, db, []JobLockRow{expectedJobLock})
+}
+
+// TestPostgresReplicationEventQueue_DequeueMultiple verifies the count limit on dequeue and that
+// an event whose lock is already acquired is skipped by a subsequent dequeue.
+// Expected results are listed as literals on purpose to be more explicit about what is going on with data.
+func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: DeleteRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "",
+ Params: nil,
+ },
+ }
+
+ eventType3 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: RenameRepo.String(),
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: Params{"RelativePath": "/project/path-2-renamed"},
+ },
+ }
+
+ events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3} // events to fill in the queue
+ for i := range events {
+ var err error
+ events[i], err = queue.Enqueue(ctx, events[i])
+ require.NoError(t, err, "failed to fill in event queue")
+ }
+
+ // first request to deque
+ const limitFirstN = 3 // limit on the number of jobs to dequeue
+
+ expectedEvents1 := make([]ReplicationEvent, limitFirstN)
+ expectedJobLocks1 := make([]JobLockRow, limitFirstN)
+ for i := range expectedEvents1 {
+ expectedEvents1[i] = events[i]
+ expectedEvents1[i].State = "in_progress"
+ expectedEvents1[i].Attempt = 2
+
+ expectedJobLocks1[i].JobID = expectedEvents1[i].ID
+ expectedJobLocks1[i].LockID = "gitaly-1|/project/path-1"
+ }
+
+ // we expect only first two types of events by limiting count to 3
+ dequeuedEvents1, err := queue.Dequeue(ctx, "gitaly-1", limitFirstN)
+ require.NoError(t, err)
+ for i := range dequeuedEvents1 {
+ dequeuedEvents1[i].UpdatedAt = nil // it is not possible to determine update_at value as it is generated on UPDATE in database
+ }
+ require.Equal(t, expectedEvents1, dequeuedEvents1)
+
+ requireLocks(t, ctx, db, []LockRow{
+ // there is only one single lock for all fetched events because of their 'repo' and 'target' combination
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, expectedJobLocks1)
+
+ // second request to deque
+
+ // there must be only last event fetched from the queue
+ expectedEvents2 := []ReplicationEvent{events[len(events)-1]}
+ expectedEvents2[0].State = "in_progress"
+ expectedEvents2[0].Attempt = 2
+
+ expectedJobLocks2 := []JobLockRow{{JobID: 5, LockID: "gitaly-1|/project/path-2"}}
+
+ dequeuedEvents2, err := queue.Dequeue(ctx, "gitaly-1", 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents2, 1, "only one event must be fetched from the queue")
+
+ dequeuedEvents2[0].UpdatedAt = nil // it is not possible to determine update_at value as it is generated on UPDATE in database
+ require.Equal(t, expectedEvents2, dequeuedEvents2)
+
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, append(expectedJobLocks1, expectedJobLocks2...))
+
+ // this event was not consumed by the first dequeue because of the limit
+ // and not by the second dequeue either because a lock was already acquired for this kind of event
+ remainingEvents := []ReplicationEvent{events[3]}
+ expectedEvents := append(append(expectedEvents1, remainingEvents...), expectedEvents2...)
+ requireEvents(t, ctx, db, expectedEvents)
+}
+
+// TestPostgresReplicationEventQueue_Acknowledge verifies that acknowledging a dequeued event
+// updates its state, removes its job-lock binding, releases the storage lock, and silently
+// ignores ids that do not exist.
+func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ event := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ event, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err, "failed to fill in event queue")
+
+ actual, err := queue.Dequeue(ctx, event.Job.TargetNodeStorage, 100)
+ require.NoError(t, err)
+
+ // as we deque events we acquire lock for processing
+ requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: true}})
+ requireJobLocks(t, ctx, db, []JobLockRow{{JobID: event.ID, LockID: event.LockID}})
+
+ // 100500 is a non-existing id and must simply be ignored
+ acknowledged, err := queue.Acknowledge(ctx, "completed", []uint64{actual[0].ID, 100500})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{actual[0].ID}, acknowledged)
+
+ event.State = "completed"
+ event.Attempt = 2
+ requireEvents(t, ctx, db, []ReplicationEvent{event})
+ // lock must be released as the event was acknowledged and there are no other events left protected under this lock
+ requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: false}})
+ // all associated with acknowledged event tracking bindings between lock and event must be removed
+ requireJobLocks(t, ctx, db, nil)
+}
+
+// TestPostgresReplicationEventQueue_AcknowledgeMultiple walks through interleaved dequeue and
+// acknowledge calls across several storages/repos and checks at each step which storage locks
+// are held, which job-lock bindings exist, and that failed events become eligible for retry.
+func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: DeleteRepo.String(),
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "",
+ Params: nil,
+ },
+ }
+
+ eventType3 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-3",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType4 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue
+ for i := range events {
+ var err error
+ events[i], err = queue.Enqueue(ctx, events[i])
+ require.NoError(t, err, "failed to fill in event queue")
+ }
+
+ // we expect only first two types of events by limiting count to 3
+ dequeuedEvents1, err := queue.Dequeue(ctx, "gitaly-1", 3)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents1, 3)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 3, LockID: "gitaly-1|/project/path-2"},
+ })
+
+ // release lock for events of second type
+ acknowledge1, err := queue.Acknowledge(ctx, "failed", []uint64{3})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{3}, acknowledge1)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ })
+
+ dequeuedEvents2, err := queue.Dequeue(ctx, "gitaly-1", 3)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents2, 2, "expected: event of type 3 and of type 2 ('failed' will be fetched for retry)")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: true},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 5, LockID: "gitaly-1|/project/path-3"},
+ {JobID: 3, LockID: "gitaly-1|/project/path-2"},
+ })
+
+ acknowledge2, err := queue.Acknowledge(ctx, "completed", []uint64{1, 3})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{1, 3}, acknowledge2)
+ requireLocks(t, ctx, db, []LockRow{
+ // path-1 lock stays acquired: job 2 is still in progress under it
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: true},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 5, LockID: "gitaly-1|/project/path-3"},
+ })
+
+ dequeuedEvents3, err := queue.Dequeue(ctx, "gitaly-2", 3)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents3, 1, "expected: event of type 4")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: true},
+ {ID: "gitaly-2|/project/path-1", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 5, LockID: "gitaly-1|/project/path-3"},
+ {JobID: 7, LockID: "gitaly-2|/project/path-1"},
+ })
+
+ acknowledged3, err := queue.Acknowledge(ctx, "completed", []uint64{2, 5, 7})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{2, 5, 7}, acknowledged3)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: false},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, nil)
+
+ dequeuedEvents4, err := queue.Dequeue(ctx, "gitaly-1", 100500)
+ require.NoError(t, err)
+ // NOTE(review): the message below looks copy-pasted - the two events here are the remaining
+ // type-1 (job 4) and type-2 (job 6) events, not "event of type 4".
+ require.Len(t, dequeuedEvents4, 2, "expected: event of type 4")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 4, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 6, LockID: "gitaly-1|/project/path-2"},
+ })
+
+ newEvent, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err)
+
+ acknowledge4, err := queue.Acknowledge(ctx, "completed", []uint64{newEvent.ID})
+ require.NoError(t, err)
+ require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged
+ var newEventState string
+ require.NoError(t, db.QueryRow("SELECT state FROM gitaly_replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState))
+ require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 4, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 6, LockID: "gitaly-1|/project/path-2"},
+ })
+}
+
+// requireEvents asserts that the gitaly_replication_queue table contains exactly the expected
+// events (ordered by id). Timestamps are excluded from the comparison because they are
+// generated by the database.
+func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
+ t.Helper()
+
+ // as it is not possible to expect exact time of entity creation/update we do not fetch it from database
+ // and we do not take it into account from expected values.
+ exp := make([]ReplicationEvent, len(expected)) // make a copy to avoid side effects for passed values
+ for i, e := range expected {
+ exp[i] = e
+ // set to default values as they would not be initialized from database
+ exp[i].CreatedAt = time.Time{}
+ exp[i].UpdatedAt = nil
+ }
+
+ sqlStmt := `SELECT id, state, attempt, lock_id, job FROM gitaly_replication_queue ORDER BY id`
+ rows, err := db.QueryContext(ctx, sqlStmt)
+ require.NoError(t, err)
+
+ // NOTE(review): rows is not closed here - presumably ScanReplicationEvents closes it; confirm.
+ actual, err := ScanReplicationEvents(rows)
+ require.NoError(t, err)
+ require.Equal(t, exp, actual)
+}
+
+// LockRow exists only for testing purposes and represents entries from gitaly_replication_queue_lock table.
+type LockRow struct {
+ ID string
+ Acquired bool
+}
+
+// requireLocks asserts that the gitaly_replication_queue_lock table contains exactly the
+// expected lock rows, in any order.
+func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []LockRow) {
+ t.Helper()
+
+ sqlStmt := `SELECT id, acquired FROM gitaly_replication_queue_lock`
+ rows, err := db.QueryContext(ctx, sqlStmt)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
+
+ var actual []LockRow
+ for rows.Next() {
+ var entry LockRow
+ require.NoError(t, rows.Scan(&entry.ID, &entry.Acquired), "failed to scan entry")
+ actual = append(actual, entry)
+ }
+ require.NoError(t, rows.Err(), "completion of result loop scan")
+ require.ElementsMatch(t, expected, actual) // order-insensitive comparison
+}
+
+// JobLockRow exists only for testing purposes and represents entries from gitaly_replication_queue_job_lock table.
+type JobLockRow struct {
+ JobID uint64
+ LockID string
+ // TriggeredAt is never scanned below; it exists only to mirror the table's column.
+ TriggeredAt time.Time
+}
+
+// requireJobLocks asserts that the gitaly_replication_queue_job_lock table contains exactly the
+// expected bindings between jobs and locks (triggered_at is not compared).
+func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) {
+ t.Helper()
+
+ sqlStmt := `SELECT job_id, lock_id FROM gitaly_replication_queue_job_lock ORDER BY triggered_at`
+ rows, err := db.QueryContext(ctx, sqlStmt)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
+
+ var actual []JobLockRow
+ for rows.Next() {
+ var entry JobLockRow
+ require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID), "failed to scan entry")
+ actual = append(actual, entry)
+ }
+ require.NoError(t, rows.Err(), "completion of result loop scan")
+ require.ElementsMatch(t, expected, actual) // order-insensitive comparison
+}