diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs')
-rw-r--r-- | Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs | 125 |
1 files changed, 113 insertions, 12 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs index 687a894..054f974 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs @@ -17,8 +17,11 @@ namespace System.Reactive.Linq.Observαble { private readonly IObservable<TSource> _source; private readonly Func<TSource, IObservable<TCollection>> _collectionSelector; + private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex; private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE; + private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex; private readonly Func<TSource, TCollection, TResult> _resultSelector; + private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex; public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) { @@ -27,6 +30,13 @@ namespace System.Reactive.Linq.Observαble _resultSelector = resultSelector; } + public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) + { + _source = source; + _collectionSelectorWithIndex = collectionSelector; + _resultSelectorWithIndex = resultSelector; + } + public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) { _source = source; @@ -34,6 +44,13 @@ namespace System.Reactive.Linq.Observαble _resultSelector = resultSelector; } + public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) + { + _source = source; + _collectionSelectorEWithIndex = collectionSelector; + _resultSelectorWithIndex = resultSelector; + } + #if !NO_TPL private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT; @@ -47,7 +64,7 @@ namespace System.Reactive.Linq.Observαble protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink) { - if (_collectionSelector != null) + if (_collectionSelector != null || _collectionSelectorWithIndex != null) { var sink = new _(this, observer, cancel); setSink(sink); @@ -77,12 +94,14 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; + _indexInSource = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; + private int _indexInSource; public IDisposable Run() { @@ -103,7 +122,13 @@ namespace System.Reactive.Linq.Observαble try { - collection = _parent._collectionSelector(value); + if (_parent._collectionSelector != null) + collection = _parent._collectionSelector(value); + else + { + checked { _indexInSource++; } + collection = _parent._collectionSelectorWithIndex(value, _indexInSource); + } } catch (Exception ex) { @@ -117,7 +142,7 @@ namespace System.Reactive.Linq.Observαble var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); - innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription)); + innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource)); } public void OnError(Exception error) @@ -158,12 +183,16 @@ namespace System.Reactive.Linq.Observαble private readonly _ _parent; private readonly TSource _value; private readonly IDisposable _self; + private int _indexInSource; + private int _indexInIntermediate = -1; - public ι(_ parent, TSource value, IDisposable self) + public ι(_ parent, TSource value, IDisposable self, int indexInSource) { _parent = parent; _value = value; _self = self; + _indexInSource = indexInSource; + _indexInIntermediate = -1; } public void OnNext(TCollection value) @@ -172,7 +201,13 @@ namespace System.Reactive.Linq.Observαble try { - res = _parent._parent._resultSelector(_value, value); + if (_parent._parent._resultSelector != null) + res = _parent._parent._resultSelector(_value, value); + else + { + checked { _indexInIntermediate++; } + res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate); + } } catch (Exception ex) { @@ -222,11 +257,13 @@ namespace System.Reactive.Linq.Observαble class ε : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TCollection, TResult> _parent; + private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; + _indexInSource = -1; } public void OnNext(TSource value) @@ -234,7 +271,13 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable<TCollection>); try { - xs = _parent._collectionSelectorE(value); + if (_parent._collectionSelectorE != null) + xs = _parent._collectionSelectorE(value); + else + { + checked { _indexInSource++; } + xs = _parent._collectionSelectorEWithIndex(value, _indexInSource); + } } catch (Exception exception) { @@ -257,6 +300,7 @@ namespace System.Reactive.Linq.Observαble try { + int indexInIntermediate = -1; var hasNext = true; while (hasNext) { @@ -267,7 +311,15 @@ namespace System.Reactive.Linq.Observαble { hasNext = e.MoveNext(); if (hasNext) - current = _parent._resultSelector(value, e.Current); + { + if (_parent._resultSelector != null) + current = _parent._resultSelector(value, e.Current); + else + { + checked { indexInIntermediate++; } + current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate); + } + } } catch (Exception exception) { @@ -445,7 +497,11 @@ namespace System.Reactive.Linq.Observαble private readonly Func<TSource, IObservable<TResult>> _selector; private readonly Func<Exception, IObservable<TResult>> _selectorOnError; private readonly Func<IObservable<TResult>> _selectorOnCompleted; + private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex; + private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError; + private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted; private readonly Func<TSource, IEnumerable<TResult>> _selectorE; + private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex; public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector) { @@ -461,12 +517,32 @@ namespace System.Reactive.Linq.Observαble _selectorOnCompleted = selectorOnCompleted; } + public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) + { + _source = source; + _selectorWithIndex = selector; + } + + public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted) + { + _source = source; + _selectorWithIndex = selector; + _selectorWithIndexOnError = selectorOnError; + _selectorWithIndexOnCompleted = selectorOnCompleted; + } + public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector) { _source = source; _selectorE = selector; } + public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector) + { + _source = source; + _selectorEWithIndex = selector; + } + #if !NO_TPL private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT; @@ -479,7 +555,7 @@ namespace System.Reactive.Linq.Observαble protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink) { - if (_selector != null) + if (_selector != null || _selectorWithIndex != null) { var sink = new _(this, observer, cancel); setSink(sink); @@ -509,12 +585,14 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; + _index = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; + private int _index; public IDisposable Run() { @@ -535,7 +613,13 @@ namespace System.Reactive.Linq.Observαble try { - inner = _parent._selector(value); + if (_parent._selector != null) + inner = _parent._selector(value); + else + { + checked { _index++; } + inner = _parent._selectorWithIndex(value, _index); + } } catch (Exception ex) { @@ -558,7 +642,13 @@ namespace System.Reactive.Linq.Observαble try { - inner = _parent._selectorOnError(error); + if (_parent._selectorOnError != null) + inner = _parent._selectorOnError(error); + else + { + checked { _index++; } + inner = _parent._selectorWithIndexOnError(error, _index); + } } catch (Exception ex) { @@ -592,7 +682,10 @@ namespace System.Reactive.Linq.Observαble try { - inner = _parent._selectorOnCompleted(); + if (_parent._selectorOnCompleted != null) + inner = _parent._selectorOnCompleted(); + else + inner = _parent._selectorWithIndexOnCompleted(_index); } catch (Exception ex) { @@ -692,11 +785,13 @@ namespace System.Reactive.Linq.Observαble class ε : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TResult> _parent; + private int _index; public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; + _index = -1; } public void OnNext(TSource value) @@ -704,7 +799,13 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable<TResult>); try { - xs = _parent._selectorE(value); + if (_parent._selectorE != null) + xs = _parent._selectorE(value); + else + { + checked { _index++; } + xs = _parent._selectorEWithIndex(value, _index); + } } catch (Exception exception) { |