diff options
author | John Cai <jcai@gitlab.com> | 2020-03-25 01:43:10 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-03-27 23:23:21 +0300 |
commit | f0977f4045ba6ac75c448a399533e284401f5f35 (patch) | |
tree | 34f70d0b5a9866b18d3c298934309f71d8d35a0e | |
parent | 7c116ace7f1626cd8bdcd0ca9260a7d13488d6c6 (diff) |
Propagate GarbageCollect, RepackFull, RepackIncremental
-rw-r--r-- | changelogs/unreleased/jc-propagate-repacks.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 19 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 6 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 77 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 188 |
5 files changed, 295 insertions, 0 deletions
diff --git a/changelogs/unreleased/jc-propagate-repacks.yml b/changelogs/unreleased/jc-propagate-repacks.yml new file mode 100644 index 000000000..3189d77cb --- /dev/null +++ b/changelogs/unreleased/jc-propagate-repacks.yml @@ -0,0 +1,5 @@ +--- +title: Propagate GarbageCollect, RepackFull, RepackIncremental to secondary nodes +merge_request: 1970 +author: +type: added diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 85e53345d..361f24602 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -30,6 +30,25 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } return datastore.RenameRepo, datastore.Params{"RelativePath": req.RelativePath}, nil + case "/gitaly.RepositoryService/GarbageCollect": + req, ok := m.(*gitalypb.GarbageCollectRequest) + if !ok { + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + } + return datastore.GarbageCollect, datastore.Params{"CreateBitmap": req.GetCreateBitmap()}, nil + case "/gitaly.RepositoryService/RepackFull": + req, ok := m.(*gitalypb.RepackFullRequest) + if !ok { + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + } + return datastore.RepackFull, datastore.Params{"CreateBitmap": req.GetCreateBitmap()}, nil + case "/gitaly.RepositoryService/RepackIncremental": + req, ok := m.(*gitalypb.RepackIncrementalRequest) + if !ok { + return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) + } + return datastore.RepackIncremental, nil, nil + default: return datastore.UpdateRepo, nil, nil } diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 0b2d91c60..74a511712 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -50,6 +50,12 @@ const ( DeleteRepo = ChangeType("delete") // RenameRepo is when a replication renames repo RenameRepo = ChangeType("rename") + // GarbageCollect is when replication runs gc + GarbageCollect = ChangeType("gc") + // RepackFull is when replication runs a full repack + RepackFull = ChangeType("repack_full") + // RepackIncremental is when replication runs an incremental repack + RepackIncremental = ChangeType("repack_incremental") ) func (ct ChangeType) String() string { diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 263740747..d21d82624 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -28,6 +28,12 @@ type Replicator interface { Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error // Rename will rename(move) the target repo on the specified target connection Rename(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + // GarbageCollect will run gc on the target repository + GarbageCollect(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + // RepackFull will do a full repack on the target repository + RepackFull(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + // RepackIncremental will do an incremental repack on the target repository + RepackIncremental(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error } type defaultReplicator struct { @@ -145,6 +151,71 @@ func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, t return err } +func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { + targetRepo := &gitalypb.Repository{ + StorageName: job.TargetNode.Storage, + RelativePath: job.RelativePath, + } + + val, found := job.Params["CreateBitmap"] + if !found { + return errors.New("no 'CreateBitmap' parameter for garbage collect") + } + createBitmap, ok := val.(bool) + if !ok { + return fmt.Errorf("parameter 'CreateBitmap' has unexpected type: %T", createBitmap) + } + + repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + + _, err := repoSvcClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{ + Repository: targetRepo, + CreateBitmap: createBitmap, + }) + + return err +} + +func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { + targetRepo := &gitalypb.Repository{ + StorageName: job.TargetNode.Storage, + RelativePath: job.RelativePath, + } + + repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + + _, err := repoSvcClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{ + Repository: targetRepo, + }) + + return err +} + +func (dr defaultReplicator) RepackFull(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { + targetRepo := &gitalypb.Repository{ + StorageName: job.TargetNode.Storage, + RelativePath: job.RelativePath, + } + + val, found := job.Params["CreateBitmap"] + if !found { + return errors.New("no 'CreateBitmap' parameter for repack full") + } + createBitmap, ok := val.(bool) + if !ok { + return fmt.Errorf("parameter 'CreateBitmap' has unexpected type: %T", createBitmap) + } + + repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + + _, err := repoSvcClient.RepackFull(ctx, &gitalypb.RepackFullRequest{ + Repository: targetRepo, + CreateBitmap: createBitmap, + }) + + return err +} + func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClient, repo *gitalypb.Repository, checksum *string) func() error { return func() error { primaryChecksumRes, err := client.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{ @@ -448,6 +519,12 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour err = r.replicator.Destroy(injectedCtx, job, targetCC) case datastore.RenameRepo: err = r.replicator.Rename(injectedCtx, job, targetCC) + case datastore.GarbageCollect: + err = r.replicator.GarbageCollect(injectedCtx, job, targetCC) + case datastore.RepackFull: + err = r.replicator.RepackFull(injectedCtx, job, targetCC) + case datastore.RepackIncremental: + err = r.replicator.RepackIncremental(injectedCtx, job, targetCC) default: err = fmt.Errorf("unknown replication change type encountered: %q", job.Change) } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 07ef6b614..e3ff5897f 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" @@ -19,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" objectpoolservice "gitlab.com/gitlab-org/gitaly/internal/service/objectpool" @@ -179,6 +181,192 @@ func TestProcessReplicationJob(t *testing.T) { require.Len(t, mockReplicationHistogram.Values, 1) } +func TestPropagateReplicationJob(t *testing.T) { + primaryServer, primarySocketPath, cleanup := runMockRepositoryServer(t) + defer cleanup() + + secondaryServer, secondarySocketPath, cleanup := runMockRepositoryServer(t) + defer cleanup() + + primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: primaryStorage, + Address: primarySocketPath, + DefaultPrimary: true, + }, + { + Storage: secondaryStorage, + Address: secondarySocketPath, + }, + }, + }, + }, + } + + ds := datastore.MemoryQueue{ + MemoryDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()), + } + logEntry := testhelper.DiscardTestEntry(t) + + nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec()) + require.NoError(t, err) + nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) + + registry := protoregistry.New() + require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) + coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, registry) + + replmgr := NewReplMgr( + conf.VirtualStorages[0].Name, + logEntry, + ds, + nodeMgr, + ) + + prf := NewServer( + coordinator.StreamDirector, + logEntry, + registry, + conf, + ) + listener, port := listenAvailPort(t) + ctx, cancel := testhelper.Context() + defer cancel() + + prf.RegisterServices(nodeMgr, conf, ds) + go prf.Serve(listener, false) + defer prf.Stop() + + cc := dialLocalPort(t, port, false) + repositoryClient := gitalypb.NewRepositoryServiceClient(cc) + defer listener.Close() + defer cc.Close() + + repositoryRelativePath := "/path/to/repo" + + repository := &gitalypb.Repository{ + StorageName: conf.VirtualStorages[0].Name, + RelativePath: repositoryRelativePath, + } + + _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{Repository: repository, CreateBitmap: true}) + require.NoError(t, err) + + _, err = repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{Repository: repository, CreateBitmap: false}) + require.NoError(t, err) + + _, err = repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{Repository: repository}) + require.NoError(t, err) + + primaryRepository := &gitalypb.Repository{StorageName: primaryStorage, RelativePath: repositoryRelativePath} + expectedPrimaryGcReq := gitalypb.GarbageCollectRequest{ + Repository: primaryRepository, + CreateBitmap: true, + } + expectedPrimaryRepackFullReq := gitalypb.RepackFullRequest{ + Repository: primaryRepository, + CreateBitmap: false, + } + expectedPrimaryRepackIncrementalReq := gitalypb.RepackIncrementalRequest{ + Repository: primaryRepository, + } + + replCtx, cancel := testhelper.Context() + defer cancel() + go replmgr.ProcessBacklog(replCtx, noopBackoffFunc) + + // ensure primary gitaly server received the expected requests + waitForRequest(t, primaryServer.gcChan, expectedPrimaryGcReq, 5*time.Second) + waitForRequest(t, primaryServer.repackIncrChan, expectedPrimaryRepackIncrementalReq, 5*time.Second) + waitForRequest(t, primaryServer.repackFullChan, expectedPrimaryRepackFullReq, 5*time.Second) + + secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath} + + expectedSecondaryGcReq := expectedPrimaryGcReq + expectedSecondaryGcReq.Repository = secondaryRepository + + expectedSecondaryRepackFullReq := expectedPrimaryRepackFullReq + expectedSecondaryRepackFullReq.Repository = secondaryRepository + + expectedSecondaryRepackIncrementalReq := expectedPrimaryRepackIncrementalReq + expectedSecondaryRepackIncrementalReq.Repository = secondaryRepository + + // ensure secondary gitaly server received the expected requests + waitForRequest(t, secondaryServer.gcChan, expectedSecondaryGcReq, 5*time.Second) + waitForRequest(t, secondaryServer.repackIncrChan, expectedSecondaryRepackIncrementalReq, 5*time.Second) + waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second) +} + +type mockRepositoryServer struct { + gcChan, repackFullChan, repackIncrChan chan interface{} + + gitalypb.UnimplementedRepositoryServiceServer +} + +func newMockRepositoryServer() *mockRepositoryServer { + return &mockRepositoryServer{ + gcChan: make(chan interface{}), + repackFullChan: make(chan interface{}), + repackIncrChan: make(chan interface{}), + } +} + +func (m *mockRepositoryServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) { + go func() { + m.gcChan <- *in + }() + return &gitalypb.GarbageCollectResponse{}, nil +} + +func (m *mockRepositoryServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) { + go func() { + m.repackFullChan <- *in + }() + return &gitalypb.RepackFullResponse{}, nil +} + +func (m *mockRepositoryServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) { + go func() { + m.repackIncrChan <- *in + }() + return &gitalypb.RepackIncrementalResponse{}, nil +} + +func runMockRepositoryServer(t *testing.T) (*mockRepositoryServer, string, func()) { + server := testhelper.NewTestGrpcServer(t, nil, nil) + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", serverSocketPath) + require.NoError(t, err) + + mockServer := newMockRepositoryServer() + + gitalypb.RegisterRepositoryServiceServer(server, mockServer) + reflection.Register(server) + + go server.Serve(listener) + + return mockServer, "unix://" + serverSocketPath, server.Stop +} + +func waitForRequest(t *testing.T, ch chan interface{}, expected interface{}, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case req := <-ch: + assert.Equal(t, expected, req) + close(ch) + case <-timer.C: + t.Fatal("timed out") + } +} + func TestConfirmReplication(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() |