diff options
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs | 415 |
1 files changed, 415 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs b/Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs new file mode 100644 index 0000000..4d67877 --- /dev/null +++ b/Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs @@ -0,0 +1,415 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Globalization; +using System.Reactive.Disposables; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Base class for virtual time schedulers. + /// </summary> + /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam> + /// <typeparam name="TRelative">Relative time representation type.</typeparam> + public abstract class VirtualTimeSchedulerBase<TAbsolute, TRelative> : IScheduler, IServiceProvider, IStopwatchProvider + where TAbsolute : IComparable<TAbsolute> + { + /// <summary> + /// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value. + /// </summary> + protected VirtualTimeSchedulerBase() + : this(default(TAbsolute), Comparer<TAbsolute>.Default) + { + } + + /// <summary> + /// Creates a new virtual time scheduler with the specified initial clock value and absolute time comparer. + /// </summary> + /// <param name="initialClock">Initial value for the clock.</param> + /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param> + /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is null.</exception> + protected VirtualTimeSchedulerBase(TAbsolute initialClock, IComparer<TAbsolute> comparer) + { + if (comparer == null) + throw new ArgumentNullException("comparer"); + + Clock = initialClock; + Comparer = comparer; + } + + /// <summary> + /// Adds a relative time value to an absolute time value. + /// </summary> + /// <param name="absolute">Absolute time value.</param> + /// <param name="relative">Relative time value to add.</param> + /// <returns>The resulting absolute time sum value.</returns> + protected abstract TAbsolute Add(TAbsolute absolute, TRelative relative); + + /// <summary> + /// Converts the absolute time value to a DateTimeOffset value. + /// </summary> + /// <param name="absolute">Absolute time value to convert.</param> + /// <returns>The corresponding DateTimeOffset value.</returns> + protected abstract DateTimeOffset ToDateTimeOffset(TAbsolute absolute); + + /// <summary> + /// Converts the TimeSpan value to a relative time value. + /// </summary> + /// <param name="timeSpan">TimeSpan value to convert.</param> + /// <returns>The corresponding relative time value.</returns> + protected abstract TRelative ToRelative(TimeSpan timeSpan); + + /// <summary> + /// Gets whether the scheduler is enabled to run work. + /// </summary> + public bool IsEnabled + { + get; + private set; + } + + /// <summary> + /// Gets the comparer used to compare absolute time values. + /// </summary> + protected IComparer<TAbsolute> Comparer + { + get; + private set; + } + + /// <summary> + /// Schedules an action to be executed at dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + public abstract IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, Func<IScheduler, TState, IDisposable> action); + + /// <summary> + /// Schedules an action to be executed at dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + public IDisposable ScheduleRelative<TState>(TState state, TRelative dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var runAt = Add(Clock, dueTime); + + return ScheduleAbsolute(state, runAt, action); + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAbsolute(state, Clock, action); + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleRelative(state, ToRelative(dueTime), action); + } + + /// <summary> + /// Schedules an action to be executed at dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleRelative(state, ToRelative(dueTime - Now), action); + } + + /// <summary> + /// Starts the virtual time scheduler. + /// </summary> + public void Start() + { + if (!IsEnabled) + { + IsEnabled = true; + do + { + var next = GetNext(); + if (next != null) + { + if (Comparer.Compare(next.DueTime, Clock) > 0) + Clock = next.DueTime; + next.Invoke(); + } + else + IsEnabled = false; + } while (IsEnabled); + } + } + + /// <summary> + /// Stops the virtual time scheduler. + /// </summary> + public void Stop() + { + IsEnabled = false; + } + + /// <summary> + /// Advances the scheduler's clock to the specified time, running all work till that point. + /// </summary> + /// <param name="time">Absolute time to advance the scheduler's clock to.</param> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is in the past.</exception> + /// <exception cref="InvalidOperationException">The scheduler is already running. VirtualTimeScheduler doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use <see cref="Sleep"/>.</exception> + public void AdvanceTo(TAbsolute time) + { + var dueToClock = Comparer.Compare(time, Clock); + if (dueToClock < 0) + throw new ArgumentOutOfRangeException("time"); + + if (dueToClock == 0) + return; + + if (!IsEnabled) + { + IsEnabled = true; + do + { + var next = GetNext(); + if (next != null && Comparer.Compare(next.DueTime, time) <= 0) + { + if (Comparer.Compare(next.DueTime, Clock) > 0) + Clock = next.DueTime; + next.Invoke(); + } + else + IsEnabled = false; + } while (IsEnabled); + + Clock = time; + } + else + { + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, "AdvanceTo")); + } + } + + /// <summary> + /// Advances the scheduler's clock by the specified relative time, running all work scheduled for that timespan. + /// </summary> + /// <param name="time">Relative time to advance the scheduler's clock by.</param> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is negative.</exception> + /// <exception cref="InvalidOperationException">The scheduler is already running. VirtualTimeScheduler doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use <see cref="Sleep"/>.</exception> + public void AdvanceBy(TRelative time) + { + var dt = Add(Clock, time); + + var dueToClock = Comparer.Compare(dt, Clock); + if (dueToClock < 0) + throw new ArgumentOutOfRangeException("time"); + + if (dueToClock == 0) + return; + + if (!IsEnabled) + { + AdvanceTo(dt); + } + else + { + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, "AdvanceBy")); + } + } + + /// <summary> + /// Advances the scheduler's clock by the specified relative time. + /// </summary> + /// <param name="time">Relative time to advance the scheduler's clock by.</param> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is negative.</exception> + public void Sleep(TRelative time) + { + var dt = Add(Clock, time); + + var dueToClock = Comparer.Compare(dt, Clock); + if (dueToClock < 0) + throw new ArgumentOutOfRangeException("time"); + + Clock = dt; + } + + /// <summary> + /// Gets the scheduler's absolute time clock value. + /// </summary> + public TAbsolute Clock + { + get; + protected set; + } + + /// <summary> + /// Gets the scheduler's notion of current time. + /// </summary> + public DateTimeOffset Now + { + get { return ToDateTimeOffset(Clock); } + } + + /// <summary> + /// Gets the next scheduled item to be executed. + /// </summary> + /// <returns>The next scheduled item.</returns> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "By design. Side-effecting operation to retrieve the next element.")] + protected abstract IScheduledItem<TAbsolute> GetNext(); + + object IServiceProvider.GetService(Type serviceType) + { + return GetService(serviceType); + } + + /// <summary> + /// Discovers scheduler services by interface type. The base class implementation supports + /// only the IStopwatchProvider service. To influence service discovery - such as adding + /// support for other scheduler services - derived types can override this method. + /// </summary> + /// <param name="serviceType">Scheduler service interface type to discover.</param> + /// <returns>Object implementing the requested service, if available; null otherwise.</returns> + protected virtual object GetService(Type serviceType) + { + if (serviceType == typeof(IStopwatchProvider)) + return this as IStopwatchProvider; + + return null; + } + + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + public IStopwatch StartStopwatch() + { + var start = ToDateTimeOffset(Clock); + return new VirtualTimeStopwatch(() => ToDateTimeOffset(Clock) - start); + } + + class VirtualTimeStopwatch : IStopwatch + { + private readonly Func<TimeSpan> _getElapsed; + + public VirtualTimeStopwatch(Func<TimeSpan> getElapsed) + { + _getElapsed = getElapsed; + } + + public TimeSpan Elapsed + { + get { return _getElapsed(); } + } + } + } + + /// <summary> + /// Base class for virtual time schedulers using a priority queue for scheduled items. + /// </summary> + /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam> + /// <typeparam name="TRelative">Relative time representation type.</typeparam> + public abstract class VirtualTimeScheduler<TAbsolute, TRelative> : VirtualTimeSchedulerBase<TAbsolute, TRelative> + where TAbsolute : IComparable<TAbsolute> + { + private readonly SchedulerQueue<TAbsolute> queue = new SchedulerQueue<TAbsolute>(); + + /// <summary> + /// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value. + /// </summary> + protected VirtualTimeScheduler() + : base() + { + } + + /// <summary> + /// Creates a new virtual time scheduler. + /// </summary> + /// <param name="initialClock">Initial value for the clock.</param> + /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param> + /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is null.</exception> + protected VirtualTimeScheduler(TAbsolute initialClock, IComparer<TAbsolute> comparer) + : base(initialClock, comparer) + { + } + + /// <summary> + /// Gets the next scheduled item to be executed. + /// </summary> + /// <returns>The next scheduled item.</returns> + protected override IScheduledItem<TAbsolute> GetNext() + { + while (queue.Count > 0) + { + var next = queue.Peek(); + if (next.IsCanceled) + queue.Dequeue(); + else + return next; + } + return null; + } + + /// <summary> + /// Schedules an action to be executed at dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var si = default(ScheduledItem<TAbsolute, TState>); + + var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) => + { + queue.Remove(si); + return action(scheduler, state1); + }); + + si = new ScheduledItem<TAbsolute, TState>(this, state, run, dueTime, Comparer); + queue.Enqueue(si); + + return Disposable.Create(si.Cancel); + } + } +} |