// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !NO_PERF using System; using System.Collections.Generic; using System.Reactive; using System.Reactive.Disposables; #if !NO_TPL using System.Threading; using System.Threading.Tasks; #endif namespace System.Reactive.Linq.Observαble { class SelectMany : Producer { private readonly IObservable _source; private readonly Func> _collectionSelector; private readonly Func> _collectionSelectorE; private readonly Func _resultSelector; public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelector = collectionSelector; _resultSelector = resultSelector; } public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelectorE = collectionSelector; _resultSelector = resultSelector; } #if !NO_TPL private readonly Func> _collectionSelectorT; public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelectorT = collectionSelector; _resultSelector = resultSelector; } #endif protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { if (_collectionSelector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } #if !NO_TPL else if (_collectionSelectorT != null) { var sink = new τ(this, observer, cancel); setSink(sink); return sink.Run(); } #endif else { var sink = new ε(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } } class _ : Sink, IObserver { private readonly SelectMany _parent; public _(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; public IDisposable Run() { _gate = new object(); _isStopped = false; _group = new CompositeDisposable(); _sourceSubscription = new SingleAssignmentDisposable(); _group.Add(_sourceSubscription); _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); return _group; } public void OnNext(TSource value) { var collection = default(IObservable); try { collection = _parent._collectionSelector(value); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription)); } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { _isStopped = true; if (_group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } else { _sourceSubscription.Dispose(); } } class ι : IObserver { private readonly _ _parent; private readonly TSource _value; private readonly IDisposable _self; public ι(_ parent, TSource value, IDisposable self) { _parent = parent; _value = value; _self = self; } public void OnNext(TCollection value) { var res = default(TResult); try { res = _parent._parent._resultSelector(_value, value); } catch (Exception ex) { lock (_parent._gate) { _parent._observer.OnError(ex); _parent.Dispose(); } return; } lock (_parent._gate) _parent._observer.OnNext(res); } public void OnError(Exception error) { lock (_parent._gate) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { _parent._group.Remove(_self); if (_parent._isStopped && _parent._group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_parent._gate) { _parent._observer.OnCompleted(); _parent.Dispose(); } } } } } class ε : Sink, IObserver { private readonly SelectMany _parent; public ε(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } public void OnNext(TSource value) { var xs = default(IEnumerable); try { xs = _parent._collectionSelectorE(value); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } var e = default(IEnumerator); try { e = xs.GetEnumerator(); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } try { var hasNext = true; while (hasNext) { hasNext = false; var current = default(TResult); try { hasNext = e.MoveNext(); if (hasNext) current = _parent._resultSelector(value, e.Current); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } if (hasNext) base._observer.OnNext(current); } } finally { if (e != null) e.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { base._observer.OnCompleted(); base.Dispose(); } } #if !NO_TPL #pragma warning disable 0420 class τ : Sink, IObserver { private readonly SelectMany _parent; public τ(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private CancellationDisposable _cancel; private volatile int _count; public IDisposable Run() { _gate = new object(); _cancel = new CancellationDisposable(); _count = 1; return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); } public void OnNext(TSource value) { var task = default(Task); try { Interlocked.Increment(ref _count); task = _parent._collectionSelectorT(value, _cancel.Token); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } if (task.IsCompleted) { OnCompletedTask(value, task); } else { AttachContinuation(value, task); } } private void AttachContinuation(TSource value, Task task) { // // Separate method to avoid closure in synchronous completion case. // task.ContinueWith(t => OnCompletedTask(value, t)); } private void OnCompletedTask(TSource value, Task task) { switch (task.Status) { case TaskStatus.RanToCompletion: { var res = default(TResult); try { res = _parent._resultSelector(value, task.Result); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } lock (_gate) base._observer.OnNext(res); OnCompleted(); } break; case TaskStatus.Faulted: { lock (_gate) { base._observer.OnError(task.Exception.InnerException); base.Dispose(); } } break; case TaskStatus.Canceled: { if (!_cancel.IsDisposed) { lock (_gate) { base._observer.OnError(new TaskCanceledException(task)); base.Dispose(); } } } break; } } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } } } #pragma warning restore 0420 #endif } class SelectMany : Producer { private readonly IObservable _source; private readonly Func> _selector; private readonly Func> _selectorOnError; private readonly Func> _selectorOnCompleted; private readonly Func> _selectorE; public SelectMany(IObservable source, Func> selector) { _source = source; _selector = selector; } public SelectMany(IObservable source, Func> selector, Func> selectorOnError, Func> selectorOnCompleted) { _source = source; _selector = selector; _selectorOnError = selectorOnError; _selectorOnCompleted = selectorOnCompleted; } public SelectMany(IObservable source, Func> selector) { _source = source; _selectorE = selector; } #if !NO_TPL private readonly Func> _selectorT; public SelectMany(IObservable source, Func> selector) { _source = source; _selectorT = selector; } #endif protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { if (_selector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } #if !NO_TPL else if (_selectorT != null) { var sink = new τ(this, observer, cancel); setSink(sink); return sink.Run(); } #endif else { var sink = new ε(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } } class _ : Sink, IObserver { private readonly SelectMany _parent; public _(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; public IDisposable Run() { _gate = new object(); _isStopped = false; _group = new CompositeDisposable(); _sourceSubscription = new SingleAssignmentDisposable(); _group.Add(_sourceSubscription); _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); return _group; } public void OnNext(TSource value) { var inner = default(IObservable); try { inner = _parent._selector(value); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); } public void OnError(Exception error) { if (_parent._selectorOnError != null) { var inner = default(IObservable); try { inner = _parent._selectorOnError(error); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); Final(); } else { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } } public void OnCompleted() { if (_parent._selectorOnCompleted != null) { var inner = default(IObservable); try { inner = _parent._selectorOnCompleted(); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); } Final(); } private void Final() { _isStopped = true; if (_group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } else { _sourceSubscription.Dispose(); } } private void SubscribeInner(IObservable inner) { var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); innerSubscription.Disposable = inner.SubscribeSafe(new ι(this, innerSubscription)); } class ι : IObserver { private readonly _ _parent; private readonly IDisposable _self; public ι(_ parent, IDisposable self) { _parent = parent; _self = self; } public void OnNext(TResult value) { lock (_parent._gate) _parent._observer.OnNext(value); } public void OnError(Exception error) { lock (_parent._gate) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { _parent._group.Remove(_self); if (_parent._isStopped && _parent._group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_parent._gate) { _parent._observer.OnCompleted(); _parent.Dispose(); } } } } } class ε : Sink, IObserver { private readonly SelectMany _parent; public ε(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } public void OnNext(TSource value) { var xs = default(IEnumerable); try { xs = _parent._selectorE(value); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } var e = default(IEnumerator); try { e = xs.GetEnumerator(); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } try { var hasNext = true; while (hasNext) { hasNext = false; var current = default(TResult); try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } if (hasNext) base._observer.OnNext(current); } } finally { if (e != null) e.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { base._observer.OnCompleted(); base.Dispose(); } } #if !NO_TPL #pragma warning disable 0420 class τ : Sink, IObserver { private readonly SelectMany _parent; public τ(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private CancellationDisposable _cancel; private volatile int _count; public IDisposable Run() { _gate = new object(); _cancel = new CancellationDisposable(); _count = 1; return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); } public void OnNext(TSource value) { var task = default(Task); try { Interlocked.Increment(ref _count); task = _parent._selectorT(value, _cancel.Token); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } if (task.IsCompleted) { OnCompletedTask(task); } else { task.ContinueWith(OnCompletedTask); } } private void OnCompletedTask(Task task) { switch (task.Status) { case TaskStatus.RanToCompletion: { lock (_gate) base._observer.OnNext(task.Result); OnCompleted(); } break; case TaskStatus.Faulted: { lock (_gate) { base._observer.OnError(task.Exception.InnerException); base.Dispose(); } } break; case TaskStatus.Canceled: { if (!_cancel.IsDisposed) { lock (_gate) { base._observer.OnError(new TaskCanceledException(task)); base.Dispose(); } } } break; } } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } } } #pragma warning restore 0420 #endif } } #endif