diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs')
-rw-r--r-- | Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs | 962 |
1 files changed, 856 insertions, 106 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs index 054f974..49d7cf9 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs @@ -1,9 +1,7 @@ // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !NO_PERF -using System; using System.Collections.Generic; -using System.Reactive; using System.Reactive.Disposables; #if !NO_TPL @@ -11,17 +9,17 @@ using System.Threading; using System.Threading.Tasks; #endif -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SelectMany<TSource, TCollection, TResult> : Producer<TResult> { private readonly IObservable<TSource> _source; private readonly Func<TSource, IObservable<TCollection>> _collectionSelector; - private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex; + private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI; private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE; - private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex; + private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI; private readonly Func<TSource, TCollection, TResult> _resultSelector; - private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex; + private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI; public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) { @@ -33,8 +31,8 @@ namespace System.Reactive.Linq.Observαble public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) { _source = source; - _collectionSelectorWithIndex = collectionSelector; - _resultSelectorWithIndex = resultSelector; + _collectionSelectorI = collectionSelector; + _resultSelectorI = resultSelector; } public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) @@ -47,12 +45,14 @@ namespace System.Reactive.Linq.Observαble public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) { _source = source; - _collectionSelectorEWithIndex = collectionSelector; - _resultSelectorWithIndex = resultSelector; + _collectionSelectorEI = collectionSelector; + _resultSelectorI = resultSelector; } #if !NO_TPL private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT; + private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI; + private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI; public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) { @@ -60,27 +60,52 @@ namespace System.Reactive.Linq.Observαble _collectionSelectorT = collectionSelector; _resultSelector = resultSelector; } + + public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector) + { + _source = source; + _collectionSelectorTI = collectionSelector; + _resultSelectorTI = resultSelector; + } #endif protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink) { - if (_collectionSelector != null || _collectionSelectorWithIndex != null) + if (_collectionSelector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } + else if (_collectionSelectorI != null) + { + var sink = new IndexSelectorImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } #if !NO_TPL else if (_collectionSelectorT != null) { - var sink = new τ(this, observer, cancel); + var sink = new SelectManyImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } + else if (_collectionSelectorTI != null) + { + var sink = new Sigma(this, observer, cancel); setSink(sink); return sink.Run(); } #endif + else if (_collectionSelectorE != null) + { + var sink = new NoSelectorImpl(this, observer, cancel); + setSink(sink); + return _source.SubscribeSafe(sink); + } else { - var sink = new ε(this, observer, cancel); + var sink = new Omega(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -94,14 +119,12 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; - _indexInSource = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; - private int _indexInSource; public IDisposable Run() { @@ -122,13 +145,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._collectionSelector != null) - collection = _parent._collectionSelector(value); - else - { - checked { _indexInSource++; } - collection = _parent._collectionSelectorWithIndex(value, _indexInSource); - } + collection = _parent._collectionSelector(value); } catch (Exception ex) { @@ -142,7 +159,7 @@ namespace System.Reactive.Linq.Observαble var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); - innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource)); + innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription)); } public void OnError(Exception error) @@ -178,21 +195,17 @@ namespace System.Reactive.Linq.Observαble } } - class ι : IObserver<TCollection> + class Iter : IObserver<TCollection> { private readonly _ _parent; private readonly TSource _value; private readonly IDisposable _self; - private int _indexInSource; - private int _indexInIntermediate = -1; - public ι(_ parent, TSource value, IDisposable self, int indexInSource) + public Iter(_ parent, TSource value, IDisposable self) { _parent = parent; _value = value; _self = self; - _indexInSource = indexInSource; - _indexInIntermediate = -1; } public void OnNext(TCollection value) @@ -201,14 +214,164 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._parent._resultSelector != null) - res = _parent._parent._resultSelector(_value, value); - else + res = _parent._parent._resultSelector(_value, value); + } + catch (Exception ex) + { + lock (_parent._gate) + { + _parent._observer.OnError(ex); + _parent.Dispose(); + } + return; + } + + lock (_parent._gate) + _parent._observer.OnNext(res); + } + + public void OnError(Exception error) + { + lock (_parent._gate) + { + _parent._observer.OnError(error); + _parent.Dispose(); + } + } + + public void OnCompleted() + { + _parent._group.Remove(_self); + if (_parent._isStopped && _parent._group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_parent._gate) { - checked { _indexInIntermediate++; } - res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate); + _parent._observer.OnCompleted(); + _parent.Dispose(); } } + } + } + } + + class IndexSelectorImpl : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TCollection, TResult> _parent; + + public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private bool _isStopped; + private CompositeDisposable _group; + private SingleAssignmentDisposable _sourceSubscription; + private int _index; + + public IDisposable Run() + { + _gate = new object(); + _isStopped = false; + _group = new CompositeDisposable(); + + _sourceSubscription = new SingleAssignmentDisposable(); + _group.Add(_sourceSubscription); + _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); + + return _group; + } + + public void OnNext(TSource value) + { + var index = checked(_index++); + var collection = default(IObservable<TCollection>); + + try + { + collection = _parent._collectionSelectorI(value, index); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + var innerSubscription = new SingleAssignmentDisposable(); + _group.Add(innerSubscription); + innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription)); + } + + public void OnError(Exception error) + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + + public void OnCompleted() + { + _isStopped = true; + if (_group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + else + { + _sourceSubscription.Dispose(); + } + } + + class Iter : IObserver<TCollection> + { + private readonly IndexSelectorImpl _parent; + private readonly TSource _value; + private readonly int _valueIndex; + private readonly IDisposable _self; + + public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self) + { + _parent = parent; + _value = value; + _valueIndex = index; + _self = self; + } + + private int _index; + + public void OnNext(TCollection value) + { + var res = default(TResult); + + try + { + res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++)); + } catch (Exception ex) { lock (_parent._gate) @@ -254,16 +417,14 @@ namespace System.Reactive.Linq.Observαble } } - class ε : Sink<TResult>, IObserver<TSource> + class NoSelectorImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TCollection, TResult> _parent; - private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection - public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; - _indexInSource = -1; } public void OnNext(TSource value) @@ -271,14 +432,93 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable<TCollection>); try { - if (_parent._collectionSelectorE != null) - xs = _parent._collectionSelectorE(value); - else + xs = _parent._collectionSelectorE(value); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + var e = default(IEnumerator<TCollection>); + try + { + e = xs.GetEnumerator(); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + try + { + var hasNext = true; + while (hasNext) { - checked { _indexInSource++; } - xs = _parent._collectionSelectorEWithIndex(value, _indexInSource); + hasNext = false; + var current = default(TResult); + + try + { + hasNext = e.MoveNext(); + if (hasNext) + current = _parent._resultSelector(value, e.Current); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + if (hasNext) + base._observer.OnNext(current); } } + finally + { + if (e != null) + e.Dispose(); + } + } + + public void OnError(Exception error) + { + base._observer.OnError(error); + base.Dispose(); + } + + public void OnCompleted() + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + + class Omega : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TCollection, TResult> _parent; + + public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private int _index; + + public void OnNext(TSource value) + { + var index = checked(_index++); + + var xs = default(IEnumerable<TCollection>); + try + { + xs = _parent._collectionSelectorEI(value, index); + } catch (Exception exception) { base._observer.OnError(exception); @@ -300,7 +540,7 @@ namespace System.Reactive.Linq.Observαble try { - int indexInIntermediate = -1; + var eIndex = 0; var hasNext = true; while (hasNext) { @@ -311,15 +551,7 @@ namespace System.Reactive.Linq.Observαble { hasNext = e.MoveNext(); if (hasNext) - { - if (_parent._resultSelector != null) - current = _parent._resultSelector(value, e.Current); - else - { - checked { indexInIntermediate++; } - current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate); - } - } + current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++)); } catch (Exception exception) { @@ -354,11 +586,11 @@ namespace System.Reactive.Linq.Observαble #if !NO_TPL #pragma warning disable 0420 - class τ : Sink<TResult>, IObserver<TSource> + class SelectManyImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TCollection, TResult> _parent; - public τ(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -487,6 +719,143 @@ namespace System.Reactive.Linq.Observαble } } } + + class Sigma : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TCollection, TResult> _parent; + + public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private CancellationDisposable _cancel; + private volatile int _count; + private int _index; + + public IDisposable Run() + { + _gate = new object(); + _cancel = new CancellationDisposable(); + _count = 1; + + return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); + } + + public void OnNext(TSource value) + { + var index = checked(_index++); + + var task = default(Task<TCollection>); + try + { + Interlocked.Increment(ref _count); + task = _parent._collectionSelectorTI(value, index, _cancel.Token); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + + return; + } + + if (task.IsCompleted) + { + OnCompletedTask(value, index, task); + } + else + { + AttachContinuation(value, index, task); + } + } + + private void AttachContinuation(TSource value, int index, Task<TCollection> task) + { + // + // Separate method to avoid closure in synchronous completion case. + // + task.ContinueWith(t => OnCompletedTask(value, index, t)); + } + + private void OnCompletedTask(TSource value, int index, Task<TCollection> task) + { + switch (task.Status) + { + case TaskStatus.RanToCompletion: + { + var res = default(TResult); + try + { + res = _parent._resultSelectorTI(value, index, task.Result); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + + return; + } + + lock (_gate) + base._observer.OnNext(res); + + OnCompleted(); + } + break; + case TaskStatus.Faulted: + { + lock (_gate) + { + base._observer.OnError(task.Exception.InnerException); + base.Dispose(); + } + } + break; + case TaskStatus.Canceled: + { + if (!_cancel.IsDisposed) + { + lock (_gate) + { + base._observer.OnError(new TaskCanceledException(task)); + base.Dispose(); + } + } + } + break; + } + } + + public void OnError(Exception error) + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + + public void OnCompleted() + { + if (Interlocked.Decrement(ref _count) == 0) + { + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + } + } #pragma warning restore 0420 #endif } @@ -495,13 +864,11 @@ namespace System.Reactive.Linq.Observαble { private readonly IObservable<TSource> _source; private readonly Func<TSource, IObservable<TResult>> _selector; + private readonly Func<TSource, int, IObservable<TResult>> _selectorI; private readonly Func<Exception, IObservable<TResult>> _selectorOnError; private readonly Func<IObservable<TResult>> _selectorOnCompleted; - private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex; - private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError; - private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted; private readonly Func<TSource, IEnumerable<TResult>> _selectorE; - private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex; + private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI; public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector) { @@ -509,26 +876,26 @@ namespace System.Reactive.Linq.Observαble _selector = selector; } - public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) + public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) { _source = source; - _selector = selector; - _selectorOnError = selectorOnError; - _selectorOnCompleted = selectorOnCompleted; + _selectorI = selector; } - public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) + public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) { _source = source; - _selectorWithIndex = selector; + _selector = selector; + _selectorOnError = selectorOnError; + _selectorOnCompleted = selectorOnCompleted; } - public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted) + public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) { _source = source; - _selectorWithIndex = selector; - _selectorWithIndexOnError = selectorOnError; - _selectorWithIndexOnCompleted = selectorOnCompleted; + _selectorI = selector; + _selectorOnError = selectorOnError; + _selectorOnCompleted = selectorOnCompleted; } public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector) @@ -540,38 +907,63 @@ namespace System.Reactive.Linq.Observαble public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector) { _source = source; - _selectorEWithIndex = selector; + _selectorEI = selector; } #if !NO_TPL private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT; + private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI; public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector) { _source = source; _selectorT = selector; } + + public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector) + { + _source = source; + _selectorTI = selector; + } #endif protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink) { - if (_selector != null || _selectorWithIndex != null) + if (_selector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } + else if (_selectorI != null) + { + var sink = new IndexSelectorImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } #if !NO_TPL else if (_selectorT != null) { - var sink = new τ(this, observer, cancel); + var sink = new SelectManyImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } + else if (_selectorTI != null) + { + var sink = new Sigma(this, observer, cancel); setSink(sink); return sink.Run(); } #endif + else if (_selectorE != null) + { + var sink = new NoSelectorImpl(this, observer, cancel); + setSink(sink); + return _source.SubscribeSafe(sink); + } else { - var sink = new ε(this, observer, cancel); + var sink = new Omega(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -585,14 +977,12 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; - _index = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; - private int _index; public IDisposable Run() { @@ -613,13 +1003,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._selector != null) - inner = _parent._selector(value); - else - { - checked { _index++; } - inner = _parent._selectorWithIndex(value, _index); - } + inner = _parent._selector(value); } catch (Exception ex) { @@ -642,13 +1026,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._selectorOnError != null) - inner = _parent._selectorOnError(error); - else - { - checked { _index++; } - inner = _parent._selectorWithIndexOnError(error, _index); - } + inner = _parent._selectorOnError(error); } catch (Exception ex) { @@ -682,10 +1060,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._selectorOnCompleted != null) - inner = _parent._selectorOnCompleted(); - else - inner = _parent._selectorWithIndexOnCompleted(_index); + inner = _parent._selectorOnCompleted(); } catch (Exception ex) { @@ -731,15 +1106,15 @@ namespace System.Reactive.Linq.Observαble { var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); - innerSubscription.Disposable = inner.SubscribeSafe(new ι(this, innerSubscription)); + innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription)); } - class ι : IObserver<TResult> + class Iter : IObserver<TResult> { private readonly _ _parent; private readonly IDisposable _self; - public ι(_ parent, IDisposable self) + public Iter(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -782,16 +1157,203 @@ namespace System.Reactive.Linq.Observαble } } - class ε : Sink<TResult>, IObserver<TSource> + class IndexSelectorImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TResult> _parent; + + public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private bool _isStopped; + private CompositeDisposable _group; + private SingleAssignmentDisposable _sourceSubscription; private int _index; - public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public IDisposable Run() + { + _gate = new object(); + _isStopped = false; + _group = new CompositeDisposable(); + + _sourceSubscription = new SingleAssignmentDisposable(); + _group.Add(_sourceSubscription); + _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); + + return _group; + } + + public void OnNext(TSource value) + { + var inner = default(IObservable<TResult>); + + try + { + inner = _parent._selectorI(value, checked(_index++)); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + SubscribeInner(inner); + } + + public void OnError(Exception error) + { + if (_parent._selectorOnError != null) + { + var inner = default(IObservable<TResult>); + + try + { + inner = _parent._selectorOnError(error); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + SubscribeInner(inner); + + Final(); + } + else + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + } + + public void OnCompleted() + { + if (_parent._selectorOnCompleted != null) + { + var inner = default(IObservable<TResult>); + + try + { + inner = _parent._selectorOnCompleted(); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + SubscribeInner(inner); + } + + Final(); + } + + private void Final() + { + _isStopped = true; + if (_group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + else + { + _sourceSubscription.Dispose(); + } + } + + private void SubscribeInner(IObservable<TResult> inner) + { + var innerSubscription = new SingleAssignmentDisposable(); + _group.Add(innerSubscription); + innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription)); + } + + class Iter : IObserver<TResult> + { + private readonly IndexSelectorImpl _parent; + private readonly IDisposable _self; + + public Iter(IndexSelectorImpl parent, IDisposable self) + { + _parent = parent; + _self = self; + } + + public void OnNext(TResult value) + { + lock (_parent._gate) + _parent._observer.OnNext(value); + } + + public void OnError(Exception error) + { + lock (_parent._gate) + { + _parent._observer.OnError(error); + _parent.Dispose(); + } + } + + public void OnCompleted() + { + _parent._group.Remove(_self); + if (_parent._isStopped && _parent._group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_parent._gate) + { + _parent._observer.OnCompleted(); + _parent.Dispose(); + } + } + } + } + } + + class NoSelectorImpl : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TResult> _parent; + + public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; - _index = -1; } public void OnNext(TSource value) @@ -799,14 +1361,91 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable<TResult>); try { - if (_parent._selectorE != null) - xs = _parent._selectorE(value); - else + xs = _parent._selectorE(value); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + var e = default(IEnumerator<TResult>); + try + { + e = xs.GetEnumerator(); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + try + { + var hasNext = true; + while (hasNext) { - checked { _index++; } - xs = _parent._selectorEWithIndex(value, _index); + hasNext = false; + var current = default(TResult); + + try + { + hasNext = e.MoveNext(); + if (hasNext) + current = e.Current; + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + if (hasNext) + base._observer.OnNext(current); } } + finally + { + if (e != null) + e.Dispose(); + } + } + + public void OnError(Exception error) + { + base._observer.OnError(error); + base.Dispose(); + } + + public void OnCompleted() + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + + class Omega : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TResult> _parent; + + public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private int _index; + + public void OnNext(TSource value) + { + var xs = default(IEnumerable<TResult>); + try + { + xs = _parent._selectorEI(value, checked(_index++)); + } catch (Exception exception) { base._observer.OnError(exception); @@ -873,11 +1512,11 @@ namespace System.Reactive.Linq.Observαble #if !NO_TPL #pragma warning disable 0420 - class τ : Sink<TResult>, IObserver<TSource> + class SelectManyImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TResult> _parent; - public τ(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -982,6 +1621,117 @@ namespace System.Reactive.Linq.Observαble } } } + + class Sigma : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TResult> _parent; + + public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private CancellationDisposable _cancel; + private volatile int _count; + private int _index; + + public IDisposable Run() + { + _gate = new object(); + _cancel = new CancellationDisposable(); + _count = 1; + + return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); + } + + public void OnNext(TSource value) + { + var task = default(Task<TResult>); + try + { + Interlocked.Increment(ref _count); + task = _parent._selectorTI(value, checked(_index++), _cancel.Token); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + + return; + } + + if (task.IsCompleted) + { + OnCompletedTask(task); + } + else + { + task.ContinueWith(OnCompletedTask); + } + } + + private void OnCompletedTask(Task<TResult> task) + { + switch (task.Status) + { + case TaskStatus.RanToCompletion: + { + lock (_gate) + base._observer.OnNext(task.Result); + + OnCompleted(); + } + break; + case TaskStatus.Faulted: + { + lock (_gate) + { + base._observer.OnError(task.Exception.InnerException); + base.Dispose(); + } + } + break; + case TaskStatus.Canceled: + { + if (!_cancel.IsDisposed) + { + lock (_gate) + { + base._observer.OnError(new TaskCanceledException(task)); + base.Dispose(); + } + } + } + break; + } + } + + public void OnError(Exception error) + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + + public void OnCompleted() + { + if (Interlocked.Decrement(ref _count) == 0) + { + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + } + } #pragma warning restore 0420 #endif } |