diff options
author | John Cai <jcai@gitlab.com> | 2019-12-22 01:27:08 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-01-06 20:29:02 +0300 |
commit | c93d3ceef98e6641b3dd0dd0e841ebd8d23151a1 (patch) | |
tree | b41b7d51e09efa4a139d489e19f8629a51d66d63 | |
parent | 002139d7600998bb4703c3a8fcd87673bce83386 (diff) |
Refactor to prometheus test package
-rw-r--r-- | changelogs/unreleased/jc-fix-replication-metrics.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 24 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/metrics/prom.go | 48 | ||||
-rw-r--r-- | internal/praefect/metrics/prometheus.go | 45 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 28 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 36 | ||||
-rw-r--r-- | internal/testhelper/metrics.go | 36 | ||||
-rw-r--r-- | internal/testhelper/promtest/gauge.go | 42 | ||||
-rw-r--r-- | internal/testhelper/promtest/histogram.go | 18 |
10 files changed, 166 insertions, 120 deletions
diff --git a/changelogs/unreleased/jc-fix-replication-metrics.yml b/changelogs/unreleased/jc-fix-replication-metrics.yml new file mode 100644 index 000000000..e4bec2231 --- /dev/null +++ b/changelogs/unreleased/jc-fix-replication-metrics.yml @@ -0,0 +1,5 @@ +--- +title: Use configurable buckets for praefect replication metrics +merge_request: 1719 +author: +type: changed diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index eaf0a00a9..451c56e65 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -87,8 +87,6 @@ func configure() (config.Config, error) { logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener") conf.Prometheus.Configure() - metrics.Register(conf.Prometheus) - go func() { if err := monitoring.Serve( monitoring.WithListenerAddress(conf.PrometheusListenAddr), @@ -119,11 +117,27 @@ func run(cfgs []starter.Config, conf config.Config) error { } } + latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) + if err != nil { + return err + } + + queueMetric, err := metrics.RegisterReplicationJobsInFlight() + if err != nil { + return err + } + var ( // top level server dependencies - ds = datastore.NewInMemory(conf) - coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) - repl = praefect.NewReplMgr("default", logger, ds, clientConnections) + ds = datastore.NewInMemory(conf) + coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr( + "default", + logger, + ds, + clientConnections, + praefect.WithLatencyMetric(latencyMetric), + praefect.WithQueueMetric(queueMetric)) srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) serverErrors = make(chan error, 1) ) diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index f0058be82..6e8791afb 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/service/repository" gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -184,8 +185,9 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client logEntry, ds, clientCC, + WithQueueMetric(&promtest.MockGauge{}), + WithLatencyMetric(&promtest.MockHistogram{}), ) - prf := NewServer( coordinator, replmgr, diff --git a/internal/praefect/metrics/prom.go b/internal/praefect/metrics/prom.go deleted file mode 100644 index cb7ada15a..000000000 --- a/internal/praefect/metrics/prom.go +++ /dev/null @@ -1,48 +0,0 @@ -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" - promconfig "gitlab.com/gitlab-org/gitaly/internal/config/prometheus" -) - -var ( - replicationLatency prometheus.Histogram - - replicationJobsInFlight = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "praefect", - Name: "replication_jobs", - }, - ) - - // RecordReplicationLatency records replication latency - RecordReplicationLatency = func(d float64) { - go replicationLatency.Observe(d) - } - - // IncReplicationJobsInFlight increases the gauge that keeps track of in flight replication jobs - IncReplicationJobsInFlight = func() { - go replicationJobsInFlight.Inc() - } - - // DecReplicationJobsInFlight decreases the gauge that keeps track of in flight replication jobs - DecReplicationJobsInFlight = func() { - go replicationJobsInFlight.Dec() - } -) - -// Register registers praefect prometheus metrics -func Register(conf promconfig.Config) { - replicationLatency = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "praefect", - Name: "replication_latency", - Buckets: conf.GRPCLatencyBuckets, - }, - ) - - prometheus.MustRegister(replicationLatency) - prometheus.MustRegister(replicationJobsInFlight) -} diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go new file mode 100644 index 000000000..fb53fe277 --- /dev/null +++ b/internal/praefect/metrics/prometheus.go @@ -0,0 +1,45 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + promconfig "gitlab.com/gitlab-org/gitaly/internal/config/prometheus" +) + +// RegisterReplicationLatency creates and registers a prometheus histogram +// to observe replication latency times +func RegisterReplicationLatency(conf promconfig.Config) (Histogram, error) { + replicationLatency := prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "replication_latency", + Buckets: conf.GRPCLatencyBuckets, + }, + ) + + return replicationLatency, prometheus.Register(replicationLatency) +} + +// RegisterReplicationJobsInFlight creates and registers a gauge +// to track the size of the replication queue +func RegisterReplicationJobsInFlight() (Gauge, error) { + replicationJobsInFlight := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "replication_jobs", + }, + ) + return replicationJobsInFlight, prometheus.Register(replicationJobsInFlight) +} + +// Gauge is a subset of a prometheus Gauge +type Gauge interface { + Inc() + Dec() +} + +// Histogram is a subset of a prometheus Histogram +type Histogram interface { + Observe(float64) +} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 1817f1193..f99e97018 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -5,11 +5,13 @@ import ( "fmt" "time" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" + + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "golang.org/x/sync/errgroup" @@ -127,6 +129,8 @@ type ReplMgr struct { clientConnections *conn.ClientConnections targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic + replQueueMetric metrics.Gauge + replLatencyMetric metrics.Histogram // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} @@ -135,6 +139,20 @@ type ReplMgr struct { // ReplMgrOpt allows a replicator to be configured with additional options type ReplMgrOpt func(*ReplMgr) +// WithQueueMetric is an option to set the queue size prometheus metric +func WithQueueMetric(g metrics.Gauge) func(*ReplMgr) { + return func(m *ReplMgr) { + m.replQueueMetric = g + } +} + +// WithLatencyMetric is an option to set the queue size prometheus metric +func WithLatencyMetric(h metrics.Histogram) func(*ReplMgr) { + return func(m *ReplMgr) { + m.replLatencyMetric = h + } +} + // NewReplMgr initializes a replication manager with the provided dependencies // and options func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr { @@ -145,6 +163,8 @@ func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datast replicator: defaultReplicator{log}, targetNode: targetNode, clientConnections: c, + replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}), + replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}), } for _, opt := range opts { @@ -278,8 +298,8 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) { } replStart := time.Now() - metrics.IncReplicationJobsInFlight() - defer metrics.DecReplicationJobsInFlight() + r.replQueueMetric.Inc() + defer r.replQueueMetric.Dec() switch job.Change { case datastore.UpdateRepo: @@ -295,7 +315,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) { } replDuration := time.Since(replStart) - metrics.RecordReplicationLatency(float64(replDuration / time.Millisecond)) + r.replLatencyMetric.Observe(float64(replDuration / time.Millisecond)) if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateComplete); err != nil { l.WithError(err).Error("error when updating replication job status to complete") diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index ba0fb6f6f..fcb8d64ac 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -9,8 +9,6 @@ import ( "testing" "github.com/stretchr/testify/require" - "google.golang.org/grpc" - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" @@ -21,25 +19,12 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" ) func TestProceessReplicationJob(t *testing.T) { - mockReplicationLatency := &testhelper.MockHistogram{} - mockReplicationGauge := &testhelper.MockGauge{} - - recordReplicationLatency = func(d float64) { - mockReplicationLatency.Observe(d) - } - - incReplicationJobsInFlight = func() { - mockReplicationGauge.Inc() - } - - decReplicationJobsInFlight = func() { - mockReplicationGauge.Dec() - } - srv, srvSocketPath := runFullGitalyServer(t) defer srv.Stop() @@ -116,21 +101,20 @@ func TestProceessReplicationJob(t *testing.T) { clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token) clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token) - replMgr := &ReplMgr{ - log: gitaly_log.Default(), - datastore: ds, - clientConnections: clientCC, - replicator: replicator, - } + var mockReplicationGauge promtest.MockGauge + var mockReplicationHistogram promtest.MockHistogram + + replMgr := NewReplMgr("", gitaly_log.Default(), ds, clientCC, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge)) + replMgr.replicator = replicator replMgr.processReplJob(ctx, jobs[0]) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) - require.Equal(t, 1, mockReplicationGauge.IncrementCalled) - require.Equal(t, 1, mockReplicationGauge.DecrementCalled) - require.Len(t, mockReplicationLatency.Values, 1) + require.Equal(t, 1, mockReplicationGauge.IncsCalled()) + require.Equal(t, 1, mockReplicationGauge.DecsCalled()) + require.Len(t, mockReplicationHistogram.Values, 1) } func TestConfirmReplication(t *testing.T) { diff --git a/internal/testhelper/metrics.go b/internal/testhelper/metrics.go deleted file mode 100644 index a7359332e..000000000 --- a/internal/testhelper/metrics.go +++ /dev/null @@ -1,36 +0,0 @@ -package testhelper - -import "sync" - -// MockGauge is a simplified prometheus gauge with Inc and Dec that can be inspected -type MockGauge struct { - sync.Mutex - IncrementCalled, DecrementCalled int -} - -// Inc increments the IncrementCalled counter -func (mg *MockGauge) Inc() { - mg.Lock() - defer mg.Unlock() - mg.IncrementCalled++ -} - -// Dec increments the DecrementCalled counter -func (mg *MockGauge) Dec() { - mg.Lock() - defer mg.Unlock() - mg.DecrementCalled++ -} - -// MockHistogram is a simplified prometheus histogram with Observe -type MockHistogram struct { - sync.Mutex - Values []float64 -} - -// Observe adds a value to the Values slice -func (mh *MockHistogram) Observe(d float64) { - mh.Lock() - defer mh.Unlock() - mh.Values = append(mh.Values, d) -} diff --git a/internal/testhelper/promtest/gauge.go b/internal/testhelper/promtest/gauge.go new file mode 100644 index 000000000..7036280bb --- /dev/null +++ b/internal/testhelper/promtest/gauge.go @@ -0,0 +1,42 @@ +package promtest + +import ( + "sync" +) + +// MockGauge is a mock gauge that adheres to prometheus.Gauge for use in unit tests +type MockGauge struct { + m sync.RWMutex + Value float64 + incs, decs int +} + +// IncsCalled gives the number of times Inc() was been called +func (m *MockGauge) IncsCalled() int { + m.m.RLock() + defer m.m.RUnlock() + return m.incs +} + +// DecsCalled gives the number of times Inc() was been called +func (m *MockGauge) DecsCalled() int { + m.m.RLock() + defer m.m.RUnlock() + return m.decs +} + +// Inc increments the gauge value +func (m *MockGauge) Inc() { + m.m.Lock() + defer m.m.Unlock() + m.Value++ + m.incs++ +} + +// Dec decrements the gauge value +func (m *MockGauge) Dec() { + m.m.Lock() + defer m.m.Unlock() + m.Value-- + m.decs++ +} diff --git a/internal/testhelper/promtest/histogram.go b/internal/testhelper/promtest/histogram.go new file mode 100644 index 000000000..b35d58e0a --- /dev/null +++ b/internal/testhelper/promtest/histogram.go @@ -0,0 +1,18 @@ +package promtest + +import ( + "sync" +) + +// MockHistogram is a mock histogram that adheres to prometheus.Histogram for use in unit tests +type MockHistogram struct { + m sync.RWMutex + Values []float64 +} + +// Observe observes a value for the mock histogram +func (m *MockHistogram) Observe(v float64) { + m.m.Lock() + defer m.m.Unlock() + m.Values = append(m.Values, v) +} |