Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-03-25 01:43:10 +0300
committerJohn Cai <jcai@gitlab.com>2020-03-27 23:23:21 +0300
commitf0977f4045ba6ac75c448a399533e284401f5f35 (patch)
tree34f70d0b5a9866b18d3c298934309f71d8d35a0e
parent7c116ace7f1626cd8bdcd0ca9260a7d13488d6c6 (diff)
Propagate GarbageCollect, RepackFull, RepackIncremental
-rw-r--r--changelogs/unreleased/jc-propagate-repacks.yml5
-rw-r--r--internal/praefect/coordinator.go19
-rw-r--r--internal/praefect/datastore/datastore.go6
-rw-r--r--internal/praefect/replicator.go77
-rw-r--r--internal/praefect/replicator_test.go188
5 files changed, 295 insertions, 0 deletions
diff --git a/changelogs/unreleased/jc-propagate-repacks.yml b/changelogs/unreleased/jc-propagate-repacks.yml
new file mode 100644
index 000000000..3189d77cb
--- /dev/null
+++ b/changelogs/unreleased/jc-propagate-repacks.yml
@@ -0,0 +1,5 @@
+---
+title: Propagate GarbageCollect, RepackFull, RepackIncremental to secondary nodes
+merge_request: 1970
+author:
+type: added
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 85e53345d..361f24602 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -30,6 +30,25 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change
return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
}
return datastore.RenameRepo, datastore.Params{"RelativePath": req.RelativePath}, nil
+ case "/gitaly.RepositoryService/GarbageCollect":
+ req, ok := m.(*gitalypb.GarbageCollectRequest)
+ if !ok {
+ return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
+ }
+ return datastore.GarbageCollect, datastore.Params{"CreateBitmap": req.GetCreateBitmap()}, nil
+ case "/gitaly.RepositoryService/RepackFull":
+ req, ok := m.(*gitalypb.RepackFullRequest)
+ if !ok {
+ return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
+ }
+ return datastore.RepackFull, datastore.Params{"CreateBitmap": req.GetCreateBitmap()}, nil
+ case "/gitaly.RepositoryService/RepackIncremental":
+ req, ok := m.(*gitalypb.RepackIncrementalRequest)
+ if !ok {
+ return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
+ }
+ return datastore.RepackIncremental, nil, nil
+
default:
return datastore.UpdateRepo, nil, nil
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 0b2d91c60..74a511712 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -50,6 +50,12 @@ const (
DeleteRepo = ChangeType("delete")
// RenameRepo is when a replication renames repo
RenameRepo = ChangeType("rename")
+ // GarbageCollect is when replication runs gc
+ GarbageCollect = ChangeType("gc")
+ // RepackFull is when replication runs a full repack
+ RepackFull = ChangeType("repack_full")
+ // RepackIncremental is when replication runs an incremental repack
+ RepackIncremental = ChangeType("repack_incremental")
)
func (ct ChangeType) String() string {
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 263740747..d21d82624 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -28,6 +28,12 @@ type Replicator interface {
Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
// Rename will rename(move) the target repo on the specified target connection
Rename(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ // GarbageCollect will run gc on the target repository
+ GarbageCollect(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ // RepackFull will do a full repack on the target repository
+ RepackFull(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ // RepackIncremental will do an incremental repack on the target repository
+ RepackIncremental(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
}
type defaultReplicator struct {
@@ -145,6 +151,71 @@ func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, t
return err
}
+func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+ targetRepo := &gitalypb.Repository{
+ StorageName: job.TargetNode.Storage,
+ RelativePath: job.RelativePath,
+ }
+
+ val, found := job.Params["CreateBitmap"]
+ if !found {
+ return errors.New("no 'CreateBitmap' parameter for garbage collect")
+ }
+ createBitmap, ok := val.(bool)
+ if !ok {
+ return fmt.Errorf("parameter 'CreateBitmap' has unexpected type: %T", createBitmap)
+ }
+
+ repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+
+ _, err := repoSvcClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{
+ Repository: targetRepo,
+ CreateBitmap: createBitmap,
+ })
+
+ return err
+}
+
+func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+ targetRepo := &gitalypb.Repository{
+ StorageName: job.TargetNode.Storage,
+ RelativePath: job.RelativePath,
+ }
+
+ repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+
+ _, err := repoSvcClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{
+ Repository: targetRepo,
+ })
+
+ return err
+}
+
+func (dr defaultReplicator) RepackFull(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+ targetRepo := &gitalypb.Repository{
+ StorageName: job.TargetNode.Storage,
+ RelativePath: job.RelativePath,
+ }
+
+ val, found := job.Params["CreateBitmap"]
+ if !found {
+ return errors.New("no 'CreateBitmap' parameter for repack full")
+ }
+ createBitmap, ok := val.(bool)
+ if !ok {
+ return fmt.Errorf("parameter 'CreateBitmap' has unexpected type: %T", createBitmap)
+ }
+
+ repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+
+ _, err := repoSvcClient.RepackFull(ctx, &gitalypb.RepackFullRequest{
+ Repository: targetRepo,
+ CreateBitmap: createBitmap,
+ })
+
+ return err
+}
+
func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClient, repo *gitalypb.Repository, checksum *string) func() error {
return func() error {
primaryChecksumRes, err := client.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{
@@ -448,6 +519,12 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
err = r.replicator.Destroy(injectedCtx, job, targetCC)
case datastore.RenameRepo:
err = r.replicator.Rename(injectedCtx, job, targetCC)
+ case datastore.GarbageCollect:
+ err = r.replicator.GarbageCollect(injectedCtx, job, targetCC)
+ case datastore.RepackFull:
+ err = r.replicator.RepackFull(injectedCtx, job, targetCC)
+ case datastore.RepackIncremental:
+ err = r.replicator.RepackIncremental(injectedCtx, job, targetCC)
default:
err = fmt.Errorf("unknown replication change type encountered: %q", job.Change)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 07ef6b614..e3ff5897f 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -19,6 +20,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
objectpoolservice "gitlab.com/gitlab-org/gitaly/internal/service/objectpool"
@@ -179,6 +181,192 @@ func TestProcessReplicationJob(t *testing.T) {
require.Len(t, mockReplicationHistogram.Values, 1)
}
+func TestPropagateReplicationJob(t *testing.T) {
+ primaryServer, primarySocketPath, cleanup := runMockRepositoryServer(t)
+ defer cleanup()
+
+ secondaryServer, secondarySocketPath, cleanup := runMockRepositoryServer(t)
+ defer cleanup()
+
+ primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1"
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ {
+ Storage: primaryStorage,
+ Address: primarySocketPath,
+ DefaultPrimary: true,
+ },
+ {
+ Storage: secondaryStorage,
+ Address: secondarySocketPath,
+ },
+ },
+ },
+ },
+ }
+
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()),
+ }
+ logEntry := testhelper.DiscardTestEntry(t)
+
+ nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
+ require.NoError(t, err)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+ registry := protoregistry.New()
+ require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
+ coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, registry)
+
+ replmgr := NewReplMgr(
+ conf.VirtualStorages[0].Name,
+ logEntry,
+ ds,
+ nodeMgr,
+ )
+
+ prf := NewServer(
+ coordinator.StreamDirector,
+ logEntry,
+ registry,
+ conf,
+ )
+ listener, port := listenAvailPort(t)
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ prf.RegisterServices(nodeMgr, conf, ds)
+ go prf.Serve(listener, false)
+ defer prf.Stop()
+
+ cc := dialLocalPort(t, port, false)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(cc)
+ defer listener.Close()
+ defer cc.Close()
+
+ repositoryRelativePath := "/path/to/repo"
+
+ repository := &gitalypb.Repository{
+ StorageName: conf.VirtualStorages[0].Name,
+ RelativePath: repositoryRelativePath,
+ }
+
+ _, err = repositoryClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{Repository: repository, CreateBitmap: true})
+ require.NoError(t, err)
+
+ _, err = repositoryClient.RepackFull(ctx, &gitalypb.RepackFullRequest{Repository: repository, CreateBitmap: false})
+ require.NoError(t, err)
+
+ _, err = repositoryClient.RepackIncremental(ctx, &gitalypb.RepackIncrementalRequest{Repository: repository})
+ require.NoError(t, err)
+
+ primaryRepository := &gitalypb.Repository{StorageName: primaryStorage, RelativePath: repositoryRelativePath}
+ expectedPrimaryGcReq := gitalypb.GarbageCollectRequest{
+ Repository: primaryRepository,
+ CreateBitmap: true,
+ }
+ expectedPrimaryRepackFullReq := gitalypb.RepackFullRequest{
+ Repository: primaryRepository,
+ CreateBitmap: false,
+ }
+ expectedPrimaryRepackIncrementalReq := gitalypb.RepackIncrementalRequest{
+ Repository: primaryRepository,
+ }
+
+ replCtx, cancel := testhelper.Context()
+ defer cancel()
+ go replmgr.ProcessBacklog(replCtx, noopBackoffFunc)
+
+ // ensure primary gitaly server received the expected requests
+ waitForRequest(t, primaryServer.gcChan, expectedPrimaryGcReq, 5*time.Second)
+ waitForRequest(t, primaryServer.repackIncrChan, expectedPrimaryRepackIncrementalReq, 5*time.Second)
+ waitForRequest(t, primaryServer.repackFullChan, expectedPrimaryRepackFullReq, 5*time.Second)
+
+ secondaryRepository := &gitalypb.Repository{StorageName: secondaryStorage, RelativePath: repositoryRelativePath}
+
+ expectedSecondaryGcReq := expectedPrimaryGcReq
+ expectedSecondaryGcReq.Repository = secondaryRepository
+
+ expectedSecondaryRepackFullReq := expectedPrimaryRepackFullReq
+ expectedSecondaryRepackFullReq.Repository = secondaryRepository
+
+ expectedSecondaryRepackIncrementalReq := expectedPrimaryRepackIncrementalReq
+ expectedSecondaryRepackIncrementalReq.Repository = secondaryRepository
+
+ // ensure secondary gitaly server received the expected requests
+ waitForRequest(t, secondaryServer.gcChan, expectedSecondaryGcReq, 5*time.Second)
+ waitForRequest(t, secondaryServer.repackIncrChan, expectedSecondaryRepackIncrementalReq, 5*time.Second)
+ waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second)
+}
+
+type mockRepositoryServer struct {
+ gcChan, repackFullChan, repackIncrChan chan interface{}
+
+ gitalypb.UnimplementedRepositoryServiceServer
+}
+
+func newMockRepositoryServer() *mockRepositoryServer {
+ return &mockRepositoryServer{
+ gcChan: make(chan interface{}),
+ repackFullChan: make(chan interface{}),
+ repackIncrChan: make(chan interface{}),
+ }
+}
+
+func (m *mockRepositoryServer) GarbageCollect(ctx context.Context, in *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) {
+ go func() {
+ m.gcChan <- *in
+ }()
+ return &gitalypb.GarbageCollectResponse{}, nil
+}
+
+func (m *mockRepositoryServer) RepackFull(ctx context.Context, in *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) {
+ go func() {
+ m.repackFullChan <- *in
+ }()
+ return &gitalypb.RepackFullResponse{}, nil
+}
+
+func (m *mockRepositoryServer) RepackIncremental(ctx context.Context, in *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) {
+ go func() {
+ m.repackIncrChan <- *in
+ }()
+ return &gitalypb.RepackIncrementalResponse{}, nil
+}
+
+func runMockRepositoryServer(t *testing.T) (*mockRepositoryServer, string, func()) {
+ server := testhelper.NewTestGrpcServer(t, nil, nil)
+ serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
+
+ listener, err := net.Listen("unix", serverSocketPath)
+ require.NoError(t, err)
+
+ mockServer := newMockRepositoryServer()
+
+ gitalypb.RegisterRepositoryServiceServer(server, mockServer)
+ reflection.Register(server)
+
+ go server.Serve(listener)
+
+ return mockServer, "unix://" + serverSocketPath, server.Stop
+}
+
+func waitForRequest(t *testing.T, ch chan interface{}, expected interface{}, timeout time.Duration) {
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case req := <-ch:
+ assert.Equal(t, expected, req)
+ close(ch)
+ case <-timer.C:
+ t.Fatal("timed out")
+ }
+}
+
func TestConfirmReplication(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()