Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@veritas-vos-liberabit.com>2013-02-08 11:47:02 +0400
committerAtsushi Eno <atsushieno@veritas-vos-liberabit.com>2013-02-08 11:47:02 +0400
commit4d702d6c69ca2645a61a2978b527027aaed4b997 (patch)
tree6f204c75c6df593c63191f5735765c68c5b2cc82
parent287eca34bbef9169f6c659458caa5817fe23aae6 (diff)
import 5526a6f490db, official Rx 2.1 release.
-rw-r--r--Rx/NET/Source/BuildAll.proj2
-rw-r--r--Rx/NET/Source/BuildSetup.bat6
-rw-r--r--Rx/NET/Source/Common.targets8
-rw-r--r--Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs52
-rw-r--r--Rx/NET/Source/README.txt25
-rw-r--r--Rx/NET/Source/Rx.sln4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs5
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs51
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs112
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs125
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs19
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs59
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs18
-rw-r--r--Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs423
-rw-r--r--Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs50
-rw-r--r--Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs192
16 files changed, 1031 insertions, 120 deletions
diff --git a/Rx/NET/Source/BuildAll.proj b/Rx/NET/Source/BuildAll.proj
index acfdce6..69736e4 100644
--- a/Rx/NET/Source/BuildAll.proj
+++ b/Rx/NET/Source/BuildAll.proj
@@ -88,7 +88,7 @@
<CreateItem Include="@(Flavor)" AdditionalMetadata="Properties=Configuration=%(Flavor.Configuration)%3BPlatform=Any CPU%3BOutDir=$(LayoutOutputFolder)\%(Flavor.Configuration)\">
<Output TaskParameter="Include" ItemName="ProjectToBuild" />
</CreateItem>
- <CreateItem Include="Setup\BuildAll.proj" AdditionalMetadata="Properties=BinariesLayoutFolder=$(LayoutOutputFolder)%3BOutDir=$(SetupOutputFolder)" Condition=" '$(BuildSetup)' == '1' ">
+ <CreateItem Include="..\..\Private\Setup\BuildAll.proj" AdditionalMetadata="Properties=BinariesLayoutFolder=$(LayoutOutputFolder)%3BOutDir=$(SetupOutputFolder)" Condition=" '$(BuildSetup)' == '1' ">
<Output TaskParameter="Include" ItemName="SetupToBuild" />
</CreateItem>
</Target>
diff --git a/Rx/NET/Source/BuildSetup.bat b/Rx/NET/Source/BuildSetup.bat
index 49dc22f..21bb950 100644
--- a/Rx/NET/Source/BuildSetup.bat
+++ b/Rx/NET/Source/BuildSetup.bat
@@ -1 +1,5 @@
-msbuild BuildAll.proj /p:BuildSetup=1 /p:BuildNumber=0.9.0.0 /p:RxRelease=EXPERIMENTAL \ No newline at end of file
+@REM msbuild BuildAll.proj /p:BuildSetup=1 /p:BuildNumber=2.0.30116.0 /p:RxRelease=EXPERIMENTAL
+
+@REM I've had good success with /t:Rebuild. But will omit it for speed.
+@REM msbuild BuildAll.proj /t:Rebuild /p:BuildSetup=1 /p:SignedBuild=1 /p:BuildNumber=2.1.30201.0 /p:RxRelease=RTM
+msbuild BuildAll.proj /p:BuildSetup=1 /p:SignedBuild=1 /p:BuildNumber=2.1.30201.0 /p:RxRelease=RTM
diff --git a/Rx/NET/Source/Common.targets b/Rx/NET/Source/Common.targets
index 713def0..90b6fe2 100644
--- a/Rx/NET/Source/Common.targets
+++ b/Rx/NET/Source/Common.targets
@@ -93,7 +93,7 @@
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<TargetFrameworkProfile>Profile78</TargetFrameworkProfile>
<NoStdLib>true</NoStdLib>
- <SignKeyEnhanced>true</SignKeyEnhanced>
+ <!-- <SignKeyEnhanced>true</SignKeyEnhanced> Disabled due to CLR signing bug w/ SHA2 keys blocking Windows Phone 8 apps from being signed. -->
</PropertyGroup>
<PropertyGroup Condition=" '$(BuildTarget)' == '45' ">
@@ -102,7 +102,7 @@
<NoStdLib>true</NoStdLib>
<BuildPlatform>DESKTOPCLR</BuildPlatform>
<BuildFlavor>DESKTOPCLR45</BuildFlavor>
- <SignKeyEnhanced>true</SignKeyEnhanced>
+ <!-- <SignKeyEnhanced>true</SignKeyEnhanced> Disabled due to CLR signing bug w/ SHA2 keys blocking Windows Phone 8 apps from being signed. -->
</PropertyGroup>
<PropertyGroup Condition=" '$(BuildTarget)' == '8' ">
@@ -113,7 +113,7 @@
<BuildPlatform>WINDOWS</BuildPlatform>
<BuildFlavor>WINDOWS8</BuildFlavor>
<DefaultLanguage>en-US</DefaultLanguage>
- <SignKeyEnhanced>true</SignKeyEnhanced>
+ <!-- <SignKeyEnhanced>true</SignKeyEnhanced> Disabled due to CLR signing bug w/ SHA2 keys blocking Windows Phone 8 apps from being signed. -->
</PropertyGroup>
<PropertyGroup Condition=" '$(BuildTarget)' == '40' ">
@@ -153,7 +153,7 @@
<BuildPlatform>WINDOWS_PHONE</BuildPlatform>
<BuildFlavor>WINDOWS_PHONE8</BuildFlavor>
<NoStdLib>true</NoStdLib>
- <SignKeyEnhanced>true</SignKeyEnhanced>
+ <!-- <SignKeyEnhanced>true</SignKeyEnhanced> Disabled due to CLR signing bug w/ SHA2 keys blocking Windows Phone 8 apps from being signed. -->
</PropertyGroup>
<PropertyGroup Condition=" '$(BuildTarget)' == 'XBLV' ">
diff --git a/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs b/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs
index 067cc72..37ff279 100644
--- a/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs
+++ b/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs
@@ -65,6 +65,20 @@ namespace Microsoft.Reactive.Testing
}
/// <summary>
+ /// Factory method for an OnCompleted notification record at a given time.
+ /// </summary>
+ /// <typeparam name="T">The element type for the resulting notification object.</typeparam>
+ /// <param name="dummy">An unused instance of type T, to force the compiler to infer that T as part of the method's return value.</param>
+ /// <param name="ticks">Recorded virtual time the OnCompleted notification occurs.</param>
+ /// <returns>Recorded OnCompleted notification.</returns>
+ /// <remarks>This overload is used for anonymous types - by passing in an instance of the type, the compiler can infer the
+ /// anonymous type without you having to try naming the type.</remarks>
+ public static Recorded<Notification<T>> OnCompleted<T>(T dummy, long ticks)
+ {
+ return new Recorded<Notification<T>>(ticks, Notification.CreateOnCompleted<T>());
+ }
+
+ /// <summary>
/// Factory method for an OnError notification record at a given time with a given error.
/// </summary>
/// <typeparam name="T">The element type for the resulting notification object.</typeparam>
@@ -97,6 +111,44 @@ namespace Microsoft.Reactive.Testing
}
/// <summary>
+ /// Factory method for an OnError notification record at a given time with a given error.
+ /// </summary>
+ /// <typeparam name="T">The element type for the resulting notification object.</typeparam>
+ /// <param name="dummy">An unused instance of type T, to force the compiler to infer that T as part of the method's return value.</param>
+ /// <param name="ticks">Recorded virtual time the OnError notification occurs.</param>
+ /// <param name="exception">Recorded exception stored in the OnError notification.</param>
+ /// <returns>Recorded OnError notification.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
+ /// <remarks>This overload is used for anonymous types - by passing in an instance of the type, the compiler can infer the
+ /// anonymous type without you having to try naming the type.</remarks>
+ public static Recorded<Notification<T>> OnError<T>(T dummy, long ticks, Exception exception)
+ {
+ if (exception == null)
+ throw new ArgumentNullException("exception");
+
+ return new Recorded<Notification<T>>(ticks, Notification.CreateOnError<T>(exception));
+ }
+
+ /// <summary>
+ /// Factory method for writing an assert that checks for an OnError notification record at a given time, using the specified predicate to check the exception.
+ /// </summary>
+ /// <typeparam name="T">The element type for the resulting notification object.</typeparam>
+ /// <param name="dummy">An unused instance of type T, to force the compiler to infer that T as part of the method's return value.</param>
+ /// <param name="ticks">Recorded virtual time the OnError notification occurs.</param>
+ /// <param name="predicate">Predicate function to check the OnError notification value against an expected exception.</param>
+ /// <returns>Recorded OnError notification with a predicate to assert a given exception.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is null.</exception>
+ /// <remarks>This overload is used for anonymous types - by passing in an instance of the type, the compiler can infer the
+ /// anonymous type without you having to try naming the type.</remarks>
+ public static Recorded<Notification<T>> OnError<T>(T dummy, long ticks, Func<Exception, bool> predicate)
+ {
+ if (predicate == null)
+ throw new ArgumentNullException("predicate");
+
+ return new Recorded<Notification<T>>(ticks, new OnErrorPredicate<T>(predicate));
+ }
+
+ /// <summary>
/// Factory method for a subscription record based on a given subscription and disposal time.
/// </summary>
/// <param name="start">Virtual time indicating when the subscription was created.</param>
diff --git a/Rx/NET/Source/README.txt b/Rx/NET/Source/README.txt
index 2239644..b808037 100644
--- a/Rx/NET/Source/README.txt
+++ b/Rx/NET/Source/README.txt
@@ -1,11 +1,18 @@
-Build Instructions
+To build all flavors of Rx, you will need several different SDK's installed:
-In order to be able to build all configurations listed in the Visual Studio solution, the following
-components need to be installed:
- * Windows Phone 8 SDK (https://dev.windowsphone.com/en-us/downloadsdk)
- * Visual Studio SDK for building VS extensions, VSIX (http://www.microsoft.com/en-us/download/details.aspx?id=30668)
- * Windows Installer Xml (WiX) toolset (http://wix.codeplex.com)
+Visual Studio 2012
+Windows Phone 8 SDK (create.msdn.com)
+Windows Phone 7.1 SDK (create.msdn.com)
+Silverlight 4 SDK
+Microsoft Silverlight 4 Tools for Visual Studio 2010
+Xbox XNA Game Studio 4.0*
- To build the XNA components, you need to install the Xbox XNA Game Studio 4.0.
- Instructions on how install the required XNA components on Windows 8 are available at the
- following link: http://blogs.msdn.com/b/astebner/archive/2012/02/29/10274694.aspx \ No newline at end of file
+* Note: Installing Xbox XNA Game Studio is tricky on Windows 8, as you must install some other dependencies first.
+ More information can be found here: http://blogs.msdn.com/b/astebner/archive/2012/02/29/10274694.aspx
+
+For building installers and Visual Studio extensions, we need the following installed:
+Visual Studio 2012 SDK
+Windows Installer XML (WiX) toolset (wix.codeplex.com), preferably version 3.5.
+
+
+Note: the XNA Game Studio build has not been actively maintained and may not build. Feel free to fix it!
diff --git a/Rx/NET/Source/Rx.sln b/Rx/NET/Source/Rx.sln
index d563cad..c46be4f 100644
--- a/Rx/NET/Source/Rx.sln
+++ b/Rx/NET/Source/Rx.sln
@@ -45,10 +45,10 @@ Global
GlobalSection(TeamFoundationVersionControl) = preSolution
SccNumberOfProjects = 1
SccEnterpriseProvider = {4CA58AB2-18FA-4F8D-95D4-32DDF27D184C}
- SccTeamFoundationServer = https://tfs1.interop.msftlabs.com/tfs/interop%20team%20projects
+ SccTeamFoundationServer = http://tfs1:8085/tfs/interop%20team%20projects
SccProjectUniqueName0 = Playground\\Playground.csproj
SccProjectName0 = Playground
- SccAuxPath0 = https://tfs1.interop.msftlabs.com/tfs/interop%20team%20projects
+ SccAuxPath0 = http://tfs1:8085/tfs/interop%20team%20projects
SccLocalPath0 = Playground
SccProvider0 = {4CA58AB2-18FA-4F8D-95D4-32DDF27D184C}
EndGlobalSection
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs
index 9cf8105..4b35af6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs
@@ -694,10 +694,15 @@ namespace System.Reactive.Linq
IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, TResult> selector);
IObservable<TOther> SelectMany<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector);
+ IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector);
IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector);
+ IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector);
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted);
+ IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted);
IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector);
+ IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector);
IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector);
+ IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector);
IObservable<TSource> Skip<TSource>(IObservable<TSource> source, int count);
IObservable<TSource> SkipWhile<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate);
IObservable<TSource> SkipWhile<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate);
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs
index 749f823..5d8e3ef 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs
@@ -511,6 +511,30 @@ namespace System.Reactive.Linq
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to prepend values to.</param>
+ /// <param name="values">Values to prepend to the specified sequence.</param>
+ /// <returns>The source sequence prepended with the specified values.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="values"/> is null.</exception>
+ public static IObservable<TSource> StartWith<TSource>(this IObservable<TSource> source, IEnumerable<TSource> values)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (values == null)
+ throw new ArgumentNullException("values");
+
+ TSource[] valueArray = values as TSource[];
+ if (valueArray == null)
+ {
+ List<TSource> valueList = new List<TSource>(values);
+ valueArray = valueList.ToArray();
+ }
+ return s_impl.StartWith<TSource>(source, valueArray);
+ }
+
+ /// <summary>
+ /// Prepends a sequence of values to an observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence to prepend values to.</param>
/// <param name="scheduler">Scheduler to emit the prepended values on.</param>
/// <param name="values">Values to prepend to the specified sequence.</param>
/// <returns>The source sequence prepended with the specified values.</returns>
@@ -527,6 +551,33 @@ namespace System.Reactive.Linq
return s_impl.StartWith<TSource>(source, scheduler, values);
}
+ /// <summary>
+ /// Prepends a sequence of values to an observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence to prepend values to.</param>
+ /// <param name="scheduler">Scheduler to emit the prepended values on.</param>
+ /// <param name="values">Values to prepend to the specified sequence.</param>
+ /// <returns>The source sequence prepended with the specified values.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> or <paramref name="values"/> is null.</exception>
+ public static IObservable<TSource> StartWith<TSource>(this IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (scheduler == null)
+ throw new ArgumentNullException("scheduler");
+ if (values == null)
+ throw new ArgumentNullException("values");
+
+ TSource[] valueArray = values as TSource[];
+ if (valueArray == null)
+ {
+ List<TSource> valueList = new List<TSource>(values);
+ valueArray = valueList.ToArray();
+ }
+ return s_impl.StartWith<TSource>(source, scheduler, valueArray);
+ }
+
#endregion
#region + TakeLast +
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs
index e20e3eb..8eb9289 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs
@@ -544,6 +544,25 @@ namespace System.Reactive.Linq
return s_impl.SelectMany<TSource, TResult>(source, selector);
}
+ /// <summary>
+ /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
+ public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (selector == null)
+ throw new ArgumentNullException("selector");
+
+ return s_impl.SelectMany<TSource, TResult>(source, selector);
+ }
+
#if !NO_TPL
/// <summary>
/// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
@@ -609,6 +628,29 @@ namespace System.Reactive.Linq
return s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
+ /// <summary>
+ /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="collectionSelector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
+ public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (collectionSelector == null)
+ throw new ArgumentNullException("collectionSelector");
+ if (resultSelector == null)
+ throw new ArgumentNullException("resultSelector");
+
+ return s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+ }
+
#if !NO_TPL
/// <summary>
/// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
@@ -685,6 +727,31 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Projects each notification of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of notifications to project.</param>
+ /// <param name="onNext">A transform function to apply to each element; the second parameter represents the index of the source element.</param>
+ /// <param name="onError">A transform function to apply when an error occurs in the source sequence; the second parameter represents the index of the source element.</param>
+ /// <param name="onCompleted">A transform function to apply when the end of the source sequence is reached; the second parameter represents the number of elements observed.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function corresponding to each notification in the input sequence.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
+ public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (onNext == null)
+ throw new ArgumentNullException("onNext");
+ if (onError == null)
+ throw new ArgumentNullException("onError");
+ if (onCompleted == null)
+ throw new ArgumentNullException("onCompleted");
+
+ return s_impl.SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
+ }
+
+ /// <summary>
/// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
@@ -705,6 +772,27 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
+ /// The index of each source element is used in the projected form of that element.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
+ /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
+ public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (selector == null)
+ throw new ArgumentNullException("selector");
+
+ return s_impl.SelectMany<TSource, TResult>(source, selector);
+ }
+
+ /// <summary>
/// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
@@ -728,6 +816,30 @@ namespace System.Reactive.Linq
return s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
+ /// <summary>
+ /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
+ /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
+ public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (collectionSelector == null)
+ throw new ArgumentNullException("collectionSelector");
+ if (resultSelector == null)
+ throw new ArgumentNullException("resultSelector");
+
+ return s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+ }
+
#endregion
#region + Skip +
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
index 687a894..054f974 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
@@ -17,8 +17,11 @@ namespace System.Reactive.Linq.Observαble
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
+ private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex;
private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
+ private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex;
private readonly Func<TSource, TCollection, TResult> _resultSelector;
+ private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -27,6 +30,13 @@ namespace System.Reactive.Linq.Observαble
_resultSelector = resultSelector;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ _source = source;
+ _collectionSelectorWithIndex = collectionSelector;
+ _resultSelectorWithIndex = resultSelector;
+ }
+
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
_source = source;
@@ -34,6 +44,13 @@ namespace System.Reactive.Linq.Observαble
_resultSelector = resultSelector;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ _source = source;
+ _collectionSelectorEWithIndex = collectionSelector;
+ _resultSelectorWithIndex = resultSelector;
+ }
+
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
@@ -47,7 +64,7 @@ namespace System.Reactive.Linq.Observαble
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_collectionSelector != null)
+ if (_collectionSelector != null || _collectionSelectorWithIndex != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
@@ -77,12 +94,14 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
+ _indexInSource = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
+ private int _indexInSource;
public IDisposable Run()
{
@@ -103,7 +122,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- collection = _parent._collectionSelector(value);
+ if (_parent._collectionSelector != null)
+ collection = _parent._collectionSelector(value);
+ else
+ {
+ checked { _indexInSource++; }
+ collection = _parent._collectionSelectorWithIndex(value, _indexInSource);
+ }
}
catch (Exception ex)
{
@@ -117,7 +142,7 @@ namespace System.Reactive.Linq.Observαble
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription));
+ innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource));
}
public void OnError(Exception error)
@@ -158,12 +183,16 @@ namespace System.Reactive.Linq.Observαble
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;
+ private int _indexInSource;
+ private int _indexInIntermediate = -1;
- public ι(_ parent, TSource value, IDisposable self)
+ public ι(_ parent, TSource value, IDisposable self, int indexInSource)
{
_parent = parent;
_value = value;
_self = self;
+ _indexInSource = indexInSource;
+ _indexInIntermediate = -1;
}
public void OnNext(TCollection value)
@@ -172,7 +201,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- res = _parent._parent._resultSelector(_value, value);
+ if (_parent._parent._resultSelector != null)
+ res = _parent._parent._resultSelector(_value, value);
+ else
+ {
+ checked { _indexInIntermediate++; }
+ res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate);
+ }
}
catch (Exception ex)
{
@@ -222,11 +257,13 @@ namespace System.Reactive.Linq.Observαble
class ε : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
+ private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection
public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
+ _indexInSource = -1;
}
public void OnNext(TSource value)
@@ -234,7 +271,13 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TCollection>);
try
{
- xs = _parent._collectionSelectorE(value);
+ if (_parent._collectionSelectorE != null)
+ xs = _parent._collectionSelectorE(value);
+ else
+ {
+ checked { _indexInSource++; }
+ xs = _parent._collectionSelectorEWithIndex(value, _indexInSource);
+ }
}
catch (Exception exception)
{
@@ -257,6 +300,7 @@ namespace System.Reactive.Linq.Observαble
try
{
+ int indexInIntermediate = -1;
var hasNext = true;
while (hasNext)
{
@@ -267,7 +311,15 @@ namespace System.Reactive.Linq.Observαble
{
hasNext = e.MoveNext();
if (hasNext)
- current = _parent._resultSelector(value, e.Current);
+ {
+ if (_parent._resultSelector != null)
+ current = _parent._resultSelector(value, e.Current);
+ else
+ {
+ checked { indexInIntermediate++; }
+ current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate);
+ }
+ }
}
catch (Exception exception)
{
@@ -445,7 +497,11 @@ namespace System.Reactive.Linq.Observαble
private readonly Func<TSource, IObservable<TResult>> _selector;
private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
private readonly Func<IObservable<TResult>> _selectorOnCompleted;
+ private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex;
+ private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError;
+ private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted;
private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
+ private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
{
@@ -461,12 +517,32 @@ namespace System.Reactive.Linq.Observαble
_selectorOnCompleted = selectorOnCompleted;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ {
+ _source = source;
+ _selectorWithIndex = selector;
+ }
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted)
+ {
+ _source = source;
+ _selectorWithIndex = selector;
+ _selectorWithIndexOnError = selectorOnError;
+ _selectorWithIndexOnCompleted = selectorOnCompleted;
+ }
+
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
{
_source = source;
_selectorE = selector;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
+ {
+ _source = source;
+ _selectorEWithIndex = selector;
+ }
+
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
@@ -479,7 +555,7 @@ namespace System.Reactive.Linq.Observαble
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_selector != null)
+ if (_selector != null || _selectorWithIndex != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
@@ -509,12 +585,14 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
+ _index = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
+ private int _index;
public IDisposable Run()
{
@@ -535,7 +613,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- inner = _parent._selector(value);
+ if (_parent._selector != null)
+ inner = _parent._selector(value);
+ else
+ {
+ checked { _index++; }
+ inner = _parent._selectorWithIndex(value, _index);
+ }
}
catch (Exception ex)
{
@@ -558,7 +642,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- inner = _parent._selectorOnError(error);
+ if (_parent._selectorOnError != null)
+ inner = _parent._selectorOnError(error);
+ else
+ {
+ checked { _index++; }
+ inner = _parent._selectorWithIndexOnError(error, _index);
+ }
}
catch (Exception ex)
{
@@ -592,7 +682,10 @@ namespace System.Reactive.Linq.Observαble
try
{
- inner = _parent._selectorOnCompleted();
+ if (_parent._selectorOnCompleted != null)
+ inner = _parent._selectorOnCompleted();
+ else
+ inner = _parent._selectorWithIndexOnCompleted(_index);
}
catch (Exception ex)
{
@@ -692,11 +785,13 @@ namespace System.Reactive.Linq.Observαble
class ε : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
+ private int _index;
public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
+ _index = -1;
}
public void OnNext(TSource value)
@@ -704,7 +799,13 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TResult>);
try
{
- xs = _parent._selectorE(value);
+ if (_parent._selectorE != null)
+ xs = _parent._selectorE(value);
+ else
+ {
+ checked { _index++; }
+ xs = _parent._selectorEWithIndex(value, _index);
+ }
}
catch (Exception exception)
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs
index 09e2df4..5f64e16 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs
@@ -496,6 +496,25 @@ namespace System.Reactive.Linq
return StartWith_<TSource>(source, scheduler, values);
}
+ public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IEnumerable<TSource> values)
+ {
+ return StartWith(source, SchedulerDefaults.ConstantTimeOperations, values);
+ }
+
+ public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
+ {
+ if (values == null)
+ throw new ArgumentNullException("values");
+
+ var valueArray = values as TSource[];
+ if (valueArray == null)
+ {
+ List<TSource> valueList = new List<TSource>(values);
+ valueArray = valueList.ToArray();
+ }
+ return StartWith_<TSource>(source, scheduler, valueArray);
+ }
+
private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
{
return values.ToObservable(scheduler).Concat(source);
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
index eff30dd..0089f04 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
@@ -838,6 +838,11 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TResult>(source, selector);
}
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ {
+ return SelectMany_<TSource, TResult>(source, selector);
+ }
+
#if !NO_TPL
public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
{
@@ -863,6 +868,11 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
+ public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+ }
+
#if !NO_TPL
public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
@@ -892,6 +902,15 @@ namespace System.Reactive.Linq
#endif
}
+ private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, selector);
+#else
+ return source.Select(selector).Merge();
+#endif
+ }
+
private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
#if !NO_PERF
@@ -901,6 +920,15 @@ namespace System.Reactive.Linq
#endif
}
+ private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+#else
+ return SelectMany_<TSource, TResult>(source, x => collectionSelector(x).Select(y => resultSelector(x, y)));
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
{
#if !NO_PERF
@@ -918,6 +946,23 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
+#else
+ return source.Materialize().SelectMany(notification =>
+ {
+ if (notification.Kind == NotificationKind.OnNext)
+ return onNext(notification.Value);
+ else if (notification.Kind == NotificationKind.OnError)
+ return onError(notification.Exception);
+ else
+ return onCompleted();
+ });
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
{
#if !NO_PERF
@@ -927,6 +972,15 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, selector);
+#else
+ return SelectMany_<TSource, TResult, TResult>(source, selector, (_, x) => x);
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
@@ -991,6 +1045,11 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+ }
+
#endregion
#region + Skip +
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs
index 896c451..fc33401 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs
@@ -112,4 +112,22 @@ namespace System.Reactive
return String.Format(CultureInfo.CurrentCulture, "{0}@{1}", Value, Timestamp);
}
}
+
+ /// <summary>
+ /// A helper class with a factory method for creating Timestamped&lt;T&gt; instances.
+ /// </summary>
+ public static class Timestamped
+ {
+ /// <summary>
+ /// Creates an instance of a Timestamped&lt;T&gt;. This is syntactic sugar that uses type inference
+ /// to avoid specifying a type in a constructor call, which is very useful when using anonymous types.
+ /// </summary>
+ /// <param name="value">The value to be annotated with a timestamp.</param>
+ /// <param name="timestamp">Timestamp associated with the value.</param>
+ /// <returns>Creates a new timestamped value.</returns>
+ public static Timestamped<T> Create<T>(T value, DateTimeOffset timestamp)
+ {
+ return new Timestamped<T>(value, timestamp);
+ }
+ }
}
diff --git a/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs b/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs
index 2cf02e9..2a5f3e4 100644
--- a/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs
+++ b/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs
@@ -1,5 +1,6 @@
/*
- * WARNING: Auto-generated file (8/14/2012 12:14:10 AM)
+ * WARNING: Auto-generated file (1/10/2013 8:30:32 PM)
+ * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
*/
#pragma warning disable 1591
@@ -8507,27 +8508,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Decimal" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Int32" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Decimal" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Int32&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<decimal> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal>> selector)
+ public static IQbservable<int?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<decimal>(
+ return source.Provider.CreateQuery<int?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8538,27 +8539,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Int32" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Int64" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Int32" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Int64&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<int> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int>> selector)
+ public static IQbservable<long?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<int>(
+ return source.Provider.CreateQuery<long?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8569,27 +8570,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Int64" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Double" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Int64" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Double" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<long> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long>> selector)
+ public static IQbservable<double> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<long>(
+ return source.Provider.CreateQuery<double>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8600,27 +8601,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Double" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Single" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Double&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Single" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<double?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double?>> selector)
+ public static IQbservable<float> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<double?>(
+ return source.Provider.CreateQuery<float>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double?>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8631,27 +8632,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Single" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Decimal" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Single&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Decimal" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<float?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float?>> selector)
+ public static IQbservable<decimal> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<float?>(
+ return source.Provider.CreateQuery<decimal>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float?>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8662,27 +8663,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Decimal" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Int32" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Decimal&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Int32" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<decimal?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal?>> selector)
+ public static IQbservable<int> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<decimal?>(
+ return source.Provider.CreateQuery<int>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal?>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8693,27 +8694,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Int32" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Int64" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Int32&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Int64" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<int?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int?>> selector)
+ public static IQbservable<long> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<int?>(
+ return source.Provider.CreateQuery<long>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int?>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8724,27 +8725,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Int64" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Double" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Int64&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Double&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<long?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long?>> selector)
+ public static IQbservable<double?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<long?>(
+ return source.Provider.CreateQuery<double?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long?>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8755,27 +8756,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Double" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Single" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Double" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Single&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<double> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double>> selector)
+ public static IQbservable<float?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<double>(
+ return source.Provider.CreateQuery<float?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -8786,27 +8787,27 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Invokes a transform function on each element of a sequence and returns the maximum <see cref="T:System.Single" /> value.
+ /// Invokes a transform function on each element of a sequence and returns the maximum nullable <see cref="T:System.Decimal" /> value.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values to determine the maximum value of.</param>
/// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Single" /> that corresponds to the maximum value in the source sequence.</returns>
+ /// <returns>An observable sequence containing a single element with the value of type <see cref="T:System.Nullable&lt;System.Decimal&gt;" /> that corresponds to the maximum value in the source sequence.</returns>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<float> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float>> selector)
+ public static IQbservable<decimal?> Max<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<float>(
+ return source.Provider.CreateQuery<decimal?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float>>))),
+ InfoOf(() => Qbservable.Max<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -11466,6 +11467,42 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="collectionSelector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
+ /// <exception cref="T:System.ArgumentNullException">
+ /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
+ public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (collectionSelector == null)
+ throw new ArgumentNullException("collectionSelector");
+ if (resultSelector == null)
+ throw new ArgumentNullException("resultSelector");
+
+ return source.Provider.CreateQuery<TResult>(
+ Expression.Call(
+ null,
+#if CRIPPLED_REFLECTION
+ InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
+#else
+ ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
+#endif
+ source.Expression,
+ collectionSelector,
+ resultSelector
+ )
+ );
+ }
+
+ /// <summary>
/// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
@@ -11503,6 +11540,43 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
+ /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
+ /// <exception cref="T:System.ArgumentNullException">
+ /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
+ /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+ public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (collectionSelector == null)
+ throw new ArgumentNullException("collectionSelector");
+ if (resultSelector == null)
+ throw new ArgumentNullException("resultSelector");
+
+ return source.Provider.CreateQuery<TResult>(
+ Expression.Call(
+ null,
+#if CRIPPLED_REFLECTION
+ InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
+#else
+ ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
+#endif
+ source.Expression,
+ collectionSelector,
+ resultSelector
+ )
+ );
+ }
+
+ /// <summary>
/// Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
@@ -11573,6 +11647,45 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Projects each notification of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of notifications to project.</param>
+ /// <param name="onNext">A transform function to apply to each element; the second parameter represents the index of the source element.</param>
+ /// <param name="onError">A transform function to apply when an error occurs in the source sequence; the second parameter represents the index of the source element.</param>
+ /// <param name="onCompleted">A transform function to apply when the end of the source sequence is reached; the second parameter represents the number of elements observed.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function corresponding to each notification in the input sequence.</returns>
+ /// <exception cref="T:System.ArgumentNullException">
+ /// <paramref name="source" /> or <paramref name="onNext" /> or <paramref name="onError" /> or <paramref name="onCompleted" /> is null.</exception>
+ public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TResult>>> onNext, Expression<Func<Exception, int, IObservable<TResult>>> onError, Expression<Func<int, IObservable<TResult>>> onCompleted)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (onNext == null)
+ throw new ArgumentNullException("onNext");
+ if (onError == null)
+ throw new ArgumentNullException("onError");
+ if (onCompleted == null)
+ throw new ArgumentNullException("onCompleted");
+
+ return source.Provider.CreateQuery<TResult>(
+ Expression.Call(
+ null,
+#if CRIPPLED_REFLECTION
+ InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TResult>>>), default(Expression<Func<Exception, int, IObservable<TResult>>>), default(Expression<Func<int, IObservable<TResult>>>))),
+#else
+ ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
+#endif
+ source.Expression,
+ onNext,
+ onError,
+ onCompleted
+ )
+ );
+ }
+
+ /// <summary>
/// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
@@ -11603,6 +11716,37 @@ namespace System.Reactive.Linq
);
}
+ /// <summary>
+ /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+ /// <exception cref="T:System.ArgumentNullException">
+ /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
+ public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TResult>>> selector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (selector == null)
+ throw new ArgumentNullException("selector");
+
+ return source.Provider.CreateQuery<TResult>(
+ Expression.Call(
+ null,
+#if CRIPPLED_REFLECTION
+ InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TResult>>>))),
+#else
+ ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
+#endif
+ source.Expression,
+ selector
+ )
+ );
+ }
+
#if !NO_TPL
/// <summary>
/// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
@@ -11703,6 +11847,39 @@ namespace System.Reactive.Linq
);
}
+ /// <summary>
+ /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
+ /// The index of each source element is used in the projected form of that element.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
+ /// <param name="source">An observable sequence of elements to project.</param>
+ /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
+ /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+ /// <exception cref="T:System.ArgumentNullException">
+ /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
+ /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+ public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TResult>>> selector)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (selector == null)
+ throw new ArgumentNullException("selector");
+
+ return source.Provider.CreateQuery<TResult>(
+ Expression.Call(
+ null,
+#if CRIPPLED_REFLECTION
+ InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TResult>>>))),
+#else
+ ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
+#endif
+ source.Expression,
+ selector
+ )
+ );
+ }
+
#if !NO_TPL
/// <summary>
/// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
@@ -12800,6 +12977,40 @@ namespace System.Reactive.Linq
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to prepend values to.</param>
+ /// <param name="scheduler">Scheduler to emit the prepended values on.</param>
+ /// <param name="values">Values to prepend to the specified sequence.</param>
+ /// <returns>The source sequence prepended with the specified values.</returns>
+ /// <exception cref="T:System.ArgumentNullException">
+ /// <paramref name="source" /> or <paramref name="scheduler" /> or <paramref name="values" /> is null.</exception>
+ public static IQbservable<TSource> StartWith<TSource>(this IQbservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (scheduler == null)
+ throw new ArgumentNullException("scheduler");
+ if (values == null)
+ throw new ArgumentNullException("values");
+
+ return source.Provider.CreateQuery<TSource>(
+ Expression.Call(
+ null,
+#if CRIPPLED_REFLECTION
+ InfoOf(() => Qbservable.StartWith<TSource>(default(IQbservable<TSource>), default(IScheduler), default(IEnumerable<TSource>))),
+#else
+ ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+ source.Expression,
+ Expression.Constant(scheduler, typeof(IScheduler)),
+ GetSourceExpression(values)
+ )
+ );
+ }
+
+ /// <summary>
+ /// Prepends a sequence of values to an observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence to prepend values to.</param>
/// <param name="values">Values to prepend to the specified sequence.</param>
/// <returns>The source sequence prepended with the specified values.</returns>
/// <exception cref="T:System.ArgumentNullException">
@@ -12826,6 +13037,36 @@ namespace System.Reactive.Linq
}
/// <summary>
+ /// Prepends a sequence of values to an observable sequence.
+ /// </summary>
+ /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+ /// <param name="source">Source sequence to prepend values to.</param>
+ /// <param name="values">Values to prepend to the specified sequence.</param>
+ /// <returns>The source sequence prepended with the specified values.</returns>
+ /// <exception cref="T:System.ArgumentNullException">
+ /// <paramref name="source" /> or <paramref name="values" /> is null.</exception>
+ public static IQbservable<TSource> StartWith<TSource>(this IQbservable<TSource> source, IEnumerable<TSource> values)
+ {
+ if (source == null)
+ throw new ArgumentNullException("source");
+ if (values == null)
+ throw new ArgumentNullException("values");
+
+ return source.Provider.CreateQuery<TSource>(
+ Expression.Call(
+ null,
+#if CRIPPLED_REFLECTION
+ InfoOf(() => Qbservable.StartWith<TSource>(default(IQbservable<TSource>), default(IEnumerable<TSource>))),
+#else
+ ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+ source.Expression,
+ GetSourceExpression(values)
+ )
+ );
+ }
+
+ /// <summary>
/// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. This operation is not commonly used;
/// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
/// </summary>
@@ -13162,7 +13403,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of nullable <see cref="T:System.Double" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of <see cref="T:System.Double" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13171,18 +13412,18 @@ namespace System.Reactive.Linq
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<double?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double?>> selector)
+ public static IQbservable<double> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<double?>(
+ return source.Provider.CreateQuery<double>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double?>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13193,7 +13434,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of nullable <see cref="T:System.Single" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of <see cref="T:System.Single" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13202,18 +13443,18 @@ namespace System.Reactive.Linq
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<float?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float?>> selector)
+ public static IQbservable<float> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<float?>(
+ return source.Provider.CreateQuery<float>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float?>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13224,7 +13465,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of nullable <see cref="T:System.Decimal" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of <see cref="T:System.Decimal" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13234,18 +13475,18 @@ namespace System.Reactive.Linq
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <exception cref="T:System.OverflowException">(Asynchronous) The sum of the projected values for the elements in the source sequence is larger than <see cref="M:System.Decimal.MaxValue" />.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<decimal?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal?>> selector)
+ public static IQbservable<decimal> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<decimal?>(
+ return source.Provider.CreateQuery<decimal>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal?>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13256,7 +13497,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of nullable <see cref="T:System.Int32" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of <see cref="T:System.Int32" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13266,18 +13507,18 @@ namespace System.Reactive.Linq
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <exception cref="T:System.OverflowException">(Asynchronous) The sum of the projected values for the elements in the source sequence is larger than <see cref="M:System.Int32.MaxValue" />.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<int?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int?>> selector)
+ public static IQbservable<int> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<int?>(
+ return source.Provider.CreateQuery<int>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int?>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13288,7 +13529,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of nullable <see cref="T:System.Int64" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of <see cref="T:System.Int64" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13298,18 +13539,18 @@ namespace System.Reactive.Linq
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <exception cref="T:System.OverflowException">(Asynchronous) The sum of the projected values for the elements in the source sequence is larger than <see cref="M:System.Int64.MaxValue" />.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<long?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long?>> selector)
+ public static IQbservable<long> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<long?>(
+ return source.Provider.CreateQuery<long>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long?>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13320,7 +13561,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of <see cref="T:System.Double" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of nullable <see cref="T:System.Double" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13329,18 +13570,18 @@ namespace System.Reactive.Linq
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<double> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double>> selector)
+ public static IQbservable<double?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, double?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<double>(
+ return source.Provider.CreateQuery<double?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, double?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13351,7 +13592,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of <see cref="T:System.Single" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of nullable <see cref="T:System.Single" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13360,18 +13601,18 @@ namespace System.Reactive.Linq
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<float> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float>> selector)
+ public static IQbservable<float?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, float?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<float>(
+ return source.Provider.CreateQuery<float?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, float?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13382,7 +13623,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of <see cref="T:System.Decimal" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of nullable <see cref="T:System.Decimal" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13392,18 +13633,18 @@ namespace System.Reactive.Linq
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <exception cref="T:System.OverflowException">(Asynchronous) The sum of the projected values for the elements in the source sequence is larger than <see cref="M:System.Decimal.MaxValue" />.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<decimal> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal>> selector)
+ public static IQbservable<decimal?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, decimal?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<decimal>(
+ return source.Provider.CreateQuery<decimal?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, decimal?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13414,7 +13655,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of <see cref="T:System.Int32" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of nullable <see cref="T:System.Int32" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13424,18 +13665,18 @@ namespace System.Reactive.Linq
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <exception cref="T:System.OverflowException">(Asynchronous) The sum of the projected values for the elements in the source sequence is larger than <see cref="M:System.Int32.MaxValue" />.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<int> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int>> selector)
+ public static IQbservable<int?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, int?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<int>(
+ return source.Provider.CreateQuery<int?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, int?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
@@ -13446,7 +13687,7 @@ namespace System.Reactive.Linq
}
/// <summary>
- /// Computes the sum of a sequence of <see cref="T:System.Int64" /> values that are obtained by invoking a transform function on each element of the input sequence.
+ /// Computes the sum of a sequence of nullable <see cref="T:System.Int64" /> values that are obtained by invoking a transform function on each element of the input sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">A sequence of values that are used to calculate a sum.</param>
@@ -13456,18 +13697,18 @@ namespace System.Reactive.Linq
/// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
/// <exception cref="T:System.OverflowException">(Asynchronous) The sum of the projected values for the elements in the source sequence is larger than <see cref="M:System.Int64.MaxValue" />.</exception>
/// <remarks>The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior.</remarks>
- public static IQbservable<long> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long>> selector)
+ public static IQbservable<long?> Sum<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, long?>> selector)
{
if (source == null)
throw new ArgumentNullException("source");
if (selector == null)
throw new ArgumentNullException("selector");
- return source.Provider.CreateQuery<long>(
+ return source.Provider.CreateQuery<long?>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
- InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long>>))),
+ InfoOf(() => Qbservable.Sum<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, long?>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
diff --git a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs
index 7361fed..8ebe588 100644
--- a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs
+++ b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs
@@ -3568,6 +3568,56 @@ namespace ReactiveTests.Tests
);
}
+ [TestMethod]
+ public void StartWith_Enumerable()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(150, 1),
+ OnNext(220, 4),
+ OnCompleted<int>(250)
+ );
+
+ List<int> data = new List<int>(new[] { 1, 2, 3 });
+ var res = scheduler.Start(() =>
+ xs.StartWith(data)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(200, 1),
+ OnNext(200, 2),
+ OnNext(200, 3),
+ OnNext(220, 4),
+ OnCompleted<int>(250)
+ );
+ }
+
+ [TestMethod]
+ public void StartWith_Enumerable_Scheduler()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(150, 1),
+ OnNext(220, 4),
+ OnCompleted<int>(250)
+ );
+
+ List<int> data = new List<int>(new[] { 1, 2, 3 });
+ var res = scheduler.Start(() =>
+ xs.StartWith(scheduler, data)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(201, 1),
+ OnNext(202, 2),
+ OnNext(203, 3),
+ OnNext(220, 4),
+ OnCompleted<int>(250)
+ );
+ }
+
#endregion
#region + TakeLast +
diff --git a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
index e006bc1..0670909 100644
--- a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
+++ b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
@@ -8425,6 +8425,198 @@ namespace ReactiveTests.Tests
}
[TestMethod]
+ // Tests this overload:
+ // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector);
+ public void SelectMany_WithIndex_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ ITestableObservable<char> cs = scheduler.CreateHotObservable(
+ OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
+ OnNext(250, 'a'),
+ OnNext(270, 'l'),
+ OnNext(310, 'o'),
+ OnCompleted<char>(410)
+ );
+
+ var res = scheduler.Start(() =>
+ cs.SelectMany(
+ (x, i) => Observable.Return(new { x, i }, scheduler)
+ ));
+
+ res.Messages.AssertEqual(
+ OnNext(251, new { x = 'a', i = 0 }),
+ OnNext(271, new { x = 'l', i = 1 }),
+ OnNext(311, new { x = 'o', i = 2 }),
+ OnCompleted(new { x = default(char), i = default(int) }, 410)
+ );
+
+ cs.Subscriptions.AssertEqual(
+ Subscribe(200, 410));
+ }
+
+ [TestMethod]
+ // Tests this overload:
+ // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector);
+ public void SelectMany_WithIndex_IEnumerable_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ ITestableObservable<char> cs = scheduler.CreateHotObservable(
+ OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
+ OnNext(250, 'a'),
+ OnNext(270, 'l'),
+ OnNext(310, 'o'),
+ OnCompleted<char>(410)
+ );
+
+ var res = scheduler.Start(() =>
+ cs.SelectMany(
+ (c, i) => new [] { new { c = c, i = i } }
+ ));
+
+
+ res.Messages.AssertEqual(
+ OnNext(250, new { c = 'a', i = 0 }),
+ OnNext(270, new { c = 'l', i = 1 }),
+ OnNext(310, new { c = 'o', i = 2 }),
+ OnCompleted(new { c = default(char), i = default(int) }, 410)
+ );
+
+ cs.Subscriptions.AssertEqual(
+ Subscribe(200, 410));
+ }
+
+
+ [TestMethod]
+ // Tests this overload:
+ // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, int, TResult> resultSelector);
+ public void SelectMany_WithIndex_IObservable_ResultSelector_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ ITestableObservable<ITestableObservable<char>> css = scheduler.CreateHotObservable(
+ OnNext(190, scheduler.CreateColdObservable(
+ OnNext(1, 'h'),
+ OnCompleted<char>(2))),
+ OnNext(250, scheduler.CreateColdObservable(
+ OnNext(1, 'a'),
+ OnCompleted<char>(2))),
+ OnNext(270, scheduler.CreateColdObservable(
+ OnNext(1, 'l'),
+ OnCompleted<char>(2))),
+ OnNext(310, scheduler.CreateColdObservable(
+ OnNext(1, 'o'),
+ OnNext(2, ' '),
+ OnNext(3, 'r'),
+ OnNext(4, 'u'),
+ OnNext(5, 'l'),
+ OnNext(6, 'e'),
+ OnNext(7, 'z'),
+ OnCompleted<char>(8))),
+ OnCompleted<ITestableObservable<char>>(410)
+ );
+
+ var res = scheduler.Start(() =>
+ css.SelectMany(
+ (foo, i) =>
+ {
+ return foo.Select(c => new { c = c, i = i });
+ },
+ (source, i, cs, j) => new { c = cs.c, i = cs.i, i2 = i, j = j }
+ ));
+
+ res.Messages.AssertEqual(
+ OnNext(251, new { c = 'a', i = 0, i2 = 0, j = 0 }),
+ OnNext(271, new { c = 'l', i = 1, i2 = 1, j = 0 }),
+ OnNext(311, new { c = 'o', i = 2, i2 = 2, j = 0 }),
+ OnNext(312, new { c = ' ', i = 2, i2 = 2, j = 1 }),
+ OnNext(313, new { c = 'r', i = 2, i2 = 2, j = 2 }),
+ OnNext(314, new { c = 'u', i = 2, i2 = 2, j = 3 }),
+ OnNext(315, new { c = 'l', i = 2, i2 = 2, j = 4 }),
+ OnNext(316, new { c = 'e', i = 2, i2 = 2, j = 5 }),
+ OnNext(317, new { c = 'z', i = 2, i2 = 2, j = 6 }),
+ OnCompleted(new { c = 'a', i = 0, i2 = 0, j = 0 }, 410)
+ );
+
+ css.Subscriptions.AssertEqual(
+ Subscribe(200, 410));
+ }
+
+
+ [TestMethod]
+ // Tests this overload:
+ // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector);
+ public void SelectMany_WithIndex_IEnumerable_ResultSelector_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 5),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnCompleted<int>(600)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, i) => new[] { new { x = x + 1, i = i }, new { x = -x, i = i }, new { x = x * x, i = i } },
+ (x, i, y, j) => new { x = x, i = i, y = y.x, y_i = y.i, j = j })
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 5, i = 0, y = 6, y_i = 0, j = 0 }),
+ OnNext(210, new { x = 5, i = 0, y = -5, y_i = 0, j = 1 }),
+ OnNext(210, new { x = 5, i = 0, y = 25, y_i = 0, j = 2 }),
+ OnNext(340, new { x = 4, i = 1, y = 5, y_i = 1, j = 0 }),
+ OnNext(340, new { x = 4, i = 1, y = -4, y_i = 1, j = 1 }),
+ OnNext(340, new { x = 4, i = 1, y = 16, y_i = 1, j = 2 }),
+ OnNext(420, new { x = 3, i = 2, y = 4, y_i = 2, j = 0 }),
+ OnNext(420, new { x = 3, i = 2, y = -3, y_i = 2, j = 1 }),
+ OnNext(420, new { x = 3, i = 2, y = 9, y_i = 2, j = 2 }),
+ OnCompleted(new { x = default(int), i = default(int), y = default(int), y_i = default(int), j = default(int) }, 600)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+ }
+
+ [TestMethod]
+ // Tests this overload:
+ // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted);
+ public void SelectMany_WithIndex_Triple_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ ITestableObservable<char> cs = scheduler.CreateHotObservable(
+ OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
+ OnNext(250, 'a'),
+ OnNext(270, 'l'),
+ OnNext(310, 'o'),
+ OnCompleted<char>(410)
+ );
+
+ var res = scheduler.Start(() =>
+ cs.SelectMany(
+ (c, i) => Observable.Return(new { c = c, i = i }, scheduler),
+ (ex, i) => { throw ex; },
+ (i) => Observable.Repeat(new { c = 'x', i = -1 }, i, scheduler)
+ ));
+
+ res.Messages.AssertEqual(
+ OnNext(251, new { c = 'a', i = 0 }),
+ OnNext(271, new { c = 'l', i = 1 }),
+ OnNext(311, new { c = 'o', i = 2 }),
+ OnCompleted(new { c = default(char), i = default(int) }, 410)
+ );
+
+ cs.Subscriptions.AssertEqual(
+ Subscribe(200, 410));
+ }
+
+
+ [TestMethod]
public void SelectMany_Enumerable_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, IEnumerable<int>>.Instance));