diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-08-13 12:10:35 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-08-17 13:19:24 +0300 |
commit | 534f05289ae1dfae09e18abb597374b93f5147fb (patch) | |
tree | beb4e5b97ba5cb6c0c6fe2b198928f3832403c68 | |
parent | d4ab5ef2168db11f97918ceab78f3c7503e4db80 (diff) |
transactions: Setup metrics in transaction manager
Right now, setup of metrics used in the transaction manager is split
across multiple locations. This makes the process of adding new metrics
more involved than it needs to be and is a source of bugs in case any of
those locations is not updated.
Improve the situation by moving setup of metrics into the transaction
manager. Metrics are exposed by implementing the Collector interface and
registering the transaction manager itself as a metric.
-rw-r--r-- | cmd/praefect/main.go | 24 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 12 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/metrics/prometheus.go | 44 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/server_factory_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 67 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 72 |
10 files changed, 91 insertions, 144 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 4783279bc..bdedccba7 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -243,27 +243,6 @@ func run(cfgs []starter.Config, conf config.Config) error { nodeManager.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration()) logger.Info("background started: gitaly nodes health monitoring") - transactionCounterMetric, err := metrics.RegisterTransactionCounter() - if err != nil { - return err - } - - transactionDelayMetric, err := metrics.RegisterTransactionDelay(conf.Prometheus) - if err != nil { - return err - } - - subtransactionsHistogram, err := metrics.RegisterSubtransactionsHistogram() - if err != nil { - return err - } - - transactionManager := transactions.NewManager( - transactions.WithCounterMetric(transactionCounterMetric), - transactions.WithDelayMetric(transactionDelayMetric), - transactions.WithSubtransactionsMetric(subtransactionsHistogram), - ) - transactionVoters, err := metrics.RegisterTransactionVoters(conf) if err != nil { return err @@ -271,6 +250,8 @@ func run(cfgs []starter.Config, conf config.Config) error { var ( // top level server dependencies + transactionManager = transactions.NewManager(conf) + coordinator = praefect.NewCoordinator( queue, rs, @@ -303,6 +284,7 @@ func run(cfgs []starter.Config, conf config.Config) error { ) prometheus.MustRegister( + transactionManager, repl, datastore.NewRepositoryStoreCollector(logger, rs, nodeManager), ) diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index f93589ad9..cfe119a58 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -171,7 +171,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec()) require.NoError(t, err) - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) registry, err := protoregistry.New(fd) require.NoError(t, err) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index e8a4644f4..134360901 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -92,7 +92,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { }}, }, nil }}, - transactions.NewManager(), + transactions.NewManager(conf), conf, protoregistry.GitalyProtoPreregistered, ) @@ -155,7 +155,7 @@ func TestStreamDirectorMutator(t *testing.T) { require.NoError(t, err) nodeMgr.Start(0, time.Hour) - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( queueInterceptor, @@ -357,7 +357,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { waitNodeToChangeHealthStatus(ctx, t, node, true) } - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) // set up the generations prior to transaction rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) @@ -494,7 +494,7 @@ func TestStreamDirectorAccessor(t *testing.T) { require.NoError(t, err) nodeMgr.Start(0, time.Minute) - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( queue, @@ -583,7 +583,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { require.NoError(t, err) nodeMgr.Start(0, time.Minute) - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( queue, @@ -782,7 +782,7 @@ func TestAbsentCorrelationID(t *testing.T) { require.NoError(t, err) nodeMgr.Start(0, time.Hour) - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( queueInterceptor, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 551b55d3c..58107b4be 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -175,7 +175,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { return runPraefectServer(t, conf, buildOptions{ withQueue: queue, - withTxMgr: transactions.NewManager(), + withTxMgr: transactions.NewManager(conf), withBackends: withRealGitalyShared(t), }) } @@ -184,8 +184,8 @@ func defaultQueue(conf config.Config) datastore.ReplicationEventQueue { return datastore.NewMemoryReplicationEventQueue(conf) } -func defaultTxMgr() *transactions.Manager { - return transactions.NewManager() +func defaultTxMgr(conf config.Config) *transactions.Manager { + return transactions.NewManager(conf) } func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager { @@ -209,7 +209,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp opt.withRepoStore = defaultRepoStore(conf) } if opt.withTxMgr == nil { - opt.withTxMgr = defaultTxMgr() + opt.withTxMgr = defaultTxMgr(conf) } if opt.withBackends != nil { cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...) diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index fc26eaad8..41fa120a6 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -54,50 +54,6 @@ func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) { return nodeLatency, prometheus.Register(nodeLatency) } -// RegisterTransactionCounter creates and registers a Prometheus counter to -// track the number of transactions and their outcomes. -func RegisterTransactionCounter() (*prometheus.CounterVec, error) { - transactionCounter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "gitaly", - Subsystem: "praefect", - Name: "transactions_total", - Help: "Total number of transaction actions", - }, - []string{"action"}, - ) - return transactionCounter, prometheus.Register(transactionCounter) -} - -// RegisterTransactionDelay creates and registers a Prometheus histogram to -// track the delay of actions performed on transactions. -func RegisterTransactionDelay(conf promconfig.Config) (metrics.HistogramVec, error) { - transactionDelay := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "praefect", - Name: "transactions_delay_seconds", - Help: "Delay between casting a vote and reaching quorum", - Buckets: conf.GRPCLatencyBuckets, - }, - []string{"action"}, - ) - return transactionDelay, prometheus.Register(transactionDelay) -} - -// RegisterSubtransactionsHistogram creates and registers a Prometheus counter to -// gauge the number of subtransactions per transaction. -func RegisterSubtransactionsHistogram() (metrics.Histogram, error) { - subtransactionsHistogram := prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "gitaly_praefect_subtransactions_per_transaction_total", - Help: "The number of subtransactions created for a single registered transaction", - Buckets: []float64{0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0}, - }, - ) - return subtransactionsHistogram, prometheus.Register(subtransactionsHistogram) -} - // RegisterTransactionVoters creates and registers a Prometheus counter to gauge // the number of voters per transaction. func RegisterTransactionVoters(conf config.Config) (metrics.HistogramVec, error) { diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index ee8aaf464..9d98a3e7b 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -226,7 +226,7 @@ func TestPropagateReplicationJob(t *testing.T) { require.NoError(t, err) nodeMgr.Start(0, time.Hour) - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 293659c46..a453ef718 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -82,7 +82,7 @@ func TestServerFactory(t *testing.T) { nodeMgr, err := nodes.NewManager(logger, conf, nil, datastore.NewMemoryRepositoryStore(conf.StorageNames()), &promtest.MockHistogramVec{}) require.NoError(t, err) nodeMgr.Start(0, time.Second) - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(conf) registry := protoregistry.GitalyProtoPreregistered rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) coordinator := NewCoordinator(queue, rs, nodeMgr, txMgr, conf, registry) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 696c31acb..0df9fdf93 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -748,7 +748,7 @@ func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, * } func TestProxyWrites(t *testing.T) { - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(config.Config{}) smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr} diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index e997157a6..71f5e3bc0 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -1,16 +1,17 @@ package praefect import ( + "bytes" "crypto/sha1" "fmt" "sync" "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -26,9 +27,9 @@ type voter struct { shouldSucceed bool } -func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { +func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { conf := testConfig(1) - txMgr := transactions.NewManager(opts...) + txMgr := transactions.NewManager(conf) cc, _, cleanup := runPraefectServer(t, conf, buildOptions{ withTxMgr: txMgr, withNodeMgr: nullNodeMgr{}, // to suppress node address issues @@ -36,40 +37,38 @@ func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (* return cc, txMgr, cleanup } -func setupMetrics() (*prometheus.CounterVec, []transactions.ManagerOpt) { - counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"status"}) - return counter, []transactions.ManagerOpt{ - transactions.WithCounterMetric(counter), - } -} - type counterMetrics struct { registered, started, invalid, committed int } -func verifyCounterMetrics(t *testing.T, counter *prometheus.CounterVec, expected counterMetrics) { +func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) { t.Helper() - registered, err := counter.GetMetricWithLabelValues("registered") - require.NoError(t, err) - require.Equal(t, float64(expected.registered), testutil.ToFloat64(registered)) - - started, err := counter.GetMetricWithLabelValues("started") - require.NoError(t, err) - require.Equal(t, float64(expected.started), testutil.ToFloat64(started)) + metrics := []struct { + name string + value int + }{ + {"registered", expected.registered}, + {"started", expected.started}, + {"invalid", expected.invalid}, + {"committed", expected.committed}, + } - invalid, err := counter.GetMetricWithLabelValues("invalid") - require.NoError(t, err) - require.Equal(t, float64(expected.invalid), testutil.ToFloat64(invalid)) + var expectedMetric bytes.Buffer + expectedMetric.WriteString("# HELP gitaly_praefect_transactions_total Total number of transaction actions\n") + expectedMetric.WriteString("# TYPE gitaly_praefect_transactions_total counter\n") + for _, metric := range metrics { + if metric.value == 0 { + continue + } + expectedMetric.WriteString(fmt.Sprintf("gitaly_praefect_transactions_total{action=\"%s\"} %d\n", metric.name, metric.value)) + } - committed, err := counter.GetMetricWithLabelValues("committed") - require.NoError(t, err) - require.Equal(t, float64(expected.committed), testutil.ToFloat64(committed)) + require.NoError(t, testutil.CollectAndCompare(manager, &expectedMetric, "gitaly_praefect_transactions_total")) } func TestTransactionSucceeds(t *testing.T) { - counter, opts := setupMetrics() - cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) defer cleanup() ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second)) @@ -95,7 +94,7 @@ func TestTransactionSucceeds(t *testing.T) { require.NoError(t, err) require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State) - verifyCounterMetrics(t, counter, counterMetrics{ + verifyCounterMetrics(t, txMgr, counterMetrics{ registered: 1, started: 1, committed: 1, @@ -249,7 +248,7 @@ func TestTransactionRegistrationWithInvalidNodesFails(t *testing.T) { ctx, cleanup := testhelper.Context() defer cleanup() - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(config.Config{}) _, _, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{}, 1) require.Equal(t, transactions.ErrMissingNodes, err) @@ -298,7 +297,7 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) { ctx, cleanup := testhelper.Context() defer cleanup() - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(config.Config{}) for _, tc := range tc { t.Run(tc.desc, func(t *testing.T) { @@ -552,8 +551,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) { } func TestTransactionFailures(t *testing.T) { - counter, opts := setupMetrics() - cc, _, cleanup := runPraefectServerAndTxMgr(t, opts...) + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) defer cleanup() ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second)) @@ -570,7 +568,7 @@ func TestTransactionFailures(t *testing.T) { require.Error(t, err) require.Equal(t, codes.NotFound, status.Code(err)) - verifyCounterMetrics(t, counter, counterMetrics{ + verifyCounterMetrics(t, txMgr, counterMetrics{ started: 1, invalid: 1, }) @@ -623,8 +621,7 @@ func TestTransactionCancellation(t *testing.T) { for _, tc := range testcases { t.Run(tc.desc, func(t *testing.T) { - counter, opts := setupMetrics() - cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) defer cleanup() ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second)) @@ -679,7 +676,7 @@ func TestTransactionCancellation(t *testing.T) { require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state") } - verifyCounterMetrics(t, counter, tc.expectedMetrics) + verifyCounterMetrics(t, txMgr, tc.expectedMetrics) }) } } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index c3527db16..b663c4827 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -14,7 +14,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" ) var ErrNotFound = errors.New("transaction not found") @@ -27,8 +27,8 @@ type Manager struct { lock sync.Mutex transactions map[uint64]*Transaction counterMetric *prometheus.CounterVec - delayMetric metrics.HistogramVec - subtransactionsMetric metrics.Histogram + delayMetric *prometheus.HistogramVec + subtransactionsMetric prometheus.Histogram } // TransactionIDGenerator is an interface for types that can generate transaction IDs. @@ -61,27 +61,6 @@ func (t *transactionIDGenerator) ID() uint64 { // ManagerOpt is a self referential option for Manager type ManagerOpt func(*Manager) -// WithCounterMetric is an option to set the counter Prometheus metric -func WithCounterMetric(counterMetric *prometheus.CounterVec) ManagerOpt { - return func(mgr *Manager) { - mgr.counterMetric = counterMetric - } -} - -// WithDelayMetric is an option to set the delay Prometheus metric -func WithDelayMetric(delayMetric metrics.HistogramVec) ManagerOpt { - return func(mgr *Manager) { - mgr.delayMetric = delayMetric - } -} - -// WithSubtransactionsMetric is an option to set the subtransactions Prometheus metric -func WithSubtransactionsMetric(subtransactionsMetric metrics.Histogram) ManagerOpt { - return func(mgr *Manager) { - mgr.subtransactionsMetric = subtransactionsMetric - } -} - // WithTransactionIDGenerator is an option to set the transaction ID generator func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt { return func(mgr *Manager) { @@ -90,13 +69,36 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt { } // NewManager creates a new transactions Manager. -func NewManager(opts ...ManagerOpt) *Manager { +func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { mgr := &Manager{ - txIDGenerator: newTransactionIDGenerator(), - transactions: make(map[uint64]*Transaction), - counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}), - delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}), - subtransactionsMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}), + txIDGenerator: newTransactionIDGenerator(), + transactions: make(map[uint64]*Transaction), + counterMetric: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "transactions_total", + Help: "Total number of transaction actions", + }, + []string{"action"}, + ), + delayMetric: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "transactions_delay_seconds", + Help: "Delay between casting a vote and reaching quorum", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"action"}, + ), + subtransactionsMetric: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "gitaly_praefect_subtransactions_per_transaction_total", + Help: "The number of subtransactions created for a single registered transaction", + Buckets: []float64{0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0}, + }, + ), } for _, opt := range opts { @@ -106,6 +108,16 @@ func NewManager(opts ...ManagerOpt) *Manager { return mgr } +func (mgr *Manager) Describe(descs chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(mgr, descs) +} + +func (mgr *Manager) Collect(metrics chan<- prometheus.Metric) { + mgr.counterMetric.Collect(metrics) + mgr.delayMetric.Collect(metrics) + mgr.subtransactionsMetric.Collect(metrics) +} + func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger { return ctxlogrus.Extract(ctx).WithField("component", "transactions.Manager") } |