// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Reactive.Disposables;

#if NO_SEMAPHORE
using System.Reactive.Threading;
#endif

namespace System.Reactive.Linq
{
#if !NO_PERF
    using Observαble;
#endif

    /// <summary>
    /// Blocking operators of the query language: operators that turn an observable
    /// sequence into a value or into a pull-based enumerable.
    /// </summary>
    internal partial class QueryLanguage
    {
        #region - Chunkify -

        /// <summary>
        /// Exposes the source as an enumerable of lists. Implemented on top of Collect:
        /// the collector is a <see cref="List{T}"/>, each source element is appended to it,
        /// and a new empty list replaces the collector whenever one is handed out.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to chunk.</param>
        /// <returns>Enumerable sequence of lists of source elements.</returns>
        public virtual IEnumerable<IList<TSource>> Chunkify<TSource>(IObservable<TSource> source)
        {
            return source.Collect<TSource, IList<TSource>>(
                () => new List<TSource>(),
                (lst, x) =>
                {
                    lst.Add(x);
                    return lst;
                },
                _ => new List<TSource>());
        }

        #endregion

        #region + Collect +

        /// <summary>
        /// Collects source elements into a collector that is handed out through the
        /// returned enumerable. <paramref name="newCollector"/> creates the initial collector
        /// and, ignoring the previous collector, every replacement collector as well.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Collector type.</typeparam>
        /// <param name="source">Observable sequence to collect from.</param>
        /// <param name="newCollector">Factory for collectors.</param>
        /// <param name="merge">Merges a source element into the current collector.</param>
        /// <returns>Enumerable sequence of collectors.</returns>
        public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
        {
            return Collect_<TSource, TResult>(source, newCollector, merge, _ => newCollector());
        }

        /// <summary>
        /// Collects source elements into a collector that is handed out through the
        /// returned enumerable, with separate factories for the initial collector and for
        /// each replacement collector.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Collector type.</typeparam>
        /// <param name="source">Observable sequence to collect from.</param>
        /// <param name="getInitialCollector">Factory for the first collector.</param>
        /// <param name="merge">Merges a source element into the current collector.</param>
        /// <param name="getNewCollector">Creates the next collector from the one being handed out.</param>
        /// <returns>Enumerable sequence of collectors.</returns>
        public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
        {
            return Collect_<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
        }

        /// <summary>
        /// Shared implementation of both Collect overloads. With NO_PERF undefined it returns
        /// the dedicated Collect operator; otherwise it builds the enumerable from a
        /// push/pull adapter.
        /// </summary>
        private static IEnumerable<TResult> Collect_<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
        {
#if !NO_PERF
            return new Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
#else
            return new AnonymousEnumerable<TResult>(() =>
            {
                var c = getInitialCollector();          // current collector
                var f = default(Notification<TSource>); // terminal notification, once observed
                var o = new object();                   // guards c (and writes to f)
                var done = false;                       // set once the final collector was handed out

                return PushToPull<TSource, TResult>(
                    source,
                    x =>
                    {
                        lock (o)
                        {
                            if (x.HasValue)
                            {
                                try
                                {
                                    c = merge(c, x.Value);
                                }
                                catch (Exception ex)
                                {
                                    // A failing merge is recorded as an error notification.
                                    f = Notification.CreateOnError<TSource>(ex);
                                }
                            }
                            else
                            {
                                // OnError / OnCompleted from the source.
                                f = x;
                            }
                        }
                    },
                    () =>
                    {
                        if (f != null)
                        {
                            if (f.Kind == NotificationKind.OnError)
                            {
                                return Notification.CreateOnError<TResult>(f.Exception);
                            }
                            else
                            {
                                // After completion the collector is handed out one more time;
                                // the pull after that reports completion.
                                if (done)
                                {
                                    return Notification.CreateOnCompleted<TResult>();
                                }
                                else
                                {
                                    done = true;
                                }
                            }
                        }

                        var l = default(TResult);
                        lock (o)
                        {
                            // Hand out the current collector and install its replacement.
                            l = c;
                            c = getNewCollector(c);
                        }

                        return Notification.CreateOnNext(l);
                    }
                );
            });
#endif
        }

        #endregion

        #region First

