Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-05-29 10:55:40 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-05-29 10:55:40 +0300
commitc9eb369f24f3287605018ccabdd9d97f4e021010 (patch)
tree9159f3059d6324d94accbc5a8358bb49dd411938
parentd4476a558743d6935c58405eb450139f58c70937 (diff)
parentac74eaed5222864127397bb9b3a96041e3e3d9fe (diff)
Merge branch 'po-praefect-job-counter-storage-label' into 'master'
Praefect gauge for replication jobs scoped by storage Closes #2780 See merge request gitlab-org/gitaly!2207
-rw-r--r--changelogs/unreleased/po-praefect-job-counter-storage-label.yml5
-rw-r--r--cmd/praefect/main.go5
-rw-r--r--internal/praefect/helper_test.go1
-rw-r--r--internal/praefect/metrics/metrics.go42
-rw-r--r--internal/praefect/metrics/metrics_test.go10
-rw-r--r--internal/praefect/metrics/prometheus.go16
-rw-r--r--internal/praefect/replicator.go39
-rw-r--r--internal/praefect/replicator_test.go5
-rw-r--r--internal/testhelper/promtest/gauge.go22
9 files changed, 114 insertions, 31 deletions
diff --git a/changelogs/unreleased/po-praefect-job-counter-storage-label.yml b/changelogs/unreleased/po-praefect-job-counter-storage-label.yml
new file mode 100644
index 000000000..01029ee6c
--- /dev/null
+++ b/changelogs/unreleased/po-praefect-job-counter-storage-label.yml
@@ -0,0 +1,5 @@
+---
+title: Praefect gauge for replication jobs scoped by storage
+merge_request: 2207
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index c41ed9820..0002913cb 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -215,7 +215,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
return err
}
- queueMetric, err := metrics.RegisterReplicationJobsInFlight()
+ storageJobs, err := metrics.RegisterReplicationJobsInFlightByStorage()
if err != nil {
return err
}
@@ -269,7 +269,8 @@ func run(cfgs []starter.Config, conf config.Config) error {
nodeManager,
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
- praefect.WithQueueMetric(queueMetric))
+ praefect.WithInFlightJobsGauge(storageJobs),
+ )
srv = praefect.NewServer(coordinator.StreamDirector, logger, protoregistry.GitalyProtoPreregistered, conf)
)
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index a8db569ce..e7059bae2 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -250,7 +250,6 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
conf.VirtualStorageNames(),
opt.withQueue,
opt.withNodeMgr,
- WithQueueMetric(&promtest.MockGauge{}),
)
prf := NewServer(coordinator.StreamDirector, opt.withLogger, opt.withAnnotations, conf)
diff --git a/internal/praefect/metrics/metrics.go b/internal/praefect/metrics/metrics.go
new file mode 100644
index 000000000..8b59274e8
--- /dev/null
+++ b/internal/praefect/metrics/metrics.go
@@ -0,0 +1,42 @@
+package metrics
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+ namespace = "gitaly"
+ subsystem = "praefect"
+ labelVirtualStorage = "virtual_storage"
+ labelGitalyStorage = "gitaly_storage"
+)
+
+// StorageGauge is a metric wrapper that abstracts and simplifies the interface
+// of the underlying type. It is intended for gauges that are scoped by virtual
+// storage and by Gitaly storage.
+type StorageGauge struct {
+ gv *prometheus.GaugeVec
+}
+
+func newStorageGauge(name string) StorageGauge {
+ sg := StorageGauge{}
+ sg.gv = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: name,
+ },
+ []string{labelVirtualStorage, labelGitalyStorage},
+ )
+ return sg
+}
+
+// Inc will inc the gauge for the specified virtual and gitaly storage
+func (sg StorageGauge) Inc(virtualStorage, gitalyStorage string) {
+ sg.gv.WithLabelValues(virtualStorage, gitalyStorage).Inc()
+}
+
+// Dec will dec the gauge for the specified virtual and gitaly storage
+func (sg StorageGauge) Dec(virtualStorage, gitalyStorage string) {
+ sg.gv.WithLabelValues(virtualStorage, gitalyStorage).Dec()
+}
diff --git a/internal/praefect/metrics/metrics_test.go b/internal/praefect/metrics/metrics_test.go
new file mode 100644
index 000000000..0e6d0e767
--- /dev/null
+++ b/internal/praefect/metrics/metrics_test.go
@@ -0,0 +1,10 @@
+package metrics
+
+import "testing"
+
+func TestStorageGauge(t *testing.T) {
+ sg := newStorageGauge("test")
+ // the following will panic if the number of labels is wrong:
+ sg.Inc("storage-1", "gitaly-1")
+ sg.Dec("storage-1", "gitaly-1")
+}
diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go
index 87dee00ca..808fedc1c 100644
--- a/internal/praefect/metrics/prometheus.go
+++ b/internal/praefect/metrics/prometheus.go
@@ -53,17 +53,11 @@ func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) {
return nodeLatency, prometheus.Register(nodeLatency)
}
-// RegisterReplicationJobsInFlight creates and registers a gauge
-// to track the size of the replication queue
-func RegisterReplicationJobsInFlight() (metrics.Gauge, error) {
- replicationJobsInFlight := prometheus.NewGauge(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "praefect",
- Name: "replication_jobs",
- },
- )
- return replicationJobsInFlight, prometheus.Register(replicationJobsInFlight)
+// RegisterReplicationJobsInFlightByStorage creates and registers a storage
+// gauge for tracking the in flight replication jobs
+func RegisterReplicationJobsInFlightByStorage() (StorageGauge, error) {
+ sg := newStorageGauge("replication_jobs")
+ return sg, prometheus.Register(sg.gv)
}
// RegisterTransactionCounter creates and registers a Prometheus counter to
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 6df96d488..d6354161c 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -245,17 +245,24 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
return primaryChecksum == replicaChecksum, nil
}
+// StorageGauge is used to track the in-flight replication jobs by virtual and
+// Gitaly target storage
+type StorageGauge interface {
+ Inc(virtualStorage, gitalyStorage string)
+ Dec(virtualStorage, gitalyStorage string)
+}
+
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
- log *logrus.Entry
- queue datastore.ReplicationEventQueue
- nodeManager nodes.Manager
- virtualStorages []string // replicas this replicator is responsible for
- replicator Replicator // does the actual replication logic
- replQueueMetric prommetrics.Gauge
- replLatencyMetric prommetrics.HistogramVec
- replDelayMetric prommetrics.HistogramVec
- replJobTimeout time.Duration
+ log *logrus.Entry
+ queue datastore.ReplicationEventQueue
+ nodeManager nodes.Manager
+ virtualStorages []string // replicas this replicator is responsible for
+ replicator Replicator // does the actual replication logic
+ replInFlightMetric StorageGauge
+ replLatencyMetric prommetrics.HistogramVec
+ replDelayMetric prommetrics.HistogramVec
+ replJobTimeout time.Duration
// whitelist contains the project names of the repos we wish to replicate
whitelist map[string]struct{}
}
@@ -263,10 +270,11 @@ type ReplMgr struct {
// ReplMgrOpt allows a replicator to be configured with additional options
type ReplMgrOpt func(*ReplMgr)
-// WithQueueMetric is an option to set the queue size prometheus metric
-func WithQueueMetric(g prommetrics.Gauge) func(*ReplMgr) {
+// WithInFlightJobsGauge is an option to set the replication jobs in-flight
+// gauge
+func WithInFlightJobsGauge(sg StorageGauge) ReplMgrOpt {
return func(m *ReplMgr) {
- m.replQueueMetric = g
+ m.replInFlightMetric = sg
}
}
@@ -296,7 +304,6 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep
nodeManager: nodeMgr,
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
- replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}),
}
for _, opt := range opts {
@@ -484,8 +491,10 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
r.replDelayMetric.WithLabelValues(event.Job.Change.String()).Observe(replStart.Sub(event.CreatedAt).Seconds())
- r.replQueueMetric.Inc()
- defer r.replQueueMetric.Dec()
+ if r.replInFlightMetric != nil {
+ r.replInFlightMetric.Inc(event.Job.VirtualStorage, event.Job.TargetNodeStorage)
+ defer r.replInFlightMetric.Dec(event.Job.VirtualStorage, event.Job.TargetNodeStorage)
+ }
switch event.Job.Change {
case datastore.UpdateRepo:
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index fbfcb96ab..d35ab8d4a 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -153,7 +153,8 @@ func TestProcessReplicationJob(t *testing.T) {
var replicator defaultReplicator
replicator.log = entry
- var mockReplicationGauge promtest.MockGauge
+ mockReplicationGauge := promtest.NewMockStorageGauge()
+
var mockReplicationLatencyHistogramVec promtest.MockHistogramVec
var mockReplicationDelayHistogramVec promtest.MockHistogramVec
@@ -164,7 +165,7 @@ func TestProcessReplicationJob(t *testing.T) {
nodeMgr,
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
WithDelayMetric(&mockReplicationDelayHistogramVec),
- WithQueueMetric(&mockReplicationGauge),
+ WithInFlightJobsGauge(mockReplicationGauge),
)
replMgr.replicator = replicator
diff --git a/internal/testhelper/promtest/gauge.go b/internal/testhelper/promtest/gauge.go
index 7036280bb..084655e11 100644
--- a/internal/testhelper/promtest/gauge.go
+++ b/internal/testhelper/promtest/gauge.go
@@ -40,3 +40,25 @@ func (m *MockGauge) Dec() {
m.Value--
m.decs++
}
+
+// MockStorageGauge wraps a MockGauge
+type MockStorageGauge struct {
+ *MockGauge
+}
+
+// NewMockStorageGauge returns an initialized mock storage gauge
+func NewMockStorageGauge() *MockStorageGauge {
+ return &MockStorageGauge{
+ &MockGauge{},
+ }
+}
+
+// Inc will track total calls to this method while ignoring params
+func (m *MockStorageGauge) Inc(_, _ string) {
+ m.MockGauge.Inc()
+}
+
+// Dec will track total calls to this method while ignoring params
+func (m *MockStorageGauge) Dec(_, _ string) {
+ m.MockGauge.Dec()
+}