diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-29 16:42:52 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-29 16:42:52 +0300 |
commit | bda1c0a28f6d6f767ae7407ff9a1e04d6d0f1878 (patch) | |
tree | c404b0ef009790e7f40305fdb05a006c51685023 | |
parent | 4afeaef9595693f02e05026555ef724b0385ae98 (diff) | |
parent | 64d9727a224d703da8d1afbabfd95573af6ef212 (diff) |
Merge branch 'pks-praefect-remove-maintenance-event-creation' into 'master'
replicator: Drop logic to create and handle maintenance-style replication jobs
See merge request gitlab-org/gitaly!4494
-rw-r--r-- | internal/praefect/coordinator.go | 59 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 289 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 238 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 345 |
4 files changed, 298 insertions, 633 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 82c32fcd0..ed5b71045 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -166,65 +166,6 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } return datastore.RenameRepo, datastore.Params{"RelativePath": req.RelativePath}, nil - case "/gitaly.RepositoryService/GarbageCollect": - req, ok := m.(*gitalypb.GarbageCollectRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.GarbageCollect, datastore.Params{ - "CreateBitmap": req.GetCreateBitmap(), - "Prune": req.GetPrune(), - }, nil - case "/gitaly.RepositoryService/RepackFull": - req, ok := m.(*gitalypb.RepackFullRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.RepackFull, datastore.Params{"CreateBitmap": req.GetCreateBitmap()}, nil - case "/gitaly.RepositoryService/RepackIncremental": - req, ok := m.(*gitalypb.RepackIncrementalRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.RepackIncremental, nil, nil - case "/gitaly.RepositoryService/Cleanup": - req, ok := m.(*gitalypb.CleanupRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.Cleanup, nil, nil - case "/gitaly.RepositoryService/WriteCommitGraph": - req, ok := m.(*gitalypb.WriteCommitGraphRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.WriteCommitGraph, datastore.Params{ - "SplitStrategy": req.GetSplitStrategy(), - }, nil - case "/gitaly.RepositoryService/MidxRepack": - req, ok := m.(*gitalypb.MidxRepackRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.MidxRepack, nil, nil - case "/gitaly.RepositoryService/OptimizeRepository": - req, ok := m.(*gitalypb.OptimizeRepositoryRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.OptimizeRepository, nil, nil - case "/gitaly.RepositoryService/PruneUnreachableObjects": - req, ok := m.(*gitalypb.PruneUnreachableObjectsRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.PruneUnreachableObjects, nil, nil - case "/gitaly.RefService/PackRefs": - req, ok := m.(*gitalypb.PackRefsRequest) - if !ok { - return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) - } - return datastore.PackRefs, nil, nil default: return datastore.UpdateRepo, nil, nil } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 3b4d53d15..fce4d5e5b 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -19,6 +19,8 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + gconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" @@ -549,6 +551,293 @@ func TestStreamDirector_maintenance(t *testing.T) { } } +func TestStreamDirector_maintenanceRPCs(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + primaryStorage := "internal-gitaly-0" + primaryServer, primarySocketPath := runMockMaintenanceServer( + t, testcfg.Build(t, testcfg.WithStorages(primaryStorage)), + ) + + secondaryStorage := "internal-gitaly-1" + secondaryServer, secondarySocketPath := runMockMaintenanceServer(t, testcfg.Build( + t, testcfg.WithStorages(secondaryStorage)), + ) + + cc, _, cleanup := runPraefectServer(t, ctx, config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*config.Node{ + { + Storage: primaryStorage, + Address: primarySocketPath, + }, + { + Storage: secondaryStorage, + Address: secondarySocketPath, + }, + }, + }, + }, + }, buildOptions{}) + defer cleanup() + + repository := &gitalypb.Repository{ + StorageName: "default", + RelativePath: gittest.NewRepositoryName(t, true), + } + primaryRepository := &gitalypb.Repository{ + StorageName: primaryStorage, + RelativePath: repository.RelativePath, + } + secondaryRepository := &gitalypb.Repository{ + StorageName: secondaryStorage, + RelativePath: repository.RelativePath, + } + + repositoryClient := gitalypb.NewRepositoryServiceClient(cc) + refClient := gitalypb.NewRefServiceClient(cc) + + for _, tc := range []struct { + desc string + maintenanceFunc func(t *testing.T) + expectedPrimaryRequest proto.Message + expectedSecondaryRequest proto.Message + }{ + { + desc: "GarbageCollect", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ + Repository: repository, + CreateBitmap: true, + Prune: true, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.GarbageCollectRequest{ + Repository: primaryRepository, + CreateBitmap: true, + Prune: true, + }, + expectedSecondaryRequest: &gitalypb.GarbageCollectRequest{ + Repository: secondaryRepository, + CreateBitmap: true, + Prune: true, + }, + }, + { + desc: "RepackFull", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{ + Repository: repository, + CreateBitmap: true, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.RepackFullRequest{ + Repository: primaryRepository, + CreateBitmap: true, + }, + expectedSecondaryRequest: &gitalypb.RepackFullRequest{ + Repository: secondaryRepository, + CreateBitmap: true, + }, + }, + { + desc: "RepackIncremental", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.RepackIncrementalRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.RepackIncrementalRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "Cleanup", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.CleanupRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.CleanupRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "WriteCommitGraph", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{ + Repository: repository, + // This is not a valid split strategy, but we currently only support a + // single default split strategy with value 0. So we just test with an + // invalid split strategy to check that a non-default value gets properly + // replicated. + SplitStrategy: 1, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.WriteCommitGraphRequest{ + Repository: primaryRepository, + SplitStrategy: 1, + }, + expectedSecondaryRequest: &gitalypb.WriteCommitGraphRequest{ + Repository: secondaryRepository, + SplitStrategy: 1, + }, + }, + { + desc: "MidxRepack", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := repositoryClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.MidxRepackRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.MidxRepackRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "OptimizeRepository", + maintenanceFunc: func(t *testing.T) { + _, err := repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.OptimizeRepositoryRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.OptimizeRepositoryRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "PruneUnreachableObjects", + maintenanceFunc: func(t *testing.T) { + _, err := repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.PruneUnreachableObjectsRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.PruneUnreachableObjectsRequest{ + Repository: secondaryRepository, + }, + }, + { + desc: "PackRefs", + maintenanceFunc: func(t *testing.T) { + //nolint:staticcheck + _, err := refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ + Repository: repository, + }) + require.NoError(t, err) + }, + expectedPrimaryRequest: &gitalypb.PackRefsRequest{ + Repository: primaryRepository, + }, + expectedSecondaryRequest: &gitalypb.PackRefsRequest{ + Repository: secondaryRepository, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + tc.maintenanceFunc(t) + testhelper.ProtoEqual(t, tc.expectedPrimaryRequest, <-primaryServer.requestCh) + testhelper.ProtoEqual(t, tc.expectedSecondaryRequest, <-secondaryServer.requestCh) + }) + } +} + +type mockMaintenanceServer struct { + requestCh chan proto.Message + gitalypb.UnimplementedRepositoryServiceServer + gitalypb.UnimplementedRefServiceServer +} + +func runMockMaintenanceServer(t *testing.T, cfg gconfig.Cfg) (*mockMaintenanceServer, string) { + server := &mockMaintenanceServer{ + requestCh: make(chan proto.Message, 1), + } + + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, server) + gitalypb.RegisterRefServiceServer(srv, server) + }, testserver.WithDisablePraefect()) + + return server, addr +} + +func (m *mockMaintenanceServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) { + m.requestCh <- in + return &gitalypb.GarbageCollectResponse{}, nil +} + +func (m *mockMaintenanceServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) { + m.requestCh <- in + return &gitalypb.RepackFullResponse{}, nil +} + +func (m *mockMaintenanceServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) { + m.requestCh <- in + return &gitalypb.RepackIncrementalResponse{}, nil +} + +func (m *mockMaintenanceServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) { + m.requestCh <- in + return &gitalypb.CleanupResponse{}, nil +} + +func (m *mockMaintenanceServer) WriteCommitGraph(ctx context.Context, in *gitalypb.WriteCommitGraphRequest) (*gitalypb.WriteCommitGraphResponse, error) { + m.requestCh <- in + return &gitalypb.WriteCommitGraphResponse{}, nil +} + +func (m *mockMaintenanceServer) MidxRepack(ctx context.Context, in *gitalypb.MidxRepackRequest) (*gitalypb.MidxRepackResponse, error) { + m.requestCh <- in + return &gitalypb.MidxRepackResponse{}, nil +} + +func (m *mockMaintenanceServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { + m.requestCh <- in + return &gitalypb.OptimizeRepositoryResponse{}, nil +} + +func (m *mockMaintenanceServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) { + m.requestCh <- in + return &gitalypb.PruneUnreachableObjectsResponse{}, nil +} + +func (m *mockMaintenanceServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) { + m.requestCh <- in + return &gitalypb.PackRefsResponse{}, nil +} + type mockRouter struct { Router routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index fcca4b133..1471ecef3 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -30,24 +30,6 @@ type Replicator interface { Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error // Rename will rename(move) the target repo on the specified target connection Rename(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // GarbageCollect will run gc on the target repository - GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // RepackFull will do a full repack on the target repository - RepackFull(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // RepackIncremental will do an incremental repack on the target repository - RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // Cleanup will do a cleanup on the target repository - Cleanup(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // PackRefs will optimize references on the target repository - PackRefs(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // WriteCommitGraph will optimize references on the target repository - WriteCommitGraph(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // MidxRepack will optimize references on the target repository - MidxRepack(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // OptimizeRepository will optimize the target repository - OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error - // PruneUnreachableObjects prunes unreachable objects from the target repository - PruneUnreachableObjects(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error } type defaultReplicator struct { @@ -216,199 +198,6 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat return nil } -func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - createBitmap, err := event.Job.Params.GetBool("CreateBitmap") - if err != nil { - return fmt.Errorf("getting CreateBitmap parameter for GarbageCollect: %w", err) - } - - prune, err := event.Job.Params.GetBool("Prune") - if err != nil { - return fmt.Errorf("getting Purge parameter for GarbageCollect: %w", err) - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - //nolint:staticcheck - if _, err := repoSvcClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ - Repository: targetRepo, - CreateBitmap: createBitmap, - Prune: prune, - }); err != nil { - return err - } - - return nil -} - -func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - //nolint:staticcheck - _, err := repoSvcClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{ - Repository: targetRepo, - }) - - return err -} - -func (dr defaultReplicator) Cleanup(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - //nolint:staticcheck - _, err := repoSvcClient.Cleanup(ctx, &gitalypb.CleanupRequest{ - Repository: targetRepo, - }) - - return err -} - -func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - refSvcClient := gitalypb.NewRefServiceClient(targetCC) - - //nolint:staticcheck - if _, err := refSvcClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ - Repository: targetRepo, - }); err != nil { - return err - } - - return nil -} - -func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - val, found := event.Job.Params["SplitStrategy"] - if !found { - return fmt.Errorf("no SplitStrategy parameter for WriteCommitGraph") - } - - // While we store the parameter as the correct type in the in-memory replication queue, the - // Postgres queue will serialize parameters into a JSON structure. On deserialization, we'll - // thus get a float64 and need to cast it. - var splitStrategy gitalypb.WriteCommitGraphRequest_SplitStrategy - switch v := val.(type) { - case float64: - splitStrategy = gitalypb.WriteCommitGraphRequest_SplitStrategy(v) - case gitalypb.WriteCommitGraphRequest_SplitStrategy: - splitStrategy = v - default: - return fmt.Errorf("split strategy has wrong type %T", val) - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - //nolint:staticcheck - if _, err := repoSvcClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{ - Repository: targetRepo, - SplitStrategy: splitStrategy, - }); err != nil { - return err - } - - return nil -} - -func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - //nolint:staticcheck - if _, err := repoSvcClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{ - Repository: targetRepo, - }); err != nil { - return err - } - - return nil -} - -func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - if _, err := repoSvcClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ - Repository: targetRepo, - }); err != nil { - return err - } - - return nil -} - -func (dr defaultReplicator) PruneUnreachableObjects(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - if _, err := repoSvcClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{ - Repository: targetRepo, - }); err != nil { - return err - } - - return nil -} - -func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { - targetRepo := &gitalypb.Repository{ - StorageName: event.Job.TargetNodeStorage, - RelativePath: event.Job.ReplicaPath, - } - - createBitmap, err := event.Job.Params.GetBool("CreateBitmap") - if err != nil { - return fmt.Errorf("getting CreateBitmap parameter for RepackFull: %w", err) - } - - repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - - //nolint:staticcheck - if _, err := repoSvcClient.RepackFull(ctx, &gitalypb.RepackFullRequest{ - Repository: targetRepo, - CreateBitmap: createBitmap, - }); err != nil { - return err - } - - return nil -} - // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Entry @@ -850,24 +639,15 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re err = r.replicator.Destroy(ctx, event, targetCC) case datastore.RenameRepo: err = r.replicator.Rename(ctx, event, targetCC) - case datastore.GarbageCollect: - err = r.replicator.GarbageCollect(ctx, event, targetCC) - case datastore.RepackFull: - err = r.replicator.RepackFull(ctx, event, targetCC) - case datastore.RepackIncremental: - err = r.replicator.RepackIncremental(ctx, event, targetCC) - case datastore.Cleanup: - err = r.replicator.Cleanup(ctx, event, targetCC) - case datastore.PackRefs: - err = r.replicator.PackRefs(ctx, event, targetCC) - case datastore.WriteCommitGraph: - err = r.replicator.WriteCommitGraph(ctx, event, targetCC) - case datastore.MidxRepack: - err = r.replicator.MidxRepack(ctx, event, targetCC) - case datastore.OptimizeRepository: - err = r.replicator.OptimizeRepository(ctx, event, targetCC) - case datastore.PruneUnreachableObjects: - err = r.replicator.PruneUnreachableObjects(ctx, event, targetCC) + case datastore.GarbageCollect, datastore.RepackFull, datastore.RepackIncremental, datastore.Cleanup, datastore.PackRefs, datastore.WriteCommitGraph, datastore.MidxRepack, datastore.OptimizeRepository, datastore.PruneUnreachableObjects: + // Even though we don't generate any replication events for maintenance-style RPCs + // anymore, we still need to handle them here in order to drain the queue. It's safe + // to just do nothing though: the new strategy is best-effort anyway and just + // ignores out-of-date nodes, so by ignoring the events here we roughly do the same. + // + // This fallback code can be removed with v15.1 along with an SQL migration which + // prunes any remaining maintenance-style replication jobs. + r.log.Infof("ignoring maintenance event of type %v", event.Job.Change) default: err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change) } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 89e5d342c..dd91d20b8 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -2,7 +2,6 @@ package praefect import ( "context" - "fmt" "path/filepath" "strings" "sync" @@ -22,7 +21,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool" gconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" @@ -32,7 +30,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" @@ -279,348 +276,6 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { } } -func TestReplicator_PropagateReplicationJob(t *testing.T) { - t.Parallel() - - ctx := testhelper.Context(t) - primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" - - primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage)) - primaryServer, primarySocketPath := runMockRepositoryServer(t, primCfg) - - secCfg := testcfg.Build(t, testcfg.WithStorages(secondaryStorage)) - secondaryServer, secondarySocketPath := runMockRepositoryServer(t, secCfg) - - conf := config.Config{ - VirtualStorages: []*config.VirtualStorage{ - { - Name: "default", - Nodes: []*config.Node{ - { - Storage: primaryStorage, - Address: primarySocketPath, - }, - { - Storage: secondaryStorage, - Address: secondarySocketPath, - }, - }, - }, - }, - } - - // We need to await for the replication event to make a complete roundtrip to the remote. - // Because send to the channel happens during in-flight request there are ongoing filesystem - // operations related to caching. The cleanup happens before all IO cache operations finished - // those resulting to: - // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty - // By using WaitGroup we are sure the test cleanup will be started after all replication - // requests are completed, so no running cache IO operations happen. - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - - var wg sync.WaitGroup - // When maintenance operation routing is enabled we don't expect to see any - // replication events. The observed behaviour should still be the same though: we - // expect to observe the RPC calls on both the primary and secondary node because we - // route them to both at the same time. - queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { - require.FailNow(t, "no replication jobs should have been created") - return datastore.ReplicationEvent{}, fmt.Errorf("unexpected enqueue") - }) - - logEntry := testhelper.NewDiscardingLogEntry(t) - - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) - require.NoError(t, err) - nodeMgr.Start(0, time.Hour) - defer nodeMgr.Stop() - - txMgr := transactions.NewManager(conf) - - repositoryRelativePath := "/path/to/repo" - - rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return repositoryRelativePath, nil, nil - }, - GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { - return repositoryRelativePath, nil - }, - } - - coordinator := NewCoordinator( - queue, - rs, - NewNodeManagerRouter(nodeMgr, rs), - txMgr, - conf, - protoregistry.GitalyProtoPreregistered, - ) - - replmgr := NewReplMgr(logEntry, conf.StorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) - - prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, txMgr, rs, nil, nil, nil, nil) - - listener, port := listenAvailPort(t) - ctx, cancel := context.WithCancel(ctx) - - go prf.Serve(listener) - defer prf.Stop() - - cc := dialLocalPort(t, port, false) - repositoryClient := gitalypb.NewRepositoryServiceClient(cc) - refClient := gitalypb.NewRefServiceClient(cc) - defer listener.Close() - defer cc.Close() - - repository := &gitalypb.Repository{ - StorageName: conf.VirtualStorages[0].Name, - RelativePath: repositoryRelativePath, - } - - //nolint:staticcheck - _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ - Repository: repository, - CreateBitmap: true, - Prune: true, - }) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{Repository: repository, CreateBitmap: false}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{Repository: repository}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{Repository: repository}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{ - Repository: repository, - // This is not a valid split strategy, but we currently only support a - // single default split strategy with value 0. So we just test with an - // invalid split strategy to check that a non-default value gets properly - // replicated. - SplitStrategy: 1, - }) - require.NoError(t, err) - - //nolint:staticcheck - _, err = repositoryClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{Repository: repository}) - require.NoError(t, err) - - _, err = repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{Repository: repository}) - require.NoError(t, err) - - _, err = repositoryClient.PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{Repository: repository}) - require.NoError(t, err) - - //nolint:staticcheck - _, err = refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ - Repository: repository, - }) - require.NoError(t, err) - - primaryRepository := &gitalypb.Repository{StorageName: primaryStorage, RelativePath: repositoryRelativePath} - expectedPrimaryGcReq := &gitalypb.GarbageCollectRequest{ - Repository: primaryRepository, - CreateBitmap: true, - Prune: true, - } - expectedPrimaryRepackFullReq := &gitalypb.RepackFullRequest{ - Repository: primaryRepository, - CreateBitmap: false, - } - expectedPrimaryRepackIncrementalReq := &gitalypb.RepackIncrementalRequest{ - Repository: primaryRepository, - } - expectedPrimaryCleanup := &gitalypb.CleanupRequest{ - Repository: primaryRepository, - } - expectedPrimaryWriteCommitGraph := &gitalypb.WriteCommitGraphRequest{ - Repository: primaryRepository, - SplitStrategy: 1, - } - expectedPrimaryMidxRepack := &gitalypb.MidxRepackRequest{ - Repository: primaryRepository, - } - expectedPrimaryOptimizeRepository := &gitalypb.OptimizeRepositoryRequest{ - Repository: primaryRepository, - } - expectedPruneUnreachableObjects := &gitalypb.PruneUnreachableObjectsRequest{ - Repository: primaryRepository, - } - expectedPrimaryPackRefs := &gitalypb.PackRefsRequest{ - Repository: primaryRepository, - } - - replMgrDone := startProcessBacklog(ctx, replmgr) - - // ensure primary gitaly server received the expected requests - waitForRequest(t, primaryServer.gcChan, expectedPrimaryGcReq, 5*time.Second) - waitForRequest(t, primaryServer.repackIncrChan, expectedPrimaryRepackIncrementalReq, 5*time.Second) - waitForRequest(t, primaryServer.repackFullChan, expectedPrimaryRepackFullReq, 5*time.Second) - waitForRequest(t, primaryServer.cleanupChan, expectedPrimaryCleanup, 5*time.Second) - waitForRequest(t, primaryServer.writeCommitGraphChan, expectedPrimaryWriteCommitGraph, 5*time.Second) - waitForRequest(t, primaryServer.midxRepackChan, expectedPrimaryMidxRepack, 5*time.Second) - waitForRequest(t, primaryServer.optimizeRepositoryChan, expectedPrimaryOptimizeRepository, 5*time.Second) - waitForRequest(t, primaryServer.pruneUnreachableObjectsChan, expectedPruneUnreachableObjects, 5*time.Second) - waitForRequest(t, primaryServer.packRefsChan, expectedPrimaryPackRefs, 5*time.Second) - - secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath} - - expectedSecondaryGcReq := expectedPrimaryGcReq - expectedSecondaryGcReq.Repository = secondaryRepository - - expectedSecondaryRepackFullReq := expectedPrimaryRepackFullReq - expectedSecondaryRepackFullReq.Repository = secondaryRepository - - expectedSecondaryRepackIncrementalReq := expectedPrimaryRepackIncrementalReq - expectedSecondaryRepackIncrementalReq.Repository = secondaryRepository - - expectedSecondaryCleanup := expectedPrimaryCleanup - expectedSecondaryCleanup.Repository = secondaryRepository - - expectedSecondaryWriteCommitGraph := expectedPrimaryWriteCommitGraph - expectedSecondaryWriteCommitGraph.Repository = secondaryRepository - - expectedSecondaryMidxRepack := expectedPrimaryMidxRepack - expectedSecondaryMidxRepack.Repository = secondaryRepository - - expectedSecondaryOptimizeRepository := expectedPrimaryOptimizeRepository - expectedSecondaryOptimizeRepository.Repository = secondaryRepository - - expectedSecondaryPruneUnreachableObjects := expectedPruneUnreachableObjects - expectedSecondaryPruneUnreachableObjects.Repository = secondaryRepository - - expectedSecondaryPackRefs := expectedPrimaryPackRefs - expectedSecondaryPackRefs.Repository = secondaryRepository - - // ensure secondary gitaly server received the expected requests - waitForRequest(t, secondaryServer.gcChan, expectedSecondaryGcReq, 5*time.Second) - waitForRequest(t, secondaryServer.repackIncrChan, expectedSecondaryRepackIncrementalReq, 5*time.Second) - waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second) - waitForRequest(t, secondaryServer.cleanupChan, expectedSecondaryCleanup, 5*time.Second) - waitForRequest(t, secondaryServer.writeCommitGraphChan, expectedSecondaryWriteCommitGraph, 5*time.Second) - waitForRequest(t, secondaryServer.midxRepackChan, expectedSecondaryMidxRepack, 5*time.Second) - waitForRequest(t, secondaryServer.optimizeRepositoryChan, expectedSecondaryOptimizeRepository, 5*time.Second) - waitForRequest(t, secondaryServer.pruneUnreachableObjectsChan, expectedSecondaryPruneUnreachableObjects, 5*time.Second) - waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second) - wg.Wait() - cancel() - <-replMgrDone -} - -type mockServer struct { - gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, pruneUnreachableObjectsChan, packRefsChan chan proto.Message - - gitalypb.UnimplementedRepositoryServiceServer - gitalypb.UnimplementedRefServiceServer -} - -func newMockRepositoryServer() *mockServer { - return &mockServer{ - gcChan: make(chan proto.Message), - repackFullChan: make(chan proto.Message), - repackIncrChan: make(chan proto.Message), - cleanupChan: make(chan proto.Message), - writeCommitGraphChan: make(chan proto.Message), - midxRepackChan: make(chan proto.Message), - optimizeRepositoryChan: make(chan proto.Message), - pruneUnreachableObjectsChan: make(chan proto.Message), - packRefsChan: make(chan proto.Message), - } -} - -func (m *mockServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) { - go func() { - m.gcChan <- in - }() - return &gitalypb.GarbageCollectResponse{}, nil -} - -func (m *mockServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) { - go func() { - m.repackFullChan <- in - }() - return &gitalypb.RepackFullResponse{}, nil -} - -func (m *mockServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) { - go func() { - m.repackIncrChan <- in - }() - return &gitalypb.RepackIncrementalResponse{}, nil -} - -func (m *mockServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) { - go func() { - m.cleanupChan <- in - }() - return &gitalypb.CleanupResponse{}, nil -} - -func (m *mockServer) WriteCommitGraph(ctx context.Context, in *gitalypb.WriteCommitGraphRequest) (*gitalypb.WriteCommitGraphResponse, error) { - go func() { - m.writeCommitGraphChan <- in - }() - return &gitalypb.WriteCommitGraphResponse{}, nil -} - -func (m *mockServer) MidxRepack(ctx context.Context, in *gitalypb.MidxRepackRequest) (*gitalypb.MidxRepackResponse, error) { - go func() { - m.midxRepackChan <- in - }() - return &gitalypb.MidxRepackResponse{}, nil -} - -func (m *mockServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { - go func() { - m.optimizeRepositoryChan <- in - }() - return &gitalypb.OptimizeRepositoryResponse{}, nil -} - -func (m *mockServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) { - go func() { - m.pruneUnreachableObjectsChan <- in - }() - return &gitalypb.PruneUnreachableObjectsResponse{}, nil -} - -func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) { - go func() { - m.packRefsChan <- in - }() - return &gitalypb.PackRefsResponse{}, nil -} - -func runMockRepositoryServer(t *testing.T, cfg gconfig.Cfg) (*mockServer, string) { - mockServer := newMockRepositoryServer() - - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterRepositoryServiceServer(srv, mockServer) - gitalypb.RegisterRefServiceServer(srv, mockServer) - }, testserver.WithDisablePraefect()) - return mockServer, addr -} - -func waitForRequest(t *testing.T, ch chan proto.Message, expected proto.Message, timeout time.Duration) { - timer := time.NewTimer(timeout) - defer timer.Stop() - select { - case req := <-ch: - testhelper.ProtoEqual(t, expected, req) - close(ch) - case <-timer.C: - t.Fatal("timed out") - } -} - func TestConfirmReplication(t *testing.T) { ctx := testhelper.Context(t) |