Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@veritas-vos-liberabit.com>2012-11-12 22:47:31 +0400
committerAtsushi Eno <atsushieno@veritas-vos-liberabit.com>2012-11-12 22:52:17 +0400
commitd1174f3f8979321a9182925df460e07e08157b41 (patch)
treed16fb2fc191bf68ff0e2aac600adf71aba8cad01 /Rx.NET/System.Reactive.PlatformServices/Reactive
parentd90a52595e24b1216c89f6cb5f245262db1810ae (diff)
partial import of ca05fdeb565e: Reactive Extensions OSS V1.0
Diffstat (limited to 'Rx.NET/System.Reactive.PlatformServices/Reactive')
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs101
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs215
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs375
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs194
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs230
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs34
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs185
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs303
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/EnlightenmentProvider.cs29
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/ExceptionServicesImpl.cs19
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.Windows.cs43
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.WindowsPhone.cs63
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PhoneShellThunks.cs214
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PlatformEnlightenmentProvider.cs109
-rw-r--r--Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/StopwatchImpl.cs28
15 files changed, 2142 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs
new file mode 100644
index 0000000..1d5c0ae
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.Windows.cs
@@ -0,0 +1,101 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if NO_THREAD && WINDOWS
+using System;
+using System.Collections.Generic;
+using System.Reactive.Disposables;
+using System.Threading;
+
+namespace System.Reactive.Concurrency
+{
+ internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
+ {
+ public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime)
+ {
+ var res = global::Windows.System.Threading.ThreadPoolTimer.CreateTimer(
+ tpt =>
+ {
+ action(state);
+ },
+ Normalize(dueTime)
+ );
+
+ return Disposable.Create(res.Cancel);
+ }
+
+ public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
+ {
+ //
+ // The WinRT thread pool is based on the Win32 thread pool and cannot handle
+ // sub-1ms resolution. When passing a lower period, we get single-shot
+ // timer behavior instead. See MSDN documentation for CreatePeriodicTimer
+ // for more information.
+ //
+ if (period < TimeSpan.FromMilliseconds(1))
+ throw new ArgumentOutOfRangeException("period", Strings_PlatformServices.WINRT_NO_SUB1MS_TIMERS);
+
+ var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
+ tpt =>
+ {
+ action();
+ },
+ period
+ );
+
+ return Disposable.Create(res.Cancel);
+ }
+
+ public IDisposable QueueUserWorkItem(Action<object> action, object state)
+ {
+ var res = global::Windows.System.Threading.ThreadPool.RunAsync(iaa =>
+ {
+ action(state);
+ });
+
+ return Disposable.Create(res.Cancel);
+ }
+
+ public void Sleep(TimeSpan timeout)
+ {
+ var e = new ManualResetEventSlim();
+
+ global::Windows.System.Threading.ThreadPoolTimer.CreateTimer(
+ tpt =>
+ {
+ e.Set();
+ },
+ Normalize(timeout)
+ );
+
+ e.Wait();
+ }
+
+ public IStopwatch StartStopwatch()
+ {
+#if !NO_STOPWATCH
+ return new StopwatchImpl();
+#else
+ return new DefaultStopwatch();
+#endif
+ }
+
+ public bool SupportsLongRunning
+ {
+ get { return false; }
+ }
+
+ public void StartThread(Action<object> action, object state)
+ {
+ throw new NotSupportedException();
+ }
+
+ private TimeSpan Normalize(TimeSpan dueTime)
+ {
+ if (dueTime < TimeSpan.Zero)
+ return TimeSpan.Zero;
+
+ return dueTime;
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs
new file mode 100644
index 0000000..2d86ea7
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ConcurrencyAbstractionLayerImpl.cs
@@ -0,0 +1,215 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_THREAD
+using System;
+using System.Collections.Generic;
+using System.Reactive.Disposables;
+using System.Threading;
+
+namespace System.Reactive.Concurrency
+{
+ //
+ // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms.
+ // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator
+ // behavior of Rx for PLIB when used on a more capable platform.
+ //
+ internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
+ {
+ public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime)
+ {
+ return new Timer(action, state, Normalize(dueTime));
+ }
+
+ public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
+ {
+ //
+ // MSDN documentation states the following:
+ //
+ // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once;
+ // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method."
+ //
+ if (period <= TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException("period");
+
+ return new PeriodicTimer(action, period);
+ }
+
+ public IDisposable QueueUserWorkItem(Action<object> action, object state)
+ {
+ System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state);
+ return Disposable.Empty;
+ }
+
+#if USE_SLEEP_MS
+ public void Sleep(TimeSpan timeout)
+ {
+ System.Threading.Thread.Sleep((int)Normalize(timeout).TotalMilliseconds);
+ }
+#else
+ public void Sleep(TimeSpan timeout)
+ {
+ System.Threading.Thread.Sleep(Normalize(timeout));
+ }
+#endif
+
+ public IStopwatch StartStopwatch()
+ {
+#if !NO_STOPWATCH
+ return new StopwatchImpl();
+#else
+ return new DefaultStopwatch();
+#endif
+ }
+
+ public bool SupportsLongRunning
+ {
+ get { return true; }
+ }
+
+ public void StartThread(Action<object> action, object state)
+ {
+ new Thread(() =>
+ {
+ action(state);
+ }).Start();
+ }
+
+ private static TimeSpan Normalize(TimeSpan dueTime)
+ {
+ if (dueTime < TimeSpan.Zero)
+ return TimeSpan.Zero;
+
+ return dueTime;
+ }
+
+ class Timer : IDisposable
+ {
+ //
+ // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running.
+ //
+#if !NO_HASHSET
+ private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>();
+#else
+ private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>();
+#endif
+
+ private Action<object> _action;
+ private System.Threading.Timer _timer;
+
+ private bool _hasAdded;
+ private bool _hasRemoved;
+
+ public Timer(Action<object> action, object state, TimeSpan dueTime)
+ {
+ _action = action;
+ _timer = new System.Threading.Timer(Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
+
+ lock (s_timers)
+ {
+ if (!_hasRemoved)
+ {
+#if !NO_HASHSET
+ s_timers.Add(_timer);
+#else
+ s_timers.Add(_timer, null);
+#endif
+
+ _hasAdded = true;
+ }
+ }
+ }
+
+ private void Tick(object state)
+ {
+ try
+ {
+ _action(state);
+ }
+ finally
+ {
+ Dispose();
+ }
+ }
+
+ public void Dispose()
+ {
+ _action = Stubs<object>.Ignore;
+
+ var timer = default(System.Threading.Timer);
+
+ lock (s_timers)
+ {
+ if (!_hasRemoved)
+ {
+ timer = _timer;
+ _timer = null;
+
+ if (_hasAdded && timer != null)
+ s_timers.Remove(timer);
+
+ _hasRemoved = true;
+ }
+ }
+
+ if (timer != null)
+ timer.Dispose();
+ }
+ }
+
+ class PeriodicTimer : IDisposable
+ {
+ //
+ // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running.
+ //
+#if !NO_HASHSET
+ private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>();
+#else
+ private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>();
+#endif
+
+ private Action _action;
+ private System.Threading.Timer _timer;
+
+ public PeriodicTimer(Action action, TimeSpan period)
+ {
+ _action = action;
+ _timer = new System.Threading.Timer(Tick, null, period, period);
+
+ lock (s_timers)
+ {
+#if !NO_HASHSET
+ s_timers.Add(_timer);
+#else
+ s_timers.Add(_timer, null);
+#endif
+ }
+ }
+
+ private void Tick(object state)
+ {
+ _action();
+ }
+
+ public void Dispose()
+ {
+ var timer = default(System.Threading.Timer);
+
+ lock (s_timers)
+ {
+ timer = _timer;
+ _timer = null;
+
+ if (timer != null)
+ s_timers.Remove(timer);
+ }
+
+ if (timer != null)
+ {
+ timer.Dispose();
+ _action = Stubs.Nop;
+ }
+ }
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs
new file mode 100644
index 0000000..8739895
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs
@@ -0,0 +1,375 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Reactive.Disposables;
+using System.Threading;
+
+#if NO_SEMAPHORE
+using System.Reactive.Threading;
+#endif
+
+namespace System.Reactive.Concurrency
+{
+ /// <summary>
+ /// Represents an object that schedules units of work on a designated thread.
+ /// </summary>
+ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
+ {
+ #region Fields
+
+ /// <summary>
+ /// Counter for diagnostic purposes, to name the threads.
+ /// </summary>
+ private static int s_counter;
+
+ /// <summary>
+ /// Thread factory function.
+ /// </summary>
+ private readonly Func<ThreadStart, Thread> _threadFactory;
+
+ /// <summary>
+ /// Stopwatch for timing free of absolute time dependencies.
+ /// </summary>
+ private IStopwatch _stopwatch;
+
+ /// <summary>
+ /// Thread used by the event loop to run work items on. No work should be run on any other thread.
+ /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
+ /// </summary>
+ private Thread _thread;
+
+ /// <summary>
+ /// Gate to protect data structures, including the work queue and the ready list.
+ /// </summary>
+ private readonly object _gate;
+
+ /// <summary>
+ /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
+ /// expires and moves on to the next item in the queue.
+ /// </summary>
+#if !NO_CDS
+ private readonly SemaphoreSlim _evt;
+#else
+ private readonly Semaphore _evt;
+#endif
+
+ /// <summary>
+ /// Queue holding work items. Protected by the gate.
+ /// </summary>
+ private readonly SchedulerQueue<TimeSpan> _queue;
+
+ /// <summary>
+ /// Queue holding items that are ready to be run as soon as possible. Protected by the gate.
+ /// </summary>
+ private readonly Queue<ScheduledItem<TimeSpan>> _readyList;
+
+ /// <summary>
+ /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
+ /// item is still the same. If not, a new timer needs to be started (see below).
+ /// </summary>
+ private ScheduledItem<TimeSpan> _nextItem;
+
+ /// <summary>
+ /// Disposable that always holds the timer to dispatch the first element in the queue.
+ /// </summary>
+ private readonly SerialDisposable _nextTimer;
+
+ /// <summary>
+ /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
+ /// wake up the event loop thread, which will subsequently abandon all work.
+ /// </summary>
+ private bool _disposed;
+
+ #endregion
+
+ #region Constructors
+
+ /// <summary>
+ /// Creates an object that schedules units of work on a designated thread.
+ /// </summary>
+ public EventLoopScheduler()
+ : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true })
+ {
+ }
+
+#if !NO_THREAD
+ /// <summary>
+ /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options.
+ /// </summary>
+ /// <param name="threadFactory">Factory function for thread creation.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
+ public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
+ {
+ if (threadFactory == null)
+ throw new ArgumentNullException("threadFactory");
+#else
+ internal EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
+ {
+#endif
+ _threadFactory = threadFactory;
+ _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
+
+ _gate = new object();
+
+#if !NO_CDS
+ _evt = new SemaphoreSlim(0);
+#else
+ _evt = new Semaphore(0, int.MaxValue);
+#endif
+ _queue = new SchedulerQueue<TimeSpan>();
+ _readyList = new Queue<ScheduledItem<TimeSpan>>();
+
+ _nextTimer = new SerialDisposable();
+
+ ExitIfEmpty = false;
+ }
+
+ #endregion
+
+ #region Properties
+
+ /// <summary>
+ /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work
+ /// is scheduled afterwards, a new event loop thread is created. This property is used by the
+ /// NewThreadScheduler which uses an event loop for its recursive invocations.
+ /// </summary>
+ internal bool ExitIfEmpty
+ {
+ get;
+ set;
+ }
+
+ #endregion
+
+ #region Public methods
+
+ /// <summary>
+ /// Schedules an action to be executed after dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <param name="dueTime">Relative time after which to execute the action.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
+ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var due = _stopwatch.Elapsed + dueTime;
+ var si = new ScheduledItem<TimeSpan, TState>(this, state, action, due);
+
+ lock (_gate)
+ {
+ if (_disposed)
+ throw new ObjectDisposedException("");
+
+ if (dueTime <= TimeSpan.Zero)
+ {
+ _readyList.Enqueue(si);
+ _evt.Release();
+ }
+ else
+ {
+ _queue.Enqueue(si);
+ _evt.Release();
+ }
+
+ EnsureThread();
+ }
+
+ return Disposable.Create(si.Cancel);
+ }
+
+ /// <summary>
+ /// Schedules a periodic piece of work on the designated thread.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">Initial state passed to the action upon the first iteration.</param>
+ /// <param name="period">Period for running the work periodically.</param>
+ /// <param name="action">Action to be executed, potentially updating the state.</param>
+ /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
+ /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
+ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
+ {
+ if (period < TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException("period");
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var start = _stopwatch.Elapsed;
+ var next = start + period;
+
+ var state1 = state;
+
+ var d = new MultipleAssignmentDisposable();
+ var gate = new AsyncLock();
+
+ var tick = default(Func<IScheduler, object, IDisposable>);
+ tick = (self_, _) =>
+ {
+ next += period;
+
+ d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick);
+
+ gate.Wait(() =>
+ {
+ state1 = action(state1);
+ });
+
+ return Disposable.Empty;
+ };
+
+ d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick);
+
+ return new CompositeDisposable(d, gate);
+ }
+
+#if !NO_STOPWATCH
+ /// <summary>
+ /// Starts a new stopwatch object.
+ /// </summary>
+ /// <returns>New stopwatch object; started at the time of the request.</returns>
+ public override IStopwatch StartStopwatch()
+ {
+ //
+ // Strictly speaking, this explicit override is not necessary because the base implementation calls into
+ // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
+ // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
+ //
+ return new StopwatchImpl();
+ }
+#endif
+
+ /// <summary>
+ /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned.
+ /// </summary>
+ public void Dispose()
+ {
+ lock (_gate)
+ {
+ if (!_disposed)
+ {
+ _disposed = true;
+ _nextTimer.Dispose();
+ _evt.Release();
+ }
+ }
+ }
+
+ #endregion
+
+ #region Private implementation
+
+ /// <summary>
+ /// Ensures there is an event loop thread running. Should be called under the gate.
+ /// </summary>
+ private void EnsureThread()
+ {
+ if (_thread == null)
+ {
+ _thread = _threadFactory(Run);
+ _thread.Start();
+ }
+ }
+
+ /// <summary>
+ /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event
+ /// which gets set by calls to Schedule, the next item timer, or calls to Dispose.
+ /// </summary>
+ private void Run()
+ {
+ while (true)
+ {
+#if !NO_CDS
+ _evt.Wait();
+#else
+ _evt.WaitOne();
+#endif
+
+ var ready = default(ScheduledItem<TimeSpan>[]);
+
+ lock (_gate)
+ {
+ //
+ // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
+ // loop immediately. Subsequent calls to Schedule won't ever create a new thread.
+ //
+ if (_disposed)
+ {
+ ((IDisposable)_evt).Dispose();
+ return;
+ }
+
+ while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed)
+ {
+ var item = _queue.Dequeue();
+ _readyList.Enqueue(item);
+ }
+
+ if (_queue.Count > 0)
+ {
+ var next = _queue.Peek();
+ if (next != _nextItem)
+ {
+ _nextItem = next;
+
+ var due = next.DueTime - _stopwatch.Elapsed;
+ _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due);
+ }
+ }
+
+ if (_readyList.Count > 0)
+ {
+ ready = _readyList.ToArray();
+ _readyList.Clear();
+ }
+ }
+
+ if (ready != null)
+ {
+ foreach (var item in ready)
+ {
+ if (!item.IsCanceled)
+ item.Invoke();
+ }
+ }
+
+ if (ExitIfEmpty)
+ {
+ lock (_gate)
+ {
+ if (_readyList.Count == 0 && _queue.Count == 0)
+ {
+ _thread = null;
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ private void Tick(object state)
+ {
+ lock (_gate)
+ {
+ if (!_disposed)
+ {
+ var item = (ScheduledItem<TimeSpan>)state;
+ if (_queue.Remove(item))
+ {
+ _readyList.Enqueue(item);
+ }
+
+ _evt.Release();
+ }
+ }
+ }
+
+ #endregion
+ }
+} \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs
new file mode 100644
index 0000000..b111554
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs
@@ -0,0 +1,194 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Reactive.Disposables;
+using System.Threading;
+
+namespace System.Reactive.Concurrency
+{
+ /// <summary>
+ /// Represents an object that schedules each unit of work on a separate thread.
+ /// </summary>
+ public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
+ {
+ internal static readonly NewThreadScheduler s_instance = new NewThreadScheduler();
+
+ private readonly Func<ThreadStart, Thread> _threadFactory;
+
+ /// <summary>
+ /// Creates an object that schedules each unit of work on a separate thread.
+ /// </summary>
+ public NewThreadScheduler()
+ : this(action => new Thread(action))
+ {
+ }
+
+ /// <summary>
+ /// Gets an instance of this scheduler that uses the default Thread constructor.
+ /// </summary>
+ public static NewThreadScheduler Default
+ {
+ get
+ {
+ return s_instance;
+ }
+ }
+
+#if !NO_THREAD
+ /// <summary>
+ /// Creates an object that schedules each unit of work on a separate thread.
+ /// </summary>
+ /// <param name="threadFactory">Factory function for thread creation.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
+ public NewThreadScheduler(Func<ThreadStart, Thread> threadFactory)
+ {
+ if (threadFactory == null)
+ throw new ArgumentNullException("threadFactory");
+#else
+ private NewThreadScheduler(Func<ThreadStart, Thread> threadFactory)
+ {
+#endif
+ _threadFactory = threadFactory;
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed after dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <param name="dueTime">Relative time after which to execute the action.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var scheduler = new EventLoopScheduler(_threadFactory);
+ scheduler.ExitIfEmpty = true;
+ return scheduler.Schedule(state, dueTime, action);
+ }
+
+ /// <summary>
+ /// Schedules a long-running task by creating a new thread. Cancellation happens through polling.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var d = new BooleanDisposable();
+
+ var thread = _threadFactory(() =>
+ {
+ //
+ // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
+ // requires us to ensure the scheduled work gets an opportunity to observe
+ // the cancellation request.
+ //
+ action(state, d);
+ });
+
+ thread.Start();
+
+ return d;
+ }
+
+ /// <summary>
+ /// Schedules a periodic piece of work by creating a new thread that goes to sleep when work has been dispatched and wakes up again at the next periodic due time.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">Initial state passed to the action upon the first iteration.</param>
+ /// <param name="period">Period for running the work periodically.</param>
+ /// <param name="action">Action to be executed, potentially updating the state.</param>
+ /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
+ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
+ {
+ if (period < TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException("period");
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var periodic = new Periodic<TState>(state, period, action);
+
+ var thread = _threadFactory(periodic.Run);
+ thread.Start();
+
+ return periodic;
+ }
+
+ class Periodic<TState> : IDisposable
+ {
+ private readonly IStopwatch _stopwatch;
+ private readonly TimeSpan _period;
+ private readonly Func<TState, TState> _action;
+
+ private readonly object _cancel = new object();
+ private volatile bool _done;
+
+ private TState _state;
+ private TimeSpan _next;
+
+ public Periodic(TState state, TimeSpan period, Func<TState, TState> action)
+ {
+ _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
+
+ _period = period;
+ _action = action;
+
+ _state = state;
+ _next = period;
+ }
+
+ public void Run()
+ {
+ while (!_done)
+ {
+ var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed);
+
+ lock (_cancel)
+ {
+ if (Monitor.Wait(_cancel, timeout))
+ return;
+ }
+
+ _state = _action(_state);
+ _next += _period;
+ }
+ }
+
+ public void Dispose()
+ {
+ _done = true;
+
+ lock (_cancel)
+ {
+ Monitor.Pulse(_cancel);
+ }
+ }
+ }
+
+#if !NO_STOPWATCH
+ /// <summary>
+ /// Starts a new stopwatch object.
+ /// </summary>
+ /// <returns>New stopwatch object; started at the time of the request.</returns>
+ public override IStopwatch StartStopwatch()
+ {
+ //
+ // Strictly speaking, this explicit override is not necessary because the base implementation calls into
+ // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
+ // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
+ //
+ return new StopwatchImpl();
+ }
+#endif
+ }
+} \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs
new file mode 100644
index 0000000..a84588b
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs
@@ -0,0 +1,230 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_TPL
+using System.Reactive.Disposables;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Reactive.Concurrency
+{
+ /// <summary>
+ /// Represents an object that schedules units of work on the Task Parallel Library (TPL) task pool.
+ /// </summary>
+ /// <seealso cref="TaskPoolScheduler.Default">Instance of this type using the default TaskScheduler to schedule work on the TPL task pool.</seealso>
+ public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
+ {
+ private static readonly TaskPoolScheduler s_instance = new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default));
+ private readonly TaskFactory taskFactory;
+
+ /// <summary>
+ /// Creates an object that schedules units of work using the provided TaskFactory.
+ /// </summary>
+ /// <param name="taskFactory">Task factory used to create tasks to run units of work.</param>
+ /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
+ public TaskPoolScheduler(TaskFactory taskFactory)
+ {
+ if (taskFactory == null)
+ throw new ArgumentNullException("taskFactory");
+
+ this.taskFactory = taskFactory;
+ }
+
+ /// <summary>
+ /// Gets an instance of this scheduler that uses the default TaskScheduler.
+ /// </summary>
+ public static TaskPoolScheduler Default
+ {
+ get
+ {
+ return s_instance;
+ }
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var d = new SerialDisposable();
+ var cancelable = new CancellationDisposable();
+ d.Disposable = cancelable;
+ taskFactory.StartNew(() =>
+ {
+ //
+ // BREAKING CHANGE v2.0 > v1.x - No longer escalating exceptions using a throwing
+ // helper thread.
+ //
+ // Our manual escalation based on the creation of a throwing thread was merely to
+ // expedite the process of throwing the exception that would otherwise occur on the
+ // finalizer thread at a later point during the app's lifetime.
+ //
+ // However, it also prevented applications from observing the exception through
+ // the TaskScheduler.UnobservedTaskException static event. Also, starting form .NET
+ // 4.5, the default behavior of the task pool is not to take down the application
+ // when an exception goes unobserved (done as part of the async/await work). It'd
+ // be weird for Rx not to follow the platform defaults.
+ //
+ // General implementation guidelines for schedulers (in order of importance):
+ //
+ // 1. Always thunk through to the underlying infrastructure with a wrapper that's as tiny as possible.
+ // 2. Global exception notification/handling mechanisms shouldn't be bypassed.
+ // 3. Escalation behavior for exceptions is left to the underlying infrastructure.
+ //
+ // The Catch extension method for IScheduler (added earlier) allows to re-route
+ // exceptions at stage 2. If the exception isn't handled at the Rx level, it
+ // propagates by means of a rethrow, falling back to behavior in 3.
+ //
+ d.Disposable = action(this, state);
+ }, cancelable.Token);
+ return d;
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed after dueTime.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <param name="dueTime">Relative time after which to execute the action.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var dt = Scheduler.Normalize(dueTime);
+ if (dt.Ticks == 0)
+ return Schedule(state, action);
+
+#if !NO_TASK_DELAY
+ var d = new MultipleAssignmentDisposable();
+
+ var ct = new CancellationDisposable();
+ d.Disposable = ct;
+
+ Task.Delay(dueTime, ct.Token).ContinueWith(_ =>
+ {
+ if (!d.IsDisposed)
+ d.Disposable = action(this, state);
+ }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler);
+
+ return d;
+#else
+ return DefaultScheduler.Instance.Schedule(state, dt, (_, state1) => Schedule(state1, action));
+#endif
+ }
+
+ /// <summary>
+ /// Schedules a long-running task by creating a new task using TaskCreationOptions.LongRunning. Cancellation happens through polling.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
+ {
+ var d = new BooleanDisposable();
+
+ taskFactory.StartNew(() =>
+ {
+ //
+ // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
+ // requires us to ensure the scheduled work gets an opportunity to observe
+ // the cancellation request.
+ //
+ action(state, d);
+ }, TaskCreationOptions.LongRunning);
+
+ return d;
+ }
+
+#if !NO_STOPWATCH
+ /// <summary>
+ /// Gets a new stopwatch ob ject.
+ /// </summary>
+ /// <returns>New stopwatch object; started at the time of the request.</returns>
+ public override IStopwatch StartStopwatch()
+ {
+ //
+ // Strictly speaking, this explicit override is not necessary because the base implementation calls into
+ // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
+ // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
+ //
+ return new StopwatchImpl();
+ }
+#endif
+
+ /// <summary>
+ /// Schedules a periodic piece of work by running a platform-specific timer to create tasks periodically.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">Initial state passed to the action upon the first iteration.</param>
+ /// <param name="period">Period for running the work periodically.</param>
+ /// <param name="action">Action to be executed, potentially updating the state.</param>
+ /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
+ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
+ {
+ if (period < TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException("period");
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+#if !NO_TASK_DELAY
+ var cancel = new CancellationDisposable();
+
+ var state1 = state;
+ var gate = new AsyncLock();
+
+ var moveNext = default(Action);
+ moveNext = () =>
+ {
+ Task.Delay(period, cancel.Token).ContinueWith(
+ _ =>
+ {
+ moveNext();
+
+ gate.Wait(() =>
+ {
+ state1 = action(state1);
+ });
+ },
+ CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler
+ );
+ };
+
+ moveNext();
+
+ return new CompositeDisposable(cancel, gate);
+#else
+ var state1 = state;
+ var gate = new AsyncLock();
+
+ var timer = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(() =>
+ {
+ taskFactory.StartNew(() =>
+ {
+ gate.Wait(() =>
+ {
+ state1 = action(state1);
+ });
+ });
+ }, period);
+
+ return new CompositeDisposable(timer, gate);
+#endif
+ }
+ }
+}
+#endif
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs
new file mode 100644
index 0000000..691365b
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/Thread.Stub.cs
@@ -0,0 +1,34 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if NO_THREAD
+using System;
+using System.Threading;
+
+namespace System.Reactive.Concurrency
+{
+ class Thread
+ {
+ private readonly ThreadStart _start;
+
+ public Thread(ThreadStart start)
+ {
+ _start = start;
+ }
+
+ public string Name { get; set; }
+ public bool IsBackground { get; set; }
+
+ public void Start()
+ {
+ System.Threading.Tasks.Task.Factory.StartNew(Run, System.Threading.Tasks.TaskCreationOptions.LongRunning);
+ }
+
+ private void Run()
+ {
+ _start();
+ }
+ }
+
+ delegate void ThreadStart();
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs
new file mode 100644
index 0000000..1b8579d
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.Windows.cs
@@ -0,0 +1,185 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if WINDOWS
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using Windows.System.Threading;
+
+namespace System.Reactive.Concurrency
+{
+ /// <summary>
+ /// Represents an object that schedules units of work on the Windows Runtime thread pool.
+ /// </summary>
+ /// <seealso cref="ThreadPoolScheduler.Default">Singleton instance of this type exposed through this static property.</seealso>
+ [CLSCompliant(false)]
+ public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerPeriodic
+ {
+ private readonly WorkItemPriority _priority;
+ private readonly WorkItemOptions _options;
+ private static Lazy<ThreadPoolScheduler> s_default = new Lazy<ThreadPoolScheduler>(() => new ThreadPoolScheduler());
+
+ /// <summary>
+ /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool.
+ /// </summary>
+ public ThreadPoolScheduler()
+ {
+ }
+
+ /// <summary>
+ /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool with the given priority.
+ /// </summary>
+ /// <param name="priority">Priority for scheduled units of work.</param>
+ public ThreadPoolScheduler(WorkItemPriority priority)
+ {
+ _priority = priority;
+ _options = WorkItemOptions.None;
+ }
+
+ /// <summary>
+ /// Constructs a ThreadPoolScheduler that schedules units of work on the Windows ThreadPool with the given priority.
+ /// </summary>
+ /// <param name="priority">Priority for scheduled units of work.</param>
+ /// <param name="options">Options that configure how work is scheduled.</param>
+ public ThreadPoolScheduler(WorkItemPriority priority, WorkItemOptions options)
+ {
+ _priority = priority;
+ _options = options;
+ }
+
+ /// <summary>
+ /// Gets the singleton instance of the Windows Runtime thread pool scheduler.
+ /// </summary>
+ public static ThreadPoolScheduler Default
+ {
+ get
+ {
+ return s_default.Value;
+ }
+ }
+
+ /// <summary>
+ /// Gets the priority at which work is scheduled.
+ /// </summary>
+ public WorkItemPriority Priority
+ {
+ get { return _priority; }
+ }
+
+ /// <summary>
+ /// Gets the options that configure how work is scheduled.
+ /// </summary>
+ public WorkItemOptions Options
+ {
+ get { return _options; }
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var d = new SingleAssignmentDisposable();
+
+ var res = global::Windows.System.Threading.ThreadPool.RunAsync(iaa =>
+ {
+ if (!d.IsDisposed)
+ d.Disposable = action(this, state);
+ }, _priority, _options);
+
+ return new CompositeDisposable(
+ d,
+ Disposable.Create(res.Cancel)
+ );
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed after dueTime, using a Windows.System.Threading.ThreadPoolTimer object.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <param name="dueTime">Relative time after which to execute the action.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var dt = Scheduler.Normalize(dueTime);
+
+ if (dt.Ticks == 0)
+ return Schedule(state, action);
+
+ var d = new SingleAssignmentDisposable();
+
+ var res = global::Windows.System.Threading.ThreadPoolTimer.CreateTimer(
+ tpt =>
+ {
+ if (!d.IsDisposed)
+ d.Disposable = action(this, state);
+ },
+ dt
+ );
+
+ return new CompositeDisposable(
+ d,
+ Disposable.Create(res.Cancel)
+ );
+ }
+
+ /// <summary>
+ /// Schedules a periodic piece of work, using a Windows.System.Threading.ThreadPoolTimer object.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">Initial state passed to the action upon the first iteration.</param>
+ /// <param name="period">Period for running the work periodically.</param>
+ /// <param name="action">Action to be executed, potentially updating the state.</param>
+ /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than one millisecond.</exception>
+ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
+ {
+ //
+ // The WinRT thread pool is based on the Win32 thread pool and cannot handle
+ // sub-1ms resolution. When passing a lower period, we get single-shot
+ // timer behavior instead. See MSDN documentation for CreatePeriodicTimer
+ // for more information.
+ //
+ if (period < TimeSpan.FromMilliseconds(1))
+ throw new ArgumentOutOfRangeException("period", Strings_PlatformServices.WINRT_NO_SUB1MS_TIMERS);
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var state1 = state;
+ var gate = new AsyncLock();
+
+ var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
+ tpt =>
+ {
+ gate.Wait(() =>
+ {
+ state1 = action(state1);
+ });
+ },
+ period
+ );
+
+ return Disposable.Create(() =>
+ {
+ res.Cancel();
+ gate.Dispose();
+ action = Stubs<TState>.I;
+ });
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs
new file mode 100644
index 0000000..cc2aa15
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Concurrency/ThreadPoolScheduler.cs
@@ -0,0 +1,303 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !WINDOWS && !NO_THREAD
+using System.Collections.Generic;
+using System.Reactive.Disposables;
+using System.Threading;
+
+namespace System.Reactive.Concurrency
+{
+ /// <summary>
+ /// Represents an object that schedules units of work on the CLR thread pool.
+ /// </summary>
+ /// <seealso cref="ThreadPoolScheduler.Instance">Singleton instance of this type exposed through this static property.</seealso>
+ public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
+ {
+ private static readonly ThreadPoolScheduler s_instance = new ThreadPoolScheduler();
+
+ /// <summary>
+ /// Gets the singleton instance of the CLR thread pool scheduler.
+ /// </summary>
+ public static ThreadPoolScheduler Instance
+ {
+ get
+ {
+ return s_instance;
+ }
+ }
+
+ ThreadPoolScheduler()
+ {
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var d = new SingleAssignmentDisposable();
+
+ ThreadPool.QueueUserWorkItem(_ =>
+ {
+ if (!d.IsDisposed)
+ d.Disposable = action(this, state);
+ }, null);
+
+ return d;
+ }
+
+ /// <summary>
+ /// Schedules an action to be executed after dueTime, using a System.Threading.Timer object.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <param name="dueTime">Relative time after which to execute the action.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ var dt = Scheduler.Normalize(dueTime);
+ if (dt.Ticks == 0)
+ return Schedule(state, action);
+
+ return new Timer<TState>(this, state, dt, action);
+ }
+
+ /// <summary>
+ /// Schedules a long-running task by creating a new thread. Cancellation happens through polling.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">State passed to the action to be executed.</param>
+ /// <param name="action">Action to be executed.</param>
+ /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ return NewThreadScheduler.Default.ScheduleLongRunning(state, action);
+ }
+
+#if !NO_STOPWATCH
+ /// <summary>
+ /// Starts a new stopwatch object.
+ /// </summary>
+ /// <returns>New stopwatch object; started at the time of the request.</returns>
+ public override IStopwatch StartStopwatch()
+ {
+ //
+ // Strictly speaking, this explicit override is not necessary because the base implementation calls into
+ // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
+ // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
+ //
+ return new StopwatchImpl();
+ }
+#endif
+
+ /// <summary>
+ /// Schedules a periodic piece of work, using a System.Threading.Timer object.
+ /// </summary>
+ /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
+ /// <param name="state">Initial state passed to the action upon the first iteration.</param>
+ /// <param name="period">Period for running the work periodically.</param>
+ /// <param name="action">Action to be executed, potentially updating the state.</param>
+ /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than or equal to zero.</exception>
+ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
+ {
+ //
+ // MSDN documentation states the following:
+ //
+ // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once;
+ // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method."
+ //
+ if (period <= TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException("period");
+ if (action == null)
+ throw new ArgumentNullException("action");
+
+ return new PeriodicTimer<TState>(state, period, action);
+ }
+
+ abstract class Timer
+ {
+ //
+ // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running.
+ //
+#if !NO_HASHSET
+ protected static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>();
+#else
+ protected static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>();
+#endif
+ }
+
+ sealed class Timer<TState> : Timer, IDisposable
+ {
+ private readonly MultipleAssignmentDisposable _disposable;
+
+ private readonly IScheduler _parent;
+ private readonly TState _state;
+
+ private Func<IScheduler, TState, IDisposable> _action;
+ private System.Threading.Timer _timer;
+
+ private bool _hasAdded;
+ private bool _hasRemoved;
+
+ public Timer(IScheduler parent, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+ {
+ _disposable = new MultipleAssignmentDisposable();
+ _disposable.Disposable = Disposable.Create(Unroot);
+
+ _parent = parent;
+ _state = state;
+
+ _action = action;
+ _timer = new System.Threading.Timer(Tick, null, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
+
+ lock (s_timers)
+ {
+ if (!_hasRemoved)
+ {
+#if !NO_HASHSET
+ s_timers.Add(_timer);
+#else
+ s_timers.Add(_timer, null);
+#endif
+
+ _hasAdded = true;
+ }
+ }
+ }
+
+ private void Tick(object state)
+ {
+ try
+ {
+ _disposable.Disposable = _action(_parent, _state);
+ }
+ finally
+ {
+ Unroot();
+ }
+ }
+
+ private void Unroot()
+ {
+ _action = Nop;
+
+ var timer = default(System.Threading.Timer);
+
+ lock (s_timers)
+ {
+ if (!_hasRemoved)
+ {
+ timer = _timer;
+ _timer = null;
+
+ if (_hasAdded && timer != null)
+ s_timers.Remove(timer);
+
+ _hasRemoved = true;
+ }
+ }
+
+ if (timer != null)
+ timer.Dispose();
+ }
+
+ private IDisposable Nop(IScheduler scheduler, TState state)
+ {
+ return Disposable.Empty;
+ }
+
+ public void Dispose()
+ {
+ _disposable.Dispose();
+ }
+ }
+
+ abstract class PeriodicTimer
+ {
+ //
+ // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running.
+ //
+#if !NO_HASHSET
+ protected static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>();
+#else
+ protected static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>();
+#endif
+ }
+
+ sealed class PeriodicTimer<TState> : PeriodicTimer, IDisposable
+ {
+ private readonly AsyncLock _gate;
+
+ private TState _state;
+ private Func<TState, TState> _action;
+ private System.Threading.Timer _timer;
+
+ public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action)
+ {
+ _gate = new AsyncLock();
+
+ _state = state;
+ _action = action;
+ _timer = new System.Threading.Timer(Tick, null, period, period);
+
+ lock (s_timers)
+ {
+#if !NO_HASHSET
+ s_timers.Add(_timer);
+#else
+ s_timers.Add(_timer, null);
+#endif
+ }
+ }
+
+ private void Tick(object state)
+ {
+ _gate.Wait(() =>
+ {
+ _state = _action(_state);
+ });
+ }
+
+ public void Dispose()
+ {
+ var timer = default(System.Threading.Timer);
+
+ lock (s_timers)
+ {
+ timer = _timer;
+ _timer = null;
+
+ if (timer != null)
+ s_timers.Remove(timer);
+ }
+
+ if (timer != null)
+ {
+ timer.Dispose();
+ _gate.Dispose();
+ _action = Stubs<TState>.I;
+ }
+ }
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/EnlightenmentProvider.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/EnlightenmentProvider.cs
new file mode 100644
index 0000000..bdacdc7
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/EnlightenmentProvider.cs
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Reactive.Concurrency;
+
+namespace System.Reactive.PlatformServices
+{
+ /// <summary>
+ /// Provides access to the platform enlightenments used by other Rx libraries to improve system performance and
+ /// runtime efficiency. While Rx can run without platform enlightenments loaded, it's recommended to deploy the
+ /// System.Reactive.PlatformServices assembly with your application and call <see cref="EnlightenmentProvider.
+ /// EnsureLoaded"/> during application startup to ensure enlightenments are properly loaded.
+ /// </summary>
+ public static class EnlightenmentProvider
+ {
+ /// <summary>
+ /// Ensures that the calling assembly has a reference to the System.Reactive.PlatformServices assembly with
+ /// platform enlightenments. If no reference is made from the user code, it's possible for the build process
+ /// to drop the deployment of System.Reactive.PlatformServices, preventing its runtime discovery.
+ /// </summary>
+ /// <returns>
+ /// true if the loaded enlightenment provider matches the provided defined in the current assembly; false
+ /// otherwise. When a custom enlightenment provider is installed by the host, false will be returned.
+ /// </returns>
+ public static bool EnsureLoaded()
+ {
+ return PlatformEnlightenmentProvider.Current is CurrentPlatformEnlightenmentProvider;
+ }
+ }
+}
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/ExceptionServicesImpl.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/ExceptionServicesImpl.cs
new file mode 100644
index 0000000..6d14651
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/ExceptionServicesImpl.cs
@@ -0,0 +1,19 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if HAS_EDI
+namespace System.Reactive.PlatformServices
+{
+ //
+ // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms.
+ // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator
+ // behavior of Rx for PLIB when used on a more capable platform.
+ //
+ internal class /*Default*/ExceptionServicesImpl : IExceptionServices
+ {
+ public void Rethrow(Exception exception)
+ {
+ System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw();
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.Windows.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.Windows.cs
new file mode 100644
index 0000000..c70bf17
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.Windows.cs
@@ -0,0 +1,43 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if WINDOWS
+using Windows.ApplicationModel;
+using Windows.ApplicationModel.Core;
+
+namespace System.Reactive.PlatformServices
+{
+ internal class HostLifecycleNotifications : IHostLifecycleNotifications
+ {
+ private EventHandler<SuspendingEventArgs> _suspending;
+ private EventHandler<object> _resuming;
+
+ public event EventHandler<HostSuspendingEventArgs> Suspending
+ {
+ add
+ {
+ _suspending = (o, e) => value(o, new HostSuspendingEventArgs());
+ CoreApplication.Suspending += _suspending;
+ }
+
+ remove
+ {
+ CoreApplication.Suspending -= _suspending;
+ }
+ }
+
+ public event EventHandler<HostResumingEventArgs> Resuming
+ {
+ add
+ {
+ _resuming = (o, e) => value(o, new HostResumingEventArgs());
+ CoreApplication.Resuming += _resuming;
+ }
+
+ remove
+ {
+ CoreApplication.Resuming -= _resuming;
+ }
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.WindowsPhone.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.WindowsPhone.cs
new file mode 100644
index 0000000..30a32e7
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/HostLifecycleNotifications.WindowsPhone.cs
@@ -0,0 +1,63 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if WINDOWSPHONE7
+
+#if DEBUG_NO_AGENT_SUPPORT
+using Microsoft.Phone.Shell;
+#else
+using System.Reactive.PlatformServices.Phone.Shell;
+#endif
+
+namespace System.Reactive.PlatformServices
+{
+ internal class HostLifecycleNotifications : IHostLifecycleNotifications
+ {
+ private EventHandler<ActivatedEventArgs> _activated;
+ private EventHandler<DeactivatedEventArgs> _deactivated;
+
+ public event EventHandler<HostSuspendingEventArgs> Suspending
+ {
+ add
+ {
+ _deactivated = (o, e) => value(o, new HostSuspendingEventArgs());
+
+ var current = PhoneApplicationService.Current;
+ if (current != null)
+ current.Deactivated += _deactivated;
+ }
+
+ remove
+ {
+ var current = PhoneApplicationService.Current;
+ if (current != null)
+ current.Deactivated -= _deactivated;
+ }
+ }
+
+ public event EventHandler<HostResumingEventArgs> Resuming
+ {
+ add
+ {
+ _activated = (o, e) =>
+ {
+ if (e.IsApplicationInstancePreserved)
+ {
+ value(o, new HostResumingEventArgs());
+ }
+ };
+
+ var current = PhoneApplicationService.Current;
+ if (current != null)
+ current.Activated += _activated;
+ }
+
+ remove
+ {
+ var current = PhoneApplicationService.Current;
+ if (current != null)
+ current.Activated -= _activated;
+ }
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PhoneShellThunks.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PhoneShellThunks.cs
new file mode 100644
index 0000000..3ce1b22
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PhoneShellThunks.cs
@@ -0,0 +1,214 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if WINDOWSPHONE7
+#if !DEBUG_NO_AGENT_SUPPORT
+//
+// The Windows Phone Marketplace Test Kit disallows usage of types
+// in the Microsoft.Phone.Shell namespace, determined by static code
+// analysis, for background agents.
+//
+// However, with a null check for PhoneApplicationService.Current,
+// we can safely use this the lifecycle events for dormant state
+// transitions. In a background agent this property will be null;
+// trying to create an instance of PhoneApplicationService throws
+// a ComException and is required to set the Current property.
+//
+// In order to access the PhoneApplicationService functionality for
+// non-background agent assemblies, we build a late bound wrapper
+// around the APIs.
+//
+// See appplat\src\Frameworks\Microsoft\Phone\PhoneApplicationService.cs
+// for implementation details of the class we use.
+//
+namespace System.Reactive.PlatformServices.Phone.Shell
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq.Expressions;
+ using System.Reflection;
+ using Microsoft.Phone;
+
+ class PhoneApplicationService
+ {
+ private static readonly object s_gate = new object();
+ internal static readonly Assembly s_phshAsm;
+ private static readonly Type s_pasType;
+ private static readonly PropertyInfo s_curProp;
+ private static readonly EventInfo s_actdEvt;
+ private static readonly EventInfo s_deacEvt;
+
+ private readonly object _target;
+
+ static PhoneApplicationService()
+ {
+ s_phshAsm = typeof(BackgroundAgent).Assembly; // Has to be in Microsoft.Phone.dll.
+ s_pasType = s_phshAsm.GetType("Microsoft.Phone.Shell.PhoneApplicationService");
+ s_curProp = s_pasType.GetProperty("Current", BindingFlags.Public | BindingFlags.Static);
+ s_actdEvt = s_curProp.PropertyType.GetEvent("Activated", BindingFlags.Public | BindingFlags.Instance);
+ s_deacEvt = s_curProp.PropertyType.GetEvent("Deactivated", BindingFlags.Public | BindingFlags.Instance);
+ }
+
+ private PhoneApplicationService(object target)
+ {
+ _target = target;
+ }
+
+ private static PhoneApplicationService s_current;
+
+ public static PhoneApplicationService Current
+ {
+ get
+ {
+ lock (s_gate)
+ {
+ if (s_current == null)
+ {
+ var current = s_curProp.GetValue(null, null);
+ if (current == null)
+ return null;
+
+ //
+ // Current provides a singleton. The constructor
+ // of PhoneApplicationService guarantees this,
+ // throwing an InvalidOperationException if more
+ // than one instance is created.
+ //
+ s_current = new PhoneApplicationService(current);
+ }
+ }
+
+ return s_current;
+ }
+ }
+
+ private Dictionary<object, object> _actdHandlers = new Dictionary<object, object>();
+
+ public event EventHandler<ActivatedEventArgs> Activated
+ {
+ add
+ {
+ AddHandler<ActivatedEventArgs>(s_actdEvt, _actdHandlers, value);
+ }
+
+ remove
+ {
+ RemoveHandler(s_actdEvt, _actdHandlers, value);
+ }
+ }
+
+ private Dictionary<object, object> _deacHandlers = new Dictionary<object, object>();
+
+ public event EventHandler<DeactivatedEventArgs> Deactivated
+ {
+ add
+ {
+ AddHandler<DeactivatedEventArgs>(s_deacEvt, _deacHandlers, value);
+ }
+
+ remove
+ {
+ RemoveHandler(s_deacEvt, _deacHandlers, value);
+ }
+ }
+
+ private void AddHandler<TEventArgs>(EventInfo evt, Dictionary<object, object> map, object handler)
+ where TEventArgs : EventArgs
+ {
+ var h = GetHandler<TEventArgs>(evt, handler);
+ var add = evt.GetAddMethod();
+
+ lock (s_gate)
+ {
+ map.Add(handler, h);
+ add.Invoke(_target, new object[] { h });
+ }
+ }
+
+ private void RemoveHandler(EventInfo evt, Dictionary<object, object> map, object handler)
+ {
+ var rem = evt.GetRemoveMethod();
+
+ lock (s_gate)
+ {
+ var h = default(object);
+ if (map.TryGetValue(handler, out h))
+ {
+ //
+ // We assume only one handler will be attached to
+ // the event, hence we shouldn't worry about having
+ // multiple delegate instances with the same target
+ // being attached. This guarantee is made by the
+ // reference counting in HostLifecycleService.
+ //
+ // The use of a dictionary allows for reuse with
+ // multiple distinct handlers going forward.
+ //
+ map.Remove(handler);
+ rem.Invoke(_target, new object[] { h });
+ }
+ }
+ }
+
+ private static object GetHandler<TEventArgsThunk>(EventInfo evt, object call)
+ where TEventArgsThunk : EventArgs
+ {
+ var ht = evt.EventHandlerType;
+ var hp = ht.GetMethod("Invoke").GetParameters();
+
+ var po = Expression.Parameter(hp[0].ParameterType, hp[0].Name);
+ var pe = Expression.Parameter(hp[1].ParameterType, hp[1].Name);
+
+ var h = Expression.Lambda(
+ ht,
+ Expression.Invoke(
+ Expression.Constant(call),
+ po,
+ Expression.New(
+ typeof(TEventArgsThunk).GetConstructor(new[] { typeof(object) }),
+ pe
+ )
+ ),
+ po,
+ pe
+ );
+
+ return h.Compile();
+ }
+ }
+
+ class ActivatedEventArgs : EventArgs
+ {
+ private static readonly Type s_aeaType;
+ private static readonly PropertyInfo s_aipProp;
+
+ private readonly object _target;
+
+ static ActivatedEventArgs()
+ {
+ s_aeaType = PhoneApplicationService.s_phshAsm.GetType("Microsoft.Phone.Shell.ActivatedEventArgs");
+ s_aipProp = s_aeaType.GetProperty("IsApplicationInstancePreserved", BindingFlags.Public | BindingFlags.Instance);
+ }
+
+ public ActivatedEventArgs(object target)
+ {
+ _target = target;
+ }
+
+ public bool IsApplicationInstancePreserved
+ {
+ get
+ {
+ return (bool)s_aipProp.GetValue(_target, null);
+ }
+ }
+ }
+
+ class DeactivatedEventArgs : EventArgs
+ {
+ public DeactivatedEventArgs(object target)
+ {
+ }
+ }
+}
+#endif
+#endif \ No newline at end of file
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PlatformEnlightenmentProvider.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PlatformEnlightenmentProvider.cs
new file mode 100644
index 0000000..7bb597c
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/PlatformEnlightenmentProvider.cs
@@ -0,0 +1,109 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+//
+// WARNING: The full namespace-qualified type name should stay the same for the discovery in System.Reactive.Core to work!
+//
+using System.ComponentModel;
+using System.Diagnostics;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using System.Reflection;
+
+namespace System.Reactive.PlatformServices
+{
+ /// <summary>
+ /// (Infrastructure) Provider for platform-specific framework enlightenments.
+ /// </summary>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public class CurrentPlatformEnlightenmentProvider : IPlatformEnlightenmentProvider
+ {
+ /// <summary>
+ /// (Infastructure) Tries to gets the specified service.
+ /// </summary>
+ /// <typeparam name="T">Service type.</typeparam>
+ /// <param name="args">Optional set of arguments.</param>
+ /// <returns>Service instance or null if not found.</returns>
+ public virtual T GetService<T>(object[] args) where T : class
+ {
+ var t = typeof(T);
+
+#if HAS_EDI
+ if (t == typeof(IExceptionServices))
+ {
+ return (T)(object)new ExceptionServicesImpl();
+ }
+#endif
+
+#if !NO_THREAD || WINDOWS
+ if (t == typeof(IConcurrencyAbstractionLayer))
+ {
+ return (T)(object)new ConcurrencyAbstractionLayerImpl();
+ }
+#endif
+
+ if (t == typeof(IScheduler) && args != null)
+ {
+ switch ((string)args[0])
+ {
+#if !WINDOWS && !NO_THREAD
+ case "ThreadPool":
+ return (T)(object)ThreadPoolScheduler.Instance;
+#elif WINDOWS
+ case "ThreadPool":
+ return (T)(object)ThreadPoolScheduler.Default;
+#endif
+#if !NO_TPL
+ case "TaskPool":
+ return (T)(object)TaskPoolScheduler.Default;
+#endif
+ case "NewThread":
+ return (T)(object)NewThreadScheduler.Default;
+ }
+ }
+
+#if WINDOWS || WINDOWSPHONE7
+ if (t == typeof(IHostLifecycleNotifications))
+ {
+ return (T)(object)new HostLifecycleNotifications();
+ }
+#endif
+
+ if (t == typeof(IQueryServices))
+ {
+ //
+ // We perform this Debugger.IsAttached check early rather than deferring
+ // the decision to intercept query operator methods to the debugger
+ // assembly that's dynamically discovered here. Also, it's a reasonable
+ // expectation it'd be pretty hard to turn on interception dynamically
+ // upon a debugger attach event, so we should make this check early.
+ //
+ // In the initial release of v2.0 (RTM), we won't have the corresponding
+ // debugger assembly available yet, so the dynamic load would always
+ // fail. We also don't want to take the price of (an attempt to) a dynamic
+ // assembly load for the regular production case.
+ //
+ if (Debugger.IsAttached)
+ {
+#if NETCF35
+ var name = "System.Reactive.Linq.QueryDebugger, System.Reactive.Debugger";
+#else
+#if CRIPPLED_REFLECTION
+ var ifType = t.GetTypeInfo();
+#else
+ var ifType = t;
+#endif
+ var asm = new AssemblyName(ifType.Assembly.FullName);
+ asm.Name = "System.Reactive.Debugger";
+ var name = "System.Reactive.Linq.QueryDebugger, " + asm.FullName;
+#endif
+ var dbg = Type.GetType(name, false);
+ if (dbg != null)
+ return (T)(object)Activator.CreateInstance(dbg);
+ }
+ }
+
+ return null;
+ }
+ }
+}
diff --git a/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/StopwatchImpl.cs b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/StopwatchImpl.cs
new file mode 100644
index 0000000..04a4040
--- /dev/null
+++ b/Rx.NET/System.Reactive.PlatformServices/Reactive/Internal/StopwatchImpl.cs
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_STOPWATCH
+using System.Diagnostics;
+
+namespace System.Reactive.Concurrency
+{
+ //
+ // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms.
+ // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator
+ // behavior of Rx for PLIB when used on a more capable platform.
+ //
+ internal class /*Default*/StopwatchImpl : IStopwatch
+ {
+ private readonly Stopwatch _sw;
+
+ public StopwatchImpl()
+ {
+ _sw = Stopwatch.StartNew();
+ }
+
+ public TimeSpan Elapsed
+ {
+ get { return _sw.Elapsed; }
+ }
+ }
+}
+#endif \ No newline at end of file