diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-11-23 17:26:42 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-11-24 23:31:53 +0300 |
commit | b815f7f4e580138fbc053ac0fd6c7efcd9425233 (patch) | |
tree | eee5c97cc4bae6c4de25933792d85261348176a1 | |
parent | 8dd83e26c8244438dd402ff273dc14119f414628 (diff) |
Verification of disable cache logging message
To detect whether the cache was disabled, we can search for the
corresponding log message.
Unused parameters were removed from the 'tryCache' method.
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2699
-rw-r--r-- | cmd/praefect/main.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider.go | 47 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider_test.go | 90 |
3 files changed, 32 insertions, 113 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index f240d7c61..82fb04bbc 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -242,17 +242,13 @@ func run(cfgs []starter.Config, conf config.Config) error { if conf.MemoryQueueEnabled { queue = datastore.NewMemoryReplicationEventQueue(conf) rs = datastore.NewMemoryRepositoryStore(conf.StorageNames()) - storagesDirect := datastore.NewDirectStorageProvider(rs) - metricsCollectors = append(metricsCollectors, storagesDirect) - sp = storagesDirect + sp = datastore.NewDirectStorageProvider(rs) } else { queue = datastore.NewPostgresReplicationEventQueue(db) rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) if conf.DB.ToPQString(true) == "" { - storagesDirect := datastore.NewDirectStorageProvider(rs) - metricsCollectors = append(metricsCollectors, storagesDirect) - sp = storagesDirect + sp = datastore.NewDirectStorageProvider(rs) } else { listenerOpts := datastore.DefaultPostgresListenerOpts listenerOpts.Addr = conf.DB.ToPQString(true) diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index ebc71807c..4b52c0304 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -23,32 +23,12 @@ type SecondariesProvider interface { // DirectStorageProvider provides the latest state of the synced nodes. type DirectStorageProvider struct { - sp SecondariesProvider - errorsTotal *prometheus.CounterVec + sp SecondariesProvider } // NewDirectStorageProvider returns a new storage provider. 
func NewDirectStorageProvider(sp SecondariesProvider) *DirectStorageProvider { - csp := &DirectStorageProvider{ - sp: sp, - errorsTotal: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_praefect_uptodate_storages_errors_total", - Help: "Total number of errors raised during defining up to date storages for reads distribution", - }, - []string{"type"}, - ), - } - - return csp -} - -func (c *DirectStorageProvider) Describe(descs chan<- *prometheus.Desc) { - prometheus.DescribeByCollect(c, descs) -} - -func (c *DirectStorageProvider) Collect(collector chan<- prometheus.Metric) { - c.errorsTotal.Collect(collector) + return &DirectStorageProvider{sp: sp} } func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) []string { @@ -59,7 +39,6 @@ func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStora func (c *DirectStorageProvider) getSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) ([]string, bool) { upToDateStorages, err := c.sp.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primaryStorage) if err != nil { - c.errorsTotal.WithLabelValues("retrieve").Inc() // this is recoverable error - we can proceed with primary node ctxlogrus.Extract(ctx).WithError(err).Warn("get consistent secondaries") return []string{primaryStorage}, false @@ -85,7 +64,7 @@ var errNotExistingVirtualStorage = errors.New("virtual storage does not exist") // CachingStorageProvider is a storage provider that caches up to date storages by repository. // Each virtual storage has it's own cache that invalidates entries based on notifications. type CachingStorageProvider struct { - *DirectStorageProvider + dsp *DirectStorageProvider // caches is per virtual storage cache. It is initialized once on construction. caches map[string]*lru.Cache // access is access method to use: 0 - without caching; 1 - with caching. 
@@ -100,10 +79,10 @@ type CachingStorageProvider struct { // NewCachingStorageProvider returns a storage provider that uses caching. func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider, virtualStorages []string) (*CachingStorageProvider, error) { csp := &CachingStorageProvider{ - DirectStorageProvider: NewDirectStorageProvider(sp), - caches: make(map[string]*lru.Cache, len(virtualStorages)), - syncer: syncer{inflight: map[string]chan struct{}{}}, - callbackLogger: logger.WithField("component", "caching_storage_provider"), + dsp: NewDirectStorageProvider(sp), + caches: make(map[string]*lru.Cache, len(virtualStorages)), + syncer: syncer{inflight: map[string]chan struct{}{}}, + callbackLogger: logger.WithField("component", "caching_storage_provider"), cacheAccessTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitaly_praefect_uptodate_storages_cache_access_total", @@ -143,13 +122,10 @@ func (c *CachingStorageProvider) Notification(n glsql.Notification) { var change changeNotification if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&change); err != nil { c.disableCaching() // as we can't update cache properly we should disable it - c.errorsTotal.WithLabelValues("notification_decode").Inc() - c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed") + c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed, cache disabled") return } - c.enableCaching() - entries := map[string][]string{} for _, notificationEntries := range [][]notificationEntry{change.Old, change.New} { for _, entry := range notificationEntries { @@ -184,7 +160,6 @@ func (c *CachingStorageProvider) Describe(descs chan<- *prometheus.Desc) { } func (c *CachingStorageProvider) Collect(collector chan<- prometheus.Metric) { - c.errorsTotal.Collect(collector) c.cacheAccessTotal.Collect(collector) } @@ -207,10 +182,10 @@ func (c *CachingStorageProvider) 
getCache(virtualStorage string) (*lru.Cache, bo func (c *CachingStorageProvider) cacheMiss(ctx context.Context, virtualStorage, relativePath, primaryStorage string) ([]string, bool) { c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc() - return c.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage) + return c.dsp.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage) } -func (c *CachingStorageProvider) tryCache(ctx context.Context, virtualStorage, relativePath, primaryStorage string) (func(), *lru.Cache, []string, bool) { +func (c *CachingStorageProvider) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache, []string, bool) { populateDone := func() {} // should be called AFTER any cache population is done cache, found := c.getCache(virtualStorage) @@ -242,7 +217,7 @@ func (c *CachingStorageProvider) GetSyncedNodes(ctx context.Context, virtualStor var ok bool var populationDone func() - populationDone, cache, storages, ok = c.tryCache(ctx, virtualStorage, relativePath, primaryStorage) + populationDone, cache, storages, ok = c.tryCache(virtualStorage, relativePath) defer populationDone() if ok { c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc() diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 0548d5e7a..45b422ad0 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -2,7 +2,7 @@ package datastore import ( "context" - "io" + "errors" "runtime" "strings" "sync" @@ -77,14 +77,6 @@ func TestDirectStorageProvider_GetSyncedNodes(t *testing.T) { require.Equal(t, "get consistent secondaries", logHook.LastEntry().Message) require.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data) require.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level) - - // "populate" metric is not set as there was an error and we don't want this result to be cached 
- err := testutil.CollectAndCompare(sp, strings.NewReader(` - # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution - # TYPE gitaly_praefect_uptodate_storages_errors_total counter - gitaly_praefect_uptodate_storages_errors_total{type="retrieve"} 1 - `)) - require.NoError(t, err) }) } @@ -187,23 +179,20 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.ElementsMatch(t, []string{"g1"}, storages) require.Len(t, logHook.AllEntries(), 1) - require.Equal(t, "get consistent secondaries", logHook.LastEntry().Message) - require.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data) - require.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level) + assert.Equal(t, "get consistent secondaries", logHook.LastEntry().Message) + assert.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data) + assert.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level) // "populate" metric is not set as there was an error and we don't want this result to be cached err = testutil.CollectAndCompare(cache, strings.NewReader(` # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) # TYPE gitaly_praefect_uptodate_storages_cache_access_total counter gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 1 - # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution - # TYPE gitaly_praefect_uptodate_storages_errors_total counter - gitaly_praefect_uptodate_storages_errors_total{type="retrieve"} 1 `)) require.NoError(t, err) }) - t.Run("cache becomes disabled after handling invalid notification payload", func(t *testing.T) { + t.Run("cache is disabled after handling invalid payload", func(t *testing.T) { logger := 
testhelper.DiscardTestEntry(t) logHook := test.NewLocal(logger.Logger) @@ -213,53 +202,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { rs := &mockConsistentSecondariesProvider{} rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/1", "g1"). Return(map[string]struct{}{"g2": {}, "g3": {}}, nil). - Twice() - - cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"}) - require.NoError(t, err) - cache.Connected() - - // first access populates the cache - storages1 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") - require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1) - - // invalid payload disables caching - cache.Notification(glsql.Notification{Channel: "nt-channel", Payload: ``}) - - logEntries := logHook.AllEntries() - require.Len(t, logEntries, 1) - assert.Equal(t, logrus.Fields{ - "component": "caching_storage_provider", - "channel": "nt-channel", - "error": io.EOF, - }, logEntries[0].Data) - assert.Equal(t, "received payload can't be processed", logEntries[0].Message) - - // second access omits cached data as caching should be disabled - storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") - require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2) - - err = testutil.CollectAndCompare(cache, strings.NewReader(` - # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) - # TYPE gitaly_praefect_uptodate_storages_cache_access_total counter - gitaly_praefect_uptodate_storages_cache_access_total{type="evict",virtual_storage="vs"} 1 - gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 2 - gitaly_praefect_uptodate_storages_cache_access_total{type="populate",virtual_storage="vs"} 1 - # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads 
distribution - # TYPE gitaly_praefect_uptodate_storages_errors_total counter - gitaly_praefect_uptodate_storages_errors_total{type="notification_decode"} 1 - `)) - require.NoError(t, err) - }) - - t.Run("cache becomes enabled after handling valid payload after invalid payload", func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/1", "g1"). - Return(map[string]struct{}{"g2": {}, "g3": {}}, nil). - Times(3) + Times(4) cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -270,14 +213,14 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1) // invalid payload disables caching - cache.Notification(glsql.Notification{Payload: ``}) + cache.Notification(glsql.Notification{Channel: "notification_channel_1", Payload: ``}) // second access omits cached data as caching should be disabled storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2) // valid payload enables caching again - cache.Notification(glsql.Notification{Payload: `{}`}) + cache.Notification(glsql.Notification{Channel: "notification_channel_2", Payload: `{}`}) // third access retrieves data and caches it storages3 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") @@ -287,16 +230,21 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { storages4 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages4) + require.Len(t, logHook.AllEntries(), 1) + assert.Equal(t, "received payload can't be processed, cache disabled", logHook.LastEntry().Message) + assert.Equal(t, logrus.Fields{ + "channel": "notification_channel_1", + "component": "caching_storage_provider", + "error": errors.New("EOF"), + }, 
logHook.LastEntry().Data) + assert.Equal(t, logrus.ErrorLevel, logHook.LastEntry().Level) + err = testutil.CollectAndCompare(cache, strings.NewReader(` # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) # TYPE gitaly_praefect_uptodate_storages_cache_access_total counter gitaly_praefect_uptodate_storages_cache_access_total{type="evict",virtual_storage="vs"} 1 - gitaly_praefect_uptodate_storages_cache_access_total{type="hit",virtual_storage="vs"} 1 - gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 3 - gitaly_praefect_uptodate_storages_cache_access_total{type="populate",virtual_storage="vs"} 2 - # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution - # TYPE gitaly_praefect_uptodate_storages_errors_total counter - gitaly_praefect_uptodate_storages_errors_total{type="notification_decode"} 1 + gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 4 + gitaly_praefect_uptodate_storages_cache_access_total{type="populate",virtual_storage="vs"} 1 `)) require.NoError(t, err) }) |