// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

#if !NO_TPL
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
#endif

namespace System.Reactive.Linq
{
#if !NO_PERF
    using Observαble;
#endif

    /// <summary>
    /// Standard sequence operators (Cast, DefaultIfEmpty, Distinct, GroupBy, GroupByUntil, GroupJoin, Join,
    /// OfType, Select, SelectMany, Skip, SkipWhile, Take, TakeWhile, Where).
    /// </summary>
    /// <remarks>
    /// Most operators have two compile-time variants. When NO_PERF is not defined, a dedicated operator class
    /// (brought in by <c>using Observαble</c>) is instantiated. When NO_PERF is defined, a fallback written in
    /// terms of AnonymousObservable or other operators is used. The methods in this part of the class do not
    /// validate their arguments.
    /// </remarks>
    internal partial class QueryLanguage
    {
        #region + Cast +

        /// <summary>Converts each element of <paramref name="source"/> to <typeparamref name="TResult"/>.</summary>
        /// <remarks>The NO_PERF fallback uses a plain cast inside Select.</remarks>
        public virtual IObservable<TResult> Cast<TResult>(IObservable<object> source)
        {
#if !NO_PERF
            return new Cast<object, TResult>(source);
#else
            return source.Select(x => (TResult)x);
#endif
        }

        #endregion

        #region + DefaultIfEmpty +

        /// <summary>Emits <c>default(TSource)</c> if <paramref name="source"/> completes without producing any element.</summary>
        public virtual IObservable<TSource> DefaultIfEmpty<TSource>(IObservable<TSource> source)
        {
#if !NO_PERF
            return new DefaultIfEmpty<TSource>(source, default(TSource));
#else
            return DefaultIfEmpty_(source, default(TSource));
#endif
        }

        /// <summary>Emits <paramref name="defaultValue"/> if <paramref name="source"/> completes without producing any element.</summary>
        public virtual IObservable<TSource> DefaultIfEmpty<TSource>(IObservable<TSource> source, TSource defaultValue)
        {
#if !NO_PERF
            return new DefaultIfEmpty<TSource>(source, defaultValue);
#else
            return DefaultIfEmpty_(source, defaultValue);
#endif
        }

#if NO_PERF
        // Fallback: forwards every element and remembers whether one was seen; on completion, emits the
        // default value first when none was.
        private static IObservable<TSource> DefaultIfEmpty_<TSource>(IObservable<TSource> source, TSource defaultValue)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var found = false;
                return source.Subscribe(
                    x =>
                    {
                        found = true;
                        observer.OnNext(x);
                    },
                    observer.OnError,
                    () =>
                    {
                        if (!found)
                        {
                            observer.OnNext(defaultValue);
                        }

                        observer.OnCompleted();
                    }
                );
            });
        }
#endif

        #endregion

        #region + Distinct +

        /// <summary>Distinct elements, using the element itself as key and the default equality comparer.</summary>
        public virtual IObservable<TSource> Distinct<TSource>(IObservable<TSource> source)
        {
#if !NO_PERF
            return new Distinct<TSource, TSource>(source, x => x, EqualityComparer<TSource>.Default);
#else
            return Distinct_(source, x => x, EqualityComparer<TSource>.Default);
#endif
        }

        /// <summary>Distinct elements, using the element itself as key and <paramref name="comparer"/> for equality.</summary>
        public virtual IObservable<TSource> Distinct<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
        {
#if !NO_PERF
            return new Distinct<TSource, TSource>(source, x => x, comparer);
#else
            return Distinct_(source, x => x, comparer);
#endif
        }

        /// <summary>Distinct elements by the key returned from <paramref name="keySelector"/>, compared with the default comparer.</summary>
        public virtual IObservable<TSource> Distinct<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
        {
#if !NO_PERF
            return new Distinct<TSource, TKey>(source, keySelector, EqualityComparer<TKey>.Default);
#else
            return Distinct_(source, keySelector, EqualityComparer<TKey>.Default);
#endif
        }

        /// <summary>Distinct elements by the key returned from <paramref name="keySelector"/>, compared with <paramref name="comparer"/>.</summary>
        public virtual IObservable<TSource> Distinct<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
#if !NO_PERF
            return new Distinct<TSource, TKey>(source, keySelector, comparer);
#else
            return Distinct_(source, keySelector, comparer);
#endif
        }

