diff options
author | John Cai <jcai@gitlab.com> | 2019-01-04 02:48:04 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-01-09 22:34:46 +0300 |
commit | 5ca020fd8e1126e70c109682dacace8cee870c03 (patch) | |
tree | 1e7f0a4d7b3344cda812e9a4f5827170cbae47cb | |
parent | 55c8f124608b22288b2fee99ec8295d3fd980e12 (diff) |
use go implementation of find commits
refactoring logic to stream commits
manually handle --skip and --follow
use reverse by default when --all is specified
minor fixes
-rw-r--r-- | changelogs/unreleased/jc-migrate-find-commits.yml | 5 | ||||
-rw-r--r-- | internal/service/commit/find_commits.go | 177 | ||||
-rw-r--r-- | internal/service/commit/find_commits_test.go | 50 |
3 files changed, 213 insertions, 19 deletions
diff --git a/changelogs/unreleased/jc-migrate-find-commits.yml b/changelogs/unreleased/jc-migrate-find-commits.yml new file mode 100644 index 000000000..91f4f7f30 --- /dev/null +++ b/changelogs/unreleased/jc-migrate-find-commits.yml @@ -0,0 +1,5 @@ +--- +title: Use go implementation of FindCommits +merge_request: 1025 +author: +type: other diff --git a/internal/service/commit/find_commits.go b/internal/service/commit/find_commits.go index 6aa4d7298..bd7a178f5 100644 --- a/internal/service/commit/find_commits.go +++ b/internal/service/commit/find_commits.go @@ -1,12 +1,22 @@ package commit import ( + "bufio" + "context" + "errors" + "fmt" + "strings" + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" - "gitlab.com/gitlab-org/gitaly/internal/rubyserver" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "gitlab.com/gitlab-org/gitaly/internal/command" + "gitlab.com/gitlab-org/gitaly/internal/git" + "gitlab.com/gitlab-org/gitaly/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/internal/git/log" + "gitlab.com/gitlab-org/gitaly/internal/helper" ) +const commitsPerPage int = 20 + func (s *server) FindCommits(req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) error { ctx := stream.Context() @@ -16,39 +26,168 @@ func (s *server) FindCommits(req *gitalypb.FindCommitsRequest, stream gitalypb.C var err error req.Revision, err = defaultBranchName(ctx, req.Repository) if err != nil { - return status.Errorf(codes.Internal, "defaultBranchName: %v", err) + return helper.ErrInternal(fmt.Errorf("defaultBranchName: %v", err)) } } - // Clients might send empty paths. That is an error for _, path := range req.Paths { if len(path) == 0 { - return status.Errorf(codes.InvalidArgument, "path is empty string") + return helper.ErrInvalidArgument(errors.New("path is empty string")) } } - client, err := s.CommitServiceClient(ctx) - if err != nil { - return err + if err := findCommits(ctx, req, stream); err != nil { + return helper.ErrInternal(err) } - clientCtx, err := rubyserver.SetHeaders(ctx, req.GetRepository()) + return nil +} + +func findCommits(ctx context.Context, req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) error { + + args := getLogCommandFlags(req) + logCmd, err := git.Command(ctx, req.GetRepository(), args...) if err != nil { - return err + return fmt.Errorf("error when creating git log command: %v", err) + } + batch, err := catfile.New(ctx, req.GetRepository()) + if err != nil { + return fmt.Errorf("creating catfile: %v", err) + } + + getCommits := NewGetCommits(logCmd, batch) + + if calculateOffsetManually(req) { + getCommits.Offset(int(req.GetOffset())) + } + + if err := streamPaginatedCommits(getCommits, commitsPerPage, stream); err != nil { + return fmt.Errorf("error streaming commits: %v", err) } + return nil +} + +func calculateOffsetManually(req *gitalypb.FindCommitsRequest) bool { + return req.GetFollow() && req.GetOffset() > 0 +} + +// GetCommits wraps a git log command that can be interated on to get individual commit objects +type GetCommits struct { + scanner *bufio.Scanner + batch *catfile.Batch +} + +// NewGetCommits returns a new GetCommits object +func NewGetCommits(cmd *command.Command, batch *catfile.Batch) *GetCommits { + return &GetCommits{ + scanner: bufio.NewScanner(cmd), + batch: batch, + } +} + +// Scan indicates whether or not there are more commits to return +func (g *GetCommits) Scan() bool { + return g.scanner.Scan() +} + +// Err returns the first non EOF error +func (g *GetCommits) Err() error { + return g.scanner.Err() +} - rubyStream, err := client.FindCommits(clientCtx, req) +// Offset skips over a number of commits +func (g *GetCommits) Offset(offset int) error { + for i := 0; i < offset; i++ { + if !g.Scan() { + return fmt.Errorf("offset %d is invalid: %v", offset, g.scanner.Err()) + } + } + return nil +} + +// Commit returns the current commit +func (g *GetCommits) Commit() (*gitalypb.GitCommit, error) { + revision := strings.TrimSpace(g.scanner.Text()) + commit, err := log.GetCommitCatfile(g.batch, revision) if err != nil { - return err + return nil, fmt.Errorf("cat-file get commit %q: %v", revision, err) } + return commit, nil +} + +func streamPaginatedCommits(getCommits *GetCommits, commitsPerPage int, stream gitalypb.CommitService_FindCommitsServer) error { + var commitPage []*gitalypb.GitCommit - return rubyserver.Proxy(func() error { - resp, err := rubyStream.Recv() + for getCommits.Scan() { + commit, err := getCommits.Commit() if err != nil { - md := rubyStream.Trailer() - stream.SetTrailer(md) return err } - return stream.Send(resp) - }) + commitPage = append(commitPage, commit) + if len(commitPage) == commitsPerPage { + if err := stream.Send( + &gitalypb.FindCommitsResponse{ + Commits: commitPage, + }, + ); err != nil { + return fmt.Errorf("error when sending stream response: %v", err) + } + commitPage = nil + } + } + if getCommits.Err() != nil { + return fmt.Errorf("get commits: %v", getCommits.Err()) + } + // send the last page + if len(commitPage) > 0 { + if err := stream.Send( + &gitalypb.FindCommitsResponse{ + Commits: commitPage, + }, + ); err != nil { + return fmt.Errorf("error when sending stream response: %v", err) + } + } + return nil +} + +func getLogCommandFlags(req *gitalypb.FindCommitsRequest) []string { + args := []string{"log", "--format=format:%H"} + + // We will perform the offset in Go because --follow doesn't play well with --skip. + // See: https://gitlab.com/gitlab-org/gitlab-ce/issues/3574#note_3040520 + if req.GetOffset() > 0 && !calculateOffsetManually(req) { + args = append(args, fmt.Sprintf("--skip=%d", req.GetOffset())) + } + limit := req.GetLimit() + if calculateOffsetManually(req) { + limit += req.GetOffset() + } + args = append(args, fmt.Sprintf("--max-count=%d", limit)) + + if req.GetFollow() && len(req.GetPaths()) > 0 { + args = append(args, "--follow") + } + if req.GetSkipMerges() { + args = append(args, "--no-merges") + } + if req.GetBefore() != nil { + args = append(args, fmt.Sprintf("--before=%s", req.GetBefore().String())) + } + if req.GetAfter() != nil { + args = append(args, fmt.Sprintf("--after=%s", req.GetAfter().String())) + } + if req.GetAll() { + args = append(args, "--all", "--reverse") + } + if req.GetRevision() != nil { + args = append(args, string(req.GetRevision())) + } + if len(req.GetPaths()) > 0 { + args = append(args, "--") + for _, path := range req.GetPaths() { + args = append(args, string(path)) + } + } + return args } diff --git a/internal/service/commit/find_commits_test.go b/internal/service/commit/find_commits_test.go index 4027414e7..ee7d1fc17 100644 --- a/internal/service/commit/find_commits_test.go +++ b/internal/service/commit/find_commits_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/golang/protobuf/ptypes/timestamp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -408,3 +409,52 @@ func TestFailureFindCommitsRequest(t *testing.T) { }) } } + +func TestFindCommitsRequestWithFollowAndOffset(t *testing.T) { + server, serverSocketPath := startTestServices(t) + defer server.Stop() + + client, conn := newCommitServiceClient(t, serverSocketPath) + defer conn.Close() + + testRepo, _, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + request := &gitalypb.FindCommitsRequest{ + Repository: testRepo, + Follow: true, + Paths: [][]byte{[]byte("CHANGELOG")}, + Limit: 100, + } + ctx, cancel := testhelper.Context() + defer cancel() + allCommits := getCommits(ctx, t, request, client) + totalCommits := len(allCommits) + + for offset := 0; offset < totalCommits; offset++ { + t.Run(fmt.Sprintf("testing with offset %d", offset), func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + request.Offset = int32(offset) + request.Limit = int32(totalCommits) + commits := getCommits(ctx, t, request, client) + assert.Len(t, commits, totalCommits-offset) + assert.Equal(t, allCommits[offset:], commits) + }) + } +} + +func getCommits(ctx context.Context, t *testing.T, request *gitalypb.FindCommitsRequest, client gitalypb.CommitServiceClient) []*gitalypb.GitCommit { + stream, err := client.FindCommits(ctx, request) + require.NoError(t, err) + + var commits []*gitalypb.GitCommit + for err == nil { + var resp *gitalypb.FindCommitsResponse + resp, err = stream.Recv() + commits = append(commits, resp.GetCommits()...) + } + + require.Equal(t, io.EOF, err) + return commits +} |