diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-04-09 14:13:01 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-04-09 14:13:01 +0300 |
commit | 2ff8bb5b69e7024607e6820a5c6aed1c7033e45e (patch) | |
tree | 749206af0762875fd674e861aeed613e2f050c6f | |
parent | 23f6e1516567cdb916f00995e7ed43e4dff68809 (diff) | |
parent | d7d7f619ff97ece46d912c8be0a35a93d7a80c73 (diff) |
Merge branch 'pks-ref-delete-refs-voting' into 'master'
ref: Fix missing votes for `DeleteRefs()` RPC
See merge request gitlab-org/gitaly!3347
21 files changed, 366 insertions, 71 deletions
diff --git a/changelogs/unreleased/pks-ref-delete-refs-voting.yml b/changelogs/unreleased/pks-ref-delete-refs-voting.yml new file mode 100644 index 000000000..8be7e6c61 --- /dev/null +++ b/changelogs/unreleased/pks-ref-delete-refs-voting.yml @@ -0,0 +1,5 @@ +--- +title: 'ref: Fix missing votes for `DeleteRefs()` RPC' +merge_request: 3347 +author: +type: fixed diff --git a/internal/git/updateref/updateref.go b/internal/git/updateref/updateref.go index fc7779851..c80f1764e 100644 --- a/internal/git/updateref/updateref.go +++ b/internal/git/updateref/updateref.go @@ -70,7 +70,9 @@ func New(ctx context.Context, conf config.Cfg, gitCmdFactory git.CommandFactory, // transactional behaviour. Which effectively means that without an // explicit "commit", no changes will be inadvertently committed to // disk. - fmt.Fprintf(cmd, "start\x00") + if _, err := cmd.Write([]byte("start\x00")); err != nil { + return nil, err + } return &Updater{repo: repo, cmd: cmd, stderr: &stderr}, nil } @@ -94,9 +96,19 @@ func (u *Updater) Delete(reference git.ReferenceName) error { return err } +// Prepare prepares the reference transaction by locking all references and determining their +// current values. The updates are not yet committed and will be rolled back in case there is no +// call to `Wait()`. This call is optional. +func (u *Updater) Prepare() error { + _, err := fmt.Fprintf(u.cmd, "prepare\x00") + return err +} + // Wait applies the commands specified in other calls to the Updater func (u *Updater) Wait() error { - fmt.Fprintf(u.cmd, "commit\x00") + if _, err := u.cmd.Write([]byte("commit\x00")); err != nil { + return err + } if err := u.cmd.Wait(); err != nil { return fmt.Errorf("git update-ref: %v, stderr: %q", err, u.stderr) diff --git a/internal/git/updateref/updateref_test.go b/internal/git/updateref/updateref_test.go index 8da1d1664..d439e444a 100644 --- a/internal/git/updateref/updateref_test.go +++ b/internal/git/updateref/updateref_test.go @@ -81,6 +81,7 @@ func TestUpdate(t *testing.T) { require.NotEqual(t, commit.Id, sha, "%s points to HEAD: %s in the test repository", ref.String(), sha) require.NoError(t, updater.Update(ref, sha, "")) + require.NoError(t, updater.Prepare()) require.NoError(t, updater.Wait()) // check the ref was updated @@ -115,6 +116,24 @@ func TestDelete(t *testing.T) { require.Equal(t, localrepo.ErrObjectNotFound, err, "expected 'not found' error got %v", err) } +func TestUpdater_prepareLocksTransaction(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + _, repo, updater := setupUpdater(t, ctx) + + commit, logErr := repo.ReadCommit(ctx, "refs/heads/master") + require.NoError(t, logErr) + + require.NoError(t, updater.Update("refs/heads/feature", commit.Id, "")) + require.NoError(t, updater.Prepare()) + require.NoError(t, updater.Update("refs/heads/feature", commit.Id, "")) + + err := updater.Wait() + require.Error(t, err, "cannot update after prepare") + require.Contains(t, err.Error(), "fatal: prepared transactions can only be closed") +} + func TestBulkOperation(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/gitaly/hook/referencetransaction.go b/internal/gitaly/hook/referencetransaction.go index 10475c251..1028ab367 100644 --- a/internal/gitaly/hook/referencetransaction.go +++ b/internal/gitaly/hook/referencetransaction.go @@ -60,7 +60,7 @@ func (m *GitLabHookManager) ReferenceTransactionHook(ctx context.Context, state hash := sha1.Sum(changes) - if err := m.voteOnTransaction(ctx, hash[:], payload); err != nil { + if err := m.voteOnTransaction(ctx, hash, payload); err != nil { return fmt.Errorf("error voting on transaction: %w", err) } diff --git a/internal/gitaly/hook/transactions.go b/internal/gitaly/hook/transactions.go index 4b5ed2266..8144cd9c9 100644 --- a/internal/gitaly/hook/transactions.go +++ b/internal/gitaly/hook/transactions.go @@ -3,28 +3,12 @@ package hook import ( "context" "errors" - "fmt" - "time" "gitlab.com/gitlab-org/gitaly/internal/git" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" ) -const ( - // transactionTimeout is the timeout used for all transactional - // actions like voting and stopping of transactions. This timeout is - // quite high: usually, a transaction should finish in at most a few - // milliseconds. There are cases though where it may take a lot longer, - // like when executing logic on the primary node only: the primary's - // vote will be delayed until that logic finishes while secondaries are - // waiting for the primary to cast its vote on the transaction. Given - // that the primary-only logic's execution time scales with repository - // size for the access checks and that it is potentially even unbounded - // due to custom hooks, we thus use a high timeout. It shouldn't - // normally be hit, but if it is hit then it indicates a real problem. - transactionTimeout = 5 * time.Minute -) - func isPrimary(payload git.HooksPayload) bool { if payload.Transaction == nil { return true @@ -45,24 +29,14 @@ func (m *GitLabHookManager) runWithTransaction(ctx context.Context, payload git. if payload.Praefect == nil { return errors.New("transaction without Praefect server") } - - transactionCtx, cancel := context.WithTimeout(ctx, transactionTimeout) - defer cancel() - - if err := handler(transactionCtx, *payload.Transaction, *payload.Praefect); err != nil { - // Add some additional context to cancellation errors so that - // we know which of the contexts got canceled. - if errors.Is(err, context.Canceled) && errors.Is(transactionCtx.Err(), context.Canceled) && ctx.Err() == nil { - return fmt.Errorf("transaction timeout %s exceeded: %w", transactionTimeout, err) - } - + if err := handler(ctx, *payload.Transaction, *payload.Praefect); err != nil { return err } return nil } -func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, payload git.HooksPayload) error { +func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash transaction.Vote, payload git.HooksPayload) error { return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error { return m.txManager.Vote(ctx, tx, praefect, hash) }) diff --git a/internal/gitaly/hook/transactions_test.go b/internal/gitaly/hook/transactions_test.go index 97c4ec469..624618d2a 100644 --- a/internal/gitaly/hook/transactions_test.go +++ b/internal/gitaly/hook/transactions_test.go @@ -13,24 +13,12 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg" ) -type mockTransactionManager struct { - vote func(context.Context, metadata.Transaction, metadata.PraefectServer, []byte) error - stop func(context.Context, metadata.Transaction, metadata.PraefectServer) error -} - -func (m *mockTransactionManager) Vote(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote []byte) error { - return m.vote(ctx, tx, praefect, vote) -} - -func (m *mockTransactionManager) Stop(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error { - return m.stop(ctx, tx, praefect) -} - func TestHookManager_stopCalled(t *testing.T) { cfg, repo, repoPath := testcfg.BuildWithRepo(t) @@ -43,7 +31,7 @@ func TestHookManager_stopCalled(t *testing.T) { Token: "foo", } - var mockTxMgr mockTransactionManager + var mockTxMgr transaction.MockManager hookManager := NewManager(config.NewLocator(cfg), &mockTxMgr, GitlabAPIStub, cfg) hooksPayload, err := git.NewHooksPayload( @@ -113,7 +101,7 @@ func TestHookManager_stopCalled(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { wasInvoked := false - mockTxMgr.stop = func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error { + mockTxMgr.StopFn = func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error { require.Equal(t, expectedTx, tx) require.Equal(t, expectedPraefect, praefect) wasInvoked = true @@ -130,8 +118,8 @@ func TestHookManager_stopCalled(t *testing.T) { func TestHookManager_contextCancellationCancelsVote(t *testing.T) { cfg, repo, _ := testcfg.BuildWithRepo(t) - mockTxMgr := mockTransactionManager{ - vote: func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote []byte) error { + mockTxMgr := transaction.MockManager{ + VoteFn: func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote transaction.Vote) error { <-ctx.Done() return fmt.Errorf("mock error: %s", ctx.Err()) }, diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index 7803f5a9d..47030d29a 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -90,7 +90,7 @@ func runOperationServiceServer(t *testing.T) (string, func()) { gitalypb.RegisterOperationServiceServer(srv.GrpcServer(), server) gitalypb.RegisterHookServiceServer(srv.GrpcServer(), hook.NewServer(config.Config, hookManager, gitCmdFactory)) gitalypb.RegisterRepositoryServiceServer(srv.GrpcServer(), repository.NewServer(config.Config, RubyServer, locator, txManager, gitCmdFactory)) - gitalypb.RegisterRefServiceServer(srv.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory)) + gitalypb.RegisterRefServiceServer(srv.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory, txManager)) gitalypb.RegisterCommitServiceServer(srv.GrpcServer(), commit.NewServer(config.Config, locator, gitCmdFactory, nil)) gitalypb.RegisterSSHServiceServer(srv.GrpcServer(), ssh.NewServer(config.Config, locator, gitCmdFactory)) reflection.Register(srv.GrpcServer()) diff --git a/internal/gitaly/service/ref/delete_refs.go b/internal/gitaly/service/ref/delete_refs.go index 3db189171..f4aef41d0 100644 --- a/internal/gitaly/service/ref/delete_refs.go +++ b/internal/gitaly/service/ref/delete_refs.go @@ -9,6 +9,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" @@ -20,23 +21,45 @@ func (s *server) DeleteRefs(ctx context.Context, in *gitalypb.DeleteRefsRequest) return nil, status.Errorf(codes.InvalidArgument, "DeleteRefs: %v", err) } - updater, err := updateref.New(ctx, s.cfg, s.gitCmdFactory, in.GetRepository()) + refnames, err := s.refsToRemove(ctx, in) if err != nil { - if errors.Is(err, git.ErrInvalidArg) { - return nil, helper.ErrInvalidArgument(err) - } return nil, helper.ErrInternal(err) } - refnames, err := s.refsToRemove(ctx, in) + updater, err := updateref.New(ctx, s.cfg, s.gitCmdFactory, in.GetRepository()) if err != nil { + if errors.Is(err, git.ErrInvalidArg) { + return nil, helper.ErrInvalidArgument(err) + } return nil, helper.ErrInternal(err) } + voteHash := transaction.NewVoteHash() for _, ref := range refnames { if err := updater.Delete(ref); err != nil { return &gitalypb.DeleteRefsResponse{GitError: err.Error()}, nil } + + if _, err := voteHash.Write([]byte(ref.String() + "\n")); err != nil { + return nil, helper.ErrInternalf("could not update vote hash: %v", err) + } + } + + if err := updater.Prepare(); err != nil { + return nil, helper.ErrInternalf("could not prepare ref update: %v", err) + } + + vote, err := voteHash.Vote() + if err != nil { + return nil, helper.ErrInternalf("could not compute vote: %v", err) + } + + // All deletes we're doing in this RPC are force deletions. Because we're required to filter + // out transactions which only consist of force deletions, we never do any voting via the + // reference-transaction hook here. Instead, we need to resort to a manual vote which is + // simply the concatenation of all reference we're about to delete. + if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil { + return nil, helper.ErrInternal(err) } if err := updater.Wait(); err != nil { diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go index 80c16ff89..09585920b 100644 --- a/internal/gitaly/service/ref/delete_refs_test.go +++ b/internal/gitaly/service/ref/delete_refs_test.go @@ -1,6 +1,7 @@ package ref import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -8,7 +9,14 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/hook" + hookservice "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/hook" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" ) @@ -69,6 +77,77 @@ func TestSuccessfulDeleteRefs(t *testing.T) { } } +func TestDeleteRefs_transaction(t *testing.T) { + cfg := testcfg.Build(t) + + testhelper.ConfigureGitalyHooksBin(t, cfg) + + var votes int + txManager := &transaction.MockManager{ + VoteFn: func(context.Context, metadata.Transaction, metadata.PraefectServer, transaction.Vote) error { + votes++ + return nil + }, + } + + locator := config.NewLocator(cfg) + hookManager := hook.NewManager(locator, txManager, hook.GitlabAPIStub, cfg) + gitCmdFactory := git.NewExecCommandFactory(cfg) + + srv := testhelper.NewServer(t, nil, nil, testhelper.WithInternalSocket(cfg)) + gitalypb.RegisterRefServiceServer(srv.GrpcServer(), NewServer(cfg, locator, gitCmdFactory, txManager)) + gitalypb.RegisterHookServiceServer(srv.GrpcServer(), hookservice.NewServer(cfg, hookManager, gitCmdFactory)) + srv.Start(t) + t.Cleanup(srv.Stop) + + client, conn := newRefServiceClient(t, "unix://"+srv.Socket()) + t.Cleanup(func() { conn.Close() }) + + ctx, cancel := testhelper.Context() + t.Cleanup(cancel) + + ctx, err := metadata.InjectTransaction(ctx, 1, "node", true) + require.NoError(t, err) + ctx, err = (&metadata.PraefectServer{SocketPath: "i-dont-care"}).Inject(ctx) + require.NoError(t, err) + ctx = helper.IncomingToOutgoing(ctx) + + for _, tc := range []struct { + desc string + request *gitalypb.DeleteRefsRequest + expectedVotes int + }{ + { + desc: "delete nothing", + request: &gitalypb.DeleteRefsRequest{ + ExceptWithPrefix: [][]byte{[]byte("refs/")}, + }, + expectedVotes: 1, + }, + { + desc: "delete all refs", + request: &gitalypb.DeleteRefsRequest{ + ExceptWithPrefix: [][]byte{[]byte("nonexisting/prefix/")}, + }, + expectedVotes: 1, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + votes = 0 + + repo, _, cleanup := gittest.CloneRepoAtStorage(t, cfg.Storages[0], tc.desc) + t.Cleanup(cleanup) + tc.request.Repository = repo + + response, err := client.DeleteRefs(ctx, tc.request) + require.NoError(t, err) + require.Empty(t, response.GitError) + + require.Equal(t, tc.expectedVotes, votes) + }) + } +} + func TestFailedDeleteRefsRequestDueToGitError(t *testing.T) { _, repo, _, client := setupRefService(t) diff --git a/internal/gitaly/service/ref/server.go b/internal/gitaly/service/ref/server.go index 610e98e0f..7c0da1771 100644 --- a/internal/gitaly/service/ref/server.go +++ b/internal/gitaly/service/ref/server.go @@ -3,17 +3,24 @@ package ref import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) type server struct { cfg config.Cfg + txManager transaction.Manager locator storage.Locator gitCmdFactory git.CommandFactory } // NewServer creates a new instance of a grpc RefServer -func NewServer(cfg config.Cfg, locator storage.Locator, gitCmdFactory git.CommandFactory) gitalypb.RefServiceServer { - return &server{cfg: cfg, locator: locator, gitCmdFactory: gitCmdFactory} +func NewServer(cfg config.Cfg, locator storage.Locator, gitCmdFactory git.CommandFactory, txManager transaction.Manager) gitalypb.RefServiceServer { + return &server{ + cfg: cfg, + txManager: txManager, + locator: locator, + gitCmdFactory: gitCmdFactory, + } } diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go index 1b9597b18..81ed79fdb 100644 --- a/internal/gitaly/service/ref/testhelper_test.go +++ b/internal/gitaly/service/ref/testhelper_test.go @@ -77,7 +77,7 @@ func runRefServiceServer(t testing.TB, cfg config.Cfg) string { gitCmdFactory := git.NewExecCommandFactory(cfg) srv := testhelper.NewServer(t, nil, nil, testhelper.WithInternalSocket(cfg)) - gitalypb.RegisterRefServiceServer(srv.GrpcServer(), NewServer(cfg, locator, gitCmdFactory)) + gitalypb.RegisterRefServiceServer(srv.GrpcServer(), NewServer(cfg, locator, gitCmdFactory, txManager)) gitalypb.RegisterHookServiceServer(srv.GrpcServer(), hookservice.NewServer(cfg, hookManager, gitCmdFactory)) srv.Start(t) t.Cleanup(srv.Stop) diff --git a/internal/gitaly/service/register.go b/internal/gitaly/service/register.go index 7b9934bf2..9941c5887 100644 --- a/internal/gitaly/service/register.go +++ b/internal/gitaly/service/register.go @@ -77,7 +77,7 @@ func RegisterAll( gitalypb.RegisterDiffServiceServer(grpcServer, diff.NewServer(cfg, locator, gitCmdFactory)) gitalypb.RegisterNamespaceServiceServer(grpcServer, namespace.NewServer(locator)) gitalypb.RegisterOperationServiceServer(grpcServer, operations.NewServer(cfg, rubyServer, hookManager, locator, conns, gitCmdFactory)) - gitalypb.RegisterRefServiceServer(grpcServer, ref.NewServer(cfg, locator, gitCmdFactory)) + gitalypb.RegisterRefServiceServer(grpcServer, ref.NewServer(cfg, locator, gitCmdFactory, txManager)) gitalypb.RegisterRepositoryServiceServer(grpcServer, repository.NewServer(cfg, rubyServer, locator, txManager, gitCmdFactory)) gitalypb.RegisterSSHServiceServer(grpcServer, ssh.NewServer( cfg, diff --git a/internal/gitaly/service/remote/fetch_internal_remote_test.go b/internal/gitaly/service/remote/fetch_internal_remote_test.go index b34dda6e8..5f537dcee 100644 --- a/internal/gitaly/service/remote/fetch_internal_remote_test.go +++ b/internal/gitaly/service/remote/fetch_internal_remote_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/backchannel" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" @@ -15,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ref" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/remote" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ssh" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/storage" @@ -71,13 +73,14 @@ func testSuccessfulFetchInternalRemote(t *testing.T, ctx context.Context) { locator := config.NewLocator(config.Config) gitCmdFactory := git.NewExecCommandFactory(config.Config) + txManager := transaction.NewManager(config.Config, backchannel.NewRegistry()) hookManager := &mockHookManager{} gitaly0Server := testhelper.NewServer(t, nil, nil, testhelper.WithStorages([]string{"gitaly-0"}), testhelper.WithInternalSocket(config.Config), ) gitalypb.RegisterSSHServiceServer(gitaly0Server.GrpcServer(), ssh.NewServer(config.Config, locator, gitCmdFactory)) - gitalypb.RegisterRefServiceServer(gitaly0Server.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory)) + gitalypb.RegisterRefServiceServer(gitaly0Server.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory, txManager)) gitalypb.RegisterHookServiceServer(gitaly0Server.GrpcServer(), hook.NewServer(config.Config, hookManager, gitCmdFactory)) reflection.Register(gitaly0Server.GrpcServer()) gitaly0Server.Start(t) diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go index 6f7190b26..7a876fb81 100644 --- a/internal/gitaly/service/repository/apply_gitattributes.go +++ b/internal/gitaly/service/repository/apply_gitattributes.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" @@ -107,7 +108,12 @@ func (s *server) vote(ctx context.Context, oid git.ObjectID) error { return fmt.Errorf("vote with invalid object ID: %w", err) } - if err := s.txManager.Vote(ctx, tx, *praefect, hash); err != nil { + vote, err := transaction.VoteFromHash(hash) + if err != nil { + return fmt.Errorf("cannot convert OID to vote: %w", err) + } + + if err := s.txManager.Vote(ctx, tx, *praefect, vote); err != nil { return fmt.Errorf("vote failed: %w", err) } diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go index 906515c29..0c12ef959 100644 --- a/internal/gitaly/transaction/manager.go +++ b/internal/gitaly/transaction/manager.go @@ -4,6 +4,8 @@ import ( "context" "encoding/hex" "errors" + "fmt" + "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" @@ -15,6 +17,21 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) +const ( + // transactionTimeout is the timeout used for all transactional + // actions like voting and stopping of transactions. This timeout is + // quite high: usually, a transaction should finish in at most a few + // milliseconds. There are cases though where it may take a lot longer, + // like when executing logic on the primary node only: the primary's + // vote will be delayed until that logic finishes while secondaries are + // waiting for the primary to cast its vote on the transaction. Given + // that the primary-only logic's execution time scales with repository + // size for the access checks and that it is potentially even unbounded + // due to custom hooks, we thus use a high timeout. It shouldn't + // normally be hit, but if it is hit then it indicates a real problem. + transactionTimeout = 5 * time.Minute +) + var ( // ErrTransactionAborted indicates a transaction was aborted, either // because it timed out or because the vote failed to reach quorum. @@ -30,7 +47,7 @@ var ( type Manager interface { // Vote casts a vote on the given transaction which is hosted by the // given Praefect server. - Vote(context.Context, metadata.Transaction, metadata.PraefectServer, []byte) error + Vote(context.Context, metadata.Transaction, metadata.PraefectServer, Vote) error // Stop gracefully stops the given transaction which is hosted by the // given Praefect server. @@ -86,7 +103,7 @@ func (m *PoolManager) getTransactionClient(ctx context.Context, server metadata. // Vote connects to the given server and casts hash as a vote for the // transaction identified by tx. -func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer, hash []byte) error { +func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer, hash Vote) error { client, err := m.getTransactionClient(ctx, server) if err != nil { return err @@ -95,17 +112,26 @@ func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server logger := m.log(ctx).WithFields(logrus.Fields{ "transaction.id": tx.ID, "transaction.voter": tx.Node, - "transaction.hash": hex.EncodeToString(hash), + "transaction.hash": hex.EncodeToString(hash.Bytes()), }) defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration() - response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + transactionCtx, cancel := context.WithTimeout(ctx, transactionTimeout) + defer cancel() + + response, err := client.VoteTransaction(transactionCtx, &gitalypb.VoteTransactionRequest{ TransactionId: tx.ID, Node: tx.Node, - ReferenceUpdatesHash: hash, + ReferenceUpdatesHash: hash.Bytes(), }) if err != nil { + // Add some additional context to cancellation errors so that + // we know which of the contexts got canceled. + if errors.Is(err, context.Canceled) && errors.Is(transactionCtx.Err(), context.Canceled) && ctx.Err() == nil { + return fmt.Errorf("transaction timeout %s exceeded: %w", transactionTimeout, err) + } + logger.WithError(err).Error("vote failed") return err } @@ -148,3 +174,15 @@ func (m *PoolManager) Stop(ctx context.Context, tx metadata.Transaction, server func (m *PoolManager) log(ctx context.Context) logrus.FieldLogger { return ctxlogrus.Extract(ctx).WithField("component", "transaction.PoolManager") } + +// VoteOnContext casts the vote on a transaction identified by the context, if there is any. +func VoteOnContext(ctx context.Context, m Manager, vote Vote) error { + transaction, praefect, err := metadata.TransactionMetadataFromContext(ctx) + if err != nil { + return err + } + if transaction == nil { + return nil + } + return m.Vote(ctx, *transaction, *praefect, vote) +} diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go index c9b5b44dd..fb14b08cc 100644 --- a/internal/gitaly/transaction/manager_test.go +++ b/internal/gitaly/transaction/manager_test.go @@ -49,7 +49,7 @@ func TestPoolManager_Vote(t *testing.T) { for _, tc := range []struct { desc string transaction metadata.Transaction - vote []byte + vote Vote voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) expectedErr error }{ @@ -59,11 +59,11 @@ func TestPoolManager_Vote(t *testing.T) { ID: 1, Node: "node", }, - vote: []byte("foobar"), + vote: VoteFromData([]byte("foobar")), voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { require.Equal(t, uint64(1), request.TransactionId) require.Equal(t, "node", request.Node) - require.Equal(t, request.ReferenceUpdatesHash, []byte("foobar")) + require.Equal(t, request.ReferenceUpdatesHash, VoteFromData([]byte("foobar")).Bytes()) return &gitalypb.VoteTransactionResponse{ State: gitalypb.VoteTransactionResponse_COMMIT, diff --git a/internal/gitaly/transaction/mock.go b/internal/gitaly/transaction/mock.go new file mode 100644 index 000000000..1f617909d --- /dev/null +++ b/internal/gitaly/transaction/mock.go @@ -0,0 +1,30 @@ +package transaction + +import ( + "context" + "errors" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" +) + +// MockManager is a mock Manager for use in tests. +type MockManager struct { + VoteFn func(context.Context, metadata.Transaction, metadata.PraefectServer, Vote) error + StopFn func(context.Context, metadata.Transaction, metadata.PraefectServer) error +} + +// Vote calls the MockManager's Vote function, if set. Otherwise, it returns an error. +func (m *MockManager) Vote(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote Vote) error { + if m.VoteFn == nil { + return errors.New("mock does not implement Vote function") + } + return m.VoteFn(ctx, tx, praefect, vote) +} + +// Stop calls the MockManager's Stop function, if set. Otherwise, it returns an error. +func (m *MockManager) Stop(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error { + if m.StopFn == nil { + return errors.New("mock does not implement Stop function") + } + return m.StopFn(ctx, tx, praefect) +} diff --git a/internal/gitaly/transaction/vote.go b/internal/gitaly/transaction/vote.go new file mode 100644 index 000000000..a205a3785 --- /dev/null +++ b/internal/gitaly/transaction/vote.go @@ -0,0 +1,52 @@ +package transaction + +import ( + "crypto/sha1" + "fmt" + "hash" +) + +const ( + // voteSize is the number of bytes of a vote. + voteSize = sha1.Size +) + +// Vote is a vote cast by a node. +type Vote [voteSize]byte + +// Bytes returns a byte slice containing the hash. +func (v Vote) Bytes() []byte { + return v[:] +} + +// VoteFromHash converts the given byte slice containing a hash into a vote. +func VoteFromHash(bytes []byte) (Vote, error) { + if len(bytes) != voteSize { + return Vote{}, fmt.Errorf("invalid vote length %d", len(bytes)) + } + + var vote Vote + copy(vote[:], bytes) + + return vote, nil +} + +// VoteFromData hashes the given data and converts it to a vote. +func VoteFromData(data []byte) Vote { + return sha1.Sum(data) +} + +// VoteHash is the hash structure used to compute a Vote from arbitrary data. +type VoteHash struct { + hash.Hash +} + +// NewVoteHash returns a new VoteHash. +func NewVoteHash() VoteHash { + return VoteHash{sha1.New()} +} + +// Vote hashes all data written into VoteHash and returns the resulting Vote. +func (v VoteHash) Vote() (Vote, error) { + return VoteFromHash(v.Sum(nil)) +} diff --git a/internal/gitaly/transaction/vote_test.go b/internal/gitaly/transaction/vote_test.go new file mode 100644 index 000000000..36af20a09 --- /dev/null +++ b/internal/gitaly/transaction/vote_test.go @@ -0,0 +1,57 @@ +package transaction + +import ( + "bytes" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVoteFromHash(t *testing.T) { + _, err := VoteFromHash([]byte{}) + require.Error(t, err) + + _, err = VoteFromHash(bytes.Repeat([]byte{1}, voteSize-1)) + require.Equal(t, fmt.Errorf("invalid vote length %d", 19), err) + + _, err = VoteFromHash(bytes.Repeat([]byte{1}, voteSize+1)) + require.Equal(t, fmt.Errorf("invalid vote length %d", 21), err) + + vote, err := VoteFromHash(bytes.Repeat([]byte{1}, voteSize)) + require.NoError(t, err) + require.Equal(t, bytes.Repeat([]byte{1}, voteSize), vote.Bytes()) +} + +func TestVoteFromData(t *testing.T) { + require.Equal(t, Vote{ + 0xda, 0x39, 0xa3, 0xee, 0x5e, 0x6b, 0x4b, 0x0d, 0x32, 0x55, + 0xbf, 0xef, 0x95, 0x60, 0x18, 0x90, 0xaf, 0xd8, 0x07, 0x09, + }, VoteFromData([]byte{})) + + require.Equal(t, Vote{ + 0x88, 0x43, 0xd7, 0xf9, 0x24, 0x16, 0x21, 0x1d, 0xe9, 0xeb, + 0xb9, 0x63, 0xff, 0x4c, 0xe2, 0x81, 0x25, 0x93, 0x28, 0x78, + }, VoteFromData([]byte("foobar"))) +} + +func TestVoteHash(t *testing.T) { + hash := NewVoteHash() + + vote, err := hash.Vote() + require.NoError(t, err) + require.Equal(t, VoteFromData([]byte{}), vote) + + _, err = hash.Write([]byte("foo")) + require.NoError(t, err) + vote, err = hash.Vote() + require.NoError(t, err) + require.Equal(t, VoteFromData([]byte("foo")), vote) + + _, err = hash.Write([]byte("bar")) + require.NoError(t, err) + + vote, err = hash.Vote() + require.NoError(t, err) + require.Equal(t, VoteFromData([]byte("foobar")), vote) +} diff --git a/internal/middleware/commandstatshandler/commandstatshandler_test.go b/internal/middleware/commandstatshandler/commandstatshandler_test.go index 177b02a45..740929ba9 100644 --- a/internal/middleware/commandstatshandler/commandstatshandler_test.go +++ b/internal/middleware/commandstatshandler/commandstatshandler_test.go @@ -11,9 +11,11 @@ import ( grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/backchannel" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ref" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -44,7 +46,7 @@ func createNewServer(t *testing.T, cfg config.Cfg) *grpc.Server { server := grpc.NewServer(opts...) - gitalypb.RegisterRefServiceServer(server, ref.NewServer(cfg, config.NewLocator(cfg), git.NewExecCommandFactory(cfg))) + gitalypb.RegisterRefServiceServer(server, ref.NewServer(cfg, config.NewLocator(cfg), git.NewExecCommandFactory(cfg), transaction.NewManager(cfg, backchannel.NewRegistry()))) return server } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index e18cc5eda..e8792a6bc 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -1065,7 +1065,7 @@ func newReplicationService(tb testing.TB) (*grpc.Server, string) { gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer(gitaly_config.Config, locator, gitCmdFactory)) gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(gitaly_config.Config, RubyServer, locator, gitCmdFactory)) gitalypb.RegisterSSHServiceServer(svr, ssh.NewServer(gitaly_config.Config, locator, gitCmdFactory)) - gitalypb.RegisterRefServiceServer(svr, ref.NewServer(gitaly_config.Config, locator, gitCmdFactory)) + gitalypb.RegisterRefServiceServer(svr, ref.NewServer(gitaly_config.Config, locator, gitCmdFactory, txManager)) gitalypb.RegisterHookServiceServer(svr, hook.NewServer(gitaly_config.Config, hookManager, gitCmdFactory)) healthpb.RegisterHealthServer(svr, health.NewServer()) reflection.Register(svr) |