#if NO_PERF
        // Fallback: keeps one HashSet of seen keys per subscription and forwards an element only when its
        // key was newly added. Exceptions from the key selector or the comparer are sent to OnError.
        private static IObservable<TSource> Distinct_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var hashSet = new HashSet<TKey>(comparer);
                return source.Subscribe(
                    x =>
                    {
                        var key = default(TKey);
                        var hasAdded = false;

                        try
                        {
                            key = keySelector(x);
                            hasAdded = hashSet.Add(key);
                        }
                        catch (Exception exception)
                        {
                            observer.OnError(exception);
                            return;
                        }

                        if (hasAdded)
                        {
                            observer.OnNext(x);
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted
                );
            });
        }
#endif

        #endregion

        #region + GroupBy +

        /// <summary>Groups elements by key, projecting each element with <paramref name="elementSelector"/>; default key comparer.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
        {
            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
        }

        /// <summary>Groups elements by key using <paramref name="comparer"/>; the elements themselves are the group members.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
        }

        /// <summary>Groups elements by key using the default comparer; the elements themselves are the group members.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
        {
            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
        }

        /// <summary>Groups elements by key using <paramref name="comparer"/>, projecting each element with <paramref name="elementSelector"/>.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
        {
            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
        }

        // Shared implementation. The NO_PERF fallback is GroupByUntil with a duration that never fires,
        // so groups are only closed when the source terminates.
        private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
        {
#if !NO_PERF
            return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
#else
            return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), comparer);
#endif
        }

        #endregion

        #region + GroupByUntil +

        /// <summary>Groups elements by key; each group is closed when the sequence returned by <paramref name="durationSelector"/> for it signals.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
        }

        /// <summary>Same as the five-argument overload, with the default key comparer.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
        {
            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, EqualityComparer<TKey>.Default);
        }

        /// <summary>Groups the elements themselves (identity element selector) using <paramref name="comparer"/>.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, comparer);
        }

        /// <summary>Groups the elements themselves (identity element selector) using the default key comparer.</summary>
        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
        {
            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, EqualityComparer<TKey>.Default);
        }

        // Shared implementation.
        //
        // NO_PERF fallback: 'map' holds one subject per currently open key and is always accessed under
        // lock (map), because duration sequences may expire a group (map.Remove) while the source handler
        // is running. Any selector failure or source/duration error is sent to every open group first and
        // then to the outer observer.
        private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
        {
#if !NO_PERF
            return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
#else
            return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer =>
            {
                var map = new Dictionary<TKey, ISubject<TElement>>(comparer);

                var groupDisposable = new CompositeDisposable();
                var refCountDisposable = new RefCountDisposable(groupDisposable);

                groupDisposable.Add(source.Subscribe(x =>
                {
                    var key = default(TKey);
                    try
                    {
                        key = keySelector(x);
                    }
                    catch (Exception exception)
                    {
                        lock (map)
                        {
                            foreach (var w in map.Values.ToArray())
                            {
                                w.OnError(exception);
                            }
                        }

                        observer.OnError(exception);
                        return;
                    }

                    var fireNewMapEntry = false;
                    var writer = default(ISubject<TElement>);
                    try
                    {
                        // The lookup can throw through the user-supplied comparer.
                        lock (map)
                        {
                            if (!map.TryGetValue(key, out writer))
                            {
                                writer = new Subject<TElement>();
                                map.Add(key, writer);
                                fireNewMapEntry = true;
                            }
                        }
                    }
                    catch (Exception exception)
                    {
                        lock (map)
                        {
                            foreach (var w in map.Values.ToArray())
                            {
                                w.OnError(exception);
                            }
                        }

                        observer.OnError(exception);
                        return;
                    }

                    if (fireNewMapEntry)
                    {
                        // 'group' goes to the consumer and takes part in ref counting; 'durationGroup'
                        // is a second view over the same subject, handed only to the duration selector.
                        var group = new GroupedObservable<TKey, TElement>(key, writer, refCountDisposable);
                        var durationGroup = new GroupedObservable<TKey, TElement>(key, writer);

                        var duration = default(IObservable<TDuration>);
                        try
                        {
                            duration = durationSelector(durationGroup);
                        }
                        catch (Exception exception)
                        {
                            // Take the lock like every other error path: an expiring duration may be
                            // removing entries from 'map' concurrently.
                            lock (map)
                            {
                                foreach (var w in map.Values.ToArray())
                                {
                                    w.OnError(exception);
                                }
                            }

                            observer.OnError(exception);
                            return;
                        }

                        observer.OnNext(group);

                        var md = new SingleAssignmentDisposable();
                        groupDisposable.Add(md);

                        Action expire = () =>
                        {
                            lock (map)
                            {
                                // Only complete the subject if this callback is the one that removed it.
                                if (map.Remove(key))
                                {
                                    writer.OnCompleted();
                                }
                            }

                            groupDisposable.Remove(md);
                        };

                        // Take(1): the first element OR the completion of the duration expires the group.
                        md.Disposable = duration.Take(1).Subscribe(
                            _ => { },
                            exception =>
                            {
                                lock (map)
                                {
                                    foreach (var o in map.Values.ToArray())
                                    {
                                        o.OnError(exception);
                                    }
                                }

                                observer.OnError(exception);
                            },
                            expire);
                    }

                    var element = default(TElement);
                    try
                    {
                        element = elementSelector(x);
                    }
                    catch (Exception exception)
                    {
                        lock (map)
                        {
                            foreach (var w in map.Values.ToArray())
                            {
                                w.OnError(exception);
                            }
                        }

                        observer.OnError(exception);
                        return;
                    }

                    writer.OnNext(element);
                },
                e =>
                {
                    lock (map)
                    {
                        foreach (var w in map.Values.ToArray())
                        {
                            w.OnError(e);
                        }
                    }

                    observer.OnError(e);
                },
                () =>
                {
                    lock (map)
                    {
                        foreach (var w in map.Values.ToArray())
                        {
                            w.OnCompleted();
                        }
                    }

                    observer.OnCompleted();
                }));

                return refCountDisposable;
            });
