diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-11-17 15:36:36 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-11-17 15:36:36 +0300 |
commit | f869e2716122b17ce78508aacf981a098406d2d7 (patch) | |
tree | e380f87f34d1942053f9a45eee6b4b854bc06b85 | |
parent | b8a027e4ea96235545000da5ff35d05a91b4370b (diff) | |
parent | 6da7ee5a3db7e6511dcf7b794e161439805e22ff (diff) |
Merge branch 'ps-up-to-date-storages-provider' into 'master'
Introduction of storages provider to get up to date storages
See merge request gitlab-org/gitaly!2787
-rw-r--r-- | cmd/praefect/main.go | 4 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 16 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider.go | 63 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider_test.go | 107 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 30 | ||||
-rw-r--r-- | internal/praefect/nodes/manager_test.go | 12 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/server_factory_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 6 |
11 files changed, 219 insertions, 39 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 51b6aba51..78f597443 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -260,7 +260,9 @@ func run(cfgs []starter.Config, conf config.Config) error { } } - nodeManager, err := nodes.NewManager(logger, conf, db, rs, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker) + sp := datastore.NewDirectStorageProvider(rs) + + nodeManager, err := nodes.NewManager(logger, conf, db, rs, sp, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker) if err != nil { return err } diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 27d19de5d..ac73e07e3 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -169,7 +169,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, queue := datastore.NewMemoryReplicationEventQueue(conf) rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, rs, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) txMgr := transactions.NewManager(conf) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 8080d3f86..436f7b859 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -152,7 +152,7 @@ func TestStreamDirectorMutator(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) @@ -332,7 +332,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) @@ -473,7 +473,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { RelativePath: "/path/to/hashed/storage", } - nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) @@ -594,7 +594,7 @@ func TestStreamDirectorAccessor(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Minute) @@ -684,7 +684,9 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { require.NoError(t, err) require.NoError(t, repoStore.SetGeneration(ctx, conf.VirtualStorages[0].Name, targetRepo.RelativePath, secondaryNodeConf.Storage, generation)) - nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + sp := datastore.NewDirectStorageProvider(repoStore) + + nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, sp, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Minute) @@ -883,7 +885,7 @@ func TestAbsentCorrelationID(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) @@ -1010,7 +1012,7 @@ func TestStreamDirectorStorageScope(t *testing.T) { rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) - nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Second) coordinator := NewCoordinator( diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go new file mode 100644 index 000000000..57f11fc73 --- /dev/null +++ b/internal/praefect/datastore/storage_provider.go @@ -0,0 +1,63 @@ +package datastore + +import ( + "context" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/prometheus/client_golang/prometheus" +) + +// SecondariesProvider should provide information about secondary storages. +type SecondariesProvider interface { + // GetConsistentSecondaries returns all secondaries with the same generation as the primary. + GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) +} + +// DirectStorageProvider provides the latest state of the synced nodes. +type DirectStorageProvider struct { + sp SecondariesProvider + errorsTotal *prometheus.CounterVec +} + +// NewDirectStorageProvider returns a new storage provider. +func NewDirectStorageProvider(sp SecondariesProvider) *DirectStorageProvider { + csp := &DirectStorageProvider{ + sp: sp, + errorsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_praefect_uptodate_storages_errors_total", + Help: "Total number of errors raised during defining up to date storages for reads distribution", + }, + []string{"type"}, + ), + } + + return csp +} + +func (c *DirectStorageProvider) Describe(descs chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(c, descs) +} + +func (c *DirectStorageProvider) Collect(collector chan<- prometheus.Metric) { + c.errorsTotal.Collect(collector) +} + +func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) []string { + upToDateStorages, err := c.sp.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primaryStorage) + if err != nil { + c.errorsTotal.WithLabelValues("retrieve").Inc() + // this is recoverable error - we can proceed with primary node + ctxlogrus.Extract(ctx).WithError(err).Warn("get consistent secondaries") + return []string{primaryStorage} + } + + storages := make([]string, 0, len(upToDateStorages)+1) + for upToDateStorage := range upToDateStorages { + if upToDateStorage != primaryStorage { + storages = append(storages, upToDateStorage) + } + } + + return append(storages, primaryStorage) +} diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go new file mode 100644 index 000000000..1bb5b8691 --- /dev/null +++ b/internal/praefect/datastore/storage_provider_test.go @@ -0,0 +1,107 @@ +package datastore + +import ( + "context" + "strings" + "testing" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestDirectStorageProvider_GetSyncedNodes(t *testing.T) { + getCtx := func() (context.Context, context.CancelFunc) { + ctx, cancel := testhelper.Context() + + logger := testhelper.DiscardTestEntry(t) + ctx = ctxlogrus.ToContext(ctx, logger) + return ctx, cancel + } + + t.Run("ok", func(t *testing.T) { + ctx, cancel := getCtx() + defer cancel() + + for _, tc := range []struct { + desc string + ret map[string]struct{} + exp []string + }{ + { + desc: "primary included", + ret: map[string]struct{}{"g2": {}, "g3": {}}, + exp: []string{"g1", "g2", "g3"}, + }, + { + desc: "distinct values", + ret: map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, + exp: []string{"g1", "g2", "g3"}, + }, + { + desc: "none", + ret: nil, + exp: []string{"g1"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + rs := &mockConsistentSecondariesProvider{} + rs.On("GetConsistentSecondaries", ctx, "vs", "/repo/path", "g1").Return(tc.ret, nil) + + sp := NewDirectStorageProvider(rs) + storages := sp.GetSyncedNodes(ctx, "vs", "/repo/path", "g1") + require.ElementsMatch(t, tc.exp, storages) + }) + } + }) + + t.Run("repository store returns an error", func(t *testing.T) { + ctx, cancel := getCtx() + defer cancel() + + logger := ctxlogrus.Extract(ctx) + logHook := test.NewLocal(logger.Logger) + + rs := &mockConsistentSecondariesProvider{} + rs.On("GetConsistentSecondaries", ctx, "vs", "/repo/path", "g1"). + Return(nil, assert.AnError). + Once() + + sp := NewDirectStorageProvider(rs) + + storages := sp.GetSyncedNodes(ctx, "vs", "/repo/path", "g1") + require.ElementsMatch(t, []string{"g1"}, storages) + + require.Len(t, logHook.AllEntries(), 1) + require.Equal(t, "get consistent secondaries", logHook.LastEntry().Message) + require.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data) + require.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level) + + // "populate" metric is not set as there was an error and we don't want this result to be cached + err := testutil.CollectAndCompare(sp, strings.NewReader(` + # HELP gitaly_praefect_uptodate_storages_errors_total Total number of errors raised during defining up to date storages for reads distribution + # TYPE gitaly_praefect_uptodate_storages_errors_total counter + gitaly_praefect_uptodate_storages_errors_total{type="retrieve"} 1 + `)) + require.NoError(t, err) + }) +} + +type mockConsistentSecondariesProvider struct { + mock.Mock +} + +func (m *mockConsistentSecondariesProvider) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + args := m.Called(ctx, virtualStorage, relativePath, primary) + val := args.Get(0) + var res map[string]struct{} + if val != nil { + res = val.(map[string]struct{}) + } + return res, args.Error(1) +} diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index a98ab2bcf..7e5ac3f97 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -185,7 +185,9 @@ func defaultTxMgr(conf config.Config) *transactions.Manager { } func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager { - nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + loggingEntry := testhelper.DiscardTestEntry(t) + sp := datastore.NewDirectStorageProvider(rs) + nodeMgr, err := nodes.NewManager(loggingEntry, conf, nil, rs, sp, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) return nodeMgr diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 59c8f4378..2b29cb321 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -60,6 +60,12 @@ func (s Shard) GetHealthySecondaries() []Node { return healthySecondaries } +// StorageProvider abstracts the way we get storages (gitaly nodes). +type StorageProvider interface { + // GetSyncedNodes returns list of gitaly storages that are in up to date state based on the generation tracking. + GetSyncedNodes(ctx context.Context, virtualStorageName, repoPath, primaryStorage string) []string +} + // Manager is responsible for returning shards for virtual storages type Manager interface { GetShard(virtualStorageName string) (Shard, error) @@ -102,6 +108,7 @@ type Mgr struct { rs datastore.RepositoryStore // nodes contains nodes by their virtual storages nodes map[string][]Node + sp StorageProvider } // leaderElectionStrategy defines the interface by which primary and @@ -142,6 +149,7 @@ func NewManager( c config.Config, db *sql.DB, rs datastore.RepositoryStore, + sp StorageProvider, latencyHistogram prommetrics.HistogramVec, registry *protoregistry.Registry, errorTracker tracker.ErrorTracker, @@ -188,6 +196,7 @@ func NewManager( strategies: strategies, rs: rs, nodes: nodes, + sp: sp, }, nil } @@ -232,26 +241,17 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st return shard.Primary, nil } - logger := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath}) - upToDateStorages, err := n.rs.GetConsistentSecondaries(ctx, virtualStorageName, repoPath, shard.Primary.GetStorage()) - if err != nil { - // this is recoverable error - proceed with primary node - logger.WithError(err).Warn("get up to date secondaries") - } - - if len(upToDateStorages) == 0 { - upToDateStorages = make(map[string]struct{}, 1) - } - - // Primary should be considered as all other storages for serving read operations - upToDateStorages[shard.Primary.GetStorage()] = struct{}{} + upToDateStorages := n.sp.GetSyncedNodes(ctx, virtualStorageName, repoPath, shard.Primary.GetStorage()) healthyStorages := make([]Node, 0, len(upToDateStorages)) - for upToDateStorage := range upToDateStorages { + for _, upToDateStorage := range upToDateStorages { node, err := shard.GetNode(upToDateStorage) if err != nil { // this is recoverable error - proceed with with other nodes - logger.WithError(err).Warn("storage returned as up-to-date") + ctxlogrus.Extract(ctx). + WithFields(logrus.Fields{"virtual_storage": virtualStorageName, "relative_path": repoPath}). + WithError(err). + Warn("storage was returned as up-to-date") } if !node.IsHealthy() { diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index 453d6d1f0..bef621651 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -122,7 +122,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) { Failover: config.Failover{Enabled: false, ElectionStrategy: config.ElectionStrategySQL}, VirtualStorages: []*config.VirtualStorage{virtualStorage}, } - nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nm.Start(time.Millisecond, time.Millisecond) @@ -168,7 +168,7 @@ func TestDialWithUnhealthyNode(t *testing.T) { srv, _ := testhelper.NewHealthServerWithListener(t, primaryLn) defer srv.Stop() - mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) mgr.Start(1*time.Millisecond, 1*time.Millisecond) @@ -216,10 +216,10 @@ func TestNodeManager(t *testing.T) { } mockHistogram := promtest.NewMockHistogramVec() - nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil) + nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) - nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil) + nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) // monitoring period set to 1 hour as we execute health checks by hands in this test @@ -317,9 +317,11 @@ func TestMgr_GetSyncedNode(t *testing.T) { verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) { conf.Failover.Enabled = failover + loggingEntry := testhelper.DiscardTestEntry(t) rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + sp := datastore.NewDirectStorageProvider(rs) - nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nm, err := NewManager(loggingEntry, conf, nil, rs, sp, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) for i := range healthSrvs { diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 9fb08e983..114cfbe14 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -156,7 +156,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) @@ -346,7 +346,7 @@ func TestPropagateReplicationJob(t *testing.T) { queue := datastore.NewMemoryReplicationEventQueue(conf) logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) @@ -638,7 +638,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) @@ -785,7 +785,7 @@ func TestProcessBacklog_Success(t *testing.T) { logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index efe665efa..a7f260b5f 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -80,12 +80,14 @@ func TestServerFactory(t *testing.T) { logger := testhelper.DiscardTestEntry(t) queue := datastore.NewMemoryReplicationEventQueue(conf) - nodeMgr, err := nodes.NewManager(logger, conf, nil, datastore.NewMemoryRepositoryStore(conf.StorageNames()), &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil) + rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + sp := datastore.NewDirectStorageProvider(rs) + nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, sp, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Second) txMgr := transactions.NewManager(conf) registry := protoregistry.GitalyProtoPreregistered - rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + coordinator := NewCoordinator( queue, rs, diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index a47aa68b3..58579da4d 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -740,7 +740,7 @@ func TestProxyWrites(t *testing.T) { queue := datastore.NewMemoryReplicationEventQueue(conf) entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) @@ -921,8 +921,8 @@ func TestErrorThreshold(t *testing.T) { require.NoError(t, err) rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) - - nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), registry, errorTracker) + sp := datastore.NewDirectStorageProvider(rs) + nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, sp, promtest.NewMockHistogramVec(), registry, errorTracker) require.NoError(t, err) coordinator := NewCoordinator( |