diff options
author | Atsushi Eno <atsushieno@gmail.com> | 2013-12-16 17:30:03 +0400 |
---|---|---|
committer | Atsushi Eno <atsushieno@gmail.com> | 2013-12-16 17:30:03 +0400 |
commit | 74a538f6725ebc83efda4bb07d5747e8a6359e19 (patch) | |
tree | 7c98de97c88c78b4aca4b25b36db310f82c26865 /Rx/NET/Source/System.Reactive.Core/Reactive | |
parent | 50e7bdb4507f7e4c2aefb7772d57d9a80f4d42b0 (diff) |
I made changes from the original source tree to match the older tree
so that we don't have to make several changes to project tree generator.
(There is actually no new sources in Rx so hopefully we can just reuse
existing modifications in the tree).
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Core/Reactive')
6 files changed, 125 insertions, 31 deletions
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs index cc9cd57..b3ec5be 100644 --- a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs @@ -22,16 +22,21 @@ namespace System.Reactive.Concurrency public IDisposable StartPeriodicTimer(Action action, TimeSpan period) { - // - // MSDN documentation states the following: - // - // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once; - // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method." - // - if (period <= TimeSpan.Zero) + if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException("period"); - return new PeriodicTimer(action, period); + // + // The contract for periodic scheduling in Rx is that specifying TimeSpan.Zero as the period causes the scheduler to + // call back periodically as fast as possible, sequentially. + // + if (period == TimeSpan.Zero) + { + return new FastPeriodicTimer(action); + } + else + { + return new PeriodicTimer(action, period); + } } public IDisposable QueueUserWorkItem(Action<object> action, object state) @@ -362,6 +367,37 @@ namespace System.Reactive.Concurrency } } #endif + + class FastPeriodicTimer : IDisposable + { + private readonly Action _action; + private bool disposed; + + public FastPeriodicTimer(Action action) + { + _action = action; + + new System.Threading.Thread(Loop) + { + Name = "Rx-FastPeriodicTimer", + IsBackground = true + } + .Start(); + } + + private void Loop() + { + while (!disposed) + { + _action(); + } + } + + public void Dispose() + { + disposed = true; + } + } } } #else @@ -375,8 +411,12 @@ namespace System.Reactive.Concurrency { public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime) { - var cancel = new CancellationDisposable(); + var cancel = new CancellationDisposable(); +#if USE_TASKEX + TaskEx.Delay(dueTime, cancel.Token).ContinueWith( +#else Task.Delay(dueTime, cancel.Token).ContinueWith( +#endif _ => action(state), TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion ); @@ -385,24 +425,35 @@ namespace System.Reactive.Concurrency public IDisposable StartPeriodicTimer(Action action, TimeSpan period) { - var cancel = new CancellationDisposable(); - - var moveNext = default(Action); - moveNext = () => + if (period <= TimeSpan.Zero) { - Task.Delay(period, cancel.Token).ContinueWith( - _ => - { - moveNext(); - action(); - }, - TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion - ); - }; - - moveNext(); + return new FastPeriodicTimer(action); + } + else + { + var cancel = new CancellationDisposable(); - return cancel; + var moveNext = default(Action); + moveNext = () => + { +#if USE_TASKEX + TaskEx.Delay(period, cancel.Token).ContinueWith( +#else + Task.Delay(period, cancel.Token).ContinueWith( +#endif + _ => + { + moveNext(); + action(); + }, + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion + ); + }; + + moveNext(); + + return cancel; + } } public IDisposable QueueUserWorkItem(Action<object> action, object state) @@ -414,7 +465,12 @@ namespace System.Reactive.Concurrency public void Sleep(TimeSpan timeout) { +#if USE_TASKEX + TaskEx.Delay(timeout).Wait(); +#else Task.Delay(timeout).Wait(); +#endif + } public IStopwatch StartStopwatch() @@ -434,6 +490,32 @@ namespace System.Reactive.Concurrency action(state); }, TaskCreationOptions.LongRunning); } + + class FastPeriodicTimer : IDisposable + { + private readonly Action _action; + private bool disposed; + + public FastPeriodicTimer(Action action) + { + _action = action; + + Task.Factory.StartNew(Loop, TaskCreationOptions.LongRunning); + } + + private void Loop() + { + while (!disposed) + { + _action(); + } + } + + public void Dispose() + { + disposed = true; + } + } } } #endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs index ded2b28..7518037 100644 --- a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs @@ -93,6 +93,11 @@ namespace System.Reactive.Concurrency /// </summary> private static readonly TimeSpan RETRYSHORT = TimeSpan.FromMilliseconds(50); + /// <summary> + /// Longest interval supported by <see cref="System.Threading.Timer"/>. + /// </summary> + private static readonly TimeSpan MAXSUPPORTEDTIMER = TimeSpan.FromMilliseconds((1L << 32) - 2); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "We can't really lift this into a field initializer, and would end up checking for an initialization flag in every static method anyway (which is roughly what the JIT does in a thread-safe manner).")] static LocalScheduler() { @@ -317,8 +322,13 @@ namespace System.Reactive.Concurrency var remainder = TimeSpan.FromTicks(Math.Max(due.Ticks / MAXERRORRATIO, LONGTOSHORT.Ticks)); var dueEarly = due - remainder; + // + // Limit the interval to maximum supported by underlying Timer. + // + var dueCapped = TimeSpan.FromTicks(Math.Min(dueEarly.Ticks, MAXSUPPORTEDTIMER.Ticks)); + s_nextLongTermWorkItem = next; - s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueEarly); + s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueCapped); } } diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs index 7fd80d0..20a8a40 100644 --- a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs @@ -35,7 +35,7 @@ namespace System.Reactive.Concurrency #if !NO_SYNCCTX if (_context != null) { - var sink = new ς(this, observer, cancel); + var sink = new ObserveOnSink(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -49,11 +49,11 @@ namespace System.Reactive.Concurrency } #if !NO_SYNCCTX - class ς : Sink<TSource>, IObserver<TSource> + class ObserveOnSink : Sink<TSource>, IObserver<TSource> { private readonly ObserveOn<TSource> _parent; - public ς(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public ObserveOnSink(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs index 8b7ec81..25e3957 100644 --- a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs @@ -99,6 +99,8 @@ namespace System.Collections.Concurrent public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { } + public ConcurrentDictionary(int capacity, IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, capacity, true, comparer) { } + internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) { if (concurrencyLevel < 1) diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs index f2483e8..26d29d8 100644 --- a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs @@ -60,7 +60,7 @@ namespace System.Reactive.PlatformServices #if NETCF35 var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, System.Reactive.PlatformServices"; #else -#if CRIPPLED_REFLECTION +#if CRIPPLED_REFLECTION && HAS_WINRT var ifType = typeof(IPlatformEnlightenmentProvider).GetTypeInfo(); #else var ifType = typeof(IPlatformEnlightenmentProvider); diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs index 4f5eeee..d73a81f 100644 --- a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs +++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs @@ -1,6 +1,6 @@ // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. -#if NO_SEMAPHORE && SILVERLIGHT +#if NO_SEMAPHORE && (SILVERLIGHT || PLIB_LITE) using System; using System.Threading; |