#endif
        }

        #endregion

        #region + GroupJoin +

        /// <summary>
        /// For every left element, emits <c>resultSelector(left, window)</c>, where <c>window</c> receives the
        /// right elements whose duration overlaps the duration of that left element.
        /// </summary>
        public virtual IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector)
        {
            return GroupJoin_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
        }

        // NO_PERF fallback: 'leftMap' holds the window subject of every left element that has not expired;
        // 'rightMap' holds every right value that has not expired. Both maps and all calls on 'observer'
        // are guarded by 'gate'. Only completion of the left source completes the result; the right
        // subscription has no completion handler.
        private static IObservable<TResult> GroupJoin_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector)
        {
#if !NO_PERF
            return new GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
#else
            return new AnonymousObservable<TResult>(observer =>
            {
                var gate = new object();
                var group = new CompositeDisposable();
                var r = new RefCountDisposable(group);
                var leftMap = new Dictionary<int, IObserver<TRight>>();
                var rightMap = new Dictionary<int, TRight>();
                var leftID = 0;
                var rightID = 0;

                group.Add(left.Subscribe(
                    value =>
                    {
                        var s = new Subject<TRight>();
                        var id = 0;

                        lock (gate)
                        {
                            id = leftID++;
                            leftMap.Add(id, s);
                        }

                        lock (gate)
                        {
                            var result = default(TResult);
                            try
                            {
                                result = resultSelector(value, s.AddRef(r));
                            }
                            catch (Exception exception)
                            {
                                foreach (var o in leftMap.Values.ToArray())
                                {
                                    o.OnError(exception);
                                }

                                observer.OnError(exception);
                                return;
                            }

                            observer.OnNext(result);

                            // Replay the right values that are still alive into the new window.
                            foreach (var rightValue in rightMap.Values.ToArray())
                            {
                                s.OnNext(rightValue);
                            }
                        }

                        var md = new SingleAssignmentDisposable();
                        group.Add(md);

                        Action expire = () =>
                        {
                            lock (gate)
                            {
                                if (leftMap.Remove(id))
                                {
                                    s.OnCompleted();
                                }
                            }

                            group.Remove(md);
                        };

                        var duration = default(IObservable<TLeftDuration>);
                        try
                        {
                            duration = leftDurationSelector(value);
                        }
                        catch (Exception exception)
                        {
                            lock (gate)
                            {
                                foreach (var o in leftMap.Values.ToArray())
                                {
                                    o.OnError(exception);
                                }

                                observer.OnError(exception);
                            }

                            return;
                        }

                        md.Disposable = duration.Take(1).Subscribe(
                            _ => { },
                            exception =>
                            {
                                lock (gate)
                                {
                                    foreach (var o in leftMap.Values.ToArray())
                                    {
                                        o.OnError(exception);
                                    }

                                    observer.OnError(exception);
                                }
                            },
                            expire);
                    },
                    exception =>
                    {
                        lock (gate)
                        {
                            foreach (var o in leftMap.Values.ToArray())
                            {
                                o.OnError(exception);
                            }

                            observer.OnError(exception);
                        }
                    },
                    () =>
                    {
                        lock (gate)
                        {
                            observer.OnCompleted();
                        }
                    }));

                group.Add(right.Subscribe(
                    value =>
                    {
                        var id = 0;
                        lock (gate)
                        {
                            id = rightID++;
                            rightMap.Add(id, value);
                        }

                        var md = new SingleAssignmentDisposable();
                        group.Add(md);

                        Action expire = () =>
                        {
                            lock (gate)
                            {
                                rightMap.Remove(id);
                            }

                            group.Remove(md);
                        };

                        var duration = default(IObservable<TRightDuration>);
                        try
                        {
                            duration = rightDurationSelector(value);
                        }
                        catch (Exception exception)
                        {
                            lock (gate)
                            {
                                foreach (var o in leftMap.Values.ToArray())
                                {
                                    o.OnError(exception);
                                }

                                observer.OnError(exception);
                            }

                            return;
                        }

                        md.Disposable = duration.Take(1).Subscribe(
                            _ => { },
                            exception =>
                            {
                                lock (gate)
                                {
                                    foreach (var o in leftMap.Values.ToArray())
                                    {
                                        o.OnError(exception);
                                    }

                                    observer.OnError(exception);
                                }
                            },
                            expire);

                        // Deliver the new right value to every window that is still open.
                        lock (gate)
                        {
                            foreach (var o in leftMap.Values.ToArray())
                            {
                                o.OnNext(value);
                            }
                        }
                    },
                    exception =>
                    {
                        lock (gate)
                        {
                            foreach (var o in leftMap.Values.ToArray())
                            {
                                o.OnError(exception);
                            }

                            observer.OnError(exception);
                        }
                    }));

                return r;
            });
#endif
        }

        #endregion

        #region + Join +

        /// <summary>
        /// Emits <c>resultSelector(left, right)</c> for every pair of left and right elements whose durations overlap.
        /// </summary>
        public virtual IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector)
        {
            return Join_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
        }

        // NO_PERF fallback: 'leftMap'/'rightMap' hold the values that have not expired yet. All map access
        // and all calls on 'observer' are guarded by 'gate'. The result completes when a side is done and
        // either the other side is done too or the finished side has no live values left.
        private static IObservable<TResult> Join_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector)
        {
#if !NO_PERF
            return new Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
#else
            return new AnonymousObservable<TResult>(observer =>
            {
                var gate = new object();
                var leftDone = false;
                var rightDone = false;
                var group = new CompositeDisposable();
                var leftMap = new Dictionary<int, TLeft>();
                var rightMap = new Dictionary<int, TRight>();
                var leftID = 0;
                var rightID = 0;

                group.Add(left.Subscribe(
                    value =>
                    {
                        var id = 0;
                        lock (gate)
                        {
                            id = leftID++;
                            leftMap.Add(id, value);
                        }

                        var md = new SingleAssignmentDisposable();
                        group.Add(md);

                        Action expire = () =>
                        {
                            lock (gate)
                            {
                                if (leftMap.Remove(id) && leftMap.Count == 0 && leftDone)
                                {
                                    observer.OnCompleted();
                                }
                            }

                            group.Remove(md);
                        };

                        var duration = default(IObservable<TLeftDuration>);
                        try
                        {
                            duration = leftDurationSelector(value);
                        }
                        catch (Exception exception)
                        {
                            // Serialize with the right-hand side, like every other call on 'observer'.
                            lock (gate)
                            {
                                observer.OnError(exception);
                            }

                            return;
                        }

                        md.Disposable = duration.Take(1).Subscribe(
                            _ => { },
                            error =>
                            {
                                lock (gate)
                                {
                                    observer.OnError(error);
                                }
                            },
                            expire);

                        lock (gate)
                        {
                            foreach (var rightValue in rightMap.Values.ToArray())
                            {
                                var result = default(TResult);
                                try
                                {
                                    result = resultSelector(value, rightValue);
                                }
                                catch (Exception exception)
                                {
                                    observer.OnError(exception);
                                    return;
                                }

                                observer.OnNext(result);
                            }
                        }
                    },
                    error =>
                    {
                        lock (gate)
                        {
                            observer.OnError(error);
                        }
                    },
                    () =>
                    {
                        lock (gate)
                        {
                            leftDone = true;
                            if (rightDone || leftMap.Count == 0)
                            {
                                observer.OnCompleted();
                            }
                        }
                    }));

                group.Add(right.Subscribe(
                    value =>
                    {
                        var id = 0;
                        lock (gate)
                        {
                            id = rightID++;
                            rightMap.Add(id, value);
                        }

                        var md = new SingleAssignmentDisposable();
                        group.Add(md);

                        Action expire = () =>
                        {
                            lock (gate)
                            {
                                if (rightMap.Remove(id) && rightMap.Count == 0 && rightDone)
                                {
                                    observer.OnCompleted();
                                }
                            }

                            group.Remove(md);
                        };

                        var duration = default(IObservable<TRightDuration>);
                        try
                        {
                            duration = rightDurationSelector(value);
                        }
                        catch (Exception exception)
                        {
                            // Serialize with the left-hand side, like every other call on 'observer'.
                            lock (gate)
                            {
                                observer.OnError(exception);
                            }

                            return;
                        }

                        md.Disposable = duration.Take(1).Subscribe(
                            _ => { },
                            error =>
                            {
                                lock (gate)
                                {
                                    observer.OnError(error);
                                }
                            },
                            expire);

                        lock (gate)
                        {
                            foreach (var leftValue in leftMap.Values.ToArray())
                            {
                                var result = default(TResult);
                                try
                                {
                                    result = resultSelector(leftValue, value);
                                }
                                catch (Exception exception)
                                {
                                    observer.OnError(exception);
                                    return;
                                }

                                observer.OnNext(result);
                            }
                        }
                    },
                    error =>
                    {
                        lock (gate)
                        {
                            observer.OnError(error);
                        }
                    },
                    () =>
                    {
                        lock (gate)
                        {
                            rightDone = true;
                            if (leftDone || rightMap.Count == 0)
                            {
                                observer.OnCompleted();
                            }
                        }
                    }));

                return group;
            });
#endif
        }

        #endregion

        #region + OfType +

        /// <summary>Keeps only the elements of <paramref name="source"/> that are of type <typeparamref name="TResult"/>.</summary>
        public virtual IObservable<TResult> OfType<TResult>(IObservable<object> source)
        {
#if !NO_PERF
            return new OfType<object, TResult>(source);
#else
            return source.Where(x => x is TResult).Cast<TResult>();
#endif
        }

        #endregion

        #region + Select +

        /// <summary>Projects each element of <paramref name="source"/> through <paramref name="selector"/>.</summary>
        /// <remarks>
        /// When <paramref name="source"/> is itself the result of a Select, the new selector is handed to that
        /// operator (Ω / SelectObservable.Select) instead of wrapping it. The NO_PERF variant visibly composes
        /// the two selectors into a single projection over the original source.
        /// </remarks>
        public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
        {
#if !NO_PERF
            var select = source as Select<TSource>;
            if (select != null)
            {
                return select.Ω(selector);
            }

            return new Select<TSource, TResult>(source, selector);
#else
            var s = source as SelectObservable<TSource>;
            if (s != null)
            {
                return s.Select(selector);
            }

            return new SelectObservable<TSource, TResult>(source, selector);
#endif
        }

