Welcome to mirror list, hosted at ThFree Co, Russian Federation.

Synchronization.cs « Concurrency « Reactive « System.Reactive.Core « Rx.NET - github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: d923912a96cf3715db5c86c96841189170a77c2a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System;
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Provides basic synchronization and scheduling services for observable sequences.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Advanced)]
    public static class Synchronization
    {
        #region SubscribeOn

        /// <summary>
        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <remarks>
        /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
        /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
        /// </remarks>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");

            return new AnonymousObservable<TSource>(observer =>
            {
                var m = new SingleAssignmentDisposable();
                var d = new SerialDisposable();
                d.Disposable = m;

                m.Disposable = scheduler.Schedule(() =>
                {
                    d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
                });

                return d;
            });
        }

#if !NO_SYNCCTX
        /// <summary>
        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
        /// <remarks>
        /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context.
        /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
        /// </remarks>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (context == null)
                throw new ArgumentNullException("context");

            return new AnonymousObservable<TSource>(observer =>
            {
                var subscription = new SingleAssignmentDisposable();
                context.PostWithStartComplete(() =>
                {
                    if (!subscription.IsDisposed)
                        subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
                });
                return subscription;
            });
        }
#endif

        #endregion

        #region ObserveOn

        /// <summary>
        /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler to notify observers on.</param>
        /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");

#if !NO_PERF
            return new ObserveOn<TSource>(source, scheduler);
#else
            return new AnonymousObservable<TSource>(observer => source.Subscribe(new ObserveOnObserver<TSource>(scheduler, observer, null)));
#endif
        }

#if !NO_SYNCCTX
        /// <summary>
        /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context to notify observers on.</param>
        /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (context == null)
                throw new ArgumentNullException("context");

#if !NO_PERF
            return new ObserveOn<TSource>(source, context);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                context.OperationStarted();

                return source.Subscribe(
                    x => context.Post(_ =>
                    {
                        observer.OnNext(x);
                    }, null),
                    exception => context.Post(_ =>
                    {
                        observer.OnError(exception);
                    }, null),
                    () => context.Post(_ =>
                    {
                        observer.OnCompleted();
                    }, null)
                ).Finally(() =>
                {
                    context.OperationCompleted();
                });
            });
#endif
        }
#endif

        #endregion

        #region Synchronize

        /// <summary>
        /// Wraps the source sequence in order to ensure observer callbacks are properly serialized.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");

#if !NO_PERF
            return new Synchronize<TSource>(source);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                var gate = new object();
                return source.Subscribe(Observer.Synchronize(observer, gate));
            });
#endif
        }

        /// <summary>
        /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="gate">Gate object to synchronize each observer call on.</param>
        /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (gate == null)
                throw new ArgumentNullException("gate");

#if !NO_PERF
            return new Synchronize<TSource>(source, gate);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                return source.Subscribe(Observer.Synchronize(observer, gate));
            });
#endif
        }

        #endregion
    }
}