Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-11 09:32:26 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-13 14:18:10 +0300
commit065ad266cc5732a7214d97ba85568d7d74465140 (patch)
tree3c3ec430a3813d26a935f3e7ff3fb02068c5eeb9
parent12016efaf99cd593d3dc6b11536cb0e5dc2fe02c (diff)
command: Reliably propagate read errors on context cancellation
On context cancellation, we will send a signal to the command's process and then wait for it to exit. This means that the standard streams are only getting closed _after_ we have sent a signal as part of `Wait()`. Consequentially, we have a race between the process exiting due to the signal and thus gracefully closing its standard streams, and ourselves to close the streams and can thus lead to downstream readers of stdout to never receive an error on their reader. Fx this by tearing down standard streams before we deliver the signal so that read errors on stdout are propagated properly. Changelog: fixed
-rw-r--r--internal/command/command.go48
-rw-r--r--internal/command/command_test.go18
2 files changed, 45 insertions, 21 deletions
diff --git a/internal/command/command.go b/internal/command/command.go
index f23ead03c..53bf24298 100644
--- a/internal/command/command.go
+++ b/internal/command/command.go
@@ -146,6 +146,7 @@ type Command struct {
waitError error
waitOnce sync.Once
+ teardownOnce sync.Once
processExitedCh chan struct{}
finalizers []func(context.Context, *Command)
@@ -310,6 +311,14 @@ func New(ctx context.Context, nameAndArgs []string, opts ...Option) (*Command, e
go func() {
select {
case <-ctx.Done():
+ // Before we kill the child process we need to close the process' standard streams. If
+ // we don't, it may happen that the signal gets delivered and that the process exits
+ // before we close the streams in `command.Wait()`. This would cause downstream readers
+ // to potentially miss those errors when reading stdout.
+ if featureflag.CommandCloseStdout.IsEnabled(ctx) {
+ command.teardownStandardStreams()
+ }
+
// If the context has been cancelled and we didn't explicitly reap
// the child process then we need to manually kill it and release
// all associated resources.
@@ -378,23 +387,7 @@ func (c *Command) Wait() error {
func (c *Command) wait() {
defer close(c.processExitedCh)
- if c.writer != nil {
- // Prevent the command from blocking on waiting for stdin to be closed
- c.writer.Close()
- }
-
- if c.reader != nil {
- if featureflag.CommandCloseStdout.IsEnabled(c.context) {
- // Close stdout of the command. This causes us to receive an error when trying to consume the
- // output and will also cause an error when stdout hasn't been fully consumed at the time of
- // calling `Wait()`.
- c.reader.Close()
- } else {
- // Prevent the command from blocking on writing to its stdout.
- _, _ = io.Copy(io.Discard, c.reader)
- }
- }
-
+ c.teardownStandardStreams()
c.waitError = c.cmd.Wait()
// If the context is done, the process was likely terminated due to it. If so,
@@ -426,6 +419,27 @@ func (c *Command) wait() {
}
}
+func (c *Command) teardownStandardStreams() {
+ c.teardownOnce.Do(func() {
+ if c.writer != nil {
+ // Prevent the command from blocking on waiting for stdin to be closed
+ c.writer.Close()
+ }
+
+ if c.reader != nil {
+ if featureflag.CommandCloseStdout.IsEnabled(c.context) {
+ // Close stdout of the command. This causes us to receive an error when trying to consume the
+ // output and will also cause an error when stdout hasn't been fully consumed at the time of
+ // calling `Wait()`.
+ c.reader.Close()
+ } else {
+ // Prevent the command from blocking on writing to its stdout.
+ _, _ = io.Copy(io.Discard, c.reader)
+ }
+ }
+ })
+}
+
func (c *Command) logProcessComplete() {
exitCode := 0
if c.waitError != nil {
diff --git a/internal/command/command_test.go b/internal/command/command_test.go
index 6e036c11b..ce26ed88b 100644
--- a/internal/command/command_test.go
+++ b/internal/command/command_test.go
@@ -12,6 +12,7 @@ import (
"runtime"
"strings"
"sync"
+ "syscall"
"testing"
"time"
@@ -242,11 +243,20 @@ func testCommandWaitContextCancellationKillsCommand(t *testing.T, ctx context.Co
require.Equal(t, &fs.PathError{
Op: "read", Path: "|0", Err: os.ErrClosed,
}, err)
- }
- err = cmd.Wait()
- require.Equal(t, err, fmt.Errorf("signal: terminated: %w", context.Canceled))
- require.ErrorIs(t, err, context.Canceled)
+ err = cmd.Wait()
+ // Depending on whether the closed file descriptor or the sent signal arrive first, cat(1) may fail
+ // either with SIGPIPE or with SIGKILL.
+ require.Contains(t, []error{
+ fmt.Errorf("signal: %s: %w", syscall.SIGTERM, context.Canceled),
+ fmt.Errorf("signal: %s: %w", syscall.SIGPIPE, context.Canceled),
+ }, err)
+ require.ErrorIs(t, err, context.Canceled)
+ } else {
+ err = cmd.Wait()
+ require.Equal(t, err, fmt.Errorf("signal: terminated: %w", context.Canceled))
+ require.ErrorIs(t, err, context.Canceled)
+ }
}
func TestNew_setupStdin(t *testing.T) {