#if NO_PERF
        // Non-generic-in-source base so that Select can detect "source is a Select" knowing only its element type.
        abstract class SelectObservable<TResult> : ObservableBase<TResult>
        {
            public abstract IObservable<TResult2> Select<TResult2>(Func<TResult, TResult2> selector);
        }

        // Projection of _source through _selector.
        class SelectObservable<TSource, TResult> : SelectObservable<TResult>
        {
            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, TResult> _selector;

            public SelectObservable(IObservable<TSource> source, Func<TSource, TResult> selector)
            {
                _source = source;
                _selector = selector;
            }

            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
            {
                return _source.Subscribe(new Observer(observer, _selector));
            }

            // Fuses a chained Select: one projection over the original source that applies both selectors.
            public override IObservable<TResult2> Select<TResult2>(Func<TResult, TResult2> selector)
            {
                return new SelectObservable<TSource, TResult2>(_source, x => selector(_selector(x)));
            }

            // Applies the selector to each element; a throwing selector is reported through OnError.
            class Observer : ObserverBase<TSource>
            {
                private readonly IObserver<TResult> _observer;
                private readonly Func<TSource, TResult> _selector;

                public Observer(IObserver<TResult> observer, Func<TSource, TResult> selector)
                {
                    _observer = observer;
                    _selector = selector;
                }

                protected override void OnNextCore(TSource value)
                {
                    TResult result;
                    try
                    {
                        result = _selector(value);
                    }
                    catch (Exception exception)
                    {
                        _observer.OnError(exception);
                        return;
                    }

                    _observer.OnNext(result);
                }

                protected override void OnErrorCore(Exception error)
                {
                    _observer.OnError(error);
                }

                protected override void OnCompletedCore()
                {
                    _observer.OnCompleted();
                }
            }
        }
