diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-05-29 10:55:40 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-05-29 10:55:40 +0300 |
commit | c9eb369f24f3287605018ccabdd9d97f4e021010 (patch) | |
tree | 9159f3059d6324d94accbc5a8358bb49dd411938 | |
parent | d4476a558743d6935c58405eb450139f58c70937 (diff) | |
parent | ac74eaed5222864127397bb9b3a96041e3e3d9fe (diff) |
Merge branch 'po-praefect-job-counter-storage-label' into 'master'
Praefect gauge for replication jobs scoped by storage
Closes #2780
See merge request gitlab-org/gitaly!2207
-rw-r--r-- | changelogs/unreleased/po-praefect-job-counter-storage-label.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 5 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/metrics/metrics.go | 42 | ||||
-rw-r--r-- | internal/praefect/metrics/metrics_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/metrics/prometheus.go | 16 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 39 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 5 | ||||
-rw-r--r-- | internal/testhelper/promtest/gauge.go | 22 |
9 files changed, 114 insertions, 31 deletions
diff --git a/changelogs/unreleased/po-praefect-job-counter-storage-label.yml b/changelogs/unreleased/po-praefect-job-counter-storage-label.yml new file mode 100644 index 000000000..01029ee6c --- /dev/null +++ b/changelogs/unreleased/po-praefect-job-counter-storage-label.yml @@ -0,0 +1,5 @@ +--- +title: Praefect gauge for replication jobs scoped by storage +merge_request: 2207 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index c41ed9820..0002913cb 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -215,7 +215,7 @@ func run(cfgs []starter.Config, conf config.Config) error { return err } - queueMetric, err := metrics.RegisterReplicationJobsInFlight() + storageJobs, err := metrics.RegisterReplicationJobsInFlightByStorage() if err != nil { return err } @@ -269,7 +269,8 @@ func run(cfgs []starter.Config, conf config.Config) error { nodeManager, praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), - praefect.WithQueueMetric(queueMetric)) + praefect.WithInFlightJobsGauge(storageJobs), + ) srv = praefect.NewServer(coordinator.StreamDirector, logger, protoregistry.GitalyProtoPreregistered, conf) ) diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index a8db569ce..e7059bae2 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -250,7 +250,6 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp conf.VirtualStorageNames(), opt.withQueue, opt.withNodeMgr, - WithQueueMetric(&promtest.MockGauge{}), ) prf := NewServer(coordinator.StreamDirector, opt.withLogger, opt.withAnnotations, conf) diff --git a/internal/praefect/metrics/metrics.go b/internal/praefect/metrics/metrics.go new file mode 100644 index 000000000..8b59274e8 --- /dev/null +++ b/internal/praefect/metrics/metrics.go @@ -0,0 +1,42 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "gitaly" + subsystem = "praefect" + labelVirtualStorage = "virtual_storage" + labelGitalyStorage = "gitaly_storage" +) + +// StorageGauge is a metric wrapper that abstracts and simplifies the interface +// of the underlying type. It is intended for gauges that are scoped by virtual +// storage and by Gitaly storage. +type StorageGauge struct { + gv *prometheus.GaugeVec +} + +func newStorageGauge(name string) StorageGauge { + sg := StorageGauge{} + sg.gv = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: name, + }, + []string{labelVirtualStorage, labelGitalyStorage}, + ) + return sg +} + +// Inc will inc the gauge for the specified virtual and gitaly storage +func (sg StorageGauge) Inc(virtualStorage, gitalyStorage string) { + sg.gv.WithLabelValues(virtualStorage, gitalyStorage).Inc() +} + +// Dec will dec the gauge for the specified virtual and gitaly storage +func (sg StorageGauge) Dec(virtualStorage, gitalyStorage string) { + sg.gv.WithLabelValues(virtualStorage, gitalyStorage).Dec() +} diff --git a/internal/praefect/metrics/metrics_test.go b/internal/praefect/metrics/metrics_test.go new file mode 100644 index 000000000..0e6d0e767 --- /dev/null +++ b/internal/praefect/metrics/metrics_test.go @@ -0,0 +1,10 @@ +package metrics + +import "testing" + +func TestStorageGauge(t *testing.T) { + sg := newStorageGauge("test") + // the following will panic if the number of labels is wrong: + sg.Inc("storage-1", "gitaly-1") + sg.Dec("storage-1", "gitaly-1") +} diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index 87dee00ca..808fedc1c 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -53,17 +53,11 @@ func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) { return nodeLatency, prometheus.Register(nodeLatency) } -// RegisterReplicationJobsInFlight creates and registers a gauge -// to track the size of the replication queue -func RegisterReplicationJobsInFlight() (metrics.Gauge, error) { - replicationJobsInFlight := prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "praefect", - Name: "replication_jobs", - }, - ) - return replicationJobsInFlight, prometheus.Register(replicationJobsInFlight) +// RegisterReplicationJobsInFlightByStorage creates and registers a storage +// gauge for tracking the in flight replication jobs +func RegisterReplicationJobsInFlightByStorage() (StorageGauge, error) { + sg := newStorageGauge("replication_jobs") + return sg, prometheus.Register(sg.gv) } // RegisterTransactionCounter creates and registers a Prometheus counter to diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 6df96d488..d6354161c 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -245,17 +245,24 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, return primaryChecksum == replicaChecksum, nil } +// StorageGauge is used to track the in-flight replication jobs by virtual and +// Gitaly target storage +type StorageGauge interface { + Inc(virtualStorage, gitalyStorage string) + Dec(virtualStorage, gitalyStorage string) +} + // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { - log *logrus.Entry - queue datastore.ReplicationEventQueue - nodeManager nodes.Manager - virtualStorages []string // replicas this replicator is responsible for - replicator Replicator // does the actual replication logic - replQueueMetric prommetrics.Gauge - replLatencyMetric prommetrics.HistogramVec - replDelayMetric prommetrics.HistogramVec - replJobTimeout time.Duration + log *logrus.Entry + queue datastore.ReplicationEventQueue + nodeManager nodes.Manager + virtualStorages []string // replicas this replicator is responsible for + replicator Replicator // does the actual replication logic + replInFlightMetric StorageGauge + replLatencyMetric prommetrics.HistogramVec + replDelayMetric prommetrics.HistogramVec + replJobTimeout time.Duration // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} } @@ -263,10 +270,11 @@ type ReplMgr struct { // ReplMgrOpt allows a replicator to be configured with additional options type ReplMgrOpt func(*ReplMgr) -// WithQueueMetric is an option to set the queue size prometheus metric -func WithQueueMetric(g prommetrics.Gauge) func(*ReplMgr) { +// WithInFlightJobsGauge is an option to set the replication jobs in-flight +// gauge +func WithInFlightJobsGauge(sg StorageGauge) ReplMgrOpt { return func(m *ReplMgr) { - m.replQueueMetric = g + m.replInFlightMetric = sg } } @@ -296,7 +304,6 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep nodeManager: nodeMgr, replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), - replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}), } for _, opt := range opts { @@ -484,8 +491,10 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re r.replDelayMetric.WithLabelValues(event.Job.Change.String()).Observe(replStart.Sub(event.CreatedAt).Seconds()) - r.replQueueMetric.Inc() - defer r.replQueueMetric.Dec() + if r.replInFlightMetric != nil { + r.replInFlightMetric.Inc(event.Job.VirtualStorage, event.Job.TargetNodeStorage) + defer r.replInFlightMetric.Dec(event.Job.VirtualStorage, event.Job.TargetNodeStorage) + } switch event.Job.Change { case datastore.UpdateRepo: diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index fbfcb96ab..d35ab8d4a 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -153,7 +153,8 @@ func TestProcessReplicationJob(t *testing.T) { var replicator defaultReplicator replicator.log = entry - var mockReplicationGauge promtest.MockGauge + mockReplicationGauge := promtest.NewMockStorageGauge() + var mockReplicationLatencyHistogramVec promtest.MockHistogramVec var mockReplicationDelayHistogramVec promtest.MockHistogramVec @@ -164,7 +165,7 @@ func TestProcessReplicationJob(t *testing.T) { nodeMgr, WithLatencyMetric(&mockReplicationLatencyHistogramVec), WithDelayMetric(&mockReplicationDelayHistogramVec), - WithQueueMetric(&mockReplicationGauge), + WithInFlightJobsGauge(mockReplicationGauge), ) replMgr.replicator = replicator diff --git a/internal/testhelper/promtest/gauge.go b/internal/testhelper/promtest/gauge.go index 7036280bb..084655e11 100644 --- a/internal/testhelper/promtest/gauge.go +++ b/internal/testhelper/promtest/gauge.go @@ -40,3 +40,25 @@ func (m *MockGauge) Dec() { m.Value-- m.decs++ } + +// MockStorageGauge wraps a MockGauge +type MockStorageGauge struct { + *MockGauge +} + +// NewMockStorageGauge returns an initialized mock storage gauge +func NewMockStorageGauge() *MockStorageGauge { + return &MockStorageGauge{ + &MockGauge{}, + } +} + +// Inc will track total calls to this method while ignoring params +func (m *MockStorageGauge) Inc(_, _ string) { + m.MockGauge.Inc() +} + +// Dec will track total calls to this method while ignoring params +func (m *MockStorageGauge) Dec(_, _ string) { + m.MockGauge.Dec() +} |