Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2019-09-18 17:40:14 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-09-18 17:40:14 +0300
commit4caa35347f5e8f198bff8127713fc6abeb909687 (patch)
treef132ef6b9adab9cc8517a867b164d9286e65946d
parent8ab5bd595984678838f3f09a96798b149e68a939 (diff)
wip prevent accidental catfile usejv-catfile-reuse-token
-rw-r--r--internal/git/catfile/batch.go31
-rw-r--r--internal/git/catfile/batch_cache.go8
-rw-r--r--internal/git/catfile/batch_cache_test.go4
-rw-r--r--internal/git/catfile/catfile.go103
4 files changed, 106 insertions, 40 deletions
diff --git a/internal/git/catfile/batch.go b/internal/git/catfile/batch.go
index d37c9c718..f24ff1dfd 100644
--- a/internal/git/catfile/batch.go
+++ b/internal/git/catfile/batch.go
@@ -28,6 +28,8 @@ type batchProcess struct {
// instead of doing unsafe memory writes (to n) and failing in some
// unpredictable way.
sync.Mutex
+
+ token int
}
func newBatchProcess(ctx context.Context, repoPath string, env []string) (*batchProcess, error) {
@@ -62,7 +64,23 @@ func newBatchProcess(ctx context.Context, repoPath string, env []string) (*batch
return b, nil
}
-func (b *batchProcess) reader(revspec string, expectedType string) (io.Reader, error) {
+func (b *batchProcess) setToken(token int) {
+ b.Lock()
+ b.token = token
+ b.Unlock()
+}
+
+func (b *batchProcess) lockWithToken(token int) (func(), error) {
+ b.Lock()
+ if b.token != token {
+ b.Unlock()
+ return nil, errBatchTokenExpired
+ }
+
+ return b.Unlock, nil
+}
+
+func (b *batchProcess) reader(currentToken int, revspec string, expectedType string) (io.Reader, error) {
b.Lock()
defer b.Unlock()
@@ -103,6 +121,7 @@ func (b *batchProcess) reader(revspec string, expectedType string) (io.Reader, e
return &batchReader{
batchProcess: b,
r: io.LimitReader(b.r, oi.Size),
+ token: currentToken,
}, nil
}
@@ -122,12 +141,16 @@ func (b *batchProcess) hasUnreadData() bool {
type batchReader struct {
*batchProcess
- r io.Reader
+ r io.Reader
+ token int
}
func (br *batchReader) Read(p []byte) (int, error) {
- br.batchProcess.Lock()
- defer br.batchProcess.Unlock()
+ unlock, err := br.batchProcess.lockWithToken(br.token)
+ if err != nil {
+ return 0, err
+ }
+ defer unlock()
n, err := br.r.Read(p)
br.batchProcess.consume(n)
diff --git a/internal/git/catfile/batch_cache.go b/internal/git/catfile/batch_cache.go
index 7d08cc598..464fd79be 100644
--- a/internal/git/catfile/batch_cache.go
+++ b/internal/git/catfile/batch_cache.go
@@ -58,7 +58,7 @@ type key struct {
type entry struct {
key
- value *Batch
+ value *batchCore
expiry time.Time
}
@@ -105,7 +105,7 @@ func (bc *batchCache) monitor(refreshInterval time.Duration) {
// Add adds a key, value pair to bc. If there are too many keys in bc
// already Add will evict old keys until the length is OK again.
-func (bc *batchCache) Add(k key, b *Batch) {
+func (bc *batchCache) Add(k key, core *batchCore) {
bc.Lock()
defer bc.Unlock()
@@ -114,7 +114,7 @@ func (bc *batchCache) Add(k key, b *Batch) {
bc.delete(i, true)
}
- ent := &entry{key: k, value: b, expiry: time.Now().Add(bc.ttl)}
+ ent := &entry{key: k, value: core, expiry: time.Now().Add(bc.ttl)}
bc.entries = append(bc.entries, ent)
for bc.len() > bc.maxLen {
@@ -129,7 +129,7 @@ func (bc *batchCache) evictHead() { bc.delete(0, true) }
func (bc *batchCache) len() int { return len(bc.entries) }
// Checkout removes a value from bc. After use the caller can re-add the value with bc.Add.
-func (bc *batchCache) Checkout(k key) (*Batch, bool) {
+func (bc *batchCache) Checkout(k key) (*batchCore, bool) {
bc.Lock()
defer bc.Unlock()
diff --git a/internal/git/catfile/batch_cache_test.go b/internal/git/catfile/batch_cache_test.go
index 510b91a7a..bddcec446 100644
--- a/internal/git/catfile/batch_cache_test.go
+++ b/internal/git/catfile/batch_cache_test.go
@@ -119,7 +119,7 @@ func TestCacheEnforceTTL(t *testing.T) {
requireCacheValid(t, bc)
- for i, v := range []*Batch{value0, value1} {
+ for i, v := range []*batchCore{value0, value1} {
require.True(t, v.isClosed(), "value %d %v should be closed", i, v)
}
@@ -168,7 +168,7 @@ func requireCacheValid(t *testing.T, bc *batchCache) {
}
}
-func testValue() *Batch { return &Batch{} }
+func testValue() *batchCore { return &batchCore{} }
func testKey(i int) key { return key{sessionID: fmt.Sprintf("key-%d", i)} }
diff --git a/internal/git/catfile/catfile.go b/internal/git/catfile/catfile.go
index 897282836..c7fc036b8 100644
--- a/internal/git/catfile/catfile.go
+++ b/internal/git/catfile/catfile.go
@@ -2,6 +2,7 @@ package catfile
import (
"context"
+ "errors"
"io"
"sync"
@@ -63,16 +64,53 @@ func init() {
// A Batch instance can only serve single request at a time. If you want to
// use it across multiple goroutines you need to add your own locking.
type Batch struct {
+ *batchCore
+ token int
+}
+
+type batchCore struct {
sync.Mutex
*batchCheck
*batchProcess
cancel func()
closed bool
+ token int
+}
+
+func (core *batchCore) lockWithToken(token int) (func(), error) {
+ core.Lock()
+ if core.token != token {
+ core.Unlock()
+ return nil, errBatchTokenExpired
+ }
+
+ return core.Unlock, nil
+}
+
+var errBatchTokenExpired = errors.New("batch token expired")
+
+func (core *batchCore) rotateToken() {
+ core.Lock()
+ core.token++
+ core.batchProcess.setToken(core.token)
+ core.Unlock()
+}
+
+func (core *batchCore) asBatch() *Batch {
+ core.Lock()
+ defer core.Unlock()
+ return &Batch{batchCore: core, token: core.token}
}
// Info returns an ObjectInfo if spec exists. If spec does not exist the
// error is of type NotFoundError.
func (c *Batch) Info(revspec string) (*ObjectInfo, error) {
+ unlock, err := c.batchCore.lockWithToken(c.token)
+ if err != nil {
+ return nil, err
+ }
+ defer unlock()
+
catfileLookupCounter.WithLabelValues("info").Inc()
return c.batchCheck.info(revspec)
}
@@ -83,7 +121,7 @@ func (c *Batch) Info(revspec string) (*ObjectInfo, error) {
// making another call on C.
func (c *Batch) Tree(revspec string) (io.Reader, error) {
catfileLookupCounter.WithLabelValues("tree").Inc()
- return c.batchProcess.reader(revspec, "tree")
+ return c.batchProcess.reader(c.token, revspec, "tree")
}
// Commit returns a raw commit object. It is an error if revspec does not
@@ -92,7 +130,7 @@ func (c *Batch) Tree(revspec string) (io.Reader, error) {
// making another call on C.
func (c *Batch) Commit(revspec string) (io.Reader, error) {
catfileLookupCounter.WithLabelValues("commit").Inc()
- return c.batchProcess.reader(revspec, "commit")
+ return c.batchProcess.reader(c.token, revspec, "commit")
}
// Blob returns a reader for the requested blob. The entire blob must be
@@ -102,38 +140,38 @@ func (c *Batch) Commit(revspec string) (io.Reader, error) {
// first use Info to resolve the revspec and check the object type.
func (c *Batch) Blob(revspec string) (io.Reader, error) {
catfileLookupCounter.WithLabelValues("blob").Inc()
- return c.batchProcess.reader(revspec, "blob")
+ return c.batchProcess.reader(c.token, revspec, "blob")
}
// Tag returns a raw tag object. Caller must consume the Reader before
// making another call on C.
func (c *Batch) Tag(revspec string) (io.Reader, error) {
catfileLookupCounter.WithLabelValues("tag").Inc()
- return c.batchProcess.reader(revspec, "tag")
+ return c.batchProcess.reader(c.token, revspec, "tag")
}
// Close closes the writers for batchCheck and batch. This is only used for
// cached Batches
-func (c *Batch) Close() {
- c.Lock()
- defer c.Unlock()
+func (core *batchCore) Close() {
+ core.Lock()
+ defer core.Unlock()
- if c.closed {
+ if core.closed {
return
}
- c.closed = true
- if c.cancel != nil {
+ core.closed = true
+ if core.cancel != nil {
// both c.batch and c.batchCheck have goroutines that listen on <ctx.Done()
// when this is cancelled, it will cause those goroutines to close both writers
- c.cancel()
+ core.cancel()
}
}
-func (c *Batch) isClosed() bool {
- c.Lock()
- defer c.Unlock()
- return c.closed
+func (core *batchCore) isClosed() bool {
+ core.Lock()
+ defer core.Unlock()
+ return core.closed
}
// New returns a new Batch instance. It is important that ctx gets canceled
@@ -151,7 +189,8 @@ func New(ctx context.Context, repo *gitalypb.Repository) (*Batch, error) {
sessionID := metadata.GetValue(ctx, SessionIDField)
if sessionID == "" {
- return newBatch(ctx, repoPath, env)
+ core, err := newBatchCore(ctx, repoPath, env)
+ return core.asBatch(), err
}
cacheKey := newCacheKey(sessionID, repo)
@@ -159,37 +198,39 @@ func New(ctx context.Context, repo *gitalypb.Repository) (*Batch, error) {
if c, ok := cache.Checkout(cacheKey); ok {
go returnWhenDone(requestDone, cache, cacheKey, c)
- return c, nil
+ return c.asBatch(), nil
}
// if we are using caching, create a fresh context for the new batch
// and initialize the new batch with a cache key and cancel function
cacheCtx, cacheCancel := context.WithCancel(context.Background())
- c, err := newBatch(cacheCtx, repoPath, env)
+ core, err := newBatchCore(cacheCtx, repoPath, env)
if err != nil {
return nil, err
}
- c.cancel = cacheCancel
- go returnWhenDone(requestDone, cache, cacheKey, c)
+ core.cancel = cacheCancel
+ go returnWhenDone(requestDone, cache, cacheKey, core)
- return c, nil
+ return core.asBatch(), nil
}
-func returnWhenDone(done <-chan struct{}, bc *batchCache, cacheKey key, c *Batch) {
+func returnWhenDone(done <-chan struct{}, bc *batchCache, cacheKey key, core *batchCore) {
<-done
- if c == nil || c.isClosed() {
+ core.rotateToken()
+
+ if core.isClosed() {
return
}
- if c.hasUnreadData() {
+ if core.hasUnreadData() {
catfileCacheCounter.WithLabelValues("dirty").Inc()
- c.Close()
+ core.Close()
return
}
- bc.Add(cacheKey, c)
+ bc.Add(cacheKey, core)
}
var injectSpawnErrors = false
@@ -198,7 +239,7 @@ type simulatedBatchSpawnError struct{}
func (simulatedBatchSpawnError) Error() string { return "simulated spawn error" }
-func newBatch(_ctx context.Context, repoPath string, env []string) (_ *Batch, err error) {
+func newBatchCore(_ctx context.Context, repoPath string, env []string) (_ *batchCore, err error) {
ctx, cancel := context.WithCancel(_ctx)
defer func() {
if err != nil {
@@ -206,15 +247,17 @@ func newBatch(_ctx context.Context, repoPath string, env []string) (_ *Batch, er
}
}()
- batch, err := newBatchProcess(ctx, repoPath, env)
+ core := &batchCore{}
+
+ core.batchProcess, err = newBatchProcess(ctx, repoPath, env)
if err != nil {
return nil, err
}
- batchCheck, err := newBatchCheck(ctx, repoPath, env)
+ core.batchCheck, err = newBatchCheck(ctx, repoPath, env)
if err != nil {
return nil, err
}
- return &Batch{batchProcess: batch, batchCheck: batchCheck}, nil
+ return core, nil
}