Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-04-07 12:21:55 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-04-21 17:34:29 +0300
commitd2db9c1aadedb1318c2b9c76f0a0425eb3d72062 (patch)
tree2259294c52a283b6b631b03fb3072dee05e89194
parent1b1b95408d11a2532db5a44ffefd5cbab6e0effd (diff)
Instrument verification worker for observability
This commit adds metrics on the verification worker so the process becomes more observable. The metrics track the number of dequeued jobs per storage and then number of completed jobs per storage with their results. The number of stale leases releases is also tracked. This gives insight into how the verification work is progressing.
-rw-r--r--cmd/praefect/main.go1
-rw-r--r--internal/praefect/verifier.go78
-rw-r--r--internal/praefect/verifier_test.go160
3 files changed, 220 insertions, 19 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index d33183438..5a7989db9 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -359,6 +359,7 @@ func run(
hm,
conf.BackgroundVerification.VerificationInterval,
)
+ promreg.MustRegister(verifier)
go func() {
if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
diff --git a/internal/praefect/verifier.go b/internal/praefect/verifier.go
index 449e379ea..cf23ae7f4 100644
--- a/internal/praefect/verifier.go
+++ b/internal/praefect/verifier.go
@@ -7,6 +7,7 @@ import (
"sync"
"time"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
@@ -27,8 +28,18 @@ type MetadataVerifier struct {
leaseDuration time.Duration
healthChecker HealthChecker
verificationInterval time.Duration
+
+ dequeuedJobsTotal *prometheus.CounterVec
+ completedJobsTotal *prometheus.CounterVec
+ staleLeasesReleasedTotal prometheus.Counter
}
+const (
+ resultError = "error"
+ resultInvalid = "invalid"
+ resultValid = "valid"
+)
+
// NewMetadataVerifier creates a new MetadataVerifier.
func NewMetadataVerifier(
log logrus.FieldLogger,
@@ -37,7 +48,7 @@ func NewMetadataVerifier(
healthChecker HealthChecker,
verificationInterval time.Duration,
) *MetadataVerifier {
- return &MetadataVerifier{
+ v := &MetadataVerifier{
log: log,
db: db,
conns: conns,
@@ -45,7 +56,39 @@ func NewMetadataVerifier(
leaseDuration: 30 * time.Second,
healthChecker: healthChecker,
verificationInterval: verificationInterval,
+ dequeuedJobsTotal: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_verification_jobs_dequeued_total",
+ Help: "Number of verification jobs dequeud.",
+ },
+ []string{"virtual_storage", "storage"},
+ ),
+ completedJobsTotal: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_verification_jobs_completed_total",
+ Help: "Number of verification jobs completed and their result",
+ },
+ []string{"virtual_storage", "storage", "result"},
+ ),
+ staleLeasesReleasedTotal: prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_stale_verification_leases_released_total",
+ Help: "Number of stale verification leases released.",
+ },
+ ),
}
+
+ // pre-warm the metrics so all labels are exported prior to their first observation
+ for virtualStorage, storages := range conns {
+ for storage := range storages {
+ v.dequeuedJobsTotal.WithLabelValues(virtualStorage, storage)
+ for _, result := range []string{resultError, resultInvalid, resultValid} {
+ v.completedJobsTotal.WithLabelValues(virtualStorage, storage, result)
+ }
+ }
+ }
+
+ return v
}
type verificationJob struct {
@@ -150,6 +193,7 @@ func (v *MetadataVerifier) releaseExpiredLeases(ctx context.Context) error {
}
if totalReleased > 0 {
+ v.staleLeasesReleasedTotal.Add(float64(totalReleased))
v.log.WithField("leases_released", released).Info("released stale verification leases")
}
@@ -179,6 +223,8 @@ func (v *MetadataVerifier) run(ctx context.Context) error {
go func() {
defer wg.Done()
+ v.dequeuedJobsTotal.WithLabelValues(job.virtualStorage, job.storage).Inc()
+
exists, err := v.verify(ctx, jobs[i])
results[i] = verificationResult{
job: job,
@@ -190,7 +236,23 @@ func (v *MetadataVerifier) run(ctx context.Context) error {
wg.Wait()
- return v.updateMetadata(ctx, results)
+ if err := v.updateMetadata(ctx, results); err != nil {
+ return fmt.Errorf("update metadata: %w", err)
+ }
+
+ for _, r := range results {
+ result := resultError
+ if r.error == nil {
+ result = resultInvalid
+ if r.exists {
+ result = resultValid
+ }
+ }
+
+ v.completedJobsTotal.WithLabelValues(r.job.virtualStorage, r.job.storage, result).Inc()
+ }
+
+ return nil
}
// logRecord is a helper type for gathering the removed replicas and logging them.
@@ -357,3 +419,15 @@ func (v *MetadataVerifier) verify(ctx context.Context, job verificationJob) (boo
return resp.Exists, nil
}
+
+// Describe describes the collected metrics to Prometheus.
+func (v *MetadataVerifier) Describe(ch chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(v, ch)
+}
+
+// Collect collects the metrics exposed from the MetadataVerifier.
+func (v *MetadataVerifier) Collect(ch chan<- prometheus.Metric) {
+ v.dequeuedJobsTotal.Collect(ch)
+ v.completedJobsTotal.Collect(ch)
+ v.staleLeasesReleasedTotal.Collect(ch)
+}
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
index bb2a099b9..9c2ffda9b 100644
--- a/internal/praefect/verifier_test.go
+++ b/internal/praefect/verifier_test.go
@@ -5,9 +5,11 @@ import (
"errors"
"fmt"
"math/rand"
+ "strings"
"testing"
"time"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
@@ -73,16 +75,18 @@ func TestVerifier(t *testing.T) {
type step struct {
expectedRemovals logRecord
expectedErrors map[string]map[string][]string
- healthyStorages StaticHealthChecker
expectedReplicas map[string]map[string][]string
}
for _, tc := range []struct {
- desc string
- erroringGitalys map[string]bool
- replicas replicas
- batchSize int
- steps []step
+ desc string
+ erroringGitalys map[string]bool
+ replicas replicas
+ healthyStorages StaticHealthChecker
+ batchSize int
+ steps []step
+ dequeuedJobsTotal map[string]map[string]int
+ completedJobsTotal map[string]map[string]map[string]int
}{
{
desc: "all replicas exist",
@@ -104,6 +108,18 @@ func TestVerifier(t *testing.T) {
},
},
},
+ dequeuedJobsTotal: map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: 1,
+ gitaly3: 1,
+ },
+ },
+ completedJobsTotal: map[string]map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: {"valid": 1},
+ gitaly3: {"valid": 1},
+ },
+ },
},
{
desc: "recently verified replicas are not picked",
@@ -137,9 +153,9 @@ func TestVerifier(t *testing.T) {
},
},
},
+ healthyStorages: StaticHealthChecker{"virtual-storage": {gitaly1, gitaly2}},
steps: []step{
{
- healthyStorages: StaticHealthChecker{"virtual-storage": {gitaly1, gitaly2}},
expectedReplicas: map[string]map[string][]string{
"virtual-storage": {
"repository-1": {gitaly1, gitaly2, gitaly3},
@@ -147,6 +163,18 @@ func TestVerifier(t *testing.T) {
},
},
},
+ dequeuedJobsTotal: map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: 1,
+ gitaly2: 1,
+ },
+ },
+ completedJobsTotal: map[string]map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: {"valid": 1},
+ gitaly2: {"valid": 1},
+ },
+ },
},
{
desc: "metadata not deleted for replicas which errored on verification",
@@ -174,6 +202,20 @@ func TestVerifier(t *testing.T) {
},
},
},
+ dequeuedJobsTotal: map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: 1,
+ gitaly2: 1,
+ gitaly3: 1,
+ },
+ },
+ completedJobsTotal: map[string]map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: {"valid": 1},
+ gitaly2: {"valid": 1},
+ gitaly3: {"error": 1},
+ },
+ },
},
{
desc: "replicas with leases acquired are not picked",
@@ -221,6 +263,20 @@ func TestVerifier(t *testing.T) {
},
},
},
+ dequeuedJobsTotal: map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: 1,
+ gitaly2: 1,
+ gitaly3: 1,
+ },
+ },
+ completedJobsTotal: map[string]map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: {"valid": 1},
+ gitaly2: {"invalid": 1},
+ gitaly3: {"invalid": 1},
+ },
+ },
},
{
desc: "verification time is updated when repository exists",
@@ -267,6 +323,20 @@ func TestVerifier(t *testing.T) {
},
},
},
+ dequeuedJobsTotal: map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: 1,
+ gitaly2: 1,
+ gitaly3: 1,
+ },
+ },
+ completedJobsTotal: map[string]map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: {"valid": 1},
+ gitaly2: {"invalid": 1},
+ gitaly3: {"invalid": 1},
+ },
+ },
},
{
desc: "all replicas are lost",
@@ -316,6 +386,20 @@ func TestVerifier(t *testing.T) {
expectedReplicas: map[string]map[string][]string{},
},
},
+ dequeuedJobsTotal: map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: 1,
+ gitaly2: 1,
+ gitaly3: 1,
+ },
+ },
+ completedJobsTotal: map[string]map[string]map[string]int{
+ "virtual-storage": {
+ gitaly1: {"invalid": 1},
+ gitaly2: {"invalid": 1},
+ gitaly3: {"invalid": 1},
+ },
+ },
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -490,18 +574,20 @@ func TestVerifier(t *testing.T) {
).Scan(&lockedRows))
require.Equal(t, 3, lockedRows)
- for _, step := range tc.steps {
- logger, hook := test.NewNullLogger()
+ logger, hook := test.NewNullLogger()
- healthyStorages := StaticHealthChecker{"virtual-storage": []string{gitaly1, gitaly2, gitaly3}}
- if step.healthyStorages != nil {
- healthyStorages = step.healthyStorages
- }
+ healthyStorages := StaticHealthChecker{"virtual-storage": []string{gitaly1, gitaly2, gitaly3}}
+ if tc.healthyStorages != nil {
+ healthyStorages = tc.healthyStorages
+ }
- verifier := NewMetadataVerifier(logger, db, conns, healthyStorages, 24*7*time.Hour)
- if tc.batchSize > 0 {
- verifier.batchSize = tc.batchSize
- }
+ verifier := NewMetadataVerifier(logger, db, conns, healthyStorages, 24*7*time.Hour)
+ if tc.batchSize > 0 {
+ verifier.batchSize = tc.batchSize
+ }
+
+ for _, step := range tc.steps {
+ hook.Reset()
runCtx, cancelRun := context.WithCancel(ctx)
err = verifier.Run(runCtx, helper.NewCountTicker(1, cancelRun))
@@ -558,6 +644,41 @@ func TestVerifier(t *testing.T) {
// Ensure all the metadata still contains the expected replicas
require.Equal(t, step.expectedReplicas, getAllReplicas(ctx, t, db))
}
+
+ require.NoError(t, testutil.CollectAndCompare(verifier, strings.NewReader(fmt.Sprintf(`
+# HELP gitaly_praefect_stale_verification_leases_released_total Number of stale verification leases released.
+# TYPE gitaly_praefect_stale_verification_leases_released_total counter
+gitaly_praefect_stale_verification_leases_released_total 0
+# HELP gitaly_praefect_verification_jobs_completed_total Number of verification jobs completed and their result
+# TYPE gitaly_praefect_verification_jobs_completed_total counter
+gitaly_praefect_verification_jobs_completed_total{result="error",storage="gitaly-0",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="error",storage="gitaly-1",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="error",storage="gitaly-2",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="invalid",storage="gitaly-0",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="invalid",storage="gitaly-1",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="invalid",storage="gitaly-2",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="valid",storage="gitaly-0",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="valid",storage="gitaly-1",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_completed_total{result="valid",storage="gitaly-2",virtual_storage="virtual-storage"} %d
+# HELP gitaly_praefect_verification_jobs_dequeued_total Number of verification jobs dequeud.
+# TYPE gitaly_praefect_verification_jobs_dequeued_total counter
+gitaly_praefect_verification_jobs_dequeued_total{storage="gitaly-0",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_dequeued_total{storage="gitaly-1",virtual_storage="virtual-storage"} %d
+gitaly_praefect_verification_jobs_dequeued_total{storage="gitaly-2",virtual_storage="virtual-storage"} %d
+ `,
+ tc.completedJobsTotal["virtual-storage"][gitaly1][resultError],
+ tc.completedJobsTotal["virtual-storage"][gitaly2][resultError],
+ tc.completedJobsTotal["virtual-storage"][gitaly3][resultError],
+ tc.completedJobsTotal["virtual-storage"][gitaly1][resultInvalid],
+ tc.completedJobsTotal["virtual-storage"][gitaly2][resultInvalid],
+ tc.completedJobsTotal["virtual-storage"][gitaly3][resultInvalid],
+ tc.completedJobsTotal["virtual-storage"][gitaly1][resultValid],
+ tc.completedJobsTotal["virtual-storage"][gitaly2][resultValid],
+ tc.completedJobsTotal["virtual-storage"][gitaly3][resultValid],
+ tc.dequeuedJobsTotal["virtual-storage"][gitaly1],
+ tc.dequeuedJobsTotal["virtual-storage"][gitaly2],
+ tc.dequeuedJobsTotal["virtual-storage"][gitaly3],
+ ))))
})
}
}
@@ -672,4 +793,9 @@ func TestVerifier_runExpiredLeaseReleaser(t *testing.T) {
require.Equal(t, map[string]map[string]map[string]struct{}{
"virtual-storage-1": {"relative-path-1": {"expired-lease-1": {}, "expired-lease-2": {}, "expired-lease-3": {}}},
}, actualReleased)
+ require.NoError(t, testutil.CollectAndCompare(verifier, strings.NewReader(`
+# HELP gitaly_praefect_stale_verification_leases_released_total Number of stale verification leases released.
+# TYPE gitaly_praefect_stale_verification_leases_released_total counter
+gitaly_praefect_stale_verification_leases_released_total 3
+ `)))
}