Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
committerAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
commit74a538f6725ebc83efda4bb07d5747e8a6359e19 (patch)
tree7c98de97c88c78b4aca4b25b36db310f82c26865 /Rx/NET/Source/System.Reactive.Linq/Reactive
parent50e7bdb4507f7e4c2aefb7772d57d9a80f4d42b0 (diff)
Import Official Rx 2.2 (3ebdd2e09991)HEADmaster
I made changes from the original source tree to match the older tree so that we don't have to make several changes to project tree generator. (There is actually no new sources in Rx so hopefully we can just reuse existing modifications in the tree).
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive')
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs23
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Blocking.cs14
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Creation.cs6
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Imperative.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Multiple.cs13
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs6
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs367
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs40
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs26
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs6
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs16
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs14
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs16
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs58
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs30
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs30
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs28
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs962
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs20
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs10
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs40
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Aggregates.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Binding.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Blocking.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Concurrency.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Conversions.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Events.cs18
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Imperative.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Multiple.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs6
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs212
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs10
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Subjects/BehaviorSubject.cs32
121 files changed, 1844 insertions, 525 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs
index dee33fb..5a6e1e2 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
using System.Collections.Generic;
-using System.Reactive.Linq.Observαble;
+using System.Reactive.Linq.ObservableImpl;
namespace System.Reactive
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs
index effdd08..34dec93 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs
@@ -15,7 +15,7 @@ namespace System.Reactive
{
public static TDelegate CreateDelegate<TDelegate>(object o, MethodInfo method)
{
-#if CRIPPLED_REFLECTION
+#if (CRIPPLED_REFLECTION && HAS_WINRT)
return (TDelegate)(object)method.CreateDelegate(typeof(TDelegate), o);
#else
return (TDelegate)(object)Delegate.CreateDelegate(typeof(TDelegate), o, method);
@@ -24,7 +24,7 @@ namespace System.Reactive
public static Delegate CreateDelegate(Type delegateType, object o, MethodInfo method)
{
-#if CRIPPLED_REFLECTION
+#if (CRIPPLED_REFLECTION && HAS_WINRT)
return method.CreateDelegate(delegateType, o);
#else
return Delegate.CreateDelegate(delegateType, o, method);
@@ -101,7 +101,7 @@ namespace System.Reactive
public static EventInfo GetEventEx(this Type type, string name, bool isStatic)
{
-#if CRIPPLED_REFLECTION
+#if (CRIPPLED_REFLECTION && HAS_WINRT)
// TODO: replace in the future by System.Reflection.RuntimeExtensions extension methods
var q = new Queue<TypeInfo>();
q.Enqueue(type.GetTypeInfo());
@@ -127,7 +127,7 @@ namespace System.Reactive
#endif
}
-#if CRIPPLED_REFLECTION
+#if (CRIPPLED_REFLECTION && HAS_WINRT)
public static MethodInfo GetMethod(this Type type, string name)
{
return type.GetTypeInfo().GetDeclaredMethod(name);
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs
index 4b35af6..2abdae0 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs
@@ -1,6 +1,5 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
-using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
@@ -9,7 +8,7 @@ using System.Reactive.Subjects;
using System.Threading;
#if !NO_REMOTING
-using System.Runtime.Remoting.Lifetime;
+
#endif
#if !NO_TPL
@@ -386,7 +385,7 @@ namespace System.Reactive.Linq
#endregion
#region * Conversions *
-
+
IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer);
IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler);
IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source);
@@ -503,7 +502,7 @@ namespace System.Reactive.Linq
IObservable<TEventArgs> FromEvent<TEventArgs>(Action<Action<TEventArgs>> addHandler, Action<Action<TEventArgs>> removeHandler, IScheduler scheduler);
IObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler);
IObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler, IScheduler scheduler);
-
+
#endregion
#region * Imperative *
@@ -683,10 +682,18 @@ namespace System.Reactive.Linq
IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer);
IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector);
IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer);
+ IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity);
+ IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer);
+ IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity);
+ IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer);
IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector);
- IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector,Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
+ IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector);
+ IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer);
+ IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity);
+ IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer);
+ IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity);
IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector);
IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector);
IObservable<TResult> OfType<TResult>(IObservable<object> source);
@@ -698,7 +705,7 @@ namespace System.Reactive.Linq
IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector);
IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector);
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted);
- IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted);
+ IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted);
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector);
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector);
IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector);
@@ -715,9 +722,13 @@ namespace System.Reactive.Linq
#if !NO_TPL
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, Task<TResult>> selector);
+ IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector);
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector);
+ IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector);
IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector);
+ IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector);
IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector);
+ IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector);
#endif
#endregion
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Blocking.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Blocking.cs
index 1575b3d..db2a1d1 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Blocking.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Blocking.cs
@@ -76,7 +76,7 @@ namespace System.Reactive.Linq
#endregion
- #region First
+ #region + First +
/// <summary>
/// Returns the first element of an observable sequence.
@@ -123,7 +123,7 @@ namespace System.Reactive.Linq
#endregion
- #region FirstOrDefault
+ #region + FirstOrDefault +
/// <summary>
/// Returns the first element of an observable sequence, or a default value if no such element exists.
@@ -233,7 +233,7 @@ namespace System.Reactive.Linq
#endregion
- #region Last
+ #region + Last +
/// <summary>
/// Returns the last element of an observable sequence.
@@ -280,7 +280,7 @@ namespace System.Reactive.Linq
#endregion
- #region LastOrDefault
+ #region + LastOrDefault +
/// <summary>
/// Returns the last element of an observable sequence, or a default value if no such element exists.
@@ -385,7 +385,7 @@ namespace System.Reactive.Linq
#endregion
- #region Single
+ #region + Single +
/// <summary>
/// Returns the only element of an observable sequence, and throws an exception if there is not exactly one element in the observable sequence.
@@ -432,7 +432,7 @@ namespace System.Reactive.Linq
#endregion
- #region SingleOrDefault
+ #region + SingleOrDefault +
/// <summary>
/// Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method throws an exception if there is more than one element in the observable sequence.
@@ -479,7 +479,7 @@ namespace System.Reactive.Linq
#endregion
- #region Wait
+ #region + Wait +
/// <summary>
/// Waits for the observable sequence to complete and returns the last element of the sequence.
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs
index e14df19..7f29bfd 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Concurrency.cs
@@ -5,7 +5,7 @@ using System.Threading;
namespace System.Reactive.Linq
{
- public static partial class Observable
+ public static partial class Observable
{
#region + ObserveOn +
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Creation.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Creation.cs
index 3de4a43..9b39888 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Creation.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Creation.cs
@@ -12,7 +12,7 @@ namespace System.Reactive.Linq
{
public static partial class Observable
{
- #region - Create -
+ #region + Create +
/// <summary>
/// Creates an observable sequence from a specified Subscribe method implementation.
@@ -56,7 +56,7 @@ namespace System.Reactive.Linq
#endregion
- #region - CreateAsync -
+ #region + CreateAsync +
#if !NO_TPL
/// <summary>
@@ -591,7 +591,7 @@ namespace System.Reactive.Linq
#endregion
- #region - UsingAsync -
+ #region + UsingAsync +
#if !NO_TPL
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Imperative.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Imperative.cs
index 0c8089e..2124e62 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Imperative.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Imperative.cs
@@ -12,7 +12,7 @@ namespace System.Reactive.Linq
{
public static partial class Observable
{
- #region ForEachAsync
+ #region + ForEachAsync +
#if !NO_TPL
/// <summary>
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Multiple.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Multiple.cs
index 9cb62de..db0d61b 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Multiple.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Multiple.cs
@@ -67,7 +67,7 @@ namespace System.Reactive.Linq
#endregion
- #region Buffer
+ #region + Buffer +
/// <summary>
/// Projects each element of an observable sequence into consecutive non-overlapping buffers.
@@ -1403,8 +1403,10 @@ namespace System.Reactive.Linq
#region + Switch +
/// <summary>
- /// Switches between the inner observable sequences such that the resulting sequence always produces elements from the most recently received inner observable sequence.
- /// Each time a new inner observable sequence is received, the previous inner observable sequence is unsubscribed from.
+ /// Transforms an observable sequence of observable sequences into an observable sequence
+ /// producing values only from the most recent observable sequence.
+ /// Each time a new inner observable sequence is received, unsubscribe from the
+ /// previous inner observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
/// <param name="sources">Observable sequence of inner observable sequences.</param>
@@ -1421,7 +1423,8 @@ namespace System.Reactive.Linq
#if !NO_TPL
/// <summary>
- /// Switches between the tasks such that the resulting sequence always produces results from the most recently received task.
+ /// Transforms an observable sequence of tasks into an observable sequence
+ /// producing values only from the most recent observable sequence.
/// Each time a new task is received, the previous task's result is ignored.
/// </summary>
/// <typeparam name="TSource">The type of the results produced by the source tasks.</typeparam>
@@ -1464,7 +1467,7 @@ namespace System.Reactive.Linq
#endregion
- #region Window
+ #region + Window +
/// <summary>
/// Projects each element of an observable sequence into consecutive non-overlapping windows.
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs
index 5d8e3ef..8782702 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs
@@ -339,7 +339,7 @@ namespace System.Reactive.Linq
#endregion
- #region - Repeat -
+ #region + Repeat +
/// <summary>
/// Repeats the observable sequence indefinitely.
@@ -377,7 +377,7 @@ namespace System.Reactive.Linq
#endregion
- #region - Retry -
+ #region + Retry +
/// <summary>
/// Repeats the source observable sequence until it successfully terminates.
@@ -486,7 +486,7 @@ namespace System.Reactive.Linq
#endregion
- #region - StartWith -
+ #region + StartWith +
/// <summary>
/// Prepends a sequence of values to an observable sequence.
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs
index 8eb9289..803ba26 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs
@@ -241,6 +241,112 @@ namespace System.Reactive.Linq
return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
}
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+
+ return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity);
+ }
+
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <param name="comparer">An equality comparer to compare keys with.</param>
+ /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+ if (comparer == null)
+ throw new ArgumentNullException("comparer");
+
+ return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity, comparer);
+ }
+
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity and selects the resulting elements by using a specified function.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (elementSelector == null)
+ throw new ArgumentNullException("elementSelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+
+ return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity);
+ }
+
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <param name="comparer">An equality comparer to compare keys with.</param>
+ /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="comparer"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (elementSelector == null)
+ throw new ArgumentNullException("elementSelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+ if (comparer == null)
+ throw new ArgumentNullException("comparer");
+
+ return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
+ }
+
#endregion
#region + GroupByUntil +
@@ -329,7 +435,7 @@ namespace System.Reactive.Linq
/// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
- public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector,Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+ public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
{
if (source == null)
throw new ArgumentNullException("source");
@@ -371,6 +477,148 @@ namespace System.Reactive.Linq
return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector);
}
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+ /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+ /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+ /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <param name="comparer">An equality comparer to compare keys with.</param>
+ /// <returns>
+ /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+ /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encountered.
+ /// </returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (elementSelector == null)
+ throw new ArgumentNullException("elementSelector");
+ if (durationSelector == null)
+ throw new ArgumentNullException("durationSelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+ if (comparer == null)
+ throw new ArgumentNullException("comparer");
+
+ return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
+ }
+
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and selects the resulting elements by using a specified function.
+ /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+ /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+ /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <returns>
+ /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+ /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+ /// </returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (elementSelector == null)
+ throw new ArgumentNullException("elementSelector");
+ if (durationSelector == null)
+ throw new ArgumentNullException("durationSelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+
+ return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity);
+ }
+
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+ /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+ /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <param name="comparer">An equality comparer to compare keys with.</param>
+ /// <returns>
+ /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+ /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+ /// </returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (durationSelector == null)
+ throw new ArgumentNullException("durationSelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+ if (comparer == null)
+ throw new ArgumentNullException("comparer");
+
+ return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity, comparer);
+ }
+
+ /// <summary>
+ /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+ /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+ /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+ /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+ /// <param name="source">An observable sequence whose elements to group.</param>
+ /// <param name="keySelector">A function to extract the key for each element.</param>
+ /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+ /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+ /// <returns>
+ /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+ /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+ /// </returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> is null.</exception>
+ /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+ public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (keySelector == null)
+ throw new ArgumentNullException("keySelector");
+ if (durationSelector == null)
+ throw new ArgumentNullException("durationSelector");
+ if (capacity < 0)
+ throw new ArgumentOutOfRangeException("capacity");
+
+ return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity);
+ }
+
#endregion
#region + GroupJoin +
@@ -545,12 +793,12 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+ /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
/// <param name="source">An observable sequence of elements to project.</param>
- /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
/// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
@@ -585,6 +833,26 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Projects each element of an observable sequence to a task by incorporating the element's index and merges all of the task results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
+ /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
+ public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (selector == null)
+ throw new ArgumentNullException("selector");
+
+ return s_impl.SelectMany<TSource, TResult>(source, selector);
+ }
+
+ /// <summary>
/// Projects each element of an observable sequence to a task with cancellation support and merges all of the task results into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
@@ -603,6 +871,26 @@ namespace System.Reactive.Linq
return s_impl.SelectMany<TSource, TResult>(source, selector);
}
+
+ /// <summary>
+ /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support and merges all of the task results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
+ /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
+ public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (selector == null)
+ throw new ArgumentNullException("selector");
+
+ return s_impl.SelectMany<TSource, TResult>(source, selector);
+ }
#endif
/// <summary>
@@ -629,14 +917,14 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+ /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
/// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
/// <param name="source">An observable sequence of elements to project.</param>
- /// <param name="collectionSelector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
- /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+ /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element and the fourth parameter represents the index of the intermediate element.</param>
/// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
@@ -677,6 +965,30 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
+ /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+ public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (taskSelector == null)
+ throw new ArgumentNullException("taskSelector");
+ if (resultSelector == null)
+ throw new ArgumentNullException("resultSelector");
+
+ return s_impl.SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
+ }
+
+ /// <summary>
/// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
@@ -699,6 +1011,30 @@ namespace System.Reactive.Linq
return s_impl.SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
}
+
+ /// <summary>
+ /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
+ /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+ public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (taskSelector == null)
+ throw new ArgumentNullException("taskSelector");
+ if (resultSelector == null)
+ throw new ArgumentNullException("resultSelector");
+
+ return s_impl.SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
+ }
#endif
/// <summary>
@@ -727,17 +1063,17 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Projects each notification of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+ /// Projects each notification of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
/// <param name="source">An observable sequence of notifications to project.</param>
- /// <param name="onNext">A transform function to apply to each element; the second parameter represents the index of the source element.</param>
- /// <param name="onError">A transform function to apply when an error occurs in the source sequence; the second parameter represents the index of the source element.</param>
- /// <param name="onCompleted">A transform function to apply when the end of the source sequence is reached; the second parameter represents the number of elements observed.</param>
+ /// <param name="onNext">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="onError">A transform function to apply when an error occurs in the source sequence.</param>
+ /// <param name="onCompleted">A transform function to apply when the end of the source sequence is reached.</param>
/// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function corresponding to each notification in the input sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
- public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted)
+ public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
{
if (source == null)
throw new ArgumentNullException("source");
@@ -772,13 +1108,12 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
- /// The index of each source element is used in the projected form of that element.
+ /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index and concatenates the resulting enumerable sequences into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
/// <param name="source">An observable sequence of elements to project.</param>
- /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
/// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
/// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
@@ -817,14 +1152,14 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+ /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
/// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
/// <param name="source">An observable sequence of elements to project.</param>
/// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
- /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element and the fourth parameter represents the index of the intermediate element.</param>
/// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
/// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs
index 3fc6962..46e66c6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class AddRef<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs
index d1b6ef8..6ae63c0 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Aggregate<TSource, TAccumulate, TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs
index 09c825d..49a0933 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class All<TSource> : Producer<bool>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs
index 7272cee..ea110c5 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Amb<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs
index 1414bdd..c8eb37a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Any<TSource> : Producer<bool>
{
@@ -25,7 +25,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new AnyImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -65,11 +65,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<bool>, IObserver<TSource>
+ class AnyImpl : Sink<bool>, IObserver<TSource>
{
private readonly Any<TSource> _parent;
- public π(Any<TSource> parent, IObserver<bool> observer, IDisposable cancel)
+ public AnyImpl(Any<TSource> parent, IObserver<bool> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs
index 09d5f14..c4e9d2f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class AsObservable<TSource> : Producer<TSource>, IEvaluatableObservable<TSource>
{
@@ -14,7 +14,7 @@ namespace System.Reactive.Linq.Observαble
_source = source;
}
- public IObservable<TSource> Ω()
+ public IObservable<TSource> Omega()
{
return this;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs
index fb2d6b7..897da02 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class AverageDouble : Producer<double>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs
index a82b550..ce6ced8 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs
@@ -8,7 +8,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Buffer<TSource> : Producer<IList<TSource>>
{
@@ -53,7 +53,7 @@ namespace System.Reactive.Linq.Observαble
}
else if (_count > 0)
{
- var sink = new μ(this, observer, cancel);
+ var sink = new Impl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -61,13 +61,13 @@ namespace System.Reactive.Linq.Observαble
{
if (_timeSpan == _timeShift)
{
- var sink = new π(this, observer, cancel);
+ var sink = new BufferTimeShift(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new BufferImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -143,11 +143,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<IList<TSource>>, IObserver<TSource>
+ class BufferImpl : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource> _parent;
- public τ(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -283,11 +283,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<IList<TSource>>, IObserver<TSource>
+ class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource> _parent;
- public π(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -346,11 +346,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class μ : Sink<IList<TSource>>, IObserver<TSource>
+ class Impl : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource> _parent;
- public μ(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -487,7 +487,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new β(this, observer, cancel);
+ var sink = new Beta(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -544,7 +544,7 @@ namespace System.Reactive.Linq.Observαble
var closingSubscription = new SingleAssignmentDisposable();
_m.Disposable = closingSubscription;
- closingSubscription.Disposable = bufferClose.SubscribeSafe(new ω(this, closingSubscription));
+ closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription));
}
private void CloseBuffer(IDisposable closingSubscription)
@@ -561,12 +561,12 @@ namespace System.Reactive.Linq.Observαble
_bufferGate.Wait(CreateBufferClose);
}
- class ω : IObserver<TBufferClosing>
+ class Omega : IObserver<TBufferClosing>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ω(_ parent, IDisposable self)
+ public Omega(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -617,11 +617,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class β : Sink<IList<TSource>>, IObserver<TSource>
+ class Beta : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource, TBufferClosing> _parent;
- public β(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -641,16 +641,16 @@ namespace System.Reactive.Linq.Observαble
_refCountDisposable = new RefCountDisposable(d);
d.Add(_parent._source.SubscribeSafe(this));
- d.Add(_parent._bufferBoundaries.SubscribeSafe(new ω(this)));
+ d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this)));
return _refCountDisposable;
}
- class ω : IObserver<TBufferClosing>
+ class Omega : IObserver<TBufferClosing>
{
- private readonly β _parent;
+ private readonly Beta _parent;
- public ω(β parent)
+ public Omega(Beta parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs
index b104050..b3ba37c 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Case<TValue, TResult> : Producer<TResult>, IEvaluatableObservable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs
index 48820b3..b123357 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs
@@ -3,9 +3,9 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
- class Cast<TSource, TResult> : Producer<TResult> /* Could optimize further by deriving from Select<TResult> and providing Ω<TResult2>. We're not doing this (yet) for debuggability. */
+ class Cast<TSource, TResult> : Producer<TResult> /* Could optimize further by deriving from Select<TResult> and providing Omega<TResult2>. We're not doing this (yet) for debuggability. */
{
private readonly IObservable<TSource> _source;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs
index 71e0037..06bf911 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Catch<TSource> : Producer<TSource>
{
@@ -136,7 +136,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_subscription.Disposable = d;
- d.Disposable = result.SubscribeSafe(new ε(this));
+ d.Disposable = result.SubscribeSafe(new Impl(this));
}
else
{
@@ -151,11 +151,11 @@ namespace System.Reactive.Linq.Observαble
base.Dispose();
}
- class ε : IObserver<TSource>
+ class Impl : IObserver<TSource>
{
private readonly _ _parent;
- public ε(_ parent)
+ public Impl(_ parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs
index 2cfd240..8f0bf24 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs
@@ -7,7 +7,7 @@ using System.Reactive;
using System.Reactive.Threading;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Collect<TSource, TResult> : PushToPullAdapter<TSource, TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs
index 2188307..9748e24 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs
@@ -7,7 +7,7 @@ using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
#region Binary
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs
index 5a9b2e4..cd8c78a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Concat<TSource> : Producer<TSource>, IConcatenatable<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs
index b26e2d4..69e30dc 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Contains<TSource> : Producer<bool>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs
index 7432816..0c2f389 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Count<TSource> : Producer<int>
{
@@ -31,7 +31,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new π(this, observer, cancel);
+ var sink = new CountImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -77,12 +77,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<int>, IObserver<TSource>
+ class CountImpl : Sink<int>, IObserver<TSource>
{
private readonly Count<TSource> _parent;
private int _count;
- public π(Count<TSource> parent, IObserver<int> observer, IDisposable cancel)
+ public CountImpl(Count<TSource> parent, IObserver<int> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs
index ed4bb89..551e536 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DefaultIfEmpty<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs
index ba3dcc4..aa49c12 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Defer<TValue> : Producer<TValue>, IEvaluatableObservable<TValue>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs
index 3e2cbba..5ea3d25 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs
@@ -10,7 +10,7 @@ using System.Threading;
using System.Reactive.Threading;
#endif
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Delay<TSource> : Producer<TSource>
{
@@ -37,7 +37,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_scheduler.AsLongRunning() != null)
{
- var sink = new λ(this, observer, cancel);
+ var sink = new LongRunningImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -227,9 +227,9 @@ namespace System.Reactive.Linq.Observαble
// implementation, the loop below kept running while there was work for immediate dispatch,
// potentially causing a long running work item on the target scheduler. With the addition
// of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
- // interface and perform different processing (see λ). To reduce the code churn in the old
- // loop code here, we set the shouldYield flag to true after the first dispatch iteration,
- // in order to break from the loop and enter the recursive scheduling path.
+ // interface and perform different processing (see LongRunningImpl). To reduce the code
+ // churn in the old loop code here, we set the shouldYield flag to true after the first
+ // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
//
var shouldYield = false;
@@ -322,11 +322,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class λ : Sink<TSource>, IObserver<TSource>
+ class LongRunningImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Delay<TSource> _parent;
- public λ(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -629,7 +629,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new σ(this));
+ _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
}
return new CompositeDisposable(_subscription, _delays);
@@ -660,7 +660,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_delays.Add(d);
- d.Disposable = delay.SubscribeSafe(new δ(this, value, d));
+ d.Disposable = delay.SubscribeSafe(new Delta(this, value, d));
}
public void OnError(Exception error)
@@ -692,11 +692,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class σ : IObserver<TDelay>
+ class SubscriptionDelay : IObserver<TDelay>
{
private readonly _ _parent;
- public σ(_ parent)
+ public SubscriptionDelay(_ parent)
{
_parent = parent;
}
@@ -718,13 +718,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TDelay>
+ class Delta : IObserver<TDelay>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;
- public δ(_ parent, TSource value, IDisposable self)
+ public Delta(_ parent, TSource value, IDisposable self)
{
_parent = parent;
_value = value;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs
index c6a05cc..f5a742a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DelaySubscription<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs
index 112b156..590ef3b 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Dematerialize<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs
index af20277..bd4b0ca 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Distinct<TSource, TKey> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs
index 10c7923..1ba8f5f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DistinctUntilChanged<TSource, TKey> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs
index 2f73cfa..a4b3747 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Do<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs
index 3ef40eb..5dc0bc6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DoWhile<TSource> : Producer<TSource>, IConcatenatable<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs
index b2b1212..0798d3b 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ElementAt<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs
index 42c7241..30a2b65 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Empty<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs
index ca9a5d8..a70ac94 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Finally<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs
index 8d8bba7..62da120 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class FirstAsync<TSource> : Producer<TSource>
{
@@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new FirstAsyncImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -73,11 +73,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<TSource>, IObserver<TSource>
+ class FirstAsyncImpl : Sink<TSource>, IObserver<TSource>
{
private readonly FirstAsync<TSource> _parent;
- public π(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public FirstAsyncImpl(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs
index 85b9e03..948f319 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class For<TSource, TResult> : Producer<TResult>, IConcatenatable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs
index 38e112d..9b5c024 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs
@@ -4,7 +4,7 @@
using System;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ForEach<TSource>
{
@@ -62,7 +62,7 @@ namespace System.Reactive.Linq.Observαble
}
}
- public class τ : IObserver<TSource>
+ public class ForEachImpl : IObserver<TSource>
{
private readonly Action<TSource, int> _onNext;
private readonly Action _done;
@@ -71,7 +71,7 @@ namespace System.Reactive.Linq.Observαble
private Exception _exception;
private int _stopped;
- public τ(Action<TSource, int> onNext, Action done)
+ public ForEachImpl(Action<TSource, int> onNext, Action done)
{
_onNext = onNext;
_done = done;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs
index 82c4348..90aa779 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs
@@ -156,7 +156,7 @@ using System.Reactive.Subjects;
// subject (which is opaque to the event producer). Not to mention that the subject would always be
// rooted by the target event (even when the FromEvent[Pattern] observable wrapper is unreachable).
//
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class FromEvent<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, TEventArgs>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs
index 76da050..ca98290 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs
@@ -12,23 +12,23 @@ using System.Threading;
//
// See FromEvent.cs for more information.
//
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class FromEventPattern
{
- public class τ<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TEventArgs>>
+ public class Impl<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TEventArgs>>
#if !NO_EVENTARGS_CONSTRAINT
where TEventArgs : EventArgs
#endif
{
private readonly Func<EventHandler<TEventArgs>, TDelegate> _conversion;
- public τ(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
+ public Impl(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
: base(addHandler, removeHandler, scheduler)
{
}
- public τ(Func<EventHandler<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
+ public Impl(Func<EventHandler<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
: base(addHandler, removeHandler, scheduler)
{
_conversion = conversion;
@@ -52,12 +52,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- public class τ<TDelegate, TSender, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TSender, TEventArgs>>
+ public class Impl<TDelegate, TSender, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TSender, TEventArgs>>
#if !NO_EVENTARGS_CONSTRAINT
where TEventArgs : EventArgs
#endif
{
- public τ(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
+ public Impl(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
: base(addHandler, removeHandler, scheduler)
{
}
@@ -69,7 +69,7 @@ namespace System.Reactive.Linq.Observαble
}
}
- public class ρ<TSender, TEventArgs, TResult> : EventProducer<Delegate, TResult>
+ public class Handler<TSender, TEventArgs, TResult> : EventProducer<Delegate, TResult>
{
private readonly object _target;
private readonly Type _delegateType;
@@ -80,7 +80,7 @@ namespace System.Reactive.Linq.Observαble
private readonly bool _isWinRT;
#endif
- public ρ(object target, Type delegateType, MethodInfo addMethod, MethodInfo removeMethod, Func<TSender, TEventArgs, TResult> getResult, bool isWinRT, IScheduler scheduler)
+ public Handler(object target, Type delegateType, MethodInfo addMethod, MethodInfo removeMethod, Func<TSender, TEventArgs, TResult> getResult, bool isWinRT, IScheduler scheduler)
: base(scheduler)
{
#if HAS_WINRT
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs
index 89ef862..7e46655 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs
@@ -7,7 +7,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Generate<TState, TResult> : Producer<TResult>
{
@@ -52,13 +52,13 @@ namespace System.Reactive.Linq.Observαble
{
if (_timeSelectorA != null)
{
- var sink = new α(this, observer, cancel);
+ var sink = new SelectorA(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else if (_timeSelectorR != null)
{
- var sink = new δ(this, observer, cancel);
+ var sink = new Delta(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -70,11 +70,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class α : Sink<TResult>
+ class SelectorA : Sink<TResult>
{
private readonly Generate<TState, TResult> _parent;
- public α(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectorA(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -130,11 +130,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : Sink<TResult>
+ class Delta : Sink<TResult>
{
private readonly Generate<TState, TResult> _parent;
- public δ(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public Delta(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
index 05bf20b..a25efa8 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
@@ -8,7 +8,7 @@ using System.Diagnostics;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs
index 1e64100..4f91030 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs
@@ -7,20 +7,22 @@ using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GroupBy<TSource, TKey, TElement> : Producer<IGroupedObservable<TKey, TElement>>
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, TKey> _keySelector;
private readonly Func<TSource, TElement> _elementSelector;
+ private readonly int? _capacity;
private readonly IEqualityComparer<TKey> _comparer;
- public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+ public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
_source = source;
_keySelector = keySelector;
_elementSelector = elementSelector;
+ _capacity = capacity;
_comparer = comparer;
}
@@ -49,7 +51,15 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+
+ if (_parent._capacity.HasValue)
+ {
+ _map = new Dictionary<TKey, ISubject<TElement>>(_parent._capacity.Value, _parent._comparer);
+ }
+ else
+ {
+ _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+ }
}
public void OnNext(TSource value)
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
index 7d91f73..9baf146 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
@@ -1,13 +1,13 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- #if !NO_PERF
+#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>>
{
@@ -15,14 +15,16 @@ namespace System.Reactive.Linq.Observαble
private readonly Func<TSource, TKey> _keySelector;
private readonly Func<TSource, TElement> _elementSelector;
private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector;
+ private readonly int? _capacity;
private readonly IEqualityComparer<TKey> _comparer;
- public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+ public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
_source = source;
_keySelector = keySelector;
_elementSelector = elementSelector;
_durationSelector = durationSelector;
+ _capacity = capacity;
_comparer = comparer;
}
@@ -52,7 +54,7 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _map = new Map<TKey, ISubject<TElement>>(_parent._comparer);
+ _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer);
_nullGate = new object();
}
@@ -127,7 +129,7 @@ namespace System.Reactive.Linq.Observαble
var md = new SingleAssignmentDisposable();
_parent._groupDisposable.Add(md);
- md.Disposable = duration.SubscribeSafe(new δ(this, key, writer, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, key, writer, md));
}
var element = default(TElement);
@@ -163,14 +165,14 @@ namespace System.Reactive.Linq.Observαble
writer.OnNext(element);
}
- class δ : IObserver<TDuration>
+ class Delta : IObserver<TDuration>
{
private readonly _ _parent;
private readonly TKey _key;
private readonly ISubject<TElement> _writer;
private readonly IDisposable _self;
- public δ(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
+ public Delta(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
{
_parent = parent;
_key = key;
@@ -272,11 +274,38 @@ namespace System.Reactive.Linq.Observαble
#if !NO_CDS
class Map<TKey, TValue>
{
+#if !NO_CDS_COLLECTIONS
+ // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs
+
+ // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the
+ // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference
+ // and blocking, but also the more expensive operations that require all locks become (e.g. table
+ // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good
+ // compromise.
+ private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;
+
+ private static int DefaultConcurrencyLevel
+ {
+ get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; }
+ }
+#endif
+
private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map;
- public Map(IEqualityComparer<TKey> comparer)
+ public Map(int? capacity, IEqualityComparer<TKey> comparer)
{
- _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+ if (capacity.HasValue)
+ {
+#if NO_CDS_COLLECTIONS
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer);
+#else
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
+#endif
+ }
+ else
+ {
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+ }
}
public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
@@ -327,9 +356,16 @@ namespace System.Reactive.Linq.Observαble
{
private readonly Dictionary<TKey, TValue> _map;
- public Map(IEqualityComparer<TKey> comparer)
+ public Map(int? capacity, IEqualityComparer<TKey> comparer)
{
- _map = new Dictionary<TKey, TValue>(comparer);
+ if (capacity.HasValue)
+ {
+ _map = new Dictionary<TKey, TValue>(capacity.Value, comparer);
+ }
+ else
+ {
+ _map = new Dictionary<TKey, TValue>(comparer);
+ }
}
public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs
index 66e49be..19c8c45 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult>
{
@@ -68,18 +68,18 @@ namespace System.Reactive.Linq.Observαble
_rightID = 0;
_rightMap = new Dictionary<int, TRight>();
- leftSubscription.Disposable = _parent._left.SubscribeSafe(new λ(this, leftSubscription));
- rightSubscription.Disposable = _parent._right.SubscribeSafe(new ρ(this, rightSubscription));
+ leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
+ rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
return _refCount;
}
- class λ : IObserver<TLeft>
+ class LeftObserver : IObserver<TLeft>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public λ(_ parent, IDisposable self)
+ public LeftObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -122,7 +122,7 @@ namespace System.Reactive.Linq.Observαble
}
// BREAKING CHANGE v2 > v1.x - The duration sequence is subscribed to before the result sequence is evaluated.
- md.Disposable = duration.SubscribeSafe(new δ(this, id, s, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, s, md));
var result = default(TResult);
try
@@ -146,14 +146,14 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TLeftDuration>
+ class Delta : IObserver<TLeftDuration>
{
- private readonly λ _parent;
+ private readonly LeftObserver _parent;
private readonly int _id;
private readonly IObserver<TRight> _group;
private readonly IDisposable _self;
- public δ(λ parent, int id, IObserver<TRight> group, IDisposable self)
+ public Delta(LeftObserver parent, int id, IObserver<TRight> group, IDisposable self)
{
_parent = parent;
_id = id;
@@ -201,12 +201,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ρ : IObserver<TRight>
+ class RightObserver : IObserver<TRight>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ρ(_ parent, IDisposable self)
+ public RightObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -242,7 +242,7 @@ namespace System.Reactive.Linq.Observαble
OnError(exception);
return;
}
- md.Disposable = duration.SubscribeSafe(new δ(this, id, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
lock (_parent._gate)
{
@@ -251,13 +251,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TRightDuration>
+ class Delta : IObserver<TRightDuration>
{
- private readonly ρ _parent;
+ private readonly RightObserver _parent;
private readonly int _id;
private readonly IDisposable _self;
- public δ(ρ parent, int id, IDisposable self)
+ public Delta(RightObserver parent, int id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs
index 8b1804a..8740c32 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class If<TResult> : Producer<TResult>, IEvaluatableObservable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs
index 65441bd..1f932ac 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class IgnoreElements<TSource> : Producer<TSource>
{
@@ -14,7 +14,7 @@ namespace System.Reactive.Linq.Observαble
_source = source;
}
- public IObservable<TSource> Ω()
+ public IObservable<TSource> Omega()
{
return this;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs
index 674c6f8..722750e 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class IsEmpty<TSource> : Producer<bool>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs
index 13e6ad2..4d22d34 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult>
{
@@ -69,18 +69,18 @@ namespace System.Reactive.Linq.Observαble
_rightID = 0;
_rightMap = new Dictionary<int, TRight>();
- leftSubscription.Disposable = _parent._left.SubscribeSafe(new λ(this, leftSubscription));
- rightSubscription.Disposable = _parent._right.SubscribeSafe(new ρ(this, rightSubscription));
+ leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
+ rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
return _group;
}
- class λ : IObserver<TLeft>
+ class LeftObserver : IObserver<TLeft>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public λ(_ parent, IDisposable self)
+ public LeftObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -124,7 +124,7 @@ namespace System.Reactive.Linq.Observαble
return;
}
- md.Disposable = duration.SubscribeSafe(new δ(this, id, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
lock (_parent._gate)
{
@@ -147,13 +147,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TLeftDuration>
+ class Delta : IObserver<TLeftDuration>
{
- private readonly λ _parent;
+ private readonly LeftObserver _parent;
private readonly int _id;
private readonly IDisposable _self;
- public δ(λ parent, int id, IDisposable self)
+ public Delta(LeftObserver parent, int id, IDisposable self)
{
_parent = parent;
_id = id;
@@ -203,12 +203,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ρ : IObserver<TRight>
+ class RightObserver : IObserver<TRight>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ρ(_ parent, IDisposable self)
+ public RightObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -252,7 +252,7 @@ namespace System.Reactive.Linq.Observαble
return;
}
- md.Disposable = duration.SubscribeSafe(new δ(this, id, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
lock (_parent._gate)
{
@@ -275,13 +275,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TRightDuration>
+ class Delta : IObserver<TRightDuration>
{
- private readonly ρ _parent;
+ private readonly RightObserver _parent;
private readonly int _id;
private readonly IDisposable _self;
- public δ(ρ parent, int id, IDisposable self)
+ public Delta(RightObserver parent, int id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs
index 1ecd930..0a77d32 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class LastAsync<TSource> : Producer<TSource>
{
@@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new LastAsyncImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -77,13 +77,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<TSource>, IObserver<TSource>
+ class LastAsyncImpl : Sink<TSource>, IObserver<TSource>
{
private readonly LastAsync<TSource> _parent;
private TSource _value;
private bool _seenValue;
- public π(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public LastAsyncImpl(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs
index 2699797..bc92e6d 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Latest<TSource> : PushToPullAdapter<TSource, TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs
index 0108e18..63c9f24 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class LongCount<TSource> : Producer<long>
{
@@ -31,7 +31,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new π(this, observer, cancel);
+ var sink = new LongCountImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -77,12 +77,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<long>, IObserver<TSource>
+ class LongCountImpl : Sink<long>, IObserver<TSource>
{
private readonly LongCount<TSource> _parent;
private long _count;
- public π(LongCount<TSource> parent, IObserver<long> observer, IDisposable cancel)
+ public LongCountImpl(LongCount<TSource> parent, IObserver<long> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs
index 102267d..fa8e1ee 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Materialize<TSource> : Producer<Notification<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs
index da339da..8e86e75 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Max<TSource> : Producer<TSource>
{
@@ -28,19 +28,19 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new δ(this, observer, cancel);
+ var sink = new Delta(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
}
- class δ : Sink<TSource>, IObserver<TSource>
+ class Delta : Sink<TSource>, IObserver<TSource>
{
private readonly Max<TSource> _parent;
private bool _hasValue;
private TSource _lastValue;
- public δ(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public Delta(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs
index 52a9a42..4bf97f5 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class MaxBy<TSource, TKey> : Producer<IList<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs
index a27c3c3..dd710cb 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs
@@ -10,7 +10,7 @@ using System.Threading;
using System.Threading.Tasks;
#endif
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Merge<TSource> : Producer<TSource>
{
@@ -41,14 +41,14 @@ namespace System.Reactive.Linq.Observαble
{
if (_maxConcurrent > 0)
{
- var sink = new μ(this, observer, cancel);
+ var sink = new MergeConcurrent(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#if !NO_TPL
else if (_sourcesT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new MergeImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -93,7 +93,7 @@ namespace System.Reactive.Linq.Observαble
{
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = value.SubscribeSafe(new ι(this, innerSubscription));
+ innerSubscription.Disposable = value.SubscribeSafe(new Iter(this, innerSubscription));
}
public void OnError(Exception error)
@@ -129,12 +129,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ι : IObserver<TSource>
+ class Iter : IObserver<TSource>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ι(_ parent, IDisposable self)
+ public Iter(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -177,11 +177,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class μ : Sink<TSource>, IObserver<IObservable<TSource>>
+ class MergeConcurrent : Sink<TSource>, IObserver<IObservable<TSource>>
{
private readonly Merge<TSource> _parent;
- public μ(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public MergeConcurrent(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -253,15 +253,15 @@ namespace System.Reactive.Linq.Observαble
{
var subscription = new SingleAssignmentDisposable();
_group.Add(subscription);
- subscription.Disposable = innerSource.SubscribeSafe(new ι(this, subscription));
+ subscription.Disposable = innerSource.SubscribeSafe(new Iter(this, subscription));
}
- class ι : IObserver<TSource>
+ class Iter : IObserver<TSource>
{
- private readonly μ _parent;
+ private readonly MergeConcurrent _parent;
private readonly IDisposable _self;
- public ι(μ parent, IDisposable self)
+ public Iter(MergeConcurrent parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -308,11 +308,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TSource>, IObserver<Task<TSource>>
+ class MergeImpl : Sink<TSource>, IObserver<Task<TSource>>
{
private readonly Merge<TSource> _parent;
- public τ(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public MergeImpl(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs
index f7b6197..fc08002 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Min<TSource> : Producer<TSource>
{
@@ -28,19 +28,19 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new δ(this, observer, cancel);
+ var sink = new Delta(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
}
- class δ : Sink<TSource>, IObserver<TSource>
+ class Delta : Sink<TSource>, IObserver<TSource>
{
private readonly Min<TSource> _parent;
private bool _hasValue;
private TSource _lastValue;
- public δ(Min<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public Delta(Min<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs
index a32bf92..d9c93f6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class MinBy<TSource, TKey> : Producer<IList<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs
index 937d811..7311a33 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class MostRecent<TSource> : PushToPullAdapter<TSource, TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs
index 4a77300..ee163a5 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Multicast<TSource, TIntermediate, TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs
index a9adc52..eae632f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Never<TResult> : IObservable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs
index 2d4ec45..4b20276 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Next<TSource> : PushToPullAdapter<TSource, TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs
index d5d7428..eaf7613 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ObserveOn<TSource> : Producer<TSource>
{
@@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble
#if !NO_SYNCCTX
if (_context != null)
{
- var sink = new ς(this, observer, cancel);
+ var sink = new ObserveOnImpl(this, observer, cancel);
setSink(sink);
return _source.Subscribe(sink);
}
@@ -47,11 +47,11 @@ namespace System.Reactive.Linq.Observαble
}
#if !NO_SYNCCTX
- class ς : Sink<TSource>, IObserver<TSource>
+ class ObserveOnImpl : Sink<TSource>, IObserver<TSource>
{
private readonly ObserveOn<TSource> _parent;
- public ς(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public ObserveOnImpl(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs
index 07e4515..27a0ca3 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class OfType<TSource, TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs
index 26dfd34..077a23f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class OnErrorResumeNext<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs
index 916d12b..6351365 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs
@@ -5,7 +5,7 @@ using System.Collections;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
abstract class PushToPullAdapter<TSource, TResult> : IEnumerable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs
index c58d94c..414ae72 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Range : Producer<int>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs
index 1762807..4efb917 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class RefCount<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs
index fab0607..4f418cb 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Repeat<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs
index cd9ad02..db98f88 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Return<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs
index 0d46c62..6d0e68b 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Sample<TSource, TSample> : Producer<TSource>
{
@@ -51,7 +51,7 @@ namespace System.Reactive.Linq.Observαble
_sourceSubscription = sourceSubscription;
sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
- var samplerSubscription = _parent._sampler.SubscribeSafe(new σ(this));
+ var samplerSubscription = _parent._sampler.SubscribeSafe(new SampleImpl(this));
return new CompositeDisposable(_sourceSubscription, samplerSubscription);
}
@@ -83,11 +83,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class σ : IObserver<TSample>
+ class SampleImpl : IObserver<TSample>
{
private readonly _ _parent;
- public σ(_ parent)
+ public SampleImpl(_ parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs
index 6c8f2fe..421177f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Scan<TSource, TAccumulate> : Producer<TAccumulate>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs
index e853210..330fe3a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs
@@ -3,11 +3,11 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
abstract class Select<TResult> : Producer<TResult>
{
- public abstract IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector);
+ public abstract IObservable<TResult2> Omega<TResult2>(Func<TResult, TResult2> selector);
}
class Select<TSource, TResult> : Select<TResult>
@@ -28,7 +28,7 @@ namespace System.Reactive.Linq.Observαble
_selectorI = selector;
}
- public override IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector)
+ public override IObservable<TResult2> Omega<TResult2>(Func<TResult, TResult2> selector)
{
if (_selector != null)
return new Select<TSource, TResult2>(_source, x => selector(_selector(x)));
@@ -46,7 +46,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -92,12 +92,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectImpl : Sink<TResult>, IObserver<TSource>
{
private readonly Select<TSource, TResult> _parent;
private int _index;
- public τ(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectImpl(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
index 054f974..49d7cf9 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
@@ -1,9 +1,7 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
-using System;
using System.Collections.Generic;
-using System.Reactive;
using System.Reactive.Disposables;
#if !NO_TPL
@@ -11,17 +9,17 @@ using System.Threading;
using System.Threading.Tasks;
#endif
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SelectMany<TSource, TCollection, TResult> : Producer<TResult>
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
- private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex;
+ private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI;
private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
- private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex;
+ private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI;
private readonly Func<TSource, TCollection, TResult> _resultSelector;
- private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex;
+ private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -33,8 +31,8 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
_source = source;
- _collectionSelectorWithIndex = collectionSelector;
- _resultSelectorWithIndex = resultSelector;
+ _collectionSelectorI = collectionSelector;
+ _resultSelectorI = resultSelector;
}
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
@@ -47,12 +45,14 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
_source = source;
- _collectionSelectorEWithIndex = collectionSelector;
- _resultSelectorWithIndex = resultSelector;
+ _collectionSelectorEI = collectionSelector;
+ _resultSelectorI = resultSelector;
}
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
+ private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI;
+ private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI;
public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -60,27 +60,52 @@ namespace System.Reactive.Linq.Observαble
_collectionSelectorT = collectionSelector;
_resultSelector = resultSelector;
}
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
+ {
+ _source = source;
+ _collectionSelectorTI = collectionSelector;
+ _resultSelectorTI = resultSelector;
+ }
#endif
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_collectionSelector != null || _collectionSelectorWithIndex != null)
+ if (_collectionSelector != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return sink.Run();
}
+ else if (_collectionSelectorI != null)
+ {
+ var sink = new IndexSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
#if !NO_TPL
else if (_collectionSelectorT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectManyImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
+ else if (_collectionSelectorTI != null)
+ {
+ var sink = new Sigma(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#endif
+ else if (_collectionSelectorE != null)
+ {
+ var sink = new NoSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return _source.SubscribeSafe(sink);
+ }
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new Omega(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -94,14 +119,12 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _indexInSource = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
- private int _indexInSource;
public IDisposable Run()
{
@@ -122,13 +145,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._collectionSelector != null)
- collection = _parent._collectionSelector(value);
- else
- {
- checked { _indexInSource++; }
- collection = _parent._collectionSelectorWithIndex(value, _indexInSource);
- }
+ collection = _parent._collectionSelector(value);
}
catch (Exception ex)
{
@@ -142,7 +159,7 @@ namespace System.Reactive.Linq.Observαble
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource));
+ innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription));
}
public void OnError(Exception error)
@@ -178,21 +195,17 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ι : IObserver<TCollection>
+ class Iter : IObserver<TCollection>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;
- private int _indexInSource;
- private int _indexInIntermediate = -1;
- public ι(_ parent, TSource value, IDisposable self, int indexInSource)
+ public Iter(_ parent, TSource value, IDisposable self)
{
_parent = parent;
_value = value;
_self = self;
- _indexInSource = indexInSource;
- _indexInIntermediate = -1;
}
public void OnNext(TCollection value)
@@ -201,14 +214,164 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._parent._resultSelector != null)
- res = _parent._parent._resultSelector(_value, value);
- else
+ res = _parent._parent._resultSelector(_value, value);
+ }
+ catch (Exception ex)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(ex);
+ _parent.Dispose();
+ }
+ return;
+ }
+
+ lock (_parent._gate)
+ _parent._observer.OnNext(res);
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(error);
+ _parent.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _parent._group.Remove(_self);
+ if (_parent._isStopped && _parent._group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_parent._gate)
{
- checked { _indexInIntermediate++; }
- res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate);
+ _parent._observer.OnCompleted();
+ _parent.Dispose();
}
}
+ }
+ }
+ }
+
+ class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private bool _isStopped;
+ private CompositeDisposable _group;
+ private SingleAssignmentDisposable _sourceSubscription;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _isStopped = false;
+ _group = new CompositeDisposable();
+
+ _sourceSubscription = new SingleAssignmentDisposable();
+ _group.Add(_sourceSubscription);
+ _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
+
+ return _group;
+ }
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+ var collection = default(IObservable<TCollection>);
+
+ try
+ {
+ collection = _parent._collectionSelectorI(value, index);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ var innerSubscription = new SingleAssignmentDisposable();
+ _group.Add(innerSubscription);
+ innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription));
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _isStopped = true;
+ if (_group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ else
+ {
+ _sourceSubscription.Dispose();
+ }
+ }
+
+ class Iter : IObserver<TCollection>
+ {
+ private readonly IndexSelectorImpl _parent;
+ private readonly TSource _value;
+ private readonly int _valueIndex;
+ private readonly IDisposable _self;
+
+ public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self)
+ {
+ _parent = parent;
+ _value = value;
+ _valueIndex = index;
+ _self = self;
+ }
+
+ private int _index;
+
+ public void OnNext(TCollection value)
+ {
+ var res = default(TResult);
+
+ try
+ {
+ res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++));
+ }
catch (Exception ex)
{
lock (_parent._gate)
@@ -254,16 +417,14 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TSource>
+ class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
- private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection
- public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
- _indexInSource = -1;
}
public void OnNext(TSource value)
@@ -271,14 +432,93 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TCollection>);
try
{
- if (_parent._collectionSelectorE != null)
- xs = _parent._collectionSelectorE(value);
- else
+ xs = _parent._collectionSelectorE(value);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ var e = default(IEnumerator<TCollection>);
+ try
+ {
+ e = xs.GetEnumerator();
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ try
+ {
+ var hasNext = true;
+ while (hasNext)
{
- checked { _indexInSource++; }
- xs = _parent._collectionSelectorEWithIndex(value, _indexInSource);
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = _parent._resultSelector(value, e.Current);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ if (hasNext)
+ base._observer.OnNext(current);
}
}
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+
+ class Omega : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private int _index;
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+
+ var xs = default(IEnumerable<TCollection>);
+ try
+ {
+ xs = _parent._collectionSelectorEI(value, index);
+ }
catch (Exception exception)
{
base._observer.OnError(exception);
@@ -300,7 +540,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- int indexInIntermediate = -1;
+ var eIndex = 0;
var hasNext = true;
while (hasNext)
{
@@ -311,15 +551,7 @@ namespace System.Reactive.Linq.Observαble
{
hasNext = e.MoveNext();
if (hasNext)
- {
- if (_parent._resultSelector != null)
- current = _parent._resultSelector(value, e.Current);
- else
- {
- checked { indexInIntermediate++; }
- current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate);
- }
- }
+ current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++));
}
catch (Exception exception)
{
@@ -354,11 +586,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectManyImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
- public τ(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -487,6 +719,143 @@ namespace System.Reactive.Linq.Observαble
}
}
}
+
+ class Sigma : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private CancellationDisposable _cancel;
+ private volatile int _count;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _cancel = new CancellationDisposable();
+ _count = 1;
+
+ return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+ }
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+
+ var task = default(Task<TCollection>);
+ try
+ {
+ Interlocked.Increment(ref _count);
+ task = _parent._collectionSelectorTI(value, index, _cancel.Token);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ if (task.IsCompleted)
+ {
+ OnCompletedTask(value, index, task);
+ }
+ else
+ {
+ AttachContinuation(value, index, task);
+ }
+ }
+
+ private void AttachContinuation(TSource value, int index, Task<TCollection> task)
+ {
+ //
+ // Separate method to avoid closure in synchronous completion case.
+ //
+ task.ContinueWith(t => OnCompletedTask(value, index, t));
+ }
+
+ private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
+ {
+ switch (task.Status)
+ {
+ case TaskStatus.RanToCompletion:
+ {
+ var res = default(TResult);
+ try
+ {
+ res = _parent._resultSelectorTI(value, index, task.Result);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ lock (_gate)
+ base._observer.OnNext(res);
+
+ OnCompleted();
+ }
+ break;
+ case TaskStatus.Faulted:
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(task.Exception.InnerException);
+ base.Dispose();
+ }
+ }
+ break;
+ case TaskStatus.Canceled:
+ {
+ if (!_cancel.IsDisposed)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(new TaskCanceledException(task));
+ base.Dispose();
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Decrement(ref _count) == 0)
+ {
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ }
+ }
#pragma warning restore 0420
#endif
}
@@ -495,13 +864,11 @@ namespace System.Reactive.Linq.Observαble
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TResult>> _selector;
+ private readonly Func<TSource, int, IObservable<TResult>> _selectorI;
private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
private readonly Func<IObservable<TResult>> _selectorOnCompleted;
- private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex;
- private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError;
- private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted;
private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
- private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex;
+ private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
{
@@ -509,26 +876,26 @@ namespace System.Reactive.Linq.Observαble
_selector = selector;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
{
_source = source;
- _selector = selector;
- _selectorOnError = selectorOnError;
- _selectorOnCompleted = selectorOnCompleted;
+ _selectorI = selector;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
{
_source = source;
- _selectorWithIndex = selector;
+ _selector = selector;
+ _selectorOnError = selectorOnError;
+ _selectorOnCompleted = selectorOnCompleted;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted)
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
{
_source = source;
- _selectorWithIndex = selector;
- _selectorWithIndexOnError = selectorOnError;
- _selectorWithIndexOnCompleted = selectorOnCompleted;
+ _selectorI = selector;
+ _selectorOnError = selectorOnError;
+ _selectorOnCompleted = selectorOnCompleted;
}
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
@@ -540,38 +907,63 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
{
_source = source;
- _selectorEWithIndex = selector;
+ _selectorEI = selector;
}
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
+ private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI;
public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
{
_source = source;
_selectorT = selector;
}
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
+ {
+ _source = source;
+ _selectorTI = selector;
+ }
#endif
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_selector != null || _selectorWithIndex != null)
+ if (_selector != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return sink.Run();
}
+ else if (_selectorI != null)
+ {
+ var sink = new IndexSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
#if !NO_TPL
else if (_selectorT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectManyImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
+ else if (_selectorTI != null)
+ {
+ var sink = new Sigma(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#endif
+ else if (_selectorE != null)
+ {
+ var sink = new NoSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return _source.SubscribeSafe(sink);
+ }
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new Omega(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -585,14 +977,12 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _index = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
- private int _index;
public IDisposable Run()
{
@@ -613,13 +1003,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selector != null)
- inner = _parent._selector(value);
- else
- {
- checked { _index++; }
- inner = _parent._selectorWithIndex(value, _index);
- }
+ inner = _parent._selector(value);
}
catch (Exception ex)
{
@@ -642,13 +1026,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selectorOnError != null)
- inner = _parent._selectorOnError(error);
- else
- {
- checked { _index++; }
- inner = _parent._selectorWithIndexOnError(error, _index);
- }
+ inner = _parent._selectorOnError(error);
}
catch (Exception ex)
{
@@ -682,10 +1060,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selectorOnCompleted != null)
- inner = _parent._selectorOnCompleted();
- else
- inner = _parent._selectorWithIndexOnCompleted(_index);
+ inner = _parent._selectorOnCompleted();
}
catch (Exception ex)
{
@@ -731,15 +1106,15 @@ namespace System.Reactive.Linq.Observαble
{
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = inner.SubscribeSafe(new ι(this, innerSubscription));
+ innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
}
- class ι : IObserver<TResult>
+ class Iter : IObserver<TResult>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ι(_ parent, IDisposable self)
+ public Iter(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -782,16 +1157,203 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TSource>
+ class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
+
+ public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private bool _isStopped;
+ private CompositeDisposable _group;
+ private SingleAssignmentDisposable _sourceSubscription;
private int _index;
- public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _isStopped = false;
+ _group = new CompositeDisposable();
+
+ _sourceSubscription = new SingleAssignmentDisposable();
+ _group.Add(_sourceSubscription);
+ _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
+
+ return _group;
+ }
+
+ public void OnNext(TSource value)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorI(value, checked(_index++));
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+ }
+
+ public void OnError(Exception error)
+ {
+ if (_parent._selectorOnError != null)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorOnError(error);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+
+ Final();
+ }
+ else
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (_parent._selectorOnCompleted != null)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorOnCompleted();
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+ }
+
+ Final();
+ }
+
+ private void Final()
+ {
+ _isStopped = true;
+ if (_group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ else
+ {
+ _sourceSubscription.Dispose();
+ }
+ }
+
+ private void SubscribeInner(IObservable<TResult> inner)
+ {
+ var innerSubscription = new SingleAssignmentDisposable();
+ _group.Add(innerSubscription);
+ innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
+ }
+
+ class Iter : IObserver<TResult>
+ {
+ private readonly IndexSelectorImpl _parent;
+ private readonly IDisposable _self;
+
+ public Iter(IndexSelectorImpl parent, IDisposable self)
+ {
+ _parent = parent;
+ _self = self;
+ }
+
+ public void OnNext(TResult value)
+ {
+ lock (_parent._gate)
+ _parent._observer.OnNext(value);
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(error);
+ _parent.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _parent._group.Remove(_self);
+ if (_parent._isStopped && _parent._group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_parent._gate)
+ {
+ _parent._observer.OnCompleted();
+ _parent.Dispose();
+ }
+ }
+ }
+ }
+ }
+
+ class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
- _index = -1;
}
public void OnNext(TSource value)
@@ -799,14 +1361,91 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TResult>);
try
{
- if (_parent._selectorE != null)
- xs = _parent._selectorE(value);
- else
+ xs = _parent._selectorE(value);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ var e = default(IEnumerator<TResult>);
+ try
+ {
+ e = xs.GetEnumerator();
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ try
+ {
+ var hasNext = true;
+ while (hasNext)
{
- checked { _index++; }
- xs = _parent._selectorEWithIndex(value, _index);
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = e.Current;
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ if (hasNext)
+ base._observer.OnNext(current);
}
}
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+
+ class Omega : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private int _index;
+
+ public void OnNext(TSource value)
+ {
+ var xs = default(IEnumerable<TResult>);
+ try
+ {
+ xs = _parent._selectorEI(value, checked(_index++));
+ }
catch (Exception exception)
{
base._observer.OnError(exception);
@@ -873,11 +1512,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectManyImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
- public τ(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -982,6 +1621,117 @@ namespace System.Reactive.Linq.Observαble
}
}
}
+
+ class Sigma : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private CancellationDisposable _cancel;
+ private volatile int _count;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _cancel = new CancellationDisposable();
+ _count = 1;
+
+ return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+ }
+
+ public void OnNext(TSource value)
+ {
+ var task = default(Task<TResult>);
+ try
+ {
+ Interlocked.Increment(ref _count);
+ task = _parent._selectorTI(value, checked(_index++), _cancel.Token);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ if (task.IsCompleted)
+ {
+ OnCompletedTask(task);
+ }
+ else
+ {
+ task.ContinueWith(OnCompletedTask);
+ }
+ }
+
+ private void OnCompletedTask(Task<TResult> task)
+ {
+ switch (task.Status)
+ {
+ case TaskStatus.RanToCompletion:
+ {
+ lock (_gate)
+ base._observer.OnNext(task.Result);
+
+ OnCompleted();
+ }
+ break;
+ case TaskStatus.Faulted:
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(task.Exception.InnerException);
+ base.Dispose();
+ }
+ }
+ break;
+ case TaskStatus.Canceled:
+ {
+ if (!_cancel.IsDisposed)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(new TaskCanceledException(task));
+ base.Dispose();
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Decrement(ref _count) == 0)
+ {
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ }
+ }
#pragma warning restore 0420
#endif
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs
index 4661f82..6b03aba 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SequenceEqual<TSource> : Producer<bool>
{
@@ -38,7 +38,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new SequenceEqualImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -226,11 +226,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<bool>, IObserver<TSource>
+ class SequenceEqualImpl : Sink<bool>, IObserver<TSource>
{
private readonly SequenceEqual<TSource> _parent;
- public ε(SequenceEqual<TSource> parent, IObserver<bool> observer, IDisposable cancel)
+ public SequenceEqualImpl(SequenceEqual<TSource> parent, IObserver<bool> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs
index 19c07e5..c4dce4a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SingleAsync<TSource> : Producer<TSource>
{
@@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new SingleAsyncImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -84,13 +84,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<TSource>, IObserver<TSource>
+ class SingleAsyncImpl : Sink<TSource>, IObserver<TSource>
{
private readonly SingleAsync<TSource> _parent;
private TSource _value;
private bool _seenValue;
- public π(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SingleAsyncImpl(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs
index a092d99..7222816 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Skip<TSource> : Producer<TSource>
{
@@ -27,7 +27,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(int count)
+ public IObservable<TSource> Omega(int count)
{
//
// Sum semantics:
@@ -39,7 +39,7 @@ namespace System.Reactive.Linq.Observαble
return new Skip<TSource>(_source, _count + count);
}
- public IObservable<TSource> Ω(TimeSpan duration)
+ public IObservable<TSource> Omega(TimeSpan duration)
{
//
// Maximum semantics:
@@ -66,7 +66,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SkipImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -105,12 +105,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class SkipImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Skip<TSource> _parent;
private volatile bool _open;
- public τ(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SkipImpl(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs
index 596f183..6bd771e 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SkipLast<TSource> : Producer<TSource>
{
@@ -37,7 +37,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SkipLastImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -75,12 +75,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class SkipLastImpl : Sink<TSource>, IObserver<TSource>
{
private readonly SkipLast<TSource> _parent;
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
- public τ(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SkipLastImpl(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs
index 8d4c7a9..8acce09 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs
@@ -6,7 +6,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SkipUntil<TSource, TOther> : Producer<TSource>
{
@@ -140,7 +140,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(DateTimeOffset startTime)
+ public IObservable<TSource> Omega(DateTimeOffset startTime)
{
//
// Maximum semantics:
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs
index e6e379d..b5694dc 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SkipWhile<TSource> : Producer<TSource>
{
@@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SkipWhileImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -86,13 +86,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class SkipWhileImpl : Sink<TSource>, IObserver<TSource>
{
private readonly SkipWhile<TSource> _parent;
private bool _running;
private int _index;
- public τ(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SkipWhileImpl(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs
index 6d51153..c5346f8 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SumDouble : Producer<double>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs
index 794b0d4..e27d6eb 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Switch<TSource> : Producer<TSource>
{
@@ -65,7 +65,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_innerSubscription.Disposable = d;
- d.Disposable = value.SubscribeSafe(new ι(this, id, d));
+ d.Disposable = value.SubscribeSafe(new Iter(this, id, d));
}
public void OnError(Exception error)
@@ -91,13 +91,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ι : IObserver<TSource>
+ class Iter : IObserver<TSource>
{
private readonly _ _parent;
private readonly ulong _id;
private readonly IDisposable _self;
- public ι(_ parent, ulong id, IDisposable self)
+ public Iter(_ parent, ulong id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs
index 36985e9..ef4e885 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Synchronize<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs
index 738e7c0..255e3d2 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Take<TSource> : Producer<TSource>
{
@@ -27,7 +27,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(int count)
+ public IObservable<TSource> Omega(int count)
{
//
// Minimum semantics:
@@ -42,7 +42,7 @@ namespace System.Reactive.Linq.Observαble
return new Take<TSource>(_source, count);
}
- public IObservable<TSource> Ω(TimeSpan duration)
+ public IObservable<TSource> Omega(TimeSpan duration)
{
//
// Minimum semantics:
@@ -69,7 +69,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new TakeImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -115,11 +115,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class TakeImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Take<TSource> _parent;
- public τ(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TakeImpl(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs
index 98bf40e..441a502 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeLast<TSource> : Producer<TSource>
{
@@ -41,7 +41,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new TakeLastImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -131,12 +131,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class TakeLastImpl : Sink<TSource>, IObserver<TSource>
{
private readonly TakeLast<TSource> _parent;
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
- public τ(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TakeLastImpl(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs
index 71b36b0..2a15825 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs
@@ -7,7 +7,7 @@ using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeLastBuffer<TSource> : Producer<IList<TSource>>
{
@@ -39,7 +39,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new Impl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -82,12 +82,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<IList<TSource>>, IObserver<TSource>
+ class Impl : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly TakeLastBuffer<TSource> _parent;
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
- public τ(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public Impl(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs
index eccc222..62b450a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs
@@ -6,7 +6,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeUntil<TSource, TOther> : Producer<TSource>
{
@@ -173,7 +173,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(DateTimeOffset endTime)
+ public IObservable<TSource> Omega(DateTimeOffset endTime)
{
//
// Minimum semantics:
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs
index 30f63ca..dbf6117 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeWhile<TSource> : Producer<TSource>
{
@@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new TakeWhileImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -91,13 +91,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class TakeWhileImpl : Sink<TSource>, IObserver<TSource>
{
private readonly TakeWhile<TSource> _parent;
private bool _running;
private int _index;
- public τ(TakeWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TakeWhileImpl(TakeWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs
index 7b7ad86..5061114 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Throttle<TSource> : Producer<TSource>
{
@@ -192,7 +192,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_cancelable.Disposable = d;
- d.Disposable = throttle.SubscribeSafe(new δ(this, value, currentid, d));
+ d.Disposable = throttle.SubscribeSafe(new Delta(this, value, currentid, d));
}
public void OnError(Exception error)
@@ -226,14 +226,14 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TThrottle>
+ class Delta : IObserver<TThrottle>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly ulong _currentid;
private readonly IDisposable _self;
- public δ(_ parent, TSource value, ulong currentid, IDisposable self)
+ public Delta(_ parent, TSource value, ulong currentid, IDisposable self)
{
_parent = parent;
_value = value;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs
index 3b10894..9940a9d 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Throw<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs
index dacfa03..4c40863 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs
@@ -7,7 +7,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TimeInterval<TSource> : Producer<System.Reactive.TimeInterval<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs
index 4896294..94b78a6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Timeout<TSource> : Producer<TSource>
{
@@ -35,23 +35,23 @@ namespace System.Reactive.Linq.Observαble
{
if (_dueTimeA.HasValue)
{
- var sink = new α(this, observer, cancel);
+ var sink = new TimeA(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else
{
- var sink = new ρ(this, observer, cancel);
+ var sink = new TimeR(this, observer, cancel);
setSink(sink);
return sink.Run();
}
}
- class α : Sink<TSource>, IObserver<TSource>
+ class TimeA : Sink<TSource>, IObserver<TSource>
{
private readonly Timeout<TSource> _parent;
- public α(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TimeA(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -136,11 +136,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ρ : Sink<TSource>, IObserver<TSource>
+ class TimeR : Sink<TSource>, IObserver<TSource>
{
private readonly Timeout<TSource> _parent;
- public ρ(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TimeR(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -358,16 +358,16 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_timer.Disposable = d;
- d.Disposable = timeout.SubscribeSafe(new τ(this, myid, d));
+ d.Disposable = timeout.SubscribeSafe(new TimeoutImpl(this, myid, d));
}
- class τ : IObserver<TTimeout>
+ class TimeoutImpl : IObserver<TTimeout>
{
private readonly _ _parent;
private readonly ulong _id;
private readonly IDisposable _self;
- public τ(_ parent, ulong id, IDisposable self)
+ public TimeoutImpl(_ parent, ulong id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs
index 22ee0df..14df757 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs
@@ -7,7 +7,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Timer : Producer<long>
{
@@ -34,7 +34,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_period.HasValue)
{
- var sink = new π(this, observer, cancel);
+ var sink = new TimerImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -76,12 +76,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<long>
+ class TimerImpl : Sink<long>
{
private readonly Timer _parent;
private readonly TimeSpan _period;
- public π(Timer parent, IObserver<long> observer, IDisposable cancel)
+ public TimerImpl(Timer parent, IObserver<long> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs
index ed54678..19bf1be 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Timestamp<TSource> : Producer<Timestamped<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs
index ba039ae..97b9bd4 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToArray<TSource> : Producer<TSource[]>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs
index 919c1a1..5cfd9ec 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToDictionary<TSource, TKey, TElement> : Producer<IDictionary<TKey, TElement>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs
index 34956d4..a265098 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToList<TSource> : Producer<IList<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs
index 55ff7d3..d86f5ee 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToLookup<TSource, TKey, TElement> : Producer<ILookup<TKey, TElement>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs
index 795d54d..8de8ff7 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToObservable<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs
index 996ee0e..680d1f4 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Using<TSource, TResource> : Producer<TSource>
where TResource : IDisposable
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs
index a8eaa92..3ab5f73 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Where<TSource> : Producer<TSource>
{
@@ -23,7 +23,7 @@ namespace System.Reactive.Linq.Observαble
_predicateI = predicate;
}
- public IObservable<TSource> Ω(Func<TSource, bool> predicate)
+ public IObservable<TSource> Omega(Func<TSource, bool> predicate)
{
if (_predicate != null)
return new Where<TSource>(_source, x => _predicate(x) && predicate(x));
@@ -41,7 +41,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new WhereImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -88,12 +88,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class WhereImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Where<TSource> _parent;
private int _index;
- public τ(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public WhereImpl(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs
index 178e4c8..2f24adf 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class While<TSource> : Producer<TSource>, IConcatenatable<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs
index 47059c7..4dad064 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs
@@ -9,7 +9,7 @@ using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Window<TSource> : Producer<IObservable<TSource>>
{
@@ -54,7 +54,7 @@ namespace System.Reactive.Linq.Observαble
}
else if (_count > 0)
{
- var sink = new μ(this, observer, cancel);
+ var sink = new BoundedWindowImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -62,13 +62,13 @@ namespace System.Reactive.Linq.Observαble
{
if (_timeSpan == _timeShift)
{
- var sink = new π(this, observer, cancel);
+ var sink = new TimeShiftImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new WindowImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -151,11 +151,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<IObservable<TSource>>, IObserver<TSource>
+ class WindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource> _parent;
- public τ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public WindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -296,11 +296,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<IObservable<TSource>>, IObserver<TSource>
+ class TimeShiftImpl : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource> _parent;
- public π(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public TimeShiftImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -371,11 +371,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class μ : Sink<IObservable<TSource>>, IObserver<TSource>
+ class BoundedWindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource> _parent;
- public μ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public BoundedWindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -516,7 +516,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new β(this, observer, cancel);
+ var sink = new Beta(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -578,7 +578,7 @@ namespace System.Reactive.Linq.Observαble
var closingSubscription = new SingleAssignmentDisposable();
_m.Disposable = closingSubscription;
- closingSubscription.Disposable = windowClose.SubscribeSafe(new ω(this, closingSubscription));
+ closingSubscription.Disposable = windowClose.SubscribeSafe(new Omega(this, closingSubscription));
}
private void CloseWindow(IDisposable closingSubscription)
@@ -597,12 +597,12 @@ namespace System.Reactive.Linq.Observαble
_windowGate.Wait(CreateWindowClose);
}
- class ω : IObserver<TWindowClosing>
+ class Omega : IObserver<TWindowClosing>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ω(_ parent, IDisposable self)
+ public Omega(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -653,11 +653,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class β : Sink<IObservable<TSource>>, IObserver<TSource>
+ class Beta : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource, TWindowClosing> _parent;
- public β(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public Beta(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -680,16 +680,16 @@ namespace System.Reactive.Linq.Observαble
base._observer.OnNext(window);
d.Add(_parent._source.SubscribeSafe(this));
- d.Add(_parent._windowBoundaries.SubscribeSafe(new ω(this)));
+ d.Add(_parent._windowBoundaries.SubscribeSafe(new Omega(this)));
return _refCountDisposable;
}
- class ω : IObserver<TWindowClosing>
+ class Omega : IObserver<TWindowClosing>
{
- private readonly β _parent;
+ private readonly Beta _parent;
- public ω(β parent)
+ public Omega(Beta parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs
index bde6e29..77434c7 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs
@@ -7,7 +7,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
#region Binary
@@ -42,7 +42,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new ZipImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -258,11 +258,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TFirst>
+ class ZipImpl : Sink<TResult>, IObserver<TFirst>
{
private readonly Zip<TFirst, TSecond, TResult> _parent;
- public ε(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs
index d4b0dbc..4489003 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
}
#endif
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Aggregates.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Aggregates.cs
index 739fc2d..8967297 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Aggregates.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Aggregates.cs
@@ -7,7 +7,7 @@ using System.Reactive.Disposables;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Binding.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Binding.cs
index d9a96ba..8066a0e 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Binding.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Binding.cs
@@ -6,7 +6,7 @@ using System.Reactive.Subjects;
namespace System.Reactive.Linq
{
- using Observαble;
+ using ObservableImpl;
internal partial class QueryLanguage
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Blocking.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Blocking.cs
index 2b37006..5f69873 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Blocking.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Blocking.cs
@@ -12,7 +12,7 @@ using System.Reactive.Threading;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
@@ -195,7 +195,7 @@ namespace System.Reactive.Linq
{
#if !NO_PERF
var evt = new ManualResetEvent(false);
- var sink = new ForEach<TSource>.τ(onNext, () => evt.Set());
+ var sink = new ForEach<TSource>.ForEachImpl(onNext, () => evt.Set());
using (source.SubscribeSafe(sink))
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Concurrency.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Concurrency.cs
index 58408b2..0dc4035 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Concurrency.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Concurrency.cs
@@ -7,7 +7,7 @@ using System.Threading;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Conversions.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Conversions.cs
index 44d6cf6..d795b4d 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Conversions.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Conversions.cs
@@ -7,7 +7,7 @@ using System.Reactive.Disposables;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs
index 4aa25b2..55bae61 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs
@@ -14,7 +14,7 @@ using System.Threading.Tasks;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Events.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Events.cs
index 3b9f2bf..453bcee 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Events.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Events.cs
@@ -13,7 +13,7 @@ using System.Runtime.InteropServices.WindowsRuntime;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
//
@@ -57,7 +57,7 @@ namespace System.Reactive.Linq
private static IObservable<EventPattern<EventArgs>> FromEventPattern_(Action<EventHandler> addHandler, Action<EventHandler> removeHandler, IScheduler scheduler)
{
#if !NO_PERF
- return new FromEventPattern.τ<EventHandler, EventArgs>(e => new EventHandler(e), addHandler, removeHandler, scheduler);
+ return new FromEventPattern.Impl<EventHandler, EventArgs>(e => new EventHandler(e), addHandler, removeHandler, scheduler);
#else
var res = Observable.FromEventPattern<EventHandler, EventArgs>(e => new EventHandler(e), addHandler, removeHandler);
return SynchronizeEvents(res, scheduler);
@@ -67,7 +67,7 @@ namespace System.Reactive.Linq
private static IObservable<EventPattern<object>> FromEventPattern_(Action<EventHandler> addHandler, Action<EventHandler> removeHandler, IScheduler scheduler)
{
#if !NO_PERF
- return new FromEventPattern.τ<EventHandler, object>(e => new EventHandler(e), addHandler, removeHandler, scheduler);
+ return new FromEventPattern.Impl<EventHandler, object>(e => new EventHandler(e), addHandler, removeHandler, scheduler);
#else
var res = Observable.FromEventPattern<EventHandler, object>(e => new EventHandler(e), addHandler, removeHandler);
return SynchronizeEvents(res, scheduler);
@@ -108,7 +108,7 @@ namespace System.Reactive.Linq
#endif
{
#if !NO_PERF
- return new FromEventPattern.τ<TDelegate, TEventArgs>(addHandler, removeHandler, scheduler);
+ return new FromEventPattern.Impl<TDelegate, TEventArgs>(addHandler, removeHandler, scheduler);
#else
var res = new AnonymousObservable<EventPattern<TEventArgs>>(observer =>
{
@@ -151,7 +151,7 @@ namespace System.Reactive.Linq
#endif
{
#if !NO_PERF
- return new FromEventPattern.τ<TDelegate, TEventArgs>(conversion, addHandler, removeHandler, scheduler);
+ return new FromEventPattern.Impl<TDelegate, TEventArgs>(conversion, addHandler, removeHandler, scheduler);
#else
var res = new AnonymousObservable<EventPattern<TEventArgs>>(observer =>
{
@@ -193,7 +193,7 @@ namespace System.Reactive.Linq
#endif
{
#if !NO_PERF
- return new FromEventPattern.τ<TDelegate, TSender, TEventArgs>(addHandler, removeHandler, scheduler);
+ return new FromEventPattern.Impl<TDelegate, TSender, TEventArgs>(addHandler, removeHandler, scheduler);
#else
var res = new AnonymousObservable<EventPattern<TSender, TEventArgs>>(observer =>
{
@@ -240,7 +240,7 @@ namespace System.Reactive.Linq
#endif
{
#if !NO_PERF
- return new FromEventPattern.τ<EventHandler<TEventArgs>, TEventArgs>(handler => handler, addHandler, removeHandler, scheduler);
+ return new FromEventPattern.Impl<EventHandler<TEventArgs>, TEventArgs>(handler => handler, addHandler, removeHandler, scheduler);
#else
var res = Observable.FromEventPattern<EventHandler<TEventArgs>, TEventArgs>(handler => handler, addHandler, removeHandler);
return SynchronizeEvents(res, scheduler);
@@ -480,7 +480,7 @@ namespace System.Reactive.Linq
if (isWinRT)
{
#if !NO_PERF
- return new FromEventPattern.ρ<TSender, TEventArgs, TResult>(target, delegateType, addMethod, removeMethod, getResult, true, scheduler);
+ return new FromEventPattern.Handler<TSender, TEventArgs, TResult>(target, delegateType, addMethod, removeMethod, getResult, true, scheduler);
#else
return new AnonymousObservable<TResult>(observer =>
{
@@ -494,7 +494,7 @@ namespace System.Reactive.Linq
#endif
#if !NO_PERF
- return new FromEventPattern.ρ<TSender, TEventArgs, TResult>(target, delegateType, addMethod, removeMethod, getResult, false, scheduler);
+ return new FromEventPattern.Handler<TSender, TEventArgs, TResult>(target, delegateType, addMethod, removeMethod, getResult, false, scheduler);
#else
var res = new AnonymousObservable<TResult>(observer =>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Imperative.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Imperative.cs
index 75b86b3..1825035 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Imperative.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Imperative.cs
@@ -13,7 +13,7 @@ using System.Threading.Tasks;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Multiple.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Multiple.cs
index 7308740..c726e82 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Multiple.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Multiple.cs
@@ -16,7 +16,7 @@ using System.Threading.Tasks;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs
index 5f64e16..2770ade 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs
@@ -9,7 +9,7 @@ using System.Reactive.Subjects;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
@@ -21,7 +21,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var asObservable = source as AsObservable<TSource>;
if (asObservable != null)
- return asObservable.Ω();
+ return asObservable.Omega();
return new AsObservable<TSource>(source);
#else
@@ -331,7 +331,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var ignoreElements = source as IgnoreElements<TSource>;
if (ignoreElements != null)
- return ignoreElements.Ω();
+ return ignoreElements.Omega();
return new IgnoreElements<TSource>(source);
#else
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
index 0089f04..41ceb9c 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
@@ -1,13 +1,10 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
-using System.Linq;
using System.Reactive.Concurrency;
-using System.Reactive.Disposables;
-using System.Reactive.Subjects;
+
#if !NO_TPL
-using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
#endif
@@ -15,7 +12,7 @@ using System.Threading.Tasks;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
@@ -156,30 +153,50 @@ namespace System.Reactive.Linq
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
{
- return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
{
- return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
- return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
{
- return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+ {
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+ {
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
}
- private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+ private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
#if !NO_PERF
- return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+ return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
#else
- return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), comparer);
+ return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), capacity, comparer);
#endif
}
@@ -189,32 +206,54 @@ namespace System.Reactive.Linq
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
{
- return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
{
- return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, EqualityComparer<TKey>.Default);
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
{
- return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, comparer);
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
{
- return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, EqualityComparer<TKey>.Default);
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
}
- private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
#if !NO_PERF
- return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+ return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
#else
return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer =>
{
- var map = new Dictionary<TKey, ISubject<TElement>>(comparer);
+ var map = capacity.HasValue
+ ? new Dictionary<TKey, ISubject<TElement>>(capacity.Value, comparer)
+ : new Dictionary<TKey, ISubject<TElement>>(comparer);
var groupDisposable = new CompositeDisposable();
var refCountDisposable = new RefCountDisposable(groupDisposable);
@@ -733,7 +772,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var select = source as Select<TSource>;
if (select != null)
- return select.Ω(selector);
+ return select.Omega(selector);
return new Select<TSource, TResult>(source, selector);
#else
@@ -853,6 +892,15 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, (x, i, token) => selector(x, i));
+#else
+ return SelectMany_<TSource, TResult>(source, (x, i) => selector(x, i).ToObservable());
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
{
#if !NO_PERF
@@ -861,6 +909,15 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TResult>(source, x => FromAsync(ct => selector(x, ct)));
#endif
}
+
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, selector);
+#else
+ return SelectMany_<TSource, TResult>(source, (x, i) => FromAsync(ct => selector(x, i, ct)));
+#endif
+ }
#endif
public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
@@ -883,6 +940,15 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TTaskResult, TResult>(source, (x, i, token) => taskSelector(x, i), resultSelector);
+#else
+ return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => taskSelector(x, i).ToObservable(), (x, i, t, _) => resultSelector(x, i, t));
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
#if !NO_PERF
@@ -891,6 +957,15 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TTaskResult, TResult>(source, x => FromAsync(ct => taskSelector(x, ct)), resultSelector);
#endif
}
+
+ public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
+#else
+ return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => FromAsync(ct => taskSelector(x, i, ct)), (x, i, t, _) => resultSelector(x, i, t));
+#endif
+ }
#endif
private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
@@ -901,7 +976,7 @@ namespace System.Reactive.Linq
return source.Select(selector).Merge();
#endif
}
-
+
private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
{
#if !NO_PERF
@@ -925,7 +1000,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#else
- return SelectMany_<TSource, TResult>(source, x => collectionSelector(x).Select(y => resultSelector(x, y)));
+ return SelectMany_<TSource, TResult>(source, (x, i) => collectionSelector(x, i).Select((y, i2) => resultSelector(x, i, y, i2)));
#endif
}
@@ -946,19 +1021,23 @@ namespace System.Reactive.Linq
#endif
}
- public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted)
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
{
#if !NO_PERF
return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
#else
- return source.Materialize().SelectMany(notification =>
+ return Defer(() =>
{
- if (notification.Kind == NotificationKind.OnNext)
- return onNext(notification.Value);
- else if (notification.Kind == NotificationKind.OnError)
- return onError(notification.Exception);
- else
- return onCompleted();
+ var index = 0;
+ return source.Materialize().SelectMany(notification =>
+ {
+ if (notification.Kind == NotificationKind.OnNext)
+ return onNext(notification.Value, checked(index++));
+ else if (notification.Kind == NotificationKind.OnError)
+ return onError(notification.Exception);
+ else
+ return onCompleted();
+ });
});
#endif
}
@@ -977,7 +1056,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
return new SelectMany<TSource, TResult>(source, selector);
#else
- return SelectMany_<TSource, TResult, TResult>(source, selector, (_, x) => x);
+ return SelectMany_<TSource, TResult, TResult>(source, selector, (_, __, x, ___) => x);
#endif
}
@@ -986,6 +1065,11 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
+ public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+ }
+
private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
#if !NO_PERF
@@ -1045,9 +1129,67 @@ namespace System.Reactive.Linq
#endif
}
- public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
+#if !NO_PERF
return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+#else
+ return new AnonymousObservable<TResult>(observer =>
+ {
+ var index = 0;
+
+ return source.Subscribe(
+ x =>
+ {
+ var xs = default(IEnumerable<TCollection>);
+ try
+ {
+ xs = collectionSelector(x, checked(index++));
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+
+ var e = xs.GetEnumerator();
+
+ try
+ {
+ var eIndex = 0;
+ var hasNext = true;
+ while (hasNext)
+ {
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = resultSelector(x, index, e.Current, checked(eIndex++));
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+
+ if (hasNext)
+ observer.OnNext(current);
+ }
+ }
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ },
+ observer.OnError,
+ observer.OnCompleted
+ )
+ });
+#endif
}
#endregion
@@ -1059,7 +1201,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var skip = source as Skip<TSource>;
if (skip != null && skip._scheduler == null)
- return skip.Ω(count);
+ return skip.Omega(count);
return new Skip<TSource>(source, count);
#else
@@ -1156,7 +1298,7 @@ namespace System.Reactive.Linq
{
var take = source as Take<TSource>;
if (take != null && take._scheduler == null)
- return take.Ω(count);
+ return take.Omega(count);
return new Take<TSource>(source, count);
}
@@ -1248,7 +1390,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var where = source as Where<TSource>;
if (where != null)
- return where.Ω(predicate);
+ return where.Omega(predicate);
return new Where<TSource>(source, predicate);
#else
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs
index 161e1c1..f495177 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs
@@ -8,7 +8,7 @@ using System.Reactive.Subjects;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
@@ -612,7 +612,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var skip = source as Skip<TSource>;
if (skip != null && skip._scheduler == scheduler)
- return skip.Ω(duration);
+ return skip.Omega(duration);
return new Skip<TSource>(source, duration, scheduler);
#else
@@ -703,7 +703,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var skipUntil = source as SkipUntil<TSource>;
if (skipUntil != null && skipUntil._scheduler == scheduler)
- return skipUntil.Ω(startTime);
+ return skipUntil.Omega(startTime);
return new SkipUntil<TSource>(source, startTime, scheduler);
#else
@@ -747,7 +747,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var take = source as Take<TSource>;
if (take != null && take._scheduler == scheduler)
- return take.Ω(duration);
+ return take.Omega(duration);
return new Take<TSource>(source, duration, scheduler);
#else
@@ -914,7 +914,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var takeUntil = source as TakeUntil<TSource>;
if (takeUntil != null && takeUntil._scheduler == scheduler)
- return takeUntil.Ω(endTime);
+ return takeUntil.Omega(endTime);
return new TakeUntil<TSource>(source, endTime, scheduler);
#else
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Subjects/BehaviorSubject.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Subjects/BehaviorSubject.cs
index daf23eb..211ad09 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Subjects/BehaviorSubject.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Subjects/BehaviorSubject.cs
@@ -11,6 +11,38 @@ namespace System.Reactive.Subjects
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class BehaviorSubject<T> : ISubject<T>, IDisposable
{
+ /// <summary>
+ /// Gets the current value or throws an exception.
+ /// </summary>
+ /// <value>The initial value passed to the constructor until <see cref="OnNext"/> is called; after which, the last value passed to <see cref="OnNext"/>.</value>
+ /// <remarks>
+ /// <para><see cref="Value"/> is frozen after <see cref="OnCompleted"/> is called.</para>
+ /// <para>After <see cref="OnError"/> is called, <see cref="Value"/> always throws the specified exception.</para>
+ /// <para>An exception is always thrown after <see cref="Dispose"/> is called.</para>
+ /// <alert type="caller">
+ /// Reading <see cref="Value"/> is a thread-safe operation, though there's a potential race condition when <see cref="OnNext"/> or <see cref="OnError"/> are being invoked concurrently.
+ /// In some cases, it may be necessary for a caller to use external synchronization to avoid race conditions.
+ /// </alert>
+ /// </remarks>
+ /// <exception cref="ObjectDisposedException">Dispose was called.</exception>
+ public T Value
+ {
+ get
+ {
+ lock (_gate)
+ {
+ CheckDisposed();
+
+ if (_exception != null)
+ {
+ throw _exception;
+ }
+
+ return _value;
+ }
+ }
+ }
+
private readonly object _gate = new object();
private ImmutableList<IObserver<T>> _observers;