Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-11-19 00:48:36 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-11-19 00:48:36 +0300
commita2d7d5ad16d47d64761786d760a02f79e666bbce (patch)
tree74f1fb3af86fbd76342255b737d650ab4e680aaa
parent3aa577d5e9ad572a22966775126076ec4dce2bfb (diff)
parent70fbd613c71a3bf725a8b334f332027b8ff7bab2 (diff)
Merge branch 'ps-reconcile-hangs' into 'master'
Praefect reconcile hangs and fails in case of an error during processing Closes #2839 and #2993 See merge request gitlab-org/gitaly!2795
-rw-r--r--changelogs/unreleased/ps-reconcile-hangs.yml5
-rw-r--r--internal/praefect/consistencycheck_test.go127
-rw-r--r--internal/praefect/service/info/consistencycheck.go34
-rw-r--r--internal/praefect/service/info/consistencycheck_test.go287
-rw-r--r--internal/praefect/service/info/testhelper_test.go21
5 files changed, 339 insertions, 135 deletions
diff --git a/changelogs/unreleased/ps-reconcile-hangs.yml b/changelogs/unreleased/ps-reconcile-hangs.yml
new file mode 100644
index 000000000..f63fe44f9
--- /dev/null
+++ b/changelogs/unreleased/ps-reconcile-hangs.yml
@@ -0,0 +1,5 @@
+---
+title: Praefect reconcile hangs and fails in case of an error during processing
+merge_request: 2795
+author:
+type: fixed
diff --git a/internal/praefect/consistencycheck_test.go b/internal/praefect/consistencycheck_test.go
deleted file mode 100644
index 29fbc3e4a..000000000
--- a/internal/praefect/consistencycheck_test.go
+++ /dev/null
@@ -1,127 +0,0 @@
-package praefect
-
-import (
- "io"
- "os"
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
- gconfig "gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
-)
-
-func TestConsistencyCheck(t *testing.T) {
- oldStorages := gconfig.Config.Storages
- defer func() { gconfig.Config.Storages = oldStorages }()
-
- conf := config.Config{
- VirtualStorages: []*config.VirtualStorage{
- {
- Name: "praefect",
- Nodes: []*config.Node{
- 0: {
- Storage: "gitaly-0",
- Address: "tcp::/this-doesnt-matter",
- },
- 1: {
- Storage: "gitaly-1",
- Address: "tcp::/this-doesnt-matter",
- },
- },
- },
- },
- }
-
- virtualStorage := conf.VirtualStorages[0]
- primary := virtualStorage.Nodes[0]
- secondary := virtualStorage.Nodes[1]
-
- testStorages := []gconfig.Storage{
- {
- Name: virtualStorage.Nodes[0].Storage,
- Path: tempStoragePath(t),
- },
- {
- Name: virtualStorage.Nodes[1].Storage,
- Path: tempStoragePath(t),
- },
- }
-
- gconfig.Config.Storages = append(gconfig.Config.Storages, testStorages...)
- defer func() {
- for _, ts := range testStorages {
- require.NoError(t, os.RemoveAll(ts.Path))
- }
- }()
-
- repo0, _, cleanup0 := testhelper.NewTestRepo(t)
- defer cleanup0()
-
- _, _, cleanupReference := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[0].Storage)
- defer cleanupReference()
-
- _, targetRepoPath, cleanupTarget := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[1].Storage)
- defer cleanupTarget()
-
- cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
- defer cleanup()
-
- praefectCli := gitalypb.NewPraefectInfoServiceClient(cc)
-
- ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(10 * time.Second))
- defer cancel()
-
- disableReconcilliation := true
-
- requestConsistencyCheck := func() *gitalypb.ConsistencyCheckResponse {
- stream, err := praefectCli.ConsistencyCheck(ctx, &gitalypb.ConsistencyCheckRequest{
- VirtualStorage: virtualStorage.Name,
- ReferenceStorage: primary.Storage,
- TargetStorage: secondary.Storage,
- DisableReconcilliation: disableReconcilliation,
- })
- require.NoError(t, err)
-
- responses := consumeConsistencyCheckResponses(t, stream)
- require.Len(t, responses, 1)
-
- resp := responses[0]
-
- require.Equal(t, repo0.RelativePath, resp.RepoRelativePath)
- require.Equal(t, primary.Storage, resp.ReferenceStorage)
-
- return resp
- }
-
- resp := requestConsistencyCheck()
- require.Equal(t, resp.ReferenceChecksum, resp.TargetChecksum,
- "both repos expected to be consistent after initial clone")
- require.Zero(t, resp.ReplJobId)
-
- testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "update-ref", "HEAD", "spooky-stuff")
-
- resp = requestConsistencyCheck()
- require.NotEqual(t, resp.ReferenceChecksum, resp.TargetChecksum,
- "repos should no longer be consistent after target HEAD changed")
- require.Zero(t, resp.ReplJobId)
-
- disableReconcilliation = false
- resp = requestConsistencyCheck()
- require.NotZero(t, resp.ReplJobId)
-}
-
-func consumeConsistencyCheckResponses(t *testing.T, stream gitalypb.PraefectInfoService_ConsistencyCheckClient) []*gitalypb.ConsistencyCheckResponse {
- var responses []*gitalypb.ConsistencyCheckResponse
- for {
- resp, err := stream.Recv()
- if err == io.EOF {
- break
- }
- require.NoError(t, err)
- responses = append(responses, resp)
- }
- return responses
-}
diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go
index 24510db79..1a1e2578c 100644
--- a/internal/praefect/service/info/consistencycheck.go
+++ b/internal/praefect/service/info/consistencycheck.go
@@ -103,7 +103,12 @@ func walkRepos(ctx context.Context, walkerQ chan<- string, reference nodes.Node)
if err != nil {
return err
}
- walkerQ <- resp.GetRelativePath()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case walkerQ <- resp.GetRelativePath():
+ }
}
}
@@ -134,7 +139,18 @@ type checksumResult struct {
func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ chan<- checksumResult, target, reference nodes.Node, virtualStorage string) error {
defer close(checksumResultQ)
- for repoRelPath := range relpathQ {
+ for {
+ var repoRelPath string
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case repoPath, ok := <-relpathQ:
+ if !ok {
+ return nil
+ }
+ repoRelPath = repoPath
+ }
+
cs := checksumResult{
virtualStorage: virtualStorage,
relativePath: repoRelPath,
@@ -142,10 +158,10 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
referenceStorage: reference.GetStorage(),
}
- g, ctx := errgroup.WithContext(ctx)
+ g, gctx := errgroup.WithContext(ctx)
g.Go(func() (err error) {
- cs.target, err = checksumRepo(ctx, repoRelPath, target)
+ cs.target, err = checksumRepo(gctx, repoRelPath, target)
if status.Code(err) == codes.NotFound {
// missing repo on target is okay, we need to
// replicate from reference
@@ -155,7 +171,7 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
})
g.Go(func() (err error) {
- cs.reference, err = checksumRepo(ctx, repoRelPath, reference)
+ cs.reference, err = checksumRepo(gctx, repoRelPath, reference)
return err
})
@@ -163,10 +179,12 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
return err
}
- checksumResultQ <- cs
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case checksumResultQ <- cs:
+ }
}
-
- return nil
}
func scheduleReplication(ctx context.Context, csr checksumResult, q datastore.ReplicationEventQueue, resp *gitalypb.ConsistencyCheckResponse) error {
diff --git a/internal/praefect/service/info/consistencycheck_test.go b/internal/praefect/service/info/consistencycheck_test.go
new file mode 100644
index 000000000..56092aaf6
--- /dev/null
+++ b/internal/praefect/service/info/consistencycheck_test.go
@@ -0,0 +1,287 @@
+package info
+
+import (
+ "context"
+ "io"
+ "net"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/client"
+ gconfig "gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/internalgitaly"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/repository"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func TestServer_ConsistencyCheck(t *testing.T) {
+ defer func(old gconfig.Cfg) { gconfig.Config = old }(gconfig.Config)
+
+ primaryStorageDir, cleanupPrim := testhelper.TempDir(t)
+ defer cleanupPrim()
+ secondaryStorageDir, cleanupSec := testhelper.TempDir(t)
+ defer cleanupSec()
+
+ // 1.git exists on both storages and it is the same
+ testhelper.NewTestRepoTo(t, primaryStorageDir, "1.git")
+ testhelper.NewTestRepoTo(t, secondaryStorageDir, "1.git")
+ // 2.git exists only on target storage (where traversal happens)
+ testhelper.NewTestRepoTo(t, secondaryStorageDir, "2.git")
+ // not.git is a folder on target storage that should be skipped as it is not a git repository
+ require.NoError(t, os.MkdirAll(filepath.Join(secondaryStorageDir, "not.git"), os.ModePerm))
+
+ gconfig.Config.Storages = []gconfig.Storage{{
+ Name: "target",
+ Path: secondaryStorageDir,
+ }, {
+ Name: "reference",
+ Path: primaryStorageDir,
+ }}
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{{
+ Name: "vs",
+ Nodes: []*config.Node{{
+ Storage: "reference",
+ Address: testhelper.GetTemporaryGitalySocketFileName(),
+ }, {
+ Storage: "target",
+ Address: testhelper.GetTemporaryGitalySocketFileName(),
+ }},
+ }},
+ }
+
+ for _, node := range conf.VirtualStorages[0].Nodes {
+ gitalyListener, err := net.Listen("unix", node.Address)
+ require.NoError(t, err)
+
+ gitalySrv := grpc.NewServer()
+ defer gitalySrv.Stop()
+ gitalypb.RegisterRepositoryServiceServer(gitalySrv, repository.NewServer(gconfig.Config, nil, gconfig.NewLocator(gconfig.Config), ""))
+ gitalypb.RegisterInternalGitalyServer(gitalySrv, internalgitaly.NewServer(gconfig.Config.Storages))
+ go func() { gitalySrv.Serve(gitalyListener) }()
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ referenceConn, err := client.DialContext(ctx, "unix://"+conf.VirtualStorages[0].Nodes[0].Address, nil)
+ require.NoError(t, err)
+ defer referenceConn.Close()
+
+ targetConn, err := client.DialContext(ctx, "unix://"+conf.VirtualStorages[0].Nodes[1].Address, nil)
+ require.NoError(t, err)
+ defer targetConn.Close()
+
+ nm := &nodes.MockManager{
+ GetShardFunc: func(s string) (nodes.Shard, error) {
+ if s != conf.VirtualStorages[0].Name {
+ return nodes.Shard{}, nodes.ErrVirtualStorageNotExist
+ }
+ return nodes.Shard{
+ Primary: &nodes.MockNode{
+ GetStorageMethod: func() string { return gconfig.Config.Storages[0].Name },
+ Conn: referenceConn,
+ Healthy: true,
+ },
+ Secondaries: []nodes.Node{&nodes.MockNode{
+ GetStorageMethod: func() string { return gconfig.Config.Storages[1].Name },
+ Conn: targetConn,
+ Healthy: true,
+ }},
+ }, nil
+ },
+ }
+
+ praefectAddr := testhelper.GetTemporaryGitalySocketFileName()
+ praefectListener, err := net.Listen("unix", praefectAddr)
+ require.NoError(t, err)
+ defer praefectListener.Close()
+
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
+ queue.OnEnqueue(func(ctx context.Context, e datastore.ReplicationEvent, q datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ return datastore.ReplicationEvent{ID: 1}, nil
+ })
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+
+ grpcSrv := grpc.NewServer()
+ defer grpcSrv.Stop()
+
+ gitalypb.RegisterPraefectInfoServiceServer(grpcSrv, NewServer(nm, conf, queue, rs))
+ go grpcSrv.Serve(praefectListener)
+
+ infoConn, err := client.Dial("unix://"+praefectAddr, nil)
+ require.NoError(t, err)
+ defer infoConn.Close()
+
+ infoClient := gitalypb.NewPraefectInfoServiceClient(infoConn)
+
+ for _, tc := range []struct {
+ desc string
+ req gitalypb.ConsistencyCheckRequest
+ verify func(*testing.T, []*gitalypb.ConsistencyCheckResponse, error)
+ }{
+ {
+ desc: "with replication event created",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "vs",
+ TargetStorage: "reference",
+ ReferenceStorage: "target",
+ DisableReconcilliation: false,
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.NoError(t, err)
+ require.Equal(t, []*gitalypb.ConsistencyCheckResponse{
+ {
+ RepoRelativePath: "1.git",
+ TargetChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea",
+ ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea",
+ ReplJobId: 0,
+ ReferenceStorage: "target",
+ },
+ {
+ RepoRelativePath: "2.git",
+ TargetChecksum: "",
+ ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea",
+ ReplJobId: 1,
+ ReferenceStorage: "target",
+ },
+ }, resp)
+ },
+ },
+ {
+ desc: "without replication event",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "vs",
+ TargetStorage: "reference",
+ ReferenceStorage: "target",
+ DisableReconcilliation: true,
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.NoError(t, err)
+ require.Equal(t, []*gitalypb.ConsistencyCheckResponse{
+ {
+ RepoRelativePath: "1.git",
+ TargetChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea",
+ ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea",
+ ReplJobId: 0,
+ ReferenceStorage: "target",
+ },
+ {
+ RepoRelativePath: "2.git",
+ TargetChecksum: "",
+ ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea",
+ ReplJobId: 0,
+ ReferenceStorage: "target",
+ },
+ }, resp)
+ },
+ },
+ {
+ desc: "no target",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "vs",
+ TargetStorage: "",
+ ReferenceStorage: "reference",
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.Equal(t, status.Error(codes.InvalidArgument, "missing target storage"), err)
+ },
+ },
+ {
+ desc: "unknown target",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "vs",
+ TargetStorage: "unknown",
+ ReferenceStorage: "reference",
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.Equal(t, status.Error(codes.NotFound, `unable to find target storage "unknown"`), err)
+ },
+ },
+ {
+ desc: "no reference",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "vs",
+ TargetStorage: "target",
+ ReferenceStorage: "",
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.Equal(t, status.Error(codes.InvalidArgument, `target storage "target" is same as current primary, must provide alternate reference`), err)
+ },
+ },
+ {
+ desc: "unknown reference",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "vs",
+ TargetStorage: "target",
+ ReferenceStorage: "unknown",
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.Equal(t, status.Error(codes.NotFound, `unable to find reference storage "unknown" in nodes for shard "vs"`), err)
+ },
+ },
+ {
+ desc: "same storage",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "vs",
+ TargetStorage: "target",
+ ReferenceStorage: "target",
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.Equal(t, status.Error(codes.InvalidArgument, `target storage "target" cannot match reference storage "target"`), err)
+ },
+ },
+ {
+ desc: "no virtual",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "",
+ TargetStorage: "target",
+ ReferenceStorage: "reference",
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.Equal(t, status.Error(codes.InvalidArgument, "missing virtual storage"), err)
+ },
+ },
+ {
+ desc: "unknown virtual",
+ req: gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: "unknown",
+ TargetStorage: "target",
+ ReferenceStorage: "unknown",
+ },
+ verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) {
+ require.Equal(t, status.Error(codes.NotFound, "virtual storage does not exist"), err)
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ response, err := infoClient.ConsistencyCheck(ctx, &tc.req)
+ require.NoError(t, err)
+
+ var results []*gitalypb.ConsistencyCheckResponse
+ var result *gitalypb.ConsistencyCheckResponse
+ for {
+ result, err = response.Recv()
+ if err != nil {
+ break
+ }
+ results = append(results, result)
+ }
+
+ if err == io.EOF {
+ err = nil
+ }
+ tc.verify(t, results, err)
+ })
+ }
+}
diff --git a/internal/praefect/service/info/testhelper_test.go b/internal/praefect/service/info/testhelper_test.go
new file mode 100644
index 000000000..ae20d2d53
--- /dev/null
+++ b/internal/praefect/service/info/testhelper_test.go
@@ -0,0 +1,21 @@
+package info
+
+import (
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+ defer testhelper.MustHaveNoChildProcess()
+
+ cleanup := testhelper.Configure()
+ defer cleanup()
+
+ return m.Run()
+}