#endif

        /// <summary>Projects each element together with its zero-based index.</summary>
        /// <remarks>
        /// The NO_PERF fallback keeps one index per subscription (via Defer) and increments it in a checked
        /// context, so exceeding int.MaxValue throws OverflowException instead of wrapping around.
        /// </remarks>
        public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, TResult> selector)
        {
#if !NO_PERF
            return new Select<TSource, TResult>(source, selector);
#else
            return Defer(() =>
            {
                var index = 0;
                return source.Select(x => selector(x, checked(index++)));
            });
#endif
        }

        #endregion

        #region + SelectMany +

        /// <summary>Subscribes to <paramref name="other"/> once for every element of <paramref name="source"/> and merges the results.</summary>
        public virtual IObservable<TOther> SelectMany<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
        {
            return SelectMany_<TSource, TOther>(source, _ => other);
        }

        /// <summary>Projects each element to an observable sequence and merges the resulting sequences.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
        {
            return SelectMany_<TSource, TResult>(source, selector);
        }

#if !NO_TPL
        /// <summary>Projects each element to a task (converted with ToObservable) and merges the task results.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
        {
            return SelectMany_<TSource, TResult>(source, x => selector(x).ToObservable());
        }

        /// <summary>Projects each element to a cancellable task (started through FromAsync) and merges the task results.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
        {
            return SelectMany_<TSource, TResult>(source, x => FromAsync(ct => selector(x, ct)));
        }
