diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs')
-rw-r--r-- | Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs | 58 |
1 files changed, 47 insertions, 11 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs index 7d91f73..9baf146 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs @@ -1,13 +1,13 @@ // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - #if !NO_PERF +#if !NO_PERF using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; using System.Reactive.Subjects; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>> { @@ -15,14 +15,16 @@ namespace System.Reactive.Linq.Observαble private readonly Func<TSource, TKey> _keySelector; private readonly Func<TSource, TElement> _elementSelector; private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector; + private readonly int? _capacity; private readonly IEqualityComparer<TKey> _comparer; - public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer) + public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer) { _source = source; _keySelector = keySelector; _elementSelector = elementSelector; _durationSelector = durationSelector; + _capacity = capacity; _comparer = comparer; } @@ -52,7 +54,7 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; - _map = new Map<TKey, ISubject<TElement>>(_parent._comparer); + _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer); _nullGate = new object(); } @@ -127,7 +129,7 @@ namespace System.Reactive.Linq.Observαble var md = new SingleAssignmentDisposable(); _parent._groupDisposable.Add(md); - md.Disposable = duration.SubscribeSafe(new δ(this, key, writer, md)); + md.Disposable = duration.SubscribeSafe(new Delta(this, key, writer, md)); } var element = default(TElement); @@ -163,14 +165,14 @@ namespace System.Reactive.Linq.Observαble writer.OnNext(element); } - class δ : IObserver<TDuration> + class Delta : IObserver<TDuration> { private readonly _ _parent; private readonly TKey _key; private readonly ISubject<TElement> _writer; private readonly IDisposable _self; - public δ(_ parent, TKey key, ISubject<TElement> writer, IDisposable self) + public Delta(_ parent, TKey key, ISubject<TElement> writer, IDisposable self) { _parent = parent; _key = key; @@ -272,11 +274,38 @@ namespace System.Reactive.Linq.Observαble #if !NO_CDS class Map<TKey, TValue> { +#if !NO_CDS_COLLECTIONS + // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs + + // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the + // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference + // and blocking, but also the more expensive operations that require all locks become (e.g. table + // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good + // compromise. + private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4; + + private static int DefaultConcurrencyLevel + { + get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; } + } +#endif + private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map; - public Map(IEqualityComparer<TKey> comparer) + public Map(int? capacity, IEqualityComparer<TKey> comparer) { - _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer); + if (capacity.HasValue) + { +#if NO_CDS_COLLECTIONS + _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer); +#else + _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer); +#endif + } + else + { + _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer); + } } public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added) @@ -327,9 +356,16 @@ namespace System.Reactive.Linq.Observαble { private readonly Dictionary<TKey, TValue> _map; - public Map(IEqualityComparer<TKey> comparer) + public Map(int? capacity, IEqualityComparer<TKey> comparer) { - _map = new Dictionary<TKey, TValue>(comparer); + if (capacity.HasValue) + { + _map = new Dictionary<TKey, TValue>(capacity.Value, comparer); + } + else + { + _map = new Dictionary<TKey, TValue>(comparer); + } } public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added) |