diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-07-29 12:53:51 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-07-29 12:53:51 +0300 |
commit | cfd27a4402af9c7ccfa8ab21a49339f4bb2d949f (patch) | |
tree | f778f8145283c5eb1a0e103e5c1ca3dec96862a9 | |
parent | 0fa08d953e0d5497fe5366836d0ed54b9ff557d8 (diff) | |
parent | 5caeaa1d3e5694a1f9179b5cf09aa663f4f75b42 (diff) |
Merge branch 'pks-ssh-cancelled-packfile-negotiation' into 'master'
ssh: Handle timeout waiting on packfile negotiation with proper errors
See merge request gitlab-org/gitaly!4761
-rw-r--r-- | internal/gitaly/service/ssh/upload_archive.go | 12 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_archive_test.go | 3 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack.go | 23 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack_test.go | 54 | ||||
-rw-r--r-- | internal/helper/error.go | 9 | ||||
-rw-r--r-- | internal/helper/error_test.go | 10 |
6 files changed, 79 insertions, 32 deletions
diff --git a/internal/gitaly/service/ssh/upload_archive.go b/internal/gitaly/service/ssh/upload_archive.go index 7fef9bb2d..3dc362a91 100644 --- a/internal/gitaly/service/ssh/upload_archive.go +++ b/internal/gitaly/service/ssh/upload_archive.go @@ -69,6 +69,18 @@ func (s *server) sshUploadArchive(stream gitalypb.SSHService_SSHUploadArchiveSer go monitor.Monitor(ctx, pktline.PktFlush(), timeoutTicker, cancelCtx) if err := cmd.Wait(); err != nil { + // When waiting for the packfile negotiation to end times out we'll cancel the local + // context, but not cancel the overall RPC's context. Our cancelhandler middleware + // thus cannot observe the fact that we're cancelling the context, and neither do we + // provide any valuable information to the caller that we do indeed kill the command + // because of our own internal timeout. + // + // We thus need to special-case the situation where we cancel our own context in + // order to provide that information and return a proper gRPC error code. + if ctx.Err() != nil && stream.Context().Err() == nil { + return helper.ErrDeadlineExceededf("waiting for packfile negotiation: %w", ctx.Err()) + } + if status, ok := command.ExitStatus(err); ok { if sendErr := stream.Send(&gitalypb.SSHUploadArchiveResponse{ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, diff --git a/internal/gitaly/service/ssh/upload_archive_test.go b/internal/gitaly/service/ssh/upload_archive_test.go index 11b328567..af2699a90 100644 --- a/internal/gitaly/service/ssh/upload_archive_test.go +++ b/internal/gitaly/service/ssh/upload_archive_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" @@ -41,7 +42,7 @@ func TestFailedUploadArchiveRequestDueToTimeout(t *testing.T) { // Because the client says nothing, the server would block. Because of // the timeout, it won't block forever, and return with a non-zero exit // code instead. - requireFailedSSHStream(t, func() (int32, error) { + requireFailedSSHStream(t, helper.ErrDeadlineExceededf("waiting for packfile negotiation: context canceled"), func() (int32, error) { resp, err := stream.Recv() if err != nil { return 0, err diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go index 807735bbc..e468c834c 100644 --- a/internal/gitaly/service/ssh/upload_pack.go +++ b/internal/gitaly/service/ssh/upload_pack.go @@ -28,13 +28,8 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e return helper.ErrInternal(err) } - repository := "" - if req.Repository != nil { - repository = req.Repository.GlRepository - } - ctxlogrus.Extract(ctx).WithFields(log.Fields{ - "GlRepository": repository, + "GlRepository": req.GetRepository().GetGlRepository(), "GitConfigOptions": req.GitConfigOptions, "GitProtocol": req.GitProtocol, }).Debug("SSHUploadPack") @@ -77,8 +72,8 @@ type sshUploadPackRequest interface { GetGitProtocol() string } -func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) { - ctx, cancelCtx := context.WithCancel(ctx) +func (s *server) sshUploadPack(rpcContext context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) { + ctx, cancelCtx := context.WithCancel(rpcContext) defer cancelCtx() stdoutCounter := &helper.CountingWriter{W: stdout} @@ -152,6 +147,18 @@ func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, st if err := cmd.Wait(); err != nil { status, _ := command.ExitStatus(err) + // When waiting for the packfile negotiation to end times out we'll cancel the local + // context, but not cancel the overall RPC's context. Our cancelhandler middleware + // thus cannot observe the fact that we're cancelling the context, and neither do we + // provide any valuable information to the caller that we do indeed kill the command + // because of our own internal timeout. + // + // We thus need to special-case the situation where we cancel our own context in + // order to provide that information and return a proper gRPC error code. + if ctx.Err() != nil && rpcContext.Err() == nil { + return status, helper.ErrDeadlineExceededf("waiting for packfile negotiation: %w", ctx.Err()) + } + // A common error case is that the client is terminating the request prematurely, // e.g. by killing their git-fetch(1) process because it's taking too long. This is // an expected failure, but we're not in a position to easily tell this error apart diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go index a366d0f80..d7b311fc7 100644 --- a/internal/gitaly/service/ssh/upload_pack_test.go +++ b/internal/gitaly/service/ssh/upload_pack_test.go @@ -30,7 +30,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/encoding/protojson" ) @@ -97,21 +96,23 @@ func requireRevisionsEqual(t *testing.T, cfg config.Cfg, repoPathA, repoPathB, r func TestUploadPack_timeout(t *testing.T) { t.Parallel() - runTestWithAndWithoutConfigOptions(t, testUploadPackTimeout, testcfg.WithPackObjectsCacheEnabled()) + testhelper.NewFeatureSets(featureflag.UploadPackHideRefs).Run(t, func(t *testing.T, ctx context.Context) { + runTestWithAndWithoutConfigOptions(t, func(t *testing.T, opts ...testcfg.Option) { + testUploadPackTimeout(t, ctx, opts...) + }, testcfg.WithPackObjectsCacheEnabled()) + }) } -func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) { +func testUploadPackTimeout(t *testing.T, ctx context.Context, opts ...testcfg.Option) { cfg := testcfg.Build(t, opts...) - cfg.SocketPath = runSSHServerWithOptions(t, cfg, []ServerOpt{WithUploadPackRequestTimeout(10 * time.Microsecond)}) + cfg.SocketPath = runSSHServerWithOptions(t, cfg, []ServerOpt{WithUploadPackRequestTimeout(1)}) - repo, _ := gittest.CreateRepository(testhelper.Context(t), t, cfg, gittest.CreateRepositoryConfig{ - Seed: gittest.SeedGitLabTest, - }) + repo, repoPath := gittest.CreateRepository(testhelper.Context(t), t, cfg) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) client, conn := newSSHClient(t, cfg.SocketPath) defer conn.Close() - ctx := testhelper.Context(t) stream, err := client.SSHUploadPack(ctx) require.NoError(t, err) @@ -122,7 +123,7 @@ func testUploadPackTimeout(t *testing.T, opts ...testcfg.Option) { // Because the client says nothing, the server would block. Because of // the timeout, it won't block forever, and return with a non-zero exit // code instead. - requireFailedSSHStream(t, func() (int32, error) { + requireFailedSSHStream(t, helper.ErrDeadlineExceededf("waiting for packfile negotiation: context canceled"), func() (int32, error) { resp, err := stream.Recv() if err != nil { return 0, err @@ -406,7 +407,7 @@ func testUploadPackWithSidechannelClient(t *testing.T, ctx context.Context) { } } -func requireFailedSSHStream(t *testing.T, recv func() (int32, error)) { +func requireFailedSSHStream(t *testing.T, expectedErr error, recv func() (int32, error)) { done := make(chan struct{}) var code int32 var err error @@ -420,7 +421,7 @@ func requireFailedSSHStream(t *testing.T, recv func() (int32, error)) { select { case <-done: - testhelper.RequireGrpcCode(t, err, codes.Internal) + testhelper.RequireGrpcError(t, expectedErr, err) require.NotEqual(t, 0, code, "exit status") case <-time.After(10 * time.Second): t.Fatal("timeout waiting for SSH stream") @@ -527,9 +528,21 @@ func testUploadPackSuccessful(t *testing.T, sidechannel bool, opts ...testcfg.Op WithPackfileNegotiationMetrics(negotiationMetrics), }, testserver.WithGitCommandFactory(protocolDetectingFactory)) - repo, repoPath := gittest.CreateRepository(testhelper.Context(t), t, cfg, gittest.CreateRepositoryConfig{ - Seed: gittest.SeedGitLabTest, - }) + repo, repoPath := gittest.CreateRepository(ctx, t, cfg) + + smallBlobID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar")) + largeBlobID := gittest.WriteBlob(t, cfg, repoPath, bytes.Repeat([]byte("1"), 2048)) + + // We set up the commits so that HEAD does not reference the above two blobs. If it did we'd + // fetch the blobs regardless of `--filter=blob:limit`. + rootCommitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(), gittest.WithTreeEntries( + gittest.TreeEntry{Path: "small", Mode: "100644", OID: smallBlobID}, + gittest.TreeEntry{Path: "large", Mode: "100644", OID: largeBlobID}, + )) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(rootCommitID), gittest.WithBranch("main"), gittest.WithTreeEntries( + gittest.TreeEntry{Path: "unrelated", Mode: "100644", Content: "something"}, + )) + gittest.WriteTag(t, cfg, repoPath, "v1.0.0", rootCommitID.Revision()) for _, tc := range []struct { desc string @@ -581,16 +594,11 @@ func testUploadPackSuccessful(t *testing.T, sidechannel bool, opts ...testcfg.Op Repository: repo, }, cloneFlags: []git.Option{ - git.ValueFlag{Name: "--filter", Value: "blob:limit=2048"}, + git.ValueFlag{Name: "--filter", Value: "blob:limit=1024"}, }, verify: func(t *testing.T, repoPath string) { - // Ruby file which is ~1kB in size and not present in HEAD - blobLessThanLimit := git.ObjectID("6ee41e85cc9bf33c10b690df09ca735b22f3790f") - // Image which is ~100kB in size and not present in HEAD - blobGreaterThanLimit := git.ObjectID("18079e308ff9b3a5e304941020747e5c39b46c88") - - gittest.RequireObjectNotExists(t, cfg, repoPath, blobGreaterThanLimit) - gittest.RequireObjectExists(t, cfg, repoPath, blobLessThanLimit) + gittest.RequireObjectNotExists(t, cfg, repoPath, largeBlobID) + gittest.RequireObjectExists(t, cfg, repoPath, smallBlobID) }, }, { @@ -627,7 +635,7 @@ func testUploadPackSuccessful(t *testing.T, sidechannel bool, opts ...testcfg.Op Flags: tc.cloneFlags, }, tc.request)) - requireRevisionsEqual(t, cfg, repoPath, localRepoPath, "refs/heads/master") + requireRevisionsEqual(t, cfg, repoPath, localRepoPath, "refs/heads/main") metric, err := negotiationMetrics.GetMetricWithLabelValues("deepen") require.NoError(t, err) diff --git a/internal/helper/error.go b/internal/helper/error.go index 517fb1d70..dd3e17f08 100644 --- a/internal/helper/error.go +++ b/internal/helper/error.go @@ -26,6 +26,9 @@ func (sw statusWrapper) Unwrap() error { // ErrCanceled wraps err with codes.Canceled, unless err is already a gRPC error. func ErrCanceled(err error) error { return wrapError(codes.Canceled, err) } +// ErrDeadlineExceeded wraps err with codes.DeadlineExceeded, unless err is already a gRPC error. +func ErrDeadlineExceeded(err error) error { return wrapError(codes.DeadlineExceeded, err) } + // ErrInternal wraps err with codes.Internal, unless err is already a gRPC error. func ErrInternal(err error) error { return wrapError(codes.Internal, err) } @@ -66,6 +69,12 @@ func ErrCanceledf(format string, a ...interface{}) error { return formatError(codes.Canceled, format, a...) } +// ErrDeadlineExceededf wraps a formatted error with codes.DeadlineExceeded, unless the formatted +// error is a wrapped gRPC error. +func ErrDeadlineExceededf(format string, a ...interface{}) error { + return formatError(codes.DeadlineExceeded, format, a...) +} + // ErrInternalf wraps a formatted error with codes.Internal, unless the formatted error is a // wrapped gRPC error. func ErrInternalf(format string, a ...interface{}) error { diff --git a/internal/helper/error_test.go b/internal/helper/error_test.go index 3de56385d..44df756cd 100644 --- a/internal/helper/error_test.go +++ b/internal/helper/error_test.go @@ -31,6 +31,11 @@ func TestError(t *testing.T) { code: codes.Canceled, }, { + desc: "DeadlineExceeded", + errorf: ErrDeadlineExceeded, + code: codes.DeadlineExceeded, + }, + { desc: "Internal", errorf: ErrInternal, code: codes.Internal, @@ -104,6 +109,11 @@ func TestErrorf(t *testing.T) { expectedCode: codes.Canceled, }, { + desc: "DeadlineExceededf", + errorf: ErrDeadlineExceededf, + expectedCode: codes.DeadlineExceeded, + }, + { desc: "Internalf", errorf: ErrInternalf, expectedCode: codes.Internal, |