Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs')
-rw-r--r--Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs1796
1 files changed, 1796 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs
new file mode 100644
index 0000000..21f36f4
--- /dev/null
+++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs
@@ -0,0 +1,1796 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Subjects;
+
+#if !NO_TPL
+using System.Reactive.Threading.Tasks;
+using System.Threading;
+using System.Threading.Tasks;
+#endif
+
+namespace System.Reactive.Linq
+{
+ internal partial class QueryLanguage
+ {
+ #region FromAsyncPattern
+
+ #region Func
+
+ public virtual Func<IObservable<TResult>> FromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return () =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(iar =>
+ {
+ // Note: Even if the callback completes synchronously, outgoing On* calls
+ // cannot throw in user code since there can't be any subscribers
+ // to the AsyncSubject yet. Therefore, there is no need to protect
+ // against exceptions that'd be caught below and sent (incorrectly)
+ // into the Observable.Throw sequence being constructed.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, IObservable<TResult>> FromAsyncPattern<T1, TResult>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return x =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, IObservable<TResult>> FromAsyncPattern<T1, T2, TResult>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+#if !NO_LARGEARITY
+ public virtual Func<T1, T2, T3, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, TResult>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d, e) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, e, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d, e, f) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, e, f, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d, e, f, g) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, e, f, g, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d, e, f, g, h) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, e, f, g, h, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d, e, f, g, h, i) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, e, f, g, h, i, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d, e, f, g, h, i, j) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, e, f, g, h, i, j, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
+ {
+ return (x, y, z, a, b, c, d, e, f, g, h, i, j, k) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ try
+ {
+ begin(x, y, z, a, b, c, d, e, f, g, h, i, j, k, iar =>
+ {
+ // See remark on FromAsyncPattern<TResult>.
+ TResult result;
+ try
+ {
+ result = end(iar);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ }, null);
+ }
+ catch (Exception exception)
+ {
+ return Observable.Throw<TResult>(exception, SchedulerDefaults.AsyncConversions);
+ }
+ return subject.AsObservable();
+ };
+ }
+#endif
+
+ #endregion
+
+ #region Action
+
+ public virtual Func<IObservable<Unit>> FromAsyncPattern(Func<AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, IObservable<Unit>> FromAsyncPattern<T1>(Func<T1, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, IObservable<Unit>> FromAsyncPattern<T1, T2>(Func<T1, T2, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+#if !NO_LARGEARITY
+ public virtual Func<T1, T2, T3, IObservable<Unit>> FromAsyncPattern<T1, T2, T3>(Func<T1, T2, T3, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4>(Func<T1, T2, T3, T4, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5>(Func<T1, T2, T3, T4, T5, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6>(Func<T1, T2, T3, T4, T5, T6, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7>(Func<T1, T2, T3, T4, T5, T6, T7, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8>(Func<T1, T2, T3, T4, T5, T6, T7, T8, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> FromAsyncPattern<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end)
+ {
+ return FromAsyncPattern(begin, iar =>
+ {
+ end(iar);
+ return Unit.Default;
+ });
+ }
+#endif
+
+ #endregion
+
+ #endregion
+
+ #region Start[Async]
+
+ #region Func
+
+ public virtual IObservable<TSource> Start<TSource>(Func<TSource> function)
+ {
+ return ToAsync(function)();
+ }
+
+ public virtual IObservable<TSource> Start<TSource>(Func<TSource> function, IScheduler scheduler)
+ {
+ return ToAsync(function, scheduler)();
+ }
+
+#if !NO_TPL
+ public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync)
+ {
+ var task = default(Task<TSource>);
+ try
+ {
+ task = functionAsync();
+ }
+ catch (Exception exception)
+ {
+ return Throw<TSource>(exception);
+ }
+
+ return task.ToObservable();
+ }
+
+ public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync)
+ {
+ var cancellable = new CancellationDisposable();
+
+ var task = default(Task<TSource>);
+ try
+ {
+ task = functionAsync(cancellable.Token);
+ }
+ catch (Exception exception)
+ {
+ return Throw<TSource>(exception);
+ }
+
+ var result = task.ToObservable();
+
+ return new AnonymousObservable<TSource>(observer =>
+ {
+ //
+ // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
+ //
+ var subscription = result.Subscribe/*Unsafe*/(observer);
+ return new CompositeDisposable(cancellable, subscription);
+ });
+ }
+#endif
+
+ #endregion
+
+ #region Action
+
+ public virtual IObservable<Unit> Start(Action action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions)();
+ }
+
+ public virtual IObservable<Unit> Start(Action action, IScheduler scheduler)
+ {
+ return ToAsync(action, scheduler)();
+ }
+
+#if !NO_TPL
+ public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync)
+ {
+ var task = default(Task);
+ try
+ {
+ task = actionAsync();
+ }
+ catch (Exception exception)
+ {
+ return Throw<Unit>(exception);
+ }
+
+ return task.ToObservable();
+ }
+
+ public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync)
+ {
+ var cancellable = new CancellationDisposable();
+
+ var task = default(Task);
+ try
+ {
+ task = actionAsync(cancellable.Token);
+ }
+ catch (Exception exception)
+ {
+ return Throw<Unit>(exception);
+ }
+
+ var result = task.ToObservable();
+
+ return new AnonymousObservable<Unit>(observer =>
+ {
+ //
+ // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
+ //
+ var subscription = result.Subscribe/*Unsafe*/(observer);
+ return new CompositeDisposable(cancellable, subscription);
+ });
+ }
+#endif
+
+ #endregion
+
+ #endregion
+
+ #region FromAsync
+
+#if !NO_TPL
+
+ #region Func
+
+ public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync)
+ {
+ return Defer(() => StartAsync(functionAsync));
+ }
+
+ public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync)
+ {
+ return Defer(() => StartAsync(functionAsync));
+ }
+
+ #endregion
+
+ #region Action
+
+ public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync)
+ {
+ return Defer(() => StartAsync(actionAsync));
+ }
+
+ public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync)
+ {
+ return Defer(() => StartAsync(actionAsync));
+ }
+
+ #endregion
+
+#endif
+
+ #endregion
+
+ #region ToAsync
+
+ #region Func
+
+ public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function, IScheduler scheduler)
+ {
+ return () =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function();
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T, IObservable<TResult>> ToAsync<T, TResult>(Func<T, TResult> function, IScheduler scheduler)
+ {
+ return (first) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, IObservable<TResult>> ToAsync<T1, T2, TResult>(Func<T1, T2, TResult> function, IScheduler scheduler)
+ {
+ return (first, second) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, IObservable<TResult>> ToAsync<T1, T2, T3, TResult>(Func<T1, T2, T3, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, IObservable<TResult>> ToAsync<T1, T2, T3, T4, TResult>(Func<T1, T2, T3, T4, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+#if !NO_LARGEARITY
+
+ public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, TResult>(Func<T1, T2, T3, T4, T5, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, TResult>(Func<T1, T2, T3, T4, T5, T6, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function)
+ {
+ return ToAsync(function, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<TResult>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult>(Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, TResult> function, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
+ {
+ var subject = new AsyncSubject<TResult>();
+ scheduler.Schedule(() =>
+ {
+ var result = default(TResult);
+ try
+ {
+ result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(result);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+#endif
+
+ #endregion
+
+ #region Action
+
+ public virtual Func<IObservable<Unit>> ToAsync(Action action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<IObservable<Unit>> ToAsync(Action action, IScheduler scheduler)
+ {
+ return () =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action();
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<TSource, IObservable<Unit>> ToAsync<TSource>(Action<TSource> action, IScheduler scheduler)
+ {
+ return (first) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, IObservable<Unit>> ToAsync<T1, T2>(Action<T1, T2> action, IScheduler scheduler)
+ {
+ return (first, second) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, IObservable<Unit>> ToAsync<T1, T2, T3>(Action<T1, T2, T3> action, IScheduler scheduler)
+ {
+ return (first, second, third) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, IObservable<Unit>> ToAsync<T1, T2, T3, T4>(Action<T1, T2, T3, T4> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+#if !NO_LARGEARITY
+
+ public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5>(Action<T1, T2, T3, T4, T5> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6>(Action<T1, T2, T3, T4, T5, T6> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7>(Action<T1, T2, T3, T4, T5, T6, T7> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eight);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action)
+ {
+ return ToAsync(action, SchedulerDefaults.AsyncConversions);
+ }
+
+ public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(Action<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> action, IScheduler scheduler)
+ {
+ return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
+ {
+ var subject = new AsyncSubject<Unit>();
+ scheduler.Schedule(() =>
+ {
+ try
+ {
+ action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
+ }
+ catch (Exception exception)
+ {
+ subject.OnError(exception);
+ return;
+ }
+ subject.OnNext(Unit.Default);
+ subject.OnCompleted();
+ });
+ return subject.AsObservable();
+ };
+ }
+
+#endif
+
+ #endregion
+
+ #endregion
+ }
+}