Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2019-01-10 20:36:19 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-01-10 20:36:19 +0300
commit4e0ebbbecf351c0815778d713798c69a08af6e60 (patch)
tree56b5e2e46fd1e68f30274668c2da1e97a1af6802
parent39172dd2700de2f343f324383564da9c32d8571e (diff)
parent5e6c4b745e81a82f71cda21a92c342d595943c50 (diff)
Merge branch 'response-chunker' into 'master'
Add response chunker abstraction See merge request gitlab-org/gitaly!1031
-rw-r--r--changelogs/unreleased/response-chunker.yml5
-rw-r--r--internal/helper/chunker/chunker.go59
-rw-r--r--internal/service/commit/list_commits_by_oid.go33
-rw-r--r--internal/service/commit/list_files.go86
-rw-r--r--internal/service/commit/tree_entries.go28
5 files changed, 146 insertions, 65 deletions
diff --git a/changelogs/unreleased/response-chunker.yml b/changelogs/unreleased/response-chunker.yml
new file mode 100644
index 000000000..9407a455f
--- /dev/null
+++ b/changelogs/unreleased/response-chunker.yml
@@ -0,0 +1,5 @@
+---
+title: Add response chunker abstraction
+merge_request: 1031
+author:
+type: other
diff --git a/internal/helper/chunker/chunker.go b/internal/helper/chunker/chunker.go
new file mode 100644
index 000000000..f8bf5dc1b
--- /dev/null
+++ b/internal/helper/chunker/chunker.go
@@ -0,0 +1,59 @@
+package chunker
+
+// Item could be e.g. a commit in an RPC that returns a chunked stream of
+// commits.
+type Item interface{}
+
+// Sender encapsulates a gRPC response stream and the current chunk
+// that's being built.
+//
+// Reset, Append, [Append...], Send, Reset, Append, [Append...], Send, ...
+type Sender interface {
+ // Reset should create a fresh response message.
+ Reset()
+ // Append should append the given item to the slice in the current response message
+ Append(Item)
+ // Send should send the current response message
+ Send() error
+}
+
+// New returns a new Chunker.
+func New(s Sender) *Chunker { return &Chunker{s: s} }
+
+// Chunker lets you spread items you want to send over multiple chunks.
+// This type is not thread-safe.
+type Chunker struct {
+ s Sender
+ n int
+}
+
+// Send will append an item to the current chunk and send the chunk if it is full.
+func (c *Chunker) Send(it Item) error {
+ if c.n == 0 {
+ c.s.Reset()
+ }
+
+ c.s.Append(it)
+ c.n++
+
+ const chunkSize = 20
+ if c.n >= chunkSize {
+ return c.sendResponseMsg()
+ }
+
+ return nil
+}
+
+func (c *Chunker) sendResponseMsg() error {
+ c.n = 0
+ return c.s.Send()
+}
+
+// Flush sends remaining items in the current chunk, if any.
+func (c *Chunker) Flush() error {
+ if c.n == 0 {
+ return nil
+ }
+
+ return c.sendResponseMsg()
+}
diff --git a/internal/service/commit/list_commits_by_oid.go b/internal/service/commit/list_commits_by_oid.go
index 05a7a1deb..1fd31f7ae 100644
--- a/internal/service/commit/list_commits_by_oid.go
+++ b/internal/service/commit/list_commits_by_oid.go
@@ -4,10 +4,9 @@ import (
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/chunker"
)
-const batchSizeListCommitsByOid = 20
-
func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream gitalypb.CommitService_ListCommitsByOidServer) error {
ctx := stream.Context()
@@ -16,11 +15,8 @@ func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream g
return err
}
- send := func(commits []*gitalypb.GitCommit) error {
- return stream.Send(&gitalypb.ListCommitsByOidResponse{Commits: commits})
- }
+ sender := chunker.New(&commitsByOidSender{stream: stream})
- var commits []*gitalypb.GitCommit
for _, oid := range in.Oid {
commit, err := gitlog.GetCommitCatfile(c, oid)
if err != nil {
@@ -31,19 +27,22 @@ func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream g
continue
}
- commits = append(commits, commit)
-
- if len(commits) == batchSizeListCommitsByOid {
- if err := send(commits); err != nil {
- return err
- }
- commits = nil
+ if err := sender.Send(commit); err != nil {
+ return err
}
}
- if len(commits) > 0 {
- return send(commits)
- }
+ return sender.Flush()
+}
+
+type commitsByOidSender struct {
+ response *gitalypb.ListCommitsByOidResponse
+ stream gitalypb.CommitService_ListCommitsByOidServer
+}
- return nil
+func (c *commitsByOidSender) Append(it chunker.Item) {
+ c.response.Commits = append(c.response.Commits, it.(*gitalypb.GitCommit))
}
+
+func (c *commitsByOidSender) Send() error { return c.stream.Send(c.response) }
+func (c *commitsByOidSender) Reset() { c.response = &gitalypb.ListCommitsByOidResponse{} }
diff --git a/internal/service/commit/list_files.go b/internal/service/commit/list_files.go
index 54db537ce..f5a100e8c 100644
--- a/internal/service/commit/list_files.go
+++ b/internal/service/commit/list_files.go
@@ -1,16 +1,17 @@
package commit
import (
- "bytes"
+ "fmt"
+ "io"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/git"
+ "gitlab.com/gitlab-org/gitaly/internal/git/lstree"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/helper/lines"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/chunker"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
func (s *server) ListFiles(in *gitalypb.ListFilesRequest, stream gitalypb.CommitService_ListFilesServer) error {
@@ -23,51 +24,64 @@ func (s *server) ListFiles(in *gitalypb.ListFilesRequest, stream gitalypb.Commit
return err
}
- revision := in.GetRevision()
+ revision := string(in.GetRevision())
if len(revision) == 0 {
- var err error
-
- revision, err = defaultBranchName(stream.Context(), repo)
+ defaultBranch, err := defaultBranchName(stream.Context(), repo)
if err != nil {
- if _, ok := status.FromError(err); ok {
- return err
- }
- return status.Errorf(codes.NotFound, "Revision not found %q", in.GetRevision())
+ return helper.DecorateError(codes.NotFound, fmt.Errorf("revision not found %q", revision))
}
+
+ revision = string(defaultBranch)
}
- if !git.IsValidRef(stream.Context(), repo, string(revision)) {
+
+ if !git.IsValidRef(stream.Context(), repo, revision) {
return stream.Send(&gitalypb.ListFilesResponse{})
}
- cmd, err := git.Command(stream.Context(), repo, "ls-tree", "-z", "-r", "--full-tree", "--full-name", "--", string(revision))
+ if err := listFiles(repo, revision, stream); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+}
+
+func listFiles(repo *gitalypb.Repository, revision string, stream gitalypb.CommitService_ListFilesServer) error {
+ args := []string{"ls-tree", "-z", "-r", "--full-tree", "--full-name", "--", revision}
+ cmd, err := git.Command(stream.Context(), repo, args...)
if err != nil {
- if _, ok := status.FromError(err); ok {
+ return err
+ }
+
+ sender := chunker.New(&listFilesSender{stream: stream})
+
+ for parser := lstree.NewParser(cmd); ; {
+ entry, err := parser.NextEntry()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
return err
}
- return status.Errorf(codes.Internal, err.Error())
- }
- return lines.Send(cmd, listFilesWriter(stream), []byte{'\x00'})
-}
+ if entry.Type != lstree.Blob {
+ continue
+ }
-func listFilesWriter(stream gitalypb.CommitService_ListFilesServer) lines.Sender {
- return func(objs [][]byte) error {
- paths := make([][]byte, 0)
- for _, obj := range objs {
- data := bytes.SplitN(obj, []byte{'\t'}, 2)
- if len(data) != 2 {
- return status.Errorf(codes.Internal, "ListFiles: failed parsing line")
- }
-
- meta := bytes.SplitN(data[0], []byte{' '}, 3)
- if len(meta) != 3 {
- return status.Errorf(codes.Internal, "ListFiles: failed parsing meta")
- }
-
- if bytes.Equal(meta[1], []byte("blob")) {
- paths = append(paths, data[1])
- }
+ if err := sender.Send([]byte(entry.Path)); err != nil {
+ return err
}
- return stream.Send(&gitalypb.ListFilesResponse{Paths: paths})
}
+
+ return sender.Flush()
+}
+
+type listFilesSender struct {
+ stream gitalypb.CommitService_ListFilesServer
+ response *gitalypb.ListFilesResponse
+}
+
+func (s *listFilesSender) Reset() { s.response = &gitalypb.ListFilesResponse{} }
+func (s *listFilesSender) Send() error { return s.stream.Send(s.response) }
+func (s *listFilesSender) Append(it chunker.Item) {
+ s.response.Paths = append(s.response.Paths, it.([]byte))
}
diff --git a/internal/service/commit/tree_entries.go b/internal/service/commit/tree_entries.go
index 1d3b09129..f1456a71f 100644
--- a/internal/service/commit/tree_entries.go
+++ b/internal/service/commit/tree_entries.go
@@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/chunker"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -62,23 +63,26 @@ func sendTreeEntries(stream gitalypb.CommitService_GetTreeEntriesServer, c *catf
}
}
- for len(entries) > maxTreeEntries {
- chunk := &gitalypb.GetTreeEntriesResponse{
- Entries: entries[:maxTreeEntries],
- }
- if err := stream.Send(chunk); err != nil {
- return err
- }
- entries = entries[maxTreeEntries:]
+ sender := chunker.New(&treeEntriesSender{stream: stream})
+ for _, e := range entries {
+ sender.Send(e)
}
- if len(entries) > 0 {
- return stream.Send(&gitalypb.GetTreeEntriesResponse{Entries: entries})
- }
+ return sender.Flush()
+}
- return nil
+type treeEntriesSender struct {
+ response *gitalypb.GetTreeEntriesResponse
+ stream gitalypb.CommitService_GetTreeEntriesServer
+}
+
+func (c *treeEntriesSender) Append(it chunker.Item) {
+ c.response.Entries = append(c.response.Entries, it.(*gitalypb.TreeEntry))
}
+func (c *treeEntriesSender) Send() error { return c.stream.Send(c.response) }
+func (c *treeEntriesSender) Reset() { c.response = &gitalypb.GetTreeEntriesResponse{} }
+
func (s *server) GetTreeEntries(in *gitalypb.GetTreeEntriesRequest, stream gitalypb.CommitService_GetTreeEntriesServer) error {
grpc_logrus.Extract(stream.Context()).WithFields(log.Fields{
"Revision": in.Revision,