diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-01-10 20:36:19 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-01-10 20:36:19 +0300 |
commit | 4e0ebbbecf351c0815778d713798c69a08af6e60 (patch) | |
tree | 56b5e2e46fd1e68f30274668c2da1e97a1af6802 | |
parent | 39172dd2700de2f343f324383564da9c32d8571e (diff) | |
parent | 5e6c4b745e81a82f71cda21a92c342d595943c50 (diff) |
Merge branch 'response-chunker' into 'master'
Add response chunker abstraction
See merge request gitlab-org/gitaly!1031
-rw-r--r-- | changelogs/unreleased/response-chunker.yml | 5 | ||||
-rw-r--r-- | internal/helper/chunker/chunker.go | 59 | ||||
-rw-r--r-- | internal/service/commit/list_commits_by_oid.go | 33 | ||||
-rw-r--r-- | internal/service/commit/list_files.go | 86 | ||||
-rw-r--r-- | internal/service/commit/tree_entries.go | 28 |
5 files changed, 146 insertions, 65 deletions
diff --git a/changelogs/unreleased/response-chunker.yml b/changelogs/unreleased/response-chunker.yml new file mode 100644 index 000000000..9407a455f --- /dev/null +++ b/changelogs/unreleased/response-chunker.yml @@ -0,0 +1,5 @@ +--- +title: Add response chunker abstraction +merge_request: 1031 +author: +type: other diff --git a/internal/helper/chunker/chunker.go b/internal/helper/chunker/chunker.go new file mode 100644 index 000000000..f8bf5dc1b --- /dev/null +++ b/internal/helper/chunker/chunker.go @@ -0,0 +1,59 @@ +package chunker + +// Item could be e.g. a commit in an RPC that returns a chunked stream of +// commits. +type Item interface{} + +// Sender encapsulates a gRPC response stream and the current chunk +// that's being built. +// +// Reset, Append, [Append...], Send, Reset, Append, [Append...], Send, ... +type Sender interface { + // Reset should create a fresh response message. + Reset() + // Append should append the given item to the slice in the current response message + Append(Item) + // Send should send the current response message + Send() error +} + +// New returns a new Chunker. +func New(s Sender) *Chunker { return &Chunker{s: s} } + +// Chunker lets you spread items you want to send over multiple chunks. +// This type is not thread-safe. +type Chunker struct { + s Sender + n int +} + +// Send will append an item to the current chunk and send the chunk if it is full. +func (c *Chunker) Send(it Item) error { + if c.n == 0 { + c.s.Reset() + } + + c.s.Append(it) + c.n++ + + const chunkSize = 20 + if c.n >= chunkSize { + return c.sendResponseMsg() + } + + return nil +} + +func (c *Chunker) sendResponseMsg() error { + c.n = 0 + return c.s.Send() +} + +// Flush sends remaining items in the current chunk, if any. +func (c *Chunker) Flush() error { + if c.n == 0 { + return nil + } + + return c.sendResponseMsg() +} diff --git a/internal/service/commit/list_commits_by_oid.go b/internal/service/commit/list_commits_by_oid.go index 05a7a1deb..1fd31f7ae 100644 --- a/internal/service/commit/list_commits_by_oid.go +++ b/internal/service/commit/list_commits_by_oid.go @@ -4,10 +4,9 @@ import ( "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" gitlog "gitlab.com/gitlab-org/gitaly/internal/git/log" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunker" ) -const batchSizeListCommitsByOid = 20 - func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream gitalypb.CommitService_ListCommitsByOidServer) error { ctx := stream.Context() @@ -16,11 +15,8 @@ func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream g return err } - send := func(commits []*gitalypb.GitCommit) error { - return stream.Send(&gitalypb.ListCommitsByOidResponse{Commits: commits}) - } + sender := chunker.New(&commitsByOidSender{stream: stream}) - var commits []*gitalypb.GitCommit for _, oid := range in.Oid { commit, err := gitlog.GetCommitCatfile(c, oid) if err != nil { @@ -31,19 +27,22 @@ func (s *server) ListCommitsByOid(in *gitalypb.ListCommitsByOidRequest, stream g continue } - commits = append(commits, commit) - - if len(commits) == batchSizeListCommitsByOid { - if err := send(commits); err != nil { - return err - } - commits = nil + if err := sender.Send(commit); err != nil { + return err } } - if len(commits) > 0 { - return send(commits) - } + return sender.Flush() +} + +type commitsByOidSender struct { + response *gitalypb.ListCommitsByOidResponse + stream gitalypb.CommitService_ListCommitsByOidServer +} - return nil +func (c *commitsByOidSender) Append(it chunker.Item) { + c.response.Commits = append(c.response.Commits, it.(*gitalypb.GitCommit)) } + +func (c *commitsByOidSender) Send() error { return c.stream.Send(c.response) } +func (c *commitsByOidSender) Reset() { c.response = &gitalypb.ListCommitsByOidResponse{} } diff --git a/internal/service/commit/list_files.go b/internal/service/commit/list_files.go index 54db537ce..f5a100e8c 100644 --- a/internal/service/commit/list_files.go +++ b/internal/service/commit/list_files.go @@ -1,16 +1,17 @@ package commit import ( - "bytes" + "fmt" + "io" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/git" + "gitlab.com/gitlab-org/gitaly/internal/git/lstree" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/helper/lines" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunker" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func (s *server) ListFiles(in *gitalypb.ListFilesRequest, stream gitalypb.CommitService_ListFilesServer) error { @@ -23,51 +24,64 @@ func (s *server) ListFiles(in *gitalypb.ListFilesRequest, stream gitalypb.Commit return err } - revision := in.GetRevision() + revision := string(in.GetRevision()) if len(revision) == 0 { - var err error - - revision, err = defaultBranchName(stream.Context(), repo) + defaultBranch, err := defaultBranchName(stream.Context(), repo) if err != nil { - if _, ok := status.FromError(err); ok { - return err - } - return status.Errorf(codes.NotFound, "Revision not found %q", in.GetRevision()) + return helper.DecorateError(codes.NotFound, fmt.Errorf("revision not found %q", revision)) } + + revision = string(defaultBranch) } - if !git.IsValidRef(stream.Context(), repo, string(revision)) { + + if !git.IsValidRef(stream.Context(), repo, revision) { return stream.Send(&gitalypb.ListFilesResponse{}) } - cmd, err := git.Command(stream.Context(), repo, "ls-tree", "-z", "-r", "--full-tree", "--full-name", "--", string(revision)) + if err := listFiles(repo, revision, stream); err != nil { + return helper.ErrInternal(err) + } + + return nil +} + +func listFiles(repo *gitalypb.Repository, revision string, stream gitalypb.CommitService_ListFilesServer) error { + args := []string{"ls-tree", "-z", "-r", "--full-tree", "--full-name", "--", revision} + cmd, err := git.Command(stream.Context(), repo, args...) if err != nil { - if _, ok := status.FromError(err); ok { + return err + } + + sender := chunker.New(&listFilesSender{stream: stream}) + + for parser := lstree.NewParser(cmd); ; { + entry, err := parser.NextEntry() + if err == io.EOF { + break + } + if err != nil { return err } - return status.Errorf(codes.Internal, err.Error()) - } - return lines.Send(cmd, listFilesWriter(stream), []byte{'\x00'}) -} + if entry.Type != lstree.Blob { + continue + } -func listFilesWriter(stream gitalypb.CommitService_ListFilesServer) lines.Sender { - return func(objs [][]byte) error { - paths := make([][]byte, 0) - for _, obj := range objs { - data := bytes.SplitN(obj, []byte{'\t'}, 2) - if len(data) != 2 { - return status.Errorf(codes.Internal, "ListFiles: failed parsing line") - } - - meta := bytes.SplitN(data[0], []byte{' '}, 3) - if len(meta) != 3 { - return status.Errorf(codes.Internal, "ListFiles: failed parsing meta") - } - - if bytes.Equal(meta[1], []byte("blob")) { - paths = append(paths, data[1]) - } + if err := sender.Send([]byte(entry.Path)); err != nil { + return err } - return stream.Send(&gitalypb.ListFilesResponse{Paths: paths}) } + + return sender.Flush() +} + +type listFilesSender struct { + stream gitalypb.CommitService_ListFilesServer + response *gitalypb.ListFilesResponse +} + +func (s *listFilesSender) Reset() { s.response = &gitalypb.ListFilesResponse{} } +func (s *listFilesSender) Send() error { return s.stream.Send(s.response) } +func (s *listFilesSender) Append(it chunker.Item) { + s.response.Paths = append(s.response.Paths, it.([]byte)) } diff --git a/internal/service/commit/tree_entries.go b/internal/service/commit/tree_entries.go index 1d3b09129..f1456a71f 100644 --- a/internal/service/commit/tree_entries.go +++ b/internal/service/commit/tree_entries.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunker" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -62,23 +63,26 @@ func sendTreeEntries(stream gitalypb.CommitService_GetTreeEntriesServer, c *catf } } - for len(entries) > maxTreeEntries { - chunk := &gitalypb.GetTreeEntriesResponse{ - Entries: entries[:maxTreeEntries], - } - if err := stream.Send(chunk); err != nil { - return err - } - entries = entries[maxTreeEntries:] + sender := chunker.New(&treeEntriesSender{stream: stream}) + for _, e := range entries { + sender.Send(e) } - if len(entries) > 0 { - return stream.Send(&gitalypb.GetTreeEntriesResponse{Entries: entries}) - } + return sender.Flush() +} - return nil +type treeEntriesSender struct { + response *gitalypb.GetTreeEntriesResponse + stream gitalypb.CommitService_GetTreeEntriesServer +} + +func (c *treeEntriesSender) Append(it chunker.Item) { + c.response.Entries = append(c.response.Entries, it.(*gitalypb.TreeEntry)) } +func (c *treeEntriesSender) Send() error { return c.stream.Send(c.response) } +func (c *treeEntriesSender) Reset() { c.response = &gitalypb.GetTreeEntriesResponse{} } + func (s *server) GetTreeEntries(in *gitalypb.GetTreeEntriesRequest, stream gitalypb.CommitService_GetTreeEntriesServer) error { grpc_logrus.Extract(stream.Context()).WithFields(log.Fields{ "Revision": in.Revision, |