diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2021-07-20 20:26:31 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2021-07-27 17:46:37 +0300 |
commit | 40caf51e4f2350d729432dad501545144ef5625d (patch) | |
tree | 0dd3c819b9fc61ae44253b50de9574c269197e4d | |
parent | 459c107cf6a0e12efca231f10508cd49be34450f (diff) |
Add PackObjectsHookStream clientjv-pack-objects-hook
This is feature-flagged and won't be called by default.
Changelog: other
-rw-r--r-- | cmd/gitaly-hooks/hooks.go | 60 | ||||
-rw-r--r-- | cmd/gitaly-hooks/hooks_test.go | 46 | ||||
-rw-r--r-- | internal/git/hooks_options.go | 4 | ||||
-rw-r--r-- | internal/metadata/featureflag/feature_flags.go | 3 |
4 files changed, 101 insertions, 12 deletions
diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go index 03b33ddb8..1b21a4d46 100644 --- a/cmd/gitaly-hooks/hooks.go +++ b/cmd/gitaly-hooks/hooks.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "net" "os" "strings" @@ -13,6 +14,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" @@ -21,6 +23,7 @@ import ( gitalylog "gitlab.com/gitlab-org/gitaly/v14/internal/log" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/stream" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "gitlab.com/gitlab-org/labkit/tracing" @@ -339,9 +342,17 @@ func packObjectsHook(ctx context.Context, payload git.HooksPayload, hookClient g fixedArgs = append(fixedArgs, fixFilterQuoteBug(a)) } - if err := handlePackObjects(ctx, hookClient, payload.Repo, fixedArgs); err != nil { - logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHook RPC failed") - return 1, nil + switch os.Getenv("GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM") { + case "1": + if err := handlePackObjectsStream(ctx, payload, fixedArgs); err != nil { + logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHookStream RPC failed") + return 1, nil + } + default: + if err := handlePackObjects(ctx, hookClient, payload.Repo, fixedArgs); err != nil { + logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHook RPC failed") + return 1, nil + } } return 0, nil @@ -406,3 +417,46 @@ type nopExitStatus struct { } func (nopExitStatus) GetExitStatus() *gitalypb.ExitStatus { return nil } + +func handlePackObjectsStream(ctx context.Context, payload git.HooksPayload, args []string) error { + req := &gitalypb.PackObjectsHookStreamRequest{ + Repository: payload.Repo, + Args: args, + } + + callback := func(c net.Conn) error { + if _, err := io.Copy( + pktline.NewSidebandWriter(c).Writer(0), + os.Stdin, + ); err != nil { + return err + } + if err := pktline.WriteFlush(c); err != nil { + return err + } + + return pktline.EachSidebandPacket(c, func(band byte, data []byte) error { + var err error + switch band { + case 1: + _, err = os.Stdout.Write(data) + case 2: + _, err = os.Stderr.Write(data) + default: + err = fmt.Errorf("unexpected side band: %d", band) + } + return err + }) + } + + return streamrpc.Call( + ctx, + streamrpc.DialNet("unix://"+payload.InternalSocket), + "/gitaly.HookService/PackObjectsHookStream", + req, + callback, + streamrpc.WithCredentials( + gitalyauth.RPCCredentialsV2(payload.InternalSocketToken), + ), + ) +} diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go index 424dd60a3..c5ccdadce 100644 --- a/cmd/gitaly-hooks/hooks_test.go +++ b/cmd/gitaly-hooks/hooks_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/command" @@ -33,6 +34,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" ) type glHookValues struct { @@ -567,8 +569,8 @@ func TestCheckBadCreds(t *testing.T) { require.Regexp(t, `Checking GitLab API access: .* level=error msg="Internal API error" .* error="authorization failed" method=GET status=401 url="http://127.0.0.1:[0-9]+/api/v4/internal/check"\nFAIL`, stdout.String()) } -func runHookServiceServer(t *testing.T, cfg config.Cfg) { - runHookServiceWithGitlabClient(t, cfg, gitlab.NewMockClient()) +func runHookServiceServer(t *testing.T, cfg config.Cfg, serverOpts ...testserver.GitalyServerOpt) { + runHookServiceWithGitlabClient(t, cfg, gitlab.NewMockClient(), serverOpts...) } type featureFlagAsserter struct { @@ -607,12 +609,18 @@ func (svc featureFlagAsserter) PackObjectsHook(stream gitalypb.HookService_PackO return svc.wrapped.PackObjectsHook(stream) } -func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client) { +func (svc featureFlagAsserter) PackObjectsHookStream(ctx context.Context, req *gitalypb.PackObjectsHookStreamRequest) (*emptypb.Empty, error) { + svc.assertFlags(ctx) + return svc.wrapped.PackObjectsHookStream(ctx, req) +} + +func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client, serverOpts ...testserver.GitalyServerOpt) { + serverOpts = append(serverOpts, testserver.WithGitLabClient(gitlabClient)) testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, featureFlagAsserter{ t: t, wrapped: hook.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory()), }) - }, testserver.WithGitLabClient(gitlabClient)) + }, serverOpts...) } func requireContainsOnce(t *testing.T, s string, contains string) { @@ -666,25 +674,45 @@ func TestGitalyHooksPackObjects(t *testing.T) { testCases := []struct { desc string extraArgs []string + extraEnv []string + method string }{ - {desc: "regular clone"}, - {desc: "shallow clone", extraArgs: []string{"--depth=1"}}, - {desc: "partial clone", extraArgs: []string{"--filter=blob:none"}}, + {desc: "regular clone", method: "PackObjectsHook"}, + {desc: "shallow clone", extraArgs: []string{"--depth=1"}, method: "PackObjectsHook"}, + {desc: "partial clone", extraArgs: []string{"--filter=blob:none"}, method: "PackObjectsHook"}, + { + desc: "regular clone StreamRPC", + extraEnv: []string{"GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM=1"}, + method: "PackObjectsHookStream", + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - runHookServiceServer(t, cfg) + logger, hook := test.NewNullLogger() + runHookServiceServer(t, cfg, testserver.WithLogger(logger)) tempDir := testhelper.TempDir(t) args := append(baseArgs[1:], tc.extraArgs...) args = append(args, repoPath, tempDir) cmd := exec.Command(baseArgs[0], args...) - cmd.Env = env + cmd.Env = append(env, tc.extraEnv...) cmd.Stderr = os.Stderr require.NoError(t, cmd.Run()) + + foundMethod := false + for _, e := range hook.AllEntries() { + t.Log(e.Data) + if e.Data["grpc.service"] != "gitaly.HookService" { + continue + } + + require.Equal(t, tc.method, e.Data["grpc.method"]) + foundMethod = true + } + require.True(t, foundMethod) }) } } diff --git a/internal/git/hooks_options.go b/internal/git/hooks_options.go index 7ad3ff9f7..b8b278904 100644 --- a/internal/git/hooks_options.go +++ b/internal/git/hooks_options.go @@ -66,6 +66,10 @@ func WithPackObjectsHookEnv(ctx context.Context, repo *gitalypb.Repository, cfg Value: filepath.Join(cfg.BinDir, "gitaly-hooks"), }) + if featureflag.PackObjectsHookStream.IsEnabled(ctx) { + cc.env = append(cc.env, "GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM=1") + } + return nil } } diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go index 8b9175f66..55e0eb720 100644 --- a/internal/metadata/featureflag/feature_flags.go +++ b/internal/metadata/featureflag/feature_flags.go @@ -24,6 +24,8 @@ var ( FindAllTagsPipeline = FeatureFlag{Name: "find_all_tags_pipeline", OnByDefault: false} // TxRemoveRepository enables transactionsal voting for the RemoveRepository RPC. TxRemoveRepository = FeatureFlag{Name: "tx_remove_repository", OnByDefault: false} + // GitalyHooksPackObjectsHookStream enables StreamRPC in 'gitaly-hooks git pack-objects'. + PackObjectsHookStream = FeatureFlag{Name: "pack_objects_hook_stream", OnByDefault: false} ) // All includes all feature flags. @@ -36,4 +38,5 @@ var All = []FeatureFlag{ ReplicateRepositoryDirectFetch, FindAllTagsPipeline, TxRemoveRepository, + PackObjectsHookStream, } |