diff options
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs | 462 |
1 files changed, 462 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs new file mode 100644 index 0000000..4aa25b2 --- /dev/null +++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Creation.cs @@ -0,0 +1,462 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Threading; +using System.Linq; + +#if !NO_TPL +using System.Reactive.Threading.Tasks; +using System.Threading.Tasks; +#endif + +namespace System.Reactive.Linq +{ +#if !NO_PERF + using Observαble; +#endif + + internal partial class QueryLanguage + { + #region - Create - + + public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) + { + return new AnonymousObservable<TSource>(subscribe); + } + + public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe) + { + return new AnonymousObservable<TSource>(o => + { + var a = subscribe(o); + return a != null ? Disposable.Create(a) : Disposable.Empty; + }); + } + + #endregion + + #region - CreateAsync - + +#if !NO_TPL + public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync) + { + return new AnonymousObservable<TResult>(observer => + { + var cancellable = new CancellationDisposable(); + + var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); + var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted); + var subscription = taskObservable.Subscribe(taskCompletionObserver); + + return new CompositeDisposable(cancellable, subscription); + }); + } + + public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync) + { + return Create<TResult>((observer, token) => subscribeAsync(observer)); + } + + public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync) + { + return new AnonymousObservable<TResult>(observer => + { + var subscription = new SingleAssignmentDisposable(); + var cancellable = new CancellationDisposable(); + + var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); + var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop); + + // + // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. + // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free. + // + taskObservable.Subscribe(taskCompletionObserver); + + return new CompositeDisposable(cancellable, subscription); + }); + } + + public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync) + { + return Create<TResult>((observer, token) => subscribeAsync(observer)); + } + + public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync) + { + return new AnonymousObservable<TResult>(observer => + { + var subscription = new SingleAssignmentDisposable(); + var cancellable = new CancellationDisposable(); + + var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable(); + var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop); + + // + // We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed. + // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free. + // + taskObservable.Subscribe(taskCompletionObserver); + + return new CompositeDisposable(cancellable, subscription); + }); + } + + public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync) + { + return Create<TResult>((observer, token) => subscribeAsync(observer)); + } +#endif + + #endregion + + #region + Defer + + + public virtual IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory) + { +#if !NO_PERF + return new Defer<TValue>(observableFactory); +#else + return new AnonymousObservable<TValue>(observer => + { + IObservable<TValue> result; + try + { + result = observableFactory(); + } + catch (Exception exception) + { + return Throw<TValue>(exception).Subscribe(observer); + } + + return result.Subscribe(observer); + }); +#endif + } + + #endregion + + #region + DeferAsync + + +#if !NO_TPL + public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync) + { + return Defer(() => StartAsync(observableFactoryAsync).Merge()); + } + + public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync) + { + return Defer(() => StartAsync(observableFactoryAsync).Merge()); + } +#endif + + #endregion + + #region + Empty + + + public virtual IObservable<TResult> Empty<TResult>() + { +#if !NO_PERF + return new Empty<TResult>(SchedulerDefaults.ConstantTimeOperations); +#else + return Empty_<TResult>(SchedulerDefaults.ConstantTimeOperations); +#endif + } + + public virtual IObservable<TResult> Empty<TResult>(IScheduler scheduler) + { +#if !NO_PERF + return new Empty<TResult>(scheduler); +#else + return Empty_<TResult>(scheduler); +#endif + } + +#if NO_PERF + private static IObservable<TResult> Empty_<TResult>(IScheduler scheduler) + { + return new AnonymousObservable<TResult>(observer => scheduler.Schedule(observer.OnCompleted)); + } +#endif + + #endregion + + #region + Generate + + + public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector) + { +#if !NO_PERF + return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration); +#else + return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration); +#endif + } + + public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler) + { +#if !NO_PERF + return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, scheduler); +#else + return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, scheduler); +#endif + } + +#if NO_PERF + private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler) + { + return new AnonymousObservable<TResult>(observer => + { + var state = initialState; + var first = true; + return scheduler.Schedule(self => + { + var hasResult = false; + var result = default(TResult); + try + { + if (first) + first = false; + else + state = iterate(state); + hasResult = condition(state); + if (hasResult) + result = resultSelector(state); + } + catch (Exception exception) + { + observer.OnError(exception); + return; + } + + if (hasResult) + { + observer.OnNext(result); + self(); + } + else + observer.OnCompleted(); + }); + }); + } +#endif + + #endregion + + #region + Never + + + public virtual IObservable<TResult> Never<TResult>() + { +#if !NO_PERF + return new Never<TResult>(); +#else + return new AnonymousObservable<TResult>(observer => Disposable.Empty); +#endif + } + + #endregion + + #region + Range + + + public virtual IObservable<int> Range(int start, int count) + { + return Range_(start, count, SchedulerDefaults.Iteration); + } + + public virtual IObservable<int> Range(int start, int count, IScheduler scheduler) + { + return Range_(start, count, scheduler); + } + + private static IObservable<int> Range_(int start, int count, IScheduler scheduler) + { +#if !NO_PERF + return new Range(start, count, scheduler); +#else + return new AnonymousObservable<int>(observer => + { + return scheduler.Schedule(0, (i, self) => + { + if (i < count) + { + observer.OnNext(start + i); + self(i + 1); + } + else + observer.OnCompleted(); + }); + }); +#endif + } + + #endregion + + #region + Repeat + + + public virtual IObservable<TResult> Repeat<TResult>(TResult value) + { +#if !NO_PERF + return new Repeat<TResult>(value, null, SchedulerDefaults.Iteration); +#else + return Repeat_(value, SchedulerDefaults.Iteration); +#endif + } + + public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler) + { +#if !NO_PERF + return new Repeat<TResult>(value, null, scheduler); +#else + return Repeat_<TResult>(value, scheduler); +#endif + } + +#if NO_PERF + private IObservable<TResult> Repeat_<TResult>(TResult value, IScheduler scheduler) + { + return Return(value, scheduler).Repeat(); + } +#endif + + public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount) + { +#if !NO_PERF + return new Repeat<TResult>(value, repeatCount, SchedulerDefaults.Iteration); +#else + return Repeat_(value, repeatCount, SchedulerDefaults.Iteration); +#endif + } + + public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler) + { +#if !NO_PERF + return new Repeat<TResult>(value, repeatCount, scheduler); +#else + return Repeat_(value, repeatCount, scheduler); +#endif + } + +#if NO_PERF + private IObservable<TResult> Repeat_<TResult>(TResult value, int repeatCount, IScheduler scheduler) + { + return Return(value, scheduler).Repeat(repeatCount); + } +#endif + + #endregion + + #region + Return + + + public virtual IObservable<TResult> Return<TResult>(TResult value) + { +#if !NO_PERF + return new Return<TResult>(value, SchedulerDefaults.ConstantTimeOperations); +#else + return Return_<TResult>(value, SchedulerDefaults.ConstantTimeOperations); +#endif + } + + public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler) + { +#if !NO_PERF + return new Return<TResult>(value, scheduler); +#else + return Return_<TResult>(value, scheduler); +#endif + } + +#if NO_PERF + private static IObservable<TResult> Return_<TResult>(TResult value, IScheduler scheduler) + { + return new AnonymousObservable<TResult>(observer => + scheduler.Schedule(() => + { + observer.OnNext(value); + observer.OnCompleted(); + }) + ); + } +#endif + + #endregion + + #region + Throw + + + public virtual IObservable<TResult> Throw<TResult>(Exception exception) + { +#if !NO_PERF + return new Throw<TResult>(exception, SchedulerDefaults.ConstantTimeOperations); +#else + return Throw_<TResult>(exception, SchedulerDefaults.ConstantTimeOperations); +#endif + } + + public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler) + { +#if !NO_PERF + return new Throw<TResult>(exception, scheduler); +#else + return Throw_<TResult>(exception, scheduler); +#endif + } + +#if NO_PERF + private static IObservable<TResult> Throw_<TResult>(Exception exception, IScheduler scheduler) + { + return new AnonymousObservable<TResult>(observer => scheduler.Schedule(() => observer.OnError(exception))); + } +#endif + + #endregion + + #region + Using + + + public virtual IObservable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory) where TResource : IDisposable + { +#if !NO_PERF + return new Using<TSource, TResource>(resourceFactory, observableFactory); +#else + return new AnonymousObservable<TSource>(observer => + { + var source = default(IObservable<TSource>); + var disposable = Disposable.Empty; + try + { + var resource = resourceFactory(); + if (resource != null) + disposable = resource; + source = observableFactory(resource); + } + catch (Exception exception) + { + return new CompositeDisposable(Throw<TSource>(exception).Subscribe(observer), disposable); + } + + return new CompositeDisposable(source.Subscribe(observer), disposable); + }); +#endif + } + + #endregion + + #region - UsingAsync - + +#if !NO_TPL + + public virtual IObservable<TSource> Using<TSource, TResource>(Func<CancellationToken, Task<TResource>> resourceFactoryAsync, Func<TResource, CancellationToken, Task<IObservable<TSource>>> observableFactoryAsync) where TResource : IDisposable + { + return Observable.FromAsync<TResource>(resourceFactoryAsync) + .SelectMany(resource => + Observable.Using<TSource, TResource>( + () => resource, + resource_ => Observable.FromAsync<IObservable<TSource>>(ct => observableFactoryAsync(resource_, ct)).Merge() + ) + ); + } + +#endif + + #endregion + } +} |