Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/dotnet/runtime.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrennan <brecon@microsoft.com>2022-03-17 06:37:28 +0300
committerGitHub <noreply@github.com>2022-03-17 06:37:28 +0300
commit61492c80a0e06b953e85d983f86ef9cbaa82931e (patch)
tree81b2a8b652af8960036eb880a4c3dcec64529cc3
parent82239a2a0def33c00159c0372226427099cfa355 (diff)
Fix cancel in PipeReader.ReadAtLeastAsyncbrecon/readatleast
-rw-r--r--src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs9
-rw-r--r--src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs31
2 files changed, 37 insertions, 3 deletions
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
index d9b0b8c7a99..0c743f4c74a 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
@@ -347,7 +347,7 @@ namespace System.IO.Pipelines
ValueTask<FlushResult> result;
lock (SyncObj)
{
- PrepareFlush(out completionData, out result, cancellationToken);
+ PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(ReaderScheduler, completionData);
@@ -355,7 +355,7 @@ namespace System.IO.Pipelines
return result;
}
- private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
+ private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
var completeReader = CommitUnsynchronized();
@@ -691,6 +691,9 @@ namespace System.IO.Pipelines
// We also need to flip the reading state off
_operationState.EndRead();
+
+ // Begin read again to wire up cancellation token
+ _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
}
// If the writer is currently paused and we are about the wait for more data then this would deadlock.
@@ -1057,7 +1060,7 @@ namespace System.IO.Pipelines
WriteMultiSegment(source.Span);
}
- PrepareFlush(out completionData, out result, cancellationToken);
+ PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(ReaderScheduler, completionData);
diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs
index c12d3b88253..64762de5ec8 100644
--- a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs
+++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs
@@ -162,5 +162,36 @@ namespace System.IO.Pipelines.Tests
Assert.True(result.IsCanceled);
PipeReader.AdvanceTo(buffer.End);
}
+
+ [Fact]
+ public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData()
+ {
+ CancellationTokenSource cts = new CancellationTokenSource();
+ ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(1, cts.Token);
+ cts.Cancel();
+ return Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+ }
+
+ [Fact]
+ public async Task ReadAtLeastAsyncCancelableAfterReadingSome()
+ {
+ CancellationTokenSource cts = new CancellationTokenSource();
+ await Pipe.WriteAsync(new byte[10], default);
+ ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11, cts.Token);
+ cts.Cancel();
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+ }
+
+ [Fact]
+ public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStartingRead()
+ {
+ CancellationTokenSource cts = new CancellationTokenSource();
+ await Pipe.WriteAsync(new byte[10], default);
+ ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(12, cts.Token);
+ // Write, but not enough to unblock ReadAtLeastAsync
+ await Pipe.WriteAsync(new byte[1], default);
+ cts.Cancel();
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await task);
+ }
}
}