Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2023-09-18 13:45:19 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2023-09-19 08:55:48 +0300
commit: 8744378b62f7add4c2017628e5d0e5a4635876cc (patch)
tree: 4fe56237a075b197fd22aacd0e389f7f2a779590
parent: ebec60281e44f2e2da3f28d014bd09b0703dec9b (diff)
ssh: Unify logic to handle git-upload-pack(1) and git-upload-archive(1)
The logic to run git-upload-pack(1) and git-upload-archive(1) is quite complex. Besides the need to splice several standard streams, we also need to handle the case where negotiation of the data that is to be sent does not finish in time, which is done to address time-of-check-to-time-of-use attacks.

Right now, much of the logic to do this setup is split up across the callers and `monitorStdinCommand()`. As there are multiple callers of the latter function, this means that some of the logic is duplicated. Furthermore, it is quite hard to change `monitorStdinCommand()` to alter its semantics, as it is not at all a self-contained building block.

Refactor this to move much of the logic into `monitorStdinCommand()`, which brings us a bunch of advantages:

- The monitoring logic is more self-contained so that we can amend it at a later point. This was the original motivation of this patch as it will be required to fix cases where we don't correctly detect that the mentioned negotiation has completed.

- It allows us to use the same counting writer for SSHUploadArchive that we already use for SSHUploadPack. We thus get better insight into how much data we're typically returning in this RPC.

- It allows us to use the same large buffer reader that we already use for SSHUploadPack. This reader uses a larger-than-usual buffer so that we can avoid many syscalls and should thus help to make SSHUploadArchive more efficient.

- We can reuse the bits to detect cancellation via our pktline monitor in case negotiation does not finish in time.

- We can reuse the error handling where the remote side hangs up unexpectedly, which is thrown by Git's pktline protocol in case the remote side hangs up in the middle of a write. This allows us to label user-cancelled requests as `codes.Canceled` instead of `codes.Internal` in SSHUploadArchive.

So overall we gain quite a lot of benefits with this refactoring while also simplifying our code base.

