Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Sami Hiltunen <shiltunen@gitlab.com> 2021-05-25 13:38:27 +0300
committer Sami Hiltunen <shiltunen@gitlab.com> 2021-05-25 13:38:27 +0300
commit 395f7c52943f9d2d101bcc2265373b2671ca0321 (patch)
tree c15a01fb2c7d86a6cb7f73d67ddf28b83a314479
parent 0a748a623ffecd910dbc2f8f2ff542df3b067622 (diff)
parent bdf5df69bf32c8e736bd33bc4920d57e5ee3261e (diff)
Merge branch 'pks-coordinator-tx-replication-metrics' into 'master'
coordinator: Add replication metrics for transactions. See merge request gitlab-org/gitaly!3519
-rw-r--r-- internal/praefect/coordinator.go 53
-rw-r--r-- internal/praefect/coordinator_test.go 50
2 files changed, 89 insertions, 14 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 5b2cb72fa..741962107 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -236,13 +236,14 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- router Router
- txMgr *transactions.Manager
- queue datastore.ReplicationEventQueue
- rs datastore.RepositoryStore
- registry *protoregistry.Registry
- conf config.Config
- votersMetric *prometheus.HistogramVec
+ router Router
+ txMgr *transactions.Manager
+ queue datastore.ReplicationEventQueue
+ rs datastore.RepositoryStore
+ registry *protoregistry.Registry
+ conf config.Config
+ votersMetric *prometheus.HistogramVec
+ txReplicationCountMetric *prometheus.CounterVec
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
@@ -276,6 +277,13 @@ func NewCoordinator(
},
[]string{"virtual_storage"},
),
+ txReplicationCountMetric: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_tx_replications_total",
+ Help: "The number of replication jobs scheduled for transactional RPCs",
+ },
+ []string{"reason"},
+ ),
}
return coordinator
@@ -287,6 +295,7 @@ func (c *Coordinator) Describe(descs chan<- *prometheus.Desc) {
func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) {
c.votersMetric.Collect(metrics)
+ c.txReplicationCountMetric.Collect(metrics)
}
func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
@@ -738,7 +747,8 @@ func (c *Coordinator) createTransactionFinalizer(
nodeErrors *nodeErrors,
) func() error {
return func() error {
- primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(
+ ctx, route, transaction, nodeErrors, c.txReplicationCountMetric)
if !primaryDirtied {
// If the primary replica was not modified then we don't need to consider the secondaries
// outdated. Praefect requires the primary to be always part of the quorum, so no changes
@@ -774,15 +784,11 @@ func getUpdatedAndOutdatedSecondaries(
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
+ replicationCountMetric *prometheus.CounterVec,
) (primaryDirtied bool, updated []string, outdated []string) {
nodeErrors.Lock()
defer nodeErrors.Unlock()
- // Replication targets were not added to the transaction, most likely because they are
- // either not healthy or out of date. We thus need to make sure to create replication jobs
- // for them.
- outdated = append(outdated, route.ReplicationTargets...)
-
primaryErr := nodeErrors.errByNode[route.Primary.Storage]
// If there were subtransactions, we only assume some changes were made if one of the subtransactions
@@ -796,11 +802,27 @@ func getUpdatedAndOutdatedSecondaries(
primaryDirtied = transaction.DidCommitAnySubtransaction() ||
(transaction.CountSubtransactions() == 0 && primaryErr == nil)
+ recordReplication := func(reason string, replicationCount int) {
+ // If the primary wasn't dirtied, then we never replicate any changes. While this
+ // duplicates logic defined elsewhere, it's probably good enough given that we only
+ // talk about metrics here.
+ if primaryDirtied && replicationCount > 0 {
+ replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount))
+ }
+ }
+
+ // Replication targets were not added to the transaction, most likely because they are
+ // either not healthy or out of date. We thus need to make sure to create replication jobs
+ // for them.
+ outdated = append(outdated, route.ReplicationTargets...)
+ recordReplication("outdated", len(route.ReplicationTargets))
+
// If the primary errored, then we need to assume that it has modified on-disk state and
// thus need to replicate those changes to secondaries.
if primaryErr != nil {
ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("primary-failed", len(route.Secondaries))
return
}
@@ -811,6 +833,7 @@ func getUpdatedAndOutdatedSecondaries(
if transaction.CountSubtransactions() == 0 {
ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("no-votes", len(route.Secondaries))
return
}
@@ -820,6 +843,7 @@ func getUpdatedAndOutdatedSecondaries(
if err != nil {
ctxlogrus.Extract(ctx).WithError(err).Error("could not get transaction state")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("missing-tx-state", len(route.Secondaries))
return
}
@@ -831,6 +855,7 @@ func getUpdatedAndOutdatedSecondaries(
ctxlogrus.Extract(ctx).Error("transaction: primary failed vote")
}
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("primary-not-committed", len(route.Secondaries))
return
}
@@ -839,11 +864,13 @@ func getUpdatedAndOutdatedSecondaries(
for _, secondary := range route.Secondaries {
if nodeErrors.errByNode[secondary.Storage] != nil {
outdated = append(outdated, secondary.Storage)
+ recordReplication("node-failed", 1)
continue
}
if nodeStates[secondary.Storage] != transactions.VoteCommitted {
outdated = append(outdated, secondary.Storage)
+ recordReplication("node-not-committed", 1)
continue
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index b0bee67d3..0b3c28b7a 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -12,6 +13,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -1582,6 +1585,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied bool
expectedOutdated []string
expectedUpdated []string
+ expectedMetrics map[string]int
}{
{
desc: "single committed node",
@@ -1627,6 +1631,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "single failing node with replica",
@@ -1650,6 +1657,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "single node without transaction with replica",
@@ -1660,6 +1670,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 0,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "multiple committed nodes",
@@ -1691,6 +1704,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "primary-failed": 2,
+ },
},
{
desc: "multiple committed nodes with secondary err",
@@ -1707,6 +1723,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedUpdated: []string{"s2"},
expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-failed": 1,
+ },
},
{
desc: "partial success",
@@ -1723,6 +1742,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedUpdated: []string{"s2"},
expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ },
},
{
desc: "failure with (impossible) secondary success",
@@ -1738,6 +1760,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "primary-not-committed": 2,
+ },
},
{
desc: "multiple nodes without subtransactions",
@@ -1752,6 +1777,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 0,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "no-votes": 2,
+ },
},
{
desc: "multiple nodes with replica and partial failures",
@@ -1769,6 +1797,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "r1", "r2"},
expectedUpdated: []string{"s2"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ "outdated": 2,
+ },
},
{
desc: "multiple nodes with replica and partial err",
@@ -1785,6 +1817,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ expectedMetrics: map[string]int{
+ "node-failed": 1,
+ "node-not-committed": 1,
+ "outdated": 2,
+ },
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -1823,10 +1860,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
- primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ metric := prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: "stub", Help: "help",
+ }, []string{"reason"})
+
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric)
require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
+
+ expectedMetrics := "# HELP stub help\n# TYPE stub counter\n"
+ for metric, value := range tc.expectedMetrics {
+ expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value)
+ }
+
+ require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics)))
})
}
}