Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-07-10 16:41:52 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-07-10 16:41:52 +0300
commit931afeb1b6d67f3a4f327d1b585eeba7da0cf7b1 (patch)
tree6a95bfc9ffacb393d2f7b5b655039e2e62c6f358
parent385bacf01af3dff153e3f3362bb714dd3cfb6fa5 (diff)
Praefect: Collapse duplicate replication jobs
ParamsAssembler removed as it is redundant and can be replaced with (col = ANY($1)) with pq.Array value instead. Comment from method implementation removed as redundant. Comment on the interface fully describes the method. Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2438
-rw-r--r--internal/praefect/datastore/glsql/postgres.go66
-rw-r--r--internal/praefect/datastore/glsql/postgres_test.go47
-rw-r--r--internal/praefect/datastore/queue.go31
3 files changed, 16 insertions, 128 deletions
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 2d3f36150..125de5956 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -4,8 +4,6 @@ package glsql
import (
"context"
"database/sql"
- "strconv"
- "strings"
// Blank import to enable integration of github.com/lib/pq into database/sql
_ "github.com/lib/pq"
@@ -111,70 +109,6 @@ func (txq *txQuery) log(err error, msg string) {
}
}
-// Uint64sToInterfaces converts list of uint64 values to the list of empty interfaces.
-func Uint64sToInterfaces(vs ...uint64) []interface{} {
- if vs == nil {
- return nil
- }
-
- rs := make([]interface{}, len(vs))
- for i, v := range vs {
- rs[i] = v
- }
- return rs
-}
-
-// GeneratePlaceholders returns string with 'count' placeholders starting from 'start' index.
-// 1 will be used if provided value for 'start' is less then 1.
-// 1 will be used if provided value for 'count' is less then 1.
-func GeneratePlaceholders(start, count int) string {
- if start < 1 {
- start = 1
- }
-
- if count <= 1 {
- return "$" + strconv.Itoa(start)
- }
-
- var builder = strings.Builder{}
- for i := start; i < start+count; i++ {
- if i != start {
- builder.WriteString(",")
- }
- builder.WriteString("$")
- builder.WriteString(strconv.Itoa(i))
- }
- return builder.String()
-}
-
-// NewParamsAssembler returns
-func NewParamsAssembler() *ParamsAssembler {
- return &ParamsAssembler{}
-}
-
-// ParamsAssembler helps to assemble parameters of the query together providing placeholders that must be used in query.
-type ParamsAssembler []interface{}
-
-// AddParams receives n params and assemble them with other params and returns generated placeholder as a result.
-func (pm *ParamsAssembler) AddParams(params []interface{}) string {
- start := len(*pm)
- *pm = append(*pm, params...)
- return GeneratePlaceholders(start+1, len(params))
-}
-
-// AddParam receives param and assemble it with other params and returns generated placeholder as a result.
-func (pm *ParamsAssembler) AddParam(param interface{}) string {
- return pm.AddParams([]interface{}{param})
-}
-
-// Params returns list of previously assembled parameters.
-func (pm *ParamsAssembler) Params() []interface{} {
- if pm != nil {
- return *pm
- }
- return nil
-}
-
// DestProvider returns list of pointers that will be used to scan values into.
type DestProvider interface {
// To returns list of pointers.
diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go
index 56a2585e3..33db541df 100644
--- a/internal/praefect/datastore/glsql/postgres_test.go
+++ b/internal/praefect/datastore/glsql/postgres_test.go
@@ -298,21 +298,6 @@ func createBasicTable(t *testing.T, db DB, tname string) func() {
}
}
-func TestUint64sToInterfaces(t *testing.T) {
- for _, tc := range []struct {
- From []uint64
- Exp []interface{}
- }{
- {From: nil, Exp: nil},
- {From: []uint64{1}, Exp: []interface{}{uint64(1)}},
- {From: []uint64{2, 3, 0}, Exp: []interface{}{uint64(2), uint64(3), uint64(0)}},
- } {
- t.Run("", func(t *testing.T) {
- require.Equal(t, tc.Exp, Uint64sToInterfaces(tc.From...))
- })
- }
-}
-
func TestUint64Provider(t *testing.T) {
var provider Uint64Provider
@@ -335,38 +320,6 @@ func TestUint64Provider(t *testing.T) {
require.Equal(t, []uint64{100, 200, 300}, provider.Values())
}
-func TestParamsAssembler(t *testing.T) {
- assembler := NewParamsAssembler()
-
- require.Equal(t, "$1", assembler.AddParam(1))
- require.Equal(t, []interface{}{1}, assembler.Params())
-
- require.Equal(t, "$2", assembler.AddParam('a'))
- require.Equal(t, []interface{}{1, 'a'}, assembler.Params())
-
- require.Equal(t, "$3,$4", assembler.AddParams([]interface{}{"b", uint64(4)}))
- require.Equal(t, []interface{}{1, 'a', "b", uint64(4)}, assembler.Params())
-}
-
-func TestGeneratePlaceholders(t *testing.T) {
- for _, tc := range []struct {
- Start, Count int
- Exp string
- }{
- {Start: -1, Count: -1, Exp: "$1"},
- {Start: 0, Count: -1, Exp: "$1"},
- {Start: 0, Count: 0, Exp: "$1"},
- {Start: 1, Count: 0, Exp: "$1"},
- {Start: 1, Count: 1, Exp: "$1"},
- {Start: 5, Count: 3, Exp: "$5,$6,$7"},
- {Start: 5, Count: -1, Exp: "$5"},
- } {
- t.Run("", func(t *testing.T) {
- require.Equal(t, tc.Exp, GeneratePlaceholders(tc.Start, tc.Count))
- })
- }
-}
-
func TestScanAll(t *testing.T) {
db := getDB(t)
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index c1feb46c2..e64a84e92 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -19,9 +19,12 @@ type ReplicationEventQueue interface {
Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
// Dequeue retrieves events from the persistent queue using provided limitations and filters.
Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
- // Acknowledge updates previously dequeued events with new state releasing resources acquired for it.
- // It only updates events that are in 'in_progress' state.
- // It returns list of ids that was actually acknowledged.
+ // Acknowledge updates previously dequeued events with the new state and releases resources acquired for it.
+ // It updates events that are in 'in_progress' state to the state that is passed in.
+ // It also updates state of similar events (scheduled fot the same repository with same change from the same source)
+ // that are in 'ready' state and created before the target event was dequeue for the processing if the new state is
+ // 'completed'. Otherwise it won't be changed.
+ // It returns sub-set of passed in ids that were updated.
Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
// GetOutdatedRepositories returns storages by repositories which are considered outdated. A repository is considered
// outdated if the latest replication job is not in 'complete' state or the latest replication job does not originate
@@ -248,11 +251,6 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
return res, nil
}
-// Acknowledge updates previously dequeued events with the new state and releases resources acquired for it.
-// It updates events that are in 'in_progress' state to the state that is passed in.
-// It also updates state of similar events that are in 'ready' state and created before the target
-// event was dequeue for the processing if the new state is 'completed'. Otherwise it won't be changed.
-// It returns sub-set of passed in ids that were updated.
func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) {
if len(ids) == 0 {
return nil, nil
@@ -262,13 +260,16 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
return nil, err
}
- params := glsql.NewParamsAssembler()
- newState := params.AddParam(state)
+ pqIDs := make(pq.Int64Array, len(ids))
+ for i, id := range ids {
+ pqIDs[i] = int64(id)
+ }
+
query := `
WITH existing AS (
SELECT id, lock_id, updated_at, job
FROM replication_queue
- WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `)
+ WHERE id = ANY($1)
AND state = 'in_progress'
FOR UPDATE
)
@@ -276,14 +277,14 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
UPDATE replication_queue AS queue
SET
state = CASE WHEN state = 'in_progress' THEN
- ` + newState + `::REPLICATION_JOB_STATE
+ $2::REPLICATION_JOB_STATE
ELSE
- (CASE WHEN ` + newState + ` = 'completed' THEN 'completed' ELSE queue.state END)::REPLICATION_JOB_STATE
+ (CASE WHEN $2 = 'completed' THEN 'completed' ELSE queue.state END)::REPLICATION_JOB_STATE
END,
updated_at = CASE WHEN state = 'in_progress' THEN
NOW() AT TIME ZONE 'UTC'
ELSE
- (CASE WHEN ` + newState + ` = 'completed' THEN NOW() AT TIME ZONE 'UTC' ELSE queue.updated_at END)
+ (CASE WHEN $2 = 'completed' THEN NOW() AT TIME ZONE 'UTC' ELSE queue.updated_at END)
END
FROM existing
WHERE existing.id = queue.id
@@ -318,7 +319,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
)
SELECT id
FROM existing`
- rows, err := rq.qc.QueryContext(ctx, query, params.Params()...)
+ rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state)
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}