diff options
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs | 1796 |
1 files changed, 1796 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs new file mode 100644 index 0000000..21f36f4 --- /dev/null +++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs @@ -0,0 +1,1796 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Subjects; + +#if !NO_TPL +using System.Reactive.Threading.Tasks; +using System.Threading; +using System.Threading.Tasks; +#endif + +namespace System.Reactive.Linq +{ + internal partial class QueryLanguage + { + #region FromAsyncPattern + + #region Func + + public virtual Func<IObservable<TResult>> FromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return () => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(iar => + { + // Note: Even if the callback completes synchronously, outgoing On* calls + // cannot throw in user code since there can't be any subscribers + // to the AsyncSubject yet. Therefore, there is no need to protect + // against exceptions that'd be caught below and sent (incorrectly) + // into the Observable.Throw sequence being constructed. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, IObservable<TResult>> FromAsyncPattern<T1, TResult>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return x => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, IObservable<TResult>> FromAsyncPattern<T1, T2, TResult>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + +#if !NO_LARGEARITY + public virtual Func<T1, T2, T3, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, TResult>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d, e) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, e, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d, e, f) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, e, f, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d, e, f, g) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, e, f, g, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d, e, f, g, h) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, e, f, g, h, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d, e, f, g, h, i) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, e, f, g, h, i, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d, e, f, g, h, i, j) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, e, f, g, h, i, j, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end) + { + return (x, y, z, a, b, c, d, e, f, g, h, i, j, k) => + { + var subject = new AsyncSubject<TResult>(); + try + { + begin(x, y, z, a, b, c, d, e, f, g, h, i, j, k, iar => + { + // See remark on FromAsyncPattern<TResult>. + TResult result; + try + { + result = end(iar); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }, null); + } + catch (Exception exception) + { + return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions); + } + return subject.AsObservable(); + }; + } +#endif + + #endregion + + #region Action + + public virtual Func<IObservable<Unit>> FromAsyncPattern(Func<AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, IObservable<Unit>> FromAsyncPattern<T1>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, IObservable<Unit>> FromAsyncPattern<T1, T2>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + +#if !NO_LARGEARITY + public virtual Func<T1, T2, T3, IObservable<Unit>> FromAsyncPattern<T1, T2, T3>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end) + { + return FromAsyncPattern(begin, iar => + { + end(iar); + return Unit.Default; + }); + } +#endif + + #endregion + + #endregion + + #region Start[Async] + + #region Func + + public virtual IObservable<TSource> Start<TSource>(Func<TSource> function) + { + return ToAsync(function)(); + } + + public virtual IObservable<TSource> Start<TSource>(Func<TSource> function, IScheduler scheduler) + { + return ToAsync(function, scheduler)(); + } + +#if !NO_TPL + public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync) + { + var task = default(Task<TSource>); + try + { + task = functionAsync(); + } + catch (Exception exception) + { + return Throw<TSource>(exception); + } + + return task.ToObservable(); + } + + public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync) + { + var cancellable = new CancellationDisposable(); + + var task = default(Task<TSource>); + try + { + task = functionAsync(cancellable.Token); + } + catch (Exception exception) + { + return Throw<TSource>(exception); + } + + var result = task.ToObservable(); + + return new AnonymousObservable<TSource>(observer => + { + // + // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>. + // + var subscription = result.Subscribe/*Unsafe*/(observer); + return new CompositeDisposable(cancellable, subscription); + }); + } +#endif + + #endregion + + #region Action + + public virtual IObservable<Unit> Start(Action action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions)(); + } + + public virtual IObservable<Unit> Start(Action action, IScheduler scheduler) + { + return ToAsync(action, scheduler)(); + } + +#if !NO_TPL + public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync) + { + var task = default(Task); + try + { + task = actionAsync(); + } + catch (Exception exception) + { + return Throw<Unit>(exception); + } + + return task.ToObservable(); + } + + public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync) + { + var cancellable = new CancellationDisposable(); + + var task = default(Task); + try + { + task = actionAsync(cancellable.Token); + } + catch (Exception exception) + { + return Throw<Unit>(exception); + } + + var result = task.ToObservable(); + + return new AnonymousObservable<Unit>(observer => + { + // + // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>. + // + var subscription = result.Subscribe/*Unsafe*/(observer); + return new CompositeDisposable(cancellable, subscription); + }); + } +#endif + + #endregion + + #endregion + + #region FromAsync + +#if !NO_TPL + + #region Func + + public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync) + { + return Defer(() => StartAsync(functionAsync)); + } + + public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync) + { + return Defer(() => StartAsync(functionAsync)); + } + + #endregion + + #region Action + + public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync) + { + return Defer(() => StartAsync(actionAsync)); + } + + public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync) + { + return Defer(() => StartAsync(actionAsync)); + } + + #endregion + +#endif + + #endregion + + #region ToAsync + + #region Func + + public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function, IScheduler scheduler) + { + return () => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function, IScheduler scheduler) + { + return (first) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function, IScheduler scheduler) + { + return (first, second) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function, IScheduler scheduler) + { + return (first, second, third) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + +#if !NO_LARGEARITY + + public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function) + { + return ToAsync(function, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) => + { + var subject = new AsyncSubject<TResult>(); + scheduler.Schedule(() => + { + var result = default(TResult); + try + { + result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(result); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + +#endif + + #endregion + + #region Action + + public virtual Func<IObservable<Unit>> ToAsync(Action action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<IObservable<Unit>> ToAsync(Action action, IScheduler scheduler) + { + return () => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + + return subject.AsObservable(); + }; + } + + public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action, IScheduler scheduler) + { + return (first) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action, IScheduler scheduler) + { + return (first, second) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action, IScheduler scheduler) + { + return (first, second, third) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action, IScheduler scheduler) + { + return (first, second, third, fourth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + +#if !NO_LARGEARITY + + public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eight) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eight); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action) + { + return ToAsync(action, SchedulerDefaults.AsyncConversions); + } + + public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action, IScheduler scheduler) + { + return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) => + { + var subject = new AsyncSubject<Unit>(); + scheduler.Schedule(() => + { + try + { + action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth); + } + catch (Exception exception) + { + subject.OnError(exception); + return; + } + subject.OnNext(Unit.Default); + subject.OnCompleted(); + }); + return subject.AsObservable(); + }; + } + +#endif + + #endregion + + #endregion + } +} |