        /// <summary>
        /// Blocks until the source produces its first element and returns it.
        /// </summary>
        /// <exception cref="InvalidOperationException">The source terminated without producing an element.</exception>
        public virtual TSource First<TSource>(IObservable<TSource> source)
        {
            return FirstOrDefaultInternal(source, true);
        }

        /// <summary>
        /// Blocks until the source produces its first element matching
        /// <paramref name="predicate"/> and returns it.
        /// </summary>
        /// <exception cref="InvalidOperationException">No matching element was produced.</exception>
        public virtual TSource First<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return First(Where(source, predicate));
        }

        #endregion

        #region FirstOrDefault

        /// <summary>
        /// Blocks until the source produces its first element and returns it, or returns
        /// <c>default(TSource)</c> when the source terminates without an element.
        /// </summary>
        public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source)
        {
            return FirstOrDefaultInternal(source, false);
        }

        /// <summary>
        /// Blocks until the source produces its first element matching
        /// <paramref name="predicate"/> and returns it, or returns <c>default(TSource)</c>
        /// when no matching element is produced.
        /// </summary>
        public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return FirstOrDefault(Where(source, predicate));
        }

        /// <summary>
        /// Subscribes to the source and blocks the calling thread until the first element,
        /// an error, or completion is observed.
        /// </summary>
        /// <param name="source">Observable sequence to wait on.</param>
        /// <param name="throwOnEmpty">
        /// When <c>true</c>, a source without elements results in an
        /// <see cref="InvalidOperationException"/> instead of <c>default(TSource)</c>.
        /// </param>
        /// <returns>The first element, or <c>default(TSource)</c>.</returns>
        private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
        {
            var value = default(TSource);
            var seenValue = false;
            var ex = default(Exception);
            var evt = new ManualResetEvent(false);

            //
            // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
            //
            using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
                v =>
                {
                    // Only the first element is kept; later ones must not overwrite it.
                    if (!seenValue)
                    {
                        value = v;
                    }

                    seenValue = true;
                    evt.Set();
                },
                e =>
                {
                    ex = e;
                    evt.Set();
                },
                () =>
                {
                    evt.Set();
                })))
            {
                evt.WaitOne();
            }

            ex.ThrowIfNotNull();

            if (throwOnEmpty && !seenValue)
            {
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            }

            return value;
        }

        #endregion

        #region + ForEach +

        /// <summary>
        /// Invokes <paramref name="onNext"/> for the elements of the source and blocks the
        /// calling thread until the event passed to the sink (or to the NO_PERF helper) is
        /// set; an error recorded by the sink is then rethrown.
        /// </summary>
        /// <param name="source">Observable sequence to iterate.</param>
        /// <param name="onNext">Action to run for each element.</param>
        public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource> onNext)
        {
#if !NO_PERF
            var evt = new ManualResetEvent(false);
            var sink = new ForEach<TSource>._(onNext, () => evt.Set());

            using (source.SubscribeSafe(sink))
            {
                evt.WaitOne();
            }

            sink.Error.ThrowIfNotNull();
#else
            ForEach_(source, onNext);
#endif
        }

        /// <summary>
        /// Invokes <paramref name="onNext"/> with each element and its index, blocking the
        /// calling thread in the same way as the index-less overload.
        /// </summary>
        /// <param name="source">Observable sequence to iterate.</param>
        /// <param name="onNext">Action to run for each element and its index.</param>
        public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
        {
#if !NO_PERF
            var evt = new ManualResetEvent(false);
            var sink = new ForEach<TSource>.τ(onNext, () => evt.Set());

            using (source.SubscribeSafe(sink))
            {
                evt.WaitOne();
            }

            sink.Error.ThrowIfNotNull();
#else
            var i = 0;

            // checked: an index overflow surfaces as an exception rather than wrapping around.
            ForEach_(source, x => onNext(x, checked(i++)));
#endif
        }

#if NO_PERF
        /// <summary>
        /// NO_PERF implementation of ForEach: blocks until the source terminates or
        /// <paramref name="onNext"/> throws, then rethrows whichever exception was recorded.
        /// </summary>
        private static void ForEach_<TSource>(IObservable<TSource> source, Action<TSource> onNext)
        {
            var exception = default(Exception);

            var evt = new ManualResetEvent(false);
            using (source.Subscribe(
                x =>
                {
                    try
                    {
                        onNext(x);
                    }
                    catch (Exception ex)
                    {
                        // A failing callback ends the wait; the exception is rethrown below.
                        exception = ex;
                        evt.Set();
                    }
                },
                ex =>
                {
                    exception = ex;
                    evt.Set();
                },
                () => evt.Set()
            ))
            {
                evt.WaitOne();
            }

            if (exception != null)
            {
                exception.Throw();
            }
        }
