diff options
author | Atsushi Eno <atsushieno@veritas-vos-liberabit.com> | 2012-11-12 22:47:31 +0400 |
---|---|---|
committer | Atsushi Eno <atsushieno@veritas-vos-liberabit.com> | 2012-11-12 22:52:17 +0400 |
commit | d1174f3f8979321a9182925df460e07e08157b41 (patch) | |
tree | d16fb2fc191bf68ff0e2aac600adf71aba8cad01 /Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs | |
parent | d90a52595e24b1216c89f6cb5f245262db1810ae (diff) |
partial import of ca05fdeb565e: Reactive Extensions OSS V1.0
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs | 661 |
1 files changed, 661 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs new file mode 100644 index 0000000..09e2df4 --- /dev/null +++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs @@ -0,0 +1,661 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Subjects; + +namespace System.Reactive.Linq +{ +#if !NO_PERF + using Observαble; +#endif + + internal partial class QueryLanguage + { + #region + AsObservable + + + public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source) + { +#if !NO_PERF + var asObservable = source as AsObservable<TSource>; + if (asObservable != null) + return asObservable.Ω(); + + return new AsObservable<TSource>(source); +#else + return new AnonymousObservable<TSource>(observer => source.Subscribe(observer)); +#endif + } + + #endregion + + #region + Buffer + + + public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count) + { + return Buffer_<TSource>(source, count, count); + } + + public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip) + { + return Buffer_<TSource>(source, count, skip); + } + + private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip) + { +#if !NO_PERF + return new Buffer<TSource>(source, count, skip); +#else + return Window_<TSource>(source, count, skip).SelectMany(Observable.ToList).Where(list => list.Count > 0); +#endif + } + + #endregion + + #region + Dematerialize + + + public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source) + { +#if !NO_PERF + var materialize = source as Materialize<TSource>; + if (materialize != null) + return materialize.Dematerialize(); + + return new Dematerialize<TSource>(source); +#else + return new AnonymousObservable<TSource>(observer => + source.Subscribe(x => x.Accept(observer), observer.OnError, observer.OnCompleted)); +#endif + } + + #endregion + + #region + DistinctUntilChanged + + + public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source) + { + return DistinctUntilChanged_(source, x => x, EqualityComparer<TSource>.Default); + } + + public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer) + { + return DistinctUntilChanged_(source, x => x, comparer); + } + + public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector) + { + return DistinctUntilChanged_(source, keySelector, EqualityComparer<TKey>.Default); + } + + public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer) + { + return DistinctUntilChanged_(source, keySelector, comparer); + } + + private static IObservable<TSource> DistinctUntilChanged_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer) + { +#if !NO_PERF + return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer); +#else + return new AnonymousObservable<TSource>(observer => + { + var currentKey = default(TKey); + var hasCurrentKey = false; + return source.Subscribe( + value => + { + var key = default(TKey); + try + { + key = keySelector(value); + } + catch (Exception exception) + { + observer.OnError(exception); + return; + } + var comparerEquals = false; + if (hasCurrentKey) + { + try + { + comparerEquals = comparer.Equals(currentKey, key); + } + catch (Exception exception) + { + observer.OnError(exception); + return; + } + } + if (!hasCurrentKey || !comparerEquals) + { + hasCurrentKey = true; + currentKey = key; + observer.OnNext(value); + } + }, + observer.OnError, + observer.OnCompleted); + }); +#endif + } + + #endregion + + #region + Do + + + public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext) + { +#if !NO_PERF + return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, Stubs.Nop); +#else + // PERFORMANCE - Use of Select allows for operator coalescing + return source.Select( + x => + { + onNext(x); + return x; + } + ); +#endif + } + + public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action onCompleted) + { +#if !NO_PERF + return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, onCompleted); +#else + return new AnonymousObservable<TSource>(obs => + { + return source.Subscribe( + x => + { + try + { + onNext(x); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnNext(x); + }, + obs.OnError, + () => + { + try + { + onCompleted(); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnCompleted(); + }); + }); +#endif + } + + public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError) + { +#if !NO_PERF + return Do_<TSource>(source, onNext, onError, Stubs.Nop); +#else + return new AnonymousObservable<TSource>(obs => + { + return source.Subscribe( + x => + { + try + { + onNext(x); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnNext(x); + }, + ex => + { + try + { + onError(ex); + } + catch (Exception ex2) + { + obs.OnError(ex2); + } + obs.OnError(ex); + }, + obs.OnCompleted); + }); +#endif + } + + public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted) + { + return Do_(source, onNext, onError, onCompleted); + } + + public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, IObserver<TSource> observer) + { + return Do_(source, observer.OnNext, observer.OnError, observer.OnCompleted); + } + + private static IObservable<TSource> Do_<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted) + { +#if !NO_PERF + return new Do<TSource>(source, onNext, onError, onCompleted); +#else + return new AnonymousObservable<TSource>(obs => + { + return source.Subscribe( + x => + { + try + { + onNext(x); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnNext(x); + }, + ex => + { + try + { + onError(ex); + } + catch (Exception ex2) + { + obs.OnError(ex2); + } + obs.OnError(ex); + }, + () => + { + try + { + onCompleted(); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnCompleted(); + }); + }); +#endif + } + + #endregion + + #region + Finally + + + public virtual IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction) + { +#if !NO_PERF + return new Finally<TSource>(source, finallyAction); +#else + return new AnonymousObservable<TSource>(observer => + { + var subscription = source.Subscribe(observer); + + return Disposable.Create(() => + { + try + { + subscription.Dispose(); + } + finally + { + finallyAction(); + } + }); + }); +#endif + } + + #endregion + + #region + IgnoreElements + + + public virtual IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source) + { +#if !NO_PERF + var ignoreElements = source as IgnoreElements<TSource>; + if (ignoreElements != null) + return ignoreElements.Ω(); + + return new IgnoreElements<TSource>(source); +#else + return new AnonymousObservable<TSource>(observer => source.Subscribe(_ => { }, observer.OnError, observer.OnCompleted)); +#endif + } + + #endregion + + #region + Materialize + + + public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source) + { +#if !NO_PERF + // + // NOTE: Peephole optimization of xs.Dematerialize().Materialize() should not be performed. It's possible for xs to + // contain multiple terminal notifications, which won't survive a Dematerialize().Materialize() chain. In case + // a reduction to xs.AsObservable() would be performed, those notification elements would survive. + // + + return new Materialize<TSource>(source); +#else + return new AnonymousObservable<Notification<TSource>>(observer => + source.Subscribe( + value => observer.OnNext(Notification.CreateOnNext<TSource>(value)), + exception => + { + observer.OnNext(Notification.CreateOnError<TSource>(exception)); + observer.OnCompleted(); + }, + () => + { + observer.OnNext(Notification.CreateOnCompleted<TSource>()); + observer.OnCompleted(); + })); +#endif + } + + #endregion + + #region - Repeat - + + public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source) + { + return RepeatInfinite(source).Concat(); + } + + private static IEnumerable<T> RepeatInfinite<T>(T value) + { + while (true) + yield return value; + } + + public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount) + { + return Enumerable.Repeat(source, repeatCount).Concat(); + } + + #endregion + + #region - Retry - + + public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source) + { + return RepeatInfinite(source).Catch(); + } + + public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount) + { + return Enumerable.Repeat(source, retryCount).Catch(); + } + + #endregion + + #region + Scan + + + public virtual IObservable<TAccumulate> Scan<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator) + { +#if !NO_PERF + return new Scan<TSource, TAccumulate>(source, seed, accumulator); +#else + return Defer(() => + { + var accumulation = default(TAccumulate); + var hasAccumulation = false; + return source.Select(x => + { + if (hasAccumulation) + accumulation = accumulator(accumulation, x); + else + { + accumulation = accumulator(seed, x); + hasAccumulation = true; + } + return accumulation; + }); + }); +#endif + } + + public virtual IObservable<TSource> Scan<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator) + { +#if !NO_PERF + return new Scan<TSource>(source, accumulator); +#else + return Defer(() => + { + var accumulation = default(TSource); + var hasAccumulation = false; + return source.Select(x => + { + if (hasAccumulation) + accumulation = accumulator(accumulation, x); + else + { + accumulation = x; + hasAccumulation = true; + } + return accumulation; + }); + }); +#endif + } + + #endregion + + #region + SkipLast + + + public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, int count) + { +#if !NO_PERF + return new SkipLast<TSource>(source, count); +#else + return new AnonymousObservable<TSource>(observer => + { + var q = new Queue<TSource>(); + + return source.Subscribe( + x => + { + q.Enqueue(x); + if (q.Count > count) + observer.OnNext(q.Dequeue()); + }, + observer.OnError, + observer.OnCompleted); + }); +#endif + } + + #endregion + + #region - StartWith - + + public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, params TSource[] values) + { + return StartWith_<TSource>(source, SchedulerDefaults.ConstantTimeOperations, values); + } + + public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values) + { + return StartWith_<TSource>(source, scheduler, values); + } + + private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values) + { + return values.ToObservable(scheduler).Concat(source); + } + + #endregion + + #region + TakeLast + + + public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count) + { + return TakeLast_(source, count, SchedulerDefaults.Iteration); + } + + public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count, IScheduler scheduler) + { + return TakeLast_(source, count, scheduler); + } + + private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, int count, IScheduler scheduler) + { +#if !NO_PERF + return new TakeLast<TSource>(source, count, scheduler); +#else + return new AnonymousObservable<TSource>(observer => + { + var q = new Queue<TSource>(); + + var g = new CompositeDisposable(); + + g.Add(source.Subscribe( + x => + { + q.Enqueue(x); + if (q.Count > count) + q.Dequeue(); + }, + observer.OnError, + () => + { + g.Add(scheduler.Schedule(rec => + { + if (q.Count > 0) + { + observer.OnNext(q.Dequeue()); + rec(); + } + else + { + observer.OnCompleted(); + } + })); + } + )); + + return g; + }); +#endif + } + + public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, int count) + { +#if !NO_PERF + return new TakeLastBuffer<TSource>(source, count); +#else + return new AnonymousObservable<IList<TSource>>(observer => + { + var q = new Queue<TSource>(); + + return source.Subscribe( + x => + { + q.Enqueue(x); + if (q.Count > count) + q.Dequeue(); + }, + observer.OnError, + () => + { + observer.OnNext(q.ToList()); + observer.OnCompleted(); + }); + }); +#endif + } + + #endregion + + #region + Window + + + public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count, int skip) + { + return Window_<TSource>(source, count, skip); + } + + public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count) + { + return Window_<TSource>(source, count, count); + } + + private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip) + { +#if !NO_PERF + return new Window<TSource>(source, count, skip); +#else + return new AnonymousObservable<IObservable<TSource>>(observer => + { + var q = new Queue<ISubject<TSource>>(); + var n = 0; + + var m = new SingleAssignmentDisposable(); + var refCountDisposable = new RefCountDisposable(m); + + Action createWindow = () => + { + var s = new Subject<TSource>(); + q.Enqueue(s); + observer.OnNext(s.AddRef(refCountDisposable)); + }; + + createWindow(); + + m.Disposable = source.Subscribe( + x => + { + foreach (var s in q) + s.OnNext(x); + + var c = n - count + 1; + if (c >= 0 && c % skip == 0) + { + var s = q.Dequeue(); + s.OnCompleted(); + } + + n++; + if (n % skip == 0) + createWindow(); + }, + exception => + { + while (q.Count > 0) + q.Dequeue().OnError(exception); + + observer.OnError(exception); + }, + () => + { + while (q.Count > 0) + q.Dequeue().OnCompleted(); + + observer.OnCompleted(); + } + ); + + return refCountDisposable; + }); +#endif + } + + #endregion + } +} |