diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-10 15:34:44 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-15 10:44:51 +0300 |
commit | 5d578cdc12ae1298db94f73034fc6a2ea678a2aa (patch) | |
tree | f943db21a5ab4344b4e6728d937a68ec7530af43 | |
parent | 51b78efbabefe3abd586a7d5fee12a52905a4af0 (diff) |
catfile: Implement support for flushing
Our git-cat-file(1) processes currently use unbuffered I/O both to
accept requests, where we directly write into the process's pipe, and to
write out results given that we never pass the "--buffer" flag to the
processes. As a result, we can see in production systems that there's
quite some overhead at times of high load due to writing to these pipes,
which causes an excessive number of read/write syscalls and thus a lot
of context switches.
We couldn't really implement buffered I/O for this subsystem in the past
given that all requesting and reading objects was directly tied to each
other. This has changed now with the recent introduction of the request
queue, which allows us to split up requesting objects and reading the
results.
Implement support for flushing in the request queue as a first step
towards using buffered I/O. As of now this is still a no-op, but will be
extended in subsequent commits as we convert the code to use buffering.
-rw-r--r-- | internal/git/catfile/object_info_reader.go | 3 | ||||
-rw-r--r-- | internal/git/catfile/object_info_reader_test.go | 53 | ||||
-rw-r--r-- | internal/git/catfile/object_reader.go | 3 | ||||
-rw-r--r-- | internal/git/catfile/object_reader_test.go | 59 | ||||
-rw-r--r-- | internal/git/catfile/request_queue.go | 8 |
5 files changed, 126 insertions, 0 deletions
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go index d26c77b1d..cf8055eba 100644 --- a/internal/git/catfile/object_info_reader.go +++ b/internal/git/catfile/object_info_reader.go @@ -107,6 +107,9 @@ type ObjectInfoQueue interface { RequestRevision(git.Revision) error // ReadInfo reads object info which has previously been requested. ReadInfo() (*ObjectInfo, error) + // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which + // have been requested up to this point. + Flush() error } // objectInfoReader is a reader for Git object information. This reader is implemented via a diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go index 0750ae446..d45a6ef39 100644 --- a/internal/git/catfile/object_info_reader_test.go +++ b/internal/git/catfile/object_info_reader_test.go @@ -254,6 +254,59 @@ func TestObjectInfoReader_queue(t *testing.T) { require.Equal(t, errors.New("no outstanding request"), err) }) + t.Run("flush with single request", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + // We flush once before and once after requesting the object such that we can be + // sure that it doesn't impact which objects we can read. + require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) + + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &blobInfo, info) + }) + + t.Run("flush with multiple requests", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + for i := 0; i < 10; i++ { + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + } + require.NoError(t, queue.Flush()) + + for i := 0; i < 10; i++ { + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &blobInfo, info) + } + }) + + t.Run("flush without request", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.Flush()) + + _, err = queue.ReadInfo() + require.Equal(t, errors.New("no outstanding request"), err) + }) + t.Run("request invalid object info", func(t *testing.T) { reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 540e671de..9a9c07ead 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -38,6 +38,9 @@ type ObjectQueue interface { RequestRevision(git.Revision) error // ReadObject reads an object which has previously been requested. ReadObject() (*Object, error) + // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which + // have been requested up to this point. + Flush() error } // objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index fa525bdba..971e9c420 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -204,6 +204,65 @@ func TestObjectReader_queue(t *testing.T) { require.Equal(t, errors.New("no outstanding request"), err) }) + t.Run("flush with single request", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + // We flush once before and once after requesting the object such that we can be + // sure that it doesn't impact which objects we can read. + require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) + + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, "foobar", string(contents)) + }) + + t.Run("flush with multiple requests", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + for i := 0; i < 10; i++ { + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + } + require.NoError(t, queue.Flush()) + + for i := 0; i < 10; i++ { + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, "foobar", string(contents)) + } + }) + + t.Run("flush without request", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.Flush()) + + _, err = queue.ReadObject() + require.Equal(t, errors.New("no outstanding request"), err) + }) + t.Run("request invalid object", func(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go index 21f7508c1..178d29fbc 100644 --- a/internal/git/catfile/request_queue.go +++ b/internal/git/catfile/request_queue.go @@ -84,6 +84,14 @@ func (q *requestQueue) RequestRevision(revision git.Revision) error { return nil } +func (q *requestQueue) Flush() error { + if q.isClosed() { + return fmt.Errorf("cannot flush: %w", os.ErrClosed) + } + + return nil +} + func (q *requestQueue) ReadObject() (*Object, error) { if !q.isObjectQueue { panic("object queue used to read object info") |