diff options
author | Henri Philipps <hphilipps+commit@gitlab.com> | 2021-09-02 12:29:39 +0300 |
---|---|---|
committer | Henri Philipps <hphilipps+commit@gitlab.com> | 2021-09-02 12:29:39 +0300 |
commit | 8edd23d6df8fc26692ca81234ea60ca05325d6cc (patch) | |
tree | c84a06fec7c8f5b32f3bba72bd138b0e7d508c54 | |
parent | 682fe7a7688035b91dbed30e74c477ede6581325 (diff) | |
parent | 23d7161a4a80bb18d9e0b0b304f2d5bd3cd6d467 (diff) |
Merge branch 'pks-coordinator-replication-v13.12' into '13-12-stable'
Backport improved replication logic (v13.12)
See merge request gitlab-org/gitaly!3825
-rw-r--r-- | internal/praefect/coordinator.go | 133 | ||||
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 126 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 286 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 18 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 24 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction_test.go | 27 |
6 files changed, 489 insertions, 125 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 45d0e29e2..d08a1df67 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -230,13 +230,14 @@ type grpcCall struct { // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { - router Router - txMgr *transactions.Manager - queue datastore.ReplicationEventQueue - rs datastore.RepositoryStore - registry *protoregistry.Registry - conf config.Config - votersMetric *prometheus.HistogramVec + router Router + txMgr *transactions.Manager + queue datastore.ReplicationEventQueue + rs datastore.RepositoryStore + registry *protoregistry.Registry + conf config.Config + votersMetric *prometheus.HistogramVec + txReplicationCountMetric *prometheus.CounterVec } // NewCoordinator returns a new Coordinator that utilizes the provided logger @@ -270,6 +271,13 @@ func NewCoordinator( }, []string{"virtual_storage"}, ), + txReplicationCountMetric: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_praefect_tx_replications_total", + Help: "The number of replication jobs scheduled for transactional RPCs", + }, + []string{"reason"}, + ), } return coordinator @@ -281,6 +289,7 @@ func (c *Coordinator) Describe(descs chan<- *prometheus.Desc) { func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) { c.votersMetric.Collect(metrics) + c.txReplicationCountMetric.Collect(metrics) } func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { @@ -471,6 +480,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } for _, secondary := range route.Secondaries { + secondary := secondary secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage) if err != nil { return nil, err @@ -732,7 +742,14 @@ func (c *Coordinator) createTransactionFinalizer( nodeErrors *nodeErrors, ) func() error { return func() error { - updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries( + ctx, route, transaction, nodeErrors, c.txReplicationCountMetric) + if !primaryDirtied { + // If the primary replica was not modified then we don't need to consider the secondaries + // outdated. Praefect requires the primary to be always part of the quorum, so no changes + // to secondaries would be made without primary being in agreement. + return nil + } return c.newRequestFinalizer( ctx, virtualStorage, targetRepo, route.Primary.Storage, @@ -743,15 +760,18 @@ func (c *Coordinator) createTransactionFinalizer( // getUpdatedAndOutdatedSecondaries returns all nodes which can be considered up-to-date or outdated // after the given transaction. A node is considered outdated, if one of the following is true: // -// - No subtransactions were created. This really is only a safeguard in case the RPC wasn't aware -// of transactions and thus failed to correctly assert its state matches across nodes. This is -// rather pessimistic, as it could also indicate that an RPC simply didn't change anything. +// - No subtransactions were created and the RPC was successful on the primary. This really is only +// a safeguard in case the RPC wasn't aware of transactions and thus failed to correctly assert +// its state matches across nodes. This is rather pessimistic, as it could also indicate that an +// RPC simply didn't change anything. If the RPC was a failure on the primary and there were no +// subtransactions, we assume no changes were done and that the nodes failed prior to voting. // // - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all // nodes need to get replication jobs. // -// - The node has errored. As a special case, if the primary fails all nodes need to get replication -// jobs. +// - The node has a different error state than the primary. If both primary and secondary have +// returned the same error, then we assume they did the same thing and failed in the same +// controlled way. // // Note that this function cannot and should not fail: if anything goes wrong, we need to create // replication jobs to repair state. @@ -760,29 +780,60 @@ func getUpdatedAndOutdatedSecondaries( route RepositoryMutatorRoute, transaction transactions.Transaction, nodeErrors *nodeErrors, -) (updated []string, outdated []string) { + replicationCountMetric *prometheus.CounterVec, +) (primaryDirtied bool, updated []string, outdated []string) { nodeErrors.Lock() defer nodeErrors.Unlock() + primaryErr := nodeErrors.errByNode[route.Primary.Storage] + + // If there were no subtransactions and the primary failed the RPC, we assume no changes + // have been made and the nodes simply failed before voting. We can thus return directly and + // notify the caller that the primary is not considered to be dirty. + if transaction.CountSubtransactions() == 0 && primaryErr != nil { + return false, nil, nil + } + + // If there was a single subtransactions but the primary didn't cast a vote, then it means + // that the primary node has dropped out before secondaries were able to commit any changes + // to disk. Given that they cannot ever succeed without the primary, no change to disk + // should have happened. + if transaction.CountSubtransactions() == 1 && !transaction.DidVote(route.Primary.Storage) { + return false, nil, nil + } + + primaryDirtied = true + + nodesByState := make(map[string][]string) + defer func() { + ctxlogrus.Extract(ctx). + WithField("transaction.primary", route.Primary.Storage). + WithField("transaction.secondaries", nodesByState). + Info("transactional node states") + + for reason, nodes := range nodesByState { + replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes))) + } + }() + + markOutdated := func(reason string, nodes []string) { + if len(nodes) != 0 { + outdated = append(outdated, nodes...) + nodesByState[reason] = append(nodesByState[reason], nodes...) + } + } + // Replication targets were not added to the transaction, most likely because they are // either not healthy or out of date. We thus need to make sure to create replication jobs // for them. - outdated = append(outdated, route.ReplicationTargets...) + markOutdated("outdated", route.ReplicationTargets) - // If the primary errored, then we need to assume that it has modified on-disk state and - // thus need to replicate those changes to secondaries. - if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil { - ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - return - } - - // If no subtransaction happened, then the called RPC may not be aware of transactions at - // all. We thus need to assume it changed repository state and need to create replication - // jobs. + // If no subtransaction happened, then the called RPC may not be aware of transactions or + // the nodes failed before casting any votes. If the primary failed the RPC, we assume + // no changes were done and the nodes hit an error prior to voting. If the primary processed + // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere. if transaction.CountSubtransactions() == 0 { - ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + markOutdated("no-votes", routerNodesToStorages(route.Secondaries)) return } @@ -790,36 +841,34 @@ func getUpdatedAndOutdatedSecondaries( // safe route and just replicate to all secondaries. nodeStates, err := transaction.State() if err != nil { - ctxlogrus.Extract(ctx).WithError(err).Error("could not get transaction state") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + markOutdated("missing-tx-state", routerNodesToStorages(route.Secondaries)) return } - // If the primary node did not commit the transaction, then we must assume that it dirtied - // on-disk state. This modified state may not be what we want, but it's what we got. So in - // order to ensure a consistent state, we need to replicate. + // If the primary node did not commit the transaction but there were some subtransactions committed, + // then we must assume that it dirtied on-disk state. This modified state may not be what we want, + // but it's what we got. So in order to ensure a consistent state, we need to replicate. if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted { - if state == transactions.VoteFailed { - ctxlogrus.Extract(ctx).Error("transaction: primary failed vote") - } - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + markOutdated("primary-not-committed", routerNodesToStorages(route.Secondaries)) return } - // Now we finally got the potentially happy case: in case the secondary didn't run into an - // error and committed, it's considered up to date and thus does not need replication. + // Now we finally got the potentially happy case: when the secondary committed the + // transaction and has the same error state as the primary, then it's considered up to date + // and thus does not need replication. for _, secondary := range route.Secondaries { - if nodeErrors.errByNode[secondary.Storage] != nil { - outdated = append(outdated, secondary.Storage) + if nodeErrors.errByNode[secondary.Storage] != primaryErr { + markOutdated("node-error-status", []string{secondary.Storage}) continue } if nodeStates[secondary.Storage] != transactions.VoteCommitted { - outdated = append(outdated, secondary.Storage) + markOutdated("node-not-committed", []string{secondary.Storage}) continue } updated = append(updated, secondary.Storage) + nodesByState["updated"] = append(nodesByState["updated"], secondary.Storage) } return diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index eb769eaa3..dc03142dc 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -32,10 +32,14 @@ func getDB(t *testing.T) glsql.DB { } func TestStreamDirectorMutator_Transaction(t *testing.T) { + type subtransactions []struct { + vote string + shouldSucceed bool + } + type node struct { primary bool - vote string - shouldSucceed bool + subtransactions subtransactions shouldGetRepl bool shouldParticipate bool generation int @@ -43,45 +47,63 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } testcases := []struct { - desc string - nodes []node + desc string + primaryFails bool + nodes []node }{ { desc: "successful vote should not create replication jobs", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + }, + }, + { + desc: "successful vote should create replication jobs if the primary fails", + primaryFails: true, + nodes: []node{ + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + }, + }, + { + desc: "failing vote should create replication jobs without committed subtransactions", + nodes: []node{ + {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { - desc: "failing vote should not create replication jobs", + desc: "failing vote should create replication jobs with committed subtransaction", nodes: []node{ - {primary: true, vote: "foo", shouldSucceed: false, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "qux", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, - {primary: false, vote: "bar", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { desc: "primary should reach quorum with disagreeing secondary", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { desc: "quorum should create replication jobs for disagreeing node", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { desc: "only consistent secondaries should participate", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2}, {shouldParticipate: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0}, {shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, }, @@ -89,30 +111,46 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { { desc: "secondaries should not participate when primary's generation is unknown", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0}, {shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, }, }, { - // If the transaction didn't receive any votes at all, we need to assume - // that the RPC wasn't aware of transactions and thus need to schedule - // replication jobs. - desc: "unstarted transaction should create replication jobs", + // All transactional RPCs are expected to cast vote if they are successful. If they don't, something is wrong + // and we should replicate to the secondaries to be sure. + desc: "unstarted transaction creates replication jobs if the primary is successful", nodes: []node{ - {primary: true, shouldSucceed: true, shouldGetRepl: false, expectedGeneration: 1}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, expectedGeneration: 0}, + {primary: true, shouldGetRepl: false, expectedGeneration: 1}, + {primary: false, shouldGetRepl: true, expectedGeneration: 0}, }, }, { - // If the transaction didn't receive any votes at all, we need to assume - // that the RPC wasn't aware of transactions and thus need to schedule - // replication jobs. - desc: "unstarted transaction should create replication jobs for outdated node", + desc: "unstarted transaction does not create replication job", + primaryFails: true, nodes: []node{ - {primary: true, shouldSucceed: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, + {primary: true, expectedGeneration: 0}, + {primary: false, shouldGetRepl: false, expectedGeneration: 0}, + }, + }, + { + desc: "unstarted transaction should not create replication jobs for outdated node if the primary does not vote", + primaryFails: true, + nodes: []node{ + {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 1}, + {primary: false, shouldGetRepl: false, generation: 1, expectedGeneration: 1}, + {primary: false, shouldGetRepl: false, generation: 0, expectedGeneration: 0}, + {primary: false, shouldGetRepl: false, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, + }, + }, + { + // If there were no subtransactions and the primary did not fail, we should schedule replication jobs to every secondary. + // All transactional RPCs are expected to vote if they are successful. + desc: "unstarted transaction should create replication jobs for outdated node if the primary succeeds", + nodes: []node{ + {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2}, + {primary: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1}, + {primary: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0}, + {primary: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, }, }, } @@ -217,17 +255,27 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { go func() { defer voterWaitGroup.Done() - vote := voting.VoteFromData([]byte(node.vote)) - err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote) - if node.shouldSucceed { - assert.NoError(t, err) - } else { - assert.True(t, errors.Is(err, transactions.ErrTransactionFailed)) + for _, subtransaction := range node.subtransactions { + vote := voting.VoteFromData([]byte(subtransaction.vote)) + err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote) + if subtransaction.shouldSucceed { + if !assert.NoError(t, err) { + break + } + } else { + if !assert.True(t, errors.Is(err, transactions.ErrTransactionFailed)) { + break + } + } } }() } voterWaitGroup.Wait() + if tc.primaryFails { + streamParams.Primary().ErrHandler(errors.New("rpc failure")) + } + err = streamParams.RequestFinalizer() require.NoError(t, err) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index fdb4b543d..c7383491e 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "strings" "sync" "sync/atomic" "testing" @@ -12,6 +13,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1541,6 +1544,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { type mockTransaction struct { nodeStates map[string]transactions.VoteResult subtransactions int + didVote map[string]bool } func (t mockTransaction) ID() uint64 { @@ -1551,6 +1555,10 @@ func (t mockTransaction) CountSubtransactions() int { return t.subtransactions } +func (t mockTransaction) DidVote(node string) bool { + return t.didVote[node] +} + func (t mockTransaction) State() (map[string]transactions.VoteResult, error) { return t.nodeStates, nil } @@ -1568,13 +1576,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { anyErr := errors.New("arbitrary error") for _, tc := range []struct { - desc string - primary node - secondaries []node - replicas []string - subtransactions int - expectedOutdated []string - expectedUpdated []string + desc string + primary node + secondaries []node + replicas []string + subtransactions int + didVote map[string]bool + expectedPrimaryDirtied bool + expectedOutdated []string + expectedUpdated []string + expectedMetrics map[string]int }{ { desc: "single committed node", @@ -1582,7 +1593,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { name: "primary", state: transactions.VoteCommitted, }, - subtransactions: 1, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, }, { desc: "single failed node", @@ -1604,7 +1619,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { primary: node{ name: "primary", }, - subtransactions: 0, + subtransactions: 0, + expectedPrimaryDirtied: true, }, { desc: "single successful node with replica", @@ -1612,19 +1628,24 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { name: "primary", state: transactions.VoteCommitted, }, - replicas: []string{"replica"}, - subtransactions: 1, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { - desc: "single failing node with replica", + desc: "single failing node with replica is not considered modified", primary: node{ name: "primary", state: transactions.VoteFailed, }, - replicas: []string{"replica"}, - subtransactions: 1, - expectedOutdated: []string{"replica"}, + subtransactions: 1, }, { desc: "single erred node with replica", @@ -1633,18 +1654,40 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { state: transactions.VoteCommitted, err: anyErr, }, - replicas: []string{"replica"}, - subtransactions: 1, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, + }, + { + desc: "single erred node without commit with replica", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + err: anyErr, + }, + replicas: []string{"replica"}, + subtransactions: 1, + expectedPrimaryDirtied: false, }, { desc: "single node without transaction with replica", primary: node{ name: "primary", }, - replicas: []string{"replica"}, - subtransactions: 0, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + subtransactions: 0, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { desc: "multiple committed nodes", @@ -1656,8 +1699,15 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedUpdated: []string{"s1", "s2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "updated": 2, + }, }, { desc: "multiple committed nodes with primary err", @@ -1670,8 +1720,59 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedOutdated: []string{"s1", "s2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "node-error-status": 2, + }, + }, + { + desc: "multiple committed nodes with same error as primary", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + err: anyErr, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteCommitted, err: anyErr}, + {name: "s2", state: transactions.VoteCommitted, err: anyErr}, + }, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "updated": 2, + }, + }, + { + desc: "multiple committed nodes with different error as primary", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + err: anyErr, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteCommitted, err: errors.New("somethingsomething")}, + {name: "s2", state: transactions.VoteCommitted, err: anyErr}, + }, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, + expectedMetrics: map[string]int{ + "node-error-status": 1, + "updated": 1, + }, }, { desc: "multiple committed nodes with secondary err", @@ -1683,9 +1784,40 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted, err: anyErr}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedUpdated: []string{"s2"}, - expectedOutdated: []string{"s1"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, + expectedMetrics: map[string]int{ + "node-error-status": 1, + "updated": 1, + }, + }, + { + desc: "multiple committed nodes with primary and missing secondary err", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + err: anyErr, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteCommitted, err: anyErr}, + {name: "s2", state: transactions.VoteCommitted}, + }, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s1"}, + expectedOutdated: []string{"s2"}, + expectedMetrics: map[string]int{ + "node-error-status": 1, + "updated": 1, + }, }, { desc: "partial success", @@ -1697,9 +1829,17 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedUpdated: []string{"s2"}, - expectedOutdated: []string{"s1"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, + expectedMetrics: map[string]int{ + "node-not-committed": 1, + "updated": 1, + }, }, { desc: "failure with (impossible) secondary success", @@ -1711,8 +1851,31 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedOutdated: []string{"s1", "s2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "primary-not-committed": 2, + }, + }, + { + desc: "failure with no primary votes", + primary: node{ + name: "primary", + state: transactions.VoteFailed, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteFailed}, + {name: "s2", state: transactions.VoteCommitted}, + }, + didVote: map[string]bool{ + "s1": true, + "s2": true, + }, + subtransactions: 1, }, { desc: "multiple nodes without subtransactions", @@ -1724,8 +1887,12 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 0, - expectedOutdated: []string{"s1", "s2"}, + subtransactions: 0, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "no-votes": 2, + }, }, { desc: "multiple nodes with replica and partial failures", @@ -1737,10 +1904,19 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - replicas: []string{"r1", "r2"}, - subtransactions: 1, - expectedOutdated: []string{"s1", "r1", "r2"}, - expectedUpdated: []string{"s2"}, + replicas: []string{"r1", "r2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "r1", "r2"}, + expectedUpdated: []string{"s2"}, + expectedMetrics: map[string]int{ + "node-not-committed": 1, + "outdated": 2, + "updated": 1, + }, }, { desc: "multiple nodes with replica and partial err", @@ -1752,9 +1928,18 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted, err: anyErr}, }, - replicas: []string{"r1", "r2"}, - subtransactions: 1, - expectedOutdated: []string{"s1", "s2", "r1", "r2"}, + replicas: []string{"r1", "r2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2", "r1", "r2"}, + expectedMetrics: map[string]int{ + "node-error-status": 1, + "node-not-committed": 1, + "outdated": 2, + }, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -1778,6 +1963,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { transaction := mockTransaction{ nodeStates: states, subtransactions: tc.subtransactions, + didVote: tc.didVote, } route := RepositoryMutatorRoute{ @@ -1792,9 +1978,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { } route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...) - updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + metric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "stub", Help: "help", + }, []string{"reason"}) + + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric) + require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied) require.ElementsMatch(t, tc.expectedUpdated, updated) require.ElementsMatch(t, tc.expectedOutdated, outdated) + + expectedMetrics := "# HELP stub help\n# TYPE stub counter\n" + for metric, value := range tc.expectedMetrics { + expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value) + } + + require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics))) }) } } diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 4193ea1f2..eda87702b 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -297,3 +297,21 @@ func (t *subtransaction) getResult(node string) (VoteResult, error) { return voter.result, nil } + +func (t *subtransaction) getVote(node string) (*voting.Vote, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + voter, ok := t.votersByNode[node] + if !ok { + return nil, fmt.Errorf("invalid node for transaction: %q", node) + } + + if voter.vote == nil { + return nil, nil + } + + // Return a copy of the vote. + vote := *voter.vote + return &vote, nil +} diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 90d8f6bf9..6baf696a5 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -57,6 +57,8 @@ type Transaction interface { CountSubtransactions() int // State returns the state of each voter part of the transaction. State() (map[string]VoteResult, error) + // DidVote returns whether the given node has cast a vote. + DidVote(string) bool } // transaction is a session where a set of voters votes on one or more @@ -184,6 +186,28 @@ func (t *transaction) CountSubtransactions() int { return len(t.subtransactions) } +// DidVote determines whether the given node did cast a vote. If it's not possible to retrieve the +// vote, then the node by definition didn't cast a vote. +func (t *transaction) DidVote(node string) bool { + t.lock.Lock() + defer t.lock.Unlock() + + // If there are no subtransactions, then no vote could've been cast by the given node. + if len(t.subtransactions) == 0 { + return false + } + + // It's sufficient to take a look at the first transaction. + vote, err := t.subtransactions[0].getVote(node) + if err != nil { + // If it's not possible to retrieve the vote, then we consider the note to not have + // cast a vote. + return false + } + + return vote != nil +} + // getOrCreateSubtransaction gets an ongoing subtransaction on which the given // node hasn't yet voted on or creates a new one if the node has succeeded on // all subtransactions. In case the node has failed on any of the diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go index c20213c10..1f3074afc 100644 --- a/internal/praefect/transactions/transaction_test.go +++ b/internal/praefect/transactions/transaction_test.go @@ -24,3 +24,30 @@ func TestTransactionCancellationWithEmptyTransaction(t *testing.T) { require.Error(t, err) require.Equal(t, err, ErrTransactionCanceled) } + +func TestTransaction_DidVote(t *testing.T) { + ctx, cleanup := testhelper.Context() + defer cleanup() + + tx, err := newTransaction(1, []Voter{ + {Name: "v1", Votes: 1}, + {Name: "v2", Votes: 0}, + }, 1) + require.NoError(t, err) + + // An unregistered voter did not vote. + require.False(t, tx.DidVote("unregistered")) + // And neither of the registered ones did cast a vote yet. + require.False(t, tx.DidVote("v1")) + require.False(t, tx.DidVote("v2")) + + // One of both nodes does cast a vote. + require.NoError(t, tx.vote(ctx, "v1", voting.VoteFromData([]byte{}))) + require.True(t, tx.DidVote("v1")) + require.False(t, tx.DidVote("v2")) + + // And now the second node does cast a vote, too. + require.NoError(t, tx.vote(ctx, "v2", voting.VoteFromData([]byte{}))) + require.True(t, tx.DidVote("v1")) + require.True(t, tx.DidVote("v2")) +} |