diff options
Diffstat (limited to 'internal/praefect/coordinator.go')
-rw-r--r-- | internal/praefect/coordinator.go | 81 |
1 files changed, 59 insertions, 22 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index dc1b41529..887116148 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" @@ -18,11 +19,11 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" - "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" + "gitlab.com/gitlab-org/gitaly/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" "golang.org/x/sync/errgroup" @@ -38,6 +39,12 @@ type transactionsCondition func(context.Context) bool func transactionsEnabled(context.Context) bool { return true } func transactionsDisabled(context.Context) bool { return false } +func transactionsFlag(flag featureflag.FeatureFlag) transactionsCondition { + return func(ctx context.Context) bool { + return featureflag.IsEnabled(ctx, flag) + } +} + // transactionRPCs contains the list of repository-scoped mutating calls which may take part in // transactions. An optional feature flag can be added to conditionally enable transactional // behaviour. If none is given, it's always enabled. @@ -68,6 +75,7 @@ var transactionRPCs = map[string]transactionsCondition{ "/gitaly.RepositoryService/CloneFromPool": transactionsEnabled, "/gitaly.RepositoryService/CloneFromPoolInternal": transactionsEnabled, "/gitaly.RepositoryService/CreateFork": transactionsEnabled, + "/gitaly.RepositoryService/CreateRepository": transactionsEnabled, "/gitaly.RepositoryService/CreateRepositoryFromBundle": transactionsEnabled, "/gitaly.RepositoryService/CreateRepositoryFromSnapshot": transactionsEnabled, "/gitaly.RepositoryService/CreateRepositoryFromURL": transactionsEnabled, @@ -81,6 +89,9 @@ var transactionRPCs = map[string]transactionsCondition{ "/gitaly.WikiService/WikiUpdatePage": transactionsEnabled, "/gitaly.WikiService/WikiWritePage": transactionsEnabled, + "/gitaly.RepositoryService/SetConfig": transactionsFlag(featureflag.TxConfig), + "/gitaly.RepositoryService/DeleteConfig": transactionsFlag(featureflag.TxConfig), + // The following RPCs don't perform any reference updates and thus // shouldn't use transactions. "/gitaly.ObjectPoolService/CreateObjectPool": transactionsDisabled, @@ -91,9 +102,6 @@ var transactionRPCs = map[string]transactionsCondition{ "/gitaly.ObjectPoolService/UnlinkRepositoryFromObjectPool": transactionsDisabled, "/gitaly.RefService/PackRefs": transactionsDisabled, "/gitaly.RepositoryService/Cleanup": transactionsDisabled, - "/gitaly.RepositoryService/CreateRepository": transactionsDisabled, - "/gitaly.RepositoryService/DeleteConfig": transactionsDisabled, - "/gitaly.RepositoryService/Fsck": transactionsDisabled, "/gitaly.RepositoryService/GarbageCollect": transactionsDisabled, "/gitaly.RepositoryService/MidxRepack": transactionsDisabled, "/gitaly.RepositoryService/OptimizeRepository": transactionsDisabled, @@ -102,7 +110,6 @@ var transactionRPCs = map[string]transactionsCondition{ "/gitaly.RepositoryService/RepackFull": transactionsDisabled, "/gitaly.RepositoryService/RepackIncremental": transactionsDisabled, "/gitaly.RepositoryService/RestoreCustomHooks": transactionsDisabled, - "/gitaly.RepositoryService/SetConfig": transactionsDisabled, "/gitaly.RepositoryService/WriteCommitGraph": transactionsDisabled, // These shouldn't ever use transactions for the sake of not creating @@ -289,7 +296,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr "relative_path": call.targetRepo.RelativePath, }) - praefectServer, err := metadata.PraefectFromConfig(c.conf) + praefectServer, err := txinfo.PraefectFromConfig(c.conf) if err != nil { return nil, fmt.Errorf("repo scoped: could not create Praefect configuration: %w", err) } @@ -458,7 +465,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall errByNode: make(map[string]error), } - injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true) + injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true) if err != nil { return nil, err } @@ -476,7 +483,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, err } - injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false) + injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false) if err != nil { return nil, err } @@ -732,7 +739,13 @@ func (c *Coordinator) createTransactionFinalizer( nodeErrors *nodeErrors, ) func() error { return func() error { - updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + if !primaryDirtied { + // If the primary replica was not modified then we don't need to consider the secondaries + // outdated. Praefect requires the primary to be always part of the quorum, so no changes + // to secondaries would be made without primary being in agreement. + return nil + } return c.newRequestFinalizer( ctx, virtualStorage, targetRepo, route.Primary.Storage, @@ -743,9 +756,11 @@ func (c *Coordinator) createTransactionFinalizer( // getUpdatedAndOutdatedSecondaries returns all nodes which can be considered up-to-date or outdated // after the given transaction. A node is considered outdated, if one of the following is true: // -// - No subtransactions were created. This really is only a safeguard in case the RPC wasn't aware -// of transactions and thus failed to correctly assert its state matches across nodes. This is -// rather pessimistic, as it could also indicate that an RPC simply didn't change anything. +// - No subtransactions were created and the RPC was successful on the primary. This really is only +// a safeguard in case the RPC wasn't aware of transactions and thus failed to correctly assert +// its state matches across nodes. This is rather pessimistic, as it could also indicate that an +// RPC simply didn't change anything. If the RPC was a failure on the primary and there were no +// subtransactions, we assume no changes were done and that the nodes failed prior to voting. // // - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all // nodes need to get replication jobs. @@ -760,7 +775,7 @@ func getUpdatedAndOutdatedSecondaries( route RepositoryMutatorRoute, transaction transactions.Transaction, nodeErrors *nodeErrors, -) (updated []string, outdated []string) { +) (primaryDirtied bool, updated []string, outdated []string) { nodeErrors.Lock() defer nodeErrors.Unlock() @@ -769,17 +784,31 @@ func getUpdatedAndOutdatedSecondaries( // for them. outdated = append(outdated, route.ReplicationTargets...) + primaryErr := nodeErrors.errByNode[route.Primary.Storage] + + // If there were subtransactions, we only assume some changes were made if one of the subtransactions + // was committed. + // + // If there were no subtransactions, we assume changes were performed only if the primary successfully + // processed the RPC. This might be an RPC that is not correctly casting votes thus we replicate everywhere. + // + // If there were no subtransactions and the primary failed the RPC, we assume no changes have been made and + // the nodes simply failed before voting. + primaryDirtied = transaction.DidCommitAnySubtransaction() || + (transaction.CountSubtransactions() == 0 && primaryErr == nil) + // If the primary errored, then we need to assume that it has modified on-disk state and // thus need to replicate those changes to secondaries. - if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil { - ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction") + if primaryErr != nil { + ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) return } - // If no subtransaction happened, then the called RPC may not be aware of transactions at - // all. We thus need to assume it changed repository state and need to create replication - // jobs. + // If no subtransaction happened, then the called RPC may not be aware of transactions or + // the nodes failed before casting any votes. If the primary failed the RPC, we assume + // no changes were done and the nodes hit an error prior to voting. If the primary processed + // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere. if transaction.CountSubtransactions() == 0 { ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) @@ -795,9 +824,9 @@ func getUpdatedAndOutdatedSecondaries( return } - // If the primary node did not commit the transaction, then we must assume that it dirtied - // on-disk state. This modified state may not be what we want, but it's what we got. So in - // order to ensure a consistent state, we need to replicate. + // If the primary node did not commit the transaction but there were some subtransactions committed, + // then we must assume that it dirtied on-disk state. This modified state may not be what we want, + // but it's what we got. So in order to ensure a consistent state, we need to replicate. if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted { if state == transactions.VoteFailed { ctxlogrus.Extract(ctx).Error("transaction: primary failed vote") @@ -845,6 +874,13 @@ func (c *Coordinator) newRequestFinalizer( cause string, ) func() error { return func() error { + // Use a separate timeout for the database operations. If the request times out, the passed in context is + // canceled. We need to perform the database updates regardless whether the request was canceled or not as + // the primary replica could have been dirtied and secondaries become outdated. Otherwise we'd have no idea of + // the possible changes performed on the disk. + ctx, cancel := context.WithTimeout(helper.SuppressCancellation(ctx), 30*time.Second) + defer cancel() + log := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ "replication.cause": cause, "replication.change": change, @@ -897,7 +933,8 @@ func (c *Coordinator) newRequestFinalizer( virtualStorage, targetRepo.GetRelativePath(), primary, - append(updatedSecondaries, outdatedSecondaries...), + updatedSecondaries, + outdatedSecondaries, repositorySpecificPrimariesEnabled, variableReplicationFactorEnabled, ); err != nil { |