// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; using System.Linq; #if !NO_TPL using System.Reactive.Threading.Tasks; using System.Threading.Tasks; #endif namespace System.Reactive.Linq { #if !NO_PERF using ObservableImpl; #endif internal partial class QueryLanguage { #region - Create - public virtual IObservable Create(Func, IDisposable> subscribe) { return new AnonymousObservable(subscribe); } public virtual IObservable Create(Func, Action> subscribe) { return new AnonymousObservable(o => { var a = subscribe(o); return a != null ? Disposable.Create(a) : Disposable.Empty; }); } #endregion #region - CreateAsync - #if !NO_TPL public virtual IObservable Create(Func, CancellationToken, Task> subscribeAsync) { return new AnonymousObservable(observer => { var cancellable = new CancellationDisposable(); var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); var taskCompletionObserver = new AnonymousObserver(Stubs.Ignore, observer.OnError, observer.OnCompleted); var subscription = taskObservable.Subscribe(taskCompletionObserver); return new CompositeDisposable(cancellable, subscription); }); } public virtual IObservable Create(Func, Task> subscribeAsync) { return Create((observer, token) => subscribeAsync(observer)); } public virtual IObservable Create(Func, CancellationToken, Task> subscribeAsync) { return new AnonymousObservable(observer => { var subscription = new SingleAssignmentDisposable(); var cancellable = new CancellationDisposable(); var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); var taskCompletionObserver = new AnonymousObserver(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop); // // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. // taskObservable.Subscribe(taskCompletionObserver); return new CompositeDisposable(cancellable, subscription); }); } public virtual IObservable Create(Func, Task> subscribeAsync) { return Create((observer, token) => subscribeAsync(observer)); } public virtual IObservable Create(Func, CancellationToken, Task> subscribeAsync) { return new AnonymousObservable(observer => { var subscription = new SingleAssignmentDisposable(); var cancellable = new CancellationDisposable(); var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); var taskCompletionObserver = new AnonymousObserver(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop); // // We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed. // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. // taskObservable.Subscribe(taskCompletionObserver); return new CompositeDisposable(cancellable, subscription); }); } public virtual IObservable Create(Func, Task> subscribeAsync) { return Create((observer, token) => subscribeAsync(observer)); } #endif #endregion #region + Defer + public virtual IObservable Defer(Func> observableFactory) { #if !NO_PERF return new Defer(observableFactory); #else return new AnonymousObservable(observer => { IObservable result; try { result = observableFactory(); } catch (Exception exception) { return Throw(exception).Subscribe(observer); } return result.Subscribe(observer); }); #endif } #endregion #region + DeferAsync + #if !NO_TPL public virtual IObservable Defer(Func>> observableFactoryAsync) { return Defer(() => StartAsync(observableFactoryAsync).Merge()); } public virtual IObservable Defer(Func>> observableFactoryAsync) { return Defer(() => StartAsync(observableFactoryAsync).Merge()); } #endif #endregion #region + Empty + public virtual IObservable Empty() { #if !NO_PERF return new Empty(SchedulerDefaults.ConstantTimeOperations); #else return Empty_(SchedulerDefaults.ConstantTimeOperations); #endif } public virtual IObservable Empty(IScheduler scheduler) { #if !NO_PERF return new Empty(scheduler); #else return Empty_(scheduler); #endif } #if NO_PERF private static IObservable Empty_(IScheduler scheduler) { return new AnonymousObservable(observer => scheduler.Schedule(observer.OnCompleted)); } #endif #endregion #region + Generate + public virtual IObservable Generate(TState initialState, Func condition, Func iterate, Func resultSelector) { #if !NO_PERF return new Generate(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration); #else return Generate_(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration); #endif } public virtual IObservable Generate(TState initialState, Func condition, Func iterate, Func resultSelector, IScheduler scheduler) { #if !NO_PERF return new Generate(initialState, condition, iterate, resultSelector, scheduler); #else return Generate_(initialState, condition, iterate, resultSelector, scheduler); #endif } #if NO_PERF private static IObservable Generate_(TState initialState, Func condition, Func iterate, Func resultSelector, IScheduler scheduler) { return new AnonymousObservable(observer => { var state = initialState; var first = true; return scheduler.Schedule(self => { var hasResult = false; var result = default(TResult); try { if (first) first = false; else state = iterate(state); hasResult = condition(state); if (hasResult) result = resultSelector(state); } catch (Exception exception) { observer.OnError(exception); return; } if (hasResult) { observer.OnNext(result); self(); } else observer.OnCompleted(); }); }); } #endif #endregion #region + Never + public virtual IObservable Never() { #if !NO_PERF return new Never(); #else return new AnonymousObservable(observer => Disposable.Empty); #endif } #endregion #region + Range + public virtual IObservable Range(int start, int count) { return Range_(start, count, SchedulerDefaults.Iteration); } public virtual IObservable Range(int start, int count, IScheduler scheduler) { return Range_(start, count, scheduler); } private static IObservable Range_(int start, int count, IScheduler scheduler) { #if !NO_PERF return new Range(start, count, scheduler); #else return new AnonymousObservable(observer => { return scheduler.Schedule(0, (i, self) => { if (i < count) { observer.OnNext(start + i); self(i + 1); } else observer.OnCompleted(); }); }); #endif } #endregion #region + Repeat + public virtual IObservable Repeat(TResult value) { #if !NO_PERF return new Repeat(value, null, SchedulerDefaults.Iteration); #else return Repeat_(value, SchedulerDefaults.Iteration); #endif } public virtual IObservable Repeat(TResult value, IScheduler scheduler) { #if !NO_PERF return new Repeat(value, null, scheduler); #else return Repeat_(value, scheduler); #endif } #if NO_PERF private IObservable Repeat_(TResult value, IScheduler scheduler) { return Return(value, scheduler).Repeat(); } #endif public virtual IObservable Repeat(TResult value, int repeatCount) { #if !NO_PERF return new Repeat(value, repeatCount, SchedulerDefaults.Iteration); #else return Repeat_(value, repeatCount, SchedulerDefaults.Iteration); #endif } public virtual IObservable Repeat(TResult value, int repeatCount, IScheduler scheduler) { #if !NO_PERF return new Repeat(value, repeatCount, scheduler); #else return Repeat_(value, repeatCount, scheduler); #endif } #if NO_PERF private IObservable Repeat_(TResult value, int repeatCount, IScheduler scheduler) { return Return(value, scheduler).Repeat(repeatCount); } #endif #endregion #region + Return + public virtual IObservable Return(TResult value) { #if !NO_PERF return new Return(value, SchedulerDefaults.ConstantTimeOperations); #else return Return_(value, SchedulerDefaults.ConstantTimeOperations); #endif } public virtual IObservable Return(TResult value, IScheduler scheduler) { #if !NO_PERF return new Return(value, scheduler); #else return Return_(value, scheduler); #endif } #if NO_PERF private static IObservable Return_(TResult value, IScheduler scheduler) { return new AnonymousObservable(observer => scheduler.Schedule(() => { observer.OnNext(value); observer.OnCompleted(); }) ); } #endif #endregion #region + Throw + public virtual IObservable Throw(Exception exception) { #if !NO_PERF return new Throw(exception, SchedulerDefaults.ConstantTimeOperations); #else return Throw_(exception, SchedulerDefaults.ConstantTimeOperations); #endif } public virtual IObservable Throw(Exception exception, IScheduler scheduler) { #if !NO_PERF return new Throw(exception, scheduler); #else return Throw_(exception, scheduler); #endif } #if NO_PERF private static IObservable Throw_(Exception exception, IScheduler scheduler) { return new AnonymousObservable(observer => scheduler.Schedule(() => observer.OnError(exception))); } #endif #endregion #region + Using + public virtual IObservable Using(Func resourceFactory, Func> observableFactory) where TResource : IDisposable { #if !NO_PERF return new Using(resourceFactory, observableFactory); #else return new AnonymousObservable(observer => { var source = default(IObservable); var disposable = Disposable.Empty; try { var resource = resourceFactory(); if (resource != null) disposable = resource; source = observableFactory(resource); } catch (Exception exception) { return new CompositeDisposable(Throw(exception).Subscribe(observer), disposable); } return new CompositeDisposable(source.Subscribe(observer), disposable); }); #endif } #endregion #region - UsingAsync - #if !NO_TPL public virtual IObservable Using(Func> resourceFactoryAsync, Func>> observableFactoryAsync) where TResource : IDisposable { return Observable.FromAsync(resourceFactoryAsync) .SelectMany(resource => Observable.Using( () => resource, resource_ => Observable.FromAsync>(ct => observableFactoryAsync(resource_, ct)).Merge() ) ); } #endif #endregion } }