diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-08-28 15:51:44 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-08-28 15:51:44 +0300 |
commit | 357e09f248881f0461a5ec8989ca01ccd9b2a860 (patch) | |
tree | 5d966b03a61868b5a72b6b524b535bd1290c84b1 | |
parent | 3f95a789b84a0fac1ecd8735573bc21c0097f810 (diff) | |
parent | 6c16216e8a25b1d84fbf0fa13536ef4d3ac3d978 (diff) |
Merge branch 'smh-fix-flaky-updater-error' into 'master'
Return the same error from Updater after closing
Closes #5476
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6260
Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
-rw-r--r-- | internal/git/updateref/updateref.go | 101 | ||||
-rw-r--r-- | internal/git/updateref/updateref_test.go | 229 | ||||
-rw-r--r-- | internal/gitaly/service/operations/user_create_branch_test.go | 6 |
3 files changed, 250 insertions, 86 deletions
diff --git a/internal/git/updateref/updateref.go b/internal/git/updateref/updateref.go index aab4fddc4..a54e2717e 100644 --- a/internal/git/updateref/updateref.go +++ b/internal/git/updateref/updateref.go @@ -13,6 +13,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" ) +// errClosed is returned when accessing an updater that has already been closed. +var errClosed = errors.New("closed") + // AlreadyLockedError indicates a reference cannot be locked because another // process has already locked it. type AlreadyLockedError struct { @@ -32,6 +35,25 @@ func (e AlreadyLockedError) ErrorMetadata() []structerr.MetadataItem { } } +// ReferenceAlreadyExistsError is returned when attempting to create a reference +// that already exists. +type ReferenceAlreadyExistsError struct { + // ReferenceName is the name of the reference that already exists. + ReferenceName string +} + +func (e ReferenceAlreadyExistsError) Error() string { + return "reference already exists" +} + +// ErrorMetadata implements the `structerr.ErrorMetadater` interface and provides the name of the +// reference that already existed. +func (e ReferenceAlreadyExistsError) ErrorMetadata() []structerr.MetadataItem { + return []structerr.MetadataItem{ + {Key: "reference", Value: e.ReferenceName}, + } +} + // InvalidReferenceFormatError indicates a reference name was invalid. type InvalidReferenceFormatError struct { // ReferenceName is the invalid reference name. @@ -176,8 +198,6 @@ const ( // accepts reference changes until the current transaction is committed and // a new one started. statePrepared state = "prepared" - // stateClosed means the updater has been closed and is no longer usable. - stateClosed state = "closed" ) // invalidStateTransitionError is returned when the updater is used incorrectly. @@ -209,6 +229,7 @@ func (err invalidStateTransitionError) Error() string { type Updater struct { repo git.RepositoryExecutor cmd *command.Command + closeErr error stdout *bufio.Reader stderr *bytes.Buffer objectHash git.ObjectHash @@ -294,9 +315,12 @@ func New(ctx context.Context, repo git.RepositoryExecutor, opts ...UpdaterOpt) ( // expectState returns an error and closes the updater if it is not in the expected state. func (u *Updater) expectState(expected state) error { + if u.closeErr != nil { + return u.closeErr + } + if err := u.checkState(expected); err != nil { - _ = u.Close() - return err + return u.closeWithError(err) } return nil @@ -387,17 +411,44 @@ func (u *Updater) Commit() error { // Close closes the updater and aborts a possible open transaction. No changes will be written // to disk, all lockfiles will be cleaned up and the process will exit. func (u *Updater) Close() error { - u.state = stateClosed + return u.closeWithError(nil) +} + +// closeWithError closes the updater with the given error. The passed in error is only used +// if the updater closes successfully. This is used to close the Updater with errors raised +// by our logic when the command itself hasn't errored. All subsequent method calls return +// the error returned from first closeWithError call. +func (u *Updater) closeWithError(closeErr error) error { + if u.closeErr != nil { + return u.closeErr + } if err := u.cmd.Wait(); err != nil { - return fmt.Errorf("closing updater: %w", err) + err = structerr.New("%w", err).WithMetadataItems( + structerr.MetadataItem{Key: "stderr", Value: u.stderr.String()}, + structerr.MetadataItem{Key: "close_error", Value: closeErr}, + ) + if parsedErr := u.parseStderr(); parsedErr != nil { + // If stderr contained a specific error, return it instead. + err = parsedErr + } + + u.closeErr = err + return err + } + + if closeErr != nil { + u.closeErr = closeErr + return closeErr } + + u.closeErr = errClosed return nil } func (u *Updater) write(format string, args ...interface{}) error { if _, err := fmt.Fprintf(u.cmd, format, args...); err != nil { - return u.handleIOError(err) + return u.closeWithError(err) } return nil @@ -406,6 +457,7 @@ func (u *Updater) write(format string, args ...interface{}) error { var ( refLockedRegex = regexp.MustCompile(`^fatal: (prepare|commit): cannot lock ref '(.+?)': Unable to create '.*': File exists.`) refInvalidFormatRegex = regexp.MustCompile(`^fatal: invalid ref format: (.*)\n$`) + referenceAlreadyExistsRegex = regexp.MustCompile(`^fatal: .*: cannot lock ref '(.*)': reference already exists\n$`) referenceExistsConflictRegex = regexp.MustCompile(`^fatal: .*: cannot lock ref '(.*)': '(.*)' exists; cannot create '.*'\n$`) inTransactionConflictRegex = regexp.MustCompile(`^fatal: .*: cannot lock ref '.*': cannot process '(.*)' and '(.*)' at the same time\n$`) nonExistentObjectRegex = regexp.MustCompile(`^fatal: .*: cannot update ref '.*': trying to write ref '(.*)' with nonexistent object (.*)\n$`) @@ -425,37 +477,17 @@ func (u *Updater) setState(state string) error { // raised. line, err := u.stdout.ReadString('\n') if err != nil { - return u.handleIOError(fmt.Errorf("state update to %q failed: %w", state, err)) + return u.closeWithError(fmt.Errorf("state update to %q failed: %w", state, err)) } if line != fmt.Sprintf("%s: ok\n", state) { - _ = u.Close() - return fmt.Errorf("state update to %q not successful: expected ok, got %q", state, line) + return u.closeWithError(fmt.Errorf("state update to %q not successful: expected ok, got %q", state, line)) } return nil } -// handleIOError handles errors after reading from or writing to git-update-ref(1) has failed. -// It makes sure to properly tear down the process so that the stderr gets synchronized and handles -// well-known errors. If the error message is not a well-known error then this function returns the -// fallback error provided by the caller. -func (u *Updater) handleIOError(fallbackErr error) error { - // We need to explicitly cancel the command here and wait for it to terminate such that we - // can retrieve the command's stderr in a race-free manner. - // - // Furthermore, if I/O has failed because we cancelled the process then we don't want to - // return a converted error, but instead want to return the actual context cancellation - // error. - if err := u.Close(); err != nil { - switch { - case errors.Is(err, context.Canceled): - return err - case errors.Is(err, context.DeadlineExceeded): - return err - } - } - +func (u *Updater) parseStderr() error { stderr := u.stderr.Bytes() matches := refLockedRegex.FindSubmatch(stderr) @@ -509,5 +541,12 @@ func (u *Updater) handleIOError(fallbackErr error) error { } } - return structerr.New("%w", fallbackErr).WithMetadata("stderr", string(stderr)) + matches = referenceAlreadyExistsRegex.FindSubmatch(stderr) + if len(matches) > 1 { + return ReferenceAlreadyExistsError{ + ReferenceName: string(matches[1]), + } + } + + return nil } diff --git a/internal/git/updateref/updateref_test.go b/internal/git/updateref/updateref_test.go index 0a409acc2..869e7f04b 100644 --- a/internal/git/updateref/updateref_test.go +++ b/internal/git/updateref/updateref_test.go @@ -3,8 +3,9 @@ package updateref import ( "context" "encoding/hex" + "errors" "fmt" - "io" + "io/fs" "os" "path/filepath" "testing" @@ -65,6 +66,28 @@ func TestUpdater_create(t *testing.T) { require.Equal(t, gittest.ResolveRevision(t, cfg, repoPath, "refs/heads/_create"), commitID) } +func TestUpdater_referenceAlreadyExists(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + cfg, _, repoPath, updater := setupUpdater(t, ctx) + expectedErr := ReferenceAlreadyExistsError{ReferenceName: "refs/heads/main"} + defer func() { require.Equal(t, expectedErr, updater.Close()) }() + + commitID := gittest.WriteCommit(t, cfg, repoPath) + + // Create the first branch. + require.NoError(t, updater.Start()) + require.NoError(t, updater.Create("refs/heads/main", commitID)) + require.NoError(t, updater.Commit()) + + // Attempt to create the same branch. + require.NoError(t, updater.Start()) + require.NoError(t, updater.Create("refs/heads/main", commitID)) + require.Equal(t, expectedErr, updater.Commit()) +} + func TestUpdater_nonCommitObject(t *testing.T) { t.Parallel() @@ -143,7 +166,9 @@ func TestUpdater_properErrorOnWriteFailure(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { cfg, _, repoPath, updater := setupUpdater(t, ctx) - defer func() { require.ErrorContains(t, updater.Close(), "closing updater: exit status 128") }() + + expectedErr := InvalidReferenceFormatError{ReferenceName: referenceName} + defer func() { require.Equal(t, expectedErr, updater.Close()) }() commitID := gittest.WriteCommit(t, cfg, repoPath) @@ -158,7 +183,7 @@ func TestUpdater_properErrorOnWriteFailure(t *testing.T) { continue } - require.Equal(t, InvalidReferenceFormatError{ReferenceName: referenceName}, err) + require.Equal(t, expectedErr, err) break } }) @@ -234,7 +259,13 @@ func TestUpdater_fileDirectoryConflict(t *testing.T) { t.Run(method.desc, func(t *testing.T) { t.Run("different transaction", func(t *testing.T) { cfg, _, repoPath, updater := setupUpdater(t, ctx) - defer func() { require.ErrorContains(t, updater.Close(), "closing updater: exit status 128") }() + + expectedErr := FileDirectoryConflictError{ + ExistingReferenceName: tc.firstReference.String(), + ConflictingReferenceName: tc.secondReference.String(), + } + + defer func() { require.Equal(t, expectedErr, updater.Close()) }() commitID := gittest.WriteCommit(t, cfg, repoPath) @@ -245,15 +276,17 @@ func TestUpdater_fileDirectoryConflict(t *testing.T) { require.NoError(t, updater.Start()) require.NoError(t, updater.Create(tc.secondReference, commitID)) - require.Equal(t, FileDirectoryConflictError{ - ExistingReferenceName: tc.firstReference.String(), - ConflictingReferenceName: tc.secondReference.String(), - }, method.finish(updater)) + require.Equal(t, expectedErr, method.finish(updater)) }) t.Run("same transaction", func(t *testing.T) { cfg, _, repoPath, updater := setupUpdater(t, ctx) - defer func() { require.ErrorContains(t, updater.Close(), "closing updater: exit status 128") }() + + expectedErr := InTransactionConflictError{ + FirstReferenceName: tc.firstReference.String(), + SecondReferenceName: tc.secondReference.String(), + } + defer func() { require.Equal(t, expectedErr, updater.Close()) }() commitID := gittest.WriteCommit(t, cfg, repoPath) @@ -261,10 +294,7 @@ func TestUpdater_fileDirectoryConflict(t *testing.T) { require.NoError(t, updater.Create(tc.firstReference, commitID)) require.NoError(t, updater.Create(tc.secondReference, commitID)) - require.Equal(t, InTransactionConflictError{ - FirstReferenceName: tc.firstReference.String(), - SecondReferenceName: tc.secondReference.String(), - }, method.finish(updater)) + require.Equal(t, expectedErr, method.finish(updater)) }) }) } @@ -277,23 +307,104 @@ func TestUpdater_invalidStateTransitions(t *testing.T) { ctx := testhelper.Context(t) - cfg, repo, repoPath, updater := setupUpdater(t, ctx) - defer testhelper.MustClose(t, updater) + for _, tc := range []struct { + desc string + expectedErr error + perform func(*testing.T, *Updater) error + }{ + { + desc: "update before starting", + perform: func(t *testing.T, u *Updater) error { + return u.Update("", "", "") + }, + expectedErr: invalidStateTransitionError{expected: stateStarted, actual: stateIdle}, + }, + { + desc: "create before starting", + perform: func(t *testing.T, u *Updater) error { + return u.Create("", "") + }, + expectedErr: invalidStateTransitionError{expected: stateStarted, actual: stateIdle}, + }, + { + desc: "delete before starting", + perform: func(t *testing.T, u *Updater) error { + return u.Delete("") + }, + expectedErr: invalidStateTransitionError{expected: stateStarted, actual: stateIdle}, + }, + { + desc: "prepare before starting", + perform: func(t *testing.T, u *Updater) error { + return u.Prepare() + }, + expectedErr: invalidStateTransitionError{expected: stateStarted, actual: stateIdle}, + }, + { + desc: "preparing before starting", + perform: func(t *testing.T, u *Updater) error { + return u.Prepare() + }, + expectedErr: invalidStateTransitionError{expected: stateStarted, actual: stateIdle}, + }, + { + desc: "preparing when prepared", + perform: func(t *testing.T, u *Updater) error { + require.NoError(t, u.Start()) + require.NoError(t, u.Prepare()) + return u.Prepare() + }, + expectedErr: invalidStateTransitionError{expected: stateStarted, actual: statePrepared}, + }, + { + desc: "starting when started", + perform: func(t *testing.T, u *Updater) error { + require.NoError(t, u.Start()) + return u.Start() + }, + expectedErr: invalidStateTransitionError{expected: stateIdle, actual: stateStarted}, + }, + { + desc: "starting when prepared", + perform: func(t *testing.T, u *Updater) error { + require.NoError(t, u.Start()) + require.NoError(t, u.Prepare()) + return u.Start() + }, + expectedErr: invalidStateTransitionError{expected: stateIdle, actual: statePrepared}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + _, repo, _, updater := setupUpdater(t, ctx) + defer func() { require.Equal(t, tc.expectedErr, updater.Close()) }() - commitID := gittest.WriteCommit(t, cfg, repoPath) + require.Equal(t, tc.expectedErr, tc.perform(t, updater)) - require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: stateIdle}, updater.Update("refs/heads/main", commitID, "")) - require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: stateClosed}, updater.Create("refs/heads/main", commitID)) - require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: stateClosed}, updater.Delete("refs/heads/main")) - require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: stateClosed}, updater.Prepare()) - require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: stateClosed}, updater.Commit()) - require.Equal(t, invalidStateTransitionError{expected: stateIdle, actual: stateClosed}, updater.Start()) - require.NoError(t, updater.Close()) + // Verify no references were created. + refs, err := repo.GetReferences(ctx) + require.NoError(t, err) + require.Empty(t, refs) + }) + } +} - // Verify no references were created. - refs, err := repo.GetReferences(ctx) - require.NoError(t, err) - require.Empty(t, refs) +func TestUpdater_allMethodsReturnSameError(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + _, _, _, updater := setupUpdater(t, ctx) + + expectedErr := invalidStateTransitionError{expected: stateStarted, actual: stateIdle} + defer func() { require.Equal(t, expectedErr, updater.Close()) }() + + require.Equal(t, expectedErr, updater.Update("", "", "")) + require.Equal(t, expectedErr, updater.Create("", "")) + require.Equal(t, expectedErr, updater.Delete("")) + require.Equal(t, expectedErr, updater.Prepare()) + require.Equal(t, expectedErr, updater.Commit()) + require.Equal(t, expectedErr, updater.Start()) + require.Equal(t, expectedErr, updater.Close()) } func TestUpdater_update(t *testing.T) { @@ -305,7 +416,9 @@ func TestUpdater_update(t *testing.T) { // The updater cancel should fail at the end of the test as the final operation is an error, // which results in closing the updater. - defer func() { require.ErrorContains(t, updater.Close(), "closing updater: exit status 128") }() + + var expectedErr error + defer func() { require.Equal(t, expectedErr, updater.Close()) }() oldCommitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) newCommitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(oldCommitID)) @@ -328,12 +441,13 @@ func TestUpdater_update(t *testing.T) { require.NoError(t, updater.Start()) require.NoError(t, updater.Update("refs/heads/main", newCommitID, otherCommitID)) - require.Equal(t, MismatchingStateError{ + expectedErr = MismatchingStateError{ ReferenceName: "refs/heads/main", ExpectedObjectID: otherCommitID.String(), ActualObjectID: oldCommitID.String(), - }, updater.Commit()) - require.Equal(t, invalidStateTransitionError{expected: stateIdle, actual: stateClosed}, updater.Start()) + } + require.Equal(t, expectedErr, updater.Commit()) + require.Equal(t, expectedErr, updater.Start()) require.Equal(t, gittest.ResolveRevision(t, cfg, repoPath, "refs/heads/main"), oldCommitID) } @@ -363,7 +477,9 @@ func TestUpdater_prepareLocksTransaction(t *testing.T) { ctx := testhelper.Context(t) cfg, _, repoPath, updater := setupUpdater(t, ctx) - defer testhelper.MustClose(t, updater) + + expectedErr := invalidStateTransitionError{expected: stateStarted, actual: statePrepared} + defer func() { require.Equal(t, expectedErr, updater.Close()) }() commitID := gittest.WriteCommit(t, cfg, repoPath) @@ -372,7 +488,7 @@ func TestUpdater_prepareLocksTransaction(t *testing.T) { require.NoError(t, updater.Prepare()) require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: statePrepared}, updater.Update("refs/heads/feature", commitID, "")) - require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: stateClosed}, updater.Commit()) + require.Equal(t, invalidStateTransitionError{expected: stateStarted, actual: statePrepared}, updater.Commit()) } func TestUpdater_invalidReferenceName(t *testing.T) { @@ -388,11 +504,11 @@ func TestUpdater_invalidReferenceName(t *testing.T) { repo := localrepo.NewTestRepo(t, cfg, repoProto, git.WithSkipHooks()) commitID := gittest.WriteCommit(t, cfg, repoPath) + const referenceName = `refs/heads\master` updater, err := New(ctx, repo) require.NoError(t, err) - defer func() { require.ErrorContains(t, updater.Close(), "closing updater: exit status 128") }() + defer func() { require.Equal(t, InvalidReferenceFormatError{ReferenceName: referenceName}, updater.Close()) }() - const referenceName = `refs/heads\master` require.NoError(t, updater.Start()) require.NoError(t, updater.Update(referenceName, commitID, "")) require.Equal(t, InvalidReferenceFormatError{ReferenceName: referenceName}, updater.Prepare()) @@ -504,17 +620,19 @@ func TestUpdater_contextCancellation(t *testing.T) { require.NoError(t, err) require.NoError(t, updater.Start()) - require.NoError(t, updater.Create("refs/heads/main", commitID)) // Force the update-ref process to terminate early by cancelling the context. childCancel() - // We should see that committing the update fails now. - require.Error(t, updater.Commit()) - - // And the reference should not have been created. - _, err = repo.ReadCommit(ctx, "refs/heads/main") - require.Equal(t, localrepo.ErrObjectNotFound, err) + for i := 0; true; i++ { + // Context cancellation happens asynchronously. Keep running commands until + // the context cancellation error is returned. + err := updater.Create(git.ReferenceName(fmt.Sprintf("refs/heads/ref-%d", i)), commitID) + if err != nil { + require.ErrorIs(t, err, context.Canceled) + return + } + } } func TestUpdater_cancel(t *testing.T) { @@ -523,7 +641,7 @@ func TestUpdater_cancel(t *testing.T) { ctx := testhelper.Context(t) cfg, repo, repoPath, firstUpdater := setupUpdater(t, ctx) - defer testhelper.MustClose(t, firstUpdater) + defer func() { require.Equal(t, errClosed, firstUpdater.Close()) }() gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) @@ -536,13 +654,15 @@ func TestUpdater_cancel(t *testing.T) { // because the reference is locked already. failingUpdater, err := New(ctx, repo) require.NoError(t, err) - defer func() { require.ErrorContains(t, failingUpdater.Close(), "closing updater: exit status 128") }() + + expectedErr := AlreadyLockedError{ + ReferenceName: "refs/heads/main", + } + defer func() { require.Equal(t, expectedErr, failingUpdater.Close()) }() require.NoError(t, failingUpdater.Start()) require.NoError(t, failingUpdater.Delete(git.ReferenceName("refs/heads/main"))) - require.Equal(t, AlreadyLockedError{ - ReferenceName: "refs/heads/main", - }, failingUpdater.Commit()) + require.Equal(t, expectedErr, failingUpdater.Commit()) // We now cancel the initial updater. Afterwards, it should be possible again to update the // ref because locks should have been released. @@ -589,16 +709,23 @@ func TestUpdater_capturesStderr(t *testing.T) { ctx := testhelper.Context(t) _, _, _, updater := setupUpdater(t, ctx) - defer func() { require.ErrorContains(t, updater.Close(), "closing updater: exit status 128") }() + var expectedErr error + defer func() { require.Equal(t, expectedErr, updater.Close()) }() require.NoError(t, updater.Start()) // fail the process by writing bad input _, err := updater.cmd.Write([]byte("garbage input")) require.NoError(t, err) - require.Equal(t, structerr.New("%w", fmt.Errorf("state update to %q failed: %w", "commit", io.EOF)).WithMetadata( - "stderr", "fatal: unknown command: garbage inputcommit\n", - ), updater.Commit()) + expectedErr = structerr.New("%w", updater.cmd.Wait()).WithMetadataItems( + structerr.MetadataItem{Key: "stderr", Value: "fatal: unknown command: garbage input\n"}, + structerr.MetadataItem{Key: "close_error", Value: &fs.PathError{ + Op: "write", + Path: "|1", + Err: errors.New("file already closed"), + }}, + ) + require.Equal(t, expectedErr, updater.Commit()) } func BenchmarkUpdater(b *testing.B) { diff --git a/internal/gitaly/service/operations/user_create_branch_test.go b/internal/gitaly/service/operations/user_create_branch_test.go index 6e58e4a48..8713bf5ec 100644 --- a/internal/gitaly/service/operations/user_create_branch_test.go +++ b/internal/gitaly/service/operations/user_create_branch_test.go @@ -1,7 +1,6 @@ package operations import ( - "io" "testing" "github.com/stretchr/testify/require" @@ -399,9 +398,8 @@ func TestUserCreateBranch_failure(t *testing.T) { startPoint: "master", user: gittest.TestUser, err: testhelper.WithInterceptedMetadata( - structerr.NewFailedPrecondition("reference update: state update to %q failed: %w", "prepare", io.EOF), - "stderr", - "fatal: prepare: cannot lock ref 'refs/heads/master': reference already exists\n", + structerr.NewFailedPrecondition("reference update: reference already exists"), + "reference", "refs/heads/master", ), }, { |