Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs')
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs58
1 files changed, 47 insertions, 11 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
index 7d91f73..9baf146 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
@@ -1,13 +1,13 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- #if !NO_PERF
+#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>>
{
@@ -15,14 +15,16 @@ namespace System.Reactive.Linq.Observαble
private readonly Func<TSource, TKey> _keySelector;
private readonly Func<TSource, TElement> _elementSelector;
private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector;
+ private readonly int? _capacity;
private readonly IEqualityComparer<TKey> _comparer;
- public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+ public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
_source = source;
_keySelector = keySelector;
_elementSelector = elementSelector;
_durationSelector = durationSelector;
+ _capacity = capacity;
_comparer = comparer;
}
@@ -52,7 +54,7 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _map = new Map<TKey, ISubject<TElement>>(_parent._comparer);
+ _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer);
_nullGate = new object();
}
@@ -127,7 +129,7 @@ namespace System.Reactive.Linq.Observαble
var md = new SingleAssignmentDisposable();
_parent._groupDisposable.Add(md);
- md.Disposable = duration.SubscribeSafe(new δ(this, key, writer, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, key, writer, md));
}
var element = default(TElement);
@@ -163,14 +165,14 @@ namespace System.Reactive.Linq.Observαble
writer.OnNext(element);
}
- class δ : IObserver<TDuration>
+ class Delta : IObserver<TDuration>
{
private readonly _ _parent;
private readonly TKey _key;
private readonly ISubject<TElement> _writer;
private readonly IDisposable _self;
- public δ(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
+ public Delta(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
{
_parent = parent;
_key = key;
@@ -272,11 +274,38 @@ namespace System.Reactive.Linq.Observαble
#if !NO_CDS
class Map<TKey, TValue>
{
+#if !NO_CDS_COLLECTIONS
+ // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs
+
+ // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the
+ // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference
+ // and blocking, but also the more expensive operations that require all locks become (e.g. table
+ // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good
+ // compromise.
+ private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;
+
+ private static int DefaultConcurrencyLevel
+ {
+ get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; }
+ }
+#endif
+
private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map;
- public Map(IEqualityComparer<TKey> comparer)
+ public Map(int? capacity, IEqualityComparer<TKey> comparer)
{
- _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+ if (capacity.HasValue)
+ {
+#if NO_CDS_COLLECTIONS
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer);
+#else
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
+#endif
+ }
+ else
+ {
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+ }
}
public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
@@ -327,9 +356,16 @@ namespace System.Reactive.Linq.Observαble
{
private readonly Dictionary<TKey, TValue> _map;
- public Map(IEqualityComparer<TKey> comparer)
+ public Map(int? capacity, IEqualityComparer<TKey> comparer)
{
- _map = new Dictionary<TKey, TValue>(comparer);
+ if (capacity.HasValue)
+ {
+ _map = new Dictionary<TKey, TValue>(capacity.Value, comparer);
+ }
+ else
+ {
+ _map = new Dictionary<TKey, TValue>(comparer);
+ }
}
public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)