Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2019-09-12 10:50:46 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2019-09-12 10:50:46 +0300
commit3bc16e2427cb705e902a2935c44466f3586445fd (patch)
treec438f92b3a6526bf0fc5a9961236448040657b49
parentb049feaa3847ad9e0201cd05a1a5592f6b8b246b (diff)
parentcbe829bf47181155aedaf7b83d51cc9f0fe42963 (diff)
Merge branch 'jc-confirm-checksums' into 'master'
Confirm checksums after replication Closes #1922 See merge request gitlab-org/gitaly!1479
-rw-r--r--changelogs/unreleased/jc-confirm-checksums.yml5
-rw-r--r--internal/praefect/replicator.go70
-rw-r--r--internal/praefect/replicator_test.go41
3 files changed, 109 insertions, 7 deletions
diff --git a/changelogs/unreleased/jc-confirm-checksums.yml b/changelogs/unreleased/jc-confirm-checksums.yml
new file mode 100644
index 000000000..e25906ead
--- /dev/null
+++ b/changelogs/unreleased/jc-confirm-checksums.yml
@@ -0,0 +1,5 @@
+---
+title: Confirm checksums after replication
+merge_request: 1479
+author:
+type: added
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 8e41c3879..32a884426 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -5,24 +5,25 @@ import (
"fmt"
"time"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/grpc"
+
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
-
- "github.com/sirupsen/logrus"
)
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, job ReplJob, target *grpc.ClientConn) error
+ Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Entry
}
-func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
repository := &gitalypb.Repository{
StorageName: job.TargetNode.Storage,
RelativePath: job.Repository.RelativePath,
@@ -49,6 +50,20 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, targetCC
}); err != nil {
return err
}
+
+ checksumsMatch, err := dr.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(sourceCC), repositoryClient, remoteRepository, repository)
+ if err != nil {
+ return err
+ }
+
+ // TODO: Do something meaninful with the result of confirmChecksums if checksums do not match
+ if !checksumsMatch {
+ dr.log.WithFields(logrus.Fields{
+ "primary": remoteRepository,
+ "replica": repository,
+ }).Error("checksums do not match")
+ }
+
// TODO: ensure attribute files are synced
// https://gitlab.com/gitlab-org/gitaly/issues/1655
@@ -58,6 +73,42 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, targetCC
return nil
}
+func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClient, repo *gitalypb.Repository, checksum *string) func() error {
+ return func() error {
+ primaryChecksumRes, err := client.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{
+ Repository: repo,
+ })
+ if err != nil {
+ return err
+ }
+ *checksum = primaryChecksumRes.GetChecksum()
+ return nil
+ }
+}
+
+func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, replicaClient gitalypb.RepositoryServiceClient, primary, replica *gitalypb.Repository) (bool, error) {
+
+ g, gCtx := errgroup.WithContext(ctx)
+
+ var primaryChecksum, replicaChecksum string
+
+ g.Go(getChecksumFunc(gCtx, primaryClient, primary, &primaryChecksum))
+ g.Go(getChecksumFunc(gCtx, replicaClient, replica, &replicaChecksum))
+
+ if err := g.Wait(); err != nil {
+ return false, err
+ }
+
+ dr.log.WithFields(logrus.Fields{
+ "primary": primary,
+ "replica": replica,
+ "primary_checksum": primaryChecksum,
+ "replica_checksum": replicaChecksum,
+ }).Info("replication finished")
+
+ return primaryChecksum == replicaChecksum, nil
+}
+
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Entry
@@ -184,12 +235,17 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
return err
}
- cc, err := r.coordinator.GetConnection(job.TargetNode.Storage)
+ targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage)
+ if err != nil {
+ return err
+ }
+
+ sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage)
if err != nil {
return err
}
- if err := r.replicator.Replicate(ctx, job, cc); err != nil {
+ if err := r.replicator.Replicate(ctx, job, sourceCC, targetCC); err != nil {
r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
return err
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b143061ac..d7a9aa3db 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -14,10 +14,12 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ gitalylog "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
func TestReplicate(t *testing.T) {
@@ -68,6 +70,8 @@ func TestReplicate(t *testing.T) {
require.NoError(t, err)
var replicator defaultReplicator
+ replicator.log = gitalylog.Default()
+
require.NoError(t, replicator.Replicate(
ctx,
ReplJob{
@@ -82,12 +86,49 @@ func TestReplicate(t *testing.T) {
},
},
conn,
+ conn,
))
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
}
+func TestConfirmReplication(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ srv, srvSocketPath := runFullGitalyServer(t)
+ defer srv.Stop()
+
+ testRepoA, testRepoAPath, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ testRepoB, _, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ connOpts := []grpc.DialOption{
+ grpc.WithInsecure(),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(testhelper.RepositoryAuthToken)),
+ }
+ conn, err := grpc.Dial(srvSocketPath, connOpts...)
+ require.NoError(t, err)
+
+ var replicator defaultReplicator
+ replicator.log = gitalylog.Default()
+
+ equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
+ require.NoError(t, err)
+ require.True(t, equal)
+
+ testhelper.CreateCommit(t, testRepoAPath, "master", &testhelper.CreateCommitOpts{
+ Message: "a commit",
+ })
+
+ equal, err = replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
+ require.NoError(t, err)
+ require.False(t, equal)
+}
+
func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
server := serverPkg.NewInsecure(RubyServer)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()