diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-10 17:51:59 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-10 18:18:35 +0300 |
commit | 77c96d3f6a5038a52b724f7a78b7a61f1e824ece (patch) | |
tree | fac7350e36ddbecafdad9d5191b04bf865fde7ea | |
parent | 74fa51387fbc4ac199309f5c0461d4cdde2a1a2f (diff) |
include change type in in-flight replication jobs gauge
Includes the replication job's change type as a label on the in-flight
replication jobs gauge.
-rw-r--r-- | changelogs/unreleased/smh-change-type.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 9 | ||||
-rw-r--r-- | internal/praefect/metrics/metrics.go | 42 | ||||
-rw-r--r-- | internal/praefect/metrics/metrics_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/metrics/prometheus.go | 7 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 50 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 13 |
7 files changed, 40 insertions, 96 deletions
diff --git a/changelogs/unreleased/smh-change-type.yml b/changelogs/unreleased/smh-change-type.yml new file mode 100644 index 000000000..1bb8dea82 --- /dev/null +++ b/changelogs/unreleased/smh-change-type.yml @@ -0,0 +1,5 @@ +--- +title: Include change type as a label on in-flight replication jobs gauge +merge_request: 2373 +author: +type: changed diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index d52a4c308..0f42edc77 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -89,6 +89,7 @@ import ( "strings" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/bootstrap" "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" @@ -213,11 +214,6 @@ func run(cfgs []starter.Config, conf config.Config) error { return err } - storageJobs, err := metrics.RegisterReplicationJobsInFlightByStorage() - if err != nil { - return err - } - var db *sql.DB if conf.NeedsSQL() { @@ -267,7 +263,6 @@ func run(cfgs []starter.Config, conf config.Config) error { nodeManager, praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), - praefect.WithInFlightJobsGauge(storageJobs), ) srvFactory = praefect.NewServerFactory( conf, @@ -280,6 +275,8 @@ func run(cfgs []starter.Config, conf config.Config) error { ) ) + prometheus.MustRegister(repl) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/praefect/metrics/metrics.go b/internal/praefect/metrics/metrics.go deleted file mode 100644 index 8b59274e8..000000000 --- a/internal/praefect/metrics/metrics.go +++ /dev/null @@ -1,42 +0,0 @@ -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -const ( - namespace = "gitaly" - subsystem = "praefect" - labelVirtualStorage = "virtual_storage" - labelGitalyStorage = "gitaly_storage" -) - -// StorageGauge is a metric wrapper that abstracts and simplifies the interface -// of the underlying type. 
It is intended for gauges that are scoped by virtual -// storage and by Gitaly storage. -type StorageGauge struct { - gv *prometheus.GaugeVec -} - -func newStorageGauge(name string) StorageGauge { - sg := StorageGauge{} - sg.gv = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: name, - }, - []string{labelVirtualStorage, labelGitalyStorage}, - ) - return sg -} - -// Inc will inc the gauge for the specified virtual and gitaly storage -func (sg StorageGauge) Inc(virtualStorage, gitalyStorage string) { - sg.gv.WithLabelValues(virtualStorage, gitalyStorage).Inc() -} - -// Dec will dec the gauge for the specified virtual and gitaly storage -func (sg StorageGauge) Dec(virtualStorage, gitalyStorage string) { - sg.gv.WithLabelValues(virtualStorage, gitalyStorage).Dec() -} diff --git a/internal/praefect/metrics/metrics_test.go b/internal/praefect/metrics/metrics_test.go deleted file mode 100644 index 0e6d0e767..000000000 --- a/internal/praefect/metrics/metrics_test.go +++ /dev/null @@ -1,10 +0,0 @@ -package metrics - -import "testing" - -func TestStorageGauge(t *testing.T) { - sg := newStorageGauge("test") - // the following will panic if the number of labels is wrong: - sg.Inc("storage-1", "gitaly-1") - sg.Dec("storage-1", "gitaly-1") -} diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index 7db03bf7b..d552d5f4d 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -53,13 +53,6 @@ func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) { return nodeLatency, prometheus.Register(nodeLatency) } -// RegisterReplicationJobsInFlightByStorage creates and registers a storage -// gauge for tracking the in flight replication jobs -func RegisterReplicationJobsInFlightByStorage() (StorageGauge, error) { - sg := newStorageGauge("replication_jobs") - return sg, prometheus.Register(sg.gv) -} - // 
RegisterTransactionCounter creates and registers a Prometheus counter to // track the number of transactions and their outcomes. func RegisterTransactionCounter() (*prometheus.CounterVec, error) { diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index b6a64b39f..a52faad23 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -245,13 +245,6 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, return primaryChecksum == replicaChecksum, nil } -// StorageGauge is used to track the in-flight replication jobs by virtual and -// Gitaly target storage -type StorageGauge interface { - Inc(virtualStorage, gitalyStorage string) - Dec(virtualStorage, gitalyStorage string) -} - // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Entry @@ -259,7 +252,7 @@ type ReplMgr struct { nodeManager nodes.Manager virtualStorages []string // replicas this replicator is responsible for replicator Replicator // does the actual replication logic - replInFlightMetric StorageGauge + replInFlightMetric *prometheus.GaugeVec replLatencyMetric prommetrics.HistogramVec replDelayMetric prommetrics.HistogramVec replJobTimeout time.Duration @@ -270,14 +263,6 @@ type ReplMgr struct { // ReplMgrOpt allows a replicator to be configured with additional options type ReplMgrOpt func(*ReplMgr) -// WithInFlightJobsGauge is an option to set the replication jobs in-flight -// gauge -func WithInFlightJobsGauge(sg StorageGauge) ReplMgrOpt { - return func(m *ReplMgr) { - m.replInFlightMetric = sg - } -} - // WithLatencyMetric is an option to set the latency prometheus metric func WithLatencyMetric(h prommetrics.HistogramVec) func(*ReplMgr) { return func(m *ReplMgr) { @@ -296,12 +281,18 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { // and options func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr 
nodes.Manager, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ - log: log.WithField("component", "replication_manager"), - queue: queue, - whitelist: map[string]struct{}{}, - replicator: defaultReplicator{log}, - virtualStorages: virtualStorages, - nodeManager: nodeMgr, + log: log.WithField("component", "replication_manager"), + queue: queue, + whitelist: map[string]struct{}{}, + replicator: defaultReplicator{log}, + virtualStorages: virtualStorages, + nodeManager: nodeMgr, + replInFlightMetric: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gitaly_praefect_replication_jobs", + Help: "Number of replication jobs in flight.", + }, []string{"virtual_storage", "gitaly_storage", "change_type"}, + ), replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), } @@ -313,6 +304,14 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep return r } +func (r ReplMgr) Describe(ch chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(r, ch) +} + +func (r ReplMgr) Collect(ch chan<- prometheus.Metric) { + r.replInFlightMetric.Collect(ch) +} + // WithWhitelist will configure a whitelist for repos to allow replication func WithWhitelist(whitelistedRepos []string) ReplMgrOpt { return func(r *ReplMgr) { @@ -524,10 +523,9 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re r.replDelayMetric.WithLabelValues(event.Job.Change.String()).Observe(replStart.Sub(event.CreatedAt).Seconds()) - if r.replInFlightMetric != nil { - r.replInFlightMetric.Inc(event.Job.VirtualStorage, event.Job.TargetNodeStorage) - defer r.replInFlightMetric.Dec(event.Job.VirtualStorage, event.Job.TargetNodeStorage) - } + inFlightGauge := r.replInFlightMetric.WithLabelValues(event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.Change.String()) + inFlightGauge.Inc() + defer inFlightGauge.Dec() switch event.Job.Change 
{ case datastore.UpdateRepo: diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 019df5963..a61838d85 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -7,11 +7,13 @@ import ( "net" "os" "path/filepath" + "strings" "sync/atomic" "testing" "time" "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" @@ -130,6 +132,7 @@ func TestProcessReplicationJob(t *testing.T) { for _, secondary := range shard.Secondaries { events = append(events, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + VirtualStorage: "default", Change: datastore.UpdateRepo, TargetNodeStorage: secondary.GetStorage(), SourceNodeStorage: shard.Primary.GetStorage(), @@ -149,8 +152,6 @@ func TestProcessReplicationJob(t *testing.T) { var replicator defaultReplicator replicator.log = entry - mockReplicationGauge := promtest.NewMockStorageGauge() - var mockReplicationLatencyHistogramVec promtest.MockHistogramVec var mockReplicationDelayHistogramVec promtest.MockHistogramVec @@ -161,7 +162,6 @@ func TestProcessReplicationJob(t *testing.T) { nodeMgr, WithLatencyMetric(&mockReplicationLatencyHistogramVec), WithDelayMetric(&mockReplicationDelayHistogramVec), - WithInFlightJobsGauge(mockReplicationGauge), ) replMgr.replicator = replicator @@ -176,10 +176,13 @@ func TestProcessReplicationJob(t *testing.T) { testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "gc") require.Less(t, testhelper.GetGitPackfileDirSize(t, replicatedPath), int64(100), "expect a small pack directory") - require.Equal(t, 1, mockReplicationGauge.IncsCalled()) - require.Equal(t, 1, mockReplicationGauge.DecsCalled()) require.Equal(t, mockReplicationLatencyHistogramVec.LabelsCalled(), [][]string{{"update"}}) require.Equal(t, 
mockReplicationDelayHistogramVec.LabelsCalled(), [][]string{{"update"}}) + require.NoError(t, testutil.CollectAndCompare(replMgr, strings.NewReader(` +# HELP gitaly_praefect_replication_jobs Number of replication jobs in flight. +# TYPE gitaly_praefect_replication_jobs gauge +gitaly_praefect_replication_jobs{change_type="update",gitaly_storage="backup",virtual_storage="default"} 0 +`))) } func TestPropagateReplicationJob(t *testing.T) { |