diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-08-20 15:16:04 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-08-20 15:16:04 +0300 |
commit | d19fe09b1dbebcb6b42b72202c85a63f1f8a224a (patch) | |
tree | ff3cd8dcba6a3cfcc7e65a38e1dc69b57bc81f4b | |
parent | 6e6cd5f6a965815c5e396e0e2a23434baa2416c8 (diff) |
Add ruby script (jv-bench-lfs-pointers)
-rw-r--r-- | internal/command/command.go | 2 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers.go | 300 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers_test.go | 18 |
3 files changed, 160 insertions, 160 deletions
diff --git a/internal/command/command.go b/internal/command/command.go index 0ee930eeb..c2ff8c279 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -101,6 +101,8 @@ func (c *Command) Read(p []byte) (int, error) { return c.reader.Read(p) } +func (c *Command) Reader() io.Reader { return c.reader } + func (c *Command) WriteTo(w io.Writer) (int64, error) { if wt, ok := c.reader.(io.WriterTo); ok { return wt.WriteTo(w) diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go index 267c5f9b9..cd346b370 100644 --- a/internal/service/blob/lfs_pointers.go +++ b/internal/service/blob/lfs_pointers.go @@ -2,14 +2,17 @@ package blob import ( "bufio" + "bytes" "fmt" "io" "io/ioutil" + "os" "os/exec" "strings" "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/internal/command" + "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/internal/helper" @@ -171,6 +174,15 @@ func (s *server) GetAllLFSPointers(in *gitalypb.GetAllLFSPointersRequest, stream return nil } + if featureflag.IsEnabled(stream.Context(), "awk4") { + getAllLFSPointersRequests.WithLabelValues("awk4").Inc() + if err := getAllLFSPointersAwk4(in.GetRepository(), stream); err != nil { + return helper.ErrInternal(err) + } + + return nil + } + if featureflag.IsEnabled(stream.Context(), "smallblob") { getAllLFSPointersRequests.WithLabelValues("smallblob").Inc() if err := getAllLFSPointersSmallBlob(in.GetRepository(), stream); err != nil { @@ -180,6 +192,15 @@ func (s *server) GetAllLFSPointers(in *gitalypb.GetAllLFSPointersRequest, stream return nil } + if featureflag.IsEnabled(stream.Context(), "ruby-script") { + getAllLFSPointersRequests.WithLabelValues("ruby-script").Inc() + if err := getAllLFSPointersRubyScript(in.GetRepository(), stream); err != nil { + return helper.ErrInternal(err) + } + + return nil + } + 
getAllLFSPointersRequests.WithLabelValues("ruby").Inc() client, err := s.BlobServiceClient(ctx) @@ -230,7 +251,7 @@ func (s *allLFSPointersSender) Send() error { } func getAllLFSPointers(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error { - args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--use-bitmap-index"} + args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=200"} cmd, err := git.Command(stream.Context(), repository, args...) if err != nil { @@ -298,7 +319,7 @@ func getAllLFSPointers(repository *gitalypb.Repository, stream gitalypb.BlobServ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error { var commands []*command.Command - args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--use-bitmap-index"} + args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=200"} revList, err := git.Command(stream.Context(), repository, args...) 
if err != nil { @@ -312,31 +333,45 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS commands = append(commands, revList) ctx := stream.Context() - awk1, err := command.New(ctx, exec.Command("awk", "{print $1}"), revList, nil, nil) + awk1, err := command.New(ctx, exec.Command("awk", "{print $1}"), revList.Reader(), nil, nil) if err != nil { return err } commands = append(commands, awk1) - catCheck, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch-check", "--buffer"), awk1, nil, nil) + catCheck, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch-check", "--buffer"), awk1.Reader(), nil, nil) if err != nil { return err } commands = append(commands, catCheck) - awk2, err := command.New(ctx, exec.Command("awk", `$2 == "blob" && $3 <= 200 && $3 >= 100 {print $1}`), catCheck, nil, nil) + awk2, err := command.New(ctx, exec.Command("awk", `$2 == "blob" && $3 <= 200 && $3 >= 100 {print $1}`), catCheck.Reader(), nil, nil) if err != nil { return err } commands = append(commands, awk2) - catBatch, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch", "--buffer"), awk2, nil, nil) + catBatch, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch", "--buffer"), awk2.Reader(), nil, nil) if err != nil { return err } commands = append(commands, catBatch) - chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil}) + if err := parseCatfileOut(catBatch.Reader(), stream); err != nil { + return err + } - r := bufio.NewReader(catBatch) + for i := len(commands) - 1; i >= 0; i-- { + if err := commands[i].Wait(); err != nil { + return err + } + } + + return nil +} +func parseCatfileOut(_r io.Reader, stream gitalypb.BlobService_GetAllLFSPointersServer) error { + chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil}) + + r := bufio.NewReader(_r) + buf := &bytes.Buffer{} for { _, err := r.Peek(1) if err 
== io.EOF { @@ -351,7 +386,8 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS return err } - b, err := ioutil.ReadAll(io.LimitReader(r, info.Size)) + buf.Reset() + _, err = io.CopyN(buf, r, info.Size) if err != nil { return err } @@ -363,10 +399,12 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS return fmt.Errorf("unexpected character %x", delim) } - if !git.IsLFSPointer(b) { + if !git.IsLFSPointer(buf.Bytes()) { continue } + b := make([]byte, buf.Len()) + copy(b, buf.Bytes()) if err := chunker.Send(&gitalypb.LFSPointer{ Oid: info.Oid, Size: info.Size, @@ -376,21 +414,10 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS } } - for i := len(commands) - 1; i >= 0; i-- { - if err := commands[i].Wait(); err != nil { - return err - } - } - - if err := chunker.Flush(); err != nil { - return err - } - - return nil + return chunker.Flush() } func getAllLFSPointersAwk2(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error { - var commands []*command.Command repoPath, err := helper.GetRepoPath(repository) if err != nil { @@ -399,11 +426,12 @@ func getAllLFSPointersAwk2(repository *gitalypb.Repository, stream gitalypb.Blob ctx := stream.Context() - cmd := exec.Command("/bin/sh") + // Use bash because we want 'set pipefail' + cmd := exec.Command("/usr/bin/env", "bash") cmd.Dir = repoPath stdin := strings.NewReader(` -set -e -git rev-list --all --in-commit-order --objects --use-bitmap-index |\ +set -e pipefail +git rev-list --all --in-commit-order --objects --filter=blob:limit=200 |\ awk '{ print $1 }' |\ git cat-file --batch-check --buffer |\ awk '$2 == "blob" && $3 >= 100 && $3 <= 200 { print $1 }' |\ @@ -413,49 +441,48 @@ git rev-list --all --in-commit-order --objects --use-bitmap-index |\ if err != nil { return err } - commands = append(commands, sh) - chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil}) + if 
err := parseCatfileOut(sh.Reader(), stream); err != nil { + return err + } - r := bufio.NewReader(sh) + return sh.Wait() +} - for { - _, err := r.Peek(1) - if err == io.EOF { - break - } - if err != nil { - return err - } +func getAllLFSPointersAwk4(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error { + var commands []*command.Command - info, err := catfile.ParseObjectInfo(r) - if err != nil { - return err - } + repoPath, err := helper.GetRepoPath(repository) + if err != nil { + return err + } - b, err := ioutil.ReadAll(io.LimitReader(r, info.Size)) - if err != nil { - return err - } - delim, err := r.ReadByte() - if err != nil { - return err - } - if delim != '\n' { - return fmt.Errorf("unexpected character %x", delim) - } + ctx := stream.Context() + args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=200"} - if !git.IsLFSPointer(b) { - continue - } + revList, err := git.Command(ctx, repository, args...) + if err != nil { + return err + } + commands = append(commands, revList) - if err := chunker.Send(&gitalypb.LFSPointer{ - Oid: info.Oid, - Size: info.Size, - Data: b, - }); err != nil { - return err - } + script := ` +set -e +awk '{ print $1 }' |\ + git cat-file --batch-check --buffer |\ + awk '$2 == "blob" && $3 >= 100 && $3 <= 200 { print $1 }' |\ + git cat-file --batch --buffer +` + cmd := exec.Command("/bin/sh", "-c", script) + cmd.Dir = repoPath + sh, err := command.New(ctx, cmd, revList.Reader(), nil, nil) + if err != nil { + return err + } + commands = append(commands, sh) + + if err := parseCatfileOut(sh.Reader(), stream); err != nil { + return err } for i := len(commands) - 1; i >= 0; i-- { @@ -464,10 +491,6 @@ git rev-list --all --in-commit-order --objects --use-bitmap-index |\ } } - if err := chunker.Flush(); err != nil { - return err - } - return nil } @@ -509,7 +532,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob var commands 
[]waitReader ctx := stream.Context() - args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--use-bitmap-index"} + args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=200"} revList, err := git.Command(ctx, repository, args...) if err != nil { return err @@ -522,7 +545,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob commands = append(commands, revList) - awk1 := newPseudoCommand(revList, func(_w io.Writer, r io.Reader) error { + awk1 := newPseudoCommand(revList.Reader(), func(_w io.Writer, r io.Reader) error { w := bufio.NewWriter(_w) scanner := bufio.NewScanner(r) for scanner.Scan() { @@ -530,7 +553,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob if len(split) == 0 { return fmt.Errorf("awk1: empty line") } - if _, err := fmt.Fprintln(w,split[0]); err != nil { + if _, err := fmt.Fprintln(w, split[0]); err != nil { return err } } @@ -548,7 +571,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob } commands = append(commands, catCheck) - awk2 := newPseudoCommand(catCheck, func(_w io.Writer, _r io.Reader) error { + awk2 := newPseudoCommand(catCheck.Reader(), func(_w io.Writer, _r io.Reader) error { r := bufio.NewReader(_r) w := bufio.NewWriter(_w) @@ -585,46 +608,8 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob } commands = append(commands, catBatch) - chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil}) - r := bufio.NewReader(catBatch) - for { - _, err := r.Peek(1) - if err == io.EOF { - break - } - if err != nil { - return err - } - - info, err := catfile.ParseObjectInfo(r) - if err != nil { - return err - } - - b, err := ioutil.ReadAll(io.LimitReader(r, info.Size)) - if err != nil { - return err - } - - delim, err := r.ReadByte() - if err != nil { - return err - } - if delim != '\n' { - return fmt.Errorf("expected newline, got 0x%x", 
delim) - } - - if !git.IsLFSPointer(b) { - continue - } - - if err := chunker.Send(&gitalypb.LFSPointer{ - Oid: info.Oid, - Size: info.Size, - Data: b, - }); err != nil { - return err - } + if err := parseCatfileOut(catBatch.Reader(), stream); err != nil { + return err } for i := len(commands) - 1; i >= 0; i-- { @@ -633,7 +618,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob } } - return chunker.Flush() + return nil } func getAllLFSPointersSmallBlob(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error { @@ -646,65 +631,72 @@ func getAllLFSPointersSmallBlob(repository *gitalypb.Repository, stream gitalypb ctx := stream.Context() - cmd := exec.Command("/tmp/smallblob/smallblob",repoPath) + cmd := exec.Command("/tmp/smallblob/smallblob", repoPath) smallblob, err := command.New(ctx, cmd, nil, nil, nil) if err != nil { return err } commands = append(commands, smallblob) + if err := parseCatfileOut(smallblob.Reader(), stream); err != nil { + return err + } - chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil}) - - r := bufio.NewReader(smallblob) - - for { - _, err := r.Peek(1) - if err == io.EOF { - break - } - if err != nil { - return err - } - - info, err := catfile.ParseObjectInfo(r) - if err != nil { - return err - } - - b, err := ioutil.ReadAll(io.LimitReader(r, info.Size)) - if err != nil { - return err - } - delim, err := r.ReadByte() - if err != nil { + for i := len(commands) - 1; i >= 0; i-- { + if err := commands[i].Wait(); err != nil { return err } - if delim != '\n' { - return fmt.Errorf("unexpected character %x", delim) - } + } - if !git.IsLFSPointer(b) { - continue - } + return nil +} - if err := chunker.Send(&gitalypb.LFSPointer{ - Oid: info.Oid, - Size: info.Size, - Data: b, - }); err != nil { - return err - } +func getAllLFSPointersRubyScript(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error { + repoPath, err := 
helper.GetRepoPath(repository) + if err != nil { + return err } - for i := len(commands) - 1; i >= 0; i-- { - if err := commands[i].Wait(); err != nil { - return err - } + ctx := stream.Context() + + cmd := exec.Command("ruby", "--", "-", repoPath, fmt.Sprintf("%d", LfsPointerMinSize), fmt.Sprintf("%d", LfsPointerMaxSize)) + cmd.Dir = config.Config.Ruby.Dir + ruby, err := command.New(ctx, cmd, strings.NewReader(rubyScript), nil, nil, os.Environ()...) + if err != nil { + return err } - if err := chunker.Flush(); err != nil { + if err := parseCatfileOut(ruby, stream); err != nil { return err } - return nil + return ruby.Wait() } + +var rubyScript = ` + +def main(git_dir, minSize, maxSize) + IO.popen(%W[git -C #{git_dir} rev-list --all --filter=blob:limit=#{maxSize+1} --in-commit-order --objects], 'r') do |rev_list| + # Loading bundler and rugged is slow. Let's do it while we wait for git rev-list. + require 'bundler/setup' + require 'rugged' + + repo = Rugged::Repository.new(git_dir) + + rev_list.each_line do |line| + oid = line.split(' ', 2).first + abort "bad rev-list line #{line.inspect}" unless oid + + header = repo.read_header(oid) + next unless header[:len] >= minSize && header[:len] <= maxSize && header[:type] == :blob + + puts "#{oid} blob #{header[:len]}" + $stdout.write(repo.lookup(oid).content) + puts # newline separator, just like git cat-file + end + end + + abort 'rev-list failed' unless $?.success? 
+end + +main(ARGV[0], Integer(ARGV[1]), Integer(ARGV[2])) +` diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go index 86a8fc0b2..d8c4deba4 100644 --- a/internal/service/blob/lfs_pointers_test.go +++ b/internal/service/blob/lfs_pointers_test.go @@ -4,8 +4,10 @@ import ( "io" "os" "os/exec" + "path/filepath" "runtime/pprof" "testing" + "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/config" @@ -498,11 +500,13 @@ func BenchmarkGetAllLFS(b *testing.B) { t := testing.TB(b) server, serverSocketPath := runBlobServer(t) defer server.Stop() - + time.Sleep(5 * time.Second) //ruby server boot client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() - config.Config.Storages = append(config.Config.Storages, config.Storage{Name: "bench", Path: "testdata"}) + benchStorageDir, err := filepath.Abs("testdata") + require.NoError(t, err) + config.Config.Storages = append(config.Config.Storages, config.Storage{Name: "bench", Path: benchStorageDir}) testRepo := &gitalypb.Repository{StorageName: "bench", RelativePath: "git.git"} ctx, cancel := testhelper.Context() @@ -517,19 +521,21 @@ func BenchmarkGetAllLFS(b *testing.B) { name string flag string }{ - {name: "smallblob", flag: "smallblob"}, + // {name: "smallblob", flag: "smallblob"}, + {name: "ruby-script", flag: "ruby-script"}, + {name: "ruby", flag: ""}, {name: "awk2", flag: "awk2"}, + {name: "awk4", flag: "awk4"}, {name: "awk3", flag: "awk3"}, - {name: "go", flag: featureflag.GetAllLFSPointersGo}, - {name: "ruby", flag: ""}, {name: "awk", flag: "awk"}, + {name: "go", flag: featureflag.GetAllLFSPointersGo}, } for _, cs := range cases { b.Run(cs.name, func(b *testing.B) { for i := 0; i < b.N; i++ { func() { - if i == 0 &&false{ + if i == 0 && false { f, err := os.Create("/tmp/cpu." + cs.name) require.NoError(b, err) defer f.Close() |