diff options
author | Atsushi Eno <atsushieno@veritas-vos-liberabit.com> | 2012-11-12 22:47:31 +0400 |
---|---|---|
committer | Atsushi Eno <atsushieno@veritas-vos-liberabit.com> | 2012-11-12 22:52:17 +0400 |
commit | d1174f3f8979321a9182925df460e07e08157b41 (patch) | |
tree | d16fb2fc191bf68ff0e2aac600adf71aba8cad01 /Rx.NET/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs | |
parent | d90a52595e24b1216c89f6cb5f245262db1810ae (diff) |
partial import of ca05fdeb565e: Reactive Extensions OSS V1.0
Diffstat (limited to 'Rx.NET/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs | 576 |
1 files changed, 576 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs b/Rx.NET/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs new file mode 100644 index 0000000..8b7ec81 --- /dev/null +++ b/Rx.NET/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs @@ -0,0 +1,576 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +/* + * WARNING: Auto-generated file (7/18/2012 4:59:53 PM) + * + * Stripped down code based on ndp\clr\src\BCL\System\Collections\Concurrent\ConcurrentDictionary.cs + */ + +#if NO_CDS_COLLECTIONS + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics.CodeAnalysis; +using System.Reflection; +using System.Threading; + +namespace System.Collections.Concurrent +{ + internal class ConcurrentDictionary<TKey, TValue> + { + /* >>> Code copied from the Array class */ + + // We impose limits on maximum array lenght in each dimension to allow efficient + // implementation of advanced range check elimination in future. + // Keep in sync with vm\gcscan.cpp and HashHelpers.MaxPrimeArrayLength. + internal const int MaxArrayLength = 0X7FEFFFFF; + + /* <<< Code copied from the Array class */ + + private class Tables + { + internal readonly Node[] m_buckets; // A singly-linked list for each bucket. + internal readonly object[] m_locks; // A set of locks, each guarding a section of the table. + internal volatile int[] m_countPerLock; // The number of elements guarded by each lock. + + internal Tables(Node[] buckets, object[] locks, int[] countPerLock) + { + m_buckets = buckets; + m_locks = locks; + m_countPerLock = countPerLock; + } + } + + private volatile Tables m_tables; // Internal tables of the dictionary + private readonly IEqualityComparer<TKey> m_comparer; // Key equality comparer + private readonly bool m_growLockArray; // Whether to dynamically increase the size of the striped lock + private int m_budget; // The maximum number of elements per lock before a resize operation is triggered + + // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the + // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference + // and blocking, but also the more expensive operations that require all locks become (e.g. table + // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good + // compromise. + private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4; + + // The default capacity, i.e. the initial # of buckets. When choosing this value, we are making + // a trade-off between the size of a very small dictionary, and the number of resizes when + // constructing a large dictionary. Also, the capacity should not be divisible by a small prime. + private const int DEFAULT_CAPACITY = 31; + + // The maximum size of the striped lock that will not be exceeded when locks are automatically + // added as the dictionary grows. However, the user is allowed to exceed this limit by passing + // a concurrency level larger than MAX_LOCK_NUMBER into the constructor. + private const int MAX_LOCK_NUMBER = 1024; + + // Whether TValue is a type that can be written atomically (i.e., with no danger of torn reads) + private static readonly bool s_isValueWriteAtomic = IsValueWriteAtomic(); + + private static bool IsValueWriteAtomic() + { + Type valueType = typeof(TValue); + + // + // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without + // the risk of tearing. + // + // See http://www.ecma-international.org/publications/files/ECMA-ST/Ecma-335.pdf + // + bool isAtomic = + (valueType.GetTypeInfo().IsClass) + || valueType == typeof(Boolean) + || valueType == typeof(Char) + || valueType == typeof(Byte) + || valueType == typeof(SByte) + || valueType == typeof(Int16) + || valueType == typeof(UInt16) + || valueType == typeof(Int32) + || valueType == typeof(UInt32) + || valueType == typeof(Single); + + if (!isAtomic && IntPtr.Size == 8) + { + isAtomic |= valueType == typeof(Double) || valueType == typeof(Int64); + } + + return isAtomic; + } + + public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { } + + internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) + { + if (concurrencyLevel < 1) + { + throw new ArgumentOutOfRangeException("concurrencyLevel"); + } + if (capacity < 0) + { + throw new ArgumentOutOfRangeException("capacity"); + } + if (comparer == null) throw new ArgumentNullException("comparer"); + + // The capacity should be at least as large as the concurrency level. Otherwise, we would have locks that don't guard + // any buckets. + if (capacity < concurrencyLevel) + { + capacity = concurrencyLevel; + } + + object[] locks = new object[concurrencyLevel]; + for (int i = 0; i < locks.Length; i++) + { + locks[i] = new object(); + } + + int[] countPerLock = new int[locks.Length]; + Node[] buckets = new Node[capacity]; + m_tables = new Tables(buckets, locks, countPerLock); + + m_comparer = comparer; + m_growLockArray = growLockArray; + m_budget = buckets.Length / locks.Length; + } + + public bool TryAdd(TKey key, TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + TValue dummy; + return TryAddInternal(key, value, false, true, out dummy); + } + + public bool TryRemove(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + return TryRemoveInternal(key, out value, false, default(TValue)); + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue) + { + while (true) + { + Tables tables = m_tables; + + int bucketNo, lockNo; + GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + lock (tables.m_locks[lockNo]) + { + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + Node prev = null; + for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next) + { + if (m_comparer.Equals(curr.m_key, key)) + { + if (matchValue) + { + bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value); + if (!valuesMatch) + { + value = default(TValue); + return false; + } + } + + if (prev == null) + { + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], curr.m_next); + } + else + { + prev.m_next = curr.m_next; + } + + value = curr.m_value; + tables.m_countPerLock[lockNo]--; + return true; + } + prev = curr; + } + } + + value = default(TValue); + return false; + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + public bool TryGetValue(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + int bucketNo, lockNoUnused; + + // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize. + Tables tables = m_tables; + + GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNoUnused, tables.m_buckets.Length, tables.m_locks.Length); + + // We can get away w/out a lock here. + // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i]. + Node n = Volatile.Read<Node>(ref tables.m_buckets[bucketNo]); + + while (n != null) + { + if (m_comparer.Equals(n.m_key, key)) + { + value = n.m_value; + return true; + } + n = n.m_next; + } + + value = default(TValue); + return false; + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue) + { + int hashcode = m_comparer.GetHashCode(key); + + while (true) + { + int bucketNo, lockNo; + + Tables tables = m_tables; + GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + bool resizeDesired = false; + bool lockTaken = false; + try + { + if (acquireLock) + Monitor.Enter(tables.m_locks[lockNo], ref lockTaken); + + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + // Try to find this key in the bucket + Node prev = null; + for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next) + { + if (m_comparer.Equals(node.m_key, key)) + { + // The key was found in the dictionary. If updates are allowed, update the value for that key. + // We need to create a new node for the update, in order to support TValue types that cannot + // be written atomically, since lock-free reads may be happening concurrently. + if (updateIfExists) + { + if (s_isValueWriteAtomic) + { + node.m_value = value; + } + else + { + Node newNode = new Node(node.m_key, value, hashcode, node.m_next); + if (prev == null) + { + tables.m_buckets[bucketNo] = newNode; + } + else + { + prev.m_next = newNode; + } + } + resultingValue = value; + } + else + { + resultingValue = node.m_value; + } + return false; + } + prev = node; + } + + // The key was not found in the bucket. Insert the key-value pair. + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo])); + checked + { + tables.m_countPerLock[lockNo]++; + } + + // + // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table. + // It is also possible that GrowTable will increase the budget but won't resize the bucket table. + // That happens if the bucket table is found to be poorly utilized due to a bad hash function. + // + if (tables.m_countPerLock[lockNo] > m_budget) + { + resizeDesired = true; + } + } + finally + { + if (lockTaken) + Monitor.Exit(tables.m_locks[lockNo]); + } + + // + // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table. + // + // Concurrency notes: + // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks. + // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0 + // and then verify that the table we passed to it as the argument is still the current table. + // + if (resizeDesired) + { + GrowTable(tables); + } + + resultingValue = value; + return true; + } + } + + public ICollection<TValue> Values + { + get { return GetValues(); } + } + + private void GrowTable(Tables tables) + { + int locksAcquired = 0; + try + { + // The thread that first obtains m_locks[0] will be the one doing the resize operation + AcquireLocks(0, 1, ref locksAcquired); + + // Make sure nobody resized the table while we were waiting for lock 0: + if (tables != m_tables) + { + // We assume that since the table reference is different, it was already resized (or the budget + // was adjusted). If we ever decide to do table shrinking, or replace the table for other reasons, + // we will have to revisit this logic. + return; + } + + // Compute the (approx.) total size. Use an Int64 accumulation variable to avoid an overflow. + long approxCount = 0; + for (int i = 0; i < tables.m_countPerLock.Length; i++) + { + approxCount += tables.m_countPerLock[i]; + } + + // + // If the bucket array is too empty, double the budget instead of resizing the table + // + if (approxCount < tables.m_buckets.Length / 4) + { + m_budget = 2 * m_budget; + if (m_budget < 0) + { + m_budget = int.MaxValue; + } + return; + } + + + // Compute the new table size. We find the smallest integer larger than twice the previous table size, and not divisible by + // 2,3,5 or 7. We can consider a different table-sizing policy in the future. + int newLength = 0; + bool maximizeTableSize = false; + try + { + checked + { + // Double the size of the buckets table and add one, so that we have an odd integer. + newLength = tables.m_buckets.Length * 2 + 1; + + // Now, we only need to check odd integers, and find the first that is not divisible + // by 3, 5 or 7. + while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0) + { + newLength += 2; + } + + if (newLength > MaxArrayLength) + { + maximizeTableSize = true; + } + } + } + catch (OverflowException) + { + maximizeTableSize = true; + } + + if (maximizeTableSize) + { + newLength = MaxArrayLength; + + // We want to make sure that GrowTable will not be called again, since table is at the maximum size. + // To achieve that, we set the budget to int.MaxValue. + // + // (There is one special case that would allow GrowTable() to be called in the future: + // calling Clear() on the ConcurrentDictionary will shrink the table and lower the budget.) + m_budget = int.MaxValue; + } + + // Now acquire all other locks for the table + AcquireLocks(1, tables.m_locks.Length, ref locksAcquired); + + object[] newLocks = tables.m_locks; + + // Add more locks + if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER) + { + newLocks = new object[tables.m_locks.Length * 2]; + Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length); + + for (int i = tables.m_locks.Length; i < newLocks.Length; i++) + { + newLocks[i] = new object(); + } + } + + Node[] newBuckets = new Node[newLength]; + int[] newCountPerLock = new int[newLocks.Length]; + + // Copy all data into a new table, creating new nodes for all elements + for (int i = 0; i < tables.m_buckets.Length; i++) + { + Node current = tables.m_buckets[i]; + while (current != null) + { + Node next = current.m_next; + int newBucketNo, newLockNo; + GetBucketAndLockNo(current.m_hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length); + + newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, current.m_hashcode, newBuckets[newBucketNo]); + + checked + { + newCountPerLock[newLockNo]++; + } + + current = next; + } + } + + // Adjust the budget + m_budget = Math.Max(1, newBuckets.Length / newLocks.Length); + + // Replace tables with the new versions + m_tables = new Tables(newBuckets, newLocks, newCountPerLock); + } + finally + { + // Release all locks that we took earlier + ReleaseLocks(0, locksAcquired); + } + } + + private void GetBucketAndLockNo( + int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount) + { + bucketNo = (hashcode & 0x7fffffff) % bucketCount; + lockNo = bucketNo % lockCount; + } + + private static int DefaultConcurrencyLevel + { + get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; } + } + + private void AcquireAllLocks(ref int locksAcquired) + { + // First, acquire lock 0 + AcquireLocks(0, 1, ref locksAcquired); + + // Now that we have lock 0, the m_locks array will not change (i.e., grow), + // and so we can safely read m_locks.Length. + AcquireLocks(1, m_tables.m_locks.Length, ref locksAcquired); + } + + private void AcquireLocks(int fromInclusive, int toExclusive, ref int locksAcquired) + { + object[] locks = m_tables.m_locks; + + for (int i = fromInclusive; i < toExclusive; i++) + { + bool lockTaken = false; + try + { + Monitor.Enter(locks[i], ref lockTaken); + } + finally + { + if (lockTaken) + { + locksAcquired++; + } + } + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private void ReleaseLocks(int fromInclusive, int toExclusive) + { + for (int i = fromInclusive; i < toExclusive; i++) + { + Monitor.Exit(m_tables.m_locks[i]); + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + private ReadOnlyCollection<TValue> GetValues() + { + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + List<TValue> values = new List<TValue>(); + + for (int i = 0; i < m_tables.m_buckets.Length; i++) + { + Node current = m_tables.m_buckets[i]; + while (current != null) + { + values.Add(current.m_value); + current = current.m_next; + } + } + + return new ReadOnlyCollection<TValue>(values); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + private class Node + { + internal TKey m_key; + internal TValue m_value; + internal volatile Node m_next; + internal int m_hashcode; + + internal Node(TKey key, TValue value, int hashcode, Node next) + { + m_key = key; + m_value = value; + m_next = next; + m_hashcode = hashcode; + } + } + } +} + +#endif
\ No newline at end of file |