gitlab.com/gitlab-org/gitaly.git

commit    c202c0760cecf7b29ddb9ec46a794e208f4da82b
tree      99f88af98c1e0a39e2e2d5643d45cc9a639d43b9
parent    81efd4ff7c7e775d5b6de43aa84042d82462d1f4
parent    978695b2b0555871c6f5413753dd663f666ec3da
author    Zeger-Jan van de Weg <git@zjvandeweg.nl>  2021-05-20 21:37:52 +0300
committer Zeger-Jan van de Weg <git@zjvandeweg.nl>  2021-05-20 21:37:52 +0300

    Merge branch 'smh-check-if-primary-was-modified' into 'master'

    Consider primary modified only if a subtransaction was committed

    See merge request gitlab-org/gitaly!3494
-rw-r--r--  internal/praefect/coordinator.go                          48
-rw-r--r--  internal/praefect/coordinator_pg_test.go                 131
-rw-r--r--  internal/praefect/coordinator_test.go                    120
-rw-r--r--  internal/praefect/grpc-proxy/proxy/handler.go             17
-rw-r--r--  internal/praefect/grpc-proxy/proxy/handler_ext_test.go   523
-rw-r--r--  internal/praefect/grpc-proxy/proxy/handler_test.go       532
-rw-r--r--  internal/praefect/transactions/transaction.go             24
7 files changed, 791 insertions(+), 604 deletions(-)
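
Before this merge, Praefect scheduled replication jobs to all secondaries whenever a transactional RPC created no subtransactions, even when the RPC failed on every node before anything was written. After it, the finalizer first asks whether the primary was dirtied at all. Restated as a self-contained sketch (the helper name and signature are illustrative; the real logic lives in getUpdatedAndOutdatedSecondaries in the diff below):

    package main

    import "fmt"

    // primaryDirtied restates the rule introduced by this merge: the primary
    // counts as modified if any subtransaction committed, or if the RPC cast
    // no votes at all yet still succeeded on the primary.
    func primaryDirtied(didCommitAnySubtransaction bool, subtransactions int, primaryErr error) bool {
        return didCommitAnySubtransaction || (subtransactions == 0 && primaryErr == nil)
    }

    func main() {
        fmt.Println(primaryDirtied(true, 1, nil))                        // true: a subtransaction committed
        fmt.Println(primaryDirtied(false, 0, nil))                       // true: no votes, but the RPC succeeded
        fmt.Println(primaryDirtied(false, 0, fmt.Errorf("rpc failure"))) // false: nodes failed before voting
        fmt.Println(primaryDirtied(false, 1, nil))                       // false: votes cast, none committed
    }
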
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 3286b3b31..887116148 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -739,7 +739,13 @@ func (c *Coordinator) createTransactionFinalizer(
nodeErrors *nodeErrors,
) func() error {
return func() error {
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ if !primaryDirtied {
+ // If the primary replica was not modified then we don't need to consider the secondaries
+ // outdated. Praefect requires the primary to be always part of the quorum, so no changes
+ // to secondaries would be made without primary being in agreement.
+ return nil
+ }
return c.newRequestFinalizer(
ctx, virtualStorage, targetRepo, route.Primary.Storage,
@@ -750,9 +756,11 @@ func (c *Coordinator) createTransactionFinalizer(
// getUpdatedAndOutdatedSecondaries returns all nodes which can be considered up-to-date or outdated
// after the given transaction. A node is considered outdated, if one of the following is true:
//
-// - No subtransactions were created. This really is only a safeguard in case the RPC wasn't aware
-// of transactions and thus failed to correctly assert its state matches across nodes. This is
-// rather pessimistic, as it could also indicate that an RPC simply didn't change anything.
+// - No subtransactions were created and the RPC was successful on the primary. This really is only
+// a safeguard in case the RPC wasn't aware of transactions and thus failed to correctly assert
+// its state matches across nodes. This is rather pessimistic, as it could also indicate that an
+// RPC simply didn't change anything. If the RPC was a failure on the primary and there were no
+// subtransactions, we assume no changes were done and that the nodes failed prior to voting.
//
// - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all
// nodes need to get replication jobs.
@@ -767,7 +775,7 @@ func getUpdatedAndOutdatedSecondaries(
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
-) (updated []string, outdated []string) {
+) (primaryDirtied bool, updated []string, outdated []string) {
nodeErrors.Lock()
defer nodeErrors.Unlock()
@@ -776,17 +784,31 @@ func getUpdatedAndOutdatedSecondaries(
// for them.
outdated = append(outdated, route.ReplicationTargets...)
+ primaryErr := nodeErrors.errByNode[route.Primary.Storage]
+
+ // If there were subtransactions, we only assume some changes were made if one of the subtransactions
+ // was committed.
+ //
+ // If there were no subtransactions, we assume changes were performed only if the primary successfully
+ // processed the RPC. This might be an RPC that is not correctly casting votes thus we replicate everywhere.
+ //
+ // If there were no subtransactions and the primary failed the RPC, we assume no changes have been made and
+ // the nodes simply failed before voting.
+ primaryDirtied = transaction.DidCommitAnySubtransaction() ||
+ (transaction.CountSubtransactions() == 0 && primaryErr == nil)
+
// If the primary errored, then we need to assume that it has modified on-disk state and
// thus need to replicate those changes to secondaries.
- if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil {
- ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction")
+ if primaryErr != nil {
+ ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
return
}
- // If no subtransaction happened, then the called RPC may not be aware of transactions at
- // all. We thus need to assume it changed repository state and need to create replication
- // jobs.
+ // If no subtransaction happened, then the called RPC may not be aware of transactions or
+ // the nodes failed before casting any votes. If the primary failed the RPC, we assume
+ // no changes were done and the nodes hit an error prior to voting. If the primary processed
+ // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere.
if transaction.CountSubtransactions() == 0 {
ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
@@ -802,9 +824,9 @@ func getUpdatedAndOutdatedSecondaries(
return
}
- // If the primary node did not commit the transaction, then we must assume that it dirtied
- // on-disk state. This modified state may not be what we want, but it's what we got. So in
- // order to ensure a consistent state, we need to replicate.
+ // If the primary node did not commit the transaction but there were some subtransactions committed,
+ // then we must assume that it dirtied on-disk state. This modified state may not be what we want,
+ // but it's what we got. So in order to ensure a consistent state, we need to replicate.
if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted {
if state == transactions.VoteFailed {
ctxlogrus.Extract(ctx).Error("transaction: primary failed vote")
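
With primaryDirtied available, the early return in createTransactionFinalizer above collapses the replication decision to a single check. A minimal sketch of that control flow, under the simplifying assumption that the finalizer's only job is to pick which secondaries get replication jobs (the real finalizer also bumps generations via newRequestFinalizer):

    package main

    import "fmt"

    // replicationPlan sketches the finalizer's outcome: which secondaries get
    // replication jobs. Praefect keeps the primary in every quorum, so a clean
    // primary implies no secondary was changed either and nothing is scheduled.
    func replicationPlan(primaryDirtied bool, outdated []string) []string {
        if !primaryDirtied {
            return nil // primary untouched: skip scheduling entirely
        }
        return outdated
    }

    func main() {
        fmt.Println(replicationPlan(false, []string{"s1", "s2"})) // []
        fmt.Println(replicationPlan(true, []string{"s1"}))        // [s1]
    }
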
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index eb769eaa3..df5dec438 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -32,10 +32,14 @@ func getDB(t *testing.T) glsql.DB {
}
func TestStreamDirectorMutator_Transaction(t *testing.T) {
+ type subtransactions []struct {
+ vote string
+ shouldSucceed bool
+ }
+
type node struct {
primary bool
- vote string
- shouldSucceed bool
+ subtransactions subtransactions
shouldGetRepl bool
shouldParticipate bool
generation int
@@ -43,45 +47,63 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
testcases := []struct {
- desc string
- nodes []node
+ desc string
+ primaryFails bool
+ nodes []node
}{
{
desc: "successful vote should not create replication jobs",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ },
+ },
+ {
+ desc: "successful vote should create replication jobs if the primary fails",
+ primaryFails: true,
+ nodes: []node{
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ },
+ },
+ {
+ desc: "failing vote should not create replication jobs without committed subtransactions",
+ nodes: []node{
+ {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
},
},
{
- desc: "failing vote should not create replication jobs",
+ desc: "failing vote should create replication jobs with committed subtransaction",
nodes: []node{
- {primary: true, vote: "foo", shouldSucceed: false, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "qux", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
- {primary: false, vote: "bar", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
desc: "primary should reach quorum with disagreeing secondary",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
desc: "quorum should create replication jobs for disagreeing node",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "barfoo", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
desc: "only consistent secondaries should participate",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2},
- {primary: false, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: 1, expectedGeneration: 2},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2},
+ {primary: false, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: 1, expectedGeneration: 2},
{shouldParticipate: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0},
{shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
},
@@ -89,30 +111,51 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
{
desc: "secondaries should not participate when primary's generation is unknown",
nodes: []node{
- {primary: true, vote: "foobar", shouldSucceed: true, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0},
{shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
},
},
{
- // If the transaction didn't receive any votes at all, we need to assume
- // that the RPC wasn't aware of transactions and thus need to schedule
- // replication jobs.
- desc: "unstarted transaction should create replication jobs",
+ // All transactional RPCs are expected to cast vote if they are successful. If they don't, something is wrong
+ // and we should replicate to the secondaries to be sure.
+ desc: "unstarted transaction creates replication jobs if the primary is successful",
nodes: []node{
- {primary: true, shouldSucceed: true, shouldGetRepl: false, expectedGeneration: 1},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, expectedGeneration: 0},
+ {primary: true, shouldGetRepl: false, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: true, expectedGeneration: 0},
},
},
{
- // If the transaction didn't receive any votes at all, we need to assume
- // that the RPC wasn't aware of transactions and thus need to schedule
- // replication jobs.
- desc: "unstarted transaction should create replication jobs for outdated node",
+ // If the RPC fails without any subtransactions, the Gitalys would not have performed any changes yet.
+ // We don't have to consider the secondaries outdated.
+ desc: "unstarted transaction doesn't create replication jobs if the primary fails",
+ primaryFails: true,
nodes: []node{
- {primary: true, shouldSucceed: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0},
- {primary: false, shouldSucceed: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
+ {primary: true, expectedGeneration: 0},
+ {primary: false, expectedGeneration: 0},
+ },
+ },
+ {
+ // If there were no subtransactions and the RPC failed, the primary should not have performed any changes.
+ // We don't need to schedule replication jobs to replication targets either as they'd have jobs
+ // already scheduled by the earlier RPC that made them outdated or by the reconciler.
+ desc: "unstarted transaction should not create replication jobs for outdated node if the primary fails",
+ primaryFails: true,
+ nodes: []node{
+ {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: false, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: false, generation: 0, expectedGeneration: 0},
+ {primary: false, shouldGetRepl: false, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
+ },
+ },
+ {
+ // If there were no subtransactions and the primary did not fail, we should schedule replication jobs to every secondary.
+ // All transactional RPCs are expected to vote if they are successful.
+ desc: "unstarted transaction should create replication jobs for outdated node if the primary succeeds",
+ nodes: []node{
+ {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2},
+ {primary: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: true, generation: 0, expectedGeneration: 0},
+ {primary: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
},
},
}
@@ -217,17 +260,27 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
go func() {
defer voterWaitGroup.Done()
- vote := voting.VoteFromData([]byte(node.vote))
- err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote)
- if node.shouldSucceed {
- assert.NoError(t, err)
- } else {
- assert.True(t, errors.Is(err, transactions.ErrTransactionFailed))
+ for _, subtransaction := range node.subtransactions {
+ vote := voting.VoteFromData([]byte(subtransaction.vote))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote)
+ if subtransaction.shouldSucceed {
+ if !assert.NoError(t, err) {
+ break
+ }
+ } else {
+ if !assert.True(t, errors.Is(err, transactions.ErrTransactionFailed)) {
+ break
+ }
+ }
}
}()
}
voterWaitGroup.Wait()
+ if tc.primaryFails {
+ streamParams.Primary().ErrHandler(errors.New("rpc failure"))
+ }
+
err = streamParams.RequestFinalizer()
require.NoError(t, err)
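
The refactored test models each node as a sequence of subtransactions rather than a single vote, which makes mixed scenarios such as "first vote commits, second fails" expressible, and the new primaryFails knob fires the primary's ErrHandler before the request finalizer runs. A compact sketch of the per-node voting loop (simplified: the real loop calls txMgr.VoteTransaction and uses testify assertions):

    package main

    import (
        "errors"
        "fmt"
    )

    type subtransaction struct {
        vote          string
        shouldSucceed bool
    }

    var errTransactionFailed = errors.New("transaction failed")

    // castVotes mirrors the per-node goroutine above: cast one vote per
    // subtransaction and stop at the first outcome that contradicts the
    // test's expectation.
    func castVotes(subs []subtransaction, vote func(data string) error) error {
        for _, s := range subs {
            err := vote(s.vote)
            if s.shouldSucceed && err != nil {
                return fmt.Errorf("expected success: %w", err)
            }
            if !s.shouldSucceed && !errors.Is(err, errTransactionFailed) {
                return fmt.Errorf("expected failure, got: %v", err)
            }
        }
        return nil
    }

    func main() {
        // "failing vote should create replication jobs with committed
        // subtransaction": the first vote commits, the second one fails.
        votes := 0
        err := castVotes(
            []subtransaction{{vote: "foo", shouldSucceed: true}, {vote: "qux", shouldSucceed: false}},
            func(string) error {
                votes++
                if votes == 2 {
                    return errTransactionFailed
                }
                return nil
            },
        )
        fmt.Println(err) // <nil>
    }
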
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 6f85bd875..b0bee67d3 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1539,8 +1539,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
}
type mockTransaction struct {
- nodeStates map[string]transactions.VoteResult
- subtransactions int
+ nodeStates map[string]transactions.VoteResult
+ subtransactions int
+ didCommitAnySubtransaction bool
}
func (t mockTransaction) ID() uint64 {
@@ -1551,6 +1552,10 @@ func (t mockTransaction) CountSubtransactions() int {
return t.subtransactions
}
+func (t mockTransaction) DidCommitAnySubtransaction() bool {
+ return t.didCommitAnySubtransaction
+}
+
func (t mockTransaction) State() (map[string]transactions.VoteResult, error) {
return t.nodeStates, nil
}
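
The mock gains DidCommitAnySubtransaction to satisfy the matching addition to the transactions.Transaction interface; internal/praefect/transactions/transaction.go appears in the diffstat with 24 changed lines but is cut from this excerpt. One plausible shape of that method, offered as a hedged sketch with stand-in types:

    package main

    import "fmt"

    // voteResult, subtransaction and transaction are stand-ins for the real
    // types in internal/praefect/transactions; this is an assumption about
    // the shape of the change, not the actual implementation.
    type voteResult int

    const (
        voteUndecided voteResult = iota
        voteCommitted
        voteFailed
    )

    type subtransaction struct{ result voteResult }

    type transaction struct{ subtransactions []subtransaction }

    // DidCommitAnySubtransaction reports whether at least one subtransaction
    // reached a committed vote.
    func (t *transaction) DidCommitAnySubtransaction() bool {
        for _, sub := range t.subtransactions {
            if sub.result == voteCommitted {
                return true
            }
        }
        return false
    }

    func main() {
        tx := &transaction{subtransactions: []subtransaction{{voteFailed}, {voteCommitted}}}
        fmt.Println(tx.DidCommitAnySubtransaction()) // true
    }
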
@@ -1568,13 +1573,15 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
anyErr := errors.New("arbitrary error")
for _, tc := range []struct {
- desc string
- primary node
- secondaries []node
- replicas []string
- subtransactions int
- expectedOutdated []string
- expectedUpdated []string
+ desc string
+ primary node
+ secondaries []node
+ replicas []string
+ subtransactions int
+ didCommitAnySubtransaction bool
+ expectedPrimaryDirtied bool
+ expectedOutdated []string
+ expectedUpdated []string
}{
{
desc: "single committed node",
@@ -1582,7 +1589,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- subtransactions: 1,
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
},
{
desc: "single failed node",
@@ -1604,7 +1613,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
primary: node{
name: "primary",
},
- subtransactions: 0,
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
},
{
desc: "single successful node with replica",
@@ -1612,9 +1622,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
},
{
desc: "single failing node with replica",
@@ -1633,18 +1645,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
state: transactions.VoteCommitted,
err: anyErr,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
},
{
desc: "single node without transaction with replica",
primary: node{
name: "primary",
},
- replicas: []string{"replica"},
- subtransactions: 0,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
},
{
desc: "multiple committed nodes",
@@ -1656,8 +1671,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s1", "s2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s1", "s2"},
},
{
desc: "multiple committed nodes with primary err",
@@ -1670,8 +1687,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
},
{
desc: "multiple committed nodes with secondary err",
@@ -1683,9 +1702,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted, err: anyErr},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
},
{
desc: "partial success",
@@ -1697,9 +1718,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
},
{
desc: "failure with (impossible) secondary success",
@@ -1711,8 +1734,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
},
{
desc: "multiple nodes without subtransactions",
@@ -1724,8 +1749,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 0,
- expectedOutdated: []string{"s1", "s2"},
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
},
{
desc: "multiple nodes with replica and partial failures",
@@ -1737,10 +1763,12 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- replicas: []string{"r1", "r2"},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "r1", "r2"},
- expectedUpdated: []string{"s2"},
+ replicas: []string{"r1", "r2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "r1", "r2"},
+ expectedUpdated: []string{"s2"},
},
{
desc: "multiple nodes with replica and partial err",
@@ -1752,9 +1780,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted, err: anyErr},
},
- replicas: []string{"r1", "r2"},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ replicas: []string{"r1", "r2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2", "r1", "r2"},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -1776,8 +1806,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
transaction := mockTransaction{
- nodeStates: states,
- subtransactions: tc.subtransactions,
+ nodeStates: states,
+ subtransactions: tc.subtransactions,
+ didCommitAnySubtransaction: tc.didCommitAnySubtransaction,
}
route := RepositoryMutatorRoute{
@@ -1792,7 +1823,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
})
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index aeac70181..743aedf8a 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -10,6 +10,7 @@ package proxy
import (
"context"
"errors"
+ "fmt"
"io"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
@@ -96,6 +97,20 @@ type streamAndDestination struct {
cancel func()
}
+// failDestinationsWithError marks all of the destinations in the StreamParameters as
+// having failed with the given error.
+func failDestinationsWithError(params *StreamParameters, err error) {
+ if params.Primary().ErrHandler != nil {
+ _ = params.Primary().ErrHandler(err)
+ }
+
+ for _, secondary := range params.Secondaries() {
+ if secondary.ErrHandler != nil {
+ _ = secondary.ErrHandler(err)
+ }
+ }
+}
+
// handler is where the real magic of proxying happens.
// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
// forwarding it to a ClientStream established against the relevant ClientConn.
@@ -127,6 +142,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
primaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Primary().Conn, fullMethodName, params.CallOptions()...)
if err != nil {
+ failDestinationsWithError(params, fmt.Errorf("initiate primary stream: %w", err))
return err
}
@@ -143,6 +159,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, destination.Conn, fullMethodName, params.CallOptions()...)
if err != nil {
+ failDestinationsWithError(params, fmt.Errorf("initiate secondary stream: %w", err))
return err
}
secondaryStreams = append(secondaryStreams, streamAndDestination{
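
Both call sites above fire when establishing a client stream fails, so per-node ErrHandlers learn about failures that happen before any RPC bytes flow; without this, a failed stream setup would leave nodeErrors empty and the primary could wrongly count as dirtied. A self-contained sketch using stand-in types (the real Destination and StreamParameters, with Primary()/Secondaries() accessors, live in the proxy package):

    package main

    import (
        "errors"
        "fmt"
    )

    // Destination and StreamParameters are simplified stand-ins for the proxy
    // package's types.
    type Destination struct{ ErrHandler func(error) error }

    type StreamParameters struct {
        primary     Destination
        secondaries []Destination
    }

    // failDestinationsWithError reports err to every destination that
    // registered an ErrHandler, tolerating nil handlers (as the new unit
    // test further below verifies).
    func failDestinationsWithError(params *StreamParameters, err error) {
        if params.primary.ErrHandler != nil {
            _ = params.primary.ErrHandler(err)
        }
        for _, secondary := range params.secondaries {
            if secondary.ErrHandler != nil {
                _ = secondary.ErrHandler(err)
            }
        }
    }

    func main() {
        params := &StreamParameters{
            primary:     Destination{ErrHandler: func(err error) error { fmt.Println("primary saw:", err); return nil }},
            secondaries: []Destination{{}}, // nil ErrHandler must not panic
        }
        failDestinationsWithError(params, errors.New("initiate primary stream: connection refused"))
    }
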
diff --git a/internal/praefect/grpc-proxy/proxy/handler_ext_test.go b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go
new file mode 100644
index 000000000..eb47a6240
--- /dev/null
+++ b/internal/praefect/grpc-proxy/proxy/handler_ext_test.go
@@ -0,0 +1,523 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package proxy_test
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/getsentry/sentry-go"
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/stretchr/testify/suite"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ pb "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "go.uber.org/goleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ pingDefaultValue = "I like kittens."
+ clientMdKey = "test-client-header"
+ serverHeaderMdKey = "test-client-header"
+ serverTrailerMdKey = "test-client-trailer"
+
+ rejectingMdKey = "test-reject-rpc-if-in-context"
+
+ countListResponses = 20
+)
+
+func TestMain(m *testing.M) {
+ defer testhelper.MustHaveNoChildProcess()
+ cleanup := testhelper.Configure()
+ defer cleanup()
+
+ goleak.VerifyTestMain(m)
+}
+
+// asserting service is implemented on the server side and serves as a handler for stuff
+type assertingService struct {
+ t *testing.T
+}
+
+func (s *assertingService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.PingResponse, error) {
+ // Check that this call has client's metadata.
+ md, ok := metadata.FromIncomingContext(ctx)
+ assert.True(s.t, ok, "PingEmpty call must have metadata in context")
+ _, ok = md[clientMdKey]
+ assert.True(s.t, ok, "PingEmpty call must have client's custom headers in metadata")
+ return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil
+}
+
+func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) {
+ // Send user trailers and headers.
+ require.NoError(s.t, grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
+ require.NoError(s.t, grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")))
+ return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil
+}
+
+func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*pb.Empty, error) {
+ return nil, status.Errorf(codes.ResourceExhausted, "Userspace error.")
+}
+
+func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error {
+ // Send user trailers and headers.
+ require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
+ for i := 0; i < countListResponses; i++ {
+ require.NoError(s.t, stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)}))
+ }
+ stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
+ return nil
+}
+
+func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error {
+ require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
+ counter := int32(0)
+ for {
+ ping, err := stream.Recv()
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ require.NoError(s.t, err, "can't fail reading stream")
+ return err
+ }
+ pong := &pb.PingResponse{Value: ping.Value, Counter: counter}
+ if err := stream.Send(pong); err != nil {
+ require.NoError(s.t, err, "can't fail sending back a pong")
+ }
+ counter++
+ }
+ stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
+ return nil
+}
+
+// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues.
+type ProxyHappySuite struct {
+ suite.Suite
+
+ serverListener net.Listener
+ server *grpc.Server
+ proxyListener net.Listener
+ proxy *grpc.Server
+ serverClientConn *grpc.ClientConn
+
+ client *grpc.ClientConn
+ testClient pb.TestServiceClient
+ testClientConn *grpc.ClientConn
+}
+
+func (s *ProxyHappySuite) ctx() context.Context {
+ // Make all RPC calls last at most 1 sec, meaning all async issues or deadlock will not kill tests.
+ ctx, _ := context.WithTimeout(context.TODO(), 120*time.Second) // nolint: govet
+ return ctx
+}
+
+func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
+ ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(clientMdKey, "true"))
+ out, err := s.testClient.PingEmpty(ctx, &pb.Empty{})
+ require.NoError(s.T(), err, "PingEmpty should succeed without errors")
+ require.Equal(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out)
+}
+
+func (s *ProxyHappySuite) TestPingEmpty_StressTest() {
+ for i := 0; i < 50; i++ {
+ s.TestPingEmptyCarriesClientMetadata()
+ }
+}
+
+func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
+ headerMd := make(metadata.MD)
+ trailerMd := make(metadata.MD)
+ // This is an awkward calling convention... but meh.
+ out, err := s.testClient.Ping(s.ctx(), &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd))
+ require.NoError(s.T(), err, "Ping should succeed without errors")
+ require.Equal(s.T(), &pb.PingResponse{Value: "foo", Counter: 42}, out)
+ assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data")
+ assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data")
+}
+
+func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
+ sentryTriggered := 0
+ sentrySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ sentryTriggered++
+ }))
+ defer sentrySrv.Close()
+
+ // minimal required sentry client configuration
+ sentryURL, err := url.Parse(sentrySrv.URL)
+ require.NoError(s.T(), err)
+ sentryURL.User = url.UserPassword("stub", "stub")
+ sentryURL.Path = "/stub/1"
+
+ require.NoError(s.T(), sentry.Init(sentry.ClientOptions{
+ Dsn: sentryURL.String(),
+ Transport: sentry.NewHTTPSyncTransport(),
+ }))
+
+ sentry.CaptureEvent(sentry.NewEvent())
+ require.Equal(s.T(), 1, sentryTriggered, "sentry configured incorrectly")
+
+ _, err = s.testClient.PingError(s.ctx(), &pb.PingRequest{Value: "foo"})
+ require.Error(s.T(), err, "PingError should never succeed")
+ assert.Equal(s.T(), codes.ResourceExhausted, status.Code(err))
+ assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message())
+ require.Equal(s.T(), 1, sentryTriggered, "sentry must not be triggered because errors from remote must be just propagated")
+}
+
+func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() {
+ // See SetupSuite where the StreamDirector has a special case.
+ ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(rejectingMdKey, "true"))
+ _, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"})
+ require.Error(s.T(), err, "Director should reject this RPC")
+ assert.Equal(s.T(), codes.PermissionDenied, status.Code(err))
+ assert.Equal(s.T(), "testing rejection", status.Convert(err).Message())
+}
+
+func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
+ stream, err := s.testClient.PingStream(s.ctx())
+ require.NoError(s.T(), err, "PingStream request should be successful.")
+
+ for i := 0; i < countListResponses; i++ {
+ ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)}
+ require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail")
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if i == 0 {
+ // Check that the header arrives before all entries.
+ headerMd, err := stream.Header()
+ require.NoError(s.T(), err, "PingStream headers should not error.")
+ assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers must contain metadata")
+ }
+ assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id")
+ }
+ require.NoError(s.T(), stream.CloseSend(), "no error on close send")
+ _, err = stream.Recv()
+ require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaning OK")
+ // Check that the trailer headers are here.
+ trailerMd := stream.Trailer()
+ assert.Len(s.T(), trailerMd, 1, "PingList trailer headers must contain metadata")
+}
+
+func (s *ProxyHappySuite) TestPingStream_StressTest() {
+ for i := 0; i < 50; i++ {
+ s.TestPingStream_FullDuplexWorks()
+ }
+}
+
+func (s *ProxyHappySuite) SetupSuite() {
+ var err error
+
+ s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0")
+ require.NoError(s.T(), err, "must be able to allocate a port for proxyListener")
+ s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
+ require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
+
+ s.server = grpc.NewServer()
+ pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})
+
+ // Setup of the proxy's Director.
+ s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
+ require.NoError(s.T(), err, "must not error on deferred client Dial")
+ director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ payload, err := peeker.Peek()
+ if err != nil {
+ return nil, err
+ }
+
+ md, ok := metadata.FromIncomingContext(ctx)
+ if ok {
+ if _, exists := md[rejectingMdKey]; exists {
+ return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection")
+ }
+ }
+
+ // Explicitly copy the metadata, otherwise the tests will fail.
+ return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: s.serverClientConn, Msg: payload}, nil, nil, nil), nil
+ }
+
+ s.proxy = grpc.NewServer(
+ grpc.CustomCodec(proxy.NewCodec()),
+ grpc.StreamInterceptor(
+ grpc_middleware.ChainStreamServer(
+ // context tags usage is required by sentryhandler.StreamLogHandler
+ grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)),
+ // sentry middleware to capture errors
+ sentryhandler.StreamLogHandler,
+ ),
+ ),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ )
+ // Ping handler is handled as an explicit registration and not as a TransparentHandler.
+ proxy.RegisterService(s.proxy, director,
+ "mwitkow.testproto.TestService",
+ "Ping")
+
+ // Start the serving loops.
+ go func() {
+ s.server.Serve(s.serverListener)
+ }()
+ go func() {
+ s.proxy.Serve(s.proxyListener)
+ }()
+
+ ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
+ defer cancel()
+
+ s.testClientConn, err = grpc.DialContext(ctx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure())
+ require.NoError(s.T(), err, "must not error on deferred client Dial")
+ s.testClient = pb.NewTestServiceClient(s.testClientConn)
+}
+
+func (s *ProxyHappySuite) TearDownSuite() {
+ if s.client != nil {
+ s.client.Close()
+ }
+ if s.testClientConn != nil {
+ s.testClientConn.Close()
+ }
+ if s.serverClientConn != nil {
+ s.serverClientConn.Close()
+ }
+ // Close all transports so the logs don't get spammy.
+ time.Sleep(10 * time.Millisecond)
+ if s.proxy != nil {
+ s.proxy.Stop()
+ s.proxyListener.Close()
+ }
+ if s.serverListener != nil {
+ s.server.Stop()
+ s.serverListener.Close()
+ }
+}
+
+func TestProxyHappySuite(t *testing.T) {
+ suite.Run(t, &ProxyHappySuite{})
+}
+
+func TestProxyErrorPropagation(t *testing.T) {
+ errBackend := status.Error(codes.InvalidArgument, "backend error")
+ errDirector := status.Error(codes.FailedPrecondition, "director error")
+ errRequestFinalizer := status.Error(codes.Internal, "request finalizer error")
+
+ for _, tc := range []struct {
+ desc string
+ backendError error
+ directorError error
+ requestFinalizerError error
+ returnedError error
+ errHandler func(error) error
+ }{
+ {
+ desc: "backend error is propagated",
+ backendError: errBackend,
+ returnedError: errBackend,
+ },
+ {
+ desc: "director error is propagated",
+ directorError: errDirector,
+ returnedError: errDirector,
+ },
+ {
+ desc: "request finalizer error is propagated",
+ requestFinalizerError: errRequestFinalizer,
+ returnedError: errRequestFinalizer,
+ },
+ {
+ desc: "director error cancels proxying",
+ backendError: errBackend,
+ requestFinalizerError: errRequestFinalizer,
+ directorError: errDirector,
+ returnedError: errDirector,
+ },
+ {
+ desc: "backend error prioritized over request finalizer error",
+ backendError: errBackend,
+ requestFinalizerError: errRequestFinalizer,
+ returnedError: errBackend,
+ },
+ {
+ desc: "err handler gets error",
+ backendError: errBackend,
+ requestFinalizerError: errRequestFinalizer,
+ returnedError: errBackend,
+ errHandler: func(err error) error {
+ require.Equal(t, errBackend, err)
+ return errBackend
+ },
+ },
+ {
+ desc: "err handler can swallow error",
+ backendError: errBackend,
+ returnedError: io.EOF,
+ errHandler: func(err error) error {
+ require.Equal(t, errBackend, err)
+ return nil
+ },
+ },
+ {
+ desc: "swallowed error surfaces request finalizer error",
+ backendError: errBackend,
+ requestFinalizerError: errRequestFinalizer,
+ returnedError: errRequestFinalizer,
+ errHandler: func(err error) error {
+ require.Equal(t, errBackend, err)
+ return nil
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ tmpDir := testhelper.TempDir(t)
+
+ backendListener, err := net.Listen("unix", filepath.Join(tmpDir, "backend"))
+ require.NoError(t, err)
+
+ backendServer := grpc.NewServer(grpc.UnknownServiceHandler(func(interface{}, grpc.ServerStream) error {
+ return tc.backendError
+ }))
+ go func() { backendServer.Serve(backendListener) }()
+ defer backendServer.Stop()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ backendClientConn, err := grpc.DialContext(ctx, "unix://"+backendListener.Addr().String(),
+ grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, backendClientConn.Close())
+ }()
+
+ proxyListener, err := net.Listen("unix", filepath.Join(tmpDir, "proxy"))
+ require.NoError(t, err)
+
+ proxyServer := grpc.NewServer(
+ grpc.CustomCodec(proxy.NewCodec()),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ return proxy.NewStreamParameters(
+ proxy.Destination{
+ Ctx: ctx,
+ Conn: backendClientConn,
+ ErrHandler: tc.errHandler,
+ },
+ nil,
+ func() error { return tc.requestFinalizerError },
+ nil,
+ ), tc.directorError
+ })),
+ )
+
+ go func() { proxyServer.Serve(proxyListener) }()
+ defer proxyServer.Stop()
+
+ proxyClientConn, err := grpc.DialContext(ctx, "unix://"+proxyListener.Addr().String(), grpc.WithInsecure())
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, proxyClientConn.Close())
+ }()
+
+ resp, err := pb.NewTestServiceClient(proxyClientConn).Ping(ctx, &pb.PingRequest{})
+ require.Equal(t, tc.returnedError, err)
+ require.Nil(t, resp)
+ })
+ }
+}
+
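
TestProxyErrorPropagation pins down the precedence the coordinator relies on: a director error cancels proxying outright, a backend error beats a request finalizer error, and an ErrHandler may swallow the backend error so the caller sees io.EOF or the finalizer's error instead. A minimal sketch of a swallowing handler (names are illustrative):

    package main

    import (
        "errors"
        "log"
    )

    func main() {
        // An ErrHandler that records a backend error and swallows it by
        // returning nil, as in the "err handler can swallow error" case
        // above. Returning the error unchanged would propagate it instead.
        errHandler := func(err error) error {
            log.Printf("backend failed: %v", err)
            return nil
        }
        _ = errHandler(errors.New("backend error"))
    }
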
+func TestRegisterStreamHandlers(t *testing.T) {
+ directorCalledError := errors.New("director was called")
+
+ server := grpc.NewServer(
+ grpc.CustomCodec(proxy.NewCodec()),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ return nil, directorCalledError
+ })),
+ )
+
+ var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool
+
+ pingValue := "hello"
+
+ pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
+ pingStreamHandlerCalled = true
+ var req pb.PingRequest
+
+ if err := stream.RecvMsg(&req); err != nil {
+ return err
+ }
+
+ require.Equal(t, pingValue, req.Value)
+
+ return stream.SendMsg(nil)
+ }
+
+ pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
+ pingEmptyStreamHandlerCalled = true
+ var req pb.Empty
+
+ if err := stream.RecvMsg(&req); err != nil {
+ return err
+ }
+
+ return stream.SendMsg(nil)
+ }
+
+ streamers := map[string]grpc.StreamHandler{
+ "Ping": pingStreamHandler,
+ "PingEmpty": pingEmptyStreamHandler,
+ }
+
+ proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers)
+
+ serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
+
+ listener, err := net.Listen("unix", serverSocketPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ go server.Serve(listener)
+ defer server.Stop()
+
+ cc, err := client.Dial("unix://"+serverSocketPath, []grpc.DialOption{grpc.WithBlock()})
+ require.NoError(t, err)
+ defer cc.Close()
+
+ testServiceClient := pb.NewTestServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue})
+ require.NoError(t, err)
+ require.True(t, pingStreamHandlerCalled)
+
+ _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{})
+ require.NoError(t, err)
+ require.True(t, pingEmptyStreamHandlerCalled)
+
+ // since PingError was never registered with its own streamer, it should get sent to the UnknownServiceHandler
+ _, err = testServiceClient.PingError(ctx, &pb.PingRequest{})
+ require.Equal(t, status.Error(codes.Unknown, directorCalledError.Error()), err)
+}
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index eb47a6240..b33fcba43 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -1,523 +1,39 @@
-// Copyright 2017 Michal Witkowski. All Rights Reserved.
-// See LICENSE for licensing terms.
-
-package proxy_test
+package proxy
import (
- "context"
"errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/http/httptest"
- "net/url"
- "path/filepath"
- "strings"
"testing"
- "time"
- "github.com/getsentry/sentry-go"
- grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
- grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "github.com/stretchr/testify/suite"
- "gitlab.com/gitlab-org/gitaly/client"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
- "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- pb "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata"
- "gitlab.com/gitlab-org/gitaly/internal/testhelper"
- "go.uber.org/goleak"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
-)
-
-const (
- pingDefaultValue = "I like kittens."
- clientMdKey = "test-client-header"
- serverHeaderMdKey = "test-client-header"
- serverTrailerMdKey = "test-client-trailer"
-
- rejectingMdKey = "test-reject-rpc-if-in-context"
-
- countListResponses = 20
)
-func TestMain(m *testing.M) {
- defer testhelper.MustHaveNoChildProcess()
- cleanup := testhelper.Configure()
- defer cleanup()
-
- goleak.VerifyTestMain(m)
-}
-
-// asserting service is implemented on the server side and serves as a handler for stuff
-type assertingService struct {
- t *testing.T
-}
-
-func (s *assertingService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.PingResponse, error) {
- // Check that this call has client's metadata.
- md, ok := metadata.FromIncomingContext(ctx)
- assert.True(s.t, ok, "PingEmpty call must have metadata in context")
- _, ok = md[clientMdKey]
- assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata")
- return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil
-}
-
-func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) {
- // Send user trailers and headers.
- require.NoError(s.t, grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
- require.NoError(s.t, grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")))
- return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil
-}
-
-func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*pb.Empty, error) {
- return nil, status.Errorf(codes.ResourceExhausted, "Userspace error.")
-}
-
-func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error {
- // Send user trailers and headers.
- require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
- for i := 0; i < countListResponses; i++ {
- require.NoError(s.t, stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)}))
- }
- stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
- return nil
-}
-
-func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error {
- require.NoError(s.t, stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")))
- counter := int32(0)
- for {
- ping, err := stream.Recv()
- if err == io.EOF {
- break
- } else if err != nil {
- require.NoError(s.t, err, "can't fail reading stream")
- return err
- }
- pong := &pb.PingResponse{Value: ping.Value, Counter: counter}
- if err := stream.Send(pong); err != nil {
- require.NoError(s.t, err, "can't fail sending back a pong")
- }
- counter++
- }
- stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles."))
- return nil
-}
-
-// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues.
-type ProxyHappySuite struct {
- suite.Suite
-
- serverListener net.Listener
- server *grpc.Server
- proxyListener net.Listener
- proxy *grpc.Server
- serverClientConn *grpc.ClientConn
-
- client *grpc.ClientConn
- testClient pb.TestServiceClient
- testClientConn *grpc.ClientConn
-}
-
-func (s *ProxyHappySuite) ctx() context.Context {
- // Make all RPC calls last at most 1 sec, meaning all async issues or deadlock will not kill tests.
- ctx, _ := context.WithTimeout(context.TODO(), 120*time.Second) // nolint: govet
- return ctx
-}
-
-func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
- ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(clientMdKey, "true"))
- out, err := s.testClient.PingEmpty(ctx, &pb.Empty{})
- require.NoError(s.T(), err, "PingEmpty should succeed without errors")
- require.Equal(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out)
-}
-
-func (s *ProxyHappySuite) TestPingEmpty_StressTest() {
- for i := 0; i < 50; i++ {
- s.TestPingEmptyCarriesClientMetadata()
- }
-}
-
-func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() {
- headerMd := make(metadata.MD)
- trailerMd := make(metadata.MD)
- // This is an awkward calling convention... but meh.
- out, err := s.testClient.Ping(s.ctx(), &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd))
- require.NoError(s.T(), err, "Ping should succeed without errors")
- require.Equal(s.T(), &pb.PingResponse{Value: "foo", Counter: 42}, out)
- assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data")
- assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data")
-}
-
-func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
- sentryTriggered := 0
- sentrySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- sentryTriggered++
- }))
- defer sentrySrv.Close()
-
- // minimal required sentry client configuration
- sentryURL, err := url.Parse(sentrySrv.URL)
- require.NoError(s.T(), err)
- sentryURL.User = url.UserPassword("stub", "stub")
- sentryURL.Path = "/stub/1"
-
- require.NoError(s.T(), sentry.Init(sentry.ClientOptions{
- Dsn: sentryURL.String(),
- Transport: sentry.NewHTTPSyncTransport(),
- }))
-
- sentry.CaptureEvent(sentry.NewEvent())
- require.Equal(s.T(), 1, sentryTriggered, "sentry configured incorrectly")
-
- _, err = s.testClient.PingError(s.ctx(), &pb.PingRequest{Value: "foo"})
- require.Error(s.T(), err, "PingError should never succeed")
- assert.Equal(s.T(), codes.ResourceExhausted, status.Code(err))
- assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message())
- require.Equal(s.T(), 1, sentryTriggered, "sentry must not be triggered because errors from remote must be just propagated")
-}
-
-func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() {
- // See SetupSuite where the StreamDirector has a special case.
- ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(rejectingMdKey, "true"))
- _, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"})
- require.Error(s.T(), err, "Director should reject this RPC")
- assert.Equal(s.T(), codes.PermissionDenied, status.Code(err))
- assert.Equal(s.T(), "testing rejection", status.Convert(err).Message())
-}
-
-func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
- stream, err := s.testClient.PingStream(s.ctx())
- require.NoError(s.T(), err, "PingStream request should be successful.")
-
- for i := 0; i < countListResponses; i++ {
- ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)}
- require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail")
- resp, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if i == 0 {
- // Check that the header arrives before all entries.
- headerMd, err := stream.Header()
- require.NoError(s.T(), err, "PingStream headers should not error.")
- assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata")
- }
- assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id")
- }
- require.NoError(s.T(), stream.CloseSend(), "no error on close send")
- _, err = stream.Recv()
- require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK")
- // Check that the trailer headers are here.
- trailerMd := stream.Trailer()
- assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
-}
-
-func (s *ProxyHappySuite) TestPingStream_StressTest() {
- for i := 0; i < 50; i++ {
- s.TestPingStream_FullDuplexWorks()
- }
-}
-
-func (s *ProxyHappySuite) SetupSuite() {
- var err error
-
- s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0")
- require.NoError(s.T(), err, "must be able to allocate a port for proxyListener")
- s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
- require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
+func TestFailDestinationWithError(t *testing.T) {
+ expectedErr := errors.New("some error")
- s.server = grpc.NewServer()
- pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})
-
- // Setup of the proxy's Director.
- s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
- require.NoError(s.T(), err, "must not error on deferred client Dial")
- director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
- payload, err := peeker.Peek()
- if err != nil {
- return nil, err
- }
-
- md, ok := metadata.FromIncomingContext(ctx)
- if ok {
- if _, exists := md[rejectingMdKey]; exists {
- return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection")
- }
- }
-
- // Explicitly copy the metadata, otherwise the tests will fail.
- return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: s.serverClientConn, Msg: payload}, nil, nil, nil), nil
- }
-
- s.proxy = grpc.NewServer(
- grpc.CustomCodec(proxy.NewCodec()),
- grpc.StreamInterceptor(
- grpc_middleware.ChainStreamServer(
- // context tags usage is required by sentryhandler.StreamLogHandler
- grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)),
- // sentry middleware to capture errors
- sentryhandler.StreamLogHandler,
- ),
- ),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
- )
- // Ping handler is handled as an explicit registration and not as a TransparentHandler.
- proxy.RegisterService(s.proxy, director,
- "mwitkow.testproto.TestService",
- "Ping")
-
- // Start the serving loops.
- go func() {
- s.server.Serve(s.serverListener)
- }()
- go func() {
- s.proxy.Serve(s.proxyListener)
- }()
-
- ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
- defer cancel()
-
- s.testClientConn, err = grpc.DialContext(ctx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure())
- require.NoError(s.T(), err, "must not error on deferred client Dial")
- s.testClient = pb.NewTestServiceClient(s.testClientConn)
-}
-
-func (s *ProxyHappySuite) TearDownSuite() {
- if s.client != nil {
- s.client.Close()
- }
- if s.testClientConn != nil {
- s.testClientConn.Close()
- }
- if s.serverClientConn != nil {
- s.serverClientConn.Close()
- }
- // Close all transports so the logs don't get spammy.
- time.Sleep(10 * time.Millisecond)
- if s.proxy != nil {
- s.proxy.Stop()
- s.proxyListener.Close()
- }
- if s.serverListener != nil {
- s.server.Stop()
- s.serverListener.Close()
- }
-}
-
-func TestProxyHappySuite(t *testing.T) {
- suite.Run(t, &ProxyHappySuite{})
-}
+ t.Run("works with nil ErrHandlers", func(t *testing.T) {
+ require.NotPanics(t, func() {
+ failDestinationsWithError(&StreamParameters{
+ primary: Destination{},
+ secondaries: []Destination{{}},
+ }, expectedErr)
+ })
+ })
-func TestProxyErrorPropagation(t *testing.T) {
- errBackend := status.Error(codes.InvalidArgument, "backend error")
- errDirector := status.Error(codes.FailedPrecondition, "director error")
- errRequestFinalizer := status.Error(codes.Internal, "request finalizer error")
+ t.Run("fails both primary and secondaries", func(t *testing.T) {
+ var primaryErr, secondaryErr error
- for _, tc := range []struct {
- desc string
- backendError error
- directorError error
- requestFinalizerError error
- returnedError error
- errHandler func(error) error
- }{
- {
- desc: "backend error is propagated",
- backendError: errBackend,
- returnedError: errBackend,
- },
- {
- desc: "director error is propagated",
- directorError: errDirector,
- returnedError: errDirector,
- },
- {
- desc: "request finalizer error is propagated",
- requestFinalizerError: errRequestFinalizer,
- returnedError: errRequestFinalizer,
- },
- {
- desc: "director error cancels proxying",
- backendError: errBackend,
- requestFinalizerError: errRequestFinalizer,
- directorError: errDirector,
- returnedError: errDirector,
- },
- {
- desc: "backend error prioritized over request finalizer error",
- backendError: errBackend,
- requestFinalizerError: errRequestFinalizer,
- returnedError: errBackend,
- },
- {
- desc: "err handler gets error",
- backendError: errBackend,
- requestFinalizerError: errRequestFinalizer,
- returnedError: errBackend,
- errHandler: func(err error) error {
- require.Equal(t, errBackend, err)
- return errBackend
- },
- },
- {
- desc: "err handler can swallow error",
- backendError: errBackend,
- returnedError: io.EOF,
- errHandler: func(err error) error {
- require.Equal(t, errBackend, err)
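+		// Fail both destinations and record the error each ErrHandler receives.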
+ failDestinationsWithError(&StreamParameters{
+ primary: Destination{ErrHandler: func(err error) error {
+ primaryErr = err
return nil
- },
- },
- {
- desc: "swallowed error surfaces request finalizer error",
- backendError: errBackend,
- requestFinalizerError: errRequestFinalizer,
- returnedError: errRequestFinalizer,
- errHandler: func(err error) error {
- require.Equal(t, errBackend, err)
+ }},
+ secondaries: []Destination{{ErrHandler: func(err error) error {
+ secondaryErr = err
return nil
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- tmpDir := testhelper.TempDir(t)
-
- backendListener, err := net.Listen("unix", filepath.Join(tmpDir, "backend"))
- require.NoError(t, err)
-
- backendServer := grpc.NewServer(grpc.UnknownServiceHandler(func(interface{}, grpc.ServerStream) error {
- return tc.backendError
- }))
- go func() { backendServer.Serve(backendListener) }()
- defer backendServer.Stop()
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- backendClientConn, err := grpc.DialContext(ctx, "unix://"+backendListener.Addr().String(),
- grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
- require.NoError(t, err)
- defer func() {
- require.NoError(t, backendClientConn.Close())
- }()
-
- proxyListener, err := net.Listen("unix", filepath.Join(tmpDir, "proxy"))
- require.NoError(t, err)
-
- proxyServer := grpc.NewServer(
- grpc.CustomCodec(proxy.NewCodec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
- return proxy.NewStreamParameters(
- proxy.Destination{
- Ctx: ctx,
- Conn: backendClientConn,
- ErrHandler: tc.errHandler,
- },
- nil,
- func() error { return tc.requestFinalizerError },
- nil,
- ), tc.directorError
- })),
- )
-
- go func() { proxyServer.Serve(proxyListener) }()
- defer proxyServer.Stop()
-
- proxyClientConn, err := grpc.DialContext(ctx, "unix://"+proxyListener.Addr().String(), grpc.WithInsecure())
- require.NoError(t, err)
- defer func() {
- require.NoError(t, proxyClientConn.Close())
- }()
-
- resp, err := pb.NewTestServiceClient(proxyClientConn).Ping(ctx, &pb.PingRequest{})
- require.Equal(t, tc.returnedError, err)
- require.Nil(t, resp)
- })
- }
-}
-
-func TestRegisterStreamHandlers(t *testing.T) {
- directorCalledError := errors.New("director was called")
-
- server := grpc.NewServer(
- grpc.CustomCodec(proxy.NewCodec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
- return nil, directorCalledError
- })),
- )
-
- var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool
-
- pingValue := "hello"
-
- pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
- pingStreamHandlerCalled = true
- var req pb.PingRequest
-
- if err := stream.RecvMsg(&req); err != nil {
- return err
- }
-
- require.Equal(t, pingValue, req.Value)
-
- return stream.SendMsg(nil)
- }
-
- pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
- pingEmptyStreamHandlerCalled = true
- var req pb.Empty
-
- if err := stream.RecvMsg(&req); err != nil {
- return err
- }
-
- return stream.SendMsg(nil)
- }
-
- streamers := map[string]grpc.StreamHandler{
- "Ping": pingStreamHandler,
- "PingEmpty": pingEmptyStreamHandler,
- }
-
- proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers)
-
- serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
-
- listener, err := net.Listen("unix", serverSocketPath)
- if err != nil {
- t.Fatal(err)
- }
-
- go server.Serve(listener)
- defer server.Stop()
-
- cc, err := client.Dial("unix://"+serverSocketPath, []grpc.DialOption{grpc.WithBlock()})
- require.NoError(t, err)
- defer cc.Close()
-
- testServiceClient := pb.NewTestServiceClient(cc)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue})
- require.NoError(t, err)
- require.True(t, pingStreamHandlerCalled)
-
- _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{})
- require.NoError(t, err)
- require.True(t, pingEmptyStreamHandlerCalled)
+ }}},
+ }, expectedErr)
-	// Since PingError was never registered with its own streamer, it should be sent to the UnknownServiceHandler.
- _, err = testServiceClient.PingError(ctx, &pb.PingRequest{})
- require.Equal(t, status.Error(codes.Unknown, directorCalledError.Error()), err)
+ require.Equal(t, expectedErr, primaryErr)
+ require.Equal(t, expectedErr, secondaryErr)
+ })
}
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 90d8f6bf9..840710d29 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -57,6 +57,8 @@ type Transaction interface {
CountSubtransactions() int
	// State returns the state of each voter that is part of the transaction.
State() (map[string]VoteResult, error)
+ // DidCommitAnySubtransaction returns whether the transaction committed at least one subtransaction.
+ DidCommitAnySubtransaction() bool
}
// transaction is a session where a set of voters votes on one or more
@@ -184,6 +186,28 @@ func (t *transaction) CountSubtransactions() int {
return len(t.subtransactions)
}
+// DidCommitAnySubtransaction returns whether the transaction committed at least one subtransaction.
+func (t *transaction) DidCommitAnySubtransaction() bool {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
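+	// A transaction without any subtransactions never held a vote, so nothing was committed.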
+ if len(t.subtransactions) == 0 {
+ return false
+ }
+
+	// We only need to check the first subtransaction. If it had failed, there
+	// would be no further subtransactions.
+ for _, result := range t.subtransactions[0].state() {
+ // It's sufficient to find a single commit in the subtransaction
+ // to say it was committed.
+ if result == VoteCommitted {
+ return true
+ }
+ }
+
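+	// None of the voters committed in the first subtransaction.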
+ return false
+}
+
// getOrCreateSubtransaction gets an ongoing subtransaction the given node
// hasn't yet voted on, or creates a new one if the node has succeeded on
// all subtransactions. In case the node has failed on any of the