gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2020-11-23 17:26:42 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2020-11-24 23:31:53 +0300
commit    b815f7f4e580138fbc053ac0fd6c7efcd9425233 (patch)
tree      eee5c97cc4bae6c4de25933792d85261348176a1
parent    8dd83e26c8244438dd402ff273dc14119f414628 (diff)
Verification of the "cache disabled" log message

To detect whether the cache was disabled, we can search for the corresponding log message. Unused parameters are removed from the 'tryCache' method.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2699
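A minimal sketch of this verification approach, using logrus's test hook to capture and search log entries. The null-logger setup and the simulated error below are illustrative; the tests in this commit instead attach the hook to the provider's logger via test.NewLocal.

package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
	"github.com/sirupsen/logrus/hooks/test"
)

func main() {
	// Capture all log entries in memory instead of writing them out.
	logger, hook := test.NewNullLogger()

	// Simulate the code path that disables the cache on a bad payload.
	logger.WithError(fmt.Errorf("EOF")).
		WithField("channel", "notification_channel_1").
		Error("received payload can't be processed, cache disabled")

	// Verification: inspect the captured entries for the disable message.
	entry := hook.LastEntry()
	if entry != nil &&
		entry.Level == logrus.ErrorLevel &&
		entry.Message == "received payload can't be processed, cache disabled" {
		fmt.Println("cache disablement detected via log message")
	}
}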
-rw-r--r--  cmd/praefect/main.go                                  |  8
-rw-r--r--  internal/praefect/datastore/storage_provider.go       | 47
-rw-r--r--  internal/praefect/datastore/storage_provider_test.go  | 90
3 files changed, 32 insertions(+), 113 deletions(-)
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index f240d7c61..82fb04bbc 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -242,17 +242,13 @@ func run(cfgs []starter.Config, conf config.Config) error {
if conf.MemoryQueueEnabled {
queue = datastore.NewMemoryReplicationEventQueue(conf)
rs = datastore.NewMemoryRepositoryStore(conf.StorageNames())
- storagesDirect := datastore.NewDirectStorageProvider(rs)
- metricsCollectors = append(metricsCollectors, storagesDirect)
- sp = storagesDirect
+ sp = datastore.NewDirectStorageProvider(rs)
} else {
queue = datastore.NewPostgresReplicationEventQueue(db)
rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
if conf.DB.ToPQString(true) == "" {
- storagesDirect := datastore.NewDirectStorageProvider(rs)
- metricsCollectors = append(metricsCollectors, storagesDirect)
- sp = storagesDirect
+ sp = datastore.NewDirectStorageProvider(rs)
} else {
listenerOpts := datastore.DefaultPostgresListenerOpts
listenerOpts.Addr = conf.DB.ToPQString(true)
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index ebc71807c..4b52c0304 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -23,32 +23,12 @@ type SecondariesProvider interface {
// DirectStorageProvider provides the latest state of the synced nodes.
type DirectStorageProvider struct {
- sp SecondariesProvider
- errorsTotal *prometheus.CounterVec
+ sp SecondariesProvider
}
// NewDirectStorageProvider returns a new storage provider.
func NewDirectStorageProvider(sp SecondariesProvider) *DirectStorageProvider {
- csp := &DirectStorageProvider{
- sp: sp,
- errorsTotal: prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "gitaly_praefect_uptodate_storages_errors_total",
- Help: "Total number of errors raised during defining up to date storages for reads distribution",
- },
- []string{"type"},
- ),
- }
-
- return csp
-}
-
-func (c *DirectStorageProvider) Describe(descs chan<- *prometheus.Desc) {
- prometheus.DescribeByCollect(c, descs)
-}
-
-func (c *DirectStorageProvider) Collect(collector chan<- prometheus.Metric) {
- c.errorsTotal.Collect(collector)
+ return &DirectStorageProvider{sp: sp}
}
func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) []string {
@@ -59,7 +39,6 @@ func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStora
func (c *DirectStorageProvider) getSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) ([]string, bool) {
upToDateStorages, err := c.sp.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primaryStorage)
if err != nil {
- c.errorsTotal.WithLabelValues("retrieve").Inc()
// this is a recoverable error - we can proceed with the primary node
ctxlogrus.Extract(ctx).WithError(err).Warn("get consistent secondaries")
return []string{primaryStorage}, false
@@ -85,7 +64,7 @@ var errNotExistingVirtualStorage = errors.New("virtual storage does not exist")
// CachingStorageProvider is a storage provider that caches up to date storages by repository.
// Each virtual storage has its own cache that invalidates entries based on notifications.
type CachingStorageProvider struct {
- *DirectStorageProvider
+ dsp *DirectStorageProvider
// caches is per virtual storage cache. It is initialized once on construction.
caches map[string]*lru.Cache
// access is the access method to use: 0 - without caching; 1 - with caching.
@@ -100,10 +79,10 @@ type CachingStorageProvider struct {
// NewCachingStorageProvider returns a storage provider that uses caching.
func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider, virtualStorages []string) (*CachingStorageProvider, error) {
csp := &CachingStorageProvider{
- DirectStorageProvider: NewDirectStorageProvider(sp),
- caches: make(map[string]*lru.Cache, len(virtualStorages)),
- syncer: syncer{inflight: map[string]chan struct{}{}},
- callbackLogger: logger.WithField("component", "caching_storage_provider"),
+ dsp: NewDirectStorageProvider(sp),
+ caches: make(map[string]*lru.Cache, len(virtualStorages)),
+ syncer: syncer{inflight: map[string]chan struct{}{}},
+ callbackLogger: logger.WithField("component", "caching_storage_provider"),
cacheAccessTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitaly_praefect_uptodate_storages_cache_access_total",
@@ -143,13 +122,10 @@ func (c *CachingStorageProvider) Notification(n glsql.Notification) {
var change changeNotification
if err := json.NewDecoder(strings.NewReader(n.Payload)).Decode(&change); err != nil {
c.disableCaching() // as we can't update cache properly we should disable it
- c.errorsTotal.WithLabelValues("notification_decode").Inc()
- c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed")
+ c.callbackLogger.WithError(err).WithField("channel", n.Channel).Error("received payload can't be processed, cache disabled")
return
}
- c.enableCaching()
-
entries := map[string][]string{}
for _, notificationEntries := range [][]notificationEntry{change.Old, change.New} {
for _, entry := range notificationEntries {
@@ -184,7 +160,6 @@ func (c *CachingStorageProvider) Describe(descs chan<- *prometheus.Desc) {
}
func (c *CachingStorageProvider) Collect(collector chan<- prometheus.Metric) {
- c.errorsTotal.Collect(collector)
c.cacheAccessTotal.Collect(collector)
}
@@ -207,10 +182,10 @@ func (c *CachingStorageProvider) getCache(virtualStorage string) (*lru.Cache, bo
func (c *CachingStorageProvider) cacheMiss(ctx context.Context, virtualStorage, relativePath, primaryStorage string) ([]string, bool) {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc()
- return c.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage)
+ return c.dsp.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage)
}
-func (c *CachingStorageProvider) tryCache(ctx context.Context, virtualStorage, relativePath, primaryStorage string) (func(), *lru.Cache, []string, bool) {
+func (c *CachingStorageProvider) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache, []string, bool) {
populateDone := func() {} // should be called AFTER any cache population is done
cache, found := c.getCache(virtualStorage)
@@ -242,7 +217,7 @@ func (c *CachingStorageProvider) GetSyncedNodes(ctx context.Context, virtualStor
var ok bool
var populationDone func()
- populationDone, cache, storages, ok = c.tryCache(ctx, virtualStorage, relativePath, primaryStorage)
+ populationDone, cache, storages, ok = c.tryCache(virtualStorage, relativePath)
defer populationDone()
if ok {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc()
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 0548d5e7a..45b422ad0 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -2,7 +2,7 @@ package datastore
import (
"context"
- "io"
+ "errors"
"runtime"
"strings"
"sync"
@@ -77,14 +77,6 @@ func TestDirectStorageProvider_GetSyncedNodes(t *testing.T) {
require.Equal(t, "get consistent secondaries", logHook.LastEntry().Message)
require.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data)
require.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level)
-
- // "populate" metric is not set as there was an error and we don't want this result to be cached
- err := testutil.CollectAndCompare(sp, strings.NewReader(`
- # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution
- # TYPE gitaly_praefect_uptodate_storages_errors_total counter
- gitaly_praefect_uptodate_storages_errors_total{type="retrieve"} 1
- `))
- require.NoError(t, err)
})
}
@@ -187,23 +179,20 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.ElementsMatch(t, []string{"g1"}, storages)
require.Len(t, logHook.AllEntries(), 1)
- require.Equal(t, "get consistent secondaries", logHook.LastEntry().Message)
- require.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data)
- require.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level)
+ assert.Equal(t, "get consistent secondaries", logHook.LastEntry().Message)
+ assert.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data)
+ assert.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level)
// "populate" metric is not set as there was an error and we don't want this result to be cached
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
# TYPE gitaly_praefect_uptodate_storages_cache_access_total counter
gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 1
- # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution
- # TYPE gitaly_praefect_uptodate_storages_errors_total counter
- gitaly_praefect_uptodate_storages_errors_total{type="retrieve"} 1
`))
require.NoError(t, err)
})
- t.Run("cache becomes disabled after handling invalid notification payload", func(t *testing.T) {
+ t.Run("cache is disabled after handling invalid payload", func(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
logHook := test.NewLocal(logger.Logger)
@@ -213,53 +202,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
rs := &mockConsistentSecondariesProvider{}
rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/1", "g1").
Return(map[string]struct{}{"g2": {}, "g3": {}}, nil).
- Twice()
-
- cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
- require.NoError(t, err)
- cache.Connected()
-
- // first access populates the cache
- storages1 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
- require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1)
-
- // invalid payload disables caching
- cache.Notification(glsql.Notification{Channel: "nt-channel", Payload: ``})
-
- logEntries := logHook.AllEntries()
- require.Len(t, logEntries, 1)
- assert.Equal(t, logrus.Fields{
- "component": "caching_storage_provider",
- "channel": "nt-channel",
- "error": io.EOF,
- }, logEntries[0].Data)
- assert.Equal(t, "received payload can't be processed", logEntries[0].Message)
-
- // second access omits cached data as caching should be disabled
- storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
- require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2)
-
- err = testutil.CollectAndCompare(cache, strings.NewReader(`
- # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
- # TYPE gitaly_praefect_uptodate_storages_cache_access_total counter
- gitaly_praefect_uptodate_storages_cache_access_total{type="evict",virtual_storage="vs"} 1
- gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 2
- gitaly_praefect_uptodate_storages_cache_access_total{type="populate",virtual_storage="vs"} 1
- # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution
- # TYPE gitaly_praefect_uptodate_storages_errors_total counter
- gitaly_praefect_uptodate_storages_errors_total{type="notification_decode"} 1
- `))
- require.NoError(t, err)
- })
-
- t.Run("cache becomes enabled after handling valid payload after invalid payload", func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/1", "g1").
- Return(map[string]struct{}{"g2": {}, "g3": {}}, nil).
- Times(3)
+ Times(4)
cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -270,14 +213,14 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1)
// invalid payload disables caching
- cache.Notification(glsql.Notification{Payload: ``})
+ cache.Notification(glsql.Notification{Channel: "notification_channel_1", Payload: ``})
// second access omits cached data as caching should be disabled
storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2)
// valid payload enables caching again
- cache.Notification(glsql.Notification{Payload: `{}`})
+ cache.Notification(glsql.Notification{Channel: "notification_channel_2", Payload: `{}`})
// third access retrieves data and caches it
storages3 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
@@ -287,16 +230,21 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
storages4 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages4)
+ require.Len(t, logHook.AllEntries(), 1)
+ assert.Equal(t, "received payload can't be processed, cache disabled", logHook.LastEntry().Message)
+ assert.Equal(t, logrus.Fields{
+ "channel": "notification_channel_1",
+ "component": "caching_storage_provider",
+ "error": errors.New("EOF"),
+ }, logHook.LastEntry().Data)
+ assert.Equal(t, logrus.ErrorLevel, logHook.LastEntry().Level)
+
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
# TYPE gitaly_praefect_uptodate_storages_cache_access_total counter
gitaly_praefect_uptodate_storages_cache_access_total{type="evict",virtual_storage="vs"} 1
- gitaly_praefect_uptodate_storages_cache_access_total{type="hit",virtual_storage="vs"} 1
- gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 3
- gitaly_praefect_uptodate_storages_cache_access_total{type="populate",virtual_storage="vs"} 2
- # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution
- # TYPE gitaly_praefect_uptodate_storages_errors_total counter
- gitaly_praefect_uptodate_storages_errors_total{type="notification_decode"} 1
+ gitaly_praefect_uptodate_storages_cache_access_total{type="miss",virtual_storage="vs"} 4
+ gitaly_praefect_uptodate_storages_cache_access_total{type="populate",virtual_storage="vs"} 1
`))
require.NoError(t, err)
})
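To exercise the updated assertions locally, a targeted run of the affected package should suffice (assuming a checkout of the gitaly repository; these particular tests rely on mocks rather than a Postgres instance):

go test ./internal/praefect/datastore -run 'TestDirectStorageProvider_GetSyncedNodes|TestCachingStorageProvider_GetSyncedNodes'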