Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-07-29 12:53:51 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-07-29 12:53:51 +0300
commitcfd27a4402af9c7ccfa8ab21a49339f4bb2d949f (patch)
treef778f8145283c5eb1a0e103e5c1ca3dec96862a9
parent0fa08d953e0d5497fe5366836d0ed54b9ff557d8 (diff)
parent5caeaa1d3e5694a1f9179b5cf09aa663f4f75b42 (diff)
Merge branch 'pks-ssh-cancelled-packfile-negotiation' into 'master'
ssh: Handle timeout waiting on packfile negotiation with proper errors See merge request gitlab-org/gitaly!4761
-rw-r--r--internal/gitaly/service/ssh/upload_archive.go12
-rw-r--r--internal/gitaly/service/ssh/upload_archive_test.go3
-rw-r--r--internal/gitaly/service/ssh/upload_pack.go23
-rw-r--r--internal/gitaly/service/ssh/upload_pack_test.go54
-rw-r--r--internal/helper/error.go9
-rw-r--r--internal/helper/error_test.go10
6 files changed, 79 insertions, 32 deletions
diff --git a/internal/gitaly/service/ssh/upload_archive.go b/internal/gitaly/service/ssh/upload_archive.go
index 7fef9bb2d..3dc362a91 100644
--- a/internal/gitaly/service/ssh/upload_archive.go
+++ b/internal/gitaly/service/ssh/upload_archive.go
@@ -69,6 +69,18 @@ func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer
go monitor.Monitor(ctx, pktline.PktFlush(), timeoutTicker, cancelCtx)
if err := cmd.Wait(); err != nil {
+ // When waiting for the packfile negotiation to end times out we'll cancel the local
+ // context, but not cancel the overall RPC's context. Our cancelhandler middleware
+ // thus cannot observe the fact that we're cancelling the context, and neither do we
+ // provide any valuable information to the caller that we do indeed kill the command
+ // because of our own internal timeout.
+ //
+ // We thus need to special-case the situation where we cancel our own context in
+ // order to provide that information and return a proper gRPC error code.
+ if ctx.Err() != nil && stream.Context().Err() == nil {
+ return helper.ErrDeadlineExceededf("waiting for packfile negotiation: %w", ctx.Err())
+ }
+
if status, ok := command.ExitStatus(err); ok {
if sendErr := stream.Send(&gitalypb.SSHUploadArchiveResponse{
ExitStatus: &gitalypb.ExitStatus{Value: int32(status)},
diff --git a/internal/gitaly/service/ssh/upload_archive_test.go b/internal/gitaly/service/ssh/upload_archive_test.go
index 11b328567..af2699a90 100644
--- a/internal/gitaly/service/ssh/upload_archive_test.go
+++ b/internal/gitaly/service/ssh/upload_archive_test.go
@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
@@ -41,7 +42,7 @@ func TestFailedUploadArchiveRequestDueToTimeout(t *testing.T) {
// Because the client says nothing, the server would block. Because of
// the timeout, it won't block forever, and return with a non-zero exit
// code instead.
- requireFailedSSHStream(t, func() (int32, error) {
+ requireFailedSSHStream(t, helper.ErrDeadlineExceededf("waiting for packfile negotiation: context canceled"), func() (int32, error) {
resp, err := stream.Recv()
if err != nil {
return 0, err
diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go
index 807735bbc..e468c834c 100644
--- a/internal/gitaly/service/ssh/upload_pack.go
+++ b/internal/gitaly/service/ssh/upload_pack.go
@@ -28,13 +28,8 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e
return helper.ErrInternal(err)
}
- repository := ""
- if req.Repository != nil {
- repository = req.Repository.GlRepository
- }
-
ctxlogrus.Extract(ctx).WithFields(log.Fields{
- "GlRepository": repository,
+ "GlRepository": req.GetRepository().GetGlRepository(),
"GitConfigOptions": req.GitConfigOptions,
"GitProtocol": req.GitProtocol,
}).Debug("SSHUploadPack")
@@ -77,8 +72,8 @@ type sshUploadPackRequest interface {
GetGitProtocol() string
}
-func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
- ctx, cancelCtx := context.WithCancel(ctx)
+func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
+ ctx, cancelCtx := context.WithCancel(rpcContext)
defer cancelCtx()
stdoutCounter := &helper.CountingWriter{W: stdout}
@@ -152,6 +147,18 @@ func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, st
if err := cmd.Wait(); err != nil {
status, _ := command.ExitStatus(err)
+ // When waiting for the packfile negotiation to end times out we'll cancel the local
+ // context, but not cancel the overall RPC's context. Our cancelhandler middleware
+ // thus cannot observe the fact that we're cancelling the context, and neither do we
+ // provide any valuable information to the caller that we do indeed kill the command
+ // because of our own internal timeout.
+ //
+ // We thus need to special-case the situation where we cancel our own context in
+ // order to provide that information and return a proper gRPC error code.
+ if ctx.Err() != nil && rpcContext.Err() == nil {
+ return status, helper.ErrDeadlineExceededf("waiting for packfile negotiation: %w", ctx.Err())
+ }
+
// A common error case is that the client is terminating the request prematurely,
// e.g. by killing their git-fetch(1) process because it's taking too long. This is
// an expected failure, but we're not in a position to easily tell this error apart
diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go
index a366d0f80..d7b311fc7 100644
--- a/internal/gitaly/service/ssh/upload_pack_test.go
+++ b/internal/gitaly/service/ssh/upload_pack_test.go
@@ -30,7 +30,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
)
@@ -97,21 +96,23 @@ func requireRevisionsEqual(t *testing.T, cfg config.Cfg, repoPathA, repoPathB, r
func TestUploadPack_timeout(t *testing.T) {
t.Parallel()
- runTestWithAndWithoutConfigOptions(t, testUploadPackTimeout, testcfg.WithPackObjectsCacheEnabled())
+ testhelper.NewFeatureSets(featureflag.UploadPackHideRefs).Run(t, func(t *testing.T, ctx context.Context) {
+ runTestWithAndWithoutConfigOptions(t, func(t *testing.T, opts ...testcfg.Option) {
+ testUploadPackTimeout(t, ctx, opts...)
+ }, testcfg.WithPackObjectsCacheEnabled())
+ })
}
-func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) {
+func testUploadPackTimeout(t *testing.T, ctx context.Context, opts ...testcfg.Option) {
cfg := testcfg.Build(t, opts...)
- cfg.SocketPath = runSSHServerWithOptions(t, cfg, []ServerOpt{WithUploadPackRequestTimeout(10 * time.Microsecond)})
+ cfg.SocketPath = runSSHServerWithOptions(t, cfg, []ServerOpt{WithUploadPackRequestTimeout(1)})
- repo, _ := gittest.CreateRepository(testhelper.Context(t), t, cfg, gittest.CreateRepositoryConfig{
- Seed: gittest.SeedGitLabTest,
- })
+ repo, repoPath := gittest.CreateRepository(testhelper.Context(t), t, cfg)
+ gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"))
client, conn := newSSHClient(t, cfg.SocketPath)
defer conn.Close()
- ctx := testhelper.Context(t)
stream, err := client.SSHUploadPack(ctx)
require.NoError(t, err)
@@ -122,7 +123,7 @@ func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) {
// Because the client says nothing, the server would block. Because of
// the timeout, it won't block forever, and return with a non-zero exit
// code instead.
- requireFailedSSHStream(t, func() (int32, error) {
+ requireFailedSSHStream(t, helper.ErrDeadlineExceededf("waiting for packfile negotiation: context canceled"), func() (int32, error) {
resp, err := stream.Recv()
if err != nil {
return 0, err
@@ -406,7 +407,7 @@ func testUploadPackWithSidechannelClient(t *testing.T, ctx context.Context) {
}
}
-func requireFailedSSHStream(t *testing.T, recv func() (int32, error)) {
+func requireFailedSSHStream(t *testing.T, expectedErr error, recv func() (int32, error)) {
done := make(chan struct{})
var code int32
var err error
@@ -420,7 +421,7 @@ func requireFailedSSHStream(t *testing.T, recv func() (int32, error)) {
select {
case <-done:
- testhelper.RequireGrpcCode(t, err, codes.Internal)
+ testhelper.RequireGrpcError(t, expectedErr, err)
require.NotEqual(t, 0, code, "exit status")
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for SSH stream")
@@ -527,9 +528,21 @@ func testUploadPackSuccessful(t *testing.T, sidechannel bool, opts ...testcfg.Op
WithPackfileNegotiationMetrics(negotiationMetrics),
}, testserver.WithGitCommandFactory(protocolDetectingFactory))
- repo, repoPath := gittest.CreateRepository(testhelper.Context(t), t, cfg, gittest.CreateRepositoryConfig{
- Seed: gittest.SeedGitLabTest,
- })
+ repo, repoPath := gittest.CreateRepository(ctx, t, cfg)
+
+ smallBlobID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+ largeBlobID := gittest.WriteBlob(t, cfg, repoPath, bytes.Repeat([]byte("1"), 2048))
+
+ // We set up the commits so that HEAD does not reference the above two blobs. If it did we'd
+ // fetch the blobs regardless of `--filter=blob:limit`.
+ rootCommitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(), gittest.WithTreeEntries(
+ gittest.TreeEntry{Path: "small", Mode: "100644", OID: smallBlobID},
+ gittest.TreeEntry{Path: "large", Mode: "100644", OID: largeBlobID},
+ ))
+ gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(rootCommitID), gittest.WithBranch("main"), gittest.WithTreeEntries(
+ gittest.TreeEntry{Path: "unrelated", Mode: "100644", Content: "something"},
+ ))
+ gittest.WriteTag(t, cfg, repoPath, "v1.0.0", rootCommitID.Revision())
for _, tc := range []struct {
desc string
@@ -581,16 +594,11 @@ func testUploadPackSuccessful(t *testing.T, sidechannel bool, opts ...testcfg.Op
Repository: repo,
},
cloneFlags: []git.Option{
- git.ValueFlag{Name: "--filter", Value: "blob:limit=2048"},
+ git.ValueFlag{Name: "--filter", Value: "blob:limit=1024"},
},
verify: func(t *testing.T, repoPath string) {
- // Ruby file which is ~1kB in size and not present in HEAD
- blobLessThanLimit := git.ObjectID("6ee41e85cc9bf33c10b690df09ca735b22f3790f")
- // Image which is ~100kB in size and not present in HEAD
- blobGreaterThanLimit := git.ObjectID("18079e308ff9b3a5e304941020747e5c39b46c88")
-
- gittest.RequireObjectNotExists(t, cfg, repoPath, blobGreaterThanLimit)
- gittest.RequireObjectExists(t, cfg, repoPath, blobLessThanLimit)
+ gittest.RequireObjectNotExists(t, cfg, repoPath, largeBlobID)
+ gittest.RequireObjectExists(t, cfg, repoPath, smallBlobID)
},
},
{
@@ -627,7 +635,7 @@ func testUploadPackSuccessful(t *testing.T, sidechannel bool, opts ...testcfg.Op
Flags: tc.cloneFlags,
}, tc.request))
- requireRevisionsEqual(t, cfg, repoPath, localRepoPath, "refs/heads/master")
+ requireRevisionsEqual(t, cfg, repoPath, localRepoPath, "refs/heads/main")
metric, err := negotiationMetrics.GetMetricWithLabelValues("deepen")
require.NoError(t, err)
diff --git a/internal/helper/error.go b/internal/helper/error.go
index 517fb1d70..dd3e17f08 100644
--- a/internal/helper/error.go
+++ b/internal/helper/error.go
@@ -26,6 +26,9 @@ func (sw statusWrapper) Unwrap() error {
// ErrCanceled wraps err with codes.Canceled, unless err is already a gRPC error.
func ErrCanceled(err error) error { return wrapError(codes.Canceled, err) }
+// ErrDeadlineExceeded wraps err with codes.DeadlineExceeded, unless err is already a gRPC error.
+func ErrDeadlineExceeded(err error) error { return wrapError(codes.DeadlineExceeded, err) }
+
// ErrInternal wraps err with codes.Internal, unless err is already a gRPC error.
func ErrInternal(err error) error { return wrapError(codes.Internal, err) }
@@ -66,6 +69,12 @@ func ErrCanceledf(format string, a ...interface{}) error {
return formatError(codes.Canceled, format, a...)
}
+// ErrDeadlineExceededf wraps a formatted error with codes.DeadlineExceeded, unless the formatted
+// error is a wrapped gRPC error.
+func ErrDeadlineExceededf(format string, a ...interface{}) error {
+ return formatError(codes.DeadlineExceeded, format, a...)
+}
+
// ErrInternalf wraps a formatted error with codes.Internal, unless the formatted error is a
// wrapped gRPC error.
func ErrInternalf(format string, a ...interface{}) error {
diff --git a/internal/helper/error_test.go b/internal/helper/error_test.go
index 3de56385d..44df756cd 100644
--- a/internal/helper/error_test.go
+++ b/internal/helper/error_test.go
@@ -31,6 +31,11 @@ func TestError(t *testing.T) {
code: codes.Canceled,
},
{
+ desc: "DeadlineExceeded",
+ errorf: ErrDeadlineExceeded,
+ code: codes.DeadlineExceeded,
+ },
+ {
desc: "Internal",
errorf: ErrInternal,
code: codes.Internal,
@@ -104,6 +109,11 @@ func TestErrorf(t *testing.T) {
expectedCode: codes.Canceled,
},
{
+ desc: "DeadlineExceededf",
+ errorf: ErrDeadlineExceededf,
+ expectedCode: codes.DeadlineExceeded,
+ },
+ {
desc: "Internalf",
errorf: ErrInternalf,
expectedCode: codes.Internal,