gitlab.com/gitlab-org/gitaly.git
author     Patrick Steinhardt <psteinhardt@gitlab.com>  2021-06-17 09:43:26 +0300
committer  Patrick Steinhardt <psteinhardt@gitlab.com>  2021-06-18 13:31:36 +0300
commit     9badb9ff507113d93b299e87a134bee86ac7eff7
tree       976f409d3cdb320249f853d9ff58186c87e1ecb1
parent     38fab810270c4d1e185a9e9448c8c741a4ab97c8
blob: Drop object filter from pipeline
As part of our pipeline steps, we have a step which allows one to filter the results of the preceding pipeline step based on their contents. The only additional information available at this point is the object contents, so this step can effectively only filter on data which has already been read anyway. Given that the intent of the filters is to avoid loading objects from disk which we already know are not relevant, a filter on object contents is not all that helpful.

More importantly though, we're about to convert the `catfileObject()` pipeline step to not return object data directly but instead to return a reader. This is effectively incompatible with the filtering step: we cannot read the same reader twice, and inspecting object data has been the only motivation for this filter.

Remove the filter step and instead move the burden of filtering to the callers: they're processing results from `catfileObject()` anyway, so this is not much of a big deal.
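In practice, moving the burden to callers means that consumers of `catfileObject()` now apply content checks such as `git.IsLFSPointer()` themselves while draining the result channel, as the `sendLFSPointers()` hunk below shows. The following is a minimal, self-contained sketch of that consumer-side pattern; the type and function names are illustrative stand-ins, not Gitaly's internal API:

// Sketch of consumer-side filtering, assuming a hypothetical objectResult
// type in place of Gitaly's internal catfileObjectResult.
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// objectResult mimics a pipeline result: either an error or object data.
type objectResult struct {
	err  error
	data []byte
}

// isRelevant stands in for a content check such as git.IsLFSPointer().
func isRelevant(data []byte) bool {
	return bytes.HasPrefix(data, []byte("version https://git-lfs"))
}

// consume drains the channel and skips irrelevant objects. Errors are never
// filtered: like the removed catfileObjectFilter(), they always reach the
// consumer, which here turns them into a returned error.
func consume(results <-chan objectResult) error {
	for result := range results {
		if result.err != nil {
			return result.err
		}

		// The content filter now lives in the consumer loop rather than in a
		// dedicated pipeline step.
		if !isRelevant(result.data) {
			continue
		}

		fmt.Printf("sending %d bytes\n", len(result.data))
	}

	return nil
}

func main() {
	results := make(chan objectResult, 3)
	results <- objectResult{data: []byte("version https://git-lfs.github.com/spec/v1\n")}
	results <- objectResult{data: []byte("unrelated blob contents")}
	results <- objectResult{err: errors.New("object not found")}
	close(results)

	if err := consume(results); err != nil {
		fmt.Println("error:", err)
	}
}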
-rw-r--r--  internal/gitaly/service/blob/lfs_pointers.go  |  13
-rw-r--r--  internal/gitaly/service/blob/pipeline.go      |  21
-rw-r--r--  internal/gitaly/service/blob/pipeline_test.go | 122
3 files changed, 10 insertions(+), 146 deletions(-)
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index a16523428..4b07608a0 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -79,9 +79,6 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
})
catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
- catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool {
- return git.IsLFSPointer(r.objectData)
- })
if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
return err
@@ -144,9 +141,6 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
})
catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
- catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool {
- return git.IsLFSPointer(r.objectData)
- })
if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
return err
@@ -201,9 +195,6 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
})
catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
- catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool {
- return git.IsLFSPointer(r.objectData)
- })
if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil {
return err
@@ -384,6 +375,10 @@ func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectRes
return helper.ErrInternal(lfsPointer.err)
}
+ if !git.IsLFSPointer(lfsPointer.objectData) {
+ continue
+ }
+
if err := chunker.Send(&gitalypb.LFSPointer{
Data: lfsPointer.objectData,
Size: lfsPointer.objectInfo.Size,
diff --git a/internal/gitaly/service/blob/pipeline.go b/internal/gitaly/service/blob/pipeline.go
index 3be39886f..57f327f8f 100644
--- a/internal/gitaly/service/blob/pipeline.go
+++ b/internal/gitaly/service/blob/pipeline.go
@@ -390,24 +390,3 @@ func catfileObject(
return resultChan
}
-
-// catfileObjectFilter filters the catfileObjectResults from the provided channel with the filter
-// function: if the filter returns `false` for a given item, then it will be dropped from the
-// pipeline. Errors cannot be filtered and will always be passed through.
-func catfileObjectFilter(ctx context.Context, c <-chan catfileObjectResult, filter func(catfileObjectResult) bool) <-chan catfileObjectResult {
- resultChan := make(chan catfileObjectResult)
- go func() {
- defer close(resultChan)
-
- for result := range c {
- if result.err != nil || filter(result) {
- select {
- case resultChan <- result:
- case <-ctx.Done():
- return
- }
- }
- }
- }()
- return resultChan
-}
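The step removed above was the only place in the pipeline that inspected object contents. The commit message's key constraint is worth spelling out: once `catfileObject()` returns a reader instead of a byte slice, an intermediate filtering step would have to read that reader to make its decision and would thereby exhaust it before the real consumer sees any data. A small standard-library-only illustration of why a single reader cannot serve both a filter and a consumer:

// Demonstrates that a reader can only be consumed once: the first full read
// drains it, leaving nothing for a second read.
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// Stands in for the reader that a future catfileObject() would return.
	object := strings.NewReader("version https://git-lfs.github.com/spec/v1\n")

	// A hypothetical intermediate filter reads the data to decide whether to
	// keep the object.
	first, err := io.ReadAll(object)
	if err != nil {
		panic(err)
	}
	fmt.Printf("filter read %d bytes\n", len(first))

	// The downstream consumer then finds the reader already drained.
	second, err := io.ReadAll(object)
	if err != nil {
		panic(err)
	}
	fmt.Printf("consumer read %d bytes\n", len(second)) // prints 0
}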
diff --git a/internal/gitaly/service/blob/pipeline_test.go b/internal/gitaly/service/blob/pipeline_test.go
index fc3ad6617..7f7cb9a43 100644
--- a/internal/gitaly/service/blob/pipeline_test.go
+++ b/internal/gitaly/service/blob/pipeline_test.go
@@ -1,7 +1,6 @@
package blob
import (
- "bytes"
"context"
"errors"
"testing"
@@ -595,90 +594,6 @@ func TestCatfileObject(t *testing.T) {
}
}
-func TestCatfileObjectFilter(t *testing.T) {
- for _, tc := range []struct {
- desc string
- input []catfileObjectResult
- filter func(catfileObjectResult) bool
- expectedResults []catfileObjectResult
- }{
- {
- desc: "all accepted",
- input: []catfileObjectResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- filter: func(catfileObjectResult) bool {
- return true
- },
- expectedResults: []catfileObjectResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- },
- {
- desc: "all filtered",
- input: []catfileObjectResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- filter: func(catfileObjectResult) bool {
- return false
- },
- },
- {
- desc: "errors always get through",
- input: []catfileObjectResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {err: errors.New("foobar")},
- {objectName: []byte{'c'}},
- },
- filter: func(catfileObjectResult) bool {
- return false
- },
- expectedResults: []catfileObjectResult{
- {err: errors.New("foobar")},
- },
- },
- {
- desc: "subset filtered",
- input: []catfileObjectResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- filter: func(r catfileObjectResult) bool {
- return r.objectName[0] == 'b'
- },
- expectedResults: []catfileObjectResult{
- {objectName: []byte{'b'}},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- inputChan := make(chan catfileObjectResult, len(tc.input))
- for _, input := range tc.input {
- inputChan <- input
- }
- close(inputChan)
-
- var results []catfileObjectResult
- for result := range catfileObjectFilter(ctx, inputChan, tc.filter) {
- results = append(results, result)
- }
-
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
-
func TestPipeline(t *testing.T) {
cfg := testcfg.Build(t)
@@ -687,12 +602,11 @@ func TestPipeline(t *testing.T) {
repo := localrepo.NewTestRepo(t, cfg, repoProto)
for _, tc := range []struct {
- desc string
- revisions []string
- revlistFilter func(revlistResult) bool
- catfileInfoFilter func(catfileInfoResult) bool
- catfileObjectFilter func(catfileObjectResult) bool
- expectedResults []catfileObjectResult
+ desc string
+ revisions []string
+ revlistFilter func(revlistResult) bool
+ catfileInfoFilter func(catfileInfoResult) bool
+ expectedResults []catfileObjectResult
}{
{
desc: "single blob",
@@ -769,19 +683,6 @@ func TestPipeline(t *testing.T) {
},
},
{
- desc: "revision with blob contents filter",
- revisions: []string{
- "master",
- },
- catfileObjectFilter: func(r catfileObjectResult) bool {
- return bytes.HasPrefix(r.objectData, []byte("/custom-highlighting/"))
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "b680596c9f3a3c834b933aef14f94a0ab9fa604a", Type: "blob", Size: 100}, objectName: []byte(".gitattributes")},
- {objectInfo: &catfile.ObjectInfo{Oid: "36814a3da051159a1683479e7a1487120309db8f", Type: "blob", Size: 58}, objectName: []byte(".gitattributes")},
- },
- },
- {
desc: "--all with all filters",
revisions: []string{
"--all",
@@ -795,12 +696,9 @@ func TestPipeline(t *testing.T) {
// Only let through blobs, so only the two LFS pointers remain.
return r.objectInfo.Type == "blob"
},
- catfileObjectFilter: func(r catfileObjectResult) bool {
- // This brings it down to a single LFS pointer.
- return len(r.objectData) == 133
- },
expectedResults: []catfileObjectResult{
{objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}, objectName: []byte("files/lfs/lfs_object.iso")},
+ {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}, objectName: []byte("another.lfs")},
},
},
{
@@ -836,10 +734,6 @@ func TestPipeline(t *testing.T) {
require.Fail(t, "filter should not be invoked on errors")
return true
},
- catfileObjectFilter: func(r catfileObjectResult) bool {
- require.Fail(t, "filter should not be invoked on errors")
- return true
- },
expectedResults: []catfileObjectResult{
{err: errors.New("rev-list pipeline command: exit status 128")},
},
@@ -866,9 +760,6 @@ func TestPipeline(t *testing.T) {
}
catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
- if tc.catfileObjectFilter != nil {
- catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, tc.catfileObjectFilter)
- }
var results []catfileObjectResult
for result := range catfileObjectChan {
@@ -913,7 +804,6 @@ func TestPipeline(t *testing.T) {
catfileInfoChan := catfileInfo(childCtx, catfileProcess, revlistChan)
catfileInfoChan = catfileInfoFilter(childCtx, catfileInfoChan, func(catfileInfoResult) bool { return true })
catfileObjectChan := catfileObject(childCtx, catfileProcess, catfileInfoChan)
- catfileObjectChan = catfileObjectFilter(childCtx, catfileObjectChan, func(catfileObjectResult) bool { return true })
i := 0
for result := range catfileObjectChan {