Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Core/Reactive/Internal')
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs42
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs100
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs75
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs576
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs316
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs17
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs30
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs48
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs113
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs51
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs126
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs109
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs102
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs154
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs100
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs71
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs441
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs116
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs143
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs68
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs23
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs55
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs40
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs113
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs149
25 files changed, 3178 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs
new file mode 100644
index 0000000..a2237ae
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs
@@ -0,0 +1,42 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Reactive.Concurrency;
+
+namespace System.Reactive
+{
+ internal class AsyncLockObserver<T> : ObserverBase<T>
+ {
+ private readonly AsyncLock _gate;
+ private readonly IObserver<T> _observer;
+
+ public AsyncLockObserver(IObserver<T> observer, AsyncLock gate)
+ {
+ _gate = gate;
+ _observer = observer;
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ _gate.Wait(() =>
+ {
+ _observer.OnNext(value);
+ });
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ _gate.Wait(() =>
+ {
+ _observer.OnError(exception);
+ });
+ }
+
+ protected override void OnCompletedCore()
+ {
+ _gate.Wait(() =>
+ {
+ _observer.OnCompleted();
+ });
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs
new file mode 100644
index 0000000..b4d67dc
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs
@@ -0,0 +1,100 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Reactive.Disposables;
+
+namespace System.Reactive
+{
+ class AutoDetachObserver<T> : ObserverBase<T>
+ {
+ private readonly IObserver<T> observer;
+ private readonly SingleAssignmentDisposable m = new SingleAssignmentDisposable();
+
+ public AutoDetachObserver(IObserver<T> observer)
+ {
+ this.observer = observer;
+ }
+
+ public IDisposable Disposable
+ {
+ set { m.Disposable = value; }
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ //
+ // Safeguarding of the pipeline against rogue observers is required for proper
+ // resource cleanup. Consider the following example:
+ //
+ // var xs = Observable.Interval(TimeSpan.FromSeconds(1));
+ // var ys = <some random sequence>;
+ // var res = xs.CombineLatest(ys, (x, y) => x + y);
+ //
+ // The marble diagram of the query above looks as follows:
+ //
+ // xs -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
+ // | | | | | | | | |
+ // ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
+ // | | | | | | | | | | | | | |
+ // v v v v v v v v v v v v v v
+ // res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
+ // |
+ // @#&
+ //
+ // Notice the free-threaded nature of Rx, where messages on the resulting sequence
+ // are produced by either of the two input sequences to CombineLatest.
+ //
+ // Now assume an exception happens in the OnNext callback for the observer of res,
+ // at the indicated point marked with @#& above. The callback runs in the context
+ // of ys, so the exception will take down the scheduler thread of ys. This by
+ // itself is a problem (that can be mitigated by a Catch operator on IScheduler),
+ // but notice how the timer that produces xs is kept alive.
+ //
+ // The safe-guarding code below ensures the acquired resources are disposed when
+ // the user callback throws.
+ //
+ var __noError = false;
+ try
+ {
+ observer.OnNext(value);
+ __noError = true;
+ }
+ finally
+ {
+ if (!__noError)
+ Dispose();
+ }
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ try
+ {
+ observer.OnError(exception);
+ }
+ finally
+ {
+ Dispose();
+ }
+ }
+
+ protected override void OnCompletedCore()
+ {
+ try
+ {
+ observer.OnCompleted();
+ }
+ finally
+ {
+ Dispose();
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ m.Dispose();
+ }
+ }
+} \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs
new file mode 100644
index 0000000..ca201f4
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs
@@ -0,0 +1,75 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Threading;
+
+namespace System.Reactive
+{
+ internal class CheckedObserver<T> : IObserver<T>
+ {
+ private readonly IObserver<T> _observer;
+ private int _state;
+
+ private const int IDLE = 0;
+ private const int BUSY = 1;
+ private const int DONE = 2;
+
+ public CheckedObserver(IObserver<T> observer)
+ {
+ _observer = observer;
+ }
+
+ public void OnNext(T value)
+ {
+ CheckAccess();
+
+ try
+ {
+ _observer.OnNext(value);
+ }
+ finally
+ {
+ Interlocked.Exchange(ref _state, IDLE);
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ CheckAccess();
+
+ try
+ {
+ _observer.OnError(error);
+ }
+ finally
+ {
+ Interlocked.Exchange(ref _state, DONE);
+ }
+ }
+
+ public void OnCompleted()
+ {
+ CheckAccess();
+
+ try
+ {
+ _observer.OnCompleted();
+ }
+ finally
+ {
+ Interlocked.Exchange(ref _state, DONE);
+ }
+ }
+
+ private void CheckAccess()
+ {
+ switch (Interlocked.CompareExchange(ref _state, BUSY, IDLE))
+ {
+ case BUSY:
+ throw new InvalidOperationException(Strings_Core.REENTRANCY_DETECTED);
+ case DONE:
+ throw new InvalidOperationException(Strings_Core.OBSERVER_TERMINATED);
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs
new file mode 100644
index 0000000..8b7ec81
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs
@@ -0,0 +1,576 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+/*
+ * WARNING: Auto-generated file (7/18/2012 4:59:53 PM)
+ *
+ * Stripped down code based on ndp\clr\src\BCL\System\Collections\Concurrent\ConcurrentDictionary.cs
+ */
+
+#if NO_CDS_COLLECTIONS
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Reflection;
+using System.Threading;
+
+namespace System.Collections.Concurrent
+{
+ internal class ConcurrentDictionary<TKey, TValue>
+ {
+ /* >>> Code copied from the Array class */
+
+ // We impose limits on maximum array lenght in each dimension to allow efficient
+ // implementation of advanced range check elimination in future.
+ // Keep in sync with vm\gcscan.cpp and HashHelpers.MaxPrimeArrayLength.
+ internal const int MaxArrayLength = 0X7FEFFFFF;
+
+ /* <<< Code copied from the Array class */
+
+ private class Tables
+ {
+ internal readonly Node[] m_buckets; // A singly-linked list for each bucket.
+ internal readonly object[] m_locks; // A set of locks, each guarding a section of the table.
+ internal volatile int[] m_countPerLock; // The number of elements guarded by each lock.
+
+ internal Tables(Node[] buckets, object[] locks, int[] countPerLock)
+ {
+ m_buckets = buckets;
+ m_locks = locks;
+ m_countPerLock = countPerLock;
+ }
+ }
+
+ private volatile Tables m_tables; // Internal tables of the dictionary
+ private readonly IEqualityComparer<TKey> m_comparer; // Key equality comparer
+ private readonly bool m_growLockArray; // Whether to dynamically increase the size of the striped lock
+ private int m_budget; // The maximum number of elements per lock before a resize operation is triggered
+
+ // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the
+ // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference
+ // and blocking, but also the more expensive operations that require all locks become (e.g. table
+ // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good
+ // compromise.
+ private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;
+
+ // The default capacity, i.e. the initial # of buckets. When choosing this value, we are making
+ // a trade-off between the size of a very small dictionary, and the number of resizes when
+ // constructing a large dictionary. Also, the capacity should not be divisible by a small prime.
+ private const int DEFAULT_CAPACITY = 31;
+
+ // The maximum size of the striped lock that will not be exceeded when locks are automatically
+ // added as the dictionary grows. However, the user is allowed to exceed this limit by passing
+ // a concurrency level larger than MAX_LOCK_NUMBER into the constructor.
+ private const int MAX_LOCK_NUMBER = 1024;
+
+ // Whether TValue is a type that can be written atomically (i.e., with no danger of torn reads)
+ private static readonly bool s_isValueWriteAtomic = IsValueWriteAtomic();
+
+ private static bool IsValueWriteAtomic()
+ {
+ Type valueType = typeof(TValue);
+
+ //
+ // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without
+ // the risk of tearing.
+ //
+ // See http://www.ecma-international.org/publications/files/ECMA-ST/Ecma-335.pdf
+ //
+ bool isAtomic =
+ (valueType.GetTypeInfo().IsClass)
+ || valueType == typeof(Boolean)
+ || valueType == typeof(Char)
+ || valueType == typeof(Byte)
+ || valueType == typeof(SByte)
+ || valueType == typeof(Int16)
+ || valueType == typeof(UInt16)
+ || valueType == typeof(Int32)
+ || valueType == typeof(UInt32)
+ || valueType == typeof(Single);
+
+ if (!isAtomic && IntPtr.Size == 8)
+ {
+ isAtomic |= valueType == typeof(Double) || valueType == typeof(Int64);
+ }
+
+ return isAtomic;
+ }
+
+ public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { }
+
+ internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
+ {
+ if (concurrencyLevel < 1)
+ {
+ throw new ArgumentOutOfRangeException("concurrencyLevel");
+ }
+ if (capacity < 0)
+ {
+ throw new ArgumentOutOfRangeException("capacity");
+ }
+ if (comparer == null) throw new ArgumentNullException("comparer");
+
+ // The capacity should be at least as large as the concurrency level. Otherwise, we would have locks that don't guard
+ // any buckets.
+ if (capacity < concurrencyLevel)
+ {
+ capacity = concurrencyLevel;
+ }
+
+ object[] locks = new object[concurrencyLevel];
+ for (int i = 0; i < locks.Length; i++)
+ {
+ locks[i] = new object();
+ }
+
+ int[] countPerLock = new int[locks.Length];
+ Node[] buckets = new Node[capacity];
+ m_tables = new Tables(buckets, locks, countPerLock);
+
+ m_comparer = comparer;
+ m_growLockArray = growLockArray;
+ m_budget = buckets.Length / locks.Length;
+ }
+
+ public bool TryAdd(TKey key, TValue value)
+ {
+ if (key == null) throw new ArgumentNullException("key");
+ TValue dummy;
+ return TryAddInternal(key, value, false, true, out dummy);
+ }
+
+ public bool TryRemove(TKey key, out TValue value)
+ {
+ if (key == null) throw new ArgumentNullException("key");
+
+ return TryRemoveInternal(key, out value, false, default(TValue));
+ }
+
+ [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
+ private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
+ {
+ while (true)
+ {
+ Tables tables = m_tables;
+
+ int bucketNo, lockNo;
+ GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);
+
+ lock (tables.m_locks[lockNo])
+ {
+ // If the table just got resized, we may not be holding the right lock, and must retry.
+ // This should be a rare occurence.
+ if (tables != m_tables)
+ {
+ continue;
+ }
+
+ Node prev = null;
+ for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next)
+ {
+ if (m_comparer.Equals(curr.m_key, key))
+ {
+ if (matchValue)
+ {
+ bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value);
+ if (!valuesMatch)
+ {
+ value = default(TValue);
+ return false;
+ }
+ }
+
+ if (prev == null)
+ {
+ Volatile.Write<Node>(ref tables.m_buckets[bucketNo], curr.m_next);
+ }
+ else
+ {
+ prev.m_next = curr.m_next;
+ }
+
+ value = curr.m_value;
+ tables.m_countPerLock[lockNo]--;
+ return true;
+ }
+ prev = curr;
+ }
+ }
+
+ value = default(TValue);
+ return false;
+ }
+ }
+
+ [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
+ public bool TryGetValue(TKey key, out TValue value)
+ {
+ if (key == null) throw new ArgumentNullException("key");
+
+ int bucketNo, lockNoUnused;
+
+ // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize.
+ Tables tables = m_tables;
+
+ GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNoUnused, tables.m_buckets.Length, tables.m_locks.Length);
+
+ // We can get away w/out a lock here.
+ // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i].
+ Node n = Volatile.Read<Node>(ref tables.m_buckets[bucketNo]);
+
+ while (n != null)
+ {
+ if (m_comparer.Equals(n.m_key, key))
+ {
+ value = n.m_value;
+ return true;
+ }
+ n = n.m_next;
+ }
+
+ value = default(TValue);
+ return false;
+ }
+
+ [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
+ private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
+ {
+ int hashcode = m_comparer.GetHashCode(key);
+
+ while (true)
+ {
+ int bucketNo, lockNo;
+
+ Tables tables = m_tables;
+ GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);
+
+ bool resizeDesired = false;
+ bool lockTaken = false;
+ try
+ {
+ if (acquireLock)
+ Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);
+
+ // If the table just got resized, we may not be holding the right lock, and must retry.
+ // This should be a rare occurence.
+ if (tables != m_tables)
+ {
+ continue;
+ }
+
+ // Try to find this key in the bucket
+ Node prev = null;
+ for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)
+ {
+ if (m_comparer.Equals(node.m_key, key))
+ {
+ // The key was found in the dictionary. If updates are allowed, update the value for that key.
+ // We need to create a new node for the update, in order to support TValue types that cannot
+ // be written atomically, since lock-free reads may be happening concurrently.
+ if (updateIfExists)
+ {
+ if (s_isValueWriteAtomic)
+ {
+ node.m_value = value;
+ }
+ else
+ {
+ Node newNode = new Node(node.m_key, value, hashcode, node.m_next);
+ if (prev == null)
+ {
+ tables.m_buckets[bucketNo] = newNode;
+ }
+ else
+ {
+ prev.m_next = newNode;
+ }
+ }
+ resultingValue = value;
+ }
+ else
+ {
+ resultingValue = node.m_value;
+ }
+ return false;
+ }
+ prev = node;
+ }
+
+ // The key was not found in the bucket. Insert the key-value pair.
+ Volatile.Write<Node>(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));
+ checked
+ {
+ tables.m_countPerLock[lockNo]++;
+ }
+
+ //
+ // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
+ // It is also possible that GrowTable will increase the budget but won't resize the bucket table.
+ // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
+ //
+ if (tables.m_countPerLock[lockNo] > m_budget)
+ {
+ resizeDesired = true;
+ }
+ }
+ finally
+ {
+ if (lockTaken)
+ Monitor.Exit(tables.m_locks[lockNo]);
+ }
+
+ //
+ // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
+ //
+ // Concurrency notes:
+ // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks.
+ // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0
+ // and then verify that the table we passed to it as the argument is still the current table.
+ //
+ if (resizeDesired)
+ {
+ GrowTable(tables);
+ }
+
+ resultingValue = value;
+ return true;
+ }
+ }
+
+ public ICollection<TValue> Values
+ {
+ get { return GetValues(); }
+ }
+
+ private void GrowTable(Tables tables)
+ {
+ int locksAcquired = 0;
+ try
+ {
+ // The thread that first obtains m_locks[0] will be the one doing the resize operation
+ AcquireLocks(0, 1, ref locksAcquired);
+
+ // Make sure nobody resized the table while we were waiting for lock 0:
+ if (tables != m_tables)
+ {
+ // We assume that since the table reference is different, it was already resized (or the budget
+ // was adjusted). If we ever decide to do table shrinking, or replace the table for other reasons,
+ // we will have to revisit this logic.
+ return;
+ }
+
+ // Compute the (approx.) total size. Use an Int64 accumulation variable to avoid an overflow.
+ long approxCount = 0;
+ for (int i = 0; i < tables.m_countPerLock.Length; i++)
+ {
+ approxCount += tables.m_countPerLock[i];
+ }
+
+ //
+ // If the bucket array is too empty, double the budget instead of resizing the table
+ //
+ if (approxCount < tables.m_buckets.Length / 4)
+ {
+ m_budget = 2 * m_budget;
+ if (m_budget < 0)
+ {
+ m_budget = int.MaxValue;
+ }
+ return;
+ }
+
+
+ // Compute the new table size. We find the smallest integer larger than twice the previous table size, and not divisible by
+ // 2,3,5 or 7. We can consider a different table-sizing policy in the future.
+ int newLength = 0;
+ bool maximizeTableSize = false;
+ try
+ {
+ checked
+ {
+ // Double the size of the buckets table and add one, so that we have an odd integer.
+ newLength = tables.m_buckets.Length * 2 + 1;
+
+ // Now, we only need to check odd integers, and find the first that is not divisible
+ // by 3, 5 or 7.
+ while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
+ {
+ newLength += 2;
+ }
+
+ if (newLength > MaxArrayLength)
+ {
+ maximizeTableSize = true;
+ }
+ }
+ }
+ catch (OverflowException)
+ {
+ maximizeTableSize = true;
+ }
+
+ if (maximizeTableSize)
+ {
+ newLength = MaxArrayLength;
+
+ // We want to make sure that GrowTable will not be called again, since table is at the maximum size.
+ // To achieve that, we set the budget to int.MaxValue.
+ //
+ // (There is one special case that would allow GrowTable() to be called in the future:
+ // calling Clear() on the ConcurrentDictionary will shrink the table and lower the budget.)
+ m_budget = int.MaxValue;
+ }
+
+ // Now acquire all other locks for the table
+ AcquireLocks(1, tables.m_locks.Length, ref locksAcquired);
+
+ object[] newLocks = tables.m_locks;
+
+ // Add more locks
+ if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER)
+ {
+ newLocks = new object[tables.m_locks.Length * 2];
+ Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length);
+
+ for (int i = tables.m_locks.Length; i < newLocks.Length; i++)
+ {
+ newLocks[i] = new object();
+ }
+ }
+
+ Node[] newBuckets = new Node[newLength];
+ int[] newCountPerLock = new int[newLocks.Length];
+
+ // Copy all data into a new table, creating new nodes for all elements
+ for (int i = 0; i < tables.m_buckets.Length; i++)
+ {
+ Node current = tables.m_buckets[i];
+ while (current != null)
+ {
+ Node next = current.m_next;
+ int newBucketNo, newLockNo;
+ GetBucketAndLockNo(current.m_hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length);
+
+ newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, current.m_hashcode, newBuckets[newBucketNo]);
+
+ checked
+ {
+ newCountPerLock[newLockNo]++;
+ }
+
+ current = next;
+ }
+ }
+
+ // Adjust the budget
+ m_budget = Math.Max(1, newBuckets.Length / newLocks.Length);
+
+ // Replace tables with the new versions
+ m_tables = new Tables(newBuckets, newLocks, newCountPerLock);
+ }
+ finally
+ {
+ // Release all locks that we took earlier
+ ReleaseLocks(0, locksAcquired);
+ }
+ }
+
+ private void GetBucketAndLockNo(
+ int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
+ {
+ bucketNo = (hashcode & 0x7fffffff) % bucketCount;
+ lockNo = bucketNo % lockCount;
+ }
+
+ private static int DefaultConcurrencyLevel
+ {
+ get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; }
+ }
+
+ private void AcquireAllLocks(ref int locksAcquired)
+ {
+ // First, acquire lock 0
+ AcquireLocks(0, 1, ref locksAcquired);
+
+ // Now that we have lock 0, the m_locks array will not change (i.e., grow),
+ // and so we can safely read m_locks.Length.
+ AcquireLocks(1, m_tables.m_locks.Length, ref locksAcquired);
+ }
+
+ private void AcquireLocks(int fromInclusive, int toExclusive, ref int locksAcquired)
+ {
+ object[] locks = m_tables.m_locks;
+
+ for (int i = fromInclusive; i < toExclusive; i++)
+ {
+ bool lockTaken = false;
+ try
+ {
+ Monitor.Enter(locks[i], ref lockTaken);
+ }
+ finally
+ {
+ if (lockTaken)
+ {
+ locksAcquired++;
+ }
+ }
+ }
+ }
+
+ [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
+ private void ReleaseLocks(int fromInclusive, int toExclusive)
+ {
+ for (int i = fromInclusive; i < toExclusive; i++)
+ {
+ Monitor.Exit(m_tables.m_locks[i]);
+ }
+ }
+
+ [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")]
+ private ReadOnlyCollection<TValue> GetValues()
+ {
+ int locksAcquired = 0;
+ try
+ {
+ AcquireAllLocks(ref locksAcquired);
+ List<TValue> values = new List<TValue>();
+
+ for (int i = 0; i < m_tables.m_buckets.Length; i++)
+ {
+ Node current = m_tables.m_buckets[i];
+ while (current != null)
+ {
+ values.Add(current.m_value);
+ current = current.m_next;
+ }
+ }
+
+ return new ReadOnlyCollection<TValue>(values);
+ }
+ finally
+ {
+ ReleaseLocks(0, locksAcquired);
+ }
+ }
+
+ private class Node
+ {
+ internal TKey m_key;
+ internal TValue m_value;
+ internal volatile Node m_next;
+ internal int m_hashcode;
+
+ internal Node(TKey key, TValue value, int hashcode, Node next)
+ {
+ m_key = key;
+ m_value = value;
+ m_next = next;
+ m_hashcode = hashcode;
+ }
+ }
+ }
+}
+
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs
new file mode 100644
index 0000000..76ab088
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs
@@ -0,0 +1,316 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+/*
+ * WARNING: Auto-generated file (7/18/2012 4:47:38 PM)
+ *
+ * Stripped down code based on ndp\clr\src\BCL\System\Collections\Concurrent\ConcurrentQueue.cs
+ */
+
+#if NO_CDS_COLLECTIONS
+
+#pragma warning disable 0420
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.Contracts;
+using System.Threading;
+
+namespace System.Collections.Concurrent
+{
+ internal class ConcurrentQueue<T>
+ {
+ private volatile Segment m_head;
+ private volatile Segment m_tail;
+
+ private const int SEGMENT_SIZE = 32;
+
+ public ConcurrentQueue()
+ {
+ m_head = m_tail = new Segment(0, this);
+ }
+
+ public bool IsEmpty
+ {
+ get
+ {
+ Segment head = m_head;
+ if (!head.IsEmpty)
+ //fast route 1:
+ //if current head is not empty, then queue is not empty
+ return false;
+ else if (head.Next == null)
+ //fast route 2:
+ //if current head is empty and it's the last segment
+ //then queue is empty
+ return true;
+ else
+ //slow route:
+ //current head is empty and it is NOT the last segment,
+ //it means another thread is growing new segment
+ {
+ SpinWait spin = new SpinWait();
+ while (head.IsEmpty)
+ {
+ if (head.Next == null)
+ return true;
+
+ spin.SpinOnce();
+ head = m_head;
+ }
+ return false;
+ }
+ }
+ }
+
+ public void Enqueue(T item)
+ {
+ SpinWait spin = new SpinWait();
+ while (true)
+ {
+ Segment tail = m_tail;
+ if (tail.TryAppend(item))
+ return;
+ spin.SpinOnce();
+ }
+ }
+
+ public bool TryDequeue(out T result)
+ {
+ while (!IsEmpty)
+ {
+ Segment head = m_head;
+ if (head.TryRemove(out result))
+ return true;
+ //since method IsEmpty spins, we don't need to spin in the while loop
+ }
+ result = default(T);
+ return false;
+ }
+
+ private class Segment
+ {
+ //we define two volatile arrays: m_array and m_state. Note that the accesses to the array items
+ //do not get volatile treatment. But we don't need to worry about loading adjacent elements or
+ //store/load on adjacent elements would suffer reordering.
+ // - Two stores: these are at risk, but CLRv2 memory model guarantees store-release hence we are safe.
+ // - Two loads: because one item from two volatile arrays are accessed, the loads of the array references
+ // are sufficient to prevent reordering of the loads of the elements.
+ internal volatile T[] m_array;
+
+ // For each entry in m_array, the corresponding entry in m_state indicates whether this position contains
+ // a valid value. m_state is initially all false.
+ internal volatile VolatileBool[] m_state;
+
+ //pointer to the next segment. null if the current segment is the last segment
+ private volatile Segment m_next;
+
+ //We use this zero based index to track how many segments have been created for the queue, and
+ //to compute how many active segments are there currently.
+ // * The number of currently active segments is : m_tail.m_index - m_head.m_index + 1;
+ // * m_index is incremented with every Segment.Grow operation. We use Int64 type, and we can safely
+ // assume that it never overflows. To overflow, we need to do 2^63 increments, even at a rate of 4
+ // billion (2^32) increments per second, it takes 2^31 seconds, which is about 64 years.
+ internal readonly long m_index;
+
+ //indices of where the first and last valid values
+ // - m_low points to the position of the next element to pop from this segment, range [0, infinity)
+ // m_low >= SEGMENT_SIZE implies the segment is disposable
+ // - m_high points to the position of the latest pushed element, range [-1, infinity)
+ // m_high == -1 implies the segment is new and empty
+ // m_high >= SEGMENT_SIZE-1 means this segment is ready to grow.
+ // and the thread who sets m_high to SEGMENT_SIZE-1 is responsible to grow the segment
+ // - Math.Min(m_low, SEGMENT_SIZE) > Math.Min(m_high, SEGMENT_SIZE-1) implies segment is empty
+ // - initially m_low =0 and m_high=-1;
+ private volatile int m_low;
+ private volatile int m_high;
+
+ private volatile ConcurrentQueue<T> m_source;
+
+ internal Segment(long index, ConcurrentQueue<T> source)
+ {
+ m_array = new T[SEGMENT_SIZE];
+ m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false
+ m_high = -1;
+ Contract.Assert(index >= 0);
+ m_index = index;
+ m_source = source;
+ }
+
+ internal Segment Next
+ {
+ get { return m_next; }
+ }
+
+ internal bool IsEmpty
+ {
+ get { return (Low > High); }
+ }
+
+ internal void UnsafeAdd(T value)
+ {
+ Contract.Assert(m_high < SEGMENT_SIZE - 1);
+ m_high++;
+ m_array[m_high] = value;
+ m_state[m_high].m_value = true;
+ }
+
+ internal Segment UnsafeGrow()
+ {
+ Contract.Assert(m_high >= SEGMENT_SIZE - 1);
+ Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow
+ m_next = newSegment;
+ return newSegment;
+ }
+
+ internal void Grow()
+ {
+ //no CAS is needed, since there is no contention (other threads are blocked, busy waiting)
+ Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow
+ m_next = newSegment;
+ Contract.Assert(m_source.m_tail == this);
+ m_source.m_tail = m_next;
+ }
+
+ internal bool TryAppend(T value)
+ {
+ //quickly check if m_high is already over the boundary, if so, bail out
+ if (m_high >= SEGMENT_SIZE - 1)
+ {
+ return false;
+ }
+
+ //Now we will use a CAS to increment m_high, and store the result in newhigh.
+ //Depending on how many free spots left in this segment and how many threads are doing this Increment
+ //at this time, the returning "newhigh" can be
+ // 1) < SEGMENT_SIZE - 1 : we took a spot in this segment, and not the last one, just insert the value
+ // 2) == SEGMENT_SIZE - 1 : we took the last spot, insert the value AND grow the segment
+ // 3) > SEGMENT_SIZE - 1 : we failed to reserve a spot in this segment, we return false to
+ // Queue.Enqueue method, telling it to try again in the next segment.
+
+ int newhigh = SEGMENT_SIZE; //initial value set to be over the boundary
+
+ //We need do Interlocked.Increment and value/state update in a finally block to ensure that they run
+ //without interuption. This is to prevent anything from happening between them, and another dequeue
+ //thread maybe spinning forever to wait for m_state[] to be true;
+ try
+ { }
+ finally
+ {
+ newhigh = Interlocked.Increment(ref m_high);
+ if (newhigh <= SEGMENT_SIZE - 1)
+ {
+ m_array[newhigh] = value;
+ m_state[newhigh].m_value = true;
+ }
+
+ //if this thread takes up the last slot in the segment, then this thread is responsible
+ //to grow a new segment. Calling Grow must be in the finally block too for reliability reason:
+ //if thread abort during Grow, other threads will be left busy spinning forever.
+ if (newhigh == SEGMENT_SIZE - 1)
+ {
+ Grow();
+ }
+ }
+
+ //if newhigh <= SEGMENT_SIZE-1, it means the current thread successfully takes up a spot
+ return newhigh <= SEGMENT_SIZE - 1;
+ }
+
+ internal bool TryRemove(out T result)
+ {
+ SpinWait spin = new SpinWait();
+ int lowLocal = Low, highLocal = High;
+ while (lowLocal <= highLocal)
+ {
+ //try to update m_low
+ if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal)
+ {
+ //if the specified value is not available (this spot is taken by a push operation,
+ // but the value is not written into yet), then spin
+ SpinWait spinLocal = new SpinWait();
+ while (!m_state[lowLocal].m_value)
+ {
+ spinLocal.SpinOnce();
+ }
+ result = m_array[lowLocal];
+ m_array[lowLocal] = default(T); //release the reference to the object.
+
+ //if the current thread sets m_low to SEGMENT_SIZE, which means the current segment becomes
+ //disposable, then this thread is responsible to dispose this segment, and reset m_head
+ if (lowLocal + 1 >= SEGMENT_SIZE)
+ {
+ // Invariant: we only dispose the current m_head, not any other segment
+ // In usual situation, disposing a segment is simply seting m_head to m_head.m_next
+ // But there is one special case, where m_head and m_tail points to the same and ONLY
+ //segment of the queue: Another thread A is doing Enqueue and finds that it needs to grow,
+ //while the *current* thread is doing *this* Dequeue operation, and finds that it needs to
+ //dispose the current (and ONLY) segment. Then we need to wait till thread A finishes its
+ //Grow operation, this is the reason of having the following while loop
+ spinLocal = new SpinWait();
+ while (m_next == null)
+ {
+ spinLocal.SpinOnce();
+ }
+ Contract.Assert(m_source.m_head == this);
+ m_source.m_head = m_next;
+ }
+ return true;
+ }
+ else
+ {
+ //CAS failed due to contention: spin briefly and retry
+ spin.SpinOnce();
+ lowLocal = Low; highLocal = High;
+ }
+ }//end of while
+ result = default(T);
+ return false;
+ }
+
+ internal bool TryPeek(out T result)
+ {
+ result = default(T);
+ int lowLocal = Low;
+ if (lowLocal > High)
+ return false;
+ SpinWait spin = new SpinWait();
+ while (!m_state[lowLocal].m_value)
+ {
+ spin.SpinOnce();
+ }
+ result = m_array[lowLocal];
+ return true;
+ }
+
+ internal int Low
+ {
+ get
+ {
+ return Math.Min(m_low, SEGMENT_SIZE);
+ }
+ }
+
+ internal int High
+ {
+ get
+ {
+ //if m_high > SEGMENT_SIZE, it means it's out of range, we should return
+ //SEGMENT_SIZE-1 as the logical position
+ return Math.Min(m_high, SEGMENT_SIZE - 1);
+ }
+ }
+
+ }
+ }//end of class Segment
+
+ struct VolatileBool
+ {
+ public VolatileBool(bool value)
+ {
+ m_value = value;
+ }
+ public volatile bool m_value;
+ }
+}
+
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs
new file mode 100644
index 0000000..a43acb2
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs
@@ -0,0 +1,17 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ // We can't make those based on the Strings_Core.resx file, because the ObsoleteAttribute needs a compile-time constant.
+
+ class Constants_Core
+ {
+ private const string OBSOLETE_REFACTORING = "This property is no longer supported due to refactoring of the API surface and elimination of platform-specific dependencies.";
+
+ public const string OBSOLETE_SCHEDULER_NEWTHREAD = OBSOLETE_REFACTORING + " Please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use NewThreadScheduler.Default to obtain an instance of this scheduler type. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information.";
+ public const string OBSOLETE_SCHEDULER_TASKPOOL = OBSOLETE_REFACTORING + " Please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use TaskPoolScheduler.Default to obtain an instance of this scheduler type. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information.";
+ public const string OBSOLETE_SCHEDULER_THREADPOOL = OBSOLETE_REFACTORING + " Consider using Scheduler.Default to obtain the platform's most appropriate pool-based scheduler. In order to access a specific pool-based scheduler, please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use the appropriate scheduler in the System.Reactive.Concurrency namespace. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information.";
+
+ public const string OBSOLETE_SCHEDULEREQUIRED = "This instance property is no longer supported. Use CurrentThreadScheduler.IsScheduleRequired instead. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information.";
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs
new file mode 100644
index 0000000..209bd55
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if HAS_EDI
+namespace System.Reactive.PlatformServices
+{
+ //
+ // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms.
+ // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator
+ // behavior of Rx for PLIB when used on a more capable platform.
+ //
+ internal class DefaultExceptionServices/*Impl*/ : IExceptionServices
+ {
+ public void Rethrow(Exception exception)
+ {
+ System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw();
+ }
+ }
+}
+#else
+namespace System.Reactive.PlatformServices
+{
+ internal class DefaultExceptionServices : IExceptionServices
+ {
+ public void Rethrow(Exception exception)
+ {
+ throw exception;
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs
new file mode 100644
index 0000000..1fceeba
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.ComponentModel;
+using System.Reactive.PlatformServices;
+
+namespace System.Reactive
+{
+ internal static class ExceptionHelpers
+ {
+ private static Lazy<IExceptionServices> s_services = new Lazy<IExceptionServices>(Initialize);
+
+ public static void Throw(this Exception exception)
+ {
+ s_services.Value.Rethrow(exception);
+ }
+
+ public static void ThrowIfNotNull(this Exception exception)
+ {
+ if (exception != null)
+ s_services.Value.Rethrow(exception);
+ }
+
+ private static IExceptionServices Initialize()
+ {
+ return PlatformEnlightenmentProvider.Current.GetService<IExceptionServices>() ?? new DefaultExceptionServices();
+ }
+ }
+}
+
+namespace System.Reactive.PlatformServices
+{
+ /// <summary>
+ /// (Infrastructure) Services to rethrow exceptions.
+ /// </summary>
+ /// <remarks>
+ /// This type is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// No guarantees are made about forward compatibility of the type's functionality and its usage.
+ /// </remarks>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public interface IExceptionServices
+ {
+ /// <summary>
+ /// Rethrows the specified exception.
+ /// </summary>
+ /// <param name="exception">Exception to rethrow.</param>
+ void Rethrow(Exception exception);
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs
new file mode 100644
index 0000000..a8f6a7a
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs
@@ -0,0 +1,113 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.ComponentModel;
+using System.Threading;
+
+namespace System.Reactive.PlatformServices
+{
+ /// <summary>
+ /// (Infrastructure) Provides access to the host's lifecycle management services.
+ /// </summary>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public static class HostLifecycleService
+ {
+ private static Lazy<IHostLifecycleNotifications> s_notifications = new Lazy<IHostLifecycleNotifications>(InitializeNotifications);
+
+ private static int _refCount;
+
+ /// <summary>
+ /// Event that gets raised when the host suspends the application.
+ /// </summary>
+ public static event EventHandler<HostSuspendingEventArgs> Suspending;
+
+ /// <summary>
+ /// Event that gets raised when the host resumes the application.
+ /// </summary>
+ public static event EventHandler<HostResumingEventArgs> Resuming;
+
+ /// <summary>
+ /// Adds a reference to the host lifecycle manager, causing it to be sending notifications.
+ /// </summary>
+ public static void AddRef()
+ {
+ if (Interlocked.Increment(ref _refCount) == 1)
+ {
+ var notifications = s_notifications.Value;
+ if (notifications != null)
+ {
+ notifications.Suspending += OnSuspending;
+ notifications.Resuming += OnResuming;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Removes a reference to the host lifecycle manager, causing it to stop sending notifications
+ /// if the removed reference was the last one.
+ /// </summary>
+ public static void Release()
+ {
+ if (Interlocked.Decrement(ref _refCount) == 0)
+ {
+ var notifications = s_notifications.Value;
+ if (notifications != null)
+ {
+ notifications.Suspending -= OnSuspending;
+ notifications.Resuming -= OnResuming;
+ }
+ }
+ }
+
+ private static void OnSuspending(object sender, HostSuspendingEventArgs e)
+ {
+ var suspending = Suspending;
+ if (suspending != null)
+ suspending(sender, e);
+ }
+
+ private static void OnResuming(object sender, HostResumingEventArgs e)
+ {
+ var resuming = Resuming;
+ if (resuming != null)
+ resuming(sender, e);
+ }
+
+ private static IHostLifecycleNotifications InitializeNotifications()
+ {
+ return PlatformEnlightenmentProvider.Current.GetService<IHostLifecycleNotifications>();
+ }
+ }
+
+ /// <summary>
+ /// (Infrastructure) Provides notifications about the host's lifecycle events.
+ /// </summary>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public interface IHostLifecycleNotifications
+ {
+ /// <summary>
+ /// Event that gets raised when the host suspends.
+ /// </summary>
+ event EventHandler<HostSuspendingEventArgs> Suspending;
+
+ /// <summary>
+ /// Event that gets raised when the host resumes.
+ /// </summary>
+ event EventHandler<HostResumingEventArgs> Resuming;
+ }
+
+ /// <summary>
+ /// (Infrastructure) Event arguments for host suspension events.
+ /// </summary>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public class HostSuspendingEventArgs : EventArgs
+ {
+ }
+
+ /// <summary>
+ /// (Infrastructure) Event arguments for host resumption events.
+ /// </summary>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public class HostResumingEventArgs : EventArgs
+ {
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs
new file mode 100644
index 0000000..5cc9b2d
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs
@@ -0,0 +1,51 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ internal class ImmutableList<T>
+ {
+ T[] data;
+
+ public ImmutableList()
+ {
+ data = new T[0];
+ }
+
+ public ImmutableList(T[] data)
+ {
+ this.data = data;
+ }
+
+ public ImmutableList<T> Add(T value)
+ {
+ var newData = new T[data.Length + 1];
+ Array.Copy(data, newData, data.Length);
+ newData[data.Length] = value;
+ return new ImmutableList<T>(newData);
+ }
+
+ public ImmutableList<T> Remove(T value)
+ {
+ var i = IndexOf(value);
+ if (i < 0)
+ return this;
+ var newData = new T[data.Length - 1];
+ Array.Copy(data, 0, newData, 0, i);
+ Array.Copy(data, i + 1, newData, i, data.Length - i - 1);
+ return new ImmutableList<T>(newData);
+ }
+
+ public int IndexOf(T value)
+ {
+ for (var i = 0; i < data.Length; ++i)
+ if (data[i].Equals(value))
+ return i;
+ return -1;
+ }
+
+ public T[] Data
+ {
+ get { return data; }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs
new file mode 100644
index 0000000..4094dce
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs
@@ -0,0 +1,126 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if NO_LAZY
+#pragma warning disable 0420
+
+//
+// Based on ndp\clr\src\BCL\System\Lazy.cs but with LazyThreadSafetyMode.ExecutionAndPublication mode behavior hardcoded.
+//
+
+using System.Diagnostics;
+using System.Threading;
+using System.Reactive;
+
+namespace System
+{
+ internal class Lazy<T>
+ {
+ class Boxed
+ {
+ internal Boxed(T value)
+ {
+ m_value = value;
+ }
+
+ internal T m_value;
+ }
+
+ static Func<T> ALREADY_INVOKED_SENTINEL = delegate { return default(T); };
+
+ private object m_boxed;
+ private Func<T> m_valueFactory;
+ private volatile object m_threadSafeObj;
+
+ public Lazy(Func<T> valueFactory)
+ {
+ m_threadSafeObj = new object();
+ m_valueFactory = valueFactory;
+ }
+
+#if !NO_DEBUGGER_ATTRIBUTES
+ [DebuggerBrowsable(DebuggerBrowsableState.Never)]
+#endif
+ public T Value
+ {
+ get
+ {
+ Boxed boxed = null;
+ if (m_boxed != null)
+ {
+ boxed = m_boxed as Boxed;
+ if (boxed != null)
+ {
+ return boxed.m_value;
+ }
+
+ var exc = m_boxed as Exception;
+ exc.Throw();
+ }
+
+ return LazyInitValue();
+ }
+ }
+
+ private T LazyInitValue()
+ {
+ Boxed boxed = null;
+ object threadSafeObj = m_threadSafeObj;
+ bool lockTaken = false;
+ try
+ {
+ if (threadSafeObj != (object)ALREADY_INVOKED_SENTINEL)
+ {
+ Monitor.Enter(threadSafeObj);
+ lockTaken = true;
+ }
+
+ if (m_boxed == null)
+ {
+ boxed = CreateValue();
+ m_boxed = boxed;
+ m_threadSafeObj = ALREADY_INVOKED_SENTINEL;
+ }
+ else
+ {
+ boxed = m_boxed as Boxed;
+ if (boxed == null)
+ {
+ var exc = m_boxed as Exception;
+ exc.Throw();
+ }
+ }
+ }
+ finally
+ {
+ if (lockTaken)
+ Monitor.Exit(threadSafeObj);
+ }
+
+ return boxed.m_value;
+ }
+
+ private Boxed CreateValue()
+ {
+ Boxed boxed = null;
+ try
+ {
+ if (m_valueFactory == ALREADY_INVOKED_SENTINEL)
+ throw new InvalidOperationException();
+
+ Func<T> factory = m_valueFactory;
+ m_valueFactory = ALREADY_INVOKED_SENTINEL;
+
+ boxed = new Boxed(factory());
+ }
+ catch (Exception ex)
+ {
+ m_boxed = ex;
+ throw;
+ }
+
+ return boxed;
+ }
+ }
+}
+#pragma warning restore 0420
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs
new file mode 100644
index 0000000..6a2e2f8
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs
@@ -0,0 +1,109 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ internal class NopObserver<T> : IObserver<T>
+ {
+ public static readonly IObserver<T> Instance = new NopObserver<T>();
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(T value)
+ {
+ }
+ }
+
+ internal class DoneObserver<T> : IObserver<T>
+ {
+ public static readonly IObserver<T> Completed = new DoneObserver<T>();
+
+ public Exception Exception { get; set; }
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(T value)
+ {
+ }
+ }
+
+ internal class DisposedObserver<T> : IObserver<T>
+ {
+ public static readonly IObserver<T> Instance = new DisposedObserver<T>();
+
+ public void OnCompleted()
+ {
+ throw new ObjectDisposedException("");
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new ObjectDisposedException("");
+ }
+
+ public void OnNext(T value)
+ {
+ throw new ObjectDisposedException("");
+ }
+ }
+
+ internal class Observer<T> : IObserver<T>
+ {
+ private readonly ImmutableList<IObserver<T>> _observers;
+
+ public Observer(ImmutableList<IObserver<T>> observers)
+ {
+ _observers = observers;
+ }
+
+ public void OnCompleted()
+ {
+ foreach (var observer in _observers.Data)
+ observer.OnCompleted();
+ }
+
+ public void OnError(Exception error)
+ {
+ foreach (var observer in _observers.Data)
+ observer.OnError(error);
+ }
+
+ public void OnNext(T value)
+ {
+ foreach (var observer in _observers.Data)
+ observer.OnNext(value);
+ }
+
+ internal IObserver<T> Add(IObserver<T> observer)
+ {
+ return new Observer<T>(_observers.Add(observer));
+ }
+
+ internal IObserver<T> Remove(IObserver<T> observer)
+ {
+ var i = Array.IndexOf(_observers.Data, observer);
+ if (i < 0)
+ return this;
+
+ if (_observers.Data.Length == 2)
+ {
+ return _observers.Data[1 - i];
+ }
+ else
+ {
+ return new Observer<T>(_observers.Remove(observer));
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs
new file mode 100644
index 0000000..f2483e8
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs
@@ -0,0 +1,102 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.ComponentModel;
+using System.Reflection;
+
+namespace System.Reactive.PlatformServices
+{
+ /// <summary>
+ /// (Infrastructure) Interface for enlightenment providers.
+ /// </summary>
+ /// <remarks>
+ /// This type is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// No guarantees are made about forward compatibility of the type's functionality and its usage.
+ /// </remarks>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public interface IPlatformEnlightenmentProvider
+ {
+ /// <summary>
+ /// (Infastructure) Tries to gets the specified service.
+ /// </summary>
+ /// <typeparam name="T">Service type.</typeparam>
+ /// <param name="args">Optional set of arguments.</param>
+ /// <returns>Service instance or null if not found.</returns>
+ T GetService<T>(params object[] args) where T : class;
+ }
+
+ /// <summary>
+ /// (Infrastructure) Provider for platform-specific framework enlightenments.
+ /// </summary>
+ /// <remarks>
+ /// This type is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// </remarks>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public static class PlatformEnlightenmentProvider
+ {
+ private static readonly object s_gate = new object();
+ private static IPlatformEnlightenmentProvider s_current;
+
+ /// <summary>
+ /// (Infrastructure) Gets the current enlightenment provider. If none is loaded yet, accessing this property triggers provider resolution.
+ /// </summary>
+ /// <remarks>
+ /// This member is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// </remarks>
+ public static IPlatformEnlightenmentProvider Current
+ {
+ get
+ {
+ if (s_current == null)
+ {
+ lock (s_gate)
+ {
+ if (s_current == null)
+ {
+ //
+ // TODO: Investigate whether we can simplify this logic to just use "System.Reactive.PlatformServices.PlatformEnlightenmentProvider, System.Reactive.PlatformServices".
+ // It turns out this doesn't quite work on Silverlight. On the other hand, in .NET Compact Framework 3.5, we mysteriously have to use that path.
+ //
+
+#if NETCF35
+ var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, System.Reactive.PlatformServices";
+#else
+#if CRIPPLED_REFLECTION
+ var ifType = typeof(IPlatformEnlightenmentProvider).GetTypeInfo();
+#else
+ var ifType = typeof(IPlatformEnlightenmentProvider);
+#endif
+ var asm = new AssemblyName(ifType.Assembly.FullName);
+ asm.Name = "System.Reactive.PlatformServices";
+ var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, " + asm.FullName;
+#endif
+
+ var t = Type.GetType(name, false);
+ if (t != null)
+ s_current = (IPlatformEnlightenmentProvider)Activator.CreateInstance(t);
+ else
+ s_current = new DefaultPlatformEnlightenmentProvider();
+ }
+ }
+ }
+
+ return s_current;
+ }
+
+ set
+ {
+ lock (s_gate)
+ {
+ s_current = value;
+ }
+ }
+ }
+ }
+
+ class DefaultPlatformEnlightenmentProvider : IPlatformEnlightenmentProvider
+ {
+ public T GetService<T>(object[] args) where T : class
+ {
+ return null;
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs
new file mode 100644
index 0000000..8a02cd5
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs
@@ -0,0 +1,154 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Threading;
+using System.Collections.Generic;
+
+namespace System.Reactive
+{
+ internal class PriorityQueue<T> where T : IComparable<T>
+ {
+#if !NO_INTERLOCKED_64
+ private static long _count = long.MinValue;
+#else
+ private static int _count = int.MinValue;
+#endif
+ private IndexedItem[] _items;
+ private int _size;
+
+ public PriorityQueue()
+ : this(16)
+ {
+ }
+
+ public PriorityQueue(int capacity)
+ {
+ _items = new IndexedItem[capacity];
+ _size = 0;
+ }
+
+ private bool IsHigherPriority(int left, int right)
+ {
+ return _items[left].CompareTo(_items[right]) < 0;
+ }
+
+ private void Percolate(int index)
+ {
+ if (index >= _size || index < 0)
+ return;
+ var parent = (index - 1) / 2;
+ if (parent < 0 || parent == index)
+ return;
+
+ if (IsHigherPriority(index, parent))
+ {
+ var temp = _items[index];
+ _items[index] = _items[parent];
+ _items[parent] = temp;
+ Percolate(parent);
+ }
+ }
+
+ private void Heapify()
+ {
+ Heapify(0);
+ }
+
+ private void Heapify(int index)
+ {
+ if (index >= _size || index < 0)
+ return;
+
+ var left = 2 * index + 1;
+ var right = 2 * index + 2;
+ var first = index;
+
+ if (left < _size && IsHigherPriority(left, first))
+ first = left;
+ if (right < _size && IsHigherPriority(right, first))
+ first = right;
+ if (first != index)
+ {
+ var temp = _items[index];
+ _items[index] = _items[first];
+ _items[first] = temp;
+ Heapify(first);
+ }
+ }
+
+ public int Count { get { return _size; } }
+
+ public T Peek()
+ {
+ if (_size == 0)
+ throw new InvalidOperationException(Strings_Core.HEAP_EMPTY);
+
+ return _items[0].Value;
+ }
+
+ private void RemoveAt(int index)
+ {
+ _items[index] = _items[--_size];
+ _items[_size] = default(IndexedItem);
+ Heapify();
+ if (_size < _items.Length / 4)
+ {
+ var temp = _items;
+ _items = new IndexedItem[_items.Length / 2];
+ Array.Copy(temp, 0, _items, 0, _size);
+ }
+ }
+
+ public T Dequeue()
+ {
+ var result = Peek();
+ RemoveAt(0);
+ return result;
+ }
+
+ public void Enqueue(T item)
+ {
+ if (_size >= _items.Length)
+ {
+ var temp = _items;
+ _items = new IndexedItem[_items.Length * 2];
+ Array.Copy(temp, _items, temp.Length);
+ }
+
+ var index = _size++;
+ _items[index] = new IndexedItem { Value = item, Id = Interlocked.Increment(ref _count) };
+ Percolate(index);
+ }
+
+ public bool Remove(T item)
+ {
+ for (var i = 0; i < _size; ++i)
+ {
+ if (EqualityComparer<T>.Default.Equals(_items[i].Value, item))
+ {
+ RemoveAt(i);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ struct IndexedItem : IComparable<IndexedItem>
+ {
+ public T Value;
+#if !NO_INTERLOCKED_64
+ public long Id;
+#else
+ public int Id;
+#endif
+
+ public int CompareTo(IndexedItem other)
+ {
+ var c = Value.CompareTo(other.Value);
+ if (c == 0)
+ c = Id.CompareTo(other.Id);
+ return c;
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs
new file mode 100644
index 0000000..f03b45f
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs
@@ -0,0 +1,100 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+
+namespace System.Reactive
+{
+ /// <summary>
+ /// Interface with variance annotation; allows for better type checking when detecting capabilities in SubscribeSafe.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
+ internal interface IProducer<
+#if !NO_VARIANCE
+ out
+#endif
+ TSource> : IObservable<TSource>
+ {
+ IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard);
+ }
+
+ /// <summary>
+ /// Base class for implementation of query operators, providing performance benefits over the use of Observable.Create.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
+ internal abstract class Producer<TSource> : IProducer<TSource>
+ {
+ /// <summary>
+ /// Publicly visible Subscribe method.
+ /// </summary>
+ /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
+ /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns>
+ public IDisposable Subscribe(IObserver<TSource> observer)
+ {
+ if (observer == null)
+ throw new ArgumentNullException("observer");
+
+ return SubscribeRaw(observer, true);
+ }
+
+ public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard)
+ {
+ var state = new State();
+ state.observer = observer;
+ state.sink = new SingleAssignmentDisposable();
+ state.subscription = new SingleAssignmentDisposable();
+
+ var d = new CompositeDisposable(2) { state.sink, state.subscription };
+
+ //
+ // See AutoDetachObserver.cs for more information on the safeguarding requirement and
+ // its implementation aspects.
+ //
+ if (enableSafeguard)
+ {
+ state.observer = SafeObserver<TSource>.Create(state.observer, d);
+ }
+
+ if (CurrentThreadScheduler.IsScheduleRequired)
+ {
+ CurrentThreadScheduler.Instance.Schedule(state, Run);
+ }
+ else
+ {
+ state.subscription.Disposable = this.Run(state.observer, state.subscription, state.Assign);
+ }
+
+ return d;
+ }
+
+ struct State
+ {
+ public SingleAssignmentDisposable sink;
+ public SingleAssignmentDisposable subscription;
+ public IObserver<TSource> observer;
+
+ public void Assign(IDisposable s)
+ {
+ sink.Disposable = s;
+ }
+ }
+
+ private IDisposable Run(IScheduler _, State x)
+ {
+ x.subscription.Disposable = this.Run(x.observer, x.subscription, x.Assign);
+ return Disposable.Empty;
+ }
+
+ /// <summary>
+ /// Core implementation of the query operator, called upon a new subscription to the producer object.
+ /// </summary>
+ /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
+ /// <param name="cancel">The subscription disposable object returned from the Run call, passed in such that it can be forwarded to the sink, allowing it to dispose the subscription upon sending a final message (or prematurely for other reasons).</param>
+ /// <param name="setSink">Callback to communicate the sink object to the subscriber, allowing consumers to tunnel a Dispose call into the sink, which can stop the processing.</param>
+ /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns>
+ /// <remarks>The <paramref name="observer">observer</paramref> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks>
+ protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink);
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs
new file mode 100644
index 0000000..2693569
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs
@@ -0,0 +1,71 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+
+namespace System.Reactive
+{
+ //
+ // See AutoDetachObserver.cs for more information on the safeguarding requirement and
+ // its implementation aspects.
+ //
+
+ class SafeObserver<TSource> : IObserver<TSource>
+ {
+ private readonly IObserver<TSource> _observer;
+ private readonly IDisposable _disposable;
+
+ public static IObserver<TSource> Create(IObserver<TSource> observer, IDisposable disposable)
+ {
+ var a = observer as AnonymousObserver<TSource>;
+ if (a != null)
+ return a.MakeSafe(disposable);
+ else
+ return new SafeObserver<TSource>(observer, disposable);
+ }
+
+ private SafeObserver(IObserver<TSource> observer, IDisposable disposable)
+ {
+ _observer = observer;
+ _disposable = disposable;
+ }
+
+ public void OnNext(TSource value)
+ {
+ var __noError = false;
+ try
+ {
+ _observer.OnNext(value);
+ __noError = true;
+ }
+ finally
+ {
+ if (!__noError)
+ _disposable.Dispose();
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ try
+ {
+ _observer.OnError(error);
+ }
+ finally
+ {
+ _disposable.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ try
+ {
+ _observer.OnCompleted();
+ }
+ finally
+ {
+ _disposable.Dispose();
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs
new file mode 100644
index 0000000..2b728e2
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs
@@ -0,0 +1,441 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Threading;
+
+namespace System.Reactive
+{
+#if !NO_PERF && !NO_CDS
+ using System.Collections.Concurrent;
+ using System.Diagnostics;
+
+ internal class ScheduledObserver<T> : ObserverBase<T>, IDisposable
+ {
+ private volatile int _state = 0;
+ private const int STOPPED = 0;
+ private const int RUNNING = 1;
+ private const int PENDING = 2;
+ private const int FAULTED = 9;
+
+ private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
+ private volatile bool _failed;
+ private volatile Exception _error;
+ private volatile bool _completed;
+
+ private readonly IObserver<T> _observer;
+ private readonly IScheduler _scheduler;
+ private readonly ISchedulerLongRunning _longRunning;
+ private readonly SerialDisposable _disposable = new SerialDisposable();
+
+ public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
+ {
+ _scheduler = scheduler;
+ _observer = observer;
+ _longRunning = _scheduler.AsLongRunning();
+
+ if (_longRunning != null)
+ _dispatcherEvent = new SemaphoreSlim(0);
+ }
+
+ private readonly object _dispatcherInitGate = new object();
+ private SemaphoreSlim _dispatcherEvent;
+ private IDisposable _dispatcherJob;
+
+ private void EnsureDispatcher()
+ {
+ if (_dispatcherJob == null)
+ {
+ lock (_dispatcherInitGate)
+ {
+ if (_dispatcherJob == null)
+ {
+ _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
+
+ _disposable.Disposable = new CompositeDisposable(2)
+ {
+ _dispatcherJob,
+ Disposable.Create(() => _dispatcherEvent.Release())
+ };
+ }
+ }
+ }
+ }
+
+ private void Dispatch(ICancelable cancel)
+ {
+ while (true)
+ {
+ _dispatcherEvent.Wait();
+
+ if (cancel.IsDisposed)
+ return;
+
+ var next = default(T);
+ while (_queue.TryDequeue(out next))
+ {
+ try
+ {
+ _observer.OnNext(next);
+ }
+ catch
+ {
+ var nop = default(T);
+ while (_queue.TryDequeue(out nop))
+ ;
+
+ throw;
+ }
+
+ _dispatcherEvent.Wait();
+
+ if (cancel.IsDisposed)
+ return;
+ }
+
+ if (_failed)
+ {
+ _observer.OnError(_error);
+ Dispose();
+ return;
+ }
+
+ if (_completed)
+ {
+ _observer.OnCompleted();
+ Dispose();
+ return;
+ }
+ }
+ }
+
+ public void EnsureActive()
+ {
+ EnsureActive(1);
+ }
+
+ public void EnsureActive(int n)
+ {
+ if (_longRunning != null)
+ {
+ if (n > 0)
+ _dispatcherEvent.Release(n);
+
+ EnsureDispatcher();
+ }
+ else
+ EnsureActiveSlow();
+ }
+
+ private void EnsureActiveSlow()
+ {
+ var isOwner = false;
+
+#pragma warning disable 0420
+ while (true)
+ {
+ var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);
+ if (old == STOPPED)
+ {
+ isOwner = true; // RUNNING
+ break;
+ }
+
+ if (old == FAULTED)
+ return;
+
+ // If we find the consumer loop running, we transition to PENDING to handle
+ // the case where the queue is seen empty by the consumer, making it transition
+ // to the STOPPED state, but we inserted an item into the queue.
+ //
+ // C: _queue.TryDequeue == false (RUNNING)
+ // ----------------------------------------------
+ // P: _queue.Enqueue(...)
+ // EnsureActive
+ // Exchange(ref _state, RUNNING) == RUNNING
+ // ----------------------------------------------
+ // C: transition to STOPPED (STOPPED)
+ //
+ // In this case, P would believe C is running and not invoke the scheduler
+ // using the isOwner flag.
+ //
+ // By introducing an intermediate PENDING state and using CAS in the consumer
+ // to only transition to STOPPED in case we were still RUNNING, we can force
+ // the consumer to reconsider the decision to transition to STOPPED. In that
+ // case, the consumer loops again and re-reads from the queue and other state
+ // fields. At least one bit of state will have changed because EnsureActive
+ // should only be called after invocation of IObserver<T> methods that touch
+ // this state.
+ //
+ if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
+ break;
+ }
+#pragma warning restore 0420
+
+ if (isOwner)
+ {
+ _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
+ }
+ }
+
+ private void Run(object state, Action<object> recurse)
+ {
+#pragma warning disable 0420
+ var next = default(T);
+ while (!_queue.TryDequeue(out next))
+ {
+ if (_failed)
+ {
+ // Between transitioning to _failed and the queue check in the loop,
+ // items could have been queued, so we can't stop yet. We don't spin
+ // and immediately re-check the queue.
+ //
+ // C: _queue.TryDequeue == false
+ // ----------------------------------------------
+ // P: OnNext(...)
+ // _queue.Enqueue(...) // Will get lost
+ // P: OnError(...)
+ // _failed = true
+ // ----------------------------------------------
+ // C: if (_failed)
+ // _observer.OnError(...) // Lost an OnNext
+ //
+ if (!_queue.IsEmpty)
+ continue;
+
+ Interlocked.Exchange(ref _state, STOPPED);
+ _observer.OnError(_error);
+ Dispose();
+ return;
+ }
+
+ if (_completed)
+ {
+ // Between transitioning to _completed and the queue check in the loop,
+ // items could have been queued, so we can't stop yet. We don't spin
+ // and immediately re-check the queue.
+ //
+ // C: _queue.TryDequeue == false
+ // ----------------------------------------------
+ // P: OnNext(...)
+ // _queue.Enqueue(...) // Will get lost
+ // P: OnCompleted(...)
+ // _completed = true
+ // ----------------------------------------------
+ // C: if (_completed)
+ // _observer.OnCompleted() // Lost an OnNext
+ //
+ if (!_queue.IsEmpty)
+ continue;
+
+ Interlocked.Exchange(ref _state, STOPPED);
+ _observer.OnCompleted();
+ Dispose();
+ return;
+ }
+
+ var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
+ if (old == RUNNING || old == FAULTED)
+ return;
+
+ Debug.Assert(old == PENDING);
+
+ // The producer has put us in the PENDING state to prevent us from
+ // transitioning to STOPPED, so we go RUNNING again and re-check our state.
+ _state = RUNNING;
+ }
+
+ Interlocked.Exchange(ref _state, RUNNING);
+
+#pragma warning restore 0420
+
+ try
+ {
+ _observer.OnNext(next);
+ }
+ catch
+ {
+#pragma warning disable 0420
+ Interlocked.Exchange(ref _state, FAULTED);
+#pragma warning restore 0420
+
+ var nop = default(T);
+ while (_queue.TryDequeue(out nop))
+ ;
+ throw;
+ }
+
+ recurse(state);
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ _queue.Enqueue(value);
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ _error = exception;
+ _failed = true;
+ }
+
+ protected override void OnCompletedCore()
+ {
+ _completed = true;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ {
+ _disposable.Dispose();
+ }
+ }
+ }
+#else
+ class ScheduledObserver<T> : ObserverBase<T>, IDisposable
+ {
+ private bool _isAcquired = false;
+ private bool _hasFaulted = false;
+ private readonly Queue<Action> _queue = new Queue<Action>();
+ private readonly IObserver<T> _observer;
+ private readonly IScheduler _scheduler;
+ private readonly SerialDisposable _disposable = new SerialDisposable();
+
+ public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
+ {
+ _scheduler = scheduler;
+ _observer = observer;
+ }
+
+ public void EnsureActive(int n)
+ {
+ EnsureActive();
+ }
+
+ public void EnsureActive()
+ {
+ var isOwner = false;
+
+ lock (_queue)
+ {
+ if (!_hasFaulted && _queue.Count > 0)
+ {
+ isOwner = !_isAcquired;
+ _isAcquired = true;
+ }
+ }
+
+ if (isOwner)
+ {
+ _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
+ }
+ }
+
+ private void Run(object state, Action<object> recurse)
+ {
+ var work = default(Action);
+ lock (_queue)
+ {
+ if (_queue.Count > 0)
+ work = _queue.Dequeue();
+ else
+ {
+ _isAcquired = false;
+ return;
+ }
+ }
+
+ try
+ {
+ work();
+ }
+ catch
+ {
+ lock (_queue)
+ {
+ _queue.Clear();
+ _hasFaulted = true;
+ }
+ throw;
+ }
+
+ recurse(state);
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ lock (_queue)
+ _queue.Enqueue(() => _observer.OnNext(value));
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ lock (_queue)
+ _queue.Enqueue(() => _observer.OnError(exception));
+ }
+
+ protected override void OnCompletedCore()
+ {
+ lock (_queue)
+ _queue.Enqueue(() => _observer.OnCompleted());
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ {
+ _disposable.Dispose();
+ }
+ }
+ }
+#endif
+
+ class ObserveOnObserver<T> : ScheduledObserver<T>
+ {
+ private IDisposable _cancel;
+
+ public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
+ : base(scheduler, observer)
+ {
+ _cancel = cancel;
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ base.OnNextCore(value);
+ EnsureActive();
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ base.OnErrorCore(exception);
+ EnsureActive();
+ }
+
+ protected override void OnCompletedCore()
+ {
+ base.OnCompletedCore();
+ EnsureActive();
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ {
+ var cancel = Interlocked.Exchange(ref _cancel, null);
+ if (cancel != null)
+ {
+ cancel.Dispose();
+ }
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs
new file mode 100644
index 0000000..4f5eeee
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs
@@ -0,0 +1,116 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if NO_SEMAPHORE && SILVERLIGHT
+using System;
+using System.Threading;
+
+namespace System.Reactive.Threading
+{
+ //Monitor based implementation of Semaphore
+ //that mimicks the .NET Semaphore class (System.Threading.Semaphore)
+
+ internal sealed class Semaphore : IDisposable
+ {
+ private int m_currentCount;
+ private readonly int m_maximumCount;
+ private readonly object m_lockObject;
+ private bool m_disposed;
+
+ public Semaphore(int initialCount, int maximumCount)
+ {
+ if (initialCount < 0)
+ {
+ throw new ArgumentOutOfRangeException("initialCount", "Non-negative number required.");
+ }
+ if (maximumCount < 1)
+ {
+ throw new ArgumentOutOfRangeException("maximumCount", "Positive number required.");
+ }
+ if (initialCount > maximumCount)
+ {
+ throw new ArgumentException("Initial count must be smaller than maximum");
+ }
+
+ m_currentCount = initialCount;
+ m_maximumCount = maximumCount;
+ m_lockObject = new object();
+ }
+
+ public int Release()
+ {
+ return this.Release(1);
+ }
+
+ public int Release(int releaseCount)
+ {
+ if (releaseCount < 1)
+ {
+ throw new ArgumentOutOfRangeException("releaseCount", "Positive number required.");
+ }
+ if (m_disposed)
+ {
+ throw new ObjectDisposedException("Semaphore");
+ }
+
+ var oldCount = default(int);
+ lock (m_lockObject)
+ {
+ oldCount = m_currentCount;
+ if (releaseCount + m_currentCount > m_maximumCount)
+ {
+ throw new ArgumentOutOfRangeException("releaseCount", "Amount of releases would overflow maximum");
+ }
+ m_currentCount += releaseCount;
+ //PulseAll makes sure all waiting threads get queued for acquiring the lock
+ //Pulse would only queue one thread.
+
+ Monitor.PulseAll(m_lockObject);
+ }
+ return oldCount;
+ }
+
+ public bool WaitOne()
+ {
+ return WaitOne(Timeout.Infinite);
+ }
+
+ public bool WaitOne(int millisecondsTimeout)
+ {
+ if (m_disposed)
+ {
+ throw new ObjectDisposedException("Semaphore");
+ }
+
+ lock (m_lockObject)
+ {
+ while (m_currentCount == 0)
+ {
+ if (!Monitor.Wait(m_lockObject, millisecondsTimeout))
+ {
+ return false;
+ }
+ }
+ m_currentCount--;
+ return true;
+ }
+ }
+
+ public bool WaitOne(TimeSpan timeout)
+ {
+ return WaitOne((int)timeout.TotalMilliseconds);
+ }
+
+ public void Close()
+ {
+ Dispose();
+ }
+
+ public void Dispose()
+ {
+ //the .NET CLR semaphore does not release waits upon dispose
+ //so we don't do that either.
+ m_disposed = true;
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs
new file mode 100644
index 0000000..847c14d
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs
@@ -0,0 +1,143 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if NO_SEMAPHORE && (XNA || NETCF)
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace System.Reactive.Threading
+{
+ //Monitor based implementation of Semaphore
+ //that mimicks the .NET Semaphore class (System.Threading.Semaphore)
+
+ internal sealed class Semaphore : IDisposable
+ {
+ private int m_currentCount;
+ private readonly int m_maximumCount;
+ private readonly object m_lockObject;
+ private bool m_disposed;
+ private readonly List<ManualResetEvent> m_waiting;
+
+ public Semaphore(int initialCount, int maximumCount)
+ {
+ if (initialCount < 0)
+ {
+ throw new ArgumentOutOfRangeException("initialCount", "Non-negative number required.");
+ }
+ if (maximumCount < 1)
+ {
+ throw new ArgumentOutOfRangeException("maximumCount", "Positive number required.");
+ }
+ if (initialCount > maximumCount)
+ {
+ throw new ArgumentException("Initial count must be smaller than maximum");
+ }
+ m_waiting = new List<ManualResetEvent>();
+ m_currentCount = initialCount;
+ m_maximumCount = maximumCount;
+ m_lockObject = new object();
+ }
+
+ public int Release()
+ {
+ return this.Release(1);
+ }
+
+ public int Release(int releaseCount)
+ {
+ if (releaseCount < 1)
+ {
+ throw new ArgumentOutOfRangeException("releaseCount", "Positive number required.");
+ }
+ if (m_disposed)
+ {
+ throw new ObjectDisposedException("Semaphore");
+ }
+
+ var oldCount = default(int);
+ var toBeReleased = new List<ManualResetEvent>();
+ lock (m_lockObject)
+ {
+ oldCount = m_currentCount;
+ if (releaseCount + m_currentCount > m_maximumCount)
+ {
+ throw new ArgumentOutOfRangeException("releaseCount", "Amount of releases would overflow maximum");
+ }
+
+ var waiting = m_waiting.ToArray();
+ var left = Math.Max(0, releaseCount - waiting.Length);
+ for (var i = 0; i < releaseCount && i < m_waiting.Count; i++)
+ {
+ toBeReleased.Add(waiting[i]);
+ m_waiting.RemoveAt(0);
+ }
+ m_currentCount += left;
+ }
+ foreach(var release in toBeReleased)
+ {
+ release.Set();
+ }
+ return oldCount;
+ }
+
+ public bool WaitOne()
+ {
+ return WaitOne(Timeout.Infinite);
+ }
+
+ public bool WaitOne(int millisecondsTimeout)
+ {
+ if (m_disposed)
+ {
+ throw new ObjectDisposedException("Semaphore");
+ }
+
+ var manualResetEvent = default(ManualResetEvent);
+
+ lock (m_lockObject)
+ {
+ if (m_currentCount == 0)
+ {
+ manualResetEvent = new ManualResetEvent(false);
+ m_waiting.Add(manualResetEvent);
+ }
+ else
+ {
+ m_currentCount--;
+ return true;
+ }
+ }
+#if XNA_31_ZUNE || NETCF35
+ if (!manualResetEvent.WaitOne(millisecondsTimeout, false))
+#else
+ if (!manualResetEvent.WaitOne(millisecondsTimeout))
+#endif
+ {
+ lock(m_lockObject)
+ {
+ m_waiting.Remove(manualResetEvent);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ public bool WaitOne(TimeSpan timeout)
+ {
+ return WaitOne((int)timeout.TotalMilliseconds);
+ }
+
+ public void Close()
+ {
+ Dispose();
+ }
+
+ public void Dispose()
+ {
+ //the .NET CLR semaphore does not release waits upon dispose
+ //so we don't do that either.
+ m_disposed = true;
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs
new file mode 100644
index 0000000..ffe6ee5
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs
@@ -0,0 +1,68 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System.Threading;
+
+namespace System.Reactive
+{
+ /// <summary>
+ /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
+ /// <remarks>Implementations of sinks are responsible to enforce the message grammar on the associated observer. Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of related resources and to mute the outgoing observer.</remarks>
+ internal abstract class Sink<TSource> : IDisposable
+ {
+ protected internal volatile IObserver<TSource> _observer;
+ private IDisposable _cancel;
+
+ public Sink(IObserver<TSource> observer, IDisposable cancel)
+ {
+ _observer = observer;
+ _cancel = cancel;
+ }
+
+ public virtual void Dispose()
+ {
+ _observer = NopObserver<TSource>.Instance;
+
+ var cancel = Interlocked.Exchange(ref _cancel, null);
+ if (cancel != null)
+ {
+ cancel.Dispose();
+ }
+ }
+
+ public IObserver<TSource> GetForwarder()
+ {
+ return new _(this);
+ }
+
+ class _ : IObserver<TSource>
+ {
+ private readonly Sink<TSource> _forward;
+
+ public _(Sink<TSource> forward)
+ {
+ _forward = forward;
+ }
+
+ public void OnNext(TSource value)
+ {
+ _forward._observer.OnNext(value);
+ }
+
+ public void OnError(Exception error)
+ {
+ _forward._observer.OnError(error);
+ _forward.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ _forward._observer.OnCompleted();
+ _forward.Dispose();
+ }
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs
new file mode 100644
index 0000000..d3c2374
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs
@@ -0,0 +1,23 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ internal static class Stubs<T>
+ {
+ public static readonly Action<T> Ignore = _ => { };
+ public static readonly Func<T, T> I = _ => _;
+ }
+
+ internal static class Stubs
+ {
+ public static readonly Action Nop = () => { };
+ public static readonly Action<Exception> Throw = ex => { ex.Throw(); };
+ }
+
+#if !NO_THREAD
+ internal static class TimerStubs
+ {
+ public static readonly System.Threading.Timer Never = new System.Threading.Timer(_ => { });
+ }
+#endif
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs
new file mode 100644
index 0000000..c417f90
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs
@@ -0,0 +1,55 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_SYNCCTX
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace System.Reactive.Concurrency
+{
+ internal static class SynchronizationContextExtensions
+ {
+ public static void PostWithStartComplete<T>(this SynchronizationContext context, Action<T> action, T state)
+ {
+ context.OperationStarted();
+
+ context.Post(
+ o =>
+ {
+ try
+ {
+ action((T)o);
+ }
+ finally
+ {
+ context.OperationCompleted();
+ }
+ },
+ state
+ );
+ }
+
+ public static void PostWithStartComplete(this SynchronizationContext context, Action action)
+ {
+ context.OperationStarted();
+
+ context.Post(
+ _ =>
+ {
+ try
+ {
+ action();
+ }
+ finally
+ {
+ context.OperationCompleted();
+ }
+ },
+ null
+ );
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs
new file mode 100644
index 0000000..e41294e
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs
@@ -0,0 +1,40 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ internal class SynchronizedObserver<T> : ObserverBase<T>
+ {
+ private readonly object _gate;
+ private readonly IObserver<T> _observer;
+
+ public SynchronizedObserver(IObserver<T> observer, object gate)
+ {
+ _gate = gate;
+ _observer = observer;
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ lock (_gate)
+ {
+ _observer.OnNext(value);
+ }
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ lock (_gate)
+ {
+ _observer.OnError(exception);
+ }
+ }
+
+ protected override void OnCompletedCore()
+ {
+ lock (_gate)
+ {
+ _observer.OnCompleted();
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs
new file mode 100644
index 0000000..b9c8167
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs
@@ -0,0 +1,113 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.ComponentModel;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+
+namespace System.Reactive.PlatformServices
+{
+ /// <summary>
+ /// (Infrastructure) Provides access to the local system clock.
+ /// </summary>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public class DefaultSystemClock : ISystemClock
+ {
+ /// <summary>
+ /// Gets the current time.
+ /// </summary>
+ public DateTimeOffset UtcNow
+ {
+ get { return DateTimeOffset.UtcNow; }
+ }
+ }
+
+ internal class DefaultSystemClockMonitor : PeriodicTimerSystemClockMonitor
+ {
+ private static readonly TimeSpan DEFAULT_PERIOD = TimeSpan.FromSeconds(1);
+
+ public DefaultSystemClockMonitor()
+ : base(DEFAULT_PERIOD)
+ {
+ }
+ }
+
+ /// <summary>
+ /// (Infrastructure) Monitors for system clock changes based on a periodic timer.
+ /// </summary>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged
+ {
+ private readonly TimeSpan _period;
+ private readonly SerialDisposable _timer;
+
+ private DateTimeOffset _lastTime;
+ private EventHandler<SystemClockChangedEventArgs> _systemClockChanged;
+
+ private const int SYNC_MAXRETRIES = 100;
+ private const double SYNC_MAXDELTA = 10;
+ private const int MAXERROR = 100;
+
+ /// <summary>
+ /// Creates a new monitor for system clock changes with the specified polling frequency.
+ /// </summary>
+ /// <param name="period">Polling frequency for system clock changes.</param>
+ public PeriodicTimerSystemClockMonitor(TimeSpan period)
+ {
+ _period = period;
+ _timer = new SerialDisposable();
+ }
+
+ /// <summary>
+ /// Event that gets raised when a system clock change is detected.
+ /// </summary>
+ public event EventHandler<SystemClockChangedEventArgs> SystemClockChanged
+ {
+ add
+ {
+ NewTimer();
+
+ _systemClockChanged += value;
+ }
+
+ remove
+ {
+ _systemClockChanged -= value;
+
+ _timer.Disposable = Disposable.Empty;
+ }
+ }
+
+ private void NewTimer()
+ {
+ _timer.Disposable = Disposable.Empty;
+
+ var n = 0;
+ do
+ {
+ _lastTime = SystemClock.UtcNow;
+ _timer.Disposable = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period);
+ } while (Math.Abs((SystemClock.UtcNow - _lastTime).TotalMilliseconds) > SYNC_MAXDELTA && ++n < SYNC_MAXRETRIES);
+
+ if (n >= SYNC_MAXRETRIES)
+ throw new InvalidOperationException(Strings_Core.FAILED_CLOCK_MONITORING);
+ }
+
+ private void TimeChanged()
+ {
+ var now = SystemClock.UtcNow;
+ var diff = now - (_lastTime + _period);
+ if (Math.Abs(diff.TotalMilliseconds) >= MAXERROR)
+ {
+ var scc = _systemClockChanged;
+ if (scc != null)
+ scc(this, new SystemClockChangedEventArgs(_lastTime + _period, now));
+
+ NewTimer();
+ }
+ else
+ {
+ _lastTime = SystemClock.UtcNow;
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs
new file mode 100644
index 0000000..e482dd0
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs
@@ -0,0 +1,149 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.ComponentModel;
+using System.Threading;
+
+namespace System.Reactive.PlatformServices
+{
+ /// <summary>
+ /// (Infrastructure) Provides access to local system clock services.
+ /// </summary>
+ /// <remarks>
+ /// This type is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// No guarantees are made about forward compatibility of the type's functionality and its usage.
+ /// </remarks>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public static class SystemClock
+ {
+ private static Lazy<ISystemClock> s_serviceSystemClock = new Lazy<ISystemClock>(InitializeSystemClock);
+ private static Lazy<INotifySystemClockChanged> s_serviceSystemClockChanged = new Lazy<INotifySystemClockChanged>(InitializeSystemClockChanged);
+
+ private static int _refCount;
+
+ /// <summary>
+ /// Gets the local system clock time.
+ /// </summary>
+ public static DateTimeOffset UtcNow
+ {
+ get { return s_serviceSystemClock.Value.UtcNow; }
+ }
+
+ /// <summary>
+ /// Event that gets raised when a system clock change is detected, if there's any interest as indicated by AddRef calls.
+ /// </summary>
+ public static event EventHandler<SystemClockChangedEventArgs> SystemClockChanged;
+
+ /// <summary>
+ /// Adds a reference to the system clock monitor, causing it to be sending notifications.
+ /// </summary>
+ /// <exception cref="NotSupportedException">Thrown when the system doesn't support sending clock change notifications.</exception>
+ public static void AddRef()
+ {
+ if (Interlocked.Increment(ref _refCount) == 1)
+ {
+ s_serviceSystemClockChanged.Value.SystemClockChanged += OnSystemClockChanged;
+ }
+ }
+
+ /// <summary>
+ /// Removes a reference to the system clock monitor, causing it to stop sending notifications
+ /// if the removed reference was the last one.
+ /// </summary>
+ public static void Release()
+ {
+ if (Interlocked.Decrement(ref _refCount) == 0)
+ {
+ s_serviceSystemClockChanged.Value.SystemClockChanged -= OnSystemClockChanged;
+ }
+ }
+
+ private static void OnSystemClockChanged(object sender, SystemClockChangedEventArgs e)
+ {
+ var scc = SystemClockChanged;
+ if (scc != null)
+ scc(sender, e);
+ }
+
+ private static ISystemClock InitializeSystemClock()
+ {
+ return PlatformEnlightenmentProvider.Current.GetService<ISystemClock>() ?? new DefaultSystemClock();
+ }
+
+ private static INotifySystemClockChanged InitializeSystemClockChanged()
+ {
+ return PlatformEnlightenmentProvider.Current.GetService<INotifySystemClockChanged>() ?? new DefaultSystemClockMonitor();
+ }
+ }
+
+ /// <summary>
+ /// (Infrastructure) Provides access to the local system clock.
+ /// </summary>
+ /// <remarks>
+ /// This type is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// No guarantees are made about forward compatibility of the type's functionality and its usage.
+ /// </remarks>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public interface ISystemClock
+ {
+ /// <summary>
+ /// Gets the current time.
+ /// </summary>
+ DateTimeOffset UtcNow { get; }
+ }
+
+ /// <summary>
+ /// (Infrastructure) Provides a mechanism to notify local schedulers about system clock changes.
+ /// </summary>
+ /// <remarks>
+ /// This type is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// No guarantees are made about forward compatibility of the type's functionality and its usage.
+ /// </remarks>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public interface INotifySystemClockChanged
+ {
+ /// <summary>
+ /// Event that gets raised when a system clock change is detected.
+ /// </summary>
+ event EventHandler<SystemClockChangedEventArgs> SystemClockChanged;
+ }
+
+ /// <summary>
+ /// (Infrastructure) Event arguments for system clock change notifications.
+ /// </summary>
+ /// <remarks>
+ /// This type is used by the Rx infrastructure and not meant for public consumption or implementation.
+ /// No guarantees are made about forward compatibility of the type's functionality and its usage.
+ /// </remarks>
+ [EditorBrowsable(EditorBrowsableState.Never)]
+ public class SystemClockChangedEventArgs : EventArgs
+ {
+ /// <summary>
+ /// Creates a new system clock notification object with unknown old and new times.
+ /// </summary>
+ public SystemClockChangedEventArgs()
+ : this(DateTimeOffset.MinValue, DateTimeOffset.MaxValue)
+ {
+ }
+
+ /// <summary>
+ /// Creates a new system clock notification object with the specified old and new times.
+ /// </summary>
+ /// <param name="oldTime">Time before the system clock changed, or DateTimeOffset.MinValue if not known.</param>
+ /// <param name="newTime">Time after the system clock changed, or DateTimeOffset.MaxValue if not known.</param>
+ public SystemClockChangedEventArgs(DateTimeOffset oldTime, DateTimeOffset newTime)
+ {
+ OldTime = oldTime;
+ NewTime = newTime;
+ }
+
+ /// <summary>
+ /// Gets the time before the system clock changed, or DateTimeOffset.MinValue if not known.
+ /// </summary>
+ public DateTimeOffset OldTime { get; private set; }
+
+ /// <summary>
+ /// Gets the time after the system clock changed, or DateTimeOffset.MaxValue if not known.
+ /// </summary>
+ public DateTimeOffset NewTime { get; private set; }
+ }
+}