diff options
author | John Cai <jcai@gitlab.com> | 2019-04-22 23:21:06 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-05-20 19:39:41 +0300 |
commit | 49d72a2fd9926c3ef0330f3ca2961be01487f2f6 (patch) | |
tree | 1a092ce3cdc3a0fcf335da66aac419e5c3f0e725 | |
parent | ef4a5d9721ceeb5ab09486856442ce6863563ab5 (diff) |
Add replication logic to praefect replication manager
-rw-r--r-- | changelogs/unreleased/jc-replication.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 7 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 37 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 149 | ||||
-rw-r--r-- | internal/praefect/testdata/gitlab-shell/hooks/.gitkeep | 0 | ||||
-rw-r--r-- | internal/service/repository/create_test.go | 26 |
6 files changed, 220 insertions, 4 deletions
diff --git a/changelogs/unreleased/jc-replication.yml b/changelogs/unreleased/jc-replication.yml new file mode 100644 index 000000000..8450f6388 --- /dev/null +++ b/changelogs/unreleased/jc-replication.yml @@ -0,0 +1,5 @@ +--- +title: Replication logic +merge_request: 1219 +author: +type: added diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 9c6634a2e..7d91f358a 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -88,7 +90,10 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) // is encountered. func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error { conn, err := client.Dial(listenAddr, - []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec()))}, + []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)), + }, ) if err != nil { return err diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 723726567..60f47fa72 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -2,8 +2,11 @@ package praefect import ( "context" + "fmt" "time" + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "github.com/sirupsen/logrus" ) @@ -17,7 +20,37 @@ type defaultReplicator struct { } func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, target Node) error { - dr.log.Infof("replicating from %v to target %q", source, target.Storage) + repository := &gitalypb.Repository{ + StorageName: target.Storage, + RelativePath: source.RelativePath, + } + remoteRepository := &gitalypb.Repository{ + StorageName: source.Storage, + RelativePath: source.RelativePath, + } + + repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc) + remoteClient := gitalypb.NewRemoteServiceClient(target.cc) + + // CreateRepository is idempotent + if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ + Repository: repository, + }); err != nil { + return fmt.Errorf("failed to create repository: %v", err) + } + + if _, err := remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{ + Repository: repository, + RemoteRepository: remoteRepository, + }); err != nil { + return err + } + // TODO: ensure attribute files are synced + // https://gitlab.com/gitlab-org/gitaly/issues/1655 + + // TODO: ensure objects/info/alternates are synced + // https://gitlab.com/gitlab-org/gitaly/issues/1674 + return nil } @@ -103,9 +136,7 @@ const ( // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { - since := time.Time{} for { - r.log.Debugf("fetching replication jobs since %s", since) jobs, err := r.jobsStore.GetIncompleteJobs(r.storage, 10) if err != nil { return err diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 1f1955e81..8f6edb5d0 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -2,15 +2,24 @@ package praefect_test import ( "context" + "log" + "net" + "os" + "path/filepath" "testing" "time" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver" + serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) @@ -119,3 +128,143 @@ func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Reposit return nil } + +func TestReplicate(t *testing.T) { + testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{ + Message: "a commit", + }) + + defer cleanupFn() + var ( + cfg = config.Config{ + PrimaryServer: &config.GitalyServer{ + Name: "default", + ListenAddr: "tcp://gitaly-primary.example.com", + }, + SecondaryServers: []*config.GitalyServer{ + { + Name: "backup", + ListenAddr: "tcp://gitaly-backup1.example.com", + }, + }, + Whitelist: []string{ + testRepo.GetRelativePath(), + }, + } + ) + backupDir := filepath.Join(testhelper.GitlabTestStoragePath(), "backup") + require.NoError(t, os.Mkdir(backupDir, os.ModeDir|0755)) + defer func() { + os.RemoveAll(backupDir) + }() + + oldStorages := gitaly_config.Config.Storages + defer func() { + gitaly_config.Config.Storages = oldStorages + }() + + gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{ + Name: "backup", + Path: backupDir, + }, gitaly_config.Storage{ + Name: "default", + Path: testhelper.GitlabTestStoragePath(), + }) + + srv, socketPath := runFullGitalyServer(t) + defer srv.Stop() + + datastore := praefect.NewMemoryDatastore(cfg) + coordinator := praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name) + + coordinator.RegisterNode("backup", socketPath) + coordinator.RegisterNode("default", socketPath) + + replman := praefect.NewReplMgr( + cfg.SecondaryServers[0].Name, + logrus.New(), + datastore, + coordinator, + praefect.WithWhitelist([]string{testRepo.GetRelativePath()}), + ) + + ctx, cancel := testhelper.Context() + defer cancel() + + replman.ScheduleReplication(ctx, praefect.Repository{ + Storage: testRepo.GetStorageName(), + RelativePath: testRepo.GetRelativePath(), + }) + + md := testhelper.GitalyServersMetadata(t, socketPath) + ctx = metadata.NewOutgoingContext(ctx, md) + + go func() { + require.Error(t, context.Canceled, replman.ProcessBacklog(ctx)) + }() + + var tries int + jobs, err := datastore.GetIncompleteJobs("backup", 1) + require.NoError(t, err) + + for len(jobs) > 0 { + if tries > 10 { + t.Error("exceeded timeout") + } + time.Sleep(1 * time.Second) + tries++ + + jobs, err = datastore.GetIncompleteJobs("backup", 1) + require.NoError(t, err) + } + cancel() + + replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) + testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) +} + +func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { + server := serverPkg.NewInsecure(RubyServer) + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", serverSocketPath) + if err != nil { + t.Fatal(err) + } + + go server.Serve(listener) + + return server, "unix://" + serverSocketPath +} + +var RubyServer *rubyserver.Server + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + defer testhelper.MustHaveNoChildProcess() + + testhelper.ConfigureRuby() + gitaly_config.Config.Auth = gitaly_config.Auth{Token: testhelper.RepositoryAuthToken} + + var err error + gitaly_config.Config.GitlabShell.Dir, err = filepath.Abs("testdata/gitlab-shell") + if err != nil { + log.Fatal(err) + } + + testhelper.ConfigureGitalySSH() + + RubyServer, err = rubyserver.Start() + if err != nil { + log.Fatal(err) + } + defer RubyServer.Stop() + + return m.Run() +} diff --git a/internal/praefect/testdata/gitlab-shell/hooks/.gitkeep b/internal/praefect/testdata/gitlab-shell/hooks/.gitkeep new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/internal/praefect/testdata/gitlab-shell/hooks/.gitkeep diff --git a/internal/service/repository/create_test.go b/internal/service/repository/create_test.go index 7a36c40f7..60c71f0b7 100644 --- a/internal/service/repository/create_test.go +++ b/internal/service/repository/create_test.go @@ -4,8 +4,10 @@ import ( "fmt" "os" "path" + "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/config" @@ -106,3 +108,27 @@ func TestCreateRepositoryFailureInvalidArgs(t *testing.T) { }) } } +func TestCreateRepositoryIdempotent(t *testing.T) { + server, serverSocketPath := runRepoServer(t) + defer server.Stop() + + client, conn := newRepositoryClient(t, serverSocketPath) + defer conn.Close() + + ctx, cancel := testhelper.Context() + defer cancel() + + testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + refsBefore := strings.Split(string(testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "for-each-ref")), "\n") + + req := &gitalypb.CreateRepositoryRequest{Repository: testRepo} + _, err := client.CreateRepository(ctx, req) + require.NoError(t, err) + + refsAfter := strings.Split(string(testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "for-each-ref")), "\n") + + assert.Equal(t, refsBefore, refsAfter) + +} |