diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Windows.Threading')
10 files changed, 1500 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/GlobalSuppressions.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/GlobalSuppressions.cs new file mode 100644 index 0000000..951372c --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/GlobalSuppressions.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +// This file is used by Code Analysis to maintain SuppressMessage +// attributes that are applied to this project. +// Project-level suppressions either have no target or are given +// a specific target and scoped to a namespace, type, member, etc. +// +// To add a suppression to this file, right-click the message in the +// Error List, point to "Suppress Message(s)", and click +// "In Project Suppression File". +// You do not need to add suppressions to this file manually. + +[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1016:MarkAssembliesWithAssemblyVersion", Justification = "Taken care of by lab build.")] +[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA2210:AssembliesShouldHaveValidStrongNames", Justification = "Taken care of by lab build.")] diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Properties/AssemblyInfo.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..d11d58d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Properties/AssemblyInfo.cs @@ -0,0 +1,44 @@ +using System; +using System.Reflection; +using System.Resources; +using System.Runtime.InteropServices; +using System.Security; + +[assembly: AssemblyTitle("System.Reactive.Windows.Threading")] +#if !USE_SL_DISPATCHER +// Notice: same description as in the .nuspec files; see Source/Rx/Setup/NuGet +[assembly: AssemblyDescription("Windows Presentation Foundation extensions library for Rx. Contains scheduler functionality for the WPF Dispatcher.")] +#else +// Notice: same description as in the .nuspec files; see Source/Rx/Setup/NuGet +[assembly: AssemblyDescription("Silverlight extensions library for Rx. Contains scheduler functionality for the Silverlight Dispatcher.")] +#endif +#if DEBUG +[assembly: AssemblyConfiguration("Debug")] +#else +[assembly: AssemblyConfiguration("Retail")] +#endif +[assembly: AssemblyCompany("Microsoft Open Technologies, Inc.")] +[assembly: AssemblyProduct("Reactive Extensions")] +[assembly: AssemblyCopyright("\x00a9 Microsoft Open Technologies, Inc. All rights reserved.")] +[assembly: NeutralResourcesLanguage("en-US")] + +#if !PLIB +[assembly: ComVisible(false)] +#endif + +#if !WINDOWS +[assembly: CLSCompliant(true)] +#endif + +#if HAS_APTCA && NO_CODECOVERAGE +[assembly: AllowPartiallyTrustedCallers] +#endif + +#if XBOX_LAKEVIEW +[assembly: SecurityTransparent] +#endif + +// +// Note: Assembly (file) version numbers get inserted by the build system on the fly. Inspect the Team Build workflows +// and the custom activity in Build/Source/Activities/AppendVersionInfo.cs for more information. +// diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Concurrency/CoreDispatcherScheduler.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Concurrency/CoreDispatcherScheduler.cs new file mode 100644 index 0000000..098e2d1 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Concurrency/CoreDispatcherScheduler.cs @@ -0,0 +1,239 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if WINDOWS +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Runtime.ExceptionServices; +using System.Threading; +using Windows.UI.Core; +using Windows.UI.Xaml; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on a Windows.UI.Core.CoreDispatcher. + /// </summary> + /// <remarks> + /// This scheduler type is typically used indirectly through the <see cref="System.Reactive.Linq.DispatcherObservable.ObserveOnDispatcher<TSource>(IObservable<TSource>)"/> and <see cref="System.Reactive.Linq.DispatcherObservable.SubscribeOnDispatcher<TSource>(IObservable<TSource>)"/> methods that use the current Dispatcher. + /// </remarks> + public sealed class CoreDispatcherScheduler : LocalScheduler, ISchedulerPeriodic + { + private readonly CoreDispatcher _dispatcher; + private readonly CoreDispatcherPriority _priority; + + /// <summary> + /// Constructs a CoreDispatcherScheduler that schedules units of work on the given Windows.UI.Core.CoreDispatcher. + /// </summary> + /// <param name="dispatcher">Dispatcher to schedule work on.</param> + /// <exception cref="ArgumentNullException"><paramref name="dispatcher"/> is null.</exception> + public CoreDispatcherScheduler(CoreDispatcher dispatcher) + { + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + _dispatcher = dispatcher; + _priority = CoreDispatcherPriority.Normal; + } + + /// <summary> + /// Constructs a CoreDispatcherScheduler that schedules units of work on the given Windows.UI.Core.CoreDispatcher with the given priority. + /// </summary> + /// <param name="dispatcher">Dispatcher to schedule work on.</param> + /// <param name="priority">Priority for scheduled units of work.</param> + /// <exception cref="ArgumentNullException"><paramref name="dispatcher"/> is null.</exception> + public CoreDispatcherScheduler(CoreDispatcher dispatcher, CoreDispatcherPriority priority) + { + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + _dispatcher = dispatcher; + _priority = priority; + } + + /// <summary> + /// Gets the scheduler that schedules work on the Windows.UI.Core.CoreDispatcher associated with the current Window. + /// </summary> + public static CoreDispatcherScheduler Current + { + get + { + var window = Window.Current; + if (window == null) + throw new InvalidOperationException(Strings_WindowsThreading.NO_WINDOW_CURRENT); + + return new CoreDispatcherScheduler(window.Dispatcher); + } + } + + /// <summary> + /// Gets the Windows.UI.Core.CoreDispatcher associated with the CoreDispatcherScheduler. + /// </summary> + public CoreDispatcher Dispatcher + { + get { return _dispatcher; } + } + + /// <summary> + /// Gets the priority at which work is scheduled. + /// </summary> + public CoreDispatcherPriority Priority + { + get { return _priority; } + } + + /// <summary> + /// Schedules an action to be executed on the dispatcher. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + var res = _dispatcher.RunAsync(_priority, () => + { + if (!d.IsDisposed) + { + try + { + d.Disposable = action(this, state); + } + catch (Exception ex) + { + // + // Work-around for the behavior of throwing from RunAsync not propagating + // the exception to the Application.UnhandledException event (as of W8RP) + // as our users have come to expect from previous XAML stacks using Rx. + // + // If we wouldn't do this, there'd be an observable behavioral difference + // between scheduling with TimeSpan.Zero or using this overload. + // + // For scheduler implementation guidance rules, see TaskPoolScheduler.cs + // in System.Reactive.PlatformServices\Reactive\Concurrency. + // + var timer = new DispatcherTimer(); + timer.Interval = TimeSpan.Zero; + timer.Tick += (o, e) => + { + timer.Stop(); + ExceptionDispatchInfo.Capture(ex).Throw(); + }; + + timer.Start(); + } + } + }); + + return new CompositeDisposable( + d, + Disposable.Create(res.Cancel) + ); + } + + /// <summary> + /// Schedules an action to be executed after dueTime on the dispatcher, using a Windows.UI.Xaml.DispatcherTimer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + + var d = new MultipleAssignmentDisposable(); + + var timer = new DispatcherTimer(); + + timer.Tick += (o, e) => + { + var t = Interlocked.Exchange(ref timer, null); + if (t != null) + { + try + { + d.Disposable = action(this, state); + } + finally + { + t.Stop(); + action = null; + } + } + }; + + timer.Interval = dt; + timer.Start(); + + d.Disposable = Disposable.Create(() => + { + var t = Interlocked.Exchange(ref timer, null); + if (t != null) + { + t.Stop(); + action = (_, __) => Disposable.Empty; + } + }); + + return d; + } + + /// <summary> + /// Schedules a periodic piece of work on the dispatcher, using a Windows.UI.Xaml.DispatcherTimer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + // + // According to MSDN documentation, the default is TimeSpan.Zero, so that's definitely valid. + // Empirical observation - negative values seem to be normalized to TimeSpan.Zero, but let's not go there. + // + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + var timer = new DispatcherTimer(); + + var state1 = state; + + timer.Tick += (o, e) => + { + state1 = action(state1); + }; + + timer.Interval = period; + timer.Start(); + + return Disposable.Create(() => + { + var t = Interlocked.Exchange(ref timer, null); + if (t != null) + { + t.Stop(); + action = _ => _; + } + }); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Concurrency/DispatcherScheduler.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Concurrency/DispatcherScheduler.cs new file mode 100644 index 0000000..847aaf6 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Concurrency/DispatcherScheduler.cs @@ -0,0 +1,248 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !WINDOWS +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on a <see cref="System.Windows.Threading.Dispatcher"/>. + /// </summary> + /// <remarks> + /// This scheduler type is typically used indirectly through the <see cref="System.Reactive.Linq.DispatcherObservable.ObserveOnDispatcher<TSource>(IObservable<TSource>)"/> and <see cref="System.Reactive.Linq.DispatcherObservable.SubscribeOnDispatcher<TSource>(IObservable<TSource>)"/> methods that use the Dispatcher on the calling thread. + /// </remarks> + public class DispatcherScheduler : LocalScheduler, ISchedulerPeriodic + { + /// <summary> + /// Gets the scheduler that schedules work on the current <see cref="System.Windows.Threading.Dispatcher"/>. + /// </summary> + [Obsolete(Constants_WindowsThreading.OBSOLETE_INSTANCE_PROPERTY)] + public static DispatcherScheduler Instance + { + get + { + return new DispatcherScheduler( +#if USE_SL_DISPATCHER + System.Windows.Deployment.Current.Dispatcher +#else + System.Windows.Threading.Dispatcher.CurrentDispatcher +#endif + ); + } + } + + /// <summary> + /// Gets the scheduler that schedules work on the <see cref="System.Windows.Threading.Dispatcher"/> for the current thread. + /// </summary> + public static DispatcherScheduler Current + { + get + { +#if USE_SL_DISPATCHER + return new DispatcherScheduler(System.Windows.Deployment.Current.Dispatcher); +#else + var dispatcher = System.Windows.Threading.Dispatcher.FromThread(Thread.CurrentThread); + if (dispatcher == null) + throw new InvalidOperationException(Strings_WindowsThreading.NO_DISPATCHER_CURRENT_THREAD); + + return new DispatcherScheduler(dispatcher); +#endif + } + } + + System.Windows.Threading.Dispatcher _dispatcher; + +#if HAS_DISPATCHER_PRIORITY + System.Windows.Threading.DispatcherPriority _priority; +#endif + + /// <summary> + /// Constructs a DispatcherScheduler that schedules units of work on the given <see cref="System.Windows.Threading.Dispatcher"/>. + /// </summary> + /// <param name="dispatcher">Dispatcher to schedule work on.</param> + /// <exception cref="ArgumentNullException"><paramref name="dispatcher"/> is null.</exception> + public DispatcherScheduler(System.Windows.Threading.Dispatcher dispatcher) + { + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + _dispatcher = dispatcher; +#if HAS_DISPATCHER_PRIORITY + _priority = Windows.Threading.DispatcherPriority.Normal; +#endif + } + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Constructs a DispatcherScheduler that schedules units of work on the given <see cref="System.Windows.Threading.Dispatcher"/> at the given priority. + /// </summary> + /// <param name="dispatcher">Dispatcher to schedule work on.</param> + /// <param name="priority">Priority at which units of work are scheduled.</param> + /// <exception cref="ArgumentNullException"><paramref name="dispatcher"/> is null.</exception> + public DispatcherScheduler(System.Windows.Threading.Dispatcher dispatcher, System.Windows.Threading.DispatcherPriority priority) + { + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + _dispatcher = dispatcher; + _priority = priority; + } +#endif + + /// <summary> + /// Gets the <see cref="System.Windows.Threading.Dispatcher"/> associated with the DispatcherScheduler. + /// </summary> + public System.Windows.Threading.Dispatcher Dispatcher + { + get { return _dispatcher; } + } + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Gets the priority at which work items will be dispatched. + /// </summary> + public System.Windows.Threading.DispatcherPriority Priority + { + get { return _priority; } + } +#endif + + /// <summary> + /// Schedules an action to be executed on the dispatcher. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + _dispatcher.BeginInvoke( + new Action(() => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }) +#if HAS_DISPATCHER_PRIORITY + , _priority +#endif + ); + + return d; + } + + /// <summary> + /// Schedules an action to be executed after dueTime on the dispatcher, using a <see cref="System.Windows.Threading.DispatcherTimer"/> object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + + var d = new MultipleAssignmentDisposable(); + + var timer = new System.Windows.Threading.DispatcherTimer( +#if HAS_DISPATCHER_PRIORITY + _priority, _dispatcher +#elif DESKTOPCLR40 // BACKWARDS COMPATIBILITY with v1.x + System.Windows.Threading.DispatcherPriority.Background, _dispatcher +#endif + ); + + timer.Tick += (s, e) => + { + var t = Interlocked.Exchange(ref timer, null); + if (t != null) + { + try + { + d.Disposable = action(this, state); + } + finally + { + t.Stop(); + action = null; + } + } + }; + + timer.Interval = dt; + timer.Start(); + + d.Disposable = Disposable.Create(() => + { + var t = Interlocked.Exchange(ref timer, null); + if (t != null) + { + t.Stop(); + action = (_, __) => Disposable.Empty; + } + }); + + return d; + } + + /// <summary> + /// Schedules a periodic piece of work on the dispatcher, using a <see cref="System.Windows.Threading.DispatcherTimer"/> object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + var timer = new System.Windows.Threading.DispatcherTimer( +#if HAS_DISPATCHER_PRIORITY + _priority, _dispatcher +#elif DESKTOPCLR40 // BACKWARDS COMPATIBILITY with v1.x + System.Windows.Threading.DispatcherPriority.Background, _dispatcher +#endif + ); + + var state1 = state; + + timer.Tick += (s, e) => + { + state1 = action(state1); + }; + + timer.Interval = period; + timer.Start(); + + return Disposable.Create(() => + { + var t = Interlocked.Exchange(ref timer, null); + if (t != null) + { + t.Stop(); + action = _ => _; + } + }); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Internal/Constants.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Internal/Constants.cs new file mode 100644 index 0000000..4d22eba --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Internal/Constants.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + // We can't make those based on the Strings_WindowsThreading.resx file, because the ObsoleteAttribute needs a compile-time constant. + + static class Constants_WindowsThreading + { +#if !WINDOWS + public const string OBSOLETE_INSTANCE_PROPERTY = "Use the Current property to retrieve the DispatcherScheduler instance for the current thread's Dispatcher object. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; +#endif + } +} diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Linq/CoreDispatcherObservable.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Linq/CoreDispatcherObservable.cs new file mode 100644 index 0000000..d68039c --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Linq/CoreDispatcherObservable.cs @@ -0,0 +1,258 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if WINDOWS +using System.Reactive.Concurrency; +using Windows.UI.Core; +using Windows.UI.Xaml; + +namespace System.Reactive.Linq +{ + /// <summary> + /// Provides a set of extension methods for scheduling actions performed through observable sequences on UI dispatchers. + /// </summary> + public static class DispatcherObservable + { + #region ObserveOn[Dispatcher] + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to notify observers on.</param> + /// <returns>The source sequence whose observations happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, CoreDispatcher dispatcher) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return Synchronization.ObserveOn(source, new CoreDispatcherScheduler(dispatcher)); + } + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to notify observers on.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose observations happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, CoreDispatcher dispatcher, CoreDispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return Synchronization.ObserveOn(source, new CoreDispatcherScheduler(dispatcher, priority)); + } + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dependencyObject">Object to get the dispatcher from.</param> + /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dependencyObject == null) + throw new ArgumentNullException("dependencyObject"); + + return Synchronization.ObserveOn(source, new CoreDispatcherScheduler(dependencyObject.Dispatcher)); + } + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dependencyObject">Object to get the dispatcher from.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject, CoreDispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dependencyObject == null) + throw new ArgumentNullException("dependencyObject"); + + return Synchronization.ObserveOn(source, new CoreDispatcherScheduler(dependencyObject.Dispatcher, priority)); + } + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the current window. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <returns>The source sequence whose observations happen on the current window's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source) + { + if (source == null) + throw new ArgumentNullException("source"); + + return Synchronization.ObserveOn(source, CoreDispatcherScheduler.Current); + } + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the current window. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose observations happen on the current window's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source, CoreDispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + + return Synchronization.ObserveOn(source, new CoreDispatcherScheduler(CoreDispatcherScheduler.Current.Dispatcher, priority)); + } + + #endregion + + #region SubscribeOn[Dispatcher] + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to perform subscription and unsubscription actions on.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified dispatcher. + /// In order to invoke observer callbacks on the specified dispatcher, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, CoreDispatcher)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, CoreDispatcher dispatcher) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return Synchronization.SubscribeOn(source, new CoreDispatcherScheduler(dispatcher)); + } + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to perform subscription and unsubscription actions on.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified dispatcher. + /// In order to invoke observer callbacks on the specified dispatcher, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, CoreDispatcher, CoreDispatcherPriority)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, CoreDispatcher dispatcher, CoreDispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return Synchronization.SubscribeOn(source, new CoreDispatcherScheduler(dispatcher, priority)); + } + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dependencyObject">Object to get the dispatcher from.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object. + /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DependencyObject)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dependencyObject == null) + throw new ArgumentNullException("dependencyObject"); + + return Synchronization.SubscribeOn(source, new CoreDispatcherScheduler(dependencyObject.Dispatcher)); + } + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dependencyObject">Object to get the dispatcher from.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object. + /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DependencyObject, CoreDispatcherPriority)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject, CoreDispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dependencyObject == null) + throw new ArgumentNullException("dependencyObject"); + + return Synchronization.SubscribeOn(source, new CoreDispatcherScheduler(dependencyObject.Dispatcher, priority)); + } + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the current window. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the current window's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the current window. + /// In order to invoke observer callbacks on the dispatcher associated with the current window, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource})"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source) + { + if (source == null) + throw new ArgumentNullException("source"); + + return Synchronization.SubscribeOn(source, CoreDispatcherScheduler.Current); + } + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the current window. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the current window's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the current window. + /// In order to invoke observer callbacks on the dispatcher associated with the current window, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource}, CoreDispatcherPriority)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source, CoreDispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + + return Synchronization.SubscribeOn(source, new CoreDispatcherScheduler(CoreDispatcherScheduler.Current.Dispatcher, priority)); + } + + #endregion + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Linq/DispatcherObservable.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Linq/DispatcherObservable.cs new file mode 100644 index 0000000..8510179 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Reactive/Linq/DispatcherObservable.cs @@ -0,0 +1,390 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !WINDOWS +using System.Reactive.Concurrency; +using System.Windows; +using System.Windows.Threading; + +namespace System.Reactive.Linq +{ + /// <summary> + /// Provides a set of extension methods for scheduling actions performed through observable sequences on UI dispatchers. + /// </summary> + public static class DispatcherObservable + { + #region ObserveOn[Dispatcher] + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to to notify observers on.</param> + /// <returns>The source sequence whose observations happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return ObserveOn_<TSource>(source, dispatcher); + } + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to to notify observers on.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose observations happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return ObserveOn_<TSource>(source, dispatcher, priority); + } +#endif + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the specified dispatcher scheduler. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="scheduler">Dispatcher scheduler to notify observers on.</param> + /// <returns>The source sequence whose observations happen on the specified dispatcher scheduler.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherScheduler scheduler) + { + if (source == null) + throw new ArgumentNullException("source"); + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + +#if HAS_DISPATCHER_PRIORITY + return ObserveOn_<TSource>(source, scheduler.Dispatcher, scheduler.Priority); +#else + return ObserveOn_<TSource>(source, scheduler.Dispatcher); +#endif + } + +#if USE_SL_DISPATCHER + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dependencyObject">Object to get the dispatcher from.</param> + /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dependencyObject == null) + throw new ArgumentNullException("dependencyObject"); + + return ObserveOn_<TSource>(source, dependencyObject.Dispatcher); + } +#else + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcherObject">Object to get the dispatcher from.</param> + /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcherObject == null) + throw new ArgumentNullException("dispatcherObject"); + + return ObserveOn_<TSource>(source, dispatcherObject.Dispatcher); + } +#endif + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcherObject">Object to get the dispatcher from.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose observations happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject, DispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcherObject == null) + throw new ArgumentNullException("dispatcherObject"); + + return ObserveOn_<TSource>(source, dispatcherObject.Dispatcher, priority); + } +#endif + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the current thread. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <returns>The source sequence whose observations happen on the current thread's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source) + { + if (source == null) + throw new ArgumentNullException("source"); + +#if USE_SL_DISPATCHER + return ObserveOn_<TSource>(source, System.Windows.Deployment.Current.Dispatcher); +#else + return ObserveOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher); +#endif + } + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the dispatcher associated with the current thread. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose observations happen on the current thread's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + public static IObservable<TSource> ObserveOnDispatcher<TSource>(this IObservable<TSource> source, DispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + + return ObserveOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher, priority); + } + + private static IObservable<TSource> ObserveOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority) + { + return Synchronization.ObserveOn(source, new DispatcherSynchronizationContext(dispatcher, priority)); + } +#endif + + private static IObservable<TSource> ObserveOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher) + { + return Synchronization.ObserveOn(source, new DispatcherSynchronizationContext(dispatcher)); + } + + #endregion + + #region SubscribeOn[Dispatcher] + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to to perform subscription and unsubscription actions on.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified dispatcher. + /// In order to invoke observer callbacks on the specified dispatcher, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, Dispatcher)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return SubscribeOn_<TSource>(source, dispatcher); + } + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcher">Dispatcher whose associated message loop is used to to perform subscription and unsubscription actions on.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcher"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified dispatcher. + /// In order to invoke observer callbacks on the specified dispatcher, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, Dispatcher, DispatcherPriority)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcher == null) + throw new ArgumentNullException("dispatcher"); + + return SubscribeOn_<TSource>(source, dispatcher, priority); + } +#endif + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified dispatcher scheduler. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="scheduler">Dispatcher scheduler to perform subscription and unsubscription actions on.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified dispatcher scheduler.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler. + /// In order to invoke observer callbacks on the specified scheduler, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherScheduler)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherScheduler scheduler) + { + if (source == null) + throw new ArgumentNullException("source"); + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + +#if HAS_DISPATCHER_PRIORITY + return SubscribeOn_<TSource>(source, scheduler.Dispatcher, scheduler.Priority); +#else + return SubscribeOn_<TSource>(source, scheduler.Dispatcher); +#endif + } + +#if USE_SL_DISPATCHER + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dependencyObject">Object to get the dispatcher from.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dependencyObject"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object. + /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DependencyObject)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DependencyObject dependencyObject) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dependencyObject == null) + throw new ArgumentNullException("dependencyObject"); + + return SubscribeOn_<TSource>(source, dependencyObject.Dispatcher); + } +#else + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcherObject">Object to get the dispatcher from.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object. + /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherObject)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcherObject == null) + throw new ArgumentNullException("dispatcherObject"); + + return SubscribeOn_<TSource>(source, dispatcherObject.Dispatcher); + } +#endif + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the specified object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="dispatcherObject">Object to get the dispatcher from.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified object's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="dispatcherObject"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the specified object. + /// In order to invoke observer callbacks on the dispatcher associated with the specified object, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOn{TSource}(IObservable{TSource}, DispatcherObject, DispatcherPriority)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, DispatcherObject dispatcherObject, DispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + if (dispatcherObject == null) + throw new ArgumentNullException("dispatcherObject"); + + return SubscribeOn_<TSource>(source, dispatcherObject.Dispatcher, priority); + } +#endif + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the current thread. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the current thread's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the current thread. + /// In order to invoke observer callbacks on the dispatcher associated with the current thread, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource})"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source) + { + if (source == null) + throw new ArgumentNullException("source"); + +#if USE_SL_DISPATCHER + return SubscribeOn_<TSource>(source, System.Windows.Deployment.Current.Dispatcher); +#else + return SubscribeOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher); +#endif + } + +#if HAS_DISPATCHER_PRIORITY + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the dispatcher associated with the current thread. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="priority">Priority to schedule work items at.</param> + /// <returns>The source sequence whose observations happen on the current thread's dispatcher.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the dispatcher associated with the current thread. + /// In order to invoke observer callbacks on the dispatcher associated with the current thread, e.g. to render results in a control, use <see cref="DispatcherObservable.ObserveOnDispatcher{TSource}(IObservable{TSource}, DispatcherPriority)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOnDispatcher<TSource>(this IObservable<TSource> source, DispatcherPriority priority) + { + if (source == null) + throw new ArgumentNullException("source"); + + return SubscribeOn_<TSource>(source, DispatcherScheduler.Current.Dispatcher, priority); + } + + private static IObservable<TSource> SubscribeOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher, DispatcherPriority priority) + { + return Synchronization.SubscribeOn(source, new DispatcherSynchronizationContext(dispatcher, priority)); + } +#endif + + private static IObservable<TSource> SubscribeOn_<TSource>(IObservable<TSource> source, Dispatcher dispatcher) + { + return Synchronization.SubscribeOn(source, new DispatcherSynchronizationContext(dispatcher)); + } + + #endregion + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Strings_WindowsThreading.Generated.cs b/Rx/NET/Source/System.Reactive.Windows.Threading/Strings_WindowsThreading.Generated.cs new file mode 100644 index 0000000..0ba08cc --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Strings_WindowsThreading.Generated.cs @@ -0,0 +1,106 @@ +/* + * This file is manually generated upon every change to the corresponding .resx file, using the built-in code generator. + * However, we can't use auto-generated code because the output is different for .NET 4.5 for Metro and PLIB, due to + * changes in the reflection APIs (search for CRIPPLED_REFLECTION in this file), and because regeneration doesn't happen + * as part of build, but rather at design time in Visual Studio. While we likely could tweak MSBuild to force regeneration + * using the right version of ResGen.exe, this approach turned out to be the easiest for the time being. + * + * Upon adding entries to the ResX file, regenerate this file, put this comment back, and make sure to add the #if checks + * for conditional use of the right flavor of reflection (using the CRIPPLED_REFLECTION defined symbol). + */ + +// Required for the use of the GetTypeInfo extension method. +#if CRIPPLED_REFLECTION +using System.Reflection; +#endif + +// GENERATED CODE starts here + +namespace System.Reactive +{ + using System; + + + /// <summary> + /// A strongly-typed resource class, for looking up localized strings, etc. + /// </summary> + // This class was auto-generated by the StronglyTypedResourceBuilder + // class via a tool like ResGen or Visual Studio. + // To add or remove a member, edit your .ResX file then rerun ResGen + // with the /str option, or rebuild your VS project. + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "4.0.0.0")] + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()] + internal class Strings_WindowsThreading + { + + private static global::System.Resources.ResourceManager resourceMan; + + private static global::System.Globalization.CultureInfo resourceCulture; + + [global::System.Diagnostics.CodeAnalysis.SuppressMessageAttribute("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")] + internal Strings_WindowsThreading() + { + } + + /// <summary> + /// Returns the cached ResourceManager instance used by this class. + /// </summary> + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Resources.ResourceManager ResourceManager + { + get + { + if (object.ReferenceEquals(resourceMan, null)) + { +#if CRIPPLED_REFLECTION + global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("System.Reactive.Strings_WindowsThreading", typeof(Strings_WindowsThreading).GetTypeInfo().Assembly); +#else + global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("System.Reactive.Strings_WindowsThreading", typeof(Strings_WindowsThreading).Assembly); +#endif + resourceMan = temp; + } + return resourceMan; + } + } + + /// <summary> + /// Overrides the current thread's CurrentUICulture property for all + /// resource lookups using this strongly typed resource class. + /// </summary> + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Globalization.CultureInfo Culture + { + get + { + return resourceCulture; + } + set + { + resourceCulture = value; + } + } + + /// <summary> + /// Looks up a localized string similar to The current thread has no Dispatcher associated with it.. + /// </summary> + internal static string NO_DISPATCHER_CURRENT_THREAD + { + get + { + return ResourceManager.GetString("NO_DISPATCHER_CURRENT_THREAD", resourceCulture); + } + } + + /// <summary> + /// Looks up a localized string similar to No current Window object found to obtain a CoreDispatcher from.. + /// </summary> + internal static string NO_WINDOW_CURRENT + { + get + { + return ResourceManager.GetString("NO_WINDOW_CURRENT", resourceCulture); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/Strings_WindowsThreading.resx b/Rx/NET/Source/System.Reactive.Windows.Threading/Strings_WindowsThreading.resx new file mode 100644 index 0000000..85f5101 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/Strings_WindowsThreading.resx @@ -0,0 +1,128 @@ +<?xml version="1.0" encoding="utf-8"?> +<root> + <!-- + Microsoft ResX Schema + + Version 2.0 + + The primary goals of this format is to allow a simple XML format + that is mostly human readable. The generation and parsing of the + various data types are done through the TypeConverter classes + associated with the data types. + + Example: + + ... ado.net/XML headers & schema ... + <resheader name="resmimetype">text/microsoft-resx</resheader> + <resheader name="version">2.0</resheader> + <resheader name="reader">System.Resources.ResXResourceReader, System.Windows.Forms, ...</resheader> + <resheader name="writer">System.Resources.ResXResourceWriter, System.Windows.Forms, ...</resheader> + <data name="Name1"><value>this is my long string</value><comment>this is a comment</comment></data> + <data name="Color1" type="System.Drawing.Color, System.Drawing">Blue</data> + <data name="Bitmap1" mimetype="application/x-microsoft.net.object.binary.base64"> + <value>[base64 mime encoded serialized .NET Framework object]</value> + </data> + <data name="Icon1" type="System.Drawing.Icon, System.Drawing" mimetype="application/x-microsoft.net.object.bytearray.base64"> + <value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value> + <comment>This is a comment</comment> + </data> + + There are any number of "resheader" rows that contain simple + name/value pairs. + + Each data row contains a name, and value. The row also contains a + type or mimetype. Type corresponds to a .NET class that support + text/value conversion through the TypeConverter architecture. + Classes that don't support this are serialized and stored with the + mimetype set. + + The mimetype is used for serialized objects, and tells the + ResXResourceReader how to depersist the object. This is currently not + extensible. For a given mimetype the value must be set accordingly: + + Note - application/x-microsoft.net.object.binary.base64 is the format + that the ResXResourceWriter will generate, however the reader can + read any of the formats listed below. + + mimetype: application/x-microsoft.net.object.binary.base64 + value : The object must be serialized with + : System.Runtime.Serialization.Formatters.Binary.BinaryFormatter + : and then encoded with base64 encoding. + + mimetype: application/x-microsoft.net.object.soap.base64 + value : The object must be serialized with + : System.Runtime.Serialization.Formatters.Soap.SoapFormatter + : and then encoded with base64 encoding. + + mimetype: application/x-microsoft.net.object.bytearray.base64 + value : The object must be serialized into a byte array + : using a System.ComponentModel.TypeConverter + : and then encoded with base64 encoding. + --> + <xsd:schema id="root" xmlns="" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:msdata="urn:schemas-microsoft-com:xml-msdata"> + <xsd:import namespace="http://www.w3.org/XML/1998/namespace" /> + <xsd:element name="root" msdata:IsDataSet="true"> + <xsd:complexType> + <xsd:choice maxOccurs="unbounded"> + <xsd:element name="metadata"> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="value" type="xsd:string" minOccurs="0" /> + </xsd:sequence> + <xsd:attribute name="name" use="required" type="xsd:string" /> + <xsd:attribute name="type" type="xsd:string" /> + <xsd:attribute name="mimetype" type="xsd:string" /> + <xsd:attribute ref="xml:space" /> + </xsd:complexType> + </xsd:element> + <xsd:element name="assembly"> + <xsd:complexType> + <xsd:attribute name="alias" type="xsd:string" /> + <xsd:attribute name="name" type="xsd:string" /> + </xsd:complexType> + </xsd:element> + <xsd:element name="data"> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" /> + <xsd:element name="comment" type="xsd:string" minOccurs="0" msdata:Ordinal="2" /> + </xsd:sequence> + <xsd:attribute name="name" type="xsd:string" use="required" msdata:Ordinal="1" /> + <xsd:attribute name="type" type="xsd:string" msdata:Ordinal="3" /> + <xsd:attribute name="mimetype" type="xsd:string" msdata:Ordinal="4" /> + <xsd:attribute ref="xml:space" /> + </xsd:complexType> + </xsd:element> + <xsd:element name="resheader"> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" /> + </xsd:sequence> + <xsd:attribute name="name" type="xsd:string" use="required" /> + </xsd:complexType> + </xsd:element> + </xsd:choice> + </xsd:complexType> + </xsd:element> + </xsd:schema> + <resheader name="resmimetype"> + <value>text/microsoft-resx</value> + </resheader> + <resheader name="version"> + <value>2.0</value> + </resheader> + <resheader name="reader"> + <value>System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value> + </resheader> + <resheader name="writer"> + <value>System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value> + </resheader> + <data name="NO_DISPATCHER_CURRENT_THREAD" xml:space="preserve"> + <value>The current thread has no Dispatcher associated with it.</value> + <comment>Only on WPF/SL.</comment> + </data> + <data name="NO_WINDOW_CURRENT" xml:space="preserve"> + <value>No current Window object found to obtain a CoreDispatcher from.</value> + <comment>Only on Jupiter.</comment> + </data> +</root>
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Windows.Threading/System.Reactive.Windows.Threading.csproj b/Rx/NET/Source/System.Reactive.Windows.Threading/System.Reactive.Windows.Threading.csproj new file mode 100644 index 0000000..def5e4b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Windows.Threading/System.Reactive.Windows.Threading.csproj @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProductVersion>8.0.30703</ProductVersion> + <SchemaVersion>2.0</SchemaVersion> + <ProjectGuid>{2F7D32BD-5BFC-45D4-9899-F1A76DB32FCB}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>System.Reactive</RootNamespace> + <AssemblyName>System.Reactive.Windows.Threading</AssemblyName> + <TargetFrameworkVersion>v4.0</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <ProductSignAssembly>true</ProductSignAssembly> + <CodeAnalysisRuleSet>..\Rx.ruleset</CodeAnalysisRuleSet> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'ReleaseXBLV|AnyCPU'"> + <OutputPath>bin\ReleaseXBLV\</OutputPath> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'DebugXBLV|AnyCPU'"> + <OutputPath>bin\DebugXBLV\</OutputPath> + </PropertyGroup> + <Import Project="..\Common.targets" /> + <PropertyGroup> + <DocumentationFile>$(OutputPath)\$(AssemblyName).XML</DocumentationFile> + </PropertyGroup> + <ItemGroup> + <Reference Include="mscorlib" Condition=" '$(BuildPlatform)' == 'SILVERLIGHT' " /> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Windows" Condition=" '$(BuildPlatform)' == 'SILVERLIGHT' " /> + <Reference Include="WindowsBase" Condition=" '$(BuildPlatform)' == 'DESKTOPCLR' " /> + <Reference Include="System.Observable" Condition=" '$(BuildFlavor)' == 'SILVERLIGHTM7' " /> + </ItemGroup> + <ItemGroup> + <Compile Include="GlobalSuppressions.cs" /> + <Compile Include="Reactive\Concurrency\CoreDispatcherScheduler.cs" /> + <Compile Include="Reactive\Internal\Constants.cs" /> + <Compile Include="Reactive\Linq\CoreDispatcherObservable.cs" /> + <Compile Include="Reactive\Linq\DispatcherObservable.cs" /> + <Compile Include="Reactive\Concurrency\DispatcherScheduler.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Strings_WindowsThreading.Generated.cs" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="..\System.Reactive.Core\System.Reactive.Core.csproj"> + <Project>{4E516F10-DA7A-4D43-963E-A93865ABEA5B}</Project> + <Name>System.Reactive.Core</Name> + </ProjectReference> + <ProjectReference Include="..\System.Reactive.Interfaces\System.Reactive.Interfaces.csproj"> + <Project>{9E9B9C60-98B0-40FA-9C2B-1218D417CAA4}</Project> + <Name>System.Reactive.Interfaces</Name> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <EmbeddedResource Include="Strings_WindowsThreading.resx" /> + </ItemGroup> + <Import Project="..\Import.targets" /> +</Project>
\ No newline at end of file |