diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-17 18:28:48 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-23 12:01:52 +0300 |
commit | cd44d9e62a272c71c96b2057c59d6eab45b4d908 (patch) | |
tree | 98cb039fae338f90875f6488a7681466c0e619c6 | |
parent | 6d24c8e6a651799f0ea2bdfd3386d58ed401407e (diff) |
praefect: Consider RPC errors when creating replication jobs
Historically, we didn't have any access to error codes of proxying
destinations if proxying to more than one node at once. This is mostly
because the proxy code just conflated all errors into one, which made
proper error handling next to impossible. As a result, the transactional
finalizer cannot and thus doesn't take into account whether any of the
nodes fails, which may lead to spotty coverage when creating replication
jobs.
Fix this shortcoming by setting up error handlers for all proxying
destinations. This way, we can easily grab any errors they encounter
and put them into a local map, which we can then forward to the
transaction finalizer. Now there are three cases to consider:
- The primary raised an error. In that case, we have no choice but to
create replication jobs for all nodes.
- A secondary raised an error. In that case, we need to consider it
as outdated and need to create a replication job for that specific
node.
- Neither of them failed, in which case each node is considered to be
up-to-date.
Note that currently we still pass the error through as-is even for the
secondaries. We should instead ignore errors on secondaries, but this is
left for a subsequent commit.
-rw-r--r-- | internal/praefect/coordinator.go | 53 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 70 |
2 files changed, 118 insertions, 5 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b8e11e928..f5ffeecdb 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" @@ -357,6 +358,11 @@ func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNod return c.txMgr.RegisterTransaction(ctx, voters, threshold) } +type nodeErrors struct { + sync.Mutex + errByNode map[string]error +} + func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { targetRepo := call.targetRepo virtualStorage := call.targetRepo.StorageName @@ -406,12 +412,23 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall if err != nil { return nil, fmt.Errorf("%w: %v %v", err, route.Primary, route.Secondaries) } + finalizers = append(finalizers, transactionCleanup) + + nodeErrors := &nodeErrors{ + errByNode: make(map[string]error), + } injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true) if err != nil { return nil, err } primaryDest.Ctx = helper.IncomingToOutgoing(injectedCtx) + primaryDest.ErrHandler = func(err error) error { + nodeErrors.Lock() + defer nodeErrors.Unlock() + nodeErrors.errByNode[route.Primary.Storage] = err + return err + } for _, secondary := range route.Secondaries { secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage) @@ -428,11 +445,18 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall Ctx: helper.IncomingToOutgoing(injectedCtx), Conn: secondary.Connection, Msg: secondaryMsg, + ErrHandler: func(err error) error { + nodeErrors.Lock() + defer nodeErrors.Unlock() + nodeErrors.errByNode[secondary.Storage] = err + return err + }, }) } finalizers = append(finalizers, - 
transactionCleanup, c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, targetRepo, change, params), + c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, + targetRepo, change, params, nodeErrors), ) } else { finalizers = append(finalizers, @@ -655,9 +679,10 @@ func (c *Coordinator) createTransactionFinalizer( targetRepo *gitalypb.Repository, change datastore.ChangeType, params datastore.Params, + nodeErrors *nodeErrors, ) func() error { return func() error { - updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction) + updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) return c.newRequestFinalizer( ctx, virtualStorage, targetRepo, route.Primary.Storage, @@ -675,18 +700,33 @@ func (c *Coordinator) createTransactionFinalizer( // - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all // nodes need to get replication jobs. // +// - The node has errored. As a special case, if the primary fails all nodes need to get replication +// jobs. +// // Note that this function cannot and should not fail: if anything goes wrong, we need to create // replication jobs to repair state. func getUpdatedAndOutdatedSecondaries( ctx context.Context, route RepositoryMutatorRoute, transaction transactions.Transaction, + nodeErrors *nodeErrors, ) (updated []string, outdated []string) { + nodeErrors.Lock() + defer nodeErrors.Unlock() + // Replication targets were not added to the transaction, most likely because they are // either not healthy or out of date. We thus need to make sure to create replication jobs // for them. outdated = append(outdated, route.ReplicationTargets...) + // If the primary errored, then we need to assume that it has modified on-disk state and + // thus need to replicate those changes to secondaries. 
+ if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil { + ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction") + outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + return + } + // If no subtransaction happened, then the called RPC may not be aware of transactions at // all. We thus need to assume it changed repository state and need to create replication // jobs. @@ -716,9 +756,14 @@ func getUpdatedAndOutdatedSecondaries( return } - // Now we finally got the potentially happy case: in case the secondary committed, it's - // considered up to date and thus does not need replication. + // Now we finally got the potentially happy case: in case the secondary didn't run into an + // error and committed, it's considered up to date and thus does not need replication. for _, secondary := range route.Secondaries { + if nodeErrors.errByNode[secondary.Storage] != nil { + outdated = append(outdated, secondary.Storage) + continue + } + if nodeStates[secondary.Storage] != transactions.VoteCommitted { outdated = append(outdated, secondary.Storage) continue diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index e991af307..4e88323af 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1483,11 +1483,14 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { type node struct { name string state transactions.VoteResult + err error } ctx, cancel := testhelper.Context() defer cancel() + anyErr := errors.New("arbitrary error") + for _, tc := range []struct { desc string primary node @@ -1514,6 +1517,13 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, }, { + desc: "single erred node", + primary: node{ + name: "primary", + err: anyErr, + }, + }, + { desc: "single node without subtransactions", primary: node{ name: "primary", @@ -1541,6 +1551,17 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { 
expectedOutdated: []string{"replica"}, }, { + desc: "single erred node with replica", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + err: anyErr, + }, + replicas: []string{"replica"}, + subtransactions: 1, + expectedOutdated: []string{"replica"}, + }, + { desc: "single node without transaction with replica", primary: node{ name: "primary", @@ -1563,6 +1584,34 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedUpdated: []string{"s1", "s2"}, }, { + desc: "multiple committed nodes with primary err", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + err: anyErr, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteCommitted}, + {name: "s2", state: transactions.VoteCommitted}, + }, + subtransactions: 1, + expectedOutdated: []string{"s1", "s2"}, + }, + { + desc: "multiple committed nodes with secondary err", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteCommitted, err: anyErr}, + {name: "s2", state: transactions.VoteCommitted}, + }, + subtransactions: 1, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, + }, + { desc: "partial success", primary: node{ name: "primary", @@ -1617,11 +1666,29 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedOutdated: []string{"s1", "r1", "r2"}, expectedUpdated: []string{"s2"}, }, + { + desc: "multiple nodes with replica and partial err", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteFailed}, + {name: "s2", state: transactions.VoteCommitted, err: anyErr}, + }, + replicas: []string{"r1", "r2"}, + subtransactions: 1, + expectedOutdated: []string{"s1", "s2", "r1", "r2"}, + }, } { t.Run(tc.desc, func(t *testing.T) { nodes := append(tc.secondaries, tc.primary) voters := make([]transactions.Voter, len(nodes)) + states := 
make(map[string]transactions.VoteResult) + nodeErrors := &nodeErrors{ + errByNode: make(map[string]error), + } for i, node := range nodes { voters[i] = transactions.Voter{ @@ -1629,6 +1696,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { Votes: 1, } states[node.name] = node.state + nodeErrors.errByNode[node.name] = node.err } transaction := mockTransaction{ @@ -1648,7 +1716,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { } route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...) - updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction) + updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) require.ElementsMatch(t, tc.expectedUpdated, updated) require.ElementsMatch(t, tc.expectedOutdated, outdated) }) |