diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-11 09:32:26 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-13 14:18:10 +0300 |
commit | 065ad266cc5732a7214d97ba85568d7d74465140 (patch) | |
tree | 3c3ec430a3813d26a935f3e7ff3fb02068c5eeb9 | |
parent | 12016efaf99cd593d3dc6b11536cb0e5dc2fe02c (diff) |
command: Reliably propagate read errors on context cancellation
On context cancellation, we will send a signal to the command's process
and then wait for it to exit. This means that the standard streams are
only getting closed _after_ we have sent a signal as part of `Wait()`.
Consequentially, we have a race between the process exiting due to the
signal and thus gracefully closing its standard streams, and ourselves
to close the streams and can thus lead to downstream readers of stdout
to never receive an error on their reader.
Fx this by tearing down standard streams before we deliver the signal so
that read errors on stdout are propagated properly.
Changelog: fixed
-rw-r--r-- | internal/command/command.go | 48 | ||||
-rw-r--r-- | internal/command/command_test.go | 18 |
2 files changed, 45 insertions, 21 deletions
diff --git a/internal/command/command.go b/internal/command/command.go index f23ead03c..53bf24298 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -146,6 +146,7 @@ type Command struct { waitError error waitOnce sync.Once + teardownOnce sync.Once processExitedCh chan struct{} finalizers []func(context.Context, *Command) @@ -310,6 +311,14 @@ func New(ctx context.Context, nameAndArgs []string, opts ...Option) (*Command, e go func() { select { case <-ctx.Done(): + // Before we kill the child process we need to close the process' standard streams. If + // we don't, it may happen that the signal gets delivered and that the process exits + // before we close the streams in `command.Wait()`. This would cause downstream readers + // to potentially miss those errors when reading stdout. + if featureflag.CommandCloseStdout.IsEnabled(ctx) { + command.teardownStandardStreams() + } + // If the context has been cancelled and we didn't explicitly reap // the child process then we need to manually kill it and release // all associated resources. @@ -378,23 +387,7 @@ func (c *Command) Wait() error { func (c *Command) wait() { defer close(c.processExitedCh) - if c.writer != nil { - // Prevent the command from blocking on waiting for stdin to be closed - c.writer.Close() - } - - if c.reader != nil { - if featureflag.CommandCloseStdout.IsEnabled(c.context) { - // Close stdout of the command. This causes us to receive an error when trying to consume the - // output and will also cause an error when stdout hasn't been fully consumed at the time of - // calling `Wait()`. - c.reader.Close() - } else { - // Prevent the command from blocking on writing to its stdout. - _, _ = io.Copy(io.Discard, c.reader) - } - } - + c.teardownStandardStreams() c.waitError = c.cmd.Wait() // If the context is done, the process was likely terminated due to it. If so, @@ -426,6 +419,27 @@ func (c *Command) wait() { } } +func (c *Command) teardownStandardStreams() { + c.teardownOnce.Do(func() { + if c.writer != nil { + // Prevent the command from blocking on waiting for stdin to be closed + c.writer.Close() + } + + if c.reader != nil { + if featureflag.CommandCloseStdout.IsEnabled(c.context) { + // Close stdout of the command. This causes us to receive an error when trying to consume the + // output and will also cause an error when stdout hasn't been fully consumed at the time of + // calling `Wait()`. + c.reader.Close() + } else { + // Prevent the command from blocking on writing to its stdout. + _, _ = io.Copy(io.Discard, c.reader) + } + } + }) +} + func (c *Command) logProcessComplete() { exitCode := 0 if c.waitError != nil { diff --git a/internal/command/command_test.go b/internal/command/command_test.go index 6e036c11b..ce26ed88b 100644 --- a/internal/command/command_test.go +++ b/internal/command/command_test.go @@ -12,6 +12,7 @@ import ( "runtime" "strings" "sync" + "syscall" "testing" "time" @@ -242,11 +243,20 @@ func testCommandWaitContextCancellationKillsCommand(t *testing.T, ctx context.Co require.Equal(t, &fs.PathError{ Op: "read", Path: "|0", Err: os.ErrClosed, }, err) - } - err = cmd.Wait() - require.Equal(t, err, fmt.Errorf("signal: terminated: %w", context.Canceled)) - require.ErrorIs(t, err, context.Canceled) + err = cmd.Wait() + // Depending on whether the closed file descriptor or the sent signal arrive first, cat(1) may fail + // either with SIGPIPE or with SIGKILL. + require.Contains(t, []error{ + fmt.Errorf("signal: %s: %w", syscall.SIGTERM, context.Canceled), + fmt.Errorf("signal: %s: %w", syscall.SIGPIPE, context.Canceled), + }, err) + require.ErrorIs(t, err, context.Canceled) + } else { + err = cmd.Wait() + require.Equal(t, err, fmt.Errorf("signal: terminated: %w", context.Canceled)) + require.ErrorIs(t, err, context.Canceled) + } } func TestNew_setupStdin(t *testing.T) { |