diff options
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Where.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Where.cs | 135 |
1 files changed, 0 insertions, 135 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Where.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Where.cs deleted file mode 100644 index a8eaa92..0000000 --- a/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Where.cs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#if !NO_PERF -using System; - -namespace System.Reactive.Linq.Observαble -{ - class Where<TSource> : Producer<TSource> - { - private readonly IObservable<TSource> _source; - private readonly Func<TSource, bool> _predicate; - private readonly Func<TSource, int, bool> _predicateI; - - public Where(IObservable<TSource> source, Func<TSource, bool> predicate) - { - _source = source; - _predicate = predicate; - } - - public Where(IObservable<TSource> source, Func<TSource, int, bool> predicate) - { - _source = source; - _predicateI = predicate; - } - - public IObservable<TSource> Ω(Func<TSource, bool> predicate) - { - if (_predicate != null) - return new Where<TSource>(_source, x => _predicate(x) && predicate(x)); - else - return new Where<TSource>(this, predicate); - } - - protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink) - { - if (_predicate != null) - { - var sink = new _(this, observer, cancel); - setSink(sink); - return _source.SubscribeSafe(sink); - } - else - { - var sink = new τ(this, observer, cancel); - setSink(sink); - return _source.SubscribeSafe(sink); - } - } - - class _ : Sink<TSource>, IObserver<TSource> - { - private readonly Where<TSource> _parent; - - public _(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel) - : base(observer, cancel) - { - _parent = parent; - } - - public void OnNext(TSource value) - { - var shouldRun = default(bool); - try - { - shouldRun = _parent._predicate(value); - } - catch (Exception exception) - { - base._observer.OnError(exception); - base.Dispose(); - return; - } - - if (shouldRun) - base._observer.OnNext(value); - } - - public void OnError(Exception error) - { - base._observer.OnError(error); - base.Dispose(); - } - - public void OnCompleted() - { - base._observer.OnCompleted(); - base.Dispose(); - } - } - - class τ : Sink<TSource>, IObserver<TSource> - { - private readonly Where<TSource> _parent; - private int _index; - - public τ(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel) - : base(observer, cancel) - { - _parent = parent; - _index = 0; - } - - public void OnNext(TSource value) - { - var shouldRun = default(bool); - try - { - shouldRun = _parent._predicateI(value, checked(_index++)); - } - catch (Exception exception) - { - base._observer.OnError(exception); - base.Dispose(); - return; - } - - if (shouldRun) - base._observer.OnNext(value); - } - - public void OnError(Exception error) - { - base._observer.OnError(error); - base.Dispose(); - } - - public void OnCompleted() - { - base._observer.OnCompleted(); - base.Dispose(); - } - } - } -} -#endif
\ No newline at end of file |