diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-01-31 00:41:11 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-01-31 00:41:11 +0300 |
commit | 69c20c816d6fc7f2147961b58e513686cba925a7 (patch) | |
tree | 949b61fe767d2e7005398fa209ebfefbad46b633 | |
parent | 4f3ed56eea905196027ec9aeec99829458ae9be6 (diff) |
Switch from commitsSender to chunker
-rw-r--r-- | changelogs/unreleased/commits-sender-to-chunker.yml | 5 | ||||
-rw-r--r-- | internal/service/commit/between.go | 42 | ||||
-rw-r--r-- | internal/service/commit/commits_by_message.go | 36 | ||||
-rw-r--r-- | internal/service/commit/commits_helper.go | 25 | ||||
-rw-r--r-- | internal/service/commit/find_all_commits.go | 57 |
5 files changed, 99 insertions, 66 deletions
diff --git a/changelogs/unreleased/commits-sender-to-chunker.yml b/changelogs/unreleased/commits-sender-to-chunker.yml new file mode 100644 index 000000000..145ec417b --- /dev/null +++ b/changelogs/unreleased/commits-sender-to-chunker.yml @@ -0,0 +1,5 @@ +--- +title: Switch from commitsSender to chunker +merge_request: 1060 +author: +type: other diff --git a/internal/service/commit/between.go b/internal/service/commit/between.go index be761556e..f6cfa34f5 100644 --- a/internal/service/commit/between.go +++ b/internal/service/commit/between.go @@ -5,28 +5,46 @@ import ( "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/git" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" ) type commitsBetweenSender struct { - stream gitalypb.CommitService_CommitsBetweenServer + stream gitalypb.CommitService_CommitsBetweenServer + commits []*gitalypb.GitCommit +} + +func (sender *commitsBetweenSender) Reset() { sender.commits = nil } +func (sender *commitsBetweenSender) Append(it chunk.Item) { + sender.commits = append(sender.commits, it.(*gitalypb.GitCommit)) +} +func (sender *commitsBetweenSender) Send() error { + return sender.stream.Send(&gitalypb.CommitsBetweenResponse{Commits: sender.commits}) } func (s *server) CommitsBetween(in *gitalypb.CommitsBetweenRequest, stream gitalypb.CommitService_CommitsBetweenServer) error { - if err := git.ValidateRevision(in.GetFrom()); err != nil { - return status.Errorf(codes.InvalidArgument, "CommitsBetween: from: %v", err) - } - if err := git.ValidateRevision(in.GetTo()); err != nil { - return status.Errorf(codes.InvalidArgument, "CommitsBetween: to: %v", err) + if err := validateCommitsBetween(in); err != nil { + return helper.ErrInvalidArgument(err) } - sender := &commitsBetweenSender{stream} + sender := &commitsBetweenSender{stream: stream} revisionRange := fmt.Sprintf("%s..%s", in.GetFrom(), in.GetTo()) - return sendCommits(stream.Context(), sender, in.GetRepository(), []string{revisionRange}, nil, "--reverse") + if err := sendCommits(stream.Context(), sender, in.GetRepository(), []string{revisionRange}, nil, "--reverse"); err != nil { + return helper.ErrInternal(err) + } + + return nil } -func (sender *commitsBetweenSender) Send(commits []*gitalypb.GitCommit) error { - return sender.stream.Send(&gitalypb.CommitsBetweenResponse{Commits: commits}) +func validateCommitsBetween(in *gitalypb.CommitsBetweenRequest) error { + if err := git.ValidateRevision(in.GetFrom()); err != nil { + return fmt.Errorf("from: %v", err) + } + + if err := git.ValidateRevision(in.GetTo()); err != nil { + return fmt.Errorf("to: %v", err) + } + + return nil } diff --git a/internal/service/commit/commits_by_message.go b/internal/service/commit/commits_by_message.go index a1a06fbdb..87ca6622f 100644 --- a/internal/service/commit/commits_by_message.go +++ b/internal/service/commit/commits_by_message.go @@ -4,21 +4,38 @@ import ( "fmt" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" ) type commitsByMessageSender struct { - stream gitalypb.CommitService_CommitsByMessageServer + stream gitalypb.CommitService_CommitsByMessageServer + commits []*gitalypb.GitCommit +} + +func (sender *commitsByMessageSender) Reset() { sender.commits = nil } +func (sender *commitsByMessageSender) Append(it chunk.Item) { + sender.commits = append(sender.commits, it.(*gitalypb.GitCommit)) +} +func (sender *commitsByMessageSender) Send() error { + return sender.stream.Send(&gitalypb.CommitsByMessageResponse{Commits: sender.commits}) } func (s *server) CommitsByMessage(in *gitalypb.CommitsByMessageRequest, stream gitalypb.CommitService_CommitsByMessageServer) error { if err := validateCommitsByMessageRequest(in); err != nil { - return status.Errorf(codes.InvalidArgument, "CommitsByMessage: %v", err) + return helper.ErrInvalidArgument(err) + } + + if err := commitsByMessage(in, stream); err != nil { + return helper.ErrInternal(err) } + return nil +} + +func commitsByMessage(in *gitalypb.CommitsByMessageRequest, stream gitalypb.CommitService_CommitsByMessageServer) error { ctx := stream.Context() - sender := &commitsByMessageSender{stream} + sender := &commitsByMessageSender{stream: stream} gitLogExtraOptions := []string{ "--grep=" + in.GetQuery(), @@ -37,10 +54,7 @@ func (s *server) CommitsByMessage(in *gitalypb.CommitsByMessageRequest, stream g revision, err = defaultBranchName(ctx, in.Repository) if err != nil { - if _, ok := status.FromError(err); ok { - return err - } - return status.Errorf(codes.Internal, "CommitsByMessage: defaultBranchName: %v", err) + return err } } @@ -59,7 +73,3 @@ func validateCommitsByMessageRequest(in *gitalypb.CommitsByMessageRequest) error return nil } - -func (sender *commitsByMessageSender) Send(commits []*gitalypb.GitCommit) error { - return sender.stream.Send(&gitalypb.CommitsByMessageResponse{Commits: commits}) -} diff --git a/internal/service/commit/commits_helper.go b/internal/service/commit/commits_helper.go index d1480991c..ba079dff7 100644 --- a/internal/service/commit/commits_helper.go +++ b/internal/service/commit/commits_helper.go @@ -6,15 +6,10 @@ import ( grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/git/log" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" ) -type commitsSender interface { - Send([]*gitalypb.GitCommit) error -} - -const commitsPerChunk = 20 - -func sendCommits(ctx context.Context, sender commitsSender, repo *gitalypb.Repository, revisionRange []string, paths []string, extraArgs ...string) error { +func sendCommits(ctx context.Context, sender chunk.Sender, repo *gitalypb.Repository, revisionRange []string, paths []string, extraArgs ...string) error { cmd, err := log.GitLogCommand(ctx, repo, revisionRange, paths, extraArgs...) if err != nil { return err @@ -25,26 +20,18 @@ func sendCommits(ctx context.Context, sender commitsSender, repo *gitalypb.Repos return err } - var commits []*gitalypb.GitCommit - + chunker := chunk.New(sender) for logParser.Parse() { - commit := logParser.Commit() - - if len(commits) >= commitsPerChunk { - if err := sender.Send(commits); err != nil { - return err - } - commits = nil + if err := chunker.Send(logParser.Commit()); err != nil { + return err } - - commits = append(commits, commit) } if err := logParser.Err(); err != nil { return err } - if err := sender.Send(commits); err != nil { + if err := chunker.Flush(); err != nil { return err } diff --git a/internal/service/commit/find_all_commits.go b/internal/service/commit/find_all_commits.go index e6fc1049d..84b0639f7 100644 --- a/internal/service/commit/find_all_commits.go +++ b/internal/service/commit/find_all_commits.go @@ -4,20 +4,51 @@ import ( "fmt" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/internal/service/ref" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // We declare this function in variables so that we can override them in our tests var _findBranchNamesFunc = ref.FindBranchNames type findAllCommitsSender struct { - stream gitalypb.CommitService_FindAllCommitsServer + stream gitalypb.CommitService_FindAllCommitsServer + commits []*gitalypb.GitCommit +} + +func (sender *findAllCommitsSender) Reset() { sender.commits = nil } +func (sender *findAllCommitsSender) Append(it chunk.Item) { + sender.commits = append(sender.commits, it.(*gitalypb.GitCommit)) +} +func (sender *findAllCommitsSender) Send() error { + return sender.stream.Send(&gitalypb.FindAllCommitsResponse{Commits: sender.commits}) } func (s *server) FindAllCommits(in *gitalypb.FindAllCommitsRequest, stream gitalypb.CommitService_FindAllCommitsServer) error { - sender := &findAllCommitsSender{stream} + var revisions []string + if len(in.GetRevision()) == 0 { + branchNames, err := _findBranchNamesFunc(stream.Context(), in.Repository) + if err != nil { + return helper.ErrInvalidArgument(err) + } + + for _, branch := range branchNames { + revisions = append(revisions, string(branch)) + } + } else { + revisions = []string{string(in.GetRevision())} + } + + if err := findAllCommits(in, stream, revisions); err != nil { + return helper.ErrInternal(err) + } + + return nil +} + +func findAllCommits(in *gitalypb.FindAllCommitsRequest, stream gitalypb.CommitService_FindAllCommitsServer, revisions []string) error { + sender := &findAllCommitsSender{stream: stream} var gitLogExtraOptions []string if maxCount := in.GetMaxCount(); maxCount > 0 { @@ -35,23 +66,5 @@ func (s *server) FindAllCommits(in *gitalypb.FindAllCommitsRequest, stream gital gitLogExtraOptions = append(gitLogExtraOptions, "--topo-order") } - var revisions []string - if len(in.GetRevision()) == 0 { - branchNames, err := _findBranchNamesFunc(stream.Context(), in.Repository) - if err != nil { - return status.Errorf(codes.InvalidArgument, "FindAllCommits: %v", err) - } - - for _, branch := range branchNames { - revisions = append(revisions, string(branch)) - } - } else { - revisions = []string{string(in.GetRevision())} - } - return sendCommits(stream.Context(), sender, in.GetRepository(), revisions, nil, gitLogExtraOptions...) } - -func (sender *findAllCommitsSender) Send(commits []*gitalypb.GitCommit) error { - return sender.stream.Send(&gitalypb.FindAllCommitsResponse{Commits: commits}) -} |