diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-18 13:45:19 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-19 08:55:48 +0300 |
commit | 8744378b62f7add4c2017628e5d0e5a4635876cc (patch) | |
tree | 4fe56237a075b197fd22aacd0e389f7f2a779590 | |
parent | ebec60281e44f2e2da3f28d014bd09b0703dec9b (diff) |
ssh: Unify logic to handle git-upload-pack(1) and git-upload-archive(1)
The logic to run git-upload-pack(1) and git-upload-archive(1) is quite
complex. Besides the need to splice several standard streams, we also
need to handle the case where negotiation of the data that is to be sent
does not finish, which is done to address time-of-check-to-time-of-use
attacks.
Right now, much of the logic to do this setup is split up across the
callers and `monitorStdinCommand()`. As there are multiple callers of
the latter function, this means that some of the logic is duplicated.
Furthermore, it is quite hard to change `monitorStdinCommand()` to alter
its semantics, as it is not at all a self-contained building block.
Refactor this to move much of the logic into `monitorStdinCommand()`,
which brings us a bunch of advantages:
- The monitoring logic is more self-contained so that we can amend
it at a later point. This was the original motivation of this
patch as it will be required to fix cases where we don't correctly
detect that the mentioned negotiation has completed.
- It allows us to use the same counting writer for SSHUploadArchive
that we already use for SSHUploadPack. We thus get better insight
into how much data we're typically returning in this RPC.
- It allows us to use the same large buffer reader that we already
use for SSHUploadPack. This reader uses a larger-than-usual buffer
so that we can avoid many syscalls and should thus help to make
SSHUploadArchive more efficient.
- We can reuse the bits to detect cancellation via our pktline
monitor in case negotiation does not finish in time.
- We can reuse the error handling where the remote side hangs up
unexpectedly, which is thrown by Git's pktline protocol in case
the remote side hangs up in the middle of a write. This allows us
to label user-cancelled requests as `codes.Canceled` instead of
`codes.Internal` in SSHUploadArchive.
So overall we gain quite a lot of benefits with this refactoring while
also simplifying our code base.
Changelog: fixed
-rw-r--r-- | internal/gitaly/service/ssh/monitor_stdin_command.go | 65 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_archive.go | 46 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_archive_test.go | 2 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack.go | 59 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack_test.go | 23 |
5 files changed, 93 insertions, 102 deletions
diff --git a/internal/gitaly/service/ssh/monitor_stdin_command.go b/internal/gitaly/service/ssh/monitor_stdin_command.go index 26af0eb26..b8d54e120 100644 --- a/internal/gitaly/service/ssh/monitor_stdin_command.go +++ b/internal/gitaly/service/ssh/monitor_stdin_command.go @@ -4,25 +4,41 @@ import ( "context" "fmt" "io" + "strings" "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func monitorStdinCommand( - ctx context.Context, + rpcContext context.Context, gitCmdFactory git.CommandFactory, repo *gitalypb.Repository, stdin io.Reader, stdout, stderr io.Writer, + timeoutTicker helper.Ticker, + boundaryPacket []byte, sc git.Command, opts ...git.CmdOpt, -) (*command.Command, *pktline.ReadMonitor, error) { +) error { + ctx, cancelCtx := context.WithCancel(rpcContext) + defer cancelCtx() + + var stderrBuilder strings.Builder + stderr = io.MultiWriter(stderr, &stderrBuilder) + + stdoutCounter := &helper.CountingWriter{W: stdout} + // Use large copy buffer to reduce the number of system calls + stdout = &largeBufferReaderFrom{Writer: stdoutCounter} + stdinPipe, monitor, cleanup, err := pktline.NewReadMonitor(ctx, stdin) if err != nil { - return nil, nil, fmt.Errorf("create monitor: %w", err) + return fmt.Errorf("create monitor: %w", err) } cmd, err := gitCmdFactory.New(ctx, repo, sc, append([]git.CmdOpt{ @@ -34,8 +50,47 @@ func monitorStdinCommand( stdinPipe.Close() // this now belongs to cmd if err != nil { cleanup() - return nil, nil, fmt.Errorf("start cmd: %w", err) + return fmt.Errorf("starting command: %w", err) } - return cmd, monitor, err + go monitor.Monitor(ctx, boundaryPacket, timeoutTicker, cancelCtx) + + if err := cmd.Wait(); err != nil { + // The 
read monitor will cancel the local `ctx` when we do not observe a specific packet before the + // timeout ticker ticks. This is done to address a time-of-check-to-time-of-use-style race, where the + // client opens a connection but doesn't yet perform the negotiation of what data the server should + // send. Because access checks only happen at the beginning of the call, it may be the case that the + // client's permissions have changed since the RPC call started. + // + // To address this issue, we thus timebox the maximum amount of time between the start of the RPC call + // and the end of the negotiation phase. While this doesn't completely address the issue, it's the best + // we can reasonably do here. + // + // To distinguish cancellation of the overall RPC call and a timeout of the negotiation phase we use two + // different contexts. In the case where the local context has been cancelled, we know that the reason + // for cancellation is that the negotiation phase did not finish in time and thus return a more specific + // error. + if ctx.Err() != nil && rpcContext.Err() == nil { + return structerr.NewDeadlineExceeded("waiting for negotiation: %w", ctx.Err()) + } + + // A common error case is that the client is terminating the request prematurely, + // e.g. by killing their git-fetch(1) process because it's taking too long. This is + // an expected failure, but we're not in a position to easily tell this error apart + // from other errors returned by git-upload-pack(1). So we have to resort to parsing + // the error message returned by Git, and if we see that it matches we return an + // error with a `Canceled` error code. + // + // Note that we're being quite strict with how we match the error for now. We may + // have to make it more lenient in case we see that this doesn't catch all cases. 
+ if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" { + return structerr.NewCanceled("user canceled the request") + } + + return fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String()) + } + + log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details") + + return err } diff --git a/internal/gitaly/service/ssh/upload_archive.go b/internal/gitaly/service/ssh/upload_archive.go index 7315235d6..4451383c4 100644 --- a/internal/gitaly/service/ssh/upload_archive.go +++ b/internal/gitaly/service/ssh/upload_archive.go @@ -1,7 +1,6 @@ package ssh import ( - "context" "errors" "fmt" "sync" @@ -32,8 +31,7 @@ func (s *server) SSHUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer } func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveServer, req *gitalypb.SSHUploadArchiveRequest) error { - ctx, cancelCtx := context.WithCancel(stream.Context()) - defer cancelCtx() + ctx := stream.Context() repoPath, err := s.locator.GetRepoPath(req.Repository) if err != nil { @@ -53,45 +51,23 @@ func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer return stream.Send(&gitalypb.SSHUploadArchiveResponse{Stderr: p}) }) - cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, git.Command{ - Name: "upload-archive", - Args: []string{repoPath}, - }) - if err != nil { - return err - } - timeoutTicker := s.uploadArchiveRequestTimeoutTickerFactory() - // upload-archive expects a list of options terminated by a flush packet: - // https://github.com/git/git/blob/v2.22.0/builtin/upload-archive.c#L38 - // - // Place a timeout on receiving the flush packet to mitigate use-after-check - // attacks - go monitor.Monitor(ctx, pktline.PktFlush(), timeoutTicker, cancelCtx) - - if err := cmd.Wait(); err != nil { - // When waiting for the packfile negotiation to end times out we'll cancel the local - // context, but not cancel the overall 
RPC's context. Our statushandler middleware - // thus cannot observe the fact that we're cancelling the context, and neither do we - // provide any valuable information to the caller that we do indeed kill the command - // because of our own internal timeout. - // - // We thus need to special-case the situation where we cancel our own context in - // order to provide that information and return a proper gRPC error code. - if ctx.Err() != nil && stream.Context().Err() == nil { - return structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err()) - } - + if err := monitorStdinCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, timeoutTicker, pktline.PktFlush(), git.Command{ + Name: "upload-archive", + Args: []string{repoPath}, + }); err != nil { if status, ok := command.ExitStatus(err); ok { - if sendErr := stream.Send(&gitalypb.SSHUploadArchiveResponse{ + if err := stream.Send(&gitalypb.SSHUploadArchiveResponse{ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, - }); sendErr != nil { - return sendErr + }); err != nil { + return fmt.Errorf("sending exit status: %w", err) } + return fmt.Errorf("send: %w", err) } - return fmt.Errorf("wait cmd: %w", err) + + return fmt.Errorf("running upload-archive: %w", err) } return stream.Send(&gitalypb.SSHUploadArchiveResponse{ diff --git a/internal/gitaly/service/ssh/upload_archive_test.go b/internal/gitaly/service/ssh/upload_archive_test.go index b5ca85835..175f8f863 100644 --- a/internal/gitaly/service/ssh/upload_archive_test.go +++ b/internal/gitaly/service/ssh/upload_archive_test.go @@ -59,7 +59,7 @@ func TestFailedUploadArchiveRequestDueToTimeout(t *testing.T) { // Because the client says nothing, the server would block. Because of // the timeout, it won't block forever, and return with a non-zero exit // code instead. 
- requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) { + requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-archive: waiting for negotiation: context canceled"), func() (int32, error) { resp, err := stream.Recv() if err != nil { return 0, err diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go index 5e8b9b850..3f73d0e16 100644 --- a/internal/gitaly/service/ssh/upload_pack.go +++ b/internal/gitaly/service/ssh/upload_pack.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "strings" "sync" "gitlab.com/gitlab-org/gitaly/v16/internal/command" @@ -14,7 +13,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/stream" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -74,14 +72,7 @@ type sshUploadPackRequest interface { GetGitProtocol() string } -func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) { - ctx, cancelCtx := context.WithCancel(rpcContext) - defer cancelCtx() - - stdoutCounter := &helper.CountingWriter{W: stdout} - // Use large copy buffer to reduce the number of system calls - stdout = &largeBufferReaderFrom{Writer: stdoutCounter} - +func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) { repo := req.GetRepository() repoPath, err := s.locator.GetRepoPath(repo) if err != nil { @@ -126,17 +117,6 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ 
git.WithPackObjectsHookEnv(repo, "ssh"), } - var stderrBuilder strings.Builder - stderr = io.MultiWriter(stderr, &stderrBuilder) - - cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, git.Command{ - Name: "upload-pack", - Args: []string{repoPath}, - }, commandOpts...) - if err != nil { - return nil, 0, err - } - timeoutTicker := s.uploadPackRequestTimeoutTickerFactory() // upload-pack negotiation is terminated by either a flush, or the "done" @@ -145,41 +125,14 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ // "flush" tells the server it can terminate, while "done" tells it to start // generating a packfile. Add a timeout to the second case to mitigate // use-after-check attacks. - go monitor.Monitor(ctx, pktline.PktDone(), timeoutTicker, cancelCtx) - - if err := cmd.Wait(); err != nil { + if err := monitorStdinCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, timeoutTicker, pktline.PktDone(), git.Command{ + Name: "upload-pack", + Args: []string{repoPath}, + }, commandOpts...); err != nil { status, _ := command.ExitStatus(err) - - // When waiting for the packfile negotiation to end times out we'll cancel the local - // context, but not cancel the overall RPC's context. Our statushandler middleware - // thus cannot observe the fact that we're cancelling the context, and neither do we - // provide any valuable information to the caller that we do indeed kill the command - // because of our own internal timeout. - // - // We thus need to special-case the situation where we cancel our own context in - // order to provide that information and return a proper gRPC error code. - if ctx.Err() != nil && rpcContext.Err() == nil { - return nil, status, structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err()) - } - - // A common error case is that the client is terminating the request prematurely, - // e.g. 
by killing their git-fetch(1) process because it's taking too long. This is - // an expected failure, but we're not in a position to easily tell this error apart - // from other errors returned by git-upload-pack(1). So we have to resort to parsing - // the error message returned by Git, and if we see that it matches we return an - // error with a `Canceled` error code. - // - // Note that we're being quite strict with how we match the error for now. We may - // have to make it more lenient in case we see that this doesn't catch all cases. - if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" { - return nil, status, structerr.NewCanceled("user canceled the fetch") - } - - return nil, status, fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String()) + return nil, status, fmt.Errorf("running upload-pack: %w", err) } - log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details") - return nil, 0, nil } diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go index 328337f0d..483a653a8 100644 --- a/internal/gitaly/service/ssh/upload_pack_test.go +++ b/internal/gitaly/service/ssh/upload_pack_test.go @@ -141,7 +141,7 @@ func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) { // Because the client says nothing, the server would block. Because of // the timeout, it won't block forever, and return with a non-zero exit // code instead. 
- requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) { + requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-pack: waiting for negotiation: context canceled"), func() (int32, error) { resp, err := stream.Recv() if err != nil { return 0, err @@ -247,7 +247,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, expectedErr: structerr.NewInternal( - "cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"", + "running upload-pack: cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"", commitID, ), }, @@ -270,7 +270,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, expectedErr: structerr.NewInternal( - "cmd wait: exit status 128, stderr: %q", + "running upload-pack: cmd wait: exit status 128, stderr: %q", "fatal: git upload-pack: protocol error, expected to get object ID, not 'command=fetch'\n", ), }, @@ -290,7 +290,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, - expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", + expectedErr: structerr.NewInternal( + "running upload-pack: cmd wait: exit status 128, stderr: %q", "fatal: git upload-pack: not our ref "+strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())+"\n", ), }, @@ -308,7 +309,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, - expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", + expectedErr: structerr.NewInternal( + "running upload-pack: cmd wait: exit status 128, stderr: %q", "fatal: git upload-pack: protocol error, expected to get object ID, not 'want 1111 multi_ack'\n", ), }, @@ -341,7 +343,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { return nil }, - expectedErr: structerr.NewCanceled("user canceled the fetch"), + expectedErr: structerr.NewCanceled("running 
upload-pack: user canceled the request"), }, { desc: "garbage", @@ -354,7 +356,10 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { require.NoError(t, clientConn.CloseWrite()) return nil }, - expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", "fatal: unknown capability 'foobar'\n"), + expectedErr: structerr.NewInternal( + "running upload-pack: cmd wait: exit status 128, stderr: %q", + "fatal: unknown capability 'foobar'\n", + ), }, { desc: "close and cancellation", @@ -438,6 +443,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) { } func requireFailedSSHStream(t *testing.T, expectedErr error, recv func() (int32, error)) { + t.Helper() + done := make(chan struct{}) var code int32 var err error @@ -802,7 +809,7 @@ func TestUploadPack_gitFailure(t *testing.T) { require.NoError(t, stream.CloseSend()) err = recvUntilError(t, stream) - testhelper.RequireGrpcError(t, structerr.NewInternal(`cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err) + testhelper.RequireGrpcError(t, structerr.NewInternal(`running upload-pack: cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err) } func recvUntilError(t *testing.T, stream gitalypb.SSHService_SSHUploadPackClient) error { |