From 64bc6d600d7c7b22ed538c163c1912bbb7f55866 Mon Sep 17 00:00:00 2001
From: Paul Okstad
Date: Thu, 19 Nov 2020 09:03:32 -0800
Subject: Remove syncer from storage cache

In the interest of maintainability, this change removes the syncer
from the storage cache. The syncer existed only to collapse concurrent
attempts to populate the cache for the same key into a single
retrieval; without it, concurrent cache misses for the same repository
may each perform the lookup, a small cost in exchange for simpler
code.
---
 internal/praefect/datastore/storage_provider.go  | 60 ++--------------------
 .../praefect/datastore/storage_provider_test.go  | 35 -------------
 2 files changed, 5 insertions(+), 90 deletions(-)

diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index ebc71807c..029bd8ead 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"errors"
 	"strings"
-	"sync"
 	"sync/atomic"
 
 	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
@@ -90,8 +89,6 @@ type CachingStorageProvider struct {
 	caches map[string]*lru.Cache
 	// access is access method to use: 0 - without caching; 1 - with caching.
 	access int32
-	// syncer allows to sync retrieval operations to omit unnecessary runs.
-	syncer syncer
 	// callbackLogger should be used only inside of the methods used as callbacks.
 	callbackLogger logrus.FieldLogger
 	cacheAccessTotal *prometheus.CounterVec
@@ -102,7 +99,6 @@ func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider
 	csp := &CachingStorageProvider{
 		DirectStorageProvider: NewDirectStorageProvider(sp),
 		caches:                make(map[string]*lru.Cache, len(virtualStorages)),
-		syncer:                syncer{inflight: map[string]chan struct{}{}},
 		callbackLogger:        logger.WithField("component", "caching_storage_provider"),
 		cacheAccessTotal: prometheus.NewCounterVec(
 			prometheus.CounterOpts{
@@ -210,26 +206,17 @@ func (c *CachingStorageProvider) cacheMiss(ctx context.Context, virtualStorage,
 	return c.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage)
 }
 
-func (c *CachingStorageProvider) tryCache(ctx context.Context, virtualStorage, relativePath, primaryStorage string) (func(), *lru.Cache, []string, bool) {
-	populateDone := func() {} // should be called AFTER any cache population is done
-
+func (c *CachingStorageProvider) tryCache(ctx context.Context, virtualStorage, relativePath, primaryStorage string) (*lru.Cache, []string, bool) {
	cache, found := c.getCache(virtualStorage)
 	if !found {
-		return populateDone, nil, nil, false
+		return nil, nil, false
 	}
 
 	if storages, found := getStringSlice(cache, relativePath); found {
-		return populateDone, cache, storages, true
-	}
-
-	// synchronises concurrent attempts to update cache for the same key.
-	populateDone = c.syncer.await(relativePath)
-
-	if storages, found := getStringSlice(cache, relativePath); found {
-		return populateDone, cache, storages, true
+		return cache, storages, true
 	}
 
-	return populateDone, cache, nil, false
+	return cache, nil, false
 }
 
 func (c *CachingStorageProvider) isCacheEnabled() bool { return atomic.LoadInt32(&c.access) != 0 }
@@ -240,10 +227,8 @@ func (c *CachingStorageProvider) GetSyncedNodes(ctx context.Context, virtualStor
 	if c.isCacheEnabled() {
 		var storages []string
 		var ok bool
-		var populationDone func()
 
-		populationDone, cache, storages, ok = c.tryCache(ctx, virtualStorage, relativePath, primaryStorage)
-		defer populationDone()
+		cache, storages, ok = c.tryCache(ctx, virtualStorage, relativePath, primaryStorage)
 		if ok {
 			c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc()
 			return storages
@@ -263,38 +248,3 @@ func getStringSlice(cache *lru.Cache, key string) ([]string, bool) {
 	vals, _ := val.([]string)
 	return vals, found
 }
-
-// syncer allows to sync access to a particular key.
-type syncer struct {
-	// inflight contains set of keys already acquired for sync.
-	inflight map[string]chan struct{}
-	mtx      sync.Mutex
-}
-
-// await acquires lock for provided key and returns a callback to invoke once the key could be released.
-// If key is already acquired the call will be blocked until callback for that key won't be called.
-func (sc *syncer) await(key string) func() {
-	sc.mtx.Lock()
-
-	if cond, found := sc.inflight[key]; found {
-		sc.mtx.Unlock()
-
-		<-cond // the key is acquired, wait until it is released
-
-		return func() {}
-	}
-
-	defer sc.mtx.Unlock()
-
-	cond := make(chan struct{})
-	sc.inflight[key] = cond
-
-	return func() {
-		sc.mtx.Lock()
-		defer sc.mtx.Unlock()
-
-		delete(sc.inflight, key)
-
-		close(cond)
-	}
-}
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 28e93f783..9ead6922f 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -6,7 +6,6 @@ import (
 	"strings"
 	"sync"
 	"testing"
-	"time"
 
 	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
@@ -429,37 +428,3 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
 		wg.Wait()
 	})
 }
-
-func TestSyncer_await(t *testing.T) {
-	sc := syncer{inflight: map[string]chan struct{}{}}
-
-	const dur = 50 * time.Millisecond
-
-	var wg sync.WaitGroup
-	begin := make(chan struct{})
-
-	awaitKey := func(key string) {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-
-			<-begin
-
-			defer sc.await(key)()
-			time.Sleep(dur)
-		}()
-	}
-
-	keys := []string{"a", "a", "b", "c", "d"}
-	for _, key := range keys {
-		awaitKey(key)
-	}
-
-	start := time.Now()
-	close(begin)
-	wg.Wait()
-	duration := time.Since(start).Milliseconds()
-
-	require.GreaterOrEqual(t, duration, 2*dur.Milliseconds(), "we use same key twice, so it should take at least 2 durations")
-	require.Less(t, duration, int64(len(keys))*dur.Milliseconds(), "it should take less time as sequential processing")
-}
-- 
cgit v1.2.3
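
A note on what the patch removes: the syncer was a hand-rolled single-flight guard. The first caller to miss the cache for a key performed the retrieval while concurrent callers for the same key blocked on a channel and then re-checked the cache. Should that deduplication ever be wanted back, the maintained golang.org/x/sync/singleflight package provides the same semantics without the bookkeeping. Below is a minimal, self-contained sketch; fetchStorages and the key format are illustrative placeholders, not Gitaly's actual API.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

// fetchStorages stands in for the expensive up-to-date-storages lookup
// that cacheMiss performs; its name and body are assumptions for this sketch.
func fetchStorages(virtualStorage, relativePath string) ([]string, error) {
	return []string{"gitaly-1", "gitaly-2"}, nil
}

func main() {
	var group singleflight.Group
	var wg sync.WaitGroup

	// Five concurrent misses on the same key: group.Do collapses them so
	// fetchStorages runs once and every waiting caller shares that result.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			v, err, shared := group.Do("praefect|repo.git", func() (interface{}, error) {
				return fetchStorages("praefect", "repo.git")
			})
			if err != nil {
				return
			}
			fmt.Println(v.([]string), "shared:", shared)
		}()
	}
	wg.Wait()
}

Unlike the removed syncer, singleflight hands the first caller's result directly to every waiter (the shared flag reports when that happened), so waiters do not need the second getStringSlice pass that tryCache performed after await returned.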