diff options
author | Toon Claes <toon@gitlab.com> | 2021-07-07 13:19:46 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2021-07-07 13:19:46 +0300 |
commit | 20c18da3ef98740a5f9e852197ad643ad74dbd4a (patch) | |
tree | 4c1258ce3fdbe471503b728f4546a0516d7ffcbb | |
parent | 8cc6bb1ffb6830fa416c8d0e32f9edebf6573730 (diff) | |
parent | 1d47a0abfe386c62f15d8f49d71f6275f143aa92 (diff) |
Merge branch 'pks-coordinator-cleanup-rpcs' into 'master'
Fix issues with replication of optimizing RPCs
See merge request gitlab-org/gitaly!3638
-rw-r--r-- | internal/praefect/coordinator.go | 29 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 23 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 134 | ||||
-rw-r--r-- | internal/praefect/replicator_pg_test.go | 9 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 107 |
5 files changed, 269 insertions, 33 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 520445b4e..0a2151e04 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -183,7 +183,10 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change if !ok { return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } - return datastore.GarbageCollect, datastore.Params{"CreateBitmap": req.GetCreateBitmap()}, nil + return datastore.GarbageCollect, datastore.Params{ + "CreateBitmap": req.GetCreateBitmap(), + "Prune": req.GetPrune(), + }, nil case "/gitaly.RepositoryService/RepackFull": req, ok := m.(*gitalypb.RepackFullRequest) if !ok { @@ -202,12 +205,34 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } return datastore.Cleanup, nil, nil + case "/gitaly.RepositoryService/WriteCommitGraph": + req, ok := m.(*gitalypb.WriteCommitGraphRequest) + if !ok { + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + } + return datastore.WriteCommitGraph, datastore.Params{ + "SplitStrategy": req.GetSplitStrategy(), + }, nil + case "/gitaly.RepositoryService/MidxRepack": + req, ok := m.(*gitalypb.MidxRepackRequest) + if !ok { + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + } + return datastore.MidxRepack, nil, nil + case "/gitaly.RepositoryService/OptimizeRepository": + req, ok := m.(*gitalypb.OptimizeRepositoryRequest) + if !ok { + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + } + return datastore.OptimizeRepository, nil, nil case "/gitaly.RefService/PackRefs": req, ok := m.(*gitalypb.PackRefsRequest) if !ok { return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } - return datastore.PackRefs, nil, nil + return datastore.PackRefs, datastore.Params{ + "AllRefs": req.GetAllRefs(), + }, nil default: return datastore.UpdateRepo, nil, nil } diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 92e94e72e..f1ad98db1 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -8,6 +8,7 @@ package datastore import ( "database/sql/driver" "encoding/json" + "errors" "fmt" ) @@ -59,6 +60,12 @@ const ( Cleanup = ChangeType("cleanup") // PackRefs is when replication optimizes references in a repo PackRefs = ChangeType("pack_refs") + // WriteCommitGraph is when replication writes a commit graph + WriteCommitGraph = ChangeType("write_commit_graph") + // MidxRepack is when replication does a multi-pack-index repack + MidxRepack = ChangeType("midx_repack") + // OptimizeRepository is when replication optimizes a repository + OptimizeRepository = ChangeType("optimize_repository") ) func (ct ChangeType) String() string { @@ -91,3 +98,19 @@ func (p Params) Value() (driver.Value, error) { } return string(data), nil } + +// GetBool returns the boolean parameter associated with the given key. Returns an error if either +// the key does not exist, or if the value is not a bool. +func (p Params) GetBool(key string) (bool, error) { + value, found := p[key] + if !found { + return false, errors.New("key does not exist") + } + + booleanValue, ok := value.(bool) + if !ok { + return false, fmt.Errorf("value is of unexpected type %T", value) + } + + return booleanValue, nil +} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 20a44d77e..c57fd77b8 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -39,6 +39,12 @@ type Replicator interface { Cleanup(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error // PackRefs will optimize references on the target repository PackRefs(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error + // WriteCommitGraph will optimize references on the target repository + WriteCommitGraph(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error + // MidxRepack will optimize references on the target repository + MidxRepack(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error + // OptimizeRepository will optimize the target repository + OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error } type defaultReplicator struct { @@ -227,23 +233,27 @@ func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore. RelativePath: event.Job.RelativePath, } - val, found := event.Job.Params["CreateBitmap"] - if !found { - return errors.New("no 'CreateBitmap' parameter for garbage collect") + createBitmap, err := event.Job.Params.GetBool("CreateBitmap") + if err != nil { + return fmt.Errorf("getting CreateBitmap parameter for GarbageCollect: %w", err) } - createBitmap, ok := val.(bool) - if !ok { - return fmt.Errorf("parameter 'CreateBitmap' has unexpected type: %T", createBitmap) + + prune, err := event.Job.Params.GetBool("Prune") + if err != nil { + return fmt.Errorf("getting Purge parameter for GarbageCollect: %w", err) } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - _, err := repoSvcClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ + if _, err := repoSvcClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ Repository: targetRepo, CreateBitmap: createBitmap, - }) + Prune: prune, + }); err != nil { + return err + } - return err + return nil } func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { @@ -282,38 +292,114 @@ func (dr defaultReplicator) PackRefs(ctx context.Context, event datastore.Replic RelativePath: event.Job.RelativePath, } + allRefs, err := event.Job.Params.GetBool("AllRefs") + if err != nil { + return fmt.Errorf("getting AllRefs parameter for PackRefs: %w", err) + } + refSvcClient := gitalypb.NewRefServiceClient(targetCC) - _, err := refSvcClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ + if _, err := refSvcClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ Repository: targetRepo, - }) + AllRefs: allRefs, + }); err != nil { + return err + } - return err + return nil } -func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) WriteCommitGraph(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: event.Job.TargetNodeStorage, RelativePath: event.Job.RelativePath, } - val, found := event.Job.Params["CreateBitmap"] + val, found := event.Job.Params["SplitStrategy"] if !found { - return errors.New("no 'CreateBitmap' parameter for repack full") + return fmt.Errorf("no SplitStrategy parameter for WriteCommitGraph") + } + + // While we store the parameter as the correct type in the in-memory replication queue, the + // Postgres queue will serialize parameters into a JSON structure. On deserialization, we'll + // thus get a float64 and need to cast it. + var splitStrategy gitalypb.WriteCommitGraphRequest_SplitStrategy + switch v := val.(type) { + case float64: + splitStrategy = gitalypb.WriteCommitGraphRequest_SplitStrategy(v) + case gitalypb.WriteCommitGraphRequest_SplitStrategy: + splitStrategy = v + default: + return fmt.Errorf("split strategy has wrong type %T", val) } - createBitmap, ok := val.(bool) - if !ok { - return fmt.Errorf("parameter 'CreateBitmap' has unexpected type: %T", createBitmap) + + repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + + if _, err := repoSvcClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{ + Repository: targetRepo, + SplitStrategy: splitStrategy, + }); err != nil { + return err + } + + return nil +} + +func (dr defaultReplicator) MidxRepack(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { + targetRepo := &gitalypb.Repository{ + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, + } + + repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + + if _, err := repoSvcClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{ + Repository: targetRepo, + }); err != nil { + return err + } + + return nil +} + +func (dr defaultReplicator) OptimizeRepository(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { + targetRepo := &gitalypb.Repository{ + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, + } + + repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + + if _, err := repoSvcClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ + Repository: targetRepo, + }); err != nil { + return err + } + + return nil +} + +func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { + targetRepo := &gitalypb.Repository{ + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, + } + + createBitmap, err := event.Job.Params.GetBool("CreateBitmap") + if err != nil { + return fmt.Errorf("getting CreateBitmap parameter for RepackFull: %w", err) } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - _, err := repoSvcClient.RepackFull(ctx, &gitalypb.RepackFullRequest{ + if _, err := repoSvcClient.RepackFull(ctx, &gitalypb.RepackFullRequest{ Repository: targetRepo, CreateBitmap: createBitmap, - }) + }); err != nil { + return err + } - return err + return nil } // ReplMgr is a replication manager for handling replication jobs @@ -663,6 +749,12 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re err = r.replicator.Cleanup(ctx, event, targetCC) case datastore.PackRefs: err = r.replicator.PackRefs(ctx, event, targetCC) + case datastore.WriteCommitGraph: + err = r.replicator.WriteCommitGraph(ctx, event, targetCC) + case datastore.MidxRepack: + err = r.replicator.MidxRepack(ctx, event, targetCC) + case datastore.OptimizeRepository: + err = r.replicator.OptimizeRepository(ctx, event, targetCC) default: err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change) } diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index b66382a4d..630f6944b 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -124,3 +125,11 @@ func TestReplicatorDestroy(t *testing.T) { }) } } + +func TestReplicator_PropagateReplicationJob_postgres(t *testing.T) { + testReplicatorPropagateReplicationJob(t, + func(t *testing.T, cfg config.Config) datastore.ReplicationEventQueue { + return datastore.NewPostgresReplicationEventQueue(getDB(t)) + }, + ) +} diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 6aab7cb0f..b5675f802 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -269,7 +269,20 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { } } -func TestPropagateReplicationJob(t *testing.T) { +func TestReplicator_PropagateReplicationJob_inmemory(t *testing.T) { + testReplicatorPropagateReplicationJob(t, + func(t *testing.T, cfg config.Config) datastore.ReplicationEventQueue { + return datastore.NewMemoryReplicationEventQueue(cfg) + }, + ) +} + +// testReplicatorPropagateReplicationJob is used to drive both in-memory and Postgres +// tests. +func testReplicatorPropagateReplicationJob( + t *testing.T, + createReplicationQueue func(*testing.T, config.Config) datastore.ReplicationEventQueue, +) { primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" primCfg := testcfg.Build(t, testcfg.WithStorages(primaryStorage)) @@ -303,7 +316,7 @@ func TestPropagateReplicationJob(t *testing.T) { // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty // By using WaitGroup we are sure the test cleanup will be started after all replication // requests are completed, so no running cache IO operations happen. - queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) + queue := datastore.NewReplicationEventQueueInterceptor(createReplicationQueue(t, conf)) var wg sync.WaitGroup queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { wg.Add(1) @@ -357,7 +370,11 @@ func TestPropagateReplicationJob(t *testing.T) { RelativePath: repositoryRelativePath, } - _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{Repository: repository, CreateBitmap: true}) + _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ + Repository: repository, + CreateBitmap: true, + Prune: true, + }) require.NoError(t, err) _, err = repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{Repository: repository, CreateBitmap: false}) @@ -369,13 +386,33 @@ func TestPropagateReplicationJob(t *testing.T) { _, err = repositoryClient.Cleanup(ctx, &gitalypb.CleanupRequest{Repository: repository}) require.NoError(t, err) - _, err = refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{Repository: repository}) + _, err = repositoryClient.WriteCommitGraph(ctx, &gitalypb.WriteCommitGraphRequest{ + Repository: repository, + // This is not a valid split strategy, but we currently only support a + // single default split strategy with value 0. So we just test with an + // invalid split strategy to check that a non-default value gets properly + // replicated. + SplitStrategy: 1, + }) + require.NoError(t, err) + + _, err = repositoryClient.MidxRepack(ctx, &gitalypb.MidxRepackRequest{Repository: repository}) + require.NoError(t, err) + + _, err = repositoryClient.OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{Repository: repository}) + require.NoError(t, err) + + _, err = refClient.PackRefs(ctx, &gitalypb.PackRefsRequest{ + Repository: repository, + AllRefs: true, + }) require.NoError(t, err) primaryRepository := &gitalypb.Repository{StorageName: primaryStorage, RelativePath: repositoryRelativePath} expectedPrimaryGcReq := &gitalypb.GarbageCollectRequest{ Repository: primaryRepository, CreateBitmap: true, + Prune: true, } expectedPrimaryRepackFullReq := &gitalypb.RepackFullRequest{ Repository: primaryRepository, @@ -387,8 +424,19 @@ func TestPropagateReplicationJob(t *testing.T) { expectedPrimaryCleanup := &gitalypb.CleanupRequest{ Repository: primaryRepository, } + expectedPrimaryWriteCommitGraph := &gitalypb.WriteCommitGraphRequest{ + Repository: primaryRepository, + SplitStrategy: 1, + } + expectedPrimaryMidxRepack := &gitalypb.MidxRepackRequest{ + Repository: primaryRepository, + } + expectedPrimaryOptimizeRepository := &gitalypb.OptimizeRepositoryRequest{ + Repository: primaryRepository, + } expectedPrimaryPackRefs := &gitalypb.PackRefsRequest{ Repository: primaryRepository, + AllRefs: true, } replCtx, cancel := testhelper.Context() @@ -400,6 +448,9 @@ func TestPropagateReplicationJob(t *testing.T) { waitForRequest(t, primaryServer.repackIncrChan, expectedPrimaryRepackIncrementalReq, 5*time.Second) waitForRequest(t, primaryServer.repackFullChan, expectedPrimaryRepackFullReq, 5*time.Second) waitForRequest(t, primaryServer.cleanupChan, expectedPrimaryCleanup, 5*time.Second) + waitForRequest(t, primaryServer.writeCommitGraphChan, expectedPrimaryWriteCommitGraph, 5*time.Second) + waitForRequest(t, primaryServer.midxRepackChan, expectedPrimaryMidxRepack, 5*time.Second) + waitForRequest(t, primaryServer.optimizeRepositoryChan, expectedPrimaryOptimizeRepository, 5*time.Second) waitForRequest(t, primaryServer.packRefsChan, expectedPrimaryPackRefs, 5*time.Second) secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath} @@ -416,6 +467,15 @@ func TestPropagateReplicationJob(t *testing.T) { expectedSecondaryCleanup := expectedPrimaryCleanup expectedSecondaryCleanup.Repository = secondaryRepository + expectedSecondaryWriteCommitGraph := expectedPrimaryWriteCommitGraph + expectedSecondaryWriteCommitGraph.Repository = secondaryRepository + + expectedSecondaryMidxRepack := expectedPrimaryMidxRepack + expectedSecondaryMidxRepack.Repository = secondaryRepository + + expectedSecondaryOptimizeRepository := expectedPrimaryOptimizeRepository + expectedSecondaryOptimizeRepository.Repository = secondaryRepository + expectedSecondaryPackRefs := expectedPrimaryPackRefs expectedSecondaryPackRefs.Repository = secondaryRepository @@ -424,12 +484,15 @@ func TestPropagateReplicationJob(t *testing.T) { waitForRequest(t, secondaryServer.repackIncrChan, expectedSecondaryRepackIncrementalReq, 5*time.Second) waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second) waitForRequest(t, secondaryServer.cleanupChan, expectedSecondaryCleanup, 5*time.Second) + waitForRequest(t, secondaryServer.writeCommitGraphChan, expectedSecondaryWriteCommitGraph, 5*time.Second) + waitForRequest(t, secondaryServer.midxRepackChan, expectedSecondaryMidxRepack, 5*time.Second) + waitForRequest(t, secondaryServer.optimizeRepositoryChan, expectedSecondaryOptimizeRepository, 5*time.Second) waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second) wg.Wait() } type mockServer struct { - gcChan, repackFullChan, repackIncrChan, cleanupChan, packRefsChan chan proto.Message + gcChan, repackFullChan, repackIncrChan, cleanupChan, writeCommitGraphChan, midxRepackChan, optimizeRepositoryChan, packRefsChan chan proto.Message gitalypb.UnimplementedRepositoryServiceServer gitalypb.UnimplementedRefServiceServer @@ -437,11 +500,14 @@ type mockServer struct { func newMockRepositoryServer() *mockServer { return &mockServer{ - gcChan: make(chan proto.Message), - repackFullChan: make(chan proto.Message), - repackIncrChan: make(chan proto.Message), - cleanupChan: make(chan proto.Message), - packRefsChan: make(chan proto.Message), + gcChan: make(chan proto.Message), + repackFullChan: make(chan proto.Message), + repackIncrChan: make(chan proto.Message), + cleanupChan: make(chan proto.Message), + writeCommitGraphChan: make(chan proto.Message), + midxRepackChan: make(chan proto.Message), + optimizeRepositoryChan: make(chan proto.Message), + packRefsChan: make(chan proto.Message), } } @@ -473,6 +539,27 @@ func (m *mockServer) Cleanup(ctx context.Context, in *gitalypb.CleanupRequest) ( return &gitalypb.CleanupResponse{}, nil } +func (m *mockServer) WriteCommitGraph(ctx context.Context, in *gitalypb.WriteCommitGraphRequest) (*gitalypb.WriteCommitGraphResponse, error) { + go func() { + m.writeCommitGraphChan <- in + }() + return &gitalypb.WriteCommitGraphResponse{}, nil +} + +func (m *mockServer) MidxRepack(ctx context.Context, in *gitalypb.MidxRepackRequest) (*gitalypb.MidxRepackResponse, error) { + go func() { + m.midxRepackChan <- in + }() + return &gitalypb.MidxRepackResponse{}, nil +} + +func (m *mockServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { + go func() { + m.optimizeRepositoryChan <- in + }() + return &gitalypb.OptimizeRepositoryResponse{}, nil +} + func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) (*gitalypb.PackRefsResponse, error) { go func() { m.packRefsChan <- in |