Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-05-20 11:38:52 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-05-21 17:29:45 +0300
commit: bdf5df69bf32c8e736bd33bc4920d57e5ee3261e (patch)
tree: 76fbff124b4707d537e3114305b407965af7f224
parent: 30e482604548b0d923f13b11dbc5aa63fdc289f9 (diff)
coordinator: Add replication metrics for transactions
Without scraping through the logs, it's currently hard to determine how many replication jobs we're queueing for transactional RPCs and what the reasons for those jobs are. To improve visibility, this commit adds a new metric which tracks the reason why replication jobs are queued.

Changelog: added
-rw-r--r-- internal/praefect/coordinator.go 43
-rw-r--r-- internal/praefect/coordinator_test.go 50
2 files changed, 84 insertions, 9 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 75288add4..04800f89c 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -237,13 +237,14 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- router Router
- txMgr *transactions.Manager
- queue datastore.ReplicationEventQueue
- rs datastore.RepositoryStore
- registry *protoregistry.Registry
- conf config.Config
- votersMetric *prometheus.HistogramVec
+ router Router
+ txMgr *transactions.Manager
+ queue datastore.ReplicationEventQueue
+ rs datastore.RepositoryStore
+ registry *protoregistry.Registry
+ conf config.Config
+ votersMetric *prometheus.HistogramVec
+ txReplicationCountMetric *prometheus.CounterVec
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
@@ -277,6 +278,13 @@ func NewCoordinator(
},
[]string{"virtual_storage"},
),
+ txReplicationCountMetric: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_tx_replications_total",
+ Help: "The number of replication jobs scheduled for transactional RPCs",
+ },
+ []string{"reason"},
+ ),
}
return coordinator
@@ -288,6 +296,7 @@ func (c *Coordinator) Describe(descs chan<- *prometheus.Desc) {
func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) {
c.votersMetric.Collect(metrics)
+ c.txReplicationCountMetric.Collect(metrics)
}
func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
@@ -739,7 +748,8 @@ func (c *Coordinator) createTransactionFinalizer(
nodeErrors *nodeErrors,
) func() error {
return func() error {
- primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(
+ ctx, route, transaction, nodeErrors, c.txReplicationCountMetric)
if !primaryDirtied {
// If the primary replica was not modified then we don't need to consider the secondaries
// outdated. Praefect requires the primary to be always part of the quorum, so no changes
@@ -775,6 +785,7 @@ func getUpdatedAndOutdatedSecondaries(
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
+ replicationCountMetric *prometheus.CounterVec,
) (primaryDirtied bool, updated []string, outdated []string) {
nodeErrors.Lock()
defer nodeErrors.Unlock()
@@ -792,16 +803,27 @@ func getUpdatedAndOutdatedSecondaries(
primaryDirtied = transaction.DidCommitAnySubtransaction() ||
(transaction.CountSubtransactions() == 0 && primaryErr == nil)
+ recordReplication := func(reason string, replicationCount int) {
+ // If the primary wasn't dirtied, then we never replicate any changes. While this
+ // duplicates logic defined elsewhere, it's probably good enough given that we only
+ // talk about metrics here.
+ if primaryDirtied && replicationCount > 0 {
+ replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount))
+ }
+ }
+
// Replication targets were not added to the transaction, most likely because they are
// either not healthy or out of date. We thus need to make sure to create replication jobs
// for them.
outdated = append(outdated, route.ReplicationTargets...)
+ recordReplication("outdated", len(route.ReplicationTargets))
// If the primary errored, then we need to assume that it has modified on-disk state and
// thus need to replicate those changes to secondaries.
if primaryErr != nil {
ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("primary-failed", len(route.Secondaries))
return
}
@@ -812,6 +834,7 @@ func getUpdatedAndOutdatedSecondaries(
if transaction.CountSubtransactions() == 0 {
ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("no-votes", len(route.Secondaries))
return
}
@@ -821,6 +844,7 @@ func getUpdatedAndOutdatedSecondaries(
if err != nil {
ctxlogrus.Extract(ctx).WithError(err).Error("could not get transaction state")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("missing-tx-state", len(route.Secondaries))
return
}
@@ -832,6 +856,7 @@ func getUpdatedAndOutdatedSecondaries(
ctxlogrus.Extract(ctx).Error("transaction: primary failed vote")
}
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("primary-not-committed", len(route.Secondaries))
return
}
@@ -840,11 +865,13 @@ func getUpdatedAndOutdatedSecondaries(
for _, secondary := range route.Secondaries {
if nodeErrors.errByNode[secondary.Storage] != nil {
outdated = append(outdated, secondary.Storage)
+ recordReplication("node-failed", 1)
continue
}
if nodeStates[secondary.Storage] != transactions.VoteCommitted {
outdated = append(outdated, secondary.Storage)
+ recordReplication("node-not-committed", 1)
continue
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index b0bee67d3..0b3c28b7a 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -12,6 +13,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -1582,6 +1585,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied bool
expectedOutdated []string
expectedUpdated []string
+ expectedMetrics map[string]int
}{
{
desc: "single committed node",
@@ -1627,6 +1631,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "single failing node with replica",
@@ -1650,6 +1657,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "single node without transaction with replica",
@@ -1660,6 +1670,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 0,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "multiple committed nodes",
@@ -1691,6 +1704,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "primary-failed": 2,
+ },
},
{
desc: "multiple committed nodes with secondary err",
@@ -1707,6 +1723,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedUpdated: []string{"s2"},
expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-failed": 1,
+ },
},
{
desc: "partial success",
@@ -1723,6 +1742,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedUpdated: []string{"s2"},
expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ },
},
{
desc: "failure with (impossible) secondary success",
@@ -1738,6 +1760,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "primary-not-committed": 2,
+ },
},
{
desc: "multiple nodes without subtransactions",
@@ -1752,6 +1777,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 0,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "no-votes": 2,
+ },
},
{
desc: "multiple nodes with replica and partial failures",
@@ -1769,6 +1797,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "r1", "r2"},
expectedUpdated: []string{"s2"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ "outdated": 2,
+ },
},
{
desc: "multiple nodes with replica and partial err",
@@ -1785,6 +1817,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ expectedMetrics: map[string]int{
+ "node-failed": 1,
+ "node-not-committed": 1,
+ "outdated": 2,
+ },
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -1823,10 +1860,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
- primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ metric := prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: "stub", Help: "help",
+ }, []string{"reason"})
+
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric)
require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
+
+ expectedMetrics := "# HELP stub help\n# TYPE stub counter\n"
+ for metric, value := range tc.expectedMetrics {
+ expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value)
+ }
+
+ require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics)))
})
}
}