diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-06-03 12:28:25 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-06-07 10:55:59 +0300 |
commit | a72be286fcc4e8de993332175a32ff6d5f91e818 (patch) | |
tree | 21201208638d7ed679bff507f643502b96c21373 | |
parent | a31bd1be25d0ff03efaa7f756321ea9440122b24 (diff) |
catfile: Add tests which exercise the request queuepks-catfile-request-queue-tests
We do not currently have tests which directly exercise the request queue
at a low level. Add some to improve our test coverage and assert that it
continues to work alright for several edge cases.
-rw-r--r-- | internal/git/catfile/request_queue_test.go | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/internal/git/catfile/request_queue_test.go b/internal/git/catfile/request_queue_test.go new file mode 100644 index 000000000..b09ee75ab --- /dev/null +++ b/internal/git/catfile/request_queue_test.go @@ -0,0 +1,325 @@ +package catfile + +import ( + "context" + "fmt" + "io" + "os" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" +) + +func TestRequestQueue_ReadObject(t *testing.T) { + ctx := testhelper.Context(t) + + t.Run("ReadInfo on ReadObject queue", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, "#!/bin/sh\nread\n") + + require.PanicsWithValue(t, "object queue used to read object info", func() { + _, _ = queue.ReadInfo() + }) + }) + + t.Run("read without request", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, "#!/bin/sh\nread\n") + _, err := queue.ReadObject() + require.Equal(t, fmt.Errorf("no outstanding request"), err) + }) + + t.Run("read on closed reader", func(t *testing.T) { + reader, queue := newInterceptedQueue(ctx, t, "#!/bin/sh\nread\n") + + require.NoError(t, queue.RequestRevision("foo")) + require.True(t, queue.isDirty()) + + reader.close() + require.True(t, queue.isDirty()) + + _, err := queue.ReadObject() + require.Equal(t, fmt.Errorf("cannot read object info: %w", fmt.Errorf("file already closed")), err) + }) + + t.Run("read with unconsumed object", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + echo "1111111111111111111111111111111111111111 commit 464" + `) + + // We queue two revisions... + require.NoError(t, queue.RequestRevision("foo")) + require.NoError(t, queue.RequestRevision("foo")) + + // .. and only unqueue one object. This object isn't read though, ... + _, err := queue.ReadObject() + require.NoError(t, err) + + // ... which means that trying to read the second object should fail now. + _, err = queue.ReadObject() + require.Equal(t, fmt.Errorf("current object has not been fully read"), err) + + require.True(t, queue.isDirty()) + }) + + t.Run("read with invalid object header", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + echo "something something" + `) + + require.NoError(t, queue.RequestRevision("foo")) + + _, err := queue.ReadObject() + require.Equal(t, fmt.Errorf("invalid info line: %q", "something something"), err) + + // The queue must be dirty when we failed due to an unexpected error. + require.True(t, queue.isDirty()) + }) + + t.Run("read with unexpected exit", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + exit 1 + `) + + require.NoError(t, queue.RequestRevision("foo")) + + _, err := queue.ReadObject() + require.Equal(t, fmt.Errorf("read info line: %w", io.EOF), err) + + // The queue must be dirty when we failed due to an unexpected error. + require.True(t, queue.isDirty()) + }) + + t.Run("read with missing object", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + echo "1111111111111111111111111111111111111111 missing" + `) + + require.NoError(t, queue.RequestRevision("foo")) + + _, err := queue.ReadObject() + require.Equal(t, NotFoundError{error: fmt.Errorf("object not found")}, err) + + // The queue must be empty even if the object wasn't found: this is a graceful + // failure that we should handle alright. + require.False(t, queue.isDirty()) + }) + + t.Run("read single object", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + echo "1111111111111111111111111111111111111111 blob 10" + echo "1234567890" + `) + + require.NoError(t, queue.RequestRevision("foo")) + require.True(t, queue.isDirty()) + + object, err := queue.ReadObject() + require.NoError(t, err) + require.Equal(t, ObjectInfo{ + Oid: "1111111111111111111111111111111111111111", + Type: "blob", + Size: 10, + }, object.ObjectInfo) + + data, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, "1234567890", string(data)) + + require.False(t, queue.isDirty()) + }) + + t.Run("read multiple objects", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + echo "1111111111111111111111111111111111111111 blob 10" + echo "1234567890" + echo "2222222222222222222222222222222222222222 commit 10" + echo "0987654321" + `) + + require.NoError(t, queue.RequestRevision("foo")) + require.NoError(t, queue.RequestRevision("foo")) + require.True(t, queue.isDirty()) + + for _, expectedObject := range []struct { + info ObjectInfo + data string + }{ + { + info: ObjectInfo{ + Oid: "1111111111111111111111111111111111111111", + Type: "blob", + Size: 10, + }, + data: "1234567890", + }, + { + info: ObjectInfo{ + Oid: "2222222222222222222222222222222222222222", + Type: "commit", + Size: 10, + }, + data: "0987654321", + }, + } { + require.True(t, queue.isDirty()) + + object, err := queue.ReadObject() + require.NoError(t, err) + require.Equal(t, expectedObject.info, object.ObjectInfo) + + data, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, expectedObject.data, string(data)) + } + + require.False(t, queue.isDirty()) + }) + + t.Run("truncated object", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + echo "1111111111111111111111111111111111111111 blob 10" + printf "123" + `) + + require.NoError(t, queue.RequestRevision("foo")) + require.True(t, queue.isDirty()) + + object, err := queue.ReadObject() + require.NoError(t, err) + require.Equal(t, ObjectInfo{ + Oid: "1111111111111111111111111111111111111111", + Type: "blob", + Size: 10, + }, object.ObjectInfo) + + // The first read will succeed and return the prefix. + var buf [10]byte + n, err := object.Read(buf[:]) + require.NoError(t, err) + require.Equal(t, 3, n) + require.Equal(t, "123", string(buf[:n])) + + // But the second read must fail and give us a hint that the read had been + // truncated. Note that we explicitly expect to not see an io.EOF here, + // which might indicate success to the caller. + _, err = object.Read(buf[:]) + require.Equal(t, fmt.Errorf("discard newline: \"EOF\""), err) + + require.True(t, queue.isDirty()) + }) +} + +func TestRequestQueue_RequestRevision(t *testing.T) { + ctx := testhelper.Context(t) + + requireRevision := func(t *testing.T, queue *requestQueue, rev git.Revision) { + object, err := queue.ReadObject() + require.NoError(t, err) + + data, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, rev, git.Revision(data)) + } + + t.Run("requesting revision on closed queue", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, "#!/bin/sh") + queue.close() + + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision("foo")) + }) + + t.Run("requesting revision on closed process", func(t *testing.T) { + process, queue := newInterceptedQueue(ctx, t, "#!/bin/sh") + + process.close() + + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision("foo")) + }) + + t.Run("single request", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + read revision + echo "1111111111111111111111111111111111111111 blob ${#revision}" + echo "${revision}" + `) + + require.NoError(t, queue.RequestRevision("foo")) + require.NoError(t, queue.Flush()) + + requireRevision(t, queue, "foo") + }) + + t.Run("multiple request", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + while read revision + do + echo "1111111111111111111111111111111111111111 blob ${#revision}" + echo "${revision}" + done + `) + + require.NoError(t, queue.RequestRevision("foo")) + require.NoError(t, queue.RequestRevision("bar")) + require.NoError(t, queue.RequestRevision("baz")) + require.NoError(t, queue.RequestRevision("qux")) + require.NoError(t, queue.Flush()) + + requireRevision(t, queue, "foo") + requireRevision(t, queue, "bar") + requireRevision(t, queue, "baz") + requireRevision(t, queue, "qux") + }) + + t.Run("multiple request with intermediate flushing", func(t *testing.T) { + _, queue := newInterceptedQueue(ctx, t, `#!/bin/sh + while read revision + do + read flush + if test "$flush" != "FLUSH" + then + echo "expected a flush" + exit 1 + fi + + echo "1111111111111111111111111111111111111111 blob ${#revision}" + echo "${revision}" + done + `) + + for _, revision := range []git.Revision{ + "foo", + "bar", + "foo", + "qux", + } { + require.NoError(t, queue.RequestRevision(revision)) + require.NoError(t, queue.Flush()) + requireRevision(t, queue, revision) + } + }) +} + +func newInterceptedQueue(ctx context.Context, t *testing.T, script string) (ObjectReader, *requestQueue) { + cfg, repo, _ := testcfg.BuildWithRepo(t) + + commandFactory := gittest.NewInterceptingCommandFactory(ctx, t, cfg, func(execEnv git.ExecutionEnvironment) string { + return script + }) + repoExecutor := repoExecutor{ + GitRepo: repo, + gitCmdFactory: commandFactory, + } + + reader, err := newObjectReader(ctx, &repoExecutor, nil) + require.NoError(t, err) + t.Cleanup(reader.close) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + t.Cleanup(cleanup) + + return reader, queue +} |