// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;

namespace System.Reactive.Linq.Observαble
{
    /// <summary>
    /// Non-generic-in-source view of a projection operator: exposes only the element
    /// type it produces, plus a hook (<c>Ω</c>) for appending a further projection.
    /// </summary>
    /// <typeparam name="TResult">Type of the elements produced by the projection.</typeparam>
    abstract class Select<TResult> : Producer<TResult>
    {
        /// <summary>
        /// Builds an observable that applies <paramref name="selector"/> to the elements
        /// produced by this projection.
        /// </summary>
        /// <typeparam name="TResult2">Type of the elements produced by the combined projection.</typeparam>
        /// <param name="selector">Projection to apply to each <typeparamref name="TResult"/> value.</param>
        /// <returns>An observable producing the results of <paramref name="selector"/>.</returns>
        public abstract IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector);
    }

    /// <summary>
    /// Projects each element of a source sequence through either a plain selector or an
    /// index-aware selector. Exactly one of the two selector fields is assigned, depending
    /// on which constructor was used.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
    /// <typeparam name="TResult">Type of the elements produced by the selector.</typeparam>
    class Select<TSource, TResult> : Select<TResult>
    {
        private readonly IObservable<TSource> _source;

        // Set only by the (source, Func<TSource, TResult>) constructor; null otherwise.
        private readonly Func<TSource, TResult> _selector;

        // Set only by the (source, Func<TSource, int, TResult>) constructor; null otherwise.
        private readonly Func<TSource, int, TResult> _selectorI;

        /// <summary>Creates a projection that uses a selector taking only the element.</summary>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="selector">Function applied to each element.</param>
        public Select(IObservable<TSource> source, Func<TSource, TResult> selector)
        {
            _source = source;
            _selector = selector;
        }

        /// <summary>Creates a projection that uses a selector taking the element and its index.</summary>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="selector">Function applied to each element and its zero-based position.</param>
        public Select(IObservable<TSource> source, Func<TSource, int, TResult> selector)
        {
            _source = source;
            _selectorI = selector;
        }

        /// <summary>
        /// Appends a further projection. When this instance uses the plain selector, the two
        /// functions are composed into a single <c>Select</c> over the original source;
        /// otherwise (index-aware selector) this instance itself becomes the source of a
        /// new <c>Select</c>.
        /// </summary>
        /// <typeparam name="TResult2">Type of the elements produced by the combined projection.</typeparam>
        /// <param name="selector">Projection to apply after this one.</param>
        /// <returns>An observable producing the results of <paramref name="selector"/>.</returns>
        public override IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector)
        {
            if (_selector != null)
            {
                // Compose the delegates so the chain stays one operator over _source.
                return new Select<TSource, TResult2>(_source, x => selector(_selector(x)));
            }
            else
            {
                // The index-aware selector cannot be folded into a one-argument delegate
                // here, so wrap this instance instead.
                return new Select<TResult, TResult2>(this, selector);
            }
        }

        /// <summary>
        /// Creates the sink matching whichever selector was supplied, publishes it through
        /// <paramref name="setSink"/>, and subscribes it to the source.
        /// </summary>
        /// <param name="observer">Observer that receives the projected elements.</param>
        /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
        /// <param name="setSink">Callback that receives the created sink before subscription.</param>
        /// <returns>The subscription to the source sequence.</returns>
        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            if (_selector != null)
            {
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return _source.SubscribeSafe(sink);
            }
            else
            {
                var sink = new τ(this, observer, cancel);
                setSink(sink);
                return _source.SubscribeSafe(sink);
            }
        }

        /// <summary>Sink used when the projection takes only the element.</summary>
        class _ : Sink<TResult>, IObserver<TSource>
        {
            private readonly Select<TSource, TResult> _parent;

            public _(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Projects <paramref name="value"/> and forwards the result. If the selector
            /// throws, the exception is forwarded via OnError and the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                var result = default(TResult);
                try
                {
                    result = _parent._selector(value);
                }
                catch (Exception exception)
                {
                    base._observer.OnError(exception);
                    base.Dispose();
                    return;
                }

                // Forwarded outside the try so exceptions thrown by the downstream
                // observer are not caught and re-routed to its own OnError.
                base._observer.OnNext(result);
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Forwards completion downstream and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }
        }

        /// <summary>Sink used when the projection takes the element and its index.</summary>
        class τ : Sink<TResult>, IObserver<TSource>
        {
            private readonly Select<TSource, TResult> _parent;

            // Zero-based position of the next element passed to the selector.
            private int _index;

            public τ(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
                _index = 0;
            }

            /// <summary>
            /// Projects <paramref name="value"/> together with its index and forwards the
            /// result. If the selector throws, or the index increment overflows, the
            /// exception is forwarded via OnError and the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                var result = default(TResult);
                try
                {
                    // checked: an index overflow raises OverflowException inside the try,
                    // so it is reported through OnError instead of wrapping silently.
                    result = _parent._selectorI(value, checked(_index++));
                }
                catch (Exception exception)
                {
                    base._observer.OnError(exception);
                    base.Dispose();
                    return;
                }

                // Forwarded outside the try so exceptions thrown by the downstream
                // observer are not caught and re-routed to its own OnError.
                base._observer.OnNext(result);
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Forwards completion downstream and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }
}
#endif