diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2021-05-20 21:37:52 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2021-05-20 21:37:52 +0300 |
commit | c202c0760cecf7b29ddb9ec46a794e208f4da82b (patch) | |
tree | 99f88af98c1e0a39e2e2d5643d45cc9a639d43b9 | |
parent | 81efd4ff7c7e775d5b6de43aa84042d82462d1f4 (diff) | |
parent | 978695b2b0555871c6f5413753dd663f666ec3da (diff) |
Merge branch 'smh-check-if-primary-was-modified' into 'master'
Consider primary modified only if a subtransaction was committed
See merge request gitlab-org/gitaly!3494
-rw-r--r-- | internal/praefect/coordinator.go | 48 | ||||
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 131 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 120 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 17 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_ext_test.go | 523 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_test.go | 532 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 24 |
7 files changed, 791 insertions, 604 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 3286b3b31..887116148 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -739,7 +739,13 @@ func (c *Coordinator) createTransactionFinalizer( nodeErrors *nodeErrors, ) func() error { return func() error { - updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + if !primaryDirtied { + // If the primary replica was not modified then we don't need to consider the secondaries + // outdated. Praefect requires the primary to be always part of the quorum, so no changes + // to secondaries would be made without primary being in agreement. + return nil + } return c.newRequestFinalizer( ctx, virtualStorage, targetRepo, route.Primary.Storage, @@ -750,9 +756,11 @@ func (c *Coordinator) createTransactionFinalizer( // getUpdatedAndOutdatedSecondaries returns all nodes which can be considered up-to-date or outdated // after the given transaction. A node is considered outdated, if one of the following is true: // -// - No subtransactions were created. This really is only a safeguard in case the RPC wasn't aware -// of transactions and thus failed to correctly assert its state matches across nodes. This is -// rather pessimistic, as it could also indicate that an RPC simply didn't change anything. +// - No subtransactions were created and the RPC was successful on the primary. This really is only +// a safeguard in case the RPC wasn't aware of transactions and thus failed to correctly assert +// its state matches across nodes. This is rather pessimistic, as it could also indicate that an +// RPC simply didn't change anything. If the RPC was a failure on the primary and there were no +// subtransactions, we assume no changes were done and that the nodes failed prior to voting. 
// // - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all // nodes need to get replication jobs. @@ -767,7 +775,7 @@ func getUpdatedAndOutdatedSecondaries( route RepositoryMutatorRoute, transaction transactions.Transaction, nodeErrors *nodeErrors, -) (updated []string, outdated []string) { +) (primaryDirtied bool, updated []string, outdated []string) { nodeErrors.Lock() defer nodeErrors.Unlock() @@ -776,17 +784,31 @@ func getUpdatedAndOutdatedSecondaries( // for them. outdated = append(outdated, route.ReplicationTargets...) + primaryErr := nodeErrors.errByNode[route.Primary.Storage] + + // If there were subtransactions, we only assume some changes were made if one of the subtransactions + // was committed. + // + // If there were no subtransactions, we assume changes were performed only if the primary successfully + // processed the RPC. This might be an RPC that is not correctly casting votes thus we replicate everywhere. + // + // If there were no subtransactions and the primary failed the RPC, we assume no changes have been made and + // the nodes simply failed before voting. + primaryDirtied = transaction.DidCommitAnySubtransaction() || + (transaction.CountSubtransactions() == 0 && primaryErr == nil) + // If the primary errored, then we need to assume that it has modified on-disk state and // thus need to replicate those changes to secondaries. - if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil { - ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction") + if primaryErr != nil { + ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) return } - // If no subtransaction happened, then the called RPC may not be aware of transactions at - // all. We thus need to assume it changed repository state and need to create replication - // jobs. 
+ // If no subtransaction happened, then the called RPC may not be aware of transactions or + // the nodes failed before casting any votes. If the primary failed the RPC, we assume + // no changes were done and the nodes hit an error prior to voting. If the primary processed + // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere. if transaction.CountSubtransactions() == 0 { ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) @@ -802,9 +824,9 @@ func getUpdatedAndOutdatedSecondaries( return } - // If the primary node did not commit the transaction, then we must assume that it dirtied - // on-disk state. This modified state may not be what we want, but it's what we got. So in - // order to ensure a consistent state, we need to replicate. + // If the primary node did not commit the transaction but there were some subtransactions committed, + // then we must assume that it dirtied on-disk state. This modified state may not be what we want, + // but it's what we got. So in order to ensure a consistent state, we need to replicate. 
if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted { if state == transactions.VoteFailed { ctxlogrus.Extract(ctx).Error("transaction: primary failed vote") diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index eb769eaa3..df5dec438 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -32,10 +32,14 @@ func getDB(t *testing.T) glsql.DB { } func TestStreamDirectorMutator_Transaction(t *testing.T) { + type subtransactions []struct { + vote string + shouldSucceed bool + } + type node struct { primary bool - vote string - shouldSucceed bool + subtransactions subtransactions shouldGetRepl bool shouldParticipate bool generation int @@ -43,45 +47,63 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } testcases := []struct { - desc string - nodes []node + desc string + primaryFails bool + nodes []node }{ { desc: "successful vote should not create replication jobs", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + }, + }, + { + desc: "successful vote should create replication jobs if the primary fails", + 
primaryFails: true, + nodes: []node{ + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + }, + }, + { + desc: "failing vote should not create replication jobs without committed subtransactions", + nodes: []node{ + {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0}, }, }, { - desc: "failing vote should not create replication jobs", + desc: "failing vote should create replication jobs with committed subtransaction", nodes: []node{ - {primary: true, vote: "foo", shouldSucceed: false, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "qux", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, - {primary: false, vote: "bar", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "qux", shouldSucceed: false}}, 
shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { desc: "primary should reach quorum with disagreeing secondary", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { desc: "quorum should create replication jobs for disagreeing node", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, - {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { desc: "only consistent secondaries should participate", nodes: []node{ - {primary: true, vote: 
"foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2}, - {primary: false, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2}, + {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2}, {shouldParticipate: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0}, {shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, }, @@ -89,30 +111,51 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { { desc: "secondaries should not participate when primary's generation is unknown", nodes: []node{ - {primary: true, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0}, {shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, }, }, { - // If the transaction didn't receive any votes at all, we need to assume - // that the RPC wasn't aware of transactions and thus need to schedule - // replication jobs. - desc: "unstarted transaction should create replication jobs", + // All transactional RPCs are expected to cast vote if they are successful. If they don't, something is wrong + // and we should replicate to the secondaries to be sure. 
+ desc: "unstarted transaction creates replication jobs if the primary is successful", nodes: []node{ - {primary: true, shouldSucceed: true, shouldGetRepl: false, expectedGeneration: 1}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, expectedGeneration: 0}, + {primary: true, shouldGetRepl: false, expectedGeneration: 1}, + {primary: false, shouldGetRepl: true, expectedGeneration: 0}, }, }, { - // If the transaction didn't receive any votes at all, we need to assume - // that the RPC wasn't aware of transactions and thus need to schedule - // replication jobs. - desc: "unstarted transaction should create replication jobs for outdated node", + // If the RPC fails without any subtransactions, the Gitalys would not have performed any changes yet. + // We don't have to consider the secondaries outdated. + desc: "unstarted transaction doesn't create replication jobs if the primary fails", + primaryFails: true, nodes: []node{ - {primary: true, shouldSucceed: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0}, - {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, + {primary: true, expectedGeneration: 0}, + {primary: false, expectedGeneration: 0}, + }, + }, + { + // If there were no subtransactions and the RPC failed, the primary should not have performed any changes. + // We don't need to schedule replication jobs to replication targets either as they'd have jobs + // already scheduled by the earlier RPC that made them outdated or by the reconciler. 
+ desc: "unstarted transaction should not create replication jobs for outdated node if the primary fails", + primaryFails: true, + nodes: []node{ + {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 1}, + {primary: false, shouldGetRepl: false, generation: 1, expectedGeneration: 1}, + {primary: false, shouldGetRepl: false, generation: 0, expectedGeneration: 0}, + {primary: false, shouldGetRepl: false, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, + }, + }, + { + // If there were no subtransactions and the primary did not fail, we should schedule replication jobs to every secondary. + // All transactional RPCs are expected to vote if they are successful. + desc: "unstarted transaction should create replication jobs for outdated node if the primary succeeds", + nodes: []node{ + {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2}, + {primary: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1}, + {primary: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0}, + {primary: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, }, }, } @@ -217,17 +260,27 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { go func() { defer voterWaitGroup.Done() - vote := voting.VoteFromData([]byte(node.vote)) - err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote) - if node.shouldSucceed { - assert.NoError(t, err) - } else { - assert.True(t, errors.Is(err, transactions.ErrTransactionFailed)) + for _, subtransaction := range node.subtransactions { + vote := voting.VoteFromData([]byte(subtransaction.vote)) + err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote) + if subtransaction.shouldSucceed { + if !assert.NoError(t, err) { + break + } + } else { + if !assert.True(t, errors.Is(err, transactions.ErrTransactionFailed)) { + break + } + } } }() } 
voterWaitGroup.Wait() + if tc.primaryFails { + streamParams.Primary().ErrHandler(errors.New("rpc failure")) + } + err = streamParams.RequestFinalizer() require.NoError(t, err) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 6f85bd875..b0bee67d3 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1539,8 +1539,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { } type mockTransaction struct { - nodeStates map[string]transactions.VoteResult - subtransactions int + nodeStates map[string]transactions.VoteResult + subtransactions int + didCommitAnySubtransaction bool } func (t mockTransaction) ID() uint64 { @@ -1551,6 +1552,10 @@ func (t mockTransaction) CountSubtransactions() int { return t.subtransactions } +func (t mockTransaction) DidCommitAnySubtransaction() bool { + return t.didCommitAnySubtransaction +} + func (t mockTransaction) State() (map[string]transactions.VoteResult, error) { return t.nodeStates, nil } @@ -1568,13 +1573,15 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { anyErr := errors.New("arbitrary error") for _, tc := range []struct { - desc string - primary node - secondaries []node - replicas []string - subtransactions int - expectedOutdated []string - expectedUpdated []string + desc string + primary node + secondaries []node + replicas []string + subtransactions int + didCommitAnySubtransaction bool + expectedPrimaryDirtied bool + expectedOutdated []string + expectedUpdated []string }{ { desc: "single committed node", @@ -1582,7 +1589,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { name: "primary", state: transactions.VoteCommitted, }, - subtransactions: 1, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, }, { desc: "single failed node", @@ -1604,7 +1613,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { primary: node{ name: "primary", }, - subtransactions: 0, + 
subtransactions: 0, + expectedPrimaryDirtied: true, }, { desc: "single successful node with replica", @@ -1612,9 +1622,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { name: "primary", state: transactions.VoteCommitted, }, - replicas: []string{"replica"}, - subtransactions: 1, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, }, { desc: "single failing node with replica", @@ -1633,18 +1645,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { state: transactions.VoteCommitted, err: anyErr, }, - replicas: []string{"replica"}, - subtransactions: 1, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, }, { desc: "single node without transaction with replica", primary: node{ name: "primary", }, - replicas: []string{"replica"}, - subtransactions: 0, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + subtransactions: 0, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, }, { desc: "multiple committed nodes", @@ -1656,8 +1671,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedUpdated: []string{"s1", "s2"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s1", "s2"}, }, { desc: "multiple committed nodes with primary err", @@ -1670,8 +1687,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedOutdated: []string{"s1", "s2"}, + 
didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, }, { desc: "multiple committed nodes with secondary err", @@ -1683,9 +1702,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted, err: anyErr}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedUpdated: []string{"s2"}, - expectedOutdated: []string{"s1"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, }, { desc: "partial success", @@ -1697,9 +1718,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedUpdated: []string{"s2"}, - expectedOutdated: []string{"s1"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, }, { desc: "failure with (impossible) secondary success", @@ -1711,8 +1734,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 1, - expectedOutdated: []string{"s1", "s2"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, }, { desc: "multiple nodes without subtransactions", @@ -1724,8 +1749,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - subtransactions: 0, - expectedOutdated: []string{"s1", "s2"}, + subtransactions: 0, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, }, { desc: "multiple nodes with replica and partial failures", @@ -1737,10 
+1763,12 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - replicas: []string{"r1", "r2"}, - subtransactions: 1, - expectedOutdated: []string{"s1", "r1", "r2"}, - expectedUpdated: []string{"s2"}, + replicas: []string{"r1", "r2"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "r1", "r2"}, + expectedUpdated: []string{"s2"}, }, { desc: "multiple nodes with replica and partial err", @@ -1752,9 +1780,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted, err: anyErr}, }, - replicas: []string{"r1", "r2"}, - subtransactions: 1, - expectedOutdated: []string{"s1", "s2", "r1", "r2"}, + replicas: []string{"r1", "r2"}, + didCommitAnySubtransaction: true, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2", "r1", "r2"}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -1776,8 +1806,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { } transaction := mockTransaction{ - nodeStates: states, - subtransactions: tc.subtransactions, + nodeStates: states, + subtransactions: tc.subtransactions, + didCommitAnySubtransaction: tc.didCommitAnySubtransaction, } route := RepositoryMutatorRoute{ @@ -1792,7 +1823,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { } route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...) 
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied) require.ElementsMatch(t, tc.expectedUpdated, updated) require.ElementsMatch(t, tc.expectedOutdated, outdated) }) diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index aeac70181..743aedf8a 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -10,6 +10,7 @@ package proxy import ( "context" "errors" + "fmt" "io" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" @@ -96,6 +97,20 @@ type streamAndDestination struct { cancel func() } +// failDestinationsWithError marks all of the destinations in the StreamParameters as +// having failed with the given error. +func failDestinationsWithError(params *StreamParameters, err error) { + if params.Primary().ErrHandler != nil { + _ = params.Primary().ErrHandler(err) + } + + for _, secondary := range params.Secondaries() { + if secondary.ErrHandler != nil { + _ = secondary.ErrHandler(err) + } + } +} + // handler is where the real magic of proxying happens. // It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, // forwarding it to a ClientStream established against the relevant ClientConn. @@ -127,6 +142,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina primaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Primary().Conn, fullMethodName, params.CallOptions()...) 
if err != nil { + failDestinationsWithError(params, fmt.Errorf("initiate primary stream: %w", err)) return err } @@ -143,6 +159,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, destination.Conn, fullMethodName, params.CallOptions()...) if err != nil { + failDestinationsWithError(params, fmt.Errorf("initiate secondary stream: %w", err)) return err } secondaryStreams = append(secondaryStreams, streamAndDestination{ diff --git a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go new file mode 100644 index 000000000..eb47a6240 --- /dev/null +++ b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go @@ -0,0 +1,523 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy_test + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "net/url" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/getsentry/sentry-go" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" + "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + pb "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "go.uber.org/goleak" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const ( + pingDefaultValue = "I like kittens." 
+ clientMdKey = "test-client-header" + serverHeaderMdKey = "test-client-header" + serverTrailerMdKey = "test-client-trailer" + + rejectingMdKey = "test-reject-rpc-if-in-context" + + countListResponses = 20 +) + +func TestMain(m *testing.M) { + defer testhelper.MustHaveNoChildProcess() + cleanup := testhelper.Configure() + defer cleanup() + + goleak.VerifyTestMain(m) +} + +// asserting service is implemented on the server side and serves as a handler for stuff +type assertingService struct { + t *testing.T +} + +func (s *assertingService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.PingResponse, error) { + // Check that this call has client's metadata. + md, ok := metadata.FromIncomingContext(ctx) + assert.True(s.t, ok, "PingEmpty call must have metadata in context") + _, ok = md[clientMdKey] + assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata") + return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil +} + +func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) { + // Send user trailers and headers. + require.NoError(s.t, grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles."))) + require.NoError(s.t, grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))) + return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil +} + +func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*pb.Empty, error) { + return nil, status.Errorf(codes.ResourceExhausted, "Userspace error.") +} + +func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error { + // Send user trailers and headers. 
+ require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles."))) + for i := 0; i < countListResponses; i++ { + require.NoError(s.t, stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)})) + } + stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) + return nil +} + +func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error { + require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles."))) + counter := int32(0) + for { + ping, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + require.NoError(s.t, err, "can't fail reading stream") + return err + } + pong := &pb.PingResponse{Value: ping.Value, Counter: counter} + if err := stream.Send(pong); err != nil { + require.NoError(s.t, err, "can't fail sending back a pong") + } + counter++ + } + stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) + return nil +} + +// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues. +type ProxyHappySuite struct { + suite.Suite + + serverListener net.Listener + server *grpc.Server + proxyListener net.Listener + proxy *grpc.Server + serverClientConn *grpc.ClientConn + + client *grpc.ClientConn + testClient pb.TestServiceClient + testClientConn *grpc.ClientConn +} + +func (s *ProxyHappySuite) ctx() context.Context { + // Make all RPC calls last at most 2 minutes, meaning all async issues or deadlock will not kill tests. 
+ ctx, _ := context.WithTimeout(context.TODO(), 120*time.Second) // nolint: govet + return ctx +} + +func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() { + ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(clientMdKey, "true")) + out, err := s.testClient.PingEmpty(ctx, &pb.Empty{}) + require.NoError(s.T(), err, "PingEmpty should succeed without errors") + require.Equal(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out) +} + +func (s *ProxyHappySuite) TestPingEmpty_StressTest() { + for i := 0; i < 50; i++ { + s.TestPingEmptyCarriesClientMetadata() + } +} + +func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() { + headerMd := make(metadata.MD) + trailerMd := make(metadata.MD) + // This is an awkward calling convention... but meh. + out, err := s.testClient.Ping(s.ctx(), &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd)) + require.NoError(s.T(), err, "Ping should succeed without errors") + require.Equal(s.T(), &pb.PingResponse{Value: "foo", Counter: 42}, out) + assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data") + assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data") +} + +func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() { + sentryTriggered := 0 + sentrySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sentryTriggered++ + })) + defer sentrySrv.Close() + + // minimal required sentry client configuration + sentryURL, err := url.Parse(sentrySrv.URL) + require.NoError(s.T(), err) + sentryURL.User = url.UserPassword("stub", "stub") + sentryURL.Path = "/stub/1" + + require.NoError(s.T(), sentry.Init(sentry.ClientOptions{ + Dsn: sentryURL.String(), + Transport: sentry.NewHTTPSyncTransport(), + })) + + sentry.CaptureEvent(sentry.NewEvent()) + require.Equal(s.T(), 1, sentryTriggered, "sentry configured incorrectly") + + _, err = s.testClient.PingError(s.ctx(), 
&pb.PingRequest{Value: "foo"}) + require.Error(s.T(), err, "PingError should never succeed") + assert.Equal(s.T(), codes.ResourceExhausted, status.Code(err)) + assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message()) + require.Equal(s.T(), 1, sentryTriggered, "sentry must not be triggered because errors from remote must be just propagated") +} + +func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() { + // See SetupSuite where the StreamDirector has a special case. + ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(rejectingMdKey, "true")) + _, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"}) + require.Error(s.T(), err, "Director should reject this RPC") + assert.Equal(s.T(), codes.PermissionDenied, status.Code(err)) + assert.Equal(s.T(), "testing rejection", status.Convert(err).Message()) +} + +func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { + stream, err := s.testClient.PingStream(s.ctx()) + require.NoError(s.T(), err, "PingStream request should be successful.") + + for i := 0; i < countListResponses; i++ { + ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)} + require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail") + resp, err := stream.Recv() + if err == io.EOF { + break + } + if i == 0 { + // Check that the header arrives before all entries. + headerMd, err := stream.Header() + require.NoError(s.T(), err, "PingStream headers should not error.") + assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata") + } + assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id") + } + require.NoError(s.T(), stream.CloseSend(), "no error on close send") + _, err = stream.Recv() + require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK") + // Check that the trailer headers are here. 
+ trailerMd := stream.Trailer() + assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") +} + +func (s *ProxyHappySuite) TestPingStream_StressTest() { + for i := 0; i < 50; i++ { + s.TestPingStream_FullDuplexWorks() + } +} + +func (s *ProxyHappySuite) SetupSuite() { + var err error + + s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for proxyListener") + s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for serverListener") + + s.server = grpc.NewServer() + pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) + + // Setup of the proxy's Director. + s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) + require.NoError(s.T(), err, "must not error on deferred client Dial") + director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + payload, err := peeker.Peek() + if err != nil { + return nil, err + } + + md, ok := metadata.FromIncomingContext(ctx) + if ok { + if _, exists := md[rejectingMdKey]; exists { + return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection") + } + } + + // Explicitly copy the metadata, otherwise the tests will fail. 
+ return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: s.serverClientConn, Msg: payload}, nil, nil, nil), nil + } + + s.proxy = grpc.NewServer( + grpc.CustomCodec(proxy.NewCodec()), + grpc.StreamInterceptor( + grpc_middleware.ChainStreamServer( + // context tags usage is required by sentryhandler.StreamLogHandler + grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)), + // sentry middleware to capture errors + sentryhandler.StreamLogHandler, + ), + ), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + ) + // Ping handler is handled as an explicit registration and not as a TransparentHandler. + proxy.RegisterService(s.proxy, director, + "mwitkow.testproto.TestService", + "Ping") + + // Start the serving loops. + go func() { + s.server.Serve(s.serverListener) + }() + go func() { + s.proxy.Serve(s.proxyListener) + }() + + ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() + + s.testClientConn, err = grpc.DialContext(ctx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure()) + require.NoError(s.T(), err, "must not error on deferred client Dial") + s.testClient = pb.NewTestServiceClient(s.testClientConn) +} + +func (s *ProxyHappySuite) TearDownSuite() { + if s.client != nil { + s.client.Close() + } + if s.testClientConn != nil { + s.testClientConn.Close() + } + if s.serverClientConn != nil { + s.serverClientConn.Close() + } + // Close all transports so the logs don't get spammy. 
+ time.Sleep(10 * time.Millisecond) + if s.proxy != nil { + s.proxy.Stop() + s.proxyListener.Close() + } + if s.serverListener != nil { + s.server.Stop() + s.serverListener.Close() + } +} + +func TestProxyHappySuite(t *testing.T) { + suite.Run(t, &ProxyHappySuite{}) +} + +func TestProxyErrorPropagation(t *testing.T) { + errBackend := status.Error(codes.InvalidArgument, "backend error") + errDirector := status.Error(codes.FailedPrecondition, "director error") + errRequestFinalizer := status.Error(codes.Internal, "request finalizer error") + + for _, tc := range []struct { + desc string + backendError error + directorError error + requestFinalizerError error + returnedError error + errHandler func(error) error + }{ + { + desc: "backend error is propagated", + backendError: errBackend, + returnedError: errBackend, + }, + { + desc: "director error is propagated", + directorError: errDirector, + returnedError: errDirector, + }, + { + desc: "request finalizer error is propagated", + requestFinalizerError: errRequestFinalizer, + returnedError: errRequestFinalizer, + }, + { + desc: "director error cancels proxying", + backendError: errBackend, + requestFinalizerError: errRequestFinalizer, + directorError: errDirector, + returnedError: errDirector, + }, + { + desc: "backend error prioritized over request finalizer error", + backendError: errBackend, + requestFinalizerError: errRequestFinalizer, + returnedError: errBackend, + }, + { + desc: "err handler gets error", + backendError: errBackend, + requestFinalizerError: errRequestFinalizer, + returnedError: errBackend, + errHandler: func(err error) error { + require.Equal(t, errBackend, err) + return errBackend + }, + }, + { + desc: "err handler can swallow error", + backendError: errBackend, + returnedError: io.EOF, + errHandler: func(err error) error { + require.Equal(t, errBackend, err) + return nil + }, + }, + { + desc: "swallowed error surfaces request finalizer error", + backendError: errBackend, + requestFinalizerError: 
errRequestFinalizer, + returnedError: errRequestFinalizer, + errHandler: func(err error) error { + require.Equal(t, errBackend, err) + return nil + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + tmpDir := testhelper.TempDir(t) + + backendListener, err := net.Listen("unix", filepath.Join(tmpDir, "backend")) + require.NoError(t, err) + + backendServer := grpc.NewServer(grpc.UnknownServiceHandler(func(interface{}, grpc.ServerStream) error { + return tc.backendError + })) + go func() { backendServer.Serve(backendListener) }() + defer backendServer.Stop() + + ctx, cancel := testhelper.Context() + defer cancel() + + backendClientConn, err := grpc.DialContext(ctx, "unix://"+backendListener.Addr().String(), + grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) + require.NoError(t, err) + defer func() { + require.NoError(t, backendClientConn.Close()) + }() + + proxyListener, err := net.Listen("unix", filepath.Join(tmpDir, "proxy")) + require.NoError(t, err) + + proxyServer := grpc.NewServer( + grpc.CustomCodec(proxy.NewCodec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + return proxy.NewStreamParameters( + proxy.Destination{ + Ctx: ctx, + Conn: backendClientConn, + ErrHandler: tc.errHandler, + }, + nil, + func() error { return tc.requestFinalizerError }, + nil, + ), tc.directorError + })), + ) + + go func() { proxyServer.Serve(proxyListener) }() + defer proxyServer.Stop() + + proxyClientConn, err := grpc.DialContext(ctx, "unix://"+proxyListener.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + defer func() { + require.NoError(t, proxyClientConn.Close()) + }() + + resp, err := pb.NewTestServiceClient(proxyClientConn).Ping(ctx, &pb.PingRequest{}) + require.Equal(t, tc.returnedError, err) + require.Nil(t, resp) + }) + } +} + +func TestRegisterStreamHandlers(t *testing.T) { + directorCalledError := 
errors.New("director was called") + + server := grpc.NewServer( + grpc.CustomCodec(proxy.NewCodec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + return nil, directorCalledError + })), + ) + + var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool + + pingValue := "hello" + + pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { + pingStreamHandlerCalled = true + var req pb.PingRequest + + if err := stream.RecvMsg(&req); err != nil { + return err + } + + require.Equal(t, pingValue, req.Value) + + return stream.SendMsg(nil) + } + + pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { + pingEmptyStreamHandlerCalled = true + var req pb.Empty + + if err := stream.RecvMsg(&req); err != nil { + return err + } + + return stream.SendMsg(nil) + } + + streamers := map[string]grpc.StreamHandler{ + "Ping": pingStreamHandler, + "PingEmpty": pingEmptyStreamHandler, + } + + proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers) + + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t) + + listener, err := net.Listen("unix", serverSocketPath) + if err != nil { + t.Fatal(err) + } + + go server.Serve(listener) + defer server.Stop() + + cc, err := client.Dial("unix://"+serverSocketPath, []grpc.DialOption{grpc.WithBlock()}) + require.NoError(t, err) + defer cc.Close() + + testServiceClient := pb.NewTestServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue}) + require.NoError(t, err) + require.True(t, pingStreamHandlerCalled) + + _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{}) + require.NoError(t, err) + require.True(t, pingEmptyStreamHandlerCalled) + + // since PingError was never registered with its own streamer, it should get sent to the UnknownServiceHandler + _, 
err = testServiceClient.PingError(ctx, &pb.PingRequest{}) + require.Equal(t, status.Error(codes.Unknown, directorCalledError.Error()), err) +} diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index eb47a6240..b33fcba43 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -1,523 +1,39 @@ -// Copyright 2017 Michal Witkowski. All Rights Reserved. -// See LICENSE for licensing terms. - -package proxy_test +package proxy import ( - "context" "errors" - "fmt" - "io" - "net" - "net/http" - "net/http/httptest" - "net/url" - "path/filepath" - "strings" "testing" - "time" - "github.com/getsentry/sentry-go" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "gitlab.com/gitlab-org/gitaly/client" - "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" - "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" - "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" - pb "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata" - "gitlab.com/gitlab-org/gitaly/internal/testhelper" - "go.uber.org/goleak" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -const ( - pingDefaultValue = "I like kittens." 
- clientMdKey = "test-client-header" - serverHeaderMdKey = "test-client-header" - serverTrailerMdKey = "test-client-trailer" - - rejectingMdKey = "test-reject-rpc-if-in-context" - - countListResponses = 20 ) -func TestMain(m *testing.M) { - defer testhelper.MustHaveNoChildProcess() - cleanup := testhelper.Configure() - defer cleanup() - - goleak.VerifyTestMain(m) -} - -// asserting service is implemented on the server side and serves as a handler for stuff -type assertingService struct { - t *testing.T -} - -func (s *assertingService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.PingResponse, error) { - // Check that this call has client's metadata. - md, ok := metadata.FromIncomingContext(ctx) - assert.True(s.t, ok, "PingEmpty call must have metadata in context") - _, ok = md[clientMdKey] - assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata") - return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil -} - -func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) { - // Send user trailers and headers. - require.NoError(s.t, grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles."))) - require.NoError(s.t, grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))) - return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil -} - -func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*pb.Empty, error) { - return nil, status.Errorf(codes.ResourceExhausted, "Userspace error.") -} - -func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error { - // Send user trailers and headers. 
- require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles."))) - for i := 0; i < countListResponses; i++ { - require.NoError(s.t, stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)})) - } - stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) - return nil -} - -func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error { - require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles."))) - counter := int32(0) - for { - ping, err := stream.Recv() - if err == io.EOF { - break - } else if err != nil { - require.NoError(s.t, err, "can't fail reading stream") - return err - } - pong := &pb.PingResponse{Value: ping.Value, Counter: counter} - if err := stream.Send(pong); err != nil { - require.NoError(s.t, err, "can't fail sending back a pong") - } - counter++ - } - stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) - return nil -} - -// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues. -type ProxyHappySuite struct { - suite.Suite - - serverListener net.Listener - server *grpc.Server - proxyListener net.Listener - proxy *grpc.Server - serverClientConn *grpc.ClientConn - - client *grpc.ClientConn - testClient pb.TestServiceClient - testClientConn *grpc.ClientConn -} - -func (s *ProxyHappySuite) ctx() context.Context { - // Make all RPC calls last at most 1 sec, meaning all async issues or deadlock will not kill tests. 
- ctx, _ := context.WithTimeout(context.TODO(), 120*time.Second) // nolint: govet - return ctx -} - -func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() { - ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(clientMdKey, "true")) - out, err := s.testClient.PingEmpty(ctx, &pb.Empty{}) - require.NoError(s.T(), err, "PingEmpty should succeed without errors") - require.Equal(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out) -} - -func (s *ProxyHappySuite) TestPingEmpty_StressTest() { - for i := 0; i < 50; i++ { - s.TestPingEmptyCarriesClientMetadata() - } -} - -func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() { - headerMd := make(metadata.MD) - trailerMd := make(metadata.MD) - // This is an awkward calling convention... but meh. - out, err := s.testClient.Ping(s.ctx(), &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd)) - require.NoError(s.T(), err, "Ping should succeed without errors") - require.Equal(s.T(), &pb.PingResponse{Value: "foo", Counter: 42}, out) - assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data") - assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data") -} - -func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() { - sentryTriggered := 0 - sentrySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - sentryTriggered++ - })) - defer sentrySrv.Close() - - // minimal required sentry client configuration - sentryURL, err := url.Parse(sentrySrv.URL) - require.NoError(s.T(), err) - sentryURL.User = url.UserPassword("stub", "stub") - sentryURL.Path = "/stub/1" - - require.NoError(s.T(), sentry.Init(sentry.ClientOptions{ - Dsn: sentryURL.String(), - Transport: sentry.NewHTTPSyncTransport(), - })) - - sentry.CaptureEvent(sentry.NewEvent()) - require.Equal(s.T(), 1, sentryTriggered, "sentry configured incorrectly") - - _, err = s.testClient.PingError(s.ctx(), 
&pb.PingRequest{Value: "foo"}) - require.Error(s.T(), err, "PingError should never succeed") - assert.Equal(s.T(), codes.ResourceExhausted, status.Code(err)) - assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message()) - require.Equal(s.T(), 1, sentryTriggered, "sentry must not be triggered because errors from remote must be just propagated") -} - -func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() { - // See SetupSuite where the StreamDirector has a special case. - ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(rejectingMdKey, "true")) - _, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"}) - require.Error(s.T(), err, "Director should reject this RPC") - assert.Equal(s.T(), codes.PermissionDenied, status.Code(err)) - assert.Equal(s.T(), "testing rejection", status.Convert(err).Message()) -} - -func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { - stream, err := s.testClient.PingStream(s.ctx()) - require.NoError(s.T(), err, "PingStream request should be successful.") - - for i := 0; i < countListResponses; i++ { - ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)} - require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail") - resp, err := stream.Recv() - if err == io.EOF { - break - } - if i == 0 { - // Check that the header arrives before all entries. - headerMd, err := stream.Header() - require.NoError(s.T(), err, "PingStream headers should not error.") - assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata") - } - assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id") - } - require.NoError(s.T(), stream.CloseSend(), "no error on close send") - _, err = stream.Recv() - require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK") - // Check that the trailer headers are here. 
- trailerMd := stream.Trailer() - assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") -} - -func (s *ProxyHappySuite) TestPingStream_StressTest() { - for i := 0; i < 50; i++ { - s.TestPingStream_FullDuplexWorks() - } -} - -func (s *ProxyHappySuite) SetupSuite() { - var err error - - s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0") - require.NoError(s.T(), err, "must be able to allocate a port for proxyListener") - s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") - require.NoError(s.T(), err, "must be able to allocate a port for serverListener") +func TestFailDestinationWithError(t *testing.T) { + expectedErr := errors.New("some error") - s.server = grpc.NewServer() - pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) - - // Setup of the proxy's Director. - s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) - require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { - payload, err := peeker.Peek() - if err != nil { - return nil, err - } - - md, ok := metadata.FromIncomingContext(ctx) - if ok { - if _, exists := md[rejectingMdKey]; exists { - return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection") - } - } - - // Explicitly copy the metadata, otherwise the tests will fail. 
- return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: s.serverClientConn, Msg: payload}, nil, nil, nil), nil - } - - s.proxy = grpc.NewServer( - grpc.CustomCodec(proxy.NewCodec()), - grpc.StreamInterceptor( - grpc_middleware.ChainStreamServer( - // context tags usage is required by sentryhandler.StreamLogHandler - grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)), - // sentry middleware to capture errors - sentryhandler.StreamLogHandler, - ), - ), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), - ) - // Ping handler is handled as an explicit registration and not as a TransparentHandler. - proxy.RegisterService(s.proxy, director, - "mwitkow.testproto.TestService", - "Ping") - - // Start the serving loops. - go func() { - s.server.Serve(s.serverListener) - }() - go func() { - s.proxy.Serve(s.proxyListener) - }() - - ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second) - defer cancel() - - s.testClientConn, err = grpc.DialContext(ctx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure()) - require.NoError(s.T(), err, "must not error on deferred client Dial") - s.testClient = pb.NewTestServiceClient(s.testClientConn) -} - -func (s *ProxyHappySuite) TearDownSuite() { - if s.client != nil { - s.client.Close() - } - if s.testClientConn != nil { - s.testClientConn.Close() - } - if s.serverClientConn != nil { - s.serverClientConn.Close() - } - // Close all transports so the logs don't get spammy. 
- time.Sleep(10 * time.Millisecond) - if s.proxy != nil { - s.proxy.Stop() - s.proxyListener.Close() - } - if s.serverListener != nil { - s.server.Stop() - s.serverListener.Close() - } -} - -func TestProxyHappySuite(t *testing.T) { - suite.Run(t, &ProxyHappySuite{}) -} + t.Run("works with nil ErrHandlers", func(t *testing.T) { + require.NotPanics(t, func() { + failDestinationsWithError(&StreamParameters{ + primary: Destination{}, + secondaries: []Destination{{}}, + }, expectedErr) + }) + }) -func TestProxyErrorPropagation(t *testing.T) { - errBackend := status.Error(codes.InvalidArgument, "backend error") - errDirector := status.Error(codes.FailedPrecondition, "director error") - errRequestFinalizer := status.Error(codes.Internal, "request finalizer error") + t.Run("fails both primary and secondaries", func(t *testing.T) { + var primaryErr, secondaryErr error - for _, tc := range []struct { - desc string - backendError error - directorError error - requestFinalizerError error - returnedError error - errHandler func(error) error - }{ - { - desc: "backend error is propagated", - backendError: errBackend, - returnedError: errBackend, - }, - { - desc: "director error is propagated", - directorError: errDirector, - returnedError: errDirector, - }, - { - desc: "request finalizer error is propagated", - requestFinalizerError: errRequestFinalizer, - returnedError: errRequestFinalizer, - }, - { - desc: "director error cancels proxying", - backendError: errBackend, - requestFinalizerError: errRequestFinalizer, - directorError: errDirector, - returnedError: errDirector, - }, - { - desc: "backend error prioritized over request finalizer error", - backendError: errBackend, - requestFinalizerError: errRequestFinalizer, - returnedError: errBackend, - }, - { - desc: "err handler gets error", - backendError: errBackend, - requestFinalizerError: errRequestFinalizer, - returnedError: errBackend, - errHandler: func(err error) error { - require.Equal(t, errBackend, err) - return 
errBackend - }, - }, - { - desc: "err handler can swallow error", - backendError: errBackend, - returnedError: io.EOF, - errHandler: func(err error) error { - require.Equal(t, errBackend, err) + failDestinationsWithError(&StreamParameters{ + primary: Destination{ErrHandler: func(err error) error { + primaryErr = err return nil - }, - }, - { - desc: "swallowed error surfaces request finalizer error", - backendError: errBackend, - requestFinalizerError: errRequestFinalizer, - returnedError: errRequestFinalizer, - errHandler: func(err error) error { - require.Equal(t, errBackend, err) + }}, + secondaries: []Destination{{ErrHandler: func(err error) error { + secondaryErr = err return nil - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - tmpDir := testhelper.TempDir(t) - - backendListener, err := net.Listen("unix", filepath.Join(tmpDir, "backend")) - require.NoError(t, err) - - backendServer := grpc.NewServer(grpc.UnknownServiceHandler(func(interface{}, grpc.ServerStream) error { - return tc.backendError - })) - go func() { backendServer.Serve(backendListener) }() - defer backendServer.Stop() - - ctx, cancel := testhelper.Context() - defer cancel() - - backendClientConn, err := grpc.DialContext(ctx, "unix://"+backendListener.Addr().String(), - grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) - require.NoError(t, err) - defer func() { - require.NoError(t, backendClientConn.Close()) - }() - - proxyListener, err := net.Listen("unix", filepath.Join(tmpDir, "proxy")) - require.NoError(t, err) - - proxyServer := grpc.NewServer( - grpc.CustomCodec(proxy.NewCodec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { - return proxy.NewStreamParameters( - proxy.Destination{ - Ctx: ctx, - Conn: backendClientConn, - ErrHandler: tc.errHandler, - }, - nil, - func() error { return tc.requestFinalizerError }, - nil, - ), 
tc.directorError - })), - ) - - go func() { proxyServer.Serve(proxyListener) }() - defer proxyServer.Stop() - - proxyClientConn, err := grpc.DialContext(ctx, "unix://"+proxyListener.Addr().String(), grpc.WithInsecure()) - require.NoError(t, err) - defer func() { - require.NoError(t, proxyClientConn.Close()) - }() - - resp, err := pb.NewTestServiceClient(proxyClientConn).Ping(ctx, &pb.PingRequest{}) - require.Equal(t, tc.returnedError, err) - require.Nil(t, resp) - }) - } -} - -func TestRegisterStreamHandlers(t *testing.T) { - directorCalledError := errors.New("director was called") - - server := grpc.NewServer( - grpc.CustomCodec(proxy.NewCodec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { - return nil, directorCalledError - })), - ) - - var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool - - pingValue := "hello" - - pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { - pingStreamHandlerCalled = true - var req pb.PingRequest - - if err := stream.RecvMsg(&req); err != nil { - return err - } - - require.Equal(t, pingValue, req.Value) - - return stream.SendMsg(nil) - } - - pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { - pingEmptyStreamHandlerCalled = true - var req pb.Empty - - if err := stream.RecvMsg(&req); err != nil { - return err - } - - return stream.SendMsg(nil) - } - - streamers := map[string]grpc.StreamHandler{ - "Ping": pingStreamHandler, - "PingEmpty": pingEmptyStreamHandler, - } - - proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers) - - serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t) - - listener, err := net.Listen("unix", serverSocketPath) - if err != nil { - t.Fatal(err) - } - - go server.Serve(listener) - defer server.Stop() - - cc, err := client.Dial("unix://"+serverSocketPath, 
[]grpc.DialOption{grpc.WithBlock()}) - require.NoError(t, err) - defer cc.Close() - - testServiceClient := pb.NewTestServiceClient(cc) - - ctx, cancel := testhelper.Context() - defer cancel() - - _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue}) - require.NoError(t, err) - require.True(t, pingStreamHandlerCalled) - - _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{}) - require.NoError(t, err) - require.True(t, pingEmptyStreamHandlerCalled) + }}}, + }, expectedErr) - // since PingError was never registered with its own streamer, it should get sent to the UnknownServiceHandler - _, err = testServiceClient.PingError(ctx, &pb.PingRequest{}) - require.Equal(t, status.Error(codes.Unknown, directorCalledError.Error()), err) + require.Equal(t, expectedErr, primaryErr) + require.Equal(t, expectedErr, secondaryErr) + }) } diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 90d8f6bf9..840710d29 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -57,6 +57,8 @@ type Transaction interface { CountSubtransactions() int // State returns the state of each voter part of the transaction. State() (map[string]VoteResult, error) + // DidCommitAnySubtransaction returns whether the transaction committed at least one subtransaction. + DidCommitAnySubtransaction() bool } // transaction is a session where a set of voters votes on one or more @@ -184,6 +186,28 @@ func (t *transaction) CountSubtransactions() int { return len(t.subtransactions) } +// DidCommitSubtransaction returns whether the transaction committed at least one subtransaction. +func (t *transaction) DidCommitAnySubtransaction() bool { + t.lock.Lock() + defer t.lock.Unlock() + + if len(t.subtransactions) == 0 { + return false + } + + // We only need to check the first subtransaction. If it failed, there would + // be no further subtransactions. 
+ for _, result := range t.subtransactions[0].state() { + // It's sufficient to find a single commit in the subtransaction + // to say it was committed. + if result == VoteCommitted { + return true + } + } + + return false +} + // getOrCreateSubtransaction gets an ongoing subtransaction on which the given // node hasn't yet voted on or creates a new one if the node has succeeded on // all subtransactions. In case the node has failed on any of the |