Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs')
-rw-r--r--Rx.NET/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs441
1 files changed, 441 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs b/Rx.NET/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs
new file mode 100644
index 0000000..2b728e2
--- /dev/null
+++ b/Rx.NET/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs
@@ -0,0 +1,441 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Threading;
+
+namespace System.Reactive
+{
+#if !NO_PERF && !NO_CDS
+ using System.Collections.Concurrent;
+ using System.Diagnostics;
+
+ internal class ScheduledObserver<T> : ObserverBase<T>, IDisposable
+ {
+ private volatile int _state = 0;
+ private const int STOPPED = 0;
+ private const int RUNNING = 1;
+ private const int PENDING = 2;
+ private const int FAULTED = 9;
+
+ private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
+ private volatile bool _failed;
+ private volatile Exception _error;
+ private volatile bool _completed;
+
+ private readonly IObserver<T> _observer;
+ private readonly IScheduler _scheduler;
+ private readonly ISchedulerLongRunning _longRunning;
+ private readonly SerialDisposable _disposable = new SerialDisposable();
+
+ public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
+ {
+ _scheduler = scheduler;
+ _observer = observer;
+ _longRunning = _scheduler.AsLongRunning();
+
+ if (_longRunning != null)
+ _dispatcherEvent = new SemaphoreSlim(0);
+ }
+
+ private readonly object _dispatcherInitGate = new object();
+ private SemaphoreSlim _dispatcherEvent;
+ private IDisposable _dispatcherJob;
+
+ private void EnsureDispatcher()
+ {
+ if (_dispatcherJob == null)
+ {
+ lock (_dispatcherInitGate)
+ {
+ if (_dispatcherJob == null)
+ {
+ _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
+
+ _disposable.Disposable = new CompositeDisposable(2)
+ {
+ _dispatcherJob,
+ Disposable.Create(() => _dispatcherEvent.Release())
+ };
+ }
+ }
+ }
+ }
+
+ private void Dispatch(ICancelable cancel)
+ {
+ while (true)
+ {
+ _dispatcherEvent.Wait();
+
+ if (cancel.IsDisposed)
+ return;
+
+ var next = default(T);
+ while (_queue.TryDequeue(out next))
+ {
+ try
+ {
+ _observer.OnNext(next);
+ }
+ catch
+ {
+ var nop = default(T);
+ while (_queue.TryDequeue(out nop))
+ ;
+
+ throw;
+ }
+
+ _dispatcherEvent.Wait();
+
+ if (cancel.IsDisposed)
+ return;
+ }
+
+ if (_failed)
+ {
+ _observer.OnError(_error);
+ Dispose();
+ return;
+ }
+
+ if (_completed)
+ {
+ _observer.OnCompleted();
+ Dispose();
+ return;
+ }
+ }
+ }
+
+ public void EnsureActive()
+ {
+ EnsureActive(1);
+ }
+
+ public void EnsureActive(int n)
+ {
+ if (_longRunning != null)
+ {
+ if (n > 0)
+ _dispatcherEvent.Release(n);
+
+ EnsureDispatcher();
+ }
+ else
+ EnsureActiveSlow();
+ }
+
+ private void EnsureActiveSlow()
+ {
+ var isOwner = false;
+
+#pragma warning disable 0420
+ while (true)
+ {
+ var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);
+ if (old == STOPPED)
+ {
+ isOwner = true; // RUNNING
+ break;
+ }
+
+ if (old == FAULTED)
+ return;
+
+ // If we find the consumer loop running, we transition to PENDING to handle
+ // the case where the queue is seen empty by the consumer, making it transition
+ // to the STOPPED state, but we inserted an item into the queue.
+ //
+ // C: _queue.TryDequeue == false (RUNNING)
+ // ----------------------------------------------
+ // P: _queue.Enqueue(...)
+ // EnsureActive
+ // Exchange(ref _state, RUNNING) == RUNNING
+ // ----------------------------------------------
+ // C: transition to STOPPED (STOPPED)
+ //
+ // In this case, P would believe C is running and not invoke the scheduler
+ // using the isOwner flag.
+ //
+ // By introducing an intermediate PENDING state and using CAS in the consumer
+ // to only transition to STOPPED in case we were still RUNNING, we can force
+ // the consumer to reconsider the decision to transition to STOPPED. In that
+ // case, the consumer loops again and re-reads from the queue and other state
+ // fields. At least one bit of state will have changed because EnsureActive
+ // should only be called after invocation of IObserver<T> methods that touch
+ // this state.
+ //
+ if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
+ break;
+ }
+#pragma warning restore 0420
+
+ if (isOwner)
+ {
+ _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
+ }
+ }
+
+ private void Run(object state, Action<object> recurse)
+ {
+#pragma warning disable 0420
+ var next = default(T);
+ while (!_queue.TryDequeue(out next))
+ {
+ if (_failed)
+ {
+ // Between transitioning to _failed and the queue check in the loop,
+ // items could have been queued, so we can't stop yet. We don't spin
+ // and immediately re-check the queue.
+ //
+ // C: _queue.TryDequeue == false
+ // ----------------------------------------------
+ // P: OnNext(...)
+ // _queue.Enqueue(...) // Will get lost
+ // P: OnError(...)
+ // _failed = true
+ // ----------------------------------------------
+ // C: if (_failed)
+ // _observer.OnError(...) // Lost an OnNext
+ //
+ if (!_queue.IsEmpty)
+ continue;
+
+ Interlocked.Exchange(ref _state, STOPPED);
+ _observer.OnError(_error);
+ Dispose();
+ return;
+ }
+
+ if (_completed)
+ {
+ // Between transitioning to _completed and the queue check in the loop,
+ // items could have been queued, so we can't stop yet. We don't spin
+ // and immediately re-check the queue.
+ //
+ // C: _queue.TryDequeue == false
+ // ----------------------------------------------
+ // P: OnNext(...)
+ // _queue.Enqueue(...) // Will get lost
+ // P: OnCompleted(...)
+ // _completed = true
+ // ----------------------------------------------
+ // C: if (_completed)
+ // _observer.OnCompleted() // Lost an OnNext
+ //
+ if (!_queue.IsEmpty)
+ continue;
+
+ Interlocked.Exchange(ref _state, STOPPED);
+ _observer.OnCompleted();
+ Dispose();
+ return;
+ }
+
+ var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
+ if (old == RUNNING || old == FAULTED)
+ return;
+
+ Debug.Assert(old == PENDING);
+
+ // The producer has put us in the PENDING state to prevent us from
+ // transitioning to STOPPED, so we go RUNNING again and re-check our state.
+ _state = RUNNING;
+ }
+
+ Interlocked.Exchange(ref _state, RUNNING);
+
+#pragma warning restore 0420
+
+ try
+ {
+ _observer.OnNext(next);
+ }
+ catch
+ {
+#pragma warning disable 0420
+ Interlocked.Exchange(ref _state, FAULTED);
+#pragma warning restore 0420
+
+ var nop = default(T);
+ while (_queue.TryDequeue(out nop))
+ ;
+ throw;
+ }
+
+ recurse(state);
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ _queue.Enqueue(value);
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ _error = exception;
+ _failed = true;
+ }
+
+ protected override void OnCompletedCore()
+ {
+ _completed = true;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ {
+ _disposable.Dispose();
+ }
+ }
+ }
+#else
+ class ScheduledObserver<T> : ObserverBase<T>, IDisposable
+ {
+ private bool _isAcquired = false;
+ private bool _hasFaulted = false;
+ private readonly Queue<Action> _queue = new Queue<Action>();
+ private readonly IObserver<T> _observer;
+ private readonly IScheduler _scheduler;
+ private readonly SerialDisposable _disposable = new SerialDisposable();
+
+ public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
+ {
+ _scheduler = scheduler;
+ _observer = observer;
+ }
+
+ public void EnsureActive(int n)
+ {
+ EnsureActive();
+ }
+
+ public void EnsureActive()
+ {
+ var isOwner = false;
+
+ lock (_queue)
+ {
+ if (!_hasFaulted && _queue.Count > 0)
+ {
+ isOwner = !_isAcquired;
+ _isAcquired = true;
+ }
+ }
+
+ if (isOwner)
+ {
+ _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
+ }
+ }
+
+ private void Run(object state, Action<object> recurse)
+ {
+ var work = default(Action);
+ lock (_queue)
+ {
+ if (_queue.Count > 0)
+ work = _queue.Dequeue();
+ else
+ {
+ _isAcquired = false;
+ return;
+ }
+ }
+
+ try
+ {
+ work();
+ }
+ catch
+ {
+ lock (_queue)
+ {
+ _queue.Clear();
+ _hasFaulted = true;
+ }
+ throw;
+ }
+
+ recurse(state);
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ lock (_queue)
+ _queue.Enqueue(() => _observer.OnNext(value));
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ lock (_queue)
+ _queue.Enqueue(() => _observer.OnError(exception));
+ }
+
+ protected override void OnCompletedCore()
+ {
+ lock (_queue)
+ _queue.Enqueue(() => _observer.OnCompleted());
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ {
+ _disposable.Dispose();
+ }
+ }
+ }
+#endif
+
+ class ObserveOnObserver<T> : ScheduledObserver<T>
+ {
+ private IDisposable _cancel;
+
+ public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
+ : base(scheduler, observer)
+ {
+ _cancel = cancel;
+ }
+
+ protected override void OnNextCore(T value)
+ {
+ base.OnNextCore(value);
+ EnsureActive();
+ }
+
+ protected override void OnErrorCore(Exception exception)
+ {
+ base.OnErrorCore(exception);
+ EnsureActive();
+ }
+
+ protected override void OnCompletedCore()
+ {
+ base.OnCompletedCore();
+ EnsureActive();
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+
+ if (disposing)
+ {
+ var cancel = Interlocked.Exchange(ref _cancel, null);
+ if (cancel != null)
+ {
+ cancel.Dispose();
+ }
+ }
+ }
+ }
+}