Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/corefx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStephen Toub <stoub@microsoft.com>2017-01-04 18:41:28 +0300
committerStephen Toub <stoub@microsoft.com>2017-01-04 18:41:28 +0300
commit856fb3dae9be9f30fc74aa152c64ff28bfbf6041 (patch)
treeafd40a14c33f13790b00c0da91564b51c1913b6c /src
parent64e0db74f9ba3fecb8e95f4149298e0355e065ce (diff)
Change SslStream to use Read/WriteAsync on inner stream
In .NET Core 1.x, SslStream's usage of Begin/EndRead/Write was actually using extension methods that just used TaskToApm.Begin/End with Read/WriteAsync. When the base Stream's Begin/EndRead/Write methods were added back, SslStream's usage of these methods started binding to the base methods, such that it was no longer using the inner stream's Read/WriteAsync. This change explicitly switches all of those calls (for both SslStream and NegotiateStream) to use Read/WriteAsync. A better solution is to actually add SslStream.Read/WriteAsync overrides that use async/await and tasks all the way down, rather than going back and forth between Tasks and APM, but that is a more complicated work item. This addressesses the immediate need (problems related to deadlocks on SslStream when read and writes are issued to run concurrently on the same stream) and can be undone or augmented subsequently when a better solution is available. This is at least as good performance-wise as what we had in .NET Core 1.x, and in some cases is a bit better, as we can avoid creating the IAsyncResult wrapper in some cases if the returned task is already completed.
Diffstat (limited to 'src')
-rw-r--r--src/System.Net.Security/src/System/Net/FixedSizeReader.cs24
-rw-r--r--src/System.Net.Security/src/System/Net/Security/InternalNegotiateStream.cs20
-rw-r--r--src/System.Net.Security/src/System/Net/Security/NegotiateStream.cs70
-rw-r--r--src/System.Net.Security/src/System/Net/Security/SslState.cs41
-rw-r--r--src/System.Net.Security/src/System/Net/Security/SslStreamInternal.cs21
-rw-r--r--src/System.Net.Security/src/System/Net/StreamFramer.cs19
6 files changed, 87 insertions, 108 deletions
diff --git a/src/System.Net.Security/src/System/Net/FixedSizeReader.cs b/src/System.Net.Security/src/System/Net/FixedSizeReader.cs
index c4eb5cb05f..d1ffb829b6 100644
--- a/src/System.Net.Security/src/System/Net/FixedSizeReader.cs
+++ b/src/System.Net.Security/src/System/Net/FixedSizeReader.cs
@@ -4,6 +4,7 @@
using System.Diagnostics;
using System.IO;
+using System.Threading.Tasks;
namespace System.Net
{
@@ -70,17 +71,26 @@ namespace System.Net
{
while (true)
{
- IAsyncResult ar = _transport.BeginRead(_request.Buffer, _request.Offset + _totalRead, _request.Count - _totalRead, s_readCallback, this);
- if (!ar.CompletedSynchronously)
+ int bytes;
+
+ Task<int> t = _transport.ReadAsync(_request.Buffer, _request.Offset + _totalRead, _request.Count - _totalRead);
+ if (t.IsCompleted)
+ {
+ bytes = t.GetAwaiter().GetResult();
+ }
+ else
{
+ IAsyncResult ar = TaskToApm.Begin(t, s_readCallback, this);
+ if (!ar.CompletedSynchronously)
+ {
#if DEBUG
- _request._DebugAsyncChain = ar;
+ _request._DebugAsyncChain = ar;
#endif
- break;
+ break;
+ }
+ bytes = TaskToApm.End<int>(ar);
}
- int bytes = _transport.EndRead(ar);
-
if (CheckCompletionBeforeNextRead(bytes))
{
break;
@@ -135,7 +145,7 @@ namespace System.Net
// Async completion.
try
{
- int bytes = reader._transport.EndRead(transportResult);
+ int bytes = TaskToApm.End<int>(transportResult);
if (reader.CheckCompletionBeforeNextRead(bytes))
{
diff --git a/src/System.Net.Security/src/System/Net/Security/InternalNegotiateStream.cs b/src/System.Net.Security/src/System/Net/Security/InternalNegotiateStream.cs
index c57008bb9c..b23fc4e27c 100644
--- a/src/System.Net.Security/src/System/Net/Security/InternalNegotiateStream.cs
+++ b/src/System.Net.Security/src/System/Net/Security/InternalNegotiateStream.cs
@@ -5,6 +5,7 @@
using System.Diagnostics;
using System.IO;
using System.Threading;
+using System.Threading.Tasks;
namespace System.Net.Security
{
@@ -167,13 +168,20 @@ namespace System.Net.Security
{
// prepare for the next request
asyncRequest.SetNextRequest(buffer, offset + chunkBytes, count - chunkBytes, null);
- IAsyncResult ar = InnerStream.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest);
- if (!ar.CompletedSynchronously)
+ Task t = InnerStream.WriteAsync(outBuffer, 0, encryptedBytes);
+ if (t.IsCompleted)
{
- return;
+ t.GetAwaiter().GetResult();
+ }
+ else
+ {
+ IAsyncResult ar = TaskToApm.Begin(t, s_writeCallback, asyncRequest);
+ if (!ar.CompletedSynchronously)
+ {
+ return;
+ }
+ TaskToApm.End(ar);
}
-
- InnerStream.EndWrite(ar);
}
else
{
@@ -384,7 +392,7 @@ namespace System.Net.Security
try
{
NegotiateStream negoStream = (NegotiateStream)asyncRequest.AsyncObject;
- negoStream.InnerStream.EndWrite(transportResult);
+ TaskToApm.End(transportResult);
if (asyncRequest.Count == 0)
{
// This was the last chunk.
diff --git a/src/System.Net.Security/src/System/Net/Security/NegotiateStream.cs b/src/System.Net.Security/src/System/Net/Security/NegotiateStream.cs
index 4ad3718532..e07a61c578 100644
--- a/src/System.Net.Security/src/System/Net/Security/NegotiateStream.cs
+++ b/src/System.Net.Security/src/System/Net/Security/NegotiateStream.cs
@@ -575,7 +575,7 @@ namespace System.Net.Security
if (!_negoState.CanGetSecureStream)
{
- return InnerStream.BeginRead(buffer, offset, count, asyncCallback, asyncState);
+ return TaskToApm.Begin(InnerStream.ReadAsync(buffer, offset, count), asyncCallback, asyncState);
}
BufferAsyncResult bufferResult = new BufferAsyncResult(this, buffer, offset, count, asyncState, asyncCallback);
@@ -597,7 +597,7 @@ namespace System.Net.Security
if (!_negoState.CanGetSecureStream)
{
- return InnerStream.EndRead(asyncResult);
+ return TaskToApm.End<int>(asyncResult);
}
@@ -647,7 +647,7 @@ namespace System.Net.Security
if (!_negoState.CanGetSecureStream)
{
- return InnerStream.BeginWrite(buffer, offset, count, asyncCallback, asyncState);
+ return TaskToApm.Begin(InnerStream.WriteAsync(buffer, offset, count), asyncCallback, asyncState);
}
BufferAsyncResult bufferResult = new BufferAsyncResult(this, buffer, offset, count, asyncState, asyncCallback);
@@ -670,7 +670,7 @@ namespace System.Net.Security
if (!_negoState.CanGetSecureStream)
{
- InnerStream.EndWrite(asyncResult);
+ TaskToApm.End(asyncResult);
return;
}
@@ -706,67 +706,5 @@ namespace System.Net.Security
}
#endif
}
-
- // ReadAsync - provide async read functionality.
- //
- // This method provides async read functionality. All we do is
- // call through to the Begin/EndRead methods.
- //
- // Input:
- //
- // buffer - Buffer to read into.
- // offset - Offset into the buffer where we're to read.
- // size - Number of bytes to read.
- // cancellationToken - Token used to request cancellation of the operation
- //
- // Returns:
- //
- // A Task<int> representing the read.
- public override Task<int> ReadAsync(byte[] buffer, int offset, int size, CancellationToken cancellationToken)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return Task.FromCanceled<int>(cancellationToken);
- }
-
- return Task.Factory.FromAsync(
- (bufferArg, offsetArg, sizeArg, callback, state) => ((NegotiateStream)state).BeginRead(bufferArg, offsetArg, sizeArg, callback, state),
- iar => ((NegotiateStream)iar.AsyncState).EndRead(iar),
- buffer,
- offset,
- size,
- this);
- }
-
- // WriteAsync - provide async write functionality.
- //
- // This method provides async write functionality. All we do is
- // call through to the Begin/EndWrite methods.
- //
- // Input:
- //
- // buffer - Buffer to write into.
- // offset - Offset into the buffer where we're to write.
- // size - Number of bytes to write.
- // cancellationToken - Token used to request cancellation of the operation
- //
- // Returns:
- //
- // A Task representing the write.
- public override Task WriteAsync(byte[] buffer, int offset, int size, CancellationToken cancellationToken)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return Task.FromCanceled<int>(cancellationToken);
- }
-
- return Task.Factory.FromAsync(
- (bufferArg, offsetArg, sizeArg, callback, state) => ((NegotiateStream)state).BeginWrite(bufferArg, offsetArg, sizeArg, callback, state),
- iar => ((NegotiateStream)iar.AsyncState).EndWrite(iar),
- buffer,
- offset,
- size,
- this);
- }
}
}
diff --git a/src/System.Net.Security/src/System/Net/Security/SslState.cs b/src/System.Net.Security/src/System/Net/Security/SslState.cs
index 7f89383269..cdc7f2f2ba 100644
--- a/src/System.Net.Security/src/System/Net/Security/SslState.cs
+++ b/src/System.Net.Security/src/System/Net/Security/SslState.cs
@@ -790,16 +790,23 @@ namespace System.Net.Security
else
{
asyncRequest.AsyncState = message;
- IAsyncResult ar = InnerStream.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest);
- if (!ar.CompletedSynchronously)
+ Task t = InnerStream.WriteAsync(message.Payload, 0, message.Size);
+ if (t.IsCompleted)
+ {
+ t.GetAwaiter().GetResult();
+ }
+ else
{
+ IAsyncResult ar = TaskToApm.Begin(t, s_writeCallback, asyncRequest);
+ if (!ar.CompletedSynchronously)
+ {
#if DEBUG
- asyncRequest._DebugAsyncChain = ar;
+ asyncRequest._DebugAsyncChain = ar;
#endif
- return;
+ return;
+ }
+ TaskToApm.End(ar);
}
-
- InnerStream.EndWrite(ar);
}
}
@@ -996,12 +1003,20 @@ namespace System.Net.Security
else
{
asyncRequest.AsyncState = exception;
- IAsyncResult ar = InnerStream.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest);
- if (!ar.CompletedSynchronously)
+ Task t = InnerStream.WriteAsync(message.Payload, 0, message.Size);
+ if (t.IsCompleted)
{
- return;
+ t.GetAwaiter().GetResult();
+ }
+ else
+ {
+ IAsyncResult ar = TaskToApm.Begin(t, s_writeCallback, asyncRequest);
+ if (!ar.CompletedSynchronously)
+ {
+ return;
+ }
+ TaskToApm.End(ar);
}
- InnerStream.EndWrite(ar);
}
exception.Throw();
@@ -1065,7 +1080,7 @@ namespace System.Net.Security
// Async completion.
try
{
- sslState.InnerStream.EndWrite(transportResult);
+ TaskToApm.End(transportResult);
// Special case for an error notification.
object asyncState = asyncRequest.AsyncState;
@@ -1833,14 +1848,14 @@ namespace System.Net.Security
CheckThrow(authSuccessCheck:true, shutdownCheck:true);
ProtocolToken message = Context.CreateShutdownToken();
- return InnerStream.BeginWrite(message.Payload, 0, message.Payload.Length, asyncCallback, asyncState);
+ return TaskToApm.Begin(InnerStream.WriteAsync(message.Payload, 0, message.Payload.Length), asyncCallback, asyncState);
}
internal void EndShutdown(IAsyncResult result)
{
CheckThrow(authSuccessCheck: true, shutdownCheck:true);
- InnerStream.EndWrite(result);
+ TaskToApm.End(result);
_shutdown = true;
}
}
diff --git a/src/System.Net.Security/src/System/Net/Security/SslStreamInternal.cs b/src/System.Net.Security/src/System/Net/Security/SslStreamInternal.cs
index a0fe031f78..6e459ec398 100644
--- a/src/System.Net.Security/src/System/Net/Security/SslStreamInternal.cs
+++ b/src/System.Net.Security/src/System/Net/Security/SslStreamInternal.cs
@@ -5,6 +5,7 @@
using System.Diagnostics;
using System.IO;
using System.Threading;
+using System.Threading.Tasks;
namespace System.Net.Security
{
@@ -454,14 +455,20 @@ namespace System.Net.Security
{
// Prepare for the next request.
asyncRequest.SetNextRequest(buffer, offset + chunkBytes, count - chunkBytes, s_resumeAsyncWriteCallback);
- IAsyncResult ar = _sslState.InnerStream.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest);
- if (!ar.CompletedSynchronously)
+ Task t = _sslState.InnerStream.WriteAsync(outBuffer, 0, encryptedBytes);
+ if (t.IsCompleted)
{
- return;
+ t.GetAwaiter().GetResult();
+ }
+ else
+ {
+ IAsyncResult ar = TaskToApm.Begin(t, s_writeCallback, asyncRequest);
+ if (!ar.CompletedSynchronously)
+ {
+ return;
+ }
+ TaskToApm.End(ar);
}
-
- _sslState.InnerStream.EndWrite(ar);
-
}
else
{
@@ -767,7 +774,7 @@ namespace System.Net.Security
try
{
- sslStream._sslState.InnerStream.EndWrite(transportResult);
+ TaskToApm.End(transportResult);
sslStream._sslState.FinishWrite();
if (asyncRequest.Count == 0)
diff --git a/src/System.Net.Security/src/System/Net/StreamFramer.cs b/src/System.Net.Security/src/System/Net/StreamFramer.cs
index 3aac3ef885..8123c48235 100644
--- a/src/System.Net.Security/src/System/Net/StreamFramer.cs
+++ b/src/System.Net.Security/src/System/Net/StreamFramer.cs
@@ -5,6 +5,7 @@
using System.Diagnostics;
using System.IO;
using System.Globalization;
+using System.Threading.Tasks;
namespace System.Net
{
@@ -136,7 +137,7 @@ namespace System.Net
_readHeaderBuffer, 0,
_readHeaderBuffer.Length);
- IAsyncResult result = _transport.BeginRead(_readHeaderBuffer, 0, _readHeaderBuffer.Length,
+ IAsyncResult result = TaskToApm.Begin(_transport.ReadAsync(_readHeaderBuffer, 0, _readHeaderBuffer.Length),
_readFrameCallback, workerResult);
if (result.CompletedSynchronously)
@@ -199,7 +200,7 @@ namespace System.Net
WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState;
- int bytesRead = _transport.EndRead(transportResult);
+ int bytesRead = TaskToApm.End<int>(transportResult);
workerResult.Offset += bytesRead;
if (!(workerResult.Offset <= workerResult.End))
@@ -260,7 +261,7 @@ namespace System.Net
workerResult.End = frame.Length;
workerResult.Offset = 0;
- // Transport.BeginRead below will pickup those changes.
+ // Transport.ReadAsync below will pickup those changes.
}
else
{
@@ -271,7 +272,7 @@ namespace System.Net
}
// This means we need more data to complete the data block.
- transportResult = _transport.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset,
+ transportResult = TaskToApm.Begin(_transport.ReadAsync(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset),
_readFrameCallback, workerResult);
} while (transportResult.CompletedSynchronously);
}
@@ -352,7 +353,7 @@ namespace System.Net
if (message.Length == 0)
{
- return _transport.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length,
+ return TaskToApm.Begin(_transport.WriteAsync(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length),
asyncCallback, stateObject);
}
@@ -361,7 +362,7 @@ namespace System.Net
message, 0, message.Length);
// Charge the first:
- IAsyncResult result = _transport.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length,
+ IAsyncResult result = TaskToApm.Begin(_transport.WriteAsync(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length),
_beginWriteCallback, workerResult);
if (result.CompletedSynchronously)
@@ -412,7 +413,7 @@ namespace System.Net
WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState;
// First, complete the previous portion write.
- _transport.EndWrite(transportResult);
+ TaskToApm.End(transportResult);
// Check on exit criterion.
if (workerResult.Offset == workerResult.End)
@@ -425,7 +426,7 @@ namespace System.Net
workerResult.Offset = workerResult.End;
// Write next portion (frame body) using Async IO.
- transportResult = _transport.BeginWrite(workerResult.Buffer, 0, workerResult.End,
+ transportResult = TaskToApm.Begin(_transport.WriteAsync(workerResult.Buffer, 0, workerResult.End),
_beginWriteCallback, workerResult);
}
while (transportResult.CompletedSynchronously);
@@ -454,7 +455,7 @@ namespace System.Net
}
else
{
- _transport.EndWrite(asyncResult);
+ TaskToApm.End(asyncResult);
}
}
}