diff options
author | Karthik Nayak <knayak@gitlab.com> | 2023-12-07 20:52:42 +0300 |
---|---|---|
committer | Karthik Nayak <knayak@gitlab.com> | 2023-12-08 14:36:55 +0300 |
commit | 3c1fc525900d00291485a6f823455617e1264fb3 (patch) | |
tree | f90fc0955f4b7dce32b17691af694f10691e6d3f | |
parent | c0b07ba36fc8fc40830f061cb55a5c951a166e1c (diff) |
hook: Introduce `ProcReceiveHandler` and `ProcReceiveRegistry`
Introduce the `ProcReceiveHandler` which provides the mechanism for RPCs
which invoked git-receive-pack(1) to interact with the proc-receive
hook. It provides access to a list of references that a transaction is
attempting to update, and functions to accept or reject individual
updates.
Also, introduce `ProcReceiveRegistry`, a registry which provides the
proc-receive handlers against a provided transaction ID. The registry
allows RPCs which perform commands that execute git-receive-pack(1) to
hook into the proc-receive handler. The RPC must register itself with
the registry by calling RegisterWaiter(), this provides a channel where
the handler will be provided along with a registry cleanup function.
-rw-r--r-- | internal/cli/gitaly/serve.go | 11 | ||||
-rw-r--r-- | internal/cli/gitaly/subcmd_check.go | 11 | ||||
-rw-r--r-- | internal/gitaly/hook/manager.go | 31 | ||||
-rw-r--r-- | internal/gitaly/hook/postreceive_test.go | 14 | ||||
-rw-r--r-- | internal/gitaly/hook/prereceive_test.go | 15 | ||||
-rw-r--r-- | internal/gitaly/hook/procreceive_handler.go | 196 | ||||
-rw-r--r-- | internal/gitaly/hook/procreceive_handler_test.go | 260 | ||||
-rw-r--r-- | internal/gitaly/hook/procreceive_registry.go | 103 | ||||
-rw-r--r-- | internal/gitaly/hook/procreceive_registry_test.go | 146 | ||||
-rw-r--r-- | internal/gitaly/hook/transactions_test.go | 4 | ||||
-rw-r--r-- | internal/gitaly/hook/update_test.go | 4 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 2 | ||||
-rw-r--r-- | internal/gitaly/service/hook/testhelper_test.go | 2 | ||||
-rw-r--r-- | internal/gitaly/service/operations/merge_branch_test.go | 4 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 10 |
15 files changed, 783 insertions, 30 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 9db91a758..beee3407a 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -253,7 +253,16 @@ func run(cfg config.Cfg, logger log.Logger) error { } prometheus.MustRegister(gitlabClient) - hookManager = hook.NewManager(cfg, locator, logger, gitCmdFactory, transactionManager, gitlabClient, hook.NewTransactionRegistry(txRegistry)) + hookManager = hook.NewManager( + cfg, + locator, + logger, + gitCmdFactory, + transactionManager, + gitlabClient, + hook.NewTransactionRegistry(txRegistry), + hook.NewProcReceiveRegistry(), + ) } conns := client.NewPool( diff --git a/internal/cli/gitaly/subcmd_check.go b/internal/cli/gitaly/subcmd_check.go index 5ca3419d1..7f1cbb397 100644 --- a/internal/cli/gitaly/subcmd_check.go +++ b/internal/cli/gitaly/subcmd_check.go @@ -74,5 +74,14 @@ func checkAPI(cfg config.Cfg, logger log.Logger) (*gitlab.CheckInfo, error) { } defer cleanup() - return hook.NewManager(cfg, config.NewLocator(cfg), logger, gitCmdFactory, nil, gitlabAPI, hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry())).Check(context.Background()) + return hook.NewManager( + cfg, + config.NewLocator(cfg), + logger, + gitCmdFactory, + nil, + gitlabAPI, + hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()), + hook.NewProcReceiveRegistry(), + ).Check(context.Background()) } diff --git a/internal/gitaly/hook/manager.go b/internal/gitaly/hook/manager.go index dd4b2a5d5..1cdd18e33 100644 --- a/internal/gitaly/hook/manager.go +++ b/internal/gitaly/hook/manager.go @@ -78,13 +78,14 @@ func NewTransactionRegistry(txRegistry *storagemgr.TransactionRegistry) Transact // GitLabHookManager is a hook manager containing Git hook business logic. It // uses the GitLab API to authenticate and track ongoing hook calls. type GitLabHookManager struct { - cfg config.Cfg - locator storage.Locator - logger log.Logger - gitCmdFactory git.CommandFactory - txManager transaction.Manager - gitlabClient gitlab.Client - txRegistry TransactionRegistry + cfg config.Cfg + locator storage.Locator + logger log.Logger + gitCmdFactory git.CommandFactory + txManager transaction.Manager + gitlabClient gitlab.Client + txRegistry TransactionRegistry + procReceiveRegistry *ProcReceiveRegistry } // NewManager returns a new hook manager @@ -96,14 +97,16 @@ func NewManager( txManager transaction.Manager, gitlabClient gitlab.Client, txRegistry TransactionRegistry, + procReceiveRegistry *ProcReceiveRegistry, ) *GitLabHookManager { return &GitLabHookManager{ - cfg: cfg, - locator: locator, - logger: logger, - gitCmdFactory: gitCmdFactory, - txManager: txManager, - gitlabClient: gitlabClient, - txRegistry: txRegistry, + cfg: cfg, + locator: locator, + logger: logger, + gitCmdFactory: gitCmdFactory, + txManager: txManager, + gitlabClient: gitlabClient, + txRegistry: txRegistry, + procReceiveRegistry: procReceiveRegistry, } } diff --git a/internal/gitaly/hook/postreceive_test.go b/internal/gitaly/hook/postreceive_test.go index 808035831..31fc8948a 100644 --- a/internal/gitaly/hook/postreceive_test.go +++ b/internal/gitaly/hook/postreceive_test.go @@ -84,7 +84,7 @@ func TestPostReceive_customHook(t *testing.T) { txManager := transaction.NewTrackingManager() hookManager := NewManager(cfg, locator, testhelper.SharedLogger(t), gitCmdFactory, txManager, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) receiveHooksPayload := &git.UserDetails{ UserID: "1234", @@ -381,7 +381,15 @@ func TestPostReceive_gitlab(t *testing.T) { }, } - hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()), &gitlabAPI, NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + hookManager := NewManager( + cfg, + config.NewLocator(cfg), + testhelper.SharedLogger(t), + gittest.NewCommandFactory(t, cfg), + transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()), + &gitlabAPI, NewTransactionRegistry(storagemgr.NewTransactionRegistry()), + NewProcReceiveRegistry(), + ) gittest.WriteCustomHook(t, repoPath, "post-receive", []byte("#!/bin/sh\necho hook called\n")) @@ -419,7 +427,7 @@ func TestPostReceive_quarantine(t *testing.T) { hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), nil, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) gittest.WriteCustomHook(t, repoPath, "post-receive", []byte(fmt.Sprintf( `#!/bin/sh diff --git a/internal/gitaly/hook/prereceive_test.go b/internal/gitaly/hook/prereceive_test.go index 77f6bb4a1..1bfab9447 100644 --- a/internal/gitaly/hook/prereceive_test.go +++ b/internal/gitaly/hook/prereceive_test.go @@ -43,7 +43,7 @@ func TestPrereceive_customHooks(t *testing.T) { txManager := transaction.NewTrackingManager() hookManager := NewManager(cfg, locator, testhelper.SharedLogger(t), gitCmdFactory, txManager, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) receiveHooksPayload := &git.UserDetails{ UserID: "1234", @@ -228,7 +228,7 @@ func TestPrereceive_quarantine(t *testing.T) { hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), nil, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) //nolint:gitaly-linters gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte(fmt.Sprintf( @@ -419,7 +419,16 @@ func TestPrereceive_gitlab(t *testing.T) { }, } - hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()), &gitlabAPI, NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + hookManager := NewManager( + cfg, + config.NewLocator(cfg), + testhelper.SharedLogger(t), + gittest.NewCommandFactory(t, cfg), + transaction.NewManager(cfg, testhelper.SharedLogger(t), backchannel.NewRegistry()), + &gitlabAPI, + NewTransactionRegistry(storagemgr.NewTransactionRegistry()), + NewProcReceiveRegistry(), + ) gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte("#!/bin/sh\necho called\n")) diff --git a/internal/gitaly/hook/procreceive_handler.go b/internal/gitaly/hook/procreceive_handler.go new file mode 100644 index 000000000..943447442 --- /dev/null +++ b/internal/gitaly/hook/procreceive_handler.go @@ -0,0 +1,196 @@ +package hook + +import ( + "bytes" + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +type procReceiveHandler struct { + stdout io.Writer + doneCh chan<- struct{} + updates []ReferenceUpdate + transactionID storage.TransactionID + atomic bool +} + +// NewProcReceiveHandler returns a ProcReceiveHandler implementation. +// The function, returns the handler along with a channel which indicates completion +// of the handlers usage. +// +// ProcReceiveHandler is used to intercept git-receive-pack(1)'s execute-commands +// code. This allows us to intercept reference updates before writing to the +// disk via the proc-receive hook (https://git-scm.com/docs/githooks#proc-receive). +// +// The handler is transmitted to RPCs which executed git-receive-pack(1), so they +// can accept or reject individual reference updates. +func NewProcReceiveHandler(env []string, stdin io.Reader, stdout io.Writer) (ProcReceiveHandler, <-chan struct{}, error) { + payload, err := git.HooksPayloadFromEnv(env) + if err != nil { + return nil, nil, fmt.Errorf("extracting hooks payload: %w", err) + } + + // This hook only works when there is a transaction present. + if payload.TransactionID == 0 { + return nil, nil, fmt.Errorf("no transaction found in payload") + } + + scanner := pktline.NewScanner(stdin) + + // Version and feature negotiation. + if !scanner.Scan() { + return nil, nil, fmt.Errorf("expected version negotiation: %w", scanner.Err()) + } + + data, err := pktline.Payload(scanner.Bytes()) + if err != nil { + return nil, nil, fmt.Errorf("receiving header: %w", err) + } + + after, ok := bytes.CutPrefix(data, []byte("version=1\000")) + if !ok { + return nil, nil, fmt.Errorf("unsupported version: %s", data) + } + featureRequests := parseFeatureRequest(after) + + if !scanner.Scan() { + return nil, nil, fmt.Errorf("expected flush: %w", scanner.Err()) + } + + if !pktline.IsFlush(scanner.Bytes()) { + return nil, nil, fmt.Errorf("expected pkt flush") + } + + if _, err := pktline.WriteString(stdout, fmt.Sprintf("version=1\000%s", featureRequests)); err != nil { + return nil, nil, fmt.Errorf("writing version: %w", err) + } + + if err := pktline.WriteFlush(stdout); err != nil { + return nil, nil, fmt.Errorf("flushing version: %w", err) + } + + updates := []ReferenceUpdate{} + for scanner.Scan() { + line := scanner.Bytes() + + // When all reference updates are transmitted, we expect a flush. + if pktline.IsFlush(line) { + break + } + + data, err := pktline.Payload(line) + if err != nil { + return nil, nil, fmt.Errorf("receiving reference update: %w", err) + } + + update, err := parseRefUpdate(data) + if err != nil { + return nil, nil, fmt.Errorf("parse reference update: %w", err) + } + updates = append(updates, update) + } + + if err := scanner.Err(); err != nil { + return nil, nil, fmt.Errorf("parsing stdin: %w", err) + } + + ch := make(chan struct{}) + + return &procReceiveHandler{ + transactionID: payload.TransactionID, + atomic: featureRequests.atomic, + stdout: stdout, + updates: updates, + doneCh: ch, + }, ch, nil +} + +// TransactionID provides the storage.TransactionID associated with the +// handler. +func (h *procReceiveHandler) TransactionID() storage.TransactionID { + return h.transactionID +} + +// Atomic denotes whether the push was atomic. +func (h *procReceiveHandler) Atomic() bool { + return h.atomic +} + +// ReferenceUpdates provides the reference updates to be made. +func (h *procReceiveHandler) ReferenceUpdates() []ReferenceUpdate { + return h.updates +} + +// AcceptUpdate accepts a given reference update. +func (h *procReceiveHandler) AcceptUpdate(referenceName git.ReferenceName) error { + if _, err := pktline.WriteString(h.stdout, fmt.Sprintf("ok %s", referenceName)); err != nil { + return fmt.Errorf("write ref %s ok: %w", referenceName, err) + } + + return nil +} + +// RejectUpdate rejects a given reference update with the given reason. +func (h *procReceiveHandler) RejectUpdate(referenceName git.ReferenceName, reason string) error { + if _, err := pktline.WriteString(h.stdout, fmt.Sprintf("ng %s %s", referenceName, reason)); err != nil { + return fmt.Errorf("write ref %s ng: %w", referenceName, err) + } + + return nil +} + +// Close must be called to clean up the proc-receive hook. +func (h *procReceiveHandler) Close() error { + defer close(h.doneCh) + + if err := pktline.WriteFlush(h.stdout); err != nil { + return fmt.Errorf("flushing updates: %w", err) + } + + return nil +} + +func parseRefUpdate(data []byte) (ReferenceUpdate, error) { + var update ReferenceUpdate + + split := bytes.Split(data, []byte(" ")) + if len(split) != 3 { + return update, fmt.Errorf("unknown ref update format: %s", split) + } + + update.Ref = git.ReferenceName(split[2]) + update.OldOID = git.ObjectID(split[0]) + update.NewOID = git.ObjectID(split[1]) + + return update, nil +} + +type procReceiveFeatureRequests struct { + atomic bool +} + +func (r *procReceiveFeatureRequests) String() string { + s := "" + if r.atomic { + s = "atomic" + } + + return s +} + +// parseFeatureRequest parses the features requested. +func parseFeatureRequest(data []byte) *procReceiveFeatureRequests { + var featureRequests procReceiveFeatureRequests + + for _, feature := range bytes.Split(data, []byte(" ")) { + if bytes.Equal(feature, []byte("atomic")) { + featureRequests.atomic = true + } + } + + return &featureRequests +} diff --git a/internal/gitaly/hook/procreceive_handler_test.go b/internal/gitaly/hook/procreceive_handler_test.go new file mode 100644 index 000000000..c038adfc0 --- /dev/null +++ b/internal/gitaly/hook/procreceive_handler_test.go @@ -0,0 +1,260 @@ +package hook + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" +) + +func TestProcReceiveHandler(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + receiveHooksPayload := &git.UserDetails{ + UserID: "1234", + Username: "user", + Protocol: "web", + } + + payload, err := git.NewHooksPayload( + cfg, + repo, + gittest.DefaultObjectHash, + nil, + receiveHooksPayload, + git.PreReceiveHook, + featureflag.FromContext(ctx), + 1, + ).Env() + require.NoError(t, err) + + type setupData struct { + env []string + ctx context.Context + stdin string + expectedErr error + expectedStdout string + expectedUpdates []ReferenceUpdate + expectedAtomic bool + handlerSteps func(handler ProcReceiveHandler) error + } + + for _, tc := range []struct { + desc string + setup func(t *testing.T, ctx context.Context) setupData + }{ + { + desc: "no payload", + setup: func(t *testing.T, ctx context.Context) setupData { + return setupData{ + env: []string{}, + ctx: ctx, + expectedErr: fmt.Errorf("extracting hooks payload: %w", errors.New("no hooks payload found in environment")), + } + }, + }, + { + desc: "invalid version", + setup: func(t *testing.T, ctx context.Context) setupData { + var stdin bytes.Buffer + _, err := pktline.WriteString(&stdin, "version=2") + require.NoError(t, err) + + return setupData{ + env: []string{payload}, + ctx: ctx, + stdin: stdin.String(), + expectedErr: errors.New("unsupported version: version=2"), + } + }, + }, + { + desc: "single reference with atomic", + setup: func(t *testing.T, ctx context.Context) setupData { + var stdin bytes.Buffer + _, err := pktline.WriteString(&stdin, "version=1\000push-options atomic") + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s", + gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main")) + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + + var stdout bytes.Buffer + _, err = pktline.WriteString(&stdout, "version=1\000atomic") + require.NoError(t, err) + err = pktline.WriteFlush(&stdout) + require.NoError(t, err) + _, err = pktline.WriteString(&stdout, "ok refs/heads/main") + require.NoError(t, err) + err = pktline.WriteFlush(&stdout) + require.NoError(t, err) + + return setupData{ + env: []string{payload}, + ctx: ctx, + stdin: stdin.String(), + expectedStdout: stdout.String(), + expectedAtomic: true, + expectedUpdates: []ReferenceUpdate{ + { + Ref: "refs/heads/main", + OldOID: gittest.DefaultObjectHash.ZeroOID, + NewOID: gittest.DefaultObjectHash.EmptyTreeOID, + }, + }, + handlerSteps: func(handler ProcReceiveHandler) error { + require.NoError(t, handler.AcceptUpdate("refs/heads/main")) + return handler.Close() + }, + } + }, + }, + { + desc: "single reference without atomic", + setup: func(t *testing.T, ctx context.Context) setupData { + var stdin bytes.Buffer + _, err := pktline.WriteString(&stdin, "version=1\000push-options") + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s", + gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main")) + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + + var stdout bytes.Buffer + _, err = pktline.WriteString(&stdout, "version=1\000") + require.NoError(t, err) + err = pktline.WriteFlush(&stdout) + require.NoError(t, err) + _, err = pktline.WriteString(&stdout, "ok refs/heads/main") + require.NoError(t, err) + err = pktline.WriteFlush(&stdout) + require.NoError(t, err) + + return setupData{ + env: []string{payload}, + ctx: ctx, + stdin: stdin.String(), + expectedStdout: stdout.String(), + expectedUpdates: []ReferenceUpdate{ + { + Ref: "refs/heads/main", + OldOID: gittest.DefaultObjectHash.ZeroOID, + NewOID: gittest.DefaultObjectHash.EmptyTreeOID, + }, + }, + handlerSteps: func(handler ProcReceiveHandler) error { + require.NoError(t, handler.AcceptUpdate("refs/heads/main")) + return handler.Close() + }, + } + }, + }, + { + desc: "multiple references", + setup: func(t *testing.T, ctx context.Context) setupData { + var stdin bytes.Buffer + _, err := pktline.WriteString(&stdin, "version=1\000push-options") + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s", + gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main")) + require.NoError(t, err) + _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s", + gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/branch")) + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + + var stdout bytes.Buffer + _, err = pktline.WriteString(&stdout, "version=1\000") + require.NoError(t, err) + err = pktline.WriteFlush(&stdout) + require.NoError(t, err) + _, err = pktline.WriteString(&stdout, "ok refs/heads/main") + require.NoError(t, err) + _, err = pktline.WriteString(&stdout, "ng refs/heads/branch for fun") + require.NoError(t, err) + err = pktline.WriteFlush(&stdout) + require.NoError(t, err) + + return setupData{ + env: []string{payload}, + ctx: ctx, + stdin: stdin.String(), + expectedStdout: stdout.String(), + expectedUpdates: []ReferenceUpdate{ + { + Ref: "refs/heads/main", + OldOID: gittest.DefaultObjectHash.ZeroOID, + NewOID: gittest.DefaultObjectHash.EmptyTreeOID, + }, + { + Ref: "refs/heads/branch", + OldOID: gittest.DefaultObjectHash.ZeroOID, + NewOID: gittest.DefaultObjectHash.EmptyTreeOID, + }, + }, + handlerSteps: func(handler ProcReceiveHandler) error { + require.NoError(t, handler.AcceptUpdate("refs/heads/main")) + require.NoError(t, handler.RejectUpdate("refs/heads/branch", "for fun")) + return handler.Close() + }, + } + }, + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + setup := tc.setup(t, ctx) + + var stdout bytes.Buffer + handler, doneCh, err := NewProcReceiveHandler(setup.env, strings.NewReader(setup.stdin), &stdout) + if err != nil || setup.expectedErr != nil { + require.Equal(t, setup.expectedErr, err) + return + } + + updates := handler.ReferenceUpdates() + require.Equal(t, setup.expectedUpdates, updates) + + select { + case <-doneCh: + t.Fatal("done returned before handler called Close()") + default: + } + + require.NoError(t, setup.handlerSteps(handler)) + // When Close() is called, we must receive a confirmation. + <-doneCh + + require.Equal(t, setup.expectedStdout, stdout.String()) + }) + } +} diff --git a/internal/gitaly/hook/procreceive_registry.go b/internal/gitaly/hook/procreceive_registry.go new file mode 100644 index 000000000..b62fa6ae2 --- /dev/null +++ b/internal/gitaly/hook/procreceive_registry.go @@ -0,0 +1,103 @@ +package hook + +import ( + "fmt" + "sync" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +// ReferenceUpdate denotes a single reference update to be made. +type ReferenceUpdate struct { + Ref git.ReferenceName + OldOID git.ObjectID + NewOID git.ObjectID +} + +// ProcReceiveHandler provides the mechanism for RPCs which invoked +// git-receive-pack(1) to interact with the proc-receive hook. It provides +// access to a list of references that a transaction is attempting to update, +// and functions to accept or reject individual updates. +type ProcReceiveHandler interface { + // TransactionID provides the storage.TransactionID associated with the + // handler. + TransactionID() storage.TransactionID + + // Atomic denotes whether the push was atomic. + Atomic() bool + + // ReferenceUpdates provides the reference updates to be made. + ReferenceUpdates() []ReferenceUpdate + + // AcceptUpdate tells the registry to accept a given reference update. + AcceptUpdate(referenceName git.ReferenceName) error + // RejectUpdate tells the registry to reject a given reference update, along + // with a reason. + RejectUpdate(referenceName git.ReferenceName, reason string) error + + // Close must be called to clean up the proc-receive hook. + Close() error +} + +// ProcReceiveRegistry is the registry which provides the proc-receive handlers +// (https://git-scm.com/docs/githooks#proc-receive) against a provided transaction ID. +// +// The registry allows RPCs which perform commands that execute git-receive-pack(1) +// to hook into the proc-receive handler. The RPC must register itself with the +// registry by calling RegisterWaiter(), this provides a channel where the handler +// will be provided along with a registry cleanup function. +// +// When the handler for the associated transaction ID is added to the registry +// via the Transmit() function, it will be propagated to the channel. +type ProcReceiveRegistry struct { + waiters map[storage.TransactionID]chan<- ProcReceiveHandler + m sync.Mutex +} + +// NewProcReceiveRegistry creates a new registry by allocating the required +// variables. +func NewProcReceiveRegistry() *ProcReceiveRegistry { + return &ProcReceiveRegistry{ + waiters: make(map[storage.TransactionID]chan<- ProcReceiveHandler), + } +} + +// RegisterWaiter registers a waiter against the provided transactionID. +// The function returns a channel to obtain the handler, a cleanup function +// which must be called and an error if any. +func (r *ProcReceiveRegistry) RegisterWaiter(id storage.TransactionID) (<-chan ProcReceiveHandler, func(), error) { + r.m.Lock() + defer r.m.Unlock() + + if _, ok := r.waiters[id]; ok { + return nil, nil, fmt.Errorf("cannot register id: %d again", id) + } + + ch := make(chan ProcReceiveHandler) + r.waiters[id] = ch + + cleanup := func() { + r.m.Lock() + defer r.m.Unlock() + + delete(r.waiters, id) + } + + return ch, cleanup, nil +} + +// Transmit transmits a handler to its waiter. +func (r *ProcReceiveRegistry) Transmit(handler ProcReceiveHandler) error { + r.m.Lock() + defer r.m.Unlock() + + ch, ok := r.waiters[handler.TransactionID()] + if !ok { + return fmt.Errorf("no waiters for id: %d", handler.TransactionID()) + } + + ch <- handler + + return nil +} diff --git a/internal/gitaly/hook/procreceive_registry_test.go b/internal/gitaly/hook/procreceive_registry_test.go new file mode 100644 index 000000000..e2fd1f3f1 --- /dev/null +++ b/internal/gitaly/hook/procreceive_registry_test.go @@ -0,0 +1,146 @@ +package hook + +import ( + "bytes" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" +) + +func TestProcReceiveRegistry(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + receiveHooksPayload := &git.UserDetails{ + UserID: "1234", + Username: "user", + Protocol: "web", + } + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + newHandler := func(id storage.TransactionID) (ProcReceiveHandler, <-chan struct{}) { + payload, err := git.NewHooksPayload( + cfg, + repo, + gittest.DefaultObjectHash, + nil, + receiveHooksPayload, + git.PreReceiveHook, + featureflag.FromContext(ctx), + id, + ).Env() + require.NoError(t, err) + + var stdin bytes.Buffer + _, err = pktline.WriteString(&stdin, "version=1\000push-options atomic") + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + _, err = pktline.WriteString(&stdin, fmt.Sprintf("%s %s %s", + gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.EmptyTreeOID, "refs/heads/main")) + require.NoError(t, err) + err = pktline.WriteFlush(&stdin) + require.NoError(t, err) + + var stdout bytes.Buffer + + handler, doneCh, err := NewProcReceiveHandler([]string{payload}, &stdin, &stdout) + require.NoError(t, err) + + return handler, doneCh + } + + t.Run("transmit called before register", func(t *testing.T) { + t.Parallel() + registry := NewProcReceiveRegistry() + + handler, _ := newHandler(1) + err := registry.Transmit(handler) + + require.Equal(t, fmt.Errorf("no waiters for id: 1"), err) + }) + + t.Run("waiter registered twice", func(t *testing.T) { + t.Parallel() + registry := NewProcReceiveRegistry() + + _, _, err := registry.RegisterWaiter(1) + require.NoError(t, err) + + _, _, err = registry.RegisterWaiter(1) + require.Equal(t, fmt.Errorf("cannot register id: 1 again"), err) + }) + + t.Run("multiple handlers", func(t *testing.T) { + t.Parallel() + + registry := NewProcReceiveRegistry() + + handler1, _ := newHandler(1) + handler2, _ := newHandler(2) + handler3, _ := newHandler(3) + + recvCh1, cleanup, err := registry.RegisterWaiter(1) + require.NoError(t, err) + defer cleanup() + + recvCh2, cleanup2, err := registry.RegisterWaiter(2) + require.NoError(t, err) + defer cleanup2() + + recvCh3, cleanup3, err := registry.RegisterWaiter(3) + require.NoError(t, err) + defer cleanup3() + + wg := sync.WaitGroup{} + wg.Add(6) + + go func() { + defer wg.Done() + handlerObtained := <-recvCh1 + assert.Equal(t, handler1, handlerObtained) + }() + + go func() { + defer wg.Done() + handlerObtained := <-recvCh2 + assert.Equal(t, handler2, handlerObtained) + }() + + go func() { + defer wg.Done() + handlerObtained := <-recvCh3 + assert.Equal(t, handler3, handlerObtained) + }() + + go func() { + defer wg.Done() + assert.NoError(t, registry.Transmit(handler1)) + }() + + go func() { + defer wg.Done() + assert.NoError(t, registry.Transmit(handler2)) + }() + + go func() { + defer wg.Done() + assert.NoError(t, registry.Transmit(handler3)) + }() + + wg.Wait() + }) +} diff --git a/internal/gitaly/hook/transactions_test.go b/internal/gitaly/hook/transactions_test.go index 568cef0f8..8bcaa97f3 100644 --- a/internal/gitaly/hook/transactions_test.go +++ b/internal/gitaly/hook/transactions_test.go @@ -38,7 +38,7 @@ func TestHookManager_stopCalled(t *testing.T) { var mockTxMgr transaction.MockManager hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), &mockTxMgr, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) hooksPayload, err := git.NewHooksPayload( cfg, @@ -144,7 +144,7 @@ func TestHookManager_contextCancellationCancelsVote(t *testing.T) { hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), &mockTxMgr, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) hooksPayload, err := git.NewHooksPayload( cfg, diff --git a/internal/gitaly/hook/update_test.go b/internal/gitaly/hook/update_test.go index 8d8ab8e30..8f446dd16 100644 --- a/internal/gitaly/hook/update_test.go +++ b/internal/gitaly/hook/update_test.go @@ -41,7 +41,7 @@ func TestUpdate_customHooks(t *testing.T) { txManager := transaction.NewTrackingManager() hookManager := NewManager(cfg, locator, testhelper.SharedLogger(t), gitCmdFactory, txManager, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) receiveHooksPayload := &git.UserDetails{ UserID: "1234", @@ -258,7 +258,7 @@ func TestUpdate_quarantine(t *testing.T) { hookManager := NewManager(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t), gittest.NewCommandFactory(t, cfg), nil, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), NewTransactionRegistry(storagemgr.NewTransactionRegistry()), NewProcReceiveRegistry()) //nolint:gitaly-linters gittest.WriteCustomHook(t, repoPath, "update", []byte(fmt.Sprintf( diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 1c8ff4d0d..835442db9 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -195,7 +195,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { gitCmdFactory := gittest.NewCommandFactory(t, cfg) hookManager := hook.NewManager(cfg, locator, logger, gitCmdFactory, txManager, gitlab.NewMockClient( t, gitlab.MockAllowed, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()), hook.NewProcReceiveRegistry()) catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator, logger) diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go index 90064ec67..16b9d1f14 100644 --- a/internal/gitaly/service/hook/testhelper_test.go +++ b/internal/gitaly/service/hook/testhelper_test.go @@ -59,7 +59,7 @@ func runHooksServerWithTransactionRegistry(tb testing.TB, cfg config.Cfg, opts [ return testserver.RunGitalyServer(tb, cfg, func(srv *grpc.Server, deps *service.Dependencies) { if txRegistry != nil { - deps.GitalyHookManager = gitalyhook.NewManager(deps.GetCfg(), deps.GetLocator(), deps.GetLogger(), deps.GetGitCmdFactory(), deps.GetTxManager(), deps.GetGitlabClient(), txRegistry) + deps.GitalyHookManager = gitalyhook.NewManager(deps.GetCfg(), deps.GetLocator(), deps.GetLogger(), deps.GetGitCmdFactory(), deps.GetTxManager(), deps.GetGitlabClient(), txRegistry, gitalyhook.NewProcReceiveRegistry()) } hookServer := NewServer(deps) diff --git a/internal/gitaly/service/operations/merge_branch_test.go b/internal/gitaly/service/operations/merge_branch_test.go index aaf450194..e0e681722 100644 --- a/internal/gitaly/service/operations/merge_branch_test.go +++ b/internal/gitaly/service/operations/merge_branch_test.go @@ -1148,7 +1148,9 @@ func testUserMergeBranchAllowed(t *testing.T, ctx context.Context) { }, gitlab.MockPreReceive, gitlab.MockPostReceive, - ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry())) + ), hook.NewTransactionRegistry(storagemgr.NewTransactionRegistry()), + hook.NewProcReceiveRegistry(), + ) ctx, cfg, client := setupOperationsServiceWithCfg( t, ctx, cfg, diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 9af5d742b..d7210b6b8 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -324,7 +324,15 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * } if gsd.hookMgr == nil { - gsd.hookMgr = hook.NewManager(cfg, gsd.locator, gsd.logger, gsd.gitCmdFactory, gsd.txMgr, gsd.gitlabClient, hook.NewTransactionRegistry(gsd.transactionRegistry)) + gsd.hookMgr = hook.NewManager( + cfg, gsd.locator, + gsd.logger, + gsd.gitCmdFactory, + gsd.txMgr, + gsd.gitlabClient, + hook.NewTransactionRegistry(gsd.transactionRegistry), + hook.NewProcReceiveRegistry(), + ) } if gsd.catfileCache == nil { |