Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs')
-rw-r--r--Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs415
1 files changed, 415 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs b/Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs
new file mode 100644
index 0000000..4d67877
--- /dev/null
+++ b/Rx.NET/System.Reactive.Linq/Reactive/Concurrency/VirtualTimeScheduler.cs
@@ -0,0 +1,415 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Globalization;
+using System.Reactive.Disposables;
+
+namespace System.Reactive.Concurrency
+{
+ /// <summary>
+ /// Base class for virtual time schedulers.
+ /// </summary>
+ /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
+ /// <typeparam name="TRelative">Relative time representation type.</typeparam>
+ public abstract class VirtualTimeSchedulerBase<TAbsolute, TRelative> : IScheduler, IServiceProvider, IStopwatchProvider
+ where TAbsolute : IComparable<TAbsolute>
+ {
+ /// <summary>
+ /// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value.
+ /// </summary>
+ protected VirtualTimeSchedulerBase()
+ : this(default(TAbsolute), Comparer<TAbsolute>.Default)
+ {
+ }
+
+ /// <summary>
+ /// Creates a new virtual time scheduler with the specified initial clock value and absolute time comparer.
+ /// </summary>
+ /// <param name="initialClock">Initial value for the clock.</param>
+ /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is null.</exception>
+ protected VirtualTimeSchedulerBase(TAbsolute initialClock, IComparer<TAbsolute> comparer)
+ {
+ if (comparer == null)
+ throw new ArgumentNullException("comparer");
+
+ Clock = initialClock;
+ Comparer = comparer;
+ }
+
+ /// <summary>
+ /// Adds a relative time value to an absolute time value.
+ /// </summary>
+ /// <param name="absolute">Absolute time value.</param>
+ /// <param name="relative">Relative time value to add.</param>
+ /// <returns>The resulting absolute time sum value.</returns>
+ protected abstract TAbsolute Add(TAbsolute absolute, TRelative relative);
+
+ /// <summary>
+ /// Converts the absolute time value to a DateTimeOffset value.
+ /// </summary>
+ /// <param name="absolute">Absolute time value to convert.</param>
+ /// <returns>The corresponding DateTimeOffset value.</returns>
+ protected abstract DateTimeOffset ToDateTimeOffset(TAbsolute absolute);
+
+ /// <summary>
+ /// Converts the TimeSpan value to a relative time value.
+ /// </summary>
+ /// <param name="timeSpan">TimeSpan value to convert.</param>
+ /// <returns>The corresponding relative time value.</returns>
+ protected abstract TRelative ToRelative(TimeSpan timeSpan);
+
+ /// <summary>
+ /// Gets whether the scheduler is enabled to run work.
+ /// </summary>
+ public bool IsEnabled
+ {
+ get;
+ private set;
+ }
+
+ /// <summary>
+ /// Gets the comparer used to compare absolute time values.
+ /// </summary>
+ protected IComparer<TAbsolute> Comparer
+ {
+ get;
+ private set;
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed at dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="dueTime">Absolute time at which to execute the action.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ public abstract IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, Func<IScheduler, TState, IDisposable> action);
+
+ /// <summary>
+ /// Schedules an action to be executed at dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="dueTime">Relative time after which to execute the action.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ public IDisposable ScheduleRelative<TState>(TState state, TRelative dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var runAt = Add(Clock, dueTime);
+
+ return ScheduleAbsolute(state, runAt, action);
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ return ScheduleAbsolute(state, Clock, action);
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed after dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="dueTime">Relative time after which to execute the action.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ return ScheduleRelative(state, ToRelative(dueTime), action);
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed at dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="dueTime">Absolute time at which to execute the action.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ return ScheduleRelative(state, ToRelative(dueTime - Now), action);
+ }
+
+ /// <summary>
+ /// Starts the virtual time scheduler.
+ /// </summary>
+ public void Start()
+ {
+ if (!IsEnabled)
+ {
+ IsEnabled = true;
+ do
+ {
+ var next = GetNext();
+ if (next != null)
+ {
+ if (Comparer.Compare(next.DueTime, Clock) > 0)
+ Clock = next.DueTime;
+ next.Invoke();
+ }
+ else
+ IsEnabled = false;
+ } while (IsEnabled);
+ }
+ }
+
+ /// <summary>
+ /// Stops the virtual time scheduler.
+ /// </summary>
+ public void Stop()
+ {
+ IsEnabled = false;
+ }
+
+ /// <summary>
+ /// Advances the scheduler's clock to the specified time, running all work till that point.
+ /// </summary>
+ /// <param name="time">Absolute time to advance the scheduler's clock to.</param>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is in the past.</exception>
+ /// <exception cref="InvalidOperationException">The scheduler is already running. VirtualTimeScheduler doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use <see cref="Sleep"/>.</exception>
+ public void AdvanceTo(TAbsolute time)
+ {
+ var dueToClock = Comparer.Compare(time, Clock);
+ if (dueToClock < 0)
+ throw new ArgumentOutOfRangeException("time");
+
+ if (dueToClock == 0)
+ return;
+
+ if (!IsEnabled)
+ {
+ IsEnabled = true;
+ do
+ {
+ var next = GetNext();
+ if (next != null && Comparer.Compare(next.DueTime, time) <= 0)
+ {
+ if (Comparer.Compare(next.DueTime, Clock) > 0)
+ Clock = next.DueTime;
+ next.Invoke();
+ }
+ else
+ IsEnabled = false;
+ } while (IsEnabled);
+
+ Clock = time;
+ }
+ else
+ {
+ throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, "AdvanceTo"));
+ }
+ }
+
+ /// <summary>
+ /// Advances the scheduler's clock by the specified relative time, running all work scheduled for that timespan.
+ /// </summary>
+ /// <param name="time">Relative time to advance the scheduler's clock by.</param>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is negative.</exception>
+ /// <exception cref="InvalidOperationException">The scheduler is already running. VirtualTimeScheduler doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use <see cref="Sleep"/>.</exception>
+ public void AdvanceBy(TRelative time)
+ {
+ var dt = Add(Clock, time);
+
+ var dueToClock = Comparer.Compare(dt, Clock);
+ if (dueToClock < 0)
+ throw new ArgumentOutOfRangeException("time");
+
+ if (dueToClock == 0)
+ return;
+
+ if (!IsEnabled)
+ {
+ AdvanceTo(dt);
+ }
+ else
+ {
+ throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, "AdvanceBy"));
+ }
+ }
+
+ /// <summary>
+ /// Advances the scheduler's clock by the specified relative time.
+ /// </summary>
+ /// <param name="time">Relative time to advance the scheduler's clock by.</param>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is negative.</exception>
+ public void Sleep(TRelative time)
+ {
+ var dt = Add(Clock, time);
+
+ var dueToClock = Comparer.Compare(dt, Clock);
+ if (dueToClock < 0)
+ throw new ArgumentOutOfRangeException("time");
+
+ Clock = dt;
+ }
+
+ /// <summary>
+ /// Gets the scheduler's absolute time clock value.
+ /// </summary>
+ public TAbsolute Clock
+ {
+ get;
+ protected set;
+ }
+
+ /// <summary>
+ /// Gets the scheduler's notion of current time.
+ /// </summary>
+ public DateTimeOffset Now
+ {
+ get { return ToDateTimeOffset(Clock); }
+ }
+
+ /// <summary>
+ /// Gets the next scheduled item to be executed.
+ /// </summary>
+ /// <returns>The next scheduled item.</returns>
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "By design. Side-effecting operation to retrieve the next element.")]
+ protected abstract IScheduledItem<TAbsolute> GetNext();
+
+ object IServiceProvider.GetService(Type serviceType)
+ {
+ return GetService(serviceType);
+ }
+
+ /// <summary>
+ /// Discovers scheduler services by interface type. The base class implementation supports
+ /// only the IStopwatchProvider service. To influence service discovery - such as adding
+ /// support for other scheduler services - derived types can override this method.
+ /// </summary>
+ /// <param name="serviceType">Scheduler service interface type to discover.</param>
+ /// <returns>Object implementing the requested service, if available; null otherwise.</returns>
+ protected virtual object GetService(Type serviceType)
+ {
+ if (serviceType == typeof(IStopwatchProvider))
+ return this as IStopwatchProvider;
+
+ return null;
+ }
+
+ /// <summary>
+ /// Starts a new stopwatch object.
+ /// </summary>
+ /// <returns>New stopwatch object; started at the time of the request.</returns>
+ public IStopwatch StartStopwatch()
+ {
+ var start = ToDateTimeOffset(Clock);
+ return new VirtualTimeStopwatch(() => ToDateTimeOffset(Clock) - start);
+ }
+
+ class VirtualTimeStopwatch : IStopwatch
+ {
+ private readonly Func<TimeSpan> _getElapsed;
+
+ public VirtualTimeStopwatch(Func<TimeSpan> getElapsed)
+ {
+ _getElapsed = getElapsed;
+ }
+
+ public TimeSpan Elapsed
+ {
+ get { return _getElapsed(); }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Base class for virtual time schedulers using a priority queue for scheduled items.
+ /// </summary>
+ /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
+ /// <typeparam name="TRelative">Relative time representation type.</typeparam>
+ public abstract class VirtualTimeScheduler<TAbsolute, TRelative> : VirtualTimeSchedulerBase<TAbsolute, TRelative>
+ where TAbsolute : IComparable<TAbsolute>
+ {
+ private readonly SchedulerQueue<TAbsolute> queue = new SchedulerQueue<TAbsolute>();
+
+ /// <summary>
+ /// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value.
+ /// </summary>
+ protected VirtualTimeScheduler()
+ : base()
+ {
+ }
+
+ /// <summary>
+ /// Creates a new virtual time scheduler.
+ /// </summary>
+ /// <param name="initialClock">Initial value for the clock.</param>
+ /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is null.</exception>
+ protected VirtualTimeScheduler(TAbsolute initialClock, IComparer<TAbsolute> comparer)
+ : base(initialClock, comparer)
+ {
+ }
+
+ /// <summary>
+ /// Gets the next scheduled item to be executed.
+ /// </summary>
+ /// <returns>The next scheduled item.</returns>
+ protected override IScheduledItem<TAbsolute> GetNext()
+ {
+ while (queue.Count > 0)
+ {
+ var next = queue.Peek();
+ if (next.IsCanceled)
+ queue.Dequeue();
+ else
+ return next;
+ }
+ return null;
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed at dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <param name="dueTime">Absolute time at which to execute the action.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var si = default(ScheduledItem<TAbsolute, TState>);
+
+ var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
+ {
+ queue.Remove(si);
+ return action(scheduler, state1);
+ });
+
+ si = new ScheduledItem<TAbsolute, TState>(this, state, run, dueTime, Comparer);
+ queue.Enqueue(si);
+
+ return Disposable.Create(si.Cancel);
+ }
+ }
+}