diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-05 13:09:05 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-08 17:48:45 +0300 |
commit | 5a9107ff8f9066680e90f87b15b2445883115627 (patch) | |
tree | 862445340c8e47881deb3198206fd6e7953ad5ac | |
parent | c469c87034f11689d47f026c464144d92a6a786a (diff) |
catfile: Move remaining bytes into object
The catfile object reader needs to track the number of bytes remaining
to be read such that we know when it's safe to start reading another
object, and when the catfile reader is dirty and thus cannot be returned
to the cache. These bytes are currently stored in the reader itself, and
as a result the object reader has to reach into the parent reader to
adjust bytes in there. This violates the separation of concrens, and
furthermore it results in locking issues when one wants to refactor the
reader.
Fix the design issue by instead moving the remaning byte count into the
object itself. The parent reader instead keeps track of the current
object and asks it for how many bytes are left to be read.
No change in behaviour is expected from this change, but it only serves
to disentangle locking for the upcoming object reader queue.
-rw-r--r-- | internal/git/catfile/object_reader.go | 81 | ||||
-rw-r--r-- | internal/git/catfile/object_reader_test.go | 5 |
2 files changed, 49 insertions, 37 deletions
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 98654f393..2265be0d6 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -7,6 +7,7 @@ import ( "io" "os" "sync" + "sync/atomic" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -30,13 +31,6 @@ type objectReader struct { cmd *command.Command stdout *bufio.Reader - // n is a state machine that tracks how much data we still have to read - // from r. Legal states are: n==0, this means we can do a new request on - // the cat-file process. n==1, this means that we have to discard a - // trailing newline. n>0, this means we are in the middle of reading a - // raw git object. - n int64 - // Even though the batch type should not be used concurrently, I think // that if that does happen by mistake we should give proper errors // instead of doing unsafe memory writes (to n) and failing in some @@ -46,6 +40,9 @@ type objectReader struct { closed bool counter *prometheus.CounterVec + + // currentObject tracks the object that is currently being read. + currentObject *Object } func newObjectReader( @@ -97,18 +94,15 @@ func (o *objectReader) isClosed() bool { return o.closed } -func (o *objectReader) consume(nBytes int64) { - o.n -= nBytes - if o.n < 1 { - panic("too many bytes read from batch") - } -} - func (o *objectReader) isDirty() bool { o.Lock() defer o.Unlock() - return o.n > 1 + if o.currentObject != nil { + return o.currentObject.isDirty() + } + + return false } func (o *objectReader) Object( @@ -121,16 +115,19 @@ func (o *objectReader) Object( o.Lock() defer o.Unlock() - if o.n == 1 { - // Consume linefeed + if o.currentObject != nil { + // If the current object is still dirty, then we must not try to read a new object. + if o.currentObject.isDirty() { + return nil, fmt.Errorf("current object has not been fully read") + } + + o.currentObject = nil + + // If we have already read an object before, then we must consume the trailing + // newline after the object's data. if _, err := o.stdout.ReadByte(); err != nil { return nil, err } - o.n-- - } - - if o.n != 0 { - return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n) } if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil { @@ -143,16 +140,17 @@ func (o *objectReader) Object( } trace.recordRequest(oi.Type) - o.n = oi.Size + 1 - - return &Object{ + o.currentObject = &Object{ ObjectInfo: *oi, parent: o, dataReader: io.LimitedReader{ R: o.stdout, N: oi.Size, }, - }, nil + bytesRemaining: oi.Size, + } + + return o.currentObject, nil } // Object represents data returned by `git cat-file --batch` @@ -161,20 +159,35 @@ type Object struct { ObjectInfo // parent is the objectReader which has created the Object. parent *objectReader + // dataReader is reader which has all the object data. dataReader io.LimitedReader + + // bytesLeft tracks the number of bytes which are left to be read. While this duplicates the + // information tracked in dataReader.N, this cannot be helped given that we need to make + // access to this information atomic so there's no race between updating it and checking the + // process for dirtiness. While we could use locking instead of atomics, we'd have to lock + // during the whole read duration -- and thus it'd become impossible to check for dirtiness + // at the same time. + bytesRemaining int64 } -func (o *Object) Read(p []byte) (int, error) { - o.parent.Lock() - defer o.parent.Unlock() +// isDirty determines whether the object is still dirty, that is whether there are still unconsumed +// bytes. +func (o *Object) isDirty() bool { + return atomic.LoadInt64(&o.bytesRemaining) != 0 +} +func (o *Object) Read(p []byte) (int, error) { if o.parent.closed { return 0, os.ErrClosed } n, err := o.dataReader.Read(p) - o.parent.consume(int64(n)) + if atomic.AddInt64(&o.bytesRemaining, int64(-n)) < 0 { + return n, fmt.Errorf("bytes remaining became negative while reading object") + } + return n, err } @@ -182,9 +195,6 @@ func (o *Object) Read(p []byte) (int, error) { // via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are // implemented by the respective reader or writer. func (o *Object) WriteTo(w io.Writer) (int64, error) { - o.parent.Lock() - defer o.parent.Unlock() - if o.parent.closed { return 0, os.ErrClosed } @@ -192,6 +202,9 @@ func (o *Object) WriteTo(w io.Writer) (int64, error) { // While the `io.LimitedReader` does not support WriteTo, `io.Copy()` will make use of // `ReadFrom()` in case the writer implements it. n, err := io.Copy(w, &o.dataReader) - o.parent.consume(n) + if atomic.AddInt64(&o.bytesRemaining, -n) < 0 { + return n, fmt.Errorf("bytes remaining became negative while reading object") + } + return n, err } diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index aad733f78..59660ffce 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -1,7 +1,6 @@ package catfile import ( - "fmt" "io" "testing" @@ -76,7 +75,7 @@ func TestObjectReader_reader(t *testing.T) { // We haven't yet consumed the previous object, so this must now fail. _, err = reader.Object(ctx, commitID.Revision()) - require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1)) + require.EqualError(t, err, "current object has not been fully read") }) t.Run("read fails when partially consuming previous object", func(t *testing.T) { @@ -91,7 +90,7 @@ func TestObjectReader_reader(t *testing.T) { // We haven't yet consumed the previous object, so this must now fail. _, err = reader.Object(ctx, commitID.Revision()) - require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1)) + require.EqualError(t, err, "current object has not been fully read") }) t.Run("read increments Prometheus counter", func(t *testing.T) { |