#endif

        /// <summary>Projects each element to an observable sequence and combines every inner element with its source element.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
        }

#if !NO_TPL
        /// <summary>Projects each element to a task and combines the task result with its source element.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
        {
            return SelectMany_<TSource, TTaskResult, TResult>(source, x => taskSelector(x).ToObservable(), resultSelector);
        }

        /// <summary>Projects each element to a cancellable task and combines the task result with its source element.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
        {
            return SelectMany_<TSource, TTaskResult, TResult>(source, x => FromAsync(ct => taskSelector(x, ct)), resultSelector);
        }
#endif

        // Observable-to-observable core. The NO_PERF fallback is Select followed by Merge.
        private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
        {
#if !NO_PERF
            return new SelectMany<TSource, TResult>(source, selector);
#else
            return source.Select(selector).Merge();
#endif
        }

        // Observable collection + result selector core. The NO_PERF fallback maps each inner sequence
        // through the result selector and reuses the core above.
        private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
#if !NO_PERF
            return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#else
            return SelectMany_<TSource, TResult>(source, x => collectionSelector(x).Select(y => resultSelector(x, y)));
#endif
        }

        /// <summary>
        /// Projects each notification of <paramref name="source"/> (element, error, completion) to an observable
        /// sequence using the matching selector and merges the results.
        /// </summary>
        public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
        {
#if !NO_PERF
            return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
#else
            return source.Materialize().SelectMany(notification =>
            {
                if (notification.Kind == NotificationKind.OnNext)
                {
                    return onNext(notification.Value);
                }
                else if (notification.Kind == NotificationKind.OnError)
                {
                    return onError(notification.Exception);
                }
                else
                {
                    return onCompleted();
                }
            });
#endif
        }

        /// <summary>Projects each element to an enumerable sequence and flattens the results.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
        {
#if !NO_PERF
            return new SelectMany<TSource, TResult>(source, selector);
#else
            return SelectMany_<TSource, TResult, TResult>(source, selector, (_, x) => x);
#endif
        }

        /// <summary>Projects each element to an enumerable sequence and combines every inner element with its source element.</summary>
        public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
        }

        // Enumerable collection core.
        //
        // NO_PERF fallback: enumerates each inner collection synchronously inside the source's OnNext.
        // Failures of collectionSelector, GetEnumerator, MoveNext/Current and resultSelector are all
        // reported through observer.OnError; the enumerator is always disposed.
        private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
