diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-08 15:05:39 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-04-10 02:27:26 +0300 |
commit | 1d621e5c421cc2d2581f68a1da94d1bbb6af990a (patch) | |
tree | 2f457ad39e7db07a88a14a6c951e9202e076f81c | |
parent | 5f47f6116b629e6820aea68b8d60dc8ce976a2b3 (diff) |
implement CountDeadJobs for postgres replication queue
Implements CountDeadReplicationJobs for the Postgres replication queue in
preparation for the 'praefect dataloss' command.
-rw-r--r-- | internal/praefect/datastore/memory_test.go | 32 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 37 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 4 |
3 files changed, 58 insertions, 15 deletions
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go index 274e3bb85..9eaefcadf 100644 --- a/internal/praefect/datastore/memory_test.go +++ b/internal/praefect/datastore/memory_test.go @@ -14,6 +14,10 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) ctx, cancel := testhelper.Context() defer cancel() + // take the time here to include the also the + // completed and cancelled jobs in timerange + beforeOldest := time.Now() + const target = "target" ackJobsToDeath := func(t *testing.T) { t.Helper() @@ -37,6 +41,10 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) } } + // postgres only handles timestamps with a microsecond resolution thus + // we have to work with the time in microsecond sized steps + const tick = time.Microsecond + // add some other job states to the datastore to ensure they are not counted for relPath, state := range map[string]JobState{"repo/completed-job": JobStateCompleted, "repo/cancelled-job": JobStateCancelled} { _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: relPath, TargetNodeStorage: target}}) @@ -49,12 +57,12 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) require.NoError(t, err) } - beforeOldest := time.Now() - _, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}}) + oldest, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "old", TargetNodeStorage: target}}) require.NoError(t, err) - afterOldest := time.Now() - dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + afterOldest := oldest.CreatedAt.Add(tick) + + dead, err := q.CountDeadReplicationJobs(ctx, beforeOldest, afterOldest) require.NoError(t, err) require.Empty(t, dead, "should not include ready jobs") @@ -64,34 +72,32 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) _, 
err = q.Acknowledge(ctx, JobStateFailed, []uint64{jobs[0].ID}) require.NoError(t, err) - dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, afterOldest) require.NoError(t, err) require.Empty(t, dead, "should not include failed jobs") ackJobsToDeath(t) - dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, afterOldest) require.NoError(t, err) require.Equal(t, map[string]int64{"old": 1}, dead, "should include dead job") - _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) + middle, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) require.NoError(t, err) - afterMiddle := time.Now() ackJobsToDeath(t) - dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, middle.CreatedAt.Add(tick)) require.NoError(t, err) require.Equal(t, map[string]int64{"old": 1, "new": 1}, dead, "should include both dead jobs") - time.Sleep(time.Millisecond) - _, err = q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) + newest, err := q.Enqueue(ctx, ReplicationEvent{Job: ReplicationJob{RelativePath: "new", TargetNodeStorage: target}}) require.NoError(t, err) ackJobsToDeath(t) - dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, time.Now()) + dead, err = q.CountDeadReplicationJobs(ctx, beforeOldest, newest.CreatedAt.Add(tick)) require.NoError(t, err) require.Equal(t, map[string]int64{"old": 1, "new": 2}, dead, "dead job are grouped by relative path") - dead, err = q.CountDeadReplicationJobs(ctx, afterOldest, afterMiddle) + dead, err = q.CountDeadReplicationJobs(ctx, middle.CreatedAt, newest.CreatedAt.Add(-tick)) require.NoError(t, err) require.Equal(t, map[string]int64{"new": 1}, dead, "should 
only count the in-between dead job") } diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 6de79e6ce..16e893a3f 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -8,7 +8,7 @@ import ( "fmt" "time" - "gitlab.com/gitlab-org/gitaly/internal/helper" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" ) @@ -152,8 +152,41 @@ type PostgresReplicationEventQueue struct { qc glsql.Querier } +// CountDeadReplicationJobs returns the dead replication job counts by relative path within the +// given timerange. The timerange beginning is inclusive and ending is exclusive. func (rq PostgresReplicationEventQueue) CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) { - return nil, helper.Unimplemented + const q = ` + SELECT job->>'relative_path', count(*) + FROM replication_queue + WHERE state = 'dead' + AND created_at >= $1 + AND created_at < $2 + GROUP BY job->>'relative_path'; + ` + + rows, err := rq.qc.QueryContext(ctx, q, from.UTC(), to.UTC()) + if err != nil { + return nil, err + } + defer func() { + if err := rows.Close(); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("error closing database rows") + } + }() + + out := map[string]int64{} + for rows.Next() { + var relativePath string + var count int64 + + if err := rows.Scan(&relativePath, &count); err != nil { + return nil, err + } + + out[relativePath] = count + } + + return out, rows.Err() } func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 56010f86c..5a0bd91ad 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -12,6 +12,10 @@ import ( 
"gitlab.com/gitlab-org/gitaly/internal/testhelper" ) +func TestPostgresReplicationEventQueue_CountDeadReplicationJobs(t *testing.T) { + ContractTestCountDeadReplicationJobs(t, PostgresReplicationEventQueue{getDB(t).DB}) +} + func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { db := getDB(t) |