Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-04-09 14:13:01 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-04-09 14:13:01 +0300
commit2ff8bb5b69e7024607e6820a5c6aed1c7033e45e (patch)
tree749206af0762875fd674e861aeed613e2f050c6f
parent23f6e1516567cdb916f00995e7ed43e4dff68809 (diff)
parentd7d7f619ff97ece46d912c8be0a35a93d7a80c73 (diff)
Merge branch 'pks-ref-delete-refs-voting' into 'master'
ref: Fix missing votes for `DeleteRefs()` RPC See merge request gitlab-org/gitaly!3347
-rw-r--r--changelogs/unreleased/pks-ref-delete-refs-voting.yml5
-rw-r--r--internal/git/updateref/updateref.go16
-rw-r--r--internal/git/updateref/updateref_test.go19
-rw-r--r--internal/gitaly/hook/referencetransaction.go2
-rw-r--r--internal/gitaly/hook/transactions.go32
-rw-r--r--internal/gitaly/hook/transactions_test.go22
-rw-r--r--internal/gitaly/service/operations/testhelper_test.go2
-rw-r--r--internal/gitaly/service/ref/delete_refs.go33
-rw-r--r--internal/gitaly/service/ref/delete_refs_test.go79
-rw-r--r--internal/gitaly/service/ref/server.go11
-rw-r--r--internal/gitaly/service/ref/testhelper_test.go2
-rw-r--r--internal/gitaly/service/register.go2
-rw-r--r--internal/gitaly/service/remote/fetch_internal_remote_test.go5
-rw-r--r--internal/gitaly/service/repository/apply_gitattributes.go8
-rw-r--r--internal/gitaly/transaction/manager.go48
-rw-r--r--internal/gitaly/transaction/manager_test.go6
-rw-r--r--internal/gitaly/transaction/mock.go30
-rw-r--r--internal/gitaly/transaction/vote.go52
-rw-r--r--internal/gitaly/transaction/vote_test.go57
-rw-r--r--internal/middleware/commandstatshandler/commandstatshandler_test.go4
-rw-r--r--internal/praefect/replicator_test.go2
21 files changed, 366 insertions, 71 deletions
diff --git a/changelogs/unreleased/pks-ref-delete-refs-voting.yml b/changelogs/unreleased/pks-ref-delete-refs-voting.yml
new file mode 100644
index 000000000..8be7e6c61
--- /dev/null
+++ b/changelogs/unreleased/pks-ref-delete-refs-voting.yml
@@ -0,0 +1,5 @@
+---
+title: 'ref: Fix missing votes for `DeleteRefs()` RPC'
+merge_request: 3347
+author:
+type: fixed
diff --git a/internal/git/updateref/updateref.go b/internal/git/updateref/updateref.go
index fc7779851..c80f1764e 100644
--- a/internal/git/updateref/updateref.go
+++ b/internal/git/updateref/updateref.go
@@ -70,7 +70,9 @@ func New(ctx context.Context, conf config.Cfg, gitCmdFactory git.CommandFactory,
// transactional behaviour. Which effectively means that without an
// explicit "commit", no changes will be inadvertently committed to
// disk.
- fmt.Fprintf(cmd, "start\x00")
+ if _, err := cmd.Write([]byte("start\x00")); err != nil {
+ return nil, err
+ }
return &Updater{repo: repo, cmd: cmd, stderr: &stderr}, nil
}
@@ -94,9 +96,19 @@ func (u *Updater) Delete(reference git.ReferenceName) error {
return err
}
+// Prepare prepares the reference transaction by locking all references and determining their
+// current values. The updates are not yet committed and will be rolled back in case there is no
+// call to `Wait()`. This call is optional.
+func (u *Updater) Prepare() error {
+ _, err := fmt.Fprintf(u.cmd, "prepare\x00")
+ return err
+}
+
// Wait applies the commands specified in other calls to the Updater
func (u *Updater) Wait() error {
- fmt.Fprintf(u.cmd, "commit\x00")
+ if _, err := u.cmd.Write([]byte("commit\x00")); err != nil {
+ return err
+ }
if err := u.cmd.Wait(); err != nil {
return fmt.Errorf("git update-ref: %v, stderr: %q", err, u.stderr)
diff --git a/internal/git/updateref/updateref_test.go b/internal/git/updateref/updateref_test.go
index 8da1d1664..d439e444a 100644
--- a/internal/git/updateref/updateref_test.go
+++ b/internal/git/updateref/updateref_test.go
@@ -81,6 +81,7 @@ func TestUpdate(t *testing.T) {
require.NotEqual(t, commit.Id, sha, "%s points to HEAD: %s in the test repository", ref.String(), sha)
require.NoError(t, updater.Update(ref, sha, ""))
+ require.NoError(t, updater.Prepare())
require.NoError(t, updater.Wait())
// check the ref was updated
@@ -115,6 +116,24 @@ func TestDelete(t *testing.T) {
require.Equal(t, localrepo.ErrObjectNotFound, err, "expected 'not found' error got %v", err)
}
+func TestUpdater_prepareLocksTransaction(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ _, repo, updater := setupUpdater(t, ctx)
+
+ commit, logErr := repo.ReadCommit(ctx, "refs/heads/master")
+ require.NoError(t, logErr)
+
+ require.NoError(t, updater.Update("refs/heads/feature", commit.Id, ""))
+ require.NoError(t, updater.Prepare())
+ require.NoError(t, updater.Update("refs/heads/feature", commit.Id, ""))
+
+ err := updater.Wait()
+ require.Error(t, err, "cannot update after prepare")
+ require.Contains(t, err.Error(), "fatal: prepared transactions can only be closed")
+}
+
func TestBulkOperation(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/gitaly/hook/referencetransaction.go b/internal/gitaly/hook/referencetransaction.go
index 10475c251..1028ab367 100644
--- a/internal/gitaly/hook/referencetransaction.go
+++ b/internal/gitaly/hook/referencetransaction.go
@@ -60,7 +60,7 @@ func (m *GitLabHookManager) ReferenceTransactionHook(ctx context.Context, state
hash := sha1.Sum(changes)
- if err := m.voteOnTransaction(ctx, hash[:], payload); err != nil {
+ if err := m.voteOnTransaction(ctx, hash, payload); err != nil {
return fmt.Errorf("error voting on transaction: %w", err)
}
diff --git a/internal/gitaly/hook/transactions.go b/internal/gitaly/hook/transactions.go
index 4b5ed2266..8144cd9c9 100644
--- a/internal/gitaly/hook/transactions.go
+++ b/internal/gitaly/hook/transactions.go
@@ -3,28 +3,12 @@ package hook
import (
"context"
"errors"
- "fmt"
- "time"
"gitlab.com/gitlab-org/gitaly/internal/git"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
)
-const (
- // transactionTimeout is the timeout used for all transactional
- // actions like voting and stopping of transactions. This timeout is
- // quite high: usually, a transaction should finish in at most a few
- // milliseconds. There are cases though where it may take a lot longer,
- // like when executing logic on the primary node only: the primary's
- // vote will be delayed until that logic finishes while secondaries are
- // waiting for the primary to cast its vote on the transaction. Given
- // that the primary-only logic's execution time scales with repository
- // size for the access checks and that it is potentially even unbounded
- // due to custom hooks, we thus use a high timeout. It shouldn't
- // normally be hit, but if it is hit then it indicates a real problem.
- transactionTimeout = 5 * time.Minute
-)
-
func isPrimary(payload git.HooksPayload) bool {
if payload.Transaction == nil {
return true
@@ -45,24 +29,14 @@ func (m *GitLabHookManager) runWithTransaction(ctx context.Context, payload git.
if payload.Praefect == nil {
return errors.New("transaction without Praefect server")
}
-
- transactionCtx, cancel := context.WithTimeout(ctx, transactionTimeout)
- defer cancel()
-
- if err := handler(transactionCtx, *payload.Transaction, *payload.Praefect); err != nil {
- // Add some additional context to cancellation errors so that
- // we know which of the contexts got canceled.
- if errors.Is(err, context.Canceled) && errors.Is(transactionCtx.Err(), context.Canceled) && ctx.Err() == nil {
- return fmt.Errorf("transaction timeout %s exceeded: %w", transactionTimeout, err)
- }
-
+ if err := handler(ctx, *payload.Transaction, *payload.Praefect); err != nil {
return err
}
return nil
}
-func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, payload git.HooksPayload) error {
+func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash transaction.Vote, payload git.HooksPayload) error {
return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error {
return m.txManager.Vote(ctx, tx, praefect, hash)
})
diff --git a/internal/gitaly/hook/transactions_test.go b/internal/gitaly/hook/transactions_test.go
index 97c4ec469..624618d2a 100644
--- a/internal/gitaly/hook/transactions_test.go
+++ b/internal/gitaly/hook/transactions_test.go
@@ -13,24 +13,12 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
)
-type mockTransactionManager struct {
- vote func(context.Context, metadata.Transaction, metadata.PraefectServer, []byte) error
- stop func(context.Context, metadata.Transaction, metadata.PraefectServer) error
-}
-
-func (m *mockTransactionManager) Vote(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote []byte) error {
- return m.vote(ctx, tx, praefect, vote)
-}
-
-func (m *mockTransactionManager) Stop(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error {
- return m.stop(ctx, tx, praefect)
-}
-
func TestHookManager_stopCalled(t *testing.T) {
cfg, repo, repoPath := testcfg.BuildWithRepo(t)
@@ -43,7 +31,7 @@ func TestHookManager_stopCalled(t *testing.T) {
Token: "foo",
}
- var mockTxMgr mockTransactionManager
+ var mockTxMgr transaction.MockManager
hookManager := NewManager(config.NewLocator(cfg), &mockTxMgr, GitlabAPIStub, cfg)
hooksPayload, err := git.NewHooksPayload(
@@ -113,7 +101,7 @@ func TestHookManager_stopCalled(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
wasInvoked := false
- mockTxMgr.stop = func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error {
+ mockTxMgr.StopFn = func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error {
require.Equal(t, expectedTx, tx)
require.Equal(t, expectedPraefect, praefect)
wasInvoked = true
@@ -130,8 +118,8 @@ func TestHookManager_stopCalled(t *testing.T) {
func TestHookManager_contextCancellationCancelsVote(t *testing.T) {
cfg, repo, _ := testcfg.BuildWithRepo(t)
- mockTxMgr := mockTransactionManager{
- vote: func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote []byte) error {
+ mockTxMgr := transaction.MockManager{
+ VoteFn: func(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote transaction.Vote) error {
<-ctx.Done()
return fmt.Errorf("mock error: %s", ctx.Err())
},
diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go
index 7803f5a9d..47030d29a 100644
--- a/internal/gitaly/service/operations/testhelper_test.go
+++ b/internal/gitaly/service/operations/testhelper_test.go
@@ -90,7 +90,7 @@ func runOperationServiceServer(t *testing.T) (string, func()) {
gitalypb.RegisterOperationServiceServer(srv.GrpcServer(), server)
gitalypb.RegisterHookServiceServer(srv.GrpcServer(), hook.NewServer(config.Config, hookManager, gitCmdFactory))
gitalypb.RegisterRepositoryServiceServer(srv.GrpcServer(), repository.NewServer(config.Config, RubyServer, locator, txManager, gitCmdFactory))
- gitalypb.RegisterRefServiceServer(srv.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory))
+ gitalypb.RegisterRefServiceServer(srv.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory, txManager))
gitalypb.RegisterCommitServiceServer(srv.GrpcServer(), commit.NewServer(config.Config, locator, gitCmdFactory, nil))
gitalypb.RegisterSSHServiceServer(srv.GrpcServer(), ssh.NewServer(config.Config, locator, gitCmdFactory))
reflection.Register(srv.GrpcServer())
diff --git a/internal/gitaly/service/ref/delete_refs.go b/internal/gitaly/service/ref/delete_refs.go
index 3db189171..f4aef41d0 100644
--- a/internal/gitaly/service/ref/delete_refs.go
+++ b/internal/gitaly/service/ref/delete_refs.go
@@ -9,6 +9,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/internal/git/updateref"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc/codes"
@@ -20,23 +21,45 @@ func (s *server) DeleteRefs(ctx context.Context, in *gitalypb.DeleteRefsRequest)
return nil, status.Errorf(codes.InvalidArgument, "DeleteRefs: %v", err)
}
- updater, err := updateref.New(ctx, s.cfg, s.gitCmdFactory, in.GetRepository())
+ refnames, err := s.refsToRemove(ctx, in)
if err != nil {
- if errors.Is(err, git.ErrInvalidArg) {
- return nil, helper.ErrInvalidArgument(err)
- }
return nil, helper.ErrInternal(err)
}
- refnames, err := s.refsToRemove(ctx, in)
+ updater, err := updateref.New(ctx, s.cfg, s.gitCmdFactory, in.GetRepository())
if err != nil {
+ if errors.Is(err, git.ErrInvalidArg) {
+ return nil, helper.ErrInvalidArgument(err)
+ }
return nil, helper.ErrInternal(err)
}
+ voteHash := transaction.NewVoteHash()
for _, ref := range refnames {
if err := updater.Delete(ref); err != nil {
return &gitalypb.DeleteRefsResponse{GitError: err.Error()}, nil
}
+
+ if _, err := voteHash.Write([]byte(ref.String() + "\n")); err != nil {
+ return nil, helper.ErrInternalf("could not update vote hash: %v", err)
+ }
+ }
+
+ if err := updater.Prepare(); err != nil {
+ return nil, helper.ErrInternalf("could not prepare ref update: %v", err)
+ }
+
+ vote, err := voteHash.Vote()
+ if err != nil {
+ return nil, helper.ErrInternalf("could not compute vote: %v", err)
+ }
+
+ // All deletes we're doing in this RPC are force deletions. Because we're required to filter
+ // out transactions which only consist of force deletions, we never do any voting via the
+ // reference-transaction hook here. Instead, we need to resort to a manual vote which is
+ // simply the concatenation of all reference we're about to delete.
+ if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil {
+ return nil, helper.ErrInternal(err)
}
if err := updater.Wait(); err != nil {
diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go
index 80c16ff89..09585920b 100644
--- a/internal/gitaly/service/ref/delete_refs_test.go
+++ b/internal/gitaly/service/ref/delete_refs_test.go
@@ -1,6 +1,7 @@
package ref
import (
+ "context"
"testing"
"github.com/stretchr/testify/assert"
@@ -8,7 +9,14 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/hook"
+ hookservice "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/hook"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc/codes"
)
@@ -69,6 +77,77 @@ func TestSuccessfulDeleteRefs(t *testing.T) {
}
}
+func TestDeleteRefs_transaction(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ testhelper.ConfigureGitalyHooksBin(t, cfg)
+
+ var votes int
+ txManager := &transaction.MockManager{
+ VoteFn: func(context.Context, metadata.Transaction, metadata.PraefectServer, transaction.Vote) error {
+ votes++
+ return nil
+ },
+ }
+
+ locator := config.NewLocator(cfg)
+ hookManager := hook.NewManager(locator, txManager, hook.GitlabAPIStub, cfg)
+ gitCmdFactory := git.NewExecCommandFactory(cfg)
+
+ srv := testhelper.NewServer(t, nil, nil, testhelper.WithInternalSocket(cfg))
+ gitalypb.RegisterRefServiceServer(srv.GrpcServer(), NewServer(cfg, locator, gitCmdFactory, txManager))
+ gitalypb.RegisterHookServiceServer(srv.GrpcServer(), hookservice.NewServer(cfg, hookManager, gitCmdFactory))
+ srv.Start(t)
+ t.Cleanup(srv.Stop)
+
+ client, conn := newRefServiceClient(t, "unix://"+srv.Socket())
+ t.Cleanup(func() { conn.Close() })
+
+ ctx, cancel := testhelper.Context()
+ t.Cleanup(cancel)
+
+ ctx, err := metadata.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
+ ctx, err = (&metadata.PraefectServer{SocketPath: "i-dont-care"}).Inject(ctx)
+ require.NoError(t, err)
+ ctx = helper.IncomingToOutgoing(ctx)
+
+ for _, tc := range []struct {
+ desc string
+ request *gitalypb.DeleteRefsRequest
+ expectedVotes int
+ }{
+ {
+ desc: "delete nothing",
+ request: &gitalypb.DeleteRefsRequest{
+ ExceptWithPrefix: [][]byte{[]byte("refs/")},
+ },
+ expectedVotes: 1,
+ },
+ {
+ desc: "delete all refs",
+ request: &gitalypb.DeleteRefsRequest{
+ ExceptWithPrefix: [][]byte{[]byte("nonexisting/prefix/")},
+ },
+ expectedVotes: 1,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ votes = 0
+
+ repo, _, cleanup := gittest.CloneRepoAtStorage(t, cfg.Storages[0], tc.desc)
+ t.Cleanup(cleanup)
+ tc.request.Repository = repo
+
+ response, err := client.DeleteRefs(ctx, tc.request)
+ require.NoError(t, err)
+ require.Empty(t, response.GitError)
+
+ require.Equal(t, tc.expectedVotes, votes)
+ })
+ }
+}
+
func TestFailedDeleteRefsRequestDueToGitError(t *testing.T) {
_, repo, _, client := setupRefService(t)
diff --git a/internal/gitaly/service/ref/server.go b/internal/gitaly/service/ref/server.go
index 610e98e0f..7c0da1771 100644
--- a/internal/gitaly/service/ref/server.go
+++ b/internal/gitaly/service/ref/server.go
@@ -3,17 +3,24 @@ package ref
import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/storage"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
type server struct {
cfg config.Cfg
+ txManager transaction.Manager
locator storage.Locator
gitCmdFactory git.CommandFactory
}
// NewServer creates a new instance of a grpc RefServer
-func NewServer(cfg config.Cfg, locator storage.Locator, gitCmdFactory git.CommandFactory) gitalypb.RefServiceServer {
- return &server{cfg: cfg, locator: locator, gitCmdFactory: gitCmdFactory}
+func NewServer(cfg config.Cfg, locator storage.Locator, gitCmdFactory git.CommandFactory, txManager transaction.Manager) gitalypb.RefServiceServer {
+ return &server{
+ cfg: cfg,
+ txManager: txManager,
+ locator: locator,
+ gitCmdFactory: gitCmdFactory,
+ }
}
diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go
index 1b9597b18..81ed79fdb 100644
--- a/internal/gitaly/service/ref/testhelper_test.go
+++ b/internal/gitaly/service/ref/testhelper_test.go
@@ -77,7 +77,7 @@ func runRefServiceServer(t testing.TB, cfg config.Cfg) string {
gitCmdFactory := git.NewExecCommandFactory(cfg)
srv := testhelper.NewServer(t, nil, nil, testhelper.WithInternalSocket(cfg))
- gitalypb.RegisterRefServiceServer(srv.GrpcServer(), NewServer(cfg, locator, gitCmdFactory))
+ gitalypb.RegisterRefServiceServer(srv.GrpcServer(), NewServer(cfg, locator, gitCmdFactory, txManager))
gitalypb.RegisterHookServiceServer(srv.GrpcServer(), hookservice.NewServer(cfg, hookManager, gitCmdFactory))
srv.Start(t)
t.Cleanup(srv.Stop)
diff --git a/internal/gitaly/service/register.go b/internal/gitaly/service/register.go
index 7b9934bf2..9941c5887 100644
--- a/internal/gitaly/service/register.go
+++ b/internal/gitaly/service/register.go
@@ -77,7 +77,7 @@ func RegisterAll(
gitalypb.RegisterDiffServiceServer(grpcServer, diff.NewServer(cfg, locator, gitCmdFactory))
gitalypb.RegisterNamespaceServiceServer(grpcServer, namespace.NewServer(locator))
gitalypb.RegisterOperationServiceServer(grpcServer, operations.NewServer(cfg, rubyServer, hookManager, locator, conns, gitCmdFactory))
- gitalypb.RegisterRefServiceServer(grpcServer, ref.NewServer(cfg, locator, gitCmdFactory))
+ gitalypb.RegisterRefServiceServer(grpcServer, ref.NewServer(cfg, locator, gitCmdFactory, txManager))
gitalypb.RegisterRepositoryServiceServer(grpcServer, repository.NewServer(cfg, rubyServer, locator, txManager, gitCmdFactory))
gitalypb.RegisterSSHServiceServer(grpcServer, ssh.NewServer(
cfg,
diff --git a/internal/gitaly/service/remote/fetch_internal_remote_test.go b/internal/gitaly/service/remote/fetch_internal_remote_test.go
index b34dda6e8..5f537dcee 100644
--- a/internal/gitaly/service/remote/fetch_internal_remote_test.go
+++ b/internal/gitaly/service/remote/fetch_internal_remote_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
@@ -15,6 +16,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ref"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/remote"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ssh"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/storage"
@@ -71,13 +73,14 @@ func testSuccessfulFetchInternalRemote(t *testing.T, ctx context.Context) {
locator := config.NewLocator(config.Config)
gitCmdFactory := git.NewExecCommandFactory(config.Config)
+ txManager := transaction.NewManager(config.Config, backchannel.NewRegistry())
hookManager := &mockHookManager{}
gitaly0Server := testhelper.NewServer(t, nil, nil,
testhelper.WithStorages([]string{"gitaly-0"}),
testhelper.WithInternalSocket(config.Config),
)
gitalypb.RegisterSSHServiceServer(gitaly0Server.GrpcServer(), ssh.NewServer(config.Config, locator, gitCmdFactory))
- gitalypb.RegisterRefServiceServer(gitaly0Server.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory))
+ gitalypb.RegisterRefServiceServer(gitaly0Server.GrpcServer(), ref.NewServer(config.Config, locator, gitCmdFactory, txManager))
gitalypb.RegisterHookServiceServer(gitaly0Server.GrpcServer(), hook.NewServer(config.Config, hookManager, gitCmdFactory))
reflection.Register(gitaly0Server.GrpcServer())
gitaly0Server.Start(t)
diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go
index 6f7190b26..7a876fb81 100644
--- a/internal/gitaly/service/repository/apply_gitattributes.go
+++ b/internal/gitaly/service/repository/apply_gitattributes.go
@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc/codes"
@@ -107,7 +108,12 @@ func (s *server) vote(ctx context.Context, oid git.ObjectID) error {
return fmt.Errorf("vote with invalid object ID: %w", err)
}
- if err := s.txManager.Vote(ctx, tx, *praefect, hash); err != nil {
+ vote, err := transaction.VoteFromHash(hash)
+ if err != nil {
+ return fmt.Errorf("cannot convert OID to vote: %w", err)
+ }
+
+ if err := s.txManager.Vote(ctx, tx, *praefect, vote); err != nil {
return fmt.Errorf("vote failed: %w", err)
}
diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go
index 906515c29..0c12ef959 100644
--- a/internal/gitaly/transaction/manager.go
+++ b/internal/gitaly/transaction/manager.go
@@ -4,6 +4,8 @@ import (
"context"
"encoding/hex"
"errors"
+ "fmt"
+ "time"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
@@ -15,6 +17,21 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
+const (
+ // transactionTimeout is the timeout used for all transactional
+ // actions like voting and stopping of transactions. This timeout is
+ // quite high: usually, a transaction should finish in at most a few
+ // milliseconds. There are cases though where it may take a lot longer,
+ // like when executing logic on the primary node only: the primary's
+ // vote will be delayed until that logic finishes while secondaries are
+ // waiting for the primary to cast its vote on the transaction. Given
+ // that the primary-only logic's execution time scales with repository
+ // size for the access checks and that it is potentially even unbounded
+ // due to custom hooks, we thus use a high timeout. It shouldn't
+ // normally be hit, but if it is hit then it indicates a real problem.
+ transactionTimeout = 5 * time.Minute
+)
+
var (
// ErrTransactionAborted indicates a transaction was aborted, either
// because it timed out or because the vote failed to reach quorum.
@@ -30,7 +47,7 @@ var (
type Manager interface {
// Vote casts a vote on the given transaction which is hosted by the
// given Praefect server.
- Vote(context.Context, metadata.Transaction, metadata.PraefectServer, []byte) error
+ Vote(context.Context, metadata.Transaction, metadata.PraefectServer, Vote) error
// Stop gracefully stops the given transaction which is hosted by the
// given Praefect server.
@@ -86,7 +103,7 @@ func (m *PoolManager) getTransactionClient(ctx context.Context, server metadata.
// Vote connects to the given server and casts hash as a vote for the
// transaction identified by tx.
-func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer, hash []byte) error {
+func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server metadata.PraefectServer, hash Vote) error {
client, err := m.getTransactionClient(ctx, server)
if err != nil {
return err
@@ -95,17 +112,26 @@ func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server
logger := m.log(ctx).WithFields(logrus.Fields{
"transaction.id": tx.ID,
"transaction.voter": tx.Node,
- "transaction.hash": hex.EncodeToString(hash),
+ "transaction.hash": hex.EncodeToString(hash.Bytes()),
})
defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration()
- response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ transactionCtx, cancel := context.WithTimeout(ctx, transactionTimeout)
+ defer cancel()
+
+ response, err := client.VoteTransaction(transactionCtx, &gitalypb.VoteTransactionRequest{
TransactionId: tx.ID,
Node: tx.Node,
- ReferenceUpdatesHash: hash,
+ ReferenceUpdatesHash: hash.Bytes(),
})
if err != nil {
+ // Add some additional context to cancellation errors so that
+ // we know which of the contexts got canceled.
+ if errors.Is(err, context.Canceled) && errors.Is(transactionCtx.Err(), context.Canceled) && ctx.Err() == nil {
+ return fmt.Errorf("transaction timeout %s exceeded: %w", transactionTimeout, err)
+ }
+
logger.WithError(err).Error("vote failed")
return err
}
@@ -148,3 +174,15 @@ func (m *PoolManager) Stop(ctx context.Context, tx metadata.Transaction, server
func (m *PoolManager) log(ctx context.Context) logrus.FieldLogger {
return ctxlogrus.Extract(ctx).WithField("component", "transaction.PoolManager")
}
+
+// VoteOnContext casts the vote on a transaction identified by the context, if there is any.
+func VoteOnContext(ctx context.Context, m Manager, vote Vote) error {
+ transaction, praefect, err := metadata.TransactionMetadataFromContext(ctx)
+ if err != nil {
+ return err
+ }
+ if transaction == nil {
+ return nil
+ }
+ return m.Vote(ctx, *transaction, *praefect, vote)
+}
diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go
index c9b5b44dd..fb14b08cc 100644
--- a/internal/gitaly/transaction/manager_test.go
+++ b/internal/gitaly/transaction/manager_test.go
@@ -49,7 +49,7 @@ func TestPoolManager_Vote(t *testing.T) {
for _, tc := range []struct {
desc string
transaction metadata.Transaction
- vote []byte
+ vote Vote
voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error)
expectedErr error
}{
@@ -59,11 +59,11 @@ func TestPoolManager_Vote(t *testing.T) {
ID: 1,
Node: "node",
},
- vote: []byte("foobar"),
+ vote: VoteFromData([]byte("foobar")),
voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
require.Equal(t, uint64(1), request.TransactionId)
require.Equal(t, "node", request.Node)
- require.Equal(t, request.ReferenceUpdatesHash, []byte("foobar"))
+ require.Equal(t, request.ReferenceUpdatesHash, VoteFromData([]byte("foobar")).Bytes())
return &gitalypb.VoteTransactionResponse{
State: gitalypb.VoteTransactionResponse_COMMIT,
diff --git a/internal/gitaly/transaction/mock.go b/internal/gitaly/transaction/mock.go
new file mode 100644
index 000000000..1f617909d
--- /dev/null
+++ b/internal/gitaly/transaction/mock.go
@@ -0,0 +1,30 @@
+package transaction
+
+import (
+ "context"
+ "errors"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
+)
+
+// MockManager is a mock Manager for use in tests.
+type MockManager struct {
+ VoteFn func(context.Context, metadata.Transaction, metadata.PraefectServer, Vote) error
+ StopFn func(context.Context, metadata.Transaction, metadata.PraefectServer) error
+}
+
+// Vote calls the MockManager's Vote function, if set. Otherwise, it returns an error.
+func (m *MockManager) Vote(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer, vote Vote) error {
+ if m.VoteFn == nil {
+ return errors.New("mock does not implement Vote function")
+ }
+ return m.VoteFn(ctx, tx, praefect, vote)
+}
+
+// Stop calls the MockManager's Stop function, if set. Otherwise, it returns an error.
+func (m *MockManager) Stop(ctx context.Context, tx metadata.Transaction, praefect metadata.PraefectServer) error {
+ if m.StopFn == nil {
+ return errors.New("mock does not implement Stop function")
+ }
+ return m.StopFn(ctx, tx, praefect)
+}
diff --git a/internal/gitaly/transaction/vote.go b/internal/gitaly/transaction/vote.go
new file mode 100644
index 000000000..a205a3785
--- /dev/null
+++ b/internal/gitaly/transaction/vote.go
@@ -0,0 +1,52 @@
+package transaction
+
+import (
+ "crypto/sha1"
+ "fmt"
+ "hash"
+)
+
+const (
+ // voteSize is the number of bytes of a vote.
+ voteSize = sha1.Size
+)
+
+// Vote is a vote cast by a node.
+type Vote [voteSize]byte
+
+// Bytes returns a byte slice containing the hash.
+func (v Vote) Bytes() []byte {
+ return v[:]
+}
+
+// VoteFromHash converts the given byte slice containing a hash into a vote.
+func VoteFromHash(bytes []byte) (Vote, error) {
+ if len(bytes) != voteSize {
+ return Vote{}, fmt.Errorf("invalid vote length %d", len(bytes))
+ }
+
+ var vote Vote
+ copy(vote[:], bytes)
+
+ return vote, nil
+}
+
+// VoteFromData hashes the given data and converts it to a vote.
+func VoteFromData(data []byte) Vote {
+ return sha1.Sum(data)
+}
+
+// VoteHash is the hash structure used to compute a Vote from arbitrary data.
+type VoteHash struct {
+ hash.Hash
+}
+
+// NewVoteHash returns a new VoteHash.
+func NewVoteHash() VoteHash {
+ return VoteHash{sha1.New()}
+}
+
+// Vote hashes all data written into VoteHash and returns the resulting Vote.
+func (v VoteHash) Vote() (Vote, error) {
+ return VoteFromHash(v.Sum(nil))
+}
diff --git a/internal/gitaly/transaction/vote_test.go b/internal/gitaly/transaction/vote_test.go
new file mode 100644
index 000000000..36af20a09
--- /dev/null
+++ b/internal/gitaly/transaction/vote_test.go
@@ -0,0 +1,57 @@
+package transaction
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestVoteFromHash(t *testing.T) {
+ _, err := VoteFromHash([]byte{})
+ require.Error(t, err)
+
+ _, err = VoteFromHash(bytes.Repeat([]byte{1}, voteSize-1))
+ require.Equal(t, fmt.Errorf("invalid vote length %d", 19), err)
+
+ _, err = VoteFromHash(bytes.Repeat([]byte{1}, voteSize+1))
+ require.Equal(t, fmt.Errorf("invalid vote length %d", 21), err)
+
+ vote, err := VoteFromHash(bytes.Repeat([]byte{1}, voteSize))
+ require.NoError(t, err)
+ require.Equal(t, bytes.Repeat([]byte{1}, voteSize), vote.Bytes())
+}
+
+func TestVoteFromData(t *testing.T) {
+ require.Equal(t, Vote{
+ 0xda, 0x39, 0xa3, 0xee, 0x5e, 0x6b, 0x4b, 0x0d, 0x32, 0x55,
+ 0xbf, 0xef, 0x95, 0x60, 0x18, 0x90, 0xaf, 0xd8, 0x07, 0x09,
+ }, VoteFromData([]byte{}))
+
+ require.Equal(t, Vote{
+ 0x88, 0x43, 0xd7, 0xf9, 0x24, 0x16, 0x21, 0x1d, 0xe9, 0xeb,
+ 0xb9, 0x63, 0xff, 0x4c, 0xe2, 0x81, 0x25, 0x93, 0x28, 0x78,
+ }, VoteFromData([]byte("foobar")))
+}
+
+func TestVoteHash(t *testing.T) {
+ hash := NewVoteHash()
+
+ vote, err := hash.Vote()
+ require.NoError(t, err)
+ require.Equal(t, VoteFromData([]byte{}), vote)
+
+ _, err = hash.Write([]byte("foo"))
+ require.NoError(t, err)
+ vote, err = hash.Vote()
+ require.NoError(t, err)
+ require.Equal(t, VoteFromData([]byte("foo")), vote)
+
+ _, err = hash.Write([]byte("bar"))
+ require.NoError(t, err)
+
+ vote, err = hash.Vote()
+ require.NoError(t, err)
+ require.Equal(t, VoteFromData([]byte("foobar")), vote)
+}
diff --git a/internal/middleware/commandstatshandler/commandstatshandler_test.go b/internal/middleware/commandstatshandler/commandstatshandler_test.go
index 177b02a45..740929ba9 100644
--- a/internal/middleware/commandstatshandler/commandstatshandler_test.go
+++ b/internal/middleware/commandstatshandler/commandstatshandler_test.go
@@ -11,9 +11,11 @@ import (
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ref"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -44,7 +46,7 @@ func createNewServer(t *testing.T, cfg config.Cfg) *grpc.Server {
server := grpc.NewServer(opts...)
- gitalypb.RegisterRefServiceServer(server, ref.NewServer(cfg, config.NewLocator(cfg), git.NewExecCommandFactory(cfg)))
+ gitalypb.RegisterRefServiceServer(server, ref.NewServer(cfg, config.NewLocator(cfg), git.NewExecCommandFactory(cfg), transaction.NewManager(cfg, backchannel.NewRegistry())))
return server
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index e18cc5eda..e8792a6bc 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -1065,7 +1065,7 @@ func newReplicationService(tb testing.TB) (*grpc.Server, string) {
gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer(gitaly_config.Config, locator, gitCmdFactory))
gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(gitaly_config.Config, RubyServer, locator, gitCmdFactory))
gitalypb.RegisterSSHServiceServer(svr, ssh.NewServer(gitaly_config.Config, locator, gitCmdFactory))
- gitalypb.RegisterRefServiceServer(svr, ref.NewServer(gitaly_config.Config, locator, gitCmdFactory))
+ gitalypb.RegisterRefServiceServer(svr, ref.NewServer(gitaly_config.Config, locator, gitCmdFactory, txManager))
gitalypb.RegisterHookServiceServer(svr, hook.NewServer(gitaly_config.Config, hookManager, gitCmdFactory))
healthpb.RegisterHealthServer(svr, health.NewServer())
reflection.Register(svr)