Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2020-05-15 10:24:24 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2020-05-15 10:24:24 +0300
commit: b6ab69fe7d83a7f1b0a2ac68e56992c0416bbe85 (patch)
tree: ecd091f7d5e451f0a9db8501161a453213338390
parent: 290b75aca15308d950f1db4b3c190df6bcf72cad (diff)
parent: 80284bc249b14d31662e256fddf1efbb75f8d6bf (diff)
Merge branch 'pks-2pc-metrics' into 'master'
Metrics and improved logging for the transaction manager

See merge request gitlab-org/gitaly!2165
-rw-r--r-- cmd/praefect/main.go 15
-rw-r--r-- internal/praefect/metrics/prometheus.go 29
-rw-r--r-- internal/praefect/transaction_test.go 66
-rw-r--r-- internal/praefect/transactions/manager.go 101
4 files changed, 183 insertions, 28 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 009734f0d..19d5e25fa 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -237,7 +237,20 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
nodeManager.Start(1*time.Second, 3*time.Second)
- transactionManager := transactions.NewManager()
+ transactionCounterMetric, err := metrics.RegisterTransactionCounter()
+ if err != nil {
+ return err
+ }
+
+ transactionDelayMetric, err := metrics.RegisterTransactionDelay(conf.Prometheus)
+ if err != nil {
+ return err
+ }
+
+ transactionManager := transactions.NewManager(
+ transactions.WithCounterMetric(transactionCounterMetric),
+ transactions.WithDelayMetric(transactionDelayMetric),
+ )
registry := protoregistry.New()
if err = registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...); err != nil {
diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go
index 48ac91708..f49dfb9ba 100644
--- a/internal/praefect/metrics/prometheus.go
+++ b/internal/praefect/metrics/prometheus.go
@@ -66,6 +66,35 @@ func RegisterReplicationJobsInFlight() (metrics.Gauge, error) {
return replicationJobsInFlight, prometheus.Register(replicationJobsInFlight)
}
+// RegisterTransactionCounter creates and registers a Prometheus counter to
+// track the number of transactions and their outcomes.
+func RegisterTransactionCounter() (*prometheus.CounterVec, error) {
+ transactionCounter := prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "gitaly",
+ Subsystem: "praefect",
+ Name: "transactions_total",
+ },
+ []string{"action"},
+ )
+ return transactionCounter, prometheus.Register(transactionCounter)
+}
+
+// RegisterTransactionDelay creates and registers a Prometheus histogram to
+// track the delay of actions performed on transactions.
+func RegisterTransactionDelay(conf promconfig.Config) (metrics.HistogramVec, error) {
+ transactionDelay := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "praefect",
+ Name: "transactions_delay_seconds",
+ Buckets: conf.GRPCLatencyBuckets,
+ },
+ []string{"action"},
+ )
+ return transactionDelay, prometheus.Register(transactionDelay)
+}
+
var MethodTypeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "gitaly",
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 1f9ce7a39..675de2cc8 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -6,6 +6,8 @@ import (
"testing"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
@@ -16,7 +18,7 @@ import (
"google.golang.org/grpc/status"
)
-func runPraefectWithTransactionMgr(t *testing.T) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
+func runPraefectWithTransactionMgr(t *testing.T, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
conf := testConfig(1)
ds := datastore.Datastore{
@@ -24,14 +26,47 @@ func runPraefectWithTransactionMgr(t *testing.T) (*grpc.ClientConn, *transaction
ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
}
- txMgr := transactions.NewManager()
+ txMgr := transactions.NewManager(opts...)
conn, _, cleanup := runPraefectServer(t, conf, ds, txMgr)
return conn, txMgr, cleanup
}
+func setupMetrics() (*prometheus.CounterVec, []transactions.ManagerOpt) {
+ counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"status"})
+ return counter, []transactions.ManagerOpt{
+ transactions.WithCounterMetric(counter),
+ }
+}
+
+type counterMetrics struct {
+ registered, started, invalid, committed int
+}
+
+func verifyCounterMetrics(t *testing.T, counter *prometheus.CounterVec, expected counterMetrics) {
+ t.Helper()
+
+ registered, err := counter.GetMetricWithLabelValues("registered")
+ require.NoError(t, err)
+ require.Equal(t, float64(expected.registered), testutil.ToFloat64(registered))
+
+ started, err := counter.GetMetricWithLabelValues("started")
+ require.NoError(t, err)
+ require.Equal(t, float64(expected.started), testutil.ToFloat64(started))
+
+ invalid, err := counter.GetMetricWithLabelValues("invalid")
+ require.NoError(t, err)
+ require.Equal(t, float64(expected.invalid), testutil.ToFloat64(invalid))
+
+ committed, err := counter.GetMetricWithLabelValues("committed")
+ require.NoError(t, err)
+ require.Equal(t, float64(expected.committed), testutil.ToFloat64(committed))
+}
+
func TestTransactionSucceeds(t *testing.T) {
- cc, txMgr, cleanup := runPraefectWithTransactionMgr(t)
+ counter, opts := setupMetrics()
+
+ cc, txMgr, cleanup := runPraefectWithTransactionMgr(t, opts...)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -53,6 +88,12 @@ func TestTransactionSucceeds(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, gitalypb.StartTransactionResponse_COMMIT, response.State)
+
+ verifyCounterMetrics(t, counter, counterMetrics{
+ registered: 1,
+ started: 1,
+ committed: 1,
+ })
}
func TestTransactionFailsWithMultipleNodes(t *testing.T) {
@@ -67,7 +108,9 @@ func TestTransactionFailsWithMultipleNodes(t *testing.T) {
}
func TestTransactionFailures(t *testing.T) {
- cc, _, cleanup := runPraefectWithTransactionMgr(t)
+ counter, opts := setupMetrics()
+
+ cc, _, cleanup := runPraefectWithTransactionMgr(t, opts...)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -83,10 +126,17 @@ func TestTransactionFailures(t *testing.T) {
})
require.Error(t, err)
require.Equal(t, codes.NotFound, status.Code(err))
+
+ verifyCounterMetrics(t, counter, counterMetrics{
+ started: 1,
+ invalid: 1,
+ })
}
func TestTransactionCancellation(t *testing.T) {
- cc, txMgr, cleanup := runPraefectWithTransactionMgr(t)
+ counter, opts := setupMetrics()
+
+ cc, txMgr, cleanup := runPraefectWithTransactionMgr(t, opts...)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -108,4 +158,10 @@ func TestTransactionCancellation(t *testing.T) {
})
require.Error(t, err)
require.Equal(t, codes.NotFound, status.Code(err))
+
+ verifyCounterMetrics(t, counter, counterMetrics{
+ registered: 1,
+ started: 1,
+ invalid: 1,
+ })
}
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 2a9b85382..c7f4e647a 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -7,25 +7,55 @@ import (
"fmt"
"math/rand"
"sync"
+ "time"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
)
// Manager handles reference transactions for Praefect. It is required in order
// for Praefect to handle transactions directly instead of having to reach out
// to reference transaction RPCs.
type Manager struct {
- lock sync.Mutex
- transactions map[uint64]string
+ lock sync.Mutex
+ transactions map[uint64]string
+ counterMetric *prometheus.CounterVec
+ delayMetric metrics.HistogramVec
+}
+
+// ManagerOpt is a self referential option for Manager
+type ManagerOpt func(*Manager)
+
+// WithCounterMetric is an option to set the counter Prometheus metric
+func WithCounterMetric(counterMetric *prometheus.CounterVec) ManagerOpt {
+ return func(mgr *Manager) {
+ mgr.counterMetric = counterMetric
+ }
+}
+
+// WithDelayMetric is an option to set the delay Prometheus metric
+func WithDelayMetric(delayMetric metrics.HistogramVec) ManagerOpt {
+ return func(mgr *Manager) {
+ mgr.delayMetric = delayMetric
+ }
}
// NewManager creates a new transactions Manager.
-func NewManager() *Manager {
- return &Manager{
- transactions: make(map[uint64]string),
+func NewManager(opts ...ManagerOpt) *Manager {
+ mgr := &Manager{
+ transactions: make(map[uint64]string),
+ counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}),
+ delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}),
}
+
+ for _, opt := range opts {
+ opt(mgr)
+ }
+
+ return mgr
}
func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger {
@@ -65,6 +95,8 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, nodes []string) (ui
"nodes": nodes,
}).Debug("RegisterTransaction")
+ mgr.counterMetric.WithLabelValues("registered").Inc()
+
return transactionID, func() {
mgr.cancelTransaction(transactionID)
}, nil
@@ -76,22 +108,7 @@ func (mgr *Manager) cancelTransaction(transactionID uint64) {
delete(mgr.transactions, transactionID)
}
-// StartTransaction is called by a client who's starting a reference
-// transaction. As we currently only have primary nodes which perform reference
-// transactions, this function doesn't yet do anything of interest but will
-// always instruct the node to commit, if given valid transaction parameters.
-// In future, it will wait for all clients of a given transaction to start the
-// transaction and perform a vote.
-func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {
- mgr.lock.Lock()
- defer mgr.lock.Unlock()
-
- mgr.log(ctx).WithFields(logrus.Fields{
- "transaction_id": transactionID,
- "node": node,
- "hash": hex.EncodeToString(hash),
- }).Debug("StartTransaction")
-
+func (mgr *Manager) verifyTransaction(transactionID uint64, node string, hash []byte) error {
// While the reference updates hash is not used yet, we already verify
// it's there. At a later point, the hash will be used to verify that
// all voting nodes agree on the same updates.
@@ -99,7 +116,10 @@ func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64,
return helper.ErrInvalidArgumentf("invalid reference hash: %q", hash)
}
+ mgr.lock.Lock()
transaction, ok := mgr.transactions[transactionID]
+ mgr.lock.Unlock()
+
if !ok {
return helper.ErrNotFound(fmt.Errorf("no such transaction: %d", transactionID))
}
@@ -108,10 +128,47 @@ func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64,
return helper.ErrInternalf("invalid node for transaction: %q", node)
}
+ return nil
+}
+
+// StartTransaction is called by a client who's starting a reference
+// transaction. As we currently only have primary nodes which perform reference
+// transactions, this function doesn't yet do anything of interest but will
+// always instruct the node to commit, if given valid transaction parameters.
+// In future, it will wait for all clients of a given transaction to start the
+// transaction and perform a vote.
+func (mgr *Manager) StartTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {
+ start := time.Now()
+ defer func() {
+ delay := time.Since(start)
+ mgr.delayMetric.WithLabelValues("vote").Observe(delay.Seconds())
+ }()
+
+ mgr.counterMetric.WithLabelValues("started").Inc()
+
+ mgr.log(ctx).WithFields(logrus.Fields{
+ "transaction_id": transactionID,
+ "node": node,
+ "hash": hex.EncodeToString(hash),
+ }).Debug("StartTransaction")
+
+ if err := mgr.verifyTransaction(transactionID, node, hash); err != nil {
+ mgr.log(ctx).WithFields(logrus.Fields{
+ "transaction_id": transactionID,
+ "node": node,
+ "hash": hex.EncodeToString(hash),
+ }).WithError(err).Error("StartTransaction: transaction invalid")
+ mgr.counterMetric.WithLabelValues("invalid").Inc()
+ return err
+ }
+
mgr.log(ctx).WithFields(logrus.Fields{
"transaction_id": transactionID,
+ "node": node,
"hash": hex.EncodeToString(hash),
- }).Debug("CommitTransaction")
+ }).Debug("StartTransaction: transaction committed")
+
+ mgr.counterMetric.WithLabelValues("committed").Inc()
return nil
}