Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Liu <jliu@gitlab.com>2023-09-19 10:28:44 +0300
committerJames Liu <jliu@gitlab.com>2023-09-19 10:28:44 +0300
commit45f05ea7efd431d07edcce5fe56e348b8ae95d6c (patch)
tree66f3d985a0485425e7eac04af8fd4b7dabf55ded
parent76b2eed1b30847ff69b6e96a9845388405101c6e (diff)
parenteddc72ff69b431dce4fefcf53013585a74628536 (diff)
Merge branch 'pks-ssh-unify-upload-command-execution' into 'master'
ssh: Unify logic to handle git-upload-pack(1) and git-upload-archive(1) See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6379 Merged-by: James Liu <jliu@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: James Liu <jliu@gitlab.com> Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: James Liu <jliu@gitlab.com> Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r--internal/command/command.go8
-rw-r--r--internal/gitaly/service/ssh/monitor_stdin_command.go41
-rw-r--r--internal/gitaly/service/ssh/upload_archive.go46
-rw-r--r--internal/gitaly/service/ssh/upload_archive_test.go2
-rw-r--r--internal/gitaly/service/ssh/upload_command.go114
-rw-r--r--internal/gitaly/service/ssh/upload_pack.go67
-rw-r--r--internal/gitaly/service/ssh/upload_pack_test.go23
7 files changed, 151 insertions, 150 deletions
diff --git a/internal/command/command.go b/internal/command/command.go
index 5f19dc1c3..ad8906e9d 100644
--- a/internal/command/command.go
+++ b/internal/command/command.go
@@ -585,12 +585,12 @@ func AllowedEnvironment(envs []string) []string {
// ExitStatus will return the exit-code from an error returned by Wait().
func ExitStatus(err error) (int, bool) {
- exitError, ok := err.(*exec.ExitError)
- if !ok {
- return 0, false
+ var exitErr *exec.ExitError
+ if errors.As(err, &exitErr) {
+ return exitErr.ExitCode(), true
}
- return exitError.ExitCode(), true
+ return 0, false
}
func methodFromContext(ctx context.Context) (service string, method string) {
diff --git a/internal/gitaly/service/ssh/monitor_stdin_command.go b/internal/gitaly/service/ssh/monitor_stdin_command.go
deleted file mode 100644
index 26af0eb26..000000000
--- a/internal/gitaly/service/ssh/monitor_stdin_command.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package ssh
-
-import (
- "context"
- "fmt"
- "io"
-
- "gitlab.com/gitlab-org/gitaly/v16/internal/command"
- "gitlab.com/gitlab-org/gitaly/v16/internal/git"
- "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
- "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
-)
-
-func monitorStdinCommand(
- ctx context.Context,
- gitCmdFactory git.CommandFactory,
- repo *gitalypb.Repository,
- stdin io.Reader,
- stdout, stderr io.Writer,
- sc git.Command,
- opts ...git.CmdOpt,
-) (*command.Command, *pktline.ReadMonitor, error) {
- stdinPipe, monitor, cleanup, err := pktline.NewReadMonitor(ctx, stdin)
- if err != nil {
- return nil, nil, fmt.Errorf("create monitor: %w", err)
- }
-
- cmd, err := gitCmdFactory.New(ctx, repo, sc, append([]git.CmdOpt{
- git.WithStdin(stdinPipe),
- git.WithStdout(stdout),
- git.WithStderr(stderr),
- git.WithFinalizer(func(context.Context, *command.Command) { cleanup() }),
- }, opts...)...)
- stdinPipe.Close() // this now belongs to cmd
- if err != nil {
- cleanup()
- return nil, nil, fmt.Errorf("start cmd: %w", err)
- }
-
- return cmd, monitor, err
-}
diff --git a/internal/gitaly/service/ssh/upload_archive.go b/internal/gitaly/service/ssh/upload_archive.go
index 7315235d6..c0f4d1f7d 100644
--- a/internal/gitaly/service/ssh/upload_archive.go
+++ b/internal/gitaly/service/ssh/upload_archive.go
@@ -1,7 +1,6 @@
package ssh
import (
- "context"
"errors"
"fmt"
"sync"
@@ -32,8 +31,7 @@ func (s *server) SSHUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer
}
func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveServer, req *gitalypb.SSHUploadArchiveRequest) error {
- ctx, cancelCtx := context.WithCancel(stream.Context())
- defer cancelCtx()
+ ctx := stream.Context()
repoPath, err := s.locator.GetRepoPath(req.Repository)
if err != nil {
@@ -53,45 +51,23 @@ func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer
return stream.Send(&gitalypb.SSHUploadArchiveResponse{Stderr: p})
})
- cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, git.Command{
- Name: "upload-archive",
- Args: []string{repoPath},
- })
- if err != nil {
- return err
- }
-
timeoutTicker := s.uploadArchiveRequestTimeoutTickerFactory()
- // upload-archive expects a list of options terminated by a flush packet:
- // https://github.com/git/git/blob/v2.22.0/builtin/upload-archive.c#L38
- //
- // Place a timeout on receiving the flush packet to mitigate use-after-check
- // attacks
- go monitor.Monitor(ctx, pktline.PktFlush(), timeoutTicker, cancelCtx)
-
- if err := cmd.Wait(); err != nil {
- // When waiting for the packfile negotiation to end times out we'll cancel the local
- // context, but not cancel the overall RPC's context. Our statushandler middleware
- // thus cannot observe the fact that we're cancelling the context, and neither do we
- // provide any valuable information to the caller that we do indeed kill the command
- // because of our own internal timeout.
- //
- // We thus need to special-case the situation where we cancel our own context in
- // order to provide that information and return a proper gRPC error code.
- if ctx.Err() != nil && stream.Context().Err() == nil {
- return structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err())
- }
-
+ if err := runUploadCommand(ctx, s.gitCmdFactory, req.GetRepository(), stdin, stdout, stderr, timeoutTicker, pktline.PktFlush(), git.Command{
+ Name: "upload-archive",
+ Args: []string{repoPath},
+ }); err != nil {
if status, ok := command.ExitStatus(err); ok {
- if sendErr := stream.Send(&gitalypb.SSHUploadArchiveResponse{
+ if err := stream.Send(&gitalypb.SSHUploadArchiveResponse{
ExitStatus: &gitalypb.ExitStatus{Value: int32(status)},
- }); sendErr != nil {
- return sendErr
+ }); err != nil {
+ return fmt.Errorf("sending exit status: %w", err)
}
+
return fmt.Errorf("send: %w", err)
}
- return fmt.Errorf("wait cmd: %w", err)
+
+ return fmt.Errorf("running upload-archive: %w", err)
}
return stream.Send(&gitalypb.SSHUploadArchiveResponse{
diff --git a/internal/gitaly/service/ssh/upload_archive_test.go b/internal/gitaly/service/ssh/upload_archive_test.go
index b5ca85835..175f8f863 100644
--- a/internal/gitaly/service/ssh/upload_archive_test.go
+++ b/internal/gitaly/service/ssh/upload_archive_test.go
@@ -59,7 +59,7 @@ func TestFailedUploadArchiveRequestDueToTimeout(t *testing.T) {
// Because the client says nothing, the server would block. Because of
// the timeout, it won't block forever, and return with a non-zero exit
// code instead.
- requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) {
+ requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-archive: waiting for negotiation: context canceled"), func() (int32, error) {
resp, err := stream.Recv()
if err != nil {
return 0, err
diff --git a/internal/gitaly/service/ssh/upload_command.go b/internal/gitaly/service/ssh/upload_command.go
new file mode 100644
index 000000000..d8c9b3b01
--- /dev/null
+++ b/internal/gitaly/service/ssh/upload_command.go
@@ -0,0 +1,114 @@
+package ssh
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strings"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/command"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// runUploadCommand runs an uploading command like git-upload-pack(1) or git-upload-archive(1). It serves multiple
+// purposes:
+//
+// - It sets up a large buffer reader such that we can write the data more efficiently.
+//
+// - It logs how many bytes have been sent.
+//
+// - It installs a timeout such that we can address time-of-check-to-time-of-use-style races. Otherwise it would be
+// possible to open the connection early, keep it open for an extended amount of time, and only do the negotiation of
+// what is to be sent at a later point when permissions of the user might have changed.
+func runUploadCommand(
+ rpcContext context.Context,
+ gitCmdFactory git.CommandFactory,
+ repo *gitalypb.Repository,
+ stdin io.Reader,
+ stdout, stderr io.Writer,
+ timeoutTicker helper.Ticker,
+ boundaryPacket []byte,
+ sc git.Command,
+ opts ...git.CmdOpt,
+) error {
+ ctx, cancelCtx := context.WithCancel(rpcContext)
+ defer cancelCtx()
+
+ var stderrBuilder strings.Builder
+ stderr = io.MultiWriter(stderr, &stderrBuilder)
+
+ stdoutCounter := &helper.CountingWriter{W: stdout}
+ // Use large copy buffer to reduce the number of system calls
+ stdout = &largeBufferReaderFrom{Writer: stdoutCounter}
+
+ stdinPipe, monitor, cleanup, err := pktline.NewReadMonitor(ctx, stdin)
+ if err != nil {
+ return fmt.Errorf("create monitor: %w", err)
+ }
+
+ cmd, err := gitCmdFactory.New(ctx, repo, sc, append([]git.CmdOpt{
+ git.WithStdin(stdinPipe),
+ git.WithStdout(stdout),
+ git.WithStderr(stderr),
+ git.WithFinalizer(func(context.Context, *command.Command) { cleanup() }),
+ }, opts...)...)
+ stdinPipe.Close() // this now belongs to cmd
+ if err != nil {
+ cleanup()
+ return fmt.Errorf("starting command: %w", err)
+ }
+
+ go monitor.Monitor(ctx, boundaryPacket, timeoutTicker, cancelCtx)
+
+ if err := cmd.Wait(); err != nil {
+ // The read monitor will cancel the local `ctx` when we do not observe a specific packet before the
+ // timeout ticker ticks. This is done to address a time-of-check-to-time-of-use-style race, where the
+ // client opens a connection but doesn't yet perform the negotiation of what data the server should
+ // send. Because access checks only happen at the beginning of the call, it may be the case that the
+ // client's permissions have changed since the RPC call started.
+ //
+ // To address this issue, we thus timebox the maximum amount of time between the start of the RPC call
+ // and the end of the negotiation phase. While this doesn't completely address the issue, it's the best
+ // we can reasonably do here.
+ //
+ // To distinguish cancellation of the overall RPC call and a timeout of the negotiation phase we use two
+ // different contexts. In the case where the local context has been cancelled, we know that the reason
+ // for cancellation is that the negotiation phase did not finish in time and thus return a more specific
+ // error.
+ if ctx.Err() != nil && rpcContext.Err() == nil {
+ return structerr.NewDeadlineExceeded("waiting for negotiation: %w", ctx.Err())
+ }
+
+ // A common error case is that the client is terminating the request prematurely,
+ // e.g. by killing their git-fetch(1) process because it's taking too long. This is
+ // an expected failure, but we're not in a position to easily tell this error apart
+ // from other errors returned by git-upload-pack(1). So we have to resort to parsing
+ // the error message returned by Git, and if we see that it matches we return an
+ // error with a `Canceled` error code.
+ //
+ // Note that we're being quite strict with how we match the error for now. We may
+ // have to make it more lenient in case we see that this doesn't catch all cases.
+ if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" {
+ return structerr.NewCanceled("user canceled the request")
+ }
+
+ return fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String())
+ }
+
+ log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details")
+
+ return err
+}
+
+type largeBufferReaderFrom struct {
+ io.Writer
+}
+
+func (rf *largeBufferReaderFrom) ReadFrom(r io.Reader) (int64, error) {
+ return io.CopyBuffer(rf.Writer, r, make([]byte, 64*1024))
+}
diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go
index 5e8b9b850..d4d306533 100644
--- a/internal/gitaly/service/ssh/upload_pack.go
+++ b/internal/gitaly/service/ssh/upload_pack.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
- "strings"
"sync"
"gitlab.com/gitlab-org/gitaly/v16/internal/command"
@@ -14,7 +13,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
- "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/stream"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
@@ -74,14 +72,7 @@ type sshUploadPackRequest interface {
GetGitProtocol() string
}
-func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) {
- ctx, cancelCtx := context.WithCancel(rpcContext)
- defer cancelCtx()
-
- stdoutCounter := &helper.CountingWriter{W: stdout}
- // Use large copy buffer to reduce the number of system calls
- stdout = &largeBufferReaderFrom{Writer: stdoutCounter}
-
+func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (negotiation *stats.PackfileNegotiation, _ int, _ error) {
repo := req.GetRepository()
repoPath, err := s.locator.GetRepoPath(repo)
if err != nil {
@@ -126,17 +117,6 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ
git.WithPackObjectsHookEnv(repo, "ssh"),
}
- var stderrBuilder strings.Builder
- stderr = io.MultiWriter(stderr, &stderrBuilder)
-
- cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, git.Command{
- Name: "upload-pack",
- Args: []string{repoPath},
- }, commandOpts...)
- if err != nil {
- return nil, 0, err
- }
-
timeoutTicker := s.uploadPackRequestTimeoutTickerFactory()
// upload-pack negotiation is terminated by either a flush, or the "done"
@@ -145,41 +125,14 @@ func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequ
// "flush" tells the server it can terminate, while "done" tells it to start
// generating a packfile. Add a timeout to the second case to mitigate
// use-after-check attacks.
- go monitor.Monitor(ctx, pktline.PktDone(), timeoutTicker, cancelCtx)
-
- if err := cmd.Wait(); err != nil {
+ if err := runUploadCommand(ctx, s.gitCmdFactory, repo, stdin, stdout, stderr, timeoutTicker, pktline.PktDone(), git.Command{
+ Name: "upload-pack",
+ Args: []string{repoPath},
+ }, commandOpts...); err != nil {
status, _ := command.ExitStatus(err)
-
- // When waiting for the packfile negotiation to end times out we'll cancel the local
- // context, but not cancel the overall RPC's context. Our statushandler middleware
- // thus cannot observe the fact that we're cancelling the context, and neither do we
- // provide any valuable information to the caller that we do indeed kill the command
- // because of our own internal timeout.
- //
- // We thus need to special-case the situation where we cancel our own context in
- // order to provide that information and return a proper gRPC error code.
- if ctx.Err() != nil && rpcContext.Err() == nil {
- return nil, status, structerr.NewDeadlineExceeded("waiting for packfile negotiation: %w", ctx.Err())
- }
-
- // A common error case is that the client is terminating the request prematurely,
- // e.g. by killing their git-fetch(1) process because it's taking too long. This is
- // an expected failure, but we're not in a position to easily tell this error apart
- // from other errors returned by git-upload-pack(1). So we have to resort to parsing
- // the error message returned by Git, and if we see that it matches we return an
- // error with a `Canceled` error code.
- //
- // Note that we're being quite strict with how we match the error for now. We may
- // have to make it more lenient in case we see that this doesn't catch all cases.
- if stderrBuilder.String() == "fatal: the remote end hung up unexpectedly\n" {
- return nil, status, structerr.NewCanceled("user canceled the fetch")
- }
-
- return nil, status, fmt.Errorf("cmd wait: %w, stderr: %q", err, stderrBuilder.String())
+ return nil, status, fmt.Errorf("running upload-pack: %w", err)
}
- log.FromContext(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details")
-
return nil, 0, nil
}
@@ -194,14 +147,6 @@ func validateFirstUploadPackRequest(locator storage.Locator, req *gitalypb.SSHUp
return nil
}
-type largeBufferReaderFrom struct {
- io.Writer
-}
-
-func (rf *largeBufferReaderFrom) ReadFrom(r io.Reader) (int64, error) {
- return io.CopyBuffer(rf.Writer, r, make([]byte, 64*1024))
-}
-
func (s *server) SSHUploadPackWithSidechannel(ctx context.Context, req *gitalypb.SSHUploadPackWithSidechannelRequest) (*gitalypb.SSHUploadPackWithSidechannelResponse, error) {
conn, err := sidechannel.OpenSidechannel(ctx)
if err != nil {
diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go
index 328337f0d..483a653a8 100644
--- a/internal/gitaly/service/ssh/upload_pack_test.go
+++ b/internal/gitaly/service/ssh/upload_pack_test.go
@@ -141,7 +141,7 @@ func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) {
// Because the client says nothing, the server would block. Because of
// the timeout, it won't block forever, and return with a non-zero exit
// code instead.
- requireFailedSSHStream(t, structerr.NewDeadlineExceeded("waiting for packfile negotiation: context canceled"), func() (int32, error) {
+ requireFailedSSHStream(t, structerr.NewDeadlineExceeded("running upload-pack: waiting for negotiation: context canceled"), func() (int32, error) {
resp, err := stream.Recv()
if err != nil {
return 0, err
@@ -247,7 +247,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
expectedErr: structerr.NewInternal(
- "cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"",
+ "running upload-pack: cmd wait: exit status 128, stderr: \"fatal: unknown capability 'want %s multi_ack'\\n\"",
commitID,
),
},
@@ -270,7 +270,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
expectedErr: structerr.NewInternal(
- "cmd wait: exit status 128, stderr: %q",
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
"fatal: git upload-pack: protocol error, expected to get object ID, not 'command=fetch'\n",
),
},
@@ -290,7 +290,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
- expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q",
+ expectedErr: structerr.NewInternal(
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
"fatal: git upload-pack: not our ref "+strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())+"\n",
),
},
@@ -308,7 +309,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
- expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q",
+ expectedErr: structerr.NewInternal(
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
"fatal: git upload-pack: protocol error, expected to get object ID, not 'want 1111 multi_ack'\n",
),
},
@@ -341,7 +343,7 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
return nil
},
- expectedErr: structerr.NewCanceled("user canceled the fetch"),
+ expectedErr: structerr.NewCanceled("running upload-pack: user canceled the request"),
},
{
desc: "garbage",
@@ -354,7 +356,10 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
require.NoError(t, clientConn.CloseWrite())
return nil
},
- expectedErr: structerr.NewInternal("cmd wait: exit status 128, stderr: %q", "fatal: unknown capability 'foobar'\n"),
+ expectedErr: structerr.NewInternal(
+ "running upload-pack: cmd wait: exit status 128, stderr: %q",
+ "fatal: unknown capability 'foobar'\n",
+ ),
},
{
desc: "close and cancellation",
@@ -438,6 +443,8 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
}
func requireFailedSSHStream(t *testing.T, expectedErr error, recv func() (int32, error)) {
+ t.Helper()
+
done := make(chan struct{})
var code int32
var err error
@@ -802,7 +809,7 @@ func TestUploadPack_gitFailure(t *testing.T) {
require.NoError(t, stream.CloseSend())
err = recvUntilError(t, stream)
- testhelper.RequireGrpcError(t, structerr.NewInternal(`cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err)
+ testhelper.RequireGrpcError(t, structerr.NewInternal(`running upload-pack: cmd wait: exit status 128, stderr: "fatal: bad config line 1 in file ./config\n"`), err)
}
func recvUntilError(t *testing.T, stream gitalypb.SSHService_SSHUploadPackClient) error {