diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-09-12 10:50:46 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-09-12 10:50:46 +0300 |
commit | 3bc16e2427cb705e902a2935c44466f3586445fd (patch) | |
tree | c438f92b3a6526bf0fc5a9961236448040657b49 | |
parent | b049feaa3847ad9e0201cd05a1a5592f6b8b246b (diff) | |
parent | cbe829bf47181155aedaf7b83d51cc9f0fe42963 (diff) |
Merge branch 'jc-confirm-checksums' into 'master'
Confirm checksums after replication
Closes #1922
See merge request gitlab-org/gitaly!1479
-rw-r--r-- | changelogs/unreleased/jc-confirm-checksums.yml | 5 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 70 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 41 |
3 files changed, 109 insertions, 7 deletions
diff --git a/changelogs/unreleased/jc-confirm-checksums.yml b/changelogs/unreleased/jc-confirm-checksums.yml new file mode 100644 index 000000000..e25906ead --- /dev/null +++ b/changelogs/unreleased/jc-confirm-checksums.yml @@ -0,0 +1,5 @@ +--- +title: Confirm checksums after replication +merge_request: 1479 +author: +type: added diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 8e41c3879..32a884426 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -5,24 +5,25 @@ import ( "fmt" "time" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" - - "github.com/sirupsen/logrus" ) // Replicator performs the actual replication logic between two nodes type Replicator interface { - Replicate(ctx context.Context, job ReplJob, target *grpc.ClientConn) error + Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error } type defaultReplicator struct { log *logrus.Entry } -func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC, targetCC *grpc.ClientConn) error { repository := &gitalypb.Repository{ StorageName: job.TargetNode.Storage, RelativePath: job.Repository.RelativePath, @@ -49,6 +50,20 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, targetCC }); err != nil { return err } + + checksumsMatch, err := dr.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(sourceCC), repositoryClient, remoteRepository, repository) + if err != nil { + return err + } + + // TODO: Do something meaninful with the result of confirmChecksums if checksums do not match + if !checksumsMatch { + dr.log.WithFields(logrus.Fields{ + "primary": remoteRepository, + "replica": repository, + }).Error("checksums do not match") + } + // TODO: ensure attribute files are synced // https://gitlab.com/gitlab-org/gitaly/issues/1655 @@ -58,6 +73,42 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, targetCC return nil } +func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClient, repo *gitalypb.Repository, checksum *string) func() error { + return func() error { + primaryChecksumRes, err := client.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{ + Repository: repo, + }) + if err != nil { + return err + } + *checksum = primaryChecksumRes.GetChecksum() + return nil + } +} + +func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, replicaClient gitalypb.RepositoryServiceClient, primary, replica *gitalypb.Repository) (bool, error) { + + g, gCtx := errgroup.WithContext(ctx) + + var primaryChecksum, replicaChecksum string + + g.Go(getChecksumFunc(gCtx, primaryClient, primary, &primaryChecksum)) + g.Go(getChecksumFunc(gCtx, replicaClient, replica, &replicaChecksum)) + + if err := g.Wait(); err != nil { + return false, err + } + + dr.log.WithFields(logrus.Fields{ + "primary": primary, + "replica": replica, + "primary_checksum": primaryChecksum, + "replica_checksum": replicaChecksum, + }).Info("replication finished") + + return primaryChecksum == replicaChecksum, nil +} + // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Entry @@ -184,12 +235,17 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { return err } - cc, err := r.coordinator.GetConnection(job.TargetNode.Storage) + targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage) + if err != nil { + return err + } + + sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage) if err != nil { return err } - if err := r.replicator.Replicate(ctx, job, cc); err != nil { + if err := r.replicator.Replicate(ctx, job, sourceCC, targetCC); err != nil { r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") return err } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b143061ac..d7a9aa3db 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -14,10 +14,12 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" + gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) func TestReplicate(t *testing.T) { @@ -68,6 +70,8 @@ func TestReplicate(t *testing.T) { require.NoError(t, err) var replicator defaultReplicator + replicator.log = gitalylog.Default() + require.NoError(t, replicator.Replicate( ctx, ReplJob{ @@ -82,12 +86,49 @@ func TestReplicate(t *testing.T) { }, }, conn, + conn, )) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) } +func TestConfirmReplication(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + srv, srvSocketPath := runFullGitalyServer(t) + defer srv.Stop() + + testRepoA, testRepoAPath, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + testRepoB, _, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(testhelper.RepositoryAuthToken)), + } + conn, err := grpc.Dial(srvSocketPath, connOpts...) + require.NoError(t, err) + + var replicator defaultReplicator + replicator.log = gitalylog.Default() + + equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) + require.NoError(t, err) + require.True(t, equal) + + testhelper.CreateCommit(t, testRepoAPath, "master", &testhelper.CreateCommitOpts{ + Message: "a commit", + }) + + equal, err = replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) + require.NoError(t, err) + require.False(t, equal) +} + func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { server := serverPkg.NewInsecure(RubyServer) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() |