// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; namespace System.Reactive.Linq { using ObservableImpl; internal partial class QueryLanguage { #region + Multicast + public virtual IConnectableObservable Multicast(IObservable source, ISubject subject) { return new ConnectableObservable(source, subject); } public virtual IObservable Multicast(IObservable source, Func> subjectSelector, Func, IObservable> selector) { return new Multicast(source, subjectSelector, selector); } #endregion #region + Publish + public virtual IConnectableObservable Publish(IObservable source) { return source.Multicast(new Subject()); } public virtual IObservable Publish(IObservable source, Func, IObservable> selector) { return source.Multicast(() => new Subject(), selector); } public virtual IConnectableObservable Publish(IObservable source, TSource initialValue) { return source.Multicast(new BehaviorSubject(initialValue)); } public virtual IObservable Publish(IObservable source, Func, IObservable> selector, TSource initialValue) { return source.Multicast(() => new BehaviorSubject(initialValue), selector); } #endregion #region + PublishLast + public virtual IConnectableObservable PublishLast(IObservable source) { return source.Multicast(new AsyncSubject()); } public virtual IObservable PublishLast(IObservable source, Func, IObservable> selector) { return source.Multicast(() => new AsyncSubject(), selector); } #endregion #region + RefCount + public virtual IObservable RefCount(IConnectableObservable source) { return new RefCount(source); } #endregion #region + Replay + public virtual IConnectableObservable Replay(IObservable source) { return source.Multicast(new ReplaySubject()); } public virtual IConnectableObservable Replay(IObservable source, IScheduler scheduler) { return source.Multicast(new ReplaySubject(scheduler)); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector) { return source.Multicast(() => new ReplaySubject(), selector); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector, IScheduler scheduler) { return source.Multicast(() => new ReplaySubject(scheduler), selector); } public virtual IConnectableObservable Replay(IObservable source, TimeSpan window) { return source.Multicast(new ReplaySubject(window)); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector, TimeSpan window) { return source.Multicast(() => new ReplaySubject(window), selector); } public virtual IConnectableObservable Replay(IObservable source, TimeSpan window, IScheduler scheduler) { return source.Multicast(new ReplaySubject(window, scheduler)); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector, TimeSpan window, IScheduler scheduler) { return source.Multicast(() => new ReplaySubject(window, scheduler), selector); } public virtual IConnectableObservable Replay(IObservable source, int bufferSize, IScheduler scheduler) { return source.Multicast(new ReplaySubject(bufferSize, scheduler)); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector, int bufferSize, IScheduler scheduler) { return source.Multicast(() => new ReplaySubject(bufferSize, scheduler), selector); } public virtual IConnectableObservable Replay(IObservable source, int bufferSize) { return source.Multicast(new ReplaySubject(bufferSize)); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector, int bufferSize) { return source.Multicast(() => new ReplaySubject(bufferSize), selector); } public virtual IConnectableObservable Replay(IObservable source, int bufferSize, TimeSpan window) { return source.Multicast(new ReplaySubject(bufferSize, window)); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector, int bufferSize, TimeSpan window) { return source.Multicast(() => new ReplaySubject(bufferSize, window), selector); } public virtual IConnectableObservable Replay(IObservable source, int bufferSize, TimeSpan window, IScheduler scheduler) { return source.Multicast(new ReplaySubject(bufferSize, window, scheduler)); } public virtual IObservable Replay(IObservable source, Func, IObservable> selector, int bufferSize, TimeSpan window, IScheduler scheduler) { return source.Multicast(() => new ReplaySubject(bufferSize, window, scheduler), selector); } #endregion } }