Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-04-08 15:05:39 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-04-08 15:23:53 +0300
commite20af28923977f76a978f4eafc3e85e6d21ea6e2 (patch)
tree62576d9f11e7cc2cf67e859cb9010395ea84f16c
parentd0d6fc55a933d165910bb9afce3333872adb3aa8 (diff)
implement CountDeadJobs for postgres replication queue
Implements CountDeadJobs for Postgres replication queue in preparation for 'praefect dataloss' command.
-rw-r--r--internal/praefect/datastore/memory_test.go32
-rw-r--r--internal/praefect/datastore/queue.go37
-rw-r--r--internal/praefect/datastore/queue_test.go4
3 files changed, 58 insertions, 15 deletions
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
index 274e3bb85..9eaefcadf 100644
--- a/internal/praefect/datastore/memory_test.go
+++ b/internal/praefect/datastore/memory_test.go
@@ -14,6 +14,10 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue)
ctx, cancel := testhelper.Context()
defer cancel()
+ // take the time here to include the also the
+ // completed and cancelled jobs in timerange
+ beforeOldest := time.Now()
+
const target = "target"
ackJobsToDeath := func(t *testing.T) {
t.Helper()
@@ -37,6 +41,10 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue)
}
}
+ // postgres only handles timestamps with a microsecond resolution thus
+ // we have to work with the time in microsecond sized steps
+ const tick = time.Microsecond
+
// add some other job states to the datastore to ensure they are not counted
for relPath, state := range map[string]JobState{"repo/completed-job": JobStateCompleted, "repo/cancelled-job": JobStateCancelled} {
_, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: relPath, TargetNodeStorage: target}})
@@ -49,12 +57,12 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue)
require.NoError(t, err)
}
- beforeOldest := time.Now()
- _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}})
+ oldest, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}})
require.NoError(t, err)
- afterOldest := time.Now()
- dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ afterOldest := oldest.CreatedAt.Add(tick)
+
+ dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, afterOldest)
require.NoError(t, err)
require.Empty(t, dead, "should not include ready jobs")
@@ -64,34 +72,32 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue)
_, err = q.Acknowledge(ctx, JobStateFailed, []uint64{jobs[0].ID})
require.NoError(t, err)
- dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, afterOldest)
require.NoError(t, err)
require.Empty(t, dead, "should not include failed jobs")
ackJobsToDeath(t)
- dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, afterOldest)
require.NoError(t, err)
require.Equal(t, map[string]int64{"old": 1}, dead, "should include dead job")
- _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
+ middle, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
require.NoError(t, err)
- afterMiddle := time.Now()
ackJobsToDeath(t)
- dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, middle.CreatedAt.Add(tick))
require.NoError(t, err)
require.Equal(t, map[string]int64{"old": 1, "new": 1}, dead, "should include both dead jobs")
- time.Sleep(time.Millisecond)
- _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
+ newest, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}})
require.NoError(t, err)
ackJobsToDeath(t)
- dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now())
+ dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, newest.CreatedAt.Add(tick))
require.NoError(t, err)
require.Equal(t, map[string]int64{"old": 1, "new": 2}, dead, "dead job are grouped by relative path")
- dead, err = q.CountDeadReplicationJobs(ctx, afterOldest, afterMiddle)
+ dead, err = q.CountDeadReplicationJobs(ctx, middle.CreatedAt, newest.CreatedAt.Add(-tick))
require.NoError(t, err)
require.Equal(t, map[string]int64{"new": 1}, dead, "should only count the in-between dead job")
}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 6de79e6ce..16e893a3f 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -8,7 +8,7 @@ import (
"fmt"
"time"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
@@ -152,8 +152,41 @@ type PostgresReplicationEventQueue struct {
qc glsql.Querier
}
+// CountDeadReplicationJobs returns the dead replication job counts by relative path within the
+// given timerange. The timerange beginning is inclusive and ending is exclusive.
func (rq PostgresReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) {
- return nil, helper.Unimplemented
+ const q = `
+ SELECT job->>'relative_path', count(*)
+ FROM replication_queue
+ WHERE state = 'dead'
+ AND created_at >= $1
+ AND created_at < $2
+ GROUP BY job->>'relative_path';
+ `
+
+ rows, err := rq.qc.QueryContext(ctx, q, from.UTC(), to.UTC())
+ if err != nil {
+ return nil, err
+ }
+ defer func() {
+ if err := rows.Close(); err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).Error("error closing database rows")
+ }
+ }()
+
+ out := map[string]int64{}
+ for rows.Next() {
+ var relativePath string
+ var count int64
+
+ if err := rows.Scan(&relativePath, &count); err != nil {
+ return nil, err
+ }
+
+ out[relativePath] = count
+ }
+
+ return out, rows.Err()
}
func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 56010f86c..5a0bd91ad 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -12,6 +12,10 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
+func TestPostgresReplicationEventQueue_CountDeadReplicationJobs(t *testing.T) {
+ ContractTestCountDeadReplicationJobs(t, PostgresReplicationEventQueue{getDB(t).DB})
+}
+
func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
db := getDB(t)