diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-15 18:41:41 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-15 18:41:41 +0300 |
commit | 5997ef7cc293c30d219de2dafa763a9c60b0bc43 (patch) | |
tree | f34a1076ef8e85c2f02cf624369f22c03ec38f1a | |
parent | 51b78efbabefe3abd586a7d5fee12a52905a4af0 (diff) | |
parent | 486b2310e4d86ffb5ead677e65fb9690b5d05bbc (diff) |
Merge branch 'pks-catfile-bufio' into 'master'
catfile: Convert to use buffered I/O
Closes gitlab#340659
See merge request gitlab-org/gitaly!4070
-rw-r--r-- | internal/git/catfile/object_info_reader.go | 22 | ||||
-rw-r--r-- | internal/git/catfile/object_info_reader_test.go | 62 | ||||
-rw-r--r-- | internal/git/catfile/object_reader.go | 13 | ||||
-rw-r--r-- | internal/git/catfile/object_reader_test.go | 69 | ||||
-rw-r--r-- | internal/git/catfile/request_queue.go | 41 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 22 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 22 |
7 files changed, 242 insertions, 9 deletions
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go index d26c77b1d..300459e81 100644 --- a/internal/git/catfile/object_info_reader.go +++ b/internal/git/catfile/object_info_reader.go @@ -52,6 +52,7 @@ func IsNotFound(err error) bool { // ParseObjectInfo reads from a reader and parses the data into an ObjectInfo struct func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) { +restart: infoLine, err := stdout.ReadString('\n') if err != nil { return nil, fmt.Errorf("read info line: %w", err) @@ -59,6 +60,14 @@ func ParseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) { infoLine = strings.TrimSuffix(infoLine, "\n") if strings.HasSuffix(infoLine, " missing") { + // We use a hack to flush stdout of git-cat-file(1), which is that we request an + // object that cannot exist. This causes Git to write an error and immediately flush + // stdout. The only downside is that we need to filter this error here, but that's + // acceptable while git-cat-file(1) doesn't yet have any way to natively flush. + if strings.HasPrefix(infoLine, flushCommand) { + goto restart + } + return nil, NotFoundError{fmt.Errorf("object not found")} } @@ -101,12 +110,16 @@ type ObjectInfoReader interface { // ObjectInfoQueue allows for requesting and reading object info independently of each other. The // number of RequestInfo and ReadInfo calls must match. ReadObject must be executed after the // object has been requested already. The order of objects returned by ReadInfo is the same as the -// order in which object info has been requested. +// order in which object info has been requested. Users of this interface must call `Flush()` after +// all requests have been queued up such that all requested objects will be readable. type ObjectInfoQueue interface { // RequestRevision requests the given revision from git-cat-file(1). RequestRevision(git.Revision) error // ReadInfo reads object info which has previously been requested. ReadInfo() (*ObjectInfo, error) + // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which + // have been requested up to this point. + Flush() error } // objectInfoReader is a reader for Git object information. This reader is implemented via a @@ -133,6 +146,7 @@ func newObjectInfoReader( Name: "cat-file", Flags: []git.Option{ git.Flag{Name: "--batch-check"}, + git.Flag{Name: "--buffer"}, }, }, git.WithStdin(command.SetupStdin), @@ -146,7 +160,7 @@ func newObjectInfoReader( counter: counter, queue: requestQueue{ stdout: bufio.NewReader(batchCmd), - stdin: batchCmd, + stdin: bufio.NewWriter(batchCmd), }, } go func() { @@ -197,6 +211,10 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob return nil, err } + if err := queue.Flush(); err != nil { + return nil, err + } + objectInfo, err := queue.ReadInfo() if err != nil { return nil, err diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go index 0750ae446..8bd089393 100644 --- a/internal/git/catfile/object_info_reader_test.go +++ b/internal/git/catfile/object_info_reader_test.go @@ -198,6 +198,7 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() require.NoError(t, err) @@ -217,6 +218,7 @@ func TestObjectInfoReader_queue(t *testing.T) { commitOID: commitInfo, } { require.NoError(t, queue.RequestRevision(oid.Revision())) + require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() require.NoError(t, err) @@ -234,6 +236,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, queue.RequestRevision(blobOID.Revision())) require.NoError(t, queue.RequestRevision(commitOID.Revision())) + require.NoError(t, queue.Flush()) for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} { info, err := queue.ReadInfo() @@ -254,6 +257,59 @@ func TestObjectInfoReader_queue(t *testing.T) { require.Equal(t, errors.New("no outstanding request"), err) }) + t.Run("flush with single request", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + // We flush once before and once after requesting the object such that we can be + // sure that it doesn't impact which objects we can read. + require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) + + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &blobInfo, info) + }) + + t.Run("flush with multiple requests", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + for i := 0; i < 10; i++ { + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + } + require.NoError(t, queue.Flush()) + + for i := 0; i < 10; i++ { + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &blobInfo, info) + } + }) + + t.Run("flush without request", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.Flush()) + + _, err = queue.ReadInfo() + require.Equal(t, errors.New("no outstanding request"), err) + }) + t.Run("request invalid object info", func(t *testing.T) { reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) @@ -263,6 +319,7 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) _, err = queue.ReadInfo() require.Equal(t, NotFoundError{errors.New("object not found")}, err) @@ -277,12 +334,15 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) + _, err = queue.ReadInfo() require.Equal(t, NotFoundError{errors.New("object not found")}, err) // Requesting another object info after the previous one has failed should continue // to work alright. require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() require.NoError(t, err) require.Equal(t, &blobInfo, info) @@ -319,6 +379,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.False(t, queue.isDirty()) require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) require.True(t, reader.isDirty()) require.True(t, queue.isDirty()) @@ -356,6 +417,7 @@ func TestObjectInfoReader_queue(t *testing.T) { // Request the object before we close the queue. require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.Flush()) queue.close() diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 540e671de..9d12291cc 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -32,12 +32,16 @@ type ObjectReader interface { // ObjectQueue allows for requesting and reading objects independently of each other. The number of // RequestObject and ReadObject calls must match. ReadObject must be executed after the object has // been requested already. The order of objects returned by ReadObject is the same as the order in -// which objects have been requested. +// which objects have been requested. Users of this interface must call `Flush()` after all requests +// have been queued up such that all requested objects will be readable. type ObjectQueue interface { // RequestRevision requests the given revision from git-cat-file(1). RequestRevision(git.Revision) error // ReadObject reads an object which has previously been requested. ReadObject() (*Object, error) + // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which + // have been requested up to this point. + Flush() error } // objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file @@ -64,6 +68,7 @@ func newObjectReader( Name: "cat-file", Flags: []git.Option{ git.Flag{Name: "--batch"}, + git.Flag{Name: "--buffer"}, }, }, git.WithStdin(command.SetupStdin), @@ -78,7 +83,7 @@ func newObjectReader( queue: requestQueue{ isObjectQueue: true, stdout: bufio.NewReader(batchCmd), - stdin: batchCmd, + stdin: bufio.NewWriter(batchCmd), }, } go func() { @@ -128,6 +133,10 @@ func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Obje return nil, err } + if err := queue.Flush(); err != nil { + return nil, err + } + object, err := queue.ReadObject() if err != nil { return nil, err diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index fa525bdba..78738f988 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -139,6 +139,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) object, err := queue.ReadObject() require.NoError(t, err) @@ -161,6 +162,7 @@ func TestObjectReader_queue(t *testing.T) { barfooBlob: "barfoo", } { require.NoError(t, queue.RequestRevision(blobID.Revision())) + require.NoError(t, queue.Flush()) object, err := queue.ReadObject() require.NoError(t, err) @@ -181,6 +183,7 @@ func TestObjectReader_queue(t *testing.T) { require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.RequestRevision(barfooBlob.Revision())) + require.NoError(t, queue.Flush()) for _, expectedContents := range []string{"foobar", "barfoo"} { object, err := queue.ReadObject() @@ -204,6 +207,65 @@ func TestObjectReader_queue(t *testing.T) { require.Equal(t, errors.New("no outstanding request"), err) }) + t.Run("flush with single request", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + // We flush once before and once after requesting the object such that we can be + // sure that it doesn't impact which objects we can read. + require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) + + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, "foobar", string(contents)) + }) + + t.Run("flush with multiple requests", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + for i := 0; i < 10; i++ { + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + } + require.NoError(t, queue.Flush()) + + for i := 0; i < 10; i++ { + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, "foobar", string(contents)) + } + }) + + t.Run("flush without request", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.Flush()) + + _, err = queue.ReadObject() + require.Equal(t, errors.New("no outstanding request"), err) + }) + t.Run("request invalid object", func(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) @@ -213,6 +275,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) _, err = queue.ReadObject() require.Equal(t, NotFoundError{errors.New("object not found")}, err) @@ -227,12 +290,15 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision("does-not-exist")) + require.NoError(t, queue.Flush()) + _, err = queue.ReadObject() require.Equal(t, NotFoundError{errors.New("object not found")}, err) // Requesting another object after the previous one has failed should continue to // work alright. require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) object, err := queue.ReadObject() require.NoError(t, err) @@ -272,6 +338,7 @@ func TestObjectReader_queue(t *testing.T) { require.False(t, queue.isDirty()) require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) require.True(t, reader.isDirty()) require.True(t, queue.isDirty()) @@ -316,6 +383,7 @@ func TestObjectReader_queue(t *testing.T) { // Request the object before we close the queue. require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) queue.close() @@ -335,6 +403,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.Flush()) // Read the object header before closing. object, err := queue.ReadObject() diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go index 21f7508c1..2b43f8a3c 100644 --- a/internal/git/catfile/request_queue.go +++ b/internal/git/catfile/request_queue.go @@ -11,13 +11,23 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" ) +const ( + // flushCommand is the command we send to git-cat-file(1) to cause it to flush its stdout. + // Note that this is a hack: git-cat-file(1) doesn't really support flushing, but it will + // flush whenever it encounters an object it doesn't know. The flush command we use is thus + // chosen such that it cannot ever refer to a valid object: refs may not contain whitespace, + // so this command cannot refer to a ref. Adding "FLUSH" is just for the sake of making it + // easier to spot what's going on in case we ever mistakenly see this output in the wild. + flushCommand = "\tFLUSH\t" +) + type requestQueue struct { // isObjectQueue is set to `true` when this is a request queue which can be used for reading // objects. If set to `false`, then this can only be used to read object info. isObjectQueue bool stdout *bufio.Reader - stdin io.Writer + stdin *bufio.Writer // outstandingRequests is the number of requests which have been queued up. Gets incremented // on request, and decremented when starting to read an object (not when that object has @@ -76,9 +86,34 @@ func (q *requestQueue) RequestRevision(revision git.Revision) error { atomic.AddInt64(&q.outstandingRequests, 1) - if _, err := fmt.Fprintln(q.stdin, revision.String()); err != nil { + if _, err := q.stdin.WriteString(revision.String()); err != nil { + atomic.AddInt64(&q.outstandingRequests, -1) + return fmt.Errorf("writing object request: %w", err) + } + + if err := q.stdin.WriteByte('\n'); err != nil { atomic.AddInt64(&q.outstandingRequests, -1) - return fmt.Errorf("requesting revision: %w", err) + return fmt.Errorf("terminating object request: %w", err) + } + + return nil +} + +func (q *requestQueue) Flush() error { + if q.isClosed() { + return fmt.Errorf("cannot flush: %w", os.ErrClosed) + } + + if _, err := q.stdin.WriteString(flushCommand); err != nil { + return fmt.Errorf("writing flush command: %w", err) + } + + if err := q.stdin.WriteByte('\n'); err != nil { + return fmt.Errorf("terminating flush command: %w", err) + } + + if err := q.stdin.Flush(); err != nil { + return fmt.Errorf("flushing: %w", err) } return nil diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index e87628f38..b756e08f3 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -67,10 +67,11 @@ func CatfileInfo( } defer cleanup() - requestChan := make(chan catfileInfoRequest) + requestChan := make(chan catfileInfoRequest, 32) go func() { defer close(requestChan) + var i int64 for it.Next() { if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { select { @@ -88,6 +89,17 @@ func CatfileInfo( case <-ctx.Done(): return } + + i++ + if i%int64(cap(requestChan)) == 0 { + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileInfoRequest{err: err}: + case <-ctx.Done(): + return + } + } + } } if err := it.Err(); err != nil { @@ -97,6 +109,14 @@ func CatfileInfo( return } } + + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileInfoRequest{err: err}: + case <-ctx.Done(): + return + } + } }() resultChan := make(chan CatfileInfoResult) diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 5ec2e9fb5..49e33b859 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -44,10 +44,11 @@ func CatfileObject( } defer cleanup() - requestChan := make(chan catfileObjectRequest) + requestChan := make(chan catfileObjectRequest, 32) go func() { defer close(requestChan) + var i int64 for it.Next() { if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { select { @@ -62,6 +63,17 @@ func CatfileObject( case <-ctx.Done(): return } + + i++ + if i%int64(cap(requestChan)) == 0 { + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } + } } if err := it.Err(); err != nil { @@ -71,6 +83,14 @@ func CatfileObject( return } } + + if err := queue.Flush(); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } }() resultChan := make(chan CatfileObjectResult) |