diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-09 16:42:55 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-09 16:42:55 +0300 |
commit | 5fdd1ba64d79df3a46c74f29d17faf7927650887 (patch) | |
tree | a2346a1cc37188268c38d86b80ab2aa3dda3ccbc | |
parent | 1df990f975efb5fa73479ba32c46ad076f7568e8 (diff) | |
parent | 3a65ca3bbfb5aa9164819a4c486ab80fa3a9d584 (diff) |
Merge branch 'pks-gitpipe-cancellation' into 'master'
gitpipe: Prioritize context cancellation
Closes #3693 and #3697
See merge request gitlab-org/gitaly!3658
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 60 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 12 | ||||
-rw-r--r-- | internal/git/gitpipe/pipeline_test.go | 16 | ||||
-rw-r--r-- | internal/git/gitpipe/revision.go | 11 |
4 files changed, 57 insertions, 42 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 0f10d393e..aff2e74ca 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -34,27 +34,18 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revisionIterator Re go func() { defer close(resultChan) - sendResult := func(result CatfileInfoResult) bool { - select { - case resultChan <- result: - return false - case <-ctx.Done(): - return true - } - } - for revisionIterator.Next() { revlistResult := revisionIterator.Result() objectInfo, err := catfile.Info(ctx, revlistResult.OID.Revision()) if err != nil { - sendResult(CatfileInfoResult{ + sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err), }) return } - if isDone := sendResult(CatfileInfoResult{ + if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ ObjectName: revlistResult.ObjectName, ObjectInfo: objectInfo, }); isDone { @@ -63,7 +54,7 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revisionIterator Re } if err := revisionIterator.Err(); err != nil { - sendResult(CatfileInfoResult{err: err}) + sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err}) return } }() @@ -84,15 +75,6 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf go func() { defer close(resultChan) - sendResult := func(result CatfileInfoResult) bool { - select { - case resultChan <- result: - return false - case <-ctx.Done(): - return true - } - } - var stderr bytes.Buffer cmd, err := repo.Exec(ctx, git.SubCmd{ Name: "cat-file", @@ -104,7 +86,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf }, }, git.WithStderr(&stderr)) if err != nil { - sendResult(CatfileInfoResult{ + sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("spawning cat-file failed: %w", err), }) return @@ -118,13 +100,13 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf break } - sendResult(CatfileInfoResult{ + sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("parsing object info: %w", err), }) return } - if isDone := sendResult(CatfileInfoResult{ + if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ ObjectInfo: objectInfo, }); isDone { return @@ -132,7 +114,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf } if err := cmd.Wait(); err != nil { - sendResult(CatfileInfoResult{ + sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("cat-file failed: %w, stderr: %q", err, stderr), }) return @@ -156,18 +138,14 @@ func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func( for it.Next() { result := it.Result() if filter(result) { - select { - case resultChan <- result: - case <-ctx.Done(): + if sendCatfileInfoResult(ctx, resultChan, result) { return } } } if err := it.Err(); err != nil { - select { - case resultChan <- CatfileInfoResult{err: err}: - case <-ctx.Done(): + if sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err}) { return } } @@ -177,3 +155,23 @@ func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func( ch: resultChan, } } + +func sendCatfileInfoResult(ctx context.Context, ch chan<- CatfileInfoResult, result CatfileInfoResult) bool { + // In case the context has been cancelled, we have a race between observing an error from + // the killed Git process and observing the context cancellation itself. But if we end up + // here because of cancellation of the Git process, we don't want to pass that one down the + // pipeline but instead just stop the pipeline gracefully. We thus have this check here up + // front to error messages from the Git process. + select { + case <-ctx.Done(): + return true + default: + } + + select { + case ch <- result: + return false + case <-ctx.Done(): + return true + } +} diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 5a50c1298..aedde4cb1 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -39,6 +39,18 @@ func CatfileObject( defer close(resultChan) sendResult := func(result CatfileObjectResult) bool { + // In case the context has been cancelled, we have a race between observing + // an error from the killed Git process and observing the context + // cancellation itself. But if we end up here because of cancellation of the + // Git process, we don't want to pass that one down the pipeline but instead + // just stop the pipeline gracefully. We thus have this check here up front + // to error messages from the Git process. + select { + case <-ctx.Done(): + return true + default: + } + select { case resultChan <- result: return false diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index 7d46b587a..97062e6f6 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -1,7 +1,6 @@ package gitpipe import ( - "context" "errors" "io" "io/ioutil" @@ -264,16 +263,11 @@ func TestPipeline(t *testing.T) { catfileProcess, err := catfileCache.BatchProcess(ctx, repo) require.NoError(t, err) - // We need to create a separate child context because otherwise we'd kill the batch - // process. - childCtx, cancel := context.WithCancel(ctx) - defer cancel() - - revlistIter := Revlist(childCtx, repo, []string{"--all"}) - revlistIter = RevisionFilter(childCtx, revlistIter, func(RevisionResult) bool { return true }) - catfileInfoIter := CatfileInfo(childCtx, catfileProcess, revlistIter) - catfileInfoIter = CatfileInfoFilter(childCtx, catfileInfoIter, func(CatfileInfoResult) bool { return true }) - catfileObjectIter := CatfileObject(childCtx, catfileProcess, catfileInfoIter) + revlistIter := Revlist(ctx, repo, []string{"--all"}) + revlistIter = RevisionFilter(ctx, revlistIter, func(RevisionResult) bool { return true }) + catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter) + catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, func(CatfileInfoResult) bool { return true }) + catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter) i := 0 for catfileObjectIter.Next() { diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go index d4f5bf814..293f0bd07 100644 --- a/internal/git/gitpipe/revision.go +++ b/internal/git/gitpipe/revision.go @@ -409,6 +409,17 @@ func RevisionTransform(ctx context.Context, it RevisionIterator, transform func( } func sendRevisionResult(ctx context.Context, ch chan<- RevisionResult, result RevisionResult) bool { + // In case the context has been cancelled, we have a race between observing an error from + // the killed Git process and observing the context cancellation itself. But if we end up + // here because of cancellation of the Git process, we don't want to pass that one down the + // pipeline but instead just stop the pipeline gracefully. We thus have this check here up + // front to error messages from the Git process. + select { + case <-ctx.Done(): + return true + default: + } + select { case ch <- result: return false |