#if !NO_PERF
            return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#else
            return new AnonymousObservable<TResult>(observer =>
                source.Subscribe(
                    x =>
                    {
                        var xs = default(IEnumerable<TCollection>);
                        try
                        {
                            xs = collectionSelector(x);
                        }
                        catch (Exception exception)
                        {
                            observer.OnError(exception);
                            return;
                        }

                        // A null collection or a throwing GetEnumerator must not escape OnNext;
                        // report it to the observer like every other selector failure.
                        var e = default(IEnumerator<TCollection>);
                        try
                        {
                            e = xs.GetEnumerator();
                        }
                        catch (Exception exception)
                        {
                            observer.OnError(exception);
                            return;
                        }

                        try
                        {
                            var hasNext = true;
                            while (hasNext)
                            {
                                hasNext = false;
                                var current = default(TResult);

                                try
                                {
                                    hasNext = e.MoveNext();
                                    if (hasNext)
                                    {
                                        current = resultSelector(x, e.Current);
                                    }
                                }
                                catch (Exception exception)
                                {
                                    observer.OnError(exception);
                                    return;
                                }

                                if (hasNext)
                                {
                                    observer.OnNext(current);
                                }
                            }
                        }
                        finally
                        {
                            if (e != null)
                            {
                                e.Dispose();
                            }
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted
                )
            );
#endif
        }

        #endregion

        #region + Skip +

        /// <summary>Skips the first <paramref name="count"/> elements of <paramref name="source"/>.</summary>
        /// <remarks>
        /// When NO_PERF is not defined and <paramref name="source"/> is a Skip operator without a scheduler,
        /// the count is handed to that operator through Ω instead of stacking a second operator.
        /// </remarks>
        public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, int count)
        {
#if !NO_PERF
            var skip = source as Skip<TSource>;
            if (skip != null && skip._scheduler == null)
            {
                return skip.Ω(count);
            }

            return new Skip<TSource>(source, count);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                var remaining = count;
                return source.Subscribe(
                    x =>
                    {
                        if (remaining <= 0)
                        {
                            observer.OnNext(x);
                        }
                        else
                        {
                            remaining--;
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted);
            });
#endif
        }

        #endregion

        #region + SkipWhile +

        /// <summary>Skips elements while <paramref name="predicate"/> holds, then forwards all remaining elements.</summary>
        public virtual IObservable<TSource> SkipWhile<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
#if !NO_PERF
            return new SkipWhile<TSource>(source, predicate);
#else
            return SkipWhile_(source, (x, i) => predicate(x));
#endif
        }

        /// <summary>Same as the non-indexed overload; the predicate also receives the zero-based element index.</summary>
        public virtual IObservable<TSource> SkipWhile<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
        {
#if !NO_PERF
            return new SkipWhile<TSource>(source, predicate);
#else
            return SkipWhile_(source, predicate);
#endif
        }

#if NO_PERF
        // Fallback: the predicate is only evaluated until it first returns false; from then on every
        // element is forwarded. The index is incremented in a checked context.
        private static IObservable<TSource> SkipWhile_<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var running = false;
                var i = 0;
                return source.Subscribe(
                    x =>
                    {
                        if (!running)
                        {
                            try
                            {
                                running = !predicate(x, checked(i++));
                            }
                            catch (Exception exception)
                            {
                                observer.OnError(exception);
                                return;
                            }
                        }

                        if (running)
                        {
                            observer.OnNext(x);
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted);
            });
        }
#endif

        #endregion

        #region + Take +

        /// <summary>Takes the first <paramref name="count"/> elements; a count of 0 returns an empty sequence without touching <paramref name="source"/>.</summary>
        public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, int count)
        {
            if (count == 0)
            {
                return Empty<TSource>();
            }

            return Take_(source, count);
        }

        /// <summary>
        /// Takes the first <paramref name="count"/> elements. <paramref name="scheduler"/> is only used to
        /// produce the empty sequence when <paramref name="count"/> is 0; it is not passed on otherwise.
        /// </summary>
        public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
        {
            if (count == 0)
            {
                return Empty<TSource>(scheduler);
            }

            return Take_(source, count);
        }

#if !NO_PERF
        // When the source is a Take operator without a scheduler, hands the count to it through Ω
        // instead of stacking a second operator.
        private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, int count)
        {
            var take = source as Take<TSource>;
            if (take != null && take._scheduler == null)
            {
                return take.Ω(count);
            }

            return new Take<TSource>(source, count);
        }
#else
        // Fallback: forwards elements while the counter is positive and completes right after the last one.
        private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, int count)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var remaining = count;

                return source.Subscribe(
                    x =>
                    {
                        if (remaining > 0)
                        {
                            --remaining;
                            observer.OnNext(x);
                            if (remaining == 0)
                            {
                                observer.OnCompleted();
                            }
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted);
            });
        }
