Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-11-19 20:03:32 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-11-19 20:03:41 +0300
commit64bc6d600d7c7b22ed538c163c1912bbb7f55866 (patch)
tree780b586dafe87a357f9d3bc2d9bcb33cafde3ef3
parentb653ec0b862e46ce90e32f4ae4dc73583a2c5b75 (diff)
Remove syncer from storage cachepo-storage-cache-remove-syncer
In the interest of maintainability, this change removes the syncer from the storage cache.
-rw-r--r--internal/praefect/datastore/storage_provider.go60
-rw-r--r--internal/praefect/datastore/storage_provider_test.go35
2 files changed, 5 insertions, 90 deletions
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index ebc71807c..029bd8ead 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"strings"
- "sync"
"sync/atomic"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
@@ -90,8 +89,6 @@ type CachingStorageProvider struct {
caches map[string]*lru.Cache
// access is access method to use: 0 - without caching; 1 - with caching.
access int32
- // syncer allows to sync retrieval operations to omit unnecessary runs.
- syncer syncer
// callbackLogger should be used only inside of the methods used as callbacks.
callbackLogger logrus.FieldLogger
cacheAccessTotal *prometheus.CounterVec
@@ -102,7 +99,6 @@ func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider
csp := &CachingStorageProvider{
DirectStorageProvider: NewDirectStorageProvider(sp),
caches: make(map[string]*lru.Cache, len(virtualStorages)),
- syncer: syncer{inflight: map[string]chan struct{}{}},
callbackLogger: logger.WithField("component", "caching_storage_provider"),
cacheAccessTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -210,26 +206,17 @@ func (c *CachingStorageProvider) cacheMiss(ctx context.Context, virtualStorage,
return c.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage)
}
-func (c *CachingStorageProvider) tryCache(ctx context.Context, virtualStorage, relativePath, primaryStorage string) (func(), *lru.Cache, []string, bool) {
- populateDone := func() {} // should be called AFTER any cache population is done
-
+func (c *CachingStorageProvider) tryCache(ctx context.Context, virtualStorage, relativePath, primaryStorage string) (*lru.Cache, []string, bool) {
cache, found := c.getCache(virtualStorage)
if !found {
- return populateDone, nil, nil, false
+ return nil, nil, false
}
if storages, found := getStringSlice(cache, relativePath); found {
- return populateDone, cache, storages, true
- }
-
- // synchronises concurrent attempts to update cache for the same key.
- populateDone = c.syncer.await(relativePath)
-
- if storages, found := getStringSlice(cache, relativePath); found {
- return populateDone, cache, storages, true
+ return cache, storages, true
}
- return populateDone, cache, nil, false
+ return cache, nil, false
}
func (c *CachingStorageProvider) isCacheEnabled() bool { return atomic.LoadInt32(&c.access) != 0 }
@@ -240,10 +227,8 @@ func (c *CachingStorageProvider) GetSyncedNodes(ctx context.Context, virtualStor
if c.isCacheEnabled() {
var storages []string
var ok bool
- var populationDone func()
- populationDone, cache, storages, ok = c.tryCache(ctx, virtualStorage, relativePath, primaryStorage)
- defer populationDone()
+ cache, storages, ok = c.tryCache(ctx, virtualStorage, relativePath, primaryStorage)
if ok {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc()
return storages
@@ -263,38 +248,3 @@ func getStringSlice(cache *lru.Cache, key string) ([]string, bool) {
vals, _ := val.([]string)
return vals, found
}
-
-// syncer allows to sync access to a particular key.
-type syncer struct {
- // inflight contains set of keys already acquired for sync.
- inflight map[string]chan struct{}
- mtx sync.Mutex
-}
-
-// await acquires lock for provided key and returns a callback to invoke once the key could be released.
-// If key is already acquired the call will be blocked until callback for that key won't be called.
-func (sc *syncer) await(key string) func() {
- sc.mtx.Lock()
-
- if cond, found := sc.inflight[key]; found {
- sc.mtx.Unlock()
-
- <-cond // the key is acquired, wait until it is released
-
- return func() {}
- }
-
- defer sc.mtx.Unlock()
-
- cond := make(chan struct{})
- sc.inflight[key] = cond
-
- return func() {
- sc.mtx.Lock()
- defer sc.mtx.Unlock()
-
- delete(sc.inflight, key)
-
- close(cond)
- }
-}
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 28e93f783..9ead6922f 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -6,7 +6,6 @@ import (
"strings"
"sync"
"testing"
- "time"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus/testutil"
@@ -429,37 +428,3 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
wg.Wait()
})
}
-
-func TestSyncer_await(t *testing.T) {
- sc := syncer{inflight: map[string]chan struct{}{}}
-
- const dur = 50 * time.Millisecond
-
- var wg sync.WaitGroup
- begin := make(chan struct{})
-
- awaitKey := func(key string) {
- wg.Add(1)
- go func() {
- defer wg.Done()
-
- <-begin
-
- defer sc.await(key)()
- time.Sleep(dur)
- }()
- }
-
- keys := []string{"a", "a", "b", "c", "d"}
- for _, key := range keys {
- awaitKey(key)
- }
-
- start := time.Now()
- close(begin)
- wg.Wait()
- duration := time.Since(start).Milliseconds()
-
- require.GreaterOrEqual(t, duration, 2*dur.Milliseconds(), "we use same key twice, so it should take at least 2 durations")
- require.Less(t, duration, int64(len(keys))*dur.Milliseconds(), "it should take less time as sequential processing")
-}