Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-09-17 08:09:18 +0300
committerPaul Okstad <pokstad@gitlab.com>2019-09-17 08:09:18 +0300
commitad9337afb7b0269ae3df13f0ecaa6b0d8944c636 (patch)
tree7d661c050f447c857a920c549df542bb19eb5a6d
parent5fbf3687e73736db23ffd30475ab94d7818bfc13 (diff)
Fix replicator bug due to bad merge
Add tests as well to give coverage to the processReplJob method
-rw-r--r--changelogs/unreleased/po-demo-fix-replication.yml5
-rw-r--r--internal/praefect/replicator.go6
-rw-r--r--internal/praefect/replicator_test.go146
3 files changed, 57 insertions, 100 deletions
diff --git a/changelogs/unreleased/po-demo-fix-replication.yml b/changelogs/unreleased/po-demo-fix-replication.yml
new file mode 100644
index 000000000..5f48f6a07
--- /dev/null
+++ b/changelogs/unreleased/po-demo-fix-replication.yml
@@ -0,0 +1,5 @@
+---
+title: Replicator fixes from demo
+merge_request: 1487
+author:
+type: fixed
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index c8de2716b..61cfdceed 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -285,11 +285,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error {
return err
}
- if err := r.replicator.Replicate(ctx, job, sourceCC, targetCC); err != nil {
- r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
- return err
- }
- injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, "")
+ injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
return err
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 60dc69cda..b50db5194 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -1,7 +1,6 @@
package praefect
import (
- "context"
"io/ioutil"
"log"
"net"
@@ -9,14 +8,12 @@ import (
"path/filepath"
"sync"
"testing"
- "time"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
@@ -25,7 +22,22 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-func TestReplicate(t *testing.T) {
+func TestProceessReplicationJob(t *testing.T) {
+ mockReplicationLatency := &testhelper.MockHistogram{}
+ mockReplicationGauge := &testhelper.MockGauge{}
+
+ recordReplicationLatency = func(d float64) {
+ mockReplicationLatency.Observe(d)
+ }
+
+ incReplicationJobsInFlight = func() {
+ mockReplicationGauge.Inc()
+ }
+
+ decReplicationJobsInFlight = func() {
+ mockReplicationGauge.Dec()
+ }
+
srv, srvSocketPath := runFullGitalyServer(t)
defer srv.Stop()
@@ -36,6 +48,7 @@ func TestReplicate(t *testing.T) {
backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName)
require.NoError(t, err)
+
defer func() {
os.RemoveAll(backupDir)
}()
@@ -55,45 +68,53 @@ func TestReplicate(t *testing.T) {
},
)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ job := jobRecord{state: JobStateReady}
- connOpts := []grpc.DialOption{
- grpc.WithInsecure(),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(testhelper.RepositoryAuthToken)),
+ m := &MemoryDatastore{
+ jobs: &struct {
+ sync.RWMutex
+ records map[uint64]jobRecord // all jobs indexed by ID
+ }{
+ records: map[uint64]jobRecord{1: job},
+ },
}
- conn, err := grpc.Dial(srvSocketPath, connOpts...)
- require.NoError(t, err)
+
+ replJob := ReplJob{ID: 1,
+ TargetNode: models.Node{Storage: backupStorageName, Address: srvSocketPath},
+ SourceNode: models.Node{Storage: "default", Address: srvSocketPath, Token: testhelper.RepositoryAuthToken},
+ Repository: models.Repository{Primary: models.Node{Storage: "default", Address: srvSocketPath}, RelativePath: testRepo.GetRelativePath()},
+ State: JobStateReady,
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{
Message: "a commit",
})
- ctx, err = helper.InjectGitalyServers(ctx, "default", srvSocketPath, testhelper.RepositoryAuthToken)
- require.NoError(t, err)
-
var replicator defaultReplicator
replicator.log = gitaly_log.Default()
- require.NoError(t, replicator.Replicate(
- ctx,
- ReplJob{
- Repository: models.Repository{
- RelativePath: testRepo.GetRelativePath(),
- },
- SourceNode: models.Node{
- Storage: "default",
- },
- TargetNode: models.Node{
- Storage: backupStorageName,
- },
- },
- conn,
- conn,
- ))
+ coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)}
+ coordinator.RegisterNode("default", srvSocketPath)
+ coordinator.RegisterNode("backup", srvSocketPath)
+
+ replMgr := &ReplMgr{
+ log: gitaly_log.Default(),
+ datastore: m,
+ coordinator: coordinator,
+ replicator: replicator,
+ }
+
+ require.NoError(t, replMgr.processReplJob(ctx, replJob))
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
+
+ require.Equal(t, 1, mockReplicationGauge.IncrementCalled)
+ require.Equal(t, 1, mockReplicationGauge.DecrementCalled)
+ require.Len(t, mockReplicationLatency.Values, 1)
}
func TestConfirmReplication(t *testing.T) {
@@ -130,72 +151,7 @@ func TestConfirmReplication(t *testing.T) {
equal, err = replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
require.NoError(t, err)
require.False(t, equal)
-}
-
-type noopReplicator struct {
- replicationLatency time.Duration
-}
-
-func (n *noopReplicator) Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error {
- time.Sleep(n.replicationLatency)
- return nil
-}
-
-func TestReplicationMetrics(t *testing.T) {
- mockReplicationLatency := &testhelper.MockHistogram{}
- mockReplicationGauge := &testhelper.MockGauge{}
-
- recordReplicationLatency = func(d float64) {
- mockReplicationLatency.Observe(d)
- }
-
- incReplicationJobsInFlight = func() {
- mockReplicationGauge.Inc()
- }
-
- decReplicationJobsInFlight = func() {
- mockReplicationGauge.Dec()
- }
-
- job := jobRecord{state: JobStateReady}
-
- m := &MemoryDatastore{
- jobs: &struct {
- sync.RWMutex
- records map[uint64]jobRecord // all jobs indexed by ID
- }{
- records: map[uint64]jobRecord{1: job},
- },
- }
-
- replJob := ReplJob{ID: 1,
- TargetNode: models.Node{Storage: "target"},
- SourceNode: models.Node{Storage: "source"},
- Repository: models.Repository{Primary: models.Node{Storage: "target"}},
- State: JobStateReady,
- }
-
- coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)}
- coordinator.RegisterNode("source", "tcp://127.0.0.1")
- coordinator.RegisterNode("target", "tcp://127.0.0.1")
-
- replicationLatencyMs := 10
- replMgr := &ReplMgr{
- log: gitaly_log.Default(),
- datastore: m,
- coordinator: coordinator,
- replicator: &noopReplicator{replicationLatency: time.Duration(replicationLatencyMs) * time.Millisecond},
- }
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- require.NoError(t, replMgr.processReplJob(ctx, replJob))
- require.Equal(t, 1, mockReplicationGauge.IncrementCalled)
- require.Equal(t, 1, mockReplicationGauge.DecrementCalled)
- require.Len(t, mockReplicationLatency.Values, 1)
- require.Equal(t, replicationLatencyMs, int(mockReplicationLatency.Values[0]))
}
func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {