diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-15 10:24:24 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-15 10:24:24 +0300 |
commit | b6ab69fe7d83a7f1b0a2ac68e56992c0416bbe85 (patch) | |
tree | ecd091f7d5e451f0a9db8501161a453213338390 | |
parent | 290b75aca15308d950f1db4b3c190df6bcf72cad (diff) | |
parent | 80284bc249b14d31662e256fddf1efbb75f8d6bf (diff) |
Merge branch 'pks-2pc-metrics' into 'master'
Metrics and improved logging for the transaction manager
See merge request gitlab-org/gitaly!2165
-rw-r--r-- | cmd/praefect/main.go | 15 | ||||
-rw-r--r-- | internal/praefect/metrics/prometheus.go | 29 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 66 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 101 |
4 files changed, 183 insertions, 28 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 009734f0d..19d5e25fa 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -237,7 +237,20 @@ func run(cfgs []starter.Config, conf config.Config) error { } nodeManager.Start(1*time.Second, 3*time.Second) - transactionManager := transactions.NewManager() + transactionCounterMetric, err := metrics.RegisterTransactionCounter() + if err != nil { + return err + } + + transactionDelayMetric, err := metrics.RegisterTransactionDelay(conf.Prometheus) + if err != nil { + return err + } + + transactionManager := transactions.NewManager( + transactions.WithCounterMetric(transactionCounterMetric), + transactions.WithDelayMetric(transactionDelayMetric), + ) registry := protoregistry.New() if err = registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...); err != nil { diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index 48ac91708..f49dfb9ba 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -66,6 +66,35 @@ func RegisterReplicationJobsInFlight() (metrics.Gauge, error) { return replicationJobsInFlight, prometheus.Register(replicationJobsInFlight) } +// RegisterTransactionCounter creates and registers a Prometheus counter to +// track the number of transactions and their outcomes. +func RegisterTransactionCounter() (*prometheus.CounterVec, error) { + transactionCounter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "transactions_total", + }, + []string{"action"}, + ) + return transactionCounter, prometheus.Register(transactionCounter) +} + +// RegisterTransactionDelay creates and registers a Prometheus histogram to +// track the delay of actions performed on transactions. +func RegisterTransactionDelay(conf promconfig.Config) (metrics.HistogramVec, error) { + transactionDelay := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "transactions_delay_seconds", + Buckets: conf.GRPCLatencyBuckets, + }, + []string{"action"}, + ) + return transactionDelay, prometheus.Register(transactionDelay) +} + var MethodTypeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "gitaly", diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 1f9ce7a39..675de2cc8 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" @@ -16,7 +18,7 @@ import ( "google.golang.org/grpc/status" ) -func runPraefectWithTransactionMgr(t *testing.T) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { +func runPraefectWithTransactionMgr(t *testing.T, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { conf := testConfig(1) ds := datastore.Datastore{ @@ -24,14 +26,47 @@ func runPraefectWithTransactionMgr(t *testing.T) (*grpc.ClientConn, *transaction ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), } - txMgr := transactions.NewManager() + txMgr := transactions.NewManager(opts...) conn, _, cleanup := runPraefectServer(t, conf, ds, txMgr) return conn, txMgr, cleanup } +func setupMetrics() (*prometheus.CounterVec, []transactions.ManagerOpt) { + counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"status"}) + return counter, []transactions.ManagerOpt{ + transactions.WithCounterMetric(counter), + } +} + +type counterMetrics struct { + registered, started, invalid, committed int +} + +func verifyCounterMetrics(t *testing.T, counter *prometheus.CounterVec, expected counterMetrics) { + t.Helper() + + registered, err := counter.GetMetricWithLabelValues("registered") + require.NoError(t, err) + require.Equal(t, float64(expected.registered), testutil.ToFloat64(registered)) + + started, err := counter.GetMetricWithLabelValues("started") + require.NoError(t, err) + require.Equal(t, float64(expected.started), testutil.ToFloat64(started)) + + invalid, err := counter.GetMetricWithLabelValues("invalid") + require.NoError(t, err) + require.Equal(t, float64(expected.invalid), testutil.ToFloat64(invalid)) + + committed, err := counter.GetMetricWithLabelValues("committed") + require.NoError(t, err) + require.Equal(t, float64(expected.committed), testutil.ToFloat64(committed)) +} + func TestTransactionSucceeds(t *testing.T) { - cc, txMgr, cleanup := runPraefectWithTransactionMgr(t) + counter, opts := setupMetrics() + + cc, txMgr, cleanup := runPraefectWithTransactionMgr(t, opts...) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -53,6 +88,12 @@ func TestTransactionSucceeds(t *testing.T) { }) require.NoError(t, err) require.Equal(t, gitalypb.StartTransactionResponse_COMMIT, response.State) + + verifyCounterMetrics(t, counter, counterMetrics{ + registered: 1, + started: 1, + committed: 1, + }) } func TestTransactionFailsWithMultipleNodes(t *testing.T) { @@ -67,7 +108,9 @@ func TestTransactionFailsWithMultipleNodes(t *testing.T) { } func TestTransactionFailures(t *testing.T) { - cc, _, cleanup := runPraefectWithTransactionMgr(t) + counter, opts := setupMetrics() + + cc, _, cleanup := runPraefectWithTransactionMgr(t, opts...) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -83,10 +126,17 @@ func TestTransactionFailures(t *testing.T) { }) require.Error(t, err) require.Equal(t, codes.NotFound, status.Code(err)) + + verifyCounterMetrics(t, counter, counterMetrics{ + started: 1, + invalid: 1, + }) } func TestTransactionCancellation(t *testing.T) { - cc, txMgr, cleanup := runPraefectWithTransactionMgr(t) + counter, opts := setupMetrics() + + cc, txMgr, cleanup := runPraefectWithTransactionMgr(t, opts...) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -108,4 +158,10 @@ func TestTransactionCancellation(t *testing.T) { }) require.Error(t, err) require.Equal(t, codes.NotFound, status.Code(err)) + + verifyCounterMetrics(t, counter, counterMetrics{ + registered: 1, + started: 1, + invalid: 1, + }) } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 2a9b85382..c7f4e647a 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -7,25 +7,55 @@ import ( "fmt" "math/rand" "sync" + "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics" ) // Manager handles reference transactions for Praefect. It is required in order // for Praefect to handle transactions directly instead of having to reach out // to reference transaction RPCs. type Manager struct { - lock sync.Mutex - transactions map[uint64]string + lock sync.Mutex + transactions map[uint64]string + counterMetric *prometheus.CounterVec + delayMetric metrics.HistogramVec +} + +// ManagerOpt is a self referential option for Manager +type ManagerOpt func(*Manager) + +// WithCounterMetric is an option to set the counter Prometheus metric +func WithCounterMetric(counterMetric *prometheus.CounterVec) ManagerOpt { + return func(mgr *Manager) { + mgr.counterMetric = counterMetric + } +} + +// WithDelayMetric is an option to set the delay Prometheus metric +func WithDelayMetric(delayMetric metrics.HistogramVec) ManagerOpt { + return func(mgr *Manager) { + mgr.delayMetric = delayMetric + } } // NewManager creates a new transactions Manager. -func NewManager() *Manager { - return &Manager{ - transactions: make(map[uint64]string), +func NewManager(opts ...ManagerOpt) *Manager { + mgr := &Manager{ + transactions: make(map[uint64]string), + counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}), + delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}), } + + for _, opt := range opts { + opt(mgr) + } + + return mgr } func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger { @@ -65,6 +95,8 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, nodes []string) (ui "nodes": nodes, }).Debug("RegisterTransaction") + mgr.counterMetric.WithLabelValues("registered").Inc() + return transactionID, func() { mgr.cancelTransaction(transactionID) }, nil @@ -76,22 +108,7 @@ func (mgr *Manager) cancelTransaction(transactionID uint64) { delete(mgr.transactions, transactionID) } -// StartTransaction is called by a client who's starting a reference -// transaction. As we currently only have primary nodes which perform reference -// transactions, this function doesn't yet do anything of interest but will -// always instruct the node to commit, if given valid transaction parameters. -// In future, it will wait for all clients of a given transaction to start the -// transaction and perform a vote. -func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error { - mgr.lock.Lock() - defer mgr.lock.Unlock() - - mgr.log(ctx).WithFields(logrus.Fields{ - "transaction_id": transactionID, - "node": node, - "hash": hex.EncodeToString(hash), - }).Debug("StartTransaction") - +func (mgr *Manager) verifyTransaction(transactionID uint64, node string, hash []byte) error { // While the reference updates hash is not used yet, we already verify // it's there. At a later point, the hash will be used to verify that // all voting nodes agree on the same updates. @@ -99,7 +116,10 @@ func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64, return helper.ErrInvalidArgumentf("invalid reference hash: %q", hash) } + mgr.lock.Lock() transaction, ok := mgr.transactions[transactionID] + mgr.lock.Unlock() + if !ok { return helper.ErrNotFound(fmt.Errorf("no such transaction: %d", transactionID)) } @@ -108,10 +128,47 @@ func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64, return helper.ErrInternalf("invalid node for transaction: %q", node) } + return nil +} + +// StartTransaction is called by a client who's starting a reference +// transaction. As we currently only have primary nodes which perform reference +// transactions, this function doesn't yet do anything of interest but will +// always instruct the node to commit, if given valid transaction parameters. +// In future, it will wait for all clients of a given transaction to start the +// transaction and perform a vote. +func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error { + start := time.Now() + defer func() { + delay := time.Since(start) + mgr.delayMetric.WithLabelValues("vote").Observe(delay.Seconds()) + }() + + mgr.counterMetric.WithLabelValues("started").Inc() + + mgr.log(ctx).WithFields(logrus.Fields{ + "transaction_id": transactionID, + "node": node, + "hash": hex.EncodeToString(hash), + }).Debug("StartTransaction") + + if err := mgr.verifyTransaction(transactionID, node, hash); err != nil { + mgr.log(ctx).WithFields(logrus.Fields{ + "transaction_id": transactionID, + "node": node, + "hash": hex.EncodeToString(hash), + }).WithError(err).Error("StartTransaction: transaction invalid") + mgr.counterMetric.WithLabelValues("invalid").Inc() + return err + } + mgr.log(ctx).WithFields(logrus.Fields{ "transaction_id": transactionID, + "node": node, "hash": hex.EncodeToString(hash), - }).Debug("CommitTransaction") + }).Debug("StartTransaction: transaction committed") + + mgr.counterMetric.WithLabelValues("committed").Inc() return nil } |