Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs')
-rw-r--r--Rx.NET/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs155
1 files changed, 155 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs
new file mode 100644
index 0000000..e14df19
--- /dev/null
+++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs
@@ -0,0 +1,155 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Reactive.Concurrency;
+using System.Threading;
+
+namespace System.Reactive.Linq
+{
+ public static partial class Observable
+ {
+ #region + ObserveOn +
+
+ /// <summary>
+ /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence.</param>
+ /// <param name="scheduler">Scheduler to notify observers on.</param>
+ /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
+ /// <remarks>
+ /// This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects
+ /// that require to be run on a scheduler, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, IScheduler)"/>.
+ /// </remarks>
+ public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (scheduler == null)
+ throw new ArgumentNullException("scheduler");
+
+ return s_impl.ObserveOn<TSource>(source, scheduler);
+ }
+
+#if !NO_SYNCCTX
+ /// <summary>
+ /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence.</param>
+ /// <param name="context">Synchronization context to notify observers on.</param>
+ /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
+ /// <remarks>
+ /// This only invokes observer callbacks on a synchronization context. In case the subscription and/or unsubscription actions have side-effects
+ /// that require to be run on a synchronization context, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
+ /// </remarks>
+ public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (context == null)
+ throw new ArgumentNullException("context");
+
+ return s_impl.ObserveOn<TSource>(source, context);
+ }
+#endif
+
+ #endregion
+
+ #region + SubscribeOn +
+
+ /// <summary>
+ /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;
+ /// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence.</param>
+ /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
+ /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
+ /// <remarks>
+ /// This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer
+ /// callbacks on a scheduler, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
+ /// </remarks>
+ public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (scheduler == null)
+ throw new ArgumentNullException("scheduler");
+
+ return s_impl.SubscribeOn<TSource>(source, scheduler);
+ }
+
+#if !NO_SYNCCTX
+ /// <summary>
+ /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. This operation is not commonly used;
+ /// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence.</param>
+ /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
+ /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
+ /// <remarks>
+ /// This only performs the side-effects of subscription and unsubscription on the specified synchronization context. In order to invoke observer
+ /// callbacks on a synchronization context, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
+ /// </remarks>
+ public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (context == null)
+ throw new ArgumentNullException("context");
+
+ return s_impl.SubscribeOn<TSource>(source, context);
+ }
+#endif
+
+ #endregion
+
+ #region + Synchronize +
+
+ /// <summary>
+ /// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently.
+ /// This overload is useful to "fix" an observable sequence that exhibits concurrent callbacks on individual observers, which is invalid behavior for the query processor.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence.</param>
+ /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+ /// <remarks>
+ /// It's invalid behavior - according to the observer grammar - for a sequence to exhibit concurrent callbacks on a given observer.
+ /// This operator can be used to "fix" a source that doesn't conform to this rule.
+ /// </remarks>
+ public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+
+ return s_impl.Synchronize<TSource>(source);
+ }
+
+ /// <summary>
+ /// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently, using the specified gate object.
+ /// This overload is useful when writing n-ary query operators, in order to prevent concurrent callbacks from different sources by synchronizing on a common gate object.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence.</param>
+ /// <param name="gate">Gate object to synchronize each observer call on.</param>
+ /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
+ public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source, object gate)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (gate == null)
+ throw new ArgumentNullException("gate");
+
+ return s_impl.Synchronize<TSource>(source, gate);
+ }
+
+ #endregion
+ }
+}