Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-17 18:28:48 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-23 12:01:52 +0300
commitcd44d9e62a272c71c96b2057c59d6eab45b4d908 (patch)
tree98cb039fae338f90875f6488a7681466c0e619c6
parent6d24c8e6a651799f0ea2bdfd3386d58ed401407e (diff)
praefect: Consider RPC errors when creating replication jobs
Historically, we didn't have any access to error codes of proxying destinations if proxying to more than one node at once. This is mostly because the proxy code just conflated all errors into one, which made proper error handling next to impossible. As a result, the transactional finalizer cannot and thus doesn't take into account whether any of the nodes fails, which may lead to spotty coverage when creating replication jobs. Fix this shortcoming by setting up error handlers for all proxying destinations. Like this, we can easily grab any errors they encounter and put them into a local map, which we can then forward to the transaction finalizer. Now there are three cases to consider: - The primary raised an error. In that case, we cannot help it and need to create replication jobs to all nodes. - A secondary raised an error. In that case, we need to consider it as outdated and need to create a replication job for that specific node. - Neither of them failed, in which case the node is considered to be up-to-date. Note that currently we still pass the error through as-is even for the secondaries. We should instead ignore errors on secondaries, but this is left for a subsequent commit.
-rw-r--r--internal/praefect/coordinator.go53
-rw-r--r--internal/praefect/coordinator_test.go70
2 files changed, 118 insertions, 5 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b8e11e928..f5ffeecdb 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "sync"
"github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
@@ -357,6 +358,11 @@ func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNod
return c.txMgr.RegisterTransaction(ctx, voters, threshold)
}
+type nodeErrors struct {
+ sync.Mutex
+ errByNode map[string]error
+}
+
func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
targetRepo := call.targetRepo
virtualStorage := call.targetRepo.StorageName
@@ -406,12 +412,23 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
if err != nil {
return nil, fmt.Errorf("%w: %v %v", err, route.Primary, route.Secondaries)
}
+ finalizers = append(finalizers, transactionCleanup)
+
+ nodeErrors := &nodeErrors{
+ errByNode: make(map[string]error),
+ }
injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true)
if err != nil {
return nil, err
}
primaryDest.Ctx = helper.IncomingToOutgoing(injectedCtx)
+ primaryDest.ErrHandler = func(err error) error {
+ nodeErrors.Lock()
+ defer nodeErrors.Unlock()
+ nodeErrors.errByNode[route.Primary.Storage] = err
+ return err
+ }
for _, secondary := range route.Secondaries {
secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage)
@@ -428,11 +445,18 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
Ctx: helper.IncomingToOutgoing(injectedCtx),
Conn: secondary.Connection,
Msg: secondaryMsg,
+ ErrHandler: func(err error) error {
+ nodeErrors.Lock()
+ defer nodeErrors.Unlock()
+ nodeErrors.errByNode[secondary.Storage] = err
+ return err
+ },
})
}
finalizers = append(finalizers,
- transactionCleanup, c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, targetRepo, change, params),
+ c.createTransactionFinalizer(ctx, transaction, route, virtualStorage,
+ targetRepo, change, params, nodeErrors),
)
} else {
finalizers = append(finalizers,
@@ -655,9 +679,10 @@ func (c *Coordinator) createTransactionFinalizer(
targetRepo *gitalypb.Repository,
change datastore.ChangeType,
params datastore.Params,
+ nodeErrors *nodeErrors,
) func() error {
return func() error {
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction)
+ updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
return c.newRequestFinalizer(
ctx, virtualStorage, targetRepo, route.Primary.Storage,
@@ -675,18 +700,33 @@ func (c *Coordinator) createTransactionFinalizer(
// - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all
// nodes need to get replication jobs.
//
+// - The node has errored. As a special case, if the primary fails all nodes need to get replication
+// jobs.
+//
// Note that this function cannot and should not fail: if anything goes wrong, we need to create
// replication jobs to repair state.
func getUpdatedAndOutdatedSecondaries(
ctx context.Context,
route RepositoryMutatorRoute,
transaction transactions.Transaction,
+ nodeErrors *nodeErrors,
) (updated []string, outdated []string) {
+ nodeErrors.Lock()
+ defer nodeErrors.Unlock()
+
// Replication targets were not added to the transaction, most likely because they are
// either not healthy or out of date. We thus need to make sure to create replication jobs
// for them.
outdated = append(outdated, route.ReplicationTargets...)
+ // If the primary errored, then we need to assume that it has modified on-disk state and
+ // thus need to replicate those changes to secondaries.
+ if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction")
+ outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
+ return
+ }
+
// If no subtransaction happened, then the called RPC may not be aware of transactions at
// all. We thus need to assume it changed repository state and need to create replication
// jobs.
@@ -716,9 +756,14 @@ func getUpdatedAndOutdatedSecondaries(
return
}
- // Now we finally got the potentially happy case: in case the secondary committed, it's
- // considered up to date and thus does not need replication.
+ // Now we finally got the potentially happy case: in case the secondary didn't run into an
+ // error and committed, it's considered up to date and thus does not need replication.
for _, secondary := range route.Secondaries {
+ if nodeErrors.errByNode[secondary.Storage] != nil {
+ outdated = append(outdated, secondary.Storage)
+ continue
+ }
+
if nodeStates[secondary.Storage] != transactions.VoteCommitted {
outdated = append(outdated, secondary.Storage)
continue
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index e991af307..4e88323af 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1483,11 +1483,14 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
type node struct {
name string
state transactions.VoteResult
+ err error
}
ctx, cancel := testhelper.Context()
defer cancel()
+ anyErr := errors.New("arbitrary error")
+
for _, tc := range []struct {
desc string
primary node
@@ -1514,6 +1517,13 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
subtransactions: 1,
},
{
+ desc: "single erred node",
+ primary: node{
+ name: "primary",
+ err: anyErr,
+ },
+ },
+ {
desc: "single node without subtransactions",
primary: node{
name: "primary",
@@ -1541,6 +1551,17 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedOutdated: []string{"replica"},
},
{
+ desc: "single erred node with replica",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ err: anyErr,
+ },
+ replicas: []string{"replica"},
+ subtransactions: 1,
+ expectedOutdated: []string{"replica"},
+ },
+ {
desc: "single node without transaction with replica",
primary: node{
name: "primary",
@@ -1563,6 +1584,34 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedUpdated: []string{"s1", "s2"},
},
{
+ desc: "multiple committed nodes with primary err",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ err: anyErr,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteCommitted},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ subtransactions: 1,
+ expectedOutdated: []string{"s1", "s2"},
+ },
+ {
+ desc: "multiple committed nodes with secondary err",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteCommitted, err: anyErr},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ subtransactions: 1,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
+ },
+ {
desc: "partial success",
primary: node{
name: "primary",
@@ -1617,11 +1666,29 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
expectedOutdated: []string{"s1", "r1", "r2"},
expectedUpdated: []string{"s2"},
},
+ {
+ desc: "multiple nodes with replica and partial err",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteFailed},
+ {name: "s2", state: transactions.VoteCommitted, err: anyErr},
+ },
+ replicas: []string{"r1", "r2"},
+ subtransactions: 1,
+ expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
nodes := append(tc.secondaries, tc.primary)
voters := make([]transactions.Voter, len(nodes))
+
states := make(map[string]transactions.VoteResult)
+ nodeErrors := &nodeErrors{
+ errByNode: make(map[string]error),
+ }
for i, node := range nodes {
voters[i] = transactions.Voter{
@@ -1629,6 +1696,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
Votes: 1,
}
states[node.name] = node.state
+ nodeErrors.errByNode[node.name] = node.err
}
transaction := mockTransaction{
@@ -1648,7 +1716,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction)
+ updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
})