diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-04-06 20:19:53 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-04-06 20:19:53 +0300 |
commit | b28852de4ab1818d6c0ef199b36d4c2e82710caf (patch) | |
tree | 4bfeefd079a4b09c5d46da0415a1b86db120217a | |
parent | 07e565d52335026ac206843d340898eb46222a2e (diff) |
praefect: Convert replication latency metric to use a histogram vector
The replication latency metric is currently a simple histogram that
tracks the latency from starting a replication job until it's done. We
have multiple sets of replication jobs, though, which we can't discern
right now.
Convert the histogram to a histogram vector with a "type" label,
tracking what kind of replication job type we record.
-rw-r--r-- | changelogs/unreleased/pks-replication-delay-metric.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 6 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 2 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/metrics/prometheus.go | 22 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 24 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 17 |
7 files changed, 66 insertions, 11 deletions
diff --git a/changelogs/unreleased/pks-replication-delay-metric.yml b/changelogs/unreleased/pks-replication-delay-metric.yml new file mode 100644 index 000000000..6aef8d912 --- /dev/null +++ b/changelogs/unreleased/pks-replication-delay-metric.yml @@ -0,0 +1,5 @@ +--- +title: Add metric for replication delay +merge_request: 1997 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 31ddec044..9814d795b 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -161,6 +161,11 @@ func run(cfgs []starter.Config, conf config.Config) error { return err } + delayMetric, err := metrics.RegisterReplicationDelay(conf.Prometheus) + if err != nil { + return err + } + latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) if err != nil { return err @@ -205,6 +210,7 @@ func run(cfgs []starter.Config, conf config.Config) error { logger, ds, nodeManager, + praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) srv = praefect.NewServer(coordinator.StreamDirector, logger, registry, conf) diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index bbeac069b..0e5f14654 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "sync" + "time" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" @@ -101,6 +102,7 @@ type ReplJob struct { Attempts int Params Params // additional information required to run the job CorrelationID string // from original request + CreatedAt time.Time // when has the job been created? } // Datastore is a data persistence abstraction for all of Praefect's diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index ce1b1958d..1d1353cde 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -195,7 +195,6 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client ds, nodeMgr, WithQueueMetric(&promtest.MockGauge{}), - WithLatencyMetric(&promtest.MockHistogram{}), ) prf := NewServer( coordinator.StreamDirector, diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index d45a70af0..47bdc1626 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -6,16 +6,34 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics" ) +// RegisterReplicationDelay creates and registers a prometheus histogram +// to observe replication delay times +func RegisterReplicationDelay(conf promconfig.Config) (metrics.HistogramVec, error) { + replicationDelay := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "replication_delay", + Buckets: conf.GRPCLatencyBuckets, + }, + []string{"type"}, + ) + + return replicationDelay, prometheus.Register(replicationDelay) +} + + // RegisterReplicationLatency creates and registers a prometheus histogram // to observe replication latency times -func RegisterReplicationLatency(conf promconfig.Config) (metrics.Histogram, error) { - replicationLatency := prometheus.NewHistogram( +func RegisterReplicationLatency(conf promconfig.Config) (metrics.HistogramVec, error) { + replicationLatency := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gitaly", Subsystem: "praefect", Name: "replication_latency", Buckets: conf.GRPCLatencyBuckets, }, + []string{"type"}, ) return replicationLatency, prometheus.Register(replicationLatency) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index d21d82624..aad04b680 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -259,7 +259,8 @@ type ReplMgr struct { virtualStorage string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic replQueueMetric prommetrics.Gauge - replLatencyMetric prommetrics.Histogram + replLatencyMetric prommetrics.HistogramVec + replDelayMetric prommetrics.HistogramVec replJobTimeout time.Duration // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} @@ -275,13 +276,20 @@ func WithQueueMetric(g prommetrics.Gauge) func(*ReplMgr) { } } -// WithLatencyMetric is an option to set the queue size prometheus metric -func WithLatencyMetric(h prommetrics.Histogram) func(*ReplMgr) { +// WithLatencyMetric is an option to set the latency prometheus metric +func WithLatencyMetric(h prommetrics.HistogramVec) func(*ReplMgr) { return func(m *ReplMgr) { m.replLatencyMetric = h } } +// WithDelayMetric is an option to set the delay prometheus metric +func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { + return func(m *ReplMgr) { + m.replDelayMetric = h + } +} + // NewReplMgr initializes a replication manager with the provided dependencies // and options func NewReplMgr(virtualStorage string, log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { @@ -292,7 +300,8 @@ func NewReplMgr(virtualStorage string, log *logrus.Entry, datastore datastore.Da replicator: defaultReplicator{log}, virtualStorage: virtualStorage, nodeManager: nodeMgr, - replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}), + replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), + replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}), } @@ -400,6 +409,7 @@ func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.Repl RelativePath: event.Job.RelativePath, Params: event.Job.Params, CorrelationID: correlationID, + CreatedAt: event.CreatedAt, } return replJob, nil @@ -509,6 +519,10 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, job.CorrelationID) replStart := time.Now() + + replDelay := replStart.Sub(job.CreatedAt) + r.replDelayMetric.WithLabelValues(job.Change.String()).Observe(replDelay.Seconds()) + r.replQueueMetric.Inc() defer r.replQueueMetric.Dec() @@ -534,7 +548,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour } replDuration := time.Since(replStart) - r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second)) + r.replLatencyMetric.WithLabelValues(job.Change.String()).Observe(replDuration.Seconds()) return nil } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 676a22401..3b45dad7b 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -153,9 +153,19 @@ func TestProcessReplicationJob(t *testing.T) { nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) var mockReplicationGauge promtest.MockGauge - var mockReplicationHistogram promtest.MockHistogram + var mockReplicationLatencyHistogramVec promtest.MockHistogramVec + var mockReplicationDelayHistogramVec promtest.MockHistogramVec + + replMgr := NewReplMgr( + "", + testhelper.DiscardTestEntry(t), + ds, + nodeMgr, + WithLatencyMetric(&mockReplicationLatencyHistogramVec), + WithDelayMetric(&mockReplicationDelayHistogramVec), + WithQueueMetric(&mockReplicationGauge), + ) - replMgr := NewReplMgr("", testhelper.DiscardTestEntry(t), ds, nodeMgr, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge)) replMgr.replicator = replicator shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name) @@ -178,7 +188,8 @@ func TestProcessReplicationJob(t *testing.T) { require.Equal(t, 1, mockReplicationGauge.IncsCalled()) require.Equal(t, 1, mockReplicationGauge.DecsCalled()) - require.Len(t, mockReplicationHistogram.Values, 1) + require.Equal(t, mockReplicationLatencyHistogramVec.LabelsCalled(), [][]string{{"update"}}) + require.Equal(t, mockReplicationDelayHistogramVec.LabelsCalled(), [][]string{{"update"}}) } func TestPropagateReplicationJob(t *testing.T) { |