diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-09-13 20:48:34 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-09-13 20:48:34 +0300 |
commit | 5688f0d78dfb631f600868b80e8bfa9f13834f65 (patch) | |
tree | faf3088c8feae09623b561883d6ce3dab4c916dc | |
parent | f1d2afbee25a73855d931ad2d406e04e2062dc96 (diff) | |
parent | 065ad266cc5732a7214d97ba85568d7d74465140 (diff) |
Merge branch 'pks-command-improve-stdout-error-handling' into 'master'
command: Improve error handling for stdout
Closes #5021
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6315
Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r-- | internal/command/command.go | 43 | ||||
-rw-r--r-- | internal/command/command_test.go | 57 | ||||
-rw-r--r-- | internal/featureflag/ff_command_close_stdout.go | 18 | ||||
-rw-r--r-- | internal/gitaly/service/remote/find_remote_root_ref_test.go | 4 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 2 |
5 files changed, 109 insertions, 15 deletions
diff --git a/internal/command/command.go b/internal/command/command.go index 3fc063a72..53bf24298 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -137,7 +137,7 @@ const ( // terminated and reaped automatically when the context.Context that // created it is canceled. type Command struct { - reader io.Reader + reader io.ReadCloser writer io.WriteCloser stderrBuffer *stderrBuffer cmd *exec.Cmd @@ -146,6 +146,7 @@ type Command struct { waitError error waitOnce sync.Once + teardownOnce sync.Once processExitedCh chan struct{} finalizers []func(context.Context, *Command) @@ -310,6 +311,14 @@ func New(ctx context.Context, nameAndArgs []string, opts ...Option) (*Command, e go func() { select { case <-ctx.Done(): + // Before we kill the child process we need to close the process' standard streams. If + // we don't, it may happen that the signal gets delivered and that the process exits + // before we close the streams in `command.Wait()`. This would cause downstream readers + // to potentially miss those errors when reading stdout. + if featureflag.CommandCloseStdout.IsEnabled(ctx) { + command.teardownStandardStreams() + } + // If the context has been cancelled and we didn't explicitly reap // the child process then we need to manually kill it and release // all associated resources. @@ -378,16 +387,7 @@ func (c *Command) Wait() error { func (c *Command) wait() { defer close(c.processExitedCh) - if c.writer != nil { - // Prevent the command from blocking on waiting for stdin to be closed - c.writer.Close() - } - - if c.reader != nil { - // Prevent the command from blocking on writing to its stdout. - _, _ = io.Copy(io.Discard, c.reader) - } - + c.teardownStandardStreams() c.waitError = c.cmd.Wait() // If the context is done, the process was likely terminated due to it. If so, @@ -419,6 +419,27 @@ func (c *Command) wait() { } } +func (c *Command) teardownStandardStreams() { + c.teardownOnce.Do(func() { + if c.writer != nil { + // Prevent the command from blocking on waiting for stdin to be closed + c.writer.Close() + } + + if c.reader != nil { + if featureflag.CommandCloseStdout.IsEnabled(c.context) { + // Close stdout of the command. This causes us to receive an error when trying to consume the + // output and will also cause an error when stdout hasn't been fully consumed at the time of + // calling `Wait()`. + c.reader.Close() + } else { + // Prevent the command from blocking on writing to its stdout. + _, _ = io.Copy(io.Discard, c.reader) + } + } + }) +} + func (c *Command) logProcessComplete() { exitCode := 0 if c.waitError != nil { diff --git a/internal/command/command_test.go b/internal/command/command_test.go index 8f7d51220..ce26ed88b 100644 --- a/internal/command/command_test.go +++ b/internal/command/command_test.go @@ -5,11 +5,14 @@ import ( "context" "fmt" "io" + "io/fs" + "os" "os/exec" "path/filepath" "runtime" "strings" "sync" + "syscall" "testing" "time" @@ -19,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/cgroups" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -210,7 +214,13 @@ func TestNew_spawnTimeout(t *testing.T) { func TestCommand_Wait_contextCancellationKillsCommand(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(testhelper.Context(t)) + testhelper.NewFeatureSets(featureflag.CommandCloseStdout).Run(t, testCommandWaitContextCancellationKillsCommand) +} + +func testCommandWaitContextCancellationKillsCommand(t *testing.T, ctx context.Context) { + t.Parallel() + + ctx, cancel := context.WithCancel(ctx) cmd, err := New(ctx, []string{"cat", "/dev/urandom"}, WithSetupStdout()) require.NoError(t, err) @@ -222,9 +232,31 @@ func TestCommand_Wait_contextCancellationKillsCommand(t *testing.T) { cancel() - err = cmd.Wait() - require.Equal(t, err, fmt.Errorf("signal: terminated: %w", context.Canceled)) - require.ErrorIs(t, err, context.Canceled) + // We only verify that reading the command causes us to fail in an expected way when the feature flag is + // enabled. The case where the feature flag is disabled is indeterministic. + if featureflag.CommandCloseStdout.IsEnabled(ctx) { + // Verify that the cancelled context causes us to receive a read error when consuming the command's output. In + // older days this used to succeed just fine, which had the consequence that we obtain partial stdout of the + // killed process without noticing that it in fact got killed. + _, err = io.ReadAll(cmd) + + require.Equal(t, &fs.PathError{ + Op: "read", Path: "|0", Err: os.ErrClosed, + }, err) + + err = cmd.Wait() + // Depending on whether the closed file descriptor or the sent signal arrive first, cat(1) may fail + // either with SIGPIPE or with SIGKILL. + require.Contains(t, []error{ + fmt.Errorf("signal: %s: %w", syscall.SIGTERM, context.Canceled), + fmt.Errorf("signal: %s: %w", syscall.SIGPIPE, context.Canceled), + }, err) + require.ErrorIs(t, err, context.Canceled) + } else { + err = cmd.Wait() + require.Equal(t, err, fmt.Errorf("signal: terminated: %w", context.Canceled)) + require.ErrorIs(t, err, context.Canceled) + } } func TestNew_setupStdin(t *testing.T) { @@ -270,6 +302,23 @@ func TestCommand_read(t *testing.T) { }) } +func TestCommand_readPartial(t *testing.T) { + t.Parallel() + testhelper.NewFeatureSets(featureflag.CommandCloseStdout).Run(t, testCommandReadPartial) +} + +func testCommandReadPartial(t *testing.T, ctx context.Context) { + t.Parallel() + + cmd, err := New(ctx, []string{"head", "-c", "1000000", "/dev/urandom"}, WithSetupStdout()) + require.NoError(t, err) + if featureflag.CommandCloseStdout.IsEnabled(ctx) { + require.EqualError(t, cmd.Wait(), "signal: broken pipe") + } else { + require.NoError(t, cmd.Wait()) + } +} + func TestNew_nulByteInArgument(t *testing.T) { t.Parallel() diff --git a/internal/featureflag/ff_command_close_stdout.go b/internal/featureflag/ff_command_close_stdout.go new file mode 100644 index 000000000..d7670b14b --- /dev/null +++ b/internal/featureflag/ff_command_close_stdout.go @@ -0,0 +1,18 @@ +package featureflag + +// CommandCloseStdout causes the `command` package to close stdout of commands when calling `Wait()` on them instead of +// discarding their output. This has two consequences: +// +// - We notice when a command's output has not been fully read when calling `Wait()`. Previously, such a call would +// have succeeded. +// +// - We notice when a command has been killed when trying to consume its output. Previously, fully consuming the output +// would succeed, which can lead to partially-read output without indication of an error. +// +// So overall, the new behaviour is more thorough and will cause us to detect errors in stdout handling more readily. +var CommandCloseStdout = NewFeatureFlag( + "command_close_stdout", + "v16.4.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/5558", + false, +) diff --git a/internal/gitaly/service/remote/find_remote_root_ref_test.go b/internal/gitaly/service/remote/find_remote_root_ref_test.go index 4e7be7230..8b8198672 100644 --- a/internal/gitaly/service/remote/find_remote_root_ref_test.go +++ b/internal/gitaly/service/remote/find_remote_root_ref_test.go @@ -2,6 +2,7 @@ package remote import ( "fmt" + "io" "path/filepath" "testing" @@ -201,7 +202,10 @@ func TestServer_findRemoteRootRefCmd(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { cmd, err := s.findRemoteRootRefCmd(ctx, tc.request) require.Equal(t, tc.expectedErr, err) + if err == nil { + _, err := io.ReadAll(cmd) + require.NoError(t, err) require.NoError(t, cmd.Wait()) require.Subset(t, cmd.Env(), tc.expectedConfig) } diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index c7ddc601e..b47e0257a 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -239,6 +239,8 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.MailmapOptions, rand.Int()%2 == 0) // Randomly enable either Git v2.41 or 2.42. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV242, rand.Int()%2 == 0) + // This feature flag sits in `command.Wait()` and is thus getting executed in a ton of tests. + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.CommandCloseStdout, rand.Int()%2 == 0) for _, opt := range opts { ctx = opt(ctx) |