diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.PlatformServices/Reactive')
16 files changed, 2433 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs new file mode 100644 index 0000000..1d5c0ae --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_THREAD && WINDOWS +using System; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer + { + public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) + { + var res = global::Windows.System.Threading.ThreadPoolTimer.CreateTimer( + tpt => + { + action(state); + }, + Normalize(dueTime) + ); + + return Disposable.Create(res.Cancel); + } + + public IDisposable StartPeriodicTimer(Action action, TimeSpan period) + { + // + // The WinRT thread pool is based on the Win32 thread pool and cannot handle + // sub-1ms resolution. When passing a lower period, we get single-shot + // timer behavior instead. See MSDN documentation for CreatePeriodicTimer + // for more information. + // + if (period < TimeSpan.FromMilliseconds(1)) + throw new ArgumentOutOfRangeException("period", Strings_PlatformServices.WINRT_NO_SUB1MS_TIMERS); + + var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer( + tpt => + { + action(); + }, + period + ); + + return Disposable.Create(res.Cancel); + } + + public IDisposable QueueUserWorkItem(Action<object> action, object state) + { + var res = global::Windows.System.Threading.ThreadPool.RunAsync(iaa => + { + action(state); + }); + + return Disposable.Create(res.Cancel); + } + + public void Sleep(TimeSpan timeout) + { + var e = new ManualResetEventSlim(); + + global::Windows.System.Threading.ThreadPoolTimer.CreateTimer( + tpt => + { + e.Set(); + }, + Normalize(timeout) + ); + + e.Wait(); + } + + public IStopwatch StartStopwatch() + { +#if !NO_STOPWATCH + return new StopwatchImpl(); +#else + return new DefaultStopwatch(); +#endif + } + + public bool SupportsLongRunning + { + get { return false; } + } + + public void StartThread(Action<object> action, object state) + { + throw new NotSupportedException(); + } + + private TimeSpan Normalize(TimeSpan dueTime) + { + if (dueTime < TimeSpan.Zero) + return TimeSpan.Zero; + + return dueTime; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs new file mode 100644 index 0000000..4c91463 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs @@ -0,0 +1,371 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_THREAD +using System; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer + { + public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) + { + return new Timer(action, state, Normalize(dueTime)); + } + + public IDisposable StartPeriodicTimer(Action action, TimeSpan period) + { + // + // MSDN documentation states the following: + // + // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once; + // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method." + // + if (period <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + + return new PeriodicTimer(action, period); + } + + public IDisposable QueueUserWorkItem(Action<object> action, object state) + { + System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state); + return Disposable.Empty; + } + +#if USE_SLEEP_MS + public void Sleep(TimeSpan timeout) + { + System.Threading.Thread.Sleep((int)Normalize(timeout).TotalMilliseconds); + } +#else + public void Sleep(TimeSpan timeout) + { + System.Threading.Thread.Sleep(Normalize(timeout)); + } +#endif + + public IStopwatch StartStopwatch() + { +#if !NO_STOPWATCH + return new StopwatchImpl(); +#else + return new DefaultStopwatch(); +#endif + } + + public bool SupportsLongRunning + { + get { return true; } + } + + public void StartThread(Action<object> action, object state) + { + new Thread(() => + { + action(state); + }) { IsBackground = true }.Start(); + } + + private static TimeSpan Normalize(TimeSpan dueTime) + { + if (dueTime < TimeSpan.Zero) + return TimeSpan.Zero; + + return dueTime; + } + +#if USE_TIMER_SELF_ROOT + + // + // Some historical context. In the early days of Rx, we discovered an issue with + // the rooting of timers, causing them to get GC'ed even when the IDisposable of + // a scheduled activity was kept alive. The original code simply created a timer + // as follows: + // + // var t = default(Timer); + // t = new Timer(_ => + // { + // t = null; + // Debug.WriteLine("Hello!"); + // }, null, 5000, Timeout.Infinite); + // + // IIRC the reference to "t" captured by the closure wasn't sufficient on .NET CF + // to keep the timer rooted, causing problems on Windows Phone 7. As a result, we + // added rooting code using a dictionary (SD 7280), which we carried forward all + // the way to Rx v2.0 RTM. + // + // However, the desktop CLR's implementation of System.Threading.Timer exhibits + // other characteristics where a timer can root itself when the timer is still + // reachable through the state or callback parameters. To illustrate this, run + // the following piece of code: + // + // static void Main() + // { + // Bar(); + // + // while (true) + // { + // GC.Collect(); + // GC.WaitForPendingFinalizers(); + // Thread.Sleep(100); + // } + // } + // + // static void Bar() + // { + // var t = default(Timer); + // t = new Timer(_ => + // { + // t = null; // Comment out this line to see the timer stop + // Console.WriteLine("Hello!"); + // }, null, 5000, Timeout.Infinite); + // } + // + // When the closure over "t" is removed, the timer will stop automatically upon + // garbage collection. However, when retaining the reference, this problem does + // not exist. The code below exploits this behavior, avoiding unnecessary costs + // to root timers in a thread-safe manner. + // + // Below is a fragment of SOS output, proving the proper rooting: + // + // !gcroot 02492440 + // HandleTable: + // 005a13fc (pinned handle) + // -> 03491010 System.Object[] + // -> 024924dc System.Threading.TimerQueue + // -> 02492450 System.Threading.TimerQueueTimer + // -> 02492420 System.Threading.TimerCallback + // -> 02492414 TimerRootingExperiment.Program+<>c__DisplayClass1 + // -> 02492440 System.Threading.Timer + // + // With the USE_TIMER_SELF_ROOT symbol, we shake off this additional rooting code + // for newer platforms where this no longer needed. We checked this on .NET Core + // as well as .NET 4.0, and only #define this symbol for those platforms. + // + + class Timer : IDisposable + { + private Action<object> _action; + private volatile System.Threading.Timer _timer; + + public Timer(Action<object> action, object state, TimeSpan dueTime) + { + _action = action; + + // Don't want the spin wait in Tick to get stuck if this thread gets aborted. + try { } + finally + { + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + } + } + + private void Tick(object state) + { + try + { + _action(state); + } + finally + { + SpinWait.SpinUntil(IsTimerAssigned); + Dispose(); + } + } + + private bool IsTimerAssigned() + { + return _timer != null; + } + + public void Dispose() + { + var timer = _timer; + if (timer != TimerStubs.Never) + { + _action = Stubs<object>.Ignore; + _timer = TimerStubs.Never; + + timer.Dispose(); + } + } + } + + class PeriodicTimer : IDisposable + { + private Action _action; + private volatile System.Threading.Timer _timer; + + public PeriodicTimer(Action action, TimeSpan period) + { + _action = action; + + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, null, period, period); + } + + private void Tick(object state) + { + _action(); + } + + public void Dispose() + { + var timer = _timer; + if (timer != null) + { + _action = Stubs.Nop; + _timer = null; + + timer.Dispose(); + } + } + } +#else + class Timer : IDisposable + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + + private Action<object> _action; + private System.Threading.Timer _timer; + + private bool _hasAdded; + private bool _hasRemoved; + + public Timer(Action<object> action, object state, TimeSpan dueTime) + { + _action = action; + _timer = new System.Threading.Timer(Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + + lock (s_timers) + { + if (!_hasRemoved) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + + _hasAdded = true; + } + } + } + + private void Tick(object state) + { + try + { + _action(state); + } + finally + { + Dispose(); + } + } + + public void Dispose() + { + _action = Stubs<object>.Ignore; + + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + if (!_hasRemoved) + { + timer = _timer; + _timer = null; + + if (_hasAdded && timer != null) + s_timers.Remove(timer); + + _hasRemoved = true; + } + } + + if (timer != null) + timer.Dispose(); + } + } + + class PeriodicTimer : IDisposable + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + + private Action _action; + private System.Threading.Timer _timer; + + public PeriodicTimer(Action action, TimeSpan period) + { + _action = action; + _timer = new System.Threading.Timer(Tick, null, period, period); + + lock (s_timers) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + } + } + + private void Tick(object state) + { + _action(); + } + + public void Dispose() + { + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + timer = _timer; + _timer = null; + + if (timer != null) + s_timers.Remove(timer); + } + + if (timer != null) + { + timer.Dispose(); + _action = Stubs.Nop; + } + } + } +#endif + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs new file mode 100644 index 0000000..8739895 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs @@ -0,0 +1,375 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +#if NO_SEMAPHORE +using System.Reactive.Threading; +#endif + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on a designated thread. + /// </summary> + public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable + { + #region Fields + + /// <summary> + /// Counter for diagnostic purposes, to name the threads. + /// </summary> + private static int s_counter; + + /// <summary> + /// Thread factory function. + /// </summary> + private readonly Func<ThreadStart, Thread> _threadFactory; + + /// <summary> + /// Stopwatch for timing free of absolute time dependencies. + /// </summary> + private IStopwatch _stopwatch; + + /// <summary> + /// Thread used by the event loop to run work items on. No work should be run on any other thread. + /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled. + /// </summary> + private Thread _thread; + + /// <summary> + /// Gate to protect data structures, including the work queue and the ready list. + /// </summary> + private readonly object _gate; + + /// <summary> + /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer + /// expires and moves on to the next item in the queue. + /// </summary> +#if !NO_CDS + private readonly SemaphoreSlim _evt; +#else + private readonly Semaphore _evt; +#endif + + /// <summary> + /// Queue holding work items. Protected by the gate. + /// </summary> + private readonly SchedulerQueue<TimeSpan> _queue; + + /// <summary> + /// Queue holding items that are ready to be run as soon as possible. Protected by the gate. + /// </summary> + private readonly Queue<ScheduledItem<TimeSpan>> _readyList; + + /// <summary> + /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next + /// item is still the same. If not, a new timer needs to be started (see below). + /// </summary> + private ScheduledItem<TimeSpan> _nextItem; + + /// <summary> + /// Disposable that always holds the timer to dispatch the first element in the queue. + /// </summary> + private readonly SerialDisposable _nextTimer; + + /// <summary> + /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to + /// wake up the event loop thread, which will subsequently abandon all work. + /// </summary> + private bool _disposed; + + #endregion + + #region Constructors + + /// <summary> + /// Creates an object that schedules units of work on a designated thread. + /// </summary> + public EventLoopScheduler() + : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true }) + { + } + +#if !NO_THREAD + /// <summary> + /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options. + /// </summary> + /// <param name="threadFactory">Factory function for thread creation.</param> + /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception> + public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory) + { + if (threadFactory == null) + throw new ArgumentNullException("threadFactory"); +#else + internal EventLoopScheduler(Func<ThreadStart, Thread> threadFactory) + { +#endif + _threadFactory = threadFactory; + _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); + + _gate = new object(); + +#if !NO_CDS + _evt = new SemaphoreSlim(0); +#else + _evt = new Semaphore(0, int.MaxValue); +#endif + _queue = new SchedulerQueue<TimeSpan>(); + _readyList = new Queue<ScheduledItem<TimeSpan>>(); + + _nextTimer = new SerialDisposable(); + + ExitIfEmpty = false; + } + + #endregion + + #region Properties + + /// <summary> + /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work + /// is scheduled afterwards, a new event loop thread is created. This property is used by the + /// NewThreadScheduler which uses an event loop for its recursive invocations. + /// </summary> + internal bool ExitIfEmpty + { + get; + set; + } + + #endregion + + #region Public methods + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var due = _stopwatch.Elapsed + dueTime; + var si = new ScheduledItem<TimeSpan, TState>(this, state, action, due); + + lock (_gate) + { + if (_disposed) + throw new ObjectDisposedException(""); + + if (dueTime <= TimeSpan.Zero) + { + _readyList.Enqueue(si); + _evt.Release(); + } + else + { + _queue.Enqueue(si); + _evt.Release(); + } + + EnsureThread(); + } + + return Disposable.Create(si.Cancel); + } + + /// <summary> + /// Schedules a periodic piece of work on the designated thread. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + var start = _stopwatch.Elapsed; + var next = start + period; + + var state1 = state; + + var d = new MultipleAssignmentDisposable(); + var gate = new AsyncLock(); + + var tick = default(Func<IScheduler, object, IDisposable>); + tick = (self_, _) => + { + next += period; + + d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick); + + gate.Wait(() => + { + state1 = action(state1); + }); + + return Disposable.Empty; + }; + + d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick); + + return new CompositeDisposable(d, gate); + } + +#if !NO_STOPWATCH + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + + /// <summary> + /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned. + /// </summary> + public void Dispose() + { + lock (_gate) + { + if (!_disposed) + { + _disposed = true; + _nextTimer.Dispose(); + _evt.Release(); + } + } + } + + #endregion + + #region Private implementation + + /// <summary> + /// Ensures there is an event loop thread running. Should be called under the gate. + /// </summary> + private void EnsureThread() + { + if (_thread == null) + { + _thread = _threadFactory(Run); + _thread.Start(); + } + } + + /// <summary> + /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event + /// which gets set by calls to Schedule, the next item timer, or calls to Dispose. + /// </summary> + private void Run() + { + while (true) + { +#if !NO_CDS + _evt.Wait(); +#else + _evt.WaitOne(); +#endif + + var ready = default(ScheduledItem<TimeSpan>[]); + + lock (_gate) + { + // + // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the + // loop immediately. Subsequent calls to Schedule won't ever create a new thread. + // + if (_disposed) + { + ((IDisposable)_evt).Dispose(); + return; + } + + while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed) + { + var item = _queue.Dequeue(); + _readyList.Enqueue(item); + } + + if (_queue.Count > 0) + { + var next = _queue.Peek(); + if (next != _nextItem) + { + _nextItem = next; + + var due = next.DueTime - _stopwatch.Elapsed; + _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due); + } + } + + if (_readyList.Count > 0) + { + ready = _readyList.ToArray(); + _readyList.Clear(); + } + } + + if (ready != null) + { + foreach (var item in ready) + { + if (!item.IsCanceled) + item.Invoke(); + } + } + + if (ExitIfEmpty) + { + lock (_gate) + { + if (_readyList.Count == 0 && _queue.Count == 0) + { + _thread = null; + return; + } + } + } + } + } + + private void Tick(object state) + { + lock (_gate) + { + if (!_disposed) + { + var item = (ScheduledItem<TimeSpan>)state; + if (_queue.Remove(item)) + { + _readyList.Enqueue(item); + } + + _evt.Release(); + } + } + } + + #endregion + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs new file mode 100644 index 0000000..b111554 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs @@ -0,0 +1,194 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules each unit of work on a separate thread. + /// </summary> + public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic + { + internal static readonly NewThreadScheduler s_instance = new NewThreadScheduler(); + + private readonly Func<ThreadStart, Thread> _threadFactory; + + /// <summary> + /// Creates an object that schedules each unit of work on a separate thread. + /// </summary> + public NewThreadScheduler() + : this(action => new Thread(action)) + { + } + + /// <summary> + /// Gets an instance of this scheduler that uses the default Thread constructor. + /// </summary> + public static NewThreadScheduler Default + { + get + { + return s_instance; + } + } + +#if !NO_THREAD + /// <summary> + /// Creates an object that schedules each unit of work on a separate thread. + /// </summary> + /// <param name="threadFactory">Factory function for thread creation.</param> + /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception> + public NewThreadScheduler(Func<ThreadStart, Thread> threadFactory) + { + if (threadFactory == null) + throw new ArgumentNullException("threadFactory"); +#else + private NewThreadScheduler(Func<ThreadStart, Thread> threadFactory) + { +#endif + _threadFactory = threadFactory; + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var scheduler = new EventLoopScheduler(_threadFactory); + scheduler.ExitIfEmpty = true; + return scheduler.Schedule(state, dueTime, action); + } + + /// <summary> + /// Schedules a long-running task by creating a new thread. Cancellation happens through polling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new BooleanDisposable(); + + var thread = _threadFactory(() => + { + // + // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning + // requires us to ensure the scheduled work gets an opportunity to observe + // the cancellation request. + // + action(state, d); + }); + + thread.Start(); + + return d; + } + + /// <summary> + /// Schedules a periodic piece of work by creating a new thread that goes to sleep when work has been dispatched and wakes up again at the next periodic due time. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + var periodic = new Periodic<TState>(state, period, action); + + var thread = _threadFactory(periodic.Run); + thread.Start(); + + return periodic; + } + + class Periodic<TState> : IDisposable + { + private readonly IStopwatch _stopwatch; + private readonly TimeSpan _period; + private readonly Func<TState, TState> _action; + + private readonly object _cancel = new object(); + private volatile bool _done; + + private TState _state; + private TimeSpan _next; + + public Periodic(TState state, TimeSpan period, Func<TState, TState> action) + { + _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); + + _period = period; + _action = action; + + _state = state; + _next = period; + } + + public void Run() + { + while (!_done) + { + var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed); + + lock (_cancel) + { + if (Monitor.Wait(_cancel, timeout)) + return; + } + + _state = _action(_state); + _next += _period; + } + } + + public void Dispose() + { + _done = true; + + lock (_cancel) + { + Monitor.Pulse(_cancel); + } + } + } + +#if !NO_STOPWATCH + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs new file mode 100644 index 0000000..a84588b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs @@ -0,0 +1,230 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_TPL +using System.Reactive.Disposables; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the Task Parallel Library (TPL) task pool. + /// </summary> + /// <seealso cref="TaskPoolScheduler.Default">Instance of this type using the default TaskScheduler to schedule work on the TPL task pool.</seealso> + public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic + { + private static readonly TaskPoolScheduler s_instance = new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)); + private readonly TaskFactory taskFactory; + + /// <summary> + /// Creates an object that schedules units of work using the provided TaskFactory. + /// </summary> + /// <param name="taskFactory">Task factory used to create tasks to run units of work.</param> + /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception> + public TaskPoolScheduler(TaskFactory taskFactory) + { + if (taskFactory == null) + throw new ArgumentNullException("taskFactory"); + + this.taskFactory = taskFactory; + } + + /// <summary> + /// Gets an instance of this scheduler that uses the default TaskScheduler. + /// </summary> + public static TaskPoolScheduler Default + { + get + { + return s_instance; + } + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SerialDisposable(); + var cancelable = new CancellationDisposable(); + d.Disposable = cancelable; + taskFactory.StartNew(() => + { + // + // BREAKING CHANGE v2.0 > v1.x - No longer escalating exceptions using a throwing + // helper thread. + // + // Our manual escalation based on the creation of a throwing thread was merely to + // expedite the process of throwing the exception that would otherwise occur on the + // finalizer thread at a later point during the app's lifetime. + // + // However, it also prevented applications from observing the exception through + // the TaskScheduler.UnobservedTaskException static event. Also, starting form .NET + // 4.5, the default behavior of the task pool is not to take down the application + // when an exception goes unobserved (done as part of the async/await work). It'd + // be weird for Rx not to follow the platform defaults. + // + // General implementation guidelines for schedulers (in order of importance): + // + // 1. Always thunk through to the underlying infrastructure with a wrapper that's as tiny as possible. + // 2. Global exception notification/handling mechanisms shouldn't be bypassed. + // 3. Escalation behavior for exceptions is left to the underlying infrastructure. + // + // The Catch extension method for IScheduler (added earlier) allows to re-route + // exceptions at stage 2. If the exception isn't handled at the Rx level, it + // propagates by means of a rethrow, falling back to behavior in 3. + // + d.Disposable = action(this, state); + }, cancelable.Token); + return d; + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + +#if !NO_TASK_DELAY + var d = new MultipleAssignmentDisposable(); + + var ct = new CancellationDisposable(); + d.Disposable = ct; + + Task.Delay(dueTime, ct.Token).ContinueWith(_ => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler); + + return d; +#else + return DefaultScheduler.Instance.Schedule(state, dt, (_, state1) => Schedule(state1, action)); +#endif + } + + /// <summary> + /// Schedules a long-running task by creating a new task using TaskCreationOptions.LongRunning. Cancellation happens through polling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + var d = new BooleanDisposable(); + + taskFactory.StartNew(() => + { + // + // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning + // requires us to ensure the scheduled work gets an opportunity to observe + // the cancellation request. + // + action(state, d); + }, TaskCreationOptions.LongRunning); + + return d; + } + +#if !NO_STOPWATCH + /// <summary> + /// Gets a new stopwatch ob ject. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + + /// <summary> + /// Schedules a periodic piece of work by running a platform-specific timer to create tasks periodically. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + +#if !NO_TASK_DELAY + var cancel = new CancellationDisposable(); + + var state1 = state; + var gate = new AsyncLock(); + + var moveNext = default(Action); + moveNext = () => + { + Task.Delay(period, cancel.Token).ContinueWith( + _ => + { + moveNext(); + + gate.Wait(() => + { + state1 = action(state1); + }); + }, + CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler + ); + }; + + moveNext(); + + return new CompositeDisposable(cancel, gate); +#else + var state1 = state; + var gate = new AsyncLock(); + + var timer = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(() => + { + taskFactory.StartNew(() => + { + gate.Wait(() => + { + state1 = action(state1); + }); + }); + }, period); + + return new CompositeDisposable(timer, gate); +#endif + } + } +} +#endif diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs new file mode 100644 index 0000000..691365b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_THREAD +using System; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + class Thread + { + private readonly ThreadStart _start; + + public Thread(ThreadStart start) + { + _start = start; + } + + public string Name { get; set; } + public bool IsBackground { get; set; } + + public void Start() + { + System.Threading.Tasks.Task.Factory.StartNew(Run, System.Threading.Tasks.TaskCreationOptions.LongRunning); + } + + private void Run() + { + _start(); + } + } + + delegate void ThreadStart(); +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs new file mode 100644 index 0000000..1b8579d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if WINDOWS +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using Windows.System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the Windows Runtime thread pool. + /// </summary> + /// <seealso cref="ThreadPoolScheduler.Default">Singleton instance of this type exposed through this static property.</seealso> + [CLSCompliant(false)] + public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerPeriodic + { + private readonly WorkItemPriority _priority; + private readonly WorkItemOptions _options; + private static Lazy<ThreadPoolScheduler> s_default = new Lazy<ThreadPoolScheduler>(() => new ThreadPoolScheduler()); + + /// <summary> + /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool. + /// </summary> + public ThreadPoolScheduler() + { + } + + /// <summary> + /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool with the given priority. + /// </summary> + /// <param name="priority">Priority for scheduled units of work.</param> + public ThreadPoolScheduler(WorkItemPriority priority) + { + _priority = priority; + _options = WorkItemOptions.None; + } + + /// <summary> + /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool with the given priority. + /// </summary> + /// <param name="priority">Priority for scheduled units of work.</param> + /// <param name="options">Options that configure how work is scheduled.</param> + public ThreadPoolScheduler(WorkItemPriority priority, WorkItemOptions options) + { + _priority = priority; + _options = options; + } + + /// <summary> + /// Gets the singleton instance of the Windows Runtime thread pool scheduler. + /// </summary> + public static ThreadPoolScheduler Default + { + get + { + return s_default.Value; + } + } + + /// <summary> + /// Gets the priority at which work is scheduled. + /// </summary> + public WorkItemPriority Priority + { + get { return _priority; } + } + + /// <summary> + /// Gets the options that configure how work is scheduled. + /// </summary> + public WorkItemOptions Options + { + get { return _options; } + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + var res = global::Windows.System.Threading.ThreadPool.RunAsync(iaa => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, _priority, _options); + + return new CompositeDisposable( + d, + Disposable.Create(res.Cancel) + ); + } + + /// <summary> + /// Schedules an action to be executed after dueTime, using a Windows.System.Threading.ThreadPoolTimer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + + if (dt.Ticks == 0) + return Schedule(state, action); + + var d = new SingleAssignmentDisposable(); + + var res = global::Windows.System.Threading.ThreadPoolTimer.CreateTimer( + tpt => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, + dt + ); + + return new CompositeDisposable( + d, + Disposable.Create(res.Cancel) + ); + } + + /// <summary> + /// Schedules a periodic piece of work, using a Windows.System.Threading.ThreadPoolTimer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than one millisecond.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + // + // The WinRT thread pool is based on the Win32 thread pool and cannot handle + // sub-1ms resolution. When passing a lower period, we get single-shot + // timer behavior instead. See MSDN documentation for CreatePeriodicTimer + // for more information. + // + if (period < TimeSpan.FromMilliseconds(1)) + throw new ArgumentOutOfRangeException("period", Strings_PlatformServices.WINRT_NO_SUB1MS_TIMERS); + if (action == null) + throw new ArgumentNullException("action"); + + var state1 = state; + var gate = new AsyncLock(); + + var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer( + tpt => + { + gate.Wait(() => + { + state1 = action(state1); + }); + }, + period + ); + + return Disposable.Create(() => + { + res.Cancel(); + gate.Dispose(); + action = Stubs<TState>.I; + }); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs new file mode 100644 index 0000000..58fd008 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs @@ -0,0 +1,427 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !WINDOWS && !NO_THREAD +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the CLR thread pool. + /// </summary> + /// <seealso cref="ThreadPoolScheduler.Instance">Singleton instance of this type exposed through this static property.</seealso> + public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic + { + private static readonly ThreadPoolScheduler s_instance = new ThreadPoolScheduler(); + + /// <summary> + /// Gets the singleton instance of the CLR thread pool scheduler. + /// </summary> + public static ThreadPoolScheduler Instance + { + get + { + return s_instance; + } + } + + ThreadPoolScheduler() + { + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + ThreadPool.QueueUserWorkItem(_ => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, null); + + return d; + } + + /// <summary> + /// Schedules an action to be executed after dueTime, using a System.Threading.Timer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + + return new Timer<TState>(this, state, dt, action); + } + + /// <summary> + /// Schedules a long-running task by creating a new thread. Cancellation happens through polling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return NewThreadScheduler.Default.ScheduleLongRunning(state, action); + } + +#if !NO_STOPWATCH + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + + /// <summary> + /// Schedules a periodic piece of work, using a System.Threading.Timer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than or equal to zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + // + // MSDN documentation states the following: + // + // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once; + // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method." + // + if (period <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + return new PeriodicTimer<TState>(state, period, action); + } + +#if USE_TIMER_SELF_ROOT + + // + // See ConcurrencyAbstractionLayerImpl.cs for more information about the code + // below and its timer rooting behavior. + // + + sealed class Timer<TState> : IDisposable + { + private readonly MultipleAssignmentDisposable _disposable; + + private readonly IScheduler _parent; + private readonly TState _state; + private Func<IScheduler, TState, IDisposable> _action; + + private volatile System.Threading.Timer _timer; + + public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + _parent = parent; + _state = state; + _action = action; + + _disposable = new MultipleAssignmentDisposable(); + _disposable.Disposable = Disposable.Create(Stop); + + // Don't want the spin wait in Tick to get stuck if this thread gets aborted. + try { } + finally + { + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, null, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + } + } + + private void Tick(object state) + { + try + { + _disposable.Disposable = _action(_parent, _state); + } + finally + { + SpinWait.SpinUntil(IsTimerAssigned); + Stop(); + } + } + + private bool IsTimerAssigned() + { + return _timer != null; + } + + public void Dispose() + { + _disposable.Dispose(); + } + + private void Stop() + { + var timer = _timer; + if (timer != TimerStubs.Never) + { + _action = Nop; + _timer = TimerStubs.Never; + + timer.Dispose(); + } + } + + private IDisposable Nop(IScheduler scheduler, TState state) + { + return Disposable.Empty; + } + } + + sealed class PeriodicTimer<TState> : IDisposable + { + private TState _state; + private Func<TState, TState> _action; + + private readonly AsyncLock _gate; + private volatile System.Threading.Timer _timer; + + public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action) + { + _state = state; + _action = action; + + _gate = new AsyncLock(); + + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, null, period, period); + } + + private void Tick(object state) + { + _gate.Wait(() => + { + _state = _action(_state); + }); + } + + public void Dispose() + { + var timer = _timer; + if (timer != null) + { + _action = Stubs<TState>.I; + _timer = null; + + timer.Dispose(); + _gate.Dispose(); + } + } + } +#else + abstract class Timer + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + protected static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + protected static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + } + + sealed class Timer<TState> : Timer, IDisposable + { + private readonly MultipleAssignmentDisposable _disposable; + + private readonly IScheduler _parent; + private readonly TState _state; + + private Func<IScheduler, TState, IDisposable> _action; + private System.Threading.Timer _timer; + + private bool _hasAdded; + private bool _hasRemoved; + + public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + _disposable = new MultipleAssignmentDisposable(); + _disposable.Disposable = Disposable.Create(Unroot); + + _parent = parent; + _state = state; + + _action = action; + _timer = new System.Threading.Timer(Tick, null, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + + lock (s_timers) + { + if (!_hasRemoved) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + + _hasAdded = true; + } + } + } + + private void Tick(object state) + { + try + { + _disposable.Disposable = _action(_parent, _state); + } + finally + { + Unroot(); + } + } + + private void Unroot() + { + _action = Nop; + + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + if (!_hasRemoved) + { + timer = _timer; + _timer = null; + + if (_hasAdded && timer != null) + s_timers.Remove(timer); + + _hasRemoved = true; + } + } + + if (timer != null) + timer.Dispose(); + } + + private IDisposable Nop(IScheduler scheduler, TState state) + { + return Disposable.Empty; + } + + public void Dispose() + { + _disposable.Dispose(); + } + } + + abstract class PeriodicTimer + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + protected static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + protected static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + } + + sealed class PeriodicTimer<TState> : PeriodicTimer, IDisposable + { + private readonly AsyncLock _gate; + + private TState _state; + private Func<TState, TState> _action; + private System.Threading.Timer _timer; + + public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action) + { + _gate = new AsyncLock(); + + _state = state; + _action = action; + _timer = new System.Threading.Timer(Tick, null, period, period); + + lock (s_timers) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + } + } + + private void Tick(object state) + { + _gate.Wait(() => + { + _state = _action(_state); + }); + } + + public void Dispose() + { + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + timer = _timer; + _timer = null; + + if (timer != null) + s_timers.Remove(timer); + } + + if (timer != null) + { + timer.Dispose(); + _gate.Dispose(); + _action = Stubs<TState>.I; + } + } + } +#endif + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/EnlightenmentProvider.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/EnlightenmentProvider.cs new file mode 100644 index 0000000..bdacdc7 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/EnlightenmentProvider.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Concurrency; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// Provides access to the platform enlightenments used by other Rx libraries to improve system performance and + /// runtime efficiency. While Rx can run without platform enlightenments loaded, it's recommended to deploy the + /// System.Reactive.PlatformServices assembly with your application and call <see cref="EnlightenmentProvider. + /// EnsureLoaded"/> during application startup to ensure enlightenments are properly loaded. + /// </summary> + public static class EnlightenmentProvider + { + /// <summary> + /// Ensures that the calling assembly has a reference to the System.Reactive.PlatformServices assembly with + /// platform enlightenments. If no reference is made from the user code, it's possible for the build process + /// to drop the deployment of System.Reactive.PlatformServices, preventing its runtime discovery. + /// </summary> + /// <returns> + /// true if the loaded enlightenment provider matches the provided defined in the current assembly; false + /// otherwise. When a custom enlightenment provider is installed by the host, false will be returned. + /// </returns> + public static bool EnsureLoaded() + { + return PlatformEnlightenmentProvider.Current is CurrentPlatformEnlightenmentProvider; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/ExceptionServicesImpl.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/ExceptionServicesImpl.cs new file mode 100644 index 0000000..6d14651 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/ExceptionServicesImpl.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if HAS_EDI +namespace System.Reactive.PlatformServices +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class /*Default*/ExceptionServicesImpl : IExceptionServices + { + public void Rethrow(Exception exception) + { + System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw(); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.Windows.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.Windows.cs new file mode 100644 index 0000000..c70bf17 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.Windows.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if WINDOWS +using Windows.ApplicationModel; +using Windows.ApplicationModel.Core; + +namespace System.Reactive.PlatformServices +{ + internal class HostLifecycleNotifications : IHostLifecycleNotifications + { + private EventHandler<SuspendingEventArgs> _suspending; + private EventHandler<object> _resuming; + + public event EventHandler<HostSuspendingEventArgs> Suspending + { + add + { + _suspending = (o, e) => value(o, new HostSuspendingEventArgs()); + CoreApplication.Suspending += _suspending; + } + + remove + { + CoreApplication.Suspending -= _suspending; + } + } + + public event EventHandler<HostResumingEventArgs> Resuming + { + add + { + _resuming = (o, e) => value(o, new HostResumingEventArgs()); + CoreApplication.Resuming += _resuming; + } + + remove + { + CoreApplication.Resuming -= _resuming; + } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.WindowsPhone.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.WindowsPhone.cs new file mode 100644 index 0000000..30a32e7 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.WindowsPhone.cs @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if WINDOWSPHONE7 + +#if DEBUG_NO_AGENT_SUPPORT +using Microsoft.Phone.Shell; +#else +using System.Reactive.PlatformServices.Phone.Shell; +#endif + +namespace System.Reactive.PlatformServices +{ + internal class HostLifecycleNotifications : IHostLifecycleNotifications + { + private EventHandler<ActivatedEventArgs> _activated; + private EventHandler<DeactivatedEventArgs> _deactivated; + + public event EventHandler<HostSuspendingEventArgs> Suspending + { + add + { + _deactivated = (o, e) => value(o, new HostSuspendingEventArgs()); + + var current = PhoneApplicationService.Current; + if (current != null) + current.Deactivated += _deactivated; + } + + remove + { + var current = PhoneApplicationService.Current; + if (current != null) + current.Deactivated -= _deactivated; + } + } + + public event EventHandler<HostResumingEventArgs> Resuming + { + add + { + _activated = (o, e) => + { + if (e.IsApplicationInstancePreserved) + { + value(o, new HostResumingEventArgs()); + } + }; + + var current = PhoneApplicationService.Current; + if (current != null) + current.Activated += _activated; + } + + remove + { + var current = PhoneApplicationService.Current; + if (current != null) + current.Activated -= _activated; + } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/PhoneShellThunks.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/PhoneShellThunks.cs new file mode 100644 index 0000000..3ce1b22 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/PhoneShellThunks.cs @@ -0,0 +1,214 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if WINDOWSPHONE7 +#if !DEBUG_NO_AGENT_SUPPORT +// +// The Windows Phone Marketplace Test Kit disallows usage of types +// in the Microsoft.Phone.Shell namespace, determined by static code +// analysis, for background agents. +// +// However, with a null check for PhoneApplicationService.Current, +// we can safely use this the lifecycle events for dormant state +// transitions. In a background agent this property will be null; +// trying to create an instance of PhoneApplicationService throws +// a ComException and is required to set the Current property. +// +// In order to access the PhoneApplicationService functionality for +// non-background agent assemblies, we build a late bound wrapper +// around the APIs. +// +// See appplat\src\Frameworks\Microsoft\Phone\PhoneApplicationService.cs +// for implementation details of the class we use. +// +namespace System.Reactive.PlatformServices.Phone.Shell +{ + using System; + using System.Collections.Generic; + using System.Linq.Expressions; + using System.Reflection; + using Microsoft.Phone; + + class PhoneApplicationService + { + private static readonly object s_gate = new object(); + internal static readonly Assembly s_phshAsm; + private static readonly Type s_pasType; + private static readonly PropertyInfo s_curProp; + private static readonly EventInfo s_actdEvt; + private static readonly EventInfo s_deacEvt; + + private readonly object _target; + + static PhoneApplicationService() + { + s_phshAsm = typeof(BackgroundAgent).Assembly; // Has to be in Microsoft.Phone.dll. + s_pasType = s_phshAsm.GetType("Microsoft.Phone.Shell.PhoneApplicationService"); + s_curProp = s_pasType.GetProperty("Current", BindingFlags.Public | BindingFlags.Static); + s_actdEvt = s_curProp.PropertyType.GetEvent("Activated", BindingFlags.Public | BindingFlags.Instance); + s_deacEvt = s_curProp.PropertyType.GetEvent("Deactivated", BindingFlags.Public | BindingFlags.Instance); + } + + private PhoneApplicationService(object target) + { + _target = target; + } + + private static PhoneApplicationService s_current; + + public static PhoneApplicationService Current + { + get + { + lock (s_gate) + { + if (s_current == null) + { + var current = s_curProp.GetValue(null, null); + if (current == null) + return null; + + // + // Current provides a singleton. The constructor + // of PhoneApplicationService guarantees this, + // throwing an InvalidOperationException if more + // than one instance is created. + // + s_current = new PhoneApplicationService(current); + } + } + + return s_current; + } + } + + private Dictionary<object, object> _actdHandlers = new Dictionary<object, object>(); + + public event EventHandler<ActivatedEventArgs> Activated + { + add + { + AddHandler<ActivatedEventArgs>(s_actdEvt, _actdHandlers, value); + } + + remove + { + RemoveHandler(s_actdEvt, _actdHandlers, value); + } + } + + private Dictionary<object, object> _deacHandlers = new Dictionary<object, object>(); + + public event EventHandler<DeactivatedEventArgs> Deactivated + { + add + { + AddHandler<DeactivatedEventArgs>(s_deacEvt, _deacHandlers, value); + } + + remove + { + RemoveHandler(s_deacEvt, _deacHandlers, value); + } + } + + private void AddHandler<TEventArgs>(EventInfo evt, Dictionary<object, object> map, object handler) + where TEventArgs : EventArgs + { + var h = GetHandler<TEventArgs>(evt, handler); + var add = evt.GetAddMethod(); + + lock (s_gate) + { + map.Add(handler, h); + add.Invoke(_target, new object[] { h }); + } + } + + private void RemoveHandler(EventInfo evt, Dictionary<object, object> map, object handler) + { + var rem = evt.GetRemoveMethod(); + + lock (s_gate) + { + var h = default(object); + if (map.TryGetValue(handler, out h)) + { + // + // We assume only one handler will be attached to + // the event, hence we shouldn't worry about having + // multiple delegate instances with the same target + // being attached. This guarantee is made by the + // reference counting in HostLifecycleService. + // + // The use of a dictionary allows for reuse with + // multiple distinct handlers going forward. + // + map.Remove(handler); + rem.Invoke(_target, new object[] { h }); + } + } + } + + private static object GetHandler<TEventArgsThunk>(EventInfo evt, object call) + where TEventArgsThunk : EventArgs + { + var ht = evt.EventHandlerType; + var hp = ht.GetMethod("Invoke").GetParameters(); + + var po = Expression.Parameter(hp[0].ParameterType, hp[0].Name); + var pe = Expression.Parameter(hp[1].ParameterType, hp[1].Name); + + var h = Expression.Lambda( + ht, + Expression.Invoke( + Expression.Constant(call), + po, + Expression.New( + typeof(TEventArgsThunk).GetConstructor(new[] { typeof(object) }), + pe + ) + ), + po, + pe + ); + + return h.Compile(); + } + } + + class ActivatedEventArgs : EventArgs + { + private static readonly Type s_aeaType; + private static readonly PropertyInfo s_aipProp; + + private readonly object _target; + + static ActivatedEventArgs() + { + s_aeaType = PhoneApplicationService.s_phshAsm.GetType("Microsoft.Phone.Shell.ActivatedEventArgs"); + s_aipProp = s_aeaType.GetProperty("IsApplicationInstancePreserved", BindingFlags.Public | BindingFlags.Instance); + } + + public ActivatedEventArgs(object target) + { + _target = target; + } + + public bool IsApplicationInstancePreserved + { + get + { + return (bool)s_aipProp.GetValue(_target, null); + } + } + } + + class DeactivatedEventArgs : EventArgs + { + public DeactivatedEventArgs(object target) + { + } + } +} +#endif +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/PlatformEnlightenmentProvider.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/PlatformEnlightenmentProvider.cs new file mode 100644 index 0000000..7bb597c --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/PlatformEnlightenmentProvider.cs @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +// +// WARNING: The full namespace-qualified type name should stay the same for the discovery in System.Reactive.Core to work! +// +using System.ComponentModel; +using System.Diagnostics; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Reflection; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Provider for platform-specific framework enlightenments. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class CurrentPlatformEnlightenmentProvider : IPlatformEnlightenmentProvider + { + /// <summary> + /// (Infastructure) Tries to gets the specified service. + /// </summary> + /// <typeparam name="T">Service type.</typeparam> + /// <param name="args">Optional set of arguments.</param> + /// <returns>Service instance or null if not found.</returns> + public virtual T GetService<T>(object[] args) where T : class + { + var t = typeof(T); + +#if HAS_EDI + if (t == typeof(IExceptionServices)) + { + return (T)(object)new ExceptionServicesImpl(); + } +#endif + +#if !NO_THREAD || WINDOWS + if (t == typeof(IConcurrencyAbstractionLayer)) + { + return (T)(object)new ConcurrencyAbstractionLayerImpl(); + } +#endif + + if (t == typeof(IScheduler) && args != null) + { + switch ((string)args[0]) + { +#if !WINDOWS && !NO_THREAD + case "ThreadPool": + return (T)(object)ThreadPoolScheduler.Instance; +#elif WINDOWS + case "ThreadPool": + return (T)(object)ThreadPoolScheduler.Default; +#endif +#if !NO_TPL + case "TaskPool": + return (T)(object)TaskPoolScheduler.Default; +#endif + case "NewThread": + return (T)(object)NewThreadScheduler.Default; + } + } + +#if WINDOWS || WINDOWSPHONE7 + if (t == typeof(IHostLifecycleNotifications)) + { + return (T)(object)new HostLifecycleNotifications(); + } +#endif + + if (t == typeof(IQueryServices)) + { + // + // We perform this Debugger.IsAttached check early rather than deferring + // the decision to intercept query operator methods to the debugger + // assembly that's dynamically discovered here. Also, it's a reasonable + // expectation it'd be pretty hard to turn on interception dynamically + // upon a debugger attach event, so we should make this check early. + // + // In the initial release of v2.0 (RTM), we won't have the corresponding + // debugger assembly available yet, so the dynamic load would always + // fail. We also don't want to take the price of (an attempt to) a dynamic + // assembly load for the regular production case. + // + if (Debugger.IsAttached) + { +#if NETCF35 + var name = "System.Reactive.Linq.QueryDebugger, System.Reactive.Debugger"; +#else +#if CRIPPLED_REFLECTION + var ifType = t.GetTypeInfo(); +#else + var ifType = t; +#endif + var asm = new AssemblyName(ifType.Assembly.FullName); + asm.Name = "System.Reactive.Debugger"; + var name = "System.Reactive.Linq.QueryDebugger, " + asm.FullName; +#endif + var dbg = Type.GetType(name, false); + if (dbg != null) + return (T)(object)Activator.CreateInstance(dbg); + } + } + + return null; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/StopwatchImpl.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/StopwatchImpl.cs new file mode 100644 index 0000000..04a4040 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/StopwatchImpl.cs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_STOPWATCH +using System.Diagnostics; + +namespace System.Reactive.Concurrency +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class /*Default*/StopwatchImpl : IStopwatch + { + private readonly Stopwatch _sw; + + public StopwatchImpl() + { + _sw = Stopwatch.StartNew(); + } + + public TimeSpan Elapsed + { + get { return _sw.Elapsed; } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/Stubs.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/Stubs.cs new file mode 100644 index 0000000..c879ccf --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Internal/Stubs.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive.Concurrency +{ +#if !NO_THREAD + internal static class TimerStubs + { + public static readonly System.Threading.Timer Never = new System.Threading.Timer(_ => { }); + } +#endif +} |