Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Jacob Vosmaer <jacob@gitlab.com>, 2019-08-20 15:16:04 +0300
committer: Jacob Vosmaer <jacob@gitlab.com>, 2019-08-20 15:16:04 +0300
commit: d19fe09b1dbebcb6b42b72202c85a63f1f8a224a (patch)
tree: ff3cd8dcba6a3cfcc7e65a38e1dc69b57bc81f4b
parent: 6e6cd5f6a965815c5e396e0e2a23434baa2416c8 (diff)
Add ruby script [jv-bench-lfs-pointers]
-rw-r--r-- internal/command/command.go 2
-rw-r--r-- internal/service/blob/lfs_pointers.go 300
-rw-r--r-- internal/service/blob/lfs_pointers_test.go 18
3 files changed, 160 insertions, 160 deletions
diff --git a/internal/command/command.go b/internal/command/command.go
index 0ee930eeb..c2ff8c279 100644
--- a/internal/command/command.go
+++ b/internal/command/command.go
@@ -101,6 +101,8 @@ func (c *Command) Read(p []byte) (int, error) {
return c.reader.Read(p)
}
+func (c *Command) Reader() io.Reader { return c.reader }
+
func (c *Command) WriteTo(w io.Writer) (int64, error) {
if wt, ok := c.reader.(io.WriterTo); ok {
return wt.WriteTo(w)
diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go
index 267c5f9b9..cd346b370 100644
--- a/internal/service/blob/lfs_pointers.go
+++ b/internal/service/blob/lfs_pointers.go
@@ -2,14 +2,17 @@ package blob
import (
"bufio"
+ "bytes"
"fmt"
"io"
"io/ioutil"
+ "os"
"os/exec"
"strings"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/internal/command"
+ "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/internal/helper"
@@ -171,6 +174,15 @@ func (s *server) GetAllLFSPointers(in *gitalypb.GetAllLFSPointersRequest, stream
return nil
}
+ if featureflag.IsEnabled(stream.Context(), "awk4") {
+ getAllLFSPointersRequests.WithLabelValues("awk4").Inc()
+ if err := getAllLFSPointersAwk4(in.GetRepository(), stream); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+ }
+
if featureflag.IsEnabled(stream.Context(), "smallblob") {
getAllLFSPointersRequests.WithLabelValues("smallblob").Inc()
if err := getAllLFSPointersSmallBlob(in.GetRepository(), stream); err != nil {
@@ -180,6 +192,15 @@ func (s *server) GetAllLFSPointers(in *gitalypb.GetAllLFSPointersRequest, stream
return nil
}
+ if featureflag.IsEnabled(stream.Context(), "ruby-script") {
+ getAllLFSPointersRequests.WithLabelValues("ruby-script").Inc()
+ if err := getAllLFSPointersRubyScript(in.GetRepository(), stream); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+ }
+
getAllLFSPointersRequests.WithLabelValues("ruby").Inc()
client, err := s.BlobServiceClient(ctx)
@@ -230,7 +251,7 @@ func (s *allLFSPointersSender) Send() error {
}
func getAllLFSPointers(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error {
- args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--use-bitmap-index"}
+ // blob:limit=N omits blobs of N bytes or more. 201 keeps 200-byte blobs,
+ // like the ruby-script variant's limit of maxSize+1.
+ // NOTE(review): assumes LfsPointerMaxSize is 200 -- confirm.
+ args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=201"}
cmd, err := git.Command(stream.Context(), repository, args...)
if err != nil {
@@ -298,7 +319,7 @@ func getAllLFSPointers(repository *gitalypb.Repository, stream gitalypb.BlobServ
func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error {
var commands []*command.Command
- args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--use-bitmap-index"}
+ // blob:limit=N omits blobs of N bytes or more; use 201 so that 200-byte
+ // blobs, which the awk size filter further down still accepts, are listed.
+ args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=201"}
revList, err := git.Command(stream.Context(), repository, args...)
if err != nil {
@@ -312,31 +333,45 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS
commands = append(commands, revList)
ctx := stream.Context()
- awk1, err := command.New(ctx, exec.Command("awk", "{print $1}"), revList, nil, nil)
+ awk1, err := command.New(ctx, exec.Command("awk", "{print $1}"), revList.Reader(), nil, nil)
if err != nil {
return err
}
commands = append(commands, awk1)
- catCheck, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch-check", "--buffer"), awk1, nil, nil)
+ catCheck, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch-check", "--buffer"), awk1.Reader(), nil, nil)
if err != nil {
return err
}
commands = append(commands, catCheck)
- awk2, err := command.New(ctx, exec.Command("awk", `$2 == "blob" && $3 <= 200 && $3 >= 100 {print $1}`), catCheck, nil, nil)
+ awk2, err := command.New(ctx, exec.Command("awk", `$2 == "blob" && $3 <= 200 && $3 >= 100 {print $1}`), catCheck.Reader(), nil, nil)
if err != nil {
return err
}
commands = append(commands, awk2)
- catBatch, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch", "--buffer"), awk2, nil, nil)
+ catBatch, err := command.New(ctx, exec.Command("git", "-C", repoPath, "cat-file", "--batch", "--buffer"), awk2.Reader(), nil, nil)
if err != nil {
return err
}
commands = append(commands, catBatch)
- chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil})
+ if err := parseCatfileOut(catBatch.Reader(), stream); err != nil {
+ return err
+ }
- r := bufio.NewReader(catBatch)
+ for i := len(commands) - 1; i >= 0; i-- {
+ if err := commands[i].Wait(); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+func parseCatfileOut(_r io.Reader, stream gitalypb.BlobService_GetAllLFSPointersServer) error {
+ chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil})
+
+ r := bufio.NewReader(_r)
+ buf := &bytes.Buffer{}
for {
_, err := r.Peek(1)
if err == io.EOF {
@@ -351,7 +386,8 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS
return err
}
- b, err := ioutil.ReadAll(io.LimitReader(r, info.Size))
+ buf.Reset()
+ _, err = io.CopyN(buf, r, info.Size)
if err != nil {
return err
}
@@ -363,10 +399,12 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS
return fmt.Errorf("unexpected character %x", delim)
}
- if !git.IsLFSPointer(b) {
+ if !git.IsLFSPointer(buf.Bytes()) {
continue
}
+ b := make([]byte, buf.Len())
+ copy(b, buf.Bytes())
if err := chunker.Send(&gitalypb.LFSPointer{
Oid: info.Oid,
Size: info.Size,
@@ -376,21 +414,10 @@ func getAllLFSPointersAwk(repository *gitalypb.Repository, stream gitalypb.BlobS
}
}
- for i := len(commands) - 1; i >= 0; i-- {
- if err := commands[i].Wait(); err != nil {
- return err
- }
- }
-
- if err := chunker.Flush(); err != nil {
- return err
- }
-
- return nil
+ return chunker.Flush()
}
func getAllLFSPointersAwk2(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error {
- var commands []*command.Command
repoPath, err := helper.GetRepoPath(repository)
if err != nil {
@@ -399,11 +426,12 @@ func getAllLFSPointersAwk2(repository *gitalypb.Repository, stream gitalypb.Blob
ctx := stream.Context()
- cmd := exec.Command("/bin/sh")
+ // Use bash because we want 'set -o pipefail', so that a failure in any
+ // stage of the pipeline fails the whole script. Note that 'set -e pipefail'
+ // would NOT do this: it only sets $1 to "pipefail".
+ // blob:limit=N omits blobs of N bytes or more, so 201 keeps the 200-byte
+ // blobs that the awk size filter below still accepts.
+ cmd := exec.Command("/usr/bin/env", "bash")
cmd.Dir = repoPath
stdin := strings.NewReader(`
-set -e
-git rev-list --all --in-commit-order --objects --use-bitmap-index |\
+set -eo pipefail
+git rev-list --all --in-commit-order --objects --filter=blob:limit=201 |\
awk '{ print $1 }' |\
git cat-file --batch-check --buffer |\
awk '$2 == "blob" && $3 >= 100 && $3 <= 200 { print $1 }' |\
@@ -413,49 +441,48 @@ git rev-list --all --in-commit-order --objects --use-bitmap-index |\
if err != nil {
return err
}
- commands = append(commands, sh)
- chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil})
+ if err := parseCatfileOut(sh.Reader(), stream); err != nil {
+ return err
+ }
- r := bufio.NewReader(sh)
+ return sh.Wait()
+}
- for {
- _, err := r.Peek(1)
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
+func getAllLFSPointersAwk4(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error {
+ var commands []*command.Command
- info, err := catfile.ParseObjectInfo(r)
- if err != nil {
- return err
- }
+ repoPath, err := helper.GetRepoPath(repository)
+ if err != nil {
+ return err
+ }
- b, err := ioutil.ReadAll(io.LimitReader(r, info.Size))
- if err != nil {
- return err
- }
- delim, err := r.ReadByte()
- if err != nil {
- return err
- }
- if delim != '\n' {
- return fmt.Errorf("unexpected character %x", delim)
- }
+ ctx := stream.Context()
+ // blob:limit=N omits blobs of N bytes or more; use 201 so that 200-byte
+ // blobs, which the awk size filter in the script below accepts, are listed.
+ args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=201"}
- if !git.IsLFSPointer(b) {
- continue
- }
+ revList, err := git.Command(ctx, repository, args...)
+ if err != nil {
+ return err
+ }
+ commands = append(commands, revList)
- if err := chunker.Send(&gitalypb.LFSPointer{
- Oid: info.Oid,
- Size: info.Size,
- Data: b,
- }); err != nil {
- return err
- }
+ script := `
+set -e
+awk '{ print $1 }' |\
+ git cat-file --batch-check --buffer |\
+ awk '$2 == "blob" && $3 >= 100 && $3 <= 200 { print $1 }' |\
+ git cat-file --batch --buffer
+`
+ cmd := exec.Command("/bin/sh", "-c", script)
+ cmd.Dir = repoPath
+ sh, err := command.New(ctx, cmd, revList.Reader(), nil, nil)
+ if err != nil {
+ return err
+ }
+ commands = append(commands, sh)
+
+ if err := parseCatfileOut(sh.Reader(), stream); err != nil {
+ return err
}
for i := len(commands) - 1; i >= 0; i-- {
@@ -464,10 +491,6 @@ git rev-list --all --in-commit-order --objects --use-bitmap-index |\
}
}
- if err := chunker.Flush(); err != nil {
- return err
- }
-
return nil
}
@@ -509,7 +532,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob
var commands []waitReader
ctx := stream.Context()
- args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--use-bitmap-index"}
+ // blob:limit=N omits blobs of N bytes or more. 201 keeps 200-byte blobs,
+ // like the ruby-script variant's limit of maxSize+1.
+ // NOTE(review): assumes the size filter later in this function accepts
+ // blobs of up to 200 bytes, as the awk variants do -- confirm.
+ args := []string{"rev-list", "--all", "--in-commit-order", "--objects", "--filter=blob:limit=201"}
revList, err := git.Command(ctx, repository, args...)
if err != nil {
return err
@@ -522,7 +545,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob
commands = append(commands, revList)
- awk1 := newPseudoCommand(revList, func(_w io.Writer, r io.Reader) error {
+ awk1 := newPseudoCommand(revList.Reader(), func(_w io.Writer, r io.Reader) error {
w := bufio.NewWriter(_w)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
@@ -530,7 +553,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob
if len(split) == 0 {
return fmt.Errorf("awk1: empty line")
}
- if _, err := fmt.Fprintln(w,split[0]); err != nil {
+ if _, err := fmt.Fprintln(w, split[0]); err != nil {
return err
}
}
@@ -548,7 +571,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob
}
commands = append(commands, catCheck)
- awk2 := newPseudoCommand(catCheck, func(_w io.Writer, _r io.Reader) error {
+ awk2 := newPseudoCommand(catCheck.Reader(), func(_w io.Writer, _r io.Reader) error {
r := bufio.NewReader(_r)
w := bufio.NewWriter(_w)
@@ -585,46 +608,8 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob
}
commands = append(commands, catBatch)
- chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil})
- r := bufio.NewReader(catBatch)
- for {
- _, err := r.Peek(1)
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
-
- info, err := catfile.ParseObjectInfo(r)
- if err != nil {
- return err
- }
-
- b, err := ioutil.ReadAll(io.LimitReader(r, info.Size))
- if err != nil {
- return err
- }
-
- delim, err := r.ReadByte()
- if err != nil {
- return err
- }
- if delim != '\n' {
- return fmt.Errorf("expected newline, got 0x%x", delim)
- }
-
- if !git.IsLFSPointer(b) {
- continue
- }
-
- if err := chunker.Send(&gitalypb.LFSPointer{
- Oid: info.Oid,
- Size: info.Size,
- Data: b,
- }); err != nil {
- return err
- }
+ if err := parseCatfileOut(catBatch.Reader(), stream); err != nil {
+ return err
}
for i := len(commands) - 1; i >= 0; i-- {
@@ -633,7 +618,7 @@ func getAllLFSPointersAwk3(repository *gitalypb.Repository, stream gitalypb.Blob
}
}
- return chunker.Flush()
+ return nil
}
func getAllLFSPointersSmallBlob(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error {
@@ -646,65 +631,72 @@ func getAllLFSPointersSmallBlob(repository *gitalypb.Repository, stream gitalypb
ctx := stream.Context()
- cmd := exec.Command("/tmp/smallblob/smallblob",repoPath)
+ cmd := exec.Command("/tmp/smallblob/smallblob", repoPath)
smallblob, err := command.New(ctx, cmd, nil, nil, nil)
if err != nil {
return err
}
commands = append(commands, smallblob)
+ if err := parseCatfileOut(smallblob.Reader(), stream); err != nil {
+ return err
+ }
- chunker := chunk.New(&allLFSPointersSender{stream: stream, lfsPointers: nil})
-
- r := bufio.NewReader(smallblob)
-
- for {
- _, err := r.Peek(1)
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
-
- info, err := catfile.ParseObjectInfo(r)
- if err != nil {
- return err
- }
-
- b, err := ioutil.ReadAll(io.LimitReader(r, info.Size))
- if err != nil {
- return err
- }
- delim, err := r.ReadByte()
- if err != nil {
+ for i := len(commands) - 1; i >= 0; i-- {
+ if err := commands[i].Wait(); err != nil {
return err
}
- if delim != '\n' {
- return fmt.Errorf("unexpected character %x", delim)
- }
+ }
- if !git.IsLFSPointer(b) {
- continue
- }
+ return nil
+}
- if err := chunker.Send(&gitalypb.LFSPointer{
- Oid: info.Oid,
- Size: info.Size,
- Data: b,
- }); err != nil {
- return err
- }
+func getAllLFSPointersRubyScript(repository *gitalypb.Repository, stream gitalypb.BlobService_GetAllLFSPointersServer) error {
+ repoPath, err := helper.GetRepoPath(repository)
+ if err != nil {
+ return err
}
- for i := len(commands) - 1; i >= 0; i-- {
- if err := commands[i].Wait(); err != nil {
- return err
- }
+ ctx := stream.Context()
+
+ cmd := exec.Command("ruby", "--", "-", repoPath, fmt.Sprintf("%d", LfsPointerMinSize), fmt.Sprintf("%d", LfsPointerMaxSize))
+ cmd.Dir = config.Config.Ruby.Dir
+ ruby, err := command.New(ctx, cmd, strings.NewReader(rubyScript), nil, nil, os.Environ()...)
+ if err != nil {
+ return err
}
- if err := chunker.Flush(); err != nil {
+ if err := parseCatfileOut(ruby, stream); err != nil {
return err
}
- return nil
+ return ruby.Wait()
}
+
+var rubyScript = `
+
+def main(git_dir, minSize, maxSize)
+ IO.popen(%W[git -C #{git_dir} rev-list --all --filter=blob:limit=#{maxSize+1} --in-commit-order --objects], 'r') do |rev_list|
+ # Loading bundler and rugged is slow. Let's do it while we wait for git rev-list.
+ require 'bundler/setup'
+ require 'rugged'
+
+ repo = Rugged::Repository.new(git_dir)
+
+ rev_list.each_line do |line|
+ oid = line.split(' ', 2).first
+ abort "bad rev-list line #{line.inspect}" unless oid
+
+ header = repo.read_header(oid)
+ next unless header[:len] >= minSize && header[:len] <= maxSize && header[:type] == :blob
+
+ puts "#{oid} blob #{header[:len]}"
+ $stdout.write(repo.lookup(oid).content)
+ puts # newline separator, just like git cat-file
+ end
+ end
+
+ abort 'rev-list failed' unless $?.success?
+end
+
+main(ARGV[0], Integer(ARGV[1]), Integer(ARGV[2]))
+`
diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go
index 86a8fc0b2..d8c4deba4 100644
--- a/internal/service/blob/lfs_pointers_test.go
+++ b/internal/service/blob/lfs_pointers_test.go
@@ -4,8 +4,10 @@ import (
"io"
"os"
"os/exec"
+ "path/filepath"
"runtime/pprof"
"testing"
+ "time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/config"
@@ -498,11 +500,13 @@ func BenchmarkGetAllLFS(b *testing.B) {
t := testing.TB(b)
server, serverSocketPath := runBlobServer(t)
defer server.Stop()
-
+ time.Sleep(5 * time.Second) //ruby server boot
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
- config.Config.Storages = append(config.Config.Storages, config.Storage{Name: "bench", Path: "testdata"})
+ benchStorageDir, err := filepath.Abs("testdata")
+ require.NoError(t, err)
+ config.Config.Storages = append(config.Config.Storages, config.Storage{Name: "bench", Path: benchStorageDir})
testRepo := &gitalypb.Repository{StorageName: "bench", RelativePath: "git.git"}
ctx, cancel := testhelper.Context()
@@ -517,19 +521,21 @@ func BenchmarkGetAllLFS(b *testing.B) {
name string
flag string
}{
- {name: "smallblob", flag: "smallblob"},
+ // {name: "smallblob", flag: "smallblob"},
+ {name: "ruby-script", flag: "ruby-script"},
+ {name: "ruby", flag: ""},
{name: "awk2", flag: "awk2"},
+ {name: "awk4", flag: "awk4"},
{name: "awk3", flag: "awk3"},
- {name: "go", flag: featureflag.GetAllLFSPointersGo},
- {name: "ruby", flag: ""},
{name: "awk", flag: "awk"},
+ {name: "go", flag: featureflag.GetAllLFSPointersGo},
}
for _, cs := range cases {
b.Run(cs.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
func() {
- if i == 0 &&false{
+ if i == 0 && false {
f, err := os.Create("/tmp/cpu." + cs.name)
require.NoError(b, err)
defer f.Close()