Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2022-03-18 18:56:38 +0300
committerToon Claes <toon@gitlab.com>2022-03-18 18:56:38 +0300
commitd84345d4f45748297ddea390158db33412f591e6 (patch)
treed6d5f623d31a7d30dce2bf8749a6f08334ef9c01
parent354435a98c201d28932d005c8bc6c2b1e0e13e37 (diff)
parentaf4ea3258f572b5f647b2d7eecf07553b41a4938 (diff)
Merge branch 'pks-operations-squash-commit-voting' into 'master'
operations: Fix missing votes on squashed commits Closes #4109 See merge request gitlab-org/gitaly!4417
-rw-r--r--internal/gitaly/service/operations/squash.go55
-rw-r--r--internal/gitaly/service/operations/squash_test.go194
-rw-r--r--internal/helper/error.go9
-rw-r--r--internal/helper/error_test.go10
-rw-r--r--internal/metadata/featureflag/ff_user_squash_quarantined_voting.go7
5 files changed, 262 insertions, 13 deletions
diff --git a/internal/gitaly/service/operations/squash.go b/internal/gitaly/service/operations/squash.go
index b73bc58ba..096dfc68b 100644
--- a/internal/gitaly/service/operations/squash.go
+++ b/internal/gitaly/service/operations/squash.go
@@ -8,10 +8,14 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/quarantine"
"gitlab.com/gitlab-org/gitaly/v14/internal/git2go"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -101,12 +105,28 @@ func (er gitError) Error() string {
func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest) (string, error) {
repo := s.localrepo(req.GetRepository())
-
repoPath, err := repo.Path()
if err != nil {
return "", helper.ErrInternalf("cannot resolve repo path: %w", err)
}
+ // All new objects are staged into a quarantine directory first so that we can do
+ // transactional voting before we commit data to disk.
+ var quarantineDir *quarantine.Dir
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ var quarantineRepo *localrepo.Repo
+ quarantineDir, quarantineRepo, err = s.quarantinedRepo(ctx, req.GetRepository())
+ if err != nil {
+ return "", helper.ErrInternalf("creating quarantine: %w", err)
+ }
+
+ repo = quarantineRepo
+ repoPath, err = quarantineRepo.Path()
+ if err != nil {
+ return "", helper.ErrInternalf("getting quarantine path: %w", err)
+ }
+ }
+
// We need to retrieve the start commit such that we can create the new commit with
// all parents of the start commit.
startCommit, err := repo.ResolveRevision(ctx, git.Revision(req.GetStartSha()+"^{commit}"))
@@ -261,5 +281,36 @@ func (s *Server) userSquash(ctx context.Context, req *gitalypb.UserSquashRequest
})
}
- return text.ChompBytes(stdout.Bytes()), nil
+ commitID := text.ChompBytes(stdout.Bytes())
+
+ // The RPC is badly designed in that it never updates any references, but only creates the
+ // objects and writes them to disk. We still use a quarantine directory to stage the new
+ // objects, vote on them and migrate them into the main directory if quorum was reached so
+ // that we don't pollute the object directory with objects we don't want to have in the
+ // first place.
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ if err := transaction.VoteOnContext(
+ ctx,
+ s.txManager,
+ voting.VoteFromData([]byte(commitID)),
+ voting.Prepared,
+ ); err != nil {
+ return "", helper.ErrAbortedf("preparatory vote on squashed commit: %w", err)
+ }
+
+ if err := quarantineDir.Migrate(); err != nil {
+ return "", helper.ErrInternalf("migrating quarantine directory: %w", err)
+ }
+
+ if err := transaction.VoteOnContext(
+ ctx,
+ s.txManager,
+ voting.VoteFromData([]byte(commitID)),
+ voting.Committed,
+ ); err != nil {
+ return "", helper.ErrAbortedf("committing vote on squashed commit: %w", err)
+ }
+ }
+
+ return commitID, nil
}
diff --git a/internal/gitaly/service/operations/squash_test.go b/internal/gitaly/service/operations/squash_test.go
index d0b111215..f20f12fe5 100644
--- a/internal/gitaly/service/operations/squash_test.go
+++ b/internal/gitaly/service/operations/squash_test.go
@@ -13,10 +13,15 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -36,6 +41,11 @@ var (
func TestUserSquash_successful(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashSuccessful)
+}
+
+func testUserSquashSuccessful(t *testing.T, ctx context.Context) {
+ t.Parallel()
for _, tc := range []struct {
desc string
@@ -53,8 +63,6 @@ func TestUserSquash_successful(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- ctx := testhelper.Context(t)
-
ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx)
repo := localrepo.NewTestRepo(t, cfg, repoProto)
@@ -90,9 +98,143 @@ func TestUserSquash_successful(t *testing.T) {
}
}
+func TestUserSquash_transactional(t *testing.T) {
+ t.Parallel()
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashTransactional)
+}
+
+func testUserSquashTransactional(t *testing.T, ctx context.Context) {
+ t.Parallel()
+ txManager := transaction.MockManager{}
+
+ ctx, cfg, client := setupOperationsServiceWithoutRepo(t, ctx,
+ testserver.WithTransactionManager(&txManager),
+ )
+
+ squashedCommitID := "c653dc8f98dba7f7a42c2e3c4b8d850d195e60b6"
+ squashedCommitVote := voting.VoteFromData([]byte(squashedCommitID))
+
+ for _, tc := range []struct {
+ desc string
+ voteFn func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error
+ expectedErr error
+ expectedVotes []voting.Vote
+ expectedExists bool
+ }{
+ {
+ desc: "successful",
+ voteFn: func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error {
+ return nil
+ },
+ expectedVotes: []voting.Vote{
+ // Only a single vote because we abort on the first one.
+ squashedCommitVote,
+ squashedCommitVote,
+ },
+ expectedExists: true,
+ },
+ {
+ desc: "preparatory vote failure",
+ voteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
+ return fmt.Errorf("vote failed")
+ },
+ expectedErr: helper.ErrAbortedf("preparatory vote on squashed commit: vote failed"),
+ expectedVotes: []voting.Vote{
+ squashedCommitVote,
+ },
+ expectedExists: false,
+ },
+ {
+ desc: "committing vote failure",
+ voteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
+ if phase == voting.Committed {
+ return fmt.Errorf("vote failed")
+ }
+ return nil
+ },
+ expectedErr: helper.ErrAbortedf("committing vote on squashed commit: vote failed"),
+ expectedVotes: []voting.Vote{
+ squashedCommitVote,
+ squashedCommitVote,
+ },
+ // Even though the committing vote has failed, we expect objects to have
+ // been migrated after the preparatory vote. The commit should thus exist in
+ // the repository.
+ expectedExists: true,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
+ ctx = metadata.IncomingToOutgoing(ctx)
+
+ // We need to use a voting function which simply does nothing at first so
+ // that `CreateRepository()` isn't impacted.
+ txManager.VoteFn = func(_ context.Context, _ txinfo.Transaction, _ voting.Vote, _ voting.Phase) error {
+ return nil
+ }
+
+ repoProto, _ := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{
+ Seed: gittest.SeedGitLabTest,
+ })
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ var votes []voting.Vote
+ txManager.VoteFn = func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
+ votes = append(votes, vote)
+ return tc.voteFn(ctx, tx, vote, phase)
+ }
+
+ response, err := client.UserSquash(ctx, &gitalypb.UserSquashRequest{
+ Repository: repoProto,
+ User: gittest.TestUser,
+ Author: author,
+ CommitMessage: []byte("Squashed commit"),
+ StartSha: startSha,
+ EndSha: endSha,
+ Timestamp: &timestamppb.Timestamp{Seconds: 1234512345},
+ })
+
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ if tc.expectedErr == nil {
+ require.NoError(t, err)
+ require.Equal(t, squashedCommitID, response.SquashSha)
+ } else {
+ testhelper.RequireGrpcError(t, tc.expectedErr, err)
+ }
+
+ require.Equal(t, tc.expectedVotes, votes)
+ } else {
+ // There is no transactional voting when the feature flag is
+ // disabled, so we'd never return an error from it either.
+ require.NoError(t, err)
+ require.Empty(t, votes)
+ }
+
+ exists, err := repo.HasRevision(ctx, git.Revision(squashedCommitID+"^{commit}"))
+ require.NoError(t, err)
+
+ // In case the feature flag is enabled we use a quarantine directory to
+ // stage the new objects. So if we fail to reach quorum in the preparatory
+ // vote we expect the object to not have been migrated to the repository. On
+ // the other hand, if the feature flag is disabled, the object would always
+ // exist no matter whether the RPC is successful or not.
+ if featureflag.UserSquashQuarantinedVoting.IsEnabled(ctx) {
+ require.Equal(t, tc.expectedExists, exists)
+ } else {
+ require.True(t, exists)
+ }
+ })
+ }
+}
+
func TestUserSquash_stableID(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashStableID)
+}
+
+func testUserSquashStableID(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repoProto, _, client := setupOperationsService(t, ctx)
@@ -150,7 +292,11 @@ func ensureSplitIndexExists(t *testing.T, cfg config.Cfg, repoDir string) bool {
func TestUserSquash_threeWayMerge(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashThreeWayMerge)
+}
+
+func testUserSquashThreeWayMerge(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repoProto, _, client := setupOperationsService(t, ctx)
@@ -184,7 +330,11 @@ func TestUserSquash_threeWayMerge(t *testing.T) {
func TestUserSquash_splitIndex(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashSplitIndex)
+}
+
+func testUserSquashSplitIndex(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repo, repoPath, client := setupOperationsService(t, ctx)
@@ -206,8 +356,11 @@ func TestUserSquash_splitIndex(t *testing.T) {
}
func TestUserSquash_renames(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashRenames)
+}
+
+func testUserSquashRenames(t *testing.T, ctx context.Context) {
t.Parallel()
- ctx := testhelper.Context(t)
ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx)
@@ -266,7 +419,11 @@ func TestUserSquash_renames(t *testing.T) {
func TestUserSquash_missingFileOnTargetBranch(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashMissingFileOnTargetBranch)
+}
+
+func testUserSquashMissingFileOnTargetBranch(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, _, repo, _, client := setupOperationsService(t, ctx)
@@ -288,7 +445,11 @@ func TestUserSquash_missingFileOnTargetBranch(t *testing.T) {
func TestUserSquash_emptyCommit(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashEmptyCommit)
+}
+
+func testUserSquashEmptyCommit(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx)
repo := localrepo.NewTestRepo(t, cfg, repoProto)
@@ -486,7 +647,10 @@ func TestUserSquash_validation(t *testing.T) {
}
func TestUserSquash_conflicts(t *testing.T) {
- testhelper.NewFeatureSets(featureflag.UserSquashImprovedErrorHandling).Run(t, testUserSquashConflicts)
+ testhelper.NewFeatureSets(
+ featureflag.UserSquashImprovedErrorHandling,
+ featureflag.UserSquashQuarantinedVoting,
+ ).Run(t, testUserSquashConflicts)
}
func testUserSquashConflicts(t *testing.T, ctx context.Context) {
@@ -542,7 +706,11 @@ func testUserSquashConflicts(t *testing.T, ctx context.Context) {
func TestUserSquash_ancestry(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.UserSquashQuarantinedVoting).Run(t, testUserSquashAncestry)
+}
+
+func testUserSquashAncestry(t *testing.T, ctx context.Context) {
+ t.Parallel()
ctx, cfg, repo, repoPath, client := setupOperationsService(t, ctx)
@@ -575,7 +743,11 @@ func TestUserSquash_ancestry(t *testing.T) {
}
func TestUserSquash_gitError(t *testing.T) {
- testhelper.NewFeatureSets(featureflag.UserSquashImprovedErrorHandling).Run(t, testUserSquashGitError)
+ t.Parallel()
+ testhelper.NewFeatureSets(
+ featureflag.UserSquashImprovedErrorHandling,
+ featureflag.UserSquashQuarantinedVoting,
+ ).Run(t, testUserSquashGitError)
}
func testUserSquashGitError(t *testing.T, ctx context.Context) {
diff --git a/internal/helper/error.go b/internal/helper/error.go
index 81f25ddcc..5196513e0 100644
--- a/internal/helper/error.go
+++ b/internal/helper/error.go
@@ -48,6 +48,9 @@ func ErrPermissionDenied(err error) error { return wrapError(codes.PermissionDen
// ErrAlreadyExists wraps err with codes.AlreadyExists, unless err is already a gRPC error.
func ErrAlreadyExists(err error) error { return wrapError(codes.AlreadyExists, err) }
+// ErrAborted wraps err with codes.Aborted, unless err is already a gRPC error.
+func ErrAborted(err error) error { return wrapError(codes.Aborted, err) }
+
// wrapError wraps the given error with the error code unless it's already a gRPC error. If given
// nil it will return nil.
func wrapError(code codes.Code, err error) error {
@@ -99,6 +102,12 @@ func ErrAlreadyExistsf(format string, a ...interface{}) error {
return formatError(codes.AlreadyExists, format, a...)
}
+// ErrAbortedf wraps a formatted error with codes.Aborted, unless the formatted error is a wrapped
+// gRPC error.
+func ErrAbortedf(format string, a ...interface{}) error {
+ return formatError(codes.Aborted, format, a...)
+}
+
// formatError will create a new error from the given format string. If the error string contains a
// %w verb and its corresponding error has a gRPC error code, then the returned error will keep this
// gRPC error code instead of using the one provided as an argument.
diff --git a/internal/helper/error_test.go b/internal/helper/error_test.go
index 05985a824..3b07f1570 100644
--- a/internal/helper/error_test.go
+++ b/internal/helper/error_test.go
@@ -60,6 +60,11 @@ func TestError(t *testing.T) {
errorf: ErrAlreadyExists,
code: codes.AlreadyExists,
},
+ {
+ desc: "Aborted",
+ errorf: ErrAborted,
+ code: codes.Aborted,
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
// tc.code and our canary test code must not
@@ -133,6 +138,11 @@ func testErrorfFormat(t *testing.T, errorFormat, errorFormatEqual string) {
errorf: ErrUnavailablef,
code: codes.Unavailable,
},
+ {
+ desc: "ErrAbortedf",
+ errorf: ErrAbortedf,
+ code: codes.Aborted,
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
require.NotEqual(t, tc.code, inputGRPCCode, "canary test code and tc.code may not be the same")
diff --git a/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go b/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go
new file mode 100644
index 000000000..0f3f8e50b
--- /dev/null
+++ b/internal/metadata/featureflag/ff_user_squash_quarantined_voting.go
@@ -0,0 +1,7 @@
+package featureflag
+
+// UserSquashQuarantinedVoting enables the use of a quarantine directory to stage all objects
+// created by UserSquash into a temporary directory. This quarantine directory will only be migrated
+// into the final repository when the RPC is successful, including a new transactional vote on the
+// object ID of the resulting squashed commit.
+var UserSquashQuarantinedVoting = NewFeatureFlag("user_squash_quarantined_voting", false)