Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-08-13 12:10:35 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-08-17 13:19:24 +0300
commit534f05289ae1dfae09e18abb597374b93f5147fb (patch)
treebeb4e5b97ba5cb6c0c6fe2b198928f3832403c68
parentd4ab5ef2168db11f97918ceab78f3c7503e4db80 (diff)
transactions: Setup metrics in transaction manager
Right now, setup of metrics used in the transaction manager is split across multiple locations. This makes the process of adding new metrics more involved than it needs to be and is a source of bugs in case any of those locations is not updated. Improve the situation by moving setup of metrics into the transaction manager. Metrics are exposed by implementing the Collector interface and registering the transaction manager itself as a metric.
-rw-r--r--cmd/praefect/main.go24
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/coordinator_test.go12
-rw-r--r--internal/praefect/helper_test.go8
-rw-r--r--internal/praefect/metrics/prometheus.go44
-rw-r--r--internal/praefect/replicator_test.go2
-rw-r--r--internal/praefect/server_factory_test.go2
-rw-r--r--internal/praefect/server_test.go2
-rw-r--r--internal/praefect/transaction_test.go67
-rw-r--r--internal/praefect/transactions/manager.go72
10 files changed, 91 insertions, 144 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 4783279bc..bdedccba7 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -243,27 +243,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
nodeManager.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration())
logger.Info("background started: gitaly nodes health monitoring")
- transactionCounterMetric, err := metrics.RegisterTransactionCounter()
- if err != nil {
- return err
- }
-
- transactionDelayMetric, err := metrics.RegisterTransactionDelay(conf.Prometheus)
- if err != nil {
- return err
- }
-
- subtransactionsHistogram, err := metrics.RegisterSubtransactionsHistogram()
- if err != nil {
- return err
- }
-
- transactionManager := transactions.NewManager(
- transactions.WithCounterMetric(transactionCounterMetric),
- transactions.WithDelayMetric(transactionDelayMetric),
- transactions.WithSubtransactionsMetric(subtransactionsHistogram),
- )
-
transactionVoters, err := metrics.RegisterTransactionVoters(conf)
if err != nil {
return err
@@ -271,6 +250,8 @@ func run(cfgs []starter.Config, conf config.Config) error {
var (
// top level server dependencies
+ transactionManager = transactions.NewManager(conf)
+
coordinator = praefect.NewCoordinator(
queue,
rs,
@@ -303,6 +284,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
)
prometheus.MustRegister(
+ transactionManager,
repl,
datastore.NewRepositoryStoreCollector(logger, rs, nodeManager),
)
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index f93589ad9..cfe119a58 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -171,7 +171,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
registry, err := protoregistry.New(fd)
require.NoError(t, err)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index e8a4644f4..134360901 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -92,7 +92,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
}},
}, nil
}},
- transactions.NewManager(),
+ transactions.NewManager(conf),
conf,
protoregistry.GitalyProtoPreregistered,
)
@@ -155,7 +155,7 @@ func TestStreamDirectorMutator(t *testing.T) {
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
queueInterceptor,
@@ -357,7 +357,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
waitNodeToChangeHealthStatus(ctx, t, node, true)
}
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
// set up the generations prior to transaction
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
@@ -494,7 +494,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
queue,
@@ -583,7 +583,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
queue,
@@ -782,7 +782,7 @@ func TestAbsentCorrelationID(t *testing.T) {
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
queueInterceptor,
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 551b55d3c..58107b4be 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -175,7 +175,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
return runPraefectServer(t, conf, buildOptions{
withQueue: queue,
- withTxMgr: transactions.NewManager(),
+ withTxMgr: transactions.NewManager(conf),
withBackends: withRealGitalyShared(t),
})
}
@@ -184,8 +184,8 @@ func defaultQueue(conf config.Config) datastore.ReplicationEventQueue {
return datastore.NewMemoryReplicationEventQueue(conf)
}
-func defaultTxMgr() *transactions.Manager {
- return transactions.NewManager()
+func defaultTxMgr(conf config.Config) *transactions.Manager {
+ return transactions.NewManager(conf)
}
func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
@@ -209,7 +209,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withRepoStore = defaultRepoStore(conf)
}
if opt.withTxMgr == nil {
- opt.withTxMgr = defaultTxMgr()
+ opt.withTxMgr = defaultTxMgr(conf)
}
if opt.withBackends != nil {
cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...)
diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go
index fc26eaad8..41fa120a6 100644
--- a/internal/praefect/metrics/prometheus.go
+++ b/internal/praefect/metrics/prometheus.go
@@ -54,50 +54,6 @@ func RegisterNodeLatency(conf promconfig.Config) (metrics.HistogramVec, error) {
return nodeLatency, prometheus.Register(nodeLatency)
}
-// RegisterTransactionCounter creates and registers a Prometheus counter to
-// track the number of transactions and their outcomes.
-func RegisterTransactionCounter() (*prometheus.CounterVec, error) {
- transactionCounter := prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "gitaly",
- Subsystem: "praefect",
- Name: "transactions_total",
- Help: "Total number of transaction actions",
- },
- []string{"action"},
- )
- return transactionCounter, prometheus.Register(transactionCounter)
-}
-
-// RegisterTransactionDelay creates and registers a Prometheus histogram to
-// track the delay of actions performed on transactions.
-func RegisterTransactionDelay(conf promconfig.Config) (metrics.HistogramVec, error) {
- transactionDelay := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "praefect",
- Name: "transactions_delay_seconds",
- Help: "Delay between casting a vote and reaching quorum",
- Buckets: conf.GRPCLatencyBuckets,
- },
- []string{"action"},
- )
- return transactionDelay, prometheus.Register(transactionDelay)
-}
-
-// RegisterSubtransactionsHistogram creates and registers a Prometheus counter to
-// gauge the number of subtransactions per transaction.
-func RegisterSubtransactionsHistogram() (metrics.Histogram, error) {
- subtransactionsHistogram := prometheus.NewHistogram(
- prometheus.HistogramOpts{
- Name: "gitaly_praefect_subtransactions_per_transaction_total",
- Help: "The number of subtransactions created for a single registered transaction",
- Buckets: []float64{0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0},
- },
- )
- return subtransactionsHistogram, prometheus.Register(subtransactionsHistogram)
-}
-
// RegisterTransactionVoters creates and registers a Prometheus counter to gauge
// the number of voters per transaction.
func RegisterTransactionVoters(conf config.Config) (metrics.HistogramVec, error) {
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index ee8aaf464..9d98a3e7b 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -226,7 +226,7 @@ func TestPropagateReplicationJob(t *testing.T) {
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 293659c46..a453ef718 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -82,7 +82,7 @@ func TestServerFactory(t *testing.T) {
nodeMgr, err := nodes.NewManager(logger, conf, nil, datastore.NewMemoryRepositoryStore(conf.StorageNames()), &promtest.MockHistogramVec{})
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(conf)
registry := protoregistry.GitalyProtoPreregistered
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
coordinator := NewCoordinator(queue, rs, nodeMgr, txMgr, conf, registry)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 696c31acb..0df9fdf93 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -748,7 +748,7 @@ func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *
}
func TestProxyWrites(t *testing.T) {
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(config.Config{})
smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index e997157a6..71f5e3bc0 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -1,16 +1,17 @@
package praefect
import (
+ "bytes"
"crypto/sha1"
"fmt"
"sync"
"testing"
"time"
- "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -26,9 +27,9 @@ type voter struct {
shouldSucceed bool
}
-func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
+func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
conf := testConfig(1)
- txMgr := transactions.NewManager(opts...)
+ txMgr := transactions.NewManager(conf)
cc, _, cleanup := runPraefectServer(t, conf, buildOptions{
withTxMgr: txMgr,
withNodeMgr: nullNodeMgr{}, // to suppress node address issues
@@ -36,40 +37,38 @@ func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*
return cc, txMgr, cleanup
}
-func setupMetrics() (*prometheus.CounterVec, []transactions.ManagerOpt) {
- counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"status"})
- return counter, []transactions.ManagerOpt{
- transactions.WithCounterMetric(counter),
- }
-}
-
type counterMetrics struct {
registered, started, invalid, committed int
}
-func verifyCounterMetrics(t *testing.T, counter *prometheus.CounterVec, expected counterMetrics) {
+func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) {
t.Helper()
- registered, err := counter.GetMetricWithLabelValues("registered")
- require.NoError(t, err)
- require.Equal(t, float64(expected.registered), testutil.ToFloat64(registered))
-
- started, err := counter.GetMetricWithLabelValues("started")
- require.NoError(t, err)
- require.Equal(t, float64(expected.started), testutil.ToFloat64(started))
+ metrics := []struct {
+ name string
+ value int
+ }{
+ {"registered", expected.registered},
+ {"started", expected.started},
+ {"invalid", expected.invalid},
+ {"committed", expected.committed},
+ }
- invalid, err := counter.GetMetricWithLabelValues("invalid")
- require.NoError(t, err)
- require.Equal(t, float64(expected.invalid), testutil.ToFloat64(invalid))
+ var expectedMetric bytes.Buffer
+ expectedMetric.WriteString("# HELP gitaly_praefect_transactions_total Total number of transaction actions\n")
+ expectedMetric.WriteString("# TYPE gitaly_praefect_transactions_total counter\n")
+ for _, metric := range metrics {
+ if metric.value == 0 {
+ continue
+ }
+ expectedMetric.WriteString(fmt.Sprintf("gitaly_praefect_transactions_total{action=\"%s\"} %d\n", metric.name, metric.value))
+ }
- committed, err := counter.GetMetricWithLabelValues("committed")
- require.NoError(t, err)
- require.Equal(t, float64(expected.committed), testutil.ToFloat64(committed))
+ require.NoError(t, testutil.CollectAndCompare(manager, &expectedMetric, "gitaly_praefect_transactions_total"))
}
func TestTransactionSucceeds(t *testing.T) {
- counter, opts := setupMetrics()
- cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...)
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
defer cleanup()
ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second))
@@ -95,7 +94,7 @@ func TestTransactionSucceeds(t *testing.T) {
require.NoError(t, err)
require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State)
- verifyCounterMetrics(t, counter, counterMetrics{
+ verifyCounterMetrics(t, txMgr, counterMetrics{
registered: 1,
started: 1,
committed: 1,
@@ -249,7 +248,7 @@ func TestTransactionRegistrationWithInvalidNodesFails(t *testing.T) {
ctx, cleanup := testhelper.Context()
defer cleanup()
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(config.Config{})
_, _, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{}, 1)
require.Equal(t, transactions.ErrMissingNodes, err)
@@ -298,7 +297,7 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) {
ctx, cleanup := testhelper.Context()
defer cleanup()
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(config.Config{})
for _, tc := range tc {
t.Run(tc.desc, func(t *testing.T) {
@@ -552,8 +551,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
}
func TestTransactionFailures(t *testing.T) {
- counter, opts := setupMetrics()
- cc, _, cleanup := runPraefectServerAndTxMgr(t, opts...)
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
defer cleanup()
ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second))
@@ -570,7 +568,7 @@ func TestTransactionFailures(t *testing.T) {
require.Error(t, err)
require.Equal(t, codes.NotFound, status.Code(err))
- verifyCounterMetrics(t, counter, counterMetrics{
+ verifyCounterMetrics(t, txMgr, counterMetrics{
started: 1,
invalid: 1,
})
@@ -623,8 +621,7 @@ func TestTransactionCancellation(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.desc, func(t *testing.T) {
- counter, opts := setupMetrics()
- cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...)
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
defer cleanup()
ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second))
@@ -679,7 +676,7 @@ func TestTransactionCancellation(t *testing.T) {
require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state")
}
- verifyCounterMetrics(t, counter, tc.expectedMetrics)
+ verifyCounterMetrics(t, txMgr, tc.expectedMetrics)
})
}
}
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index c3527db16..b663c4827 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -14,7 +14,7 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
)
var ErrNotFound = errors.New("transaction not found")
@@ -27,8 +27,8 @@ type Manager struct {
lock sync.Mutex
transactions map[uint64]*Transaction
counterMetric *prometheus.CounterVec
- delayMetric metrics.HistogramVec
- subtransactionsMetric metrics.Histogram
+ delayMetric *prometheus.HistogramVec
+ subtransactionsMetric prometheus.Histogram
}
// TransactionIDGenerator is an interface for types that can generate transaction IDs.
@@ -61,27 +61,6 @@ func (t *transactionIDGenerator) ID() uint64 {
// ManagerOpt is a self referential option for Manager
type ManagerOpt func(*Manager)
-// WithCounterMetric is an option to set the counter Prometheus metric
-func WithCounterMetric(counterMetric *prometheus.CounterVec) ManagerOpt {
- return func(mgr *Manager) {
- mgr.counterMetric = counterMetric
- }
-}
-
-// WithDelayMetric is an option to set the delay Prometheus metric
-func WithDelayMetric(delayMetric metrics.HistogramVec) ManagerOpt {
- return func(mgr *Manager) {
- mgr.delayMetric = delayMetric
- }
-}
-
-// WithSubtransactionsMetric is an option to set the subtransactions Prometheus metric
-func WithSubtransactionsMetric(subtransactionsMetric metrics.Histogram) ManagerOpt {
- return func(mgr *Manager) {
- mgr.subtransactionsMetric = subtransactionsMetric
- }
-}
-
// WithTransactionIDGenerator is an option to set the transaction ID generator
func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt {
return func(mgr *Manager) {
@@ -90,13 +69,36 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt {
}
// NewManager creates a new transactions Manager.
-func NewManager(opts ...ManagerOpt) *Manager {
+func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager {
mgr := &Manager{
- txIDGenerator: newTransactionIDGenerator(),
- transactions: make(map[uint64]*Transaction),
- counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}),
- delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}),
- subtransactionsMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+ txIDGenerator: newTransactionIDGenerator(),
+ transactions: make(map[uint64]*Transaction),
+ counterMetric: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "gitaly",
+ Subsystem: "praefect",
+ Name: "transactions_total",
+ Help: "Total number of transaction actions",
+ },
+ []string{"action"},
+ ),
+ delayMetric: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "praefect",
+ Name: "transactions_delay_seconds",
+ Help: "Delay between casting a vote and reaching quorum",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"action"},
+ ),
+ subtransactionsMetric: prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Name: "gitaly_praefect_subtransactions_per_transaction_total",
+ Help: "The number of subtransactions created for a single registered transaction",
+ Buckets: []float64{0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0},
+ },
+ ),
}
for _, opt := range opts {
@@ -106,6 +108,16 @@ func NewManager(opts ...ManagerOpt) *Manager {
return mgr
}
+func (mgr *Manager) Describe(descs chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(mgr, descs)
+}
+
+func (mgr *Manager) Collect(metrics chan<- prometheus.Metric) {
+ mgr.counterMetric.Collect(metrics)
+ mgr.delayMetric.Collect(metrics)
+ mgr.subtransactionsMetric.Collect(metrics)
+}
+
func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger {
return ctxlogrus.Extract(ctx).WithField("component", "transactions.Manager")
}