Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-15 18:41:41 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-15 18:41:41 +0300
commit5997ef7cc293c30d219de2dafa763a9c60b0bc43 (patch)
treef34a1076ef8e85c2f02cf624369f22c03ec38f1a
parent51b78efbabefe3abd586a7d5fee12a52905a4af0 (diff)
parent486b2310e4d86ffb5ead677e65fb9690b5d05bbc (diff)
Merge branch 'pks-catfile-bufio' into 'master'
catfile: Convert to use buffered I/O Closes gitlab#340659 See merge request gitlab-org/gitaly!4070
-rw-r--r--internal/git/catfile/object_info_reader.go22
-rw-r--r--internal/git/catfile/object_info_reader_test.go62
-rw-r--r--internal/git/catfile/object_reader.go13
-rw-r--r--internal/git/catfile/object_reader_test.go69
-rw-r--r--internal/git/catfile/request_queue.go41
-rw-r--r--internal/git/gitpipe/catfile_info.go22
-rw-r--r--internal/git/gitpipe/catfile_object.go22
7 files changed, 242 insertions, 9 deletions
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
index d26c77b1d..300459e81 100644
--- a/internal/git/catfile/object_info_reader.go
+++ b/internal/git/catfile/object_info_reader.go
@@ -52,6 +52,7 @@ func IsNotFound(err error) bool {
// ParseObjectInfo reads from a reader and parses the data into an ObjectInfo struct
func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) {
+restart:
infoLine, err := stdout.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("read info line: %w", err)
@@ -59,6 +60,14 @@ func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) {
infoLine = strings.TrimSuffix(infoLine, "\n")
if strings.HasSuffix(infoLine, " missing") {
+ // We use a hack to flush stdout of git-cat-file(1), which is that we request an
+ // object that cannot exist. This causes Git to write an error and immediately flush
+ // stdout. The only downside is that we need to filter this error here, but that's
+ // acceptable while git-cat-file(1) doesn't yet have any way to natively flush.
+ if strings.HasPrefix(infoLine, flushCommand) {
+ goto restart
+ }
+
return nil, NotFoundError{fmt.Errorf("object not found")}
}
@@ -101,12 +110,16 @@ type ObjectInfoReader interface {
// ObjectInfoQueue allows for requesting and reading object info independently of each other. The
// number of RequestInfo and ReadInfo calls must match. ReadObject must be executed after the
// object has been requested already. The order of objects returned by ReadInfo is the same as the
-// order in which object info has been requested.
+// order in which object info has been requested. Users of this interface must call `Flush()` after
+// all requests have been queued up such that all requested objects will be readable.
type ObjectInfoQueue interface {
// RequestRevision requests the given revision from git-cat-file(1).
RequestRevision(git.Revision) error
// ReadInfo reads object info which has previously been requested.
ReadInfo() (*ObjectInfo, error)
+ // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
+ // have been requested up to this point.
+ Flush() error
}
// objectInfoReader is a reader for Git object information. This reader is implemented via a
@@ -133,6 +146,7 @@ func newObjectInfoReader(
Name: "cat-file",
Flags: []git.Option{
git.Flag{Name: "--batch-check"},
+ git.Flag{Name: "--buffer"},
},
},
git.WithStdin(command.SetupStdin),
@@ -146,7 +160,7 @@ func newObjectInfoReader(
counter: counter,
queue: requestQueue{
stdout: bufio.NewReader(batchCmd),
- stdin: batchCmd,
+ stdin: bufio.NewWriter(batchCmd),
},
}
go func() {
@@ -197,6 +211,10 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob
return nil, err
}
+ if err := queue.Flush(); err != nil {
+ return nil, err
+ }
+
objectInfo, err := queue.ReadInfo()
if err != nil {
return nil, err
diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go
index 0750ae446..8bd089393 100644
--- a/internal/git/catfile/object_info_reader_test.go
+++ b/internal/git/catfile/object_info_reader_test.go
@@ -198,6 +198,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
require.NoError(t, err)
@@ -217,6 +218,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
commitOID: commitInfo,
} {
require.NoError(t, queue.RequestRevision(oid.Revision()))
+ require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
require.NoError(t, err)
@@ -234,6 +236,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
require.NoError(t, queue.RequestRevision(commitOID.Revision()))
+ require.NoError(t, queue.Flush())
for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} {
info, err := queue.ReadInfo()
@@ -254,6 +257,59 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.Equal(t, errors.New("no outstanding request"), err)
})
+ t.Run("flush with single request", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ // We flush once before and once after requesting the object such that we can be
+ // sure that it doesn't impact which objects we can read.
+ require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
+
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &blobInfo, info)
+ })
+
+ t.Run("flush with multiple requests", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ for i := 0; i < 10; i++ {
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ }
+ require.NoError(t, queue.Flush())
+
+ for i := 0; i < 10; i++ {
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &blobInfo, info)
+ }
+ })
+
+ t.Run("flush without request", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.Flush())
+
+ _, err = queue.ReadInfo()
+ require.Equal(t, errors.New("no outstanding request"), err)
+ })
+
t.Run("request invalid object info", func(t *testing.T) {
reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
@@ -263,6 +319,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
_, err = queue.ReadInfo()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
@@ -277,12 +334,15 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
+
_, err = queue.ReadInfo()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
// Requesting another object info after the previous one has failed should continue
// to work alright.
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
require.NoError(t, err)
require.Equal(t, &blobInfo, info)
@@ -319,6 +379,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.False(t, queue.isDirty())
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
require.True(t, reader.isDirty())
require.True(t, queue.isDirty())
@@ -356,6 +417,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
// Request the object before we close the queue.
require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.Flush())
queue.close()
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 540e671de..9d12291cc 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -32,12 +32,16 @@ type ObjectReader interface {
// ObjectQueue allows for requesting and reading objects independently of each other. The number of
// RequestObject and ReadObject calls must match. ReadObject must be executed after the object has
// been requested already. The order of objects returned by ReadObject is the same as the order in
-// which objects have been requested.
+// which objects have been requested. Users of this interface must call `Flush()` after all requests
+// have been queued up such that all requested objects will be readable.
type ObjectQueue interface {
// RequestRevision requests the given revision from git-cat-file(1).
RequestRevision(git.Revision) error
// ReadObject reads an object which has previously been requested.
ReadObject() (*Object, error)
+ // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
+ // have been requested up to this point.
+ Flush() error
}
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
@@ -64,6 +68,7 @@ func newObjectReader(
Name: "cat-file",
Flags: []git.Option{
git.Flag{Name: "--batch"},
+ git.Flag{Name: "--buffer"},
},
},
git.WithStdin(command.SetupStdin),
@@ -78,7 +83,7 @@ func newObjectReader(
queue: requestQueue{
isObjectQueue: true,
stdout: bufio.NewReader(batchCmd),
- stdin: batchCmd,
+ stdin: bufio.NewWriter(batchCmd),
},
}
go func() {
@@ -128,6 +133,10 @@ func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Obje
return nil, err
}
+ if err := queue.Flush(); err != nil {
+ return nil, err
+ }
+
object, err := queue.ReadObject()
if err != nil {
return nil, err
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index fa525bdba..78738f988 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -139,6 +139,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
require.NoError(t, err)
@@ -161,6 +162,7 @@ func TestObjectReader_queue(t *testing.T) {
barfooBlob: "barfoo",
} {
require.NoError(t, queue.RequestRevision(blobID.Revision()))
+ require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
require.NoError(t, err)
@@ -181,6 +183,7 @@ func TestObjectReader_queue(t *testing.T) {
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.RequestRevision(barfooBlob.Revision()))
+ require.NoError(t, queue.Flush())
for _, expectedContents := range []string{"foobar", "barfoo"} {
object, err := queue.ReadObject()
@@ -204,6 +207,65 @@ func TestObjectReader_queue(t *testing.T) {
require.Equal(t, errors.New("no outstanding request"), err)
})
+ t.Run("flush with single request", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ // We flush once before and once after requesting the object such that we can be
+ // sure that it doesn't impact which objects we can read.
+ require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
+
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, "foobar", string(contents))
+ })
+
+ t.Run("flush with multiple requests", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ for i := 0; i < 10; i++ {
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ }
+ require.NoError(t, queue.Flush())
+
+ for i := 0; i < 10; i++ {
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, "foobar", string(contents))
+ }
+ })
+
+ t.Run("flush without request", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.Flush())
+
+ _, err = queue.ReadObject()
+ require.Equal(t, errors.New("no outstanding request"), err)
+ })
+
t.Run("request invalid object", func(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
@@ -213,6 +275,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
_, err = queue.ReadObject()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
@@ -227,12 +290,15 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision("does-not-exist"))
+ require.NoError(t, queue.Flush())
+
_, err = queue.ReadObject()
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
// Requesting another object after the previous one has failed should continue to
// work alright.
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
require.NoError(t, err)
@@ -272,6 +338,7 @@ func TestObjectReader_queue(t *testing.T) {
require.False(t, queue.isDirty())
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
require.True(t, reader.isDirty())
require.True(t, queue.isDirty())
@@ -316,6 +383,7 @@ func TestObjectReader_queue(t *testing.T) {
// Request the object before we close the queue.
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
queue.close()
@@ -335,6 +403,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.Flush())
// Read the object header before closing.
object, err := queue.ReadObject()
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index 21f7508c1..2b43f8a3c 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -11,13 +11,23 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
)
+const (
+ // flushCommand is the command we send to git-cat-file(1) to cause it to flush its stdout.
+ // Note that this is a hack: git-cat-file(1) doesn't really support flushing, but it will
+ // flush whenever it encounters an object it doesn't know. The flush command we use is thus
+ // chosen such that it cannot ever refer to a valid object: refs may not contain whitespace,
+ // so this command cannot refer to a ref. Adding "FLUSH" is just for the sake of making it
+ // easier to spot what's going on in case we ever mistakenly see this output in the wild.
+ flushCommand = "\tFLUSH\t"
+)
+
type requestQueue struct {
// isObjectQueue is set to `true` when this is a request queue which can be used for reading
// objects. If set to `false`, then this can only be used to read object info.
isObjectQueue bool
stdout *bufio.Reader
- stdin io.Writer
+ stdin *bufio.Writer
// outstandingRequests is the number of requests which have been queued up. Gets incremented
// on request, and decremented when starting to read an object (not when that object has
@@ -76,9 +86,34 @@ func (q *requestQueue) RequestRevision(revision git.Revision) error {
atomic.AddInt64(&q.outstandingRequests, 1)
- if _, err := fmt.Fprintln(q.stdin, revision.String()); err != nil {
+ if _, err := q.stdin.WriteString(revision.String()); err != nil {
+ atomic.AddInt64(&q.outstandingRequests, -1)
+ return fmt.Errorf("writing object request: %w", err)
+ }
+
+ if err := q.stdin.WriteByte('\n'); err != nil {
atomic.AddInt64(&q.outstandingRequests, -1)
- return fmt.Errorf("requesting revision: %w", err)
+ return fmt.Errorf("terminating object request: %w", err)
+ }
+
+ return nil
+}
+
+func (q *requestQueue) Flush() error {
+ if q.isClosed() {
+ return fmt.Errorf("cannot flush: %w", os.ErrClosed)
+ }
+
+ if _, err := q.stdin.WriteString(flushCommand); err != nil {
+ return fmt.Errorf("writing flush command: %w", err)
+ }
+
+ if err := q.stdin.WriteByte('\n'); err != nil {
+ return fmt.Errorf("terminating flush command: %w", err)
+ }
+
+ if err := q.stdin.Flush(); err != nil {
+ return fmt.Errorf("flushing: %w", err)
}
return nil
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index e87628f38..b756e08f3 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -67,10 +67,11 @@ func CatfileInfo(
}
defer cleanup()
- requestChan := make(chan catfileInfoRequest)
+ requestChan := make(chan catfileInfoRequest, 32)
go func() {
defer close(requestChan)
+ var i int64
for it.Next() {
if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
select {
@@ -88,6 +89,17 @@ func CatfileInfo(
case <-ctx.Done():
return
}
+
+ i++
+ if i%int64(cap(requestChan)) == 0 {
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileInfoRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
}
if err := it.Err(); err != nil {
@@ -97,6 +109,14 @@ func CatfileInfo(
return
}
}
+
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileInfoRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
}()
resultChan := make(chan CatfileInfoResult)
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 5ec2e9fb5..49e33b859 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -44,10 +44,11 @@ func CatfileObject(
}
defer cleanup()
- requestChan := make(chan catfileObjectRequest)
+ requestChan := make(chan catfileObjectRequest, 32)
go func() {
defer close(requestChan)
+ var i int64
for it.Next() {
if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
select {
@@ -62,6 +63,17 @@ func CatfileObject(
case <-ctx.Done():
return
}
+
+ i++
+ if i%int64(cap(requestChan)) == 0 {
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
}
if err := it.Err(); err != nil {
@@ -71,6 +83,14 @@ func CatfileObject(
return
}
}
+
+ if err := queue.Flush(); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
}()
resultChan := make(chan CatfileObjectResult)