#endif

        #endregion

        #region + GetEnumerator +

        /// <summary>
        /// Returns an enumerator over the notifications of the source. Without the dedicated
        /// operator (NO_PERF or NO_CDS defined) notifications are buffered in a queue and a
        /// semaphore counts how many are available to the consumer.
        /// </summary>
        /// <param name="source">Observable sequence to enumerate.</param>
        public virtual IEnumerator<TSource> GetEnumerator<TSource>(IObservable<TSource> source)
        {
#if !NO_PERF && !NO_CDS
            var e = new GetEnumerator<TSource>();
            return e.Run(source);
#else
            var q = new Queue<Notification<TSource>>();
            var s = new Semaphore(0, int.MaxValue);

            return PushToPull<TSource, TSource>(
                source,
                x =>
                {
                    lock (q)
                    {
                        q.Enqueue(x);
                    }

                    s.Release();
                },
                () =>
                {
                    // Wait until at least one notification has been enqueued.
                    s.WaitOne();

                    lock (q)
                    {
                        return q.Dequeue();
                    }
                });
#endif
        }

        #endregion

        #region Last

        /// <summary>
        /// Blocks until the source terminates and returns its last element.
        /// </summary>
        /// <exception cref="InvalidOperationException">The source terminated without producing an element.</exception>
        public virtual TSource Last<TSource>(IObservable<TSource> source)
        {
            return LastOrDefaultInternal(source, true);
        }

        /// <summary>
        /// Blocks until the source terminates and returns its last element matching
        /// <paramref name="predicate"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">No matching element was produced.</exception>
        public virtual TSource Last<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return Last(Where(source, predicate));
        }

        #endregion

        #region LastOrDefault

        /// <summary>
        /// Blocks until the source terminates and returns its last element, or
        /// <c>default(TSource)</c> when there was none.
        /// </summary>
        public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source)
        {
            return LastOrDefaultInternal(source, false);
        }

        /// <summary>
        /// Blocks until the source terminates and returns its last element matching
        /// <paramref name="predicate"/>, or <c>default(TSource)</c> when there was none.
        /// </summary>
        public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return LastOrDefault(Where(source, predicate));
        }

        /// <summary>
        /// Subscribes to the source and blocks the calling thread until an error or
        /// completion is observed, remembering the most recent element along the way.
        /// </summary>
        /// <param name="source">Observable sequence to wait on.</param>
        /// <param name="throwOnEmpty">
        /// When <c>true</c>, a source without elements results in an
        /// <see cref="InvalidOperationException"/> instead of <c>default(TSource)</c>.
        /// </param>
        /// <returns>The last element, or <c>default(TSource)</c>.</returns>
        private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
        {
            var value = default(TSource);
            var seenValue = false;
            var ex = default(Exception);
            var evt = new ManualResetEvent(false);

            //
            // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
            //
            using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
                v =>
                {
                    // Every element overwrites the previous one; no signal until termination.
                    seenValue = true;
                    value = v;
                },
                e =>
                {
                    ex = e;
                    evt.Set();
                },
                () =>
                {
                    evt.Set();
                })))
            {
                evt.WaitOne();
            }

            ex.ThrowIfNotNull();

            if (throwOnEmpty && !seenValue)
            {
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            }

            return value;
        }

        #endregion

        #region + Latest +

        /// <summary>
        /// Wraps the source in the Latest enumerable operator.
        /// </summary>
        public virtual IEnumerable<TSource> Latest<TSource>(IObservable<TSource> source)
        {
            return new Latest<TSource>(source);
        }

        #endregion

        #region + MostRecent +

        /// <summary>
        /// Wraps the source in the MostRecent enumerable operator, seeded with
        /// <paramref name="initialValue"/>.
        /// </summary>
        public virtual IEnumerable<TSource> MostRecent<TSource>(IObservable<TSource> source, TSource initialValue)
        {
            return new MostRecent<TSource>(source, initialValue);
        }

        #endregion

        #region + Next +

        /// <summary>
        /// Wraps the source in the Next enumerable operator.
        /// </summary>
        public virtual IEnumerable<TSource> Next<TSource>(IObservable<TSource> source)
        {
            return new Next<TSource>(source);
        }

        #endregion

        #region Single

        /// <summary>
        /// Blocks until the source terminates and returns its only element.
        /// </summary>
        /// <exception cref="InvalidOperationException">The source produced no element, or more than one.</exception>
        public virtual TSource Single<TSource>(IObservable<TSource> source)
        {
            return SingleOrDefaultInternal(source, true);
        }

        /// <summary>
        /// Blocks until the source terminates and returns its only element matching
        /// <paramref name="predicate"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">No matching element, or more than one, was produced.</exception>
        public virtual TSource Single<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return Single(Where(source, predicate));
        }

        #endregion

        #region SingleOrDefault

        /// <summary>
        /// Blocks until the source terminates and returns its only element, or
        /// <c>default(TSource)</c> when there was none.
        /// </summary>
        /// <exception cref="InvalidOperationException">The source produced more than one element.</exception>
        public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source)
        {
            return SingleOrDefaultInternal(source, false);
        }

        /// <summary>
        /// Blocks until the source terminates and returns its only element matching
        /// <paramref name="predicate"/>, or <c>default(TSource)</c> when there was none.
        /// </summary>
        /// <exception cref="InvalidOperationException">More than one matching element was produced.</exception>
        public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return SingleOrDefault(Where(source, predicate));
        }

        /// <summary>
        /// Subscribes to the source and blocks the calling thread until an error, completion,
        /// or a second element is observed.
        /// </summary>
        /// <param name="source">Observable sequence to wait on.</param>
        /// <param name="throwOnEmpty">
        /// When <c>true</c>, a source without elements results in an
        /// <see cref="InvalidOperationException"/> instead of <c>default(TSource)</c>.
        /// </param>
        /// <returns>The single element, or <c>default(TSource)</c>.</returns>
        private static TSource SingleOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
        {
            var value = default(TSource);
            var seenValue = false;
            var ex = default(Exception);
            var evt = new ManualResetEvent(false);

            //
            // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
            //
            using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
                v =>
                {
                    // A second element is a violation: record the error and stop waiting.
                    if (seenValue)
                    {
                        ex = new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT);
                        evt.Set();
                    }

                    value = v;
                    seenValue = true;
                },
                e =>
                {
                    ex = e;
                    evt.Set();
                },
                () =>
                {
                    evt.Set();
                })))
            {
                evt.WaitOne();
            }

            ex.ThrowIfNotNull();

            if (throwOnEmpty && !seenValue)
            {
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            }

            return value;
        }

        #endregion

        #region Wait

        /// <summary>
        /// Blocks until the source terminates and returns its last element; shares the
        /// implementation of Last, so an empty source throws.
        /// </summary>
        /// <exception cref="InvalidOperationException">The source terminated without producing an element.</exception>
        public virtual TSource Wait<TSource>(IObservable<TSource> source)
        {
            return LastOrDefaultInternal(source, true);
        }

        #endregion

        #region |> Helpers <|

#if NO_CDS || NO_PERF
        /// <summary>
        /// Bridges a push-based source to a pull-based enumerator: creates a push/pull adapter
        /// from the given delegates, subscribes it to the source, and returns it as the
        /// enumerator. The adapter is handed the subscription's Dispose method.
        /// </summary>
        /// <param name="source">Observable sequence to subscribe to.</param>
        /// <param name="push">Receives each notification from the source.</param>
        /// <param name="pull">Produces the next notification for the enumerator.</param>
        private static IEnumerator<TResult> PushToPull<TSource, TResult>(IObservable<TSource> source, Action<Notification<TSource>> push, Func<Notification<TResult>> pull)
        {
            // The disposable is created first so that its Dispose method can be passed to the
            // adapter before the subscription itself exists.
            var subscription = new SingleAssignmentDisposable();
            var adapter = new PushPullAdapter<TSource, TResult>(push, pull, subscription.Dispose);
            subscription.Disposable = source.SubscribeSafe(adapter);
            return adapter;
        }
#endif

        #endregion
    }
}