Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-03-18 14:12:54 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-03-18 15:13:30 +0300
commita12845df217007a63c54b0664f9f3eb36b5337da (patch)
tree7ef71e8d7bd969b726d7f26912fd03dc491e10ee
parent1275584eccfa21f6a9b92d4eba5e914a3232659e (diff)
operations: Fix missing votes on squashed commits (pks-operations-squash-commit-voting)
The UserSquash RPC computes a squashed commit by first rebasing a range of commits on top of another commit, and then collapsing these into a single commit. This RPC is notably different from almost all of our other RPCs because it never writes any references to disk, and neither does it ever execute any access checks as the other User RPCs do. This design is quite weird: - There is a known race window where the new objects are not referenced, so they could be pruned by maintenance calls. - We accept objects into the repository which may not be sanctioned by our access checks. - Replication jobs cannot replicate the squashed commit because it isn't referenced. - We never perform transactional voting because no references are updated. Together these problems show that the RPC call is misdesigned, but fixing this design would require a bigger refactoring to make it work alright in Rails. In this commit we fix the last bullet point though: because this RPC never performs transactional voting, we're always creating replication jobs after the call finishes because Praefect didn't observe any transactional votes. As mentioned though, we don't have any reference to vote on, so the best we can do is vote on the commit ID of the newly written squash commit to make sure that it is the same across nodes. This commit does so by introducing a quarantine directory that is used to stage all new objects first before they're migrated to the final repository. We then vote on the object ID of the staged squash commit. Only if this vote is successful will we successfully commit the object to disk. This commit thus solves two things: first it fixes the missing transactional voting. And second it causes us to discard all objects in case the RPC errors. Changelog: fixed
-rw-r--r--internal/gitaly/service/operations/squash.go55
-rw-r--r--internal/gitaly/service/operations/squash_test.go194
-rw-r--r--internal/metadata/featureflag/ff_user_squash_quarantined_voting.go7
3 files changed, 243 insertions, 13 deletions
diff --git a/internal/gitaly/service/operations/squash.go b/internal/gitaly/service/operations/squash.go
index b73bc58ba..3e6d19dd5 100644
--- a/internal/gitaly/service/operations/squash.go
+++ b/internal/gitaly/service/operations/squash.go
@@ -8,10 +8,14 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/quarantine"
"gitlab.com/gitlab-org/gitaly/v14/internal/git2go"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -101,12 +105,28 @@ func (er gitError) Error() string {
func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest) (string, error) {
repo := s.localrepo(req.GetRepository())
-
repoPath, err := repo.Path()
if err != nil {
return "", helper.ErrInternalf("cannot resolve repo path: %w", err)
}
+ // All new objects are staged into a quarantine directory first so that we can do
+ // transactional voting before we commit data to disk.
+ var quarantineDir *quarantine.Dir
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ var quarantineRepo *localrepo.Repo
+ quarantineDir, quarantineRepo, err = s.quarantinedRepo(ctx, req.GetRepository())
+ if err != nil {
+ return "", helper.ErrInternalf("creating quarantine: %w", err)
+ }
+
+ repo = quarantineRepo
+ repoPath, err = quarantineRepo.Path()
+ if err != nil {
+ return "", helper.ErrInternalf("getting quarantine path: %w", err)
+ }
+ }
+
// We need to retrieve the start commit such that we can create the new commit with
// all parents of the start commit.
startCommit, err := repo.ResolveRevision(ctx, git.Revision(req.GetStartSha()+"^{commit}"))
@@ -261,5 +281,36 @@ func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest
})
}
- return text.ChompBytes(stdout.Bytes()), nil
+ commitID := text.ChompBytes(stdout.Bytes())
+
+ // The RPC is badly designed in that it never updates any references, but only creates the
+ // objects and writes them to disk. We still use a quarantine directory to stage the new
+ // objects, vote on them and migrate them into the main directory if quorum was reached so
+ // that we don't pollute the object directory with objects we don't want to have in the
+ // first place.
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ if err := transaction.VoteOnContext(
+ ctx,
+ s.txManager,
+ voting.VoteFromData([]byte(commitID)),
+ voting.Prepared,
+ ); err != nil {
+ return "", helper.ErrAbortedf("preparatory vote on squashed commit: %w", err)
+ }
+
+ if err := quarantineDir.Migrate(); err != nil {
+ return "", helper.ErrInternalf("migrating quarantine directory: %w", err)
+ }
+
+ if err := transaction.VoteOnContext(
+ ctx,
+ s.txManager,
+ voting.VoteFromData([]byte(commitID)),
+ voting.Committed,
+ ); err != nil {
+ return "", helper.ErrAbortedf("committing vote on squashed commit: %w", err)
+ }
+ }
+
+ return commitID, nil
}
diff --git a/internal/gitaly/service/operations/squash_test.go b/internal/gitaly/service/operations/squash_test.go
index d0b111215..f20f12fe5 100644
--- a/internal/gitaly/service/operations/squash_test.go
+++ b/internal/gitaly/service/operations/squash_test.go
@@ -13,10 +13,15 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -36,6 +41,11 @@ var (
func TestUserSquash_successful(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashSuccessful)
+}
+
+func testUserSquashSuccessful(t *testing.T, ctx context.Context) {
+ t.Parallel()
for _, tc := range []struct {
desc string
@@ -53,8 +63,6 @@ func TestUserSquash_successful(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- ctx := testhelper.Context(t)
-
ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx)
repo := localrepo.NewTestRepo(t, cfg, repoProto)
@@ -90,9 +98,143 @@ func TestUserSquash_successful(t *testing.T) {
}
}
+func TestUserSquash_transactional(t *testing.T) {
+ t.Parallel()
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashTransactional)
+}
+
+func testUserSquashTransactional(t *testing.T, ctx context.Context) {
+ t.Parallel()
+ txManager := transaction.MockManager{}
+
+ ctx, cfg, client := setupOperationsServiceWithoutRepo(t, ctx,
+ testserver.WithTransactionManager(&txManager),
+ )
+
+ squashedCommitID := "c653dc8f98dba7f7a42c2e3c4b8d850d195e60b6"
+ squashedCommitVote := voting.VoteFromData([]byte(squashedCommitID))
+
+ for _, tc := range []struct {
+ desc string
+ voteFn func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error
+ expectedErr error
+ expectedVotes []voting.Vote
+ expectedExists bool
+ }{
+ {
+ desc: "successful",
+ voteFn: func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error {
+ return nil
+ },
+ expectedVotes: []voting.Vote{
+ // Two identical votes: one preparatory and one committing, both on the squashed commit ID.
+ squashedCommitVote,
+ squashedCommitVote,
+ },
+ expectedExists: true,
+ },
+ {
+ desc: "preparatory vote failure",
+ voteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
+ return fmt.Errorf("vote failed")
+ },
+ expectedErr: helper.ErrAbortedf("preparatory vote on squashed commit: vote failed"),
+ expectedVotes: []voting.Vote{
+ squashedCommitVote,
+ },
+ expectedExists: false,
+ },
+ {
+ desc: "committing vote failure",
+ voteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
+ if phase == voting.Committed {
+ return fmt.Errorf("vote failed")
+ }
+ return nil
+ },
+ expectedErr: helper.ErrAbortedf("committing vote on squashed commit: vote failed"),
+ expectedVotes: []voting.Vote{
+ squashedCommitVote,
+ squashedCommitVote,
+ },
+ // Even though the committing vote has failed, we expect objects to have
+ // been migrated after the preparatory vote. The commit should thus exist in
+ // the repository.
+ expectedExists: true,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
+ ctx = metadata.IncomingToOutgoing(ctx)
+
+ // We need to use a voting function which simply does nothing at first so
+ // that `CreateRepository()` isn't impacted.
+ txManager.VoteFn = func(_ context.Context, _ txinfo.Transaction, _ voting.Vote, _ voting.Phase) error {
+ return nil
+ }
+
+ repoProto, _ := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{
+ Seed: gittest.SeedGitLabTest,
+ })
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ var votes []voting.Vote
+ txManager.VoteFn = func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
+ votes = append(votes, vote)
+ return tc.voteFn(ctx, tx, vote, phase)
+ }
+
+ response, err := client.UserSquash(ctx, &gitalypb.UserSquashRequest{
+ Repository: repoProto,
+ User: gittest.TestUser,
+ Author: author,
+ CommitMessage: []byte("Squashed commit"),
+ StartSha: startSha,
+ EndSha: endSha,
+ Timestamp: &timestamppb.Timestamp{Seconds: 1234512345},
+ })
+
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ if tc.expectedErr == nil {
+ require.NoError(t, err)
+ require.Equal(t, squashedCommitID, response.SquashSha)
+ } else {
+ testhelper.RequireGrpcError(t, tc.expectedErr, err)
+ }
+
+ require.Equal(t, tc.expectedVotes, votes)
+ } else {
+ // There is no transactional voting when the feature flag is
+ // disabled, so we'd never return an error from it either.
+ require.NoError(t, err)
+ require.Empty(t, votes)
+ }
+
+ exists, err := repo.HasRevision(ctx, git.Revision(squashedCommitID+"^{commit}"))
+ require.NoError(t, err)
+
+ // In case the feature flag is enabled we use a quarantine directory to
+ // stage the new objects. So if we fail to reach quorum in the preparatory
+ // vote we expect the object to not have been migrated to the repository. On
+ // the other hand, if the feature flag is disabled, the object would always
+ // exist no matter whether the RPC is successful or not.
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ require.Equal(t, tc.expectedExists, exists)
+ } else {
+ require.True(t, exists)
+ }
+ })
+ }
+}
+
func TestUserSquash_stableID(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashStableID)
+}
+
+func testUserSquashStableID(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repoProto, _, client := setupOperationsService(t, ctx)
@@ -150,7 +292,11 @@ func ensureSplitIndexExists(t *testing.T, cfg config.Cfg, repoDir string) bool {
func TestUserSquash_threeWayMerge(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashThreeWayMerge)
+}
+
+func testUserSquashThreeWayMerge(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repoProto, _, client := setupOperationsService(t, ctx)
@@ -184,7 +330,11 @@ func TestUserSquash_threeWayMerge(t *testing.T) {
func TestUserSquash_splitIndex(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashSplitIndex)
+}
+
+func testUserSquashSplitIndex(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repo, repoPath, client := setupOperationsService(t, ctx)
@@ -206,8 +356,11 @@ func TestUserSquash_splitIndex(t *testing.T) {
}
func TestUserSquash_renames(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashRenames)
+}
+
+func testUserSquashRenames(t *testing.T, ctx context.Context) {
t.Parallel()
- ctx := testhelper.Context(t)
ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx)
@@ -266,7 +419,11 @@ func TestUserSquash_renames(t *testing.T) {
func TestUserSquash_missingFileOnTargetBranch(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashMissingFileOnTargetBranch)
+}
+
+func testUserSquashMissingFileOnTargetBranch(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, _, repo, _, client := setupOperationsService(t, ctx)
@@ -288,7 +445,11 @@ func TestUserSquash_missingFileOnTargetBranch(t *testing.T) {
func TestUserSquash_emptyCommit(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashEmptyCommit)
+}
+
+func testUserSquashEmptyCommit(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx)
repo := localrepo.NewTestRepo(t, cfg, repoProto)
@@ -486,7 +647,10 @@ func TestUserSquash_validation(t *testing.T) {
}
func TestUserSquash_conflicts(t *testing.T) {
- testhelper.NewFeatureSets(featureflag.UserSquashImprovedErrorHandling).Run(t, testUserSquashConflicts)
+ testhelper.NewFeatureSets(
+ featureflag.UserSquashImprovedErrorHandling,
+ featureflag.UserSquashQuarantinedVoting,
+ ).Run(t, testUserSquashConflicts)
}
func testUserSquashConflicts(t *testing.T, ctx context.Context) {
@@ -542,7 +706,11 @@ func testUserSquashConflicts(t *testing.T, ctx context.Context) {
func TestUserSquash_ancestry(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashAncestry)
+}
+
+func testUserSquashAncestry(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repo, repoPath, client := setupOperationsService(t, ctx)
@@ -575,7 +743,11 @@ func TestUserSquash_ancestry(t *testing.T) {
}
func TestUserSquash_gitError(t *testing.T) {
- testhelper.NewFeatureSets(featureflag.UserSquashImprovedErrorHandling).Run(t, testUserSquashGitError)
+ t.Parallel()
+ testhelper.NewFeatureSets(
+ featureflag.UserSquashImprovedErrorHandling,
+ featureflag.UserSquashQuarantinedVoting,
+ ).Run(t, testUserSquashGitError)
}
func testUserSquashGitError(t *testing.T, ctx context.Context) {
diff --git a/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go b/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go
new file mode 100644
index 000000000..7f75ebedf
--- /dev/null
+++ b/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go
@@ -0,0 +1,7 @@
+package featureflag
+
+// UserSquashQuarantinedVoting enables the use of a quarantine directory to stage all objects
+// created by UserSquash into a temporary directory. This quarantine directory will only be migrated
+// into the final repository when the RPC is successful, including a new transactional vote on the
+// object ID of the resulting squashed commit.
+var UserSquashQuarantinedVoting = NewFeatureFlag("user_squash_quarantined_voting", false)