diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-09-17 08:09:18 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-09-17 08:09:18 +0300 |
commit | ad9337afb7b0269ae3df13f0ecaa6b0d8944c636 (patch) | |
tree | 7d661c050f447c857a920c549df542bb19eb5a6d | |
parent | 5fbf3687e73736db23ffd30475ab94d7818bfc13 (diff) |
Fix replicator bug due to bad merge
Add tests as well to give coverage to the processReplJob method
-rw-r--r-- | changelogs/unreleased/po-demo-fix-replication.yml | 5 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 6 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 146 |
3 files changed, 57 insertions, 100 deletions
diff --git a/changelogs/unreleased/po-demo-fix-replication.yml b/changelogs/unreleased/po-demo-fix-replication.yml new file mode 100644 index 000000000..5f48f6a07 --- /dev/null +++ b/changelogs/unreleased/po-demo-fix-replication.yml @@ -0,0 +1,5 @@ +--- +title: Replicator fixes from demo +merge_request: 1487 +author: +type: fixed diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index c8de2716b..61cfdceed 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -285,11 +285,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error { return err } - if err := r.replicator.Replicate(ctx, job, sourceCC, targetCC); err != nil { - r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") - return err - } - injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, "") + injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, job.SourceNode.Token) if err != nil { return err } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 60dc69cda..b50db5194 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -1,7 +1,6 @@ package praefect import ( - "context" "io/ioutil" "log" "net" @@ -9,14 +8,12 @@ import ( "path/filepath" "sync" "testing" - "time" "github.com/stretchr/testify/require" "google.golang.org/grpc" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" - "gitlab.com/gitlab-org/gitaly/internal/helper" gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -25,7 +22,22 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) -func TestReplicate(t *testing.T) { +func TestProceessReplicationJob(t *testing.T) { + mockReplicationLatency := &testhelper.MockHistogram{} + mockReplicationGauge := &testhelper.MockGauge{} + + recordReplicationLatency = func(d float64) { + mockReplicationLatency.Observe(d) + } + + incReplicationJobsInFlight = func() { + mockReplicationGauge.Inc() + } + + decReplicationJobsInFlight = func() { + mockReplicationGauge.Dec() + } + srv, srvSocketPath := runFullGitalyServer(t) defer srv.Stop() @@ -36,6 +48,7 @@ func TestReplicate(t *testing.T) { backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName) require.NoError(t, err) + defer func() { os.RemoveAll(backupDir) }() @@ -55,45 +68,53 @@ func TestReplicate(t *testing.T) { }, ) - ctx, cancel := testhelper.Context() - defer cancel() + job := jobRecord{state: JobStateReady} - connOpts := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(testhelper.RepositoryAuthToken)), + m := &MemoryDatastore{ + jobs: &struct { + sync.RWMutex + records map[uint64]jobRecord // all jobs indexed by ID + }{ + records: map[uint64]jobRecord{1: job}, + }, } - conn, err := grpc.Dial(srvSocketPath, connOpts...) - require.NoError(t, err) + + replJob := ReplJob{ID: 1, + TargetNode: models.Node{Storage: backupStorageName, Address: srvSocketPath}, + SourceNode: models.Node{Storage: "default", Address: srvSocketPath, Token: testhelper.RepositoryAuthToken}, + Repository: models.Repository{Primary: models.Node{Storage: "default", Address: srvSocketPath}, RelativePath: testRepo.GetRelativePath()}, + State: JobStateReady, + } + + ctx, cancel := testhelper.Context() + defer cancel() commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{ Message: "a commit", }) - ctx, err = helper.InjectGitalyServers(ctx, "default", srvSocketPath, testhelper.RepositoryAuthToken) - require.NoError(t, err) - var replicator defaultReplicator replicator.log = gitaly_log.Default() - require.NoError(t, replicator.Replicate( - ctx, - ReplJob{ - Repository: models.Repository{ - RelativePath: testRepo.GetRelativePath(), - }, - SourceNode: models.Node{ - Storage: "default", - }, - TargetNode: models.Node{ - Storage: backupStorageName, - }, - }, - conn, - conn, - )) + coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)} + coordinator.RegisterNode("default", srvSocketPath) + coordinator.RegisterNode("backup", srvSocketPath) + + replMgr := &ReplMgr{ + log: gitaly_log.Default(), + datastore: m, + coordinator: coordinator, + replicator: replicator, + } + + require.NoError(t, replMgr.processReplJob(ctx, replJob)) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) + + require.Equal(t, 1, mockReplicationGauge.IncrementCalled) + require.Equal(t, 1, mockReplicationGauge.DecrementCalled) + require.Len(t, mockReplicationLatency.Values, 1) } func TestConfirmReplication(t *testing.T) { @@ -130,72 +151,7 @@ func TestConfirmReplication(t *testing.T) { equal, err = replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) require.NoError(t, err) require.False(t, equal) -} - -type noopReplicator struct { - replicationLatency time.Duration -} - -func (n *noopReplicator) Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error { - time.Sleep(n.replicationLatency) - return nil -} - -func TestReplicationMetrics(t *testing.T) { - mockReplicationLatency := &testhelper.MockHistogram{} - mockReplicationGauge := &testhelper.MockGauge{} - - recordReplicationLatency = func(d float64) { - mockReplicationLatency.Observe(d) - } - - incReplicationJobsInFlight = func() { - mockReplicationGauge.Inc() - } - - decReplicationJobsInFlight = func() { - mockReplicationGauge.Dec() - } - - job := jobRecord{state: JobStateReady} - - m := &MemoryDatastore{ - jobs: &struct { - sync.RWMutex - records map[uint64]jobRecord // all jobs indexed by ID - }{ - records: map[uint64]jobRecord{1: job}, - }, - } - - replJob := ReplJob{ID: 1, - TargetNode: models.Node{Storage: "target"}, - SourceNode: models.Node{Storage: "source"}, - Repository: models.Repository{Primary: models.Node{Storage: "target"}}, - State: JobStateReady, - } - - coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)} - coordinator.RegisterNode("source", "tcp://127.0.0.1") - coordinator.RegisterNode("target", "tcp://127.0.0.1") - - replicationLatencyMs := 10 - replMgr := &ReplMgr{ - log: gitaly_log.Default(), - datastore: m, - coordinator: coordinator, - replicator: &noopReplicator{replicationLatency: time.Duration(replicationLatencyMs) * time.Millisecond}, - } - - ctx, cancel := testhelper.Context() - defer cancel() - - require.NoError(t, replMgr.processReplJob(ctx, replJob)) - require.Equal(t, 1, mockReplicationGauge.IncrementCalled) - require.Equal(t, 1, mockReplicationGauge.DecrementCalled) - require.Len(t, mockReplicationLatency.Values, 1) - require.Equal(t, replicationLatencyMs, int(mockReplicationLatency.Values[0])) } func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { |