Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
committerAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
commit74a538f6725ebc83efda4bb07d5747e8a6359e19 (patch)
tree7c98de97c88c78b4aca4b25b36db310f82c26865 /Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
parent50e7bdb4507f7e4c2aefb7772d57d9a80f4d42b0 (diff)
Import Official Rx 2.2 (3ebdd2e09991)HEADmaster
I made changes from the original source tree to match the older tree so that we don't have to make several changes to project tree generator. (There is actually no new sources in Rx so hopefully we can just reuse existing modifications in the tree).
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs')
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs212
1 files changed, 177 insertions, 35 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
index 0089f04..41ceb9c 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
@@ -1,13 +1,10 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
-using System.Linq;
using System.Reactive.Concurrency;
-using System.Reactive.Disposables;
-using System.Reactive.Subjects;
+
#if !NO_TPL
-using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
#endif
@@ -15,7 +12,7 @@ using System.Threading.Tasks;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
@@ -156,30 +153,50 @@ namespace System.Reactive.Linq
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
{
- return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
{
- return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
- return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
{
- return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+ {
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+ {
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
}
- private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+ private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
#if !NO_PERF
- return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+ return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
#else
- return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), comparer);
+ return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), capacity, comparer);
#endif
}
@@ -189,32 +206,54 @@ namespace System.Reactive.Linq
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
{
- return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
{
- return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, EqualityComparer<TKey>.Default);
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
{
- return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, comparer);
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
{
- return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, EqualityComparer<TKey>.Default);
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
}
- private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
#if !NO_PERF
- return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+ return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
#else
return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer =>
{
- var map = new Dictionary<TKey, ISubject<TElement>>(comparer);
+ var map = capacity.HasValue
+ ? new Dictionary<TKey, ISubject<TElement>>(capacity.Value, comparer)
+ : new Dictionary<TKey, ISubject<TElement>>(comparer);
var groupDisposable = new CompositeDisposable();
var refCountDisposable = new RefCountDisposable(groupDisposable);
@@ -733,7 +772,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var select = source as Select<TSource>;
if (select != null)
- return select.Ω(selector);
+ return select.Omega(selector);
return new Select<TSource, TResult>(source, selector);
#else
@@ -853,6 +892,15 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, (x, i, token) => selector(x, i));
+#else
+ return SelectMany_<TSource, TResult>(source, (x, i) => selector(x, i).ToObservable());
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
{
#if !NO_PERF
@@ -861,6 +909,15 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TResult>(source, x => FromAsync(ct => selector(x, ct)));
#endif
}
+
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, selector);
+#else
+ return SelectMany_<TSource, TResult>(source, (x, i) => FromAsync(ct => selector(x, i, ct)));
+#endif
+ }
#endif
public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
@@ -883,6 +940,15 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TTaskResult, TResult>(source, (x, i, token) => taskSelector(x, i), resultSelector);
+#else
+ return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => taskSelector(x, i).ToObservable(), (x, i, t, _) => resultSelector(x, i, t));
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
#if !NO_PERF
@@ -891,6 +957,15 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TTaskResult, TResult>(source, x => FromAsync(ct => taskSelector(x, ct)), resultSelector);
#endif
}
+
+ public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
+#else
+ return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => FromAsync(ct => taskSelector(x, i, ct)), (x, i, t, _) => resultSelector(x, i, t));
+#endif
+ }
#endif
private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
@@ -901,7 +976,7 @@ namespace System.Reactive.Linq
return source.Select(selector).Merge();
#endif
}
-
+
private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
{
#if !NO_PERF
@@ -925,7 +1000,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#else
- return SelectMany_<TSource, TResult>(source, x => collectionSelector(x).Select(y => resultSelector(x, y)));
+ return SelectMany_<TSource, TResult>(source, (x, i) => collectionSelector(x, i).Select((y, i2) => resultSelector(x, i, y, i2)));
#endif
}
@@ -946,19 +1021,23 @@ namespace System.Reactive.Linq
#endif
}
- public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted)
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
{
#if !NO_PERF
return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
#else
- return source.Materialize().SelectMany(notification =>
+ return Defer(() =>
{
- if (notification.Kind == NotificationKind.OnNext)
- return onNext(notification.Value);
- else if (notification.Kind == NotificationKind.OnError)
- return onError(notification.Exception);
- else
- return onCompleted();
+ var index = 0;
+ return source.Materialize().SelectMany(notification =>
+ {
+ if (notification.Kind == NotificationKind.OnNext)
+ return onNext(notification.Value, checked(index++));
+ else if (notification.Kind == NotificationKind.OnError)
+ return onError(notification.Exception);
+ else
+ return onCompleted();
+ });
});
#endif
}
@@ -977,7 +1056,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
return new SelectMany<TSource, TResult>(source, selector);
#else
- return SelectMany_<TSource, TResult, TResult>(source, selector, (_, x) => x);
+ return SelectMany_<TSource, TResult, TResult>(source, selector, (_, __, x, ___) => x);
#endif
}
@@ -986,6 +1065,11 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
+ public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+ }
+
private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
#if !NO_PERF
@@ -1045,9 +1129,67 @@ namespace System.Reactive.Linq
#endif
}
- public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
+#if !NO_PERF
return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+#else
+ return new AnonymousObservable<TResult>(observer =>
+ {
+ var index = 0;
+
+ return source.Subscribe(
+ x =>
+ {
+ var xs = default(IEnumerable<TCollection>);
+ try
+ {
+ xs = collectionSelector(x, checked(index++));
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+
+ var e = xs.GetEnumerator();
+
+ try
+ {
+ var eIndex = 0;
+ var hasNext = true;
+ while (hasNext)
+ {
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = resultSelector(x, index, e.Current, checked(eIndex++));
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+
+ if (hasNext)
+ observer.OnNext(current);
+ }
+ }
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ },
+ observer.OnError,
+ observer.OnCompleted
+ )
+ });
+#endif
}
#endregion
@@ -1059,7 +1201,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var skip = source as Skip<TSource>;
if (skip != null && skip._scheduler == null)
- return skip.Ω(count);
+ return skip.Omega(count);
return new Skip<TSource>(source, count);
#else
@@ -1156,7 +1298,7 @@ namespace System.Reactive.Linq
{
var take = source as Take<TSource>;
if (take != null && take._scheduler == null)
- return take.Ω(count);
+ return take.Omega(count);
return new Take<TSource>(source, count);
}
@@ -1248,7 +1390,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var where = source as Where<TSource>;
if (where != null)
- return where.Ω(predicate);
+ return where.Omega(predicate);
return new Where<TSource>(source, predicate);
#else