Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs')
-rw-r--r--Rx.NET/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs98
1 files changed, 98 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs b/Rx.NET/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs
new file mode 100644
index 0000000..e565a42
--- /dev/null
+++ b/Rx.NET/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs
@@ -0,0 +1,98 @@
+using System.Reactive.Disposables;
+
+namespace System.Reactive
+{
+ class AutoDetachObserver<T> : ObserverBase<T>
+ {
+ private readonly IObserver<T> observer;
+ private readonly SingleAssignmentDisposable m = new SingleAssignmentDisposable();
+
+ public AutoDetachObserver(IObserver<T> observer)
+ {
+ this.observer = observer;
+ }
+
+ public IDisposable Disposable
+ {
+ set { m.Disposable = value; }
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ //
+ // Safeguarding of the pipeline against rogue observers is required for proper
+ // resource cleanup. Consider the following example:
+ //
+ // var xs = Observable.Interval(TimeSpan.FromSeconds(1));
+ // var ys = <some random sequence>;
+ // var res = xs.CombineLatest(ys, (x, y) => x + y);
+ //
+ // The marble diagram of the query above looks as follows:
+ //
+ // xs -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
+ // | | | | | | | | |
+ // ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
+ // | | | | | | | | | | | | | |
+ // v v v v v v v v v v v v v v
+ // res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
+ // |
+ // @#&
+ //
+ // Notice the free-threaded nature of Rx, where messages on the resulting sequence
+ // are produced by either of the two input sequences to CombineLatest.
+ //
+ // Now assume an exception happens in the OnNext callback for the observer of res,
+ // at the indicated point marked with @#& above. The callback runs in the context
+ // of ys, so the exception will take down the scheduler thread of ys. This by
+ // itself is a problem (that can be mitigated by a Catch operator on IScheduler),
+ // but notice how the timer that produces xs is kept alive.
+ //
+ // The safe-guarding code below ensures the acquired resources are disposed when
+ // the user callback throws.
+ //
+ var __noError = false;
+ try
+ {
+ observer.OnNext(value);
+ __noError = true;
+ }
+ finally
+ {
+ if (!__noError)
+ Dispose();
+ }
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ try
+ {
+ observer.OnError(exception);
+ }
+ finally
+ {
+ Dispose();
+ }
+ }
+
+ protected override void OnCompletedCore()
+ {
+ try
+ {
+ observer.OnCompleted();
+ }
+ finally
+ {
+ Dispose();
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ m.Dispose();
+ }
+ }
+} \ No newline at end of file