Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs')
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs125
1 files changed, 113 insertions, 12 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
index 687a894..054f974 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
@@ -17,8 +17,11 @@ namespace System.Reactive.Linq.Observαble
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
+ private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex;
private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
+ private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex;
private readonly Func<TSource, TCollection, TResult> _resultSelector;
+ private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -27,6 +30,13 @@ namespace System.Reactive.Linq.Observαble
_resultSelector = resultSelector;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ _source = source;
+ _collectionSelectorWithIndex = collectionSelector;
+ _resultSelectorWithIndex = resultSelector;
+ }
+
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
_source = source;
@@ -34,6 +44,13 @@ namespace System.Reactive.Linq.Observαble
_resultSelector = resultSelector;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ _source = source;
+ _collectionSelectorEWithIndex = collectionSelector;
+ _resultSelectorWithIndex = resultSelector;
+ }
+
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
@@ -47,7 +64,7 @@ namespace System.Reactive.Linq.Observαble
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_collectionSelector != null)
+ if (_collectionSelector != null || _collectionSelectorWithIndex != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
@@ -77,12 +94,14 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
+ _indexInSource = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
+ private int _indexInSource;
public IDisposable Run()
{
@@ -103,7 +122,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- collection = _parent._collectionSelector(value);
+ if (_parent._collectionSelector != null)
+ collection = _parent._collectionSelector(value);
+ else
+ {
+ checked { _indexInSource++; }
+ collection = _parent._collectionSelectorWithIndex(value, _indexInSource);
+ }
}
catch (Exception ex)
{
@@ -117,7 +142,7 @@ namespace System.Reactive.Linq.Observαble
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription));
+ innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource));
}
public void OnError(Exception error)
@@ -158,12 +183,16 @@ namespace System.Reactive.Linq.Observαble
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;
+ private int _indexInSource;
+ private int _indexInIntermediate = -1;
- public ι(_ parent, TSource value, IDisposable self)
+ public ι(_ parent, TSource value, IDisposable self, int indexInSource)
{
_parent = parent;
_value = value;
_self = self;
+ _indexInSource = indexInSource;
+ _indexInIntermediate = -1;
}
public void OnNext(TCollection value)
@@ -172,7 +201,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- res = _parent._parent._resultSelector(_value, value);
+ if (_parent._parent._resultSelector != null)
+ res = _parent._parent._resultSelector(_value, value);
+ else
+ {
+ checked { _indexInIntermediate++; }
+ res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate);
+ }
}
catch (Exception ex)
{
@@ -222,11 +257,13 @@ namespace System.Reactive.Linq.Observαble
class ε : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
+ private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection
public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
+ _indexInSource = -1;
}
public void OnNext(TSource value)
@@ -234,7 +271,13 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TCollection>);
try
{
- xs = _parent._collectionSelectorE(value);
+ if (_parent._collectionSelectorE != null)
+ xs = _parent._collectionSelectorE(value);
+ else
+ {
+ checked { _indexInSource++; }
+ xs = _parent._collectionSelectorEWithIndex(value, _indexInSource);
+ }
}
catch (Exception exception)
{
@@ -257,6 +300,7 @@ namespace System.Reactive.Linq.Observαble
try
{
+ int indexInIntermediate = -1;
var hasNext = true;
while (hasNext)
{
@@ -267,7 +311,15 @@ namespace System.Reactive.Linq.Observαble
{
hasNext = e.MoveNext();
if (hasNext)
- current = _parent._resultSelector(value, e.Current);
+ {
+ if (_parent._resultSelector != null)
+ current = _parent._resultSelector(value, e.Current);
+ else
+ {
+ checked { indexInIntermediate++; }
+ current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate);
+ }
+ }
}
catch (Exception exception)
{
@@ -445,7 +497,11 @@ namespace System.Reactive.Linq.Observαble
private readonly Func<TSource, IObservable<TResult>> _selector;
private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
private readonly Func<IObservable<TResult>> _selectorOnCompleted;
+ private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex;
+ private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError;
+ private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted;
private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
+ private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
{
@@ -461,12 +517,32 @@ namespace System.Reactive.Linq.Observαble
_selectorOnCompleted = selectorOnCompleted;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ {
+ _source = source;
+ _selectorWithIndex = selector;
+ }
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted)
+ {
+ _source = source;
+ _selectorWithIndex = selector;
+ _selectorWithIndexOnError = selectorOnError;
+ _selectorWithIndexOnCompleted = selectorOnCompleted;
+ }
+
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
{
_source = source;
_selectorE = selector;
}
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
+ {
+ _source = source;
+ _selectorEWithIndex = selector;
+ }
+
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
@@ -479,7 +555,7 @@ namespace System.Reactive.Linq.Observαble
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_selector != null)
+ if (_selector != null || _selectorWithIndex != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
@@ -509,12 +585,14 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
+ _index = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
+ private int _index;
public IDisposable Run()
{
@@ -535,7 +613,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- inner = _parent._selector(value);
+ if (_parent._selector != null)
+ inner = _parent._selector(value);
+ else
+ {
+ checked { _index++; }
+ inner = _parent._selectorWithIndex(value, _index);
+ }
}
catch (Exception ex)
{
@@ -558,7 +642,13 @@ namespace System.Reactive.Linq.Observαble
try
{
- inner = _parent._selectorOnError(error);
+ if (_parent._selectorOnError != null)
+ inner = _parent._selectorOnError(error);
+ else
+ {
+ checked { _index++; }
+ inner = _parent._selectorWithIndexOnError(error, _index);
+ }
}
catch (Exception ex)
{
@@ -592,7 +682,10 @@ namespace System.Reactive.Linq.Observαble
try
{
- inner = _parent._selectorOnCompleted();
+ if (_parent._selectorOnCompleted != null)
+ inner = _parent._selectorOnCompleted();
+ else
+ inner = _parent._selectorWithIndexOnCompleted(_index);
}
catch (Exception ex)
{
@@ -692,11 +785,13 @@ namespace System.Reactive.Linq.Observαble
class ε : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
+ private int _index;
public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
+ _index = -1;
}
public void OnNext(TSource value)
@@ -704,7 +799,13 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TResult>);
try
{
- xs = _parent._selectorE(value);
+ if (_parent._selectorE != null)
+ xs = _parent._selectorE(value);
+ else
+ {
+ checked { _index++; }
+ xs = _parent._selectorEWithIndex(value, _index);
+ }
}
catch (Exception exception)
{