Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com>, 2021-05-20 11:38:52 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com>, 2021-09-02 08:23:54 +0300
commit: bae452e8eb49f211acdf510cdd6c505bace34e5c (patch)
tree: 8dda9db3c72b2055292bad03d697aadd1a91bd55
parent: be8677b08dccf88e082cba61d766aa33528f299c (diff)
coordinator: Add replication metrics for transactions
Without scraping through the logs, it's currently hard to determine how many replication jobs we're queueing for transactional RPCs and what the reasons for those jobs are. To improve visibility, this commit adds a new metric which tracks the reason why replication jobs are queued.

Changelog: added

(cherry picked from commit bdf5df69bf32c8e736bd33bc4920d57e5ee3261e)
-rw-r--r-- internal/praefect/coordinator.go 43
-rw-r--r-- internal/praefect/coordinator_test.go 50
2 files changed, 84 insertions, 9 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b5d28cb20..15a7e9fa6 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -230,13 +230,14 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- router Router
- txMgr *transactions.Manager
- queue datastore.ReplicationEventQueue
- rs datastore.RepositoryStore
- registry *protoregistry.Registry
- conf config.Config
- votersMetric *prometheus.HistogramVec
+ router Router
+ txMgr *transactions.Manager
+ queue datastore.ReplicationEventQueue
+ rs datastore.RepositoryStore
+ registry *protoregistry.Registry
+ conf config.Config
+ votersMetric *prometheus.HistogramVec
+ txReplicationCountMetric *prometheus.CounterVec
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
@@ -270,6 +271,13 @@ func NewCoordinator(
},
[]string{"virtual_storage"},
),
+ txReplicationCountMetric: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_tx_replications_total",
+ Help: "The number of replication jobs scheduled for transactional RPCs",
+ },
+ []string{"reason"},
+ ),
}
return coordinator
@@ -281,6 +289,7 @@ func (c *Coordinator) Describe(descs chan<- *prometheus.Desc) {
func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) {
c.votersMetric.Collect(metrics)
+ c.txReplicationCountMetric.Collect(metrics)
}
func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
@@ -732,7 +741,8 @@ func (c *Coordinator) createTransactionFinalizer(
nodeErrors *nodeErrors,
) func() error {
return func() error {
- primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(
+ ctx, route, transaction, nodeErrors, c.txReplicationCountMetric)
if !primaryDirtied {
// If the primary replica was not modified then we don't need to consider the secondaries
// outdated. Praefect requires the primary to be always part of the quorum, so no changes
@@ -768,6 +778,7 @@ func getUpdatedAndOutdatedSecondaries(
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
+ replicationCountMetric *prometheus.CounterVec,
) (primaryDirtied bool, updated []string, outdated []string) {
nodeErrors.Lock()
defer nodeErrors.Unlock()
@@ -785,16 +796,27 @@ func getUpdatedAndOutdatedSecondaries(
primaryDirtied = transaction.DidCommitAnySubtransaction() ||
(transaction.CountSubtransactions() == 0 && primaryErr == nil)
+ recordReplication := func(reason string, replicationCount int) {
+ // If the primary wasn't dirtied, then we never replicate any changes. While this
+ // duplicates logic defined elsewhere, it's probably good enough given that we're only
+ // talking about metrics here.
+ if primaryDirtied && replicationCount > 0 {
+ replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount))
+ }
+ }
+
// Replication targets were not added to the transaction, most likely because they are
// either not healthy or out of date. We thus need to make sure to create replication jobs
// for them.
outdated = append(outdated, route.ReplicationTargets...)
+ recordReplication("outdated", len(route.ReplicationTargets))
// If the primary errored, then we need to assume that it has modified on-disk state and
// thus need to replicate those changes to secondaries.
if primaryErr != nil {
ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("primary-failed", len(route.Secondaries))
return
}
@@ -805,6 +827,7 @@ func getUpdatedAndOutdatedSecondaries(
if transaction.CountSubtransactions() == 0 {
ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("no-votes", len(route.Secondaries))
return
}
@@ -814,6 +837,7 @@ func getUpdatedAndOutdatedSecondaries(
if err != nil {
ctxlogrus.Extract(ctx).WithError(err).Error("could not get transaction state")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("missing-tx-state", len(route.Secondaries))
return
}
@@ -825,6 +849,7 @@ func getUpdatedAndOutdatedSecondaries(
ctxlogrus.Extract(ctx).Error("transaction: primary failed vote")
}
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ recordReplication("primary-not-committed", len(route.Secondaries))
return
}
@@ -833,11 +858,13 @@ func getUpdatedAndOutdatedSecondaries(
for _, secondary := range route.Secondaries {
if nodeErrors.errByNode[secondary.Storage] != nil {
outdated = append(outdated, secondary.Storage)
+ recordReplication("node-failed", 1)
continue
}
if nodeStates[secondary.Storage] != transactions.VoteCommitted {
outdated = append(outdated, secondary.Storage)
+ recordReplication("node-not-committed", 1)
continue
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index fd8f1e0ea..37029da49 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -12,6 +13,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -1582,6 +1585,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied bool
expectedOutdated []string
expectedUpdated []string
+ expectedMetrics map[string]int
}{
{
desc: "single committed node",
@@ -1627,6 +1631,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "single failing node with replica",
@@ -1650,6 +1657,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "single node without transaction with replica",
@@ -1660,6 +1670,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 0,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "multiple committed nodes",
@@ -1691,6 +1704,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "primary-failed": 2,
+ },
},
{
desc: "multiple committed nodes with secondary err",
@@ -1707,6 +1723,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedUpdated: []string{"s2"},
expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-failed": 1,
+ },
},
{
desc: "partial success",
@@ -1723,6 +1742,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedUpdated: []string{"s2"},
expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ },
},
{
desc: "failure with (impossible) secondary success",
@@ -1738,6 +1760,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "primary-not-committed": 2,
+ },
},
{
desc: "multiple nodes without subtransactions",
@@ -1752,6 +1777,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 0,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "no-votes": 2,
+ },
},
{
desc: "multiple nodes with replica and partial failures",
@@ -1769,6 +1797,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "r1", "r2"},
expectedUpdated: []string{"s2"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ "outdated": 2,
+ },
},
{
desc: "multiple nodes with replica and partial err",
@@ -1785,6 +1817,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
expectedPrimaryDirtied: true,
expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ expectedMetrics: map[string]int{
+ "node-failed": 1,
+ "node-not-committed": 1,
+ "outdated": 2,
+ },
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -1823,10 +1860,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
- primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ metric := prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: "stub", Help: "help",
+ }, []string{"reason"})
+
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric)
require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
+
+ expectedMetrics := "# HELP stub help\n# TYPE stub counter\n"
+ for metric, value := range tc.expectedMetrics {
+ expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value)
+ }
+
+ require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics)))
})
}
}