diff options
author | Brennan <brecon@microsoft.com> | 2022-03-17 06:37:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-17 06:37:28 +0300 |
commit | 61492c80a0e06b953e85d983f86ef9cbaa82931e (patch) | |
tree | 81b2a8b652af8960036eb880a4c3dcec64529cc3 | |
parent | 82239a2a0def33c00159c0372226427099cfa355 (diff) |
Fix cancel in PipeReader.ReadAtLeastAsyncbrecon/readatleast
-rw-r--r-- | src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs | 9 | ||||
-rw-r--r-- | src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs | 31 |
2 files changed, 37 insertions, 3 deletions
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index d9b0b8c7a99..0c743f4c74a 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -347,7 +347,7 @@ namespace System.IO.Pipelines ValueTask<FlushResult> result; lock (SyncObj) { - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, completionData); @@ -355,7 +355,7 @@ namespace System.IO.Pipelines return result; } - private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken) + private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken) { var completeReader = CommitUnsynchronized(); @@ -691,6 +691,9 @@ namespace System.IO.Pipelines // We also need to flip the reading state off _operationState.EndRead(); + + // Begin read again to wire up cancellation token + _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); } // If the writer is currently paused and we are about the wait for more data then this would deadlock. @@ -1057,7 +1060,7 @@ namespace System.IO.Pipelines WriteMultiSegment(source.Span); } - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, completionData); diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs index c12d3b88253..64762de5ec8 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs @@ -162,5 +162,36 @@ namespace System.IO.Pipelines.Tests Assert.True(result.IsCanceled); PipeReader.AdvanceTo(buffer.End); } + + [Fact] + public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData() + { + CancellationTokenSource cts = new CancellationTokenSource(); + ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(1, cts.Token); + cts.Cancel(); + return Assert.ThrowsAsync<OperationCanceledException>(async () => await task); + } + + [Fact] + public async Task ReadAtLeastAsyncCancelableAfterReadingSome() + { + CancellationTokenSource cts = new CancellationTokenSource(); + await Pipe.WriteAsync(new byte[10], default); + ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11, cts.Token); + cts.Cancel(); + await Assert.ThrowsAsync<OperationCanceledException>(async () => await task); + } + + [Fact] + public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStartingRead() + { + CancellationTokenSource cts = new CancellationTokenSource(); + await Pipe.WriteAsync(new byte[10], default); + ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(12, cts.Token); + // Write, but not enough to unblock ReadAtLeastAsync + await Pipe.WriteAsync(new byte[1], default); + cts.Cancel(); + await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await task); + } } } |