diff options
Diffstat (limited to 'Rx.NET/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs | 623 |
1 files changed, 0 insertions, 623 deletions
diff --git a/Rx.NET/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs b/Rx.NET/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs deleted file mode 100644 index c40959f..0000000 --- a/Rx.NET/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs +++ /dev/null @@ -1,623 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -using System; -using System.Diagnostics; -using System.Reactive.Disposables; -using System.Reactive.PlatformServices; -using System.Threading; - -namespace System.Reactive.Concurrency -{ - public static partial class Scheduler - { - /// <summary> - /// Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. - /// If the scheduler supports periodic scheduling, the request will be forwarded to the periodic scheduling implementation. - /// If the scheduler provides stopwatch functionality, the periodic task will be emulated using recursive scheduling with a stopwatch to correct for time slippage. - /// Otherwise, the periodic task will be emulated using recursive scheduling. - /// </summary> - /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> - /// <param name="scheduler">The scheduler to run periodic work on.</param> - /// <param name="state">Initial state passed to the action upon the first iteration.</param> - /// <param name="period">Period for running the work periodically.</param> - /// <param name="action">Action to be executed, potentially updating the state.</param> - /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> - /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> - /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> - public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action) - { - if (scheduler == null) - throw new ArgumentNullException("scheduler"); - if (period < TimeSpan.Zero) - throw new ArgumentOutOfRangeException("period"); - if (action == null) - throw new ArgumentNullException("action"); - - return SchedulePeriodic_(scheduler, state, period, action); - } - - /// <summary> - /// Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. - /// If the scheduler supports periodic scheduling, the request will be forwarded to the periodic scheduling implementation. - /// If the scheduler provides stopwatch functionality, the periodic task will be emulated using recursive scheduling with a stopwatch to correct for time slippage. - /// Otherwise, the periodic task will be emulated using recursive scheduling. - /// </summary> - /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> - /// <param name="scheduler">Scheduler to execute the action on.</param> - /// <param name="state">State passed to the action to be executed.</param> - /// <param name="period">Period for running the work periodically.</param> - /// <param name="action">Action to be executed.</param> - /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> - /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> - /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> - public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler, TState state, TimeSpan period, Action<TState> action) - { - if (scheduler == null) - throw new ArgumentNullException("scheduler"); - if (period < TimeSpan.Zero) - throw new ArgumentOutOfRangeException("period"); - if (action == null) - throw new ArgumentNullException("action"); - - return SchedulePeriodic_(scheduler, state, period, state_ => { action(state_); return state_; }); - } - - /// <summary> - /// Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. - /// If the scheduler supports periodic scheduling, the request will be forwarded to the periodic scheduling implementation. - /// If the scheduler provides stopwatch functionality, the periodic task will be emulated using recursive scheduling with a stopwatch to correct for time slippage. - /// Otherwise, the periodic task will be emulated using recursive scheduling. - /// </summary> - /// <param name="scheduler">Scheduler to execute the action on.</param> - /// <param name="period">Period for running the work periodically.</param> - /// <param name="action">Action to be executed.</param> - /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> - /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> - /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> - public static IDisposable SchedulePeriodic(this IScheduler scheduler, TimeSpan period, Action action) - { - if (scheduler == null) - throw new ArgumentNullException("scheduler"); - if (period < TimeSpan.Zero) - throw new ArgumentOutOfRangeException("period"); - if (action == null) - throw new ArgumentNullException("action"); - - return SchedulePeriodic_(scheduler, default(object), period, _ => { action(); return _; }); - } - - /// <summary> - /// Starts a new stopwatch object by dynamically discovering the scheduler's capabilities. - /// If the scheduler provides stopwatch functionality, the request will be forwarded to the stopwatch provider implementation. - /// Otherwise, the stopwatch will be emulated using the scheduler's notion of absolute time. - /// </summary> - /// <param name="scheduler">Scheduler to obtain a stopwatch for.</param> - /// <returns>New stopwatch object; started at the time of the request.</returns> - /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> - /// <remarks>The resulting stopwatch object can have non-monotonic behavior.</remarks> - public static IStopwatch StartStopwatch(this IScheduler scheduler) - { - if (scheduler == null) - throw new ArgumentNullException("scheduler"); - - // - // All schedulers deriving from LocalScheduler will automatically pick up this - // capability based on a local stopwatch, typically using QueryPerformanceCounter - // through the System.Diagnostics.Stopwatch class. - // - // Notice virtual time schedulers do implement this facility starting from Rx v2.0, - // using subtraction of their absolute time notion to compute elapsed time values. - // This is fine because those schedulers do not allow the clock to go back in time. - // - // For schedulers that don't have a stopwatch, we have to pick some fallback logic - // here. We could either dismiss the scheduler's notion of time and go for the CAL's - // stopwatch facility, or go with a stopwatch based on "scheduler.Now", which has - // the drawback of potentially going back in time: - // - // - Using the CAL's stopwatch facility causes us to abondon the scheduler's - // potentially virtualized notion of time, always going for the local system - // time instead. - // - // - Using the scheduler's Now property for calculations can break monotonicity, - // and there's no right answer on how to deal with jumps back in time. - // - // However, even the built-in stopwatch in the BCL can potentially fall back to - // subtraction of DateTime values in case no high-resolution performance counter is - // available, causing monotonicity to break down. We're not trying to solve this - // problem there either (though we could check IsHighResolution and smoothen out - // non-monotonic points somehow), so we pick the latter option as the lesser of - // two evils (also because it should occur rarely). - // - // Users of the stopwatch retrieved by this method could detect non-sensical data - // revealing a jump back in time, or implement custom fallback logic like the one - // shown below. - // - var swp = scheduler.AsStopwatchProvider(); - if (swp != null) - return swp.StartStopwatch(); - - return new EmulatedStopwatch(scheduler); - } - - private static IDisposable SchedulePeriodic_<TState>(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action) - { - // - // Design rationale: - // - // In Rx v1.x, we employed recursive scheduling for periodic tasks. The following code - // fragment shows how the Timer (and hence Interval) function used to be implemented: - // - // var p = Normalize(period); - // - // return new AnonymousObservable<long>(observer => - // { - // var d = dueTime; - // long count = 0; - // return scheduler.Schedule(d, self => - // { - // if (p > TimeSpan.Zero) - // { - // var now = scheduler.Now; - // d = d + p; - // if (d <= now) - // d = now + p; - // } - // - // observer.OnNext(count); - // count = unchecked(count + 1); - // self(d); - // }); - // }); - // - // Despite the purity of this approach, it suffered from a set of drawbacks: - // - // 1) Usage of IScheduler.Now to correct for time drift did have a positive effect for - // a limited number of scenarios, in particular when a short period was used. The - // major issues with this are: - // - // a) Relying on absolute time at the LINQ layer in Rx's layer map, causing issues - // when the system clock changes. Various customers hit this issue, reported to - // us on the MSDN forums. Basically, when the clock goes forward, the recursive - // loop wants to catch up as quickly as it can; when it goes backwards, a long - // silence will occur. (See 2 for a discussion of WP7 related fixes.) - // - // b) Even if a) would be addressed by using Rx v2.0's capabilities to monitor for - // system clock changes, the solution would violate the reasonable expectation - // of operators overloads using TimeSpan *not* relying on absolute time. - // - // c) Drift correction doesn't work for large periods when the system encounters - // systematic drift. For example, in the lab we've seen cases of drift up to - // tens of seconds on a 24 hour timeframe. Correcting for this drift by making - // a recursive call with a due time of 24 * 3600 with 10 seconds of adjustment - // won't fix systematic drift. - // - // 2) This implementation has been plagued with issues around application container - // lifecycle models, in particular Windows Phone 7's model of tombstoning and in - // particular its "dormant state". This feature was introduced in Mango to enable - // fast application switching. Essentially, the phone's OS puts the application - // in a suspended state when the user navigates "forward" (or takes an incoming - // call for instance). When the application is woken up again, threads are resumed - // and we're faced with an illusion of missed events due to the use of absolute - // time, not relative to how the application observes it. This caused nightmare - // scenarios of fast battery drain due to the flood of catch-up work. - // - // See http://msdn.microsoft.com/en-us/library/ff817008(v=vs.92).aspx for more - // information on this. - // - // 3) Recursive scheduling imposes a non-trivial cost due to the creation of many - // single-shot timers and closures. For high frequency timers, this can cause a - // lot of churn in the GC, which we like to avoid (operators shouldn't have hidden - // linear - or worse - allocation cost). - // - // Notice these drawbacks weren't limited to the use of Timer and Interval directly, - // as many operators such as Sample, Buffer, and Window used such sequences for their - // periodic behavior (typically by delegating to a more general overload). - // - // As a result, in Rx v2.0, we took the decision to improve periodic timing based on - // the following design decisions: - // - // 1) When the scheduler has the ability to run a periodic task, it should implement - // the ISchedulerPeriodic interface and expose it through the IServiceProvider - // interface. Passing the intent of the user through all layers of Rx, down to the - // underlying infrastructure provides delegation of responsibilities. This allows - // the target scheduler to optimize execution in various ways, e.g. by employing - // techniques such as timer coalescing. - // - // See http://www.bing.com/search?q=windows+timer+coalescing for information on - // techniques like timer coalescing which may be applied more aggressively in - // future OS releases in order to reduce power consumption. - // - // 2) Emulation of periodic scheduling is used to avoid breaking existing code that - // uses schedulers without this capability. We expect those fallback paths to be - // exercised rarely, though the use of DisableOptimizations can trigger them as - // well. In such cases we rely on stopwatches or a carefully crafted recursive - // scheme to deal with (or maximally compensate for) slippage or time. Behavior - // of periodic tasks is expected to be as follows: - // - // timer ticks 0-------1-------2-------3-------4-------5-------6----... - // | | | +====+ +==+ | | - // user code +~~~| +~| +~~~~~~~~~~~|+~~~~|+~~| +~~~| +~~| - // - // rather than the following scheme, where time slippage is introduced by user - // code running on the scheduler: - // - // timer ticks 0####-------1##-------2############-------3#####-----... - // | | | | - // user code +~~~| +~| +~~~~~~~~~~~| +~~~~| - // - // (Side-note: Unfortunately, we didn't reserve the name Interval for the latter - // behavior, but used it as an alias for "periodic scheduling" with - // the former behavior, delegating to the Timer implementation. One - // can simulate this behavior using Generate, which uses tail calls.) - // - // This behavior is important for operations like Sample, Buffer, and Window, all - // of which expect proper spacing of events, even if the user code takes a long - // time to complete (considered a bad practice nonetheless, cf. ObserveOn). - // - // 3) To deal with the issue of suspensions induced by application lifecycle events - // in Windows Phone and WinRT applications, we decided to hook available system - // events through IHostLifecycleNotifications, discovered through the PEP in order - // to maintain portability of the core of Rx. - // - var periodic = scheduler.AsPeriodic(); - if (periodic != null) - { - return periodic.SchedulePeriodic(state, period, action); - } - - var swp = scheduler.AsStopwatchProvider(); - if (swp != null) - { - var spr = new SchedulePeriodicStopwatch<TState>(scheduler, state, period, action, swp); - return spr.Start(); - } - else - { - var spr = new SchedulePeriodicRecursive<TState>(scheduler, state, period, action); - return spr.Start(); - } - } - - class SchedulePeriodicStopwatch<TState> - { - private readonly IScheduler _scheduler; - private readonly TimeSpan _period; - private readonly Func<TState, TState> _action; - private readonly IStopwatchProvider _stopwatchProvider; - - public SchedulePeriodicStopwatch(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action, IStopwatchProvider stopwatchProvider) - { - _scheduler = scheduler; - _period = period; - _action = action; - _stopwatchProvider = stopwatchProvider; - - _state = state; - _runState = STOPPED; - } - - private TState _state; - - private readonly object _gate = new object(); - private readonly AutoResetEvent _resumeEvent = new AutoResetEvent(false); - private volatile int _runState; - private IStopwatch _stopwatch; - private TimeSpan _nextDue; - private TimeSpan _suspendedAt; - private TimeSpan _inactiveTime; - - // - // State transition diagram: - // (c) - // +-----------<-----------+ - // / \ - // / (b) \ - // | +-->--SUSPENDED---+ - // (a) v / | - // ^----STOPPED -->-- RUNNING -->--+ v (e) - // \ | - // +-->--DISPOSED----$ - // (d) - // - // (a) Start --> call to Schedule the Tick method - // (b) Suspending event handler --> Tick gets blocked waiting for _resumeEvent - // (c) Resuming event handler --> _resumeEvent is signaled, Tick continues - // (d) Dispose returned object from Start --> scheduled work is cancelled - // (e) Dispose returned object from Start --> unblocks _resumeEvent, Tick exits - // - private const int STOPPED = 0; - private const int RUNNING = 1; - private const int SUSPENDED = 2; - private const int DISPOSED = 3; - - public IDisposable Start() - { - RegisterHostLifecycleEventHandlers(); - - _stopwatch = _stopwatchProvider.StartStopwatch(); - _nextDue = _period; - _runState = RUNNING; - - return new CompositeDisposable(2) - { - _scheduler.Schedule(_nextDue, Tick), - Disposable.Create(Cancel) - }; - } - - private void Tick(Action<TimeSpan> recurse) - { - _nextDue += _period; - _state = _action(_state); - - var next = default(TimeSpan); - - while (true) - { - var shouldWaitForResume = false; - - lock (_gate) - { - if (_runState == RUNNING) - { - // - // This is the fast path. We just let the stopwatch continue to - // run while we're suspended, but compensate for time that was - // recorded as inactive based on cumulative deltas computed in - // the suspend and resume event handlers. - // - next = Normalize(_nextDue - (_stopwatch.Elapsed - _inactiveTime)); - break; - } - else if (_runState == DISPOSED) - { - // - // In case the periodic job gets disposed but we are currently - // waiting to come back out of suspension, we should make sure - // we don't remain blocked indefinitely. Hence, we set the event - // in the Cancel method and trap this case here to bail out from - // the scheduled work gracefully. - // - return; - } - else - { - // - // This is the least common case where we got suspended and need - // to block such that future reevaluations of the next due time - // will pick up the cumulative inactive time delta. - // - Debug.Assert(_runState == SUSPENDED); - shouldWaitForResume = true; - } - } - - // - // Only happens in the SUSPENDED case; otherwise we will have broken from - // the loop or have quit the Tick method. After returning from the wait, - // we'll either be RUNNING again, quit due to a DISPOSED transition, or - // be extremely unlucky to find ourselves SUSPENDED again and be blocked - // once more. - // - if (shouldWaitForResume) - _resumeEvent.WaitOne(); - } - - recurse(next); - } - - private void Cancel() - { - UnregisterHostLifecycleEventHandlers(); - - lock (_gate) - { - _runState = DISPOSED; - - if (!Environment.HasShutdownStarted) - _resumeEvent.Set(); - } - } - - private void Suspending(object sender, HostSuspendingEventArgs args) - { - // - // The host is telling us we're about to be suspended. At this point, time - // computations will still be in a valid range (next <= _period), but after - // we're woken up again, Tick would start to go on a crucade to catch up. - // - // This has caused problems in the past, where the flood of events caused - // batteries to drain etc (see design rationale discussion higher up). - // - // In order to mitigate this problem, we force Tick to suspend before its - // next computation of the next due time. Notice we can't afford to block - // during the Suspending event handler; the host expects us to respond to - // this event quickly, such that we're not keeping the application from - // suspending promptly. - // - lock (_gate) - { - if (_runState == RUNNING) - { - _suspendedAt = _stopwatch.Elapsed; - _runState = SUSPENDED; - - if (!Environment.HasShutdownStarted) - _resumeEvent.Reset(); - } - } - } - - private void Resuming(object sender, HostResumingEventArgs args) - { - // - // The host is telling us we're being resumed. At this point, code will - // already be running in the process, so a past timer may still expire and - // cause the code in Tick to run. Two interleavings are possible now: - // - // 1) We enter the gate first, and will adjust the cumulative inactive - // time delta used for correction. The code in Tick will have the - // illusion nothing happened and find itself RUNNING when entering - // the gate, resuming activities as before. - // - // 2) The code in Tick enters the gate first, and takes notice of the - // currently SUSPENDED state. It leaves the gate, entering the wait - // state for _resumeEvent. Next, we enter to adjust the cumulative - // inactive time delta, switch to the RUNNING state and signal the - // event for Tick to carry on and recompute its next due time based - // on the new cumulative delta. - // - lock (_gate) - { - if (_runState == SUSPENDED) - { - _inactiveTime += _stopwatch.Elapsed - _suspendedAt; - _runState = RUNNING; - - if (!Environment.HasShutdownStarted) - _resumeEvent.Set(); - } - } - } - - private void RegisterHostLifecycleEventHandlers() - { - HostLifecycleService.Suspending += Suspending; - HostLifecycleService.Resuming += Resuming; - HostLifecycleService.AddRef(); - } - - private void UnregisterHostLifecycleEventHandlers() - { - HostLifecycleService.Suspending -= Suspending; - HostLifecycleService.Resuming -= Resuming; - HostLifecycleService.Release(); - } - } - - class SchedulePeriodicRecursive<TState> - { - private readonly IScheduler _scheduler; - private readonly TimeSpan _period; - private readonly Func<TState, TState> _action; - - public SchedulePeriodicRecursive(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action) - { - _scheduler = scheduler; - _period = period; - _action = action; - - _state = state; - } - - private TState _state; - private int _pendingTickCount; - private IDisposable _cancel; - - public IDisposable Start() - { - _pendingTickCount = 0; - - var d = new SingleAssignmentDisposable(); - _cancel = d; - - d.Disposable = _scheduler.Schedule(TICK, _period, Tick); - - return d; - } - - // - // The protocol using the three commands is explained in the Tick implementation below. - // - private const int TICK = 0; - private const int DISPATCH_START = 1; - private const int DISPATCH_END = 2; - - private void Tick(int command, Action<int, TimeSpan> recurse) - { - switch (command) - { - case TICK: - // - // Ticks keep going at the specified periodic rate. We do a head call such - // that no slippage is introduced because of DISPATCH_START work involving - // user code that may take arbitrarily long. - // - recurse(TICK, _period); - - // - // If we're not transitioning from 0 to 1 pending tick, another processing - // request is in flight which will see a non-zero pending tick count after - // doing the final decrement, causing it to reschedule immediately. We can - // safely bail out, delegating work to the catch-up tail calls. - // - if (Interlocked.Increment(ref _pendingTickCount) == 1) - goto case DISPATCH_START; - - break; - - case DISPATCH_START: - try - { - _state = _action(_state); - } - catch (Exception e) - { - _cancel.Dispose(); - e.Throw(); - } - - // - // This is very subtle. We can't do a goto case DISPATCH_END here because it - // wouldn't introduce interleaving of periodic ticks that are due. In order - // to have best effort behavior for schedulers that don't have concurrency, - // we yield by doing a recursive call here. Notice this doesn't heal all of - // the problem, because the TICK commands that may be dispatched before the - // scheduled DISPATCH_END will do a "recurse(TICK, period)", which is relative - // from the point of entrance. Really all we're doing here is damage control - // for the case there's no stopwatch provider which should be rare (notice - // the LocalScheduler base class always imposes a stopwatch, but it can get - // disabled using DisableOptimizations; legacy implementations of schedulers - // from the v1.x days will not have a stopwatch). - // - recurse(DISPATCH_END, TimeSpan.Zero); - - break; - - case DISPATCH_END: - // - // If work was due while we were still running user code, the count will have - // been incremented by the periodic tick handler above. In that case, we will - // reschedule ourselves for dispatching work immediately. - // - // Notice we don't run a loop here, in order to allow interleaving of work on - // the scheduler by making recursive calls. In case we would use AsyncLock to - // ensure serialized execution the owner could get stuck in such a loop, thus - // we make tail calls to play nice with the scheduler. - // - if (Interlocked.Decrement(ref _pendingTickCount) > 0) - recurse(DISPATCH_START, TimeSpan.Zero); - - break; - } - } - } - - class EmulatedStopwatch : IStopwatch - { - private readonly IScheduler _scheduler; - private readonly DateTimeOffset _start; - - public EmulatedStopwatch(IScheduler scheduler) - { - _scheduler = scheduler; - _start = _scheduler.Now; - } - - public TimeSpan Elapsed - { - get { return Scheduler.Normalize(_scheduler.Now - _start); } - } - } - } -} |