diff options
-rw-r--r-- | changelogs/unreleased/jc-getalllfspointer-go.yml | 5 | ||||
-rw-r--r-- | internal/git/catfile/objectinfo.go | 3 | ||||
-rw-r--r-- | internal/git/lfs.go | 31 | ||||
-rw-r--r-- | internal/metadata/featureflag/featureflags.go | 6 | ||||
-rw-r--r-- | internal/metadata/featureflag/test_utils.go | 19 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers.go | 164 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers_test.go | 14 |
7 files changed, 238 insertions, 4 deletions
diff --git a/changelogs/unreleased/jc-getalllfspointer-go.yml b/changelogs/unreleased/jc-getalllfspointer-go.yml new file mode 100644 index 000000000..d6c2041cf --- /dev/null +++ b/changelogs/unreleased/jc-getalllfspointer-go.yml @@ -0,0 +1,5 @@ +--- +title: Port GetAllLFSPointers to go +merge_request: 1414 +author: +type: performance diff --git a/internal/git/catfile/objectinfo.go b/internal/git/catfile/objectinfo.go index 7f7af80a5..de296287d 100644 --- a/internal/git/catfile/objectinfo.go +++ b/internal/git/catfile/objectinfo.go @@ -28,6 +28,9 @@ func (o *ObjectInfo) IsBlob() bool { return o.Type == "blob" } +// ParseObjectInfo reads from a reader and parses the data into an ObjectInfo struct +var ParseObjectInfo = parseObjectInfo + func parseObjectInfo(stdout *bufio.Reader) (*ObjectInfo, error) { infoLine, err := stdout.ReadString('\n') if err != nil { diff --git a/internal/git/lfs.go b/internal/git/lfs.go new file mode 100644 index 000000000..78f3aadd9 --- /dev/null +++ b/internal/git/lfs.go @@ -0,0 +1,31 @@ +package git + +import ( + "bytes" + "regexp" +) + +var ( + lfsOIDRe = regexp.MustCompile(`(?m)^oid sha256:[0-9a-f]{64}$`) + lfsSizeRe = regexp.MustCompile(`(?m)^size [0-9]+$`) +) + +// IsLFSPointer checks to see if a blob is an LFS pointer. 
It returns true if the version, oid and size lines are all present +func IsLFSPointer(b []byte) bool { + // ensure the version exists + if !bytes.HasPrefix(b, []byte("version https://git-lfs.github.com/spec")) { + return false + } + + // ensure the oid exists + if !lfsOIDRe.Match(b) { + return false + } + + // ensure the size exists + if !lfsSizeRe.Match(b) { + return false + } + + return true +} diff --git a/internal/metadata/featureflag/featureflags.go b/internal/metadata/featureflag/featureflags.go new file mode 100644 index 000000000..8ddd15d26 --- /dev/null +++ b/internal/metadata/featureflag/featureflags.go @@ -0,0 +1,6 @@ +package featureflag + +const ( + // GetAllLFSPointersGo will cause the GetAllLFSPointers RPC to use the go implementation when set + GetAllLFSPointersGo = "get_all_lfs_pointers_go" +) diff --git a/internal/metadata/featureflag/test_utils.go b/internal/metadata/featureflag/test_utils.go new file mode 100644 index 000000000..574ca94cc --- /dev/null +++ b/internal/metadata/featureflag/test_utils.go @@ -0,0 +1,19 @@ +package featureflag + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +// EnableFeatureFlag is used in tests to enable a feature flag in the context metadata +func EnableFeatureFlag(ctx context.Context, flag string) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + md = metadata.New(map[string]string{HeaderKey(flag): "true"}) + } else { + md.Set(HeaderKey(flag), "true") + } + + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go index d84948687..fb45a5859 100644 --- a/internal/service/blob/lfs_pointers.go +++ b/internal/service/blob/lfs_pointers.go @@ -1,15 +1,39 @@ package blob import ( + "bufio" + "bytes" "fmt" + "io" + "os" + "os/exec" + "strings" + "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/internal/command" + "gitlab.com/gitlab-org/gitaly/internal/config" 
"gitlab.com/gitlab-org/gitaly/internal/git" + "gitlab.com/gitlab-org/gitaly/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +const ( + // These limits are used as a heuristic to ignore files which can't be LFS + // pointers. The format of these is described in + // https://github.com/git-lfs/git-lfs/blob/master/docs/spec.md#the-pointer + + // LfsPointerMinSize is the minimum size for an lfs pointer text blob + LfsPointerMinSize = 120 + // LfsPointerMaxSize is the maximum size for an lfs pointer text blob + LfsPointerMaxSize = 200 +) + type getLFSPointerByRevisionRequest interface { GetRepository() *gitalypb.Repository GetRevision() []byte @@ -93,13 +117,37 @@ func (s *server) GetNewLFSPointers(in *gitalypb.GetNewLFSPointersRequest, stream }) } +var getAllLFSPointersRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_get_all_lfs_pointers_total", + Help: "Counter of go vs ruby implementation of GetAllLFSPointers", + }, + []string{"implementation"}, +) + +func init() { + prometheus.MustRegister(getAllLFSPointersRequests) +} + func (s *server) GetAllLFSPointers(in *gitalypb.GetAllLFSPointersRequest, stream gitalypb.BlobService_GetAllLFSPointersServer) error { ctx := stream.Context() if err := validateGetLfsPointersByRevisionRequest(in); err != nil { - return status.Errorf(codes.InvalidArgument, "GetAllLFSPointers: %v", err) + return helper.ErrInvalidArgument(err) } + if featureflag.IsEnabled(stream.Context(), featureflag.GetAllLFSPointersGo) { + getAllLFSPointersRequests.WithLabelValues("go").Inc() + + if err := getAllLFSPointersRubyScript(in.GetRepository(), stream); err != nil { + return helper.ErrInternal(err) + } + + return nil + } + + 
getAllLFSPointersRequests.WithLabelValues("ruby").Inc() + client, err := s.BlobServiceClient(ctx) if err != nil { return err @@ -133,3 +181,117 @@ func validateGetLfsPointersByRevisionRequest(in getLFSPointerByRevisionRequest) return git.ValidateRevision(in.GetRevision()) } + +type allLFSPointersSender struct { + stream gitalypb.BlobService_GetAllLFSPointersServer + lfsPointers []*gitalypb.LFSPointer +} + +func (s *allLFSPointersSender) Reset() { s.lfsPointers = nil } +func (s *allLFSPointersSender) Append(it chunk.Item) { + s.lfsPointers = append(s.lfsPointers, it.(*gitalypb.LFSPointer)) +} +func (s *allLFSPointersSender) Send() error { + return s.stream.Send(&gitalypb.GetAllLFSPointersResponse{LfsPointers: s.lfsPointers}) +} + +func getAllLFSPointersRubyScript(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error { + repoPath, err := helper.GetRepoPath(repository) + if err != nil { + return err + } + + ctx := stream.Context() + + cmd := exec.Command("ruby", "--", "-", repoPath, fmt.Sprintf("%d", LfsPointerMinSize), fmt.Sprintf("%d", LfsPointerMaxSize)) + cmd.Dir = config.Config.Ruby.Dir + ruby, err := command.New(ctx, cmd, strings.NewReader(rubyScript), nil, nil, os.Environ()...) 
+ if err != nil { + return err + } + + if err := parseCatfileOut(ruby, stream); err != nil { + return err + } + + return ruby.Wait() +} + +func parseCatfileOut(_r io.Reader, stream gitalypb.BlobService_GetAllLFSPointersServer) error { + chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil}) + + r := bufio.NewReader(_r) + buf := &bytes.Buffer{} + for { + _, err := r.Peek(1) + if err == io.EOF { + break + } + if err != nil { + return err + } + + info, err := catfile.ParseObjectInfo(r) + if err != nil { + return err + } + + buf.Reset() + _, err = io.CopyN(buf, r, info.Size) + if err != nil { + return err + } + delim, err := r.ReadByte() + if err != nil { + return err + } + if delim != '\n' { + return fmt.Errorf("unexpected character %x", delim) + } + + if !git.IsLFSPointer(buf.Bytes()) { + continue + } + + b := make([]byte, buf.Len()) + copy(b, buf.Bytes()) + if err := chunker.Send(&gitalypb.LFSPointer{ + Oid: info.Oid, + Size: info.Size, + Data: b, + }); err != nil { + return err + } + } + + return chunker.Flush() +} + +var rubyScript = ` + +def main(git_dir, minSize, maxSize) + IO.popen(%W[git -C #{git_dir} rev-list --all --filter=blob:limit=#{maxSize+1} --in-commit-order --objects], 'r') do |rev_list| + # Loading bundler and rugged is slow. Let's do it while we wait for git rev-list. + require 'bundler/setup' + require 'rugged' + + repo = Rugged::Repository.new(git_dir) + + rev_list.each_line do |line| + oid = line.split(' ', 2).first + abort "bad rev-list line #{line.inspect}" unless oid + + header = repo.read_header(oid) + next unless header[:len] >= minSize && header[:len] <= maxSize && header[:type] == :blob + + puts "#{oid} blob #{header[:len]}" + $stdout.write(repo.lookup(oid).content) + puts # newline separator, just like git cat-file + end + end + + abort 'rev-list failed' unless $?.success? 
+end + +main(ARGV[0], Integer(ARGV[1]), Integer(ARGV[2])) +` diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go index 0c184f922..a9036300c 100644 --- a/internal/service/blob/lfs_pointers_test.go +++ b/internal/service/blob/lfs_pointers_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" @@ -368,9 +369,6 @@ func TestSuccessfulGetAllLFSPointersRequest(t *testing.T) { Revision: []byte("54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"), } - c, err := client.GetAllLFSPointers(ctx, request) - require.NoError(t, err) - expectedLFSPointers := []*gitalypb.LFSPointer{ { Size: 133, @@ -404,6 +402,16 @@ func TestSuccessfulGetAllLFSPointersRequest(t *testing.T) { }, } + c, err := client.GetAllLFSPointers(ctx, request) + require.NoError(t, err) + + require.ElementsMatch(t, expectedLFSPointers, getAllPointers(t, c)) + + // test with go implementation + // TODO: remove once feature flag is removed + c, err = client.GetAllLFSPointers(featureflag.EnableFeatureFlag(ctx, featureflag.GetAllLFSPointersGo), request) + require.NoError(t, err) + require.ElementsMatch(t, expectedLFSPointers, getAllPointers(t, c)) } |