Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-04-22 23:21:06 +0300
committerJohn Cai <jcai@gitlab.com>2019-05-20 19:39:41 +0300
commit49d72a2fd9926c3ef0330f3ca2961be01487f2f6 (patch)
tree1a092ce3cdc3a0fcf335da66aac419e5c3f0e725
parentef4a5d9721ceeb5ab09486856442ce6863563ab5 (diff)
Add replication logic to praefect replication manager
-rw-r--r--changelogs/unreleased/jc-replication.yml5
-rw-r--r--internal/praefect/coordinator.go7
-rw-r--r--internal/praefect/replicator.go37
-rw-r--r--internal/praefect/replicator_test.go149
-rw-r--r--internal/praefect/testdata/gitlab-shell/hooks/.gitkeep0
-rw-r--r--internal/service/repository/create_test.go26
6 files changed, 220 insertions, 4 deletions
diff --git a/changelogs/unreleased/jc-replication.yml b/changelogs/unreleased/jc-replication.yml
new file mode 100644
index 000000000..8450f6388
--- /dev/null
+++ b/changelogs/unreleased/jc-replication.yml
@@ -0,0 +1,5 @@
+---
+title: Replication logic
+merge_request: 1219
+author:
+type: added
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 9c6634a2e..7d91f358a 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -5,6 +5,8 @@ import (
"fmt"
"sync"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
@@ -88,7 +90,10 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string)
// is encountered.
func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error {
conn, err := client.Dial(listenAddr,
- []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec()))},
+ []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)),
+ },
)
if err != nil {
return err
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 723726567..60f47fa72 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -2,8 +2,11 @@ package praefect
import (
"context"
+ "fmt"
"time"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+
"github.com/sirupsen/logrus"
)
@@ -17,7 +20,37 @@ type defaultReplicator struct {
}
func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, target Node) error {
- dr.log.Infof("replicating from %v to target %q", source, target.Storage)
+ repository := &gitalypb.Repository{
+ StorageName: target.Storage,
+ RelativePath: source.RelativePath,
+ }
+ remoteRepository := &gitalypb.Repository{
+ StorageName: source.Storage,
+ RelativePath: source.RelativePath,
+ }
+
+ repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc)
+ remoteClient := gitalypb.NewRemoteServiceClient(target.cc)
+
+ // CreateRepository is idempotent
+ if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
+ Repository: repository,
+ }); err != nil {
+ return fmt.Errorf("failed to create repository: %v", err)
+ }
+
+ if _, err := remoteClient.FetchInternalRemote(ctx, &gitalypb.FetchInternalRemoteRequest{
+ Repository: repository,
+ RemoteRepository: remoteRepository,
+ }); err != nil {
+ return err
+ }
+ // TODO: ensure attribute files are synced
+ // https://gitlab.com/gitlab-org/gitaly/issues/1655
+
+ // TODO: ensure objects/info/alternates are synced
+ // https://gitlab.com/gitlab-org/gitaly/issues/1674
+
return nil
}
@@ -103,9 +136,7 @@ const (
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
- since := time.Time{}
for {
- r.log.Debugf("fetching replication jobs since %s", since)
jobs, err := r.jobsStore.GetIncompleteJobs(r.storage, 10)
if err != nil {
return err
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 1f1955e81..8f6edb5d0 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -2,15 +2,24 @@ package praefect_test
import (
"context"
+ "log"
+ "net"
+ "os"
+ "path/filepath"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/rubyserver"
+ serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
@@ -119,3 +128,143 @@ func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Reposit
return nil
}
+
+func TestReplicate(t *testing.T) {
+ testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{
+ Message: "a commit",
+ })
+
+ defer cleanupFn()
+ var (
+ cfg = config.Config{
+ PrimaryServer: &config.GitalyServer{
+ Name: "default",
+ ListenAddr: "tcp://gitaly-primary.example.com",
+ },
+ SecondaryServers: []*config.GitalyServer{
+ {
+ Name: "backup",
+ ListenAddr: "tcp://gitaly-backup1.example.com",
+ },
+ },
+ Whitelist: []string{
+ testRepo.GetRelativePath(),
+ },
+ }
+ )
+ backupDir := filepath.Join(testhelper.GitlabTestStoragePath(), "backup")
+ require.NoError(t, os.Mkdir(backupDir, os.ModeDir|0755))
+ defer func() {
+ os.RemoveAll(backupDir)
+ }()
+
+ oldStorages := gitaly_config.Config.Storages
+ defer func() {
+ gitaly_config.Config.Storages = oldStorages
+ }()
+
+ gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
+ Name: "backup",
+ Path: backupDir,
+ }, gitaly_config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ })
+
+ srv, socketPath := runFullGitalyServer(t)
+ defer srv.Stop()
+
+ datastore := praefect.NewMemoryDatastore(cfg)
+ coordinator := praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name)
+
+ coordinator.RegisterNode("backup", socketPath)
+ coordinator.RegisterNode("default", socketPath)
+
+ replman := praefect.NewReplMgr(
+ cfg.SecondaryServers[0].Name,
+ logrus.New(),
+ datastore,
+ coordinator,
+ praefect.WithWhitelist([]string{testRepo.GetRelativePath()}),
+ )
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ replman.ScheduleReplication(ctx, praefect.Repository{
+ Storage: testRepo.GetStorageName(),
+ RelativePath: testRepo.GetRelativePath(),
+ })
+
+ md := testhelper.GitalyServersMetadata(t, socketPath)
+ ctx = metadata.NewOutgoingContext(ctx, md)
+
+ go func() {
+ require.Error(t, context.Canceled, replman.ProcessBacklog(ctx))
+ }()
+
+ var tries int
+ jobs, err := datastore.GetIncompleteJobs("backup", 1)
+ require.NoError(t, err)
+
+ for len(jobs) > 0 {
+ if tries > 10 {
+ t.Error("exceeded timeout")
+ }
+ time.Sleep(1 * time.Second)
+ tries++
+
+ jobs, err = datastore.GetIncompleteJobs("backup", 1)
+ require.NoError(t, err)
+ }
+ cancel()
+
+ replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
+ testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
+}
+
+func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
+ server := serverPkg.NewInsecure(RubyServer)
+ serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
+
+ listener, err := net.Listen("unix", serverSocketPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ go server.Serve(listener)
+
+ return server, "unix://" + serverSocketPath
+}
+
+var RubyServer *rubyserver.Server
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+ defer testhelper.MustHaveNoChildProcess()
+
+ testhelper.ConfigureRuby()
+ gitaly_config.Config.Auth = gitaly_config.Auth{Token: testhelper.RepositoryAuthToken}
+
+ var err error
+ gitaly_config.Config.GitlabShell.Dir, err = filepath.Abs("testdata/gitlab-shell")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ testhelper.ConfigureGitalySSH()
+
+ RubyServer, err = rubyserver.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer RubyServer.Stop()
+
+ return m.Run()
+}
diff --git a/internal/praefect/testdata/gitlab-shell/hooks/.gitkeep b/internal/praefect/testdata/gitlab-shell/hooks/.gitkeep
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/internal/praefect/testdata/gitlab-shell/hooks/.gitkeep
diff --git a/internal/service/repository/create_test.go b/internal/service/repository/create_test.go
index 7a36c40f7..60c71f0b7 100644
--- a/internal/service/repository/create_test.go
+++ b/internal/service/repository/create_test.go
@@ -4,8 +4,10 @@ import (
"fmt"
"os"
"path"
+ "strings"
"testing"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/config"
@@ -106,3 +108,27 @@ func TestCreateRepositoryFailureInvalidArgs(t *testing.T) {
})
}
}
+func TestCreateRepositoryIdempotent(t *testing.T) {
+ server, serverSocketPath := runRepoServer(t)
+ defer server.Stop()
+
+ client, conn := newRepositoryClient(t, serverSocketPath)
+ defer conn.Close()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ refsBefore := strings.Split(string(testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "for-each-ref")), "\n")
+
+ req := &gitalypb.CreateRepositoryRequest{Repository: testRepo}
+ _, err := client.CreateRepository(ctx, req)
+ require.NoError(t, err)
+
+ refsAfter := strings.Split(string(testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "for-each-ref")), "\n")
+
+ assert.Equal(t, refsBefore, refsAfter)
+
+}