#endif

        #endregion

        #region + TakeWhile +

        /// <summary>Forwards elements while <paramref name="predicate"/> holds and completes at the first element for which it does not.</summary>
        public virtual IObservable<TSource> TakeWhile<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
#if !NO_PERF
            return new TakeWhile<TSource>(source, predicate);
#else
            return TakeWhile_(source, (x, i) => predicate(x));
#endif
        }

        /// <summary>Same as the non-indexed overload; the predicate also receives the zero-based element index.</summary>
        public virtual IObservable<TSource> TakeWhile<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
        {
#if !NO_PERF
            return new TakeWhile<TSource>(source, predicate);
#else
            return TakeWhile_(source, predicate);
#endif
        }

#if NO_PERF
        // Fallback: once the predicate has returned false the sequence is completed and later elements
        // are ignored. The index is incremented in a checked context.
        private static IObservable<TSource> TakeWhile_<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
        {
            return new AnonymousObservable<TSource>(observer =>
            {
                var running = true;
                var i = 0;
                return source.Subscribe(
                    x =>
                    {
                        if (running)
                        {
                            try
                            {
                                running = predicate(x, checked(i++));
                            }
                            catch (Exception exception)
                            {
                                observer.OnError(exception);
                                return;
                            }

                            if (running)
                            {
                                observer.OnNext(x);
                            }
                            else
                            {
                                observer.OnCompleted();
                            }
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted);
            });
        }
#endif

        #endregion

        #region + Where +

        /// <summary>Forwards only the elements of <paramref name="source"/> that satisfy <paramref name="predicate"/>.</summary>
        /// <remarks>
        /// When <paramref name="source"/> is itself the result of a Where, the new predicate is handed to that
        /// operator (Ω / WhereObservable.Where). The NO_PERF variant visibly combines both predicates with
        /// a short-circuiting AND over the original source.
        /// </remarks>
        public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
#if !NO_PERF
            var where = source as Where<TSource>;
            if (where != null)
            {
                return where.Ω(predicate);
            }

            return new Where<TSource>(source, predicate);
#else
            var w = source as WhereObservable<TSource>;
            if (w != null)
            {
                return w.Where(predicate);
            }

            return new WhereObservable<TSource>(source, predicate);
#endif
        }

#if NO_PERF
        // Filter of _source by _predicate.
        class WhereObservable<TSource> : ObservableBase<TSource>
        {
            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, bool> _predicate;

            public WhereObservable(IObservable<TSource> source, Func<TSource, bool> predicate)
            {
                _source = source;
                _predicate = predicate;
            }

            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                return _source.Subscribe(new Observer(observer, _predicate));
            }

            // Fuses a chained Where: the existing predicate is evaluated first, the new one only if it passed.
            public IObservable<TSource> Where(Func<TSource, bool> predicate)
            {
                return new WhereObservable<TSource>(_source, x => _predicate(x) && predicate(x));
            }

            // Evaluates the predicate per element; a throwing predicate is reported through OnError.
            class Observer : ObserverBase<TSource>
            {
                private readonly IObserver<TSource> _observer;
                private readonly Func<TSource, bool> _predicate;

                public Observer(IObserver<TSource> observer, Func<TSource, bool> predicate)
                {
                    _observer = observer;
                    _predicate = predicate;
                }

                protected override void OnNextCore(TSource value)
                {
                    bool shouldRun;
                    try
                    {
                        shouldRun = _predicate(value);
                    }
                    catch (Exception exception)
                    {
                        _observer.OnError(exception);
                        return;
                    }

                    if (shouldRun)
                    {
                        _observer.OnNext(value);
                    }
                }

                protected override void OnErrorCore(Exception error)
                {
                    _observer.OnError(error);
                }

                protected override void OnCompletedCore()
                {
                    _observer.OnCompleted();
                }
            }
        }
#endif

        /// <summary>Filters elements with a predicate that also receives the zero-based element index.</summary>
        /// <remarks>
        /// The NO_PERF fallback keeps one index per subscription (via Defer) and increments it in a checked
        /// context, so exceeding int.MaxValue throws OverflowException instead of wrapping around.
        /// </remarks>
        public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
        {
#if !NO_PERF
            return new Where<TSource>(source, predicate);
#else
            return Defer(() =>
            {
                var index = 0;
                return source.Where(x => predicate(x, checked(index++)));
            });
#endif
        }

        #endregion
    }
}