Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
committerAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
commit74a538f6725ebc83efda4bb07d5747e8a6359e19 (patch)
tree7c98de97c88c78b4aca4b25b36db310f82c26865 /Rx/NET/Source/System.Reactive.Core/Reactive
parent50e7bdb4507f7e4c2aefb7772d57d9a80f4d42b0 (diff)
Import Official Rx 2.2 (3ebdd2e09991)HEADmaster
I made changes from the original source tree to match the older tree so that we don't have to make several changes to project tree generator. (There is actually no new sources in Rx so hopefully we can just reuse existing modifications in the tree).
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Core/Reactive')
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs132
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs6
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs2
6 files changed, 125 insertions, 31 deletions
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs
index cc9cd57..b3ec5be 100644
--- a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/ConcurrencyAbstractionLayer.Default.cs
@@ -22,16 +22,21 @@ namespace System.Reactive.Concurrency
public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
{
- //
- // MSDN documentation states the following:
- //
- // "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once;
- // the periodic behavior of the timer is disabled, but can be re-enabled using the Change method."
- //
- if (period <= TimeSpan.Zero)
+ if (period < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("period");
- return new PeriodicTimer(action, period);
+ //
+ // The contract for periodic scheduling in Rx is that specifying TimeSpan.Zero as the period causes the scheduler to
+ // call back periodically as fast as possible, sequentially.
+ //
+ if (period == TimeSpan.Zero)
+ {
+ return new FastPeriodicTimer(action);
+ }
+ else
+ {
+ return new PeriodicTimer(action, period);
+ }
}
public IDisposable QueueUserWorkItem(Action<object> action, object state)
@@ -362,6 +367,37 @@ namespace System.Reactive.Concurrency
}
}
#endif
+
+ class FastPeriodicTimer : IDisposable
+ {
+ private readonly Action _action;
+ private bool disposed;
+
+ public FastPeriodicTimer(Action action)
+ {
+ _action = action;
+
+ new System.Threading.Thread(Loop)
+ {
+ Name = "Rx-FastPeriodicTimer",
+ IsBackground = true
+ }
+ .Start();
+ }
+
+ private void Loop()
+ {
+ while (!disposed)
+ {
+ _action();
+ }
+ }
+
+ public void Dispose()
+ {
+ disposed = true;
+ }
+ }
}
}
#else
@@ -375,8 +411,12 @@ namespace System.Reactive.Concurrency
{
public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime)
{
- var cancel = new CancellationDisposable();
+ var cancel = new CancellationDisposable();
+#if USE_TASKEX
+ TaskEx.Delay(dueTime, cancel.Token).ContinueWith(
+#else
Task.Delay(dueTime, cancel.Token).ContinueWith(
+#endif
_ => action(state),
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion
);
@@ -385,24 +425,35 @@ namespace System.Reactive.Concurrency
public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
{
- var cancel = new CancellationDisposable();
-
- var moveNext = default(Action);
- moveNext = () =>
+ if (period <= TimeSpan.Zero)
{
- Task.Delay(period, cancel.Token).ContinueWith(
- _ =>
- {
- moveNext();
- action();
- },
- TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion
- );
- };
-
- moveNext();
+ return new FastPeriodicTimer(action);
+ }
+ else
+ {
+ var cancel = new CancellationDisposable();
- return cancel;
+ var moveNext = default(Action);
+ moveNext = () =>
+ {
+#if USE_TASKEX
+ TaskEx.Delay(period, cancel.Token).ContinueWith(
+#else
+ Task.Delay(period, cancel.Token).ContinueWith(
+#endif
+ _ =>
+ {
+ moveNext();
+ action();
+ },
+ TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion
+ );
+ };
+
+ moveNext();
+
+ return cancel;
+ }
}
public IDisposable QueueUserWorkItem(Action<object> action, object state)
@@ -414,7 +465,12 @@ namespace System.Reactive.Concurrency
public void Sleep(TimeSpan timeout)
{
+#if USE_TASKEX
+ TaskEx.Delay(timeout).Wait();
+#else
Task.Delay(timeout).Wait();
+#endif
+
}
public IStopwatch StartStopwatch()
@@ -434,6 +490,32 @@ namespace System.Reactive.Concurrency
action(state);
}, TaskCreationOptions.LongRunning);
}
+
+ class FastPeriodicTimer : IDisposable
+ {
+ private readonly Action _action;
+ private bool disposed;
+
+ public FastPeriodicTimer(Action action)
+ {
+ _action = action;
+
+ Task.Factory.StartNew(Loop, TaskCreationOptions.LongRunning);
+ }
+
+ private void Loop()
+ {
+ while (!disposed)
+ {
+ _action();
+ }
+ }
+
+ public void Dispose()
+ {
+ disposed = true;
+ }
+ }
}
}
#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs
index ded2b28..7518037 100644
--- a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs
@@ -93,6 +93,11 @@ namespace System.Reactive.Concurrency
/// </summary>
private static readonly TimeSpan RETRYSHORT = TimeSpan.FromMilliseconds(50);
+ /// <summary>
+ /// Longest interval supported by <see cref="System.Threading.Timer"/>.
+ /// </summary>
+ private static readonly TimeSpan MAXSUPPORTEDTIMER = TimeSpan.FromMilliseconds((1L << 32) - 2);
+
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "We can't really lift this into a field initializer, and would end up checking for an initialization flag in every static method anyway (which is roughly what the JIT does in a thread-safe manner).")]
static LocalScheduler()
{
@@ -317,8 +322,13 @@ namespace System.Reactive.Concurrency
var remainder = TimeSpan.FromTicks(Math.Max(due.Ticks / MAXERRORRATIO, LONGTOSHORT.Ticks));
var dueEarly = due - remainder;
+ //
+ // Limit the interval to maximum supported by underlying Timer.
+ //
+ var dueCapped = TimeSpan.FromTicks(Math.Min(dueEarly.Ticks, MAXSUPPORTEDTIMER.Ticks));
+
s_nextLongTermWorkItem = next;
- s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueEarly);
+ s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueCapped);
}
}
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs
index 7fd80d0..20a8a40 100644
--- a/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Concurrency/Synchronization.ObserveOn.cs
@@ -35,7 +35,7 @@ namespace System.Reactive.Concurrency
#if !NO_SYNCCTX
if (_context != null)
{
- var sink = new ς(this, observer, cancel);
+ var sink = new ObserveOnSink(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -49,11 +49,11 @@ namespace System.Reactive.Concurrency
}
#if !NO_SYNCCTX
- class ς : Sink<TSource>, IObserver<TSource>
+ class ObserveOnSink : Sink<TSource>, IObserver<TSource>
{
private readonly ObserveOn<TSource> _parent;
- public ς(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public ObserveOnSink(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs
index 8b7ec81..25e3957 100644
--- a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs
@@ -99,6 +99,8 @@ namespace System.Collections.Concurrent
public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { }
+ public ConcurrentDictionary(int capacity, IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, capacity, true, comparer) { }
+
internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
{
if (concurrencyLevel < 1)
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs
index f2483e8..26d29d8 100644
--- a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/PlatformEnlightenmentProvider.cs
@@ -60,7 +60,7 @@ namespace System.Reactive.PlatformServices
#if NETCF35
var name = "System.Reactive.PlatformServices.CurrentPlatformEnlightenmentProvider, System.Reactive.PlatformServices";
#else
-#if CRIPPLED_REFLECTION
+#if CRIPPLED_REFLECTION && HAS_WINRT
var ifType = typeof(IPlatformEnlightenmentProvider).GetTypeInfo();
#else
var ifType = typeof(IPlatformEnlightenmentProvider);
diff --git a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs
index 4f5eeee..d73a81f 100644
--- a/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs
+++ b/Rx/NET/Source/System.Reactive.Core/Reactive/Internal/Semaphore.Silverlight.cs
@@ -1,6 +1,6 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
-#if NO_SEMAPHORE && SILVERLIGHT
+#if NO_SEMAPHORE && (SILVERLIGHT || PLIB_LITE)
using System;
using System.Threading;