// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; #if !NO_TPL using System.Reactive.Threading.Tasks; using System.Threading.Tasks; #endif namespace System.Reactive.Linq { #if !NO_PERF using Observαble; #endif internal partial class QueryLanguage { #region + Amb + public virtual IObservable Amb(IObservable first, IObservable second) { return Amb_(first, second); } public virtual IObservable Amb(params IObservable[] sources) { return Amb_(sources); } public virtual IObservable Amb(IEnumerable> sources) { return Amb_(sources); } private static IObservable Amb_(IEnumerable> sources) { return sources.Aggregate(Observable.Never(), (previous, current) => previous.Amb(current)); } private static IObservable Amb_(IObservable leftSource, IObservable rightSource) { #if !NO_PERF return new Amb(leftSource, rightSource); #else return new AnonymousObservable(observer => { var leftSubscription = new SingleAssignmentDisposable(); var rightSubscription = new SingleAssignmentDisposable(); var choice = AmbState.Neither; var gate = new object(); var left = new AmbObserver(); var right = new AmbObserver(); left.Observer = Observer.Synchronize(Observer.Create( x => { if (choice == AmbState.Neither) { choice = AmbState.Left; rightSubscription.Dispose(); left.Observer = observer; } if (choice == AmbState.Left) observer.OnNext(x); }, ex => { if (choice == AmbState.Neither) { choice = AmbState.Left; rightSubscription.Dispose(); left.Observer = observer; } if (choice == AmbState.Left) observer.OnError(ex); }, () => { if (choice == AmbState.Neither) { choice = AmbState.Left; rightSubscription.Dispose(); left.Observer = observer; } if (choice == AmbState.Left) observer.OnCompleted(); } ), gate); right.Observer = Observer.Synchronize(Observer.Create( x => { if (choice == AmbState.Neither) { choice = AmbState.Right; leftSubscription.Dispose(); right.Observer = observer; } if (choice == AmbState.Right) observer.OnNext(x); }, ex => { if (choice == AmbState.Neither) { choice = AmbState.Right; leftSubscription.Dispose(); right.Observer = observer; } if (choice == AmbState.Right) observer.OnError(ex); }, () => { if (choice == AmbState.Neither) { choice = AmbState.Right; leftSubscription.Dispose(); right.Observer = observer; } if (choice == AmbState.Right) observer.OnCompleted(); } ), gate); leftSubscription.Disposable = leftSource.Subscribe(left); rightSubscription.Disposable = rightSource.Subscribe(right); return new CompositeDisposable(leftSubscription, rightSubscription); }); #endif } #if NO_PERF class AmbObserver : IObserver { public virtual IObserver Observer { get; set; } public virtual void OnCompleted() { Observer.OnCompleted(); } public virtual void OnError(Exception error) { Observer.OnError(error); } public virtual void OnNext(TSource value) { Observer.OnNext(value); } } enum AmbState { Left, Right, Neither } #endif #endregion #region + Buffer + public virtual IObservable> Buffer(IObservable source, Func> bufferClosingSelector) { #if !NO_PERF return new Buffer(source, bufferClosingSelector); #else return source.Window(bufferClosingSelector).SelectMany(ToList); #endif } public virtual IObservable> Buffer(IObservable source, IObservable bufferOpenings, Func> bufferClosingSelector) { return source.Window(bufferOpenings, bufferClosingSelector).SelectMany(ToList); } public virtual IObservable> Buffer(IObservable source, IObservable bufferBoundaries) { #if !NO_PERF return new Buffer(source, bufferBoundaries); #else return source.Window(bufferBoundaries).SelectMany(ToList); #endif } #endregion #region + Catch + public virtual IObservable Catch(IObservable source, Func> handler) where TException : Exception { #if !NO_PERF return new Catch(source, handler); #else return new AnonymousObservable(observer => { var subscription = new SerialDisposable(); var d1 = new SingleAssignmentDisposable(); subscription.Disposable = d1; d1.Disposable = source.Subscribe(observer.OnNext, exception => { var e = exception as TException; if (e != null) { IObservable result; try { result = handler(e); } catch (Exception ex) { observer.OnError(ex); return; } var d = new SingleAssignmentDisposable(); subscription.Disposable = d; d.Disposable = result.Subscribe(observer); } else observer.OnError(exception); }, observer.OnCompleted); return subscription; }); #endif } public virtual IObservable Catch(IObservable first, IObservable second) { return Catch_(new[] { first, second }); } public virtual IObservable Catch(params IObservable[] sources) { return Catch_(sources); } public virtual IObservable Catch(IEnumerable> sources) { return Catch_(sources); } private static IObservable Catch_(IEnumerable> sources) { #if !NO_PERF return new Catch(sources); #else return new AnonymousObservable(observer => { var gate = new AsyncLock(); var isDisposed = false; var e = sources.GetEnumerator(); var subscription = new SerialDisposable(); var lastException = default(Exception); var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() => { var current = default(IObservable); var hasNext = false; var ex = default(Exception); if (!isDisposed) { try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; else e.Dispose(); } catch (Exception exception) { ex = exception; e.Dispose(); } } else return; if (ex != null) { observer.OnError(ex); return; } if (!hasNext) { if (lastException != null) observer.OnError(lastException); else observer.OnCompleted(); return; } var d = new SingleAssignmentDisposable(); subscription.Disposable = d; d.Disposable = current.Subscribe(observer.OnNext, exception => { lastException = exception; self(); }, observer.OnCompleted); })); return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() => { e.Dispose(); isDisposed = true; }))); }); #endif } #endregion #region + CombineLatest + public virtual IObservable CombineLatest(IObservable first, IObservable second, Func resultSelector) { #if !NO_PERF return new CombineLatest(first, second, resultSelector); #else return new AnonymousObservable(observer => { var hasLeft = false; var hasRight = false; var left = default(TFirst); var right = default(TSecond); var leftDone = false; var rightDone = false; var leftSubscription = new SingleAssignmentDisposable(); var rightSubscription = new SingleAssignmentDisposable(); var gate = new object(); leftSubscription.Disposable = first.Synchronize(gate).Subscribe( l => { hasLeft = true; left = l; if (hasRight) { var res = default(TResult); try { res = resultSelector(left, right); } catch (Exception ex) { observer.OnError(ex); return; } observer.OnNext(res); } else if (rightDone) { observer.OnCompleted(); return; } }, observer.OnError, () => { leftDone = true; if (rightDone) { observer.OnCompleted(); return; } } ); rightSubscription.Disposable = second.Synchronize(gate).Subscribe( r => { hasRight = true; right = r; if (hasLeft) { var res = default(TResult); try { res = resultSelector(left, right); } catch (Exception ex) { observer.OnError(ex); return; } observer.OnNext(res); } else if (leftDone) { observer.OnCompleted(); return; } }, observer.OnError, () => { rightDone = true; if (leftDone) { observer.OnCompleted(); return; } } ); return new CompositeDisposable(leftSubscription, rightSubscription); }); #endif } #if !NO_PERF /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */ #region CombineLatest auto-generated code (6/10/2012 7:25:03 PM) public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, Func resultSelector) { return new CombineLatest(source1, source2, source3, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, resultSelector); } #if !NO_LARGEARITY public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, IObservable source14, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, IObservable source14, IObservable source15, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector); } public virtual IObservable CombineLatest(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, IObservable source14, IObservable source15, IObservable source16, Func resultSelector) { return new CombineLatest(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector); } #endif #endregion #endif public virtual IObservable CombineLatest(IEnumerable> sources, Func, TResult> resultSelector) { return CombineLatest_(sources, resultSelector); } public virtual IObservable> CombineLatest(IEnumerable> sources) { return CombineLatest_>(sources, res => res.ToList()); } public virtual IObservable> CombineLatest(params IObservable[] sources) { return CombineLatest_>(sources, res => res.ToList()); } private static IObservable CombineLatest_(IEnumerable> sources, Func, TResult> resultSelector) { #if !NO_PERF return new CombineLatest(sources, resultSelector); #else return new AnonymousObservable(observer => { var srcs = sources.ToArray(); var N = srcs.Length; var hasValue = new bool[N]; var hasValueAll = false; var values = new List(N); for (int i = 0; i < N; i++) values.Add(default(TSource)); var isDone = new bool[N]; var next = new Action(i => { hasValue[i] = true; if (hasValueAll || (hasValueAll = hasValue.All(Stubs.I))) { var res = default(TResult); try { res = resultSelector(new ReadOnlyCollection(values)); } catch (Exception ex) { observer.OnError(ex); return; } observer.OnNext(res); } else if (isDone.Where((x, j) => j != i).All(Stubs.I)) { observer.OnCompleted(); return; } }); var done = new Action(i => { isDone[i] = true; if (isDone.All(Stubs.I)) { observer.OnCompleted(); return; } }); var subscriptions = new SingleAssignmentDisposable[N]; var gate = new object(); for (int i = 0; i < N; i++) { var j = i; subscriptions[j] = new SingleAssignmentDisposable { Disposable = srcs[j].Synchronize(gate).Subscribe( x => { values[j] = x; next(j); }, observer.OnError, () => { done(j); } ) }; } return new CompositeDisposable(subscriptions); }); #endif } #endregion #region + Concat + public virtual IObservable Concat(IObservable first, IObservable second) { return Concat_(new[] { first, second }); } public virtual IObservable Concat(params IObservable[] sources) { return Concat_(sources); } public virtual IObservable Concat(IEnumerable> sources) { return Concat_(sources); } private static IObservable Concat_(IEnumerable> sources) { #if !NO_PERF return new Concat(sources); #else return new AnonymousObservable(observer => { var isDisposed = false; var e = sources.GetEnumerator(); var subscription = new SerialDisposable(); var gate = new AsyncLock(); var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() => { var current = default(IObservable); var hasNext = false; var ex = default(Exception); if (!isDisposed) { try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; else e.Dispose(); } catch (Exception exception) { ex = exception; e.Dispose(); } } else return; if (ex != null) { observer.OnError(ex); return; } if (!hasNext) { observer.OnCompleted(); return; } var d = new SingleAssignmentDisposable(); subscription.Disposable = d; d.Disposable = current.Subscribe(observer.OnNext, observer.OnError, self); })); return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() => { e.Dispose(); isDisposed = true; }))); }); #endif } public virtual IObservable Concat(IObservable> sources) { return Concat_(sources); } #if !NO_TPL public virtual IObservable Concat(IObservable> sources) { return Concat_(Select(sources, TaskObservableExtensions.ToObservable)); } #endif private IObservable Concat_(IObservable> sources) { return Merge(sources, 1); } #endregion #region + Merge + public virtual IObservable Merge(IObservable> sources) { return Merge_(sources); } #if !NO_TPL public virtual IObservable Merge(IObservable> sources) { #if !NO_PERF return new Merge(sources); #else return Merge_(Select(sources, TaskObservableExtensions.ToObservable)); #endif } #endif public virtual IObservable Merge(IObservable> sources, int maxConcurrent) { return Merge_(sources, maxConcurrent); } public virtual IObservable Merge(IEnumerable> sources, int maxConcurrent) { return Merge_(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations), maxConcurrent); } public virtual IObservable Merge(IEnumerable> sources, int maxConcurrent, IScheduler scheduler) { return Merge_(sources.ToObservable(scheduler), maxConcurrent); } public virtual IObservable Merge(IObservable first, IObservable second) { return Merge_(new[] { first, second }.ToObservable(SchedulerDefaults.ConstantTimeOperations)); } public virtual IObservable Merge(IObservable first, IObservable second, IScheduler scheduler) { return Merge_(new[] { first, second }.ToObservable(scheduler)); } public virtual IObservable Merge(params IObservable[] sources) { return Merge_(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations)); } public virtual IObservable Merge(IScheduler scheduler, params IObservable[] sources) { return Merge_(sources.ToObservable(scheduler)); } public virtual IObservable Merge(IEnumerable> sources) { return Merge_(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations)); } public virtual IObservable Merge(IEnumerable> sources, IScheduler scheduler) { return Merge_(sources.ToObservable(scheduler)); } private static IObservable Merge_(IObservable> sources) { #if !NO_PERF return new Merge(sources); #else return new AnonymousObservable(observer => { var gate = new object(); var isStopped = false; var m = new SingleAssignmentDisposable(); var group = new CompositeDisposable() { m }; m.Disposable = sources.Subscribe( innerSource => { var innerSubscription = new SingleAssignmentDisposable(); group.Add(innerSubscription); innerSubscription.Disposable = innerSource.Subscribe( x => { lock (gate) observer.OnNext(x); }, exception => { lock (gate) observer.OnError(exception); }, () => { group.Remove(innerSubscription); // modification MUST occur before subsequent check if (isStopped && group.Count == 1) // isStopped must be checked before group Count to ensure outer is not creating more groups lock (gate) observer.OnCompleted(); }); }, exception => { lock (gate) observer.OnError(exception); }, () => { isStopped = true; // modification MUST occur before subsequent check if (group.Count == 1) lock (gate) observer.OnCompleted(); }); return group; }); #endif } private static IObservable Merge_(IObservable> sources, int maxConcurrent) { #if !NO_PERF return new Merge(sources, maxConcurrent); #else return new AnonymousObservable(observer => { var gate = new object(); var q = new Queue>(); var isStopped = false; var group = new CompositeDisposable(); var activeCount = 0; var subscribe = default(Action>); subscribe = xs => { var subscription = new SingleAssignmentDisposable(); group.Add(subscription); subscription.Disposable = xs.Subscribe( x => { lock (gate) observer.OnNext(x); }, exception => { lock (gate) observer.OnError(exception); }, () => { group.Remove(subscription); lock (gate) { if (q.Count > 0) { var s = q.Dequeue(); subscribe(s); } else { activeCount--; if (isStopped && activeCount == 0) observer.OnCompleted(); } } }); }; group.Add(sources.Subscribe( innerSource => { lock (gate) { if (activeCount < maxConcurrent) { activeCount++; subscribe(innerSource); } else q.Enqueue(innerSource); } }, exception => { lock (gate) observer.OnError(exception); }, () => { lock (gate) { isStopped = true; if (activeCount == 0) observer.OnCompleted(); } })); return group; }); #endif } #endregion #region + OnErrorResumeNext + public virtual IObservable OnErrorResumeNext(IObservable first, IObservable second) { return OnErrorResumeNext_(new[] { first, second }); } public virtual IObservable OnErrorResumeNext(params IObservable[] sources) { return OnErrorResumeNext_(sources); } public virtual IObservable OnErrorResumeNext(IEnumerable> sources) { return OnErrorResumeNext_(sources); } private static IObservable OnErrorResumeNext_(IEnumerable> sources) { #if !NO_PERF return new OnErrorResumeNext(sources); #else return new AnonymousObservable(observer => { var gate = new AsyncLock(); var isDisposed = false; var e = sources.GetEnumerator(); var subscription = new SerialDisposable(); var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() => { var current = default(IObservable); var hasNext = false; var ex = default(Exception); if (!isDisposed) { try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; else e.Dispose(); } catch (Exception exception) { ex = exception; e.Dispose(); } } else return; if (ex != null) { observer.OnError(ex); return; } if (!hasNext) { observer.OnCompleted(); return; } var d = new SingleAssignmentDisposable(); subscription.Disposable = d; d.Disposable = current.Subscribe(observer.OnNext, exception => self(), self); })); return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() => { e.Dispose(); isDisposed = true; }))); }); #endif } #endregion #region + SkipUntil + public virtual IObservable SkipUntil(IObservable source, IObservable other) { #if !NO_PERF return new SkipUntil(source, other); #else return new AnonymousObservable(observer => { var sourceSubscription = new SingleAssignmentDisposable(); var otherSubscription = new SingleAssignmentDisposable(); var open = false; var gate = new object(); sourceSubscription.Disposable = source.Synchronize(gate).Subscribe( x => { if (open) observer.OnNext(x); }, observer.OnError, // BREAKING CHANGE - Error propagation was guarded by "other" source in v1.0.10621 (due to materialization). () => { if (open) observer.OnCompleted(); } ); otherSubscription.Disposable = other.Synchronize(gate).Subscribe( x => { open = true; otherSubscription.Dispose(); }, observer.OnError ); return new CompositeDisposable(sourceSubscription, otherSubscription); }); #endif } #endregion #region + Switch + public virtual IObservable Switch(IObservable> sources) { return Switch_(sources); } #if !NO_TPL public virtual IObservable Switch(IObservable> sources) { return Switch_(Select(sources, TaskObservableExtensions.ToObservable)); } #endif private IObservable Switch_(IObservable> sources) { #if !NO_PERF return new Switch(sources); #else return new AnonymousObservable(observer => { var gate = new object(); var innerSubscription = new SerialDisposable(); var isStopped = false; var latest = 0UL; var hasLatest = false; var subscription = sources.Subscribe( innerSource => { var id = default(ulong); lock (gate) { id = unchecked(++latest); hasLatest = true; } var d = new SingleAssignmentDisposable(); innerSubscription.Disposable = d; d.Disposable = innerSource.Subscribe( x => { lock (gate) { if (latest == id) observer.OnNext(x); } }, exception => { lock (gate) { if (latest == id) observer.OnError(exception); } }, () => { lock (gate) { if (latest == id) { hasLatest = false; if (isStopped) observer.OnCompleted(); } } }); }, exception => { lock (gate) observer.OnError(exception); }, () => { lock (gate) { isStopped = true; if (!hasLatest) observer.OnCompleted(); } }); return new CompositeDisposable(subscription, innerSubscription); }); #endif } #endregion #region + TakeUntil + public virtual IObservable TakeUntil(IObservable source, IObservable other) { #if !NO_PERF return new TakeUntil(source, other); #else return new AnonymousObservable(observer => { var sourceSubscription = new SingleAssignmentDisposable(); var otherSubscription = new SingleAssignmentDisposable(); var gate = new object(); // COMPAT - Order of Subscribe calls per v1.0.10621 otherSubscription.Disposable = other.Synchronize(gate).Subscribe( x => { observer.OnCompleted(); }, observer.OnError ); sourceSubscription.Disposable = source.Synchronize(gate).Finally(otherSubscription.Dispose).Subscribe(observer); return new CompositeDisposable(sourceSubscription, otherSubscription); }); #endif } #endregion #region + Window + public virtual IObservable> Window(IObservable source, Func> windowClosingSelector) { #if !NO_PERF return new Window(source, windowClosingSelector); #else return new AnonymousObservable>(observer => { var window = new Subject(); var gate = new object(); var m = new SerialDisposable(); var d = new CompositeDisposable(2) { m }; var r = new RefCountDisposable(d); observer.OnNext(window.AddRef(r)); d.Add(source.SubscribeSafe(new AnonymousObserver( x => { lock (gate) { window.OnNext(x); } }, ex => { lock (gate) { window.OnError(ex); observer.OnError(ex); } }, () => { lock (gate) { window.OnCompleted(); observer.OnCompleted(); } }))); var l = new AsyncLock(); Action createWindowClose = null; createWindowClose = () => { var windowClose = default(IObservable); try { windowClose = windowClosingSelector(); } catch (Exception exception) { lock (gate) { observer.OnError(exception); } return; } var m1 = new SingleAssignmentDisposable(); m.Disposable = m1; m1.Disposable = windowClose.Take(1).SubscribeSafe(new AnonymousObserver( Stubs.Ignore, ex => { lock (gate) { window.OnError(ex); observer.OnError(ex); } }, () => { lock (gate) { window.OnCompleted(); window = new Subject(); observer.OnNext(window.AddRef(r)); } l.Wait(createWindowClose); })); }; l.Wait(createWindowClose); return r; }); #endif } public virtual IObservable> Window(IObservable source, IObservable windowOpenings, Func> windowClosingSelector) { return windowOpenings.GroupJoin(source, windowClosingSelector, _ => Observable.Empty(), (_, window) => window); } public virtual IObservable> Window(IObservable source, IObservable windowBoundaries) { #if !NO_PERF return new Window(source, windowBoundaries); #else return new AnonymousObservable>(observer => { var window = new Subject(); var gate = new object(); var d = new CompositeDisposable(2); var r = new RefCountDisposable(d); observer.OnNext(window.AddRef(r)); d.Add(source.SubscribeSafe(new AnonymousObserver( x => { lock (gate) { window.OnNext(x); } }, ex => { lock (gate) { window.OnError(ex); observer.OnError(ex); } }, () => { lock (gate) { window.OnCompleted(); observer.OnCompleted(); } } ))); d.Add(windowBoundaries.SubscribeSafe(new AnonymousObserver( w => { lock (gate) { window.OnCompleted(); window = new Subject(); observer.OnNext(window.AddRef(r)); } }, ex => { lock (gate) { window.OnError(ex); observer.OnError(ex); } }, () => { lock (gate) { window.OnCompleted(); observer.OnCompleted(); } } ))); return r; }); #endif } #endregion #region + Zip + public virtual IObservable Zip(IObservable first, IObservable second, Func resultSelector) { #if !NO_PERF return new Zip(first, second, resultSelector); #else return new AnonymousObservable(observer => { var queueLeft = new Queue(); var queueRight = new Queue(); var leftDone = false; var rightDone = false; var leftSubscription = new SingleAssignmentDisposable(); var rightSubscription = new SingleAssignmentDisposable(); var gate = new object(); leftSubscription.Disposable = first.Synchronize(gate).Subscribe( l => { if (queueRight.Count > 0) { var r = queueRight.Dequeue(); var res = default(TResult); try { res = resultSelector(l, r); } catch (Exception ex) { observer.OnError(ex); return; } observer.OnNext(res); } else { if (rightDone) { observer.OnCompleted(); return; } queueLeft.Enqueue(l); } }, observer.OnError, () => { leftDone = true; if (rightDone) { observer.OnCompleted(); return; } } ); rightSubscription.Disposable = second.Synchronize(gate).Subscribe( r => { if (queueLeft.Count > 0) { var l = queueLeft.Dequeue(); var res = default(TResult); try { res = resultSelector(l, r); } catch (Exception ex) { observer.OnError(ex); return; } observer.OnNext(res); } else { if (leftDone) { observer.OnCompleted(); return; } queueRight.Enqueue(r); } }, observer.OnError, () => { rightDone = true; if (leftDone) { observer.OnCompleted(); return; } } ); return new CompositeDisposable(leftSubscription, rightSubscription, Disposable.Create(() => { queueLeft.Clear(); queueRight.Clear(); })); }); #endif } public virtual IObservable Zip(IEnumerable> sources, Func, TResult> resultSelector) { return Zip_(sources).Select(resultSelector); } public virtual IObservable> Zip(IEnumerable> sources) { return Zip_(sources); } public virtual IObservable> Zip(params IObservable[] sources) { return Zip_(sources); } private static IObservable> Zip_(IEnumerable> sources) { #if !NO_PERF return new Zip(sources); #else return new AnonymousObservable>(observer => { var srcs = sources.ToArray(); var N = srcs.Length; var queues = new Queue[N]; for (int i = 0; i < N; i++) queues[i] = new Queue(); var isDone = new bool[N]; var next = new Action(i => { if (queues.All(q => q.Count > 0)) { var res = queues.Select(q => q.Dequeue()).ToList(); observer.OnNext(res); } else if (isDone.Where((x, j) => j != i).All(Stubs.I)) { observer.OnCompleted(); return; } }); var done = new Action(i => { isDone[i] = true; if (isDone.All(Stubs.I)) { observer.OnCompleted(); return; } }); var subscriptions = new SingleAssignmentDisposable[N]; var gate = new object(); for (int i = 0; i < N; i++) { var j = i; subscriptions[j] = new SingleAssignmentDisposable { Disposable = srcs[j].Synchronize(gate).Subscribe( x => { queues[j].Enqueue(x); next(j); }, observer.OnError, () => { done(j); } ) }; } return new CompositeDisposable(subscriptions) { Disposable.Create(() => { foreach (var q in queues) q.Clear(); }) }; }); #endif } #if !NO_PERF /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */ #region Zip auto-generated code (6/10/2012 8:15:28 PM) public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, Func resultSelector) { return new Zip(source1, source2, source3, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, Func resultSelector) { return new Zip(source1, source2, source3, source4, resultSelector); } #if !NO_LARGEARITY public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, IObservable source14, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, IObservable source14, IObservable source15, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector); } public virtual IObservable Zip(IObservable source1, IObservable source2, IObservable source3, IObservable source4, IObservable source5, IObservable source6, IObservable source7, IObservable source8, IObservable source9, IObservable source10, IObservable source11, IObservable source12, IObservable source13, IObservable source14, IObservable source15, IObservable source16, Func resultSelector) { return new Zip(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector); } #endif #endregion #endif public virtual IObservable Zip(IObservable first, IEnumerable second, Func resultSelector) { #if !NO_PERF return new Zip(first, second, resultSelector); #else return new AnonymousObservable(observer => { var rightEnumerator = second.GetEnumerator(); var leftSubscription = first.Subscribe(left => { var hasNext = false; try { hasNext = rightEnumerator.MoveNext(); } catch (Exception ex) { observer.OnError(ex); return; } if (hasNext) { var right = default(TSecond); try { right = rightEnumerator.Current; } catch (Exception ex) { observer.OnError(ex); return; } TResult result; try { result = resultSelector(left, right); } catch (Exception ex) { observer.OnError(ex); return; } observer.OnNext(result); } else { observer.OnCompleted(); } }, observer.OnError, observer.OnCompleted ); return new CompositeDisposable(leftSubscription, rightEnumerator); }); #endif } #endregion #region |> Helpers <| #if NO_PERF private static IObservable Combine(IObservable leftSource, IObservable rightSource, Func, IDisposable, IDisposable, IObserver, Notification>>> combinerSelector) { return new AnonymousObservable(observer => { var leftSubscription = new SingleAssignmentDisposable(); var rightSubscription = new SingleAssignmentDisposable(); var combiner = combinerSelector(observer, leftSubscription, rightSubscription); var gate = new object(); leftSubscription.Disposable = leftSource.Materialize().Select(x => Either, Notification>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner); rightSubscription.Disposable = rightSource.Materialize().Select(x => Either, Notification>.CreateRight(x)).Synchronize(gate).Subscribe(combiner); return new CompositeDisposable(leftSubscription, rightSubscription); }); } #endif #endregion } }