Welcome to mirror list, hosted at ThFree Co, Russian Federation.

MessageBody.cs « Http « Internal « src « Core « Kestrel « Servers « src - github.com/dotnet/aspnetcore.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: e5001df5dc9d76a365e6c471ede2a82403e7a25e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
    /// <summary>
    /// Base class for the request body of a request processed by an <c>HttpProtocol</c>.
    /// It declares the read surface that concrete bodies implement (<c>ReadAsync</c>, <c>TryRead</c>,
    /// <c>AdvanceTo</c>, <c>CancelPendingRead</c>, <c>Complete</c>) and provides the bookkeeping they share:
    /// producing the interim "continue" response, start/stop logging, request body timing through
    /// <c>TimeoutControl</c>, and enforcement of <c>MaxRequestBodySize</c>.
    /// </summary>
    internal abstract class MessageBody
    {
        // Shared ZeroContentLengthMessageBody instances, one per keepAlive value.
        private static readonly MessageBody _zeroContentLengthClose = new ZeroContentLengthMessageBody(keepAlive: false);
        private static readonly MessageBody _zeroContentLengthKeepAlive = new ZeroContentLengthMessageBody(keepAlive: true);

        private readonly HttpProtocol _context;

        // True until TryProduceContinue has run once; restored by Reset.
        private bool _send100Continue = true;

        // Running total of the byte counts passed to AddAndCheckObservedBytes.
        private long _observedBytes;

        // Set by TryStop so the stop logic runs at most once until Reset.
        private bool _stopped;

        // Set by TryStart when the request is not an upgrade and MinRequestBodyDataRate is non-null.
        protected bool _timingEnabled;

        // True while this class has an outstanding TimeoutControl.StartTimingRead() call
        // that has not yet been matched by StopTimingRead().
        protected bool _backpressure;

        // Bytes left unconsumed in the buffer by the last TrackConsumedAndExaminedBytes call.
        // CountBytesRead subtracts them so they are not reported to TimeoutControl twice.
        protected long _alreadyTimedBytes;

        // Examined length minus consumed length as of the last TrackConsumedAndExaminedBytes call.
        protected long _examinedUnconsumedBytes;

        /// <summary>
        /// Initializes the body with the protocol instance whose settings, log and timeout control it uses.
        /// </summary>
        /// <param name="context">The owning <c>HttpProtocol</c>.</param>
        protected MessageBody(HttpProtocol context)
        {
            _context = context;
        }

        /// <summary>Shared <c>ZeroContentLengthMessageBody</c> created with <c>keepAlive: false</c>.</summary>
        public static MessageBody ZeroContentLengthClose => _zeroContentLengthClose;

        /// <summary>Shared <c>ZeroContentLengthMessageBody</c> created with <c>keepAlive: true</c>.</summary>
        public static MessageBody ZeroContentLengthKeepAlive => _zeroContentLengthKeepAlive;

        /// <summary>Keep-alive flag for the request; assigned by derived classes.</summary>
        public bool RequestKeepAlive { get; protected set; }

        /// <summary>
        /// Upgrade flag for the request; assigned by derived classes. When true, <see cref="TryStart"/> and
        /// <see cref="TryStop"/> skip body logging and request body timing.
        /// </summary>
        public bool RequestUpgrade { get; protected set; }

        /// <summary>False in this base class; derived classes may override.</summary>
        public virtual bool IsEmpty => false;

        /// <summary>The trace logger taken from the owning protocol's service context.</summary>
        protected IKestrelTrace Log => _context.ServiceContext.Log;

        // NOTE(review): the five abstract members below have the same names and shapes as the
        // System.IO.Pipelines PipeReader members - presumably they follow the same contract; confirm
        // against the concrete implementations.

        /// <summary>Reads from the body. Implemented by derived classes.</summary>
        /// <param name="cancellationToken">Token passed through to the implementation.</param>
        /// <returns>The result of the read.</returns>
        public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default);

        /// <summary>Attempts a read without awaiting. Implemented by derived classes.</summary>
        /// <param name="readResult">The result produced by the implementation.</param>
        /// <returns>The success flag reported by the implementation.</returns>
        public abstract bool TryRead(out ReadResult readResult);

        /// <summary>
        /// Convenience overload that forwards to <see cref="AdvanceTo(SequencePosition, SequencePosition)"/>
        /// with <paramref name="consumed"/> used as both the consumed and the examined position.
        /// </summary>
        /// <param name="consumed">Position used for both arguments.</param>
        public void AdvanceTo(SequencePosition consumed)
        {
            AdvanceTo(consumed, consumed);
        }

        /// <summary>Advances the body past previously read data. Implemented by derived classes.</summary>
        /// <param name="consumed">The consumed position.</param>
        /// <param name="examined">The examined position.</param>
        public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);

        /// <summary>Cancels a pending read. Implemented by derived classes.</summary>
        public abstract void CancelPendingRead();

        /// <summary>Completes the body. Implemented by derived classes.</summary>
        /// <param name="exception">Optional exception passed to the implementation.</param>
        public abstract void Complete(Exception? exception);

        /// <summary>
        /// Default asynchronous completion: calls <see cref="Complete"/> synchronously and returns an
        /// already completed <see cref="ValueTask"/>.
        /// </summary>
        /// <param name="exception">Optional exception forwarded to <see cref="Complete"/>.</param>
        /// <returns>A completed <see cref="ValueTask"/>.</returns>
        public virtual ValueTask CompleteAsync(Exception? exception)
        {
            Complete(exception);
            return default;
        }

        /// <summary>
        /// Runs <see cref="TryStart"/> and then returns the task from <see cref="OnConsumeAsync"/>.
        /// </summary>
        /// <returns>The task returned by <see cref="OnConsumeAsync"/>.</returns>
        public virtual Task ConsumeAsync()
        {
            TryStart();

            return OnConsumeAsync();
        }

        /// <summary>
        /// Runs <see cref="TryStop"/> and then returns the task from <see cref="OnStopAsync"/>.
        /// </summary>
        /// <returns>The task returned by <see cref="OnStopAsync"/>.</returns>
        public virtual ValueTask StopAsync()
        {
            TryStop();

            return OnStopAsync();
        }

        /// <summary>Hook invoked by <see cref="ConsumeAsync"/>; the base implementation does nothing.</summary>
        /// <returns>A completed task.</returns>
        protected virtual Task OnConsumeAsync() => Task.CompletedTask;

        /// <summary>Hook invoked by <see cref="StopAsync"/>; the base implementation does nothing.</summary>
        /// <returns>A completed <see cref="ValueTask"/>.</returns>
        protected virtual ValueTask OnStopAsync() => default;

        /// <summary>
        /// Restores every field tracked by this base class to its initial value so the instance can be
        /// used again. It does not touch <see cref="RequestKeepAlive"/>, <see cref="RequestUpgrade"/> or
        /// any state held on the owning protocol.
        /// </summary>
        public virtual void Reset()
        {
            _send100Continue = true;
            _observedBytes = 0;
            _stopped = false;
            _timingEnabled = false;
            _backpressure = false;
            _alreadyTimedBytes = 0;
            _examinedUnconsumedBytes = 0;
        }

        /// <summary>
        /// Asks the response control to produce the interim "continue" response, at most once between
        /// calls to <see cref="Reset"/>.
        /// </summary>
        protected void TryProduceContinue()
        {
            if (_send100Continue)
            {
                _context.HttpResponseControl.ProduceContinue();
                _send100Continue = false;
            }
        }

        /// <summary>
        /// Marks the request body as started on the owning protocol. Does nothing if the protocol already
        /// reports <c>HasStartedConsumingRequestBody</c>. For non-upgrade requests it also logs the start
        /// (at Debug level) and, when <c>MinRequestBodyDataRate</c> is set, enables request body timing.
        /// </summary>
        protected void TryStart()
        {
            if (_context.HasStartedConsumingRequestBody)
            {
                return;
            }

            // OnReadStarting runs before the started flag is set; OnReadStarted runs after all setup.
            OnReadStarting();
            _context.HasStartedConsumingRequestBody = true;

            if (!RequestUpgrade)
            {
                // Accessing TraceIdentifier will lazy-allocate a string ID.
                // Don't access TraceIdentifier unless logging is enabled.
                if (Log.IsEnabled(LogLevel.Debug))
                {
                    Log.RequestBodyStart(_context.ConnectionIdFeature, _context.TraceIdentifier);
                }

                if (_context.MinRequestBodyDataRate != null)
                {
                    _timingEnabled = true;
                    _context.TimeoutControl.StartRequestBody(_context.MinRequestBodyDataRate);
                }
            }

            OnReadStarted();
        }

        /// <summary>
        /// Marks the body as stopped, at most once between calls to <see cref="Reset"/>. For non-upgrade
        /// requests it logs completion (at Debug level) and, if timing was enabled, stops any read timing
        /// still in progress before stopping request body timing.
        /// </summary>
        protected void TryStop()
        {
            if (_stopped)
            {
                return;
            }

            _stopped = true;

            if (!RequestUpgrade)
            {
                // Accessing TraceIdentifier will lazy-allocate a string ID.
                // Don't access TraceIdentifier unless logging is enabled.
                if (Log.IsEnabled(LogLevel.Debug))
                {
                    Log.RequestBodyDone(_context.ConnectionIdFeature, _context.TraceIdentifier);
                }

                if (_timingEnabled)
                {
                    // Balance a StartTimingRead that was never matched by StopTimingRead(long).
                    if (_backpressure)
                    {
                        _context.TimeoutControl.StopTimingRead();
                    }

                    _context.TimeoutControl.StopRequestBody();
                }
            }
        }

        /// <summary>
        /// Hook called by <see cref="TryStart"/> before the protocol is flagged as having started
        /// consuming the body. The base implementation does nothing.
        /// </summary>
        protected virtual void OnReadStarting()
        {
        }

        /// <summary>
        /// Hook called at the end of <see cref="TryStart"/>. The base implementation does nothing.
        /// </summary>
        protected virtual void OnReadStarted()
        {
        }

        /// <summary>
        /// Adds <paramref name="observedBytes"/> to the running total and rejects the request through
        /// <c>KestrelBadHttpRequestException.Throw</c> with <c>RequestBodyTooLarge</c> once the total
        /// exceeds <c>MaxRequestBodySize</c>.
        /// </summary>
        /// <param name="observedBytes">Number of bytes to add to the running total.</param>
        protected void AddAndCheckObservedBytes(long observedBytes)
        {
            _observedBytes += observedBytes;

            if (_observedBytes > _context.MaxRequestBodySize)
            {
                KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTooLarge);
            }
        }

        /// <summary>
        /// Prepares for a read that has not completed yet: produces the interim "continue" response if
        /// it has not been produced, and starts read timing when request body timing is enabled.
        /// A read that has already completed is returned untouched.
        /// </summary>
        /// <param name="readAwaitable">The read operation to inspect and return.</param>
        /// <param name="cancellationToken">Not used by this method.</param>
        /// <returns><paramref name="readAwaitable"/>, unchanged.</returns>
        protected ValueTask<ReadResult> StartTimingReadAsync(ValueTask<ReadResult> readAwaitable, CancellationToken cancellationToken)
        {
            if (!readAwaitable.IsCompleted)
            {
                // The read is waiting for data, so the client may itself be waiting for the interim
                // response before it sends the body. This must not depend on _timingEnabled: timing is
                // off whenever MinRequestBodyDataRate is null, and the continue response is still needed.
                TryProduceContinue();

                if (_timingEnabled)
                {
                    _backpressure = true;
                    _context.TimeoutControl.StartTimingRead();
                }
            }

            return readAwaitable;
        }

        /// <summary>
        /// Reports to <c>TimeoutControl.BytesRead</c> the bytes in a read result that exceed
        /// <c>_alreadyTimedBytes</c>. Nothing is reported when the difference is zero or negative.
        /// </summary>
        /// <param name="bytesInReadResult">Total number of bytes in the read result.</param>
        protected void CountBytesRead(long bytesInReadResult)
        {
            var numFirstSeenBytes = bytesInReadResult - _alreadyTimedBytes;

            if (numFirstSeenBytes > 0)
            {
                _context.TimeoutControl.BytesRead(numFirstSeenBytes);
            }
        }

        /// <summary>
        /// Counts the bytes of a finished read via <see cref="CountBytesRead"/> and, if read timing was
        /// started by <see cref="StartTimingReadAsync"/>, stops it.
        /// </summary>
        /// <param name="bytesInReadResult">Total number of bytes in the read result.</param>
        protected void StopTimingRead(long bytesInReadResult)
        {
            CountBytesRead(bytesInReadResult);

            if (_backpressure)
            {
                _backpressure = false;
                _context.TimeoutControl.StopTimingRead();
            }
        }

        /// <summary>
        /// Updates <c>_examinedUnconsumedBytes</c> and <c>_alreadyTimedBytes</c> for an advance over
        /// <paramref name="readResult"/> and returns how many bytes were examined beyond what the
        /// previous call had already recorded as examined but unconsumed.
        /// </summary>
        /// <param name="readResult">The read result whose buffer the positions refer to.</param>
        /// <param name="consumed">The consumed position within the buffer.</param>
        /// <param name="examined">The examined position within the buffer.</param>
        /// <returns>
        /// The examined length minus the previous value of <c>_examinedUnconsumedBytes</c>. The value is
        /// negative when the examined length is smaller than that previous value.
        /// </returns>
        protected long TrackConsumedAndExaminedBytes(ReadResult readResult, SequencePosition consumed, SequencePosition examined)
        {
            // Worked example (all lengths are measured from readResult.Buffer.Start):
            //
            // 1. ReadAsync returns a 50 byte buffer and the caller advances with consumed = 25, examined = 40.
            //    consumedLength = 25, examinedLength = 40, totalLength = 50 (examined is not the buffer end).
            //    _examinedUnconsumedBytes is 0, so newlyExaminedBytes = 40 - 0 = 40 and 40 is returned.
            //    _examinedUnconsumedBytes becomes 0 + (40 - 25) = 15 and _alreadyTimedBytes becomes 50 - 25 = 25.
            //
            // 2. The next ReadAsync returns 50 bytes again and the caller advances with consumed = examined = 50.
            //    consumedLength = examinedLength = totalLength = 50.
            //    newlyExaminedBytes = 50 - 15 = 35 and 35 is returned, because 15 of those bytes were
            //    already reported in step 1.
            //    _examinedUnconsumedBytes becomes 15 + (35 - 50) = 0 and _alreadyTimedBytes becomes 50 - 50 = 0.
            //
            // In general, after every call:
            //    _examinedUnconsumedBytes == examinedLength - consumedLength
            //    _alreadyTimedBytes       == totalLength - consumedLength

            long examinedLength, consumedLength, totalLength;

            if (consumed.Equals(examined))
            {
                // Both positions are the same, so a single slice gives both lengths.
                examinedLength = readResult.Buffer.Slice(readResult.Buffer.Start, examined).Length;
                consumedLength = examinedLength;
            }
            else
            {
                consumedLength = readResult.Buffer.Slice(readResult.Buffer.Start, consumed).Length;
                examinedLength = consumedLength + readResult.Buffer.Slice(consumed, examined).Length;
            }

            if (examined.Equals(readResult.Buffer.End))
            {
                // Everything was examined, so the examined length already is the buffer length.
                totalLength = examinedLength;
            }
            else
            {
                totalLength = readResult.Buffer.Length;
            }

            var newlyExaminedBytes = examinedLength - _examinedUnconsumedBytes;
            _examinedUnconsumedBytes += newlyExaminedBytes - consumedLength;
            _alreadyTimedBytes = totalLength - consumedLength;

            return newlyExaminedBytes;
        }
    }
}