diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-11-28 09:59:31 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-11-28 09:59:31 +0300 |
commit | 4d45db6662456a9018c0ea07f6020bc91dc9394f (patch) | |
tree | 836b627196e1c2100dbeda7480816c07b2acb7ef /internal | |
parent | 3cd2a30b770d3312a1634d8eec49e34a592ef5d7 (diff) | |
parent | 9ae8aeb857f9c31d2fbc2ea5bb5b5011401e8cff (diff) |
Merge branch 'ej-5671-wait-for-git-log-to-finish' into 'master'
commit: Wait for the git-log(1) process to exit in FindCommits
Closes #5671
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6507
Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Eric Ju <eju@gitlab.com>
Co-authored-by: Eric Ju <eju@gitlab.com>
Diffstat (limited to 'internal')
-rw-r--r-- | internal/gitaly/service/commit/find_commits.go | 43 | ||||
-rw-r--r-- | internal/gitaly/service/commit/find_commits_test.go | 62 |
2 files changed, 94 insertions, 11 deletions
diff --git a/internal/gitaly/service/commit/find_commits.go b/internal/gitaly/service/commit/find_commits.go index e2d717b25..167909b23 100644 --- a/internal/gitaly/service/commit/find_commits.go +++ b/internal/gitaly/service/commit/find_commits.go @@ -65,14 +65,33 @@ func (s *server) FindCommits(req *gitalypb.FindCommitsRequest, stream gitalypb.C return nil } -func (s *server) findCommits(ctx context.Context, req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) error { +func (s *server) findCommits(ctx context.Context, req *gitalypb.FindCommitsRequest, stream gitalypb.CommitService_FindCommitsServer) (returnedErr error) { + commitCounter := int32(0) opts := git.ConvertGlobalOptions(req.GetGlobalOptions()) repo := s.localrepo(req.GetRepository()) - logCmd, err := repo.Exec(ctx, getLogCommandSubCmd(req), append(opts, git.WithSetupStdout())...) + var stderr strings.Builder + gitLogCmd, limit := getLogCommandSubCmd(req) + logCmd, err := repo.Exec(ctx, gitLogCmd, append(opts, git.WithSetupStdout(), git.WithStderr(&stderr))...) if err != nil { return fmt.Errorf("error when creating git log command: %w", err) } + defer func() { + if err := logCmd.Wait(); err != nil { + // We differentiate benign errors from real errors in this deferred function. + // Benign errors are caused by terminating the stream since response limit is reached; + // real errors are caused by git log command failures such as timeout or OOM kill. + if limit >= 0 && commitCounter == limit { + // We already send the maximum number of commits, git log command is terminated. + s.logger.Debug("git log command terminated because maximum number of commits is reached") + } else if returnedErr == nil { + // Avoid overriding the real error. + returnedErr = structerr.NewInternal("listing commits failed").WithMetadata("error", err).WithMetadata("stderr", stderr.String()) + } else { + s.logger.WithError(err).WithField("stderr", stderr.String()).Error("listing commits failed") + } + } + }() objectReader, cancel, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { @@ -95,9 +114,11 @@ func (s *server) findCommits(ctx context.Context, req *gitalypb.FindCommitsReque } } - if err := streamCommits(getCommits, stream, req.GetTrailers(), req.GetIncludeShortstat(), len(req.GetIncludeReferencedBy()) > 0); err != nil { + commitCounter, err = streamCommits(getCommits, stream, req.GetTrailers(), req.GetIncludeShortstat(), len(req.GetIncludeReferencedBy()) > 0) + if err != nil { return fmt.Errorf("error streaming commits: %w", err) } + return nil } @@ -198,8 +219,9 @@ func (g *GetCommits) Commit(ctx context.Context, trailers, shortStat, refs bool) return commit, nil } -func streamCommits(getCommits *GetCommits, stream gitalypb.CommitService_FindCommitsServer, trailers, shortStat bool, refs bool) error { +func streamCommits(getCommits *GetCommits, stream gitalypb.CommitService_FindCommitsServer, trailers, shortStat bool, refs bool) (int32, error) { ctx := stream.Context() + commitCounter := int32(0) chunker := chunk.New(&commitsSender{ send: func(commits []*gitalypb.GitCommit) error { @@ -212,22 +234,23 @@ func streamCommits(getCommits *GetCommits, stream gitalypb.CommitService_FindCom for getCommits.Scan() { commit, err := getCommits.Commit(ctx, trailers, shortStat, refs) if err != nil { - return err + return commitCounter, err } if err := chunker.Send(commit); err != nil { - return err + return commitCounter, err } + commitCounter++ } if getCommits.Err() != nil { - return fmt.Errorf("get commits: %w", getCommits.Err()) + return commitCounter, fmt.Errorf("get commits: %w", getCommits.Err()) } - return chunker.Flush() + return commitCounter, chunker.Flush() } -func getLogCommandSubCmd(req *gitalypb.FindCommitsRequest) git.Command { +func getLogCommandSubCmd(req *gitalypb.FindCommitsRequest) (git.Command, int32) { logFormatOption := "--format=%H" // To split the commits by '\x01' instead of '\n' if req.GetIncludeShortstat() { @@ -298,7 +321,7 @@ func getLogCommandSubCmd(req *gitalypb.FindCommitsRequest) git.Command { } } - return subCmd + return subCmd, limit } func parseRefs(refsLine string) [][]byte { diff --git a/internal/gitaly/service/commit/find_commits_test.go b/internal/gitaly/service/commit/find_commits_test.go index 50648526c..d46cb178a 100644 --- a/internal/gitaly/service/commit/find_commits_test.go +++ b/internal/gitaly/service/commit/find_commits_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "golang.org/x/text/encoding/charmap" "google.golang.org/grpc/codes" @@ -648,16 +649,21 @@ func TestFindCommits_quarantine(t *testing.T) { desc string altDirs []string expectedCount int + expectedErr error }{ { desc: "present GIT_ALTERNATE_OBJECT_DIRECTORIES", altDirs: []string{altObjectsDir}, expectedCount: 1, + expectedErr: nil, }, { desc: "empty GIT_ALTERNATE_OBJECT_DIRECTORIES", altDirs: []string{}, expectedCount: 0, + expectedErr: testhelper.ToInterceptedMetadata( + structerr.NewInternal("listing commits failed").WithMetadata("error", "exit status 128").WithMetadata("stderr", + fmt.Sprintf("fatal: bad object %s\n", commitID.String()))), }, } { t.Run(tc.desc, func(t *testing.T) { @@ -668,7 +674,61 @@ func TestFindCommits_quarantine(t *testing.T) { Revision: []byte(commitID.String()), Limit: 1, }) - require.NoError(t, err) + testhelper.RequireGrpcError(t, tc.expectedErr, err) + require.Len(t, commits, tc.expectedCount) + }) + } +} + +func TestFindCommits_simulateGitLogWaitError(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logger := testhelper.NewLogger(t) + cfg, client := setupCommitService(t, ctx, testserver.WithLogger(logger)) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg) + // altObjectsDir is used to trigger git log wait error + // we will set this to empty string to trigger the error + altObjectsDir := "./alt-objects" + commitID := gittest.WriteCommit(t, cfg, repoPath, + gittest.WithAlternateObjectDirectory(filepath.Join(repoPath, altObjectsDir)), + ) + + for _, tc := range []struct { + desc string + altDirs []string + expectedCount int + expectedErr error + limit int32 + }{ + { + desc: "git log exit with error, with limit 0", + altDirs: []string{}, + limit: 0, + expectedCount: 0, + expectedErr: nil, + }, + { + desc: "limit 1, git log exit with error", + altDirs: []string{}, + limit: 1, + expectedCount: 0, + expectedErr: testhelper.ToInterceptedMetadata( + structerr.NewInternal("listing commits failed"). + WithMetadata("error", "exit status 128").WithMetadata("stderr", + fmt.Sprintf("fatal: bad object %s\n", commitID.String()))), + }, + } { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + repo.GitAlternateObjectDirectories = tc.altDirs + commits, err := getCommits(t, ctx, client, &gitalypb.FindCommitsRequest{ + Repository: repo, + Revision: []byte(commitID.String()), + Limit: tc.limit, + }) + testhelper.RequireGrpcError(t, tc.expectedErr, err) require.Len(t, commits, tc.expectedCount) }) } |