Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarthik Nayak <knayak@gitlab.com>2023-12-07 20:52:42 +0300
committerKarthik Nayak <knayak@gitlab.com>2023-12-08 14:36:55 +0300
commit3c1fc525900d00291485a6f823455617e1264fb3 (patch)
treef90fc0955f4b7dce32b17691af694f10691e6d3f
parentc0b07ba36fc8fc40830f061cb55a5c951a166e1c (diff)
hook: Introduce `ProcReceiveHandler` and `ProcReceiveRegistry`
Introduce the `ProcReceiveHandler` which provides the mechanism for RPCs which invoked git-receive-pack(1) to interact with the proc-receive hook. It provides access to a list of references that a transaction is attempting to update, and functions to accept or reject individual updates. Also, introduce `ProcReceiveRegistry`, a registry which provides the proc-receive handlers against a provided transaction ID. The registry allows RPCs which perform commands that execute git-receive-pack(1) to hook into the proc-receive handler. The RPC must register itself with the registry by calling RegisterWaiter(), this provides a channel where the handler will be provided along with a registry cleanup function.
-rw-r--r--internal/cli/gitaly/serve.go11
-rw-r--r--internal/cli/gitaly/subcmd_check.go11
-rw-r--r--internal/gitaly/hook/manager.go31
-rw-r--r--internal/gitaly/hook/postreceive_test.go14
-rw-r--r--internal/gitaly/hook/prereceive_test.go15
-rw-r--r--internal/gitaly/hook/procreceive_handler.go196
-rw-r--r--internal/gitaly/hook/procreceive_handler_test.go260
-rw-r--r--internal/gitaly/hook/procreceive_registry.go103
-rw-r--r--internal/gitaly/hook/procreceive_registry_test.go146
-rw-r--r--internal/gitaly/hook/transactions_test.go4
-rw-r--r--internal/gitaly/hook/update_test.go4
-rw-r--r--internal/gitaly/server/auth_test.go2
-rw-r--r--internal/gitaly/service/hook/testhelper_test.go2
-rw-r--r--internal/gitaly/service/operations/merge_branch_test.go4
-rw-r--r--internal/testhelper/testserver/gitaly.go10
15 files changed, 783 insertions, 30 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 9db91a758..beee3407a 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -253,7 +253,16 @@ func run(cfg config.Cfg, logger log.Logger) error {
}
prometheus.MustRegister(gitlabClient)
- hookManager = hook.NewManager(cfg, locator, logger, gitCmdFactory, transactionManager, gitlabClient, hook.NewTransactionRegistry(txRegistry))
+ hookManager = hook.NewManager(
+ cfg,
+ locator,
+ logger,
+ gitCmdFactory,
+ transactionManager,
+ gitlabClient,
+ hook.NewTransactionRegistry(txRegistry),
+ hook.NewProcReceiveRegistry(),
+ )
}
conns := client.NewPool(
diff --git a/internal/cli/gitaly/subcmd_check.go b/internal/cli/gitaly/subcmd_check.go
index 5ca3419d1..7f1cbb397 100644
--- a/internal/cli/gitaly/subcmd_check.go
+++ b/internal/cli/gitaly/subcmd_check.go
@@ -74,5 +74,14 @@ func checkAPI(cfg config.Cfg, logger log.Logger) (*gitlab.CheckInfo, error) {
}
defer cleanup()
- return hook.NewManager(cfg, config.NewLocator(cfg), logger, gitCmdFactory, nil, gitlabAPI, hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry())).Check(context.Background())
+ return hook.NewManager(
+ cfg,
+ config.NewLocator(cfg),
+ logger,
+ gitCmdFactory,
+ nil,
+ gitlabAPI,
+ hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()),
+ hook.NewProcReceiveRegistry(),
+ ).Check(context.Background())
}
diff --git a/internal/gitaly/hook/manager.go b/internal/gitaly/hook/manager.go
index dd4b2a5d5..1cdd18e33 100644
--- a/internal/gitaly/hook/manager.go
+++ b/internal/gitaly/hook/manager.go
@@ -78,13 +78,14 @@ func NewTransactionRegistry(txRegistry *storagemgr.TransactionRegistry) Transact
// GitLabHookManager is a hook manager containing Git hook business logic. It
// uses the GitLab API to authenticate and track ongoing hook calls.
type GitLabHookManager struct {
- cfg config.Cfg
- locator storage.Locator
- logger log.Logger
- gitCmdFactory git.CommandFactory
- txManager transaction.Manager
- gitlabClient gitlab.Client
- txRegistry TransactionRegistry
+ cfg config.Cfg
+ locator storage.Locator
+ logger log.Logger
+ gitCmdFactory git.CommandFactory
+ txManager transaction.Manager
+ gitlabClient gitlab.Client
+ txRegistry TransactionRegistry
+ procReceiveRegistry *ProcReceiveRegistry
}
// NewManager returns a new hook manager
@@ -96,14 +97,16 @@ func NewManager(
txManager transaction.Manager,
gitlabClient gitlab.Client,
txRegistry TransactionRegistry,
+ procReceiveRegistry *ProcReceiveRegistry,
) *GitLabHookManager {
return &GitLabHookManager{
- cfg: cfg,
- locator: locator,
- logger: logger,
- gitCmdFactory: gitCmdFactory,
- txManager: txManager,
- gitlabClient: gitlabClient,
- txRegistry: txRegistry,
+ cfg: cfg,
+ locator: locator,
+ logger: logger,
+ gitCmdFactory: gitCmdFactory,
+ txManager: txManager,
+ gitlabClient: gitlabClient,
+ txRegistry: txRegistry,
+ procReceiveRegistry: procReceiveRegistry,
}
}
diff --git a/internal/gitaly/hook/postreceive_test.go b/internal/gitaly/hook/postreceive_test.go
index 808035831..31fc8948a 100644
--- a/internal/gitaly/hook/postreceive_test.go
+++ b/internal/gitaly/hook/postreceive_test.go
@@ -84,7 +84,7 @@ func TestPostReceive_customHook(t *testing.T) {
txManager := transaction.NewTrackingManager()
hookManager := NewManager(cfg, locator, testhelper.SharedLogger(t), gitCmdFactory, txManager, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
receiveHooksPayload := &git.UserDetails{
UserID: "1234",
@@ -381,7 +381,15 @@ func TestPostReceive_gitlab(t *testing.T) {
},
}
- hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()), &gitlabAPI, NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ hookManager := NewManager(
+ cfg,
+ config.NewLocator(cfg),
+ testhelper.SharedLogger(t),
+ gittest.NewCommandFactory(t, cfg),
+ transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()),
+ &gitlabAPI, NewTransactionRegistry(storagemgr.NewTransactionRegistry()),
+ NewProcReceiveRegistry(),
+ )
gittest.WriteCustomHook(t, repoPath, "post-receive", []byte("#!/bin/sh\necho hook called\n"))
@@ -419,7 +427,7 @@ func TestPostReceive_quarantine(t *testing.T) {
hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), nil, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
gittest.WriteCustomHook(t, repoPath, "post-receive", []byte(fmt.Sprintf(
`#!/bin/sh
diff --git a/internal/gitaly/hook/prereceive_test.go b/internal/gitaly/hook/prereceive_test.go
index 77f6bb4a1..1bfab9447 100644
--- a/internal/gitaly/hook/prereceive_test.go
+++ b/internal/gitaly/hook/prereceive_test.go
@@ -43,7 +43,7 @@ func TestPrereceive_customHooks(t *testing.T) {
txManager := transaction.NewTrackingManager()
hookManager := NewManager(cfg, locator, testhelper.SharedLogger(t), gitCmdFactory, txManager, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
receiveHooksPayload := &git.UserDetails{
UserID: "1234",
@@ -228,7 +228,7 @@ func TestPrereceive_quarantine(t *testing.T) {
hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), nil, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
//nolint:gitaly-linters
gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte(fmt.Sprintf(
@@ -419,7 +419,16 @@ func TestPrereceive_gitlab(t *testing.T) {
},
}
- hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()), &gitlabAPI, NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ hookManager := NewManager(
+ cfg,
+ config.NewLocator(cfg),
+ testhelper.SharedLogger(t),
+ gittest.NewCommandFactory(t, cfg),
+ transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()),
+ &gitlabAPI,
+ NewTransactionRegistry(storagemgr.NewTransactionRegistry()),
+ NewProcReceiveRegistry(),
+ )
gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte("#!/bin/sh\necho called\n"))
diff --git a/internal/gitaly/hook/procreceive_handler.go b/internal/gitaly/hook/procreceive_handler.go
new file mode 100644
index 000000000..943447442
--- /dev/null
+++ b/internal/gitaly/hook/procreceive_handler.go
@@ -0,0 +1,196 @@
+package hook
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+)
+
+type procReceiveHandler struct {
+ stdout io.Writer
+ doneCh chan<- struct{}
+ updates []ReferenceUpdate
+ transactionID storage.TransactionID
+ atomic bool
+}
+
+// NewProcReceiveHandler returns a ProcReceiveHandler implementation.
+// The function, returns the handler along with a channel which indicates completion
+// of the handlers usage.
+//
+// ProcReceiveHandler is used to intercept git-receive-pack(1)'s execute-commands
+// code. This allows us to intercept reference updates before writing to the
+// disk via the proc-receive hook (https://git-scm.com/docs/githooks#proc-receive).
+//
+// The handler is transmitted to RPCs which executed git-receive-pack(1), so they
+// can accept or reject individual reference updates.
+func NewProcReceiveHandler(env []string, stdin io.Reader, stdout io.Writer) (ProcReceiveHandler, <-chan struct{}, error) {
+ payload, err := git.HooksPayloadFromEnv(env)
+ if err != nil {
+ return nil, nil, fmt.Errorf("extracting hooks payload: %w", err)
+ }
+
+ // This hook only works when there is a transaction present.
+ if payload.TransactionID == 0 {
+ return nil, nil, fmt.Errorf("no transaction found in payload")
+ }
+
+ scanner := pktline.NewScanner(stdin)
+
+ // Version and feature negotiation.
+ if !scanner.Scan() {
+ return nil, nil, fmt.Errorf("expected version negotiation: %w", scanner.Err())
+ }
+
+ data, err := pktline.Payload(scanner.Bytes())
+ if err != nil {
+ return nil, nil, fmt.Errorf("receiving header: %w", err)
+ }
+
+ after, ok := bytes.CutPrefix(data, []byte("version=1\000"))
+ if !ok {
+ return nil, nil, fmt.Errorf("unsupported version: %s", data)
+ }
+ featureRequests := parseFeatureRequest(after)
+
+ if !scanner.Scan() {
+ return nil, nil, fmt.Errorf("expected flush: %w", scanner.Err())
+ }
+
+ if !pktline.IsFlush(scanner.Bytes()) {
+ return nil, nil, fmt.Errorf("expected pkt flush")
+ }
+
+ if _, err := pktline.WriteString(stdout, fmt.Sprintf("version=1\000%s", featureRequests)); err != nil {
+ return nil, nil, fmt.Errorf("writing version: %w", err)
+ }
+
+ if err := pktline.WriteFlush(stdout); err != nil {
+ return nil, nil, fmt.Errorf("flushing version: %w", err)
+ }
+
+ updates := []ReferenceUpdate{}
+ for scanner.Scan() {
+ line := scanner.Bytes()
+
+ // When all reference updates are transmitted, we expect a flush.
+ if pktline.IsFlush(line) {
+ break
+ }
+
+ data, err := pktline.Payload(line)
+ if err != nil {
+ return nil, nil, fmt.Errorf("receiving reference update: %w", err)
+ }
+
+ update, err := parseRefUpdate(data)
+ if err != nil {
+ return nil, nil, fmt.Errorf("parse reference update: %w", err)
+ }
+ updates = append(updates, update)
+ }
+
+ if err := scanner.Err(); err != nil {
+ return nil, nil, fmt.Errorf("parsing stdin: %w", err)
+ }
+
+ ch := make(chan struct{})
+
+ return &procReceiveHandler{
+ transactionID: payload.TransactionID,
+ atomic: featureRequests.atomic,
+ stdout: stdout,
+ updates: updates,
+ doneCh: ch,
+ }, ch, nil
+}
+
+// TransactionID provides the storage.TransactionID associated with the
+// handler.
+func (h *procReceiveHandler) TransactionID() storage.TransactionID {
+ return h.transactionID
+}
+
+// Atomic denotes whether the push was atomic.
+func (h *procReceiveHandler) Atomic() bool {
+ return h.atomic
+}
+
+// ReferenceUpdates provides the reference updates to be made.
+func (h *procReceiveHandler) ReferenceUpdates() []ReferenceUpdate {
+ return h.updates
+}
+
+// AcceptUpdate accepts a given reference update.
+func (h *procReceiveHandler) AcceptUpdate(referenceName git.ReferenceName) error {
+ if _, err := pktline.WriteString(h.stdout, fmt.Sprintf("ok %s", referenceName)); err != nil {
+ return fmt.Errorf("write ref %s ok: %w", referenceName, err)
+ }
+
+ return nil
+}
+
+// RejectUpdate rejects a given reference update with the given reason.
+func (h *procReceiveHandler) RejectUpdate(referenceName git.ReferenceName, reason string) error {
+ if _, err := pktline.WriteString(h.stdout, fmt.Sprintf("ng %s %s", referenceName, reason)); err != nil {
+ return fmt.Errorf("write ref %s ng: %w", referenceName, err)
+ }
+
+ return nil
+}
+
+// Close must be called to clean up the proc-receive hook.
+func (h *procReceiveHandler) Close() error {
+ defer close(h.doneCh)
+
+ if err := pktline.WriteFlush(h.stdout); err != nil {
+ return fmt.Errorf("flushing updates: %w", err)
+ }
+
+ return nil
+}
+
+func parseRefUpdate(data []byte) (ReferenceUpdate, error) {
+ var update ReferenceUpdate
+
+ split := bytes.Split(data, []byte(" "))
+ if len(split) != 3 {
+ return update, fmt.Errorf("unknown ref update format: %s", split)
+ }
+
+ update.Ref = git.ReferenceName(split[2])
+ update.OldOID = git.ObjectID(split[0])
+ update.NewOID = git.ObjectID(split[1])
+
+ return update, nil
+}
+
+type procReceiveFeatureRequests struct {
+ atomic bool
+}
+
+func (r *procReceiveFeatureRequests) String() string {
+ s := ""
+ if r.atomic {
+ s = "atomic"
+ }
+
+ return s
+}
+
+// parseFeatureRequest parses the features requested.
+func parseFeatureRequest(data []byte) *procReceiveFeatureRequests {
+ var featureRequests procReceiveFeatureRequests
+
+ for _, feature := range bytes.Split(data, []byte(" ")) {
+ if bytes.Equal(feature, []byte("atomic")) {
+ featureRequests.atomic = true
+ }
+ }
+
+ return &featureRequests
+}
diff --git a/internal/gitaly/hook/procreceive_handler_test.go b/internal/gitaly/hook/procreceive_handler_test.go
new file mode 100644
index 000000000..c038adfc0
--- /dev/null
+++ b/internal/gitaly/hook/procreceive_handler_test.go
@@ -0,0 +1,260 @@
+package hook
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+)
+
+func TestProcReceiveHandler(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ cfg := testcfg.Build(t)
+
+ repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ receiveHooksPayload := &git.UserDetails{
+ UserID: "1234",
+ Username: "user",
+ Protocol: "web",
+ }
+
+ payload, err := git.NewHooksPayload(
+ cfg,
+ repo,
+ gittest.DefaultObjectHash,
+ nil,
+ receiveHooksPayload,
+ git.PreReceiveHook,
+ featureflag.FromContext(ctx),
+ 1,
+ ).Env()
+ require.NoError(t, err)
+
+ type setupData struct {
+ env []string
+ ctx context.Context
+ stdin string
+ expectedErr error
+ expectedStdout string
+ expectedUpdates []ReferenceUpdate
+ expectedAtomic bool
+ handlerSteps func(handler ProcReceiveHandler) error
+ }
+
+ for _, tc := range []struct {
+ desc string
+ setup func(t *testing.T, ctx context.Context) setupData
+ }{
+ {
+ desc: "no payload",
+ setup: func(t *testing.T, ctx context.Context) setupData {
+ return setupData{
+ env: []string{},
+ ctx: ctx,
+ expectedErr: fmt.Errorf("extracting hooks payload: %w", errors.New("no hooks payload found in environment")),
+ }
+ },
+ },
+ {
+ desc: "invalid version",
+ setup: func(t *testing.T, ctx context.Context) setupData {
+ var stdin bytes.Buffer
+ _, err := pktline.WriteString(&stdin, "version=2")
+ require.NoError(t, err)
+
+ return setupData{
+ env: []string{payload},
+ ctx: ctx,
+ stdin: stdin.String(),
+ expectedErr: errors.New("unsupported version: version=2"),
+ }
+ },
+ },
+ {
+ desc: "single reference with atomic",
+ setup: func(t *testing.T, ctx context.Context) setupData {
+ var stdin bytes.Buffer
+ _, err := pktline.WriteString(&stdin, "version=1\000push-options atomic")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s",
+ gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main"))
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+
+ var stdout bytes.Buffer
+ _, err = pktline.WriteString(&stdout, "version=1\000atomic")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdout)
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdout, "ok refs/heads/main")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdout)
+ require.NoError(t, err)
+
+ return setupData{
+ env: []string{payload},
+ ctx: ctx,
+ stdin: stdin.String(),
+ expectedStdout: stdout.String(),
+ expectedAtomic: true,
+ expectedUpdates: []ReferenceUpdate{
+ {
+ Ref: "refs/heads/main",
+ OldOID: gittest.DefaultObjectHash.ZeroOID,
+ NewOID: gittest.DefaultObjectHash.EmptyTreeOID,
+ },
+ },
+ handlerSteps: func(handler ProcReceiveHandler) error {
+ require.NoError(t, handler.AcceptUpdate("refs/heads/main"))
+ return handler.Close()
+ },
+ }
+ },
+ },
+ {
+ desc: "single reference without atomic",
+ setup: func(t *testing.T, ctx context.Context) setupData {
+ var stdin bytes.Buffer
+ _, err := pktline.WriteString(&stdin, "version=1\000push-options")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s",
+ gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main"))
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+
+ var stdout bytes.Buffer
+ _, err = pktline.WriteString(&stdout, "version=1\000")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdout)
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdout, "ok refs/heads/main")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdout)
+ require.NoError(t, err)
+
+ return setupData{
+ env: []string{payload},
+ ctx: ctx,
+ stdin: stdin.String(),
+ expectedStdout: stdout.String(),
+ expectedUpdates: []ReferenceUpdate{
+ {
+ Ref: "refs/heads/main",
+ OldOID: gittest.DefaultObjectHash.ZeroOID,
+ NewOID: gittest.DefaultObjectHash.EmptyTreeOID,
+ },
+ },
+ handlerSteps: func(handler ProcReceiveHandler) error {
+ require.NoError(t, handler.AcceptUpdate("refs/heads/main"))
+ return handler.Close()
+ },
+ }
+ },
+ },
+ {
+ desc: "multiple references",
+ setup: func(t *testing.T, ctx context.Context) setupData {
+ var stdin bytes.Buffer
+ _, err := pktline.WriteString(&stdin, "version=1\000push-options")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s",
+ gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main"))
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s",
+ gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/branch"))
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+
+ var stdout bytes.Buffer
+ _, err = pktline.WriteString(&stdout, "version=1\000")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdout)
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdout, "ok refs/heads/main")
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdout, "ng refs/heads/branch for fun")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdout)
+ require.NoError(t, err)
+
+ return setupData{
+ env: []string{payload},
+ ctx: ctx,
+ stdin: stdin.String(),
+ expectedStdout: stdout.String(),
+ expectedUpdates: []ReferenceUpdate{
+ {
+ Ref: "refs/heads/main",
+ OldOID: gittest.DefaultObjectHash.ZeroOID,
+ NewOID: gittest.DefaultObjectHash.EmptyTreeOID,
+ },
+ {
+ Ref: "refs/heads/branch",
+ OldOID: gittest.DefaultObjectHash.ZeroOID,
+ NewOID: gittest.DefaultObjectHash.EmptyTreeOID,
+ },
+ },
+ handlerSteps: func(handler ProcReceiveHandler) error {
+ require.NoError(t, handler.AcceptUpdate("refs/heads/main"))
+ require.NoError(t, handler.RejectUpdate("refs/heads/branch", "for fun"))
+ return handler.Close()
+ },
+ }
+ },
+ },
+ } {
+ tc := tc
+
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ setup := tc.setup(t, ctx)
+
+ var stdout bytes.Buffer
+ handler, doneCh, err := NewProcReceiveHandler(setup.env, strings.NewReader(setup.stdin), &stdout)
+ if err != nil || setup.expectedErr != nil {
+ require.Equal(t, setup.expectedErr, err)
+ return
+ }
+
+ updates := handler.ReferenceUpdates()
+ require.Equal(t, setup.expectedUpdates, updates)
+
+ select {
+ case <-doneCh:
+ t.Fatal("done returned before handler called Close()")
+ default:
+ }
+
+ require.NoError(t, setup.handlerSteps(handler))
+ // When Close() is called, we must receive a confirmation.
+ <-doneCh
+
+ require.Equal(t, setup.expectedStdout, stdout.String())
+ })
+ }
+}
diff --git a/internal/gitaly/hook/procreceive_registry.go b/internal/gitaly/hook/procreceive_registry.go
new file mode 100644
index 000000000..b62fa6ae2
--- /dev/null
+++ b/internal/gitaly/hook/procreceive_registry.go
@@ -0,0 +1,103 @@
+package hook
+
+import (
+ "fmt"
+ "sync"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+)
+
+// ReferenceUpdate denotes a single reference update to be made.
+type ReferenceUpdate struct {
+ Ref git.ReferenceName
+ OldOID git.ObjectID
+ NewOID git.ObjectID
+}
+
+// ProcReceiveHandler provides the mechanism for RPCs which invoked
+// git-receive-pack(1) to interact with the proc-receive hook. It provides
+// access to a list of references that a transaction is attempting to update,
+// and functions to accept or reject individual updates.
+type ProcReceiveHandler interface {
+ // TransactionID provides the storage.TransactionID associated with the
+ // handler.
+ TransactionID() storage.TransactionID
+
+ // Atomic denotes whether the push was atomic.
+ Atomic() bool
+
+ // ReferenceUpdates provides the reference updates to be made.
+ ReferenceUpdates() []ReferenceUpdate
+
+ // AcceptUpdate tells the registry to accept a given reference update.
+ AcceptUpdate(referenceName git.ReferenceName) error
+ // RejectUpdate tells the registry to reject a given reference update, along
+ // with a reason.
+ RejectUpdate(referenceName git.ReferenceName, reason string) error
+
+ // Close must be called to clean up the proc-receive hook.
+ Close() error
+}
+
+// ProcReceiveRegistry is the registry which provides the proc-receive handlers
+// (https://git-scm.com/docs/githooks#proc-receive) against a provided transaction ID.
+//
+// The registry allows RPCs which perform commands that execute git-receive-pack(1)
+// to hook into the proc-receive handler. The RPC must register itself with the
+// registry by calling RegisterWaiter(), this provides a channel where the handler
+// will be provided along with a registry cleanup function.
+//
+// When the handler for the associated transaction ID is added to the registry
+// via the Transmit() function, it will be propagated to the channel.
+type ProcReceiveRegistry struct {
+ waiters map[storage.TransactionID]chan<- ProcReceiveHandler
+ m sync.Mutex
+}
+
+// NewProcReceiveRegistry creates a new registry by allocating the required
+// variables.
+func NewProcReceiveRegistry() *ProcReceiveRegistry {
+ return &ProcReceiveRegistry{
+ waiters: make(map[storage.TransactionID]chan<- ProcReceiveHandler),
+ }
+}
+
+// RegisterWaiter registers a waiter against the provided transactionID.
+// The function returns a channel to obtain the handler, a cleanup function
+// which must be called and an error if any.
+func (r *ProcReceiveRegistry) RegisterWaiter(id storage.TransactionID) (<-chan ProcReceiveHandler, func(), error) {
+ r.m.Lock()
+ defer r.m.Unlock()
+
+ if _, ok := r.waiters[id]; ok {
+ return nil, nil, fmt.Errorf("cannot register id: %d again", id)
+ }
+
+ ch := make(chan ProcReceiveHandler)
+ r.waiters[id] = ch
+
+ cleanup := func() {
+ r.m.Lock()
+ defer r.m.Unlock()
+
+ delete(r.waiters, id)
+ }
+
+ return ch, cleanup, nil
+}
+
+// Transmit transmits a handler to its waiter.
+func (r *ProcReceiveRegistry) Transmit(handler ProcReceiveHandler) error {
+ r.m.Lock()
+ defer r.m.Unlock()
+
+ ch, ok := r.waiters[handler.TransactionID()]
+ if !ok {
+ return fmt.Errorf("no waiters for id: %d", handler.TransactionID())
+ }
+
+ ch <- handler
+
+ return nil
+}
diff --git a/internal/gitaly/hook/procreceive_registry_test.go b/internal/gitaly/hook/procreceive_registry_test.go
new file mode 100644
index 000000000..e2fd1f3f1
--- /dev/null
+++ b/internal/gitaly/hook/procreceive_registry_test.go
@@ -0,0 +1,146 @@
+package hook
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+)
+
+func TestProcReceiveRegistry(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ cfg := testcfg.Build(t)
+ receiveHooksPayload := &git.UserDetails{
+ UserID: "1234",
+ Username: "user",
+ Protocol: "web",
+ }
+ repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ newHandler := func(id storage.TransactionID) (ProcReceiveHandler, <-chan struct{}) {
+ payload, err := git.NewHooksPayload(
+ cfg,
+ repo,
+ gittest.DefaultObjectHash,
+ nil,
+ receiveHooksPayload,
+ git.PreReceiveHook,
+ featureflag.FromContext(ctx),
+ id,
+ ).Env()
+ require.NoError(t, err)
+
+ var stdin bytes.Buffer
+ _, err = pktline.WriteString(&stdin, "version=1\000push-options atomic")
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+ _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s",
+ gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main"))
+ require.NoError(t, err)
+ err = pktline.WriteFlush(&stdin)
+ require.NoError(t, err)
+
+ var stdout bytes.Buffer
+
+ handler, doneCh, err := NewProcReceiveHandler([]string{payload}, &stdin, &stdout)
+ require.NoError(t, err)
+
+ return handler, doneCh
+ }
+
+ t.Run("transmit called before register", func(t *testing.T) {
+ t.Parallel()
+ registry := NewProcReceiveRegistry()
+
+ handler, _ := newHandler(1)
+ err := registry.Transmit(handler)
+
+ require.Equal(t, fmt.Errorf("no waiters for id: 1"), err)
+ })
+
+ t.Run("waiter registered twice", func(t *testing.T) {
+ t.Parallel()
+ registry := NewProcReceiveRegistry()
+
+ _, _, err := registry.RegisterWaiter(1)
+ require.NoError(t, err)
+
+ _, _, err = registry.RegisterWaiter(1)
+ require.Equal(t, fmt.Errorf("cannot register id: 1 again"), err)
+ })
+
+ t.Run("multiple handlers", func(t *testing.T) {
+ t.Parallel()
+
+ registry := NewProcReceiveRegistry()
+
+ handler1, _ := newHandler(1)
+ handler2, _ := newHandler(2)
+ handler3, _ := newHandler(3)
+
+ recvCh1, cleanup, err := registry.RegisterWaiter(1)
+ require.NoError(t, err)
+ defer cleanup()
+
+ recvCh2, cleanup2, err := registry.RegisterWaiter(2)
+ require.NoError(t, err)
+ defer cleanup2()
+
+ recvCh3, cleanup3, err := registry.RegisterWaiter(3)
+ require.NoError(t, err)
+ defer cleanup3()
+
+ wg := sync.WaitGroup{}
+ wg.Add(6)
+
+ go func() {
+ defer wg.Done()
+ handlerObtained := <-recvCh1
+ assert.Equal(t, handler1, handlerObtained)
+ }()
+
+ go func() {
+ defer wg.Done()
+ handlerObtained := <-recvCh2
+ assert.Equal(t, handler2, handlerObtained)
+ }()
+
+ go func() {
+ defer wg.Done()
+ handlerObtained := <-recvCh3
+ assert.Equal(t, handler3, handlerObtained)
+ }()
+
+ go func() {
+ defer wg.Done()
+ assert.NoError(t, registry.Transmit(handler1))
+ }()
+
+ go func() {
+ defer wg.Done()
+ assert.NoError(t, registry.Transmit(handler2))
+ }()
+
+ go func() {
+ defer wg.Done()
+ assert.NoError(t, registry.Transmit(handler3))
+ }()
+
+ wg.Wait()
+ })
+}
diff --git a/internal/gitaly/hook/transactions_test.go b/internal/gitaly/hook/transactions_test.go
index 568cef0f8..8bcaa97f3 100644
--- a/internal/gitaly/hook/transactions_test.go
+++ b/internal/gitaly/hook/transactions_test.go
@@ -38,7 +38,7 @@ func TestHookManager_stopCalled(t *testing.T) {
var mockTxMgr transaction.MockManager
hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), &mockTxMgr, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
hooksPayload, err := git.NewHooksPayload(
cfg,
@@ -144,7 +144,7 @@ func TestHookManager_contextCancellationCancelsVote(t *testing.T) {
hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), &mockTxMgr, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
hooksPayload, err := git.NewHooksPayload(
cfg,
diff --git a/internal/gitaly/hook/update_test.go b/internal/gitaly/hook/update_test.go
index 8d8ab8e30..8f446dd16 100644
--- a/internal/gitaly/hook/update_test.go
+++ b/internal/gitaly/hook/update_test.go
@@ -41,7 +41,7 @@ func TestUpdate_customHooks(t *testing.T) {
txManager := transaction.NewTrackingManager()
hookManager := NewManager(cfg, locator, testhelper.SharedLogger(t), gitCmdFactory, txManager, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
receiveHooksPayload := &git.UserDetails{
UserID: "1234",
@@ -258,7 +258,7 @@ func TestUpdate_quarantine(t *testing.T) {
hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), nil, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry())
//nolint:gitaly-linters
gittest.WriteCustomHook(t, repoPath, "update", []byte(fmt.Sprintf(
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 1c8ff4d0d..835442db9 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -195,7 +195,7 @@ func runServer(t *testing.T, cfg config.Cfg) string {
gitCmdFactory := gittest.NewCommandFactory(t, cfg)
hookManager := hook.NewManager(cfg, locator, logger, gitCmdFactory, txManager, gitlab.NewMockClient(
t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive,
- ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()), hook.NewProcReceiveRegistry())
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator, logger)
diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go
index 90064ec67..16b9d1f14 100644
--- a/internal/gitaly/service/hook/testhelper_test.go
+++ b/internal/gitaly/service/hook/testhelper_test.go
@@ -59,7 +59,7 @@ func runHooksServerWithTransactionRegistry(tb testing.TB, cfg config.Cfg, opts [
return testserver.RunGitalyServer(tb, cfg, func(srv *grpc.Server, deps *service.Dependencies) {
if txRegistry != nil {
- deps.GitalyHookManager = gitalyhook.NewManager(deps.GetCfg(), deps.GetLocator(), deps.GetLogger(), deps.GetGitCmdFactory(), deps.GetTxManager(), deps.GetGitlabClient(), txRegistry)
+ deps.GitalyHookManager = gitalyhook.NewManager(deps.GetCfg(), deps.GetLocator(), deps.GetLogger(), deps.GetGitCmdFactory(), deps.GetTxManager(), deps.GetGitlabClient(), txRegistry, gitalyhook.NewProcReceiveRegistry())
}
hookServer := NewServer(deps)
diff --git a/internal/gitaly/service/operations/merge_branch_test.go b/internal/gitaly/service/operations/merge_branch_test.go
index aaf450194..e0e681722 100644
--- a/internal/gitaly/service/operations/merge_branch_test.go
+++ b/internal/gitaly/service/operations/merge_branch_test.go
@@ -1148,7 +1148,9 @@ func testUserMergeBranchAllowed(t *testing.T, ctx context.Context) {
},
gitlab.MockPreReceive,
gitlab.MockPostReceive,
- ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()))
+ ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()),
+ hook.NewProcReceiveRegistry(),
+ )
ctx, cfg, client := setupOperationsServiceWithCfg(
t, ctx, cfg,
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 9af5d742b..d7210b6b8 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -324,7 +324,15 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
}
if gsd.hookMgr == nil {
- gsd.hookMgr = hook.NewManager(cfg, gsd.locator, gsd.logger, gsd.gitCmdFactory, gsd.txMgr, gsd.gitlabClient, hook.NewTransactionRegistry(gsd.transactionRegistry))
+ gsd.hookMgr = hook.NewManager(
+ cfg, gsd.locator,
+ gsd.logger,
+ gsd.gitCmdFactory,
+ gsd.txMgr,
+ gsd.gitlabClient,
+ hook.NewTransactionRegistry(gsd.transactionRegistry),
+ hook.NewProcReceiveRegistry(),
+ )
}
if gsd.catfileCache == nil {