diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Core/Reactive')
71 files changed, 10138 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousObservable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousObservable.cs new file mode 100644 index 0000000..81ffa53 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousObservable.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Disposables; + +namespace System.Reactive +{ + /// <summary> + /// Class to create an IObservable<T> instance from a delegate-based implementation of the Subscribe method. + /// </summary> + /// <typeparam name="T">The type of the elements in the sequence.</typeparam> + public sealed class AnonymousObservable<T> : ObservableBase<T> + { + private readonly Func<IObserver<T>, IDisposable> _subscribe; + + /// <summary> + /// Creates an observable sequence object from the specified subscription function. + /// </summary> + /// <param name="subscribe">Subscribe method implementation.</param> + /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception> + public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) + { + if (subscribe == null) + throw new ArgumentNullException("subscribe"); + + _subscribe = subscribe; + } + + /// <summary> + /// Calls the subscription function that was supplied to the constructor. + /// </summary> + /// <param name="observer">Observer to send notifications to.</param> + /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns> + protected override IDisposable SubscribeCore(IObserver<T> observer) + { + return _subscribe(observer) ?? Disposable.Empty; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousObserver.cs new file mode 100644 index 0000000..ea0f9ec --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousObserver.cs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + /// <summary> + /// Class to create an IObserver<T> instance from delegate-based implementations of the On* methods. + /// </summary> + /// <typeparam name="T">The type of the elements in the sequence.</typeparam> + public sealed class AnonymousObserver<T> : ObserverBase<T> + { + private readonly Action<T> _onNext; + private readonly Action<Exception> _onError; + private readonly Action _onCompleted; + + /// <summary> + /// Creates an observer from the specified OnNext, OnError, and OnCompleted actions. + /// </summary> + /// <param name="onNext">Observer's OnNext action implementation.</param> + /// <param name="onError">Observer's OnError action implementation.</param> + /// <param name="onCompleted">Observer's OnCompleted action implementation.</param> + /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception> + public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) + { + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + _onNext = onNext; + _onError = onError; + _onCompleted = onCompleted; + } + + /// <summary> + /// Creates an observer from the specified OnNext action. + /// </summary> + /// <param name="onNext">Observer's OnNext action implementation.</param> + /// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception> + public AnonymousObserver(Action<T> onNext) + : this(onNext, Stubs.Throw, Stubs.Nop) + { + } + + /// <summary> + /// Creates an observer from the specified OnNext and OnError actions. + /// </summary> + /// <param name="onNext">Observer's OnNext action implementation.</param> + /// <param name="onError">Observer's OnError action implementation.</param> + /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> is null.</exception> + public AnonymousObserver(Action<T> onNext, Action<Exception> onError) + : this(onNext, onError, Stubs.Nop) + { + } + + /// <summary> + /// Creates an observer from the specified OnNext and OnCompleted actions. + /// </summary> + /// <param name="onNext">Observer's OnNext action implementation.</param> + /// <param name="onCompleted">Observer's OnCompleted action implementation.</param> + /// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception> + public AnonymousObserver(Action<T> onNext, Action onCompleted) + : this(onNext, Stubs.Throw, onCompleted) + { + } + + /// <summary> + /// Calls the onNext action. + /// </summary> + /// <param name="value">Next element in the sequence.</param> + protected override void OnNextCore(T value) + { + _onNext(value); + } + + /// <summary> + /// Calls the onError action. + /// </summary> + /// <param name="error">The error that has occurred.</param> + protected override void OnErrorCore(Exception error) + { + _onError(error); + } + + /// <summary> + /// Calls the onCompleted action. + /// </summary> + protected override void OnCompletedCore() + { + _onCompleted(); + } + + internal IObserver<T> MakeSafe(IDisposable disposable) + { + return new AnonymousSafeObserver<T>(_onNext, _onError, _onCompleted, disposable); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousSafeObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousSafeObserver.cs new file mode 100644 index 0000000..ae7cccc --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/AnonymousSafeObserver.cs @@ -0,0 +1,85 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Threading; + +namespace System.Reactive +{ + // + // See AutoDetachObserver.cs for more information on the safeguarding requirement and + // its implementation aspects. + // + + /// <summary> + /// This class fuses logic from ObserverBase, AnonymousObserver, and SafeObserver into one class. When an observer + /// needs to be safeguarded, an instance of this type can be created by SafeObserver.Create when it detects its + /// input is an AnonymousObserver, which is commonly used by end users when using the Subscribe extension methods + /// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which + /// helps debugging and some performance. + /// </summary> + class AnonymousSafeObserver<T> : IObserver<T> + { + private readonly Action<T> _onNext; + private readonly Action<Exception> _onError; + private readonly Action _onCompleted; + private readonly IDisposable _disposable; + + private int isStopped; + + public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable) + { + _onNext = onNext; + _onError = onError; + _onCompleted = onCompleted; + _disposable = disposable; + } + + public void OnNext(T value) + { + if (isStopped == 0) + { + var __noError = false; + try + { + _onNext(value); + __noError = true; + } + finally + { + if (!__noError) + _disposable.Dispose(); + } + } + } + + public void OnError(Exception error) + { + if (Interlocked.Exchange(ref isStopped, 1) == 0) + { + try + { + _onError(error); + } + finally + { + _disposable.Dispose(); + } + } + } + + public void OnCompleted() + { + if (Interlocked.Exchange(ref isStopped, 1) == 0) + { + try + { + _onCompleted(); + } + finally + { + _disposable.Dispose(); + } + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/AsyncLock.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/AsyncLock.cs new file mode 100644 index 0000000..73def1a --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/AsyncLock.cs @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Asynchronous lock. + /// </summary> + public sealed class AsyncLock : IDisposable + { + private readonly Queue<Action> queue = new Queue<Action>(); + private bool isAcquired = false; + private bool hasFaulted = false; + + /// <summary> + /// Queues the action for execution. If the caller acquires the lock and becomes the owner, + /// the queue is processed. If the lock is already owned, the action is queued and will get + /// processed by the owner. + /// </summary> + /// <param name="action">Action to queue for execution.</param> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public void Wait(Action action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var isOwner = false; + lock (queue) + { + if (!hasFaulted) + { + queue.Enqueue(action); + isOwner = !isAcquired; + isAcquired = true; + } + } + + if (isOwner) + { + while (true) + { + var work = default(Action); + lock (queue) + { + if (queue.Count > 0) + work = queue.Dequeue(); + else + { + isAcquired = false; + break; + } + } + + try + { + work(); + } + catch + { + lock (queue) + { + queue.Clear(); + hasFaulted = true; + } + throw; + } + } + } + } + + /// <summary> + /// Clears the work items in the queue and drops further work being queued. + /// </summary> + public void Dispose() + { + lock (queue) + { + queue.Clear(); + hasFaulted = true; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/CatchScheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/CatchScheduler.cs new file mode 100644 index 0000000..b4e9fa3 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/CatchScheduler.cs @@ -0,0 +1,151 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Reactive.Disposables; + +#if !NO_WEAKTABLE +using System.Runtime.CompilerServices; +#endif + +namespace System.Reactive.Concurrency +{ + class CatchScheduler<TException> : SchedulerWrapper + where TException : Exception + { + private readonly Func<TException, bool> _handler; + + public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler) + : base(scheduler) + { + _handler = handler; + } + + protected override Func<IScheduler, TState, IDisposable> Wrap<TState>(Func<IScheduler, TState, IDisposable> action) + { + return (self, state) => + { + try + { + return action(GetRecursiveWrapper(self), state); + } + catch (TException exception) + { + if (!_handler(exception)) + throw; + + return Disposable.Empty; + } + }; + } + +#if !NO_WEAKTABLE + public CatchScheduler(IScheduler scheduler, Func<TException, bool> handler, ConditionalWeakTable<IScheduler, IScheduler> cache) + : base(scheduler, cache) + { + _handler = handler; + } + + protected override SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache) + { + return new CatchScheduler<TException>(scheduler, _handler, cache); + } +#else + protected override SchedulerWrapper Clone(IScheduler scheduler) + { + return new CatchScheduler<TException>(scheduler, _handler); + } +#endif + + protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object service) + { + service = provider.GetService(serviceType); + + if (service != null) + { + if (serviceType == typeof(ISchedulerLongRunning)) + service = new CatchSchedulerLongRunning((ISchedulerLongRunning)service, _handler); + else if (serviceType == typeof(ISchedulerPeriodic)) + service = new CatchSchedulerPeriodic((ISchedulerPeriodic)service, _handler); + } + + return true; + } + + class CatchSchedulerLongRunning : ISchedulerLongRunning + { + private readonly ISchedulerLongRunning _scheduler; + private readonly Func<TException, bool> _handler; + + public CatchSchedulerLongRunning(ISchedulerLongRunning scheduler, Func<TException, bool> handler) + { + _scheduler = scheduler; + _handler = handler; + } + + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + return _scheduler.ScheduleLongRunning(state, (state_, cancel) => + { + try + { + action(state_, cancel); + } + catch (TException exception) + { + if (!_handler(exception)) + throw; + } + }); + } + } + + class CatchSchedulerPeriodic : ISchedulerPeriodic + { + private readonly ISchedulerPeriodic _scheduler; + private readonly Func<TException, bool> _handler; + + public CatchSchedulerPeriodic(ISchedulerPeriodic scheduler, Func<TException, bool> handler) + { + _scheduler = scheduler; + _handler = handler; + } + + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + var failed = false; + + var d = new SingleAssignmentDisposable(); + + d.Disposable = _scheduler.SchedulePeriodic(state, period, state_ => + { + // + // Cancellation may not be granted immediately; prevent from running user + // code in that case. Periodic schedulers are assumed to introduce some + // degree of concurrency, so we should return from the SchedulePeriodic + // call eventually, allowing the d.Dispose() call in the catch block to + // take effect. + // + if (failed) + return default(TState); + + try + { + return action(state_); + } + catch (TException exception) + { + failed = true; + + if (!_handler(exception)) + throw; + + d.Dispose(); + return default(TState); + } + }); + + return d; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs new file mode 100644 index 0000000..cc9cd57 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs @@ -0,0 +1,439 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_THREAD +using System; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class DefaultConcurrencyAbstractionLayer/*Impl*/ : IConcurrencyAbstractionLayer + { + public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) + { + return new Timer(action, state, Normalize(dueTime)); + } + + public IDisposable StartPeriodicTimer(Action action, TimeSpan period) + { + // + // MSDN documentation states the following: + // + // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once; + // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method." + // + if (period <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + + return new PeriodicTimer(action, period); + } + + public IDisposable QueueUserWorkItem(Action<object> action, object state) + { + System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state); + return Disposable.Empty; + } + +#if USE_SLEEP_MS + public void Sleep(TimeSpan timeout) + { + System.Threading.Thread.Sleep((int)Normalize(timeout).TotalMilliseconds); + } +#else + public void Sleep(TimeSpan timeout) + { + System.Threading.Thread.Sleep(Normalize(timeout)); + } +#endif + + public IStopwatch StartStopwatch() + { + return new DefaultStopwatch(); + } + + public bool SupportsLongRunning + { + get { return true; } + } + + public void StartThread(Action<object> action, object state) + { + new Thread(() => + { + action(state); + }) { IsBackground = true }.Start(); + } + + private static TimeSpan Normalize(TimeSpan dueTime) + { + if (dueTime < TimeSpan.Zero) + return TimeSpan.Zero; + + return dueTime; + } + +#if USE_TIMER_SELF_ROOT + + // + // Some historical context. In the early days of Rx, we discovered an issue with + // the rooting of timers, causing them to get GC'ed even when the IDisposable of + // a scheduled activity was kept alive. The original code simply created a timer + // as follows: + // + // var t = default(Timer); + // t = new Timer(_ => + // { + // t = null; + // Debug.WriteLine("Hello!"); + // }, null, 5000, Timeout.Infinite); + // + // IIRC the reference to "t" captured by the closure wasn't sufficient on .NET CF + // to keep the timer rooted, causing problems on Windows Phone 7. As a result, we + // added rooting code using a dictionary (SD 7280), which we carried forward all + // the way to Rx v2.0 RTM. + // + // However, the desktop CLR's implementation of System.Threading.Timer exhibits + // other characteristics where a timer can root itself when the timer is still + // reachable through the state or callback parameters. To illustrate this, run + // the following piece of code: + // + // static void Main() + // { + // Bar(); + // + // while (true) + // { + // GC.Collect(); + // GC.WaitForPendingFinalizers(); + // Thread.Sleep(100); + // } + // } + // + // static void Bar() + // { + // var t = default(Timer); + // t = new Timer(_ => + // { + // t = null; // Comment out this line to see the timer stop + // Console.WriteLine("Hello!"); + // }, null, 5000, Timeout.Infinite); + // } + // + // When the closure over "t" is removed, the timer will stop automatically upon + // garbage collection. However, when retaining the reference, this problem does + // not exist. The code below exploits this behavior, avoiding unnecessary costs + // to root timers in a thread-safe manner. + // + // Below is a fragment of SOS output, proving the proper rooting: + // + // !gcroot 02492440 + // HandleTable: + // 005a13fc (pinned handle) + // -> 03491010 System.Object[] + // -> 024924dc System.Threading.TimerQueue + // -> 02492450 System.Threading.TimerQueueTimer + // -> 02492420 System.Threading.TimerCallback + // -> 02492414 TimerRootingExperiment.Program+<>c__DisplayClass1 + // -> 02492440 System.Threading.Timer + // + // With the USE_TIMER_SELF_ROOT symbol, we shake off this additional rooting code + // for newer platforms where this no longer needed. We checked this on .NET Core + // as well as .NET 4.0, and only #define this symbol for those platforms. + // + + class Timer : IDisposable + { + private Action<object> _action; + private volatile System.Threading.Timer _timer; + + public Timer(Action<object> action, object state, TimeSpan dueTime) + { + _action = action; + + // Don't want the spin wait in Tick to get stuck if this thread gets aborted. + try { } + finally + { + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + } + } + + private void Tick(object state) + { + try + { + _action(state); + } + finally + { + SpinWait.SpinUntil(IsTimerAssigned); + Dispose(); + } + } + + private bool IsTimerAssigned() + { + return _timer != null; + } + + public void Dispose() + { + var timer = _timer; + if (timer != TimerStubs.Never) + { + _action = Stubs<object>.Ignore; + _timer = TimerStubs.Never; + + timer.Dispose(); + } + } + } + + class PeriodicTimer : IDisposable + { + private Action _action; + private volatile System.Threading.Timer _timer; + + public PeriodicTimer(Action action, TimeSpan period) + { + _action = action; + + // + // Rooting of the timer happens through the this.Tick delegate's target object, + // which is the current instance and has a field to store the Timer instance. + // + _timer = new System.Threading.Timer(this.Tick, null, period, period); + } + + private void Tick(object state) + { + _action(); + } + + public void Dispose() + { + var timer = _timer; + if (timer != null) + { + _action = Stubs.Nop; + _timer = null; + + timer.Dispose(); + } + } + } +#else + class Timer : IDisposable + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + + private Action<object> _action; + private System.Threading.Timer _timer; + + private bool _hasAdded; + private bool _hasRemoved; + + public Timer(Action<object> action, object state, TimeSpan dueTime) + { + _action = action; + _timer = new System.Threading.Timer(Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); + + lock (s_timers) + { + if (!_hasRemoved) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + + _hasAdded = true; + } + } + } + + private void Tick(object state) + { + try + { + _action(state); + } + finally + { + Dispose(); + } + } + + public void Dispose() + { + _action = Stubs<object>.Ignore; + + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + if (!_hasRemoved) + { + timer = _timer; + _timer = null; + + if (_hasAdded && timer != null) + s_timers.Remove(timer); + + _hasRemoved = true; + } + } + + if (timer != null) + timer.Dispose(); + } + } + + class PeriodicTimer : IDisposable + { + // + // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running. + // +#if !NO_HASHSET + private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>(); +#else + private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>(); +#endif + + private Action _action; + private System.Threading.Timer _timer; + + public PeriodicTimer(Action action, TimeSpan period) + { + _action = action; + _timer = new System.Threading.Timer(Tick, null, period, period); + + lock (s_timers) + { +#if !NO_HASHSET + s_timers.Add(_timer); +#else + s_timers.Add(_timer, null); +#endif + } + } + + private void Tick(object state) + { + _action(); + } + + public void Dispose() + { + var timer = default(System.Threading.Timer); + + lock (s_timers) + { + timer = _timer; + _timer = null; + + if (timer != null) + s_timers.Remove(timer); + } + + if (timer != null) + { + timer.Dispose(); + _action = Stubs.Nop; + } + } + } +#endif + } +} +#else +using System; +using System.Reactive.Disposables; +using System.Threading.Tasks; + +namespace System.Reactive.Concurrency +{ + internal class DefaultConcurrencyAbstractionLayer : IConcurrencyAbstractionLayer + { + public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) + { + var cancel = new CancellationDisposable(); + Task.Delay(dueTime, cancel.Token).ContinueWith( + _ => action(state), + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion + ); + return cancel; + } + + public IDisposable StartPeriodicTimer(Action action, TimeSpan period) + { + var cancel = new CancellationDisposable(); + + var moveNext = default(Action); + moveNext = () => + { + Task.Delay(period, cancel.Token).ContinueWith( + _ => + { + moveNext(); + action(); + }, + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion + ); + }; + + moveNext(); + + return cancel; + } + + public IDisposable QueueUserWorkItem(Action<object> action, object state) + { + var cancel = new CancellationDisposable(); + Task.Factory.StartNew(action, state, cancel.Token); + return cancel; + } + + public void Sleep(TimeSpan timeout) + { + Task.Delay(timeout).Wait(); + } + + public IStopwatch StartStopwatch() + { + return new DefaultStopwatch(); + } + + public bool SupportsLongRunning + { + get { return true; } + } + + public void StartThread(Action<object> action, object state) + { + Task.Factory.StartNew(() => + { + action(state); + }, TaskCreationOptions.LongRunning); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.cs new file mode 100644 index 0000000..d09058f --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.cs @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Reactive.PlatformServices; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// (Infrastructure) Concurrency abstraction layer. + /// </summary> + internal static class ConcurrencyAbstractionLayer + { + private static Lazy<IConcurrencyAbstractionLayer> s_current = new Lazy<IConcurrencyAbstractionLayer>(Initialize); + + /// <summary> + /// Gets the current CAL. If no CAL has been set yet, it will be initialized to the default. + /// </summary> + public static IConcurrencyAbstractionLayer Current + { + get + { + return s_current.Value; + } + } + + private static IConcurrencyAbstractionLayer Initialize() + { + return PlatformEnlightenmentProvider.Current.GetService<IConcurrencyAbstractionLayer>() ?? new DefaultConcurrencyAbstractionLayer(); + } + } + + /// <summary> + /// (Infrastructure) Concurrency abstraction layer interface. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IConcurrencyAbstractionLayer + { + /// <summary> + /// Queues a method for execution at the specified relative time. + /// </summary> + /// <param name="action">Method to execute.</param> + /// <param name="state">State to pass to the method.</param> + /// <param name="dueTime">Time to execute the method on.</param> + /// <returns>Disposable object that can be used to stop the timer.</returns> + IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime); + + /// <summary> + /// Queues a method for periodic execution based on the specified period. + /// </summary> + /// <param name="action">Method to execute; should be safe for reentrancy.</param> + /// <param name="period">Period for running the method periodically.</param> + /// <returns>Disposable object that can be used to stop the timer.</returns> + IDisposable StartPeriodicTimer(Action action, TimeSpan period); + + /// <summary> + /// Queues a method for execution. + /// </summary> + /// <param name="action">Method to execute.</param> + /// <param name="state">State to pass to the method.</param> + /// <returns>Disposable object that can be used to cancel the queued method.</returns> + IDisposable QueueUserWorkItem(Action<object> action, object state); + + /// <summary> + /// Blocking sleep operation. + /// </summary> + /// <param name="timeout">Time to sleep.</param> + void Sleep(TimeSpan timeout); + + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + IStopwatch StartStopwatch(); + + /// <summary> + /// Gets whether long-running scheduling is supported. + /// </summary> + bool SupportsLongRunning + { + get; + } + + /// <summary> + /// Starts a new long-running thread. + /// </summary> + /// <param name="action">Method to execute.</param> + /// <param name="state">State to pass to the method.</param> + void StartThread(Action<object> action, object state); + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/CurrentThreadScheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/CurrentThreadScheduler.cs new file mode 100644 index 0000000..1a051c6 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/CurrentThreadScheduler.cs @@ -0,0 +1,191 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Threading; +using System.Reactive.Disposables; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the current thread. + /// </summary> + /// <seealso cref="Scheduler.CurrentThread">Singleton instance of this type exposed through this static property.</seealso> + public sealed class CurrentThreadScheduler : LocalScheduler + { + private static readonly CurrentThreadScheduler s_instance = new CurrentThreadScheduler(); + + CurrentThreadScheduler() + { + } + + /// <summary> + /// Gets the singleton instance of the current thread scheduler. + /// </summary> + public static CurrentThreadScheduler Instance + { + get { return s_instance; } + } + +#if !NO_TLS + [ThreadStatic] + static SchedulerQueue<TimeSpan> s_threadLocalQueue; + + [ThreadStatic] + static IStopwatch s_clock; + + private static SchedulerQueue<TimeSpan> GetQueue() + { + return s_threadLocalQueue; + } + + private static void SetQueue(SchedulerQueue<TimeSpan> newQueue) + { + s_threadLocalQueue = newQueue; + } + + private static TimeSpan Time + { + get + { + if (s_clock == null) + s_clock = ConcurrencyAbstractionLayer.Current.StartStopwatch(); + + return s_clock.Elapsed; + } + } +#else + private static readonly System.Collections.Generic.Dictionary<int, SchedulerQueue<TimeSpan>> s_queues = new System.Collections.Generic.Dictionary<int, SchedulerQueue<TimeSpan>>(); + + private static readonly System.Collections.Generic.Dictionary<int, IStopwatch> s_clocks = new System.Collections.Generic.Dictionary<int, IStopwatch>(); + + private static SchedulerQueue<TimeSpan> GetQueue() + { + lock (s_queues) + { + var item = default(SchedulerQueue<TimeSpan>); + if (s_queues.TryGetValue(Thread.CurrentThread.ManagedThreadId, out item)) + return item; + + return null; + } + } + + private static void SetQueue(SchedulerQueue<TimeSpan> newQueue) + { + lock (s_queues) + { + if (newQueue == null) + s_queues.Remove(Thread.CurrentThread.ManagedThreadId); + else + s_queues[Thread.CurrentThread.ManagedThreadId] = newQueue; + } + } + + private static TimeSpan Time + { + get + { + lock (s_clocks) + { + var clock = default(IStopwatch); + if (!s_clocks.TryGetValue(Thread.CurrentThread.ManagedThreadId, out clock)) + s_clocks[Thread.CurrentThread.ManagedThreadId] = clock = ConcurrencyAbstractionLayer.Current.StartStopwatch(); + + return clock.Elapsed; + } + } + } +#endif + + /// <summary> + /// Gets a value that indicates whether the caller must call a Schedule method. + /// </summary> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Justification = "Now marked as obsolete.")] + [EditorBrowsable(EditorBrowsableState.Never)] + [Obsolete(Constants_Core.OBSOLETE_SCHEDULEREQUIRED)] // Preferring static method call over instance method call. + public bool ScheduleRequired + { + get + { + return IsScheduleRequired; + } + } + + /// <summary> + /// Gets a value that indicates whether the caller must call a Schedule method. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Advanced)] + public static bool IsScheduleRequired + { + get + { + return GetQueue() == null; + } + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Time + Scheduler.Normalize(dueTime); + + var si = new ScheduledItem<TimeSpan, TState>(this, state, action, dt); + + var queue = GetQueue(); + + if (queue == null) + { + queue = new SchedulerQueue<TimeSpan>(4); + queue.Enqueue(si); + + CurrentThreadScheduler.SetQueue(queue); + try + { + Trampoline.Run(queue); + } + finally + { + CurrentThreadScheduler.SetQueue(null); + } + } + else + { + queue.Enqueue(si); + } + + return Disposable.Create(si.Cancel); + } + + static class Trampoline + { + public static void Run(SchedulerQueue<TimeSpan> queue) + { + while (queue.Count > 0) + { + var item = queue.Dequeue(); + if (!item.IsCanceled) + { + var wait = item.DueTime - CurrentThreadScheduler.Time; + if (wait.Ticks > 0) + { + ConcurrencyAbstractionLayer.Current.Sleep(wait); + } + + if (!item.IsCanceled) + item.Invoke(); + } + } + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs new file mode 100644 index 0000000..1e5ae79 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs @@ -0,0 +1,176 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on the platform's default scheduler. + /// </summary> + /// <seealso cref="Scheduler.Default">Singleton instance of this type exposed through this static property.</seealso> + public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic + { + private static readonly DefaultScheduler s_instance = new DefaultScheduler(); + private static IConcurrencyAbstractionLayer s_cal = ConcurrencyAbstractionLayer.Current; + + /// <summary> + /// Gets the singleton instance of the default scheduler. + /// </summary> + public static DefaultScheduler Instance + { + get + { + return s_instance; + } + } + + private DefaultScheduler() + { + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + var cancel = s_cal.QueueUserWorkItem(_ => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, null); + + return new CompositeDisposable( + d, + cancel + ); + } + + /// <summary> + /// Schedules an action to be executed after dueTime, using a System.Threading.Timer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + + var d = new SingleAssignmentDisposable(); + + var cancel = s_cal.StartTimer(_ => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }, null, dt); + + return new CompositeDisposable( + d, + cancel + ); + } + + /// <summary> + /// Schedules a periodic piece of work, using a System.Threading.Timer object. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) + { + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + var state1 = state; + var gate = new AsyncLock(); + + var cancel = s_cal.StartPeriodicTimer(() => + { + gate.Wait(() => + { + state1 = action(state1); + }); + }, period); + + return Disposable.Create(() => + { + cancel.Dispose(); + gate.Dispose(); + action = Stubs<TState>.I; + }); + } + + /// <summary> + /// Discovers scheduler services by interface type. + /// </summary> + /// <param name="serviceType">Scheduler service interface type to discover.</param> + /// <returns>Object implementing the requested service, if available; null otherwise.</returns> + protected override object GetService(Type serviceType) + { + if (serviceType == typeof(ISchedulerLongRunning)) + { + if (s_cal.SupportsLongRunning) + { + return LongRunning.Instance; + } + } + + return base.GetService(serviceType); + } + + class LongRunning : ISchedulerLongRunning + { + public static ISchedulerLongRunning Instance = new LongRunning(); + + public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var cancel = new BooleanDisposable(); + + DefaultScheduler.s_cal.StartThread( + arg => + { + var d = (ICancelable)arg; + + // + // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning + // requires us to ensure the scheduled work gets an opportunity to observe + // the cancellation request. + // + action(state, d); + }, + cancel + ); + + return cancel; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/DisableOptimizationsScheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/DisableOptimizationsScheduler.cs new file mode 100644 index 0000000..dec54b9 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/DisableOptimizationsScheduler.cs @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Linq; + +#if !NO_WEAKTABLE +using System.Runtime.CompilerServices; +#endif + +namespace System.Reactive.Concurrency +{ + class DisableOptimizationsScheduler : SchedulerWrapper + { + private readonly Type[] _optimizationInterfaces; + + public DisableOptimizationsScheduler(IScheduler scheduler) + : base(scheduler) + { + _optimizationInterfaces = Scheduler.OPTIMIZATIONS; + } + + public DisableOptimizationsScheduler(IScheduler scheduler, Type[] optimizationInterfaces) + : base(scheduler) + { + _optimizationInterfaces = optimizationInterfaces; + } + +#if !NO_WEAKTABLE + public DisableOptimizationsScheduler(IScheduler scheduler, Type[] optimizationInterfaces, ConditionalWeakTable<IScheduler, IScheduler> cache) + : base(scheduler, cache) + { + _optimizationInterfaces = optimizationInterfaces; + } + + protected override SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache) + { + return new DisableOptimizationsScheduler(scheduler, _optimizationInterfaces, cache); + } +#else + protected override SchedulerWrapper Clone(IScheduler scheduler) + { + return new DisableOptimizationsScheduler(scheduler, _optimizationInterfaces); + } +#endif + + protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object service) + { + service = null; + return _optimizationInterfaces.Contains(serviceType); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ImmediateScheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ImmediateScheduler.cs new file mode 100644 index 0000000..017fce5 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ImmediateScheduler.cs @@ -0,0 +1,123 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Threading; +using System.Reactive.Disposables; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work to run immediately on the current thread. + /// </summary> + /// <seealso cref="Scheduler.Immediate">Singleton instance of this type exposed through this static property.</seealso> + public sealed class ImmediateScheduler : LocalScheduler + { + private static readonly ImmediateScheduler s_instance = new ImmediateScheduler(); + + ImmediateScheduler() + { + } + + /// <summary> + /// Gets the singleton instance of the immediate scheduler. + /// </summary> + public static ImmediateScheduler Instance + { + get { return s_instance; } + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return action(new AsyncLockScheduler(), state); + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks > 0) + { + ConcurrencyAbstractionLayer.Current.Sleep(dt); + } + + return action(new AsyncLockScheduler(), state); + } + + class AsyncLockScheduler : LocalScheduler + { + AsyncLock asyncLock; + + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var m = new SingleAssignmentDisposable(); + + if (asyncLock == null) + asyncLock = new AsyncLock(); + + asyncLock.Wait(() => + { + if (!m.IsDisposed) + m.Disposable = action(this, state); + }); + + return m; + } + + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + if (dueTime.Ticks <= 0) + return Schedule<TState>(state, action); + + var timer = ConcurrencyAbstractionLayer.Current.StartStopwatch(); + + var m = new SingleAssignmentDisposable(); + + if (asyncLock == null) + asyncLock = new AsyncLock(); + + asyncLock.Wait(() => + { + if (!m.IsDisposed) + { + var sleep = dueTime - timer.Elapsed; + if (sleep.Ticks > 0) + { + ConcurrencyAbstractionLayer.Current.Sleep(sleep); + } + if (!m.IsDisposed) + m.Disposable = action(this, state); + } + }); + + return m; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs new file mode 100644 index 0000000..ded2b28 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs @@ -0,0 +1,487 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Reactive.PlatformServices; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + public partial class LocalScheduler + { + /// <summary> + /// Gate to protect queues and to synchronize scheduling decisions and system clock + /// change management. + /// </summary> + private static readonly object s_gate = new object(); + + /// <summary> + /// Long term work queue. Contains work that's due beyond SHORTTERM, computed at the + /// time of enqueueing. + /// </summary> + private static readonly PriorityQueue<WorkItem/*!*/> s_longTerm = new PriorityQueue<WorkItem/*!*/>(); + + /// <summary> + /// Disposable resource for the long term timer that will reevaluate and dispatch the + /// first item in the long term queue. A serial disposable is used to make "dispose + /// current and assign new" logic easier. The disposable itself is never disposed. + /// </summary> + private static readonly SerialDisposable s_nextLongTermTimer = new SerialDisposable(); + + /// <summary> + /// Item at the head of the long term queue for which the current long term timer is + /// running. Used to detect changes in the queue and decide whether we should replace + /// or can continue using the current timer (because no earlier long term work was + /// added to the queue). + /// </summary> + private static WorkItem s_nextLongTermWorkItem = null; + + /// <summary> + /// Short term work queue. Contains work that's due soon, computed at the time of + /// enqueueing or upon reevaluation of the long term queue causing migration of work + /// items. This queue is kept in order to be able to relocate short term items back + /// to the long term queue in case a system clock change occurs. + /// </summary> + private static readonly PriorityQueue<WorkItem/*!*/> s_shortTerm = new PriorityQueue<WorkItem/*!*/>(); + + /// <summary> + /// Set of disposable handles to all of the current short term work Schedule calls, + /// allowing those to be cancelled upon a system clock change. + /// </summary> +#if !NO_HASHSET + private static readonly HashSet<IDisposable> s_shortTermWork = new HashSet<IDisposable>(); +#else + private static readonly Dictionary<IDisposable, object> s_shortTermWork = new Dictionary<IDisposable, object>(); +#endif + + /// <summary> + /// Threshold where an item is considered to be short term work or gets moved from + /// long term to short term. + /// </summary> + private static readonly TimeSpan SHORTTERM = TimeSpan.FromSeconds(10); + + /// <summary> + /// Maximum error ratio for timer drift. We've seen machines with 10s drift on a + /// daily basis, which is in the order 10E-4, so we allow for extra margin here. + /// This value is used to calculate early arrival for the long term queue timer + /// that will reevaluate work for the short term queue. + /// + /// Example: -------------------------------...---------------------*-----$ + /// ^ ^ + /// | | + /// early due + /// 0.999 1.0 + /// + /// We also make the gap between early and due at least LONGTOSHORT so we have + /// enough time to transition work to short term and as a courtesy to the + /// destination scheduler to manage its queues etc. + /// </summary> + private const int MAXERRORRATIO = 1000; + + /// <summary> + /// Minimum threshold for the long term timer to fire before the queue is reevaluated + /// for short term work. This value is chosen to be less than SHORTTERM in order to + /// ensure the timer fires and has work to transition to the short term queue. + /// </summary> + private static readonly TimeSpan LONGTOSHORT = TimeSpan.FromSeconds(5); + + /// <summary> + /// Threshold used to determine when a short term timer has fired too early compared + /// to the absolute due time. This provides a last chance protection against early + /// completion of scheduled work, which can happen in case of time adjustment in the + /// operating system (cf. GetSystemTimeAdjustment). + /// </summary> + private static readonly TimeSpan RETRYSHORT = TimeSpan.FromMilliseconds(50); + + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "We can't really lift this into a field initializer, and would end up checking for an initialization flag in every static method anyway (which is roughly what the JIT does in a thread-safe manner).")] + static LocalScheduler() + { + // + // Hook up for system clock change notifications. This doesn't do anything until the + // AddRef method is called (which can throw). + // + SystemClock.SystemClockChanged += SystemClockChanged; + } + + /// <summary> + /// Enqueues absolute time scheduled work in the timer queue or the short term work list. + /// </summary> + /// <param name="scheduler">Scheduler to run the work on. Typically "this" from the caller's perspective (LocalScheduler.Schedule), but parameter kept because we have a single (static) timer queue across all of Rx local schedulers.</param> + /// <param name="state">State to pass to the action.</param> + /// <param name="dueTime">Absolute time to run the work on. The timer queue is responsible to execute the work close to the specified time, also accounting for system clock changes.</param> + /// <param name="action">Action to run, potentially recursing into the scheduler.</param> + /// <returns>Disposable object to prevent the work from running.</returns> + private static IDisposable Enqueue<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) + { + // + // Work that's due in the past is sent to the underlying scheduler through the Schedule + // overload for execution at TimeSpan.Zero. We don't go to the overload for immediate + // scheduling in order to: + // + // - Preserve the time-based nature of the call as surfaced to the underlying scheduler, + // as it may use different queuing strategies. + // + // - Optimize for the default behavior of LocalScheduler where a virtual call to Schedule + // for immediate execution calls into the abstract Schedule method with TimeSpan.Zero. + // + var due = Scheduler.Normalize(dueTime - scheduler.Now); + if (due == TimeSpan.Zero) + { + return scheduler.Schedule<TState>(state, TimeSpan.Zero, action); + } + + // + // We're going down the path of queueing up work or scheduling it, so we need to make + // sure we can get system clock change notifications. If not, the call below is expected + // to throw NotSupportedException. WorkItem.Invoke decreases the ref count again to allow + // the system clock monitor to stop if there's no work left. Notice work items always + // reach an execution stage since we don't dequeue items but merely mark them as cancelled + // through WorkItem.Dispose. Double execution is also prevented, so the ref count should + // correctly balance out. + // + SystemClock.AddRef(); + + var workItem = new WorkItem<TState>(scheduler, state, dueTime, action); + + if (due <= SHORTTERM) + { + ScheduleShortTermWork(workItem); + } + else + { + ScheduleLongTermWork(workItem); + } + + return workItem; + } + + /// <summary> + /// Schedule work that's due in the short term. This leads to relative scheduling calls to the + /// underlying scheduler for short TimeSpan values. If the system clock changes in the meantime, + /// the short term work is attempted to be cancelled and reevaluated. + /// </summary> + /// <param name="item">Work item to schedule in the short term. The caller is responsible to determine the work is indeed short term.</param> + private static void ScheduleShortTermWork(WorkItem/*!*/ item) + { + lock (s_gate) + { + s_shortTerm.Enqueue(item); + + // + // We don't bother trying to dequeue the item or stop the timer upon cancellation, + // but always let the timer fire to do the queue maintenance. When the item is + // cancelled, it won't run (see WorkItem.Invoke). In the event of a system clock + // change, all outstanding work in s_shortTermWork is cancelled and the short + // term queue is reevaluated, potentially prompting rescheduling of short term + // work. Notice work is protected against double execution by the implementation + // of WorkItem.Invoke. + // + var d = new SingleAssignmentDisposable(); +#if !NO_HASHSET + s_shortTermWork.Add(d); +#else + s_shortTermWork.Add(d, null); +#endif + + // + // We normalize the time delta again (possibly redundant), because we can't assume + // the underlying scheduler implementations is valid and deals with negative values + // (though it should). + // + var dueTime = Scheduler.Normalize(item.DueTime - item.Scheduler.Now); + d.Disposable = item.Scheduler.Schedule(d, dueTime, ExecuteNextShortTermWorkItem); + } + } + + /// <summary> + /// Callback to process the next short term work item. + /// </summary> + /// <param name="scheduler">Recursive scheduler supplied by the underlying scheduler.</param> + /// <param name="cancel">Disposable used to identify the work the timer was triggered for (see code for usage).</param> + /// <returns>Empty disposable. Recursive work cancellation is wired through the original WorkItem.</returns> + private static IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel) + { + var next = default(WorkItem); + + lock (s_gate) + { + // + // Notice that even though we try to cancel all work in the short term queue upon a + // system clock change, cancellation may not be honored immediately and there's a + // small chance this code runs for work that has been cancelled. Because the handler + // doesn't execute the work that triggered the time-based Schedule call, but always + // runs the work from the short term queue in order, we need to make sure we're not + // stealing items in the queue. We can do so by remembering the object identity of + // the disposable and check whether it still exists in the short term work list. If + // not, a system clock change handler has gotten rid of it as part of reevaluating + // the short term queue, but we still ended up here because the inherent race in the + // call to Dispose versus the underlying timer. It's also possible the underlying + // scheduler does a bad job at cancellation, so this measure helps for that too. + // + if (s_shortTermWork.Remove(cancel) && s_shortTerm.Count > 0) + { + next = s_shortTerm.Dequeue(); + } + } + + if (next != null) + { + // + // If things don't make sense and we're way too early to run the work, this is our + // final chance to prevent us from running before the due time. This situation can + // arise when Windows applies system clock adjustment (see SetSystemTimeAdjustment) + // and as a result the clock is ticking slower. If the clock is ticking faster due + // to such an adjustment, too bad :-). We try to minimize the window for the final + // relative time based scheduling such that 10%+ adjustments to the clock rate + // have only "little" impact (range of 100s of ms). On an absolute time scale, we + // don't provide stronger guarantees. + // + if (next.DueTime - next.Scheduler.Now >= RETRYSHORT) + { + ScheduleShortTermWork(next); + } + else + { + // + // Invocation happens on the recursive scheduler supplied to the function. We + // are already running on the target scheduler, so we should stay on board. + // Not doing so would have unexpected behavior for e.g. NewThreadScheduler, + // causing a whole new thread to be allocated because of a top-level call to + // the Schedule method rather than a recursive one. + // + // Notice if work got cancelled, the call to Invoke will not propagate to user + // code because of the IsDisposed check inside. + // + next.Invoke(scheduler); + } + } + + // + // No need to return anything better here. We already handed out the original WorkItem + // object upon the call to Enqueue (called itself by Schedule). The disposable inside + // the work item allows a cancellation request to chase the underlying computation. + // + return Disposable.Empty; + } + + /// <summary> + /// Schedule work that's due on the long term. This leads to the work being queued up for + /// eventual transitioning to the short term work list. + /// </summary> + /// <param name="item">Work item to schedule on the long term. The caller is responsible to determine the work is indeed long term.</param> + private static void ScheduleLongTermWork(WorkItem/*!*/ item) + { + lock (s_gate) + { + s_longTerm.Enqueue(item); + + // + // In case we're the first long-term item in the queue now, the timer will have + // to be updated. + // + UpdateLongTermProcessingTimer(); + } + } + + /// <summary> + /// Updates the long term timer which is responsible to transition work from the head of the + /// long term queue to the short term work list. + /// </summary> + /// <remarks>Should be called under the scheduler lock.</remarks> + private static void UpdateLongTermProcessingTimer() + { + /* + * CALLERS - Ensure this is called under the lock! + * + lock (s_gate) */ + { + if (s_longTerm.Count == 0) + return; + + // + // To avoid setting the timer all over again for the first work item if it hasn't changed, + // we keep track of the next long term work item that will be processed by the timer. + // + var next = s_longTerm.Peek(); + if (next == s_nextLongTermWorkItem) + return; + + // + // We need to arrive early in order to accommodate for potential drift. The relative amount + // of drift correction is kept in MAXERRORRATIO. At the very least, we want to be LONGTOSHORT + // early to make the final jump from long term to short term, giving the target scheduler + // enough time to process the item through its queue. LONGTOSHORT is chosen such that the + // error due to drift is negligible. + // + var due = Scheduler.Normalize(next.DueTime - next.Scheduler.Now); + var remainder = TimeSpan.FromTicks(Math.Max(due.Ticks / MAXERRORRATIO, LONGTOSHORT.Ticks)); + var dueEarly = due - remainder; + + s_nextLongTermWorkItem = next; + s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueEarly); + } + } + + /// <summary> + /// Evaluates the long term queue, transitioning short term work to the short term list, + /// and adjusting the new long term processing timer accordingly. + /// </summary> + /// <param name="state">Ignored.</param> + private static void EvaluateLongTermQueue(object state) + { + lock (s_gate) + { + var next = default(WorkItem); + + while (s_longTerm.Count > 0) + { + next = s_longTerm.Peek(); + + var due = Scheduler.Normalize(next.DueTime - next.Scheduler.Now); + if (due >= SHORTTERM) + break; + + var item = s_longTerm.Dequeue(); + ScheduleShortTermWork(item); + } + + s_nextLongTermWorkItem = null; + UpdateLongTermProcessingTimer(); + } + } + + /// <summary> + /// Callback invoked when a system clock change is observed in order to adjust and reevaluate + /// the internal scheduling queues. + /// </summary> + /// <param name="args">Currently not used.</param> + /// <param name="sender">Currently not used.</param> + private static void SystemClockChanged(object sender, SystemClockChangedEventArgs args) + { + lock (s_gate) + { + // + // Best-effort cancellation of short term work. A check for presence in the hash set + // is used to notice race conditions between cancellation and the timer firing (also + // guarded by the same gate object). See checks in ExecuteNextShortTermWorkItem. + // +#if !NO_HASHSET + foreach (var d in s_shortTermWork) +#else + foreach (var d in s_shortTermWork.Keys) +#endif + d.Dispose(); + + s_shortTermWork.Clear(); + + // + // Transition short term work to the long term queue for reevaluation by calling the + // EvaluateLongTermQueue method. We don't know which direction the clock was changed + // in, so we don't optimize for special cases, but always transition the whole queue. + // Notice the short term queue is bounded to SHORTTERM length. + // + while (s_shortTerm.Count > 0) + { + var next = s_shortTerm.Dequeue(); + s_longTerm.Enqueue(next); + } + + // + // Reevaluate the queue and don't forget to null out the current timer to force the + // method to create a new timer for the new first long term item. + // + s_nextLongTermWorkItem = null; + EvaluateLongTermQueue(null); + } + } + + /// <summary> + /// Represents a work item in the absolute time scheduler. + /// </summary> + /// <remarks> + /// This type is very similar to ScheduledItem, but we need a different Invoke signature to allow customization + /// of the target scheduler (e.g. when called in a recursive scheduling context, see ExecuteNextShortTermWorkItem). + /// </remarks> + abstract class WorkItem : IComparable<WorkItem>, IDisposable + { + private readonly IScheduler _scheduler; + private readonly DateTimeOffset _dueTime; + private readonly SingleAssignmentDisposable _disposable; + private int _hasRun; + + public WorkItem(IScheduler scheduler, DateTimeOffset dueTime) + { + _scheduler = scheduler; + _dueTime = dueTime; + _disposable = new SingleAssignmentDisposable(); + _hasRun = 0; + } + + public IScheduler Scheduler + { + get { return _scheduler; } + } + + public DateTimeOffset DueTime + { + get { return _dueTime; } + } + + public void Invoke(IScheduler scheduler) + { + // + // Protect against possible maltreatment of the scheduler queues or races in + // execution of a work item that got relocated across system clock changes. + // Under no circumstance whatsoever we should run work twice. The monitor's + // ref count should also be subject to this policy. + // + if (Interlocked.Exchange(ref _hasRun, 1) == 0) + { + try + { + if (!_disposable.IsDisposed) + _disposable.Disposable = InvokeCore(scheduler); + } + finally + { + SystemClock.Release(); + } + } + } + + protected abstract IDisposable InvokeCore(IScheduler scheduler); + + public int CompareTo(WorkItem/*!*/ other) + { + return Comparer<DateTimeOffset>.Default.Compare(this._dueTime, other._dueTime); + } + + public void Dispose() + { + _disposable.Dispose(); + } + } + + /// <summary> + /// Represents a work item that closes over scheduler invocation state. Subtyping is + /// used to have a common type for the scheduler queues. + /// </summary> + sealed class WorkItem<TState> : WorkItem + { + private readonly TState _state; + private readonly Func<IScheduler, TState, IDisposable> _action; + + public WorkItem(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) + : base(scheduler, dueTime) + { + _state = state; + _action = action; + } + + protected override IDisposable InvokeCore(IScheduler scheduler) + { + return _action(scheduler, _state); + } + } + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.cs new file mode 100644 index 0000000..b71367a --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.cs @@ -0,0 +1,102 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Abstract base class for machine-local schedulers, using the local system clock for time-based operations. + /// </summary> + public abstract partial class LocalScheduler : IScheduler, IStopwatchProvider, IServiceProvider + { + /// <summary> + /// Gets the scheduler's notion of current time. + /// </summary> + public virtual DateTimeOffset Now + { + get { return Scheduler.Now; } + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public virtual IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return Schedule(state, TimeSpan.Zero, action); + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + public abstract IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action); + + /// <summary> + /// Schedules an action to be executed at dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public virtual IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return Enqueue(this, state, dueTime, action); + } + + /// <summary> + /// Starts a new stopwatch object. + /// </summary> + /// <returns>New stopwatch object; started at the time of the request.</returns> + /// <remarks> + /// Platform-specific scheduler implementations should reimplement IStopwatchProvider to provide a more + /// efficient IStopwatch implementation (if available). + /// </remarks> + public virtual IStopwatch StartStopwatch() + { + return ConcurrencyAbstractionLayer.Current.StartStopwatch(); + } + + object IServiceProvider.GetService(Type serviceType) + { + return GetService(serviceType); + } + + /// <summary> + /// Discovers scheduler services by interface type. The base class implementation returns + /// requested services for each scheduler interface implemented by the derived class. For + /// more control over service discovery, derived types can override this method. + /// </summary> + /// <param name="serviceType">Scheduler service interface type to discover.</param> + /// <returns>Object implementing the requested service, if available; null otherwise.</returns> + protected virtual object GetService(Type serviceType) + { +#if !NO_PERF + if (serviceType == typeof(IStopwatchProvider)) + return this as IStopwatchProvider; + else if (serviceType == typeof(ISchedulerLongRunning)) + return this as ISchedulerLongRunning; + else if (serviceType == typeof(ISchedulerPeriodic)) + return this as ISchedulerPeriodic; +#endif + + return null; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ScheduledItem.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ScheduledItem.cs new file mode 100644 index 0000000..d3ea2f1 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ScheduledItem.cs @@ -0,0 +1,244 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Disposables; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Abstract base class for scheduled work items. + /// </summary> + /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam> + public abstract class ScheduledItem<TAbsolute> : IScheduledItem<TAbsolute>, IComparable<ScheduledItem<TAbsolute>> + where TAbsolute : IComparable<TAbsolute> + { + private readonly SingleAssignmentDisposable _disposable = new SingleAssignmentDisposable(); + private readonly TAbsolute _dueTime; + private readonly IComparer<TAbsolute> _comparer; + + /// <summary> + /// Creates a new scheduled work item to run at the specified time. + /// </summary> + /// <param name="dueTime">Absolute time at which the work item has to be executed.</param> + /// <param name="comparer">Comparer used to compare work items based on their scheduled time.</param> + /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is null.</exception> + protected ScheduledItem(TAbsolute dueTime, IComparer<TAbsolute> comparer) + { + if (comparer == null) + throw new ArgumentNullException("comparer"); + + _dueTime = dueTime; + _comparer = comparer; + } + + /// <summary> + /// Gets the absolute time at which the item is due for invocation. + /// </summary> + public TAbsolute DueTime + { + get { return _dueTime; } + } + + /// <summary> + /// Invokes the work item. + /// </summary> + public void Invoke() + { + if (!_disposable.IsDisposed) + _disposable.Disposable = InvokeCore(); + } + + /// <summary> + /// Implement this method to perform the work item invocation, returning a disposable object for deep cancellation. + /// </summary> + /// <returns>Disposable object used to cancel the work item and/or derived work items.</returns> + protected abstract IDisposable InvokeCore(); + + #region Inequality + + /// <summary> + /// Compares the work item with another work item based on absolute time values. + /// </summary> + /// <param name="other">Work item to compare the current work item to.</param> + /// <returns>Relative ordering between this and the specified work item.</returns> + /// <remarks>The inequality operators are overloaded to provide results consistent with the IComparable implementation. Equality operators implement traditional reference equality semantics.</remarks> + public int CompareTo(ScheduledItem<TAbsolute> other) + { + // MSDN: By definition, any object compares greater than null, and two null references compare equal to each other. + if (object.ReferenceEquals(other, null)) + return 1; + + return _comparer.Compare(DueTime, other.DueTime); + } + + /// <summary> + /// Determines whether one specified ScheduledItem<TAbsolute> object is due before a second specified ScheduledItem<TAbsolute> object. + /// </summary> + /// <param name="left">The first object to compare.</param> + /// <param name="right">The second object to compare.</param> + /// <returns>true if the DueTime value of left is earlier than the DueTime value of right; otherwise, false.</returns> + /// <remarks>This operator provides results consistent with the IComparable implementation.</remarks> + public static bool operator <(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) + { + return Comparer<ScheduledItem<TAbsolute>>.Default.Compare(left, right) < 0; + } + + /// <summary> + /// Determines whether one specified ScheduledItem<TAbsolute> object is due before or at the same of a second specified ScheduledItem<TAbsolute> object. + /// </summary> + /// <param name="left">The first object to compare.</param> + /// <param name="right">The second object to compare.</param> + /// <returns>true if the DueTime value of left is earlier than or simultaneous with the DueTime value of right; otherwise, false.</returns> + /// <remarks>This operator provides results consistent with the IComparable implementation.</remarks> + public static bool operator <=(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) + { + return Comparer<ScheduledItem<TAbsolute>>.Default.Compare(left, right) <= 0; + } + + /// <summary> + /// Determines whether one specified ScheduledItem<TAbsolute> object is due after a second specified ScheduledItem<TAbsolute> object. + /// </summary> + /// <param name="left">The first object to compare.</param> + /// <param name="right">The second object to compare.</param> + /// <returns>true if the DueTime value of left is later than the DueTime value of right; otherwise, false.</returns> + /// <remarks>This operator provides results consistent with the IComparable implementation.</remarks> + public static bool operator >(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) + { + return Comparer<ScheduledItem<TAbsolute>>.Default.Compare(left, right) > 0; + } + + /// <summary> + /// Determines whether one specified ScheduledItem<TAbsolute> object is due after or at the same time of a second specified ScheduledItem<TAbsolute> object. + /// </summary> + /// <param name="left">The first object to compare.</param> + /// <param name="right">The second object to compare.</param> + /// <returns>true if the DueTime value of left is later than or simultaneous with the DueTime value of right; otherwise, false.</returns> + /// <remarks>This operator provides results consistent with the IComparable implementation.</remarks> + public static bool operator >=(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) + { + return Comparer<ScheduledItem<TAbsolute>>.Default.Compare(left, right) >= 0; + } + + #endregion + + #region Equality + + /// <summary> + /// Determines whether two specified ScheduledItem<TAbsolute, TValue> objects are equal. + /// </summary> + /// <param name="left">The first object to compare.</param> + /// <param name="right">The second object to compare.</param> + /// <returns>true if both ScheduledItem<TAbsolute, TValue> are equal; otherwise, false.</returns> + /// <remarks>This operator does not provide results consistent with the IComparable implementation. Instead, it implements reference equality.</remarks> + public static bool operator ==(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) + { + return object.ReferenceEquals(left, right); + } + + /// <summary> + /// Determines whether two specified ScheduledItem<TAbsolute, TValue> objects are inequal. + /// </summary> + /// <param name="left">The first object to compare.</param> + /// <param name="right">The second object to compare.</param> + /// <returns>true if both ScheduledItem<TAbsolute, TValue> are inequal; otherwise, false.</returns> + /// <remarks>This operator does not provide results consistent with the IComparable implementation. Instead, it implements reference equality.</remarks> + public static bool operator !=(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) + { + return !(left == right); + } + + /// <summary> + /// Determines whether a ScheduledItem<TAbsolute> object is equal to the specified object. + /// </summary> + /// <param name="obj">The object to compare to the current ScheduledItem<TAbsolute> object.</param> + /// <returns>true if the obj parameter is a ScheduledItem<TAbsolute> object and is equal to the current ScheduledItem<TAbsolute> object; otherwise, false.</returns> + public override bool Equals(object obj) + { + return object.ReferenceEquals(this, obj); + } + + /// <summary> + /// Returns the hash code for the current ScheduledItem<TAbsolute> object. + /// </summary> + /// <returns>A 32-bit signed integer hash code.</returns> + public override int GetHashCode() + { + return base.GetHashCode(); + } + + #endregion + + /// <summary> + /// Cancels the work item by disposing the resource returned by InvokeCore as soon as possible. + /// </summary> + public void Cancel() + { + _disposable.Dispose(); + } + + /// <summary> + /// Gets whether the work item has received a cancellation request. + /// </summary> + public bool IsCanceled + { + get { return _disposable.IsDisposed; } + } + } + + /// <summary> + /// Represents a scheduled work item based on the materialization of an IScheduler.Schedule method call. + /// </summary> + /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam> + /// <typeparam name="TValue">Type of the state passed to the scheduled action.</typeparam> + public sealed class ScheduledItem<TAbsolute, TValue> : ScheduledItem<TAbsolute> + where TAbsolute : IComparable<TAbsolute> + { + private readonly IScheduler _scheduler; + private readonly TValue _state; + private readonly Func<IScheduler, TValue, IDisposable> _action; + + /// <summary> + /// Creates a materialized work item. + /// </summary> + /// <param name="scheduler">Recursive scheduler to invoke the scheduled action with.</param> + /// <param name="state">State to pass to the scheduled action.</param> + /// <param name="action">Scheduled action.</param> + /// <param name="dueTime">Time at which to run the scheduled action.</param> + /// <param name="comparer">Comparer used to compare work items based on their scheduled time.</param> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> or <paramref name="comparer"/> is null.</exception> + public ScheduledItem(IScheduler scheduler, TValue state, Func<IScheduler, TValue, IDisposable> action, TAbsolute dueTime, IComparer<TAbsolute> comparer) + : base(dueTime, comparer) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + _scheduler = scheduler; + _state = state; + _action = action; + } + + /// <summary> + /// Creates a materialized work item. + /// </summary> + /// <param name="scheduler">Recursive scheduler to invoke the scheduled action with.</param> + /// <param name="state">State to pass to the scheduled action.</param> + /// <param name="action">Scheduled action.</param> + /// <param name="dueTime">Time at which to run the scheduled action.</param> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public ScheduledItem(IScheduler scheduler, TValue state, Func<IScheduler, TValue, IDisposable> action, TAbsolute dueTime) + : this(scheduler, state, action, dueTime, Comparer<TAbsolute>.Default) + { + } + + /// <summary> + /// Invokes the scheduled action with the supplied recursive scheduler and state. + /// </summary> + /// <returns>Cancellation resource returned by the scheduled action.</returns> + protected override IDisposable InvokeCore() + { + return _action(_scheduler, _state); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Async.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Async.cs new file mode 100644 index 0000000..ac881a8 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Async.cs @@ -0,0 +1,432 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if HAS_AWAIT +using System.Reactive.Disposables; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Reactive.Concurrency +{ + public static partial class Scheduler + { + /// <summary> + /// Yields execution of the current work item on the scheduler to another work item on the scheduler. + /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation). + /// </summary> + /// <param name="scheduler">Scheduler to yield work on.</param> + /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + public static SchedulerOperation Yield(this IScheduler scheduler) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new SchedulerOperation(a => scheduler.Schedule(a), scheduler.GetCancellationToken()); + } + + /// <summary> + /// Yields execution of the current work item on the scheduler to another work item on the scheduler. + /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation). + /// </summary> + /// <param name="scheduler">Scheduler to yield work on.</param> + /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param> + /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new SchedulerOperation(a => scheduler.Schedule(a), cancellationToken); + } + + /// <summary> + /// Suspends execution of the current work item on the scheduler for the specified duration. + /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration. + /// </summary> + /// <param name="scheduler">Scheduler to yield work on.</param> + /// <param name="dueTime">Time when the continuation should run.</param> + /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken()); + } + + /// <summary> + /// Suspends execution of the current work item on the scheduler for the specified duration. + /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration. + /// </summary> + /// <param name="scheduler">Scheduler to yield work on.</param> + /// <param name="dueTime">Time when the continuation should run.</param> + /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param> + /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken); + } + + /// <summary> + /// Suspends execution of the current work item on the scheduler until the specified due time. + /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time. + /// </summary> + /// <param name="scheduler">Scheduler to yield work on.</param> + /// <param name="dueTime">Time when the continuation should run.</param> + /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken()); + } + + /// <summary> + /// Suspends execution of the current work item on the scheduler until the specified due time. + /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time. + /// </summary> + /// <param name="scheduler">Scheduler to yield work on.</param> + /// <param name="dueTime">Time when the continuation should run.</param> + /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param> + /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="state">State to pass to the asynchronous method.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_<TState>(scheduler, state, action); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="state">State to pass to the asynchronous method.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_<TState>(scheduler, state, action); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct)); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task<IDisposable>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, default(object), (self, o, ct) => action(self, ct)); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="state">State to pass to the asynchronous method.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, state, dueTime, action); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="state">State to pass to the asynchronous method.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, state, dueTime, action); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="state">State to pass to the asynchronous method.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, state, dueTime, action); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="state">State to pass to the asynchronous method.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, state, dueTime, action); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); + } + + /// <summary> + /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style. + /// </summary> + /// <param name="scheduler">Scheduler to schedule work on.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param> + /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return ScheduleAsync_(scheduler, default(object), dueTime, (self, o, ct) => action(self, ct)); + } + + private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action) + { + return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action)); + } + + private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) + { + return scheduler.Schedule(state, (self, s) => InvokeAsync(self, s, action)); + } + + private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action) + { + return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); + } + + private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) + { + return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); + } + + private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action) + { + return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); + } + + private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) + { + return scheduler.Schedule(state, dueTime, (self, s) => InvokeAsync(self, s, action)); + } + + private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action) + { + var c = new CancellationDisposable(); + var d = new SingleAssignmentDisposable(); + + action(new CancelableScheduler(self, c.Token), s, c.Token).ContinueWith(t => + { + if (t.IsCanceled) + return; + + if (t.Exception != null) + t.Exception.Handle(e => e is OperationCanceledException); + + d.Disposable = t.Result; + }, TaskContinuationOptions.ExecuteSynchronously); + + return new CompositeDisposable(c, d); + } + + private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action) + { + return InvokeAsync(self, s, (self_, state, ct) => action(self_, state, ct).ContinueWith(_ => Disposable.Empty)); + } + + private static CancellationToken GetCancellationToken(this IScheduler scheduler) + { + var cs = scheduler as CancelableScheduler; + return cs != null ? cs.Token : CancellationToken.None; + } + + class CancelableScheduler : IScheduler + { + private readonly IScheduler _scheduler; + private readonly CancellationToken _cancellationToken; + + public CancelableScheduler(IScheduler scheduler, CancellationToken cancellationToken) + { + _scheduler = scheduler; + _cancellationToken = cancellationToken; + } + + public CancellationToken Token + { + get { return _cancellationToken; } + } + + public DateTimeOffset Now + { + get { return _scheduler.Now; } + } + + public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + return _scheduler.Schedule(state, action); + } + + public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + return _scheduler.Schedule(state, dueTime, action); + } + + public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) + { + return _scheduler.Schedule(state, dueTime, action); + } + } + + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Recursive.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Recursive.cs new file mode 100644 index 0000000..564b999 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Recursive.cs @@ -0,0 +1,254 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Reactive.Disposables; + +namespace System.Reactive.Concurrency +{ + public static partial class Scheduler + { + /// <summary> + /// Schedules an action to be executed recursively. + /// </summary> + /// <param name="scheduler">Scheduler to execute the recursive action on.</param> + /// <param name="action">Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule(this IScheduler scheduler, Action<Action> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(action, (_action, self) => _action(() => self(_action))); + } + + /// <summary> + /// Schedules an action to be executed recursively. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to execute the recursive action on.</param> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule<TState>(this IScheduler scheduler, TState state, Action<TState, Action<TState>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(new Pair<TState, Action<TState, Action<TState>>> { First = state, Second = action }, InvokeRec1); + } + + static IDisposable InvokeRec1<TState>(IScheduler scheduler, Pair<TState, Action<TState, Action<TState>>> pair) + { + var group = new CompositeDisposable(1); + var gate = new object(); + var state = pair.First; + var action = pair.Second; + + Action<TState> recursiveAction = null; + recursiveAction = state1 => action(state1, state2 => + { + var isAdded = false; + var isDone = false; + var d = default(IDisposable); + d = scheduler.Schedule(state2, (scheduler1, state3) => + { + lock (gate) + { + if (isAdded) + group.Remove(d); + else + isDone = true; + } + recursiveAction(state3); + return Disposable.Empty; + }); + + lock (gate) + { + if (!isDone) + { + group.Add(d); + isAdded = true; + } + } + }); + + recursiveAction(state); + + return group; + } + + /// <summary> + /// Schedules an action to be executed recursively after a specified relative due time. + /// </summary> + /// <param name="scheduler">Scheduler to execute the recursive action on.</param> + /// <param name="action">Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified relative time.</param> + /// <param name="dueTime">Relative time after which to execute the action for the first time.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, Action<Action<TimeSpan>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(action, dueTime, (_action, self) => _action(dt => self(_action, dt))); + } + + /// <summary> + /// Schedules an action to be executed recursively after a specified relative due time. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to execute the recursive action on.</param> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state.</param> + /// <param name="dueTime">Relative time after which to execute the action for the first time.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Action<TState, Action<TState, TimeSpan>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(new Pair<TState, Action<TState, Action<TState, TimeSpan>>> { First = state, Second = action }, dueTime, InvokeRec2); + } + + static IDisposable InvokeRec2<TState>(IScheduler scheduler, Pair<TState, Action<TState, Action<TState, TimeSpan>>> pair) + { + var group = new CompositeDisposable(1); + var gate = new object(); + var state = pair.First; + var action = pair.Second; + + Action<TState> recursiveAction = null; + recursiveAction = state1 => action(state1, (state2, dueTime1) => + { + var isAdded = false; + var isDone = false; + var d = default(IDisposable); + d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) => + { + lock (gate) + { + if (isAdded) + group.Remove(d); + else + isDone = true; + } + recursiveAction(state3); + return Disposable.Empty; + }); + + lock (gate) + { + if (!isDone) + { + group.Add(d); + isAdded = true; + } + } + }); + + recursiveAction(state); + + return group; + } + + /// <summary> + /// Schedules an action to be executed recursively at a specified absolute due time. + /// </summary> + /// <param name="scheduler">Scheduler to execute the recursive action on.</param> + /// <param name="action">Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified absolute time.</param> + /// <param name="dueTime">Absolute time at which to execute the action for the first time.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset dueTime, Action<Action<DateTimeOffset>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(action, dueTime, (_action, self) => _action(dt => self(_action, dt))); + } + + /// <summary> + /// Schedules an action to be executed recursively at a specified absolute due time. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to execute the recursive action on.</param> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state.</param> + /// <param name="dueTime">Absolute time at which to execute the action for the first time.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Action<TState, Action<TState, DateTimeOffset>> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(new Pair<TState, Action<TState, Action<TState, DateTimeOffset>>> { First = state, Second = action }, dueTime, InvokeRec3); + } + + static IDisposable InvokeRec3<TState>(IScheduler scheduler, Pair<TState, Action<TState, Action<TState, DateTimeOffset>>> pair) + { + var group = new CompositeDisposable(1); + var gate = new object(); + var state = pair.First; + var action = pair.Second; + + Action<TState> recursiveAction = null; + recursiveAction = state1 => action(state1, (state2, dueTime1) => + { + var isAdded = false; + var isDone = false; + var d = default(IDisposable); + d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) => + { + lock (gate) + { + if (isAdded) + group.Remove(d); + else + isDone = true; + } + recursiveAction(state3); + return Disposable.Empty; + }); + + lock (gate) + { + if (!isDone) + { + group.Add(d); + isAdded = true; + } + } + }); + + recursiveAction(state); + + return group; + } + +#if !NO_SERIALIZABLE + [Serializable] +#endif + struct Pair<T1, T2> + { + public T1 First; + public T2 Second; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs new file mode 100644 index 0000000..d553d89 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.Emulation.cs @@ -0,0 +1,623 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Diagnostics; +using System.Reactive.Disposables; +using System.Reactive.PlatformServices; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + public static partial class Scheduler + { + /// <summary> + /// Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. + /// If the scheduler supports periodic scheduling, the request will be forwarded to the periodic scheduling implementation. + /// If the scheduler provides stopwatch functionality, the periodic task will be emulated using recursive scheduling with a stopwatch to correct for time slippage. + /// Otherwise, the periodic task will be emulated using recursive scheduling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">The scheduler to run periodic work on.</param> + /// <param name="state">Initial state passed to the action upon the first iteration.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed, potentially updating the state.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + return SchedulePeriodic_(scheduler, state, period, action); + } + + /// <summary> + /// Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. + /// If the scheduler supports periodic scheduling, the request will be forwarded to the periodic scheduling implementation. + /// If the scheduler provides stopwatch functionality, the periodic task will be emulated using recursive scheduling with a stopwatch to correct for time slippage. + /// Otherwise, the periodic task will be emulated using recursive scheduling. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="scheduler">Scheduler to execute the action on.</param> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler, TState state, TimeSpan period, Action<TState> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + return SchedulePeriodic_(scheduler, state, period, state_ => { action(state_); return state_; }); + } + + /// <summary> + /// Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. + /// If the scheduler supports periodic scheduling, the request will be forwarded to the periodic scheduling implementation. + /// If the scheduler provides stopwatch functionality, the periodic task will be emulated using recursive scheduling with a stopwatch to correct for time slippage. + /// Otherwise, the periodic task will be emulated using recursive scheduling. + /// </summary> + /// <param name="scheduler">Scheduler to execute the action on.</param> + /// <param name="period">Period for running the work periodically.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception> + public static IDisposable SchedulePeriodic(this IScheduler scheduler, TimeSpan period, Action action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (period < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("period"); + if (action == null) + throw new ArgumentNullException("action"); + + return SchedulePeriodic_(scheduler, action, period, a => { a(); return a; }); + } + + /// <summary> + /// Starts a new stopwatch object by dynamically discovering the scheduler's capabilities. + /// If the scheduler provides stopwatch functionality, the request will be forwarded to the stopwatch provider implementation. + /// Otherwise, the stopwatch will be emulated using the scheduler's notion of absolute time. + /// </summary> + /// <param name="scheduler">Scheduler to obtain a stopwatch for.</param> + /// <returns>New stopwatch object; started at the time of the request.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + /// <remarks>The resulting stopwatch object can have non-monotonic behavior.</remarks> + public static IStopwatch StartStopwatch(this IScheduler scheduler) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + // + // All schedulers deriving from LocalScheduler will automatically pick up this + // capability based on a local stopwatch, typically using QueryPerformanceCounter + // through the System.Diagnostics.Stopwatch class. + // + // Notice virtual time schedulers do implement this facility starting from Rx v2.0, + // using subtraction of their absolute time notion to compute elapsed time values. + // This is fine because those schedulers do not allow the clock to go back in time. + // + // For schedulers that don't have a stopwatch, we have to pick some fallback logic + // here. We could either dismiss the scheduler's notion of time and go for the CAL's + // stopwatch facility, or go with a stopwatch based on "scheduler.Now", which has + // the drawback of potentially going back in time: + // + // - Using the CAL's stopwatch facility causes us to abondon the scheduler's + // potentially virtualized notion of time, always going for the local system + // time instead. + // + // - Using the scheduler's Now property for calculations can break monotonicity, + // and there's no right answer on how to deal with jumps back in time. + // + // However, even the built-in stopwatch in the BCL can potentially fall back to + // subtraction of DateTime values in case no high-resolution performance counter is + // available, causing monotonicity to break down. We're not trying to solve this + // problem there either (though we could check IsHighResolution and smoothen out + // non-monotonic points somehow), so we pick the latter option as the lesser of + // two evils (also because it should occur rarely). + // + // Users of the stopwatch retrieved by this method could detect non-sensical data + // revealing a jump back in time, or implement custom fallback logic like the one + // shown below. + // + var swp = scheduler.AsStopwatchProvider(); + if (swp != null) + return swp.StartStopwatch(); + + return new EmulatedStopwatch(scheduler); + } + + private static IDisposable SchedulePeriodic_<TState>(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action) + { + // + // Design rationale: + // + // In Rx v1.x, we employed recursive scheduling for periodic tasks. The following code + // fragment shows how the Timer (and hence Interval) function used to be implemented: + // + // var p = Normalize(period); + // + // return new AnonymousObservable<long>(observer => + // { + // var d = dueTime; + // long count = 0; + // return scheduler.Schedule(d, self => + // { + // if (p > TimeSpan.Zero) + // { + // var now = scheduler.Now; + // d = d + p; + // if (d <= now) + // d = now + p; + // } + // + // observer.OnNext(count); + // count = unchecked(count + 1); + // self(d); + // }); + // }); + // + // Despite the purity of this approach, it suffered from a set of drawbacks: + // + // 1) Usage of IScheduler.Now to correct for time drift did have a positive effect for + // a limited number of scenarios, in particular when a short period was used. The + // major issues with this are: + // + // a) Relying on absolute time at the LINQ layer in Rx's layer map, causing issues + // when the system clock changes. Various customers hit this issue, reported to + // us on the MSDN forums. Basically, when the clock goes forward, the recursive + // loop wants to catch up as quickly as it can; when it goes backwards, a long + // silence will occur. (See 2 for a discussion of WP7 related fixes.) + // + // b) Even if a) would be addressed by using Rx v2.0's capabilities to monitor for + // system clock changes, the solution would violate the reasonable expectation + // of operators overloads using TimeSpan *not* relying on absolute time. + // + // c) Drift correction doesn't work for large periods when the system encounters + // systematic drift. For example, in the lab we've seen cases of drift up to + // tens of seconds on a 24 hour timeframe. Correcting for this drift by making + // a recursive call with a due time of 24 * 3600 with 10 seconds of adjustment + // won't fix systematic drift. + // + // 2) This implementation has been plagued with issues around application container + // lifecycle models, in particular Windows Phone 7's model of tombstoning and in + // particular its "dormant state". This feature was introduced in Mango to enable + // fast application switching. Essentially, the phone's OS puts the application + // in a suspended state when the user navigates "forward" (or takes an incoming + // call for instance). When the application is woken up again, threads are resumed + // and we're faced with an illusion of missed events due to the use of absolute + // time, not relative to how the application observes it. This caused nightmare + // scenarios of fast battery drain due to the flood of catch-up work. + // + // See http://msdn.microsoft.com/en-us/library/ff817008(v=vs.92).aspx for more + // information on this. + // + // 3) Recursive scheduling imposes a non-trivial cost due to the creation of many + // single-shot timers and closures. For high frequency timers, this can cause a + // lot of churn in the GC, which we like to avoid (operators shouldn't have hidden + // linear - or worse - allocation cost). + // + // Notice these drawbacks weren't limited to the use of Timer and Interval directly, + // as many operators such as Sample, Buffer, and Window used such sequences for their + // periodic behavior (typically by delegating to a more general overload). + // + // As a result, in Rx v2.0, we took the decision to improve periodic timing based on + // the following design decisions: + // + // 1) When the scheduler has the ability to run a periodic task, it should implement + // the ISchedulerPeriodic interface and expose it through the IServiceProvider + // interface. Passing the intent of the user through all layers of Rx, down to the + // underlying infrastructure provides delegation of responsibilities. This allows + // the target scheduler to optimize execution in various ways, e.g. by employing + // techniques such as timer coalescing. + // + // See http://www.bing.com/search?q=windows+timer+coalescing for information on + // techniques like timer coalescing which may be applied more aggressively in + // future OS releases in order to reduce power consumption. + // + // 2) Emulation of periodic scheduling is used to avoid breaking existing code that + // uses schedulers without this capability. We expect those fallback paths to be + // exercised rarely, though the use of DisableOptimizations can trigger them as + // well. In such cases we rely on stopwatches or a carefully crafted recursive + // scheme to deal with (or maximally compensate for) slippage or time. Behavior + // of periodic tasks is expected to be as follows: + // + // timer ticks 0-------1-------2-------3-------4-------5-------6----... + // | | | +====+ +==+ | | + // user code +~~~| +~| +~~~~~~~~~~~|+~~~~|+~~| +~~~| +~~| + // + // rather than the following scheme, where time slippage is introduced by user + // code running on the scheduler: + // + // timer ticks 0####-------1##-------2############-------3#####-----... + // | | | | + // user code +~~~| +~| +~~~~~~~~~~~| +~~~~| + // + // (Side-note: Unfortunately, we didn't reserve the name Interval for the latter + // behavior, but used it as an alias for "periodic scheduling" with + // the former behavior, delegating to the Timer implementation. One + // can simulate this behavior using Generate, which uses tail calls.) + // + // This behavior is important for operations like Sample, Buffer, and Window, all + // of which expect proper spacing of events, even if the user code takes a long + // time to complete (considered a bad practice nonetheless, cf. ObserveOn). + // + // 3) To deal with the issue of suspensions induced by application lifecycle events + // in Windows Phone and WinRT applications, we decided to hook available system + // events through IHostLifecycleNotifications, discovered through the PEP in order + // to maintain portability of the core of Rx. + // + var periodic = scheduler.AsPeriodic(); + if (periodic != null) + { + return periodic.SchedulePeriodic(state, period, action); + } + + var swp = scheduler.AsStopwatchProvider(); + if (swp != null) + { + var spr = new SchedulePeriodicStopwatch<TState>(scheduler, state, period, action, swp); + return spr.Start(); + } + else + { + var spr = new SchedulePeriodicRecursive<TState>(scheduler, state, period, action); + return spr.Start(); + } + } + + class SchedulePeriodicStopwatch<TState> + { + private readonly IScheduler _scheduler; + private readonly TimeSpan _period; + private readonly Func<TState, TState> _action; + private readonly IStopwatchProvider _stopwatchProvider; + + public SchedulePeriodicStopwatch(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action, IStopwatchProvider stopwatchProvider) + { + _scheduler = scheduler; + _period = period; + _action = action; + _stopwatchProvider = stopwatchProvider; + + _state = state; + _runState = STOPPED; + } + + private TState _state; + + private readonly object _gate = new object(); + private readonly AutoResetEvent _resumeEvent = new AutoResetEvent(false); + private volatile int _runState; + private IStopwatch _stopwatch; + private TimeSpan _nextDue; + private TimeSpan _suspendedAt; + private TimeSpan _inactiveTime; + + // + // State transition diagram: + // (c) + // +-----------<-----------+ + // / \ + // / (b) \ + // | +-->--SUSPENDED---+ + // (a) v / | + // ^----STOPPED -->-- RUNNING -->--+ v (e) + // \ | + // +-->--DISPOSED----$ + // (d) + // + // (a) Start --> call to Schedule the Tick method + // (b) Suspending event handler --> Tick gets blocked waiting for _resumeEvent + // (c) Resuming event handler --> _resumeEvent is signaled, Tick continues + // (d) Dispose returned object from Start --> scheduled work is cancelled + // (e) Dispose returned object from Start --> unblocks _resumeEvent, Tick exits + // + private const int STOPPED = 0; + private const int RUNNING = 1; + private const int SUSPENDED = 2; + private const int DISPOSED = 3; + + public IDisposable Start() + { + RegisterHostLifecycleEventHandlers(); + + _stopwatch = _stopwatchProvider.StartStopwatch(); + _nextDue = _period; + _runState = RUNNING; + + return new CompositeDisposable(2) + { + _scheduler.Schedule(_nextDue, Tick), + Disposable.Create(Cancel) + }; + } + + private void Tick(Action<TimeSpan> recurse) + { + _nextDue += _period; + _state = _action(_state); + + var next = default(TimeSpan); + + while (true) + { + var shouldWaitForResume = false; + + lock (_gate) + { + if (_runState == RUNNING) + { + // + // This is the fast path. We just let the stopwatch continue to + // run while we're suspended, but compensate for time that was + // recorded as inactive based on cumulative deltas computed in + // the suspend and resume event handlers. + // + next = Normalize(_nextDue - (_stopwatch.Elapsed - _inactiveTime)); + break; + } + else if (_runState == DISPOSED) + { + // + // In case the periodic job gets disposed but we are currently + // waiting to come back out of suspension, we should make sure + // we don't remain blocked indefinitely. Hence, we set the event + // in the Cancel method and trap this case here to bail out from + // the scheduled work gracefully. + // + return; + } + else + { + // + // This is the least common case where we got suspended and need + // to block such that future reevaluations of the next due time + // will pick up the cumulative inactive time delta. + // + Debug.Assert(_runState == SUSPENDED); + shouldWaitForResume = true; + } + } + + // + // Only happens in the SUSPENDED case; otherwise we will have broken from + // the loop or have quit the Tick method. After returning from the wait, + // we'll either be RUNNING again, quit due to a DISPOSED transition, or + // be extremely unlucky to find ourselves SUSPENDED again and be blocked + // once more. + // + if (shouldWaitForResume) + _resumeEvent.WaitOne(); + } + + recurse(next); + } + + private void Cancel() + { + UnregisterHostLifecycleEventHandlers(); + + lock (_gate) + { + _runState = DISPOSED; + + if (!Environment.HasShutdownStarted) + _resumeEvent.Set(); + } + } + + private void Suspending(object sender, HostSuspendingEventArgs args) + { + // + // The host is telling us we're about to be suspended. At this point, time + // computations will still be in a valid range (next <= _period), but after + // we're woken up again, Tick would start to go on a crucade to catch up. + // + // This has caused problems in the past, where the flood of events caused + // batteries to drain etc (see design rationale discussion higher up). + // + // In order to mitigate this problem, we force Tick to suspend before its + // next computation of the next due time. Notice we can't afford to block + // during the Suspending event handler; the host expects us to respond to + // this event quickly, such that we're not keeping the application from + // suspending promptly. + // + lock (_gate) + { + if (_runState == RUNNING) + { + _suspendedAt = _stopwatch.Elapsed; + _runState = SUSPENDED; + + if (!Environment.HasShutdownStarted) + _resumeEvent.Reset(); + } + } + } + + private void Resuming(object sender, HostResumingEventArgs args) + { + // + // The host is telling us we're being resumed. At this point, code will + // already be running in the process, so a past timer may still expire and + // cause the code in Tick to run. Two interleavings are possible now: + // + // 1) We enter the gate first, and will adjust the cumulative inactive + // time delta used for correction. The code in Tick will have the + // illusion nothing happened and find itself RUNNING when entering + // the gate, resuming activities as before. + // + // 2) The code in Tick enters the gate first, and takes notice of the + // currently SUSPENDED state. It leaves the gate, entering the wait + // state for _resumeEvent. Next, we enter to adjust the cumulative + // inactive time delta, switch to the RUNNING state and signal the + // event for Tick to carry on and recompute its next due time based + // on the new cumulative delta. + // + lock (_gate) + { + if (_runState == SUSPENDED) + { + _inactiveTime += _stopwatch.Elapsed - _suspendedAt; + _runState = RUNNING; + + if (!Environment.HasShutdownStarted) + _resumeEvent.Set(); + } + } + } + + private void RegisterHostLifecycleEventHandlers() + { + HostLifecycleService.Suspending += Suspending; + HostLifecycleService.Resuming += Resuming; + HostLifecycleService.AddRef(); + } + + private void UnregisterHostLifecycleEventHandlers() + { + HostLifecycleService.Suspending -= Suspending; + HostLifecycleService.Resuming -= Resuming; + HostLifecycleService.Release(); + } + } + + class SchedulePeriodicRecursive<TState> + { + private readonly IScheduler _scheduler; + private readonly TimeSpan _period; + private readonly Func<TState, TState> _action; + + public SchedulePeriodicRecursive(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action) + { + _scheduler = scheduler; + _period = period; + _action = action; + + _state = state; + } + + private TState _state; + private int _pendingTickCount; + private IDisposable _cancel; + + public IDisposable Start() + { + _pendingTickCount = 0; + + var d = new SingleAssignmentDisposable(); + _cancel = d; + + d.Disposable = _scheduler.Schedule(TICK, _period, Tick); + + return d; + } + + // + // The protocol using the three commands is explained in the Tick implementation below. + // + private const int TICK = 0; + private const int DISPATCH_START = 1; + private const int DISPATCH_END = 2; + + private void Tick(int command, Action<int, TimeSpan> recurse) + { + switch (command) + { + case TICK: + // + // Ticks keep going at the specified periodic rate. We do a head call such + // that no slippage is introduced because of DISPATCH_START work involving + // user code that may take arbitrarily long. + // + recurse(TICK, _period); + + // + // If we're not transitioning from 0 to 1 pending tick, another processing + // request is in flight which will see a non-zero pending tick count after + // doing the final decrement, causing it to reschedule immediately. We can + // safely bail out, delegating work to the catch-up tail calls. + // + if (Interlocked.Increment(ref _pendingTickCount) == 1) + goto case DISPATCH_START; + + break; + + case DISPATCH_START: + try + { + _state = _action(_state); + } + catch (Exception e) + { + _cancel.Dispose(); + e.Throw(); + } + + // + // This is very subtle. We can't do a goto case DISPATCH_END here because it + // wouldn't introduce interleaving of periodic ticks that are due. In order + // to have best effort behavior for schedulers that don't have concurrency, + // we yield by doing a recursive call here. Notice this doesn't heal all of + // the problem, because the TICK commands that may be dispatched before the + // scheduled DISPATCH_END will do a "recurse(TICK, period)", which is relative + // from the point of entrance. Really all we're doing here is damage control + // for the case there's no stopwatch provider which should be rare (notice + // the LocalScheduler base class always imposes a stopwatch, but it can get + // disabled using DisableOptimizations; legacy implementations of schedulers + // from the v1.x days will not have a stopwatch). + // + recurse(DISPATCH_END, TimeSpan.Zero); + + break; + + case DISPATCH_END: + // + // If work was due while we were still running user code, the count will have + // been incremented by the periodic tick handler above. In that case, we will + // reschedule ourselves for dispatching work immediately. + // + // Notice we don't run a loop here, in order to allow interleaving of work on + // the scheduler by making recursive calls. In case we would use AsyncLock to + // ensure serialized execution the owner could get stuck in such a loop, thus + // we make tail calls to play nice with the scheduler. + // + if (Interlocked.Decrement(ref _pendingTickCount) > 0) + recurse(DISPATCH_START, TimeSpan.Zero); + + break; + } + } + } + + class EmulatedStopwatch : IStopwatch + { + private readonly IScheduler _scheduler; + private readonly DateTimeOffset _start; + + public EmulatedStopwatch(IScheduler scheduler) + { + _scheduler = scheduler; + _start = _scheduler.Now; + } + + public TimeSpan Elapsed + { + get { return Scheduler.Normalize(_scheduler.Now - _start); } + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.cs new file mode 100644 index 0000000..be689b0 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Services.cs @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +namespace System.Reactive.Concurrency +{ + // + // NOTE: When adding interface-based optimizations here, ensure to add the type to the list of + // interface-based optimizations used by DisableOptimizations and the RawScheduler type. + // + public static partial class Scheduler + { + internal static Type[] OPTIMIZATIONS = new Type[] { + typeof(ISchedulerLongRunning), + typeof(IStopwatchProvider), + typeof(ISchedulerPeriodic), + /* update this list if new interface-based optimizations are added */ + }; + + /// <summary> + /// Returns the ISchedulerLongRunning implementation of the specified scheduler, or null if no such implementation is available. + /// </summary> + /// <param name="scheduler">Scheduler to get the ISchedulerLongRunning implementation for.</param> + /// <returns>The scheduler's ISchedulerLongRunning implementation if available; null otherwise.</returns> + /// <remarks> + /// This helper method is made available for query operator authors in order to discover scheduler services by using the required + /// IServiceProvider pattern, which allows for interception or redefinition of scheduler services. + /// </remarks> + public static ISchedulerLongRunning AsLongRunning(this IScheduler scheduler) + { + var svc = scheduler as IServiceProvider; + if (svc != null) + return (ISchedulerLongRunning)svc.GetService(typeof(ISchedulerLongRunning)); + + return null; + } + + /// <summary> + /// Returns the IStopwatchProvider implementation of the specified scheduler, or null if no such implementation is available. + /// </summary> + /// <param name="scheduler">Scheduler to get the IStopwatchProvider implementation for.</param> + /// <returns>The scheduler's IStopwatchProvider implementation if available; null otherwise.</returns> + /// <remarks> + /// <para> + /// This helper method is made available for query operator authors in order to discover scheduler services by using the required + /// IServiceProvider pattern, which allows for interception or redefinition of scheduler services. + /// </para> + /// <para> + /// Consider using <see cref="Scheduler.StartStopwatch"/> in case a stopwatch is required, but use of emulation stopwatch based + /// on the scheduler's clock is acceptable. Use of this method is recommended for best-effort use of the stopwatch provider + /// scheduler service, where the caller falls back to not using stopwatches if this facility wasn't found. + /// </para> + /// </remarks> + public static IStopwatchProvider AsStopwatchProvider(this IScheduler scheduler) + { + var svc = scheduler as IServiceProvider; + if (svc != null) + return (IStopwatchProvider)svc.GetService(typeof(IStopwatchProvider)); + + return null; + } + + /// <summary> + /// Returns the IStopwatchProvider implementation of the specified scheduler, or null if no such implementation is available. + /// </summary> + /// <param name="scheduler">Scheduler to get the IStopwatchProvider implementation for.</param> + /// <returns>The scheduler's IStopwatchProvider implementation if available; null otherwise.</returns> + /// <remarks> + /// <para> + /// This helper method is made available for query operator authors in order to discover scheduler services by using the required + /// IServiceProvider pattern, which allows for interception or redefinition of scheduler services. + /// </para> + /// <para> + /// Consider using the Scheduler.SchedulePeriodic extension methods for IScheduler in case periodic scheduling is required and + /// emulation of periodic behavior using other scheduler services is desirable. Use of this method is recommended for best-effort + /// use of the periodic scheduling service, where the caller falls back to not using periodic scheduling if this facility wasn't + /// found. + /// </para> + /// </remarks> + public static ISchedulerPeriodic AsPeriodic(this IScheduler scheduler) + { + var svc = scheduler as IServiceProvider; + if (svc != null) + return (ISchedulerPeriodic)svc.GetService(typeof(ISchedulerPeriodic)); + + return null; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Simple.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Simple.cs new file mode 100644 index 0000000..7b91e62 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Simple.cs @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Reactive.Disposables; + +namespace System.Reactive.Concurrency +{ + public static partial class Scheduler + { + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <param name="scheduler">Scheduler to execute the action on.</param> + /// <param name="action">Action to execute.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule(this IScheduler scheduler, Action action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(action, Invoke); + } + + /// <summary> + /// Schedules an action to be executed after the specified relative due time. + /// </summary> + /// <param name="scheduler">Scheduler to execute the action on.</param> + /// <param name="action">Action to execute.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, Action action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(action, dueTime, Invoke); + } + + /// <summary> + /// Schedules an action to be executed at the specified absolute due time. + /// </summary> + /// <param name="scheduler">Scheduler to execute the action on.</param> + /// <param name="action">Action to execute.</param> + /// <param name="dueTime">Absolute time at which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable Schedule(this IScheduler scheduler, DateTimeOffset dueTime, Action action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.Schedule(action, dueTime, Invoke); + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <param name="scheduler">Scheduler to execute the action on.</param> + /// <param name="action">Action to execute.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is null.</exception> + public static IDisposable ScheduleLongRunning(this ISchedulerLongRunning scheduler, Action<ICancelable> action) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (action == null) + throw new ArgumentNullException("action"); + + return scheduler.ScheduleLongRunning(action, (a, c) => a(c)); + } + + static IDisposable Invoke(IScheduler scheduler, Action action) + { + action(); + return Disposable.Empty; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Wrappers.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Wrappers.cs new file mode 100644 index 0000000..a30ff69 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.Wrappers.cs @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +namespace System.Reactive.Concurrency +{ + public static partial class Scheduler + { + /// <summary> + /// Returns a scheduler that represents the original scheduler, without any of its interface-based optimizations (e.g. long running scheduling). + /// </summary> + /// <param name="scheduler">Scheduler to disable all optimizations for.</param> + /// <returns>Proxy to the original scheduler but without any optimizations enabled.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception> + public static IScheduler DisableOptimizations(this IScheduler scheduler) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new DisableOptimizationsScheduler(scheduler); + } + + /// <summary> + /// Returns a scheduler that represents the original scheduler, without the specified set of interface-based optimizations (e.g. long running scheduling). + /// </summary> + /// <param name="scheduler">Scheduler to disable the specified optimizations for.</param> + /// <param name="optimizationInterfaces">Types of the optimization interfaces that have to be disabled.</param> + /// <returns>Proxy to the original scheduler but without the specified optimizations enabled.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="optimizationInterfaces"/> is null.</exception> + public static IScheduler DisableOptimizations(this IScheduler scheduler, params Type[] optimizationInterfaces) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (optimizationInterfaces == null) + throw new ArgumentNullException("optimizationInterfaces"); + + return new DisableOptimizationsScheduler(scheduler, optimizationInterfaces); + } + + /// <summary> + /// Returns a scheduler that wraps the original scheduler, adding exception handling for scheduled actions. + /// </summary> + /// <typeparam name="TException">Type of the exception to check for.</typeparam> + /// <param name="scheduler">Scheduler to apply an exception filter for.</param> + /// <param name="handler">Handler that's run if an exception is caught. The exception will be rethrown if the handler returns false.</param> + /// <returns>Wrapper around the original scheduler, enforcing exception handling.</returns> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="handler"/> is null.</exception> + public static IScheduler Catch<TException>(this IScheduler scheduler, Func<TException, bool> handler) + where TException : Exception + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (handler == null) + throw new ArgumentNullException("handler"); + + return new CatchScheduler<TException>(scheduler, handler); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.cs new file mode 100644 index 0000000..2b4a5ac --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Scheduler.cs @@ -0,0 +1,142 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Reactive.Disposables; +using System.Reactive.PlatformServices; +using System.Globalization; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Provides a set of static properties to access commonly used schedulers. + /// </summary> + public static partial class Scheduler + { + // TODO - Review whether this is too eager. + // Make first use of Scheduler trigger access to and initialization of the CAL. + private static DefaultScheduler s_default = DefaultScheduler.Instance; + + /// <summary> + /// Gets the current time according to the local machine's system clock. + /// </summary> + public static DateTimeOffset Now + { + get + { + return SystemClock.UtcNow; + } + } + + /// <summary> + /// Normalizes the specified TimeSpan value to a positive value. + /// </summary> + /// <param name="timeSpan">The TimeSpan value to normalize.</param> + /// <returns>The specified TimeSpan value if it is zero or positive; otherwise, TimeSpan.Zero.</returns> + public static TimeSpan Normalize(TimeSpan timeSpan) + { + if (timeSpan.Ticks < 0) + return TimeSpan.Zero; + return timeSpan; + } + + /// <summary> + /// Gets a scheduler that schedules work immediately on the current thread. + /// </summary> + public static ImmediateScheduler Immediate + { + get + { + return ImmediateScheduler.Instance; + } + } + + /// <summary> + /// Gets a scheduler that schedules work as soon as possible on the current thread. + /// </summary> + public static CurrentThreadScheduler CurrentThread + { + get + { + return CurrentThreadScheduler.Instance; + } + } + + /// <summary> + /// Gets a scheduler that schedules work on the platform's default scheduler. + /// </summary> + public static DefaultScheduler Default + { + get + { + return s_default; + } + } + + + // + // Notice we include all of the scheduler properties below, unconditionally. In Rx v2.0 + // beta and RC, we limited this a la carte menu to reflect the platform's capabilities. + // However, this caused different builds for Windows 8, .NET 4.5, and Portable Library + // to be required. In the RTM timeframe, we opted for unifying all of this based on a + // single Portable Library build of the core set of assemblies. As such, we're presented + // with a choice of either locking down those properties to the intersection, or keeping + // compatibility for those who upgrade from.NET 4.0 to .NET 4.5. We chose the latter, so + // we need to keep properties like NewThread here, even though they'll be obsolete from + // day 0 of Rx v2.0 (including our Portable Library story). Also, the NewThread one will + // be non-functional for Windows 8, causing a runtime exception to be thrown. + // + + + private static Lazy<IScheduler> s_threadPool = new Lazy<IScheduler>(() => Initialize("ThreadPool")); + + /// <summary> + /// Gets a scheduler that schedules work on the thread pool. + /// </summary> + [Obsolete(Constants_Core.OBSOLETE_SCHEDULER_THREADPOOL)] + public static IScheduler ThreadPool + { + get + { + return s_threadPool.Value; + } + } + + private static Lazy<IScheduler> s_newThread = new Lazy<IScheduler>(() => Initialize("NewThread")); + + /// <summary> + /// Gets a scheduler that schedules work on a new thread using default thread creation options. + /// </summary> + [Obsolete(Constants_Core.OBSOLETE_SCHEDULER_NEWTHREAD)] + public static IScheduler NewThread + { + get + { + return s_newThread.Value; + } + } + +#if !NO_TPL + private static Lazy<IScheduler> s_taskPool = new Lazy<IScheduler>(() => Initialize("TaskPool")); + + /// <summary> + /// Gets a scheduler that schedules work on Task Parallel Library (TPL) task pool using the default TaskScheduler. + /// </summary> + [Obsolete(Constants_Core.OBSOLETE_SCHEDULER_TASKPOOL)] + public static IScheduler TaskPool + { + get + { + return s_taskPool.Value; + } + } +#endif + + private static IScheduler Initialize(string name) + { + var res = PlatformEnlightenmentProvider.Current.GetService<IScheduler>(name); + if (res == null) + throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Strings_Core.CANT_OBTAIN_SCHEDULER, name)); + return res; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerDefaults.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerDefaults.cs new file mode 100644 index 0000000..ecbe361 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerDefaults.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +namespace System.Reactive.Concurrency +{ + internal static class SchedulerDefaults + { + internal static IScheduler ConstantTimeOperations { get { return ImmediateScheduler.Instance; } } + internal static IScheduler TailRecursion { get { return ImmediateScheduler.Instance; } } + internal static IScheduler Iteration { get { return CurrentThreadScheduler.Instance; } } + internal static IScheduler TimeBasedOperations { get { return DefaultScheduler.Instance; } } + internal static IScheduler AsyncConversions { get { return DefaultScheduler.Instance; } } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerOperation.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerOperation.cs new file mode 100644 index 0000000..955b34a --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerOperation.cs @@ -0,0 +1,159 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if HAS_AWAIT +using System.ComponentModel; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an awaitable scheduler operation. Awaiting the object causes the continuation to be posted back to the originating scheduler's work queue. + /// </summary> + public sealed class SchedulerOperation + { + private readonly Func<Action, IDisposable> _schedule; + private readonly CancellationToken _cancellationToken; + private readonly bool _postBackToOriginalContext; + + internal SchedulerOperation(Func<Action, IDisposable> schedule, CancellationToken cancellationToken) + : this(schedule, cancellationToken, false) + { + } + + internal SchedulerOperation(Func<Action, IDisposable> schedule, CancellationToken cancellationToken, bool postBackToOriginalContext) + { + _schedule = schedule; + _cancellationToken = cancellationToken; + _postBackToOriginalContext = postBackToOriginalContext; + } + + /// <summary> + /// Controls whether the continuation is run on the originating synchronization context (false by default). + /// </summary> + /// <param name="continueOnCapturedContext">true to run the continuation on the captured synchronization context; false otherwise (default).</param> + /// <returns>Scheduler operation object with configured await behavior.</returns> + public SchedulerOperation ConfigureAwait(bool continueOnCapturedContext) + { + return new SchedulerOperation(_schedule, _cancellationToken, continueOnCapturedContext); + } + + /// <summary> + /// Gets an awaiter for the scheduler operation, used to post back the continuation. + /// </summary> + /// <returns>Awaiter for the scheduler operation.</returns> + public SchedulerOperationAwaiter GetAwaiter() + { + return new SchedulerOperationAwaiter(_schedule, _cancellationToken, _postBackToOriginalContext); + } + } + + /// <summary> + /// (Infrastructure) Scheduler operation awaiter type used by the code generated for C# await and Visual Basic Await expressions. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public sealed class SchedulerOperationAwaiter + : INotifyCompletion + { + private readonly Func<Action, IDisposable> _schedule; + private readonly CancellationToken _cancellationToken; + private readonly bool _postBackToOriginalContext; + private readonly CancellationTokenRegistration _ctr; + + internal SchedulerOperationAwaiter(Func<Action, IDisposable> schedule, CancellationToken cancellationToken, bool postBackToOriginalContext) + { + _schedule = schedule; + _cancellationToken = cancellationToken; + _postBackToOriginalContext = postBackToOriginalContext; + + if (cancellationToken.CanBeCanceled) + { + _ctr = _cancellationToken.Register(Cancel); + } + } + + /// <summary> + /// Indicates whether the scheduler operation has completed. Returns false unless cancellation was already requested. + /// </summary> + public bool IsCompleted + { + get { return _cancellationToken.IsCancellationRequested; } + } + + /// <summary> + /// Completes the scheduler operation, throwing an OperationCanceledException in case cancellation was requested. + /// </summary> + public void GetResult() + { + _cancellationToken.ThrowIfCancellationRequested(); + } + + /// <summary> + /// Registers the continuation with the scheduler operation. + /// </summary> + /// <param name="continuation">Continuation to be run on the originating scheduler.</param> + public void OnCompleted(Action continuation) + { + if (continuation == null) + throw new ArgumentNullException("continuation"); + + if (_continuation != null) + throw new InvalidOperationException(Strings_Core.SCHEDULER_OPERATION_ALREADY_AWAITED); + + if (_postBackToOriginalContext) + { + var ctx = SynchronizationContext.Current; + if (ctx != null) + { + var original = continuation; + continuation = () => + { + // + // No need for OperationStarted and OperationCompleted calls here; + // this code is invoked through await support and will have a way + // to observe its start/complete behavior, either through returned + // Task objects or the async method builder's interaction with the + // SynchronizationContext object. + // + // In general though, Rx doesn't play nicely with synchronization + // contexts objects at the scheduler level. It's possible to start + // async operations by calling Schedule, without a way to observe + // their completion. Not interacting with SynchronizationContext + // is a concious design decision as the performance impact was non + // negligable and our schedulers abstract over more constructs. + // + ctx.Post(a => ((Action)a)(), original); + }; + } + } + + var ran = 0; + + _continuation = () => + { + if (Interlocked.Exchange(ref ran, 1) == 0) + { + _ctr.Dispose(); // no null-check needed (struct) + continuation(); + } + }; + + _work = _schedule(_continuation); + } + + private volatile Action _continuation; + private volatile IDisposable _work; + + private void Cancel() + { + var w = _work; + if (w != null) + w.Dispose(); + + var c = _continuation; + if (c != null) + c(); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerQueue.cs new file mode 100644 index 0000000..2cd7beb --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerQueue.cs @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Efficient scheduler queue that maintains scheduled items sorted by absolute time. + /// </summary> + /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam> + /// <remarks>This type is not thread safe; users should ensure proper synchronization.</remarks> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix", Justification = "But it *is* a queue!")] + public class SchedulerQueue<TAbsolute> + where TAbsolute : IComparable<TAbsolute> + { + private readonly PriorityQueue<ScheduledItem<TAbsolute>> _queue; + + /// <summary> + /// Creates a new scheduler queue with a default initial capacity. + /// </summary> + public SchedulerQueue() + : this(1024) + { + } + + /// <summary> + /// Creats a new scheduler queue with the specified initial capacity. + /// </summary> + /// <param name="capacity">Initial capacity of the scheduler queue.</param> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than zero.</exception> + public SchedulerQueue(int capacity) + { + if (capacity < 0) + throw new ArgumentOutOfRangeException("capacity"); + + _queue = new PriorityQueue<ScheduledItem<TAbsolute>>(capacity); + } + + /// <summary> + /// Gets the number of scheduled items in the scheduler queue. + /// </summary> + public int Count + { + get + { + return _queue.Count; + } + } + + /// <summary> + /// Enqueues the specified work item to be scheduled. + /// </summary> + /// <param name="scheduledItem">Work item to be scheduled.</param> + public void Enqueue(ScheduledItem<TAbsolute> scheduledItem) + { + _queue.Enqueue(scheduledItem); + } + + /// <summary> + /// Removes the specified work item from the scheduler queue. + /// </summary> + /// <param name="scheduledItem">Work item to be removed from the scheduler queue.</param> + /// <returns>true if the item was found; false otherwise.</returns> + public bool Remove(ScheduledItem<TAbsolute> scheduledItem) + { + return _queue.Remove(scheduledItem); + } + + /// <summary> + /// Dequeues the next work item from the scheduler queue. + /// </summary> + /// <returns>Next work item in the scheduler queue (removed).</returns> + public ScheduledItem<TAbsolute> Dequeue() + { + return _queue.Dequeue(); + } + + /// <summary> + /// Peeks the next work item in the scheduler queue. + /// </summary> + /// <returns>Next work item in the scheduler queue (not removed).</returns> + public ScheduledItem<TAbsolute> Peek() + { + return _queue.Peek(); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerWrapper.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerWrapper.cs new file mode 100644 index 0000000..15ff59d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SchedulerWrapper.cs @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +#if !NO_WEAKTABLE +using System.Runtime.CompilerServices; +#endif + +namespace System.Reactive.Concurrency +{ + internal abstract class SchedulerWrapper : IScheduler, IServiceProvider + { + protected readonly IScheduler _scheduler; + + public SchedulerWrapper(IScheduler scheduler) + { + _scheduler = scheduler; + +#if !NO_WEAKTABLE + _cache = new ConditionalWeakTable<IScheduler, IScheduler>(); +#endif + } + + public DateTimeOffset Now + { + get { return _scheduler.Now; } + } + + public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return _scheduler.Schedule(state, Wrap(action)); + } + + public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return _scheduler.Schedule(state, dueTime, Wrap(action)); + } + + public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + return _scheduler.Schedule(state, dueTime, Wrap(action)); + } + + protected virtual Func<IScheduler, TState, IDisposable> Wrap<TState>(Func<IScheduler, TState, IDisposable> action) + { + return (self, state) => action(GetRecursiveWrapper(self), state); + } + +#if !NO_WEAKTABLE + private readonly ConditionalWeakTable<IScheduler, IScheduler> _cache; + + public SchedulerWrapper(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache) + { + _scheduler = scheduler; + _cache = cache; + } + + protected IScheduler GetRecursiveWrapper(IScheduler scheduler) + { + return _cache.GetValue(scheduler, s => Clone(s, _cache)); + } + + protected abstract SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache); +#else + private readonly object _gate = new object(); + private IScheduler _recursiveOriginal; + private IScheduler _recursiveWrapper; + + protected IScheduler GetRecursiveWrapper(IScheduler scheduler) + { + var recursiveWrapper = default(IScheduler); + + lock (_gate) + { + // + // Chances are the recursive scheduler will remain the same. In practice, this + // single-shot caching scheme works out quite well. Notice we propagate our + // mini-cache to recursive raw scheduler wrappers too. + // + if (!object.ReferenceEquals(scheduler, _recursiveOriginal)) + { + _recursiveOriginal = scheduler; + + var wrapper = Clone(scheduler); + wrapper._recursiveOriginal = scheduler; + wrapper._recursiveWrapper = wrapper; + + _recursiveWrapper = wrapper; + } + + recursiveWrapper = _recursiveWrapper; + } + + return recursiveWrapper; + } + + protected abstract SchedulerWrapper Clone(IScheduler scheduler); +#endif + + public object GetService(Type serviceType) + { + var serviceProvider = _scheduler as IServiceProvider; + if (serviceProvider == null) + return null; + + var result = default(object); + if (TryGetService(serviceProvider, serviceType, out result)) + return result; + + return serviceProvider.GetService(serviceType); + } + + protected abstract bool TryGetService(IServiceProvider provider, Type serviceType, out object service); + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Stopwatch.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Stopwatch.Default.cs new file mode 100644 index 0000000..028f0ae --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Stopwatch.Default.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_STOPWATCH +using System.Diagnostics; + +namespace System.Reactive.Concurrency +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class DefaultStopwatch/*Impl*/ : IStopwatch + { + private readonly Stopwatch _sw; + + public DefaultStopwatch() + { + _sw = Stopwatch.StartNew(); + } + + public TimeSpan Elapsed + { + get { return _sw.Elapsed; } + } + } +} +#else +namespace System.Reactive.Concurrency +{ + // This class is only used on Silverlight in the browser. It mimicks !Stopwatch.HighResolution behavior and suffers from + // use of absolute time. See work item 486045. + internal class DefaultStopwatch : IStopwatch + { + private readonly DateTime _start; + + public DefaultStopwatch() + { + _start = DateTime.UtcNow; + } + + public TimeSpan Elapsed + { + get { return DateTime.UtcNow - _start; } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs new file mode 100644 index 0000000..7fd80d0 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs @@ -0,0 +1,117 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + class ObserveOn<TSource> : Producer<TSource> + { + private readonly IObservable<TSource> _source; + private readonly IScheduler _scheduler; + + public ObserveOn(IObservable<TSource> source, IScheduler scheduler) + { + _source = source; + _scheduler = scheduler; + } + +#if !NO_SYNCCTX + private readonly SynchronizationContext _context; + + public ObserveOn(IObservable<TSource> source, SynchronizationContext context) + { + _source = source; + _context = context; + } +#endif + + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")] + protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink) + { +#if !NO_SYNCCTX + if (_context != null) + { + var sink = new ς(this, observer, cancel); + setSink(sink); + return sink.Run(); + } + else +#endif + { + var sink = new ObserveOnObserver<TSource>(_scheduler, observer, cancel); + setSink(sink); + return _source.SubscribeSafe(sink); + } + } + +#if !NO_SYNCCTX + class ς : Sink<TSource>, IObserver<TSource> + { + private readonly ObserveOn<TSource> _parent; + + public ς(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + public IDisposable Run() + { + // + // The interactions with OperationStarted/OperationCompleted below allow + // for test frameworks to wait until a whole sequence is observed, running + // asserts on a per-message level. Also, for ASP.NET pages, the use of the + // built-in synchronization context would allow processing to finished in + // its entirety before moving on with the page lifecycle. + // + _parent._context.OperationStarted(); + + var d = _parent._source.SubscribeSafe(this); + var c = Disposable.Create(() => + { + _parent._context.OperationCompleted(); + }); + + return new CompositeDisposable(d, c); + } + + public void OnNext(TSource value) + { + _parent._context.Post(OnNextPosted, value); + } + + public void OnError(Exception error) + { + _parent._context.Post(OnErrorPosted, error); + } + + public void OnCompleted() + { + _parent._context.Post(OnCompletedPosted, null); + } + + private void OnNextPosted(object value) + { + base._observer.OnNext((TSource)value); + } + + private void OnErrorPosted(object error) + { + base._observer.OnError((Exception)error); + base.Dispose(); + } + + private void OnCompletedPosted(object ignored) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } +#endif + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.Synchronize.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.Synchronize.cs new file mode 100644 index 0000000..85cd0ec --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.Synchronize.cs @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System; + +namespace System.Reactive.Concurrency +{ + class Synchronize<TSource> : Producer<TSource> + { + private readonly IObservable<TSource> _source; + private readonly object _gate; + + public Synchronize(IObservable<TSource> source, object gate) + { + _source = source; + _gate = gate; + } + + public Synchronize(IObservable<TSource> source) + { + _source = source; + } + + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")] + protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink) + { + var sink = new _(this, observer, cancel); + setSink(sink); + return _source.SubscribeSafe(sink); + } + + class _ : Sink<TSource>, IObserver<TSource> + { + private readonly Synchronize<TSource> _parent; + private readonly object _gate; + + public _(Synchronize<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + _gate = _parent._gate ?? new object(); + } + + public void OnNext(TSource value) + { + lock (_gate) + { + base._observer.OnNext(value); + } + } + + public void OnError(Exception error) + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + + public void OnCompleted() + { + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.cs new file mode 100644 index 0000000..d923912 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.cs @@ -0,0 +1,210 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.ComponentModel; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Provides basic synchronization and scheduling services for observable sequences. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Advanced)] + public static class Synchronization + { + #region SubscribeOn + + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler. + /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler) + { + if (source == null) + throw new ArgumentNullException("source"); + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new AnonymousObservable<TSource>(observer => + { + var m = new SingleAssignmentDisposable(); + var d = new SerialDisposable(); + d.Disposable = m; + + m.Disposable = scheduler.Schedule(() => + { + d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer)); + }); + + return d; + }); + } + +#if !NO_SYNCCTX + /// <summary> + /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param> + /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception> + /// <remarks> + /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context. + /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>. + /// </remarks> + public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context) + { + if (source == null) + throw new ArgumentNullException("source"); + if (context == null) + throw new ArgumentNullException("context"); + + return new AnonymousObservable<TSource>(observer => + { + var subscription = new SingleAssignmentDisposable(); + context.PostWithStartComplete(() => + { + if (!subscription.IsDisposed) + subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer)); + }); + return subscription; + }); + } +#endif + + #endregion + + #region ObserveOn + + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="scheduler">Scheduler to notify observers on.</param> + /// <returns>The source sequence whose observations happen on the specified scheduler.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler) + { + if (source == null) + throw new ArgumentNullException("source"); + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + +#if !NO_PERF + return new ObserveOn<TSource>(source, scheduler); +#else + return new AnonymousObservable<TSource>(observer => source.Subscribe(new ObserveOnObserver<TSource>(scheduler, observer, null))); +#endif + } + +#if !NO_SYNCCTX + /// <summary> + /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="context">Synchronization context to notify observers on.</param> + /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception> + public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context) + { + if (source == null) + throw new ArgumentNullException("source"); + if (context == null) + throw new ArgumentNullException("context"); + +#if !NO_PERF + return new ObserveOn<TSource>(source, context); +#else + return new AnonymousObservable<TSource>(observer => + { + context.OperationStarted(); + + return source.Subscribe( + x => context.Post(_ => + { + observer.OnNext(x); + }, null), + exception => context.Post(_ => + { + observer.OnError(exception); + }, null), + () => context.Post(_ => + { + observer.OnCompleted(); + }, null) + ).Finally(() => + { + context.OperationCompleted(); + }); + }); +#endif + } +#endif + + #endregion + + #region Synchronize + + /// <summary> + /// Wraps the source sequence in order to ensure observer callbacks are properly serialized. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> + public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source) + { + if (source == null) + throw new ArgumentNullException("source"); + +#if !NO_PERF + return new Synchronize<TSource>(source); +#else + return new AnonymousObservable<TSource>(observer => + { + var gate = new object(); + return source.Subscribe(Observer.Synchronize(observer, gate)); + }); +#endif + } + + /// <summary> + /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object. + /// </summary> + /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> + /// <param name="source">Source sequence.</param> + /// <param name="gate">Gate object to synchronize each observer call on.</param> + /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns> + /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception> + public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate) + { + if (source == null) + throw new ArgumentNullException("source"); + if (gate == null) + throw new ArgumentNullException("gate"); + +#if !NO_PERF + return new Synchronize<TSource>(source, gate); +#else + return new AnonymousObservable<TSource>(observer => + { + return source.Subscribe(Observer.Synchronize(observer, gate)); + }); +#endif + } + + #endregion + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SynchronizationContextScheduler.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SynchronizationContextScheduler.cs new file mode 100644 index 0000000..a7a7de3 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/SynchronizationContextScheduler.cs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_SYNCCTX +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + /// <summary> + /// Represents an object that schedules units of work on a provided <seealso cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + public class SynchronizationContextScheduler : LocalScheduler + { + private readonly SynchronizationContext _context; + private readonly bool _alwaysPost; + + /// <summary> + /// Creates an object that schedules units of work on the provided <see cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + /// <param name="context">Synchronization context to schedule units of work on.</param> + /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception> + public SynchronizationContextScheduler(SynchronizationContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + _context = context; + _alwaysPost = true; + } + + /// <summary> + /// Creates an object that schedules units of work on the provided <see cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + /// <param name="context">Synchronization context to schedule units of work on.</param> + /// <param name="alwaysPost">Configures whether scheduling always posts to the synchronization context, regardless whether the caller is on the same synchronization context.</param> + /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception> + public SynchronizationContextScheduler(SynchronizationContext context, bool alwaysPost) + { + if (context == null) + throw new ArgumentNullException("context"); + + _context = context; + _alwaysPost = alwaysPost; + } + + /// <summary> + /// Schedules an action to be executed. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var d = new SingleAssignmentDisposable(); + + if (!_alwaysPost && _context == SynchronizationContext.Current) + { + d.Disposable = action(this, state); + } + else + { + _context.PostWithStartComplete(() => + { + if (!d.IsDisposed) + d.Disposable = action(this, state); + }); + } + + return d; + } + + /// <summary> + /// Schedules an action to be executed after dueTime. + /// </summary> + /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam> + /// <param name="state">State passed to the action to be executed.</param> + /// <param name="action">Action to be executed.</param> + /// <param name="dueTime">Relative time after which to execute the action.</param> + /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns> + /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception> + public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) + { + if (action == null) + throw new ArgumentNullException("action"); + + var dt = Scheduler.Normalize(dueTime); + if (dt.Ticks == 0) + return Schedule(state, action); + + return DefaultScheduler.Instance.Schedule(state, dt, (_, state1) => Schedule(state1, action)); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/AnonymousDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/AnonymousDisposable.cs new file mode 100644 index 0000000..f31bbf4 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/AnonymousDisposable.cs @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Threading; + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents an Action-based disposable. + /// </summary> + internal sealed class AnonymousDisposable : ICancelable + { + private volatile Action _dispose; + + /// <summary> + /// Constructs a new disposable with the given action used for disposal. + /// </summary> + /// <param name="dispose">Disposal action which will be run upon calling Dispose.</param> + public AnonymousDisposable(Action dispose) + { + System.Diagnostics.Debug.Assert(dispose != null); + + _dispose = dispose; + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get { return _dispose == null; } + } + + /// <summary> + /// Calls the disposal action if and only if the current instance hasn't been disposed yet. + /// </summary> + public void Dispose() + { +#pragma warning disable 0420 + var dispose = Interlocked.Exchange(ref _dispose, null); +#pragma warning restore 0420 + if (dispose != null) + { + dispose(); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/BooleanDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/BooleanDisposable.cs new file mode 100644 index 0000000..1f1a21f --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/BooleanDisposable.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource that can be checked for disposal status. + /// </summary> + public sealed class BooleanDisposable : ICancelable + { + // Keep internal! This is used as sentinel in other IDisposable implementations to detect disposal and + // should never be exposed to user code in order to prevent users from swapping in the sentinel. Have + // a look at the code in e.g. SingleAssignmentDisposable for usage patterns. + internal static readonly BooleanDisposable True = new BooleanDisposable(true); + + private volatile bool _isDisposed; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.BooleanDisposable"/> class. + /// </summary> + public BooleanDisposable() + { + } + + private BooleanDisposable(bool isDisposed) + { + _isDisposed = isDisposed; + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get { return _isDisposed; } + } + + /// <summary> + /// Sets the status to disposed, which can be observer through the <see cref="IsDisposed"/> property. + /// </summary> + public void Dispose() + { + _isDisposed = true; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/CancellationDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/CancellationDisposable.cs new file mode 100644 index 0000000..5c67511 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/CancellationDisposable.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_TPL +using System.Threading; + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource that has an associated <seealso cref="T:System.Threading.CancellationToken"/> that will be set to the cancellation requested state upon disposal. + /// </summary> + public sealed class CancellationDisposable : ICancelable + { + private readonly CancellationTokenSource _cts; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.CancellationDisposable"/> class that uses an existing <seealso cref="T:System.Threading.CancellationTokenSource"/>. + /// </summary> + /// <param name="cts"><seealso cref="T:System.Threading.CancellationTokenSource"/> used for cancellation.</param> + /// <exception cref="ArgumentNullException"><paramref name="cts"/> is null.</exception> + public CancellationDisposable(CancellationTokenSource cts) + { + if (cts == null) + throw new ArgumentNullException("cts"); + + _cts = cts; + } + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.CancellationDisposable"/> class that uses a new <seealso cref="T:System.Threading.CancellationTokenSource"/>. + /// </summary> + public CancellationDisposable() + : this(new CancellationTokenSource()) + { + } + + /// <summary> + /// Gets the <see cref="T:System.Threading.CancellationToken"/> used by this CancellationDisposable. + /// </summary> + public CancellationToken Token + { + get { return _cts.Token; } + } + + /// <summary> + /// Cancels the underlying <seealso cref="T:System.Threading.CancellationTokenSource"/>. + /// </summary> + public void Dispose() + { + _cts.Cancel(); + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get { return _cts.IsCancellationRequested; } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/CompositeDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/CompositeDisposable.cs new file mode 100644 index 0000000..6416cff --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/CompositeDisposable.cs @@ -0,0 +1,276 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Linq; + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a group of disposable resources that are disposed together. + /// </summary> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix", Justification = "Backward compat + ideally want to get rid of the ICollection nature of the type.")] + public sealed class CompositeDisposable : ICollection<IDisposable>, ICancelable + { + private readonly object _gate = new object(); + + private bool _disposed; + private List<IDisposable> _disposables; + private int _count; + private const int SHRINK_THRESHOLD = 64; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.CompositeDisposable"/> class with no disposables contained by it initially. + /// </summary> + public CompositeDisposable() + { + _disposables = new List<IDisposable>(); + } + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.CompositeDisposable"/> class with the specified number of disposables. + /// </summary> + /// <param name="capacity">The number of disposables that the new CompositeDisposable can initially store.</param> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than zero.</exception> + public CompositeDisposable(int capacity) + { + if (capacity < 0) + throw new ArgumentOutOfRangeException("capacity"); + + _disposables = new List<IDisposable>(capacity); + } + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.CompositeDisposable"/> class from a group of disposables. + /// </summary> + /// <param name="disposables">Disposables that will be disposed together.</param> + /// <exception cref="ArgumentNullException"><paramref name="disposables"/> is null.</exception> + public CompositeDisposable(params IDisposable[] disposables) + { + if (disposables == null) + throw new ArgumentNullException("disposables"); + + _disposables = new List<IDisposable>(disposables); + _count = _disposables.Count; + } + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.CompositeDisposable"/> class from a group of disposables. + /// </summary> + /// <param name="disposables">Disposables that will be disposed together.</param> + /// <exception cref="ArgumentNullException"><paramref name="disposables"/> is null.</exception> + public CompositeDisposable(IEnumerable<IDisposable> disposables) + { + if (disposables == null) + throw new ArgumentNullException("disposables"); + + _disposables = new List<IDisposable>(disposables); + _count = _disposables.Count; + } + + /// <summary> + /// Gets the number of disposables contained in the CompositeDisposable. + /// </summary> + public int Count + { + get + { + return _count; + } + } + + /// <summary> + /// Adds a disposable to the CompositeDisposable or disposes the disposable if the CompositeDisposable is disposed. + /// </summary> + /// <param name="item">Disposable to add.</param> + /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception> + public void Add(IDisposable item) + { + if (item == null) + throw new ArgumentNullException("item"); + + var shouldDispose = false; + lock (_gate) + { + shouldDispose = _disposed; + if (!_disposed) + { + _disposables.Add(item); + _count++; + } + } + if (shouldDispose) + item.Dispose(); + } + + /// <summary> + /// Removes and disposes the first occurrence of a disposable from the CompositeDisposable. + /// </summary> + /// <param name="item">Disposable to remove.</param> + /// <returns>true if found; false otherwise.</returns> + /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception> + public bool Remove(IDisposable item) + { + if (item == null) + throw new ArgumentNullException("item"); + + var shouldDispose = false; + + lock (_gate) + { + if (!_disposed) + { + // + // List<T> doesn't shrink the size of the underlying array but does collapse the array + // by copying the tail one position to the left of the removal index. We don't need + // index-based lookup but only ordering for sequential disposal. So, instead of spending + // cycles on the Array.Copy imposed by Remove, we use a null sentinel value. We also + // do manual Swiss cheese detection to shrink the list if there's a lot of holes in it. + // + var i = _disposables.IndexOf(item); + if (i >= 0) + { + shouldDispose = true; + _disposables[i] = null; + _count--; + + if (_disposables.Capacity > SHRINK_THRESHOLD && _count < _disposables.Capacity / 2) + { + var old = _disposables; + _disposables = new List<IDisposable>(_disposables.Capacity / 2); + + foreach (var d in old) + if (d != null) + _disposables.Add(d); + } + } + } + } + + if (shouldDispose) + item.Dispose(); + + return shouldDispose; + } + + /// <summary> + /// Disposes all disposables in the group and removes them from the group. + /// </summary> + public void Dispose() + { + var currentDisposables = default(IDisposable[]); + lock (_gate) + { + if (!_disposed) + { + _disposed = true; + currentDisposables = _disposables.ToArray(); + _disposables.Clear(); + _count = 0; + } + } + + if (currentDisposables != null) + { + foreach (var d in currentDisposables) + if (d != null) + d.Dispose(); + } + } + + /// <summary> + /// Removes and disposes all disposables from the CompositeDisposable, but does not dispose the CompositeDisposable. + /// </summary> + public void Clear() + { + var currentDisposables = default(IDisposable[]); + lock (_gate) + { + currentDisposables = _disposables.ToArray(); + _disposables.Clear(); + _count = 0; + } + + foreach (var d in currentDisposables) + if (d != null) + d.Dispose(); + } + + /// <summary> + /// Determines whether the CompositeDisposable contains a specific disposable. + /// </summary> + /// <param name="item">Disposable to search for.</param> + /// <returns>true if the disposable was found; otherwise, false.</returns> + /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception> + public bool Contains(IDisposable item) + { + if (item == null) + throw new ArgumentNullException("item"); + + lock (_gate) + { + return _disposables.Contains(item); + } + } + + /// <summary> + /// Copies the disposables contained in the CompositeDisposable to an array, starting at a particular array index. + /// </summary> + /// <param name="array">Array to copy the contained disposables to.</param> + /// <param name="arrayIndex">Target index at which to copy the first disposable of the group.</param> + /// <exception cref="ArgumentNullException"><paramref name="array"/> is null.</exception> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="arrayIndex"/> is less than zero. -or - <paramref name="arrayIndex"/> is larger than or equal to the array length.</exception> + public void CopyTo(IDisposable[] array, int arrayIndex) + { + if (array == null) + throw new ArgumentNullException("array"); + if (arrayIndex < 0 || arrayIndex >= array.Length) + throw new ArgumentOutOfRangeException("arrayIndex"); + + lock (_gate) + { + Array.Copy(_disposables.Where(d => d != null).ToArray(), 0, array, arrayIndex, array.Length - arrayIndex); + } + } + + /// <summary> + /// Always returns false. + /// </summary> + public bool IsReadOnly + { + get { return false; } + } + + /// <summary> + /// Returns an enumerator that iterates through the CompositeDisposable. + /// </summary> + /// <returns>An enumerator to iterate over the disposables.</returns> + public IEnumerator<IDisposable> GetEnumerator() + { + var res = default(IEnumerable<IDisposable>); + + lock (_gate) + { + res = _disposables.Where(d => d != null).ToList(); + } + + return res.GetEnumerator(); + } + + /// <summary> + /// Returns an enumerator that iterates through the CompositeDisposable. + /// </summary> + /// <returns>An enumerator to iterate over the disposables.</returns> + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get { return _disposed; } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/ContextDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/ContextDisposable.cs new file mode 100644 index 0000000..ebe3479 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/ContextDisposable.cs @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_SYNCCTX +using System.Reactive.Concurrency; +using System.Threading; + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource whose disposal invocation will be posted to the specified <seealso cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + public sealed class ContextDisposable : ICancelable + { + private readonly SynchronizationContext _context; + private volatile IDisposable _disposable; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.ContextDisposable"/> class that uses the specified <see cref="T:System.Threading.SynchronizationContext"/> on which to dispose the specified disposable resource. + /// </summary> + /// <param name="context">Context to perform disposal on.</param> + /// <param name="disposable">Disposable whose Dispose operation to run on the given synchronization context.</param> + /// <exception cref="ArgumentNullException"><paramref name="context"/> or <paramref name="disposable"/> is null.</exception> + public ContextDisposable(SynchronizationContext context, IDisposable disposable) + { + if (context == null) + throw new ArgumentNullException("context"); + if (disposable == null) + throw new ArgumentNullException("disposable"); + + _context = context; + _disposable = disposable; + } + + /// <summary> + /// Gets the provided <see cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + public SynchronizationContext Context + { + get { return _context; } + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get { return _disposable == BooleanDisposable.True; } + } + + /// <summary> + /// Disposes the underlying disposable on the provided <see cref="T:System.Threading.SynchronizationContext"/>. + /// </summary> + public void Dispose() + { +#pragma warning disable 0420 + var disposable = Interlocked.Exchange(ref _disposable, BooleanDisposable.True); +#pragma warning restore 0420 + + if (disposable != BooleanDisposable.True) + { + _context.PostWithStartComplete(d => d.Dispose(), disposable); + } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/DefaultDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/DefaultDisposable.cs new file mode 100644 index 0000000..6f643c3 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/DefaultDisposable.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable that does nothing on disposal. + /// </summary> + internal sealed class DefaultDisposable : IDisposable + { + /// <summary> + /// Singleton default disposable. + /// </summary> + public static readonly DefaultDisposable Instance = new DefaultDisposable(); + + private DefaultDisposable() + { + } + + /// <summary> + /// Does nothing. + /// </summary> + public void Dispose() + { + // no op + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/Disposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/Disposable.cs new file mode 100644 index 0000000..fc77f3b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/Disposable.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Provides a set of static methods for creating Disposables. + /// </summary> + public static class Disposable + { + /// <summary> + /// Gets the disposable that does nothing when disposed. + /// </summary> + public static IDisposable Empty + { + get { return DefaultDisposable.Instance; } + } + + /// <summary> + /// Creates a disposable object that invokes the specified action when disposed. + /// </summary> + /// <param name="dispose">Action to run during the first call to <see cref="IDisposable.Dispose"/>. The action is guaranteed to be run at most once.</param> + /// <returns>The disposable object that runs the given action upon disposal.</returns> + /// <exception cref="ArgumentNullException"><paramref name="dispose"/> is null.</exception> + public static IDisposable Create(Action dispose) + { + if (dispose == null) + throw new ArgumentNullException("dispose"); + + return new AnonymousDisposable(dispose); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/MultipleAssignmentDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/MultipleAssignmentDisposable.cs new file mode 100644 index 0000000..f7bc2e8 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/MultipleAssignmentDisposable.cs @@ -0,0 +1,90 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource whose underlying disposable resource can be swapped for another disposable resource. + /// </summary> + public sealed class MultipleAssignmentDisposable : ICancelable + { + private readonly object _gate = new object(); + private IDisposable _current; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.MultipleAssignmentDisposable"/> class with no current underlying disposable. + /// </summary> + public MultipleAssignmentDisposable() + { + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get + { + lock (_gate) + { + // We use a sentinel value to indicate we've been disposed. This sentinel never leaks + // to the outside world (see the Disposable property getter), so no-one can ever assign + // this value to us manually. + return _current == BooleanDisposable.True; + } + } + } + + /// <summary> + /// Gets or sets the underlying disposable. After disposal, the result of getting this property is undefined. + /// </summary> + /// <remarks>If the MutableDisposable has already been disposed, assignment to this property causes immediate disposal of the given disposable object.</remarks> + public IDisposable Disposable + { + get + { + lock (_gate) + { + if (_current == BooleanDisposable.True /* see IsDisposed */) + return DefaultDisposable.Instance; // Don't leak the sentinel value. + + return _current; + } + } + + set + { + var shouldDispose = false; + lock (_gate) + { + shouldDispose = (_current == BooleanDisposable.True /* see IsDisposed */); + if (!shouldDispose) + { + _current = value; + } + } + if (shouldDispose && value != null) + value.Dispose(); + } + } + + /// <summary> + /// Disposes the underlying disposable as well as all future replacements. + /// </summary> + public void Dispose() + { + var old = default(IDisposable); + + lock (_gate) + { + if (_current != BooleanDisposable.True /* see IsDisposed */) + { + old = _current; + _current = BooleanDisposable.True /* see IsDisposed */; + } + } + + if (old != null) + old.Dispose(); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/RefCountDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/RefCountDisposable.cs new file mode 100644 index 0000000..9bd0407 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/RefCountDisposable.cs @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Threading; + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource that only disposes its underlying disposable resource when all <see cref="GetDisposable">dependent disposable objects</see> have been disposed. + /// </summary> + public sealed class RefCountDisposable : ICancelable + { + private readonly object _gate = new object(); + private IDisposable _disposable; + private bool _isPrimaryDisposed; + private int _count; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.RefCountDisposable"/> class with the specified disposable. + /// </summary> + /// <param name="disposable">Underlying disposable.</param> + /// <exception cref="ArgumentNullException"><paramref name="disposable"/> is null.</exception> + public RefCountDisposable(IDisposable disposable) + { + if (disposable == null) + throw new ArgumentNullException("disposable"); + + _disposable = disposable; + _isPrimaryDisposed = false; + _count = 0; + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get { return _disposable == null; } + } + + /// <summary> + /// Returns a dependent disposable that when disposed decreases the refcount on the underlying disposable. + /// </summary> + /// <returns>A dependent disposable contributing to the reference count that manages the underlying disposable's lifetime.</returns> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Backward compat + non-trivial work for a property getter.")] + public IDisposable GetDisposable() + { + lock (_gate) + { + if (_disposable == null) + { + return Disposable.Empty; + } + else + { + _count++; + return new InnerDisposable(this); + } + } + } + + /// <summary> + /// Disposes the underlying disposable only when all dependent disposables have been disposed. + /// </summary> + public void Dispose() + { + var disposable = default(IDisposable); + lock (_gate) + { + if (_disposable != null) + { + if (!_isPrimaryDisposed) + { + _isPrimaryDisposed = true; + + if (_count == 0) + { + disposable = _disposable; + _disposable = null; + } + } + } + } + + if (disposable != null) + disposable.Dispose(); + } + + private void Release() + { + var disposable = default(IDisposable); + lock (_gate) + { + if (_disposable != null) + { + _count--; + + System.Diagnostics.Debug.Assert(_count >= 0); + + if (_isPrimaryDisposed) + { + if (_count == 0) + { + disposable = _disposable; + _disposable = null; + } + } + } + } + + if (disposable != null) + disposable.Dispose(); + } + + sealed class InnerDisposable : IDisposable + { + private RefCountDisposable _parent; + + public InnerDisposable(RefCountDisposable parent) + { + _parent = parent; + } + + public void Dispose() + { + var parent = Interlocked.Exchange(ref _parent, null); + if (parent != null) + parent.Release(); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/ScheduledDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/ScheduledDisposable.cs new file mode 100644 index 0000000..eb742aa --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/ScheduledDisposable.cs @@ -0,0 +1,85 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Concurrency; +using System.Threading; + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource whose disposal invocation will be scheduled on the specified <seealso cref="T:System.Reactive.Concurrency.IScheduler"/>. + /// </summary> + public sealed class ScheduledDisposable : ICancelable + { + private readonly IScheduler _scheduler; + private volatile IDisposable _disposable; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.ScheduledDisposable"/> class that uses an <see cref="T:System.Reactive.Concurrency.IScheduler"/> on which to dispose the disposable. + /// </summary> + /// <param name="scheduler">Scheduler where the disposable resource will be disposed on.</param> + /// <param name="disposable">Disposable resource to dispose on the given scheduler.</param> + /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="disposable"/> is null.</exception> + public ScheduledDisposable(IScheduler scheduler, IDisposable disposable) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + if (disposable == null) + throw new ArgumentNullException("disposable"); + + _scheduler = scheduler; + _disposable = disposable; + } + + /// <summary> + /// Gets the scheduler where the disposable resource will be disposed on. + /// </summary> + public IScheduler Scheduler + { + get { return _scheduler; } + } + + /// <summary> + /// Gets the underlying disposable. After disposal, the result is undefined. + /// </summary> + public IDisposable Disposable + { + get + { + var current = _disposable; + + if (current == BooleanDisposable.True) + return DefaultDisposable.Instance; // Don't leak the sentinel value. + + return current; + } + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get { return _disposable == BooleanDisposable.True; } + } + + /// <summary> + /// Disposes the wrapped disposable on the provided scheduler. + /// </summary> + public void Dispose() + { + Scheduler.Schedule(DisposeInner); + } + + private void DisposeInner() + { +#pragma warning disable 0420 + var disposable = Interlocked.Exchange(ref _disposable, BooleanDisposable.True); +#pragma warning restore 0420 + + if (disposable != BooleanDisposable.True) + { + disposable.Dispose(); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/SerialDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/SerialDisposable.cs new file mode 100644 index 0000000..835bd68 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/SerialDisposable.cs @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource whose underlying disposable resource can be replaced by another disposable resource, causing automatic disposal of the previous underlying disposable resource. + /// </summary> + public sealed class SerialDisposable : ICancelable + { + private readonly object _gate = new object(); + private IDisposable _current; + private bool _disposed; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.SerialDisposable"/> class. + /// </summary> + public SerialDisposable() + { + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get + { + lock (_gate) + { + return _disposed; + } + } + } + + /// <summary> + /// Gets or sets the underlying disposable. + /// </summary> + /// <remarks>If the SerialDisposable has already been disposed, assignment to this property causes immediate disposal of the given disposable object. Assigning this property disposes the previous disposable object.</remarks> + public IDisposable Disposable + { + get + { + return _current; + } + + set + { + var shouldDispose = false; + var old = default(IDisposable); + lock (_gate) + { + shouldDispose = _disposed; + if (!shouldDispose) + { + old = _current; + _current = value; + } + } + if (old != null) + old.Dispose(); + if (shouldDispose && value != null) + value.Dispose(); + } + } + + /// <summary> + /// Disposes the underlying disposable as well as all future replacements. + /// </summary> + public void Dispose() + { + var old = default(IDisposable); + + lock (_gate) + { + if (!_disposed) + { + _disposed = true; + old = _current; + _current = null; + } + } + + if (old != null) + old.Dispose(); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/SingleAssignmentDisposable.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/SingleAssignmentDisposable.cs new file mode 100644 index 0000000..8931c25 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Disposables/SingleAssignmentDisposable.cs @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Threading; + +namespace System.Reactive.Disposables +{ + /// <summary> + /// Represents a disposable resource which only allows a single assignment of its underlying disposable resource. + /// If an underlying disposable resource has already been set, future attempts to set the underlying disposable resource will throw an <see cref="T:System.InvalidOperationException"/>. + /// </summary> + public sealed class SingleAssignmentDisposable : ICancelable + { + private volatile IDisposable _current; + + /// <summary> + /// Initializes a new instance of the <see cref="T:System.Reactive.Disposables.SingleAssignmentDisposable"/> class. + /// </summary> + public SingleAssignmentDisposable() + { + } + + /// <summary> + /// Gets a value that indicates whether the object is disposed. + /// </summary> + public bool IsDisposed + { + get + { + // We use a sentinel value to indicate we've been disposed. This sentinel never leaks + // to the outside world (see the Disposable property getter), so no-one can ever assign + // this value to us manually. + return _current == BooleanDisposable.True; + } + } + + /// <summary> + /// Gets or sets the underlying disposable. After disposal, the result of getting this property is undefined. + /// </summary> + /// <exception cref="InvalidOperationException">Thrown if the SingleAssignmentDisposable has already been assigned to.</exception> + public IDisposable Disposable + { + get + { + var current = _current; + + if (current == BooleanDisposable.True) + return DefaultDisposable.Instance; // Don't leak the sentinel value. + + return current; + } + + set + { +#pragma warning disable 0420 + var old = Interlocked.CompareExchange(ref _current, value, null); +#pragma warning restore 0420 + if (old == null) + return; + + if (old != BooleanDisposable.True) + throw new InvalidOperationException(Strings_Core.DISPOSABLE_ALREADY_ASSIGNED); + + if (value != null) + value.Dispose(); + } + } + + /// <summary> + /// Disposes the underlying disposable. + /// </summary> + public void Dispose() + { +#pragma warning disable 0420 + var old = Interlocked.Exchange(ref _current, BooleanDisposable.True); +#pragma warning restore 0420 + if (old != null) + old.Dispose(); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs new file mode 100644 index 0000000..a2237ae --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AsyncLockObserver.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Concurrency; + +namespace System.Reactive +{ + internal class AsyncLockObserver<T> : ObserverBase<T> + { + private readonly AsyncLock _gate; + private readonly IObserver<T> _observer; + + public AsyncLockObserver(IObserver<T> observer, AsyncLock gate) + { + _gate = gate; + _observer = observer; + } + + protected override void OnNextCore(T value) + { + _gate.Wait(() => + { + _observer.OnNext(value); + }); + } + + protected override void OnErrorCore(Exception exception) + { + _gate.Wait(() => + { + _observer.OnError(exception); + }); + } + + protected override void OnCompletedCore() + { + _gate.Wait(() => + { + _observer.OnCompleted(); + }); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs new file mode 100644 index 0000000..b4d67dc --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Disposables; + +namespace System.Reactive +{ + class AutoDetachObserver<T> : ObserverBase<T> + { + private readonly IObserver<T> observer; + private readonly SingleAssignmentDisposable m = new SingleAssignmentDisposable(); + + public AutoDetachObserver(IObserver<T> observer) + { + this.observer = observer; + } + + public IDisposable Disposable + { + set { m.Disposable = value; } + } + + protected override void OnNextCore(T value) + { + // + // Safeguarding of the pipeline against rogue observers is required for proper + // resource cleanup. Consider the following example: + // + // var xs = Observable.Interval(TimeSpan.FromSeconds(1)); + // var ys = <some random sequence>; + // var res = xs.CombineLatest(ys, (x, y) => x + y); + // + // The marble diagram of the query above looks as follows: + // + // xs -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---... + // | | | | | | | | | + // ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---... + // | | | | | | | | | | | | | | + // v v v v v v v v v v v v v v + // res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---... + // | + // @#& + // + // Notice the free-threaded nature of Rx, where messages on the resulting sequence + // are produced by either of the two input sequences to CombineLatest. + // + // Now assume an exception happens in the OnNext callback for the observer of res, + // at the indicated point marked with @#& above. The callback runs in the context + // of ys, so the exception will take down the scheduler thread of ys. This by + // itself is a problem (that can be mitigated by a Catch operator on IScheduler), + // but notice how the timer that produces xs is kept alive. + // + // The safe-guarding code below ensures the acquired resources are disposed when + // the user callback throws. + // + var __noError = false; + try + { + observer.OnNext(value); + __noError = true; + } + finally + { + if (!__noError) + Dispose(); + } + } + + protected override void OnErrorCore(Exception exception) + { + try + { + observer.OnError(exception); + } + finally + { + Dispose(); + } + } + + protected override void OnCompletedCore() + { + try + { + observer.OnCompleted(); + } + finally + { + Dispose(); + } + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + m.Dispose(); + } + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs new file mode 100644 index 0000000..ca201f4 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/CheckedObserver.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Threading; + +namespace System.Reactive +{ + internal class CheckedObserver<T> : IObserver<T> + { + private readonly IObserver<T> _observer; + private int _state; + + private const int IDLE = 0; + private const int BUSY = 1; + private const int DONE = 2; + + public CheckedObserver(IObserver<T> observer) + { + _observer = observer; + } + + public void OnNext(T value) + { + CheckAccess(); + + try + { + _observer.OnNext(value); + } + finally + { + Interlocked.Exchange(ref _state, IDLE); + } + } + + public void OnError(Exception error) + { + CheckAccess(); + + try + { + _observer.OnError(error); + } + finally + { + Interlocked.Exchange(ref _state, DONE); + } + } + + public void OnCompleted() + { + CheckAccess(); + + try + { + _observer.OnCompleted(); + } + finally + { + Interlocked.Exchange(ref _state, DONE); + } + } + + private void CheckAccess() + { + switch (Interlocked.CompareExchange(ref _state, BUSY, IDLE)) + { + case BUSY: + throw new InvalidOperationException(Strings_Core.REENTRANCY_DETECTED); + case DONE: + throw new InvalidOperationException(Strings_Core.OBSERVER_TERMINATED); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs new file mode 100644 index 0000000..8b7ec81 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs @@ -0,0 +1,576 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +/* + * WARNING: Auto-generated file (7/18/2012 4:59:53 PM) + * + * Stripped down code based on ndp\clr\src\BCL\System\Collections\Concurrent\ConcurrentDictionary.cs + */ + +#if NO_CDS_COLLECTIONS + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics.CodeAnalysis; +using System.Reflection; +using System.Threading; + +namespace System.Collections.Concurrent +{ + internal class ConcurrentDictionary<TKey, TValue> + { + /* >>> Code copied from the Array class */ + + // We impose limits on maximum array lenght in each dimension to allow efficient + // implementation of advanced range check elimination in future. + // Keep in sync with vm\gcscan.cpp and HashHelpers.MaxPrimeArrayLength. + internal const int MaxArrayLength = 0X7FEFFFFF; + + /* <<< Code copied from the Array class */ + + private class Tables + { + internal readonly Node[] m_buckets; // A singly-linked list for each bucket. + internal readonly object[] m_locks; // A set of locks, each guarding a section of the table. + internal volatile int[] m_countPerLock; // The number of elements guarded by each lock. + + internal Tables(Node[] buckets, object[] locks, int[] countPerLock) + { + m_buckets = buckets; + m_locks = locks; + m_countPerLock = countPerLock; + } + } + + private volatile Tables m_tables; // Internal tables of the dictionary + private readonly IEqualityComparer<TKey> m_comparer; // Key equality comparer + private readonly bool m_growLockArray; // Whether to dynamically increase the size of the striped lock + private int m_budget; // The maximum number of elements per lock before a resize operation is triggered + + // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the + // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference + // and blocking, but also the more expensive operations that require all locks become (e.g. table + // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good + // compromise. + private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4; + + // The default capacity, i.e. the initial # of buckets. When choosing this value, we are making + // a trade-off between the size of a very small dictionary, and the number of resizes when + // constructing a large dictionary. Also, the capacity should not be divisible by a small prime. + private const int DEFAULT_CAPACITY = 31; + + // The maximum size of the striped lock that will not be exceeded when locks are automatically + // added as the dictionary grows. However, the user is allowed to exceed this limit by passing + // a concurrency level larger than MAX_LOCK_NUMBER into the constructor. + private const int MAX_LOCK_NUMBER = 1024; + + // Whether TValue is a type that can be written atomically (i.e., with no danger of torn reads) + private static readonly bool s_isValueWriteAtomic = IsValueWriteAtomic(); + + private static bool IsValueWriteAtomic() + { + Type valueType = typeof(TValue); + + // + // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without + // the risk of tearing. + // + // See http://www.ecma-international.org/publications/files/ECMA-ST/Ecma-335.pdf + // + bool isAtomic = + (valueType.GetTypeInfo().IsClass) + || valueType == typeof(Boolean) + || valueType == typeof(Char) + || valueType == typeof(Byte) + || valueType == typeof(SByte) + || valueType == typeof(Int16) + || valueType == typeof(UInt16) + || valueType == typeof(Int32) + || valueType == typeof(UInt32) + || valueType == typeof(Single); + + if (!isAtomic && IntPtr.Size == 8) + { + isAtomic |= valueType == typeof(Double) || valueType == typeof(Int64); + } + + return isAtomic; + } + + public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { } + + internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) + { + if (concurrencyLevel < 1) + { + throw new ArgumentOutOfRangeException("concurrencyLevel"); + } + if (capacity < 0) + { + throw new ArgumentOutOfRangeException("capacity"); + } + if (comparer == null) throw new ArgumentNullException("comparer"); + + // The capacity should be at least as large as the concurrency level. Otherwise, we would have locks that don't guard + // any buckets. + if (capacity < concurrencyLevel) + { + capacity = concurrencyLevel; + } + + object[] locks = new object[concurrencyLevel]; + for (int i = 0; i < locks.Length; i++) + { + locks[i] = new object(); + } + + int[] countPerLock = new int[locks.Length]; + Node[] buckets = new Node[capacity]; + m_tables = new Tables(buckets, locks, countPerLock); + + m_comparer = comparer; + m_growLockArray = growLockArray; + m_budget = buckets.Length / locks.Length; + } + + public bool TryAdd(TKey key, TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + TValue dummy; + return TryAddInternal(key, value, false, true, out dummy); + } + + public bool TryRemove(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + return TryRemoveInternal(key, out value, false, default(TValue)); + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue) + { + while (true) + { + Tables tables = m_tables; + + int bucketNo, lockNo; + GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + lock (tables.m_locks[lockNo]) + { + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + Node prev = null; + for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next) + { + if (m_comparer.Equals(curr.m_key, key)) + { + if (matchValue) + { + bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value); + if (!valuesMatch) + { + value = default(TValue); + return false; + } + } + + if (prev == null) + { + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], curr.m_next); + } + else + { + prev.m_next = curr.m_next; + } + + value = curr.m_value; + tables.m_countPerLock[lockNo]--; + return true; + } + prev = curr; + } + } + + value = default(TValue); + return false; + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + public bool TryGetValue(TKey key, out TValue value) + { + if (key == null) throw new ArgumentNullException("key"); + + int bucketNo, lockNoUnused; + + // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize. + Tables tables = m_tables; + + GetBucketAndLockNo(m_comparer.GetHashCode(key), out bucketNo, out lockNoUnused, tables.m_buckets.Length, tables.m_locks.Length); + + // We can get away w/out a lock here. + // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i]. + Node n = Volatile.Read<Node>(ref tables.m_buckets[bucketNo]); + + while (n != null) + { + if (m_comparer.Equals(n.m_key, key)) + { + value = n.m_value; + return true; + } + n = n.m_next; + } + + value = default(TValue); + return false; + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue) + { + int hashcode = m_comparer.GetHashCode(key); + + while (true) + { + int bucketNo, lockNo; + + Tables tables = m_tables; + GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); + + bool resizeDesired = false; + bool lockTaken = false; + try + { + if (acquireLock) + Monitor.Enter(tables.m_locks[lockNo], ref lockTaken); + + // If the table just got resized, we may not be holding the right lock, and must retry. + // This should be a rare occurence. + if (tables != m_tables) + { + continue; + } + + // Try to find this key in the bucket + Node prev = null; + for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next) + { + if (m_comparer.Equals(node.m_key, key)) + { + // The key was found in the dictionary. If updates are allowed, update the value for that key. + // We need to create a new node for the update, in order to support TValue types that cannot + // be written atomically, since lock-free reads may be happening concurrently. + if (updateIfExists) + { + if (s_isValueWriteAtomic) + { + node.m_value = value; + } + else + { + Node newNode = new Node(node.m_key, value, hashcode, node.m_next); + if (prev == null) + { + tables.m_buckets[bucketNo] = newNode; + } + else + { + prev.m_next = newNode; + } + } + resultingValue = value; + } + else + { + resultingValue = node.m_value; + } + return false; + } + prev = node; + } + + // The key was not found in the bucket. Insert the key-value pair. + Volatile.Write<Node>(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo])); + checked + { + tables.m_countPerLock[lockNo]++; + } + + // + // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table. + // It is also possible that GrowTable will increase the budget but won't resize the bucket table. + // That happens if the bucket table is found to be poorly utilized due to a bad hash function. + // + if (tables.m_countPerLock[lockNo] > m_budget) + { + resizeDesired = true; + } + } + finally + { + if (lockTaken) + Monitor.Exit(tables.m_locks[lockNo]); + } + + // + // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table. + // + // Concurrency notes: + // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks. + // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0 + // and then verify that the table we passed to it as the argument is still the current table. + // + if (resizeDesired) + { + GrowTable(tables); + } + + resultingValue = value; + return true; + } + } + + public ICollection<TValue> Values + { + get { return GetValues(); } + } + + private void GrowTable(Tables tables) + { + int locksAcquired = 0; + try + { + // The thread that first obtains m_locks[0] will be the one doing the resize operation + AcquireLocks(0, 1, ref locksAcquired); + + // Make sure nobody resized the table while we were waiting for lock 0: + if (tables != m_tables) + { + // We assume that since the table reference is different, it was already resized (or the budget + // was adjusted). If we ever decide to do table shrinking, or replace the table for other reasons, + // we will have to revisit this logic. + return; + } + + // Compute the (approx.) total size. Use an Int64 accumulation variable to avoid an overflow. + long approxCount = 0; + for (int i = 0; i < tables.m_countPerLock.Length; i++) + { + approxCount += tables.m_countPerLock[i]; + } + + // + // If the bucket array is too empty, double the budget instead of resizing the table + // + if (approxCount < tables.m_buckets.Length / 4) + { + m_budget = 2 * m_budget; + if (m_budget < 0) + { + m_budget = int.MaxValue; + } + return; + } + + + // Compute the new table size. We find the smallest integer larger than twice the previous table size, and not divisible by + // 2,3,5 or 7. We can consider a different table-sizing policy in the future. + int newLength = 0; + bool maximizeTableSize = false; + try + { + checked + { + // Double the size of the buckets table and add one, so that we have an odd integer. + newLength = tables.m_buckets.Length * 2 + 1; + + // Now, we only need to check odd integers, and find the first that is not divisible + // by 3, 5 or 7. + while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0) + { + newLength += 2; + } + + if (newLength > MaxArrayLength) + { + maximizeTableSize = true; + } + } + } + catch (OverflowException) + { + maximizeTableSize = true; + } + + if (maximizeTableSize) + { + newLength = MaxArrayLength; + + // We want to make sure that GrowTable will not be called again, since table is at the maximum size. + // To achieve that, we set the budget to int.MaxValue. + // + // (There is one special case that would allow GrowTable() to be called in the future: + // calling Clear() on the ConcurrentDictionary will shrink the table and lower the budget.) + m_budget = int.MaxValue; + } + + // Now acquire all other locks for the table + AcquireLocks(1, tables.m_locks.Length, ref locksAcquired); + + object[] newLocks = tables.m_locks; + + // Add more locks + if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER) + { + newLocks = new object[tables.m_locks.Length * 2]; + Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length); + + for (int i = tables.m_locks.Length; i < newLocks.Length; i++) + { + newLocks[i] = new object(); + } + } + + Node[] newBuckets = new Node[newLength]; + int[] newCountPerLock = new int[newLocks.Length]; + + // Copy all data into a new table, creating new nodes for all elements + for (int i = 0; i < tables.m_buckets.Length; i++) + { + Node current = tables.m_buckets[i]; + while (current != null) + { + Node next = current.m_next; + int newBucketNo, newLockNo; + GetBucketAndLockNo(current.m_hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length); + + newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, current.m_hashcode, newBuckets[newBucketNo]); + + checked + { + newCountPerLock[newLockNo]++; + } + + current = next; + } + } + + // Adjust the budget + m_budget = Math.Max(1, newBuckets.Length / newLocks.Length); + + // Replace tables with the new versions + m_tables = new Tables(newBuckets, newLocks, newCountPerLock); + } + finally + { + // Release all locks that we took earlier + ReleaseLocks(0, locksAcquired); + } + } + + private void GetBucketAndLockNo( + int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount) + { + bucketNo = (hashcode & 0x7fffffff) % bucketCount; + lockNo = bucketNo % lockCount; + } + + private static int DefaultConcurrencyLevel + { + get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; } + } + + private void AcquireAllLocks(ref int locksAcquired) + { + // First, acquire lock 0 + AcquireLocks(0, 1, ref locksAcquired); + + // Now that we have lock 0, the m_locks array will not change (i.e., grow), + // and so we can safely read m_locks.Length. + AcquireLocks(1, m_tables.m_locks.Length, ref locksAcquired); + } + + private void AcquireLocks(int fromInclusive, int toExclusive, ref int locksAcquired) + { + object[] locks = m_tables.m_locks; + + for (int i = fromInclusive; i < toExclusive; i++) + { + bool lockTaken = false; + try + { + Monitor.Enter(locks[i], ref lockTaken); + } + finally + { + if (lockTaken) + { + locksAcquired++; + } + } + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] + private void ReleaseLocks(int fromInclusive, int toExclusive) + { + for (int i = fromInclusive; i < toExclusive; i++) + { + Monitor.Exit(m_tables.m_locks[i]); + } + } + + [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "ConcurrencyCop just doesn't know about these locks")] + private ReadOnlyCollection<TValue> GetValues() + { + int locksAcquired = 0; + try + { + AcquireAllLocks(ref locksAcquired); + List<TValue> values = new List<TValue>(); + + for (int i = 0; i < m_tables.m_buckets.Length; i++) + { + Node current = m_tables.m_buckets[i]; + while (current != null) + { + values.Add(current.m_value); + current = current.m_next; + } + } + + return new ReadOnlyCollection<TValue>(values); + } + finally + { + ReleaseLocks(0, locksAcquired); + } + } + + private class Node + { + internal TKey m_key; + internal TValue m_value; + internal volatile Node m_next; + internal int m_hashcode; + + internal Node(TKey key, TValue value, int hashcode, Node next) + { + m_key = key; + m_value = value; + m_next = next; + m_hashcode = hashcode; + } + } + } +} + +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs new file mode 100644 index 0000000..76ab088 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentQueue.cs @@ -0,0 +1,316 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +/* + * WARNING: Auto-generated file (7/18/2012 4:47:38 PM) + * + * Stripped down code based on ndp\clr\src\BCL\System\Collections\Concurrent\ConcurrentQueue.cs + */ + +#if NO_CDS_COLLECTIONS + +#pragma warning disable 0420 + +using System; +using System.Collections.Generic; +using System.Diagnostics.Contracts; +using System.Threading; + +namespace System.Collections.Concurrent +{ + internal class ConcurrentQueue<T> + { + private volatile Segment m_head; + private volatile Segment m_tail; + + private const int SEGMENT_SIZE = 32; + + public ConcurrentQueue() + { + m_head = m_tail = new Segment(0, this); + } + + public bool IsEmpty + { + get + { + Segment head = m_head; + if (!head.IsEmpty) + //fast route 1: + //if current head is not empty, then queue is not empty + return false; + else if (head.Next == null) + //fast route 2: + //if current head is empty and it's the last segment + //then queue is empty + return true; + else + //slow route: + //current head is empty and it is NOT the last segment, + //it means another thread is growing new segment + { + SpinWait spin = new SpinWait(); + while (head.IsEmpty) + { + if (head.Next == null) + return true; + + spin.SpinOnce(); + head = m_head; + } + return false; + } + } + } + + public void Enqueue(T item) + { + SpinWait spin = new SpinWait(); + while (true) + { + Segment tail = m_tail; + if (tail.TryAppend(item)) + return; + spin.SpinOnce(); + } + } + + public bool TryDequeue(out T result) + { + while (!IsEmpty) + { + Segment head = m_head; + if (head.TryRemove(out result)) + return true; + //since method IsEmpty spins, we don't need to spin in the while loop + } + result = default(T); + return false; + } + + private class Segment + { + //we define two volatile arrays: m_array and m_state. Note that the accesses to the array items + //do not get volatile treatment. But we don't need to worry about loading adjacent elements or + //store/load on adjacent elements would suffer reordering. + // - Two stores: these are at risk, but CLRv2 memory model guarantees store-release hence we are safe. + // - Two loads: because one item from two volatile arrays are accessed, the loads of the array references + // are sufficient to prevent reordering of the loads of the elements. + internal volatile T[] m_array; + + // For each entry in m_array, the corresponding entry in m_state indicates whether this position contains + // a valid value. m_state is initially all false. + internal volatile VolatileBool[] m_state; + + //pointer to the next segment. null if the current segment is the last segment + private volatile Segment m_next; + + //We use this zero based index to track how many segments have been created for the queue, and + //to compute how many active segments are there currently. + // * The number of currently active segments is : m_tail.m_index - m_head.m_index + 1; + // * m_index is incremented with every Segment.Grow operation. We use Int64 type, and we can safely + // assume that it never overflows. To overflow, we need to do 2^63 increments, even at a rate of 4 + // billion (2^32) increments per second, it takes 2^31 seconds, which is about 64 years. + internal readonly long m_index; + + //indices of where the first and last valid values + // - m_low points to the position of the next element to pop from this segment, range [0, infinity) + // m_low >= SEGMENT_SIZE implies the segment is disposable + // - m_high points to the position of the latest pushed element, range [-1, infinity) + // m_high == -1 implies the segment is new and empty + // m_high >= SEGMENT_SIZE-1 means this segment is ready to grow. + // and the thread who sets m_high to SEGMENT_SIZE-1 is responsible to grow the segment + // - Math.Min(m_low, SEGMENT_SIZE) > Math.Min(m_high, SEGMENT_SIZE-1) implies segment is empty + // - initially m_low =0 and m_high=-1; + private volatile int m_low; + private volatile int m_high; + + private volatile ConcurrentQueue<T> m_source; + + internal Segment(long index, ConcurrentQueue<T> source) + { + m_array = new T[SEGMENT_SIZE]; + m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false + m_high = -1; + Contract.Assert(index >= 0); + m_index = index; + m_source = source; + } + + internal Segment Next + { + get { return m_next; } + } + + internal bool IsEmpty + { + get { return (Low > High); } + } + + internal void UnsafeAdd(T value) + { + Contract.Assert(m_high < SEGMENT_SIZE - 1); + m_high++; + m_array[m_high] = value; + m_state[m_high].m_value = true; + } + + internal Segment UnsafeGrow() + { + Contract.Assert(m_high >= SEGMENT_SIZE - 1); + Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow + m_next = newSegment; + return newSegment; + } + + internal void Grow() + { + //no CAS is needed, since there is no contention (other threads are blocked, busy waiting) + Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow + m_next = newSegment; + Contract.Assert(m_source.m_tail == this); + m_source.m_tail = m_next; + } + + internal bool TryAppend(T value) + { + //quickly check if m_high is already over the boundary, if so, bail out + if (m_high >= SEGMENT_SIZE - 1) + { + return false; + } + + //Now we will use a CAS to increment m_high, and store the result in newhigh. + //Depending on how many free spots left in this segment and how many threads are doing this Increment + //at this time, the returning "newhigh" can be + // 1) < SEGMENT_SIZE - 1 : we took a spot in this segment, and not the last one, just insert the value + // 2) == SEGMENT_SIZE - 1 : we took the last spot, insert the value AND grow the segment + // 3) > SEGMENT_SIZE - 1 : we failed to reserve a spot in this segment, we return false to + // Queue.Enqueue method, telling it to try again in the next segment. + + int newhigh = SEGMENT_SIZE; //initial value set to be over the boundary + + //We need do Interlocked.Increment and value/state update in a finally block to ensure that they run + //without interuption. This is to prevent anything from happening between them, and another dequeue + //thread maybe spinning forever to wait for m_state[] to be true; + try + { } + finally + { + newhigh = Interlocked.Increment(ref m_high); + if (newhigh <= SEGMENT_SIZE - 1) + { + m_array[newhigh] = value; + m_state[newhigh].m_value = true; + } + + //if this thread takes up the last slot in the segment, then this thread is responsible + //to grow a new segment. Calling Grow must be in the finally block too for reliability reason: + //if thread abort during Grow, other threads will be left busy spinning forever. + if (newhigh == SEGMENT_SIZE - 1) + { + Grow(); + } + } + + //if newhigh <= SEGMENT_SIZE-1, it means the current thread successfully takes up a spot + return newhigh <= SEGMENT_SIZE - 1; + } + + internal bool TryRemove(out T result) + { + SpinWait spin = new SpinWait(); + int lowLocal = Low, highLocal = High; + while (lowLocal <= highLocal) + { + //try to update m_low + if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal) + { + //if the specified value is not available (this spot is taken by a push operation, + // but the value is not written into yet), then spin + SpinWait spinLocal = new SpinWait(); + while (!m_state[lowLocal].m_value) + { + spinLocal.SpinOnce(); + } + result = m_array[lowLocal]; + m_array[lowLocal] = default(T); //release the reference to the object. + + //if the current thread sets m_low to SEGMENT_SIZE, which means the current segment becomes + //disposable, then this thread is responsible to dispose this segment, and reset m_head + if (lowLocal + 1 >= SEGMENT_SIZE) + { + // Invariant: we only dispose the current m_head, not any other segment + // In usual situation, disposing a segment is simply seting m_head to m_head.m_next + // But there is one special case, where m_head and m_tail points to the same and ONLY + //segment of the queue: Another thread A is doing Enqueue and finds that it needs to grow, + //while the *current* thread is doing *this* Dequeue operation, and finds that it needs to + //dispose the current (and ONLY) segment. Then we need to wait till thread A finishes its + //Grow operation, this is the reason of having the following while loop + spinLocal = new SpinWait(); + while (m_next == null) + { + spinLocal.SpinOnce(); + } + Contract.Assert(m_source.m_head == this); + m_source.m_head = m_next; + } + return true; + } + else + { + //CAS failed due to contention: spin briefly and retry + spin.SpinOnce(); + lowLocal = Low; highLocal = High; + } + }//end of while + result = default(T); + return false; + } + + internal bool TryPeek(out T result) + { + result = default(T); + int lowLocal = Low; + if (lowLocal > High) + return false; + SpinWait spin = new SpinWait(); + while (!m_state[lowLocal].m_value) + { + spin.SpinOnce(); + } + result = m_array[lowLocal]; + return true; + } + + internal int Low + { + get + { + return Math.Min(m_low, SEGMENT_SIZE); + } + } + + internal int High + { + get + { + //if m_high > SEGMENT_SIZE, it means it's out of range, we should return + //SEGMENT_SIZE-1 as the logical position + return Math.Min(m_high, SEGMENT_SIZE - 1); + } + } + + } + }//end of class Segment + + struct VolatileBool + { + public VolatileBool(bool value) + { + m_value = value; + } + public volatile bool m_value; + } +} + +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs new file mode 100644 index 0000000..a43acb2 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Constants.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + // We can't make those based on the Strings_Core.resx file, because the ObsoleteAttribute needs a compile-time constant. + + class Constants_Core + { + private const string OBSOLETE_REFACTORING = "This property is no longer supported due to refactoring of the API surface and elimination of platform-specific dependencies."; + + public const string OBSOLETE_SCHEDULER_NEWTHREAD = OBSOLETE_REFACTORING + " Please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use NewThreadScheduler.Default to obtain an instance of this scheduler type. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + public const string OBSOLETE_SCHEDULER_TASKPOOL = OBSOLETE_REFACTORING + " Please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use TaskPoolScheduler.Default to obtain an instance of this scheduler type. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + public const string OBSOLETE_SCHEDULER_THREADPOOL = OBSOLETE_REFACTORING + " Consider using Scheduler.Default to obtain the platform's most appropriate pool-based scheduler. In order to access a specific pool-based scheduler, please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use the appropriate scheduler in the System.Reactive.Concurrency namespace. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + + public const string OBSOLETE_SCHEDULEREQUIRED = "This instance property is no longer supported. Use CurrentThreadScheduler.IsScheduleRequired instead. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs new file mode 100644 index 0000000..209bd55 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.Default.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if HAS_EDI +namespace System.Reactive.PlatformServices +{ + // + // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms. + // Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator + // behavior of Rx for PLIB when used on a more capable platform. + // + internal class DefaultExceptionServices/*Impl*/ : IExceptionServices + { + public void Rethrow(Exception exception) + { + System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw(); + } + } +} +#else +namespace System.Reactive.PlatformServices +{ + internal class DefaultExceptionServices : IExceptionServices + { + public void Rethrow(Exception exception) + { + throw exception; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs new file mode 100644 index 0000000..1fceeba --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ExceptionServices.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Reactive.PlatformServices; + +namespace System.Reactive +{ + internal static class ExceptionHelpers + { + private static Lazy<IExceptionServices> s_services = new Lazy<IExceptionServices>(Initialize); + + public static void Throw(this Exception exception) + { + s_services.Value.Rethrow(exception); + } + + public static void ThrowIfNotNull(this Exception exception) + { + if (exception != null) + s_services.Value.Rethrow(exception); + } + + private static IExceptionServices Initialize() + { + return PlatformEnlightenmentProvider.Current.GetService<IExceptionServices>() ?? new DefaultExceptionServices(); + } + } +} + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Services to rethrow exceptions. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IExceptionServices + { + /// <summary> + /// Rethrows the specified exception. + /// </summary> + /// <param name="exception">Exception to rethrow.</param> + void Rethrow(Exception exception); + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs new file mode 100644 index 0000000..a8f6a7a --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/HostLifecycleService.cs @@ -0,0 +1,113 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Threading; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Provides access to the host's lifecycle management services. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public static class HostLifecycleService + { + private static Lazy<IHostLifecycleNotifications> s_notifications = new Lazy<IHostLifecycleNotifications>(InitializeNotifications); + + private static int _refCount; + + /// <summary> + /// Event that gets raised when the host suspends the application. + /// </summary> + public static event EventHandler<HostSuspendingEventArgs> Suspending; + + /// <summary> + /// Event that gets raised when the host resumes the application. + /// </summary> + public static event EventHandler<HostResumingEventArgs> Resuming; + + /// <summary> + /// Adds a reference to the host lifecycle manager, causing it to be sending notifications. + /// </summary> + public static void AddRef() + { + if (Interlocked.Increment(ref _refCount) == 1) + { + var notifications = s_notifications.Value; + if (notifications != null) + { + notifications.Suspending += OnSuspending; + notifications.Resuming += OnResuming; + } + } + } + + /// <summary> + /// Removes a reference to the host lifecycle manager, causing it to stop sending notifications + /// if the removed reference was the last one. + /// </summary> + public static void Release() + { + if (Interlocked.Decrement(ref _refCount) == 0) + { + var notifications = s_notifications.Value; + if (notifications != null) + { + notifications.Suspending -= OnSuspending; + notifications.Resuming -= OnResuming; + } + } + } + + private static void OnSuspending(object sender, HostSuspendingEventArgs e) + { + var suspending = Suspending; + if (suspending != null) + suspending(sender, e); + } + + private static void OnResuming(object sender, HostResumingEventArgs e) + { + var resuming = Resuming; + if (resuming != null) + resuming(sender, e); + } + + private static IHostLifecycleNotifications InitializeNotifications() + { + return PlatformEnlightenmentProvider.Current.GetService<IHostLifecycleNotifications>(); + } + } + + /// <summary> + /// (Infrastructure) Provides notifications about the host's lifecycle events. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IHostLifecycleNotifications + { + /// <summary> + /// Event that gets raised when the host suspends. + /// </summary> + event EventHandler<HostSuspendingEventArgs> Suspending; + + /// <summary> + /// Event that gets raised when the host resumes. + /// </summary> + event EventHandler<HostResumingEventArgs> Resuming; + } + + /// <summary> + /// (Infrastructure) Event arguments for host suspension events. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class HostSuspendingEventArgs : EventArgs + { + } + + /// <summary> + /// (Infrastructure) Event arguments for host resumption events. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class HostResumingEventArgs : EventArgs + { + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs new file mode 100644 index 0000000..5cc9b2d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ImmutableList.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal class ImmutableList<T> + { + T[] data; + + public ImmutableList() + { + data = new T[0]; + } + + public ImmutableList(T[] data) + { + this.data = data; + } + + public ImmutableList<T> Add(T value) + { + var newData = new T[data.Length + 1]; + Array.Copy(data, newData, data.Length); + newData[data.Length] = value; + return new ImmutableList<T>(newData); + } + + public ImmutableList<T> Remove(T value) + { + var i = IndexOf(value); + if (i < 0) + return this; + var newData = new T[data.Length - 1]; + Array.Copy(data, 0, newData, 0, i); + Array.Copy(data, i + 1, newData, i, data.Length - i - 1); + return new ImmutableList<T>(newData); + } + + public int IndexOf(T value) + { + for (var i = 0; i < data.Length; ++i) + if (data[i].Equals(value)) + return i; + return -1; + } + + public T[] Data + { + get { return data; } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs new file mode 100644 index 0000000..4094dce --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Lazy.cs @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_LAZY +#pragma warning disable 0420 + +// +// Based on ndp\clr\src\BCL\System\Lazy.cs but with LazyThreadSafetyMode.ExecutionAndPublication mode behavior hardcoded. +// + +using System.Diagnostics; +using System.Threading; +using System.Reactive; + +namespace System +{ + internal class Lazy<T> + { + class Boxed + { + internal Boxed(T value) + { + m_value = value; + } + + internal T m_value; + } + + static Func<T> ALREADY_INVOKED_SENTINEL = delegate { return default(T); }; + + private object m_boxed; + private Func<T> m_valueFactory; + private volatile object m_threadSafeObj; + + public Lazy(Func<T> valueFactory) + { + m_threadSafeObj = new object(); + m_valueFactory = valueFactory; + } + +#if !NO_DEBUGGER_ATTRIBUTES + [DebuggerBrowsable(DebuggerBrowsableState.Never)] +#endif + public T Value + { + get + { + Boxed boxed = null; + if (m_boxed != null) + { + boxed = m_boxed as Boxed; + if (boxed != null) + { + return boxed.m_value; + } + + var exc = m_boxed as Exception; + exc.Throw(); + } + + return LazyInitValue(); + } + } + + private T LazyInitValue() + { + Boxed boxed = null; + object threadSafeObj = m_threadSafeObj; + bool lockTaken = false; + try + { + if (threadSafeObj != (object)ALREADY_INVOKED_SENTINEL) + { + Monitor.Enter(threadSafeObj); + lockTaken = true; + } + + if (m_boxed == null) + { + boxed = CreateValue(); + m_boxed = boxed; + m_threadSafeObj = ALREADY_INVOKED_SENTINEL; + } + else + { + boxed = m_boxed as Boxed; + if (boxed == null) + { + var exc = m_boxed as Exception; + exc.Throw(); + } + } + } + finally + { + if (lockTaken) + Monitor.Exit(threadSafeObj); + } + + return boxed.m_value; + } + + private Boxed CreateValue() + { + Boxed boxed = null; + try + { + if (m_valueFactory == ALREADY_INVOKED_SENTINEL) + throw new InvalidOperationException(); + + Func<T> factory = m_valueFactory; + m_valueFactory = ALREADY_INVOKED_SENTINEL; + + boxed = new Boxed(factory()); + } + catch (Exception ex) + { + m_boxed = ex; + throw; + } + + return boxed; + } + } +} +#pragma warning restore 0420 +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs new file mode 100644 index 0000000..6a2e2f8 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Observers.cs @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal class NopObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Instance = new NopObserver<T>(); + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + } + + internal class DoneObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Completed = new DoneObserver<T>(); + + public Exception Exception { get; set; } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + } + + internal class DisposedObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Instance = new DisposedObserver<T>(); + + public void OnCompleted() + { + throw new ObjectDisposedException(""); + } + + public void OnError(Exception error) + { + throw new ObjectDisposedException(""); + } + + public void OnNext(T value) + { + throw new ObjectDisposedException(""); + } + } + + internal class Observer<T> : IObserver<T> + { + private readonly ImmutableList<IObserver<T>> _observers; + + public Observer(ImmutableList<IObserver<T>> observers) + { + _observers = observers; + } + + public void OnCompleted() + { + foreach (var observer in _observers.Data) + observer.OnCompleted(); + } + + public void OnError(Exception error) + { + foreach (var observer in _observers.Data) + observer.OnError(error); + } + + public void OnNext(T value) + { + foreach (var observer in _observers.Data) + observer.OnNext(value); + } + + internal IObserver<T> Add(IObserver<T> observer) + { + return new Observer<T>(_observers.Add(observer)); + } + + internal IObserver<T> Remove(IObserver<T> observer) + { + var i = Array.IndexOf(_observers.Data, observer); + if (i < 0) + return this; + + if (_observers.Data.Length == 2) + { + return _observers.Data[1 - i]; + } + else + { + return new Observer<T>(_observers.Remove(observer)); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs new file mode 100644 index 0000000..f2483e8 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs @@ -0,0 +1,102 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Reflection; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Interface for enlightenment providers. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IPlatformEnlightenmentProvider + { + /// <summary> + /// (Infastructure) Tries to gets the specified service. + /// </summary> + /// <typeparam name="T">Service type.</typeparam> + /// <param name="args">Optional set of arguments.</param> + /// <returns>Service instance or null if not found.</returns> + T GetService<T>(params object[] args) where T : class; + } + + /// <summary> + /// (Infrastructure) Provider for platform-specific framework enlightenments. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public static class PlatformEnlightenmentProvider + { + private static readonly object s_gate = new object(); + private static IPlatformEnlightenmentProvider s_current; + + /// <summary> + /// (Infrastructure) Gets the current enlightenment provider. If none is loaded yet, accessing this property triggers provider resolution. + /// </summary> + /// <remarks> + /// This member is used by the Rx infrastructure and not meant for public consumption or implementation. + /// </remarks> + public static IPlatformEnlightenmentProvider Current + { + get + { + if (s_current == null) + { + lock (s_gate) + { + if (s_current == null) + { + // + // TODO: Investigate whether we can simplify this logic to just use "System.Reactive.PlatformServices.PlatformEnlightenmentProvider, System.Reactive.PlatformServices". + // It turns out this doesn't quite work on Silverlight. On the other hand, in .NET Compact Framework 3.5, we mysteriously have to use that path. + // + +#if NETCF35 + var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, System.Reactive.PlatformServices"; +#else +#if CRIPPLED_REFLECTION + var ifType = typeof(IPlatformEnlightenmentProvider).GetTypeInfo(); +#else + var ifType = typeof(IPlatformEnlightenmentProvider); +#endif + var asm = new AssemblyName(ifType.Assembly.FullName); + asm.Name = "System.Reactive.PlatformServices"; + var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, " + asm.FullName; +#endif + + var t = Type.GetType(name, false); + if (t != null) + s_current = (IPlatformEnlightenmentProvider)Activator.CreateInstance(t); + else + s_current = new DefaultPlatformEnlightenmentProvider(); + } + } + } + + return s_current; + } + + set + { + lock (s_gate) + { + s_current = value; + } + } + } + } + + class DefaultPlatformEnlightenmentProvider : IPlatformEnlightenmentProvider + { + public T GetService<T>(object[] args) where T : class + { + return null; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs new file mode 100644 index 0000000..8a02cd5 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PriorityQueue.cs @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Threading; +using System.Collections.Generic; + +namespace System.Reactive +{ + internal class PriorityQueue<T> where T : IComparable<T> + { +#if !NO_INTERLOCKED_64 + private static long _count = long.MinValue; +#else + private static int _count = int.MinValue; +#endif + private IndexedItem[] _items; + private int _size; + + public PriorityQueue() + : this(16) + { + } + + public PriorityQueue(int capacity) + { + _items = new IndexedItem[capacity]; + _size = 0; + } + + private bool IsHigherPriority(int left, int right) + { + return _items[left].CompareTo(_items[right]) < 0; + } + + private void Percolate(int index) + { + if (index >= _size || index < 0) + return; + var parent = (index - 1) / 2; + if (parent < 0 || parent == index) + return; + + if (IsHigherPriority(index, parent)) + { + var temp = _items[index]; + _items[index] = _items[parent]; + _items[parent] = temp; + Percolate(parent); + } + } + + private void Heapify() + { + Heapify(0); + } + + private void Heapify(int index) + { + if (index >= _size || index < 0) + return; + + var left = 2 * index + 1; + var right = 2 * index + 2; + var first = index; + + if (left < _size && IsHigherPriority(left, first)) + first = left; + if (right < _size && IsHigherPriority(right, first)) + first = right; + if (first != index) + { + var temp = _items[index]; + _items[index] = _items[first]; + _items[first] = temp; + Heapify(first); + } + } + + public int Count { get { return _size; } } + + public T Peek() + { + if (_size == 0) + throw new InvalidOperationException(Strings_Core.HEAP_EMPTY); + + return _items[0].Value; + } + + private void RemoveAt(int index) + { + _items[index] = _items[--_size]; + _items[_size] = default(IndexedItem); + Heapify(); + if (_size < _items.Length / 4) + { + var temp = _items; + _items = new IndexedItem[_items.Length / 2]; + Array.Copy(temp, 0, _items, 0, _size); + } + } + + public T Dequeue() + { + var result = Peek(); + RemoveAt(0); + return result; + } + + public void Enqueue(T item) + { + if (_size >= _items.Length) + { + var temp = _items; + _items = new IndexedItem[_items.Length * 2]; + Array.Copy(temp, _items, temp.Length); + } + + var index = _size++; + _items[index] = new IndexedItem { Value = item, Id = Interlocked.Increment(ref _count) }; + Percolate(index); + } + + public bool Remove(T item) + { + for (var i = 0; i < _size; ++i) + { + if (EqualityComparer<T>.Default.Equals(_items[i].Value, item)) + { + RemoveAt(i); + return true; + } + } + + return false; + } + + struct IndexedItem : IComparable<IndexedItem> + { + public T Value; +#if !NO_INTERLOCKED_64 + public long Id; +#else + public int Id; +#endif + + public int CompareTo(IndexedItem other) + { + var c = Value.CompareTo(other.Value); + if (c == 0) + c = Id.CompareTo(other.Id); + return c; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs new file mode 100644 index 0000000..f03b45f --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Producer.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System.Reactive.Concurrency; +using System.Reactive.Disposables; + +namespace System.Reactive +{ + /// <summary> + /// Interface with variance annotation; allows for better type checking when detecting capabilities in SubscribeSafe. + /// </summary> + /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam> + internal interface IProducer< +#if !NO_VARIANCE + out +#endif + TSource> : IObservable<TSource> + { + IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard); + } + + /// <summary> + /// Base class for implementation of query operators, providing performance benefits over the use of Observable.Create. + /// </summary> + /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam> + internal abstract class Producer<TSource> : IProducer<TSource> + { + /// <summary> + /// Publicly visible Subscribe method. + /// </summary> + /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param> + /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns> + public IDisposable Subscribe(IObserver<TSource> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + return SubscribeRaw(observer, true); + } + + public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard) + { + var state = new State(); + state.observer = observer; + state.sink = new SingleAssignmentDisposable(); + state.subscription = new SingleAssignmentDisposable(); + + var d = new CompositeDisposable(2) { state.sink, state.subscription }; + + // + // See AutoDetachObserver.cs for more information on the safeguarding requirement and + // its implementation aspects. + // + if (enableSafeguard) + { + state.observer = SafeObserver<TSource>.Create(state.observer, d); + } + + if (CurrentThreadScheduler.IsScheduleRequired) + { + CurrentThreadScheduler.Instance.Schedule(state, Run); + } + else + { + state.subscription.Disposable = this.Run(state.observer, state.subscription, state.Assign); + } + + return d; + } + + struct State + { + public SingleAssignmentDisposable sink; + public SingleAssignmentDisposable subscription; + public IObserver<TSource> observer; + + public void Assign(IDisposable s) + { + sink.Disposable = s; + } + } + + private IDisposable Run(IScheduler _, State x) + { + x.subscription.Disposable = this.Run(x.observer, x.subscription, x.Assign); + return Disposable.Empty; + } + + /// <summary> + /// Core implementation of the query operator, called upon a new subscription to the producer object. + /// </summary> + /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param> + /// <param name="cancel">The subscription disposable object returned from the Run call, passed in such that it can be forwarded to the sink, allowing it to dispose the subscription upon sending a final message (or prematurely for other reasons).</param> + /// <param name="setSink">Callback to communicate the sink object to the subscriber, allowing consumers to tunnel a Dispose call into the sink, which can stop the processing.</param> + /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns> + /// <remarks>The <paramref name="observer">observer</paramref> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks> + protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink); + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs new file mode 100644 index 0000000..2693569 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SafeObserver.cs @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; + +namespace System.Reactive +{ + // + // See AutoDetachObserver.cs for more information on the safeguarding requirement and + // its implementation aspects. + // + + class SafeObserver<TSource> : IObserver<TSource> + { + private readonly IObserver<TSource> _observer; + private readonly IDisposable _disposable; + + public static IObserver<TSource> Create(IObserver<TSource> observer, IDisposable disposable) + { + var a = observer as AnonymousObserver<TSource>; + if (a != null) + return a.MakeSafe(disposable); + else + return new SafeObserver<TSource>(observer, disposable); + } + + private SafeObserver(IObserver<TSource> observer, IDisposable disposable) + { + _observer = observer; + _disposable = disposable; + } + + public void OnNext(TSource value) + { + var __noError = false; + try + { + _observer.OnNext(value); + __noError = true; + } + finally + { + if (!__noError) + _disposable.Dispose(); + } + } + + public void OnError(Exception error) + { + try + { + _observer.OnError(error); + } + finally + { + _disposable.Dispose(); + } + } + + public void OnCompleted() + { + try + { + _observer.OnCompleted(); + } + finally + { + _disposable.Dispose(); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs new file mode 100644 index 0000000..2b728e2 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs @@ -0,0 +1,441 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Threading; + +namespace System.Reactive +{ +#if !NO_PERF && !NO_CDS + using System.Collections.Concurrent; + using System.Diagnostics; + + internal class ScheduledObserver<T> : ObserverBase<T>, IDisposable + { + private volatile int _state = 0; + private const int STOPPED = 0; + private const int RUNNING = 1; + private const int PENDING = 2; + private const int FAULTED = 9; + + private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>(); + private volatile bool _failed; + private volatile Exception _error; + private volatile bool _completed; + + private readonly IObserver<T> _observer; + private readonly IScheduler _scheduler; + private readonly ISchedulerLongRunning _longRunning; + private readonly SerialDisposable _disposable = new SerialDisposable(); + + public ScheduledObserver(IScheduler scheduler, IObserver<T> observer) + { + _scheduler = scheduler; + _observer = observer; + _longRunning = _scheduler.AsLongRunning(); + + if (_longRunning != null) + _dispatcherEvent = new SemaphoreSlim(0); + } + + private readonly object _dispatcherInitGate = new object(); + private SemaphoreSlim _dispatcherEvent; + private IDisposable _dispatcherJob; + + private void EnsureDispatcher() + { + if (_dispatcherJob == null) + { + lock (_dispatcherInitGate) + { + if (_dispatcherJob == null) + { + _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch); + + _disposable.Disposable = new CompositeDisposable(2) + { + _dispatcherJob, + Disposable.Create(() => _dispatcherEvent.Release()) + }; + } + } + } + } + + private void Dispatch(ICancelable cancel) + { + while (true) + { + _dispatcherEvent.Wait(); + + if (cancel.IsDisposed) + return; + + var next = default(T); + while (_queue.TryDequeue(out next)) + { + try + { + _observer.OnNext(next); + } + catch + { + var nop = default(T); + while (_queue.TryDequeue(out nop)) + ; + + throw; + } + + _dispatcherEvent.Wait(); + + if (cancel.IsDisposed) + return; + } + + if (_failed) + { + _observer.OnError(_error); + Dispose(); + return; + } + + if (_completed) + { + _observer.OnCompleted(); + Dispose(); + return; + } + } + } + + public void EnsureActive() + { + EnsureActive(1); + } + + public void EnsureActive(int n) + { + if (_longRunning != null) + { + if (n > 0) + _dispatcherEvent.Release(n); + + EnsureDispatcher(); + } + else + EnsureActiveSlow(); + } + + private void EnsureActiveSlow() + { + var isOwner = false; + +#pragma warning disable 0420 + while (true) + { + var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED); + if (old == STOPPED) + { + isOwner = true; // RUNNING + break; + } + + if (old == FAULTED) + return; + + // If we find the consumer loop running, we transition to PENDING to handle + // the case where the queue is seen empty by the consumer, making it transition + // to the STOPPED state, but we inserted an item into the queue. + // + // C: _queue.TryDequeue == false (RUNNING) + // ---------------------------------------------- + // P: _queue.Enqueue(...) + // EnsureActive + // Exchange(ref _state, RUNNING) == RUNNING + // ---------------------------------------------- + // C: transition to STOPPED (STOPPED) + // + // In this case, P would believe C is running and not invoke the scheduler + // using the isOwner flag. + // + // By introducing an intermediate PENDING state and using CAS in the consumer + // to only transition to STOPPED in case we were still RUNNING, we can force + // the consumer to reconsider the decision to transition to STOPPED. In that + // case, the consumer loops again and re-reads from the queue and other state + // fields. At least one bit of state will have changed because EnsureActive + // should only be called after invocation of IObserver<T> methods that touch + // this state. + // + if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING) + break; + } +#pragma warning restore 0420 + + if (isOwner) + { + _disposable.Disposable = _scheduler.Schedule<object>(null, Run); + } + } + + private void Run(object state, Action<object> recurse) + { +#pragma warning disable 0420 + var next = default(T); + while (!_queue.TryDequeue(out next)) + { + if (_failed) + { + // Between transitioning to _failed and the queue check in the loop, + // items could have been queued, so we can't stop yet. We don't spin + // and immediately re-check the queue. + // + // C: _queue.TryDequeue == false + // ---------------------------------------------- + // P: OnNext(...) + // _queue.Enqueue(...) // Will get lost + // P: OnError(...) + // _failed = true + // ---------------------------------------------- + // C: if (_failed) + // _observer.OnError(...) // Lost an OnNext + // + if (!_queue.IsEmpty) + continue; + + Interlocked.Exchange(ref _state, STOPPED); + _observer.OnError(_error); + Dispose(); + return; + } + + if (_completed) + { + // Between transitioning to _completed and the queue check in the loop, + // items could have been queued, so we can't stop yet. We don't spin + // and immediately re-check the queue. + // + // C: _queue.TryDequeue == false + // ---------------------------------------------- + // P: OnNext(...) + // _queue.Enqueue(...) // Will get lost + // P: OnCompleted(...) + // _completed = true + // ---------------------------------------------- + // C: if (_completed) + // _observer.OnCompleted() // Lost an OnNext + // + if (!_queue.IsEmpty) + continue; + + Interlocked.Exchange(ref _state, STOPPED); + _observer.OnCompleted(); + Dispose(); + return; + } + + var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING); + if (old == RUNNING || old == FAULTED) + return; + + Debug.Assert(old == PENDING); + + // The producer has put us in the PENDING state to prevent us from + // transitioning to STOPPED, so we go RUNNING again and re-check our state. + _state = RUNNING; + } + + Interlocked.Exchange(ref _state, RUNNING); + +#pragma warning restore 0420 + + try + { + _observer.OnNext(next); + } + catch + { +#pragma warning disable 0420 + Interlocked.Exchange(ref _state, FAULTED); +#pragma warning restore 0420 + + var nop = default(T); + while (_queue.TryDequeue(out nop)) + ; + throw; + } + + recurse(state); + } + + protected override void OnNextCore(T value) + { + _queue.Enqueue(value); + } + + protected override void OnErrorCore(Exception exception) + { + _error = exception; + _failed = true; + } + + protected override void OnCompletedCore() + { + _completed = true; + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + _disposable.Dispose(); + } + } + } +#else + class ScheduledObserver<T> : ObserverBase<T>, IDisposable + { + private bool _isAcquired = false; + private bool _hasFaulted = false; + private readonly Queue<Action> _queue = new Queue<Action>(); + private readonly IObserver<T> _observer; + private readonly IScheduler _scheduler; + private readonly SerialDisposable _disposable = new SerialDisposable(); + + public ScheduledObserver(IScheduler scheduler, IObserver<T> observer) + { + _scheduler = scheduler; + _observer = observer; + } + + public void EnsureActive(int n) + { + EnsureActive(); + } + + public void EnsureActive() + { + var isOwner = false; + + lock (_queue) + { + if (!_hasFaulted && _queue.Count > 0) + { + isOwner = !_isAcquired; + _isAcquired = true; + } + } + + if (isOwner) + { + _disposable.Disposable = _scheduler.Schedule<object>(null, Run); + } + } + + private void Run(object state, Action<object> recurse) + { + var work = default(Action); + lock (_queue) + { + if (_queue.Count > 0) + work = _queue.Dequeue(); + else + { + _isAcquired = false; + return; + } + } + + try + { + work(); + } + catch + { + lock (_queue) + { + _queue.Clear(); + _hasFaulted = true; + } + throw; + } + + recurse(state); + } + + protected override void OnNextCore(T value) + { + lock (_queue) + _queue.Enqueue(() => _observer.OnNext(value)); + } + + protected override void OnErrorCore(Exception exception) + { + lock (_queue) + _queue.Enqueue(() => _observer.OnError(exception)); + } + + protected override void OnCompletedCore() + { + lock (_queue) + _queue.Enqueue(() => _observer.OnCompleted()); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + _disposable.Dispose(); + } + } + } +#endif + + class ObserveOnObserver<T> : ScheduledObserver<T> + { + private IDisposable _cancel; + + public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel) + : base(scheduler, observer) + { + _cancel = cancel; + } + + protected override void OnNextCore(T value) + { + base.OnNextCore(value); + EnsureActive(); + } + + protected override void OnErrorCore(Exception exception) + { + base.OnErrorCore(exception); + EnsureActive(); + } + + protected override void OnCompletedCore() + { + base.OnCompletedCore(); + EnsureActive(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + var cancel = Interlocked.Exchange(ref _cancel, null); + if (cancel != null) + { + cancel.Dispose(); + } + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs new file mode 100644 index 0000000..4f5eeee --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs @@ -0,0 +1,116 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_SEMAPHORE && SILVERLIGHT +using System; +using System.Threading; + +namespace System.Reactive.Threading +{ + //Monitor based implementation of Semaphore + //that mimicks the .NET Semaphore class (System.Threading.Semaphore) + + internal sealed class Semaphore : IDisposable + { + private int m_currentCount; + private readonly int m_maximumCount; + private readonly object m_lockObject; + private bool m_disposed; + + public Semaphore(int initialCount, int maximumCount) + { + if (initialCount < 0) + { + throw new ArgumentOutOfRangeException("initialCount", "Non-negative number required."); + } + if (maximumCount < 1) + { + throw new ArgumentOutOfRangeException("maximumCount", "Positive number required."); + } + if (initialCount > maximumCount) + { + throw new ArgumentException("Initial count must be smaller than maximum"); + } + + m_currentCount = initialCount; + m_maximumCount = maximumCount; + m_lockObject = new object(); + } + + public int Release() + { + return this.Release(1); + } + + public int Release(int releaseCount) + { + if (releaseCount < 1) + { + throw new ArgumentOutOfRangeException("releaseCount", "Positive number required."); + } + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + var oldCount = default(int); + lock (m_lockObject) + { + oldCount = m_currentCount; + if (releaseCount + m_currentCount > m_maximumCount) + { + throw new ArgumentOutOfRangeException("releaseCount", "Amount of releases would overflow maximum"); + } + m_currentCount += releaseCount; + //PulseAll makes sure all waiting threads get queued for acquiring the lock + //Pulse would only queue one thread. + + Monitor.PulseAll(m_lockObject); + } + return oldCount; + } + + public bool WaitOne() + { + return WaitOne(Timeout.Infinite); + } + + public bool WaitOne(int millisecondsTimeout) + { + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + lock (m_lockObject) + { + while (m_currentCount == 0) + { + if (!Monitor.Wait(m_lockObject, millisecondsTimeout)) + { + return false; + } + } + m_currentCount--; + return true; + } + } + + public bool WaitOne(TimeSpan timeout) + { + return WaitOne((int)timeout.TotalMilliseconds); + } + + public void Close() + { + Dispose(); + } + + public void Dispose() + { + //the .NET CLR semaphore does not release waits upon dispose + //so we don't do that either. + m_disposed = true; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs new file mode 100644 index 0000000..847c14d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Xna.cs @@ -0,0 +1,143 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_SEMAPHORE && (XNA || NETCF) +using System; +using System.Collections.Generic; +using System.Threading; + +namespace System.Reactive.Threading +{ + //Monitor based implementation of Semaphore + //that mimicks the .NET Semaphore class (System.Threading.Semaphore) + + internal sealed class Semaphore : IDisposable + { + private int m_currentCount; + private readonly int m_maximumCount; + private readonly object m_lockObject; + private bool m_disposed; + private readonly List<ManualResetEvent> m_waiting; + + public Semaphore(int initialCount, int maximumCount) + { + if (initialCount < 0) + { + throw new ArgumentOutOfRangeException("initialCount", "Non-negative number required."); + } + if (maximumCount < 1) + { + throw new ArgumentOutOfRangeException("maximumCount", "Positive number required."); + } + if (initialCount > maximumCount) + { + throw new ArgumentException("Initial count must be smaller than maximum"); + } + m_waiting = new List<ManualResetEvent>(); + m_currentCount = initialCount; + m_maximumCount = maximumCount; + m_lockObject = new object(); + } + + public int Release() + { + return this.Release(1); + } + + public int Release(int releaseCount) + { + if (releaseCount < 1) + { + throw new ArgumentOutOfRangeException("releaseCount", "Positive number required."); + } + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + var oldCount = default(int); + var toBeReleased = new List<ManualResetEvent>(); + lock (m_lockObject) + { + oldCount = m_currentCount; + if (releaseCount + m_currentCount > m_maximumCount) + { + throw new ArgumentOutOfRangeException("releaseCount", "Amount of releases would overflow maximum"); + } + + var waiting = m_waiting.ToArray(); + var left = Math.Max(0, releaseCount - waiting.Length); + for (var i = 0; i < releaseCount && i < m_waiting.Count; i++) + { + toBeReleased.Add(waiting[i]); + m_waiting.RemoveAt(0); + } + m_currentCount += left; + } + foreach(var release in toBeReleased) + { + release.Set(); + } + return oldCount; + } + + public bool WaitOne() + { + return WaitOne(Timeout.Infinite); + } + + public bool WaitOne(int millisecondsTimeout) + { + if (m_disposed) + { + throw new ObjectDisposedException("Semaphore"); + } + + var manualResetEvent = default(ManualResetEvent); + + lock (m_lockObject) + { + if (m_currentCount == 0) + { + manualResetEvent = new ManualResetEvent(false); + m_waiting.Add(manualResetEvent); + } + else + { + m_currentCount--; + return true; + } + } +#if XNA_31_ZUNE || NETCF35 + if (!manualResetEvent.WaitOne(millisecondsTimeout, false)) +#else + if (!manualResetEvent.WaitOne(millisecondsTimeout)) +#endif + { + lock(m_lockObject) + { + m_waiting.Remove(manualResetEvent); + } + return false; + } + return true; + } + + public bool WaitOne(TimeSpan timeout) + { + return WaitOne((int)timeout.TotalMilliseconds); + } + + public void Close() + { + Dispose(); + } + + public void Dispose() + { + //the .NET CLR semaphore does not release waits upon dispose + //so we don't do that either. + m_disposed = true; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs new file mode 100644 index 0000000..ffe6ee5 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Sink.cs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System.Threading; + +namespace System.Reactive +{ + /// <summary> + /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer. + /// </summary> + /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam> + /// <remarks>Implementations of sinks are responsible to enforce the message grammar on the associated observer. Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of related resources and to mute the outgoing observer.</remarks> + internal abstract class Sink<TSource> : IDisposable + { + protected internal volatile IObserver<TSource> _observer; + private IDisposable _cancel; + + public Sink(IObserver<TSource> observer, IDisposable cancel) + { + _observer = observer; + _cancel = cancel; + } + + public virtual void Dispose() + { + _observer = NopObserver<TSource>.Instance; + + var cancel = Interlocked.Exchange(ref _cancel, null); + if (cancel != null) + { + cancel.Dispose(); + } + } + + public IObserver<TSource> GetForwarder() + { + return new _(this); + } + + class _ : IObserver<TSource> + { + private readonly Sink<TSource> _forward; + + public _(Sink<TSource> forward) + { + _forward = forward; + } + + public void OnNext(TSource value) + { + _forward._observer.OnNext(value); + } + + public void OnError(Exception error) + { + _forward._observer.OnError(error); + _forward.Dispose(); + } + + public void OnCompleted() + { + _forward._observer.OnCompleted(); + _forward.Dispose(); + } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs new file mode 100644 index 0000000..d3c2374 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Stubs.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal static class Stubs<T> + { + public static readonly Action<T> Ignore = _ => { }; + public static readonly Func<T, T> I = _ => _; + } + + internal static class Stubs + { + public static readonly Action Nop = () => { }; + public static readonly Action<Exception> Throw = ex => { ex.Throw(); }; + } + +#if !NO_THREAD + internal static class TimerStubs + { + public static readonly System.Threading.Timer Never = new System.Threading.Timer(_ => { }); + } +#endif +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs new file mode 100644 index 0000000..c417f90 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizationContextExtensions.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_SYNCCTX +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace System.Reactive.Concurrency +{ + internal static class SynchronizationContextExtensions + { + public static void PostWithStartComplete<T>(this SynchronizationContext context, Action<T> action, T state) + { + context.OperationStarted(); + + context.Post( + o => + { + try + { + action((T)o); + } + finally + { + context.OperationCompleted(); + } + }, + state + ); + } + + public static void PostWithStartComplete(this SynchronizationContext context, Action action) + { + context.OperationStarted(); + + context.Post( + _ => + { + try + { + action(); + } + finally + { + context.OperationCompleted(); + } + }, + null + ); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs new file mode 100644 index 0000000..e41294e --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SynchronizedObserver.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + internal class SynchronizedObserver<T> : ObserverBase<T> + { + private readonly object _gate; + private readonly IObserver<T> _observer; + + public SynchronizedObserver(IObserver<T> observer, object gate) + { + _gate = gate; + _observer = observer; + } + + protected override void OnNextCore(T value) + { + lock (_gate) + { + _observer.OnNext(value); + } + } + + protected override void OnErrorCore(Exception exception) + { + lock (_gate) + { + _observer.OnError(exception); + } + } + + protected override void OnCompletedCore() + { + lock (_gate) + { + _observer.OnCompleted(); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs new file mode 100644 index 0000000..b9c8167 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.Default.cs @@ -0,0 +1,113 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Provides access to the local system clock. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class DefaultSystemClock : ISystemClock + { + /// <summary> + /// Gets the current time. + /// </summary> + public DateTimeOffset UtcNow + { + get { return DateTimeOffset.UtcNow; } + } + } + + internal class DefaultSystemClockMonitor : PeriodicTimerSystemClockMonitor + { + private static readonly TimeSpan DEFAULT_PERIOD = TimeSpan.FromSeconds(1); + + public DefaultSystemClockMonitor() + : base(DEFAULT_PERIOD) + { + } + } + + /// <summary> + /// (Infrastructure) Monitors for system clock changes based on a periodic timer. + /// </summary> + [EditorBrowsable(EditorBrowsableState.Never)] + public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged + { + private readonly TimeSpan _period; + private readonly SerialDisposable _timer; + + private DateTimeOffset _lastTime; + private EventHandler<SystemClockChangedEventArgs> _systemClockChanged; + + private const int SYNC_MAXRETRIES = 100; + private const double SYNC_MAXDELTA = 10; + private const int MAXERROR = 100; + + /// <summary> + /// Creates a new monitor for system clock changes with the specified polling frequency. + /// </summary> + /// <param name="period">Polling frequency for system clock changes.</param> + public PeriodicTimerSystemClockMonitor(TimeSpan period) + { + _period = period; + _timer = new SerialDisposable(); + } + + /// <summary> + /// Event that gets raised when a system clock change is detected. + /// </summary> + public event EventHandler<SystemClockChangedEventArgs> SystemClockChanged + { + add + { + NewTimer(); + + _systemClockChanged += value; + } + + remove + { + _systemClockChanged -= value; + + _timer.Disposable = Disposable.Empty; + } + } + + private void NewTimer() + { + _timer.Disposable = Disposable.Empty; + + var n = 0; + do + { + _lastTime = SystemClock.UtcNow; + _timer.Disposable = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period); + } while (Math.Abs((SystemClock.UtcNow - _lastTime).TotalMilliseconds) > SYNC_MAXDELTA && ++n < SYNC_MAXRETRIES); + + if (n >= SYNC_MAXRETRIES) + throw new InvalidOperationException(Strings_Core.FAILED_CLOCK_MONITORING); + } + + private void TimeChanged() + { + var now = SystemClock.UtcNow; + var diff = now - (_lastTime + _period); + if (Math.Abs(diff.TotalMilliseconds) >= MAXERROR) + { + var scc = _systemClockChanged; + if (scc != null) + scc(this, new SystemClockChangedEventArgs(_lastTime + _period, now)); + + NewTimer(); + } + else + { + _lastTime = SystemClock.UtcNow; + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs new file mode 100644 index 0000000..e482dd0 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/SystemClock.cs @@ -0,0 +1,149 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.ComponentModel; +using System.Threading; + +namespace System.Reactive.PlatformServices +{ + /// <summary> + /// (Infrastructure) Provides access to local system clock services. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public static class SystemClock + { + private static Lazy<ISystemClock> s_serviceSystemClock = new Lazy<ISystemClock>(InitializeSystemClock); + private static Lazy<INotifySystemClockChanged> s_serviceSystemClockChanged = new Lazy<INotifySystemClockChanged>(InitializeSystemClockChanged); + + private static int _refCount; + + /// <summary> + /// Gets the local system clock time. + /// </summary> + public static DateTimeOffset UtcNow + { + get { return s_serviceSystemClock.Value.UtcNow; } + } + + /// <summary> + /// Event that gets raised when a system clock change is detected, if there's any interest as indicated by AddRef calls. + /// </summary> + public static event EventHandler<SystemClockChangedEventArgs> SystemClockChanged; + + /// <summary> + /// Adds a reference to the system clock monitor, causing it to be sending notifications. + /// </summary> + /// <exception cref="NotSupportedException">Thrown when the system doesn't support sending clock change notifications.</exception> + public static void AddRef() + { + if (Interlocked.Increment(ref _refCount) == 1) + { + s_serviceSystemClockChanged.Value.SystemClockChanged += OnSystemClockChanged; + } + } + + /// <summary> + /// Removes a reference to the system clock monitor, causing it to stop sending notifications + /// if the removed reference was the last one. + /// </summary> + public static void Release() + { + if (Interlocked.Decrement(ref _refCount) == 0) + { + s_serviceSystemClockChanged.Value.SystemClockChanged -= OnSystemClockChanged; + } + } + + private static void OnSystemClockChanged(object sender, SystemClockChangedEventArgs e) + { + var scc = SystemClockChanged; + if (scc != null) + scc(sender, e); + } + + private static ISystemClock InitializeSystemClock() + { + return PlatformEnlightenmentProvider.Current.GetService<ISystemClock>() ?? new DefaultSystemClock(); + } + + private static INotifySystemClockChanged InitializeSystemClockChanged() + { + return PlatformEnlightenmentProvider.Current.GetService<INotifySystemClockChanged>() ?? new DefaultSystemClockMonitor(); + } + } + + /// <summary> + /// (Infrastructure) Provides access to the local system clock. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface ISystemClock + { + /// <summary> + /// Gets the current time. + /// </summary> + DateTimeOffset UtcNow { get; } + } + + /// <summary> + /// (Infrastructure) Provides a mechanism to notify local schedulers about system clock changes. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public interface INotifySystemClockChanged + { + /// <summary> + /// Event that gets raised when a system clock change is detected. + /// </summary> + event EventHandler<SystemClockChangedEventArgs> SystemClockChanged; + } + + /// <summary> + /// (Infrastructure) Event arguments for system clock change notifications. + /// </summary> + /// <remarks> + /// This type is used by the Rx infrastructure and not meant for public consumption or implementation. + /// No guarantees are made about forward compatibility of the type's functionality and its usage. + /// </remarks> + [EditorBrowsable(EditorBrowsableState.Never)] + public class SystemClockChangedEventArgs : EventArgs + { + /// <summary> + /// Creates a new system clock notification object with unknown old and new times. + /// </summary> + public SystemClockChangedEventArgs() + : this(DateTimeOffset.MinValue, DateTimeOffset.MaxValue) + { + } + + /// <summary> + /// Creates a new system clock notification object with the specified old and new times. + /// </summary> + /// <param name="oldTime">Time before the system clock changed, or DateTimeOffset.MinValue if not known.</param> + /// <param name="newTime">Time after the system clock changed, or DateTimeOffset.MaxValue if not known.</param> + public SystemClockChangedEventArgs(DateTimeOffset oldTime, DateTimeOffset newTime) + { + OldTime = oldTime; + NewTime = newTime; + } + + /// <summary> + /// Gets the time before the system clock changed, or DateTimeOffset.MinValue if not known. + /// </summary> + public DateTimeOffset OldTime { get; private set; } + + /// <summary> + /// Gets the time after the system clock changed, or DateTimeOffset.MaxValue if not known. + /// </summary> + public DateTimeOffset NewTime { get; private set; } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Notification.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Notification.cs new file mode 100644 index 0000000..54c4e7c --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Notification.cs @@ -0,0 +1,649 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Diagnostics; +using System.Globalization; +using System.Collections.Generic; +using System.Reactive.Concurrency; + +#pragma warning disable 0659 +#pragma warning disable 0661 + +namespace System.Reactive +{ + /// <summary> + /// Indicates the type of a notification. + /// </summary> + public enum NotificationKind + { + /// <summary> + /// Represents an OnNext notification. + /// </summary> + OnNext, + + /// <summary> + /// Represents an OnError notification. + /// </summary> + OnError, + + /// <summary> + /// Represents an OnCompleted notification. + /// </summary> + OnCompleted + } + + /// <summary> + /// Represents a notification to an observer. + /// </summary> + /// <typeparam name="T">The type of the elements received by the observer.</typeparam> +#if !NO_SERIALIZABLE + [Serializable] +#endif + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2218:OverrideGetHashCodeOnOverridingEquals", Justification = "Resembles a discriminated union with finite number of subclasses (external users shouldn't create their own subtypes), each of which does override GetHashCode itself.")] + public abstract class Notification<T> : IEquatable<Notification<T>> + { + /// <summary> + /// Default constructor used by derived types. + /// </summary> + protected internal Notification() + { + } + + /// <summary> + /// Returns the value of an OnNext notification or throws an exception. + /// </summary> + public abstract T Value + { + get; + } + + /// <summary> + /// Returns a value that indicates whether the notification has a value. + /// </summary> + public abstract bool HasValue + { + get; + } + + /// <summary> + /// Returns the exception of an OnError notification or returns null. + /// </summary> + public abstract Exception Exception + { + get; + } + + /// <summary> + /// Gets the kind of notification that is represented. + /// </summary> + public abstract NotificationKind Kind + { + get; + } + + /// <summary> + /// Represents an OnNext notification to an observer. + /// </summary> +#if !NO_DEBUGGER_ATTRIBUTES + [DebuggerDisplay("OnNext({Value})")] +#endif +#if !NO_SERIALIZABLE + [Serializable] +#endif + internal sealed class OnNextNotification : Notification<T> + { + T value; + + /// <summary> + /// Constructs a notification of a new value. + /// </summary> + public OnNextNotification(T value) + { + this.value = value; + } + + /// <summary> + /// Returns the value of an OnNext notification. + /// </summary> + public override T Value { get { return value; } } + + /// <summary> + /// Returns null. + /// </summary> + public override Exception Exception { get { return null; } } + + /// <summary> + /// Returns true. + /// </summary> + public override bool HasValue { get { return true; } } + + /// <summary> + /// Returns NotificationKind.OnNext. + /// </summary> + public override NotificationKind Kind { get { return NotificationKind.OnNext; } } + + /// <summary> + /// Returns the hash code for this instance. + /// </summary> + public override int GetHashCode() + { + return EqualityComparer<T>.Default.GetHashCode(Value); + } + + /// <summary> + /// Indicates whether this instance and a specified object are equal. + /// </summary> + public override bool Equals(Notification<T> other) + { + if (Object.ReferenceEquals(this, other)) + return true; + if (Object.ReferenceEquals(other, null)) + return false; + if (other.Kind != NotificationKind.OnNext) + return false; + return EqualityComparer<T>.Default.Equals(Value, other.Value); + } + + /// <summary> + /// Returns a string representation of this instance. + /// </summary> + public override string ToString() + { + return String.Format(CultureInfo.CurrentCulture, "OnNext({0})", Value); + } + + /// <summary> + /// Invokes the observer's method corresponding to the notification. + /// </summary> + /// <param name="observer">Observer to invoke the notification on.</param> + public override void Accept(IObserver<T> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + observer.OnNext(Value); + } + + /// <summary> + /// Invokes the observer's method corresponding to the notification and returns the produced result. + /// </summary> + /// <param name="observer">Observer to invoke the notification on.</param> + /// <returns>Result produced by the observation.</returns> + public override TResult Accept<TResult>(IObserver<T, TResult> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + return observer.OnNext(Value); + } + + /// <summary> + /// Invokes the delegate corresponding to the notification. + /// </summary> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted) + { + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + onNext(Value); + } + + /// <summary> + /// Invokes the delegate corresponding to the notification and returns the produced result. + /// </summary> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + /// <returns>Result produced by the observation.</returns> + public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted) + { + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + return onNext(Value); + } + } + + /// <summary> + /// Represents an OnError notification to an observer. + /// </summary> +#if !NO_DEBUGGER_ATTRIBUTES + [DebuggerDisplay("OnError({Exception})")] +#endif +#if !NO_SERIALIZABLE + [Serializable] +#endif + internal sealed class OnErrorNotification : Notification<T> + { + Exception exception; + + /// <summary> + /// Constructs a notification of an exception. + /// </summary> + public OnErrorNotification(Exception exception) + { + this.exception = exception; + } + + /// <summary> + /// Throws the exception. + /// </summary> + public override T Value { get { exception.Throw(); return default(T); } } + + /// <summary> + /// Returns the exception. + /// </summary> + public override Exception Exception { get { return exception; } } + + /// <summary> + /// Returns false. + /// </summary> + public override bool HasValue { get { return false; } } + + /// <summary> + /// Returns NotificationKind.OnError. + /// </summary> + public override NotificationKind Kind { get { return NotificationKind.OnError; } } + + /// <summary> + /// Returns the hash code for this instance. + /// </summary> + public override int GetHashCode() + { + return Exception.GetHashCode(); + } + + /// <summary> + /// Indicates whether this instance and other are equal. + /// </summary> + public override bool Equals(Notification<T> other) + { + if (Object.ReferenceEquals(this, other)) + return true; + if (Object.ReferenceEquals(other, null)) + return false; + if (other.Kind != NotificationKind.OnError) + return false; + return Object.Equals(Exception, other.Exception); + } + + /// <summary> + /// Returns a string representation of this instance. + /// </summary> + public override string ToString() + { + return String.Format(CultureInfo.CurrentCulture, "OnError({0})", Exception.GetType().FullName); + } + + /// <summary> + /// Invokes the observer's method corresponding to the notification. + /// </summary> + /// <param name="observer">Observer to invoke the notification on.</param> + public override void Accept(IObserver<T> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + observer.OnError(Exception); + } + + /// <summary> + /// Invokes the observer's method corresponding to the notification and returns the produced result. + /// </summary> + /// <param name="observer">Observer to invoke the notification on.</param> + /// <returns>Result produced by the observation.</returns> + public override TResult Accept<TResult>(IObserver<T, TResult> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + return observer.OnError(Exception); + } + + /// <summary> + /// Invokes the delegate corresponding to the notification. + /// </summary> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted) + { + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + onError(Exception); + } + + /// <summary> + /// Invokes the delegate corresponding to the notification and returns the produced result. + /// </summary> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + /// <returns>Result produced by the observation.</returns> + public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted) + { + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + return onError(Exception); + } + } + + /// <summary> + /// Represents an OnCompleted notification to an observer. + /// </summary> +#if !NO_DEBUGGER_ATTRIBUTES + [DebuggerDisplay("OnCompleted()")] +#endif +#if !NO_SERIALIZABLE + [Serializable] +#endif + internal sealed class OnCompletedNotification : Notification<T> + { + /// <summary> + /// Constructs a notification of the end of a sequence. + /// </summary> + public OnCompletedNotification() + { + } + + /// <summary> + /// Throws an InvalidOperationException. + /// </summary> + public override T Value { get { throw new InvalidOperationException(Strings_Core.COMPLETED_NO_VALUE); } } + + /// <summary> + /// Returns null. + /// </summary> + public override Exception Exception { get { return null; } } + + /// <summary> + /// Returns false. + /// </summary> + public override bool HasValue { get { return false; } } + + /// <summary> + /// Returns NotificationKind.OnCompleted. + /// </summary> + public override NotificationKind Kind { get { return NotificationKind.OnCompleted; } } + + /// <summary> + /// Returns the hash code for this instance. + /// </summary> + public override int GetHashCode() + { + return typeof(T).GetHashCode() ^ 8510; + } + + /// <summary> + /// Indicates whether this instance and other are equal. + /// </summary> + public override bool Equals(Notification<T> other) + { + if (Object.ReferenceEquals(this, other)) + return true; + if (Object.ReferenceEquals(other, null)) + return false; + return other.Kind == NotificationKind.OnCompleted; + } + + /// <summary> + /// Returns a string representation of this instance. + /// </summary> + public override string ToString() + { + return "OnCompleted()"; + } + + /// <summary> + /// Invokes the observer's method corresponding to the notification. + /// </summary> + /// <param name="observer">Observer to invoke the notification on.</param> + public override void Accept(IObserver<T> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + observer.OnCompleted(); + } + + /// <summary> + /// Invokes the observer's method corresponding to the notification and returns the produced result. + /// </summary> + /// <param name="observer">Observer to invoke the notification on.</param> + /// <returns>Result produced by the observation.</returns> + public override TResult Accept<TResult>(IObserver<T, TResult> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + return observer.OnCompleted(); + } + + /// <summary> + /// Invokes the delegate corresponding to the notification. + /// </summary> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted) + { + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + onCompleted(); + } + + /// <summary> + /// Invokes the delegate corresponding to the notification and returns the produced result. + /// </summary> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + /// <returns>Result produced by the observation.</returns> + public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted) + { + if (onNext == null) + throw new ArgumentNullException("onNext"); + if (onError == null) + throw new ArgumentNullException("onError"); + if (onCompleted == null) + throw new ArgumentNullException("onCompleted"); + + return onCompleted(); + } + } + + /// <summary> + /// Determines whether the current Notification<T> object has the same observer message payload as a specified Notification<T> value. + /// </summary> + /// <param name="other">An object to compare to the current Notification<T> object.</param> + /// <returns>true if both Notification<T> objects have the same observer message payload; otherwise, false.</returns> + /// <remarks> + /// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any). + /// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method. + /// In case one wants to determine whether two Notification<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead. + /// </remarks> + public abstract bool Equals(Notification<T> other); + + /// <summary> + /// Determines whether the two specified Notification<T> objects have the same observer message payload. + /// </summary> + /// <param name="left">The first Notification<T> to compare, or null.</param> + /// <param name="right">The second Notification<T> to compare, or null.</param> + /// <returns>true if the first Notification<T> value has the same observer message payload as the second Notification<T> value; otherwise, false.</returns> + /// <remarks> + /// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any). + /// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method. + /// In case one wants to determine whether two Notification<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead. + /// </remarks> + public static bool operator ==(Notification<T> left, Notification<T> right) + { + if (object.ReferenceEquals(left, right)) + return true; + + if ((object)left == null || (object)right == null) + return false; + + return left.Equals(right); + } + + /// <summary> + /// Determines whether the two specified Notification<T> objects have a different observer message payload. + /// </summary> + /// <param name="left">The first Notification<T> to compare, or null.</param> + /// <param name="right">The second Notification<T> to compare, or null.</param> + /// <returns>true if the first Notification<T> value has a different observer message payload as the second Notification<T> value; otherwise, false.</returns> + /// <remarks> + /// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any). + /// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method. + /// In case one wants to determine whether two Notification<T> objects represent a different observer method call, use Object.ReferenceEquals identity equality instead. + /// </remarks> + public static bool operator !=(Notification<T> left, Notification<T> right) + { + return !(left == right); + } + + /// <summary> + /// Determines whether the specified System.Object is equal to the current Notification<T>. + /// </summary> + /// <param name="obj">The System.Object to compare with the current Notification<T>.</param> + /// <returns>true if the specified System.Object is equal to the current Notification<T>; otherwise, false.</returns> + /// <remarks> + /// Equality of Notification<T> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any). + /// This means two Notification<T> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method. + /// In case one wants to determine whether two Notification<T> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead. + /// </remarks> + public override bool Equals(object obj) + { + return Equals(obj as Notification<T>); + } + + /// <summary> + /// Invokes the observer's method corresponding to the notification. + /// </summary> + /// <param name="observer">Observer to invoke the notification on.</param> + public abstract void Accept(IObserver<T> observer); + + /// <summary> + /// Invokes the observer's method corresponding to the notification and returns the produced result. + /// </summary> + /// <typeparam name="TResult">The type of the result returned from the observer's notification handlers.</typeparam> + /// <param name="observer">Observer to invoke the notification on.</param> + /// <returns>Result produced by the observation.</returns> + public abstract TResult Accept<TResult>(IObserver<T, TResult> observer); + + /// <summary> + /// Invokes the delegate corresponding to the notification. + /// </summary> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + public abstract void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted); + + /// <summary> + /// Invokes the delegate corresponding to the notification and returns the produced result. + /// </summary> + /// <typeparam name="TResult">The type of the result returned from the notification handler delegates.</typeparam> + /// <param name="onNext">Delegate to invoke for an OnNext notification.</param> + /// <param name="onError">Delegate to invoke for an OnError notification.</param> + /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param> + /// <returns>Result produced by the observation.</returns> + public abstract TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted); + + /// <summary> + /// Returns an observable sequence with a single notification, using the immediate scheduler. + /// </summary> + /// <returns>The observable sequence that surfaces the behavior of the notification upon subscription.</returns> + public IObservable<T> ToObservable() + { + return this.ToObservable(ImmediateScheduler.Instance); + } + + /// <summary> + /// Returns an observable sequence with a single notification. + /// </summary> + /// <param name="scheduler">Scheduler to send out the notification calls on.</param> + /// <returns>The observable sequence that surfaces the behavior of the notification upon subscription.</returns> + public IObservable<T> ToObservable(IScheduler scheduler) + { + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + return new AnonymousObservable<T>(observer => scheduler.Schedule(() => + { + this.Accept(observer); + if (this.Kind == NotificationKind.OnNext) + observer.OnCompleted(); + })); + } + } + + /// <summary> + /// Provides a set of static methods for constructing notifications. + /// </summary> + public static class Notification + { + /// <summary> + /// Creates an object that represents an OnNext notification to an observer. + /// </summary> + /// <typeparam name="T">The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.</typeparam> + /// <param name="value">The value contained in the notification.</param> + /// <returns>The OnNext notification containing the value.</returns> + public static Notification<T> CreateOnNext<T>(T value) + { + return new Notification<T>.OnNextNotification(value); + } + + /// <summary> + /// Creates an object that represents an OnError notification to an observer. + /// </summary> + /// <typeparam name="T">The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.</typeparam> + /// <param name="error">The exception contained in the notification.</param> + /// <returns>The OnError notification containing the exception.</returns> + /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception> + public static Notification<T> CreateOnError<T>(Exception error) + { + if (error == null) + throw new ArgumentNullException("error"); + + return new Notification<T>.OnErrorNotification(error); + } + + /// <summary> + /// Creates an object that represents an OnCompleted notification to an observer. + /// </summary> + /// <typeparam name="T">The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.</typeparam> + /// <returns>The OnCompleted notification.</returns> + public static Notification<T> CreateOnCompleted<T>() + { + return new Notification<T>.OnCompletedNotification(); + } + } +} + +#pragma warning restore 0659 +#pragma warning restore 0661
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/ObservableBase.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/ObservableBase.cs new file mode 100644 index 0000000..8945b2f --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/ObservableBase.cs @@ -0,0 +1,107 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Reactive.Concurrency; +using System.Reactive.Disposables; + +namespace System.Reactive +{ + /// <summary> + /// Abstract base class for implementations of the IObservable<T> interface. + /// </summary> + /// <remarks> + /// If you don't need a named type to create an observable sequence (i.e. you rather need + /// an instance rather than a reusable type), use the Observable.Create method to create + /// an observable sequence with specified subscription behavior. + /// </remarks> + /// <typeparam name="T">The type of the elements in the sequence.</typeparam> + public abstract class ObservableBase<T> : IObservable<T> + { + /// <summary> + /// Subscribes the given observer to the observable sequence. + /// </summary> + /// <param name="observer">Observer that will receive notifications from the observable sequence.</param> + /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns> + /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception> + public IDisposable Subscribe(IObserver<T> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + var autoDetachObserver = new AutoDetachObserver<T>(observer); + + if (CurrentThreadScheduler.IsScheduleRequired) + { + // + // Notice we don't protect this piece of code using an exception handler to + // redirect errors to the OnError channel. This call to Schedule will run the + // trampoline, so we'd be catching all exceptions, including those from user + // callbacks that happen to run there. For example, consider: + // + // Observable.Return(42, Scheduler.CurrentThread) + // .Subscribe(x => { throw new Exception(); }); + // + // Here, the OnNext(42) call would be scheduled on the trampoline, so when we + // return from the scheduled Subscribe call, the CurrentThreadScheduler moves + // on to invoking this work item. Too much of protection here would cause the + // exception thrown in OnNext to circle back to OnError, which looks like the + // sequence can't make up its mind. + // + CurrentThreadScheduler.Instance.Schedule(autoDetachObserver, ScheduledSubscribe); + } + else + { + try + { + autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver); + } + catch (Exception exception) + { + // + // This can happen when there's a synchronous callback to OnError in the + // implementation of SubscribeCore, which also throws. So, we're seeing + // an exception being thrown from a handler. + // + // For compat with v1.x, we rethrow the exception in this case, keeping + // in mind this should be rare but if it happens, something's totally + // screwed up. + // + if (!autoDetachObserver.Fail(exception)) + throw; + } + } + + return autoDetachObserver; + } + + private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoDetachObserver) + { + try + { + autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver); + } + catch (Exception exception) + { + // + // This can happen when there's a synchronous callback to OnError in the + // implementation of SubscribeCore, which also throws. So, we're seeing + // an exception being thrown from a handler. + // + // For compat with v1.x, we rethrow the exception in this case, keeping + // in mind this should be rare but if it happens, something's totally + // screwed up. + // + if (!autoDetachObserver.Fail(exception)) + throw; + } + + return Disposable.Empty; + } + + /// <summary> + /// Implement this method with the core subscription logic for the observable sequence. + /// </summary> + /// <param name="observer">Observer to send notifications to.</param> + /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns> + protected abstract IDisposable SubscribeCore(IObserver<T> observer); + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/ObserverBase.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/ObserverBase.cs new file mode 100644 index 0000000..389a923 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/ObserverBase.cs @@ -0,0 +1,114 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Threading; + +namespace System.Reactive +{ + /// <summary> + /// Abstract base class for implementations of the IObserver<T> interface. + /// </summary> + /// <remarks>This base class enforces the grammar of observers where OnError and OnCompleted are terminal messages.</remarks> + /// <typeparam name="T">The type of the elements in the sequence.</typeparam> + public abstract class ObserverBase<T> : IObserver<T>, IDisposable + { + private int isStopped; + + /// <summary> + /// Creates a new observer in a non-stopped state. + /// </summary> + protected ObserverBase() + { + isStopped = 0; + } + + /// <summary> + /// Notifies the observer of a new element in the sequence. + /// </summary> + /// <param name="value">Next element in the sequence.</param> + public void OnNext(T value) + { + if (isStopped == 0) + OnNextCore(value); + } + + /// <summary> + /// Implement this method to react to the receival of a new element in the sequence. + /// </summary> + /// <param name="value">Next element in the sequence.</param> + /// <remarks>This method only gets called when the observer hasn't stopped yet.</remarks> + protected abstract void OnNextCore(T value); + + /// <summary> + /// Notifies the observer that an exception has occurred. + /// </summary> + /// <param name="error">The error that has occurred.</param> + /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception> + public void OnError(Exception error) + { + if (error == null) + throw new ArgumentNullException("error"); + + if (Interlocked.Exchange(ref isStopped, 1) == 0) + { + OnErrorCore(error); + } + } + + /// <summary> + /// Implement this method to react to the occurrence of an exception. + /// </summary> + /// <param name="error">The error that has occurred.</param> + /// <remarks>This method only gets called when the observer hasn't stopped yet, and causes the observer to stop.</remarks> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1716:IdentifiersShouldNotMatchKeywords", MessageId = "Error", Justification = "Same name as in the IObserver<T> definition of OnError in the BCL.")] + protected abstract void OnErrorCore(Exception error); + + /// <summary> + /// Notifies the observer of the end of the sequence. + /// </summary> + public void OnCompleted() + { + if (Interlocked.Exchange(ref isStopped, 1) == 0) + { + OnCompletedCore(); + } + } + + /// <summary> + /// Implement this method to react to the end of the sequence. + /// </summary> + /// <remarks>This method only gets called when the observer hasn't stopped yet, and causes the observer to stop.</remarks> + protected abstract void OnCompletedCore(); + + /// <summary> + /// Disposes the observer, causing it to transition to the stopped state. + /// </summary> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// <summary> + /// Core implementation of IDisposable. + /// </summary> + /// <param name="disposing">true if the Dispose call was triggered by the IDisposable.Dispose method; false if it was triggered by the finalizer.</param> + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + isStopped = 1; + } + } + + internal bool Fail(Exception error) + { + if (Interlocked.Exchange(ref isStopped, 1) == 0) + { + OnErrorCore(error); + return true; + } + + return false; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Unit.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Unit.cs new file mode 100644 index 0000000..4d3429b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Unit.cs @@ -0,0 +1,82 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + /// <summary> + /// Represents a type with a single value. This type is often used to denote the successful completion of a void-returning method (C#) or a Sub procedure (Visual Basic). + /// </summary> +#if !NO_SERIALIZABLE + [Serializable] +#endif + public struct Unit : IEquatable<Unit> + { + /// <summary> + /// Determines whether the specified Unit values is equal to the current Unit. Because Unit has a single value, this always returns true. + /// </summary> + /// <param name="other">An object to compare to the current Unit value.</param> + /// <returns>Because Unit has a single value, this always returns true.</returns> + public bool Equals(Unit other) + { + return true; + } + + /// <summary> + /// Determines whether the specified System.Object is equal to the current Unit. + /// </summary> + /// <param name="obj">The System.Object to compare with the current Unit.</param> + /// <returns>true if the specified System.Object is a Unit value; otherwise, false.</returns> + public override bool Equals(object obj) + { + return obj is Unit; + } + + /// <summary> + /// Returns the hash code for the current Unit value. + /// </summary> + /// <returns>A hash code for the current Unit value.</returns> + public override int GetHashCode() + { + return 0; + } + + /// <summary> + /// Returns a string representation of the current Unit value. + /// </summary> + /// <returns>String representation of the current Unit value.</returns> + public override string ToString() + { + return "()"; + } + + /// <summary> + /// Determines whether the two specified Unit values are equal. Because Unit has a single value, this always returns true. + /// </summary> + /// <param name="first">The first Unit value to compare.</param> + /// <param name="second">The second Unit value to compare.</param> + /// <returns>Because Unit has a single value, this always returns true.</returns> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA1801:ReviewUnusedParameters", MessageId = "first", Justification = "Parameter required for operator overloading."), System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA1801:ReviewUnusedParameters", MessageId = "second", Justification = "Parameter required for operator overloading.")] + public static bool operator ==(Unit first, Unit second) + { + return true; + } + + /// <summary> + /// Determines whether the two specified Unit values are not equal. Because Unit has a single value, this always returns false. + /// </summary> + /// <param name="first">The first Unit value to compare.</param> + /// <param name="second">The second Unit value to compare.</param> + /// <returns>Because Unit has a single value, this always returns false.</returns> + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA1801:ReviewUnusedParameters", MessageId = "first", Justification = "Parameter required for operator overloading."), System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA1801:ReviewUnusedParameters", MessageId = "second", Justification = "Parameter required for operator overloading.")] + public static bool operator !=(Unit first, Unit second) + { + return false; + } + + static readonly Unit _default = new Unit(); + + /// <summary> + /// Gets the single unit value. + /// </summary> + public static Unit Default { get { return _default; } } + } +} |