diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-10 15:37:42 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-15 10:44:51 +0300 |
commit | 3dbe00d31b26f0cf0d3d247664d7b4e977297144 (patch) | |
tree | 0575f8faaeec3bba2e163920a6cf731b11e8175c | |
parent | 5d578cdc12ae1298db94f73034fc6a2ea678a2aa (diff) |
catfile: Flush requests as required
While the request queue of catfile processes already exposes a way to
flush outstanding requests, calling it is not mandatory yet given that
we do not yet use buffered I/O. This is going to change in subsequent
commits though, so let's prepare for that by calling `Flush()` in all
required places.
Note that in case of the gitpipe package we're also converting the
channels to be buffered channels: this allow us to queue multiple
requests before flushing such that we can make better use of the
upcoming buffered stdin.
-rw-r--r-- | internal/git/catfile/object_info_reader.go | 7 | ||||
-rw-r--r-- | internal/git/catfile/object_info_reader_test.go | 9 | ||||
-rw-r--r-- | internal/git/catfile/object_reader.go | 7 | ||||
-rw-r--r-- | internal/git/catfile/object_reader_test.go | 10 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 22 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 22 |
6 files changed, 73 insertions, 4 deletions
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go index cf8055eba..c551b9777 100644 --- a/internal/git/catfile/object_info_reader.go +++ b/internal/git/catfile/object_info_reader.go @@ -101,7 +101,8 @@ type ObjectInfoReader interface { // ObjectInfoQueue allows for requesting and reading object info independently of each other. The // number of RequestInfo and ReadInfo calls must match. ReadObject must be executed after the // object has been requested already. The order of objects returned by ReadInfo is the same as the -// order in which object info has been requested. +// order in which object info has been requested. Users of this interface must call `Flush()` after +// all requests have been queued up such that all requested objects will be readable. type ObjectInfoQueue interface { // RequestRevision requests the given revision from git-cat-file(1). RequestRevision(git.Revision) error @@ -200,6 +201,10 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob return nil, err } + if err := queue.Flush(); err != nil { + return nil, err + } + objectInfo, err := queue.ReadInfo() if err != nil { return nil, err diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go index d45a6ef39..8bd089393 100644 --- a/internal/git/catfile/object_info_reader_test.go +++ b/internal/git/catfile/object_info_reader_test.go @@ -198,6 +198,7 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() require.NoError(t, err) @@ -217,6 +218,7 @@ func TestObjectInfoReader_queue(t *testing.T) { commitOID: commitInfo, } { require.NoError(t, queue.RequestRevision(oid.Revision())) + require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() require.NoError(t, err) @@ -234,6 +236,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, queue.RequestRevision(blobOID.Revision())) require.NoError(t, queue.RequestRevision(commitOID.Revision())) + require.NoError(t, queue.Flush()) for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} { info, err := queue.ReadInfo() @@ -316,6 +319,7 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) _, err = queue.ReadInfo() require.Equal(t, NotFoundError{errors.New("object not found")}, err) @@ -330,12 +334,15 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) + _, err = queue.ReadInfo() require.Equal(t, NotFoundError{errors.New("object not found")}, err) // Requesting another object info after the previous one has failed should continue // to work alright. require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() require.NoError(t, err) require.Equal(t, &blobInfo, info) @@ -372,6 +379,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.False(t, queue.isDirty()) require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) require.True(t, reader.isDirty()) require.True(t, queue.isDirty()) @@ -409,6 +417,7 @@ func TestObjectInfoReader_queue(t *testing.T) { // Request the object before we close the queue. require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) queue.close() diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 9a9c07ead..b1ed8650f 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -32,7 +32,8 @@ type ObjectReader interface { // ObjectQueue allows for requesting and reading objects independently of each other. The number of // RequestObject and ReadObject calls must match. ReadObject must be executed after the object has // been requested already. The order of objects returned by ReadObject is the same as the order in -// which objects have been requested. +// which objects have been requested. Users of this interface must call `Flush()` after all requests +// have been queued up such that all requested objects will be readable. type ObjectQueue interface { // RequestRevision requests the given revision from git-cat-file(1). RequestRevision(git.Revision) error @@ -131,6 +132,10 @@ func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Obje return nil, err } + if err := queue.Flush(); err != nil { + return nil, err + } + object, err := queue.ReadObject() if err != nil { return nil, err diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index 971e9c420..78738f988 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -139,6 +139,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) object, err := queue.ReadObject() require.NoError(t, err) @@ -161,6 +162,7 @@ func TestObjectReader_queue(t *testing.T) { barfooBlob: "barfoo", } { require.NoError(t, queue.RequestRevision(blobID.Revision())) + require.NoError(t, queue.Flush()) object, err := queue.ReadObject() require.NoError(t, err) @@ -181,6 +183,7 @@ func TestObjectReader_queue(t *testing.T) { require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.RequestRevision(barfooBlob.Revision())) + require.NoError(t, queue.Flush()) for _, expectedContents := range []string{"foobar", "barfoo"} { object, err := queue.ReadObject() @@ -272,6 +275,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) _, err = queue.ReadObject() require.Equal(t, NotFoundError{errors.New("object not found")}, err) @@ -286,12 +290,15 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) + _, err = queue.ReadObject() require.Equal(t, NotFoundError{errors.New("object not found")}, err) // Requesting another object after the previous one has failed should continue to // work alright. require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) object, err := queue.ReadObject() require.NoError(t, err) @@ -331,6 +338,7 @@ func TestObjectReader_queue(t *testing.T) { require.False(t, queue.isDirty()) require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) require.True(t, reader.isDirty()) require.True(t, queue.isDirty()) @@ -375,6 +383,7 @@ func TestObjectReader_queue(t *testing.T) { // Request the object before we close the queue. require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) queue.close() @@ -394,6 +403,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) // Read the object header before closing. object, err := queue.ReadObject() diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index e87628f38..b756e08f3 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -67,10 +67,11 @@ func CatfileInfo( } defer cleanup() - requestChan := make(chan catfileInfoRequest) + requestChan := make(chan catfileInfoRequest, 32) go func() { defer close(requestChan) + var i int64 for it.Next() { if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { select { @@ -88,6 +89,17 @@ func CatfileInfo( case <-ctx.Done(): return } + + i++ + if i%int64(cap(requestChan)) == 0 { + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileInfoRequest{err: err}: + case <-ctx.Done(): + return + } + } + } } if err := it.Err(); err != nil { @@ -97,6 +109,14 @@ func CatfileInfo( return } } + + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileInfoRequest{err: err}: + case <-ctx.Done(): + return + } + } }() resultChan := make(chan CatfileInfoResult) diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 5ec2e9fb5..49e33b859 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -44,10 +44,11 @@ func CatfileObject( } defer cleanup() - requestChan := make(chan catfileObjectRequest) + requestChan := make(chan catfileObjectRequest, 32) go func() { defer close(requestChan) + var i int64 for it.Next() { if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { select { @@ -62,6 +63,17 @@ func CatfileObject( case <-ctx.Done(): return } + + i++ + if i%int64(cap(requestChan)) == 0 { + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } + } } if err := it.Err(); err != nil { @@ -71,6 +83,14 @@ func CatfileObject( return } } + + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } }() resultChan := make(chan CatfileObjectResult) |