Welcome to mirror list, hosted at ThFree Co, Russian Federation.

queue_bm_test.go « datastore « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 4f671f500297826761e0ed078f239da3e5bab2e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package datastore

import (
	"testing"

	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
	"gitlab.com/gitlab-org/gitaly/internal/testhelper/testdb"
)

func BenchmarkPostgresReplicationEventQueue_Acknowledge(b *testing.B) {
	// go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/small -benchtime=1000x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore
	b.Run("small", func(b *testing.B) {
		benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 10, JobStateInProgress: 10, JobStateFailed: 10})
	})

	// go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/medium -benchtime=100x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore
	b.Run("medium", func(b *testing.B) {
		benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 1_000, JobStateInProgress: 100, JobStateFailed: 100})
	})

	// go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/big -benchtime=10x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore
	b.Run("big", func(b *testing.B) {
		benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 100_000, JobStateInProgress: 100, JobStateFailed: 100})
	})

	// go test -tags=postgres -test.run=~ -test.bench=BenchmarkPostgresReplicationEventQueue_Acknowledge/huge -benchtime=1x gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore
	b.Run("huge", func(b *testing.B) {
		benchmarkPostgresReplicationEventQueueAcknowledge(b, map[JobState]int{JobStateReady: 1_000_000, JobStateInProgress: 100, JobStateFailed: 100})
	})
}

func benchmarkPostgresReplicationEventQueueAcknowledge(b *testing.B, setup map[JobState]int) {
	db := testdb.New(b)
	ctx := testhelper.Context(b)

	queue := PostgresReplicationEventQueue{db.DB}
	eventTmpl := ReplicationEvent{
		Job: ReplicationJob{
			Change:            UpdateRepo,
			RelativePath:      "/project/path-",
			TargetNodeStorage: "gitaly-1",
			SourceNodeStorage: "gitaly-0",
			VirtualStorage:    "praefect",
		},
	}

	getEventIDs := func(events []ReplicationEvent) []uint64 {
		ids := make([]uint64, len(events))
		for i, event := range events {
			ids[i] = event.ID
		}
		return ids
	}

	for n := 0; n < b.N; n++ {
		b.StopTimer()
		b.ResetTimer()

		db.TruncateAll(b)

		total := 0
		for _, count := range setup {
			// at first we need to enqueue all events and then move them to proper states
			total += count
		}

		_, err := db.DB.ExecContext(
			ctx,
			`INSERT INTO replication_queue (state, lock_id, job)
			SELECT 'ready', 'praefect|gitaly-1|/project/path-'|| T.I, ('{"change":"update","relative_path":"/project/path-'|| T.I || '","virtual_storage":"praefect","source_node_storage":"gitaly-0","target_node_storage":"gitaly-1"}')::JSONB
			FROM GENERATE_SERIES(1, $1) T(I)`,
			total,
		)
		require.NoError(b, err)

		_, err = db.DB.ExecContext(
			ctx,
			`INSERT INTO replication_queue_lock
			SELECT DISTINCT lock_id FROM replication_queue`,
		)
		require.NoError(b, err)

		events, err := queue.Dequeue(ctx, eventTmpl.Job.VirtualStorage, eventTmpl.Job.TargetNodeStorage, setup[JobStateFailed]+setup[JobStateInProgress])
		require.NoError(b, err)

		_, err = queue.Acknowledge(ctx, JobStateFailed, getEventIDs(events[:setup[JobStateFailed]]))
		require.NoError(b, err)

		events, err = queue.Dequeue(ctx, eventTmpl.Job.VirtualStorage, eventTmpl.Job.TargetNodeStorage, 10)
		require.NoError(b, err)

		b.StartTimer()
		_, err = queue.Acknowledge(ctx, JobStateCompleted, getEventIDs(events))
		b.StopTimer()
		require.NoError(b, err)
	}
}