Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-09-19 07:04:53 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-09-19 07:04:53 +0300
commit0bef3b7e4677bb820c8ab8946024688f27fc5282 (patch)
treecced5a4ed61b18553663ff4463e7ea42c3734f2c
parent8e36035898e136f098e71e8d77052542edccbf8f (diff)
parent0952026e82cf500b1fcb7a3515f1665fe283f41d (diff)
Merge branch 'pks-pack-objects-hook-context-cancellation-flake' into 'master'
hook: Fix pack-objects hook test with context cancellation Closes #5548 See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6372 Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Will Chandler <wchandler@gitlab.com> Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go201
1 files changed, 114 insertions, 87 deletions
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 80811dc5e..c70de60de 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -3,6 +3,7 @@ package hook
import (
"bytes"
"context"
+ "crypto/rand"
"fmt"
"io"
"net"
@@ -29,6 +30,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v16/streamio"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -82,8 +84,6 @@ func TestParsePackObjectsArgs(t *testing.T) {
}
func TestServer_PackObjectsHook_separateContext(t *testing.T) {
- testhelper.SkipQuarantinedTest(t, "https://gitlab.com/gitlab-org/gitaly/-/issues/5548")
-
t.Parallel()
runTestsWithRuntimeDir(t, testServerPackObjectsHookSeparateContextWithRuntimeDir)
}
@@ -94,13 +94,14 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim
cfg := cfgWithCache(t, 0)
cfg.SocketPath = runHooksServer(t, cfg, nil)
- ctx1, cancel := context.WithCancel(ctx)
- defer cancel()
- repo, repoPath := gittest.CreateRepository(t, ctx1, cfg)
-
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
// We write a commit with a large blob such that the response needs to be split over multiple messages.
// Otherwise it may happen that the request will finish before we can actually cancel the context.
- commitID := gittest.WriteCommit(t, cfg, repoPath)
+ data := make([]byte, 10*streamio.WriteBufferSize)
+ _, _ = rand.Read(data[:])
+ commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithTreeEntries(
+ gittest.TreeEntry{Path: "path", Mode: "100644", Content: string(data)},
+ ))
req := &gitalypb.PackObjectsHookWithSidechannelRequest{
Repository: repo,
@@ -108,108 +109,134 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim
}
stdin := commitID.String() + "\n--not\n\n"
- start1 := make(chan struct{})
- start2 := make(chan struct{})
- wg := &sync.WaitGroup{}
- wg.Add(2)
+ syncCh := make(chan struct{})
- // Call 1: sends a valid request but hangs up without reading response.
- // This should not break call 2.
- client1, conn1 := newHooksClient(t, cfg.SocketPath)
- defer conn1.Close()
-
- ctx1, wt1, err := hookPkg.SetupSidechannel(
- ctx1,
- git.HooksPayload{
- RuntimeDir: runtimeDir,
- },
- func(c *net.UnixConn) error {
- defer close(start2)
- <-start1
- if _, err := io.WriteString(c, stdin); err != nil {
- return err
- }
- if err := c.CloseWrite(); err != nil {
- return err
- }
-
- // Read one byte of the response to ensure that this call got handled
- // before the next one.
- buf := make([]byte, 1)
- _, err := io.ReadFull(c, buf)
- return err
- },
- )
- require.NoError(t, err)
- defer testhelper.MustClose(t, wt1)
+ var wg sync.WaitGroup
+ // The first call sends a valid request, but will then immediately hang up without reading the response. This
+ // should not impact the second call in any way even if it uses the same cache entry.
+ wg.Add(1)
go func() {
defer wg.Done()
- _, err := client1.PackObjectsHookWithSidechannel(ctx1, req)
- if runtime.GOOS == "darwin" {
- assert.Contains(t, []codes.Code{codes.Canceled, codes.Internal}, status.Code(err))
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
- if status.Code(err) == codes.Internal {
- assert.Contains(t, err.Error(), "write: socket is not connected")
- }
+ client, conn := newHooksClient(t, cfg.SocketPath)
+ defer testhelper.MustClose(t, conn)
+
+ ctx, wt, err := hookPkg.SetupSidechannel(
+ ctx,
+ git.HooksPayload{
+ RuntimeDir: runtimeDir,
+ },
+ func(c *net.UnixConn) error {
+ if _, err := io.WriteString(c, stdin); err != nil {
+ return err
+ }
+ if err := c.CloseWrite(); err != nil {
+ return err
+ }
+
+ // Read one byte of the response to ensure that this call got handled before the next
+ // one. Afterwards we exit immediately without reading the rest of the response.
+ buf := make([]byte, 1)
+ _, err := io.ReadFull(c, buf)
+
+ // Step 2: unblock the second Goroutine such that it can start invoking the RPC. At this
+ // point in time we know that git-pack-objects(1) is running already and originally
+ // created by this Goroutine.
+ syncCh <- struct{}{}
+ // Step 3: we wait for the second Goroutine to catch up and end up in the code that
+ // handles the sidechannel.
+ <-syncCh
+
+ return err
+ },
+ )
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, wt)
+
+ _, err = client.PackObjectsHookWithSidechannel(ctx, req)
+ if runtime.GOOS == "darwin" {
+ require.Error(t, err)
+ // macOS uses different logic than Linux systems because the sendfile(3P) syscall is not
+ // available. The resulting error message is non-deterministic and includes the actual path of
+ // the pipe we're trying to write to.
+ require.Regexp(t, "pack objects hook: write unix ->.*: write: broken pipe", err.Error())
+ testhelper.RequireGrpcCode(t, err, codes.Canceled)
} else {
- testhelper.AssertGrpcCode(t, err, codes.Canceled)
+ testhelper.RequireGrpcError(t, structerr.NewCanceled("pack objects hook: broken pipe"), err)
}
+ require.NoError(t, wt.Wait())
- assert.NoError(t, wt1.Wait())
+ cancel()
+
+ // Step 6: unblock the second Goroutine such that it can resume processing the git-pack-objects(1) data.
+ syncCh <- struct{}{}
}()
- // Call 2: this is a normal call with the same request as call 1
- client2, conn2 := newHooksClient(t, cfg.SocketPath)
- defer conn2.Close()
+ // The second call sends the same request as we do in the first one, but starts at a point where the first call
+ // has already started to do its thing. But even though the first call will drop out early, we should be able to
+ // fully receive the packfile here.
+ wg.Add(1)
+ var stdout bytes.Buffer
+ go func() {
+ defer wg.Done()
- ctx2, cancel := context.WithCancel(ctx)
- defer cancel()
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
- var stdout2 []byte
- ctx2, wt2, err := hookPkg.SetupSidechannel(
- ctx2,
- git.HooksPayload{
- RuntimeDir: runtimeDir,
- },
- func(c *net.UnixConn) error {
- <-start2
- if _, err := io.WriteString(c, stdin); err != nil {
- return err
- }
- if err := c.CloseWrite(); err != nil {
- return err
- }
+ client, conn := newHooksClient(t, cfg.SocketPath)
+ defer testhelper.MustClose(t, conn)
- return pktline.EachSidebandPacket(c, func(band byte, data []byte) error {
- if band == 1 {
- stdout2 = append(stdout2, data...)
+ ctx, wt, err := hookPkg.SetupSidechannel(
+ ctx,
+ git.HooksPayload{
+ RuntimeDir: runtimeDir,
+ },
+ func(c *net.UnixConn) error {
+ // Step 4: unblock the first Goroutine such that it can exit. We know know that both
+ // Goroutines are handling the git-pack-objects(1) data.
+ syncCh <- struct{}{}
+ // Step 5: we wait for the first Goroutine to stop processing the RPC. After this point,
+ // we're the only ones still processing output of git-pack-objects(1).
+ <-syncCh
+
+ // Step 7: remainder of this logic. The first Goroutine has finished processing the RPC
+ // and is about to exit. Furthermore, it has cancelled its context.
+
+ if _, err := io.WriteString(c, stdin); err != nil {
+ return err
+ }
+ if err := c.CloseWrite(); err != nil {
+ return err
}
- return nil
- })
- },
- )
- require.NoError(t, err)
- defer testhelper.MustClose(t, wt2)
- go func() {
- defer wg.Done()
- _, err := client2.PackObjectsHookWithSidechannel(ctx2, req)
+ return pktline.EachSidebandPacket(c, func(band byte, data []byte) error {
+ if band == 1 {
+ _, err := stdout.Write(data)
+ return err
+ }
+ return nil
+ })
+ },
+ )
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, wt)
+
+ // Step 1: we wait for the first Goroutine to have started processing its call.
+ <-syncCh
+
+ _, err = client.PackObjectsHookWithSidechannel(ctx, req)
assert.NoError(t, err)
- assert.NoError(t, wt2.Wait())
+ assert.NoError(t, wt.Wait())
}()
- close(start1)
wg.Wait()
// Sanity check: second call received valid response
- gittest.ExecOpts(
- t,
- cfg,
- gittest.ExecConfig{Stdin: bytes.NewReader(stdout2)},
- "-C", repoPath, "index-pack", "--stdin", "--fix-thin",
- )
+ gittest.ExecOpts(t, cfg, gittest.ExecConfig{Stdin: &stdout}, "-C", repoPath, "index-pack", "--stdin", "--fix-thin")
}
func TestServer_PackObjectsHook_usesCache(t *testing.T) {