diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Core/Reactive/Internal')
25 files changed, 3178 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs new file mode 100644 index 0000000..a2237ae --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Concurrency; + +namespace System.Reactive +{ + internal class AsyncLockObserver<T> : ObserverBase<T> + { + private readonly AsyncLock _gate; + private readonly IObserver<T> _observer; + + public AsyncLockObserver(IObserver<T> observer, AsyncLock gate) + { + _gate = gate; + _observer = observer; + } + + protected override void OnNextCore(T value) + { + _gate.Wait(() => + { + _observer.OnNext(value); + }); + } + + protected override void OnErrorCore(Exception exception) + { + _gate.Wait(() => + { + _observer.OnError(exception); + }); + } + + protected override void OnCompletedCore() + { + _gate.Wait(() => + { + _observer.OnCompleted(); + }); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs new file mode 100644 index 0000000..b4d67dc --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Disposables; + +namespace System.Reactive +{ + class AutoDetachObserver<T> : ObserverBase<T> + { + private readonly IObserver<T> observer; + private readonly SingleAssignmentDisposable m = new SingleAssignmentDisposable(); + + public AutoDetachObserver(IObserver<T> observer) + { + this.observer = observer; + } + + public IDisposable Disposable + { + set { m.Disposable = value; } + } + + protected override void OnNextCore(T value) + { + // + // Safeguarding of the pipeline against rogue observers is required for proper + // resource cleanup. Consider the following example: + // + // var xs = Observable.Interval(TimeSpan.FromSeconds(1)); + // var ys = <some random sequence>; + // var res = xs.CombineLatest(ys, (x, y) => x + y); + // + // The marble diagram of the query above looks as follows: + // + // xs -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---... + // | | | | | | | | | + // ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---... + // | | | | | | | | | | | | | | + // v v v v v v v v v v v v v v + // res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---... + // | + // @#& + // + // Notice the free-threaded nature of Rx, where messages on the resulting sequence + // are produced by either of the two input sequences to CombineLatest. + // + // Now assume an exception happens in the OnNext callback for the observer of res, + // at the indicated point marked with @#& above. The callback runs in the context + // of ys, so the exception will take down the scheduler thread of ys. This by + // itself is a problem (that can be mitigated by a Catch operator on IScheduler), + // but notice how the timer that produces xs is kept alive. + // + // The safe-guarding code below ensures the acquired resources are disposed when + // the user callback throws. + // + var __noError = false; + try + { + observer.OnNext(value); + __noError = true; + } + finally + { + if (!__noError) + Dispose(); + } + } + + protected override void OnErrorCore(Exception exception) + { + try + { + observer.OnError(exception); + } + finally + { + Dispose(); + } + } + + protected override void OnCompletedCore() + { + try + { + observer.OnCompleted(); + } + finally + { + Dispose(); + } + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + m.Dispose(); + } + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs new file mode 100644 index 0000000..ca201f4 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Threading; + +namespace System.Reactive +{ + internal class CheckedObserver<T> : IObserver<T> + { + private readonly IObserver<T> _observer; + private int _state; + + private const int IDLE = 0; + private const int BUSY = 1; + private const int DONE = 2; + + public CheckedObserver(IObserver<T> observer) + { + _observer = observer; + } + + public void OnNext(T value) + { + CheckAccess(); + + try + { + _observer.OnNext(value); + } + finally + { + Interlocked.Exchange(ref _state, IDLE); + } + } + + public void OnError(Exception error) + { + CheckAccess(); + + try + { + _observer.OnError(error); + } + finally + { + Interlocked.Exchange(ref _state, DONE); + } + } + + public void OnCompleted() + { + CheckAccess(); + + try + { + _observer.OnCompleted(); + } + finally + { + Interlocked.Exchange(ref _state, DONE); + } + } + + private void CheckAccess() + { + switch (Interlocked.CompareExchange(ref _state, BUSY, IDLE)) + { + case BUSY: + throw new InvalidOperationException(Strings_Core.REENTRANCY_DETECTED); + case DONE: + throw new InvalidOperationException(Strings_Core.OBSERVER_TERMINATED); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs new file mode 100644 index 0000000..8b7ec81 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs @@ -0,0 +1,576 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +/* + * WARNING: Auto-generated file (7/18/2012 4:59:53 PM) + * + * Stripped down code based on ndp\clr\src\BCL\System\Collections\Concurrent\ConcurrentDictionary.cs + */ + +#if NO_CDS_COLLECTIONS + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics.CodeAnalysis; +using System.Reflection; +using System.Threading; + +namespace System.Collections.Concurrent +{ + internal class ConcurrentDictionary<TKey, TValue> + { + /* >>> Code copied from the Array class */ + + // We impose limits on maximum array lenght in each dimension to allow efficient + // implementation of advanced range check elimination in future. + // Keep in sync with vm\gcscan.cpp and HashHelpers.MaxPrimeArrayLength. + internal const int MaxArrayLength = 0X7FEFFFFF; + + /* <<< Code copied from the Array class */ + + private class Tables + { + internal readonly Node[] m_buckets; // A singly-linked list for each bucket. + internal readonly object[] m_locks; // A set of locks, each guarding a section of the table. + internal volatile int[] m_countPerLock; // The number of elements guarded by each lock. + + internal Tables(Node[] buckets, object[] locks, int[] countPerLock) + { + m_buckets = buckets; + m_locks = locks; + m_countPerLock = countPerLock; + } + } + + private volatile Tables m_tables; // Internal tables of the dictionary + private readonly IEqualityComparer<TKey> m_comparer; // Key equality comparer + private readonly bool m_growLockArray; // Whether to dynamically increase the size of the striped lock + private int m_budget; // The maximum number of elements per lock before a resize operation is triggered + + // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the + // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference + // and blocking, but also the more expensive operations that require all locks become (e.g. table + // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good + // compromise. + private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4; + + // The default capacity, i.e. the initial # of buckets. When choosing this value, we are making + // a trade-off between the size of a very small dictionary, and the number of resizes when + // constructing a large dictionary. Also, the capacity should not be divisible by a small prime. + private const int DEFAULT_CAPACITY = 31; + + // The maximum size of the striped lock that will not be exceeded when locks are automatically + // added as the dictionary grows. However, the user is allowed to exceed this limit by passing + // a concurrency level larger than MAX_LOCK_NUMBER into the constructor. + private const int MAX_LOCK_NUMBER = 1024; + + // Whether TValue is a type that can be written atomically (i.e., with no danger of torn reads) + private static readonly bool s_isValueWriteAtomic = IsValueWriteAtomic(); + + private static bool IsValueWriteAtomic() + { + Type valueType = typeof(TValue); + + // + // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without + // the risk of tearing. + // + // See http://www.ecma-international.org/publications/files/ECMA-ST/Ecma-335.pdf + // + bool isAtomic = + (valueType.GetTypeInfo().IsClass) + || valueType == typeof(Boolean) + || valueType == typeof(Char) + || valueType == typeof(Byte) + || valueType == typeof(SByte) + || valueType == typeof(Int16) + || valueType == typeof(UInt16) + || valueType == typeof(Int32) + || valueType == typeof(UInt32) + || valueType == typeof(Single); + + if (!isAtomic && IntPtr.Size == 8) + { + isAtomic |= valueType == typeof(Double) || valueType == typeof(Int64); + } + + return isAtomic; + } + + public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { } + + internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) + { + if (concurrencyLevel < 1) + { + throw new ArgumentOutOfRangeException("concurrencyLevel"); + } + if (capacity < 0) + { + throw new ArgumentOutOfRangeException("capacity"); + } + if (comparer == null) throw new ArgumentNullException("comparer"); + + // The capacity should be at least as large as the concurrency level. Otherwise, we would have locks that don't guard + // any buckets. + if (capacity < concurrencyLevel) + { + capacity = concurrencyLevel; + } + + object[] locks = new object[concurrencyLevel]; + for (int i = 0; i < locks.Length; i++) + { + locks[i] = new object(); + } + + int[] countPerLock = new int[locks.Length]; + Node[] buckets = new Node[capacity]; + m_tables = new Tables(buckets, locks, countPerLock); + + m_comparer = comparer; + m_growLockArray = growLockArray; + m_budget = buckets.Length / locks.Length; + } + + public bool TryAdd(TKey key, TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + TValue dummy; + return TryAddInternal(key, value, false, true, out dummy); + } + + public bool TryRemove(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + return TryRemoveInternal(key, out value, false, default(TValue)); + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue) + { + while (true) + { + Tables tables = m_tables; + + int bucketNo, lockNo; + GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + lock (tables.m_locks[lockNo]) + { + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + Node prev = null; + for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next) + { + if (m_comparer.Equals(curr.m_key, key)) + { + if (matchValue) + { + bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value); + if (!valuesMatch) + { + value = default(TValue); + return false; + } + } + + if (prev == null) + { + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], curr.m_next); + } + else + { + prev.m_next = curr.m_next; + } + + value = curr.m_value; + tables.m_countPerLock[lockNo]--; + return true; + } + prev = curr; + } + } + + value = default(TValue); + return false; + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + public bool TryGetValue(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + int bucketNo, lockNoUnused; + + // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize. + Tables tables = m_tables; + + GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNoUnused, tables.m_buckets.Length, tables.m_locks.Length); + + // We can get away w/out a lock here. + // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i]. + Node n = Volatile.Read<Node>(ref tables.m_buckets[bucketNo]); + + while (n != null) + { + if (m_comparer.Equals(n.m_key, key)) + { + value = n.m_value; + return true; + } + n = n.m_next; + } + + value = default(TValue); + return false; + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue) + { + int hashcode = m_comparer.GetHashCode(key); + + while (true) + { + int bucketNo, lockNo; + + Tables tables = m_tables; + GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + bool resizeDesired = false; + bool lockTaken = false; + try + { + if (acquireLock) + Monitor.Enter(tables.m_locks[lockNo], ref lockTaken); + + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + // Try to find this key in the bucket + Node prev = null; + for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next) + { + if (m_comparer.Equals(node.m_key, key)) + { + // The key was found in the dictionary. If updates are allowed, update the value for that key. + // We need to create a new node for the update, in order to support TValue types that cannot + // be written atomically, since lock-free reads may be happening concurrently. + if (updateIfExists) + { + if (s_isValueWriteAtomic) + { + node.m_value = value; + } + else + { + Node newNode = new Node(node.m_key, value, hashcode, node.m_next); + if (prev == null) + { + tables.m_buckets[bucketNo] = newNode; + } + else + { + prev.m_next = newNode; + } + } + resultingValue = value; + } + else + { + resultingValue = node.m_value; + } + return false; + } + prev = node; + } + + // The key was not found in the bucket. Insert the key-value pair. + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo])); + checked + { + tables.m_countPerLock[lockNo]++; + } + + // + // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table. + // It is also possible that GrowTable will increase the budget but won't resize the bucket table. + // That happens if the bucket table is found to be poorly utilized due to a bad hash function. + // + if (tables.m_countPerLock[lockNo] > m_budget) + { + resizeDesired = true; + } + } + finally + { + if (lockTaken) + Monitor.Exit(tables.m_locks[lockNo]); + } + + // + // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table. + // + // Concurrency notes: + // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks. + // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0 + // and then verify that the table we passed to it as the argument is still the current table. + // + if (resizeDesired) + { + GrowTable(tables); + } + + resultingValue = value; + return true; + } + } + + public ICollection<TValue> Values + { + get { return GetValues(); } + } + + private void GrowTable(Tables tables) + { + int locksAcquired = 0; + try + { + // The thread that first obtains m_locks[0] will be the one doing the resize operation + AcquireLocks(0, 1, ref locksAcquired); + + // Make sure nobody resized the table while we were waiting for lock 0: + if (tables != m_tables) + { + // We assume that since the table reference is different, it was already resized (or the budget + // was adjusted). If we ever decide to do table shrinking, or replace the table for other reasons, + // we will have to revisit this logic. + return; + } + + // Compute the (approx.) total size. Use an Int64 accumulation variable to avoid an overflow. + long approxCount = 0; + for (int i = 0; i < tables.m_countPerLock.Length; i++) + { + approxCount += tables.m_countPerLock[i]; + } + + // + // If the bucket array is too empty, double the budget instead of resizing the table + // + if (approxCount < tables.m_buckets.Length / 4) + { + m_budget = 2 * m_budget; + if (m_budget < 0) + { + m_budget = int.MaxValue; + } + return; + } + + + // Compute the new table size. We find the smallest integer larger than twice the previous table size, and not divisible by + // 2,3,5 or 7. We can consider a different table-sizing policy in the future. + int newLength = 0; + bool maximizeTableSize = false; + try + { + checked + { + // Double the size of the buckets table and add one, so that we have an odd integer. + newLength = tables.m_buckets.Length * 2 + 1; + + // Now, we only need to check odd integers, and find the first that is not divisible + // by 3, 5 or 7. + while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0) + { + newLength += 2; + } + + if (newLength > MaxArrayLength) + { + maximizeTableSize = true; + } + } + } + catch (OverflowException) + { + maximizeTableSize = true; + } + + if (maximizeTableSize) + { + newLength = MaxArrayLength; + + // We want to make sure that GrowTable will not be called again, since table is at the maximum size. + // To achieve that, we set the budget to int.MaxValue. + // + // (There is one special case that would allow GrowTable() to be called in the future: + // calling Clear() on the ConcurrentDictionary will shrink the table and lower the budget.) + m_budget = int.MaxValue; + } + + // Now acquire all other locks for the table + AcquireLocks(1, tables.m_locks.Length, ref locksAcquired); + + object[] newLocks = tables.m_locks; + + // Add more locks + if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER) + { + newLocks = new object[tables.m_locks.Length * 2]; + Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length); + + for (int i = tables.m_locks.Length; i < newLocks.Length; i++) + { + newLocks[i] = new object(); + } + } + + Node[] newBuckets = new Node[newLength]; + int[] newCountPerLock = new int[newLocks.Length]; + + // Copy all data into a new table, creating new nodes for all elements + for (int i = 0; i < tables.m_buckets.Length; i++) + { + Node current = tables.m_buckets[i]; + while (current != null) + { + Node next = current.m_next; + int newBucketNo, newLockNo; + GetBucketAndLockNo(current.m_hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length); + + newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, current.m_hashcode, newBuckets[newBucketNo]); + + checked + { + newCountPerLock[newLockNo]++; + } + + current = next; + } + } + + // Adjust the budget + m_budget = Math.Max(1, newBuckets.Length / newLocks.Length); + + // Replace tables with the new versions + m_tables = new Tables(newBuckets, newLocks, newCountPerLock); + } + finally + { + // Release all locks that we took earlier + ReleaseLocks(0, locksAcquired); + } + } + + private void GetBucketAndLockNo( + int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount) + { + bucketNo = (hashcode & 0x7fffffff) % bucketCount; + lockNo = bucketNo % lockCount; + } + + private static int DefaultConcurrencyLevel + { + get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; } + } + + private void AcquireAllLocks(ref int locksAcquired) + { + // First, acquire lock 0 + AcquireLocks(0, 1, ref locksAcquired); + + // Now that we have lock 0, the m_locks array will not change (i.e., grow), + // and so we can safely read m_locks.Length. + AcquireLocks(1, m_tables.m_locks.Length, ref locksAcquired); + } + + private void AcquireLocks(int fromInclusive, int toExclusive, ref int locksAcquired) + { + object[] locks = m_tables.m_locks; + + for (int i = fromInclusive; i < toExclusive; i++) + { + bool lockTaken = false; + try + { + Monitor.Enter(locks[i], ref lockTaken); + } + finally + { + if (lockTaken) + { + locksAcquired++; + } + } + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private void ReleaseLocks(int fromInclusive, int toExclusive) + { + for (int i = fromInclusive; i < toExclusive; i++) + { + Monitor.Exit(m_tables.m_locks[i]); + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + private ReadOnlyCollection<TValue> GetValues() + { + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + List<TValue> values = new List<TValue>(); + + for (int i = 0; i < m_tables.m_buckets.Length; i++) + { + Node current = m_tables.m_buckets[i]; + while (current != null) + { + values.Add(current.m_value); + current = current.m_next; + } + } + + return new ReadOnlyCollection<TValue>(values); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + private class Node + { + internal TKey m_key; + internal TValue m_value; + internal volatile Node m_next; + internal int m_hashcode; + + internal Node(TKey key, TValue value, int hashcode, Node next) + { + m_key = key; + m_value = value; + m_next = next; + m_hashcode = hashcode; + } + } + } +} + +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs new file mode 100644 index 0000000..76ab088 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs @@ -0,0 +1,316 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +/* + * WARNING: Auto-generated file (7/18/2012 4:47:38 PM) + * + * Stripped down code based on ndp\clr\src\BCL\System\Collections\Concurrent\ConcurrentQueue.cs + */ + +#if NO_CDS_COLLECTIONS + +#pragma warning disable 0420 + +using System; +using System.Collections.Generic; +using System.Diagnostics.Contracts; +using System.Threading; + +namespace System.Collections.Concurrent +{ + internal class ConcurrentQueue<T> + { + private volatile Segment m_head; + private volatile Segment m_tail; + + private const int SEGMENT_SIZE = 32; + + public ConcurrentQueue() + { + m_head = m_tail = new Segment(0, this); + } + + public bool IsEmpty + { + get + { + Segment head = m_head; + if (!head.IsEmpty) + //fast route 1: + //if current head is not empty, then queue is not empty + return false; + else if (head.Next == null) + //fast route 2: + //if current head is empty and it's the last segment + //then queue is empty + return true; + else + //slow route: + //current head is empty and it is NOT the last segment, + //it means another thread is growing new segment + { + SpinWait spin = new SpinWait(); + while (head.IsEmpty) + { + if (head.Next == null) + return true; + + spin.SpinOnce(); + head = m_head; + } + return false; + } + } + } + + public void Enqueue(T item) + { + SpinWait spin = new SpinWait(); + while (true) + { + Segment tail = m_tail; + if (tail.TryAppend(item)) + return; + spin.SpinOnce(); + } + } + + public bool TryDequeue(out T result) + { + while (!IsEmpty) + { + Segment head = m_head; + if (head.TryRemove(out result)) + return true; + //since method IsEmpty spins, we don't need to spin in the while loop + } + result = default(T); + return false; + } + + private class Segment + { + //we define two volatile arrays: m_array and m_state. Note that the accesses to the array items + //do not get volatile treatment. But we don't need to worry about loading adjacent elements or + //store/load on adjacent elements would suffer reordering. + // - Two stores: these are at risk, but CLRv2 memory model guarantees store-release hence we are safe. + // - Two loads: because one item from two volatile arrays are accessed, the loads of the array references + // are sufficient to prevent reordering of the loads of the elements. + internal volatile T[] m_array; + + // For each entry in m_array, the corresponding entry in m_state indicates whether this position contains + // a valid value. m_state is initially all false. + internal volatile VolatileBool[] m_state; + + //pointer to the next segment. null if the current segment is the last segment + private volatile Segment m_next; + + //We use this zero based index to track how many segments have been created for the queue, and + //to compute how many active segments are there currently. + // * The number of currently active segments is : m_tail.m_index - m_head.m_index + 1; + // * m_index is incremented with every Segment.Grow operation. We use Int64 type, and we can safely + // assume that it never overflows. To overflow, we need to do 2^63 increments, even at a rate of 4 + // billion (2^32) increments per second, it takes 2^31 seconds, which is about 64 years. + internal readonly long m_index; + + //indices of where the first and last valid values + // - m_low points to the position of the next element to pop from this segment, range [0, infinity) + // m_low >= SEGMENT_SIZE implies the segment is disposable + // - m_high points to the position of the latest pushed element, range [-1, infinity) + // m_high == -1 implies the segment is new and empty + // m_high >= SEGMENT_SIZE-1 means this segment is ready to grow. + // and the thread who sets m_high to SEGMENT_SIZE-1 is responsible to grow the segment + // - Math.Min(m_low, SEGMENT_SIZE) > Math.Min(m_high, SEGMENT_SIZE-1) implies segment is empty + // - initially m_low =0 and m_high=-1; + private volatile int m_low; + private volatile int m_high; + + private volatile ConcurrentQueue<T> m_source; + + internal Segment(long index, ConcurrentQueue<T> source) + { + m_array = new T[SEGMENT_SIZE]; + m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false + m_high = -1; + Contract.Assert(index >= 0); + m_index = index; + m_source = source; + } + + internal Segment Next + { + get { return m_next; } + } + + internal bool IsEmpty + { + get { return (Low > High); } + } + + internal void UnsafeAdd(T value) + { + Contract.Assert(m_high < SEGMENT_SIZE - 1); + m_high++; + m_array[m_high] = value; + m_state[m_high].m_value = true; + } + + internal Segment UnsafeGrow() + { + Contract.Assert(m_high >= SEGMENT_SIZE - 1); + Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow + m_next = newSegment; + return newSegment; + } + + internal void Grow() + { + //no CAS is needed, since there is no contention (other threads are blocked, busy waiting) + Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow + m_next = newSegment; + Contract.Assert(m_source.m_tail == this); + m_source.m_tail = m_next; + } + + internal bool TryAppend(T value) + { + //quickly check if m_high is already over the boundary, if so, bail out + if (m_high >= SEGMENT_SIZE - 1) + { + return false; + } + + //Now we will use a CAS to increment m_high, and store the result in newhigh. + //Depending on how many free spots left in this segment and how many threads are doing this Increment + //at this time, the returning "newhigh" can be + // 1) < SEGMENT_SIZE - 1 : we took a spot in this segment, and not the last one, just insert the value + // 2) == SEGMENT_SIZE - 1 : we took the last spot, insert the value AND grow the segment + // 3) > SEGMENT_SIZE - 1 : we failed to reserve a spot in this segment, we return false to + // Queue.Enqueue method, telling it to try again in the next segment. + + int newhigh = SEGMENT_SIZE; //initial value set to be over the boundary + + //We need do Interlocked.Increment and value/state update in a finally block to ensure that they run + //without interuption. This is to prevent anything from happening between them, and another dequeue + //thread maybe spinning forever to wait for m_state[] to be true; + try + { } + finally + { + newhigh = Interlocked.Increment(ref m_high); + if (newhigh <= SEGMENT_SIZE - 1) + { + m_array[newhigh] = value; + m_state[newhigh].m_value = true; + } + + //if this thread takes up the last slot in the segment, then this thread is responsible + //to grow a new segment. Calling Grow must be in the finally block too for reliability reason: + //if thread abort during Grow, other threads will be left busy spinning forever. + if (newhigh == SEGMENT_SIZE - 1) + { + Grow(); + } + } + + //if newhigh <= SEGMENT_SIZE-1, it means the current thread successfully takes up a spot + return newhigh <= SEGMENT_SIZE - 1; + } + + internal bool TryRemove(out T result) + { + SpinWait spin = new SpinWait(); + int lowLocal = Low, highLocal = High; + while (lowLocal <= highLocal) + { + //try to update m_low + if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal) + { + //if the specified value is not available (this spot is taken by a push operation, + // but the value is not written into yet), then spin + SpinWait spinLocal = new SpinWait(); + while (!m_state[lowLocal].m_value) + { + spinLocal.SpinOnce(); + } + result = m_array[lowLocal]; + m_array[lowLocal] = default(T); //release the reference to the object. + + //if the current thread sets m_low to SEGMENT_SIZE, which means the current segment becomes + //disposable, then this thread is responsible to dispose this segment, and reset m_head + if (lowLocal + 1 >= SEGMENT_SIZE) + { + // Invariant: we only dispose the current m_head, not any other segment + // In usual situation, disposing a segment is simply seting m_head to m_head.m_next + // But there is one special case, where m_head and m_tail points to the same and ONLY + //segment of the queue: Another thread A is doing Enqueue and finds that it needs to grow, + //while the *current* thread is doing *this* Dequeue operation, and finds that it needs to + //dispose the current (and ONLY) segment. Then we need to wait till thread A finishes its + //Grow operation, this is the reason of having the following while loop + spinLocal = new SpinWait(); + while (m_next == null) + { + spinLocal.SpinOnce(); + } + Contract.Assert(m_source.m_head == this); + m_source.m_head = m_next; + } + return true; + } + else + { + //CAS failed due to contention: spin briefly and retry + spin.SpinOnce(); + lowLocal = Low; highLocal = High; + } + }//end of while + result = default(T); + return false; + } + + internal bool TryPeek(out T result) + { + result = default(T); + int lowLocal = Low; + if (lowLocal > High) + return false; + SpinWait spin = new SpinWait(); + while (!m_state[lowLocal].m_value) + { + spin.SpinOnce(); + } + result = m_array[lowLocal]; + return true; + } + + internal int Low + { + get + { + return Math.Min(m_low, SEGMENT_SIZE); + } + } + + internal int High + { + get + { + //if m_high > SEGMENT_SIZE, it means it's out of range, we should return + //SEGMENT_SIZE-1 as the logical position + return Math.Min(m_high, SEGMENT_SIZE - 1); + } + } + + } + }//end of class Segment + + struct VolatileBool + { + public VolatileBool(bool value) + { + m_value = value; + } + public volatile bool m_value; + } +} + +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs new file mode 100644 index 0000000..a43acb2 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + // We can't make those based on the Strings_Core.resx file, because the ObsoleteAttribute needs a compile-time constant. + + class Constants_Core + { + private const string OBSOLETE_REFACTORING = "This property is no longer supported due to refactoring of the API surface and elimination of platform-specific dependencies."; + + public const string OBSOLETE_SCHEDULER_NEWTHREAD = OBSOLETE_REFACTORING + " Please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use NewThreadScheduler.Default to obtain an instance of this scheduler type. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + public const string OBSOLETE_SCHEDULER_TASKPOOL = OBSOLETE_REFACTORING + " Please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use TaskPoolScheduler.Default to obtain an instance of this scheduler type. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + public const string OBSOLETE_SCHEDULER_THREADPOOL = OBSOLETE_REFACTORING + " Consider using Scheduler.Default to obtain the platform's most appropriate pool-based scheduler. In order to access a specific pool-based scheduler, please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use the appropriate scheduler in the System.Reactive.Concurrency namespace. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + + public const string OBSOLETE_SCHEDULEREQUIRED = "This instance property is no longer supported. Use CurrentThreadScheduler.IsScheduleRequired instead. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs new file mode 100644 index 0000000..209bd55 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if HAS_EDI +namespace System.Reactive.PlatformServices +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class DefaultExceptionServices/*Impl*/ : IExceptionServices + { + public void Rethrow(Exception exception) + { + System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw(); + } + } +} +#else +namespace System.Reactive.PlatformServices +{ + internal class DefaultExceptionServices : IExceptionServices + { + public void Rethrow(Exception exception) + { + throw exception; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs new file mode 100644 index 0000000..1fceeba --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Reactive.PlatformServices; + +namespace System.Reactive +{ + internal static class ExceptionHelpers + { + private static Lazy<IExceptionServices> s_services = new Lazy<IExceptionServices>(Initialize); + + public static void Throw(this Exception exception) + { + s_services.Value.Rethrow(exception); + } + + public static void ThrowIfNotNull(this Exception exception) + { + if (exception != null) + s_services.Value.Rethrow(exception); + } + + private static IExceptionServices Initialize() + { + return PlatformEnlightenmentProvider.Current.GetService<IExceptionServices>() ?? new DefaultExceptionServices(); + } + } +} + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Services to rethrow exceptions. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IExceptionServices + { + /// <summary> + /// Rethrows the specified exception. + /// </summary> + /// <param name="exception">Exception to rethrow.</param> + void Rethrow(Exception exception); + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs new file mode 100644 index 0000000..a8f6a7a --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs @@ -0,0 +1,113 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Threading; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Provides access to the host's lifecycle management services. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public static class HostLifecycleService + { + private static Lazy<IHostLifecycleNotifications> s_notifications = new Lazy<IHostLifecycleNotifications>(InitializeNotifications); + + private static int _refCount; + + /// <summary> + /// Event that gets raised when the host suspends the application. + /// </summary> + public static event EventHandler<HostSuspendingEventArgs> Suspending; + + /// <summary> + /// Event that gets raised when the host resumes the application. + /// </summary> + public static event EventHandler<HostResumingEventArgs> Resuming; + + /// <summary> + /// Adds a reference to the host lifecycle manager, causing it to be sending notifications. + /// </summary> + public static void AddRef() + { + if (Interlocked.Increment(ref _refCount) == 1) + { + var notifications = s_notifications.Value; + if (notifications != null) + { + notifications.Suspending += OnSuspending; + notifications.Resuming += OnResuming; + } + } + } + + /// <summary> + /// Removes a reference to the host lifecycle manager, causing it to stop sending notifications + /// if the removed reference was the last one. + /// </summary> + public static void Release() + { + if (Interlocked.Decrement(ref _refCount) == 0) + { + var notifications = s_notifications.Value; + if (notifications != null) + { + notifications.Suspending -= OnSuspending; + notifications.Resuming -= OnResuming; + } + } + } + + private static void OnSuspending(object sender, HostSuspendingEventArgs e) + { + var suspending = Suspending; + if (suspending != null) + suspending(sender, e); + } + + private static void OnResuming(object sender, HostResumingEventArgs e) + { + var resuming = Resuming; + if (resuming != null) + resuming(sender, e); + } + + private static IHostLifecycleNotifications InitializeNotifications() + { + return PlatformEnlightenmentProvider.Current.GetService<IHostLifecycleNotifications>(); + } + } + + /// <summary> + /// (Infrastructure) Provides notifications about the host's lifecycle events. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IHostLifecycleNotifications + { + /// <summary> + /// Event that gets raised when the host suspends. + /// </summary> + event EventHandler<HostSuspendingEventArgs> Suspending; + + /// <summary> + /// Event that gets raised when the host resumes. + /// </summary> + event EventHandler<HostResumingEventArgs> Resuming; + } + + /// <summary> + /// (Infrastructure) Event arguments for host suspension events. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class HostSuspendingEventArgs : EventArgs + { + } + + /// <summary> + /// (Infrastructure) Event arguments for host resumption events. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class HostResumingEventArgs : EventArgs + { + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs new file mode 100644 index 0000000..5cc9b2d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal class ImmutableList<T> + { + T[] data; + + public ImmutableList() + { + data = new T[0]; + } + + public ImmutableList(T[] data) + { + this.data = data; + } + + public ImmutableList<T> Add(T value) + { + var newData = new T[data.Length + 1]; + Array.Copy(data, newData, data.Length); + newData[data.Length] = value; + return new ImmutableList<T>(newData); + } + + public ImmutableList<T> Remove(T value) + { + var i = IndexOf(value); + if (i < 0) + return this; + var newData = new T[data.Length - 1]; + Array.Copy(data, 0, newData, 0, i); + Array.Copy(data, i + 1, newData, i, data.Length - i - 1); + return new ImmutableList<T>(newData); + } + + public int IndexOf(T value) + { + for (var i = 0; i < data.Length; ++i) + if (data[i].Equals(value)) + return i; + return -1; + } + + public T[] Data + { + get { return data; } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs new file mode 100644 index 0000000..4094dce --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_LAZY +#pragma warning disable 0420 + +// +// Based on ndp\clr\src\BCL\System\Lazy.cs but with LazyThreadSafetyMode.ExecutionAndPublication mode behavior hardcoded. +// + +using System.Diagnostics; +using System.Threading; +using System.Reactive; + +namespace System +{ + internal class Lazy<T> + { + class Boxed + { + internal Boxed(T value) + { + m_value = value; + } + + internal T m_value; + } + + static Func<T> ALREADY_INVOKED_SENTINEL = delegate { return default(T); }; + + private object m_boxed; + private Func<T> m_valueFactory; + private volatile object m_threadSafeObj; + + public Lazy(Func<T> valueFactory) + { + m_threadSafeObj = new object(); + m_valueFactory = valueFactory; + } + +#if !NO_DEBUGGER_ATTRIBUTES + [DebuggerBrowsable(DebuggerBrowsableState.Never)] +#endif + public T Value + { + get + { + Boxed boxed = null; + if (m_boxed != null) + { + boxed = m_boxed as Boxed; + if (boxed != null) + { + return boxed.m_value; + } + + var exc = m_boxed as Exception; + exc.Throw(); + } + + return LazyInitValue(); + } + } + + private T LazyInitValue() + { + Boxed boxed = null; + object threadSafeObj = m_threadSafeObj; + bool lockTaken = false; + try + { + if (threadSafeObj != (object)ALREADY_INVOKED_SENTINEL) + { + Monitor.Enter(threadSafeObj); + lockTaken = true; + } + + if (m_boxed == null) + { + boxed = CreateValue(); + m_boxed = boxed; + m_threadSafeObj = ALREADY_INVOKED_SENTINEL; + } + else + { + boxed = m_boxed as Boxed; + if (boxed == null) + { + var exc = m_boxed as Exception; + exc.Throw(); + } + } + } + finally + { + if (lockTaken) + Monitor.Exit(threadSafeObj); + } + + return boxed.m_value; + } + + private Boxed CreateValue() + { + Boxed boxed = null; + try + { + if (m_valueFactory == ALREADY_INVOKED_SENTINEL) + throw new InvalidOperationException(); + + Func<T> factory = m_valueFactory; + m_valueFactory = ALREADY_INVOKED_SENTINEL; + + boxed = new Boxed(factory()); + } + catch (Exception ex) + { + m_boxed = ex; + throw; + } + + return boxed; + } + } +} +#pragma warning restore 0420 +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs new file mode 100644 index 0000000..6a2e2f8 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal class NopObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Instance = new NopObserver<T>(); + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + } + + internal class DoneObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Completed = new DoneObserver<T>(); + + public Exception Exception { get; set; } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + } + + internal class DisposedObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Instance = new DisposedObserver<T>(); + + public void OnCompleted() + { + throw new ObjectDisposedException(""); + } + + public void OnError(Exception error) + { + throw new ObjectDisposedException(""); + } + + public void OnNext(T value) + { + throw new ObjectDisposedException(""); + } + } + + internal class Observer<T> : IObserver<T> + { + private readonly ImmutableList<IObserver<T>> _observers; + + public Observer(ImmutableList<IObserver<T>> observers) + { + _observers = observers; + } + + public void OnCompleted() + { + foreach (var observer in _observers.Data) + observer.OnCompleted(); + } + + public void OnError(Exception error) + { + foreach (var observer in _observers.Data) + observer.OnError(error); + } + + public void OnNext(T value) + { + foreach (var observer in _observers.Data) + observer.OnNext(value); + } + + internal IObserver<T> Add(IObserver<T> observer) + { + return new Observer<T>(_observers.Add(observer)); + } + + internal IObserver<T> Remove(IObserver<T> observer) + { + var i = Array.IndexOf(_observers.Data, observer); + if (i < 0) + return this; + + if (_observers.Data.Length == 2) + { + return _observers.Data[1 - i]; + } + else + { + return new Observer<T>(_observers.Remove(observer)); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs new file mode 100644 index 0000000..f2483e8 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs @@ -0,0 +1,102 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Reflection; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Interface for enlightenment providers. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IPlatformEnlightenmentProvider + { + /// <summary> + /// (Infastructure) Tries to gets the specified service. + /// </summary> + /// <typeparam name="T">Service type.</typeparam> + /// <param name="args">Optional set of arguments.</param> + /// <returns>Service instance or null if not found.</returns> + T GetService<T>(params object[] args) where T : class; + } + + /// <summary> + /// (Infrastructure) Provider for platform-specific framework enlightenments. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public static class PlatformEnlightenmentProvider + { + private static readonly object s_gate = new object(); + private static IPlatformEnlightenmentProvider s_current; + + /// <summary> + /// (Infrastructure) Gets the current enlightenment provider. If none is loaded yet, accessing this property triggers provider resolution. + /// </summary> + /// <remarks> + /// This member is used by the Rx infrastructure and not meant for public consumption or implementation. + /// </remarks> + public static IPlatformEnlightenmentProvider Current + { + get + { + if (s_current == null) + { + lock (s_gate) + { + if (s_current == null) + { + // + // TODO: Investigate whether we can simplify this logic to just use "System.Reactive.PlatformServices.PlatformEnlightenmentProvider, System.Reactive.PlatformServices". + // It turns out this doesn't quite work on Silverlight. On the other hand, in .NET Compact Framework 3.5, we mysteriously have to use that path. + // + +#if NETCF35 + var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, System.Reactive.PlatformServices"; +#else +#if CRIPPLED_REFLECTION + var ifType = typeof(IPlatformEnlightenmentProvider).GetTypeInfo(); +#else + var ifType = typeof(IPlatformEnlightenmentProvider); +#endif + var asm = new AssemblyName(ifType.Assembly.FullName); + asm.Name = "System.Reactive.PlatformServices"; + var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, " + asm.FullName; +#endif + + var t = Type.GetType(name, false); + if (t != null) + s_current = (IPlatformEnlightenmentProvider)Activator.CreateInstance(t); + else + s_current = new DefaultPlatformEnlightenmentProvider(); + } + } + } + + return s_current; + } + + set + { + lock (s_gate) + { + s_current = value; + } + } + } + } + + class DefaultPlatformEnlightenmentProvider : IPlatformEnlightenmentProvider + { + public T GetService<T>(object[] args) where T : class + { + return null; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs new file mode 100644 index 0000000..8a02cd5 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Threading; +using System.Collections.Generic; + +namespace System.Reactive +{ + internal class PriorityQueue<T> where T : IComparable<T> + { +#if !NO_INTERLOCKED_64 + private static long _count = long.MinValue; +#else + private static int _count = int.MinValue; +#endif + private IndexedItem[] _items; + private int _size; + + public PriorityQueue() + : this(16) + { + } + + public PriorityQueue(int capacity) + { + _items = new IndexedItem[capacity]; + _size = 0; + } + + private bool IsHigherPriority(int left, int right) + { + return _items[left].CompareTo(_items[right]) < 0; + } + + private void Percolate(int index) + { + if (index >= _size || index < 0) + return; + var parent = (index - 1) / 2; + if (parent < 0 || parent == index) + return; + + if (IsHigherPriority(index, parent)) + { + var temp = _items[index]; + _items[index] = _items[parent]; + _items[parent] = temp; + Percolate(parent); + } + } + + private void Heapify() + { + Heapify(0); + } + + private void Heapify(int index) + { + if (index >= _size || index < 0) + return; + + var left = 2 * index + 1; + var right = 2 * index + 2; + var first = index; + + if (left < _size && IsHigherPriority(left, first)) + first = left; + if (right < _size && IsHigherPriority(right, first)) + first = right; + if (first != index) + { + var temp = _items[index]; + _items[index] = _items[first]; + _items[first] = temp; + Heapify(first); + } + } + + public int Count { get { return _size; } } + + public T Peek() + { + if (_size == 0) + throw new InvalidOperationException(Strings_Core.HEAP_EMPTY); + + return _items[0].Value; + } + + private void RemoveAt(int index) + { + _items[index] = _items[--_size]; + _items[_size] = default(IndexedItem); + Heapify(); + if (_size < _items.Length / 4) + { + var temp = _items; + _items = new IndexedItem[_items.Length / 2]; + Array.Copy(temp, 0, _items, 0, _size); + } + } + + public T Dequeue() + { + var result = Peek(); + RemoveAt(0); + return result; + } + + public void Enqueue(T item) + { + if (_size >= _items.Length) + { + var temp = _items; + _items = new IndexedItem[_items.Length * 2]; + Array.Copy(temp, _items, temp.Length); + } + + var index = _size++; + _items[index] = new IndexedItem { Value = item, Id = Interlocked.Increment(ref _count) }; + Percolate(index); + } + + public bool Remove(T item) + { + for (var i = 0; i < _size; ++i) + { + if (EqualityComparer<T>.Default.Equals(_items[i].Value, item)) + { + RemoveAt(i); + return true; + } + } + + return false; + } + + struct IndexedItem : IComparable<IndexedItem> + { + public T Value; +#if !NO_INTERLOCKED_64 + public long Id; +#else + public int Id; +#endif + + public int CompareTo(IndexedItem other) + { + var c = Value.CompareTo(other.Value); + if (c == 0) + c = Id.CompareTo(other.Id); + return c; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs new file mode 100644 index 0000000..f03b45f --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System.Reactive.Concurrency; +using System.Reactive.Disposables; + +namespace System.Reactive +{ + /// <summary> + /// Interface with variance annotation; allows for better type checking when detecting capabilities in SubscribeSafe. + /// </summary> + /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam> + internal interface IProducer< +#if !NO_VARIANCE + out +#endif + TSource> : IObservable<TSource> + { + IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard); + } + + /// <summary> + /// Base class for implementation of query operators, providing performance benefits over the use of Observable.Create. + /// </summary> + /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam> + internal abstract class Producer<TSource> : IProducer<TSource> + { + /// <summary> + /// Publicly visible Subscribe method. + /// </summary> + /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param> + /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns> + public IDisposable Subscribe(IObserver<TSource> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + return SubscribeRaw(observer, true); + } + + public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard) + { + var state = new State(); + state.observer = observer; + state.sink = new SingleAssignmentDisposable(); + state.subscription = new SingleAssignmentDisposable(); + + var d = new CompositeDisposable(2) { state.sink, state.subscription }; + + // + // See AutoDetachObserver.cs for more information on the safeguarding requirement and + // its implementation aspects. + // + if (enableSafeguard) + { + state.observer = SafeObserver<TSource>.Create(state.observer, d); + } + + if (CurrentThreadScheduler.IsScheduleRequired) + { + CurrentThreadScheduler.Instance.Schedule(state, Run); + } + else + { + state.subscription.Disposable = this.Run(state.observer, state.subscription, state.Assign); + } + + return d; + } + + struct State + { + public SingleAssignmentDisposable sink; + public SingleAssignmentDisposable subscription; + public IObserver<TSource> observer; + + public void Assign(IDisposable s) + { + sink.Disposable = s; + } + } + + private IDisposable Run(IScheduler _, State x) + { + x.subscription.Disposable = this.Run(x.observer, x.subscription, x.Assign); + return Disposable.Empty; + } + + /// <summary> + /// Core implementation of the query operator, called upon a new subscription to the producer object. + /// </summary> + /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param> + /// <param name="cancel">The subscription disposable object returned from the Run call, passed in such that it can be forwarded to the sink, allowing it to dispose the subscription upon sending a final message (or prematurely for other reasons).</param> + /// <param name="setSink">Callback to communicate the sink object to the subscriber, allowing consumers to tunnel a Dispose call into the sink, which can stop the processing.</param> + /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns> + /// <remarks>The <paramref name="observer">observer</paramref> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks> + protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink); + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs new file mode 100644 index 0000000..2693569 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +namespace System.Reactive +{ + // + // See AutoDetachObserver.cs for more information on the safeguarding requirement and + // its implementation aspects. + // + + class SafeObserver<TSource> : IObserver<TSource> + { + private readonly IObserver<TSource> _observer; + private readonly IDisposable _disposable; + + public static IObserver<TSource> Create(IObserver<TSource> observer, IDisposable disposable) + { + var a = observer as AnonymousObserver<TSource>; + if (a != null) + return a.MakeSafe(disposable); + else + return new SafeObserver<TSource>(observer, disposable); + } + + private SafeObserver(IObserver<TSource> observer, IDisposable disposable) + { + _observer = observer; + _disposable = disposable; + } + + public void OnNext(TSource value) + { + var __noError = false; + try + { + _observer.OnNext(value); + __noError = true; + } + finally + { + if (!__noError) + _disposable.Dispose(); + } + } + + public void OnError(Exception error) + { + try + { + _observer.OnError(error); + } + finally + { + _disposable.Dispose(); + } + } + + public void OnCompleted() + { + try + { + _observer.OnCompleted(); + } + finally + { + _disposable.Dispose(); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs new file mode 100644 index 0000000..2b728e2 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs @@ -0,0 +1,441 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive +{ +#if !NO_PERF && !NO_CDS + using System.Collections.Concurrent; + using System.Diagnostics; + + internal class ScheduledObserver<T> : ObserverBase<T>, IDisposable + { + private volatile int _state = 0; + private const int STOPPED = 0; + private const int RUNNING = 1; + private const int PENDING = 2; + private const int FAULTED = 9; + + private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>(); + private volatile bool _failed; + private volatile Exception _error; + private volatile bool _completed; + + private readonly IObserver<T> _observer; + private readonly IScheduler _scheduler; + private readonly ISchedulerLongRunning _longRunning; + private readonly SerialDisposable _disposable = new SerialDisposable(); + + public ScheduledObserver(IScheduler scheduler, IObserver<T> observer) + { + _scheduler = scheduler; + _observer = observer; + _longRunning = _scheduler.AsLongRunning(); + + if (_longRunning != null) + _dispatcherEvent = new SemaphoreSlim(0); + } + + private readonly object _dispatcherInitGate = new object(); + private SemaphoreSlim _dispatcherEvent; + private IDisposable _dispatcherJob; + + private void EnsureDispatcher() + { + if (_dispatcherJob == null) + { + lock (_dispatcherInitGate) + { + if (_dispatcherJob == null) + { + _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch); + + _disposable.Disposable = new CompositeDisposable(2) + { + _dispatcherJob, + Disposable.Create(() => _dispatcherEvent.Release()) + }; + } + } + } + } + + private void Dispatch(ICancelable cancel) + { + while (true) + { + _dispatcherEvent.Wait(); + + if (cancel.IsDisposed) + return; + + var next = default(T); + while (_queue.TryDequeue(out next)) + { + try + { + _observer.OnNext(next); + } + catch + { + var nop = default(T); + while (_queue.TryDequeue(out nop)) + ; + + throw; + } + + _dispatcherEvent.Wait(); + + if (cancel.IsDisposed) + return; + } + + if (_failed) + { + _observer.OnError(_error); + Dispose(); + return; + } + + if (_completed) + { + _observer.OnCompleted(); + Dispose(); + return; + } + } + } + + public void EnsureActive() + { + EnsureActive(1); + } + + public void EnsureActive(int n) + { + if (_longRunning != null) + { + if (n > 0) + _dispatcherEvent.Release(n); + + EnsureDispatcher(); + } + else + EnsureActiveSlow(); + } + + private void EnsureActiveSlow() + { + var isOwner = false; + +#pragma warning disable 0420 + while (true) + { + var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED); + if (old == STOPPED) + { + isOwner = true; // RUNNING + break; + } + + if (old == FAULTED) + return; + + // If we find the consumer loop running, we transition to PENDING to handle + // the case where the queue is seen empty by the consumer, making it transition + // to the STOPPED state, but we inserted an item into the queue. + // + // C: _queue.TryDequeue == false (RUNNING) + // ---------------------------------------------- + // P: _queue.Enqueue(...) + // EnsureActive + // Exchange(ref _state, RUNNING) == RUNNING + // ---------------------------------------------- + // C: transition to STOPPED (STOPPED) + // + // In this case, P would believe C is running and not invoke the scheduler + // using the isOwner flag. + // + // By introducing an intermediate PENDING state and using CAS in the consumer + // to only transition to STOPPED in case we were still RUNNING, we can force + // the consumer to reconsider the decision to transition to STOPPED. In that + // case, the consumer loops again and re-reads from the queue and other state + // fields. At least one bit of state will have changed because EnsureActive + // should only be called after invocation of IObserver<T> methods that touch + // this state. + // + if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING) + break; + } +#pragma warning restore 0420 + + if (isOwner) + { + _disposable.Disposable = _scheduler.Schedule<object>(null, Run); + } + } + + private void Run(object state, Action<object> recurse) + { +#pragma warning disable 0420 + var next = default(T); + while (!_queue.TryDequeue(out next)) + { + if (_failed) + { + // Between transitioning to _failed and the queue check in the loop, + // items could have been queued, so we can't stop yet. We don't spin + // and immediately re-check the queue. + // + // C: _queue.TryDequeue == false + // ---------------------------------------------- + // P: OnNext(...) + // _queue.Enqueue(...) // Will get lost + // P: OnError(...) + // _failed = true + // ---------------------------------------------- + // C: if (_failed) + // _observer.OnError(...) // Lost an OnNext + // + if (!_queue.IsEmpty) + continue; + + Interlocked.Exchange(ref _state, STOPPED); + _observer.OnError(_error); + Dispose(); + return; + } + + if (_completed) + { + // Between transitioning to _completed and the queue check in the loop, + // items could have been queued, so we can't stop yet. We don't spin + // and immediately re-check the queue. + // + // C: _queue.TryDequeue == false + // ---------------------------------------------- + // P: OnNext(...) + // _queue.Enqueue(...) // Will get lost + // P: OnCompleted(...) + // _completed = true + // ---------------------------------------------- + // C: if (_completed) + // _observer.OnCompleted() // Lost an OnNext + // + if (!_queue.IsEmpty) + continue; + + Interlocked.Exchange(ref _state, STOPPED); + _observer.OnCompleted(); + Dispose(); + return; + } + + var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING); + if (old == RUNNING || old == FAULTED) + return; + + Debug.Assert(old == PENDING); + + // The producer has put us in the PENDING state to prevent us from + // transitioning to STOPPED, so we go RUNNING again and re-check our state. + _state = RUNNING; + } + + Interlocked.Exchange(ref _state, RUNNING); + +#pragma warning restore 0420 + + try + { + _observer.OnNext(next); + } + catch + { +#pragma warning disable 0420 + Interlocked.Exchange(ref _state, FAULTED); +#pragma warning restore 0420 + + var nop = default(T); + while (_queue.TryDequeue(out nop)) + ; + throw; + } + + recurse(state); + } + + protected override void OnNextCore(T value) + { + _queue.Enqueue(value); + } + + protected override void OnErrorCore(Exception exception) + { + _error = exception; + _failed = true; + } + + protected override void OnCompletedCore() + { + _completed = true; + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + _disposable.Dispose(); + } + } + } +#else + class ScheduledObserver<T> : ObserverBase<T>, IDisposable + { + private bool _isAcquired = false; + private bool _hasFaulted = false; + private readonly Queue<Action> _queue = new Queue<Action>(); + private readonly IObserver<T> _observer; + private readonly IScheduler _scheduler; + private readonly SerialDisposable _disposable = new SerialDisposable(); + + public ScheduledObserver(IScheduler scheduler, IObserver<T> observer) + { + _scheduler = scheduler; + _observer = observer; + } + + public void EnsureActive(int n) + { + EnsureActive(); + } + + public void EnsureActive() + { + var isOwner = false; + + lock (_queue) + { + if (!_hasFaulted && _queue.Count > 0) + { + isOwner = !_isAcquired; + _isAcquired = true; + } + } + + if (isOwner) + { + _disposable.Disposable = _scheduler.Schedule<object>(null, Run); + } + } + + private void Run(object state, Action<object> recurse) + { + var work = default(Action); + lock (_queue) + { + if (_queue.Count > 0) + work = _queue.Dequeue(); + else + { + _isAcquired = false; + return; + } + } + + try + { + work(); + } + catch + { + lock (_queue) + { + _queue.Clear(); + _hasFaulted = true; + } + throw; + } + + recurse(state); + } + + protected override void OnNextCore(T value) + { + lock (_queue) + _queue.Enqueue(() => _observer.OnNext(value)); + } + + protected override void OnErrorCore(Exception exception) + { + lock (_queue) + _queue.Enqueue(() => _observer.OnError(exception)); + } + + protected override void OnCompletedCore() + { + lock (_queue) + _queue.Enqueue(() => _observer.OnCompleted()); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + _disposable.Dispose(); + } + } + } +#endif + + class ObserveOnObserver<T> : ScheduledObserver<T> + { + private IDisposable _cancel; + + public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel) + : base(scheduler, observer) + { + _cancel = cancel; + } + + protected override void OnNextCore(T value) + { + base.OnNextCore(value); + EnsureActive(); + } + + protected override void OnErrorCore(Exception exception) + { + base.OnErrorCore(exception); + EnsureActive(); + } + + protected override void OnCompletedCore() + { + base.OnCompletedCore(); + EnsureActive(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + var cancel = Interlocked.Exchange(ref _cancel, null); + if (cancel != null) + { + cancel.Dispose(); + } + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs new file mode 100644 index 0000000..4f5eeee --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs @@ -0,0 +1,116 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_SEMAPHORE && SILVERLIGHT +using System; +using System.Threading; + +namespace System.Reactive.Threading +{ + //Monitor based implementation of Semaphore + //that mimicks the .NET Semaphore class (System.Threading.Semaphore) + + internal sealed class Semaphore : IDisposable + { + private int m_currentCount; + private readonly int m_maximumCount; + private readonly object m_lockObject; + private bool m_disposed; + + public Semaphore(int initialCount, int maximumCount) + { + if (initialCount < 0) + { + throw new ArgumentOutOfRangeException("initialCount", "Non-negative number required."); + } + if (maximumCount < 1) + { + throw new ArgumentOutOfRangeException("maximumCount", "Positive number required."); + } + if (initialCount > maximumCount) + { + throw new ArgumentException("Initial count must be smaller than maximum"); + } + + m_currentCount = initialCount; + m_maximumCount = maximumCount; + m_lockObject = new object(); + } + + public int Release() + { + return this.Release(1); + } + + public int Release(int releaseCount) + { + if (releaseCount < 1) + { + throw new ArgumentOutOfRangeException("releaseCount", "Positive number required."); + } + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + var oldCount = default(int); + lock (m_lockObject) + { + oldCount = m_currentCount; + if (releaseCount + m_currentCount > m_maximumCount) + { + throw new ArgumentOutOfRangeException("releaseCount", "Amount of releases would overflow maximum"); + } + m_currentCount += releaseCount; + //PulseAll makes sure all waiting threads get queued for acquiring the lock + //Pulse would only queue one thread. + + Monitor.PulseAll(m_lockObject); + } + return oldCount; + } + + public bool WaitOne() + { + return WaitOne(Timeout.Infinite); + } + + public bool WaitOne(int millisecondsTimeout) + { + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + lock (m_lockObject) + { + while (m_currentCount == 0) + { + if (!Monitor.Wait(m_lockObject, millisecondsTimeout)) + { + return false; + } + } + m_currentCount--; + return true; + } + } + + public bool WaitOne(TimeSpan timeout) + { + return WaitOne((int)timeout.TotalMilliseconds); + } + + public void Close() + { + Dispose(); + } + + public void Dispose() + { + //the .NET CLR semaphore does not release waits upon dispose + //so we don't do that either. + m_disposed = true; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs new file mode 100644 index 0000000..847c14d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs @@ -0,0 +1,143 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_SEMAPHORE && (XNA || NETCF) +using System; +using System.Collections.Generic; +using System.Threading; + +namespace System.Reactive.Threading +{ + //Monitor based implementation of Semaphore + //that mimicks the .NET Semaphore class (System.Threading.Semaphore) + + internal sealed class Semaphore : IDisposable + { + private int m_currentCount; + private readonly int m_maximumCount; + private readonly object m_lockObject; + private bool m_disposed; + private readonly List<ManualResetEvent> m_waiting; + + public Semaphore(int initialCount, int maximumCount) + { + if (initialCount < 0) + { + throw new ArgumentOutOfRangeException("initialCount", "Non-negative number required."); + } + if (maximumCount < 1) + { + throw new ArgumentOutOfRangeException("maximumCount", "Positive number required."); + } + if (initialCount > maximumCount) + { + throw new ArgumentException("Initial count must be smaller than maximum"); + } + m_waiting = new List<ManualResetEvent>(); + m_currentCount = initialCount; + m_maximumCount = maximumCount; + m_lockObject = new object(); + } + + public int Release() + { + return this.Release(1); + } + + public int Release(int releaseCount) + { + if (releaseCount < 1) + { + throw new ArgumentOutOfRangeException("releaseCount", "Positive number required."); + } + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + var oldCount = default(int); + var toBeReleased = new List<ManualResetEvent>(); + lock (m_lockObject) + { + oldCount = m_currentCount; + if (releaseCount + m_currentCount > m_maximumCount) + { + throw new ArgumentOutOfRangeException("releaseCount", "Amount of releases would overflow maximum"); + } + + var waiting = m_waiting.ToArray(); + var left = Math.Max(0, releaseCount - waiting.Length); + for (var i = 0; i < releaseCount && i < m_waiting.Count; i++) + { + toBeReleased.Add(waiting[i]); + m_waiting.RemoveAt(0); + } + m_currentCount += left; + } + foreach(var release in toBeReleased) + { + release.Set(); + } + return oldCount; + } + + public bool WaitOne() + { + return WaitOne(Timeout.Infinite); + } + + public bool WaitOne(int millisecondsTimeout) + { + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + var manualResetEvent = default(ManualResetEvent); + + lock (m_lockObject) + { + if (m_currentCount == 0) + { + manualResetEvent = new ManualResetEvent(false); + m_waiting.Add(manualResetEvent); + } + else + { + m_currentCount--; + return true; + } + } +#if XNA_31_ZUNE || NETCF35 + if (!manualResetEvent.WaitOne(millisecondsTimeout, false)) +#else + if (!manualResetEvent.WaitOne(millisecondsTimeout)) +#endif + { + lock(m_lockObject) + { + m_waiting.Remove(manualResetEvent); + } + return false; + } + return true; + } + + public bool WaitOne(TimeSpan timeout) + { + return WaitOne((int)timeout.TotalMilliseconds); + } + + public void Close() + { + Dispose(); + } + + public void Dispose() + { + //the .NET CLR semaphore does not release waits upon dispose + //so we don't do that either. + m_disposed = true; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs new file mode 100644 index 0000000..ffe6ee5 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System.Threading; + +namespace System.Reactive +{ + /// <summary> + /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer. + /// </summary> + /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam> + /// <remarks>Implementations of sinks are responsible to enforce the message grammar on the associated observer. Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of related resources and to mute the outgoing observer.</remarks> + internal abstract class Sink<TSource> : IDisposable + { + protected internal volatile IObserver<TSource> _observer; + private IDisposable _cancel; + + public Sink(IObserver<TSource> observer, IDisposable cancel) + { + _observer = observer; + _cancel = cancel; + } + + public virtual void Dispose() + { + _observer = NopObserver<TSource>.Instance; + + var cancel = Interlocked.Exchange(ref _cancel, null); + if (cancel != null) + { + cancel.Dispose(); + } + } + + public IObserver<TSource> GetForwarder() + { + return new _(this); + } + + class _ : IObserver<TSource> + { + private readonly Sink<TSource> _forward; + + public _(Sink<TSource> forward) + { + _forward = forward; + } + + public void OnNext(TSource value) + { + _forward._observer.OnNext(value); + } + + public void OnError(Exception error) + { + _forward._observer.OnError(error); + _forward.Dispose(); + } + + public void OnCompleted() + { + _forward._observer.OnCompleted(); + _forward.Dispose(); + } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs new file mode 100644 index 0000000..d3c2374 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal static class Stubs<T> + { + public static readonly Action<T> Ignore = _ => { }; + public static readonly Func<T, T> I = _ => _; + } + + internal static class Stubs + { + public static readonly Action Nop = () => { }; + public static readonly Action<Exception> Throw = ex => { ex.Throw(); }; + } + +#if !NO_THREAD + internal static class TimerStubs + { + public static readonly System.Threading.Timer Never = new System.Threading.Timer(_ => { }); + } +#endif +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs new file mode 100644 index 0000000..c417f90 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_SYNCCTX +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + internal static class SynchronizationContextExtensions + { + public static void PostWithStartComplete<T>(this SynchronizationContext context, Action<T> action, T state) + { + context.OperationStarted(); + + context.Post( + o => + { + try + { + action((T)o); + } + finally + { + context.OperationCompleted(); + } + }, + state + ); + } + + public static void PostWithStartComplete(this SynchronizationContext context, Action action) + { + context.OperationStarted(); + + context.Post( + _ => + { + try + { + action(); + } + finally + { + context.OperationCompleted(); + } + }, + null + ); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs new file mode 100644 index 0000000..e41294e --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal class SynchronizedObserver<T> : ObserverBase<T> + { + private readonly object _gate; + private readonly IObserver<T> _observer; + + public SynchronizedObserver(IObserver<T> observer, object gate) + { + _gate = gate; + _observer = observer; + } + + protected override void OnNextCore(T value) + { + lock (_gate) + { + _observer.OnNext(value); + } + } + + protected override void OnErrorCore(Exception exception) + { + lock (_gate) + { + _observer.OnError(exception); + } + } + + protected override void OnCompletedCore() + { + lock (_gate) + { + _observer.OnCompleted(); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs new file mode 100644 index 0000000..b9c8167 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs @@ -0,0 +1,113 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Provides access to the local system clock. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class DefaultSystemClock : ISystemClock + { + /// <summary> + /// Gets the current time. + /// </summary> + public DateTimeOffset UtcNow + { + get { return DateTimeOffset.UtcNow; } + } + } + + internal class DefaultSystemClockMonitor : PeriodicTimerSystemClockMonitor + { + private static readonly TimeSpan DEFAULT_PERIOD = TimeSpan.FromSeconds(1); + + public DefaultSystemClockMonitor() + : base(DEFAULT_PERIOD) + { + } + } + + /// <summary> + /// (Infrastructure) Monitors for system clock changes based on a periodic timer. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged + { + private readonly TimeSpan _period; + private readonly SerialDisposable _timer; + + private DateTimeOffset _lastTime; + private EventHandler<SystemClockChangedEventArgs> _systemClockChanged; + + private const int SYNC_MAXRETRIES = 100; + private const double SYNC_MAXDELTA = 10; + private const int MAXERROR = 100; + + /// <summary> + /// Creates a new monitor for system clock changes with the specified polling frequency. + /// </summary> + /// <param name="period">Polling frequency for system clock changes.</param> + public PeriodicTimerSystemClockMonitor(TimeSpan period) + { + _period = period; + _timer = new SerialDisposable(); + } + + /// <summary> + /// Event that gets raised when a system clock change is detected. + /// </summary> + public event EventHandler<SystemClockChangedEventArgs> SystemClockChanged + { + add + { + NewTimer(); + + _systemClockChanged += value; + } + + remove + { + _systemClockChanged -= value; + + _timer.Disposable = Disposable.Empty; + } + } + + private void NewTimer() + { + _timer.Disposable = Disposable.Empty; + + var n = 0; + do + { + _lastTime = SystemClock.UtcNow; + _timer.Disposable = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period); + } while (Math.Abs((SystemClock.UtcNow - _lastTime).TotalMilliseconds) > SYNC_MAXDELTA && ++n < SYNC_MAXRETRIES); + + if (n >= SYNC_MAXRETRIES) + throw new InvalidOperationException(Strings_Core.FAILED_CLOCK_MONITORING); + } + + private void TimeChanged() + { + var now = SystemClock.UtcNow; + var diff = now - (_lastTime + _period); + if (Math.Abs(diff.TotalMilliseconds) >= MAXERROR) + { + var scc = _systemClockChanged; + if (scc != null) + scc(this, new SystemClockChangedEventArgs(_lastTime + _period, now)); + + NewTimer(); + } + else + { + _lastTime = SystemClock.UtcNow; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs new file mode 100644 index 0000000..e482dd0 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs @@ -0,0 +1,149 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Threading; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Provides access to local system clock services. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public static class SystemClock + { + private static Lazy<ISystemClock> s_serviceSystemClock = new Lazy<ISystemClock>(InitializeSystemClock); + private static Lazy<INotifySystemClockChanged> s_serviceSystemClockChanged = new Lazy<INotifySystemClockChanged>(InitializeSystemClockChanged); + + private static int _refCount; + + /// <summary> + /// Gets the local system clock time. + /// </summary> + public static DateTimeOffset UtcNow + { + get { return s_serviceSystemClock.Value.UtcNow; } + } + + /// <summary> + /// Event that gets raised when a system clock change is detected, if there's any interest as indicated by AddRef calls. + /// </summary> + public static event EventHandler<SystemClockChangedEventArgs> SystemClockChanged; + + /// <summary> + /// Adds a reference to the system clock monitor, causing it to be sending notifications. + /// </summary> + /// <exception cref="NotSupportedException">Thrown when the system doesn't support sending clock change notifications.</exception> + public static void AddRef() + { + if (Interlocked.Increment(ref _refCount) == 1) + { + s_serviceSystemClockChanged.Value.SystemClockChanged += OnSystemClockChanged; + } + } + + /// <summary> + /// Removes a reference to the system clock monitor, causing it to stop sending notifications + /// if the removed reference was the last one. + /// </summary> + public static void Release() + { + if (Interlocked.Decrement(ref _refCount) == 0) + { + s_serviceSystemClockChanged.Value.SystemClockChanged -= OnSystemClockChanged; + } + } + + private static void OnSystemClockChanged(object sender, SystemClockChangedEventArgs e) + { + var scc = SystemClockChanged; + if (scc != null) + scc(sender, e); + } + + private static ISystemClock InitializeSystemClock() + { + return PlatformEnlightenmentProvider.Current.GetService<ISystemClock>() ?? new DefaultSystemClock(); + } + + private static INotifySystemClockChanged InitializeSystemClockChanged() + { + return PlatformEnlightenmentProvider.Current.GetService<INotifySystemClockChanged>() ?? new DefaultSystemClockMonitor(); + } + } + + /// <summary> + /// (Infrastructure) Provides access to the local system clock. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface ISystemClock + { + /// <summary> + /// Gets the current time. + /// </summary> + DateTimeOffset UtcNow { get; } + } + + /// <summary> + /// (Infrastructure) Provides a mechanism to notify local schedulers about system clock changes. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface INotifySystemClockChanged + { + /// <summary> + /// Event that gets raised when a system clock change is detected. + /// </summary> + event EventHandler<SystemClockChangedEventArgs> SystemClockChanged; + } + + /// <summary> + /// (Infrastructure) Event arguments for system clock change notifications. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public class SystemClockChangedEventArgs : EventArgs + { + /// <summary> + /// Creates a new system clock notification object with unknown old and new times. + /// </summary> + public SystemClockChangedEventArgs() + : this(DateTimeOffset.MinValue, DateTimeOffset.MaxValue) + { + } + + /// <summary> + /// Creates a new system clock notification object with the specified old and new times. + /// </summary> + /// <param name="oldTime">Time before the system clock changed, or DateTimeOffset.MinValue if not known.</param> + /// <param name="newTime">Time after the system clock changed, or DateTimeOffset.MaxValue if not known.</param> + public SystemClockChangedEventArgs(DateTimeOffset oldTime, DateTimeOffset newTime) + { + OldTime = oldTime; + NewTime = newTime; + } + + /// <summary> + /// Gets the time before the system clock changed, or DateTimeOffset.MinValue if not known. + /// </summary> + public DateTimeOffset OldTime { get; private set; } + + /// <summary> + /// Gets the time after the system clock changed, or DateTimeOffset.MaxValue if not known. + /// </summary> + public DateTimeOffset NewTime { get; private set; } + } +} |