Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/coordinator.go')
-rw-r--r--internal/praefect/coordinator.go81
1 files changed, 59 insertions, 22 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index dc1b41529..887116148 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
+ "time"
"github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
@@ -18,11 +19,11 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/internal/transaction/txinfo"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"golang.org/x/sync/errgroup"
@@ -38,6 +39,12 @@ type transactionsCondition func(context.Context) bool
func transactionsEnabled(context.Context) bool { return true }
func transactionsDisabled(context.Context) bool { return false }
+func transactionsFlag(flag featureflag.FeatureFlag) transactionsCondition {
+ return func(ctx context.Context) bool {
+ return featureflag.IsEnabled(ctx, flag)
+ }
+}
+
// transactionRPCs contains the list of repository-scoped mutating calls which may take part in
// transactions. An optional feature flag can be added to conditionally enable transactional
// behaviour. If none is given, it's always enabled.
@@ -68,6 +75,7 @@ var transactionRPCs = map[string]transactionsCondition{
"/gitaly.RepositoryService/CloneFromPool": transactionsEnabled,
"/gitaly.RepositoryService/CloneFromPoolInternal": transactionsEnabled,
"/gitaly.RepositoryService/CreateFork": transactionsEnabled,
+ "/gitaly.RepositoryService/CreateRepository": transactionsEnabled,
"/gitaly.RepositoryService/CreateRepositoryFromBundle": transactionsEnabled,
"/gitaly.RepositoryService/CreateRepositoryFromSnapshot": transactionsEnabled,
"/gitaly.RepositoryService/CreateRepositoryFromURL": transactionsEnabled,
@@ -81,6 +89,9 @@ var transactionRPCs = map[string]transactionsCondition{
"/gitaly.WikiService/WikiUpdatePage": transactionsEnabled,
"/gitaly.WikiService/WikiWritePage": transactionsEnabled,
+ "/gitaly.RepositoryService/SetConfig": transactionsFlag(featureflag.TxConfig),
+ "/gitaly.RepositoryService/DeleteConfig": transactionsFlag(featureflag.TxConfig),
+
// The following RPCs don't perform any reference updates and thus
// shouldn't use transactions.
"/gitaly.ObjectPoolService/CreateObjectPool": transactionsDisabled,
@@ -91,9 +102,6 @@ var transactionRPCs = map[string]transactionsCondition{
"/gitaly.ObjectPoolService/UnlinkRepositoryFromObjectPool": transactionsDisabled,
"/gitaly.RefService/PackRefs": transactionsDisabled,
"/gitaly.RepositoryService/Cleanup": transactionsDisabled,
- "/gitaly.RepositoryService/CreateRepository": transactionsDisabled,
- "/gitaly.RepositoryService/DeleteConfig": transactionsDisabled,
- "/gitaly.RepositoryService/Fsck": transactionsDisabled,
"/gitaly.RepositoryService/GarbageCollect": transactionsDisabled,
"/gitaly.RepositoryService/MidxRepack": transactionsDisabled,
"/gitaly.RepositoryService/OptimizeRepository": transactionsDisabled,
@@ -102,7 +110,6 @@ var transactionRPCs = map[string]transactionsCondition{
"/gitaly.RepositoryService/RepackFull": transactionsDisabled,
"/gitaly.RepositoryService/RepackIncremental": transactionsDisabled,
"/gitaly.RepositoryService/RestoreCustomHooks": transactionsDisabled,
- "/gitaly.RepositoryService/SetConfig": transactionsDisabled,
"/gitaly.RepositoryService/WriteCommitGraph": transactionsDisabled,
// These shouldn't ever use transactions for the sake of not creating
@@ -289,7 +296,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr
"relative_path": call.targetRepo.RelativePath,
})
- praefectServer, err := metadata.PraefectFromConfig(c.conf)
+ praefectServer, err := txinfo.PraefectFromConfig(c.conf)
if err != nil {
return nil, fmt.Errorf("repo scoped: could not create Praefect configuration: %w", err)
}
@@ -458,7 +465,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
errByNode: make(map[string]error),
}
- injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true)
+ injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true)
if err != nil {
return nil, err
}
@@ -476,7 +483,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return nil, err
}
- injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false)
+ injectedCtx, err := txinfo.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false)
if err != nil {
return nil, err
}
@@ -732,7 +739,13 @@ func (c *Coordinator) createTransactionFinalizer(
nodeErrors *nodeErrors,
) func() error {
return func() error {
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ if !primaryDirtied {
+ // If the primary replica was not modified then we don't need to consider the secondaries
+ // outdated. Praefect requires the primary to be always part of the quorum, so no changes
+ // to secondaries would be made without primary being in agreement.
+ return nil
+ }
return c.newRequestFinalizer(
ctx, virtualStorage, targetRepo, route.Primary.Storage,
@@ -743,9 +756,11 @@ func (c *Coordinator) createTransactionFinalizer(
// getUpdatedAndOutdatedSecondaries returns all nodes which can be considered up-to-date or outdated
// after the given transaction. A node is considered outdated, if one of the following is true:
//
-// - No subtransactions were created. This really is only a safeguard in case the RPC wasn't aware
-// of transactions and thus failed to correctly assert its state matches across nodes. This is
-// rather pessimistic, as it could also indicate that an RPC simply didn't change anything.
+// - No subtransactions were created and the RPC was successful on the primary. This really is only
+// a safeguard in case the RPC wasn't aware of transactions and thus failed to correctly assert
+// its state matches across nodes. This is rather pessimistic, as it could also indicate that an
+// RPC simply didn't change anything. If the RPC was a failure on the primary and there were no
+// subtransactions, we assume no changes were done and that the nodes failed prior to voting.
//
// - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all
// nodes need to get replication jobs.
@@ -760,7 +775,7 @@ func getUpdatedAndOutdatedSecondaries(
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
-) (updated []string, outdated []string) {
+) (primaryDirtied bool, updated []string, outdated []string) {
nodeErrors.Lock()
defer nodeErrors.Unlock()
@@ -769,17 +784,31 @@ func getUpdatedAndOutdatedSecondaries(
// for them.
outdated = append(outdated, route.ReplicationTargets...)
+ primaryErr := nodeErrors.errByNode[route.Primary.Storage]
+
+ // If there were subtransactions, we only assume some changes were made if one of the subtransactions
+ // was committed.
+ //
+ // If there were no subtransactions, we assume changes were performed only if the primary successfully
+ // processed the RPC. This might be an RPC that is not correctly casting votes thus we replicate everywhere.
+ //
+ // If there were no subtransactions and the primary failed the RPC, we assume no changes have been made and
+ // the nodes simply failed before voting.
+ primaryDirtied = transaction.DidCommitAnySubtransaction() ||
+ (transaction.CountSubtransactions() == 0 && primaryErr == nil)
+
// If the primary errored, then we need to assume that it has modified on-disk state and
// thus need to replicate those changes to secondaries.
- if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil {
- ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction")
+ if primaryErr != nil {
+ ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
return
}
- // If no subtransaction happened, then the called RPC may not be aware of transactions at
- // all. We thus need to assume it changed repository state and need to create replication
- // jobs.
+ // If no subtransaction happened, then the called RPC may not be aware of transactions or
+ // the nodes failed before casting any votes. If the primary failed the RPC, we assume
+ // no changes were done and the nodes hit an error prior to voting. If the primary processed
+ // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere.
if transaction.CountSubtransactions() == 0 {
ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
@@ -795,9 +824,9 @@ func getUpdatedAndOutdatedSecondaries(
return
}
- // If the primary node did not commit the transaction, then we must assume that it dirtied
- // on-disk state. This modified state may not be what we want, but it's what we got. So in
- // order to ensure a consistent state, we need to replicate.
+ // If the primary node did not commit the transaction but there were some subtransactions committed,
+ // then we must assume that it dirtied on-disk state. This modified state may not be what we want,
+ // but it's what we got. So in order to ensure a consistent state, we need to replicate.
if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted {
if state == transactions.VoteFailed {
ctxlogrus.Extract(ctx).Error("transaction: primary failed vote")
@@ -845,6 +874,13 @@ func (c *Coordinator) newRequestFinalizer(
cause string,
) func() error {
return func() error {
+ // Use a separate timeout for the database operations. If the request times out, the passed in context is
+ // canceled. We need to perform the database updates regardless whether the request was canceled or not as
+ // the primary replica could have been dirtied and secondaries become outdated. Otherwise we'd have no idea of
+ // the possible changes performed on the disk.
+ ctx, cancel := context.WithTimeout(helper.SuppressCancellation(ctx), 30*time.Second)
+ defer cancel()
+
log := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
"replication.cause": cause,
"replication.change": change,
@@ -897,7 +933,8 @@ func (c *Coordinator) newRequestFinalizer(
virtualStorage,
targetRepo.GetRelativePath(),
primary,
- append(updatedSecondaries, outdatedSecondaries...),
+ updatedSecondaries,
+ outdatedSecondaries,
repositorySpecificPrimariesEnabled,
variableReplicationFactorEnabled,
); err != nil {