Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs')
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs962
1 files changed, 856 insertions, 106 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
index 054f974..49d7cf9 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
@@ -1,9 +1,7 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
-using System;
using System.Collections.Generic;
-using System.Reactive;
using System.Reactive.Disposables;
#if !NO_TPL
@@ -11,17 +9,17 @@ using System.Threading;
using System.Threading.Tasks;
#endif
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SelectMany<TSource, TCollection, TResult> : Producer<TResult>
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
- private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex;
+ private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI;
private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
- private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex;
+ private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI;
private readonly Func<TSource, TCollection, TResult> _resultSelector;
- private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex;
+ private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -33,8 +31,8 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
_source = source;
- _collectionSelectorWithIndex = collectionSelector;
- _resultSelectorWithIndex = resultSelector;
+ _collectionSelectorI = collectionSelector;
+ _resultSelectorI = resultSelector;
}
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
@@ -47,12 +45,14 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
_source = source;
- _collectionSelectorEWithIndex = collectionSelector;
- _resultSelectorWithIndex = resultSelector;
+ _collectionSelectorEI = collectionSelector;
+ _resultSelectorI = resultSelector;
}
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
+ private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI;
+ private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI;
public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -60,27 +60,52 @@ namespace System.Reactive.Linq.Observαble
_collectionSelectorT = collectionSelector;
_resultSelector = resultSelector;
}
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
+ {
+ _source = source;
+ _collectionSelectorTI = collectionSelector;
+ _resultSelectorTI = resultSelector;
+ }
#endif
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_collectionSelector != null || _collectionSelectorWithIndex != null)
+ if (_collectionSelector != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return sink.Run();
}
+ else if (_collectionSelectorI != null)
+ {
+ var sink = new IndexSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
#if !NO_TPL
else if (_collectionSelectorT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectManyImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
+ else if (_collectionSelectorTI != null)
+ {
+ var sink = new Sigma(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#endif
+ else if (_collectionSelectorE != null)
+ {
+ var sink = new NoSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return _source.SubscribeSafe(sink);
+ }
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new Omega(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -94,14 +119,12 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _indexInSource = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
- private int _indexInSource;
public IDisposable Run()
{
@@ -122,13 +145,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._collectionSelector != null)
- collection = _parent._collectionSelector(value);
- else
- {
- checked { _indexInSource++; }
- collection = _parent._collectionSelectorWithIndex(value, _indexInSource);
- }
+ collection = _parent._collectionSelector(value);
}
catch (Exception ex)
{
@@ -142,7 +159,7 @@ namespace System.Reactive.Linq.Observαble
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource));
+ innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription));
}
public void OnError(Exception error)
@@ -178,21 +195,17 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ι : IObserver<TCollection>
+ class Iter : IObserver<TCollection>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;
- private int _indexInSource;
- private int _indexInIntermediate = -1;
- public ι(_ parent, TSource value, IDisposable self, int indexInSource)
+ public Iter(_ parent, TSource value, IDisposable self)
{
_parent = parent;
_value = value;
_self = self;
- _indexInSource = indexInSource;
- _indexInIntermediate = -1;
}
public void OnNext(TCollection value)
@@ -201,14 +214,164 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._parent._resultSelector != null)
- res = _parent._parent._resultSelector(_value, value);
- else
+ res = _parent._parent._resultSelector(_value, value);
+ }
+ catch (Exception ex)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(ex);
+ _parent.Dispose();
+ }
+ return;
+ }
+
+ lock (_parent._gate)
+ _parent._observer.OnNext(res);
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(error);
+ _parent.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _parent._group.Remove(_self);
+ if (_parent._isStopped && _parent._group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_parent._gate)
{
- checked { _indexInIntermediate++; }
- res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate);
+ _parent._observer.OnCompleted();
+ _parent.Dispose();
}
}
+ }
+ }
+ }
+
+ class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private bool _isStopped;
+ private CompositeDisposable _group;
+ private SingleAssignmentDisposable _sourceSubscription;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _isStopped = false;
+ _group = new CompositeDisposable();
+
+ _sourceSubscription = new SingleAssignmentDisposable();
+ _group.Add(_sourceSubscription);
+ _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
+
+ return _group;
+ }
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+ var collection = default(IObservable<TCollection>);
+
+ try
+ {
+ collection = _parent._collectionSelectorI(value, index);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ var innerSubscription = new SingleAssignmentDisposable();
+ _group.Add(innerSubscription);
+ innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription));
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _isStopped = true;
+ if (_group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ else
+ {
+ _sourceSubscription.Dispose();
+ }
+ }
+
+ class Iter : IObserver<TCollection>
+ {
+ private readonly IndexSelectorImpl _parent;
+ private readonly TSource _value;
+ private readonly int _valueIndex;
+ private readonly IDisposable _self;
+
+ public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self)
+ {
+ _parent = parent;
+ _value = value;
+ _valueIndex = index;
+ _self = self;
+ }
+
+ private int _index;
+
+ public void OnNext(TCollection value)
+ {
+ var res = default(TResult);
+
+ try
+ {
+ res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++));
+ }
catch (Exception ex)
{
lock (_parent._gate)
@@ -254,16 +417,14 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TSource>
+ class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
- private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection
- public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
- _indexInSource = -1;
}
public void OnNext(TSource value)
@@ -271,14 +432,93 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TCollection>);
try
{
- if (_parent._collectionSelectorE != null)
- xs = _parent._collectionSelectorE(value);
- else
+ xs = _parent._collectionSelectorE(value);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ var e = default(IEnumerator<TCollection>);
+ try
+ {
+ e = xs.GetEnumerator();
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ try
+ {
+ var hasNext = true;
+ while (hasNext)
{
- checked { _indexInSource++; }
- xs = _parent._collectionSelectorEWithIndex(value, _indexInSource);
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = _parent._resultSelector(value, e.Current);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ if (hasNext)
+ base._observer.OnNext(current);
}
}
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+
+ class Omega : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private int _index;
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+
+ var xs = default(IEnumerable<TCollection>);
+ try
+ {
+ xs = _parent._collectionSelectorEI(value, index);
+ }
catch (Exception exception)
{
base._observer.OnError(exception);
@@ -300,7 +540,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- int indexInIntermediate = -1;
+ var eIndex = 0;
var hasNext = true;
while (hasNext)
{
@@ -311,15 +551,7 @@ namespace System.Reactive.Linq.Observαble
{
hasNext = e.MoveNext();
if (hasNext)
- {
- if (_parent._resultSelector != null)
- current = _parent._resultSelector(value, e.Current);
- else
- {
- checked { indexInIntermediate++; }
- current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate);
- }
- }
+ current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++));
}
catch (Exception exception)
{
@@ -354,11 +586,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectManyImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
- public τ(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -487,6 +719,143 @@ namespace System.Reactive.Linq.Observαble
}
}
}
+
+ class Sigma : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private CancellationDisposable _cancel;
+ private volatile int _count;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _cancel = new CancellationDisposable();
+ _count = 1;
+
+ return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+ }
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+
+ var task = default(Task<TCollection>);
+ try
+ {
+ Interlocked.Increment(ref _count);
+ task = _parent._collectionSelectorTI(value, index, _cancel.Token);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ if (task.IsCompleted)
+ {
+ OnCompletedTask(value, index, task);
+ }
+ else
+ {
+ AttachContinuation(value, index, task);
+ }
+ }
+
+ private void AttachContinuation(TSource value, int index, Task<TCollection> task)
+ {
+ //
+ // Separate method to avoid closure in synchronous completion case.
+ //
+ task.ContinueWith(t => OnCompletedTask(value, index, t));
+ }
+
+ private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
+ {
+ switch (task.Status)
+ {
+ case TaskStatus.RanToCompletion:
+ {
+ var res = default(TResult);
+ try
+ {
+ res = _parent._resultSelectorTI(value, index, task.Result);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ lock (_gate)
+ base._observer.OnNext(res);
+
+ OnCompleted();
+ }
+ break;
+ case TaskStatus.Faulted:
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(task.Exception.InnerException);
+ base.Dispose();
+ }
+ }
+ break;
+ case TaskStatus.Canceled:
+ {
+ if (!_cancel.IsDisposed)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(new TaskCanceledException(task));
+ base.Dispose();
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Decrement(ref _count) == 0)
+ {
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ }
+ }
#pragma warning restore 0420
#endif
}
@@ -495,13 +864,11 @@ namespace System.Reactive.Linq.Observαble
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TResult>> _selector;
+ private readonly Func<TSource, int, IObservable<TResult>> _selectorI;
private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
private readonly Func<IObservable<TResult>> _selectorOnCompleted;
- private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex;
- private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError;
- private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted;
private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
- private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex;
+ private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
{
@@ -509,26 +876,26 @@ namespace System.Reactive.Linq.Observαble
_selector = selector;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
{
_source = source;
- _selector = selector;
- _selectorOnError = selectorOnError;
- _selectorOnCompleted = selectorOnCompleted;
+ _selectorI = selector;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
{
_source = source;
- _selectorWithIndex = selector;
+ _selector = selector;
+ _selectorOnError = selectorOnError;
+ _selectorOnCompleted = selectorOnCompleted;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted)
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
{
_source = source;
- _selectorWithIndex = selector;
- _selectorWithIndexOnError = selectorOnError;
- _selectorWithIndexOnCompleted = selectorOnCompleted;
+ _selectorI = selector;
+ _selectorOnError = selectorOnError;
+ _selectorOnCompleted = selectorOnCompleted;
}
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
@@ -540,38 +907,63 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
{
_source = source;
- _selectorEWithIndex = selector;
+ _selectorEI = selector;
}
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
+ private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI;
public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
{
_source = source;
_selectorT = selector;
}
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
+ {
+ _source = source;
+ _selectorTI = selector;
+ }
#endif
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_selector != null || _selectorWithIndex != null)
+ if (_selector != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return sink.Run();
}
+ else if (_selectorI != null)
+ {
+ var sink = new IndexSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
#if !NO_TPL
else if (_selectorT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectManyImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
+ else if (_selectorTI != null)
+ {
+ var sink = new Sigma(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#endif
+ else if (_selectorE != null)
+ {
+ var sink = new NoSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return _source.SubscribeSafe(sink);
+ }
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new Omega(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -585,14 +977,12 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _index = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
- private int _index;
public IDisposable Run()
{
@@ -613,13 +1003,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selector != null)
- inner = _parent._selector(value);
- else
- {
- checked { _index++; }
- inner = _parent._selectorWithIndex(value, _index);
- }
+ inner = _parent._selector(value);
}
catch (Exception ex)
{
@@ -642,13 +1026,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selectorOnError != null)
- inner = _parent._selectorOnError(error);
- else
- {
- checked { _index++; }
- inner = _parent._selectorWithIndexOnError(error, _index);
- }
+ inner = _parent._selectorOnError(error);
}
catch (Exception ex)
{
@@ -682,10 +1060,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selectorOnCompleted != null)
- inner = _parent._selectorOnCompleted();
- else
- inner = _parent._selectorWithIndexOnCompleted(_index);
+ inner = _parent._selectorOnCompleted();
}
catch (Exception ex)
{
@@ -731,15 +1106,15 @@ namespace System.Reactive.Linq.Observαble
{
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = inner.SubscribeSafe(new ι(this, innerSubscription));
+ innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
}
- class ι : IObserver<TResult>
+ class Iter : IObserver<TResult>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ι(_ parent, IDisposable self)
+ public Iter(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -782,16 +1157,203 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TSource>
+ class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
+
+ public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private bool _isStopped;
+ private CompositeDisposable _group;
+ private SingleAssignmentDisposable _sourceSubscription;
private int _index;
- public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _isStopped = false;
+ _group = new CompositeDisposable();
+
+ _sourceSubscription = new SingleAssignmentDisposable();
+ _group.Add(_sourceSubscription);
+ _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
+
+ return _group;
+ }
+
+ public void OnNext(TSource value)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorI(value, checked(_index++));
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+ }
+
+ public void OnError(Exception error)
+ {
+ if (_parent._selectorOnError != null)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorOnError(error);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+
+ Final();
+ }
+ else
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (_parent._selectorOnCompleted != null)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorOnCompleted();
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+ }
+
+ Final();
+ }
+
+ private void Final()
+ {
+ _isStopped = true;
+ if (_group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ else
+ {
+ _sourceSubscription.Dispose();
+ }
+ }
+
+ private void SubscribeInner(IObservable<TResult> inner)
+ {
+ var innerSubscription = new SingleAssignmentDisposable();
+ _group.Add(innerSubscription);
+ innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
+ }
+
+ class Iter : IObserver<TResult>
+ {
+ private readonly IndexSelectorImpl _parent;
+ private readonly IDisposable _self;
+
+ public Iter(IndexSelectorImpl parent, IDisposable self)
+ {
+ _parent = parent;
+ _self = self;
+ }
+
+ public void OnNext(TResult value)
+ {
+ lock (_parent._gate)
+ _parent._observer.OnNext(value);
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(error);
+ _parent.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _parent._group.Remove(_self);
+ if (_parent._isStopped && _parent._group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_parent._gate)
+ {
+ _parent._observer.OnCompleted();
+ _parent.Dispose();
+ }
+ }
+ }
+ }
+ }
+
+ class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
- _index = -1;
}
public void OnNext(TSource value)
@@ -799,14 +1361,91 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TResult>);
try
{
- if (_parent._selectorE != null)
- xs = _parent._selectorE(value);
- else
+ xs = _parent._selectorE(value);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ var e = default(IEnumerator<TResult>);
+ try
+ {
+ e = xs.GetEnumerator();
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ try
+ {
+ var hasNext = true;
+ while (hasNext)
{
- checked { _index++; }
- xs = _parent._selectorEWithIndex(value, _index);
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = e.Current;
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ if (hasNext)
+ base._observer.OnNext(current);
}
}
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+
+ class Omega : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private int _index;
+
+ public void OnNext(TSource value)
+ {
+ var xs = default(IEnumerable<TResult>);
+ try
+ {
+ xs = _parent._selectorEI(value, checked(_index++));
+ }
catch (Exception exception)
{
base._observer.OnError(exception);
@@ -873,11 +1512,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectManyImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
- public τ(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -982,6 +1621,117 @@ namespace System.Reactive.Linq.Observαble
}
}
}
+
+ class Sigma : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private CancellationDisposable _cancel;
+ private volatile int _count;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _cancel = new CancellationDisposable();
+ _count = 1;
+
+ return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+ }
+
+ public void OnNext(TSource value)
+ {
+ var task = default(Task<TResult>);
+ try
+ {
+ Interlocked.Increment(ref _count);
+ task = _parent._selectorTI(value, checked(_index++), _cancel.Token);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ if (task.IsCompleted)
+ {
+ OnCompletedTask(task);
+ }
+ else
+ {
+ task.ContinueWith(OnCompletedTask);
+ }
+ }
+
+ private void OnCompletedTask(Task<TResult> task)
+ {
+ switch (task.Status)
+ {
+ case TaskStatus.RanToCompletion:
+ {
+ lock (_gate)
+ base._observer.OnNext(task.Result);
+
+ OnCompleted();
+ }
+ break;
+ case TaskStatus.Faulted:
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(task.Exception.InnerException);
+ base.Dispose();
+ }
+ }
+ break;
+ case TaskStatus.Canceled:
+ {
+ if (!_cancel.IsDisposed)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(new TaskCanceledException(task));
+ base.Dispose();
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Decrement(ref _count) == 0)
+ {
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ }
+ }
#pragma warning restore 0420
#endif
}