diff options
author | Toon Claes <toon@gitlab.com> | 2022-03-18 18:56:38 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2022-03-18 18:56:38 +0300 |
commit | d84345d4f45748297ddea390158db33412f591e6 (patch) | |
tree | d6d5f623d31a7d30dce2bf8749a6f08334ef9c01 | |
parent | 354435a98c201d28932d005c8bc6c2b1e0e13e37 (diff) | |
parent | af4ea3258f572b5f647b2d7eecf07553b41a4938 (diff) |
Merge branch 'pks-operations-squash-commit-voting' into 'master'
operations: Fix missing votes on squashed commits
Closes #4109
See merge request gitlab-org/gitaly!4417
-rw-r--r-- | internal/gitaly/service/operations/squash.go | 55 | ||||
-rw-r--r-- | internal/gitaly/service/operations/squash_test.go | 194 | ||||
-rw-r--r-- | internal/helper/error.go | 9 | ||||
-rw-r--r-- | internal/helper/error_test.go | 10 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_user_squash_quarantined_voting.go | 7 |
5 files changed, 262 insertions, 13 deletions
diff --git a/internal/gitaly/service/operations/squash.go b/internal/gitaly/service/operations/squash.go index b73bc58ba..096dfc68b 100644 --- a/internal/gitaly/service/operations/squash.go +++ b/internal/gitaly/service/operations/squash.go @@ -8,10 +8,14 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v14/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -101,12 +105,28 @@ func (er gitError) Error() string { func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest) (string, error) { repo := s.localrepo(req.GetRepository()) - repoPath, err := repo.Path() if err != nil { return "", helper.ErrInternalf("cannot resolve repo path: %w", err) } + // All new objects are staged into a quarantine directory first so that we can do + // transactional voting before we commit data to disk. + var quarantineDir *quarantine.Dir + if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) { + var quarantineRepo *localrepo.Repo + quarantineDir, quarantineRepo, err = s.quarantinedRepo(ctx, req.GetRepository()) + if err != nil { + return "", helper.ErrInternalf("creating quarantine: %w", err) + } + + repo = quarantineRepo + repoPath, err = quarantineRepo.Path() + if err != nil { + return "", helper.ErrInternalf("getting quarantine path: %w", err) + } + } + // We need to retrieve the start commit such that we can create the new commit with // all parents of the start commit. startCommit, err := repo.ResolveRevision(ctx, git.Revision(req.GetStartSha()+"^{commit}")) @@ -261,5 +281,36 @@ func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest }) } - return text.ChompBytes(stdout.Bytes()), nil + commitID := text.ChompBytes(stdout.Bytes()) + + // The RPC is badly designed in that it never updates any references, but only creates the + // objects and writes them to disk. We still use a quarantine directory to stage the new + // objects, vote on them and migrate them into the main directory if quorum was reached so + // that we don't pollute the object directory with objects we don't want to have in the + // first place. + if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) { + if err := transaction.VoteOnContext( + ctx, + s.txManager, + voting.VoteFromData([]byte(commitID)), + voting.Prepared, + ); err != nil { + return "", helper.ErrAbortedf("preparatory vote on squashed commit: %w", err) + } + + if err := quarantineDir.Migrate(); err != nil { + return "", helper.ErrInternalf("migrating quarantine directory: %w", err) + } + + if err := transaction.VoteOnContext( + ctx, + s.txManager, + voting.VoteFromData([]byte(commitID)), + voting.Committed, + ); err != nil { + return "", helper.ErrAbortedf("committing vote on squashed commit: %w", err) + } + } + + return commitID, nil } diff --git a/internal/gitaly/service/operations/squash_test.go b/internal/gitaly/service/operations/squash_test.go index d0b111215..f20f12fe5 100644 --- a/internal/gitaly/service/operations/squash_test.go +++ b/internal/gitaly/service/operations/squash_test.go @@ -13,10 +13,15 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc/codes" "google.golang.org/protobuf/types/known/timestamppb" @@ -36,6 +41,11 @@ var ( func TestUserSquash_successful(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashSuccessful) +} + +func testUserSquashSuccessful(t *testing.T, ctx context.Context) { + t.Parallel() for _, tc := range []struct { desc string @@ -53,8 +63,6 @@ func TestUserSquash_successful(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - ctx := testhelper.Context(t) - ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx) repo := localrepo.NewTestRepo(t, cfg, repoProto) @@ -90,9 +98,143 @@ func TestUserSquash_successful(t *testing.T) { } } +func TestUserSquash_transactional(t *testing.T) { + t.Parallel() + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashTransactional) +} + +func testUserSquashTransactional(t *testing.T, ctx context.Context) { + t.Parallel() + txManager := transaction.MockManager{} + + ctx, cfg, client := setupOperationsServiceWithoutRepo(t, ctx, + testserver.WithTransactionManager(&txManager), + ) + + squashedCommitID := "c653dc8f98dba7f7a42c2e3c4b8d850d195e60b6" + squashedCommitVote := voting.VoteFromData([]byte(squashedCommitID)) + + for _, tc := range []struct { + desc string + voteFn func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error + expectedErr error + expectedVotes []voting.Vote + expectedExists bool + }{ + { + desc: "successful", + voteFn: func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error { + return nil + }, + expectedVotes: []voting.Vote{ + // Only a single vote because we abort on the first one. + squashedCommitVote, + squashedCommitVote, + }, + expectedExists: true, + }, + { + desc: "preparatory vote failure", + voteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error { + return fmt.Errorf("vote failed") + }, + expectedErr: helper.ErrAbortedf("preparatory vote on squashed commit: vote failed"), + expectedVotes: []voting.Vote{ + squashedCommitVote, + }, + expectedExists: false, + }, + { + desc: "committing vote failure", + voteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error { + if phase == voting.Committed { + return fmt.Errorf("vote failed") + } + return nil + }, + expectedErr: helper.ErrAbortedf("committing vote on squashed commit: vote failed"), + expectedVotes: []voting.Vote{ + squashedCommitVote, + squashedCommitVote, + }, + // Even though the committing vote has failed, we expect objects to have + // been migrated after the preparatory vote. The commit should thus exist in + // the repository. + expectedExists: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true) + require.NoError(t, err) + ctx = metadata.IncomingToOutgoing(ctx) + + // We need to use a voting function which simply does nothing at first so + // that `CreateRepository()` isn't impacted. + txManager.VoteFn = func(_ context.Context, _ txinfo.Transaction, _ voting.Vote, _ voting.Phase) error { + return nil + } + + repoProto, _ := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ + Seed: gittest.SeedGitLabTest, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + var votes []voting.Vote + txManager.VoteFn = func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error { + votes = append(votes, vote) + return tc.voteFn(ctx, tx, vote, phase) + } + + response, err := client.UserSquash(ctx, &gitalypb.UserSquashRequest{ + Repository: repoProto, + User: gittest.TestUser, + Author: author, + CommitMessage: []byte("Squashed commit"), + StartSha: startSha, + EndSha: endSha, + Timestamp: ×tamppb.Timestamp{Seconds: 1234512345}, + }) + + if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) { + if tc.expectedErr == nil { + require.NoError(t, err) + require.Equal(t, squashedCommitID, response.SquashSha) + } else { + testhelper.RequireGrpcError(t, tc.expectedErr, err) + } + + require.Equal(t, tc.expectedVotes, votes) + } else { + // There is no transactional voting when the feature flag is + // disabled, so we'd never return an error from it either. + require.NoError(t, err) + require.Empty(t, votes) + } + + exists, err := repo.HasRevision(ctx, git.Revision(squashedCommitID+"^{commit}")) + require.NoError(t, err) + + // In case the feature flag is enabled we use a quarantine directory to + // stage the new objects. So if we fail to reach quorum in the preparatory + // vote we expect the object to not have been migrated to the repository. On + // the other hand, if the feature flag is disabled, the object would always + // exist no matter whether the RPC is successful or not. + if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) { + require.Equal(t, tc.expectedExists, exists) + } else { + require.True(t, exists) + } + }) + } +} + func TestUserSquash_stableID(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashStableID) +} + +func testUserSquashStableID(t *testing.T, ctx context.Context) { + t.Parallel() ctx, cfg, repoProto, _, client := setupOperationsService(t, ctx) @@ -150,7 +292,11 @@ func ensureSplitIndexExists(t *testing.T, cfg config.Cfg, repoDir string) bool { func TestUserSquash_threeWayMerge(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashThreeWayMerge) +} + +func testUserSquashThreeWayMerge(t *testing.T, ctx context.Context) { + t.Parallel() ctx, cfg, repoProto, _, client := setupOperationsService(t, ctx) @@ -184,7 +330,11 @@ func TestUserSquash_threeWayMerge(t *testing.T) { func TestUserSquash_splitIndex(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashSplitIndex) +} + +func testUserSquashSplitIndex(t *testing.T, ctx context.Context) { + t.Parallel() ctx, cfg, repo, repoPath, client := setupOperationsService(t, ctx) @@ -206,8 +356,11 @@ func TestUserSquash_splitIndex(t *testing.T) { } func TestUserSquash_renames(t *testing.T) { + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashRenames) +} + +func testUserSquashRenames(t *testing.T, ctx context.Context) { t.Parallel() - ctx := testhelper.Context(t) ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx) @@ -266,7 +419,11 @@ func TestUserSquash_renames(t *testing.T) { func TestUserSquash_missingFileOnTargetBranch(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashMissingFileOnTargetBranch) +} + +func testUserSquashMissingFileOnTargetBranch(t *testing.T, ctx context.Context) { + t.Parallel() ctx, _, repo, _, client := setupOperationsService(t, ctx) @@ -288,7 +445,11 @@ func TestUserSquash_missingFileOnTargetBranch(t *testing.T) { func TestUserSquash_emptyCommit(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashEmptyCommit) +} + +func testUserSquashEmptyCommit(t *testing.T, ctx context.Context) { + t.Parallel() ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx) repo := localrepo.NewTestRepo(t, cfg, repoProto) @@ -486,7 +647,10 @@ func TestUserSquash_validation(t *testing.T) { } func TestUserSquash_conflicts(t *testing.T) { - testhelper.NewFeatureSets(featureflag.UserSquashImprovedErrorHandling).Run(t, testUserSquashConflicts) + testhelper.NewFeatureSets( + featureflag.UserSquashImprovedErrorHandling, + featureflag.UserSquashQuarantinedVoting, + ).Run(t, testUserSquashConflicts) } func testUserSquashConflicts(t *testing.T, ctx context.Context) { @@ -542,7 +706,11 @@ func testUserSquashConflicts(t *testing.T, ctx context.Context) { func TestUserSquash_ancestry(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashAncestry) +} + +func testUserSquashAncestry(t *testing.T, ctx context.Context) { + t.Parallel() ctx, cfg, repo, repoPath, client := setupOperationsService(t, ctx) @@ -575,7 +743,11 @@ func TestUserSquash_ancestry(t *testing.T) { } func TestUserSquash_gitError(t *testing.T) { - testhelper.NewFeatureSets(featureflag.UserSquashImprovedErrorHandling).Run(t, testUserSquashGitError) + t.Parallel() + testhelper.NewFeatureSets( + featureflag.UserSquashImprovedErrorHandling, + featureflag.UserSquashQuarantinedVoting, + ).Run(t, testUserSquashGitError) } func testUserSquashGitError(t *testing.T, ctx context.Context) { diff --git a/internal/helper/error.go b/internal/helper/error.go index 81f25ddcc..5196513e0 100644 --- a/internal/helper/error.go +++ b/internal/helper/error.go @@ -48,6 +48,9 @@ func ErrPermissionDenied(err error) error { return wrapError(codes.PermissionDen // ErrAlreadyExists wraps err with codes.AlreadyExists, unless err is already a gRPC error. func ErrAlreadyExists(err error) error { return wrapError(codes.AlreadyExists, err) } +// ErrAborted wraps err with codes.Aborted, unless err is already a gRPC error. +func ErrAborted(err error) error { return wrapError(codes.Aborted, err) } + // wrapError wraps the given error with the error code unless it's already a gRPC error. If given // nil it will return nil. func wrapError(code codes.Code, err error) error { @@ -99,6 +102,12 @@ func ErrAlreadyExistsf(format string, a ...interface{}) error { return formatError(codes.AlreadyExists, format, a...) } +// ErrAbortedf wraps a formatted error with codes.Aborted, unless the formatted error is a wrapped +// gRPC error. +func ErrAbortedf(format string, a ...interface{}) error { + return formatError(codes.Aborted, format, a...) +} + // formatError will create a new error from the given format string. If the error string contains a // %w verb and its corresponding error has a gRPC error code, then the returned error will keep this // gRPC error code instead of using the one provided as an argument. diff --git a/internal/helper/error_test.go b/internal/helper/error_test.go index 05985a824..3b07f1570 100644 --- a/internal/helper/error_test.go +++ b/internal/helper/error_test.go @@ -60,6 +60,11 @@ func TestError(t *testing.T) { errorf: ErrAlreadyExists, code: codes.AlreadyExists, }, + { + desc: "Aborted", + errorf: ErrAborted, + code: codes.Aborted, + }, } { t.Run(tc.desc, func(t *testing.T) { // tc.code and our canary test code must not @@ -133,6 +138,11 @@ func testErrorfFormat(t *testing.T, errorFormat, errorFormatEqual string) { errorf: ErrUnavailablef, code: codes.Unavailable, }, + { + desc: "ErrAbortedf", + errorf: ErrAbortedf, + code: codes.Aborted, + }, } { t.Run(tc.desc, func(t *testing.T) { require.NotEqual(t, tc.code, inputGRPCCode, "canary test code and tc.code may not be the same") diff --git a/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go b/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go new file mode 100644 index 000000000..0f3f8e50b --- /dev/null +++ b/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go @@ -0,0 +1,7 @@ +package featureflag + +// UserSquashQuarantinedVoting enables the use of a quarantine directory to stage all objects +// created by UserSquash into a temporary directory. This quarantine directory will only be migrated +// into the final repository when the RPC is successful, including a new transactional vote on the +// object ID of the resulting squashed commit. +var UserSquashQuarantinedVoting = NewFeatureFlag("user_squash_quarantined_voting", false) |