Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-19 13:23:23 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-19 13:23:23 +0300
commit63e15a79c51641952580b54bcbe7935fa38b0128 (patch)
treeb68b8c31f6cb4f9e83ba0ef75607f4a8ef99f02a
parent42ec6310cf850b388089b3e88b42180e78fc4415 (diff)
parentc765c47d082ef6072ae06e37ec563a5537e5e809 (diff)
Merge branch 'pks-praefect-remove-refactor-test' into 'master'
cmd/praefect: Improve style of test for removal of replicaton jobs Closes #3944 See merge request gitlab-org/gitaly!4569
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go72
1 files changed, 38 insertions, 34 deletions
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index 1831d131b..ccc96aa7f 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -19,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
@@ -235,13 +236,13 @@ func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
virtualStorage = "praefect"
relativePath = "relative_path/to/repo.git"
)
- ctx := testhelper.Context(t)
+ ctx := testhelper.Context(t)
db := testdb.New(t)
queue := datastore.NewPostgresReplicationEventQueue(db)
- // Set replication event in_progress.
+ // Create an event that is "in-progress" to verify that it is not removed by the command.
inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.CreateRepo,
@@ -251,11 +252,15 @@ func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
},
})
require.NoError(t, err)
- inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10)
+ // Dequeue the event to move it into "in_progress" state.
+ dequeuedEvents, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10)
require.NoError(t, err)
- require.Len(t, inProgress1, 1)
+ require.Len(t, dequeuedEvents, 1)
+ require.Equal(t, inProgressEvent.ID, dequeuedEvents[0].ID)
+ require.Equal(t, datastore.JobStateInProgress, dequeuedEvents[0].State)
- // New event - events in the 'ready' state should be removed.
+ // Create a second event that is "ready" to verify that it is getting removed by the
+ // command.
_, err = queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
@@ -267,7 +272,7 @@ func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
})
require.NoError(t, err)
- // Failed event - should be removed as well.
+ // And create a third event that is in "failed" state, which should also get cleaned up.
failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
@@ -278,14 +283,16 @@ func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
},
})
require.NoError(t, err)
- inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10)
+ // Dequeue the job to move it into "in-progress".
+ dequeuedEvents, err = queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10)
require.NoError(t, err)
- require.Len(t, inProgress2, 1)
- // Acknowledge with failed status, so it will remain in the database for the next processing
- // attempt or until it is deleted by the 'removeReplicationEvents' method.
- acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID})
+ require.Len(t, dequeuedEvents, 1)
+ require.Equal(t, failedEvent.ID, dequeuedEvents[0].ID)
+ require.Equal(t, datastore.JobStateInProgress, dequeuedEvents[0].State)
+ // And then acknowledge it to move it into "failed" state.
+ acknowledgedJobIDs, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{failedEvent.ID})
require.NoError(t, err)
- require.Equal(t, []uint64{inProgress2[0].ID}, acks2)
+ require.Equal(t, []uint64{failedEvent.ID}, acknowledgedJobIDs)
ticker := helper.NewManualTicker()
defer ticker.Stop()
@@ -296,36 +303,33 @@ func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewDiscardingLogger(t), db.DB, ticker)
}()
+ // Tick multiple times so that we know that at least one event must have been processed by
+ // the command.
+ ticker.Tick()
ticker.Tick()
ticker.Tick()
- ticker.Tick() // blocks until previous tick is consumed
- // Now we acknowledge in_progress job, so it stops the processing loop or the command.
- acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID})
- if assert.NoError(t, err) {
- assert.Equal(t, []uint64{inProgress1[0].ID}, acks)
- }
+ // Verify that the database now only contains a single job, which is the "in_progress" one.
+ var jobIDs glsql.Uint64Provider
+ rows, err := db.QueryContext(ctx, `SELECT id FROM replication_queue`)
+ require.NoError(t, err)
+ defer rows.Close()
+ require.NoError(t, glsql.ScanAll(rows, &jobIDs))
+ require.NoError(t, rows.Err())
+ require.Equal(t, []uint64{inProgressEvent.ID}, jobIDs.Values())
+
+ // Now we acknowledge the "in_progress" job so that it will also get pruned. This
+ // will also stop the processing loop as there are no more jobs left.
+ acknowledgedJobIDs, err = queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{inProgressEvent.ID}, acknowledgedJobIDs)
ticker.Tick()
- timeout := time.After(time.Minute)
- for checkChan, exists := errChan, true; exists; {
- select {
- case err := <-checkChan:
- require.NoError(t, err)
- close(errChan)
- checkChan = nil
- case <-timeout:
- require.FailNow(t, "timeout reached, looks like the command hasn't made any progress")
- case <-time.After(50 * time.Millisecond):
- // Wait until job removed
- row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID)
- require.NoError(t, row.Scan(&exists))
- }
- }
- // Once there are no in_progress jobs anymore the method returns.
+ // The command should stop now because there are no more jobs in the replication queue.
require.NoError(t, <-errChan)
+ // And now we can finally assert that the replication queue is empty.
var notExists bool
row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`)
require.NoError(t, row.Scan(&notExists))