Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-11-17 15:36:36 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-11-17 15:36:36 +0300
commitf869e2716122b17ce78508aacf981a098406d2d7 (patch)
treee380f87f34d1942053f9a45eee6b4b854bc06b85
parentb8a027e4ea96235545000da5ff35d05a91b4370b (diff)
parent6da7ee5a3db7e6511dcf7b794e161439805e22ff (diff)
Merge branch 'ps-up-to-date-storages-provider' into 'master'
Introduction of storages provider to get up to date storages See merge request gitlab-org/gitaly!2787
-rw-r--r--cmd/praefect/main.go4
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/coordinator_test.go16
-rw-r--r--internal/praefect/datastore/storage_provider.go63
-rw-r--r--internal/praefect/datastore/storage_provider_test.go107
-rw-r--r--internal/praefect/helper_test.go4
-rw-r--r--internal/praefect/nodes/manager.go30
-rw-r--r--internal/praefect/nodes/manager_test.go12
-rw-r--r--internal/praefect/replicator_test.go8
-rw-r--r--internal/praefect/server_factory_test.go6
-rw-r--r--internal/praefect/server_test.go6
11 files changed, 219 insertions, 39 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 51b6aba51..78f597443 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -260,7 +260,9 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
}
- nodeManager, err := nodes.NewManager(logger, conf, db, rs, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker)
+ sp := datastore.NewDirectStorageProvider(rs)
+
+ nodeManager, err := nodes.NewManager(logger, conf, db, rs, sp, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker)
if err != nil {
return err
}
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 27d19de5d..ac73e07e3 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -169,7 +169,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
queue := datastore.NewMemoryReplicationEventQueue(conf)
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, rs, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
txMgr := transactions.NewManager(conf)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 8080d3f86..436f7b859 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -152,7 +152,7 @@ func TestStreamDirectorMutator(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -332,7 +332,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -473,7 +473,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
RelativePath: "/path/to/hashed/storage",
}
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -594,7 +594,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -684,7 +684,9 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
require.NoError(t, err)
require.NoError(t, repoStore.SetGeneration(ctx, conf.VirtualStorages[0].Name, targetRepo.RelativePath, secondaryNodeConf.Storage, generation))
- nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ sp := datastore.NewDirectStorageProvider(repoStore)
+
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, sp, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -883,7 +885,7 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -1010,7 +1012,7 @@ func TestStreamDirectorStorageScope(t *testing.T) {
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
coordinator := NewCoordinator(
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
new file mode 100644
index 000000000..57f11fc73
--- /dev/null
+++ b/internal/praefect/datastore/storage_provider.go
@@ -0,0 +1,63 @@
+package datastore
+
+import (
+ "context"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// SecondariesProvider should provide information about secondary storages.
+type SecondariesProvider interface {
+ // GetConsistentSecondaries returns all secondaries with the same generation as the primary.
+ GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
+}
+
+// DirectStorageProvider provides the latest state of the synced nodes.
+type DirectStorageProvider struct {
+ sp SecondariesProvider
+ errorsTotal *prometheus.CounterVec
+}
+
+// NewDirectStorageProvider returns a new storage provider.
+func NewDirectStorageProvider(sp SecondariesProvider) *DirectStorageProvider {
+ csp := &DirectStorageProvider{
+ sp: sp,
+ errorsTotal: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitaly_praefect_uptodate_storages_errors_total",
+ Help: "Total number of errors raised during defining up to date storages for reads distribution",
+ },
+ []string{"type"},
+ ),
+ }
+
+ return csp
+}
+
+func (c *DirectStorageProvider) Describe(descs chan<- *prometheus.Desc) {
+ prometheus.DescribeByCollect(c, descs)
+}
+
+func (c *DirectStorageProvider) Collect(collector chan<- prometheus.Metric) {
+ c.errorsTotal.Collect(collector)
+}
+
+func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) []string {
+ upToDateStorages, err := c.sp.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primaryStorage)
+ if err != nil {
+ c.errorsTotal.WithLabelValues("retrieve").Inc()
+ // this is recoverable error - we can proceed with primary node
+ ctxlogrus.Extract(ctx).WithError(err).Warn("get consistent secondaries")
+ return []string{primaryStorage}
+ }
+
+ storages := make([]string, 0, len(upToDateStorages)+1)
+ for upToDateStorage := range upToDateStorages {
+ if upToDateStorage != primaryStorage {
+ storages = append(storages, upToDateStorage)
+ }
+ }
+
+ return append(storages, primaryStorage)
+}
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
new file mode 100644
index 000000000..1bb5b8691
--- /dev/null
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -0,0 +1,107 @@
+package datastore
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestDirectStorageProvider_GetSyncedNodes(t *testing.T) {
+ getCtx := func() (context.Context, context.CancelFunc) {
+ ctx, cancel := testhelper.Context()
+
+ logger := testhelper.DiscardTestEntry(t)
+ ctx = ctxlogrus.ToContext(ctx, logger)
+ return ctx, cancel
+ }
+
+ t.Run("ok", func(t *testing.T) {
+ ctx, cancel := getCtx()
+ defer cancel()
+
+ for _, tc := range []struct {
+ desc string
+ ret map[string]struct{}
+ exp []string
+ }{
+ {
+ desc: "primary included",
+ ret: map[string]struct{}{"g2": {}, "g3": {}},
+ exp: []string{"g1", "g2", "g3"},
+ },
+ {
+ desc: "distinct values",
+ ret: map[string]struct{}{"g1": {}, "g2": {}, "g3": {}},
+ exp: []string{"g1", "g2", "g3"},
+ },
+ {
+ desc: "none",
+ ret: nil,
+ exp: []string{"g1"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ rs := &mockConsistentSecondariesProvider{}
+ rs.On("GetConsistentSecondaries", ctx, "vs", "/repo/path", "g1").Return(tc.ret, nil)
+
+ sp := NewDirectStorageProvider(rs)
+ storages := sp.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
+ require.ElementsMatch(t, tc.exp, storages)
+ })
+ }
+ })
+
+ t.Run("repository store returns an error", func(t *testing.T) {
+ ctx, cancel := getCtx()
+ defer cancel()
+
+ logger := ctxlogrus.Extract(ctx)
+ logHook := test.NewLocal(logger.Logger)
+
+ rs := &mockConsistentSecondariesProvider{}
+ rs.On("GetConsistentSecondaries", ctx, "vs", "/repo/path", "g1").
+ Return(nil, assert.AnError).
+ Once()
+
+ sp := NewDirectStorageProvider(rs)
+
+ storages := sp.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
+ require.ElementsMatch(t, []string{"g1"}, storages)
+
+ require.Len(t, logHook.AllEntries(), 1)
+ require.Equal(t, "get consistent secondaries", logHook.LastEntry().Message)
+ require.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data)
+ require.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level)
+
+ // "populate" metric is not set as there was an error and we don't want this result to be cached
+ err := testutil.CollectAndCompare(sp, strings.NewReader(`
+ # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution
+ # TYPE gitaly_praefect_uptodate_storages_errors_total counter
+ gitaly_praefect_uptodate_storages_errors_total{type="retrieve"} 1
+ `))
+ require.NoError(t, err)
+ })
+}
+
+type mockConsistentSecondariesProvider struct {
+ mock.Mock
+}
+
+func (m *mockConsistentSecondariesProvider) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) {
+ args := m.Called(ctx, virtualStorage, relativePath, primary)
+ val := args.Get(0)
+ var res map[string]struct{}
+ if val != nil {
+ res = val.(map[string]struct{})
+ }
+ return res, args.Error(1)
+}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index a98ab2bcf..7e5ac3f97 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -185,7 +185,9 @@ func defaultTxMgr(conf config.Config) *transactions.Manager {
}
func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ loggingEntry := testhelper.DiscardTestEntry(t)
+ sp := datastore.NewDirectStorageProvider(rs)
+ nodeMgr, err := nodes.NewManager(loggingEntry, conf, nil, rs, sp, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
return nodeMgr
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 59c8f4378..2b29cb321 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -60,6 +60,12 @@ func (s Shard) GetHealthySecondaries() []Node {
return healthySecondaries
}
+// StorageProvider abstracts the way we get storages (gitaly nodes).
+type StorageProvider interface {
+ // GetSyncedNodes returns list of gitaly storages that are in up to date state based on the generation tracking.
+ GetSyncedNodes(ctx context.Context, virtualStorageName, repoPath, primaryStorage string) []string
+}
+
// Manager is responsible for returning shards for virtual storages
type Manager interface {
GetShard(virtualStorageName string) (Shard, error)
@@ -102,6 +108,7 @@ type Mgr struct {
rs datastore.RepositoryStore
// nodes contains nodes by their virtual storages
nodes map[string][]Node
+ sp StorageProvider
}
// leaderElectionStrategy defines the interface by which primary and
@@ -142,6 +149,7 @@ func NewManager(
c config.Config,
db *sql.DB,
rs datastore.RepositoryStore,
+ sp StorageProvider,
latencyHistogram prommetrics.HistogramVec,
registry *protoregistry.Registry,
errorTracker tracker.ErrorTracker,
@@ -188,6 +196,7 @@ func NewManager(
strategies: strategies,
rs: rs,
nodes: nodes,
+ sp: sp,
}, nil
}
@@ -232,26 +241,17 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return shard.Primary, nil
}
- logger := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath})
- upToDateStorages, err := n.rs.GetConsistentSecondaries(ctx, virtualStorageName, repoPath, shard.Primary.GetStorage())
- if err != nil {
- // this is recoverable error - proceed with primary node
- logger.WithError(err).Warn("get up to date secondaries")
- }
-
- if len(upToDateStorages) == 0 {
- upToDateStorages = make(map[string]struct{}, 1)
- }
-
- // Primary should be considered as all other storages for serving read operations
- upToDateStorages[shard.Primary.GetStorage()] = struct{}{}
+ upToDateStorages := n.sp.GetSyncedNodes(ctx, virtualStorageName, repoPath, shard.Primary.GetStorage())
healthyStorages := make([]Node, 0, len(upToDateStorages))
- for upToDateStorage := range upToDateStorages {
+ for _, upToDateStorage := range upToDateStorages {
node, err := shard.GetNode(upToDateStorage)
if err != nil {
// this is recoverable error - proceed with with other nodes
- logger.WithError(err).Warn("storage returned as up-to-date")
+ ctxlogrus.Extract(ctx).
+ WithFields(logrus.Fields{"virtual_storage": virtualStorageName, "relative_path": repoPath}).
+ WithError(err).
+ Warn("storage was returned as up-to-date")
}
if !node.IsHealthy() {
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 453d6d1f0..bef621651 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -122,7 +122,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
Failover: config.Failover{Enabled: false, ElectionStrategy: config.ElectionStrategySQL},
VirtualStorages: []*config.VirtualStorage{virtualStorage},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nm.Start(time.Millisecond, time.Millisecond)
@@ -168,7 +168,7 @@ func TestDialWithUnhealthyNode(t *testing.T) {
srv, _ := testhelper.NewHealthServerWithListener(t, primaryLn)
defer srv.Stop()
- mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -216,10 +216,10 @@ func TestNodeManager(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
// monitoring period set to 1 hour as we execute health checks by hands in this test
@@ -317,9 +317,11 @@ func TestMgr_GetSyncedNode(t *testing.T) {
verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) {
conf.Failover.Enabled = failover
+ loggingEntry := testhelper.DiscardTestEntry(t)
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+ sp := datastore.NewDirectStorageProvider(rs)
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nm, err := NewManager(loggingEntry, conf, nil, rs, sp, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
for i := range healthSrvs {
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 9fb08e983..114cfbe14 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -156,7 +156,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -346,7 +346,7 @@ func TestPropagateReplicationJob(t *testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -638,7 +638,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -785,7 +785,7 @@ func TestProcessBacklog_Success(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index efe665efa..a7f260b5f 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -80,12 +80,14 @@ func TestServerFactory(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logger, conf, nil, datastore.NewMemoryRepositoryStore(conf.StorageNames()), &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil)
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+ sp := datastore.NewDirectStorageProvider(rs)
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, sp, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
txMgr := transactions.NewManager(conf)
registry := protoregistry.GitalyProtoPreregistered
- rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+
coordinator := NewCoordinator(
queue,
rs,
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index a47aa68b3..58579da4d 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -740,7 +740,7 @@ func TestProxyWrites(t *testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -921,8 +921,8 @@ func TestErrorThreshold(t *testing.T) {
require.NoError(t, err)
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
-
- nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), registry, errorTracker)
+ sp := datastore.NewDirectStorageProvider(rs)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, sp, promtest.NewMockHistogramVec(), registry, errorTracker)
require.NoError(t, err)
coordinator := NewCoordinator(