diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-11-08 17:10:46 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-11-08 17:10:46 +0300 |
commit | 12082c14c38c110bc9c9442017a9f83cd6e80674 (patch) | |
tree | 6daff04c625fba6eb5ee15b3a1033348845cfe1d | |
parent | 9acc2b74c8948ffd0b2ba48eafe51ad1a2bad75b (diff) | |
parent | 8f34243e3c9f87d30a3d3f6ba3e8174717a19892 (diff) |
Merge branch 'smh-replication-job-replica-path' into 'master'
Use replica path when processing replication jobs
See merge request gitlab-org/gitaly!3981
-rw-r--r-- | internal/praefect/coordinator_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 20 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 38 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 12 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 67 | ||||
-rw-r--r-- | internal/praefect/replicator_pg_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 34 |
8 files changed, 154 insertions, 40 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 6abfd8c7b..6dde3bbdd 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1567,6 +1567,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent // nodes will take part in transactions. withRepoStore: datastore.MockRepositoryStore{ + GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { + return repoProto.GetRelativePath(), nil + }, GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil }, diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index dd56493fc..a520fba1a 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -52,13 +52,19 @@ type ReplicationJob struct { // RepositoryID is the ID of the repository this job relates to. RepositoryID // may be 0 if the job doesn't relate to any known repository. This can happen // for example when the job is deleting an orphaned replica of a deleted repository. - RepositoryID int64 `json:"repository_id"` - Change ChangeType `json:"change"` - RelativePath string `json:"relative_path"` - TargetNodeStorage string `json:"target_node_storage"` - SourceNodeStorage string `json:"source_node_storage"` - VirtualStorage string `json:"virtual_storage"` - Params Params `json:"params"` + RepositoryID int64 `json:"repository_id"` + // ReplicaPath is the relative path where the replicas are stored in the Gitaly storages. + ReplicaPath string `json:"replica_path"` + Change ChangeType `json:"change"` + // RelativePath is the virtual relative path the client uses to access the repository on the + // virtual storage. The actual path that is used to store the repository on the disks is the + // ReplicaPath. This can be removed in the future but is still carried in the jobs as the + // replication queue locking depends on this. + RelativePath string `json:"relative_path"` + TargetNodeStorage string `json:"target_node_storage"` + SourceNodeStorage string `json:"source_node_storage"` + VirtualStorage string `json:"virtual_storage"` + Params Params `json:"params"` } func (job *ReplicationJob) Scan(value interface{}) error { diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index a7b4d9c2d..ddfb6db8e 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -89,6 +89,9 @@ type RepositoryStore interface { // SetGeneration sets the repository's generation on the given storage. If the generation is higher // than the virtual storage's generation, it is set to match as well to guarantee monotonic increments. SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error + // GetReplicaPath gets the replica path of a repository. Returns a commonerr.ErrRepositoryNotFound if a record + // for the repository ID is not found. + GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) // GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would // downgrade, a DowngradeAttemptedError is returned. GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) @@ -110,7 +113,7 @@ type RepositoryStore interface { // the repository is not tracked by the Praefect datastore. DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) // DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage. - DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error + DeleteReplica(ctx context.Context, repositoryID int64, storage string) error // RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well // as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository // which has no record in the virtual storage or the storage. @@ -445,18 +448,12 @@ GROUP BY replica_path } // DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details. -func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath string, storage string) error { - return rs.delete(ctx, ` +func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error { + result, err := rs.db.ExecContext(ctx, ` DELETE FROM storage_repositories -WHERE virtual_storage = $1 -AND relative_path = $2 -AND storage = ANY($3::text[]) - `, virtualStorage, relativePath, []string{storage}, - ) -} - -func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath string, storages []string) error { - result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, pq.StringArray(storages)) +WHERE repository_id = $1 +AND storage = $2 + `, repositoryID, storage) if err != nil { return err } @@ -773,3 +770,20 @@ AND relative_path = $2 return id, nil } + +// GetReplicaPath gets the replica path of a repository. Returns a commonerr.ErrRepositoryNotFound if a record +// for the repository ID is not found. +func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) { + var replicaPath string + if err := rs.db.QueryRowContext( + ctx, "SELECT replica_path FROM repositories WHERE repository_id = $1", repositoryID, + ).Scan(&replicaPath); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return "", commonerr.ErrRepositoryNotFound + } + + return "", fmt.Errorf("scan: %w", err) + } + + return replicaPath, nil +} diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index d4526397f..426babd66 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -12,7 +12,7 @@ type MockRepositoryStore struct { CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) - DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) @@ -21,6 +21,7 @@ type MockRepositoryStore struct { RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) ReserveRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) GetRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) + GetReplicaPathFunc func(ctx context.Context, repositoryID int64) (string, error) } func (m MockRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) { @@ -82,12 +83,12 @@ func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorag } // DeleteReplica runs the mock's DeleteReplicaFunc. -func (m MockRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error { if m.DeleteReplicaFunc == nil { return nil } - return m.DeleteReplicaFunc(ctx, virtualStorage, relativePath, storage) + return m.DeleteReplicaFunc(ctx, repositoryID, storage) } func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { @@ -158,3 +159,8 @@ func (m MockRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage return m.GetRepositoryIDFunc(ctx, virtualStorage, relativePath) } + +// GetReplicaPath returns the result of GetReplicaPathFunc or panics if it is unset. +func (m MockRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) { + return m.GetReplicaPathFunc(ctx, repositoryID) +} diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index cdc44678b..b5eeda410 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -843,7 +843,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { rs, requireState := newStore(t, nil) t.Run("delete non-existing", func(t *testing.T) { - require.Equal(t, ErrNoRowsAffected, rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1")) + require.Equal(t, ErrNoRowsAffected, rs.DeleteReplica(ctx, 1, "storage-1")) }) t.Run("delete existing", func(t *testing.T) { @@ -879,7 +879,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }, ) - require.NoError(t, rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1")) + require.NoError(t, rs.DeleteReplica(ctx, 1, "storage-1")) requireState(t, ctx, virtualStorageState{ @@ -1202,6 +1202,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Nil(t, err) require.Equal(t, int64(1), id) }) + + t.Run("GetReplicaPath", func(t *testing.T) { + rs, _ := newStore(t, nil) + + replicaPath, err := rs.GetReplicaPath(ctx, 1) + require.Equal(t, err, commonerr.ErrRepositoryNotFound) + require.Empty(t, replicaPath) + + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + + replicaPath, err = rs.GetReplicaPath(ctx, 1) + require.NoError(t, err) + require.Equal(t, replicaPath, repo) + }) } func TestPostgresRepositoryStore_GetPartiallyAvailableRepositories(t *testing.T) { diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index a6f411cc6..147b1186e 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -56,12 +56,12 @@ type defaultReplicator struct { func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error { targetRepository := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } sourceRepository := &gitalypb.Repository{ StorageName: event.Job.SourceNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } logger := dr.log.WithFields(logrus.Fields{ @@ -146,7 +146,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -159,7 +159,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica // If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual // storage but having one for the storage. We can later retry the deletion. - if err := dr.rs.DeleteReplica(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil { + if err := dr.rs.DeleteReplica(ctx, event.Job.RepositoryID, event.Job.TargetNodeStorage); err != nil { if !errors.Is(err, datastore.ErrNoRowsAffected) { return err } @@ -217,7 +217,7 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } createBitmap, err := event.Job.Params.GetBool("CreateBitmap") @@ -246,7 +246,7 @@ func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore. func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -261,7 +261,7 @@ func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datasto func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -276,7 +276,7 @@ func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.Replica func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } allRefs, err := event.Job.Params.GetBool("AllRefs") @@ -299,7 +299,7 @@ func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.Replic func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } val, found := event.Job.Params["SplitStrategy"] @@ -335,7 +335,7 @@ func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastor func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -352,7 +352,7 @@ func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.Repl func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -369,7 +369,7 @@ func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datast func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.RelativePath, + RelativePath: event.Job.ReplicaPath, } createBitmap, err := event.Job.Params.GetBool("CreateBitmap") @@ -403,6 +403,7 @@ type ReplMgr struct { replJobTimeout time.Duration dequeueBatchSize uint parallelStorageProcessingWorkers uint + repositoryStore datastore.RepositoryStore } // ReplMgrOpt allows a replicator to be configured with additional options @@ -457,6 +458,7 @@ func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datas replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), dequeueBatchSize: config.DefaultReplicationConfig().BatchSize, parallelStorageProcessingWorkers: 1, + repositoryStore: rs, } for _, opt := range opts { @@ -740,6 +742,42 @@ func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, return newState } +// backfillReplicaPath backfills the replica path in the replication job. As of 14.5, not all jobs are guaranteed +// to have a replica path on them yet. There are few special cased jobs which won't be scheduled anymore in 14.6 a +// and thus do not need to use replica paths. +func (r ReplMgr) backfillReplicaPath(ctx context.Context, event datastore.ReplicationEvent) (string, error) { + switch { + // The reconciler scheduled DeleteReplica jobs which are missing repository ID. 14.5 has + // dropped this logic and doesn't leave orphaned records any more as Praefect has a walker + // to identify stale replicas. Any jobs still in flight have been scheduled prior to 14.4 and + // should be handled in the old manner. + case event.Job.Change == datastore.DeleteReplica && event.Job.RepositoryID == 0: + fallthrough + // 14.5 also doesn't schedule DeleteRepo jobs. Any jobs are again old jobs in-flight. + // The repository ID in delete jobs scheduled in 14.4 won't be present anymore at the time the + // replication job is being executed, as the the 'repositories' record is deleted. Given that, + // it's not possible to get the replica path. In 14.4, Praefect intercepts deletes and handles + // them without scheduling replication jobs. The 'delete' jobs still in flight are handled as before + // for backwards compatibility. + case event.Job.Change == datastore.DeleteRepo: + fallthrough + // RenameRepo doesn't need to use repository ID as the RenameRepository RPC + // call will be intercepted in 14.6 by Praefect to perform an atomic rename in + // the database. Any jobs still in flight are from 14.5 and older, and should be + // handled in the old manner. We'll use the relative path from the replication job + // for the backwards compatible handling. + case event.Job.Change == datastore.RenameRepo: + return event.Job.RelativePath, nil + default: + replicaPath, err := r.repositoryStore.GetReplicaPath(ctx, event.Job.RepositoryID) + if err != nil { + return "", fmt.Errorf("get replica path: %w", err) + } + + return replicaPath, nil + } +} + func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { var cancel func() @@ -759,6 +797,11 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re defer inFlightGauge.Dec() var err error + event.Job.ReplicaPath, err = r.backfillReplicaPath(ctx, event) + if err != nil { + return fmt.Errorf("choose replica path: %w", err) + } + switch event.Job.Change { case datastore.UpdateRepo: source, ok := r.nodes[event.Job.VirtualStorage][event.Job.SourceNodeStorage] diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index c5d23154d..e99ceff9a 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -61,6 +61,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ RepositoryID: 1, + ReplicaPath: "relative-path-1", VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "gitaly-1", @@ -114,6 +115,7 @@ func TestReplicatorDestroy(t *testing.T) { ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + ReplicaPath: "relative-path-1", Change: tc.change, VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 62f5524e0..8d6857656 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -123,10 +123,12 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { require.NoError(t, err) require.Len(t, shard.Secondaries, 1) + const repositoryID = 1 var events []datastore.ReplicationEvent for _, secondary := range shard.Secondaries { events = append(events, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: repositoryID, VirtualStorage: conf.VirtualStorages[0].Name, Change: datastore.UpdateRepo, TargetNodeStorage: secondary.GetStorage(), @@ -158,11 +160,15 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { _, err = queue.Enqueue(ctx, events[0]) require.NoError(t, err) + db := glsql.NewDB(t) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false)) + replMgr := NewReplMgr( loggerEntry, conf.StorageNames(), queue, - datastore.MockRepositoryStore{}, + rs, nodeMgr, NodeSetFromNodeManager(nodeMgr), WithLatencyMetric(&mockReplicationLatencyHistogramVec), @@ -254,6 +260,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + ReplicaPath: "relative-path-1", VirtualStorage: "virtual-storage-1", RelativePath: "relative-path-1", SourceNodeStorage: "gitaly-1", @@ -330,6 +337,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { return repositoryRelativePath, nil, nil }, + GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { + return repositoryRelativePath, nil + }, } coordinator := NewCoordinator( @@ -682,6 +692,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { // this job exists to verify that replication works okJob := datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.UpdateRepo, RelativePath: testRepo.RelativePath, TargetNodeStorage: secondary.Storage, @@ -706,11 +717,15 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() + db := glsql.NewDB(t) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false)) + replMgr := NewReplMgr( logEntry, conf.StorageNames(), queueInterceptor, - datastore.MockRepositoryStore{}, + rs, nodeMgr, NodeSetFromNodeManager(nodeMgr), ) @@ -799,6 +814,7 @@ func TestProcessBacklog_Success(t *testing.T) { // Update replication job eventType1 := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.UpdateRepo, RelativePath: testRepo.GetRelativePath(), TargetNodeStorage: secondary.Storage, @@ -857,11 +873,15 @@ func TestProcessBacklog_Success(t *testing.T) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() + db := glsql.NewDB(t) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false)) + replMgr := NewReplMgr( logEntry, conf.StorageNames(), queueInterceptor, - datastore.MockRepositoryStore{}, + rs, nodeMgr, NodeSetFromNodeManager(nodeMgr), ) @@ -966,6 +986,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { const virtualStorage = "virtal-storage" const primaryStorage = "storage-1" const secondaryStorage = "storage-2" + const repositoryID = 1 primaryConn := &grpc.ClientConn{} secondaryConn := &grpc.ClientConn{} @@ -985,6 +1006,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: 1, Change: datastore.UpdateRepo, RelativePath: "ignored", TargetNodeStorage: primaryStorage, @@ -994,11 +1016,15 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { }) require.NoError(t, err) + db := glsql.NewDB(t) + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false)) + replMgr := NewReplMgr( testhelper.DiscardTestEntry(t), conf.StorageNames(), queue, - datastore.MockRepositoryStore{}, + rs, StaticHealthChecker{virtualStorage: {primaryStorage, secondaryStorage}}, NodeSet{virtualStorage: { primaryStorage: {Storage: primaryStorage, Connection: primaryConn}, |