Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-02-17 18:35:33 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-02-18 10:17:28 +0300
commitc3ef20fedcc54c5ff6c735b9ecf49b1e0dee4c65 (patch)
treeeb3c28e71522749837956ff2c8469f4252775d1a
parent08024c36dcedbbc8f0ad172737cc4b6351522eba (diff)
implement 'delete_replica' replication job type
This commit adds a 'delete_replica' replication job type. It differs from 'delete' type by only deleting the specific replica without marking the repository deleted from the virtual storage completely. While we could update 'delete' to be handled in a similar manner, this would require us to do the work over multiple releases as old Praefect's could delete repositories completely during an upgrade. For this reason, let's implement a new job type on the side an remove the old 'delete' type in future.
-rw-r--r--internal/praefect/datastore/datastore.go2
-rw-r--r--internal/praefect/replicator.go18
-rw-r--r--internal/praefect/replicator_pg_test.go59
3 files changed, 75 insertions, 4 deletions
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index fa5996044..082719aed 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -45,6 +45,8 @@ const (
CreateRepo = ChangeType("create")
// DeleteRepo is when a replication deletes a repo
DeleteRepo = ChangeType("delete")
+ // DeleteReplica change type indicates that the targeted replica is due for deletion.
+ DeleteReplica = ChangeType("delete_replica")
// RenameRepo is when a replication renames repo
RenameRepo = ChangeType("rename")
// GarbageCollect is when replication runs gc
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 5dfa4806d..240c59ae3 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -149,16 +149,26 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
return err
}
+ var deleteFunc func(context.Context, string, string, string) error
+ switch event.Job.Change {
+ case datastore.DeleteRepo:
+ deleteFunc = dr.rs.DeleteRepository
+ case datastore.DeleteReplica:
+ deleteFunc = dr.rs.DeleteReplica
+ default:
+ return fmt.Errorf("unknown change type: %q", event.Job.Change)
+ }
+
// If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual
// storage but having one for the storage. We can later retry the deletion.
- if err := dr.rs.DeleteRepository(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil {
+ if err := deleteFunc(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil {
if !errors.Is(err, datastore.RepositoryNotExistsError{}) {
return err
}
dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)).
WithError(err).
- Info("replicated repository delete does not have a store entry")
+ Info("deleted repository did not have a store entry")
}
return nil
@@ -598,13 +608,13 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
return fmt.Errorf("no connection to source node %q/%q", event.Job.VirtualStorage, event.Job.SourceNodeStorage)
}
- ctx, err := helper.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.Address, source.Token)
+ ctx, err = helper.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.Address, source.Token)
if err != nil {
return fmt.Errorf("inject Gitaly servers into context: %w", err)
}
err = r.replicator.Replicate(ctx, event, source.Connection, targetCC)
- case datastore.DeleteRepo:
+ case datastore.DeleteRepo, datastore.DeleteReplica:
err = r.replicator.Destroy(ctx, event, targetCC)
case datastore.RenameRepo:
err = r.replicator.Rename(ctx, event, targetCC)
diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go
index 55bf5a244..934e4a337 100644
--- a/internal/praefect/replicator_pg_test.go
+++ b/internal/praefect/replicator_pg_test.go
@@ -4,6 +4,7 @@ package praefect
import (
"context"
+ "errors"
"net"
"path/filepath"
"testing"
@@ -66,3 +67,61 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
require.NoError(t, err)
require.False(t, exists)
}
+
+func TestReplicatorDestroy(t *testing.T) {
+ for _, tc := range []struct {
+ change datastore.ChangeType
+ exists bool
+ error error
+ }{
+ {change: datastore.DeleteReplica, exists: true},
+ {change: datastore.DeleteRepo, exists: false},
+ {change: "invalid-type", exists: true, error: errors.New(`unknown change type: "invalid-type"`)},
+ } {
+ t.Run(string(tc.change), func(t *testing.T) {
+ db := getDB(t)
+
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "storage-1", 0))
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "storage-2", 0))
+
+ ln, err := net.Listen("tcp", "localhost:0")
+ require.NoError(t, err)
+
+ srv := grpc.NewServer(grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
+ return stream.SendMsg(&gitalypb.RemoveRepositoryResponse{})
+ }))
+
+ go srv.Serve(ln)
+ defer srv.Stop()
+
+ clientConn, err := grpc.Dial(ln.Addr().String(), grpc.WithInsecure())
+ require.NoError(t, err)
+ defer clientConn.Close()
+
+ require.Equal(t, tc.error, defaultReplicator{
+ rs: rs,
+ log: testhelper.DiscardTestLogger(t),
+ }.Destroy(
+ ctx,
+ datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: tc.change,
+ VirtualStorage: "virtual-storage-1",
+ RelativePath: "relative-path-1",
+ TargetNodeStorage: "storage-1",
+ },
+ },
+ clientConn,
+ ))
+
+ exists, err := rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1")
+ require.NoError(t, err)
+ require.Equal(t, tc.exists, exists)
+ })
+ }
+}