// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Joins;

namespace System.Reactive.Linq
{
    internal partial class QueryLanguage
    {
        #region And

        /// <summary>
        /// Pairs two observable sequences into a two-source join pattern.
        /// </summary>
        /// <typeparam name="TLeft">Element type of <paramref name="left"/>.</typeparam>
        /// <typeparam name="TRight">Element type of <paramref name="right"/>.</typeparam>
        /// <param name="left">First sequence of the pattern.</param>
        /// <param name="right">Second sequence of the pattern.</param>
        /// <returns>A new <see cref="Pattern{TLeft, TRight}"/> wrapping both sequences.</returns>
        public virtual Pattern<TLeft, TRight> And<TLeft, TRight>(IObservable<TLeft> left, IObservable<TRight> right)
        {
            return new Pattern<TLeft, TRight>(left, right);
        }

        #endregion

        #region Then

        /// <summary>
        /// Wraps a single sequence in a one-source pattern and turns it into a plan
        /// by applying <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
        /// <typeparam name="TResult">Type produced by <paramref name="selector"/>.</typeparam>
        /// <param name="source">Sequence the pattern is built over.</param>
        /// <param name="selector">Projection handed to the pattern's <c>Then</c>.</param>
        /// <returns>The plan returned by the pattern's <c>Then</c> call.</returns>
        public virtual Plan<TResult> Then<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
        {
            return new Pattern<TSource>(source).Then(selector);
        }

        #endregion

        #region When

        /// <summary>
        /// Array/params convenience overload; forwards to the enumerable overload.
        /// </summary>
        /// <typeparam name="TResult">Result type shared by all plans.</typeparam>
        /// <param name="plans">Plans to join.</param>
        /// <returns>The sequence produced by the enumerable overload.</returns>
        public virtual IObservable<TResult> When<TResult>(params Plan<TResult>[] plans)
        {
            // The cast forces overload resolution onto the IEnumerable overload
            // instead of re-entering this params overload.
            return When((IEnumerable<Plan<TResult>>)plans);
        }

        /// <summary>
        /// Builds an observable that, on each subscription, activates every plan in
        /// <paramref name="plans"/> and forwards their results to the subscriber.
        /// </summary>
        /// <remarks>
        /// The sequence completes once every activated plan has been removed through the
        /// deactivation callback. An error reported to the shared output observer disposes
        /// all join observers before being forwarded. If enumerating or activating the
        /// plans throws, the subscriber is instead subscribed to a sequence that fails with
        /// that exception.
        /// </remarks>
        /// <typeparam name="TResult">Result type shared by all plans.</typeparam>
        /// <param name="plans">Plans to activate; enumerated once per subscription.</param>
        /// <returns>An observable sequence carrying the results of the plans.</returns>
        public virtual IObservable<TResult> When<TResult>(IEnumerable<Plan<TResult>> plans)
        {
            return new AnonymousObservable<TResult>(observer =>
            {
                // Join observers collected by the plans during activation.
                // NOTE(review): the <object, IJoinObserver> and <ActivePlan> type arguments were
                // reconstructed to match Plan<TResult>.Activate - confirm against System.Reactive.Joins.
                var externalSubscriptions = new Dictionary<object, IJoinObserver>();

                // Single gate object handed to every join observer when it subscribes;
                // presumably used to serialize their notifications - confirm in IJoinObserver.
                var gate = new object();

                var activePlans = new List<ActivePlan>();

                var outObserver = Observer.Create<TResult>(
                    observer.OnNext,
                    exception =>
                    {
                        // Tear down every join observer before surfacing the error downstream.
                        foreach (var po in externalSubscriptions.Values)
                        {
                            po.Dispose();
                        }

                        observer.OnError(exception);
                    },
                    observer.OnCompleted);

                try
                {
                    foreach (var plan in plans)
                    {
                        activePlans.Add(plan.Activate(
                            externalSubscriptions,
                            outObserver,
                            activePlan =>
                            {
                                // Deactivation callback: once the last plan is gone, complete the output.
                                activePlans.Remove(activePlan);
                                if (activePlans.Count == 0)
                                {
                                    outObserver.OnCompleted();
                                }
                            }));
                    }
                }
                catch (Exception e)
                {
                    //
                    // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
                    //
                    return Throw<TResult>(e).Subscribe/*Unsafe*/(observer);
                }

                // Subscribe the join observers only after all plans activated successfully,
                // and hand them back as one disposable.
                var group = new CompositeDisposable(externalSubscriptions.Values.Count);
                foreach (var joinObserver in externalSubscriptions.Values)
                {
                    joinObserver.Subscribe(gate);
                    group.Add(joinObserver);
                }

                return group;
            });
        }

        #endregion
    }
}