Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-05 13:09:05 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-08 17:48:45 +0300
commit5a9107ff8f9066680e90f87b15b2445883115627 (patch)
tree862445340c8e47881deb3198206fd6e7953ad5ac
parentc469c87034f11689d47f026c464144d92a6a786a (diff)
catfile: Move remaining bytes into object
The catfile object reader needs to track the number of bytes remaining to be read such that we know when it's safe to start reading another object, and when the catfile reader is dirty and thus cannot be returned to the cache. These bytes are currently stored in the reader itself, and as a result the object reader has to reach into the parent reader to adjust bytes in there. This violates the separation of concrens, and furthermore it results in locking issues when one wants to refactor the reader. Fix the design issue by instead moving the remaning byte count into the object itself. The parent reader instead keeps track of the current object and asks it for how many bytes are left to be read. No change in behaviour is expected from this change, but it only serves to disentangle locking for the upcoming object reader queue.
-rw-r--r--internal/git/catfile/object_reader.go81
-rw-r--r--internal/git/catfile/object_reader_test.go5
2 files changed, 49 insertions, 37 deletions
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 98654f393..2265be0d6 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -7,6 +7,7 @@ import (
"io"
"os"
"sync"
+ "sync/atomic"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@@ -30,13 +31,6 @@ type objectReader struct {
cmd *command.Command
stdout *bufio.Reader
- // n is a state machine that tracks how much data we still have to read
- // from r. Legal states are: n==0, this means we can do a new request on
- // the cat-file process. n==1, this means that we have to discard a
- // trailing newline. n>0, this means we are in the middle of reading a
- // raw git object.
- n int64
-
// Even though the batch type should not be used concurrently, I think
// that if that does happen by mistake we should give proper errors
// instead of doing unsafe memory writes (to n) and failing in some
@@ -46,6 +40,9 @@ type objectReader struct {
closed bool
counter *prometheus.CounterVec
+
+ // currentObject tracks the object that is currently being read.
+ currentObject *Object
}
func newObjectReader(
@@ -97,18 +94,15 @@ func (o *objectReader) isClosed() bool {
return o.closed
}
-func (o *objectReader) consume(nBytes int64) {
- o.n -= nBytes
- if o.n < 1 {
- panic("too many bytes read from batch")
- }
-}
-
func (o *objectReader) isDirty() bool {
o.Lock()
defer o.Unlock()
- return o.n > 1
+ if o.currentObject != nil {
+ return o.currentObject.isDirty()
+ }
+
+ return false
}
func (o *objectReader) Object(
@@ -121,16 +115,19 @@ func (o *objectReader) Object(
o.Lock()
defer o.Unlock()
- if o.n == 1 {
- // Consume linefeed
+ if o.currentObject != nil {
+ // If the current object is still dirty, then we must not try to read a new object.
+ if o.currentObject.isDirty() {
+ return nil, fmt.Errorf("current object has not been fully read")
+ }
+
+ o.currentObject = nil
+
+ // If we have already read an object before, then we must consume the trailing
+ // newline after the object's data.
if _, err := o.stdout.ReadByte(); err != nil {
return nil, err
}
- o.n--
- }
-
- if o.n != 0 {
- return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n)
}
if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil {
@@ -143,16 +140,17 @@ func (o *objectReader) Object(
}
trace.recordRequest(oi.Type)
- o.n = oi.Size + 1
-
- return &Object{
+ o.currentObject = &Object{
ObjectInfo: *oi,
parent: o,
dataReader: io.LimitedReader{
R: o.stdout,
N: oi.Size,
},
- }, nil
+ bytesRemaining: oi.Size,
+ }
+
+ return o.currentObject, nil
}
// Object represents data returned by `git cat-file --batch`
@@ -161,20 +159,35 @@ type Object struct {
ObjectInfo
// parent is the objectReader which has created the Object.
parent *objectReader
+
// dataReader is reader which has all the object data.
dataReader io.LimitedReader
+
+ // bytesLeft tracks the number of bytes which are left to be read. While this duplicates the
+ // information tracked in dataReader.N, this cannot be helped given that we need to make
+ // access to this information atomic so there's no race between updating it and checking the
+ // process for dirtiness. While we could use locking instead of atomics, we'd have to lock
+ // during the whole read duration -- and thus it'd become impossible to check for dirtiness
+ // at the same time.
+ bytesRemaining int64
}
-func (o *Object) Read(p []byte) (int, error) {
- o.parent.Lock()
- defer o.parent.Unlock()
+// isDirty determines whether the object is still dirty, that is whether there are still unconsumed
+// bytes.
+func (o *Object) isDirty() bool {
+ return atomic.LoadInt64(&o.bytesRemaining) != 0
+}
+func (o *Object) Read(p []byte) (int, error) {
if o.parent.closed {
return 0, os.ErrClosed
}
n, err := o.dataReader.Read(p)
- o.parent.consume(int64(n))
+ if atomic.AddInt64(&o.bytesRemaining, int64(-n)) < 0 {
+ return n, fmt.Errorf("bytes remaining became negative while reading object")
+ }
+
return n, err
}
@@ -182,9 +195,6 @@ func (o *Object) Read(p []byte) (int, error) {
// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are
// implemented by the respective reader or writer.
func (o *Object) WriteTo(w io.Writer) (int64, error) {
- o.parent.Lock()
- defer o.parent.Unlock()
-
if o.parent.closed {
return 0, os.ErrClosed
}
@@ -192,6 +202,9 @@ func (o *Object) WriteTo(w io.Writer) (int64, error) {
// While the `io.LimitedReader` does not support WriteTo, `io.Copy()` will make use of
// `ReadFrom()` in case the writer implements it.
n, err := io.Copy(w, &o.dataReader)
- o.parent.consume(n)
+ if atomic.AddInt64(&o.bytesRemaining, -n) < 0 {
+ return n, fmt.Errorf("bytes remaining became negative while reading object")
+ }
+
return n, err
}
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index aad733f78..59660ffce 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -1,7 +1,6 @@
package catfile
import (
- "fmt"
"io"
"testing"
@@ -76,7 +75,7 @@ func TestObjectReader_reader(t *testing.T) {
// We haven't yet consumed the previous object, so this must now fail.
_, err = reader.Object(ctx, commitID.Revision())
- require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1))
+ require.EqualError(t, err, "current object has not been fully read")
})
t.Run("read fails when partially consuming previous object", func(t *testing.T) {
@@ -91,7 +90,7 @@ func TestObjectReader_reader(t *testing.T) {
// We haven't yet consumed the previous object, so this must now fail.
_, err = reader.Object(ctx, commitID.Revision())
- require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1))
+ require.EqualError(t, err, "current object has not been fully read")
})
t.Run("read increments Prometheus counter", func(t *testing.T) {