Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-10 15:34:44 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-15 10:44:51 +0300
commit5d578cdc12ae1298db94f73034fc6a2ea678a2aa (patch)
treef943db21a5ab4344b4e6728d937a68ec7530af43
parent51b78efbabefe3abd586a7d5fee12a52905a4af0 (diff)
catfile: Implement support for flushing
Our git-cat-file(1) processes currently use unbuffered I/O both to accept requests, where we directly write into the process's pipe, and to write out results given that we never pass the "--buffer" flag to the processes. As a result, we can see in production systems that there's quite some overhead at times of high load due to writing to these pipes, which causes an excessive number of read/write syscalls and thus a lot of context switches. We couldn't really implement buffered I/O for this subsystem in the past given that all requesting and reading objects was directly tied to each other. This has changed now with the recent introduction of the request queue, which allows us to split up requesting objects and reading the results. Implement support for flushing in the request queue as a first step towards using buffered I/O. As of now this is still a no-op, but will be extended in subsequent commits as we convert the code to use buffering.
-rw-r--r--internal/git/catfile/object_info_reader.go3
-rw-r--r--internal/git/catfile/object_info_reader_test.go53
-rw-r--r--internal/git/catfile/object_reader.go3
-rw-r--r--internal/git/catfile/object_reader_test.go59
-rw-r--r--internal/git/catfile/request_queue.go8
5 files changed, 126 insertions, 0 deletions
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
index d26c77b1d..cf8055eba 100644
--- a/internal/git/catfile/object_info_reader.go
+++ b/internal/git/catfile/object_info_reader.go
@@ -107,6 +107,9 @@ type ObjectInfoQueue interface {
RequestRevision(git.Revision) error
// ReadInfo reads object info which has previously been requested.
ReadInfo() (*ObjectInfo, error)
+ // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
+ // have been requested up to this point.
+ Flush() error
}
// objectInfoReader is a reader for Git object information. This reader is implemented via a
diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go
index 0750ae446..d45a6ef39 100644
--- a/internal/git/catfile/object_info_reader_test.go
+++ b/internal/git/catfile/object_info_reader_test.go
@@ -254,6 +254,59 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.Equal(t, errors.New("no outstanding request"), err)
})
+ t.Run("flush with single request", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ // We flush once before and once after requesting the object such that we can be
+ // sure that it doesn't impact which objects we can read.
+ require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
+
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &blobInfo, info)
+ })
+
+ t.Run("flush with multiple requests", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ for i := 0; i < 10; i++ {
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ }
+ require.NoError(t, queue.Flush())
+
+ for i := 0; i < 10; i++ {
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &blobInfo, info)
+ }
+ })
+
+ t.Run("flush without request", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.Flush())
+
+ _, err = queue.ReadInfo()
+ require.Equal(t, errors.New("no outstanding request"), err)
+ })
+
t.Run("request invalid object info", func(t *testing.T) {
reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 540e671de..9a9c07ead 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -38,6 +38,9 @@ type ObjectQueue interface {
RequestRevision(git.Revision) error
// ReadObject reads an object which has previously been requested.
ReadObject() (*Object, error)
+ // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
+ // have been requested up to this point.
+ Flush() error
}
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index fa525bdba..971e9c420 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -204,6 +204,65 @@ func TestObjectReader_queue(t *testing.T) {
require.Equal(t, errors.New("no outstanding request"), err)
})
+ t.Run("flush with single request", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ // We flush once before and once after requesting the object such that we can be
+ // sure that it doesn't impact which objects we can read.
+ require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
+
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, "foobar", string(contents))
+ })
+
+ t.Run("flush with multiple requests", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ for i := 0; i < 10; i++ {
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ }
+ require.NoError(t, queue.Flush())
+
+ for i := 0; i < 10; i++ {
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, "foobar", string(contents))
+ }
+ })
+
+ t.Run("flush without request", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.Flush())
+
+ _, err = queue.ReadObject()
+ require.Equal(t, errors.New("no outstanding request"), err)
+ })
+
t.Run("request invalid object", func(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index 21f7508c1..178d29fbc 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -84,6 +84,14 @@ func (q *requestQueue) RequestRevision(revision git.Revision) error {
return nil
}
+func (q *requestQueue) Flush() error {
+ if q.isClosed() {
+ return fmt.Errorf("cannot flush: %w", os.ErrClosed)
+ }
+
+ return nil
+}
+
func (q *requestQueue) ReadObject() (*Object, error) {
if !q.isObjectQueue {
panic("object queue used to read object info")