diff options
author | Atsushi Eno <atsushieno@veritas-vos-liberabit.com> | 2013-01-22 12:25:22 +0400 |
---|---|---|
committer | Atsushi Eno <atsushieno@veritas-vos-liberabit.com> | 2013-01-22 12:25:22 +0400 |
commit | cde9fc6a8fe569203cb991121a35c2a9c7f4c420 (patch) | |
tree | 8633a637be4973b221d9c7e9378af5e0a08b5670 /Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs | |
parent | 8911e1d3f169a0e378b4e237926269d9218c8fd3 (diff) |
import 2b5dbddd740b, new directory structure in the original rx.
Diffstat (limited to 'Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs')
-rw-r--r-- | Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs b/Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs new file mode 100644 index 0000000..3475817 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.WindowsRuntime/Foundation/AsyncInfoToObservableBridge.cs @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if HAS_WINRT +using System.Reactive.Disposables; +using System.Reactive.Subjects; +using Windows.Foundation; + +namespace System.Reactive.Windows.Foundation +{ + class AsyncInfoToObservableBridge<TResult, TProgress> : ObservableBase<TResult> + { + private readonly Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> _onCompleted; + private readonly Func<IAsyncInfo, TResult> _getResult; + private readonly AsyncSubject<TResult> _subject; + + public AsyncInfoToObservableBridge(IAsyncInfo info, Action<IAsyncInfo, Action<IAsyncInfo, AsyncStatus>> onCompleted, Func<IAsyncInfo, TResult> getResult, Action<IAsyncInfo, Action<IAsyncInfo, TProgress>> onProgress, IProgress<TProgress> progress, bool multiValue) + { + _onCompleted = onCompleted; + _getResult = getResult; + + _subject = new AsyncSubject<TResult>(); + + if (onProgress != null) + { + onProgress(info, (iai, p) => + { + if (multiValue && getResult != null) + _subject.OnNext(getResult(iai)); + + if (progress != null) + progress.Report(p); + }); + } + + Done(info, info.Status, true); + } + + private void Done(IAsyncInfo info, AsyncStatus status, bool initial) + { + var error = default(Exception); + var result = default(TResult); + + // + // Initial interactions with the IAsyncInfo object. Those could fail, which indicates + // a rogue implementation. Failure is just propagated out. + // + switch (status) + { + case AsyncStatus.Error: + error = info.ErrorCode; + if (error == null) + throw new InvalidOperationException("The asynchronous operation failed with a null error code."); + break; + case AsyncStatus.Canceled: + error = new OperationCanceledException(); + break; + case AsyncStatus.Completed: + if (_getResult != null) + result = _getResult(info); + break; + default: + if (!initial) + throw new InvalidOperationException("The asynchronous operation completed unexpectedly."); + + _onCompleted(info, (iai, s) => Done(iai, s, false)); + return; + } + + // + // Close as early as possible, before running continuations which could fail. In case of + // failure above, we don't close out the object in order to allow for debugging of the + // rogue implementation without losing state prematurely. Notice _getResults is merely + // an indirect call to the appropriate GetResults method, which is not supposed to throw. + // Instead, an Error status should be returned. + // + info.Close(); + + // + // Now we run the continuations, which could take a long time. Failure here is catastrophic + // and under control of the upstream subscriber. + // + if (error != null) + { + _subject.OnError(error); + } + else + { + if (_getResult != null) + _subject.OnNext(result); + + _subject.OnCompleted(); + } + } + + protected override IDisposable SubscribeCore(IObserver<TResult> observer) + { + return _subject.Subscribe(observer); + } + } +} +#endif
\ No newline at end of file |