From d1174f3f8979321a9182925df460e07e08157b41 Mon Sep 17 00:00:00 2001 From: Atsushi Eno Date: Tue, 13 Nov 2012 03:47:31 +0900 Subject: partial import of ca05fdeb565e: Reactive Extensions OSS V1.0 --- .../Reactive/Linq/QueryLanguage.Single.cs | 661 +++++++++++++++++++++ 1 file changed, 661 insertions(+) create mode 100644 Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs') diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs new file mode 100644 index 0000000..09e2df4 --- /dev/null +++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Single.cs @@ -0,0 +1,661 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Subjects; + +namespace System.Reactive.Linq +{ +#if !NO_PERF + using Observαble; +#endif + + internal partial class QueryLanguage + { + #region + AsObservable + + + public virtual IObservable AsObservable(IObservable source) + { +#if !NO_PERF + var asObservable = source as AsObservable; + if (asObservable != null) + return asObservable.Ω(); + + return new AsObservable(source); +#else + return new AnonymousObservable(observer => source.Subscribe(observer)); +#endif + } + + #endregion + + #region + Buffer + + + public virtual IObservable> Buffer(IObservable source, int count) + { + return Buffer_(source, count, count); + } + + public virtual IObservable> Buffer(IObservable source, int count, int skip) + { + return Buffer_(source, count, skip); + } + + private static IObservable> Buffer_(IObservable source, int count, int skip) + { +#if !NO_PERF + return new Buffer(source, count, skip); +#else + return Window_(source, count, skip).SelectMany(Observable.ToList).Where(list => list.Count > 0); +#endif + } + + #endregion + + #region + Dematerialize + + + public virtual IObservable Dematerialize(IObservable> source) + { +#if !NO_PERF + var materialize = source as Materialize; + if (materialize != null) + return materialize.Dematerialize(); + + return new Dematerialize(source); +#else + return new AnonymousObservable(observer => + source.Subscribe(x => x.Accept(observer), observer.OnError, observer.OnCompleted)); +#endif + } + + #endregion + + #region + DistinctUntilChanged + + + public virtual IObservable DistinctUntilChanged(IObservable source) + { + return DistinctUntilChanged_(source, x => x, EqualityComparer.Default); + } + + public virtual IObservable DistinctUntilChanged(IObservable source, IEqualityComparer comparer) + { + return DistinctUntilChanged_(source, x => x, comparer); + } + + public virtual IObservable DistinctUntilChanged(IObservable source, Func keySelector) + { + return DistinctUntilChanged_(source, keySelector, EqualityComparer.Default); + } + + public virtual IObservable DistinctUntilChanged(IObservable source, Func keySelector, IEqualityComparer comparer) + { + return DistinctUntilChanged_(source, keySelector, comparer); + } + + private static IObservable DistinctUntilChanged_(IObservable source, Func keySelector, IEqualityComparer comparer) + { +#if !NO_PERF + return new DistinctUntilChanged(source, keySelector, comparer); +#else + return new AnonymousObservable(observer => + { + var currentKey = default(TKey); + var hasCurrentKey = false; + return source.Subscribe( + value => + { + var key = default(TKey); + try + { + key = keySelector(value); + } + catch (Exception exception) + { + observer.OnError(exception); + return; + } + var comparerEquals = false; + if (hasCurrentKey) + { + try + { + comparerEquals = comparer.Equals(currentKey, key); + } + catch (Exception exception) + { + observer.OnError(exception); + return; + } + } + if (!hasCurrentKey || !comparerEquals) + { + hasCurrentKey = true; + currentKey = key; + observer.OnNext(value); + } + }, + observer.OnError, + observer.OnCompleted); + }); +#endif + } + + #endregion + + #region + Do + + + public virtual IObservable Do(IObservable source, Action onNext) + { +#if !NO_PERF + return Do_(source, onNext, Stubs.Ignore, Stubs.Nop); +#else + // PERFORMANCE - Use of Select allows for operator coalescing + return source.Select( + x => + { + onNext(x); + return x; + } + ); +#endif + } + + public virtual IObservable Do(IObservable source, Action onNext, Action onCompleted) + { +#if !NO_PERF + return Do_(source, onNext, Stubs.Ignore, onCompleted); +#else + return new AnonymousObservable(obs => + { + return source.Subscribe( + x => + { + try + { + onNext(x); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnNext(x); + }, + obs.OnError, + () => + { + try + { + onCompleted(); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnCompleted(); + }); + }); +#endif + } + + public virtual IObservable Do(IObservable source, Action onNext, Action onError) + { +#if !NO_PERF + return Do_(source, onNext, onError, Stubs.Nop); +#else + return new AnonymousObservable(obs => + { + return source.Subscribe( + x => + { + try + { + onNext(x); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnNext(x); + }, + ex => + { + try + { + onError(ex); + } + catch (Exception ex2) + { + obs.OnError(ex2); + } + obs.OnError(ex); + }, + obs.OnCompleted); + }); +#endif + } + + public virtual IObservable Do(IObservable source, Action onNext, Action onError, Action onCompleted) + { + return Do_(source, onNext, onError, onCompleted); + } + + public virtual IObservable Do(IObservable source, IObserver observer) + { + return Do_(source, observer.OnNext, observer.OnError, observer.OnCompleted); + } + + private static IObservable Do_(IObservable source, Action onNext, Action onError, Action onCompleted) + { +#if !NO_PERF + return new Do(source, onNext, onError, onCompleted); +#else + return new AnonymousObservable(obs => + { + return source.Subscribe( + x => + { + try + { + onNext(x); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnNext(x); + }, + ex => + { + try + { + onError(ex); + } + catch (Exception ex2) + { + obs.OnError(ex2); + } + obs.OnError(ex); + }, + () => + { + try + { + onCompleted(); + } + catch (Exception ex) + { + obs.OnError(ex); + } + obs.OnCompleted(); + }); + }); +#endif + } + + #endregion + + #region + Finally + + + public virtual IObservable Finally(IObservable source, Action finallyAction) + { +#if !NO_PERF + return new Finally(source, finallyAction); +#else + return new AnonymousObservable(observer => + { + var subscription = source.Subscribe(observer); + + return Disposable.Create(() => + { + try + { + subscription.Dispose(); + } + finally + { + finallyAction(); + } + }); + }); +#endif + } + + #endregion + + #region + IgnoreElements + + + public virtual IObservable IgnoreElements(IObservable source) + { +#if !NO_PERF + var ignoreElements = source as IgnoreElements; + if (ignoreElements != null) + return ignoreElements.Ω(); + + return new IgnoreElements(source); +#else + return new AnonymousObservable(observer => source.Subscribe(_ => { }, observer.OnError, observer.OnCompleted)); +#endif + } + + #endregion + + #region + Materialize + + + public virtual IObservable> Materialize(IObservable source) + { +#if !NO_PERF + // + // NOTE: Peephole optimization of xs.Dematerialize().Materialize() should not be performed. It's possible for xs to + // contain multiple terminal notifications, which won't survive a Dematerialize().Materialize() chain. In case + // a reduction to xs.AsObservable() would be performed, those notification elements would survive. + // + + return new Materialize(source); +#else + return new AnonymousObservable>(observer => + source.Subscribe( + value => observer.OnNext(Notification.CreateOnNext(value)), + exception => + { + observer.OnNext(Notification.CreateOnError(exception)); + observer.OnCompleted(); + }, + () => + { + observer.OnNext(Notification.CreateOnCompleted()); + observer.OnCompleted(); + })); +#endif + } + + #endregion + + #region - Repeat - + + public virtual IObservable Repeat(IObservable source) + { + return RepeatInfinite(source).Concat(); + } + + private static IEnumerable RepeatInfinite(T value) + { + while (true) + yield return value; + } + + public virtual IObservable Repeat(IObservable source, int repeatCount) + { + return Enumerable.Repeat(source, repeatCount).Concat(); + } + + #endregion + + #region - Retry - + + public virtual IObservable Retry(IObservable source) + { + return RepeatInfinite(source).Catch(); + } + + public virtual IObservable Retry(IObservable source, int retryCount) + { + return Enumerable.Repeat(source, retryCount).Catch(); + } + + #endregion + + #region + Scan + + + public virtual IObservable Scan(IObservable source, TAccumulate seed, Func accumulator) + { +#if !NO_PERF + return new Scan(source, seed, accumulator); +#else + return Defer(() => + { + var accumulation = default(TAccumulate); + var hasAccumulation = false; + return source.Select(x => + { + if (hasAccumulation) + accumulation = accumulator(accumulation, x); + else + { + accumulation = accumulator(seed, x); + hasAccumulation = true; + } + return accumulation; + }); + }); +#endif + } + + public virtual IObservable Scan(IObservable source, Func accumulator) + { +#if !NO_PERF + return new Scan(source, accumulator); +#else + return Defer(() => + { + var accumulation = default(TSource); + var hasAccumulation = false; + return source.Select(x => + { + if (hasAccumulation) + accumulation = accumulator(accumulation, x); + else + { + accumulation = x; + hasAccumulation = true; + } + return accumulation; + }); + }); +#endif + } + + #endregion + + #region + SkipLast + + + public virtual IObservable SkipLast(IObservable source, int count) + { +#if !NO_PERF + return new SkipLast(source, count); +#else + return new AnonymousObservable(observer => + { + var q = new Queue(); + + return source.Subscribe( + x => + { + q.Enqueue(x); + if (q.Count > count) + observer.OnNext(q.Dequeue()); + }, + observer.OnError, + observer.OnCompleted); + }); +#endif + } + + #endregion + + #region - StartWith - + + public virtual IObservable StartWith(IObservable source, params TSource[] values) + { + return StartWith_(source, SchedulerDefaults.ConstantTimeOperations, values); + } + + public virtual IObservable StartWith(IObservable source, IScheduler scheduler, params TSource[] values) + { + return StartWith_(source, scheduler, values); + } + + private static IObservable StartWith_(IObservable source, IScheduler scheduler, params TSource[] values) + { + return values.ToObservable(scheduler).Concat(source); + } + + #endregion + + #region + TakeLast + + + public virtual IObservable TakeLast(IObservable source, int count) + { + return TakeLast_(source, count, SchedulerDefaults.Iteration); + } + + public virtual IObservable TakeLast(IObservable source, int count, IScheduler scheduler) + { + return TakeLast_(source, count, scheduler); + } + + private static IObservable TakeLast_(IObservable source, int count, IScheduler scheduler) + { +#if !NO_PERF + return new TakeLast(source, count, scheduler); +#else + return new AnonymousObservable(observer => + { + var q = new Queue(); + + var g = new CompositeDisposable(); + + g.Add(source.Subscribe( + x => + { + q.Enqueue(x); + if (q.Count > count) + q.Dequeue(); + }, + observer.OnError, + () => + { + g.Add(scheduler.Schedule(rec => + { + if (q.Count > 0) + { + observer.OnNext(q.Dequeue()); + rec(); + } + else + { + observer.OnCompleted(); + } + })); + } + )); + + return g; + }); +#endif + } + + public virtual IObservable> TakeLastBuffer(IObservable source, int count) + { +#if !NO_PERF + return new TakeLastBuffer(source, count); +#else + return new AnonymousObservable>(observer => + { + var q = new Queue(); + + return source.Subscribe( + x => + { + q.Enqueue(x); + if (q.Count > count) + q.Dequeue(); + }, + observer.OnError, + () => + { + observer.OnNext(q.ToList()); + observer.OnCompleted(); + }); + }); +#endif + } + + #endregion + + #region + Window + + + public virtual IObservable> Window(IObservable source, int count, int skip) + { + return Window_(source, count, skip); + } + + public virtual IObservable> Window(IObservable source, int count) + { + return Window_(source, count, count); + } + + private static IObservable> Window_(IObservable source, int count, int skip) + { +#if !NO_PERF + return new Window(source, count, skip); +#else + return new AnonymousObservable>(observer => + { + var q = new Queue>(); + var n = 0; + + var m = new SingleAssignmentDisposable(); + var refCountDisposable = new RefCountDisposable(m); + + Action createWindow = () => + { + var s = new Subject(); + q.Enqueue(s); + observer.OnNext(s.AddRef(refCountDisposable)); + }; + + createWindow(); + + m.Disposable = source.Subscribe( + x => + { + foreach (var s in q) + s.OnNext(x); + + var c = n - count + 1; + if (c >= 0 && c % skip == 0) + { + var s = q.Dequeue(); + s.OnCompleted(); + } + + n++; + if (n % skip == 0) + createWindow(); + }, + exception => + { + while (q.Count > 0) + q.Dequeue().OnError(exception); + + observer.OnError(exception); + }, + () => + { + while (q.Count > 0) + q.Dequeue().OnCompleted(); + + observer.OnCompleted(); + } + ); + + return refCountDisposable; + }); +#endif + } + + #endregion + } +} -- cgit v1.2.3