Changelog: fixed
-rw-r--r--internal/gitaly/service/ssh/monitor_stdin_command.go65
-rw-r--r--internal/gitaly/service/ssh/upload_archive.go46
-rw-r--r--internal/gitaly/service/ssh/upload_archive_test.go2
-rw-r--r--internal/gitaly/service/ssh/upload_pack.go59
-rw-r--r--internal/gitaly/service/ssh/upload_pack_test.go23
5 files changed, 93 insertions, 102 deletions
diff --git a/internal/gitaly/service/ssh/monitor_stdin_command.go b/internal/gitaly/service/ssh/monitor_stdin_command.go
index 26af0eb26..b8d54e120 100644
--- a/internal/gitaly/service/ssh/monitor_stdin_command.go
+++ b/internal/gitaly/service/ssh/monitor_stdin_command.go
@@ -4,25 +4,41 @@ import (
"context"
"fmt"
"io"
+ "strings"
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
func monitorStdinCommand(
- ctx context.Context,
+ rpcContext context.Context,
gitCmdFactory git.CommandFactory,
repo *gitalypb.Repository,
stdin io.Reader,
stdout, stderr io.Writer,
+ timeoutTicker helper.Ticker,
+ boundaryPacket []byte,
sc git.Command,
opts ...git.CmdOpt,
-) (*command.Command, *pktline.ReadMonitor, error) {
+) error {
+ ctx, cancelCtx := context.WithCancel(rpcContext)
+ defer cancelCtx()
+
+ var stderrBuilder strings.Builder
+ stderr = io.MultiWriter(stderr, &stderrBuilder)
+
+ stdoutCounter := &helper.CountingWriter{W: stdout}
+ // Use large copy buffer to reduce the number of system calls
+ stdout = &largeBufferReaderFrom{Writer: stdoutCounter}
+
stdinPipe, monitor, cleanup, err := pktline.NewReadMonitor(ctx, stdin)
if err != nil {
- return nil, nil, fmt.Errorf("create monitor: %w", err)
+ return fmt.Errorf("create monitor: %w", err)
}
cmd, err := gitCmdFactory.New(ctx, repo, sc, append([]git.CmdOpt{
@@ -34,8 +50,47 @@ func monitorStdinCommand(
stdinPipe.Close() // this now belongs to cmd
if err != nil {
cleanup()
- return nil, nil, fmt.Errorf("start cmd: %w", err)
+ return fmt.Errorf("starting command: %w", err)
}
- return cmd, monitor, err
+ go monitor.Monitor(ctx, boundaryPacket, timeoutTicker, cancelCtx)
+
+ if err := cmd.Wait(); err != nil {
+ // The read monitor will cancel the local `ctx` when we do not observe a specific packet before the
+ // timeout ticker ticks. This is done to address a time-of-check-to-time-of-use-style race, where the
+ // client opens a connection but doesn't yet perform the negotiation of what data the server should
+ // send. Because access checks only happen at the beginning of the call, it may be the case that the
+ // client's permissions have changed since the RPC call started.
+ //
+ // To address this issue, we thus timebox the maximum amount of time between the start of the RPC call
+ // and the end of the negotiation phase. While this doesn't completely address the issue, it's the best
+ // we can reasonably do here.
+ //
+ // To distinguish cancellation of the overall RPC call and a timeout of the negotiation phase we use two
+ // different contexts. In the case where the local context has been cancelled, we know that the reason
+ // for cancellation is that the negotiation phase did not finish in time and thus return a more specific
+ // error.
+ if ctx.Err() != nil && rpcContext.Err() == nil {
+ return structerr.NewDeadlineExceeded("waiting for negotiation: %w", ctx.Err())
+ }
+
+ // A common error case is that the client is terminating the request prematurely,
+ // e.g. by killing their git-fetch(1) process because it's taking too long. This is
+ // an expected failure, but we're not in a position to easily tell this error apart
+ // from other errors returned by git-upload-pack(1). So we have to resort to parsing
+ // the error message returned by Git, and if we see that it matches we return an
+ // error with a `Canceled` error code.
+ //
+ // Note that we're being quite strict with how we match the error for now. We may
+ // have to make it more lenient in case we see that this doesn't catch all cases.
+ if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" {
+ return structerr.NewCanceled("user canceled the request")
+ }
+
+ return fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String())
+ }
+
+ log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details")
+
+ return err
}
diff --git a/internal/gitaly/service/ssh/upload_archive.go b/internal/gitaly/service/ssh/upload_archive.go
index 7315235d6..4451383c4 100644
--- a/internal/gitaly/service/ssh/upload_archive.go
+++ b/internal/gitaly/service/ssh/upload_archive.go
@@ -1,7 +1,6 @@
package ssh
import (
- "context"
"errors"
"fmt"
"sync"
@@ -32,8 +31,7 @@ func (s *server) SSHUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer
}
func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveServer, req *gitalypb.SSHUploadArchiveRequest) error {
- ctx, cancelCtx := context.WithCancel(stream.Context())
- defer cancelCtx()
+ ctx := stream.Context()
repoPath, err := s.locator.GetRepoPath(req.Repository)
if err != nil {
@@ -53,45 +51,23 @@ func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer
return stream.Send(&gitalypb.SSHUploadArchiveResponse{Stderr: p})
})
- cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, git.Command{
- Name: "upload-archive",
- Args: []string{repoPath},
- })
- if err != nil {
- return err
- }
-
timeoutTicker := s.uploadArchiveRequestTimeoutTickerFactory()
- // upload-archive expects a list of options terminated by a flush packet:
- // https://github.com/git/git/blob/v2.22.0/builtin/upload-archive.c#L38
- //
- // Place a timeout on receiving the flush packet to mitigate use-after-check
- // attacks
- go monitor.Monitor(ctx, pktline.PktFlush(), timeoutTicker, cancelCtx)
-
- if err := cmd.Wait(); err != nil {
- // When waiting for the packfile negotiation to end times out we'll cancel the local
- // context, but not cancel the overall RPC's context. Our statushandler middleware
- // thus cannot observe the fact that we're cancelling the context, and neither do we
- // provide any valuable information to the caller that we do indeed kill the command
- // because of our own internal timeout.
- //
- // We thus need to special-case the situation where we cancel our own context in
- // order to provide that information and return a proper gRPC error code.
- if ctx.Err() != nil && stream.Context().Err() == nil {
- return structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err())
- }
-
+ if err := monitorStdinCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, timeoutTicker, pktline.PktFlush(), git.Command{
+ Name: "upload-archive",
+ Args: []string{repoPath},
+ }); err != nil {
if status, ok := command.ExitStatus(err); ok {
- if sendErr := stream.Send(&gitalypb.SSHUploadArchiveResponse{
+ if err := stream.Send(&gitalypb.SSHUploadArchiveResponse{
ExitStatus: &gitalypb.ExitStatus{Value: int32(status)},
- }); sendErr != nil {
- return sendErr
+ }); err != nil {
+ return fmt.Errorf("sending exit status: %w", err)
}
+
return fmt.Errorf("send: %w", err)
}
- return fmt.Errorf("wait cmd: %w", err)
+
+ return fmt.Errorf("running upload-archive: %w", err)
}
return stream.Send(&gitalypb.SSHUploadArchiveResponse{
diff --git a/internal/gitaly/service/ssh/upload_archive_test.go b/internal/gitaly/service/ssh/upload_archive_test.go
index b5ca85835..175f8f863 100644
--- a/internal/gitaly/service/ssh/upload_archive_test.go
+++ b/internal/gitaly/service/ssh/upload_archive_test.go
@@ -59,7 +59,7 @@ func TestFailedUploadArchiveRequestDueToTimeout(t *testing.T) {
// Because the client says nothing, the server would block. Because of
// the timeout, it won't block forever, and return with a non-zero exit
// code instead.
- requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) {
+ requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-archive: waiting for negotiation: context canceled"), func() (int32, error) {
resp, err := stream.Recv()
if err != nil {
return 0, err
diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go
index 5e8b9b850..3f73d0e16 100644
--- a/internal/gitaly/service/ssh/upload_pack.go
+++ b/internal/gitaly/service/ssh/upload_pack.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
- "strings"
"sync"
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
@@ -14,7 +13,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
- "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/stream"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
@@ -74,14 +72,7 @@ type sshUploadPackRequest interface {
GetGitProtocol() string
}
-func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) {
- ctx, cancelCtx := context.WithCancel(rpcContext)
- defer cancelCtx()
-
- stdoutCounter := &helper.CountingWriter{W: stdout}
- // Use large copy buffer to reduce the number of system calls
- stdout = &largeBufferReaderFrom{Writer: stdoutCounter}
-
+func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) {
repo := req.GetRepository()
repoPath, err := s.locator.GetRepoPath(repo)
if err != nil {
@@ -126,17 +117,6 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ
git.WithPackObjectsHookEnv(repo, "ssh"),
}
- var stderrBuilder strings.Builder
- stderr = io.MultiWriter(stderr, &stderrBuilder)
-
- cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, git.Command{
- Name: "upload-pack",
- Args: []string{repoPath},
- }, commandOpts...)
- if err != nil {
- return nil, 0, err
- }
-
timeoutTicker := s.uploadPackRequestTimeoutTickerFactory()
// upload-pack negotiation is terminated by either a flush, or the "done"
@@ -145,41 +125,14 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ
// "flush" tells the server it can terminate, while "done" tells it to start
// generating a packfile. Add a timeout to the second case to mitigate
// use-after-check attacks.
- go monitor.Monitor(ctx, pktline.PktDone(), timeoutTicker, cancelCtx)
-
- if err := cmd.Wait(); err != nil {
+ if err := monitorStdinCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, timeoutTicker, pktline.PktDone(), git.Command{
+ Name: "upload-pack",
+ Args: []string{repoPath},
+ }, commandOpts...); err != nil {
status, _ := command.ExitStatus(err)
-
- // When waiting for the packfile negotiation to end times out we'll cancel the local
- // context, but not cancel the overall RPC's context. Our statushandler middleware
- // thus cannot observe the fact that we're cancelling the context, and neither do we
- // provide any valuable information to the caller that we do indeed kill the command
- // because of our own internal timeout.
- //
- // We thus need to special-case the situation where we cancel our own context in
- // order to provide that information and return a proper gRPC error code.
- if ctx.Err() != nil && rpcContext.Err() == nil {
- return nil, status, structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err())
- }
-
- // A common error case is that the client is terminating the request prematurely,
- // e.g. by killing their git-fetch(1) process because it's taking too long. This is
- // an expected failure, but we're not in a position to easily tell this error apart
- // from other errors returned by git-upload-pack(1). So we have to resort to parsing
- // the error message returned by Git, and if we see that it matches we return an
- // error with a `Canceled` error code.
- //
- // Note that we're being quite strict with how we match the error for now. We may
- // have to make it more lenient in case we see that this doesn't catch all cases.
- if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" {
- return nil, status, structerr.NewCanceled("user canceled the fetch")
- }
-
- return nil, status, fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String())
+ return nil, status, fmt.Errorf("running upload-pack: %w", err)
}
- log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details")
-
return nil, 0, nil
}
diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go
index 328337f0d..483a653a8 100644
--- a/internal/gitaly/service/ssh/upload_pack_test.go
+++ b/internal/gitaly/service/ssh/upload_pack_test.go
@@ -141,7 +141,7 @@ func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) {
// Because the client says nothing, the server would block. Because of
// the timeout, it won't block forever, and return with a non-zero exit
// code instead.
- requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) {
+ requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-pack: waiting for negotiation: context canceled"), func() (int32, error) {
resp, err := stream.Recv()
if err != nil {
return 0, err
@@ -247,7 +247,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
expectedErr: structerr.NewInternal(
- "cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"",
+ "running upload-pack: cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"",
commitID,
),
},
@@ -270,7 +270,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
expectedErr: structerr.NewInternal(
- "cmd wait: exit status 128, stderr: %q",
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
"fatal: git upload-pack: protocol error, expected to get object ID, not 'command=fetch'\n",
),
},
@@ -290,7 +290,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
- expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q",
+ expectedErr: structerr.NewInternal(
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
"fatal: git upload-pack: not our ref "+strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())+"\n",
),
},
@@ -308,7 +309,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
- expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q",
+ expectedErr: structerr.NewInternal(
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
"fatal: git upload-pack: protocol error, expected to get object ID, not 'want 1111 multi_ack'\n",
),
},
@@ -341,7 +343,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
- expectedErr: structerr.NewCanceled("user canceled the fetch"),
+ expectedErr: structerr.NewCanceled("running upload-pack: user canceled the request"),
},
{
desc: "garbage",
@@ -354,7 +356,10 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
require.NoError(t, clientConn.CloseWrite())
return nil
},
- expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", "fatal: unknown capability 'foobar'\n"),
+ expectedErr: structerr.NewInternal(
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
+ "fatal: unknown capability 'foobar'\n",
+ ),
},
{
desc: "close and cancellation",
@@ -438,6 +443,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
}
func requireFailedSSHStream(t *testing.T, expectedErr error, recv func() (int32, error)) {
+ t.Helper()
+
done := make(chan struct{})
var code int32
var err error
@@ -802,7 +809,7 @@ func TestUploadPack_gitFailure(t *testing.T) {
require.NoError(t, stream.CloseSend())
err = recvUntilError(t, stream)
- testhelper.RequireGrpcError(t, structerr.NewInternal(`cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err)
+ testhelper.RequireGrpcError(t, structerr.NewInternal(`running upload-pack: cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err)
}
func recvUntilError(t *testing.T, stream gitalypb.SSHService_SSHUploadPackClient) error {