diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-11-19 00:48:36 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-11-19 00:48:36 +0300 |
commit | a2d7d5ad16d47d64761786d760a02f79e666bbce (patch) | |
tree | 74f1fb3af86fbd76342255b737d650ab4e680aaa | |
parent | 3aa577d5e9ad572a22966775126076ec4dce2bfb (diff) | |
parent | 70fbd613c71a3bf725a8b334f332027b8ff7bab2 (diff) |
Merge branch 'ps-reconcile-hangs' into 'master'
Praefect reconcile hangs and fails in case of an error during processing
Closes #2839 and #2993
See merge request gitlab-org/gitaly!2795
-rw-r--r-- | changelogs/unreleased/ps-reconcile-hangs.yml | 5 | ||||
-rw-r--r-- | internal/praefect/consistencycheck_test.go | 127 | ||||
-rw-r--r-- | internal/praefect/service/info/consistencycheck.go | 34 | ||||
-rw-r--r-- | internal/praefect/service/info/consistencycheck_test.go | 287 | ||||
-rw-r--r-- | internal/praefect/service/info/testhelper_test.go | 21 |
5 files changed, 339 insertions, 135 deletions
diff --git a/changelogs/unreleased/ps-reconcile-hangs.yml b/changelogs/unreleased/ps-reconcile-hangs.yml new file mode 100644 index 000000000..f63fe44f9 --- /dev/null +++ b/changelogs/unreleased/ps-reconcile-hangs.yml @@ -0,0 +1,5 @@ +--- +title: Praefect reconcile hangs and fails in case of an error during processing +merge_request: 2795 +author: +type: fixed diff --git a/internal/praefect/consistencycheck_test.go b/internal/praefect/consistencycheck_test.go deleted file mode 100644 index 29fbc3e4a..000000000 --- a/internal/praefect/consistencycheck_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package praefect - -import ( - "io" - "os" - "testing" - "time" - - "github.com/stretchr/testify/require" - gconfig "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" -) - -func TestConsistencyCheck(t *testing.T) { - oldStorages := gconfig.Config.Storages - defer func() { gconfig.Config.Storages = oldStorages }() - - conf := config.Config{ - VirtualStorages: []*config.VirtualStorage{ - { - Name: "praefect", - Nodes: []*config.Node{ - 0: { - Storage: "gitaly-0", - Address: "tcp::/this-doesnt-matter", - }, - 1: { - Storage: "gitaly-1", - Address: "tcp::/this-doesnt-matter", - }, - }, - }, - }, - } - - virtualStorage := conf.VirtualStorages[0] - primary := virtualStorage.Nodes[0] - secondary := virtualStorage.Nodes[1] - - testStorages := []gconfig.Storage{ - { - Name: virtualStorage.Nodes[0].Storage, - Path: tempStoragePath(t), - }, - { - Name: virtualStorage.Nodes[1].Storage, - Path: tempStoragePath(t), - }, - } - - gconfig.Config.Storages = append(gconfig.Config.Storages, testStorages...) - defer func() { - for _, ts := range testStorages { - require.NoError(t, os.RemoveAll(ts.Path)) - } - }() - - repo0, _, cleanup0 := testhelper.NewTestRepo(t) - defer cleanup0() - - _, _, cleanupReference := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[0].Storage) - defer cleanupReference() - - _, targetRepoPath, cleanupTarget := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[1].Storage) - defer cleanupTarget() - - cc, _, cleanup := runPraefectServerWithGitaly(t, conf) - defer cleanup() - - praefectCli := gitalypb.NewPraefectInfoServiceClient(cc) - - ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(10 * time.Second)) - defer cancel() - - disableReconcilliation := true - - requestConsistencyCheck := func() *gitalypb.ConsistencyCheckResponse { - stream, err := praefectCli.ConsistencyCheck(ctx, &gitalypb.ConsistencyCheckRequest{ - VirtualStorage: virtualStorage.Name, - ReferenceStorage: primary.Storage, - TargetStorage: secondary.Storage, - DisableReconcilliation: disableReconcilliation, - }) - require.NoError(t, err) - - responses := consumeConsistencyCheckResponses(t, stream) - require.Len(t, responses, 1) - - resp := responses[0] - - require.Equal(t, repo0.RelativePath, resp.RepoRelativePath) - require.Equal(t, primary.Storage, resp.ReferenceStorage) - - return resp - } - - resp := requestConsistencyCheck() - require.Equal(t, resp.ReferenceChecksum, resp.TargetChecksum, - "both repos expected to be consistent after initial clone") - require.Zero(t, resp.ReplJobId) - - testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "update-ref", "HEAD", "spooky-stuff") - - resp = requestConsistencyCheck() - require.NotEqual(t, resp.ReferenceChecksum, resp.TargetChecksum, - "repos should no longer be consistent after target HEAD changed") - require.Zero(t, resp.ReplJobId) - - disableReconcilliation = false - resp = requestConsistencyCheck() - require.NotZero(t, resp.ReplJobId) -} - -func consumeConsistencyCheckResponses(t *testing.T, stream gitalypb.PraefectInfoService_ConsistencyCheckClient) []*gitalypb.ConsistencyCheckResponse { - var responses []*gitalypb.ConsistencyCheckResponse - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } - require.NoError(t, err) - responses = append(responses, resp) - } - return responses -} diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go index 24510db79..1a1e2578c 100644 --- a/internal/praefect/service/info/consistencycheck.go +++ b/internal/praefect/service/info/consistencycheck.go @@ -103,7 +103,12 @@ func walkRepos(ctx context.Context, walkerQ chan<- string, reference nodes.Node) if err != nil { return err } - walkerQ <- resp.GetRelativePath() + + select { + case <-ctx.Done(): + return ctx.Err() + case walkerQ <- resp.GetRelativePath(): + } } } @@ -134,7 +139,18 @@ type checksumResult struct { func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ chan<- checksumResult, target, reference nodes.Node, virtualStorage string) error { defer close(checksumResultQ) - for repoRelPath := range relpathQ { + for { + var repoRelPath string + select { + case <-ctx.Done(): + return ctx.Err() + case repoPath, ok := <-relpathQ: + if !ok { + return nil + } + repoRelPath = repoPath + } + cs := checksumResult{ virtualStorage: virtualStorage, relativePath: repoRelPath, @@ -142,10 +158,10 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ referenceStorage: reference.GetStorage(), } - g, ctx := errgroup.WithContext(ctx) + g, gctx := errgroup.WithContext(ctx) g.Go(func() (err error) { - cs.target, err = checksumRepo(ctx, repoRelPath, target) + cs.target, err = checksumRepo(gctx, repoRelPath, target) if status.Code(err) == codes.NotFound { // missing repo on target is okay, we need to // replicate from reference @@ -155,7 +171,7 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ }) g.Go(func() (err error) { - cs.reference, err = checksumRepo(ctx, repoRelPath, reference) + cs.reference, err = checksumRepo(gctx, repoRelPath, reference) return err }) @@ -163,10 +179,12 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ return err } - checksumResultQ <- cs + select { + case <-ctx.Done(): + return ctx.Err() + case checksumResultQ <- cs: + } } - - return nil } func scheduleReplication(ctx context.Context, csr checksumResult, q datastore.ReplicationEventQueue, resp *gitalypb.ConsistencyCheckResponse) error { diff --git a/internal/praefect/service/info/consistencycheck_test.go b/internal/praefect/service/info/consistencycheck_test.go new file mode 100644 index 000000000..56092aaf6 --- /dev/null +++ b/internal/praefect/service/info/consistencycheck_test.go @@ -0,0 +1,287 @@ +package info + +import ( + "context" + "io" + "net" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/client" + gconfig "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/internalgitaly" + "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/repository" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestServer_ConsistencyCheck(t *testing.T) { + defer func(old gconfig.Cfg) { gconfig.Config = old }(gconfig.Config) + + primaryStorageDir, cleanupPrim := testhelper.TempDir(t) + defer cleanupPrim() + secondaryStorageDir, cleanupSec := testhelper.TempDir(t) + defer cleanupSec() + + // 1.git exists on both storages and it is the same + testhelper.NewTestRepoTo(t, primaryStorageDir, "1.git") + testhelper.NewTestRepoTo(t, secondaryStorageDir, "1.git") + // 2.git exists only on target storage (where traversal happens) + testhelper.NewTestRepoTo(t, secondaryStorageDir, "2.git") + // not.git is a folder on target storage that should be skipped as it is not a git repository + require.NoError(t, os.MkdirAll(filepath.Join(secondaryStorageDir, "not.git"), os.ModePerm)) + + gconfig.Config.Storages = []gconfig.Storage{{ + Name: "target", + Path: secondaryStorageDir, + }, { + Name: "reference", + Path: primaryStorageDir, + }} + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{{ + Name: "vs", + Nodes: []*config.Node{{ + Storage: "reference", + Address: testhelper.GetTemporaryGitalySocketFileName(), + }, { + Storage: "target", + Address: testhelper.GetTemporaryGitalySocketFileName(), + }}, + }}, + } + + for _, node := range conf.VirtualStorages[0].Nodes { + gitalyListener, err := net.Listen("unix", node.Address) + require.NoError(t, err) + + gitalySrv := grpc.NewServer() + defer gitalySrv.Stop() + gitalypb.RegisterRepositoryServiceServer(gitalySrv, repository.NewServer(gconfig.Config, nil, gconfig.NewLocator(gconfig.Config), "")) + gitalypb.RegisterInternalGitalyServer(gitalySrv, internalgitaly.NewServer(gconfig.Config.Storages)) + go func() { gitalySrv.Serve(gitalyListener) }() + } + + ctx, cancel := testhelper.Context() + defer cancel() + + referenceConn, err := client.DialContext(ctx, "unix://"+conf.VirtualStorages[0].Nodes[0].Address, nil) + require.NoError(t, err) + defer referenceConn.Close() + + targetConn, err := client.DialContext(ctx, "unix://"+conf.VirtualStorages[0].Nodes[1].Address, nil) + require.NoError(t, err) + defer targetConn.Close() + + nm := &nodes.MockManager{ + GetShardFunc: func(s string) (nodes.Shard, error) { + if s != conf.VirtualStorages[0].Name { + return nodes.Shard{}, nodes.ErrVirtualStorageNotExist + } + return nodes.Shard{ + Primary: &nodes.MockNode{ + GetStorageMethod: func() string { return gconfig.Config.Storages[0].Name }, + Conn: referenceConn, + Healthy: true, + }, + Secondaries: []nodes.Node{&nodes.MockNode{ + GetStorageMethod: func() string { return gconfig.Config.Storages[1].Name }, + Conn: targetConn, + Healthy: true, + }}, + }, nil + }, + } + + praefectAddr := testhelper.GetTemporaryGitalySocketFileName() + praefectListener, err := net.Listen("unix", praefectAddr) + require.NoError(t, err) + defer praefectListener.Close() + + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) + queue.OnEnqueue(func(ctx context.Context, e datastore.ReplicationEvent, q datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + return datastore.ReplicationEvent{ID: 1}, nil + }) + rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + + grpcSrv := grpc.NewServer() + defer grpcSrv.Stop() + + gitalypb.RegisterPraefectInfoServiceServer(grpcSrv, NewServer(nm, conf, queue, rs)) + go grpcSrv.Serve(praefectListener) + + infoConn, err := client.Dial("unix://"+praefectAddr, nil) + require.NoError(t, err) + defer infoConn.Close() + + infoClient := gitalypb.NewPraefectInfoServiceClient(infoConn) + + for _, tc := range []struct { + desc string + req gitalypb.ConsistencyCheckRequest + verify func(*testing.T, []*gitalypb.ConsistencyCheckResponse, error) + }{ + { + desc: "with replication event created", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "vs", + TargetStorage: "reference", + ReferenceStorage: "target", + DisableReconcilliation: false, + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.NoError(t, err) + require.Equal(t, []*gitalypb.ConsistencyCheckResponse{ + { + RepoRelativePath: "1.git", + TargetChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea", + ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea", + ReplJobId: 0, + ReferenceStorage: "target", + }, + { + RepoRelativePath: "2.git", + TargetChecksum: "", + ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea", + ReplJobId: 1, + ReferenceStorage: "target", + }, + }, resp) + }, + }, + { + desc: "without replication event", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "vs", + TargetStorage: "reference", + ReferenceStorage: "target", + DisableReconcilliation: true, + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.NoError(t, err) + require.Equal(t, []*gitalypb.ConsistencyCheckResponse{ + { + RepoRelativePath: "1.git", + TargetChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea", + ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea", + ReplJobId: 0, + ReferenceStorage: "target", + }, + { + RepoRelativePath: "2.git", + TargetChecksum: "", + ReferenceChecksum: "06c4db1a33b2e48dac0bf940c7c20429d00a04ea", + ReplJobId: 0, + ReferenceStorage: "target", + }, + }, resp) + }, + }, + { + desc: "no target", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "vs", + TargetStorage: "", + ReferenceStorage: "reference", + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.Equal(t, status.Error(codes.InvalidArgument, "missing target storage"), err) + }, + }, + { + desc: "unknown target", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "vs", + TargetStorage: "unknown", + ReferenceStorage: "reference", + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.Equal(t, status.Error(codes.NotFound, `unable to find target storage "unknown"`), err) + }, + }, + { + desc: "no reference", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "vs", + TargetStorage: "target", + ReferenceStorage: "", + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.Equal(t, status.Error(codes.InvalidArgument, `target storage "target" is same as current primary, must provide alternate reference`), err) + }, + }, + { + desc: "unknown reference", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "vs", + TargetStorage: "target", + ReferenceStorage: "unknown", + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.Equal(t, status.Error(codes.NotFound, `unable to find reference storage "unknown" in nodes for shard "vs"`), err) + }, + }, + { + desc: "same storage", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "vs", + TargetStorage: "target", + ReferenceStorage: "target", + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.Equal(t, status.Error(codes.InvalidArgument, `target storage "target" cannot match reference storage "target"`), err) + }, + }, + { + desc: "no virtual", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "", + TargetStorage: "target", + ReferenceStorage: "reference", + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.Equal(t, status.Error(codes.InvalidArgument, "missing virtual storage"), err) + }, + }, + { + desc: "unknown virtual", + req: gitalypb.ConsistencyCheckRequest{ + VirtualStorage: "unknown", + TargetStorage: "target", + ReferenceStorage: "unknown", + }, + verify: func(t *testing.T, resp []*gitalypb.ConsistencyCheckResponse, err error) { + require.Equal(t, status.Error(codes.NotFound, "virtual storage does not exist"), err) + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + response, err := infoClient.ConsistencyCheck(ctx, &tc.req) + require.NoError(t, err) + + var results []*gitalypb.ConsistencyCheckResponse + var result *gitalypb.ConsistencyCheckResponse + for { + result, err = response.Recv() + if err != nil { + break + } + results = append(results, result) + } + + if err == io.EOF { + err = nil + } + tc.verify(t, results, err) + }) + } +} diff --git a/internal/praefect/service/info/testhelper_test.go b/internal/praefect/service/info/testhelper_test.go new file mode 100644 index 000000000..ae20d2d53 --- /dev/null +++ b/internal/praefect/service/info/testhelper_test.go @@ -0,0 +1,21 @@ +package info + +import ( + "os" + "testing" + + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + defer testhelper.MustHaveNoChildProcess() + + cleanup := testhelper.Configure() + defer cleanup() + + return m.Run() +} |