Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-07-09 16:42:55 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-07-09 16:42:55 +0300
commit5fdd1ba64d79df3a46c74f29d17faf7927650887 (patch)
treea2346a1cc37188268c38d86b80ab2aa3dda3ccbc
parent1df990f975efb5fa73479ba32c46ad076f7568e8 (diff)
parent3a65ca3bbfb5aa9164819a4c486ab80fa3a9d584 (diff)
Merge branch 'pks-gitpipe-cancellation' into 'master'
gitpipe: Prioritize context cancellation Closes #3693 and #3697 See merge request gitlab-org/gitaly!3658
-rw-r--r--internal/git/gitpipe/catfile_info.go60
-rw-r--r--internal/git/gitpipe/catfile_object.go12
-rw-r--r--internal/git/gitpipe/pipeline_test.go16
-rw-r--r--internal/git/gitpipe/revision.go11
4 files changed, 57 insertions, 42 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 0f10d393e..aff2e74ca 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -34,27 +34,18 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revisionIterator Re
go func() {
defer close(resultChan)
- sendResult := func(result CatfileInfoResult) bool {
- select {
- case resultChan <- result:
- return false
- case <-ctx.Done():
- return true
- }
- }
-
for revisionIterator.Next() {
revlistResult := revisionIterator.Result()
objectInfo, err := catfile.Info(ctx, revlistResult.OID.Revision())
if err != nil {
- sendResult(CatfileInfoResult{
+ sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err),
})
return
}
- if isDone := sendResult(CatfileInfoResult{
+ if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
ObjectName: revlistResult.ObjectName,
ObjectInfo: objectInfo,
}); isDone {
@@ -63,7 +54,7 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revisionIterator Re
}
if err := revisionIterator.Err(); err != nil {
- sendResult(CatfileInfoResult{err: err})
+ sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err})
return
}
}()
@@ -84,15 +75,6 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf
go func() {
defer close(resultChan)
- sendResult := func(result CatfileInfoResult) bool {
- select {
- case resultChan <- result:
- return false
- case <-ctx.Done():
- return true
- }
- }
-
var stderr bytes.Buffer
cmd, err := repo.Exec(ctx, git.SubCmd{
Name: "cat-file",
@@ -104,7 +86,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf
},
}, git.WithStderr(&stderr))
if err != nil {
- sendResult(CatfileInfoResult{
+ sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
err: fmt.Errorf("spawning cat-file failed: %w", err),
})
return
@@ -118,13 +100,13 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf
break
}
- sendResult(CatfileInfoResult{
+ sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
err: fmt.Errorf("parsing object info: %w", err),
})
return
}
- if isDone := sendResult(CatfileInfoResult{
+ if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
ObjectInfo: objectInfo,
}); isDone {
return
@@ -132,7 +114,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf
}
if err := cmd.Wait(); err != nil {
- sendResult(CatfileInfoResult{
+ sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
err: fmt.Errorf("cat-file failed: %w, stderr: %q", err, stderr),
})
return
@@ -156,18 +138,14 @@ func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func(
for it.Next() {
result := it.Result()
if filter(result) {
- select {
- case resultChan <- result:
- case <-ctx.Done():
+ if sendCatfileInfoResult(ctx, resultChan, result) {
return
}
}
}
if err := it.Err(); err != nil {
- select {
- case resultChan <- CatfileInfoResult{err: err}:
- case <-ctx.Done():
+ if sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err}) {
return
}
}
@@ -177,3 +155,23 @@ func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func(
ch: resultChan,
}
}
+
+func sendCatfileInfoResult(ctx context.Context, ch chan<- CatfileInfoResult, result CatfileInfoResult) bool {
+ // In case the context has been cancelled, we have a race between observing an error from
+ // the killed Git process and observing the context cancellation itself. But if we end up
+ // here because of cancellation of the Git process, we don't want to pass that one down the
+ // pipeline but instead just stop the pipeline gracefully. We thus have this check here up
+ // front to error messages from the Git process.
+ select {
+ case <-ctx.Done():
+ return true
+ default:
+ }
+
+ select {
+ case ch <- result:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+}
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 5a50c1298..aedde4cb1 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -39,6 +39,18 @@ func CatfileObject(
defer close(resultChan)
sendResult := func(result CatfileObjectResult) bool {
+ // In case the context has been cancelled, we have a race between observing
+ // an error from the killed Git process and observing the context
+ // cancellation itself. But if we end up here because of cancellation of the
+ // Git process, we don't want to pass that one down the pipeline but instead
+ // just stop the pipeline gracefully. We thus have this check here up front
+ // to error messages from the Git process.
+ select {
+ case <-ctx.Done():
+ return true
+ default:
+ }
+
select {
case resultChan <- result:
return false
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 7d46b587a..97062e6f6 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -1,7 +1,6 @@
package gitpipe
import (
- "context"
"errors"
"io"
"io/ioutil"
@@ -264,16 +263,11 @@ func TestPipeline(t *testing.T) {
catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
require.NoError(t, err)
- // We need to create a separate child context because otherwise we'd kill the batch
- // process.
- childCtx, cancel := context.WithCancel(ctx)
- defer cancel()
-
- revlistIter := Revlist(childCtx, repo, []string{"--all"})
- revlistIter = RevisionFilter(childCtx, revlistIter, func(RevisionResult) bool { return true })
- catfileInfoIter := CatfileInfo(childCtx, catfileProcess, revlistIter)
- catfileInfoIter = CatfileInfoFilter(childCtx, catfileInfoIter, func(CatfileInfoResult) bool { return true })
- catfileObjectIter := CatfileObject(childCtx, catfileProcess, catfileInfoIter)
+ revlistIter := Revlist(ctx, repo, []string{"--all"})
+ revlistIter = RevisionFilter(ctx, revlistIter, func(RevisionResult) bool { return true })
+ catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, func(CatfileInfoResult) bool { return true })
+ catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter)
i := 0
for catfileObjectIter.Next() {
diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go
index d4f5bf814..293f0bd07 100644
--- a/internal/git/gitpipe/revision.go
+++ b/internal/git/gitpipe/revision.go
@@ -409,6 +409,17 @@ func RevisionTransform(ctx context.Context, it RevisionIterator, transform func(
}
func sendRevisionResult(ctx context.Context, ch chan<- RevisionResult, result RevisionResult) bool {
+ // In case the context has been cancelled, we have a race between observing an error from
+ // the killed Git process and observing the context cancellation itself. But if we end up
+ // here because of cancellation of the Git process, we don't want to pass that one down the
+ // pipeline but instead just stop the pipeline gracefully. We thus have this check here up
+ // front to error messages from the Git process.
+ select {
+ case <-ctx.Done():
+ return true
+ default:
+ }
+
select {
case ch <- result:
return false