Welcome to mirror list, hosted at ThFree Co, Russian Federation.

PipeWriterStream.cs « Shared « common « SignalR « src - github.com/dotnet/aspnetcore.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 245731bfd9253fa6fdf1af1e4886ee36d9ab8fc0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines
{
    // Write only stream implementation for efficiently writing bytes from the request body
    internal class PipeWriterStream : Stream
    {
        private long _length;
        private readonly PipeWriter _pipeWriter;

        public PipeWriterStream(PipeWriter pipeWriter)
        {
            _pipeWriter = pipeWriter;
        }

        public override bool CanRead => false;

        public override bool CanSeek => false;

        public override bool CanWrite => true;

        public override long Length => _length;

        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

        public override void Flush()
        {
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            _pipeWriter.Write(new ReadOnlySpan<byte>(buffer, offset, count));
            _length += count;
        }

        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            return WriteCoreAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
        }

#if NETCOREAPP2_1
        public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
        {
            return WriteCoreAsync(source, cancellationToken);
        }
#endif

        private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return new ValueTask(Task.FromCanceled(cancellationToken));
            }

            _length += source.Length;
            var task = _pipeWriter.WriteAsync(source);
            if (task.IsCompletedSuccessfully)
            {
                // Cancellation can be triggered by PipeWriter.CancelPendingFlush
                if (task.Result.IsCanceled)
                {
                    throw new OperationCanceledException();
                }
            }
            else if (!task.IsCompletedSuccessfully)
            {
                return WriteSlowAsync(task);
            }

            return default;

            async ValueTask WriteSlowAsync(ValueTask<FlushResult> flushTask)
            {
                var flushResult = await flushTask;

                // Cancellation can be triggered by PipeWriter.CancelPendingFlush
                if (flushResult.IsCanceled)
                {
                    throw new OperationCanceledException();
                }
            }
        }

        public void Reset()
        {
            _length = 0;
        }
    }
}