diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-09-18 17:40:14 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-09-18 17:40:14 +0300 |
commit | 4caa35347f5e8f198bff8127713fc6abeb909687 (patch) | |
tree | f132ef6b9adab9cc8517a867b164d9286e65946d | |
parent | 8ab5bd595984678838f3f09a96798b149e68a939 (diff) |
wip prevent accidental catfile use | jv-catfile-reuse-token
-rw-r--r-- | internal/git/catfile/batch.go | 31 | ||||
-rw-r--r-- | internal/git/catfile/batch_cache.go | 8 | ||||
-rw-r--r-- | internal/git/catfile/batch_cache_test.go | 4 | ||||
-rw-r--r-- | internal/git/catfile/catfile.go | 103 |
4 files changed, 106 insertions, 40 deletions
diff --git a/internal/git/catfile/batch.go b/internal/git/catfile/batch.go index d37c9c718..f24ff1dfd 100644 --- a/internal/git/catfile/batch.go +++ b/internal/git/catfile/batch.go @@ -28,6 +28,8 @@ type batchProcess struct { // instead of doing unsafe memory writes (to n) and failing in some // unpredictable way. sync.Mutex + + token int } func newBatchProcess(ctx context.Context, repoPath string, env []string) (*batchProcess, error) { @@ -62,7 +64,23 @@ func newBatchProcess(ctx context.Context, repoPath string, env []string) (*batch return b, nil } -func (b *batchProcess) reader(revspec string, expectedType string) (io.Reader, error) { +func (b *batchProcess) setToken(token int) { + b.Lock() + b.token = token + b.Unlock() +} + +func (b *batchProcess) lockWithToken(token int) (func(), error) { + b.Lock() + if b.token != token { + b.Unlock() + return nil, errBatchTokenExpired + } + + return b.Unlock, nil +} + +func (b *batchProcess) reader(currentToken int, revspec string, expectedType string) (io.Reader, error) { b.Lock() defer b.Unlock() @@ -103,6 +121,7 @@ func (b *batchProcess) reader(revspec string, expectedType string) (io.Reader, e return &batchReader{ batchProcess: b, r: io.LimitReader(b.r, oi.Size), + token: currentToken, }, nil } @@ -122,12 +141,16 @@ func (b *batchProcess) hasUnreadData() bool { type batchReader struct { *batchProcess - r io.Reader + r io.Reader + token int } func (br *batchReader) Read(p []byte) (int, error) { - br.batchProcess.Lock() - defer br.batchProcess.Unlock() + unlock, err := br.batchProcess.lockWithToken(br.token) + if err != nil { + return 0, err + } + defer unlock() n, err := br.r.Read(p) br.batchProcess.consume(n) diff --git a/internal/git/catfile/batch_cache.go b/internal/git/catfile/batch_cache.go index 7d08cc598..464fd79be 100644 --- a/internal/git/catfile/batch_cache.go +++ b/internal/git/catfile/batch_cache.go @@ -58,7 +58,7 @@ type key struct { type entry struct { key - value *Batch + value *batchCore expiry time.Time 
} @@ -105,7 +105,7 @@ func (bc *batchCache) monitor(refreshInterval time.Duration) { // Add adds a key, value pair to bc. If there are too many keys in bc // already Add will evict old keys until the length is OK again. -func (bc *batchCache) Add(k key, b *Batch) { +func (bc *batchCache) Add(k key, core *batchCore) { bc.Lock() defer bc.Unlock() @@ -114,7 +114,7 @@ func (bc *batchCache) Add(k key, b *Batch) { bc.delete(i, true) } - ent := &entry{key: k, value: b, expiry: time.Now().Add(bc.ttl)} + ent := &entry{key: k, value: core, expiry: time.Now().Add(bc.ttl)} bc.entries = append(bc.entries, ent) for bc.len() > bc.maxLen { @@ -129,7 +129,7 @@ func (bc *batchCache) evictHead() { bc.delete(0, true) } func (bc *batchCache) len() int { return len(bc.entries) } // Checkout removes a value from bc. After use the caller can re-add the value with bc.Add. -func (bc *batchCache) Checkout(k key) (*Batch, bool) { +func (bc *batchCache) Checkout(k key) (*batchCore, bool) { bc.Lock() defer bc.Unlock() diff --git a/internal/git/catfile/batch_cache_test.go b/internal/git/catfile/batch_cache_test.go index 510b91a7a..bddcec446 100644 --- a/internal/git/catfile/batch_cache_test.go +++ b/internal/git/catfile/batch_cache_test.go @@ -119,7 +119,7 @@ func TestCacheEnforceTTL(t *testing.T) { requireCacheValid(t, bc) - for i, v := range []*Batch{value0, value1} { + for i, v := range []*batchCore{value0, value1} { require.True(t, v.isClosed(), "value %d %v should be closed", i, v) } @@ -168,7 +168,7 @@ func requireCacheValid(t *testing.T, bc *batchCache) { } } -func testValue() *Batch { return &Batch{} } +func testValue() *batchCore { return &batchCore{} } func testKey(i int) key { return key{sessionID: fmt.Sprintf("key-%d", i)} } diff --git a/internal/git/catfile/catfile.go b/internal/git/catfile/catfile.go index 897282836..c7fc036b8 100644 --- a/internal/git/catfile/catfile.go +++ b/internal/git/catfile/catfile.go @@ -2,6 +2,7 @@ package catfile import ( "context" + "errors" "io" "sync" 
@@ -63,16 +64,53 @@ func init() { // A Batch instance can only serve single request at a time. If you want to // use it across multiple goroutines you need to add your own locking. type Batch struct { + *batchCore + token int +} + +type batchCore struct { sync.Mutex *batchCheck *batchProcess cancel func() closed bool + token int +} + +func (core *batchCore) lockWithToken(token int) (func(), error) { + core.Lock() + if core.token != token { + core.Unlock() + return nil, errBatchTokenExpired + } + + return core.Unlock, nil +} + +var errBatchTokenExpired = errors.New("batch token expired") + +func (core *batchCore) rotateToken() { + core.Lock() + core.token++ + core.batchProcess.setToken(core.token) + core.Unlock() +} + +func (core *batchCore) asBatch() *Batch { + core.Lock() + defer core.Unlock() + return &Batch{batchCore: core, token: core.token} } // Info returns an ObjectInfo if spec exists. If spec does not exist the // error is of type NotFoundError. func (c *Batch) Info(revspec string) (*ObjectInfo, error) { + unlock, err := c.batchCore.lockWithToken(c.token) + if err != nil { + return nil, err + } + defer unlock() + catfileLookupCounter.WithLabelValues("info").Inc() return c.batchCheck.info(revspec) } @@ -83,7 +121,7 @@ func (c *Batch) Info(revspec string) (*ObjectInfo, error) { // making another call on C. func (c *Batch) Tree(revspec string) (io.Reader, error) { catfileLookupCounter.WithLabelValues("tree").Inc() - return c.batchProcess.reader(revspec, "tree") + return c.batchProcess.reader(c.token, revspec, "tree") } // Commit returns a raw commit object. It is an error if revspec does not @@ -92,7 +130,7 @@ func (c *Batch) Tree(revspec string) (io.Reader, error) { // making another call on C. 
func (c *Batch) Commit(revspec string) (io.Reader, error) { catfileLookupCounter.WithLabelValues("commit").Inc() - return c.batchProcess.reader(revspec, "commit") + return c.batchProcess.reader(c.token, revspec, "commit") } // Blob returns a reader for the requested blob. The entire blob must be @@ -102,38 +140,38 @@ func (c *Batch) Commit(revspec string) (io.Reader, error) { // first use Info to resolve the revspec and check the object type. func (c *Batch) Blob(revspec string) (io.Reader, error) { catfileLookupCounter.WithLabelValues("blob").Inc() - return c.batchProcess.reader(revspec, "blob") + return c.batchProcess.reader(c.token, revspec, "blob") } // Tag returns a raw tag object. Caller must consume the Reader before // making another call on C. func (c *Batch) Tag(revspec string) (io.Reader, error) { catfileLookupCounter.WithLabelValues("tag").Inc() - return c.batchProcess.reader(revspec, "tag") + return c.batchProcess.reader(c.token, revspec, "tag") } // Close closes the writers for batchCheck and batch. This is only used for // cached Batches -func (c *Batch) Close() { - c.Lock() - defer c.Unlock() +func (core *batchCore) Close() { + core.Lock() + defer core.Unlock() - if c.closed { + if core.closed { return } - c.closed = true - if c.cancel != nil { + core.closed = true + if core.cancel != nil { // both c.batch and c.batchCheck have goroutines that listen on <ctx.Done() // when this is cancelled, it will cause those goroutines to close both writers - c.cancel() + core.cancel() } } -func (c *Batch) isClosed() bool { - c.Lock() - defer c.Unlock() - return c.closed +func (core *batchCore) isClosed() bool { + core.Lock() + defer core.Unlock() + return core.closed } // New returns a new Batch instance. 
It is important that ctx gets canceled @@ -151,7 +189,8 @@ func New(ctx context.Context, repo *gitalypb.Repository) (*Batch, error) { sessionID := metadata.GetValue(ctx, SessionIDField) if sessionID == "" { - return newBatch(ctx, repoPath, env) + core, err := newBatchCore(ctx, repoPath, env) + return core.asBatch(), err } cacheKey := newCacheKey(sessionID, repo) @@ -159,37 +198,39 @@ func New(ctx context.Context, repo *gitalypb.Repository) (*Batch, error) { if c, ok := cache.Checkout(cacheKey); ok { go returnWhenDone(requestDone, cache, cacheKey, c) - return c, nil + return c.asBatch(), nil } // if we are using caching, create a fresh context for the new batch // and initialize the new batch with a cache key and cancel function cacheCtx, cacheCancel := context.WithCancel(context.Background()) - c, err := newBatch(cacheCtx, repoPath, env) + core, err := newBatchCore(cacheCtx, repoPath, env) if err != nil { return nil, err } - c.cancel = cacheCancel - go returnWhenDone(requestDone, cache, cacheKey, c) + core.cancel = cacheCancel + go returnWhenDone(requestDone, cache, cacheKey, core) - return c, nil + return core.asBatch(), nil } -func returnWhenDone(done <-chan struct{}, bc *batchCache, cacheKey key, c *Batch) { +func returnWhenDone(done <-chan struct{}, bc *batchCache, cacheKey key, core *batchCore) { <-done - if c == nil || c.isClosed() { + core.rotateToken() + + if core.isClosed() { return } - if c.hasUnreadData() { + if core.hasUnreadData() { catfileCacheCounter.WithLabelValues("dirty").Inc() - c.Close() + core.Close() return } - bc.Add(cacheKey, c) + bc.Add(cacheKey, core) } var injectSpawnErrors = false @@ -198,7 +239,7 @@ type simulatedBatchSpawnError struct{} func (simulatedBatchSpawnError) Error() string { return "simulated spawn error" } -func newBatch(_ctx context.Context, repoPath string, env []string) (_ *Batch, err error) { +func newBatchCore(_ctx context.Context, repoPath string, env []string) (_ *batchCore, err error) { ctx, cancel := 
context.WithCancel(_ctx) defer func() { if err != nil { @@ -206,15 +247,17 @@ func newBatch(_ctx context.Context, repoPath string, env []string) (_ *Batch, er } }() - batch, err := newBatchProcess(ctx, repoPath, env) + core := &batchCore{} + + core.batchProcess, err = newBatchProcess(ctx, repoPath, env) if err != nil { return nil, err } - batchCheck, err := newBatchCheck(ctx, repoPath, env) + core.batchCheck, err = newBatchCheck(ctx, repoPath, env) if err != nil { return nil, err } - return &Batch{batchProcess: batch, batchCheck: batchCheck}, nil + return core, nil } |