Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-11-08 17:10:46 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-11-08 17:10:46 +0300
commit12082c14c38c110bc9c9442017a9f83cd6e80674 (patch)
tree6daff04c625fba6eb5ee15b3a1033348845cfe1d
parent9acc2b74c8948ffd0b2ba48eafe51ad1a2bad75b (diff)
parent8f34243e3c9f87d30a3d3f6ba3e8174717a19892 (diff)
Merge branch 'smh-replication-job-replica-path' into 'master'
Use replica path when processing replication jobs See merge request gitlab-org/gitaly!3981
-rw-r--r--internal/praefect/coordinator_test.go3
-rw-r--r--internal/praefect/datastore/queue.go20
-rw-r--r--internal/praefect/datastore/repository_store.go38
-rw-r--r--internal/praefect/datastore/repository_store_mock.go12
-rw-r--r--internal/praefect/datastore/repository_store_test.go18
-rw-r--r--internal/praefect/replicator.go67
-rw-r--r--internal/praefect/replicator_pg_test.go2
-rw-r--r--internal/praefect/replicator_test.go34
8 files changed, 154 insertions, 40 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 6abfd8c7b..6dde3bbdd 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1567,6 +1567,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
// Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent
// nodes will take part in transactions.
withRepoStore: datastore.MockRepositoryStore{
+ GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) {
+ return repoProto.GetRelativePath(), nil
+ },
GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil
},
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index dd56493fc..a520fba1a 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -52,13 +52,19 @@ type ReplicationJob struct {
// RepositoryID is the ID of the repository this job relates to. RepositoryID
// may be 0 if the job doesn't relate to any known repository. This can happen
// for example when the job is deleting an orphaned replica of a deleted repository.
- RepositoryID int64 `json:"repository_id"`
- Change ChangeType `json:"change"`
- RelativePath string `json:"relative_path"`
- TargetNodeStorage string `json:"target_node_storage"`
- SourceNodeStorage string `json:"source_node_storage"`
- VirtualStorage string `json:"virtual_storage"`
- Params Params `json:"params"`
+ RepositoryID int64 `json:"repository_id"`
+ // ReplicaPath is the relative path where the replicas are stored in the Gitaly storages.
+ ReplicaPath string `json:"replica_path"`
+ Change ChangeType `json:"change"`
+ // RelativePath is the virtual relative path the client uses to access the repository on the
+ // virtual storage. The actual path that is used to store the repository on the disks is the
+ // ReplicaPath. This can be removed in the future but is still carried in the jobs as the
+ // replication queue locking depends on this.
+ RelativePath string `json:"relative_path"`
+ TargetNodeStorage string `json:"target_node_storage"`
+ SourceNodeStorage string `json:"source_node_storage"`
+ VirtualStorage string `json:"virtual_storage"`
+ Params Params `json:"params"`
}
func (job *ReplicationJob) Scan(value interface{}) error {
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index a7b4d9c2d..ddfb6db8e 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -89,6 +89,9 @@ type RepositoryStore interface {
// SetGeneration sets the repository's generation on the given storage. If the generation is higher
// than the virtual storage's generation, it is set to match as well to guarantee monotonic increments.
SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error
+ // GetReplicaPath gets the replica path of a repository. Returns a commonerr.ErrRepositoryNotFound if a record
+ // for the repository ID is not found.
+ GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
// GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would
// downgrade, a DowngradeAttemptedError is returned.
GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)
@@ -110,7 +113,7 @@ type RepositoryStore interface {
// the repository is not tracked by the Praefect datastore.
DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
// DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage.
- DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
+ DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
// RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well
// as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository
// which has no record in the virtual storage or the storage.
@@ -445,18 +448,12 @@ GROUP BY replica_path
}
// DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details.
-func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath string, storage string) error {
- return rs.delete(ctx, `
+func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error {
+ result, err := rs.db.ExecContext(ctx, `
DELETE FROM storage_repositories
-WHERE virtual_storage = $1
-AND relative_path = $2
-AND storage = ANY($3::text[])
- `, virtualStorage, relativePath, []string{storage},
- )
-}
-
-func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath string, storages []string) error {
- result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, pq.StringArray(storages))
+WHERE repository_id = $1
+AND storage = $2
+ `, repositoryID, storage)
if err != nil {
return err
}
@@ -773,3 +770,20 @@ AND relative_path = $2
return id, nil
}
+
+// GetReplicaPath gets the replica path of a repository. Returns a commonerr.ErrRepositoryNotFound if a record
+// for the repository ID is not found.
+func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) {
+ var replicaPath string
+ if err := rs.db.QueryRowContext(
+ ctx, "SELECT replica_path FROM repositories WHERE repository_id = $1", repositoryID,
+ ).Scan(&replicaPath); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return "", commonerr.ErrRepositoryNotFound
+ }
+
+ return "", fmt.Errorf("scan: %w", err)
+ }
+
+ return replicaPath, nil
+}
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index d4526397f..426babd66 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -12,7 +12,7 @@ type MockRepositoryStore struct {
CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
- DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
@@ -21,6 +21,7 @@ type MockRepositoryStore struct {
RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
ReserveRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error)
GetRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error)
+ GetReplicaPathFunc func(ctx context.Context, repositoryID int64) (string, error)
}
func (m MockRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) {
@@ -82,12 +83,12 @@ func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorag
}
// DeleteReplica runs the mock's DeleteReplicaFunc.
-func (m MockRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error {
if m.DeleteReplicaFunc == nil {
return nil
}
- return m.DeleteReplicaFunc(ctx, virtualStorage, relativePath, storage)
+ return m.DeleteReplicaFunc(ctx, repositoryID, storage)
}
func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error {
@@ -158,3 +159,8 @@ func (m MockRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage
return m.GetRepositoryIDFunc(ctx, virtualStorage, relativePath)
}
+
+// GetReplicaPath returns the result of GetReplicaPathFunc or panics if it is unset.
+func (m MockRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) {
+ return m.GetReplicaPathFunc(ctx, repositoryID)
+}
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index cdc44678b..b5eeda410 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -843,7 +843,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
rs, requireState := newStore(t, nil)
t.Run("delete non-existing", func(t *testing.T) {
- require.Equal(t, ErrNoRowsAffected, rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1"))
+ require.Equal(t, ErrNoRowsAffected, rs.DeleteReplica(ctx, 1, "storage-1"))
})
t.Run("delete existing", func(t *testing.T) {
@@ -879,7 +879,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
},
)
- require.NoError(t, rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1"))
+ require.NoError(t, rs.DeleteReplica(ctx, 1, "storage-1"))
requireState(t, ctx,
virtualStorageState{
@@ -1202,6 +1202,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.Nil(t, err)
require.Equal(t, int64(1), id)
})
+
+ t.Run("GetReplicaPath", func(t *testing.T) {
+ rs, _ := newStore(t, nil)
+
+ replicaPath, err := rs.GetReplicaPath(ctx, 1)
+ require.Equal(t, err, commonerr.ErrRepositoryNotFound)
+ require.Empty(t, replicaPath)
+
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+
+ replicaPath, err = rs.GetReplicaPath(ctx, 1)
+ require.NoError(t, err)
+ require.Equal(t, replicaPath, repo)
+ })
}
func TestPostgresRepositoryStore_GetPartiallyAvailableRepositories(t *testing.T) {
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index a6f411cc6..147b1186e 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -56,12 +56,12 @@ type defaultReplicator struct {
func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
targetRepository := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
sourceRepository := &gitalypb.Repository{
StorageName: event.Job.SourceNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
logger := dr.log.WithFields(logrus.Fields{
@@ -146,7 +146,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -159,7 +159,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
// If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual
// storage but having one for the storage. We can later retry the deletion.
- if err := dr.rs.DeleteReplica(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil {
+ if err := dr.rs.DeleteReplica(ctx, event.Job.RepositoryID, event.Job.TargetNodeStorage); err != nil {
if !errors.Is(err, datastore.ErrNoRowsAffected) {
return err
}
@@ -217,7 +217,7 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat
func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
createBitmap, err := event.Job.Params.GetBool("CreateBitmap")
@@ -246,7 +246,7 @@ func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.
func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -261,7 +261,7 @@ func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datasto
func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -276,7 +276,7 @@ func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.Replica
func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
allRefs, err := event.Job.Params.GetBool("AllRefs")
@@ -299,7 +299,7 @@ func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.Replic
func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
val, found := event.Job.Params["SplitStrategy"]
@@ -335,7 +335,7 @@ func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastor
func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -352,7 +352,7 @@ func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.Repl
func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -369,7 +369,7 @@ func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datast
func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
- RelativePath: event.Job.RelativePath,
+ RelativePath: event.Job.ReplicaPath,
}
createBitmap, err := event.Job.Params.GetBool("CreateBitmap")
@@ -403,6 +403,7 @@ type ReplMgr struct {
replJobTimeout time.Duration
dequeueBatchSize uint
parallelStorageProcessingWorkers uint
+ repositoryStore datastore.RepositoryStore
}
// ReplMgrOpt allows a replicator to be configured with additional options
@@ -457,6 +458,7 @@ func NewReplMgr(log *logrus.Entry, storageNames map[string][]string, queue datas
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
dequeueBatchSize: config.DefaultReplicationConfig().BatchSize,
parallelStorageProcessingWorkers: 1,
+ repositoryStore: rs,
}
for _, opt := range opts {
@@ -740,6 +742,42 @@ func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger,
return newState
}
+// backfillReplicaPath backfills the replica path in the replication job. As of 14.5, not all jobs are guaranteed
+// to have a replica path on them yet. There are few special cased jobs which won't be scheduled anymore in 14.6 a
+// and thus do not need to use replica paths.
+func (r ReplMgr) backfillReplicaPath(ctx context.Context, event datastore.ReplicationEvent) (string, error) {
+ switch {
+ // The reconciler scheduled DeleteReplica jobs which are missing repository ID. 14.5 has
+ // dropped this logic and doesn't leave orphaned records any more as Praefect has a walker
+ // to identify stale replicas. Any jobs still in flight have been scheduled prior to 14.4 and
+ // should be handled in the old manner.
+ case event.Job.Change == datastore.DeleteReplica && event.Job.RepositoryID == 0:
+ fallthrough
+ // 14.5 also doesn't schedule DeleteRepo jobs. Any jobs are again old jobs in-flight.
+ // The repository ID in delete jobs scheduled in 14.4 won't be present anymore at the time the
+ // replication job is being executed, as the the 'repositories' record is deleted. Given that,
+ // it's not possible to get the replica path. In 14.4, Praefect intercepts deletes and handles
+ // them without scheduling replication jobs. The 'delete' jobs still in flight are handled as before
+ // for backwards compatibility.
+ case event.Job.Change == datastore.DeleteRepo:
+ fallthrough
+ // RenameRepo doesn't need to use repository ID as the RenameRepository RPC
+ // call will be intercepted in 14.6 by Praefect to perform an atomic rename in
+ // the database. Any jobs still in flight are from 14.5 and older, and should be
+ // handled in the old manner. We'll use the relative path from the replication job
+ // for the backwards compatible handling.
+ case event.Job.Change == datastore.RenameRepo:
+ return event.Job.RelativePath, nil
+ default:
+ replicaPath, err := r.repositoryStore.GetReplicaPath(ctx, event.Job.RepositoryID)
+ if err != nil {
+ return "", fmt.Errorf("get replica path: %w", err)
+ }
+
+ return replicaPath, nil
+ }
+}
+
func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
var cancel func()
@@ -759,6 +797,11 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
defer inFlightGauge.Dec()
var err error
+ event.Job.ReplicaPath, err = r.backfillReplicaPath(ctx, event)
+ if err != nil {
+ return fmt.Errorf("choose replica path: %w", err)
+ }
+
switch event.Job.Change {
case datastore.UpdateRepo:
source, ok := r.nodes[event.Job.VirtualStorage][event.Job.SourceNodeStorage]
diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go
index c5d23154d..e99ceff9a 100644
--- a/internal/praefect/replicator_pg_test.go
+++ b/internal/praefect/replicator_pg_test.go
@@ -61,6 +61,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RepositoryID: 1,
+ ReplicaPath: "relative-path-1",
VirtualStorage: "virtual-storage-1",
RelativePath: "relative-path-1",
SourceNodeStorage: "gitaly-1",
@@ -114,6 +115,7 @@ func TestReplicatorDestroy(t *testing.T) {
ctx,
datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ ReplicaPath: "relative-path-1",
Change: tc.change,
VirtualStorage: "virtual-storage-1",
RelativePath: "relative-path-1",
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 62f5524e0..8d6857656 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -123,10 +123,12 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
require.NoError(t, err)
require.Len(t, shard.Secondaries, 1)
+ const repositoryID = 1
var events []datastore.ReplicationEvent
for _, secondary := range shard.Secondaries {
events = append(events, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: repositoryID,
VirtualStorage: conf.VirtualStorages[0].Name,
Change: datastore.UpdateRepo,
TargetNodeStorage: secondary.GetStorage(),
@@ -158,11 +160,15 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
_, err = queue.Enqueue(ctx, events[0])
require.NoError(t, err)
+ db := glsql.NewDB(t)
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
+
replMgr := NewReplMgr(
loggerEntry,
conf.StorageNames(),
queue,
- datastore.MockRepositoryStore{},
+ rs,
nodeMgr,
NodeSetFromNodeManager(nodeMgr),
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
@@ -254,6 +260,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ ReplicaPath: "relative-path-1",
VirtualStorage: "virtual-storage-1",
RelativePath: "relative-path-1",
SourceNodeStorage: "gitaly-1",
@@ -330,6 +337,9 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
return repositoryRelativePath, nil, nil
},
+ GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) {
+ return repositoryRelativePath, nil
+ },
}
coordinator := NewCoordinator(
@@ -682,6 +692,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
// this job exists to verify that replication works
okJob := datastore.ReplicationJob{
+ RepositoryID: 1,
Change: datastore.UpdateRepo,
RelativePath: testRepo.RelativePath,
TargetNodeStorage: secondary.Storage,
@@ -706,11 +717,15 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
+ db := glsql.NewDB(t)
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
+
replMgr := NewReplMgr(
logEntry,
conf.StorageNames(),
queueInterceptor,
- datastore.MockRepositoryStore{},
+ rs,
nodeMgr,
NodeSetFromNodeManager(nodeMgr),
)
@@ -799,6 +814,7 @@ func TestProcessBacklog_Success(t *testing.T) {
// Update replication job
eventType1 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: 1,
Change: datastore.UpdateRepo,
RelativePath: testRepo.GetRelativePath(),
TargetNodeStorage: secondary.Storage,
@@ -857,11 +873,15 @@ func TestProcessBacklog_Success(t *testing.T) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
+ db := glsql.NewDB(t)
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
+
replMgr := NewReplMgr(
logEntry,
conf.StorageNames(),
queueInterceptor,
- datastore.MockRepositoryStore{},
+ rs,
nodeMgr,
NodeSetFromNodeManager(nodeMgr),
)
@@ -966,6 +986,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
const virtualStorage = "virtal-storage"
const primaryStorage = "storage-1"
const secondaryStorage = "storage-2"
+ const repositoryID = 1
primaryConn := &grpc.ClientConn{}
secondaryConn := &grpc.ClientConn{}
@@ -985,6 +1006,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: 1,
Change: datastore.UpdateRepo,
RelativePath: "ignored",
TargetNodeStorage: primaryStorage,
@@ -994,11 +1016,15 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
})
require.NoError(t, err)
+ db := glsql.NewDB(t)
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
+
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
conf.StorageNames(),
queue,
- datastore.MockRepositoryStore{},
+ rs,
StaticHealthChecker{virtualStorage: {primaryStorage, secondaryStorage}},
NodeSet{virtualStorage: {
primaryStorage: {Storage: primaryStorage, Connection: primaryConn},