diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-07-10 16:41:52 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-07-10 16:41:52 +0300 |
commit | 931afeb1b6d67f3a4f327d1b585eeba7da0cf7b1 (patch) | |
tree | 6a95bfc9ffacb393d2f7b5b655039e2e62c6f358 | |
parent | 385bacf01af3dff153e3f3362bb714dd3cfb6fa5 (diff) |
Praefect: Collapse duplicate replication jobs
ParamsAssembler removed as it is redundant and can be replaced
with (col = ANY($1)) with pq.Array value instead.
Comment from method implementation removed as redundant.
Comment on the interface fully describes the method.
Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2438
-rw-r--r-- | internal/praefect/datastore/glsql/postgres.go | 66 | ||||
-rw-r--r-- | internal/praefect/datastore/glsql/postgres_test.go | 47 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 31 |
3 files changed, 16 insertions, 128 deletions
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go index 2d3f36150..125de5956 100644 --- a/internal/praefect/datastore/glsql/postgres.go +++ b/internal/praefect/datastore/glsql/postgres.go @@ -4,8 +4,6 @@ package glsql import ( "context" "database/sql" - "strconv" - "strings" // Blank import to enable integration of github.com/lib/pq into database/sql _ "github.com/lib/pq" @@ -111,70 +109,6 @@ func (txq *txQuery) log(err error, msg string) { } } -// Uint64sToInterfaces converts list of uint64 values to the list of empty interfaces. -func Uint64sToInterfaces(vs ...uint64) []interface{} { - if vs == nil { - return nil - } - - rs := make([]interface{}, len(vs)) - for i, v := range vs { - rs[i] = v - } - return rs -} - -// GeneratePlaceholders returns string with 'count' placeholders starting from 'start' index. -// 1 will be used if provided value for 'start' is less then 1. -// 1 will be used if provided value for 'count' is less then 1. -func GeneratePlaceholders(start, count int) string { - if start < 1 { - start = 1 - } - - if count <= 1 { - return "$" + strconv.Itoa(start) - } - - var builder = strings.Builder{} - for i := start; i < start+count; i++ { - if i != start { - builder.WriteString(",") - } - builder.WriteString("$") - builder.WriteString(strconv.Itoa(i)) - } - return builder.String() -} - -// NewParamsAssembler returns -func NewParamsAssembler() *ParamsAssembler { - return &ParamsAssembler{} -} - -// ParamsAssembler helps to assemble parameters of the query together providing placeholders that must be used in query. -type ParamsAssembler []interface{} - -// AddParams receives n params and assemble them with other params and returns generated placeholder as a result. -func (pm *ParamsAssembler) AddParams(params []interface{}) string { - start := len(*pm) - *pm = append(*pm, params...) - return GeneratePlaceholders(start+1, len(params)) -} - -// AddParam receives param and assemble it with other params and returns generated placeholder as a result. -func (pm *ParamsAssembler) AddParam(param interface{}) string { - return pm.AddParams([]interface{}{param}) -} - -// Params returns list of previously assembled parameters. -func (pm *ParamsAssembler) Params() []interface{} { - if pm != nil { - return *pm - } - return nil -} - // DestProvider returns list of pointers that will be used to scan values into. type DestProvider interface { // To returns list of pointers. diff --git a/internal/praefect/datastore/glsql/postgres_test.go b/internal/praefect/datastore/glsql/postgres_test.go index 56a2585e3..33db541df 100644 --- a/internal/praefect/datastore/glsql/postgres_test.go +++ b/internal/praefect/datastore/glsql/postgres_test.go @@ -298,21 +298,6 @@ func createBasicTable(t *testing.T, db DB, tname string) func() { } } -func TestUint64sToInterfaces(t *testing.T) { - for _, tc := range []struct { - From []uint64 - Exp []interface{} - }{ - {From: nil, Exp: nil}, - {From: []uint64{1}, Exp: []interface{}{uint64(1)}}, - {From: []uint64{2, 3, 0}, Exp: []interface{}{uint64(2), uint64(3), uint64(0)}}, - } { - t.Run("", func(t *testing.T) { - require.Equal(t, tc.Exp, Uint64sToInterfaces(tc.From...)) - }) - } -} - func TestUint64Provider(t *testing.T) { var provider Uint64Provider @@ -335,38 +320,6 @@ func TestUint64Provider(t *testing.T) { require.Equal(t, []uint64{100, 200, 300}, provider.Values()) } -func TestParamsAssembler(t *testing.T) { - assembler := NewParamsAssembler() - - require.Equal(t, "$1", assembler.AddParam(1)) - require.Equal(t, []interface{}{1}, assembler.Params()) - - require.Equal(t, "$2", assembler.AddParam('a')) - require.Equal(t, []interface{}{1, 'a'}, assembler.Params()) - - require.Equal(t, "$3,$4", assembler.AddParams([]interface{}{"b", uint64(4)})) - require.Equal(t, []interface{}{1, 'a', "b", uint64(4)}, assembler.Params()) -} - -func TestGeneratePlaceholders(t *testing.T) { - for _, tc := range []struct { - Start, Count int - Exp string - }{ - {Start: -1, Count: -1, Exp: "$1"}, - {Start: 0, Count: -1, Exp: "$1"}, - {Start: 0, Count: 0, Exp: "$1"}, - {Start: 1, Count: 0, Exp: "$1"}, - {Start: 1, Count: 1, Exp: "$1"}, - {Start: 5, Count: 3, Exp: "$5,$6,$7"}, - {Start: 5, Count: -1, Exp: "$5"}, - } { - t.Run("", func(t *testing.T) { - require.Equal(t, tc.Exp, GeneratePlaceholders(tc.Start, tc.Count)) - }) - } -} - func TestScanAll(t *testing.T) { db := getDB(t) diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index c1feb46c2..e64a84e92 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -19,9 +19,12 @@ type ReplicationEventQueue interface { Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) // Dequeue retrieves events from the persistent queue using provided limitations and filters. Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) - // Acknowledge updates previously dequeued events with new state releasing resources acquired for it. - // It only updates events that are in 'in_progress' state. - // It returns list of ids that was actually acknowledged. + // Acknowledge updates previously dequeued events with the new state and releases resources acquired for it. + // It updates events that are in 'in_progress' state to the state that is passed in. + // It also updates state of similar events (scheduled fot the same repository with same change from the same source) + // that are in 'ready' state and created before the target event was dequeue for the processing if the new state is + // 'completed'. Otherwise it won't be changed. + // It returns sub-set of passed in ids that were updated. Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) // GetOutdatedRepositories returns storages by repositories which are considered outdated. A repository is considered // outdated if the latest replication job is not in 'complete' state or the latest replication job does not originate @@ -248,11 +251,6 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor return res, nil } -// Acknowledge updates previously dequeued events with the new state and releases resources acquired for it. -// It updates events that are in 'in_progress' state to the state that is passed in. -// It also updates state of similar events that are in 'ready' state and created before the target -// event was dequeue for the processing if the new state is 'completed'. Otherwise it won't be changed. -// It returns sub-set of passed in ids that were updated. func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { if len(ids) == 0 { return nil, nil @@ -262,13 +260,16 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J return nil, err } - params := glsql.NewParamsAssembler() - newState := params.AddParam(state) + pqIDs := make(pq.Int64Array, len(ids)) + for i, id := range ids { + pqIDs[i] = int64(id) + } + query := ` WITH existing AS ( SELECT id, lock_id, updated_at, job FROM replication_queue - WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `) + WHERE id = ANY($1) AND state = 'in_progress' FOR UPDATE ) @@ -276,14 +277,14 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J UPDATE replication_queue AS queue SET state = CASE WHEN state = 'in_progress' THEN - ` + newState + `::REPLICATION_JOB_STATE + $2::REPLICATION_JOB_STATE ELSE - (CASE WHEN ` + newState + ` = 'completed' THEN 'completed' ELSE queue.state END)::REPLICATION_JOB_STATE + (CASE WHEN $2 = 'completed' THEN 'completed' ELSE queue.state END)::REPLICATION_JOB_STATE END, updated_at = CASE WHEN state = 'in_progress' THEN NOW() AT TIME ZONE 'UTC' ELSE - (CASE WHEN ` + newState + ` = 'completed' THEN NOW() AT TIME ZONE 'UTC' ELSE queue.updated_at END) + (CASE WHEN $2 = 'completed' THEN NOW() AT TIME ZONE 'UTC' ELSE queue.updated_at END) END FROM existing WHERE existing.id = queue.id @@ -318,7 +319,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J ) SELECT id FROM existing` - rows, err := rq.qc.QueryContext(ctx, query, params.Params()...) + rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state) if err != nil { return nil, fmt.Errorf("query: %w", err) } |