diff options
author | David Fowler <davidfowl@gmail.com> | 2019-08-27 10:43:36 +0300 |
---|---|---|
committer | David Fowler <davidfowl@gmail.com> | 2019-08-27 10:43:36 +0300 |
commit | e0eb53a3d5766fe8777ad8f367fcfafd17b7c2d1 (patch) | |
tree | 6d4ce745def5717ba44a0dc806e1e55e53f69b12 | |
parent | a2178e50971d1f0ed9356a2b3905dc8f20cbed02 (diff) |
Spike the HTTP/2 Streaming transportdavidfowl/http2streaming
7 files changed, 376 insertions, 14 deletions
diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs index 6e66673ad9..a264eb0c21 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs @@ -45,6 +45,11 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal } } + if ((availableServerTransports & HttpTransportType.HttpStreaming & _requestedTransportType) == HttpTransportType.HttpStreaming) + { + return new HttpStreamingTransport(_httpClient, _loggerFactory); + } + if ((availableServerTransports & HttpTransportType.ServerSentEvents & _requestedTransportType) == HttpTransportType.ServerSentEvents) { // We don't need to give the transport the accessTokenProvider because the HttpClient has a message handler that does the work for us. diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.Log.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.Log.cs new file mode 100644 index 0000000000..d1cb650cf3 --- /dev/null +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.Log.cs @@ -0,0 +1,81 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AspNetCore.Http.Connections.Client.Internal +{ + internal partial class HttpStreamingTransport + { + private static class Log + { + private static readonly Action<ILogger, TransferFormat, Exception> _startTransport = + LoggerMessage.Define<TransferFormat>(LogLevel.Information, new EventId(1, "StartTransport"), "Starting transport. Transfer mode: {TransferFormat}."); + + private static readonly Action<ILogger, Exception> _transportStopped = + LoggerMessage.Define(LogLevel.Debug, new EventId(2, "TransportStopped"), "Transport stopped."); + + private static readonly Action<ILogger, Exception> _startReceive = + LoggerMessage.Define(LogLevel.Debug, new EventId(3, "StartReceive"), "Starting receive loop."); + + private static readonly Action<ILogger, Exception> _receiveStopped = + LoggerMessage.Define(LogLevel.Debug, new EventId(4, "ReceiveStopped"), "Receive loop stopped."); + + private static readonly Action<ILogger, Exception> _receiveCanceled = + LoggerMessage.Define(LogLevel.Debug, new EventId(5, "ReceiveCanceled"), "Receive loop canceled."); + + private static readonly Action<ILogger, Exception> _transportStopping = + LoggerMessage.Define(LogLevel.Information, new EventId(6, "TransportStopping"), "Transport is stopping."); + + private static readonly Action<ILogger, int, Exception> _messageToApplication = + LoggerMessage.Define<int>(LogLevel.Debug, new EventId(7, "MessageToApplication"), "Passing message to application. Payload size: {Count}."); + + private static readonly Action<ILogger, Exception> _eventStreamEnded = + LoggerMessage.Define(LogLevel.Debug, new EventId(8, "EventStreamEnded"), "Server-Sent Event Stream ended."); + + // EventIds 100 - 106 used in SendUtils + + public static void StartTransport(ILogger logger, TransferFormat transferFormat) + { + _startTransport(logger, transferFormat, null); + } + + public static void TransportStopped(ILogger logger, Exception exception) + { + _transportStopped(logger, exception); + } + + public static void StartReceive(ILogger logger) + { + _startReceive(logger, null); + } + + public static void TransportStopping(ILogger logger) + { + _transportStopping(logger, null); + } + + public static void MessageToApplication(ILogger logger, int count) + { + _messageToApplication(logger, count, null); + } + + public static void ReceiveCanceled(ILogger logger) + { + _receiveCanceled(logger, null); + } + + public static void ReceiveStopped(ILogger logger) + { + _receiveStopped(logger, null); + } + + public static void EventStreamEnded(ILogger logger) + { + _eventStreamEnded(logger, null); + } + } + } +} diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.cs new file mode 100644 index 0000000000..e971fecd36 --- /dev/null +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/HttpStreamingTransport.cs @@ -0,0 +1,180 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO.Pipelines; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Microsoft.AspNetCore.Http.Connections.Client.Internal +{ + internal partial class HttpStreamingTransport : ITransport + { + private readonly HttpClient _httpClient; + private readonly ILogger _logger; + + // Volatile so that the receive loop sees the updated value set from a different thread + private volatile Exception _error; + private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); + private readonly CancellationTokenSource _inputCts = new CancellationTokenSource(); + private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser(); + private IDuplexPipe _transport; + private IDuplexPipe _application; + + internal Task Running { get; private set; } = Task.CompletedTask; + + public PipeReader Input => _transport.Input; + + public PipeWriter Output => _transport.Output; + + public HttpStreamingTransport(HttpClient httpClient) + : this(httpClient, null) + { } + + public HttpStreamingTransport(HttpClient httpClient, ILoggerFactory loggerFactory) + { + if (httpClient == null) + { + throw new ArgumentNullException(nameof(_httpClient)); + } + + _httpClient = httpClient; + _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>(); + } + + public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default) + { + if (transferFormat != TransferFormat.Text) + { + throw new ArgumentException($"The '{transferFormat}' transfer format is not supported by this transport.", nameof(transferFormat)); + } + + Log.StartTransport(_logger, transferFormat); + + var request = new HttpRequestMessage(HttpMethod.Get, url); + request.Version = new Version(2, 0); + + HttpResponseMessage response = null; + + try + { + response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + response.EnsureSuccessStatusCode(); + } + catch + { + response?.Dispose(); + + Log.TransportStopping(_logger); + + throw; + } + + // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa) + var options = ClientPipeOptions.DefaultOptions; + var pair = DuplexPipe.CreateConnectionPair(options, options); + + _transport = pair.Transport; + _application = pair.Application; + + // Cancellation token will be triggered when the pipe is stopped on the client. + // This is to avoid the client throwing from a 404 response caused by the + // server stopping the connection while the send message request is in progress. + // _application.Input.OnWriterCompleted((exception, state) => ((CancellationTokenSource)state).Cancel(), inputCts); + + Running = ProcessAsync(url, response); + } + + private async Task ProcessAsync(Uri url, HttpResponseMessage response) + { + // Start sending and polling (ask for binary if the server supports it) + var receiving = ProcessResponseStream(response, _transportCts.Token); + var sending = SendUtils.SendMessages(url, _application, _httpClient, _logger, _inputCts.Token); + + // Wait for send or receive to complete + var trigger = await Task.WhenAny(receiving, sending); + + if (trigger == receiving) + { + // We're waiting for the application to finish and there are 2 things it could be doing + // 1. Waiting for application data + // 2. Waiting for an outgoing send (this should be instantaneous) + + _inputCts.Cancel(); + + // Cancel the application so that ReadAsync yields + _application.Input.CancelPendingRead(); + + await sending; + } + else + { + // Set the sending error so we communicate that to the application + _error = sending.IsFaulted ? sending.Exception.InnerException : null; + + _transportCts.Cancel(); + + // Cancel any pending flush so that we can quit + _application.Output.CancelPendingFlush(); + + await receiving; + } + } + + private async Task ProcessResponseStream(HttpResponseMessage response, CancellationToken cancellationToken) + { + Log.StartReceive(_logger); + + using (response) + using (var stream = await response.Content.ReadAsStreamAsync()) + { + try + { + await stream.CopyToAsync(_application.Output, cancellationToken); + } + catch (Exception ex) + { + _error = ex; + } + finally + { + _application.Output.Complete(_error); + + Log.ReceiveStopped(_logger); + } + } + } + + public async Task StopAsync() + { + Log.TransportStopping(_logger); + + if (_application == null) + { + // We never started + return; + } + + _transport.Output.Complete(); + _transport.Input.Complete(); + + _application.Input.CancelPendingRead(); + + try + { + await Running; + } + catch (Exception ex) + { + Log.TransportStopped(_logger, ex); + throw; + } + + Log.TransportStopped(_logger, null); + } + } +} diff --git a/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs b/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs index 85b88d80c4..31749b0683 100644 --- a/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs +++ b/src/SignalR/common/Http.Connections.Common/src/HttpTransportType.cs @@ -30,5 +30,9 @@ namespace Microsoft.AspNetCore.Http.Connections /// Specifies that the long polling transport is used. /// </summary> LongPolling = 4, + /// <summary> + /// Specifies that the HTTP streaming is used. + /// </summary> + HttpStreaming = 8 } } diff --git a/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs b/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs index a633f2fc1d..9354fc5fc4 100644 --- a/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs +++ b/src/SignalR/common/Http.Connections.Common/src/HttpTransports.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. namespace Microsoft.AspNetCore.Http.Connections @@ -11,6 +11,6 @@ namespace Microsoft.AspNetCore.Http.Connections /// <summary> /// A bitmask combining all available <see cref="HttpTransportType"/> values. /// </summary> - public static readonly HttpTransportType All = HttpTransportType.WebSockets | HttpTransportType.ServerSentEvents | HttpTransportType.LongPolling; + public static readonly HttpTransportType All = HttpTransportType.WebSockets | HttpTransportType.HttpStreaming | HttpTransportType.ServerSentEvents | HttpTransportType.LongPolling; } } diff --git a/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs b/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs index bf82562a7b..831c4fd1ba 100644 --- a/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs +++ b/src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs @@ -35,6 +35,13 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal TransferFormats = new List<string> { nameof(TransferFormat.Text) } }; + private static readonly AvailableTransport _httpStreamingAvailableTransport = + new AvailableTransport + { + Transport = nameof(HttpTransportType.HttpStreaming), + TransferFormats = new List<string> { nameof(TransferFormat.Text), nameof(TransferFormat.Binary) } + }; + private static readonly AvailableTransport _longPollingAvailableTransport = new AvailableTransport { @@ -107,11 +114,35 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal { var supportedTransports = options.Transports; + if (context.WebSockets.IsWebSocketRequest) + { + // Connection can be established lazily + var connection = await GetOrCreateConnectionAsync(context, options); + if (connection == null) + { + // No such connection, GetOrCreateConnection already set the response status code + return; + } + + if (!await EnsureConnectionStateAsync(connection, context, HttpTransportType.WebSockets, supportedTransports, logScope, options)) + { + // Bad connection state. It's already set the response status code. + return; + } + + Log.EstablishedConnection(_logger); + + // Allow the reads to be cancelled + connection.Cancellation = new CancellationTokenSource(); + + var ws = new WebSocketsServerTransport(options.WebSockets, connection.Application, connection, _loggerFactory); + + await DoPersistentConnection(connectionDelegate, ws, context, connection); + } // Server sent events transport // GET /{path} // Accept: text/event-stream - var headers = context.Request.GetTypedHeaders(); - if (headers.Accept?.Contains(new Net.Http.Headers.MediaTypeHeaderValue("text/event-stream")) == true) + else if (context.Request.GetTypedHeaders().Accept?.Contains(new Net.Http.Headers.MediaTypeHeaderValue("text/event-stream")) == true) { // Connection must already exist var connection = await GetConnectionAsync(context); @@ -137,17 +168,18 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal await DoPersistentConnection(connectionDelegate, sse, context, connection); } - else if (context.WebSockets.IsWebSocketRequest) + else if (context.Request.Protocol == "HTTP/2") { - // Connection can be established lazily - var connection = await GetOrCreateConnectionAsync(context, options); + // Support HTTP/2 Streaming + + var connection = await GetConnectionAsync(context); if (connection == null) { - // No such connection, GetOrCreateConnection already set the response status code + // No such connection, GetConnection already set the response status code return; } - if (!await EnsureConnectionStateAsync(connection, context, HttpTransportType.WebSockets, supportedTransports, logScope, options)) + if (!await EnsureConnectionStateAsync(connection, context, HttpTransportType.HttpStreaming, supportedTransports, logScope, options)) { // Bad connection state. It's already set the response status code. return; @@ -155,12 +187,10 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal Log.EstablishedConnection(_logger); - // Allow the reads to be cancelled - connection.Cancellation = new CancellationTokenSource(); - - var ws = new WebSocketsServerTransport(options.WebSockets, connection.Application, connection, _loggerFactory); + // We only need to provide the Input channel since writing to the application is handled through /send. + var streaming = new HttpStreamingTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory); - await DoPersistentConnection(connectionDelegate, ws, context, connection); + await DoPersistentConnection(connectionDelegate, streaming, context, connection); } else { @@ -317,6 +347,12 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal response.AvailableTransports.Add(_webSocketAvailableTransport); } + // REVIEW: Hard code to HTTP/2 support on the server side? + if ((options.Transports & HttpTransportType.HttpStreaming) != 0) + { + response.AvailableTransports.Add(_httpStreamingAvailableTransport); + } + if ((options.Transports & HttpTransportType.ServerSentEvents) != 0) { response.AvailableTransports.Add(_serverSentEventsAvailableTransport); diff --git a/src/SignalR/common/Http.Connections/src/Internal/Transports/HttpStreamingTransport.cs b/src/SignalR/common/Http.Connections/src/Internal/Transports/HttpStreamingTransport.cs new file mode 100644 index 0000000000..7708246c2e --- /dev/null +++ b/src/SignalR/common/Http.Connections/src/Internal/Transports/HttpStreamingTransport.cs @@ -0,0 +1,56 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AspNetCore.Http.Connections.Internal.Transports +{ + internal class HttpStreamingTransport : IHttpTransport + { + private readonly PipeReader _application; + private readonly string _connectionId; + private readonly ILogger _logger; + + public HttpStreamingTransport(PipeReader application, string connectionId, ILoggerFactory loggerFactory) + { + _application = application; + _connectionId = connectionId; + + // We create the logger with a string to preserve the logging namespace after the server side transport renames. + _logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Http.Connections.Internal.Transports.HttpStreamingTransport"); + } + + public async Task ProcessRequestAsync(HttpContext context, CancellationToken token) + { + // Flush headers immediately so we can start streaming + await context.Response.Body.FlushAsync(); + + try + { + while (true) + { + await _application.CopyToAsync(context.Response.Body, token); + } + } + catch (OperationCanceledException) + { + // Closed connection + } + } + + private static class Log + { + private static readonly Action<ILogger, long, Exception> _writingMessage = + LoggerMessage.Define<long>(LogLevel.Trace, new EventId(1, "HttpStreamingWritingMessage"), "Writing a {Count} byte message."); + + public static void WritingMessage(ILogger logger, long count) + { + _writingMessage(logger, count, null); + } + } + } +} |