Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@veritas-vos-liberabit.com>2013-01-22 12:25:22 +0400
committerAtsushi Eno <atsushieno@veritas-vos-liberabit.com>2013-01-22 12:25:22 +0400
commitcde9fc6a8fe569203cb991121a35c2a9c7f4c420 (patch)
tree8633a637be4973b221d9c7e9378af5e0a08b5670 /Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs
parent8911e1d3f169a0e378b4e237926269d9218c8fd3 (diff)
import 2b5dbddd740b, new directory structure in the original rx.
Diffstat (limited to 'Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs')
-rw-r--r--Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs101
1 files changed, 101 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs b/Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs
new file mode 100644
index 0000000..3475817
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs
@@ -0,0 +1,101 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if HAS_WINRT
+using System.Reactive.Disposables;
+using System.Reactive.Subjects;
+using Windows.Foundation;
+
+namespace System.Reactive.Windows.Foundation
+{
+ class AsyncInfoToObservableBridge<TResult, TProgress> : ObservableBase<TResult>
+ {
+ private readonly Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> _onCompleted;
+ private readonly Func<IAsyncInfo, TResult> _getResult;
+ private readonly AsyncSubject<TResult> _subject;
+
+ public AsyncInfoToObservableBridge(IAsyncInfo info, Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> onCompleted, Func<IAsyncInfo, TResult> getResult, Action<IAsyncInfo, Action<IAsyncInfo, TProgress>> onProgress, IProgress<TProgress> progress, bool multiValue)
+ {
+ _onCompleted = onCompleted;
+ _getResult = getResult;
+
+ _subject = new AsyncSubject<TResult>();
+
+ if (onProgress != null)
+ {
+ onProgress(info, (iai, p) =>
+ {
+ if (multiValue && getResult != null)
+ _subject.OnNext(getResult(iai));
+
+ if (progress != null)
+ progress.Report(p);
+ });
+ }
+
+ Done(info, info.Status, true);
+ }
+
+ private void Done(IAsyncInfo info, AsyncStatus status, bool initial)
+ {
+ var error = default(Exception);
+ var result = default(TResult);
+
+ //
+ // Initial interactions with the IAsyncInfo object. Those could fail, which indicates
+ // a rogue implementation. Failure is just propagated out.
+ //
+ switch (status)
+ {
+ case AsyncStatus.Error:
+ error = info.ErrorCode;
+ if (error == null)
+ throw new InvalidOperationException("The asynchronous operation failed with a null error code.");
+ break;
+ case AsyncStatus.Canceled:
+ error = new OperationCanceledException();
+ break;
+ case AsyncStatus.Completed:
+ if (_getResult != null)
+ result = _getResult(info);
+ break;
+ default:
+ if (!initial)
+ throw new InvalidOperationException("The asynchronous operation completed unexpectedly.");
+
+ _onCompleted(info, (iai, s) => Done(iai, s, false));
+ return;
+ }
+
+ //
+ // Close as early as possible, before running continuations which could fail. In case of
+ // failure above, we don't close out the object in order to allow for debugging of the
+ // rogue implementation without losing state prematurely. Notice _getResults is merely
+ // an indirect call to the appropriate GetResults method, which is not supposed to throw.
+ // Instead, an Error status should be returned.
+ //
+ info.Close();
+
+ //
+ // Now we run the continuations, which could take a long time. Failure here is catastrophic
+ // and under control of the upstream subscriber.
+ //
+ if (error != null)
+ {
+ _subject.OnError(error);
+ }
+ else
+ {
+ if (_getResult != null)
+ _subject.OnNext(result);
+
+ _subject.OnCompleted();
+ }
+ }
+
+ protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+ {
+ return _subject.Subscribe(observer);
+ }
+ }
+}
+#endif \ No newline at end of file