Welcome to mirror list, hosted at ThFree Co, Russian Federation.

SchedulerOperation.cs « Concurrency « Reactive « System.Reactive.Core « Rx.NET - github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: bcff761672ff63252bbe703ccc6d1d7baa43b818 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if HAS_AWAIT
using System.ComponentModel;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an awaitable scheduler operation. Awaiting the object causes the continuation to be posted back to the originating scheduler's work queue.
    /// </summary>
    public sealed class SchedulerOperation
    {
        private readonly Func<Action, IDisposable> _schedule;
        private readonly CancellationToken _cancellationToken;
        private readonly bool _postBackToOriginalContext;

        internal SchedulerOperation(Func<Action, IDisposable> schedule, CancellationToken cancellationToken)
            : this(schedule, cancellationToken, false)
        {
        }

        internal SchedulerOperation(Func<Action, IDisposable> schedule, CancellationToken cancellationToken, bool postBackToOriginalContext)
        {
            _schedule = schedule;
            _cancellationToken = cancellationToken;
            _postBackToOriginalContext = postBackToOriginalContext;
        }

        /// <summary>
        /// Controls whether the continuation is run on the originating synchronization context (false by default).
        /// </summary>
        /// <param name="continueOnCapturedContext">true to run the continuation on the captured synchronization context; false otherwise (default).</param>
        /// <returns>Scheduler operation object with configured await behavior.</returns>
        public SchedulerOperation ConfigureAwait(bool continueOnCapturedContext)
        {
            return new SchedulerOperation(_schedule, _cancellationToken, continueOnCapturedContext);
        }

        /// <summary>
        /// Gets an awaiter for the scheduler operation, used to post back the continuation.
        /// </summary>
        /// <returns>Awaiter for the scheduler operation.</returns>
        public SchedulerOperationAwaiter GetAwaiter()
        {
            return new SchedulerOperationAwaiter(_schedule, _cancellationToken, _postBackToOriginalContext);
        }
    }

    /// <summary>
    /// (Infrastructure) Scheduler operation awaiter type used by the code generated for C# await and Visual Basic Await expressions.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Never)]
    public sealed class SchedulerOperationAwaiter
        : INotifyCompletion
    {
        private readonly Func<Action, IDisposable> _schedule;
        private readonly CancellationToken _cancellationToken;
        private readonly bool _postBackToOriginalContext;
        private readonly CancellationTokenRegistration _ctr;

        internal SchedulerOperationAwaiter(Func<Action, IDisposable> schedule, CancellationToken cancellationToken, bool postBackToOriginalContext)
        {
            _schedule = schedule;
            _cancellationToken = cancellationToken;
            _postBackToOriginalContext = postBackToOriginalContext;

            _ctr = _cancellationToken.Register(Cancel);
        }

        /// <summary>
        /// Indicates whether the scheduler operation has completed. Returns false unless cancellation was already requested.
        /// </summary>
        public bool IsCompleted
        {
            get { return _cancellationToken.IsCancellationRequested; }
        }

        /// <summary>
        /// Completes the scheduler operation, throwing an OperationCanceledException in case cancellation was requested.
        /// </summary>
        public void GetResult()
        {
            _cancellationToken.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// Registers the continuation with the scheduler operation.
        /// </summary>
        /// <param name="continuation">Continuation to be run on the originating scheduler.</param>
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
                throw new ArgumentNullException("continuation");

            if (_continuation != null)
                throw new InvalidOperationException(Strings_Core.SCHEDULER_OPERATION_ALREADY_AWAITED);

            if (_postBackToOriginalContext)
            {
                var ctx = SynchronizationContext.Current;
                if (ctx != null)
                {
                    var original = continuation;
                    continuation = () =>
                    {
                        //
                        // No need for OperationStarted and OperationCompleted calls here;
                        // this code is invoked through await support and will have a way
                        // to observe its start/complete behavior, either through returned
                        // Task objects or the async method builder's interaction with the
                        // SynchronizationContext object.
                        //
                        // In general though, Rx doesn't play nicely with synchronization
                        // contexts objects at the scheduler level. It's possible to start
                        // async operations by calling Schedule, without a way to observe
                        // their completion. Not interacting with SynchronizationContext
                        // is a concious design decision as the performance impact was non
                        // negligable and our schedulers abstract over more constructs.
                        //
                        ctx.Post(a => ((Action)a)(), original);
                    };
                }
            }

            var ran = 0;

            _continuation = () =>
            {
                if (Interlocked.Exchange(ref ran, 1) == 0)
                {
                    _ctr.Dispose();
                    continuation();
                }
            };

            _work = _schedule(_continuation);
        }

        private Action _continuation;
        private IDisposable _work;

        private void Cancel()
        {
            var w = _work;
            if (w != null)
                w.Dispose();

            var c = _continuation;
            if (c != null)
                c();
        }
    }
}
#endif