diff options
author | Brennan Conroy <brecon@microsoft.com> | 2022-08-24 18:49:32 +0300 |
---|---|---|
committer | Brennan Conroy <brecon@microsoft.com> | 2022-08-24 18:51:57 +0300 |
commit | 485de1bb19f4110362999df00c64fd9719304a0c (patch) | |
tree | b8b482199e7b9d974500aa300c4a81b2e41ff122 | |
parent | 797aed71b385f475196279d32f8572f955859689 (diff) |
[SignalR] Fix WebSocket client close when network disappearsbrecon/close
-rw-r--r-- | src/SignalR/clients/csharp/Client/test/UnitTests/WebSocketsTransportTests.cs | 79 | ||||
-rw-r--r-- | src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs | 37 |
2 files changed, 97 insertions, 19 deletions
diff --git a/src/SignalR/clients/csharp/Client/test/UnitTests/WebSocketsTransportTests.cs b/src/SignalR/clients/csharp/Client/test/UnitTests/WebSocketsTransportTests.cs new file mode 100644 index 0000000000..b68ee0d467 --- /dev/null +++ b/src/SignalR/clients/csharp/Client/test/UnitTests/WebSocketsTransportTests.cs @@ -0,0 +1,79 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Net.WebSockets; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Http.Connections.Client; +using Microsoft.AspNetCore.Http.Connections.Client.Internal; +using Microsoft.AspNetCore.SignalR.Tests; +using Microsoft.AspNetCore.Testing; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests; + +public class WebSocketsTransportTests : VerifiableLoggedTest +{ + // Tests that the transport can still be stopped if SendAsync and ReceiveAsync are hanging (ethernet unplugged for example) + [Fact] + public async Task StopCancelsSendAndReceive() + { + var options = new HttpConnectionOptions() + { + WebSocketFactory = (context, token) => + { + return ValueTask.FromResult((WebSocket)new TestWebSocket()); + }, + CloseTimeout = TimeSpan.FromMilliseconds(1), + }; + + using (StartVerifiableLog()) + { + var webSocketsTransport = new WebSocketsTransport(options, loggerFactory: LoggerFactory, () => Task.FromResult<string>(null), null); + + await webSocketsTransport.StartAsync( + new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout(); + + await webSocketsTransport.StopAsync().DefaultTimeout(); + + await webSocketsTransport.Running.DefaultTimeout(); + } + } + + internal class TestWebSocket : WebSocket + { + public Task ConnectAsync(Uri uri, CancellationToken cancellationToken) => Task.CompletedTask; + + public override WebSocketCloseStatus? CloseStatus => null; + + public override string CloseStatusDescription => string.Empty; + + public override WebSocketState State => WebSocketState.Open; + + public override string SubProtocol => string.Empty; + + public override void Abort() { } + + public override Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) + => Task.CompletedTask; + + public override async Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) + { + await cancellationToken.WaitForCancellationAsync(); + cancellationToken.ThrowIfCancellationRequested(); + } + + public override void Dispose() { } + + public override async Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken) + { + await cancellationToken.WaitForCancellationAsync(); + cancellationToken.ThrowIfCancellationRequested(); + return new WebSocketReceiveResult(0, WebSocketMessageType.Text, true); + } + + public override async Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + { + await cancellationToken.WaitForCancellationAsync(); + cancellationToken.ThrowIfCancellationRequested(); + } + } +} diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs index 2dd87407d6..7a2c49909f 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs @@ -27,6 +27,7 @@ internal sealed partial class WebSocketsTransport : ITransport private volatile bool _aborted; private readonly HttpConnectionOptions _httpConnectionOptions; private readonly HttpClient? _httpClient; + private readonly CancellationTokenSource _stopCts = new CancellationTokenSource(); private IDuplexPipe? _transport; @@ -224,6 +225,8 @@ internal sealed partial class WebSocketsTransport : ITransport // Wait for send or receive to complete var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false); + _stopCts.CancelAfter(_closeTimeout); + if (trigger == receiving) { // We're waiting for the application to finish and there are 2 things it could be doing @@ -233,22 +236,14 @@ internal sealed partial class WebSocketsTransport : ITransport // Cancel the application so that ReadAsync yields _application.Input.CancelPendingRead(); - using (var delayCts = new CancellationTokenSource()) - { - var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, delayCts.Token)).ConfigureAwait(false); + var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, _stopCts.Token)).ConfigureAwait(false); - if (resultTask != sending) - { - _aborted = true; + if (resultTask != sending) + { + _aborted = true; - // Abort the websocket if we're stuck in a pending send to the client - socket.Abort(); - } - else - { - // Cancel the timeout - delayCts.Cancel(); - } + // Abort the websocket if we're stuck in a pending send to the client + socket.Abort(); } } else @@ -278,7 +273,7 @@ internal sealed partial class WebSocketsTransport : ITransport { #if NETSTANDARD2_1 || NETCOREAPP // Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read - var result = await socket.ReceiveAsync(Memory<byte>.Empty, CancellationToken.None).ConfigureAwait(false); + var result = await socket.ReceiveAsync(Memory<byte>.Empty, _stopCts.Token).ConfigureAwait(false); if (result.MessageType == WebSocketMessageType.Close) { @@ -295,13 +290,13 @@ internal sealed partial class WebSocketsTransport : ITransport var memory = _application.Output.GetMemory(); #if NETSTANDARD2_1 || NETCOREAPP // Because we checked the CloseStatus from the 0 byte read above, we don't need to check again after reading - var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None).ConfigureAwait(false); + var receiveResult = await socket.ReceiveAsync(memory, _stopCts.Token).ConfigureAwait(false); #elif NETSTANDARD2_0 || NETFRAMEWORK var isArray = MemoryMarshal.TryGetArray<byte>(memory, out var arraySegment); Debug.Assert(isArray); // Exceptions are handled above where the send and receive tasks are being run. - var receiveResult = await socket.ReceiveAsync(arraySegment, CancellationToken.None).ConfigureAwait(false); + var receiveResult = await socket.ReceiveAsync(arraySegment, _stopCts.Token).ConfigureAwait(false); #else #error TFMs need to be updated #endif @@ -382,7 +377,7 @@ internal sealed partial class WebSocketsTransport : ITransport if (WebSocketCanSend(socket)) { - await socket.SendAsync(buffer, _webSocketMessageType).ConfigureAwait(false); + await socket.SendAsync(buffer, _webSocketMessageType, _stopCts.Token).ConfigureAwait(false); } else { @@ -420,7 +415,7 @@ internal sealed partial class WebSocketsTransport : ITransport try { // We're done sending, send the close frame to the client if the websocket is still open - await socket.CloseOutputAsync(error != null ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", CancellationToken.None).ConfigureAwait(false); + await socket.CloseOutputAsync(error != null ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", _stopCts.Token).ConfigureAwait(false); } catch (Exception ex) { @@ -472,6 +467,9 @@ internal sealed partial class WebSocketsTransport : ITransport // Cancel any pending reads from the application, this should start the entire shutdown process _application.Input.CancelPendingRead(); + // Start ungraceful close timer + _stopCts.CancelAfter(_closeTimeout); + try { await Running.ConfigureAwait(false); @@ -485,6 +483,7 @@ internal sealed partial class WebSocketsTransport : ITransport finally { _webSocket?.Dispose(); + _stopCts.Dispose(); } Log.TransportStopped(_logger, null); |