diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-17 18:35:33 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-18 10:17:28 +0300 |
commit | c3ef20fedcc54c5ff6c735b9ecf49b1e0dee4c65 (patch) | |
tree | eb3c28e71522749837956ff2c8469f4252775d1a | |
parent | 08024c36dcedbbc8f0ad172737cc4b6351522eba (diff) |
implement 'delete_replica' replication job type
This commit adds a 'delete_replica' replication job type. It differs
from 'delete' type by only deleting the specific replica without marking
the repository deleted from the virtual storage completely.
While we could update 'delete' to be handled in a similar manner, this
would require us to do the work over multiple releases as old Praefect's
could delete repositories completely during an upgrade. For this reason,
let's implement a new job type on the side an remove the old 'delete' type
in future.
-rw-r--r-- | internal/praefect/datastore/datastore.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 18 | ||||
-rw-r--r-- | internal/praefect/replicator_pg_test.go | 59 |
3 files changed, 75 insertions, 4 deletions
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index fa5996044..082719aed 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -45,6 +45,8 @@ const ( CreateRepo = ChangeType("create") // DeleteRepo is when a replication deletes a repo DeleteRepo = ChangeType("delete") + // DeleteReplica change type indicates that the targeted replica is due for deletion. + DeleteReplica = ChangeType("delete_replica") // RenameRepo is when a replication renames repo RenameRepo = ChangeType("rename") // GarbageCollect is when replication runs gc diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 5dfa4806d..240c59ae3 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -149,16 +149,26 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica return err } + var deleteFunc func(context.Context, string, string, string) error + switch event.Job.Change { + case datastore.DeleteRepo: + deleteFunc = dr.rs.DeleteRepository + case datastore.DeleteReplica: + deleteFunc = dr.rs.DeleteReplica + default: + return fmt.Errorf("unknown change type: %q", event.Job.Change) + } + // If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual // storage but having one for the storage. We can later retry the deletion. - if err := dr.rs.DeleteRepository(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil { + if err := deleteFunc(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil { if !errors.Is(err, datastore.RepositoryNotExistsError{}) { return err } dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)). WithError(err). - Info("replicated repository delete does not have a store entry") + Info("deleted repository did not have a store entry") } return nil @@ -598,13 +608,13 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re return fmt.Errorf("no connection to source node %q/%q", event.Job.VirtualStorage, event.Job.SourceNodeStorage) } - ctx, err := helper.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.Address, source.Token) + ctx, err = helper.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.Address, source.Token) if err != nil { return fmt.Errorf("inject Gitaly servers into context: %w", err) } err = r.replicator.Replicate(ctx, event, source.Connection, targetCC) - case datastore.DeleteRepo: + case datastore.DeleteRepo, datastore.DeleteReplica: err = r.replicator.Destroy(ctx, event, targetCC) case datastore.RenameRepo: err = r.replicator.Rename(ctx, event, targetCC) diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index 55bf5a244..934e4a337 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -4,6 +4,7 @@ package praefect import ( "context" + "errors" "net" "path/filepath" "testing" @@ -66,3 +67,61 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) { require.NoError(t, err) require.False(t, exists) } + +func TestReplicatorDestroy(t *testing.T) { + for _, tc := range []struct { + change datastore.ChangeType + exists bool + error error + }{ + {change: datastore.DeleteReplica, exists: true}, + {change: datastore.DeleteRepo, exists: false}, + {change: "invalid-type", exists: true, error: errors.New(`unknown change type: "invalid-type"`)}, + } { + t.Run(string(tc.change), func(t *testing.T) { + db := getDB(t) + + rs := datastore.NewPostgresRepositoryStore(db, nil) + + ctx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "storage-1", 0)) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "storage-2", 0)) + + ln, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + srv := grpc.NewServer(grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { + return stream.SendMsg(&gitalypb.RemoveRepositoryResponse{}) + })) + + go srv.Serve(ln) + defer srv.Stop() + + clientConn, err := grpc.Dial(ln.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + defer clientConn.Close() + + require.Equal(t, tc.error, defaultReplicator{ + rs: rs, + log: testhelper.DiscardTestLogger(t), + }.Destroy( + ctx, + datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + Change: tc.change, + VirtualStorage: "virtual-storage-1", + RelativePath: "relative-path-1", + TargetNodeStorage: "storage-1", + }, + }, + clientConn, + )) + + exists, err := rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1") + require.NoError(t, err) + require.Equal(t, tc.exists, exists) + }) + } +} |