Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/dotnet/aspnetcore.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Fowler <davidfowl@gmail.com>2019-08-27 10:43:36 +0300
committerDavid Fowler <davidfowl@gmail.com>2019-08-27 10:43:36 +0300
commite0eb53a3d5766fe8777ad8f367fcfafd17b7c2d1 (patch)
tree6d4ce745def5717ba44a0dc806e1e55e53f69b12
parenta2178e50971d1f0ed9356a2b3905dc8f20cbed02 (diff)
Spike the HTTP/2 Streaming transportdavidfowl/http2streaming
-rw-r--r--src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs5
-rw-r--r--src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.Log.cs81
-rw-r--r--src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.cs180
-rw-r--r--src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs4
-rw-r--r--src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs4
-rw-r--r--src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs60
-rw-r--r--src/SignalR/common/Http.Connections/src/Internal/Transports/HttpStreamingTransport.cs56
7 files changed, 376 insertions, 14 deletions
diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs
index 6e66673ad9..a264eb0c21 100644
--- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs
+++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs
@@ -45,6 +45,11 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
}
}
+ if ((availableServerTransports & HttpTransportType.HttpStreaming & _requestedTransportType) == HttpTransportType.HttpStreaming)
+ {
+ return new HttpStreamingTransport(_httpClient, _loggerFactory);
+ }
+
if ((availableServerTransports & HttpTransportType.ServerSentEvents & _requestedTransportType) == HttpTransportType.ServerSentEvents)
{
// We don't need to give the transport the accessTokenProvider because the HttpClient has a message handler that does the work for us.
diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.Log.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.Log.cs
new file mode 100644
index 0000000000..d1cb650cf3
--- /dev/null
+++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.Log.cs
@@ -0,0 +1,81 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using Microsoft.AspNetCore.Connections;
+using Microsoft.Extensions.Logging;
+
+namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
+{
+ internal partial class HttpStreamingTransport
+ {
+ private static class Log
+ {
+ private static readonly Action<ILogger, TransferFormat, Exception> _startTransport =
+ LoggerMessage.Define<TransferFormat>(LogLevel.Information, new EventId(1, "StartTransport"), "Starting transport. Transfer mode: {TransferFormat}.");
+
+ private static readonly Action<ILogger, Exception> _transportStopped =
+ LoggerMessage.Define(LogLevel.Debug, new EventId(2, "TransportStopped"), "Transport stopped.");
+
+ private static readonly Action<ILogger, Exception> _startReceive =
+ LoggerMessage.Define(LogLevel.Debug, new EventId(3, "StartReceive"), "Starting receive loop.");
+
+ private static readonly Action<ILogger, Exception> _receiveStopped =
+ LoggerMessage.Define(LogLevel.Debug, new EventId(4, "ReceiveStopped"), "Receive loop stopped.");
+
+ private static readonly Action<ILogger, Exception> _receiveCanceled =
+ LoggerMessage.Define(LogLevel.Debug, new EventId(5, "ReceiveCanceled"), "Receive loop canceled.");
+
+ private static readonly Action<ILogger, Exception> _transportStopping =
+ LoggerMessage.Define(LogLevel.Information, new EventId(6, "TransportStopping"), "Transport is stopping.");
+
+ private static readonly Action<ILogger, int, Exception> _messageToApplication =
+ LoggerMessage.Define<int>(LogLevel.Debug, new EventId(7, "MessageToApplication"), "Passing message to application. Payload size: {Count}.");
+
+ private static readonly Action<ILogger, Exception> _eventStreamEnded =
+ LoggerMessage.Define(LogLevel.Debug, new EventId(8, "EventStreamEnded"), "Server-Sent Event Stream ended.");
+
+ // EventIds 100 - 106 used in SendUtils
+
+ public static void StartTransport(ILogger logger, TransferFormat transferFormat)
+ {
+ _startTransport(logger, transferFormat, null);
+ }
+
+ public static void TransportStopped(ILogger logger, Exception exception)
+ {
+ _transportStopped(logger, exception);
+ }
+
+ public static void StartReceive(ILogger logger)
+ {
+ _startReceive(logger, null);
+ }
+
+ public static void TransportStopping(ILogger logger)
+ {
+ _transportStopping(logger, null);
+ }
+
+ public static void MessageToApplication(ILogger logger, int count)
+ {
+ _messageToApplication(logger, count, null);
+ }
+
+ public static void ReceiveCanceled(ILogger logger)
+ {
+ _receiveCanceled(logger, null);
+ }
+
+ public static void ReceiveStopped(ILogger logger)
+ {
+ _receiveStopped(logger, null);
+ }
+
+ public static void EventStreamEnded(ILogger logger)
+ {
+ _eventStreamEnded(logger, null);
+ }
+ }
+ }
+}
diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.cs
new file mode 100644
index 0000000000..e971fecd36
--- /dev/null
+++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.cs
@@ -0,0 +1,180 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.IO.Pipelines;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.AspNetCore.Connections;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
+{
+ internal partial class HttpStreamingTransport : ITransport
+ {
+ private readonly HttpClient _httpClient;
+ private readonly ILogger _logger;
+
+ // Volatile so that the receive loop sees the updated value set from a different thread
+ private volatile Exception _error;
+ private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
+ private readonly CancellationTokenSource _inputCts = new CancellationTokenSource();
+ private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser();
+ private IDuplexPipe _transport;
+ private IDuplexPipe _application;
+
+ internal Task Running { get; private set; } = Task.CompletedTask;
+
+ public PipeReader Input => _transport.Input;
+
+ public PipeWriter Output => _transport.Output;
+
+ public HttpStreamingTransport(HttpClient httpClient)
+ : this(httpClient, null)
+ { }
+
+ public HttpStreamingTransport(HttpClient httpClient, ILoggerFactory loggerFactory)
+ {
+ if (httpClient == null)
+ {
+ throw new ArgumentNullException(nameof(_httpClient));
+ }
+
+ _httpClient = httpClient;
+ _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
+ }
+
+ public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
+ {
+ if (transferFormat != TransferFormat.Text)
+ {
+ throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat));
+ }
+
+ Log.StartTransport(_logger, transferFormat);
+
+ var request = new HttpRequestMessage(HttpMethod.Get, url);
+ request.Version = new Version(2, 0);
+
+ HttpResponseMessage response = null;
+
+ try
+ {
+ response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
+ response.EnsureSuccessStatusCode();
+ }
+ catch
+ {
+ response?.Dispose();
+
+ Log.TransportStopping(_logger);
+
+ throw;
+ }
+
+ // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
+ var options = ClientPipeOptions.DefaultOptions;
+ var pair = DuplexPipe.CreateConnectionPair(options, options);
+
+ _transport = pair.Transport;
+ _application = pair.Application;
+
+ // Cancellation token will be triggered when the pipe is stopped on the client.
+ // This is to avoid the client throwing from a 404 response caused by the
+ // server stopping the connection while the send message request is in progress.
+ // _application.Input.OnWriterCompleted((exception, state) => ((CancellationTokenSource)state).Cancel(), inputCts);
+
+ Running = ProcessAsync(url, response);
+ }
+
+ private async Task ProcessAsync(Uri url, HttpResponseMessage response)
+ {
+ // Start sending and polling (ask for binary if the server supports it)
+ var receiving = ProcessResponseStream(response, _transportCts.Token);
+ var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger, _inputCts.Token);
+
+ // Wait for send or receive to complete
+ var trigger = await Task.WhenAny(receiving, sending);
+
+ if (trigger == receiving)
+ {
+ // We're waiting for the application to finish and there are 2 things it could be doing
+ // 1. Waiting for application data
+ // 2. Waiting for an outgoing send (this should be instantaneous)
+
+ _inputCts.Cancel();
+
+ // Cancel the application so that ReadAsync yields
+ _application.Input.CancelPendingRead();
+
+ await sending;
+ }
+ else
+ {
+ // Set the sending error so we communicate that to the application
+ _error = sending.IsFaulted ? sending.Exception.InnerException : null;
+
+ _transportCts.Cancel();
+
+ // Cancel any pending flush so that we can quit
+ _application.Output.CancelPendingFlush();
+
+ await receiving;
+ }
+ }
+
+ private async Task ProcessResponseStream(HttpResponseMessage response, CancellationToken cancellationToken)
+ {
+ Log.StartReceive(_logger);
+
+ using (response)
+ using (var stream = await response.Content.ReadAsStreamAsync())
+ {
+ try
+ {
+ await stream.CopyToAsync(_application.Output, cancellationToken);
+ }
+ catch (Exception ex)
+ {
+ _error = ex;
+ }
+ finally
+ {
+ _application.Output.Complete(_error);
+
+ Log.ReceiveStopped(_logger);
+ }
+ }
+ }
+
+ public async Task StopAsync()
+ {
+ Log.TransportStopping(_logger);
+
+ if (_application == null)
+ {
+ // We never started
+ return;
+ }
+
+ _transport.Output.Complete();
+ _transport.Input.Complete();
+
+ _application.Input.CancelPendingRead();
+
+ try
+ {
+ await Running;
+ }
+ catch (Exception ex)
+ {
+ Log.TransportStopped(_logger, ex);
+ throw;
+ }
+
+ Log.TransportStopped(_logger, null);
+ }
+ }
+}
diff --git a/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs b/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs
index 85b88d80c4..31749b0683 100644
--- a/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs
+++ b/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs
@@ -30,5 +30,9 @@ namespace Microsoft.AspNetCore.Http.Connections
/// Specifies that the long polling transport is used.
/// </summary>
LongPolling = 4,
+ /// <summary>
+ /// Specifies that the HTTP streaming is used.
+ /// </summary>
+ HttpStreaming = 8
}
}
diff --git a/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs b/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs
index a633f2fc1d..9354fc5fc4 100644
--- a/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs
+++ b/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs
@@ -1,4 +1,4 @@
-// Copyright (c) .NET Foundation. All rights reserved.
+// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
namespace Microsoft.AspNetCore.Http.Connections
@@ -11,6 +11,6 @@ namespace Microsoft.AspNetCore.Http.Connections
/// <summary>
/// A bitmask combining all available <see cref="HttpTransportType"/> values.
/// </summary>
- public static readonly HttpTransportType All = HttpTransportType.WebSockets | HttpTransportType.ServerSentEvents | HttpTransportType.LongPolling;
+ public static readonly HttpTransportType All = HttpTransportType.WebSockets | HttpTransportType.HttpStreaming | HttpTransportType.ServerSentEvents | HttpTransportType.LongPolling;
}
}
diff --git a/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs b/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs
index bf82562a7b..831c4fd1ba 100644
--- a/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs
+++ b/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs
@@ -35,6 +35,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
TransferFormats = new List<string> { nameof(TransferFormat.Text) }
};
+ private static readonly AvailableTransport _httpStreamingAvailableTransport =
+ new AvailableTransport
+ {
+ Transport = nameof(HttpTransportType.HttpStreaming),
+ TransferFormats = new List<string> { nameof(TransferFormat.Text), nameof(TransferFormat.Binary) }
+ };
+
private static readonly AvailableTransport _longPollingAvailableTransport =
new AvailableTransport
{
@@ -107,11 +114,35 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
{
var supportedTransports = options.Transports;
+ if (context.WebSockets.IsWebSocketRequest)
+ {
+ // Connection can be established lazily
+ var connection = await GetOrCreateConnectionAsync(context, options);
+ if (connection == null)
+ {
+ // No such connection, GetOrCreateConnection already set the response status code
+ return;
+ }
+
+ if (!await EnsureConnectionStateAsync(connection, context, HttpTransportType.WebSockets, supportedTransports, logScope, options))
+ {
+ // Bad connection state. It's already set the response status code.
+ return;
+ }
+
+ Log.EstablishedConnection(_logger);
+
+ // Allow the reads to be cancelled
+ connection.Cancellation = new CancellationTokenSource();
+
+ var ws = new WebSocketsServerTransport(options.WebSockets, connection.Application, connection, _loggerFactory);
+
+ await DoPersistentConnection(connectionDelegate, ws, context, connection);
+ }
// Server sent events transport
// GET /{path}
// Accept: text/event-stream
- var headers = context.Request.GetTypedHeaders();
- if (headers.Accept?.Contains(new Net.Http.Headers.MediaTypeHeaderValue("text/event-stream")) == true)
+ else if (context.Request.GetTypedHeaders().Accept?.Contains(new Net.Http.Headers.MediaTypeHeaderValue("text/event-stream")) == true)
{
// Connection must already exist
var connection = await GetConnectionAsync(context);
@@ -137,17 +168,18 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
await DoPersistentConnection(connectionDelegate, sse, context, connection);
}
- else if (context.WebSockets.IsWebSocketRequest)
+ else if (context.Request.Protocol == "HTTP/2")
{
- // Connection can be established lazily
- var connection = await GetOrCreateConnectionAsync(context, options);
+ // Support HTTP/2 Streaming
+
+ var connection = await GetConnectionAsync(context);
if (connection == null)
{
- // No such connection, GetOrCreateConnection already set the response status code
+ // No such connection, GetConnection already set the response status code
return;
}
- if (!await EnsureConnectionStateAsync(connection, context, HttpTransportType.WebSockets, supportedTransports, logScope, options))
+ if (!await EnsureConnectionStateAsync(connection, context, HttpTransportType.HttpStreaming, supportedTransports, logScope, options))
{
// Bad connection state. It's already set the response status code.
return;
@@ -155,12 +187,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
Log.EstablishedConnection(_logger);
- // Allow the reads to be cancelled
- connection.Cancellation = new CancellationTokenSource();
-
- var ws = new WebSocketsServerTransport(options.WebSockets, connection.Application, connection, _loggerFactory);
+ // We only need to provide the Input channel since writing to the application is handled through /send.
+ var streaming = new HttpStreamingTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory);
- await DoPersistentConnection(connectionDelegate, ws, context, connection);
+ await DoPersistentConnection(connectionDelegate, streaming, context, connection);
}
else
{
@@ -317,6 +347,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
response.AvailableTransports.Add(_webSocketAvailableTransport);
}
+ // REVIEW: Hard code to HTTP/2 support on the server side?
+ if ((options.Transports & HttpTransportType.HttpStreaming) != 0)
+ {
+ response.AvailableTransports.Add(_httpStreamingAvailableTransport);
+ }
+
if ((options.Transports & HttpTransportType.ServerSentEvents) != 0)
{
response.AvailableTransports.Add(_serverSentEventsAvailableTransport);
diff --git a/src/SignalR/common/Http.Connections/src/Internal/Transports/HttpStreamingTransport.cs b/src/SignalR/common/Http.Connections/src/Internal/Transports/HttpStreamingTransport.cs
new file mode 100644
index 0000000000..7708246c2e
--- /dev/null
+++ b/src/SignalR/common/Http.Connections/src/Internal/Transports/HttpStreamingTransport.cs
@@ -0,0 +1,56 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports
+{
+ internal class HttpStreamingTransport : IHttpTransport
+ {
+ private readonly PipeReader _application;
+ private readonly string _connectionId;
+ private readonly ILogger _logger;
+
+ public HttpStreamingTransport(PipeReader application, string connectionId, ILoggerFactory loggerFactory)
+ {
+ _application = application;
+ _connectionId = connectionId;
+
+ // We create the logger with a string to preserve the logging namespace after the server side transport renames.
+ _logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Http.Connections.Internal.Transports.HttpStreamingTransport");
+ }
+
+ public async Task ProcessRequestAsync(HttpContext context, CancellationToken token)
+ {
+ // Flush headers immediately so we can start streaming
+ await context.Response.Body.FlushAsync();
+
+ try
+ {
+ while (true)
+ {
+ await _application.CopyToAsync(context.Response.Body, token);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // Closed connection
+ }
+ }
+
+ private static class Log
+ {
+ private static readonly Action<ILogger, long, Exception> _writingMessage =
+ LoggerMessage.Define<long>(LogLevel.Trace, new EventId(1, "HttpStreamingWritingMessage"), "Writing a {Count} byte message.");
+
+ public static void WritingMessage(ILogger logger, long count)
+ {
+ _writingMessage(logger, count, null);
+ }
+ }
+ }
+}