Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenri Philipps <hphilipps+commit@gitlab.com>2021-09-02 12:29:39 +0300
committerHenri Philipps <hphilipps+commit@gitlab.com>2021-09-02 12:29:39 +0300
commit8edd23d6df8fc26692ca81234ea60ca05325d6cc (patch)
treec84a06fec7c8f5b32f3bba72bd138b0e7d508c54
parent682fe7a7688035b91dbed30e74c477ede6581325 (diff)
parent23d7161a4a80bb18d9e0b0b304f2d5bd3cd6d467 (diff)
Merge branch 'pks-coordinator-replication-v13.12' into '13-12-stable'
Backport improved replication logic (v13.12) See merge request gitlab-org/gitaly!3825
-rw-r--r--internal/praefect/coordinator.go133
-rw-r--r--internal/praefect/coordinator_pg_test.go126
-rw-r--r--internal/praefect/coordinator_test.go286
-rw-r--r--internal/praefect/transactions/subtransaction.go18
-rw-r--r--internal/praefect/transactions/transaction.go24
-rw-r--r--internal/praefect/transactions/transaction_test.go27
6 files changed, 489 insertions, 125 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 45d0e29e2..d08a1df67 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -230,13 +230,14 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- router Router
- txMgr *transactions.Manager
- queue datastore.ReplicationEventQueue
- rs datastore.RepositoryStore
- registry *protoregistry.Registry
- conf config.Config
- votersMetric *prometheus.HistogramVec
+ router Router
+ txMgr *transactions.Manager
+ queue datastore.ReplicationEventQueue
+ rs datastore.RepositoryStore
+ registry *protoregistry.Registry
+ conf config.Config
+ votersMetric *prometheus.HistogramVec
+ txReplicationCountMetric *prometheus.CounterVec
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
@@ -270,6 +271,13 @@ func NewCoordinator(
},
[]string{"virtual_storage"},
),
+ txReplicationCountMetric: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_tx_replications_total",
+ Help: "The number of replication jobs scheduled for transactional RPCs",
+ },
+ []string{"reason"},
+ ),
}
return coordinator
@@ -281,6 +289,7 @@ func (c *Coordinator) Describe(descs chan<- *prometheus.Desc) {
func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) {
c.votersMetric.Collect(metrics)
+ c.txReplicationCountMetric.Collect(metrics)
}
func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
@@ -471,6 +480,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
}
for _, secondary := range route.Secondaries {
+ secondary := secondary
secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage)
if err != nil {
return nil, err
@@ -732,7 +742,14 @@ func (c *Coordinator) createTransactionFinalizer(
nodeErrors *nodeErrors,
) func() error {
return func() error {
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(
+ ctx, route, transaction, nodeErrors, c.txReplicationCountMetric)
+ if !primaryDirtied {
+ // If the primary replica was not modified then we don't need to consider the secondaries
+ // outdated. Praefect requires the primary to be always part of the quorum, so no changes
+ // to secondaries would be made without primary being in agreement.
+ return nil
+ }
return c.newRequestFinalizer(
ctx, virtualStorage, targetRepo, route.Primary.Storage,
@@ -743,15 +760,18 @@ func (c *Coordinator) createTransactionFinalizer(
// getUpdatedAndOutdatedSecondaries returns all nodes which can be considered up-to-date or outdated
// after the given transaction. A node is considered outdated, if one of the following is true:
//
-// - No subtransactions were created. This really is only a safeguard in case the RPC wasn't aware
-// of transactions and thus failed to correctly assert its state matches across nodes. This is
-// rather pessimistic, as it could also indicate that an RPC simply didn't change anything.
+// - No subtransactions were created and the RPC was successful on the primary. This really is only
+// a safeguard in case the RPC wasn't aware of transactions and thus failed to correctly assert
+// its state matches across nodes. This is rather pessimistic, as it could also indicate that an
+// RPC simply didn't change anything. If the RPC was a failure on the primary and there were no
+// subtransactions, we assume no changes were done and that the nodes failed prior to voting.
//
// - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all
// nodes need to get replication jobs.
//
-// - The node has errored. As a special case, if the primary fails all nodes need to get replication
-// jobs.
+// - The node has a different error state than the primary. If both primary and secondary have
+// returned the same error, then we assume they did the same thing and failed in the same
+// controlled way.
//
// Note that this function cannot and should not fail: if anything goes wrong, we need to create
// replication jobs to repair state.
@@ -760,29 +780,60 @@ func getUpdatedAndOutdatedSecondaries(
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
-) (updated []string, outdated []string) {
+ replicationCountMetric *prometheus.CounterVec,
+) (primaryDirtied bool, updated []string, outdated []string) {
nodeErrors.Lock()
defer nodeErrors.Unlock()
+ primaryErr := nodeErrors.errByNode[route.Primary.Storage]
+
+ // If there were no subtransactions and the primary failed the RPC, we assume no changes
+ // have been made and the nodes simply failed before voting. We can thus return directly and
+ // notify the caller that the primary is not considered to be dirty.
+ if transaction.CountSubtransactions() == 0 && primaryErr != nil {
+ return false, nil, nil
+ }
+
+ // If there was a single subtransactions but the primary didn't cast a vote, then it means
+ // that the primary node has dropped out before secondaries were able to commit any changes
+ // to disk. Given that they cannot ever succeed without the primary, no change to disk
+ // should have happened.
+ if transaction.CountSubtransactions() == 1 && !transaction.DidVote(route.Primary.Storage) {
+ return false, nil, nil
+ }
+
+ primaryDirtied = true
+
+ nodesByState := make(map[string][]string)
+ defer func() {
+ ctxlogrus.Extract(ctx).
+ WithField("transaction.primary", route.Primary.Storage).
+ WithField("transaction.secondaries", nodesByState).
+ Info("transactional node states")
+
+ for reason, nodes := range nodesByState {
+ replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes)))
+ }
+ }()
+
+ markOutdated := func(reason string, nodes []string) {
+ if len(nodes) != 0 {
+ outdated = append(outdated, nodes...)
+ nodesByState[reason] = append(nodesByState[reason], nodes...)
+ }
+ }
+
// Replication targets were not added to the transaction, most likely because they are
// either not healthy or out of date. We thus need to make sure to create replication jobs
// for them.
- outdated = append(outdated, route.ReplicationTargets...)
+ markOutdated("outdated", route.ReplicationTargets)
- // If the primary errored, then we need to assume that it has modified on-disk state and
- // thus need to replicate those changes to secondaries.
- if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil {
- ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction")
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
- return
- }
-
- // If no subtransaction happened, then the called RPC may not be aware of transactions at
- // all. We thus need to assume it changed repository state and need to create replication
- // jobs.
+ // If no subtransaction happened, then the called RPC may not be aware of transactions or
+ // the nodes failed before casting any votes. If the primary failed the RPC, we assume
+ // no changes were done and the nodes hit an error prior to voting. If the primary processed
+ // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere.
if transaction.CountSubtransactions() == 0 {
- ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ markOutdated("no-votes", routerNodesToStorages(route.Secondaries))
return
}
@@ -790,36 +841,34 @@ func getUpdatedAndOutdatedSecondaries(
// safe route and just replicate to all secondaries.
nodeStates, err := transaction.State()
if err != nil {
- ctxlogrus.Extract(ctx).WithError(err).Error("could not get transaction state")
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ markOutdated("missing-tx-state", routerNodesToStorages(route.Secondaries))
return
}
- // If the primary node did not commit the transaction, then we must assume that it dirtied
- // on-disk state. This modified state may not be what we want, but it's what we got. So in
- // order to ensure a consistent state, we need to replicate.
+ // If the primary node did not commit the transaction but there were some subtransactions committed,
+ // then we must assume that it dirtied on-disk state. This modified state may not be what we want,
+ // but it's what we got. So in order to ensure a consistent state, we need to replicate.
if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted {
- if state == transactions.VoteFailed {
- ctxlogrus.Extract(ctx).Error("transaction: primary failed vote")
- }
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ markOutdated("primary-not-committed", routerNodesToStorages(route.Secondaries))
return
}
- // Now we finally got the potentially happy case: in case the secondary didn't run into an
- // error and committed, it's considered up to date and thus does not need replication.
+ // Now we finally got the potentially happy case: when the secondary committed the
+ // transaction and has the same error state as the primary, then it's considered up to date
+ // and thus does not need replication.
for _, secondary := range route.Secondaries {
- if nodeErrors.errByNode[secondary.Storage] != nil {
- outdated = append(outdated, secondary.Storage)
+ if nodeErrors.errByNode[secondary.Storage] != primaryErr {
+ markOutdated("node-error-status", []string{secondary.Storage})
continue
}
if nodeStates[secondary.Storage] != transactions.VoteCommitted {
- outdated = append(outdated, secondary.Storage)
+ markOutdated("node-not-committed", []string{secondary.Storage})
continue
}
updated = append(updated, secondary.Storage)
+ nodesByState["updated"] = append(nodesByState["updated"], secondary.Storage)
}
return
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index eb769eaa3..dc03142dc 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -32,10 +32,14 @@ func getDB(t *testing.T) glsql.DB {
}
func TestStreamDirectorMutator_Transaction(t *testing.T) {
+ type subtransactions []struct {
+ vote string
+ shouldSucceed bool
+ }
+
type node struct {
primary bool
- vote string
- shouldSucceed bool
+ subtransactions subtransactions
shouldGetRepl bool
shouldParticipate bool
generation int
@@ -43,45 +47,63 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
testcases := []struct {
- desc string
- nodes []node
+ desc string
+ primaryFails bool
+ nodes []node
}{
{
desc: "successful vote should not create replication jobs",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ },
+ },
+ {
+ desc: "successful vote should create replication jobs if the primary fails",
+ primaryFails: true,
+ nodes: []node{
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ },
+ },
+ {
+ desc: "failing vote should create replication jobs without committed subtransactions",
+ nodes: []node{
+ {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
- desc: "failing vote should not create replication jobs",
+ desc: "failing vote should create replication jobs with committed subtransaction",
nodes: []node{
- {primary: true, vote: "foo", shouldSucceed: false, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "qux", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
- {primary: false, vote: "bar", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
desc: "primary should reach quorum with disagreeing secondary",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
desc: "quorum should create replication jobs for disagreeing node",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
desc: "only consistent secondaries should participate",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2},
{shouldParticipate: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0},
{shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
},
@@ -89,30 +111,46 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
{
desc: "secondaries should not participate when primary's generation is unknown",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0},
{shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
},
},
{
- // If the transaction didn't receive any votes at all, we need to assume
- // that the RPC wasn't aware of transactions and thus need to schedule
- // replication jobs.
- desc: "unstarted transaction should create replication jobs",
+ // All transactional RPCs are expected to cast vote if they are successful. If they don't, something is wrong
+ // and we should replicate to the secondaries to be sure.
+ desc: "unstarted transaction creates replication jobs if the primary is successful",
nodes: []node{
- {primary: true, shouldSucceed: true, shouldGetRepl: false, expectedGeneration: 1},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, expectedGeneration: 0},
+ {primary: true, shouldGetRepl: false, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: true, expectedGeneration: 0},
},
},
{
- // If the transaction didn't receive any votes at all, we need to assume
- // that the RPC wasn't aware of transactions and thus need to schedule
- // replication jobs.
- desc: "unstarted transaction should create replication jobs for outdated node",
+ desc: "unstarted transaction does not create replication job",
+ primaryFails: true,
nodes: []node{
- {primary: true, shouldSucceed: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
+ {primary: true, expectedGeneration: 0},
+ {primary: false, shouldGetRepl: false, expectedGeneration: 0},
+ },
+ },
+ {
+ desc: "unstarted transaction should not create replication jobs for outdated node if the primary does not vote",
+ primaryFails: true,
+ nodes: []node{
+ {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: false, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: false, generation: 0, expectedGeneration: 0},
+ {primary: false, shouldGetRepl: false, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
+ },
+ },
+ {
+ // If there were no subtransactions and the primary did not fail, we should schedule replication jobs to every secondary.
+ // All transactional RPCs are expected to vote if they are successful.
+ desc: "unstarted transaction should create replication jobs for outdated node if the primary succeeds",
+ nodes: []node{
+ {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2},
+ {primary: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0},
+ {primary: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
},
},
}
@@ -217,17 +255,27 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
go func() {
defer voterWaitGroup.Done()
- vote := voting.VoteFromData([]byte(node.vote))
- err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote)
- if node.shouldSucceed {
- assert.NoError(t, err)
- } else {
- assert.True(t, errors.Is(err, transactions.ErrTransactionFailed))
+ for _, subtransaction := range node.subtransactions {
+ vote := voting.VoteFromData([]byte(subtransaction.vote))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote)
+ if subtransaction.shouldSucceed {
+ if !assert.NoError(t, err) {
+ break
+ }
+ } else {
+ if !assert.True(t, errors.Is(err, transactions.ErrTransactionFailed)) {
+ break
+ }
+ }
}
}()
}
voterWaitGroup.Wait()
+ if tc.primaryFails {
+ streamParams.Primary().ErrHandler(errors.New("rpc failure"))
+ }
+
err = streamParams.RequestFinalizer()
require.NoError(t, err)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index fdb4b543d..c7383491e 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -12,6 +13,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -1541,6 +1544,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
type mockTransaction struct {
nodeStates map[string]transactions.VoteResult
subtransactions int
+ didVote map[string]bool
}
func (t mockTransaction) ID() uint64 {
@@ -1551,6 +1555,10 @@ func (t mockTransaction) CountSubtransactions() int {
return t.subtransactions
}
+func (t mockTransaction) DidVote(node string) bool {
+ return t.didVote[node]
+}
+
func (t mockTransaction) State() (map[string]transactions.VoteResult, error) {
return t.nodeStates, nil
}
@@ -1568,13 +1576,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
anyErr := errors.New("arbitrary error")
for _, tc := range []struct {
- desc string
- primary node
- secondaries []node
- replicas []string
- subtransactions int
- expectedOutdated []string
- expectedUpdated []string
+ desc string
+ primary node
+ secondaries []node
+ replicas []string
+ subtransactions int
+ didVote map[string]bool
+ expectedPrimaryDirtied bool
+ expectedOutdated []string
+ expectedUpdated []string
+ expectedMetrics map[string]int
}{
{
desc: "single committed node",
@@ -1582,7 +1593,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- subtransactions: 1,
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
},
{
desc: "single failed node",
@@ -1604,7 +1619,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
primary: node{
name: "primary",
},
- subtransactions: 0,
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
},
{
desc: "single successful node with replica",
@@ -1612,19 +1628,24 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
- desc: "single failing node with replica",
+ desc: "single failing node with replica is not considered modified",
primary: node{
name: "primary",
state: transactions.VoteFailed,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ subtransactions: 1,
},
{
desc: "single erred node with replica",
@@ -1633,18 +1654,40 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
state: transactions.VoteCommitted,
err: anyErr,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
+ },
+ {
+ desc: "single erred node without commit with replica",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ err: anyErr,
+ },
+ replicas: []string{"replica"},
+ subtransactions: 1,
+ expectedPrimaryDirtied: false,
},
{
desc: "single node without transaction with replica",
primary: node{
name: "primary",
},
- replicas: []string{"replica"},
- subtransactions: 0,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
+ expectedMetrics: map[string]int{
+ "outdated": 1,
+ },
},
{
desc: "multiple committed nodes",
@@ -1656,8 +1699,15 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s1", "s2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "updated": 2,
+ },
},
{
desc: "multiple committed nodes with primary err",
@@ -1670,8 +1720,59 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "node-error-status": 2,
+ },
+ },
+ {
+ desc: "multiple committed nodes with same error as primary",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ err: anyErr,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteCommitted, err: anyErr},
+ {name: "s2", state: transactions.VoteCommitted, err: anyErr},
+ },
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "updated": 2,
+ },
+ },
+ {
+ desc: "multiple committed nodes with different error as primary",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ err: anyErr,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteCommitted, err: errors.New("somethingsomething")},
+ {name: "s2", state: transactions.VoteCommitted, err: anyErr},
+ },
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-error-status": 1,
+ "updated": 1,
+ },
},
{
desc: "multiple committed nodes with secondary err",
@@ -1683,9 +1784,40 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted, err: anyErr},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-error-status": 1,
+ "updated": 1,
+ },
+ },
+ {
+ desc: "multiple committed nodes with primary and missing secondary err",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ err: anyErr,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteCommitted, err: anyErr},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s1"},
+ expectedOutdated: []string{"s2"},
+ expectedMetrics: map[string]int{
+ "node-error-status": 1,
+ "updated": 1,
+ },
},
{
desc: "partial success",
@@ -1697,9 +1829,17 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ "updated": 1,
+ },
},
{
desc: "failure with (impossible) secondary success",
@@ -1711,8 +1851,31 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "primary-not-committed": 2,
+ },
+ },
+ {
+ desc: "failure with no primary votes",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteFailed,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteFailed},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ didVote: map[string]bool{
+ "s1": true,
+ "s2": true,
+ },
+ subtransactions: 1,
},
{
desc: "multiple nodes without subtransactions",
@@ -1724,8 +1887,12 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 0,
- expectedOutdated: []string{"s1", "s2"},
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "no-votes": 2,
+ },
},
{
desc: "multiple nodes with replica and partial failures",
@@ -1737,10 +1904,19 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- replicas: []string{"r1", "r2"},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "r1", "r2"},
- expectedUpdated: []string{"s2"},
+ replicas: []string{"r1", "r2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "r1", "r2"},
+ expectedUpdated: []string{"s2"},
+ expectedMetrics: map[string]int{
+ "node-not-committed": 1,
+ "outdated": 2,
+ "updated": 1,
+ },
},
{
desc: "multiple nodes with replica and partial err",
@@ -1752,9 +1928,18 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted, err: anyErr},
},
- replicas: []string{"r1", "r2"},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ replicas: []string{"r1", "r2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ expectedMetrics: map[string]int{
+ "node-error-status": 1,
+ "node-not-committed": 1,
+ "outdated": 2,
+ },
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -1778,6 +1963,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
transaction := mockTransaction{
nodeStates: states,
subtransactions: tc.subtransactions,
+ didVote: tc.didVote,
}
route := RepositoryMutatorRoute{
@@ -1792,9 +1978,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ metric := prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: "stub", Help: "help",
+ }, []string{"reason"})
+
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric)
+ require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
+
+ expectedMetrics := "# HELP stub help\n# TYPE stub counter\n"
+ for metric, value := range tc.expectedMetrics {
+ expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value)
+ }
+
+ require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics)))
})
}
}
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 4193ea1f2..eda87702b 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -297,3 +297,21 @@ func (t *subtransaction) getResult(node string) (VoteResult, error) {
return voter.result, nil
}
+
+func (t *subtransaction) getVote(node string) (*voting.Vote, error) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return nil, fmt.Errorf("invalid node for transaction: %q", node)
+ }
+
+ if voter.vote == nil {
+ return nil, nil
+ }
+
+ // Return a copy of the vote.
+ vote := *voter.vote
+ return &vote, nil
+}
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 90d8f6bf9..6baf696a5 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -57,6 +57,8 @@ type Transaction interface {
CountSubtransactions() int
// State returns the state of each voter part of the transaction.
State() (map[string]VoteResult, error)
+ // DidVote returns whether the given node has cast a vote.
+ DidVote(string) bool
}
// transaction is a session where a set of voters votes on one or more
@@ -184,6 +186,28 @@ func (t *transaction) CountSubtransactions() int {
return len(t.subtransactions)
}
+// DidVote determines whether the given node did cast a vote. If it's not possible to retrieve the
+// vote, then the node by definition didn't cast a vote.
+func (t *transaction) DidVote(node string) bool {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // If there are no subtransactions, then no vote could've been cast by the given node.
+ if len(t.subtransactions) == 0 {
+ return false
+ }
+
+ // It's sufficient to take a look at the first transaction.
+ vote, err := t.subtransactions[0].getVote(node)
+ if err != nil {
+ // If it's not possible to retrieve the vote, then we consider the note to not have
+ // cast a vote.
+ return false
+ }
+
+ return vote != nil
+}
+
// getOrCreateSubtransaction gets an ongoing subtransaction on which the given
// node hasn't yet voted on or creates a new one if the node has succeeded on
// all subtransactions. In case the node has failed on any of the
diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go
index c20213c10..1f3074afc 100644
--- a/internal/praefect/transactions/transaction_test.go
+++ b/internal/praefect/transactions/transaction_test.go
@@ -24,3 +24,30 @@ func TestTransactionCancellationWithEmptyTransaction(t *testing.T) {
require.Error(t, err)
require.Equal(t, err, ErrTransactionCanceled)
}
+
+func TestTransaction_DidVote(t *testing.T) {
+ ctx, cleanup := testhelper.Context()
+ defer cleanup()
+
+ tx, err := newTransaction(1, []Voter{
+ {Name: "v1", Votes: 1},
+ {Name: "v2", Votes: 0},
+ }, 1)
+ require.NoError(t, err)
+
+ // An unregistered voter did not vote.
+ require.False(t, tx.DidVote("unregistered"))
+ // And neither of the registered ones did cast a vote yet.
+ require.False(t, tx.DidVote("v1"))
+ require.False(t, tx.DidVote("v2"))
+
+ // One of both nodes does cast a vote.
+ require.NoError(t, tx.vote(ctx, "v1", voting.VoteFromData([]byte{})))
+ require.True(t, tx.DidVote("v1"))
+ require.False(t, tx.DidVote("v2"))
+
+ // And now the second node does cast a vote, too.
+ require.NoError(t, tx.vote(ctx, "v2", voting.VoteFromData([]byte{})))
+ require.True(t, tx.DidVote("v1"))
+ require.True(t, tx.DidVote("v2"))
+}