diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-09-19 07:04:53 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-09-19 07:04:53 +0300 |
commit | 0bef3b7e4677bb820c8ab8946024688f27fc5282 (patch) | |
tree | cced5a4ed61b18553663ff4463e7ea42c3734f2c | |
parent | 8e36035898e136f098e71e8d77052542edccbf8f (diff) | |
parent | 0952026e82cf500b1fcb7a3515f1665fe283f41d (diff) |
Merge branch 'pks-pack-objects-hook-context-cancellation-flake' into 'master'
hook: Fix pack-objects hook test with context cancellation
Closes #5548
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6372
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 201 |
1 files changed, 114 insertions, 87 deletions
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 80811dc5e..c70de60de 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -3,6 +3,7 @@ package hook import ( "bytes" "context" + "crypto/rand" "fmt" "io" "net" @@ -29,6 +30,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v16/streamio" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -82,8 +84,6 @@ func TestParsePackObjectsArgs(t *testing.T) { } func TestServer_PackObjectsHook_separateContext(t *testing.T) { - testhelper.SkipQuarantinedTest(t, "https://gitlab.com/gitlab-org/gitaly/-/issues/5548") - t.Parallel() runTestsWithRuntimeDir(t, testServerPackObjectsHookSeparateContextWithRuntimeDir) } @@ -94,13 +94,14 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim cfg := cfgWithCache(t, 0) cfg.SocketPath = runHooksServer(t, cfg, nil) - ctx1, cancel := context.WithCancel(ctx) - defer cancel() - repo, repoPath := gittest.CreateRepository(t, ctx1, cfg) - + repo, repoPath := gittest.CreateRepository(t, ctx, cfg) // We write a commit with a large blob such that the response needs to be split over multiple messages. // Otherwise it may happen that the request will finish before we can actually cancel the context. - commitID := gittest.WriteCommit(t, cfg, repoPath) + data := make([]byte, 10*streamio.WriteBufferSize) + _, _ = rand.Read(data[:]) + commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithTreeEntries( + gittest.TreeEntry{Path: "path", Mode: "100644", Content: string(data)}, + )) req := &gitalypb.PackObjectsHookWithSidechannelRequest{ Repository: repo, @@ -108,108 +109,134 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim } stdin := commitID.String() + "\n--not\n\n" - start1 := make(chan struct{}) - start2 := make(chan struct{}) - wg := &sync.WaitGroup{} - wg.Add(2) + syncCh := make(chan struct{}) - // Call 1: sends a valid request but hangs up without reading response. - // This should not break call 2. - client1, conn1 := newHooksClient(t, cfg.SocketPath) - defer conn1.Close() - - ctx1, wt1, err := hookPkg.SetupSidechannel( - ctx1, - git.HooksPayload{ - RuntimeDir: runtimeDir, - }, - func(c *net.UnixConn) error { - defer close(start2) - <-start1 - if _, err := io.WriteString(c, stdin); err != nil { - return err - } - if err := c.CloseWrite(); err != nil { - return err - } - - // Read one byte of the response to ensure that this call got handled - // before the next one. - buf := make([]byte, 1) - _, err := io.ReadFull(c, buf) - return err - }, - ) - require.NoError(t, err) - defer testhelper.MustClose(t, wt1) + var wg sync.WaitGroup + // The first call sends a valid request, but will then immediately hang up without reading the response. This + // should not impact the second call in any way even if it uses the same cache entry. + wg.Add(1) go func() { defer wg.Done() - _, err := client1.PackObjectsHookWithSidechannel(ctx1, req) - if runtime.GOOS == "darwin" { - assert.Contains(t, []codes.Code{codes.Canceled, codes.Internal}, status.Code(err)) + ctx, cancel := context.WithCancel(ctx) + defer cancel() - if status.Code(err) == codes.Internal { - assert.Contains(t, err.Error(), "write: socket is not connected") - } + client, conn := newHooksClient(t, cfg.SocketPath) + defer testhelper.MustClose(t, conn) + + ctx, wt, err := hookPkg.SetupSidechannel( + ctx, + git.HooksPayload{ + RuntimeDir: runtimeDir, + }, + func(c *net.UnixConn) error { + if _, err := io.WriteString(c, stdin); err != nil { + return err + } + if err := c.CloseWrite(); err != nil { + return err + } + + // Read one byte of the response to ensure that this call got handled before the next + // one. Afterwards we exit immediately without reading the rest of the response. + buf := make([]byte, 1) + _, err := io.ReadFull(c, buf) + + // Step 2: unblock the second Goroutine such that it can start invoking the RPC. At this + // point in time we know that git-pack-objects(1) is running already and originally + // created by this Goroutine. + syncCh <- struct{}{} + // Step 3: we wait for the second Goroutine to catch up and end up in the code that + // handles the sidechannel. + <-syncCh + + return err + }, + ) + require.NoError(t, err) + defer testhelper.MustClose(t, wt) + + _, err = client.PackObjectsHookWithSidechannel(ctx, req) + if runtime.GOOS == "darwin" { + require.Error(t, err) + // macOS uses different logic than Linux systems because the sendfile(3P) syscall is not + // available. The resulting error message is non-deterministic and includes the actual path of + // the pipe we're trying to write to. + require.Regexp(t, "pack objects hook: write unix ->.*: write: broken pipe", err.Error()) + testhelper.RequireGrpcCode(t, err, codes.Canceled) } else { - testhelper.AssertGrpcCode(t, err, codes.Canceled) + testhelper.RequireGrpcError(t, structerr.NewCanceled("pack objects hook: broken pipe"), err) } + require.NoError(t, wt.Wait()) - assert.NoError(t, wt1.Wait()) + cancel() + + // Step 6: unblock the second Goroutine such that it can resume processing the git-pack-objects(1) data. + syncCh <- struct{}{} }() - // Call 2: this is a normal call with the same request as call 1 - client2, conn2 := newHooksClient(t, cfg.SocketPath) - defer conn2.Close() + // The second call sends the same request as we do in the first one, but starts at a point where the first call + // has already started to do its thing. But even though the first call will drop out early, we should be able to + // fully receive the packfile here. + wg.Add(1) + var stdout bytes.Buffer + go func() { + defer wg.Done() - ctx2, cancel := context.WithCancel(ctx) - defer cancel() + ctx, cancel := context.WithCancel(ctx) + defer cancel() - var stdout2 []byte - ctx2, wt2, err := hookPkg.SetupSidechannel( - ctx2, - git.HooksPayload{ - RuntimeDir: runtimeDir, - }, - func(c *net.UnixConn) error { - <-start2 - if _, err := io.WriteString(c, stdin); err != nil { - return err - } - if err := c.CloseWrite(); err != nil { - return err - } + client, conn := newHooksClient(t, cfg.SocketPath) + defer testhelper.MustClose(t, conn) - return pktline.EachSidebandPacket(c, func(band byte, data []byte) error { - if band == 1 { - stdout2 = append(stdout2, data...) + ctx, wt, err := hookPkg.SetupSidechannel( + ctx, + git.HooksPayload{ + RuntimeDir: runtimeDir, + }, + func(c *net.UnixConn) error { + // Step 4: unblock the first Goroutine such that it can exit. We know know that both + // Goroutines are handling the git-pack-objects(1) data. + syncCh <- struct{}{} + // Step 5: we wait for the first Goroutine to stop processing the RPC. After this point, + // we're the only ones still processing output of git-pack-objects(1). + <-syncCh + + // Step 7: remainder of this logic. The first Goroutine has finished processing the RPC + // and is about to exit. Furthermore, it has cancelled its context. + + if _, err := io.WriteString(c, stdin); err != nil { + return err + } + if err := c.CloseWrite(); err != nil { + return err } - return nil - }) - }, - ) - require.NoError(t, err) - defer testhelper.MustClose(t, wt2) - go func() { - defer wg.Done() - _, err := client2.PackObjectsHookWithSidechannel(ctx2, req) + return pktline.EachSidebandPacket(c, func(band byte, data []byte) error { + if band == 1 { + _, err := stdout.Write(data) + return err + } + return nil + }) + }, + ) + require.NoError(t, err) + defer testhelper.MustClose(t, wt) + + // Step 1: we wait for the first Goroutine to have started processing its call. + <-syncCh + + _, err = client.PackObjectsHookWithSidechannel(ctx, req) assert.NoError(t, err) - assert.NoError(t, wt2.Wait()) + assert.NoError(t, wt.Wait()) }() - close(start1) wg.Wait() // Sanity check: second call received valid response - gittest.ExecOpts( - t, - cfg, - gittest.ExecConfig{Stdin: bytes.NewReader(stdout2)}, - "-C", repoPath, "index-pack", "--stdin", "--fix-thin", - ) + gittest.ExecOpts(t, cfg, gittest.ExecConfig{Stdin: &stdout}, "-C", repoPath, "index-pack", "--stdin", "--fix-thin") } func TestServer_PackObjectsHook_usesCache(t *testing.T) { |