From 4d702d6c69ca2645a61a2978b527027aaed4b997 Mon Sep 17 00:00:00 2001 From: Atsushi Eno Date: Fri, 8 Feb 2013 16:47:02 +0900 Subject: import 5526a6f490db, official Rx 2.1 release. --- Rx/NET/Source/BuildAll.proj | 2 +- Rx/NET/Source/BuildSetup.bat | 6 +- Rx/NET/Source/Common.targets | 8 +- .../Microsoft.Reactive.Testing/ReactiveTest.cs | 52 +++ Rx/NET/Source/README.txt | 25 +- Rx/NET/Source/Rx.sln | 4 +- .../Reactive/Linq/IQueryLanguage.cs | 5 + .../Reactive/Linq/Observable.Single.cs | 51 +++ .../Linq/Observable.StandardSequenceOperators.cs | 112 ++++++ .../Reactive/Linq/Observable/SelectMany.cs | 125 +++++- .../Reactive/Linq/QueryLanguage.Single.cs | 19 + .../QueryLanguage.StandardSequenceOperators.cs | 59 +++ .../System.Reactive.Linq/Reactive/Timestamped.cs | 18 + .../Reactive/Linq/Qbservable.Generated.cs | 423 ++++++++++++++++----- .../Tests/Linq/ObservableSingleTest.cs | 50 +++ .../Linq/ObservableStandardQueryOperatorTest.cs | 192 ++++++++++ 16 files changed, 1031 insertions(+), 120 deletions(-) diff --git a/Rx/NET/Source/BuildAll.proj b/Rx/NET/Source/BuildAll.proj index acfdce6..69736e4 100644 --- a/Rx/NET/Source/BuildAll.proj +++ b/Rx/NET/Source/BuildAll.proj @@ -88,7 +88,7 @@ - + diff --git a/Rx/NET/Source/BuildSetup.bat b/Rx/NET/Source/BuildSetup.bat index 49dc22f..21bb950 100644 --- a/Rx/NET/Source/BuildSetup.bat +++ b/Rx/NET/Source/BuildSetup.bat @@ -1 +1,5 @@ -msbuild BuildAll.proj /p:BuildSetup=1 /p:BuildNumber=0.9.0.0 /p:RxRelease=EXPERIMENTAL \ No newline at end of file +@REM msbuild BuildAll.proj /p:BuildSetup=1 /p:BuildNumber=2.0.30116.0 /p:RxRelease=EXPERIMENTAL + +@REM I've had good success with /t:Rebuild. But will omit it for speed. +@REM msbuild BuildAll.proj /t:Rebuild /p:BuildSetup=1 /p:SignedBuild=1 /p:BuildNumber=2.1.30201.0 /p:RxRelease=RTM +msbuild BuildAll.proj /p:BuildSetup=1 /p:SignedBuild=1 /p:BuildNumber=2.1.30201.0 /p:RxRelease=RTM diff --git a/Rx/NET/Source/Common.targets b/Rx/NET/Source/Common.targets index 713def0..90b6fe2 100644 --- a/Rx/NET/Source/Common.targets +++ b/Rx/NET/Source/Common.targets @@ -93,7 +93,7 @@ v4.5 Profile78 true - true + @@ -102,7 +102,7 @@ true DESKTOPCLR DESKTOPCLR45 - true + @@ -113,7 +113,7 @@ WINDOWS WINDOWS8 en-US - true + @@ -153,7 +153,7 @@ WINDOWS_PHONE WINDOWS_PHONE8 true - true + diff --git a/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs b/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs index 067cc72..37ff279 100644 --- a/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs +++ b/Rx/NET/Source/Microsoft.Reactive.Testing/ReactiveTest.cs @@ -64,6 +64,20 @@ namespace Microsoft.Reactive.Testing return new Recorded>(ticks, Notification.CreateOnCompleted()); } + /// + /// Factory method for an OnCompleted notification record at a given time. + /// + /// The element type for the resulting notification object. + /// An unused instance of type T, to force the compiler to infer that T as part of the method's return value. + /// Recorded virtual time the OnCompleted notification occurs. + /// Recorded OnCompleted notification. + /// This overload is used for anonymous types - by passing in an instance of the type, the compiler can infer the + /// anonymous type without you having to try naming the type. + public static Recorded> OnCompleted(T dummy, long ticks) + { + return new Recorded>(ticks, Notification.CreateOnCompleted()); + } + /// /// Factory method for an OnError notification record at a given time with a given error. /// @@ -96,6 +110,44 @@ namespace Microsoft.Reactive.Testing return new Recorded>(ticks, new OnErrorPredicate(predicate)); } + /// + /// Factory method for an OnError notification record at a given time with a given error. + /// + /// The element type for the resulting notification object. + /// An unused instance of type T, to force the compiler to infer that T as part of the method's return value. + /// Recorded virtual time the OnError notification occurs. + /// Recorded exception stored in the OnError notification. + /// Recorded OnError notification. + /// is null. + /// This overload is used for anonymous types - by passing in an instance of the type, the compiler can infer the + /// anonymous type without you having to try naming the type. + public static Recorded> OnError(T dummy, long ticks, Exception exception) + { + if (exception == null) + throw new ArgumentNullException("exception"); + + return new Recorded>(ticks, Notification.CreateOnError(exception)); + } + + /// + /// Factory method for writing an assert that checks for an OnError notification record at a given time, using the specified predicate to check the exception. + /// + /// The element type for the resulting notification object. + /// An unused instance of type T, to force the compiler to infer that T as part of the method's return value. + /// Recorded virtual time the OnError notification occurs. + /// Predicate function to check the OnError notification value against an expected exception. + /// Recorded OnError notification with a predicate to assert a given exception. + /// is null. + /// This overload is used for anonymous types - by passing in an instance of the type, the compiler can infer the + /// anonymous type without you having to try naming the type. + public static Recorded> OnError(T dummy, long ticks, Func predicate) + { + if (predicate == null) + throw new ArgumentNullException("predicate"); + + return new Recorded>(ticks, new OnErrorPredicate(predicate)); + } + /// /// Factory method for a subscription record based on a given subscription and disposal time. /// diff --git a/Rx/NET/Source/README.txt b/Rx/NET/Source/README.txt index 2239644..b808037 100644 --- a/Rx/NET/Source/README.txt +++ b/Rx/NET/Source/README.txt @@ -1,11 +1,18 @@ -Build Instructions +To build all flavors of Rx, you will need several different SDK's installed: -In order to be able to build all configurations listed in the Visual Studio solution, the following -components need to be installed: - * Windows Phone 8 SDK (https://dev.windowsphone.com/en-us/downloadsdk) - * Visual Studio SDK for building VS extensions, VSIX (http://www.microsoft.com/en-us/download/details.aspx?id=30668) - * Windows Installer Xml (WiX) toolset (http://wix.codeplex.com) +Visual Studio 2012 +Windows Phone 8 SDK (create.msdn.com) +Windows Phone 7.1 SDK (create.msdn.com) +Silverlight 4 SDK +Microsoft Silverlight 4 Tools for Visual Studio 2010 +Xbox XNA Game Studio 4.0* - To build the XNA components, you need to install the Xbox XNA Game Studio 4.0. - Instructions on how install the required XNA components on Windows 8 are available at the - following link: http://blogs.msdn.com/b/astebner/archive/2012/02/29/10274694.aspx \ No newline at end of file +* Note: Installing Xbox XNA Game Studio is tricky on Windows 8, as you must install some other dependencies first. + More information can be found here: http://blogs.msdn.com/b/astebner/archive/2012/02/29/10274694.aspx + +For building installers and Visual Studio extensions, we need the following installed: +Visual Studio 2012 SDK +Windows Installer XML (WiX) toolset (wix.codeplex.com), preferably version 3.5. + + +Note: the XNA Game Studio build has not been actively maintained and may not build. Feel free to fix it! diff --git a/Rx/NET/Source/Rx.sln b/Rx/NET/Source/Rx.sln index d563cad..c46be4f 100644 --- a/Rx/NET/Source/Rx.sln +++ b/Rx/NET/Source/Rx.sln @@ -45,10 +45,10 @@ Global GlobalSection(TeamFoundationVersionControl) = preSolution SccNumberOfProjects = 1 SccEnterpriseProvider = {4CA58AB2-18FA-4F8D-95D4-32DDF27D184C} - SccTeamFoundationServer = https://tfs1.interop.msftlabs.com/tfs/interop%20team%20projects + SccTeamFoundationServer = http://tfs1:8085/tfs/interop%20team%20projects SccProjectUniqueName0 = Playground\\Playground.csproj SccProjectName0 = Playground - SccAuxPath0 = https://tfs1.interop.msftlabs.com/tfs/interop%20team%20projects + SccAuxPath0 = http://tfs1:8085/tfs/interop%20team%20projects SccLocalPath0 = Playground SccProvider0 = {4CA58AB2-18FA-4F8D-95D4-32DDF27D184C} EndGlobalSection diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs index 9cf8105..4b35af6 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs @@ -694,10 +694,15 @@ namespace System.Reactive.Linq IObservable Select(IObservable source, Func selector); IObservable SelectMany(IObservable source, IObservable other); IObservable SelectMany(IObservable source, Func> selector); + IObservable SelectMany(IObservable source, Func> selector); IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector); + IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector); IObservable SelectMany(IObservable source, Func> onNext, Func> onError, Func> onCompleted); + IObservable SelectMany(IObservable source, Func> onNext, Func> onError, Func> onCompleted); IObservable SelectMany(IObservable source, Func> selector); + IObservable SelectMany(IObservable source, Func> selector); IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector); + IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector); IObservable Skip(IObservable source, int count); IObservable SkipWhile(IObservable source, Func predicate); IObservable SkipWhile(IObservable source, Func predicate); diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs index 749f823..5d8e3ef 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Single.cs @@ -506,6 +506,30 @@ namespace System.Reactive.Linq return s_impl.StartWith(source, values); } + /// + /// Prepends a sequence of values to an observable sequence. + /// + /// The type of the elements in the source sequence. + /// Source sequence to prepend values to. + /// Values to prepend to the specified sequence. + /// The source sequence prepended with the specified values. + /// or is null. + public static IObservable StartWith(this IObservable source, IEnumerable values) + { + if (source == null) + throw new ArgumentNullException("source"); + if (values == null) + throw new ArgumentNullException("values"); + + TSource[] valueArray = values as TSource[]; + if (valueArray == null) + { + List valueList = new List(values); + valueArray = valueList.ToArray(); + } + return s_impl.StartWith(source, valueArray); + } + /// /// Prepends a sequence of values to an observable sequence. /// @@ -527,6 +551,33 @@ namespace System.Reactive.Linq return s_impl.StartWith(source, scheduler, values); } + /// + /// Prepends a sequence of values to an observable sequence. + /// + /// The type of the elements in the source sequence. + /// Source sequence to prepend values to. + /// Scheduler to emit the prepended values on. + /// Values to prepend to the specified sequence. + /// The source sequence prepended with the specified values. + /// or or is null. + public static IObservable StartWith(this IObservable source, IScheduler scheduler, IEnumerable values) + { + if (source == null) + throw new ArgumentNullException("source"); + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (values == null) + throw new ArgumentNullException("values"); + + TSource[] valueArray = values as TSource[]; + if (valueArray == null) + { + List valueList = new List(values); + valueArray = valueList.ToArray(); + } + return s_impl.StartWith(source, scheduler, valueArray); + } + #endregion #region + TakeLast + diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs index e20e3eb..8eb9289 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs @@ -544,6 +544,25 @@ namespace System.Reactive.Linq return s_impl.SelectMany(source, selector); } + /// + /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected inner sequences and the elements in the merged result sequence. + /// An observable sequence of elements to project. + /// A transform function to apply to each source element; the second parameter of the function represents the index of the source element. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + /// or is null. + public static IObservable SelectMany(this IObservable source, Func> selector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (selector == null) + throw new ArgumentNullException("selector"); + + return s_impl.SelectMany(source, selector); + } + #if !NO_TPL /// /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence. @@ -609,6 +628,29 @@ namespace System.Reactive.Linq return s_impl.SelectMany(source, collectionSelector, resultSelector); } + /// + /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected intermediate sequences. + /// The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements. + /// An observable sequence of elements to project. + /// A transform function to apply to each source element; the second parameter of the function represents the index of the source element. + /// A transform function to apply to each element of the intermediate sequence. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element. + /// or or is null. + public static IObservable SelectMany(this IObservable source, Func> collectionSelector, Func resultSelector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (collectionSelector == null) + throw new ArgumentNullException("collectionSelector"); + if (resultSelector == null) + throw new ArgumentNullException("resultSelector"); + + return s_impl.SelectMany(source, collectionSelector, resultSelector); + } + #if !NO_TPL /// /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence. @@ -684,6 +726,31 @@ namespace System.Reactive.Linq return s_impl.SelectMany(source, onNext, onError, onCompleted); } + /// + /// Projects each notification of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected inner sequences and the elements in the merged result sequence. + /// An observable sequence of notifications to project. + /// A transform function to apply to each element; the second parameter represents the index of the source element. + /// A transform function to apply when an error occurs in the source sequence; the second parameter represents the index of the source element. + /// A transform function to apply when the end of the source sequence is reached; the second parameter represents the number of elements observed. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function corresponding to each notification in the input sequence. + /// or or or is null. + public static IObservable SelectMany(this IObservable source, Func> onNext, Func> onError, Func> onCompleted) + { + if (source == null) + throw new ArgumentNullException("source"); + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + return s_impl.SelectMany(source, onNext, onError, onCompleted); + } + /// /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence. /// @@ -704,6 +771,27 @@ namespace System.Reactive.Linq return s_impl.SelectMany(source, selector); } + /// + /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence. + /// The index of each source element is used in the projected form of that element. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence. + /// An observable sequence of elements to project. + /// A transform function to apply to each source element; the second parameter of the function represents the index of the source element. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + /// or is null. + /// The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the conversion. + public static IObservable SelectMany(this IObservable source, Func> selector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (selector == null) + throw new ArgumentNullException("selector"); + + return s_impl.SelectMany(source, selector); + } + /// /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence. /// @@ -728,6 +816,30 @@ namespace System.Reactive.Linq return s_impl.SelectMany(source, collectionSelector, resultSelector); } + /// + /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected intermediate enumerable sequences. + /// The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements. + /// An observable sequence of elements to project. + /// A transform function to apply to each element; the second parameter of the function represents the index of the source element. + /// A transform function to apply to each element of the intermediate sequence. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element. + /// or or is null. + /// The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the conversion. + public static IObservable SelectMany(this IObservable source, Func> collectionSelector, Func resultSelector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (collectionSelector == null) + throw new ArgumentNullException("collectionSelector"); + if (resultSelector == null) + throw new ArgumentNullException("resultSelector"); + + return s_impl.SelectMany(source, collectionSelector, resultSelector); + } + #endregion #region + Skip + diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs index 687a894..054f974 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs @@ -17,8 +17,11 @@ namespace System.Reactive.Linq.Observαble { private readonly IObservable _source; private readonly Func> _collectionSelector; + private readonly Func> _collectionSelectorWithIndex; private readonly Func> _collectionSelectorE; + private readonly Func> _collectionSelectorEWithIndex; private readonly Func _resultSelector; + private readonly Func _resultSelectorWithIndex; public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { @@ -27,6 +30,13 @@ namespace System.Reactive.Linq.Observαble _resultSelector = resultSelector; } + public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) + { + _source = source; + _collectionSelectorWithIndex = collectionSelector; + _resultSelectorWithIndex = resultSelector; + } + public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; @@ -34,6 +44,13 @@ namespace System.Reactive.Linq.Observαble _resultSelector = resultSelector; } + public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) + { + _source = source; + _collectionSelectorEWithIndex = collectionSelector; + _resultSelectorWithIndex = resultSelector; + } + #if !NO_TPL private readonly Func> _collectionSelectorT; @@ -47,7 +64,7 @@ namespace System.Reactive.Linq.Observαble protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { - if (_collectionSelector != null) + if (_collectionSelector != null || _collectionSelectorWithIndex != null) { var sink = new _(this, observer, cancel); setSink(sink); @@ -77,12 +94,14 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; + _indexInSource = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; + private int _indexInSource; public IDisposable Run() { @@ -103,7 +122,13 @@ namespace System.Reactive.Linq.Observαble try { - collection = _parent._collectionSelector(value); + if (_parent._collectionSelector != null) + collection = _parent._collectionSelector(value); + else + { + checked { _indexInSource++; } + collection = _parent._collectionSelectorWithIndex(value, _indexInSource); + } } catch (Exception ex) { @@ -117,7 +142,7 @@ namespace System.Reactive.Linq.Observαble var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); - innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription)); + innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource)); } public void OnError(Exception error) @@ -158,12 +183,16 @@ namespace System.Reactive.Linq.Observαble private readonly _ _parent; private readonly TSource _value; private readonly IDisposable _self; + private int _indexInSource; + private int _indexInIntermediate = -1; - public ι(_ parent, TSource value, IDisposable self) + public ι(_ parent, TSource value, IDisposable self, int indexInSource) { _parent = parent; _value = value; _self = self; + _indexInSource = indexInSource; + _indexInIntermediate = -1; } public void OnNext(TCollection value) @@ -172,7 +201,13 @@ namespace System.Reactive.Linq.Observαble try { - res = _parent._parent._resultSelector(_value, value); + if (_parent._parent._resultSelector != null) + res = _parent._parent._resultSelector(_value, value); + else + { + checked { _indexInIntermediate++; } + res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate); + } } catch (Exception ex) { @@ -222,11 +257,13 @@ namespace System.Reactive.Linq.Observαble class ε : Sink, IObserver { private readonly SelectMany _parent; + private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection public ε(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; + _indexInSource = -1; } public void OnNext(TSource value) @@ -234,7 +271,13 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable); try { - xs = _parent._collectionSelectorE(value); + if (_parent._collectionSelectorE != null) + xs = _parent._collectionSelectorE(value); + else + { + checked { _indexInSource++; } + xs = _parent._collectionSelectorEWithIndex(value, _indexInSource); + } } catch (Exception exception) { @@ -257,6 +300,7 @@ namespace System.Reactive.Linq.Observαble try { + int indexInIntermediate = -1; var hasNext = true; while (hasNext) { @@ -267,7 +311,15 @@ namespace System.Reactive.Linq.Observαble { hasNext = e.MoveNext(); if (hasNext) - current = _parent._resultSelector(value, e.Current); + { + if (_parent._resultSelector != null) + current = _parent._resultSelector(value, e.Current); + else + { + checked { indexInIntermediate++; } + current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate); + } + } } catch (Exception exception) { @@ -445,7 +497,11 @@ namespace System.Reactive.Linq.Observαble private readonly Func> _selector; private readonly Func> _selectorOnError; private readonly Func> _selectorOnCompleted; + private readonly Func> _selectorWithIndex; + private readonly Func> _selectorWithIndexOnError; + private readonly Func> _selectorWithIndexOnCompleted; private readonly Func> _selectorE; + private readonly Func> _selectorEWithIndex; public SelectMany(IObservable source, Func> selector) { @@ -461,12 +517,32 @@ namespace System.Reactive.Linq.Observαble _selectorOnCompleted = selectorOnCompleted; } + public SelectMany(IObservable source, Func> selector) + { + _source = source; + _selectorWithIndex = selector; + } + + public SelectMany(IObservable source, Func> selector, Func> selectorOnError, Func> selectorOnCompleted) + { + _source = source; + _selectorWithIndex = selector; + _selectorWithIndexOnError = selectorOnError; + _selectorWithIndexOnCompleted = selectorOnCompleted; + } + public SelectMany(IObservable source, Func> selector) { _source = source; _selectorE = selector; } + public SelectMany(IObservable source, Func> selector) + { + _source = source; + _selectorEWithIndex = selector; + } + #if !NO_TPL private readonly Func> _selectorT; @@ -479,7 +555,7 @@ namespace System.Reactive.Linq.Observαble protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { - if (_selector != null) + if (_selector != null || _selectorWithIndex != null) { var sink = new _(this, observer, cancel); setSink(sink); @@ -509,12 +585,14 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; + _index = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; + private int _index; public IDisposable Run() { @@ -535,7 +613,13 @@ namespace System.Reactive.Linq.Observαble try { - inner = _parent._selector(value); + if (_parent._selector != null) + inner = _parent._selector(value); + else + { + checked { _index++; } + inner = _parent._selectorWithIndex(value, _index); + } } catch (Exception ex) { @@ -558,7 +642,13 @@ namespace System.Reactive.Linq.Observαble try { - inner = _parent._selectorOnError(error); + if (_parent._selectorOnError != null) + inner = _parent._selectorOnError(error); + else + { + checked { _index++; } + inner = _parent._selectorWithIndexOnError(error, _index); + } } catch (Exception ex) { @@ -592,7 +682,10 @@ namespace System.Reactive.Linq.Observαble try { - inner = _parent._selectorOnCompleted(); + if (_parent._selectorOnCompleted != null) + inner = _parent._selectorOnCompleted(); + else + inner = _parent._selectorWithIndexOnCompleted(_index); } catch (Exception ex) { @@ -692,11 +785,13 @@ namespace System.Reactive.Linq.Observαble class ε : Sink, IObserver { private readonly SelectMany _parent; + private int _index; public ε(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; + _index = -1; } public void OnNext(TSource value) @@ -704,7 +799,13 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable); try { - xs = _parent._selectorE(value); + if (_parent._selectorE != null) + xs = _parent._selectorE(value); + else + { + checked { _index++; } + xs = _parent._selectorEWithIndex(value, _index); + } } catch (Exception exception) { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs index 09e2df4..5f64e16 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs @@ -496,6 +496,25 @@ namespace System.Reactive.Linq return StartWith_(source, scheduler, values); } + public virtual IObservable StartWith(IObservable source, IEnumerable values) + { + return StartWith(source, SchedulerDefaults.ConstantTimeOperations, values); + } + + public virtual IObservable StartWith(IObservable source, IScheduler scheduler, IEnumerable values) + { + if (values == null) + throw new ArgumentNullException("values"); + + var valueArray = values as TSource[]; + if (valueArray == null) + { + List valueList = new List(values); + valueArray = valueList.ToArray(); + } + return StartWith_(source, scheduler, valueArray); + } + private static IObservable StartWith_(IObservable source, IScheduler scheduler, params TSource[] values) { return values.ToObservable(scheduler).Concat(source); diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs index eff30dd..0089f04 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs @@ -838,6 +838,11 @@ namespace System.Reactive.Linq return SelectMany_(source, selector); } + public virtual IObservable SelectMany(IObservable source, Func> selector) + { + return SelectMany_(source, selector); + } + #if !NO_TPL public virtual IObservable SelectMany(IObservable source, Func> selector) { @@ -863,6 +868,11 @@ namespace System.Reactive.Linq return SelectMany_(source, collectionSelector, resultSelector); } + public virtual IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) + { + return SelectMany_(source, collectionSelector, resultSelector); + } + #if !NO_TPL public virtual IObservable SelectMany(IObservable source, Func> taskSelector, Func resultSelector) { @@ -892,6 +902,15 @@ namespace System.Reactive.Linq #endif } + private static IObservable SelectMany_(IObservable source, Func> selector) + { +#if !NO_PERF + return new SelectMany(source, selector); +#else + return source.Select(selector).Merge(); +#endif + } + private static IObservable SelectMany_(IObservable source, Func> collectionSelector, Func resultSelector) { #if !NO_PERF @@ -901,6 +920,15 @@ namespace System.Reactive.Linq #endif } + private static IObservable SelectMany_(IObservable source, Func> collectionSelector, Func resultSelector) + { +#if !NO_PERF + return new SelectMany(source, collectionSelector, resultSelector); +#else + return SelectMany_(source, x => collectionSelector(x).Select(y => resultSelector(x, y))); +#endif + } + public virtual IObservable SelectMany(IObservable source, Func> onNext, Func> onError, Func> onCompleted) { #if !NO_PERF @@ -918,6 +946,23 @@ namespace System.Reactive.Linq #endif } + public virtual IObservable SelectMany(IObservable source, Func> onNext, Func> onError, Func> onCompleted) + { +#if !NO_PERF + return new SelectMany(source, onNext, onError, onCompleted); +#else + return source.Materialize().SelectMany(notification => + { + if (notification.Kind == NotificationKind.OnNext) + return onNext(notification.Value); + else if (notification.Kind == NotificationKind.OnError) + return onError(notification.Exception); + else + return onCompleted(); + }); +#endif + } + public virtual IObservable SelectMany(IObservable source, Func> selector) { #if !NO_PERF @@ -927,6 +972,15 @@ namespace System.Reactive.Linq #endif } + public virtual IObservable SelectMany(IObservable source, Func> selector) + { +#if !NO_PERF + return new SelectMany(source, selector); +#else + return SelectMany_(source, selector, (_, x) => x); +#endif + } + public virtual IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { return SelectMany_(source, collectionSelector, resultSelector); @@ -991,6 +1045,11 @@ namespace System.Reactive.Linq #endif } + public virtual IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) + { + return new SelectMany(source, collectionSelector, resultSelector); + } + #endregion #region + Skip + diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs index 896c451..fc33401 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Timestamped.cs @@ -112,4 +112,22 @@ namespace System.Reactive return String.Format(CultureInfo.CurrentCulture, "{0}@{1}", Value, Timestamp); } } + + /// + /// A helper class with a factory method for creating Timestamped<T> instances. + /// + public static class Timestamped + { + /// + /// Creates an instance of a Timestamped<T>. This is syntactic sugar that uses type inference + /// to avoid specifying a type in a constructor call, which is very useful when using anonymous types. + /// + /// The value to be annotated with a timestamp. + /// Timestamp associated with the value. + /// Creates a new timestamped value. + public static Timestamped Create(T value, DateTimeOffset timestamp) + { + return new Timestamped(value, timestamp); + } + } } diff --git a/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs b/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs index 2cf02e9..2a5f3e4 100644 --- a/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs +++ b/Rx/NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs @@ -1,5 +1,6 @@ /* - * WARNING: Auto-generated file (8/14/2012 12:14:10 AM) + * WARNING: Auto-generated file (1/10/2013 8:30:32 PM) + * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory). */ #pragma warning disable 1591 @@ -8507,27 +8508,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum value. + /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8538,27 +8539,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum value. + /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8569,27 +8570,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum value. + /// Invokes a transform function on each element of a sequence and returns the maximum value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8600,27 +8601,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. + /// Invokes a transform function on each element of a sequence and returns the maximum value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8631,27 +8632,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. + /// Invokes a transform function on each element of a sequence and returns the maximum value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8662,27 +8663,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. + /// Invokes a transform function on each element of a sequence and returns the maximum value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8693,27 +8694,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. + /// Invokes a transform function on each element of a sequence and returns the maximum value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8724,27 +8725,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. + /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8755,27 +8756,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum value. + /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -8786,27 +8787,27 @@ namespace System.Reactive.Linq } /// - /// Invokes a transform function on each element of a sequence and returns the maximum value. + /// Invokes a transform function on each element of a sequence and returns the maximum nullable value. /// /// The type of the elements in the source sequence. /// A sequence of values to determine the maximum value of. /// A transform function to apply to each element. - /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. + /// An observable sequence containing a single element with the value of type that corresponds to the maximum value in the source sequence. /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Max(this IQbservable source, Expression> selector) + public static IQbservable Max(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Max(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -11465,6 +11466,42 @@ namespace System.Reactive.Linq ); } + /// + /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected intermediate sequences. + /// The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements. + /// An observable sequence of elements to project. + /// A transform function to apply to each source element; the second parameter of the function represents the index of the source element. + /// A transform function to apply to each element of the intermediate sequence. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element. + /// + /// or or is null. + public static IQbservable SelectMany(this IQbservable source, Expression>> collectionSelector, Expression> resultSelector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (collectionSelector == null) + throw new ArgumentNullException("collectionSelector"); + if (resultSelector == null) + throw new ArgumentNullException("resultSelector"); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.SelectMany(default(IQbservable), default(Expression>>), default(Expression>))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)), +#endif + source.Expression, + collectionSelector, + resultSelector + ) + ); + } + /// /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence. /// @@ -11502,6 +11539,43 @@ namespace System.Reactive.Linq ); } + /// + /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected intermediate enumerable sequences. + /// The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements. + /// An observable sequence of elements to project. + /// A transform function to apply to each element; the second parameter of the function represents the index of the source element. + /// A transform function to apply to each element of the intermediate sequence. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element. + /// + /// or or is null. + /// The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the conversion. + public static IQbservable SelectMany(this IQbservable source, Expression>> collectionSelector, Expression> resultSelector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (collectionSelector == null) + throw new ArgumentNullException("collectionSelector"); + if (resultSelector == null) + throw new ArgumentNullException("resultSelector"); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.SelectMany(default(IQbservable), default(Expression>>), default(Expression>))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)), +#endif + source.Expression, + collectionSelector, + resultSelector + ) + ); + } + /// /// Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence. /// @@ -11572,6 +11646,45 @@ namespace System.Reactive.Linq ); } + /// + /// Projects each notification of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected inner sequences and the elements in the merged result sequence. + /// An observable sequence of notifications to project. + /// A transform function to apply to each element; the second parameter represents the index of the source element. + /// A transform function to apply when an error occurs in the source sequence; the second parameter represents the index of the source element. + /// A transform function to apply when the end of the source sequence is reached; the second parameter represents the number of elements observed. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function corresponding to each notification in the input sequence. + /// + /// or or or is null. + public static IQbservable SelectMany(this IQbservable source, Expression>> onNext, Expression>> onError, Expression>> onCompleted) + { + if (source == null) + throw new ArgumentNullException("source"); + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.SelectMany(default(IQbservable), default(Expression>>), default(Expression>>), default(Expression>>))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), +#endif + source.Expression, + onNext, + onError, + onCompleted + ) + ); + } + /// /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. /// @@ -11603,6 +11716,37 @@ namespace System.Reactive.Linq ); } + /// + /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected inner sequences and the elements in the merged result sequence. + /// An observable sequence of elements to project. + /// A transform function to apply to each source element; the second parameter of the function represents the index of the source element. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + /// + /// or is null. + public static IQbservable SelectMany(this IQbservable source, Expression>> selector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (selector == null) + throw new ArgumentNullException("selector"); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.SelectMany(default(IQbservable), default(Expression>>))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), +#endif + source.Expression, + selector + ) + ); + } + #if !NO_TPL /// /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence. @@ -11703,6 +11847,39 @@ namespace System.Reactive.Linq ); } + /// + /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence. + /// The index of each source element is used in the projected form of that element. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence. + /// An observable sequence of elements to project. + /// A transform function to apply to each source element; the second parameter of the function represents the index of the source element. + /// An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + /// + /// or is null. + /// The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the conversion. + public static IQbservable SelectMany(this IQbservable source, Expression>> selector) + { + if (source == null) + throw new ArgumentNullException("source"); + if (selector == null) + throw new ArgumentNullException("selector"); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.SelectMany(default(IQbservable), default(Expression>>))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), +#endif + source.Expression, + selector + ) + ); + } + #if !NO_TPL /// /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence. @@ -12795,6 +12972,40 @@ namespace System.Reactive.Linq ); } + /// + /// Prepends a sequence of values to an observable sequence. + /// + /// The type of the elements in the source sequence. + /// Source sequence to prepend values to. + /// Scheduler to emit the prepended values on. + /// Values to prepend to the specified sequence. + /// The source sequence prepended with the specified values. + /// + /// or or is null. + public static IQbservable StartWith(this IQbservable source, IScheduler scheduler, IEnumerable values) + { + if (source == null) + throw new ArgumentNullException("source"); + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (values == null) + throw new ArgumentNullException("values"); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.StartWith(default(IQbservable), default(IScheduler), default(IEnumerable))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), +#endif + source.Expression, + Expression.Constant(scheduler, typeof(IScheduler)), + GetSourceExpression(values) + ) + ); + } + /// /// Prepends a sequence of values to an observable sequence. /// @@ -12825,6 +13036,36 @@ namespace System.Reactive.Linq ); } + /// + /// Prepends a sequence of values to an observable sequence. + /// + /// The type of the elements in the source sequence. + /// Source sequence to prepend values to. + /// Values to prepend to the specified sequence. + /// The source sequence prepended with the specified values. + /// + /// or is null. + public static IQbservable StartWith(this IQbservable source, IEnumerable values) + { + if (source == null) + throw new ArgumentNullException("source"); + if (values == null) + throw new ArgumentNullException("values"); + + return source.Provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.StartWith(default(IQbservable), default(IEnumerable))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), +#endif + source.Expression, + GetSourceExpression(values) + ) + ); + } + /// /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. This operation is not commonly used; /// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn. @@ -13162,7 +13403,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13171,18 +13412,18 @@ namespace System.Reactive.Linq /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13193,7 +13434,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13202,18 +13443,18 @@ namespace System.Reactive.Linq /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13224,7 +13465,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13234,18 +13475,18 @@ namespace System.Reactive.Linq /// or is null. /// (Asynchronous) The sum of the projected values for the elements in the source sequence is larger than . /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13256,7 +13497,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13266,18 +13507,18 @@ namespace System.Reactive.Linq /// or is null. /// (Asynchronous) The sum of the projected values for the elements in the source sequence is larger than . /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13288,7 +13529,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13298,18 +13539,18 @@ namespace System.Reactive.Linq /// or is null. /// (Asynchronous) The sum of the projected values for the elements in the source sequence is larger than . /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13320,7 +13561,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13329,18 +13570,18 @@ namespace System.Reactive.Linq /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13351,7 +13592,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13360,18 +13601,18 @@ namespace System.Reactive.Linq /// /// or is null. /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13382,7 +13623,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13392,18 +13633,18 @@ namespace System.Reactive.Linq /// or is null. /// (Asynchronous) The sum of the projected values for the elements in the source sequence is larger than . /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13414,7 +13655,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13424,18 +13665,18 @@ namespace System.Reactive.Linq /// or is null. /// (Asynchronous) The sum of the projected values for the elements in the source sequence is larger than . /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif @@ -13446,7 +13687,7 @@ namespace System.Reactive.Linq } /// - /// Computes the sum of a sequence of values that are obtained by invoking a transform function on each element of the input sequence. + /// Computes the sum of a sequence of nullable values that are obtained by invoking a transform function on each element of the input sequence. /// /// The type of the elements in the source sequence. /// A sequence of values that are used to calculate a sum. @@ -13456,18 +13697,18 @@ namespace System.Reactive.Linq /// or is null. /// (Asynchronous) The sum of the projected values for the elements in the source sequence is larger than . /// The return type of this operator differs from the corresponding operator on IEnumerable in order to retain asynchronous behavior. - public static IQbservable Sum(this IQbservable source, Expression> selector) + public static IQbservable Sum(this IQbservable source, Expression> selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); - return source.Provider.CreateQuery( + return source.Provider.CreateQuery( Expression.Call( null, #if CRIPPLED_REFLECTION - InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), + InfoOf(() => Qbservable.Sum(default(IQbservable), default(Expression>))), #else ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), #endif diff --git a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs index 7361fed..8ebe588 100644 --- a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs +++ b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableSingleTest.cs @@ -3568,6 +3568,56 @@ namespace ReactiveTests.Tests ); } + [TestMethod] + public void StartWith_Enumerable() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(150, 1), + OnNext(220, 4), + OnCompleted(250) + ); + + List data = new List(new[] { 1, 2, 3 }); + var res = scheduler.Start(() => + xs.StartWith(data) + ); + + res.Messages.AssertEqual( + OnNext(200, 1), + OnNext(200, 2), + OnNext(200, 3), + OnNext(220, 4), + OnCompleted(250) + ); + } + + [TestMethod] + public void StartWith_Enumerable_Scheduler() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(150, 1), + OnNext(220, 4), + OnCompleted(250) + ); + + List data = new List(new[] { 1, 2, 3 }); + var res = scheduler.Start(() => + xs.StartWith(scheduler, data) + ); + + res.Messages.AssertEqual( + OnNext(201, 1), + OnNext(202, 2), + OnNext(203, 3), + OnNext(220, 4), + OnCompleted(250) + ); + } + #endregion #region + TakeLast + diff --git a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs index e006bc1..0670909 100644 --- a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs +++ b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs @@ -8424,6 +8424,198 @@ namespace ReactiveTests.Tests ); } + [TestMethod] + // Tests this overload: + // IObservable SelectMany(IObservable source, Func> selector); + public void SelectMany_WithIndex_Complete() + { + var scheduler = new TestScheduler(); + + ITestableObservable cs = scheduler.CreateHotObservable( + OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored. + OnNext(250, 'a'), + OnNext(270, 'l'), + OnNext(310, 'o'), + OnCompleted(410) + ); + + var res = scheduler.Start(() => + cs.SelectMany( + (x, i) => Observable.Return(new { x, i }, scheduler) + )); + + res.Messages.AssertEqual( + OnNext(251, new { x = 'a', i = 0 }), + OnNext(271, new { x = 'l', i = 1 }), + OnNext(311, new { x = 'o', i = 2 }), + OnCompleted(new { x = default(char), i = default(int) }, 410) + ); + + cs.Subscriptions.AssertEqual( + Subscribe(200, 410)); + } + + [TestMethod] + // Tests this overload: + // IObservable SelectMany(IObservable source, Func> selector); + public void SelectMany_WithIndex_IEnumerable_Complete() + { + var scheduler = new TestScheduler(); + + ITestableObservable cs = scheduler.CreateHotObservable( + OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored. + OnNext(250, 'a'), + OnNext(270, 'l'), + OnNext(310, 'o'), + OnCompleted(410) + ); + + var res = scheduler.Start(() => + cs.SelectMany( + (c, i) => new [] { new { c = c, i = i } } + )); + + + res.Messages.AssertEqual( + OnNext(250, new { c = 'a', i = 0 }), + OnNext(270, new { c = 'l', i = 1 }), + OnNext(310, new { c = 'o', i = 2 }), + OnCompleted(new { c = default(char), i = default(int) }, 410) + ); + + cs.Subscriptions.AssertEqual( + Subscribe(200, 410)); + } + + + [TestMethod] + // Tests this overload: + // IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector); + public void SelectMany_WithIndex_IObservable_ResultSelector_Complete() + { + var scheduler = new TestScheduler(); + + ITestableObservable> css = scheduler.CreateHotObservable( + OnNext(190, scheduler.CreateColdObservable( + OnNext(1, 'h'), + OnCompleted(2))), + OnNext(250, scheduler.CreateColdObservable( + OnNext(1, 'a'), + OnCompleted(2))), + OnNext(270, scheduler.CreateColdObservable( + OnNext(1, 'l'), + OnCompleted(2))), + OnNext(310, scheduler.CreateColdObservable( + OnNext(1, 'o'), + OnNext(2, ' '), + OnNext(3, 'r'), + OnNext(4, 'u'), + OnNext(5, 'l'), + OnNext(6, 'e'), + OnNext(7, 'z'), + OnCompleted(8))), + OnCompleted>(410) + ); + + var res = scheduler.Start(() => + css.SelectMany( + (foo, i) => + { + return foo.Select(c => new { c = c, i = i }); + }, + (source, i, cs, j) => new { c = cs.c, i = cs.i, i2 = i, j = j } + )); + + res.Messages.AssertEqual( + OnNext(251, new { c = 'a', i = 0, i2 = 0, j = 0 }), + OnNext(271, new { c = 'l', i = 1, i2 = 1, j = 0 }), + OnNext(311, new { c = 'o', i = 2, i2 = 2, j = 0 }), + OnNext(312, new { c = ' ', i = 2, i2 = 2, j = 1 }), + OnNext(313, new { c = 'r', i = 2, i2 = 2, j = 2 }), + OnNext(314, new { c = 'u', i = 2, i2 = 2, j = 3 }), + OnNext(315, new { c = 'l', i = 2, i2 = 2, j = 4 }), + OnNext(316, new { c = 'e', i = 2, i2 = 2, j = 5 }), + OnNext(317, new { c = 'z', i = 2, i2 = 2, j = 6 }), + OnCompleted(new { c = 'a', i = 0, i2 = 0, j = 0 }, 410) + ); + + css.Subscriptions.AssertEqual( + Subscribe(200, 410)); + } + + + [TestMethod] + // Tests this overload: + // IObservable SelectMany(IObservable source, Func> collectionSelector, Func resultSelector); + public void SelectMany_WithIndex_IEnumerable_ResultSelector_Complete() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 5), + OnNext(340, 4), + OnNext(420, 3), + OnCompleted(600) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, i) => new[] { new { x = x + 1, i = i }, new { x = -x, i = i }, new { x = x * x, i = i } }, + (x, i, y, j) => new { x = x, i = i, y = y.x, y_i = y.i, j = j }) + ); + + res.Messages.AssertEqual( + OnNext(210, new { x = 5, i = 0, y = 6, y_i = 0, j = 0 }), + OnNext(210, new { x = 5, i = 0, y = -5, y_i = 0, j = 1 }), + OnNext(210, new { x = 5, i = 0, y = 25, y_i = 0, j = 2 }), + OnNext(340, new { x = 4, i = 1, y = 5, y_i = 1, j = 0 }), + OnNext(340, new { x = 4, i = 1, y = -4, y_i = 1, j = 1 }), + OnNext(340, new { x = 4, i = 1, y = 16, y_i = 1, j = 2 }), + OnNext(420, new { x = 3, i = 2, y = 4, y_i = 2, j = 0 }), + OnNext(420, new { x = 3, i = 2, y = -3, y_i = 2, j = 1 }), + OnNext(420, new { x = 3, i = 2, y = 9, y_i = 2, j = 2 }), + OnCompleted(new { x = default(int), i = default(int), y = default(int), y_i = default(int), j = default(int) }, 600) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 600) + ); + } + + [TestMethod] + // Tests this overload: + // IObservable SelectMany(IObservable source, Func> onNext, Func> onError, Func> onCompleted); + public void SelectMany_WithIndex_Triple_Complete() + { + var scheduler = new TestScheduler(); + + ITestableObservable cs = scheduler.CreateHotObservable( + OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored. + OnNext(250, 'a'), + OnNext(270, 'l'), + OnNext(310, 'o'), + OnCompleted(410) + ); + + var res = scheduler.Start(() => + cs.SelectMany( + (c, i) => Observable.Return(new { c = c, i = i }, scheduler), + (ex, i) => { throw ex; }, + (i) => Observable.Repeat(new { c = 'x', i = -1 }, i, scheduler) + )); + + res.Messages.AssertEqual( + OnNext(251, new { c = 'a', i = 0 }), + OnNext(271, new { c = 'l', i = 1 }), + OnNext(311, new { c = 'o', i = 2 }), + OnCompleted(new { c = default(char), i = default(int) }, 410) + ); + + cs.Subscriptions.AssertEqual( + Subscribe(200, 410)); + } + + [TestMethod] public void SelectMany_Enumerable_ArgumentChecking() { -- cgit v1.2.3