diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency')
8 files changed, 1917 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs new file mode 100644 index 0000000..1d5c0ae --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_THREAD && WINDOWS +using System; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer + { + public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) + { + var res = global::Windows.System.Threading.ThreadPoolTimer.CreateTimer( + tpt => + { + action(state); + }, + Normalize(dueTime) + ); + + return Disposable.Create(res.Cancel); + } + + public IDisposable StartPeriodicTimer(Action action, TimeSpan period) + { + // + // The WinRT thread pool is based on the Win32 thread pool and cannot handle + // sub-1ms resolution. When passing a lower period, we get single-shot + // timer behavior instead. See MSDN documentation for CreatePeriodicTimer + // for more information. + // + if (period < TimeSpan.FromMilliseconds(1)) + throw new ArgumentOutOfRangeException("period", Strings_PlatformServices.WINRT_NO_SUB1MS_TIMERS); + + var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer( + tpt => + { + action(); + }, + period + ); + + return Disposable.Create(res.Cancel); + } + + public IDisposable QueueUserWorkItem(Action<object> action, object state) + { + var res = global::Windows.System.Threading.ThreadPool.RunAsync(iaa => + { + action(state); + }); + + return Disposable.Create(res.Cancel); + } + + public void Sleep(TimeSpan timeout) + { + var e = new ManualResetEventSlim(); + + global::Windows.System.Threading.ThreadPoolTimer.CreateTimer( + tpt => + { + e.Set(); + }, + Normalize(timeout) + ); + + e.Wait(); + } + + public IStopwatch StartStopwatch() + { +#if !NO_STOPWATCH + return new StopwatchImpl(); +#else + return new DefaultStopwatch(); +#endif + } + + public bool SupportsLongRunning + { + get { return false; } + } + + public void StartThread(Action<object> action, object state) + { + throw new NotSupportedException(); + } + + private TimeSpan Normalize(TimeSpan dueTime) + { + if (dueTime < TimeSpan.Zero) + return TimeSpan.Zero; + + return dueTime; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs new file mode 100644 index 0000000..4c91463 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs @@ -0,0 +1,371 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_THREAD +using System; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer + { + public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) + { + return new Timer(action, state, Normalize(dueTime)); + } + + public IDisposable StartPeriodicTimer(Action action, TimeSpan period) + { + // + // MSDN documentation states the following: + // + // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once; + // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method." + // + if (period <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + + return new PeriodicTimer(action, period); + } + + public IDisposable QueueUserWorkItem(Action<object> action, object state) + { + System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state); + return Disposable.Empty; + } + +#if USE_SLEEP_MS + public void Sleep(TimeSpan timeout) + { + System.Threading.Thread.Sleep((int)Normalize(timeout).TotalMilliseconds); + } +#else + public void Sleep(TimeSpan timeout) + { + System.Threading.Thread.Sleep(Normalize(timeout)); + } +#endif + + public IStopwatch StartStopwatch() + { +#if !NO_STOPWATCH + return new StopwatchImpl(); +#else + return new DefaultStopwatch(); +#endif + } + + public bool SupportsLongRunning + { + get { return true; } + } + + public void StartThread(Action<object> action, object state) + { + new Thread(() => + { + action(state); + }) { IsBackground = true }.Start(); + } + + private static TimeSpan Normalize(TimeSpan dueTime) + { + if (dueTime < TimeSpan.Zero) + return TimeSpan.Zero; + + return dueTime; + } + +#if USE_TIMER_SELF_ROOT + + // + // Some historical context. In the early days of Rx, we discovered an issue with + // the rooting of timers, causing them to get GC'ed even when the IDisposable of + // a scheduled activity was kept alive. The original code simply created a timer + // as follows: + // + // var t = default(Timer); + // t = new Timer(_ => + // { + // t = null; + // Debug.WriteLine("Hello!"); + // }, null, 5000, Timeout.Infinite); + // + // IIRC the reference to "t" captured by the closure wasn't sufficient on .NET CF + // to keep the timer rooted, causing problems on Windows Phone 7. As a result, we + // added rooting code using a dictionary (SD 7280), which we carried forward all + // the way to Rx v2.0 RTM. + // + // However, the desktop CLR's implementation of System.Threading.Timer exhibits + // other characteristics where a timer can root itself when the timer is still + // reachable through the state or callback parameters. To illustrate this, run + // the following piece of code: + // + // static void Main() + // { + // Bar(); + // + // while (true) + // { + // GC.Collect(); + // GC.WaitForPendingFinalizers(); + // Thread.Sleep(100); + // } + // } + // + // static void Bar() + // { + // var t = default(Timer); + // t = new Timer(_ => + // { + // t = null; // Comment out this line to see the timer stop + // Console.WriteLine("Hello!"); + // }, null, 5000, Timeout.Infinite); + // } + // + // When the closure over "t" is removed, the timer will stop automatically upon + // garbage collection. However, when retaining the reference, this problem does + // not exist. The code below exploits this behavior, avoiding unnecessary costs + // to root timers in a thread-safe manner. + // + // Below is a fragment of SOS output, proving the proper rooting: + // + // !gcroot 02492440 + // HandleTable: + // 005a13fc (pinned handle) + // -> 03491010 System.Object[] + // -> 024924dc System.Threading.TimerQueue + // -> 02492450 System.Threading.TimerQueueTimer + // -> 02492420 System.Threading.TimerCallback + // -> 02492414 TimerRootingExperiment.Program+<>c__DisplayClass1 + // -> 02492440 System.Threading.Timer + // + // With the USE_TIMER_SELF_ROOT symbol, we shake off this additional rooting code + // for newer platforms where this no longer needed. We checked this on .NET Core + // as well as .NET 4.0, and only #define this symbol for those platforms. + // + + class Timer : IDisposable + { + private Action<object> _action; + private volatile System.Threading.Timer _timer; + + public Timer(Action<object> action, object state, TimeSpan dueTime) + { + _action = action; + + // Don't want the spin wait in Tick to get stuck if this thread gets aborted. + try { } + finally + { + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + } + } + + private void Tick(object state) + { + try + { + _action(state); + } + finally + { + SpinWait.SpinUntil(IsTimerAssigned); + Dispose(); + } + } + + private bool IsTimerAssigned() + { + return _timer != null; + } + + public void Dispose() + { + var timer = _timer; + if (timer != TimerStubs.Never) + { + _action = Stubs<object>.Ignore; + _timer = TimerStubs.Never; + + timer.Dispose(); + } + } + } + + class PeriodicTimer : IDisposable + { + private Action _action; + private volatile System.Threading.Timer _timer; + + public PeriodicTimer(Action action, TimeSpan period) + { + _action = action; + + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, null, period, period); + } + + private void Tick(object state) + { + _action(); + } + + public void Dispose() + { + var timer = _timer; + if (timer != null) + { + _action = Stubs.Nop; + _timer = null; + + timer.Dispose(); + } + } + } +#else + class Timer : IDisposable + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + + private Action<object> _action; + private System.Threading.Timer _timer; + + private bool _hasAdded; + private bool _hasRemoved; + + public Timer(Action<object> action, object state, TimeSpan dueTime) + { + _action = action; + _timer = new System.Threading.Timer(Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + + lock (s_timers) + { + if (!_hasRemoved) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + + _hasAdded = true; + } + } + } + + private void Tick(object state) + { + try + { + _action(state); + } + finally + { + Dispose(); + } + } + + public void Dispose() + { + _action = Stubs<object>.Ignore; + + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + if (!_hasRemoved) + { + timer = _timer; + _timer = null; + + if (_hasAdded && timer != null) + s_timers.Remove(timer); + + _hasRemoved = true; + } + } + + if (timer != null) + timer.Dispose(); + } + } + + class PeriodicTimer : IDisposable + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + + private Action _action; + private System.Threading.Timer _timer; + + public PeriodicTimer(Action action, TimeSpan period) + { + _action = action; + _timer = new System.Threading.Timer(Tick, null, period, period); + + lock (s_timers) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + } + } + + private void Tick(object state) + { + _action(); + } + + public void Dispose() + { + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + timer = _timer; + _timer = null; + + if (timer != null) + s_timers.Remove(timer); + } + + if (timer != null) + { + timer.Dispose(); + _action = Stubs.Nop; + } + } + } +#endif + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs new file mode 100644 index 0000000..8739895 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs @@ -0,0 +1,375 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +#if NO_SEMAPHORE +using System.Reactive.Threading; +#endif + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on a designated thread. + /// </summary> + public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable + { + #region Fields + + /// <summary> + /// Counter for diagnostic purposes, to name the threads. + /// </summary> + private static int s_counter; + + /// <summary> + /// Thread factory function. + /// </summary> + private readonly Func<ThreadStart, Thread> _threadFactory; + + /// <summary> + /// Stopwatch for timing free of absolute time dependencies. + /// </summary> + private IStopwatch _stopwatch; + + /// <summary> + /// Thread used by the event loop to run work items on. No work should be run on any other thread. + /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled. + /// </summary> + private Thread _thread; + + /// <summary> + /// Gate to protect data structures, including the work queue and the ready list. + /// </summary> + private readonly object _gate; + + /// <summary> + /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer + /// expires and moves on to the next item in the queue. + /// </summary> +#if !NO_CDS + private readonly SemaphoreSlim _evt; +#else + private readonly Semaphore _evt; +#endif + + /// <summary> + /// Queue holding work items. Protected by the gate. + /// </summary> + private readonly SchedulerQueue<TimeSpan> _queue; + + /// <summary> + /// Queue holding items that are ready to be run as soon as possible. Protected by the gate. + /// </summary> + private readonly Queue<ScheduledItem<TimeSpan>> _readyList; + + /// <summary> + /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next + /// item is still the same. If not, a new timer needs to be started (see below). + /// </summary> + private ScheduledItem<TimeSpan> _nextItem; + + /// <summary> + /// Disposable that always holds the timer to dispatch the first element in the queue. + /// </summary> + private readonly SerialDisposable _nextTimer; + + /// <summary> + /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to + /// wake up the event loop thread, which will subsequently abandon all work. + /// </summary> + private bool _disposed; + + #endregion + + #region Constructors + + /// <summary> + /// Creates an object that schedules units of work on a designated thread. + /// </summary> + public EventLoopScheduler() + : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true }) + { + } + +#if !NO_THREAD + /// <summary> + /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options. + /// </summary> + /// <param name="threadFactory">Factory function for thread creation.</param> + /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception> + public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory) + { + if (threadFactory == null) + throw new ArgumentNullException("threadFactory"); +#else + internal EventLoopScheduler(Func<ThreadStart, Thread> threadFactory) + { +#endif + _threadFactory = threadFactory; + _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); + + _gate = new object(); + +#if !NO_CDS + _evt = new SemaphoreSlim(0); +#else + _evt = new Semaphore(0, int.MaxValue); +#endif + _queue = new SchedulerQueue<TimeSpan>(); + _readyList = new Queue<ScheduledItem<TimeSpan>>(); + + _nextTimer = new SerialDisposable(); + + ExitIfEmpty = false; + } + + #endregion + + #region Properties + + /// <summary> + /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work + /// is scheduled afterwards, a new event loop thread is created. This property is used by the + /// NewThreadScheduler which uses an event loop for its recursive invocations. + /// </summary> + internal bool ExitIfEmpty + { + get; + set; + } + + #endregion + + #region Public methods + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var due = _stopwatch.Elapsed + dueTime; + var si = new ScheduledItem<TimeSpan, TState>(this, state, action, due); + + lock (_gate) + { + if (_disposed) + throw new ObjectDisposedException(""); + + if (dueTime <= TimeSpan.Zero) + { + _readyList.Enqueue(si); + _evt.Release(); + } + else + { + _queue.Enqueue(si); + _evt.Release(); + } + + EnsureThread(); + } + + return Disposable.Create(si.Cancel); + } + + /// <summary> + /// Schedules a periodic piece of work on the designated thread. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + var start = _stopwatch.Elapsed; + var next = start + period; + + var state1 = state; + + var d = new MultipleAssignmentDisposable(); + var gate = new AsyncLock(); + + var tick = default(Func<IScheduler, object, IDisposable>); + tick = (self_, _) => + { + next += period; + + d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick); + + gate.Wait(() => + { + state1 = action(state1); + }); + + return Disposable.Empty; + }; + + d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick); + + return new CompositeDisposable(d, gate); + } + +#if !NO_STOPWATCH + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + + /// <summary> + /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned. + /// </summary> + public void Dispose() + { + lock (_gate) + { + if (!_disposed) + { + _disposed = true; + _nextTimer.Dispose(); + _evt.Release(); + } + } + } + + #endregion + + #region Private implementation + + /// <summary> + /// Ensures there is an event loop thread running. Should be called under the gate. + /// </summary> + private void EnsureThread() + { + if (_thread == null) + { + _thread = _threadFactory(Run); + _thread.Start(); + } + } + + /// <summary> + /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event + /// which gets set by calls to Schedule, the next item timer, or calls to Dispose. + /// </summary> + private void Run() + { + while (true) + { +#if !NO_CDS + _evt.Wait(); +#else + _evt.WaitOne(); +#endif + + var ready = default(ScheduledItem<TimeSpan>[]); + + lock (_gate) + { + // + // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the + // loop immediately. Subsequent calls to Schedule won't ever create a new thread. + // + if (_disposed) + { + ((IDisposable)_evt).Dispose(); + return; + } + + while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed) + { + var item = _queue.Dequeue(); + _readyList.Enqueue(item); + } + + if (_queue.Count > 0) + { + var next = _queue.Peek(); + if (next != _nextItem) + { + _nextItem = next; + + var due = next.DueTime - _stopwatch.Elapsed; + _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due); + } + } + + if (_readyList.Count > 0) + { + ready = _readyList.ToArray(); + _readyList.Clear(); + } + } + + if (ready != null) + { + foreach (var item in ready) + { + if (!item.IsCanceled) + item.Invoke(); + } + } + + if (ExitIfEmpty) + { + lock (_gate) + { + if (_readyList.Count == 0 && _queue.Count == 0) + { + _thread = null; + return; + } + } + } + } + } + + private void Tick(object state) + { + lock (_gate) + { + if (!_disposed) + { + var item = (ScheduledItem<TimeSpan>)state; + if (_queue.Remove(item)) + { + _readyList.Enqueue(item); + } + + _evt.Release(); + } + } + } + + #endregion + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs new file mode 100644 index 0000000..b111554 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs @@ -0,0 +1,194 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules each unit of work on a separate thread. + /// </summary> + public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic + { + internal static readonly NewThreadScheduler s_instance = new NewThreadScheduler(); + + private readonly Func<ThreadStart, Thread> _threadFactory; + + /// <summary> + /// Creates an object that schedules each unit of work on a separate thread. + /// </summary> + public NewThreadScheduler() + : this(action => new Thread(action)) + { + } + + /// <summary> + /// Gets an instance of this scheduler that uses the default Thread constructor. + /// </summary> + public static NewThreadScheduler Default + { + get + { + return s_instance; + } + } + +#if !NO_THREAD + /// <summary> + /// Creates an object that schedules each unit of work on a separate thread. + /// </summary> + /// <param name="threadFactory">Factory function for thread creation.</param> + /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception> + public NewThreadScheduler(Func<ThreadStart, Thread> threadFactory) + { + if (threadFactory == null) + throw new ArgumentNullException("threadFactory"); +#else + private NewThreadScheduler(Func<ThreadStart, Thread> threadFactory) + { +#endif + _threadFactory = threadFactory; + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var scheduler = new EventLoopScheduler(_threadFactory); + scheduler.ExitIfEmpty = true; + return scheduler.Schedule(state, dueTime, action); + } + + /// <summary> + /// Schedules a long-running task by creating a new thread. Cancellation happens through polling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new BooleanDisposable(); + + var thread = _threadFactory(() => + { + // + // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning + // requires us to ensure the scheduled work gets an opportunity to observe + // the cancellation request. + // + action(state, d); + }); + + thread.Start(); + + return d; + } + + /// <summary> + /// Schedules a periodic piece of work by creating a new thread that goes to sleep when work has been dispatched and wakes up again at the next periodic due time. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + var periodic = new Periodic<TState>(state, period, action); + + var thread = _threadFactory(periodic.Run); + thread.Start(); + + return periodic; + } + + class Periodic<TState> : IDisposable + { + private readonly IStopwatch _stopwatch; + private readonly TimeSpan _period; + private readonly Func<TState, TState> _action; + + private readonly object _cancel = new object(); + private volatile bool _done; + + private TState _state; + private TimeSpan _next; + + public Periodic(TState state, TimeSpan period, Func<TState, TState> action) + { + _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); + + _period = period; + _action = action; + + _state = state; + _next = period; + } + + public void Run() + { + while (!_done) + { + var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed); + + lock (_cancel) + { + if (Monitor.Wait(_cancel, timeout)) + return; + } + + _state = _action(_state); + _next += _period; + } + } + + public void Dispose() + { + _done = true; + + lock (_cancel) + { + Monitor.Pulse(_cancel); + } + } + } + +#if !NO_STOPWATCH + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs new file mode 100644 index 0000000..a84588b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs @@ -0,0 +1,230 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_TPL +using System.Reactive.Disposables; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the Task Parallel Library (TPL) task pool. + /// </summary> + /// <seealso cref="TaskPoolScheduler.Default">Instance of this type using the default TaskScheduler to schedule work on the TPL task pool.</seealso> + public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic + { + private static readonly TaskPoolScheduler s_instance = new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)); + private readonly TaskFactory taskFactory; + + /// <summary> + /// Creates an object that schedules units of work using the provided TaskFactory. + /// </summary> + /// <param name="taskFactory">Task factory used to create tasks to run units of work.</param> + /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception> + public TaskPoolScheduler(TaskFactory taskFactory) + { + if (taskFactory == null) + throw new ArgumentNullException("taskFactory"); + + this.taskFactory = taskFactory; + } + + /// <summary> + /// Gets an instance of this scheduler that uses the default TaskScheduler. + /// </summary> + public static TaskPoolScheduler Default + { + get + { + return s_instance; + } + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SerialDisposable(); + var cancelable = new CancellationDisposable(); + d.Disposable = cancelable; + taskFactory.StartNew(() => + { + // + // BREAKING CHANGE v2.0 > v1.x - No longer escalating exceptions using a throwing + // helper thread. + // + // Our manual escalation based on the creation of a throwing thread was merely to + // expedite the process of throwing the exception that would otherwise occur on the + // finalizer thread at a later point during the app's lifetime. + // + // However, it also prevented applications from observing the exception through + // the TaskScheduler.UnobservedTaskException static event. Also, starting form .NET + // 4.5, the default behavior of the task pool is not to take down the application + // when an exception goes unobserved (done as part of the async/await work). It'd + // be weird for Rx not to follow the platform defaults. + // + // General implementation guidelines for schedulers (in order of importance): + // + // 1. Always thunk through to the underlying infrastructure with a wrapper that's as tiny as possible. + // 2. Global exception notification/handling mechanisms shouldn't be bypassed. + // 3. Escalation behavior for exceptions is left to the underlying infrastructure. + // + // The Catch extension method for IScheduler (added earlier) allows to re-route + // exceptions at stage 2. If the exception isn't handled at the Rx level, it + // propagates by means of a rethrow, falling back to behavior in 3. + // + d.Disposable = action(this, state); + }, cancelable.Token); + return d; + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + +#if !NO_TASK_DELAY + var d = new MultipleAssignmentDisposable(); + + var ct = new CancellationDisposable(); + d.Disposable = ct; + + Task.Delay(dueTime, ct.Token).ContinueWith(_ => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler); + + return d; +#else + return DefaultScheduler.Instance.Schedule(state, dt, (_, state1) => Schedule(state1, action)); +#endif + } + + /// <summary> + /// Schedules a long-running task by creating a new task using TaskCreationOptions.LongRunning. Cancellation happens through polling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + var d = new BooleanDisposable(); + + taskFactory.StartNew(() => + { + // + // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning + // requires us to ensure the scheduled work gets an opportunity to observe + // the cancellation request. + // + action(state, d); + }, TaskCreationOptions.LongRunning); + + return d; + } + +#if !NO_STOPWATCH + /// <summary> + /// Gets a new stopwatch ob ject. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + + /// <summary> + /// Schedules a periodic piece of work by running a platform-specific timer to create tasks periodically. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + +#if !NO_TASK_DELAY + var cancel = new CancellationDisposable(); + + var state1 = state; + var gate = new AsyncLock(); + + var moveNext = default(Action); + moveNext = () => + { + Task.Delay(period, cancel.Token).ContinueWith( + _ => + { + moveNext(); + + gate.Wait(() => + { + state1 = action(state1); + }); + }, + CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler + ); + }; + + moveNext(); + + return new CompositeDisposable(cancel, gate); +#else + var state1 = state; + var gate = new AsyncLock(); + + var timer = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(() => + { + taskFactory.StartNew(() => + { + gate.Wait(() => + { + state1 = action(state1); + }); + }); + }, period); + + return new CompositeDisposable(timer, gate); +#endif + } + } +} +#endif diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs new file mode 100644 index 0000000..691365b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_THREAD +using System; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + class Thread + { + private readonly ThreadStart _start; + + public Thread(ThreadStart start) + { + _start = start; + } + + public string Name { get; set; } + public bool IsBackground { get; set; } + + public void Start() + { + System.Threading.Tasks.Task.Factory.StartNew(Run, System.Threading.Tasks.TaskCreationOptions.LongRunning); + } + + private void Run() + { + _start(); + } + } + + delegate void ThreadStart(); +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs new file mode 100644 index 0000000..1b8579d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if WINDOWS +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using Windows.System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the Windows Runtime thread pool. + /// </summary> + /// <seealso cref="ThreadPoolScheduler.Default">Singleton instance of this type exposed through this static property.</seealso> + [CLSCompliant(false)] + public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerPeriodic + { + private readonly WorkItemPriority _priority; + private readonly WorkItemOptions _options; + private static Lazy<ThreadPoolScheduler> s_default = new Lazy<ThreadPoolScheduler>(() => new ThreadPoolScheduler()); + + /// <summary> + /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool. + /// </summary> + public ThreadPoolScheduler() + { + } + + /// <summary> + /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool with the given priority. + /// </summary> + /// <param name="priority">Priority for scheduled units of work.</param> + public ThreadPoolScheduler(WorkItemPriority priority) + { + _priority = priority; + _options = WorkItemOptions.None; + } + + /// <summary> + /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool with the given priority. + /// </summary> + /// <param name="priority">Priority for scheduled units of work.</param> + /// <param name="options">Options that configure how work is scheduled.</param> + public ThreadPoolScheduler(WorkItemPriority priority, WorkItemOptions options) + { + _priority = priority; + _options = options; + } + + /// <summary> + /// Gets the singleton instance of the Windows Runtime thread pool scheduler. + /// </summary> + public static ThreadPoolScheduler Default + { + get + { + return s_default.Value; + } + } + + /// <summary> + /// Gets the priority at which work is scheduled. + /// </summary> + public WorkItemPriority Priority + { + get { return _priority; } + } + + /// <summary> + /// Gets the options that configure how work is scheduled. + /// </summary> + public WorkItemOptions Options + { + get { return _options; } + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + var res = global::Windows.System.Threading.ThreadPool.RunAsync(iaa => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, _priority, _options); + + return new CompositeDisposable( + d, + Disposable.Create(res.Cancel) + ); + } + + /// <summary> + /// Schedules an action to be executed after dueTime, using a Windows.System.Threading.ThreadPoolTimer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + + if (dt.Ticks == 0) + return Schedule(state, action); + + var d = new SingleAssignmentDisposable(); + + var res = global::Windows.System.Threading.ThreadPoolTimer.CreateTimer( + tpt => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, + dt + ); + + return new CompositeDisposable( + d, + Disposable.Create(res.Cancel) + ); + } + + /// <summary> + /// Schedules a periodic piece of work, using a Windows.System.Threading.ThreadPoolTimer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than one millisecond.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + // + // The WinRT thread pool is based on the Win32 thread pool and cannot handle + // sub-1ms resolution. When passing a lower period, we get single-shot + // timer behavior instead. See MSDN documentation for CreatePeriodicTimer + // for more information. + // + if (period < TimeSpan.FromMilliseconds(1)) + throw new ArgumentOutOfRangeException("period", Strings_PlatformServices.WINRT_NO_SUB1MS_TIMERS); + if (action == null) + throw new ArgumentNullException("action"); + + var state1 = state; + var gate = new AsyncLock(); + + var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer( + tpt => + { + gate.Wait(() => + { + state1 = action(state1); + }); + }, + period + ); + + return Disposable.Create(() => + { + res.Cancel(); + gate.Dispose(); + action = Stubs<TState>.I; + }); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs new file mode 100644 index 0000000..58fd008 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs @@ -0,0 +1,427 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !WINDOWS && !NO_THREAD +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the CLR thread pool. + /// </summary> + /// <seealso cref="ThreadPoolScheduler.Instance">Singleton instance of this type exposed through this static property.</seealso> + public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic + { + private static readonly ThreadPoolScheduler s_instance = new ThreadPoolScheduler(); + + /// <summary> + /// Gets the singleton instance of the CLR thread pool scheduler. + /// </summary> + public static ThreadPoolScheduler Instance + { + get + { + return s_instance; + } + } + + ThreadPoolScheduler() + { + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + ThreadPool.QueueUserWorkItem(_ => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, null); + + return d; + } + + /// <summary> + /// Schedules an action to be executed after dueTime, using a System.Threading.Timer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + + return new Timer<TState>(this, state, dt, action); + } + + /// <summary> + /// Schedules a long-running task by creating a new thread. Cancellation happens through polling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return NewThreadScheduler.Default.ScheduleLongRunning(state, action); + } + +#if !NO_STOPWATCH + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public override IStopwatch StartStopwatch() + { + // + // Strictly speaking, this explicit override is not necessary because the base implementation calls into + // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices + // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. + // + return new StopwatchImpl(); + } +#endif + + /// <summary> + /// Schedules a periodic piece of work, using a System.Threading.Timer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than or equal to zero.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + // + // MSDN documentation states the following: + // + // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once; + // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method." + // + if (period <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + return new PeriodicTimer<TState>(state, period, action); + } + +#if USE_TIMER_SELF_ROOT + + // + // See ConcurrencyAbstractionLayerImpl.cs for more information about the code + // below and its timer rooting behavior. + // + + sealed class Timer<TState> : IDisposable + { + private readonly MultipleAssignmentDisposable _disposable; + + private readonly IScheduler _parent; + private readonly TState _state; + private Func<IScheduler, TState, IDisposable> _action; + + private volatile System.Threading.Timer _timer; + + public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + _parent = parent; + _state = state; + _action = action; + + _disposable = new MultipleAssignmentDisposable(); + _disposable.Disposable = Disposable.Create(Stop); + + // Don't want the spin wait in Tick to get stuck if this thread gets aborted. + try { } + finally + { + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, null, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + } + } + + private void Tick(object state) + { + try + { + _disposable.Disposable = _action(_parent, _state); + } + finally + { + SpinWait.SpinUntil(IsTimerAssigned); + Stop(); + } + } + + private bool IsTimerAssigned() + { + return _timer != null; + } + + public void Dispose() + { + _disposable.Dispose(); + } + + private void Stop() + { + var timer = _timer; + if (timer != TimerStubs.Never) + { + _action = Nop; + _timer = TimerStubs.Never; + + timer.Dispose(); + } + } + + private IDisposable Nop(IScheduler scheduler, TState state) + { + return Disposable.Empty; + } + } + + sealed class PeriodicTimer<TState> : IDisposable + { + private TState _state; + private Func<TState, TState> _action; + + private readonly AsyncLock _gate; + private volatile System.Threading.Timer _timer; + + public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action) + { + _state = state; + _action = action; + + _gate = new AsyncLock(); + + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, null, period, period); + } + + private void Tick(object state) + { + _gate.Wait(() => + { + _state = _action(_state); + }); + } + + public void Dispose() + { + var timer = _timer; + if (timer != null) + { + _action = Stubs<TState>.I; + _timer = null; + + timer.Dispose(); + _gate.Dispose(); + } + } + } +#else + abstract class Timer + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + protected static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + protected static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + } + + sealed class Timer<TState> : Timer, IDisposable + { + private readonly MultipleAssignmentDisposable _disposable; + + private readonly IScheduler _parent; + private readonly TState _state; + + private Func<IScheduler, TState, IDisposable> _action; + private System.Threading.Timer _timer; + + private bool _hasAdded; + private bool _hasRemoved; + + public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + _disposable = new MultipleAssignmentDisposable(); + _disposable.Disposable = Disposable.Create(Unroot); + + _parent = parent; + _state = state; + + _action = action; + _timer = new System.Threading.Timer(Tick, null, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + + lock (s_timers) + { + if (!_hasRemoved) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + + _hasAdded = true; + } + } + } + + private void Tick(object state) + { + try + { + _disposable.Disposable = _action(_parent, _state); + } + finally + { + Unroot(); + } + } + + private void Unroot() + { + _action = Nop; + + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + if (!_hasRemoved) + { + timer = _timer; + _timer = null; + + if (_hasAdded && timer != null) + s_timers.Remove(timer); + + _hasRemoved = true; + } + } + + if (timer != null) + timer.Dispose(); + } + + private IDisposable Nop(IScheduler scheduler, TState state) + { + return Disposable.Empty; + } + + public void Dispose() + { + _disposable.Dispose(); + } + } + + abstract class PeriodicTimer + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + protected static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + protected static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + } + + sealed class PeriodicTimer<TState> : PeriodicTimer, IDisposable + { + private readonly AsyncLock _gate; + + private TState _state; + private Func<TState, TState> _action; + private System.Threading.Timer _timer; + + public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action) + { + _gate = new AsyncLock(); + + _state = state; + _action = action; + _timer = new System.Threading.Timer(Tick, null, period, period); + + lock (s_timers) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + } + } + + private void Tick(object state) + { + _gate.Wait(() => + { + _state = _action(_state); + }); + } + + public void Dispose() + { + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + timer = _timer; + _timer = null; + + if (timer != null) + s_timers.Remove(timer); + } + + if (timer != null) + { + timer.Dispose(); + _gate.Dispose(); + _action = Stubs<TState>.I; + } + } + } +#endif + } +} +#endif
\ No newline at end of file |