gitlab.com/gitlab-org/gitaly.git
commit aa910b0b6fbc3f005a9924dee731123dfca2e48d
tree   07168948db8eba1b6833860b047d5fbba66b3c62
parent ca611acce3b696423c6693fa7f5833703a08975f
parent c5965df04cf581c2e11fa0dba9bbf56fe40e4282
author    Pavlo Strokov <pstrokov@gitlab.com> 2020-03-10 20:00:05 +0300
committer Pavlo Strokov <pstrokov@gitlab.com> 2020-03-10 20:00:05 +0300

    Merge branch 'ps-persistent-queue' into 'master'

    Praefect: Move replication queue to database

    See merge request gitlab-org/gitaly!1865
 _support/Makefile.template                                         |   2
 changelogs/unreleased/ps-persistent-queue.yml                      |   5
 internal/praefect/datastore/datastore.go                           |  16
 internal/praefect/datastore/glsql/init_test.go                     |   2
 internal/praefect/datastore/glsql/postgres.go                      | 120
 internal/praefect/datastore/glsql/postgres_test.go                 |  99
 internal/praefect/datastore/glsql/testing.go                       |  36
 internal/praefect/datastore/glsql/testing_test.go                  |   2
 internal/praefect/datastore/init_test.go                           |  22
 internal/praefect/datastore/migrations/20200224220728_job_queue.go |  46
 internal/praefect/datastore/queue.go                               | 265
 internal/praefect/datastore/queue_test.go                          | 622
 12 files changed, 1218 insertions(+), 19 deletions(-)
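
Before the per-file diffs: this MR replaces the in-memory replication queue with a Postgres-backed queue (PostgresReplicationEventQueue) offering Enqueue, Dequeue, and Acknowledge operations. A minimal usage sketch of that flow, written as if inside the datastore package (the queue's Querier field is unexported) and assuming an already-opened *sql.DB:

package datastore

import (
	"context"
	"database/sql"
)

func replicateOnceSketch(ctx context.Context, db *sql.DB) error {
	queue := PostgresReplicationEventQueue{qc: db} // *sql.DB satisfies glsql.Querier

	// Producer: persist a replication job; the lock id is derived from the
	// target storage and relative path ("gitaly-1|/project/path-1").
	if _, err := queue.Enqueue(ctx, ReplicationEvent{
		Job: ReplicationJob{
			Change:            UpdateRepo.String(),
			RelativePath:      "/project/path-1",
			TargetNodeStorage: "gitaly-1",
			SourceNodeStorage: "gitaly-0",
		},
	}); err != nil {
		return err
	}

	// Consumer: fetch up to 10 'ready'/'failed' events for this storage;
	// fetched events move to 'in_progress' and their locks are acquired.
	events, err := queue.Dequeue(ctx, "gitaly-1", 10)
	if err != nil {
		return err
	}

	// After processing, acknowledge the outcome so the locks can be released.
	ids := make([]uint64, 0, len(events))
	for _, e := range events {
		ids = append(ids, e.ID)
	}
	_, err = queue.Acknowledge(ctx, "completed", ids)
	return err
}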
diff --git a/_support/Makefile.template b/_support/Makefile.template
index 541b69a4d..172f3dabe 100644
--- a/_support/Makefile.template
+++ b/_support/Makefile.template
@@ -151,7 +151,7 @@ rspec-gitlab-shell: {{ .GitlabShellDir }}/config.yml assemble-go prepare-tests
.PHONY: test-postgres
test-postgres: prepare-tests
- @cd {{ .SourceDir }} && go test -tags postgres gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/...
+ @cd {{ .SourceDir }} && go test -tags postgres -count=1 gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/...
.PHONY: verify
verify: check-mod-tidy check-formatting notice-up-to-date check-proto rubocop
diff --git a/changelogs/unreleased/ps-persistent-queue.yml b/changelogs/unreleased/ps-persistent-queue.yml
new file mode 100644
index 000000000..48fbe24de
--- /dev/null
+++ b/changelogs/unreleased/ps-persistent-queue.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: Move replication queue to database'
+merge_request: 1865
+author:
+type: added
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 5f950e19e..a16743a13 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -55,7 +55,21 @@ const (
RenameRepo
)
-// Params represent additional set of parameters required for replication job.
+func (ct ChangeType) String() string {
+ switch ct {
+ case UpdateRepo:
+ return "update"
+ case DeleteRepo:
+ return "delete"
+ case RenameRepo:
+ return "rename"
+ default:
+ return "UNDEFINED"
+ }
+}
+
+// Params represents additional information required to process an event after fetching it from storage.
+// It must be JSON encodable/decodable so it can be persisted without problems.
type Params map[string]interface{}
// ReplJob is an instance of a queued replication job. A replication job is
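
The String values above are what gets stored in the job payload, and Params must survive a JSON round trip because jobs are persisted in a JSONB column (see the migration later in this diff). A small illustrative sketch, written as if inside the datastore package:

package datastore

import "encoding/json"

// paramsRoundTripSketch shows the JSON round trip that persisting a job relies on.
func paramsRoundTripSketch() (Params, error) {
	in := Params{"RelativePath": "/project/path-1-renamed"}
	raw, err := json.Marshal(in) // this is what lands in the 'job' JSONB column
	if err != nil {
		return nil, err
	}
	var out Params
	err = json.Unmarshal(raw, &out)
	return out, err // out["RelativePath"] == "/project/path-1-renamed"
}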
diff --git a/internal/praefect/datastore/glsql/init_test.go b/internal/praefect/datastore/glsql/init_test.go
index 34b024451..8423b6205 100644
--- a/internal/praefect/datastore/glsql/init_test.go
+++ b/internal/praefect/datastore/glsql/init_test.go
@@ -16,3 +16,5 @@ func TestMain(m *testing.M) {
}
os.Exit(code)
}
+
+func getDB(t testing.TB) DB { return GetDB(t, "glsql") }
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 34250241b..455282af6 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -4,6 +4,8 @@ package glsql
import (
"context"
"database/sql"
+ "strconv"
+ "strings"
// Blank import to enable integration of github.com/lib/pq into database/sql
_ "github.com/lib/pq"
@@ -33,6 +35,12 @@ func Migrate(db *sql.DB) (int, error) {
return migrate.Exec(db, "postgres", migrationSource, migrate.Up)
}
+// Querier is an abstraction over *sql.DB and *sql.Tx that allows their methods to be used without knowing the concrete type.
+type Querier interface {
+ QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
+ QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+}
+
// TxQuery runs operations inside transaction and commits|rollbacks on Done.
type TxQuery interface {
// Exec calls op function with provided ctx.
@@ -100,3 +108,115 @@ func (txq *txQuery) log(err error, msg string) {
txq.logger.WithError(err).Error(msg)
}
}
+
+// Uint64sToInterfaces converts a list of uint64 values to a list of empty interfaces.
+func Uint64sToInterfaces(vs ...uint64) []interface{} {
+ if vs == nil {
+ return nil
+ }
+
+ rs := make([]interface{}, len(vs))
+ for i, v := range vs {
+ rs[i] = v
+ }
+ return rs
+}
+
+// GeneratePlaceholders returns a string with 'count' placeholders starting from the 'start' index.
+// 1 is used if the provided value for 'start' is less than 1.
+// 1 is used if the provided value for 'count' is less than 1.
+func GeneratePlaceholders(start, count int) string {
+ if start < 1 {
+ start = 1
+ }
+
+ if count <= 1 {
+ return "$" + strconv.Itoa(start)
+ }
+
+ var builder = strings.Builder{}
+ for i := start; i < start+count; i++ {
+ if i != start {
+ builder.WriteString(",")
+ }
+ builder.WriteString("$")
+ builder.WriteString(strconv.Itoa(i))
+ }
+ return builder.String()
+}
+
+// NewParamsAssembler returns a new empty ParamsAssembler.
+func NewParamsAssembler() *ParamsAssembler {
+ return &ParamsAssembler{}
+}
+
+// ParamsAssembler assembles query parameters together and provides the placeholders to be used in the query.
+type ParamsAssembler []interface{}
+
+// AddParams appends the received params to those already assembled and returns the generated placeholders.
+func (pm *ParamsAssembler) AddParams(params []interface{}) string {
+ start := len(*pm)
+ *pm = append(*pm, params...)
+ return GeneratePlaceholders(start+1, len(params))
+}
+
+// AddParam appends the received param to those already assembled and returns the generated placeholder.
+func (pm *ParamsAssembler) AddParam(param interface{}) string {
+ return pm.AddParams([]interface{}{param})
+}
+
+// Params returns list of previously assembled parameters.
+func (pm *ParamsAssembler) Params() []interface{} {
+ if pm != nil {
+ return *pm
+ }
+ return nil
+}
+
+// DestProvider provides a list of pointers that values will be scanned into.
+type DestProvider interface {
+ // To returns list of pointers.
+ // It is not an idempotent operation and each call will return a new list.
+ To() []interface{}
+}
+
+// ScanAll reads all data from 'rows' into holders provided by 'in'.
+// It also closes the 'rows' source after completion.
+func ScanAll(rows *sql.Rows, in DestProvider) (err error) {
+ defer func() {
+ if cErr := rows.Close(); cErr != nil && err == nil {
+ err = cErr
+ }
+ }()
+
+ for rows.Next() {
+ if err = rows.Scan(in.To()...); err != nil {
+ return err
+ }
+ }
+ err = rows.Err()
+ return err
+}
+
+// Uint64Provider is a DestProvider that can be used with the ScanAll function to read all rows and return the result as a slice.
+type Uint64Provider []*uint64
+
+// Values returns the list of values read from *sql.Rows.
+func (p *Uint64Provider) Values() []uint64 {
+ if len(*p) == 0 {
+ return nil
+ }
+
+ r := make([]uint64, len(*p))
+ for i, v := range *p {
+ r[i] = *v
+ }
+ return r
+}
+
+// To returns a list of pointers that will be used as a destination for scan operation.
+func (p *Uint64Provider) To() []interface{} {
+ var d = new(uint64)
+ *p = append(*p, d)
+ return []interface{}{d}
+}
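
These helpers are meant to compose: ParamsAssembler collects positional arguments, AddParams/GeneratePlaceholders produce the matching $n placeholders, and ScanAll drains the rows into a DestProvider such as Uint64Provider. A sketch of an IN-clause query built this way, written as if inside the glsql package; the table name is illustrative only:

package glsql

import (
	"context"
	"database/sql"
)

func selectByIDsSketch(ctx context.Context, db *sql.DB, ids ...uint64) ([]uint64, error) {
	if len(ids) == 0 {
		return nil, nil // an empty IN () clause is invalid SQL, so bail out early
	}

	params := NewParamsAssembler()
	query := `SELECT id FROM some_table WHERE id IN (` +
		params.AddParams(Uint64sToInterfaces(ids...)) + `)`

	rows, err := db.QueryContext(ctx, query, params.Params()...)
	if err != nil {
		return nil, err
	}

	var found Uint64Provider
	if err := ScanAll(rows, &found); err != nil { // ScanAll also closes rows
		return nil, err
	}
	return found.Values(), nil
}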
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 54fcba8ae..56a2585e3 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -48,7 +48,7 @@ func TestOpenDB(t *testing.T) {
}
func TestTxQuery_MultipleOperationsSuccess(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -78,7 +78,7 @@ func TestTxQuery_MultipleOperationsSuccess(t *testing.T) {
}
func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -120,7 +120,7 @@ func TestTxQuery_FailedOperationInTheMiddle(t *testing.T) {
}
func TestTxQuery_ContextHandled(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
@@ -157,7 +157,7 @@ func TestTxQuery_ContextHandled(t *testing.T) {
}
func TestTxQuery_FailedToCommit(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -191,7 +191,7 @@ func TestTxQuery_FailedToCommit(t *testing.T) {
}
func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -239,7 +239,7 @@ func TestTxQuery_FailedToRollbackWithFailedOperation(t *testing.T) {
}
func TestTxQuery_FailedToCommitWithFailedOperation(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
defer createBasicTable(t, db, "work_unit_test")()
ctx, cancel := testhelper.Context()
@@ -297,3 +297,90 @@ func createBasicTable(t *testing.T, db DB, tname string) func() {
require.NoError(t, err)
}
}
+
+func TestUint64sToInterfaces(t *testing.T) {
+ for _, tc := range []struct {
+ From []uint64
+ Exp []interface{}
+ }{
+ {From: nil, Exp: nil},
+ {From: []uint64{1}, Exp: []interface{}{uint64(1)}},
+ {From: []uint64{2, 3, 0}, Exp: []interface{}{uint64(2), uint64(3), uint64(0)}},
+ } {
+ t.Run("", func(t *testing.T) {
+ require.Equal(t, tc.Exp, Uint64sToInterfaces(tc.From...))
+ })
+ }
+}
+
+func TestUint64Provider(t *testing.T) {
+ var provider Uint64Provider
+
+ dst1 := provider.To()
+ require.Equal(t, []interface{}{new(uint64)}, dst1, "must be a single value holder")
+ val1 := dst1[0].(*uint64)
+ *val1 = uint64(100)
+
+ dst2 := provider.To()
+ require.Equal(t, []interface{}{new(uint64)}, dst2, "must be a single value holder")
+ val2 := dst2[0].(*uint64)
+ *val2 = uint64(200)
+
+ require.Equal(t, []uint64{100, 200}, provider.Values())
+
+ dst3 := provider.To()
+ val3 := dst3[0].(*uint64)
+ *val3 = uint64(300)
+
+ require.Equal(t, []uint64{100, 200, 300}, provider.Values())
+}
+
+func TestParamsAssembler(t *testing.T) {
+ assembler := NewParamsAssembler()
+
+ require.Equal(t, "$1", assembler.AddParam(1))
+ require.Equal(t, []interface{}{1}, assembler.Params())
+
+ require.Equal(t, "$2", assembler.AddParam('a'))
+ require.Equal(t, []interface{}{1, 'a'}, assembler.Params())
+
+ require.Equal(t, "$3,$4", assembler.AddParams([]interface{}{"b", uint64(4)}))
+ require.Equal(t, []interface{}{1, 'a', "b", uint64(4)}, assembler.Params())
+}
+
+func TestGeneratePlaceholders(t *testing.T) {
+ for _, tc := range []struct {
+ Start, Count int
+ Exp string
+ }{
+ {Start: -1, Count: -1, Exp: "$1"},
+ {Start: 0, Count: -1, Exp: "$1"},
+ {Start: 0, Count: 0, Exp: "$1"},
+ {Start: 1, Count: 0, Exp: "$1"},
+ {Start: 1, Count: 1, Exp: "$1"},
+ {Start: 5, Count: 3, Exp: "$5,$6,$7"},
+ {Start: 5, Count: -1, Exp: "$5"},
+ } {
+ t.Run("", func(t *testing.T) {
+ require.Equal(t, tc.Exp, GeneratePlaceholders(tc.Start, tc.Count))
+ })
+ }
+}
+
+func TestScanAll(t *testing.T) {
+ db := getDB(t)
+
+ var ids Uint64Provider
+ notEmptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id)")
+ require.NoError(t, err)
+
+ require.NoError(t, ScanAll(notEmptyRows, &ids))
+ require.Equal(t, []uint64{1, 200, 300500}, ids.Values())
+
+ var nothing Uint64Provider
+ emptyRows, err := db.Query("SELECT id FROM (VALUES (1), (200), (300500)) AS t(id) WHERE id < 0")
+ require.NoError(t, err)
+
+ require.NoError(t, ScanAll(emptyRows, &nothing))
+ require.Equal(t, ([]uint64)(nil), nothing.Values())
+}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index f05fd7051..eceddf891 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -31,7 +31,7 @@ type DB struct {
func (db DB) Truncate(t testing.TB, tables ...string) {
t.Helper()
- tmpl := strings.Repeat("TRUNCATE TABLE %q RESTART IDENTITY;\n", len(tables))
+ tmpl := strings.Repeat("TRUNCATE TABLE %q RESTART IDENTITY CASCADE;\n", len(tables))
params := make([]interface{}, len(tables))
for i, table := range tables {
params[i] = table
@@ -49,6 +49,14 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
}
+func (db DB) TruncateAll(t testing.TB) {
+ db.Truncate(t,
+ "gitaly_replication_queue_job_lock",
+ "gitaly_replication_queue",
+ "gitaly_replication_queue_lock",
+ )
+}
+
// Close removes schema if it was used and releases connection pool.
func (db DB) Close() error {
if err := db.DB.Close(); err != nil {
@@ -60,25 +68,29 @@ func (db DB) Close() error {
// GetDB returns a wrapper around the database connection pool.
// Must be used only for testing.
// The new database 'gitaly_test' will be re-created for each package that uses this function.
-// The best place to call it is in individual testing functions
+// Each call also truncates all tables and restarts their identities, if there are any.
+// The best place to call it is in individual testing functions.
// It uses env vars:
// PGHOST - required, URL/socket/dir
// PGPORT - required, binding port
// PGUSER - optional, user - `$ whoami` would be used if not provided
-func GetDB(t testing.TB) DB {
+func GetDB(t testing.TB, database string) DB {
t.Helper()
testDBInitOnce.Do(func() {
- sqlDB := initGitalyTestDB(t)
+ sqlDB := initGitalyTestDB(t, database)
_, mErr := Migrate(sqlDB)
require.NoError(t, mErr, "failed to run database migration")
testDB = DB{DB: sqlDB}
})
+
+ testDB.TruncateAll(t)
+
return testDB
}
-func initGitalyTestDB(t testing.TB) *sql.DB {
+func initGitalyTestDB(t testing.TB, database string) *sql.DB {
t.Helper()
getEnvFromGDK(t)
@@ -104,17 +116,21 @@ func initGitalyTestDB(t testing.TB) *sql.DB {
require.NoError(t, oErr, "failed to connect to 'postgres' database")
defer func() { require.NoError(t, postgresDB.Close()) }()
- _, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS gitaly_test")
+ rows, tErr := postgresDB.Query("SELECT PG_TERMINATE_BACKEND(pid) FROM PG_STAT_ACTIVITY WHERE datname = '" + database + "'")
+ require.NoError(t, tErr)
+ require.NoError(t, rows.Close())
+
+ _, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS " + database)
require.NoError(t, dErr, "failed to drop 'gitaly_test' database")
- _, cErr := postgresDB.Exec("CREATE DATABASE gitaly_test WITH ENCODING 'UTF8'")
- require.NoError(t, cErr, "failed to create 'gitaly_test' database")
+ _, cErr := postgresDB.Exec("CREATE DATABASE " + database + " WITH ENCODING 'UTF8'")
+ require.NoErrorf(t, cErr, "failed to create %q database", database)
require.NoError(t, postgresDB.Close(), "error on closing connection to 'postgres' database")
// connect to the testing database
- dbCfg.DBName = "gitaly_test"
+ dbCfg.DBName = database
gitalyTestDB, err := OpenDB(dbCfg)
- require.NoError(t, err, "failed to connect to 'gitaly_test' database")
+ require.NoErrorf(t, err, "failed to connect to %q database", database)
return gitalyTestDB
}
diff --git a/internal/praefect/datastore/glsql/testing_test.go b/internal/praefect/datastore/glsql/testing_test.go
index ad576d1ba..6d1035bba 100644
--- a/internal/praefect/datastore/glsql/testing_test.go
+++ b/internal/praefect/datastore/glsql/testing_test.go
@@ -9,7 +9,7 @@ import (
)
func TestDB_Truncate(t *testing.T) {
- db := GetDB(t)
+ db := getDB(t)
_, err := db.Exec("CREATE TABLE truncate_tbl(id BIGSERIAL PRIMARY KEY)")
require.NoError(t, err)
diff --git a/internal/praefect/datastore/init_test.go b/internal/praefect/datastore/init_test.go
new file mode 100644
index 000000000..bfbffa166
--- /dev/null
+++ b/internal/praefect/datastore/init_test.go
@@ -0,0 +1,22 @@
+// +build postgres
+
+package datastore
+
+import (
+ "log"
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+func TestMain(m *testing.M) {
+ code := m.Run()
+ // Clean closes the connection to the database once all tests are done
+ if err := glsql.Clean(); err != nil {
+ log.Fatalln(err, "database disconnection failure")
+ }
+ os.Exit(code)
+}
+
+func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t, "datastore") }
diff --git a/internal/praefect/datastore/migrations/20200224220728_job_queue.go b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
new file mode 100644
index 000000000..e63116354
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
@@ -0,0 +1,46 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200224220728_job_queue",
+ Up: []string{`
+CREATE TYPE GITALY_REPLICATION_JOB_STATE AS ENUM('ready', 'in_progress', 'completed', 'cancelled', 'failed')
+`, `
+CREATE TABLE gitaly_replication_queue_lock
+(
+ id TEXT PRIMARY KEY
+ , acquired BOOLEAN NOT NULL DEFAULT FALSE
+)
+`, `
+CREATE TABLE gitaly_replication_queue
+(
+ id BIGSERIAL PRIMARY KEY
+ , state GITALY_REPLICATION_JOB_STATE NOT NULL DEFAULT 'ready'
+ , created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+ , updated_at TIMESTAMP WITHOUT TIME ZONE
+ , attempt INTEGER NOT NULL DEFAULT 3
+ , lock_id TEXT
+ , job JSONB
+)`, `
+CREATE TABLE gitaly_replication_queue_job_lock
+(
+ job_id BIGINT REFERENCES gitaly_replication_queue(id)
+ , lock_id TEXT REFERENCES gitaly_replication_queue_lock(id)
+ , triggered_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+ , CONSTRAINT gitaly_replication_queue_job_lock_pk PRIMARY KEY (job_id, lock_id)
+)`,
+ },
+ Down: []string{`
+DROP TABLE IF EXISTS gitaly_replication_queue_job_lock CASCADE
+`, `
+DROP TABLE IF EXISTS gitaly_replication_queue CASCADE
+`, `
+DROP TABLE IF EXISTS gitaly_replication_queue_lock CASCADE
+`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
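
The migration registers itself into allMigrations from init(), so it is applied whenever glsql.Migrate (shown earlier in this diff) runs against a database. A minimal wiring sketch, assuming a reachable Postgres instance; the DSN is illustrative only:

package main

import (
	"database/sql"
	"log"

	// Postgres driver, the same one glsql integrates with database/sql
	_ "github.com/lib/pq"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost:5432/praefect_test?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	n, err := glsql.Migrate(db) // applies pending migrations, including 20200224220728_job_queue
	if err != nil {
		log.Fatalf("migration failed: %v", err)
	}
	log.Printf("applied %d migrations", n)
}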
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
new file mode 100644
index 000000000..db0c0edf4
--- /dev/null
+++ b/internal/praefect/datastore/queue.go
@@ -0,0 +1,265 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "database/sql/driver"
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+// ReplicationEventQueue allows new events to be put into the persistent queue and retrieved back.
+type ReplicationEventQueue interface {
+ // Enqueue puts provided event into the persistent queue.
+ Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
+ // Dequeue retrieves events from the persistent queue using the provided filters and limit.
+ Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error)
+}
+
+// ReplicationJob is a persistent representation of the replication job.
+type ReplicationJob struct {
+ Change string `json:"change"`
+ RelativePath string `json:"relative_path"`
+ TargetNodeStorage string `json:"target_node_storage"`
+ SourceNodeStorage string `json:"source_node_storage"`
+ Params Params `json:"params"`
+}
+
+func (job *ReplicationJob) Scan(value interface{}) error {
+ if value == nil {
+ return nil
+ }
+
+ d, ok := value.([]byte)
+ if !ok {
+ return fmt.Errorf("unexpected type received: %T", value)
+ }
+
+ return json.Unmarshal(d, job)
+}
+
+func (job ReplicationJob) Value() (driver.Value, error) {
+ return json.Marshal(job)
+}
+
+// ReplicationEvent is a persistent representation of the replication event.
+type ReplicationEvent struct {
+ ID uint64
+ State string
+ Attempt int
+ LockID string
+ CreatedAt time.Time
+ UpdatedAt *time.Time
+ Job ReplicationJob
+}
+
+// Mapping returns a list of pointers to the struct fields that correspond to the SQL columns/column aliases.
+func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) {
+ var mapping []interface{}
+ for _, column := range columns {
+ switch column {
+ case "id":
+ mapping = append(mapping, &event.ID)
+ case "state":
+ mapping = append(mapping, &event.State)
+ case "created_at":
+ mapping = append(mapping, &event.CreatedAt)
+ case "updated_at":
+ mapping = append(mapping, &event.UpdatedAt)
+ case "attempt":
+ mapping = append(mapping, &event.Attempt)
+ case "lock_id":
+ mapping = append(mapping, &event.LockID)
+ case "job":
+ mapping = append(mapping, &event.Job)
+ default:
+ return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column)
+ }
+ }
+ return mapping, nil
+}
+
+// Scan fills the receiver's fields with values fetched from the database based on the set of columns/column aliases.
+func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error {
+ mappings, err := event.Mapping(columns)
+ if err != nil {
+ return err
+ }
+ return rows.Scan(mappings...)
+}
+
+// ScanReplicationEvents reads all rows and converts them into structs, filling the fields according to the fetched columns/column aliases.
+func ScanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) {
+ columns, err := rows.Columns()
+ if err != nil {
+ return events, err
+ }
+
+ defer func() {
+ if cErr := rows.Close(); cErr != nil && err == nil {
+ err = cErr
+ }
+ }()
+
+ for rows.Next() {
+ var event ReplicationEvent
+ if err = event.Scan(columns, rows); err != nil {
+ return events, err
+ }
+ events = append(events, event)
+ }
+
+ return events, rows.Err()
+}
+
+// interface implementation protection
+var _ ReplicationEventQueue = PostgresReplicationEventQueue{}
+
+// PostgresReplicationEventQueue is a Postgres implementation of persistent queue.
+type PostgresReplicationEventQueue struct {
+ qc glsql.Querier
+}
+
+func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
+ query := `
+WITH insert_lock AS (
+ INSERT INTO gitaly_replication_queue_lock(id)
+ VALUES ($1 || '|' || $2)
+ ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
+ RETURNING id
+)
+INSERT INTO gitaly_replication_queue(lock_id, job)
+SELECT insert_lock.id, $3
+FROM insert_lock
+RETURNING id, state, created_at, updated_at, lock_id, attempt, job`
+ // this will always return a single row result (because of lock uniqueness) or an error
+ rows, err := rq.qc.QueryContext(ctx, query, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job)
+ if err != nil {
+ return ReplicationEvent{}, err
+ }
+
+ events, err := ScanReplicationEvents(rows)
+ if err != nil {
+ return ReplicationEvent{}, err
+ }
+
+ return events[0], nil
+}
+
+func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) {
+ query := `
+WITH to_lock AS (
+ SELECT id
+ FROM gitaly_replication_queue_lock AS repo_lock
+ WHERE repo_lock.acquired = FALSE AND repo_lock.id IN (
+ SELECT lock_id
+ FROM gitaly_replication_queue
+ WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+ ORDER BY created_at
+ LIMIT $2 FOR UPDATE
+ )
+ FOR UPDATE SKIP LOCKED
+)
+, jobs AS (
+ UPDATE gitaly_replication_queue AS queue
+ SET attempt = queue.attempt - 1,
+ state = 'in_progress',
+ updated_at = NOW() AT TIME ZONE 'UTC'
+ FROM to_lock
+ WHERE queue.lock_id IN (SELECT id FROM to_lock)
+ AND state NOT IN ('in_progress', 'cancelled', 'completed')
+ AND queue.id IN (
+ SELECT id
+ FROM gitaly_replication_queue
+ WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+ ORDER BY created_at
+ LIMIT $2
+ )
+ RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job
+)
+, track_job_lock AS (
+ INSERT INTO gitaly_replication_queue_job_lock (job_id, lock_id, triggered_at)
+ SELECT jobs.id, jobs.lock_id, NOW() AT TIME ZONE 'UTC' FROM jobs
+ RETURNING lock_id
+)
+, do_lock AS (
+ UPDATE gitaly_replication_queue_lock
+ SET acquired = TRUE
+ WHERE id IN (SELECT lock_id FROM track_job_lock)
+)
+SELECT id, state, created_at, updated_at, lock_id, attempt, job
+FROM jobs
+ORDER BY id
+`
+ rows, err := rq.qc.QueryContext(ctx, query, nodeStorage, count)
+ if err != nil {
+ return nil, err
+ }
+
+ return ScanReplicationEvents(rows)
+}
+
+// Acknowledge updates previously dequeued events with the new state, releasing the resources acquired for them.
+// It only updates events that are in the 'in_progress' state.
+// It returns the list of ids that were actually acknowledged.
+func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state string, ids []uint64) ([]uint64, error) {
+ if len(ids) == 0 {
+ return nil, nil
+ }
+
+ params := glsql.NewParamsAssembler()
+ query := `
+WITH existing AS (
+ SELECT id, lock_id
+ FROM gitaly_replication_queue
+ WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `)
+ AND state = 'in_progress'
+ FOR UPDATE
+)
+, to_release AS (
+ UPDATE gitaly_replication_queue AS queue
+ SET state = ` + params.AddParam(state) + `
+ FROM existing
+ WHERE existing.id = queue.id
+ RETURNING queue.id, queue.lock_id
+)
+, removed_job_lock AS (
+ DELETE FROM gitaly_replication_queue_job_lock AS job_lock
+ USING to_release AS job_failed
+ WHERE job_lock.job_id = job_failed.id AND job_lock.lock_id = job_failed.lock_id
+ RETURNING job_failed.lock_id
+)
+, release AS (
+ UPDATE gitaly_replication_queue_lock
+ SET acquired = FALSE
+ WHERE id IN (
+ SELECT existing.lock_id
+ FROM (
+ SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id
+ ) AS removed
+ JOIN (
+ SELECT lock_id, COUNT(*) AS amount
+ FROM gitaly_replication_queue_job_lock
+ WHERE lock_id IN (SELECT lock_id FROM removed_job_lock)
+ GROUP BY lock_id
+ ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
+ )
+)
+SELECT id
+FROM existing
+`
+ rows, err := rq.qc.QueryContext(ctx, query, params.Params()...)
+ if err != nil {
+ return nil, err
+ }
+
+ var acknowledged glsql.Uint64Provider
+ if err := glsql.ScanAll(rows, &acknowledged); err != nil {
+ return nil, err
+ }
+
+ return acknowledged.Values(), nil
+}
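
Taken together, a consumer loops Dequeue → process → Acknowledge: Dequeue moves events to 'in_progress' and decrements 'attempt', and Acknowledge writes back one of the GITALY_REPLICATION_JOB_STATE values and releases a lock once no tracked jobs remain under it. A sketch of such a loop, written as if inside this package, with 'apply' passed in as a hypothetical processing callback:

package datastore

import "context"

func consumeSketch(ctx context.Context, queue PostgresReplicationEventQueue, storage string,
	apply func(context.Context, ReplicationJob) error) error {
	for {
		// moves up to 10 eligible events to 'in_progress' and acquires their locks
		events, err := queue.Dequeue(ctx, storage, 10)
		if err != nil {
			return err
		}
		if len(events) == 0 {
			return nil // nothing 'ready'/'failed' with attempts left for this storage
		}

		var completed, failed []uint64
		for _, event := range events {
			if err := apply(ctx, event.Job); err != nil {
				failed = append(failed, event.ID) // Dequeue already decremented 'attempt'
				continue
			}
			completed = append(completed, event.ID)
		}

		// Acknowledge ignores ids that are not 'in_progress'; with an empty
		// id list it is a no-op.
		if _, err := queue.Acknowledge(ctx, "completed", completed); err != nil {
			return err
		}
		if _, err := queue.Acknowledge(ctx, "failed", failed); err != nil {
			return err
		}
	}
}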
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
new file mode 100644
index 000000000..3294ec68a
--- /dev/null
+++ b/internal/praefect/datastore/queue_test.go
@@ -0,0 +1,622 @@
+// +build postgres
+
+package datastore
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ actualEvent, err := queue.Enqueue(ctx, eventType) // initial event
+ require.NoError(t, err)
+ actualEvent.CreatedAt = time.Time{} // reset to the zero value because the creation time cannot be known beforehand for the expected event
+
+ expLock := LockRow{ID: "gitaly-1|/project/path-1", Acquired: false}
+
+ expEvent := ReplicationEvent{
+ ID: 1,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ require.Equal(t, expEvent, actualEvent)
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent})
+ requireLocks(t, ctx, db, []LockRow{expLock}) // expected a new lock for new event
+ db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+}
+
+func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: RenameRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "",
+ Params: Params{"RelativePath": "/project/path-1-renamed"},
+ },
+ }
+
+ eventType3 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ event1, err := queue.Enqueue(ctx, eventType1) // initial event
+ require.NoError(t, err)
+
+ expLock1 := LockRow{ID: "gitaly-1|/project/path-1", Acquired: false}
+ expLock2 := LockRow{ID: "gitaly-2|/project/path-1", Acquired: false}
+ expLock3 := LockRow{ID: "gitaly-1|/project/path-2", Acquired: false}
+
+ expEvent1 := ReplicationEvent{
+ ID: event1.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1})
+ requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event
+ db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+
+ event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event
+ require.NoError(t, err)
+
+ expEvent2 := ReplicationEvent{
+ ID: event2.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
+ requireLocks(t, ctx, db, []LockRow{expLock1}) // still expect the same single lock for the repeated event
+
+ event3, err := queue.Enqueue(ctx, eventType2) // event for another target
+ require.NoError(t, err)
+
+ expEvent3 := ReplicationEvent{
+ ID: event3.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-2|/project/path-1",
+ Job: ReplicationJob{
+ Change: "rename",
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "",
+ Params: Params{"RelativePath": "/project/path-1-renamed"},
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
+ requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // a new lock for the event with another target
+
+ event4, err := queue.Enqueue(ctx, eventType3) // event for another repo
+ require.NoError(t, err)
+
+ expEvent4 := ReplicationEvent{
+ ID: event4.ID,
+ State: "ready",
+ Attempt: 3,
+ LockID: "gitaly-1|/project/path-2",
+ Job: ReplicationJob{
+ Change: "update",
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4})
+ requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // a new lock for the same target but another repo
+
+ db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0) // there were no dequeues, so it must be empty
+}
+
+func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ event := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ event, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err, "failed to fill in event queue")
+
+ noEvents, err := queue.Dequeue(ctx, "not existing storage", 5)
+ require.NoError(t, err)
+ require.Len(t, noEvents, 0, "there must be no events dequeued for a non-existing storage")
+
+ expectedEvent := event
+ expectedEvent.State = "in_progress"
+ expectedEvent.Attempt = 2
+
+ expectedLock := LockRow{ID: event.LockID, Acquired: true} // as we dequeue events we acquire a lock for processing
+
+ expectedJobLock := JobLockRow{JobID: event.ID, LockID: event.LockID} // and a separate table tracks which jobs are being processed
+
+ actual, err := queue.Dequeue(ctx, event.Job.TargetNodeStorage, 5)
+ require.NoError(t, err)
+
+ for i := range actual {
+ actual[i].UpdatedAt = nil // it is not possible to determine the updated_at value as it is generated on UPDATE in the database
+ }
+ require.Equal(t, []ReplicationEvent{expectedEvent}, actual)
+
+ // there is only a single lock for all fetched events
+ requireLocks(t, ctx, db, []LockRow{expectedLock})
+ requireJobLocks(t, ctx, db, []JobLockRow{expectedJobLock})
+}
+
+// expected results are listed as literals on purpose to be more explicit about what is going on with the data
+func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: DeleteRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "",
+ Params: nil,
+ },
+ }
+
+ eventType3 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: RenameRepo.String(),
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: Params{"RelativePath": "/project/path-2-renamed"},
+ },
+ }
+
+ events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3} // events to fill in the queue
+ for i := range events {
+ var err error
+ events[i], err = queue.Enqueue(ctx, events[i])
+ require.NoError(t, err, "failed to fill in event queue")
+ }
+
+ // first dequeue request
+ const limitFirstN = 3 // limit on the number of jobs we are going to dequeue
+
+ expectedEvents1 := make([]ReplicationEvent, limitFirstN)
+ expectedJobLocks1 := make([]JobLockRow, limitFirstN)
+ for i := range expectedEvents1 {
+ expectedEvents1[i] = events[i]
+ expectedEvents1[i].State = "in_progress"
+ expectedEvents1[i].Attempt = 2
+
+ expectedJobLocks1[i].JobID = expectedEvents1[i].ID
+ expectedJobLocks1[i].LockID = "gitaly-1|/project/path-1"
+ }
+
+ // by limiting the count to 3 we expect only the first two types of events
+ dequeuedEvents1, err := queue.Dequeue(ctx, "gitaly-1", limitFirstN)
+ require.NoError(t, err)
+ for i := range dequeuedEvents1 {
+ dequeuedEvents1[i].UpdatedAt = nil // it is not possible to determine the updated_at value as it is generated on UPDATE in the database
+ }
+ require.Equal(t, expectedEvents1, dequeuedEvents1)
+
+ requireLocks(t, ctx, db, []LockRow{
+ // there is only a single lock for all fetched events because of their 'repo' and 'target' combination
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, expectedJobLocks1)
+
+ // second dequeue request
+
+ // there must be only last event fetched from the queue
+ expectedEvents2 := []ReplicationEvent{events[len(events)-1]}
+ expectedEvents2[0].State = "in_progress"
+ expectedEvents2[0].Attempt = 2
+
+ expectedJobLocks2 := []JobLockRow{{JobID: 5, LockID: "gitaly-1|/project/path-2"}}
+
+ dequeuedEvents2, err := queue.Dequeue(ctx, "gitaly-1", 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents2, 1, "only one event must be fetched from the queue")
+
+ dequeuedEvents2[0].UpdatedAt = nil // it is not possible to determine the updated_at value as it is generated on UPDATE in the database
+ require.Equal(t, expectedEvents2, dequeuedEvents2)
+
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, append(expectedJobLocks1, expectedJobLocks2...))
+
+ // this event wasn't consumed by the first dequeue because of the limit
+ // it also wasn't consumed by the second dequeue because a lock was already acquired for this type of event
+ remainingEvents := []ReplicationEvent{events[3]}
+ expectedEvents := append(append(expectedEvents1, remainingEvents...), expectedEvents2...)
+ requireEvents(t, ctx, db, expectedEvents)
+}
+
+func TestPostgresReplicationEventQueue_Acknowledge(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ event := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ event, err := queue.Enqueue(ctx, event)
+ require.NoError(t, err, "failed to fill in event queue")
+
+ actual, err := queue.Dequeue(ctx, event.Job.TargetNodeStorage, 100)
+ require.NoError(t, err)
+
+ // as we dequeue events we acquire a lock for processing
+ requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: true}})
+ requireJobLocks(t, ctx, db, []JobLockRow{{JobID: event.ID, LockID: event.LockID}})
+
+ acknowledged, err := queue.Acknowledge(ctx, "completed", []uint64{actual[0].ID, 100500})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{actual[0].ID}, acknowledged)
+
+ event.State = "completed"
+ event.Attempt = 2
+ requireEvents(t, ctx, db, []ReplicationEvent{event})
+ // lock must be released as the event was acknowledged and there are no other events left protected under this lock
+ requireLocks(t, ctx, db, []LockRow{{ID: event.LockID, Acquired: false}})
+ // all tracking bindings between the lock and the acknowledged event must be removed
+ requireJobLocks(t, ctx, db, nil)
+}
+
+func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ eventType1 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType2 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: DeleteRepo.String(),
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "",
+ Params: nil,
+ },
+ }
+
+ eventType3 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-3",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ eventType4 := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo.String(),
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-0",
+ Params: nil,
+ },
+ }
+
+ events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue
+ for i := range events {
+ var err error
+ events[i], err = queue.Enqueue(ctx, events[i])
+ require.NoError(t, err, "failed to fill in event queue")
+ }
+
+ // by limiting the count to 3 we expect only the first two types of events
+ dequeuedEvents1, err := queue.Dequeue(ctx, "gitaly-1", 3)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents1, 3)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 3, LockID: "gitaly-1|/project/path-2"},
+ })
+
+ // release the lock for events of the second type
+ acknowledge1, err := queue.Acknowledge(ctx, "failed", []uint64{3})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{3}, acknowledge1)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ })
+
+ dequeuedEvents2, err := queue.Dequeue(ctx, "gitaly-1", 3)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents2, 2, "expected: event of type 3 and of type 2 ('failed' will be fetched for retry)")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: true},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 1, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 5, LockID: "gitaly-1|/project/path-3"},
+ {JobID: 3, LockID: "gitaly-1|/project/path-2"},
+ })
+
+ acknowledge2, err := queue.Acknowledge(ctx, "completed", []uint64{1, 3})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{1, 3}, acknowledge2)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: true},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 5, LockID: "gitaly-1|/project/path-3"},
+ })
+
+ dequeuedEvents3, err := queue.Dequeue(ctx, "gitaly-2", 3)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents3, 1, "expected: event of type 4")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: true},
+ {ID: "gitaly-2|/project/path-1", Acquired: true},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 2, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 5, LockID: "gitaly-1|/project/path-3"},
+ {JobID: 7, LockID: "gitaly-2|/project/path-1"},
+ })
+
+ acknowledged3, err := queue.Acknowledge(ctx, "completed", []uint64{2, 5, 7})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{2, 5, 7}, acknowledged3)
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: false},
+ {ID: "gitaly-1|/project/path-2", Acquired: false},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, nil)
+
+ dequeuedEvents4, err := queue.Dequeue(ctx, "gitaly-1", 100500)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEvents4, 2, "expected: event of type 1 and event of type 2")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 4, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 6, LockID: "gitaly-1|/project/path-2"},
+ })
+
+ newEvent, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err)
+
+ acknowledge4, err := queue.Acknowledge(ctx, "completed", []uint64{newEvent.ID})
+ require.NoError(t, err)
+ require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged
+ var newEventState string
+ require.NoError(t, db.QueryRow("SELECT state FROM gitaly_replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState))
+ require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
+ requireLocks(t, ctx, db, []LockRow{
+ {ID: "gitaly-1|/project/path-1", Acquired: true},
+ {ID: "gitaly-1|/project/path-2", Acquired: true},
+ {ID: "gitaly-1|/project/path-3", Acquired: false},
+ {ID: "gitaly-2|/project/path-1", Acquired: false},
+ })
+ requireJobLocks(t, ctx, db, []JobLockRow{
+ {JobID: 4, LockID: "gitaly-1|/project/path-1"},
+ {JobID: 6, LockID: "gitaly-1|/project/path-2"},
+ })
+}
+
+func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
+ t.Helper()
+
+ // as it is not possible to predict the exact time of entity creation/update we do not fetch it from the database
+ // and we do not take it into account in the expected values.
+ exp := make([]ReplicationEvent, len(expected)) // make a copy to avoid side effects for passed values
+ for i, e := range expected {
+ exp[i] = e
+ // set to default values as they would not be initialized from database
+ exp[i].CreatedAt = time.Time{}
+ exp[i].UpdatedAt = nil
+ }
+
+ sqlStmt := `SELECT id, state, attempt, lock_id, job FROM gitaly_replication_queue ORDER BY id`
+ rows, err := db.QueryContext(ctx, sqlStmt)
+ require.NoError(t, err)
+
+ actual, err := ScanReplicationEvents(rows)
+ require.NoError(t, err)
+ require.Equal(t, exp, actual)
+}
+
+// LockRow exists only for testing purposes and represents entries from gitaly_replication_queue_lock table.
+type LockRow struct {
+ ID string
+ Acquired bool
+}
+
+func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []LockRow) {
+ t.Helper()
+
+ sqlStmt := `SELECT id, acquired FROM gitaly_replication_queue_lock`
+ rows, err := db.QueryContext(ctx, sqlStmt)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
+
+ var actual []LockRow
+ for rows.Next() {
+ var entry LockRow
+ require.NoError(t, rows.Scan(&entry.ID, &entry.Acquired), "failed to scan entry")
+ actual = append(actual, entry)
+ }
+ require.NoError(t, rows.Err(), "completion of result loop scan")
+ require.ElementsMatch(t, expected, actual)
+}
+
+// JobLockRow exists only for testing purposes and represents entries from gitaly_replication_queue_job_lock table.
+type JobLockRow struct {
+ JobID uint64
+ LockID string
+ TriggeredAt time.Time
+}
+
+func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) {
+ t.Helper()
+
+ sqlStmt := `SELECT job_id, lock_id FROM gitaly_replication_queue_job_lock ORDER BY triggered_at`
+ rows, err := db.QueryContext(ctx, sqlStmt)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
+
+ var actual []JobLockRow
+ for rows.Next() {
+ var entry JobLockRow
+ require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID), "failed to scan entry")
+ actual = append(actual, entry)
+ }
+ require.NoError(t, rows.Err(), "completion of result loop scan")
+ require.ElementsMatch(t, expected, actual)
+}