Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-10 15:37:42 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-15 10:44:51 +0300
commit3dbe00d31b26f0cf0d3d247664d7b4e977297144 (patch)
tree0575f8faaeec3bba2e163920a6cf731b11e8175c
parent5d578cdc12ae1298db94f73034fc6a2ea678a2aa (diff)
catfile: Flush requests as required
While the request queue of catfile processes already exposes a way to flush outstanding requests, calling it is not mandatory yet given that we do not yet use buffered I/O. This is going to change in subsequent commits though, so let's prepare for that by calling `Flush()` in all required places. Note that in case of the gitpipe package we're also converting the channels to be buffered channels: this allow us to queue multiple requests before flushing such that we can make better use of the upcoming buffered stdin.
-rw-r--r--internal/git/catfile/object_info_reader.go7
-rw-r--r--internal/git/catfile/object_info_reader_test.go9
-rw-r--r--internal/git/catfile/object_reader.go7
-rw-r--r--internal/git/catfile/object_reader_test.go10
-rw-r--r--internal/git/gitpipe/catfile_info.go22
-rw-r--r--internal/git/gitpipe/catfile_object.go22
6 files changed, 73 insertions, 4 deletions
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
index cf8055eba..c551b9777 100644
--- a/internal/git/catfile/object_info_reader.go
+++ b/internal/git/catfile/object_info_reader.go
@@ -101,7 +101,8 @@ type ObjectInfoReader interface {
// ObjectInfoQueue allows for requesting and reading object info independently of each other. The
// number of RequestInfo and ReadInfo calls must match. ReadObject must be executed after the
// object has been requested already. The order of objects returned by ReadInfo is the same as the
-// order in which object info has been requested.
+// order in which object info has been requested. Users of this interface must call `Flush()` after
+// all requests have been queued up such that all requested objects will be readable.
type ObjectInfoQueue interface {
// RequestRevision requests the given revision from git-cat-file(1).
RequestRevision(git.Revision) error
@@ -200,6 +201,10 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob
return nil, err
}
+ if err := queue.Flush(); err != nil {
+ return nil, err
+ }
+
objectInfo, err := queue.ReadInfo()
if err != nil {
return nil, err
diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go
index d45a6ef39..8bd089393 100644
--- a/internal/git/catfile/object_info_reader_test.go
+++ b/internal/git/catfile/object_info_reader_test.go
@@ -198,6 +198,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
require.NoError(t, err)
@@ -217,6 +218,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
commitOID: commitInfo,
} {
require.NoError(t, queue.RequestRevision(oid.Revision()))
+ require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
require.NoError(t, err)
@@ -234,6 +236,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
require.NoError(t, queue.RequestRevision(commitOID.Revision()))
+ require.NoError(t, queue.Flush())
for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} {
info, err := queue.ReadInfo()
@@ -316,6 +319,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
_, err = queue.ReadInfo()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
@@ -330,12 +334,15 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
+
_, err = queue.ReadInfo()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
// Requesting another object info after the previous one has failed should continue
// to work alright.
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
require.NoError(t, err)
require.Equal(t, &blobInfo, info)
@@ -372,6 +379,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.False(t, queue.isDirty())
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
require.True(t, reader.isDirty())
require.True(t, queue.isDirty())
@@ -409,6 +417,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
// Request the object before we close the queue.
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
queue.close()
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 9a9c07ead..b1ed8650f 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -32,7 +32,8 @@ type ObjectReader interface {
// ObjectQueue allows for requesting and reading objects independently of each other. The number of
// RequestObject and ReadObject calls must match. ReadObject must be executed after the object has
// been requested already. The order of objects returned by ReadObject is the same as the order in
-// which objects have been requested.
+// which objects have been requested. Users of this interface must call `Flush()` after all requests
+// have been queued up such that all requested objects will be readable.
type ObjectQueue interface {
// RequestRevision requests the given revision from git-cat-file(1).
RequestRevision(git.Revision) error
@@ -131,6 +132,10 @@ func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Obje
return nil, err
}
+ if err := queue.Flush(); err != nil {
+ return nil, err
+ }
+
object, err := queue.ReadObject()
if err != nil {
return nil, err
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index 971e9c420..78738f988 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -139,6 +139,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
require.NoError(t, err)
@@ -161,6 +162,7 @@ func TestObjectReader_queue(t *testing.T) {
barfooBlob: "barfoo",
} {
require.NoError(t, queue.RequestRevision(blobID.Revision()))
+ require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
require.NoError(t, err)
@@ -181,6 +183,7 @@ func TestObjectReader_queue(t *testing.T) {
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.RequestRevision(barfooBlob.Revision()))
+ require.NoError(t, queue.Flush())
for _, expectedContents := range []string{"foobar", "barfoo"} {
object, err := queue.ReadObject()
@@ -272,6 +275,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
_, err = queue.ReadObject()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
@@ -286,12 +290,15 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
+
_, err = queue.ReadObject()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
// Requesting another object after the previous one has failed should continue to
// work alright.
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
require.NoError(t, err)
@@ -331,6 +338,7 @@ func TestObjectReader_queue(t *testing.T) {
require.False(t, queue.isDirty())
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
require.True(t, reader.isDirty())
require.True(t, queue.isDirty())
@@ -375,6 +383,7 @@ func TestObjectReader_queue(t *testing.T) {
// Request the object before we close the queue.
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
queue.close()
@@ -394,6 +403,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
// Read the object header before closing.
object, err := queue.ReadObject()
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index e87628f38..b756e08f3 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -67,10 +67,11 @@ func CatfileInfo(
}
defer cleanup()
- requestChan := make(chan catfileInfoRequest)
+ requestChan := make(chan catfileInfoRequest, 32)
go func() {
defer close(requestChan)
+ var i int64
for it.Next() {
if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
select {
@@ -88,6 +89,17 @@ func CatfileInfo(
case <-ctx.Done():
return
}
+
+ i++
+ if i%int64(cap(requestChan)) == 0 {
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileInfoRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
}
if err := it.Err(); err != nil {
@@ -97,6 +109,14 @@ func CatfileInfo(
return
}
}
+
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileInfoRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
}()
resultChan := make(chan CatfileInfoResult)
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 5ec2e9fb5..49e33b859 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -44,10 +44,11 @@ func CatfileObject(
}
defer cleanup()
- requestChan := make(chan catfileObjectRequest)
+ requestChan := make(chan catfileObjectRequest, 32)
go func() {
defer close(requestChan)
+ var i int64
for it.Next() {
if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
select {
@@ -62,6 +63,17 @@ func CatfileObject(
case <-ctx.Done():
return
}
+
+ i++
+ if i%int64(cap(requestChan)) == 0 {
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
}
if err := it.Err(); err != nil {
@@ -71,6 +83,14 @@ func CatfileObject(
return
}
}
+
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
}()
resultChan := make(chan CatfileObjectResult)