diff options
author | James Liu <jliu@gitlab.com> | 2023-09-19 10:28:44 +0300 |
---|---|---|
committer | James Liu <jliu@gitlab.com> | 2023-09-19 10:28:44 +0300 |
commit | 45f05ea7efd431d07edcce5fe56e348b8ae95d6c (patch) | |
tree | 66f3d985a0485425e7eac04af8fd4b7dabf55ded | |
parent | 76b2eed1b30847ff69b6e96a9845388405101c6e (diff) | |
parent | eddc72ff69b431dce4fefcf53013585a74628536 (diff) |
Merge branch 'pks-ssh-unify-upload-command-execution' into 'master'
ssh: Unify logic to handle git-upload-pack(1) and git-upload-archive(1)
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6379
Merged-by: James Liu <jliu@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: James Liu <jliu@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: James Liu <jliu@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r-- | internal/command/command.go | 8 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/monitor_stdin_command.go | 41 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_archive.go | 46 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_archive_test.go | 2 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_command.go | 114 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack.go | 67 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack_test.go | 23 |
7 files changed, 151 insertions, 150 deletions
diff --git a/internal/command/command.go b/internal/command/command.go index 5f19dc1c3..ad8906e9d 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -585,12 +585,12 @@ func AllowedEnvironment(envs []string) []string { // ExitStatus will return the exit-code from an error returned by Wait(). func ExitStatus(err error) (int, bool) { - exitError, ok := err.(*exec.ExitError) - if !ok { - return 0, false + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + return exitErr.ExitCode(), true } - return exitError.ExitCode(), true + return 0, false } func methodFromContext(ctx context.Context) (service string, method string) { diff --git a/internal/gitaly/service/ssh/monitor_stdin_command.go b/internal/gitaly/service/ssh/monitor_stdin_command.go deleted file mode 100644 index 26af0eb26..000000000 --- a/internal/gitaly/service/ssh/monitor_stdin_command.go +++ /dev/null @@ -1,41 +0,0 @@ -package ssh - -import ( - "context" - "fmt" - "io" - - "gitlab.com/gitlab-org/gitaly/v16/internal/command" - "gitlab.com/gitlab-org/gitaly/v16/internal/git" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" -) - -func monitorStdinCommand( - ctx context.Context, - gitCmdFactory git.CommandFactory, - repo *gitalypb.Repository, - stdin io.Reader, - stdout, stderr io.Writer, - sc git.Command, - opts ...git.CmdOpt, -) (*command.Command, *pktline.ReadMonitor, error) { - stdinPipe, monitor, cleanup, err := pktline.NewReadMonitor(ctx, stdin) - if err != nil { - return nil, nil, fmt.Errorf("create monitor: %w", err) - } - - cmd, err := gitCmdFactory.New(ctx, repo, sc, append([]git.CmdOpt{ - git.WithStdin(stdinPipe), - git.WithStdout(stdout), - git.WithStderr(stderr), - git.WithFinalizer(func(context.Context, *command.Command) { cleanup() }), - }, opts...)...) - stdinPipe.Close() // this now belongs to cmd - if err != nil { - cleanup() - return nil, nil, fmt.Errorf("start cmd: %w", err) - } - - return cmd, monitor, err -} diff --git a/internal/gitaly/service/ssh/upload_archive.go b/internal/gitaly/service/ssh/upload_archive.go index 7315235d6..c0f4d1f7d 100644 --- a/internal/gitaly/service/ssh/upload_archive.go +++ b/internal/gitaly/service/ssh/upload_archive.go @@ -1,7 +1,6 @@ package ssh import ( - "context" "errors" "fmt" "sync" @@ -32,8 +31,7 @@ func (s *server) SSHUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer } func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveServer, req *gitalypb.SSHUploadArchiveRequest) error { - ctx, cancelCtx := context.WithCancel(stream.Context()) - defer cancelCtx() + ctx := stream.Context() repoPath, err := s.locator.GetRepoPath(req.Repository) if err != nil { @@ -53,45 +51,23 @@ func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer return stream.Send(&gitalypb.SSHUploadArchiveResponse{Stderr: p}) }) - cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, git.Command{ - Name: "upload-archive", - Args: []string{repoPath}, - }) - if err != nil { - return err - } - timeoutTicker := s.uploadArchiveRequestTimeoutTickerFactory() - // upload-archive expects a list of options terminated by a flush packet: - // https://github.com/git/git/blob/v2.22.0/builtin/upload-archive.c#L38 - // - // Place a timeout on receiving the flush packet to mitigate use-after-check - // attacks - go monitor.Monitor(ctx, pktline.PktFlush(), timeoutTicker, cancelCtx) - - if err := cmd.Wait(); err != nil { - // When waiting for the packfile negotiation to end times out we'll cancel the local - // context, but not cancel the overall RPC's context. Our statushandler middleware - // thus cannot observe the fact that we're cancelling the context, and neither do we - // provide any valuable information to the caller that we do indeed kill the command - // because of our own internal timeout. - // - // We thus need to special-case the situation where we cancel our own context in - // order to provide that information and return a proper gRPC error code. - if ctx.Err() != nil && stream.Context().Err() == nil { - return structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err()) - } - + if err := runUploadCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, timeoutTicker, pktline.PktFlush(), git.Command{ + Name: "upload-archive", + Args: []string{repoPath}, + }); err != nil { if status, ok := command.ExitStatus(err); ok { - if sendErr := stream.Send(&gitalypb.SSHUploadArchiveResponse{ + if err := stream.Send(&gitalypb.SSHUploadArchiveResponse{ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, - }); sendErr != nil { - return sendErr + }); err != nil { + return fmt.Errorf("sending exit status: %w", err) } + return fmt.Errorf("send: %w", err) } - return fmt.Errorf("wait cmd: %w", err) + + return fmt.Errorf("running upload-archive: %w", err) } return stream.Send(&gitalypb.SSHUploadArchiveResponse{ diff --git a/internal/gitaly/service/ssh/upload_archive_test.go b/internal/gitaly/service/ssh/upload_archive_test.go index b5ca85835..175f8f863 100644 --- a/internal/gitaly/service/ssh/upload_archive_test.go +++ b/internal/gitaly/service/ssh/upload_archive_test.go @@ -59,7 +59,7 @@ func TestFailedUploadArchiveRequestDueToTimeout(t *testing.T) { // Because the client says nothing, the server would block. Because of // the timeout, it won't block forever, and return with a non-zero exit // code instead. - requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) { + requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-archive: waiting for negotiation: context canceled"), func() (int32, error) { resp, err := stream.Recv() if err != nil { return 0, err diff --git a/internal/gitaly/service/ssh/upload_command.go b/internal/gitaly/service/ssh/upload_command.go new file mode 100644 index 000000000..d8c9b3b01 --- /dev/null +++ b/internal/gitaly/service/ssh/upload_command.go @@ -0,0 +1,114 @@ +package ssh + +import ( + "context" + "fmt" + "io" + "strings" + + "gitlab.com/gitlab-org/gitaly/v16/internal/command" + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" +) + +// runUploadCommand runs an uploading command like git-upload-pack(1) or git-upload-archive(1). It serves multiple +// purposes: +// +// - It sets up a large buffer reader such that we can write the data more efficiently. +// +// - It logs how many bytes have been sent. +// +// - It installs a timeout such that we can address time-of-check-to-time-of-use-style races. Otherwise it would be +// possible to open the connection early, keep it open for an extended amount of time, and only do the negotiation of +// what is to be sent at a later point when permissions of the user might have changed. +func runUploadCommand( + rpcContext context.Context, + gitCmdFactory git.CommandFactory, + repo *gitalypb.Repository, + stdin io.Reader, + stdout, stderr io.Writer, + timeoutTicker helper.Ticker, + boundaryPacket []byte, + sc git.Command, + opts ...git.CmdOpt, +) error { + ctx, cancelCtx := context.WithCancel(rpcContext) + defer cancelCtx() + + var stderrBuilder strings.Builder + stderr = io.MultiWriter(stderr, &stderrBuilder) + + stdoutCounter := &helper.CountingWriter{W: stdout} + // Use large copy buffer to reduce the number of system calls + stdout = &largeBufferReaderFrom{Writer: stdoutCounter} + + stdinPipe, monitor, cleanup, err := pktline.NewReadMonitor(ctx, stdin) + if err != nil { + return fmt.Errorf("create monitor: %w", err) + } + + cmd, err := gitCmdFactory.New(ctx, repo, sc, append([]git.CmdOpt{ + git.WithStdin(stdinPipe), + git.WithStdout(stdout), + git.WithStderr(stderr), + git.WithFinalizer(func(context.Context, *command.Command) { cleanup() }), + }, opts...)...) + stdinPipe.Close() // this now belongs to cmd + if err != nil { + cleanup() + return fmt.Errorf("starting command: %w", err) + } + + go monitor.Monitor(ctx, boundaryPacket, timeoutTicker, cancelCtx) + + if err := cmd.Wait(); err != nil { + // The read monitor will cancel the local `ctx` when we do not observe a specific packet before the + // timeout ticker ticks. This is done to address a time-of-check-to-time-of-use-style race, where the + // client opens a connection but doesn't yet perform the negotiation of what data the server should + // send. Because access checks only happen at the beginning of the call, it may be the case that the + // client's permissions have changed since the RPC call started. + // + // To address this issue, we thus timebox the maximum amount of time between the start of the RPC call + // and the end of the negotiation phase. While this doesn't completely address the issue, it's the best + // we can reasonably do here. + // + // To distinguish cancellation of the overall RPC call and a timeout of the negotiation phase we use two + // different contexts. In the case where the local context has been cancelled, we know that the reason + // for cancellation is that the negotiation phase did not finish in time and thus return a more specific + // error. + if ctx.Err() != nil && rpcContext.Err() == nil { + return structerr.NewDeadlineExceeded("waiting for negotiation: %w", ctx.Err()) + } + + // A common error case is that the client is terminating the request prematurely, + // e.g. by killing their git-fetch(1) process because it's taking too long. This is + // an expected failure, but we're not in a position to easily tell this error apart + // from other errors returned by git-upload-pack(1). So we have to resort to parsing + // the error message returned by Git, and if we see that it matches we return an + // error with a `Canceled` error code. + // + // Note that we're being quite strict with how we match the error for now. We may + // have to make it more lenient in case we see that this doesn't catch all cases. + if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" { + return structerr.NewCanceled("user canceled the request") + } + + return fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String()) + } + + log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details") + + return err +} + +type largeBufferReaderFrom struct { + io.Writer +} + +func (rf *largeBufferReaderFrom) ReadFrom(r io.Reader) (int64, error) { + return io.CopyBuffer(rf.Writer, r, make([]byte, 64*1024)) +} diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go index 5e8b9b850..d4d306533 100644 --- a/internal/gitaly/service/ssh/upload_pack.go +++ b/internal/gitaly/service/ssh/upload_pack.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "strings" "sync" "gitlab.com/gitlab-org/gitaly/v16/internal/command" @@ -14,7 +13,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/stream" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -74,14 +72,7 @@ type sshUploadPackRequest interface { GetGitProtocol() string } -func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) { - ctx, cancelCtx := context.WithCancel(rpcContext) - defer cancelCtx() - - stdoutCounter := &helper.CountingWriter{W: stdout} - // Use large copy buffer to reduce the number of system calls - stdout = &largeBufferReaderFrom{Writer: stdoutCounter} - +func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) { repo := req.GetRepository() repoPath, err := s.locator.GetRepoPath(repo) if err != nil { @@ -126,17 +117,6 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ git.WithPackObjectsHookEnv(repo, "ssh"), } - var stderrBuilder strings.Builder - stderr = io.MultiWriter(stderr, &stderrBuilder) - - cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, git.Command{ - Name: "upload-pack", - Args: []string{repoPath}, - }, commandOpts...) - if err != nil { - return nil, 0, err - } - timeoutTicker := s.uploadPackRequestTimeoutTickerFactory() // upload-pack negotiation is terminated by either a flush, or the "done" @@ -145,41 +125,14 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ // "flush" tells the server it can terminate, while "done" tells it to start // generating a packfile. Add a timeout to the second case to mitigate // use-after-check attacks. - go monitor.Monitor(ctx, pktline.PktDone(), timeoutTicker, cancelCtx) - - if err := cmd.Wait(); err != nil { + if err := runUploadCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, timeoutTicker, pktline.PktDone(), git.Command{ + Name: "upload-pack", + Args: []string{repoPath}, + }, commandOpts...); err != nil { status, _ := command.ExitStatus(err) - - // When waiting for the packfile negotiation to end times out we'll cancel the local - // context, but not cancel the overall RPC's context. Our statushandler middleware - // thus cannot observe the fact that we're cancelling the context, and neither do we - // provide any valuable information to the caller that we do indeed kill the command - // because of our own internal timeout. - // - // We thus need to special-case the situation where we cancel our own context in - // order to provide that information and return a proper gRPC error code. - if ctx.Err() != nil && rpcContext.Err() == nil { - return nil, status, structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err()) - } - - // A common error case is that the client is terminating the request prematurely, - // e.g. by killing their git-fetch(1) process because it's taking too long. This is - // an expected failure, but we're not in a position to easily tell this error apart - // from other errors returned by git-upload-pack(1). So we have to resort to parsing - // the error message returned by Git, and if we see that it matches we return an - // error with a `Canceled` error code. - // - // Note that we're being quite strict with how we match the error for now. We may - // have to make it more lenient in case we see that this doesn't catch all cases. - if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" { - return nil, status, structerr.NewCanceled("user canceled the fetch") - } - - return nil, status, fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String()) + return nil, status, fmt.Errorf("running upload-pack: %w", err) } - log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details") - return nil, 0, nil } @@ -194,14 +147,6 @@ func validateFirstUploadPackRequest(locator storage.Locator, req *gitalypb.SSHUp return nil } -type largeBufferReaderFrom struct { - io.Writer -} - -func (rf *largeBufferReaderFrom) ReadFrom(r io.Reader) (int64, error) { - return io.CopyBuffer(rf.Writer, r, make([]byte, 64*1024)) -} - func (s *server) SSHUploadPackWithSidechannel(ctx context.Context, req *gitalypb.SSHUploadPackWithSidechannelRequest) (*gitalypb.SSHUploadPackWithSidechannelResponse, error) { conn, err := sidechannel.OpenSidechannel(ctx) if err != nil { diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go index 328337f0d..483a653a8 100644 --- a/internal/gitaly/service/ssh/upload_pack_test.go +++ b/internal/gitaly/service/ssh/upload_pack_test.go @@ -141,7 +141,7 @@ func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) { // Because the client says nothing, the server would block. Because of // the timeout, it won't block forever, and return with a non-zero exit // code instead. - requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) { + requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-pack: waiting for negotiation: context canceled"), func() (int32, error) { resp, err := stream.Recv() if err != nil { return 0, err @@ -247,7 +247,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, expectedErr: structerr.NewInternal( - "cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"", + "running upload-pack: cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"", commitID, ), }, @@ -270,7 +270,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, expectedErr: structerr.NewInternal( - "cmd wait: exit status 128, stderr: %q", + "running upload-pack: cmd wait: exit status 128, stderr: %q", "fatal: git upload-pack: protocol error, expected to get object ID, not 'command=fetch'\n", ), }, @@ -290,7 +290,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, - expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", + expectedErr: structerr.NewInternal( + "running upload-pack: cmd wait: exit status 128, stderr: %q", "fatal: git upload-pack: not our ref "+strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())+"\n", ), }, @@ -308,7 +309,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, - expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", + expectedErr: structerr.NewInternal( + "running upload-pack: cmd wait: exit status 128, stderr: %q", "fatal: git upload-pack: protocol error, expected to get object ID, not 'want 1111 multi_ack'\n", ), }, @@ -341,7 +343,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, - expectedErr: structerr.NewCanceled("user canceled the fetch"), + expectedErr: structerr.NewCanceled("running upload-pack: user canceled the request"), }, { desc: "garbage", @@ -354,7 +356,10 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { require.NoError(t, clientConn.CloseWrite()) return nil }, - expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", "fatal: unknown capability 'foobar'\n"), + expectedErr: structerr.NewInternal( + "running upload-pack: cmd wait: exit status 128, stderr: %q", + "fatal: unknown capability 'foobar'\n", + ), }, { desc: "close and cancellation", @@ -438,6 +443,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { } func requireFailedSSHStream(t *testing.T, expectedErr error, recv func() (int32, error)) { + t.Helper() + done := make(chan struct{}) var code int32 var err error @@ -802,7 +809,7 @@ func TestUploadPack_gitFailure(t *testing.T) { require.NoError(t, stream.CloseSend()) err = recvUntilError(t, stream) - testhelper.RequireGrpcError(t, structerr.NewInternal(`cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err) + testhelper.RequireGrpcError(t, structerr.NewInternal(`running upload-pack: cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err) } func recvUntilError(t *testing.T, stream gitalypb.SSHService_SSHUploadPackClient) error { |