diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-17 09:43:26 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-18 13:31:36 +0300 |
commit | 9badb9ff507113d93b299e87a134bee86ac7eff7 (patch) | |
tree | 976f409d3cdb320249f853d9ff58186c87e1ecb1 | |
parent | 38fab810270c4d1e185a9e9448c8c741a4ab97c8 (diff) |
blob: Drop object filter from pipeline
As part of our pipeline steps, we have a step which allows one to filter
contents of the last pipeline step. The only additional information that
is available at this point in time is the object contents, so this step
effectively can only filter on contents which have already been read in
the first place. Given that the filters' intent is to avoid loading
objects from disk that we already know are not going to be relevant,
having a filter on object contents is not really that helpful at all.
More importantly though, we're about to convert the `catfileObject()`
pipeline step to not return object data directly but instead to return a
reader. This is effectively incompatible with the filtering step: we
cannot read the same reader twice, and inspecting object data has been
the only motivation for this filter.
Remove the filter step and instead move the burden of filtering to
callers: they're processing results from `catfileObject()` anyway, so
it's not a big deal.
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 13 | ||||
-rw-r--r-- | internal/gitaly/service/blob/pipeline.go | 21 | ||||
-rw-r--r-- | internal/gitaly/service/blob/pipeline_test.go | 122 |
3 files changed, 10 insertions, 146 deletions
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index a16523428..4b07608a0 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -79,9 +79,6 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize }) catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan) - catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool { - return git.IsLFSPointer(r.objectData) - }) if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { return err @@ -144,9 +141,6 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize }) catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan) - catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool { - return git.IsLFSPointer(r.objectData) - }) if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { return err @@ -201,9 +195,6 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize }) catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan) - catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool { - return git.IsLFSPointer(r.objectData) - }) if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil { return err @@ -384,6 +375,10 @@ func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectRes return helper.ErrInternal(lfsPointer.err) } + if !git.IsLFSPointer(lfsPointer.objectData) { + continue + } + if err := chunker.Send(&gitalypb.LFSPointer{ Data: lfsPointer.objectData, Size: 
lfsPointer.objectInfo.Size, diff --git a/internal/gitaly/service/blob/pipeline.go b/internal/gitaly/service/blob/pipeline.go index 3be39886f..57f327f8f 100644 --- a/internal/gitaly/service/blob/pipeline.go +++ b/internal/gitaly/service/blob/pipeline.go @@ -390,24 +390,3 @@ func catfileObject( return resultChan } - -// catfileObjectFilter filters the catfileObjectResults from the provided channel with the filter -// function: if the filter returns `false` for a given item, then it will be dropped from the -// pipeline. Errors cannot be filtered and will always be passed through. -func catfileObjectFilter(ctx context.Context, c <-chan catfileObjectResult, filter func(catfileObjectResult) bool) <-chan catfileObjectResult { - resultChan := make(chan catfileObjectResult) - go func() { - defer close(resultChan) - - for result := range c { - if result.err != nil || filter(result) { - select { - case resultChan <- result: - case <-ctx.Done(): - return - } - } - } - }() - return resultChan -} diff --git a/internal/gitaly/service/blob/pipeline_test.go b/internal/gitaly/service/blob/pipeline_test.go index fc3ad6617..7f7cb9a43 100644 --- a/internal/gitaly/service/blob/pipeline_test.go +++ b/internal/gitaly/service/blob/pipeline_test.go @@ -1,7 +1,6 @@ package blob import ( - "bytes" "context" "errors" "testing" @@ -595,90 +594,6 @@ func TestCatfileObject(t *testing.T) { } } -func TestCatfileObjectFilter(t *testing.T) { - for _, tc := range []struct { - desc string - input []catfileObjectResult - filter func(catfileObjectResult) bool - expectedResults []catfileObjectResult - }{ - { - desc: "all accepted", - input: []catfileObjectResult{ - {objectName: []byte{'a'}}, - {objectName: []byte{'b'}}, - {objectName: []byte{'c'}}, - }, - filter: func(catfileObjectResult) bool { - return true - }, - expectedResults: []catfileObjectResult{ - {objectName: []byte{'a'}}, - {objectName: []byte{'b'}}, - {objectName: []byte{'c'}}, - }, - }, - { - desc: "all filtered", - input: 
[]catfileObjectResult{ - {objectName: []byte{'a'}}, - {objectName: []byte{'b'}}, - {objectName: []byte{'c'}}, - }, - filter: func(catfileObjectResult) bool { - return false - }, - }, - { - desc: "errors always get through", - input: []catfileObjectResult{ - {objectName: []byte{'a'}}, - {objectName: []byte{'b'}}, - {err: errors.New("foobar")}, - {objectName: []byte{'c'}}, - }, - filter: func(catfileObjectResult) bool { - return false - }, - expectedResults: []catfileObjectResult{ - {err: errors.New("foobar")}, - }, - }, - { - desc: "subset filtered", - input: []catfileObjectResult{ - {objectName: []byte{'a'}}, - {objectName: []byte{'b'}}, - {objectName: []byte{'c'}}, - }, - filter: func(r catfileObjectResult) bool { - return r.objectName[0] == 'b' - }, - expectedResults: []catfileObjectResult{ - {objectName: []byte{'b'}}, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - inputChan := make(chan catfileObjectResult, len(tc.input)) - for _, input := range tc.input { - inputChan <- input - } - close(inputChan) - - var results []catfileObjectResult - for result := range catfileObjectFilter(ctx, inputChan, tc.filter) { - results = append(results, result) - } - - require.Equal(t, tc.expectedResults, results) - }) - } -} - func TestPipeline(t *testing.T) { cfg := testcfg.Build(t) @@ -687,12 +602,11 @@ func TestPipeline(t *testing.T) { repo := localrepo.NewTestRepo(t, cfg, repoProto) for _, tc := range []struct { - desc string - revisions []string - revlistFilter func(revlistResult) bool - catfileInfoFilter func(catfileInfoResult) bool - catfileObjectFilter func(catfileObjectResult) bool - expectedResults []catfileObjectResult + desc string + revisions []string + revlistFilter func(revlistResult) bool + catfileInfoFilter func(catfileInfoResult) bool + expectedResults []catfileObjectResult }{ { desc: "single blob", @@ -769,19 +683,6 @@ func TestPipeline(t *testing.T) { }, }, { - desc: "revision with blob contents 
filter", - revisions: []string{ - "master", - }, - catfileObjectFilter: func(r catfileObjectResult) bool { - return bytes.HasPrefix(r.objectData, []byte("/custom-highlighting/")) - }, - expectedResults: []catfileObjectResult{ - {objectInfo: &catfile.ObjectInfo{Oid: "b680596c9f3a3c834b933aef14f94a0ab9fa604a", Type: "blob", Size: 100}, objectName: []byte(".gitattributes")}, - {objectInfo: &catfile.ObjectInfo{Oid: "36814a3da051159a1683479e7a1487120309db8f", Type: "blob", Size: 58}, objectName: []byte(".gitattributes")}, - }, - }, - { desc: "--all with all filters", revisions: []string{ "--all", @@ -795,12 +696,9 @@ func TestPipeline(t *testing.T) { // Only let through blobs, so only the two LFS pointers remain. return r.objectInfo.Type == "blob" }, - catfileObjectFilter: func(r catfileObjectResult) bool { - // This brings it down to a single LFS pointer. - return len(r.objectData) == 133 - }, expectedResults: []catfileObjectResult{ {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}, objectName: []byte("files/lfs/lfs_object.iso")}, + {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}, objectName: []byte("another.lfs")}, }, }, { @@ -836,10 +734,6 @@ func TestPipeline(t *testing.T) { require.Fail(t, "filter should not be invoked on errors") return true }, - catfileObjectFilter: func(r catfileObjectResult) bool { - require.Fail(t, "filter should not be invoked on errors") - return true - }, expectedResults: []catfileObjectResult{ {err: errors.New("rev-list pipeline command: exit status 128")}, }, @@ -866,9 +760,6 @@ func TestPipeline(t *testing.T) { } catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan) - if tc.catfileObjectFilter != nil { - catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, tc.catfileObjectFilter) - } var results []catfileObjectResult for result := range catfileObjectChan { @@ -913,7 +804,6 @@ func TestPipeline(t *testing.T) { catfileInfoChan := catfileInfo(childCtx, 
catfileProcess, revlistChan) catfileInfoChan = catfileInfoFilter(childCtx, catfileInfoChan, func(catfileInfoResult) bool { return true }) catfileObjectChan := catfileObject(childCtx, catfileProcess, catfileInfoChan) - catfileObjectChan = catfileObjectFilter(childCtx, catfileObjectChan, func(catfileObjectResult) bool { return true }) i := 0 for result := range catfileObjectChan { |