diff options
Diffstat (limited to 'Rx/NET/Source/Tests.System.Reactive/Stress/Linq/FromEvent.cs')
-rw-r--r-- | Rx/NET/Source/Tests.System.Reactive/Stress/Linq/FromEvent.cs | 412 |
1 files changed, 412 insertions, 0 deletions
diff --git a/Rx/NET/Source/Tests.System.Reactive/Stress/Linq/FromEvent.cs b/Rx/NET/Source/Tests.System.Reactive/Stress/Linq/FromEvent.cs new file mode 100644 index 0000000..f867795 --- /dev/null +++ b/Rx/NET/Source/Tests.System.Reactive/Stress/Linq/FromEvent.cs @@ -0,0 +1,412 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if STRESS +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Reflection; +using System.Threading; + +namespace ReactiveTests.Stress.Linq +{ + public class FromEvent + { + private static Lazy<Random> s_rand = new Lazy<Random>(); + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// + /// Runs a set of combinations of the RefCount_* tests. + /// </summary> + public static void RefCount_Mix() + { + Console.Title = MethodInfo.GetCurrentMethod().Name + " - 0% complete"; + + for (int i = 1; i <= 100; i++) + { + var repeatCount = 10; + + foreach (var msgCount in new[] { 100, 1000, 10000, 100000 }) + { + // concurrency level {10, 20, ..., 100} + RefCount_ConcurrencyLevel_Linear(msgCount, repeatCount, 10, 100, 10); + + // concurrency level {100, 200, ..., 1000} + RefCount_ConcurrencyLevel_Linear(msgCount, repeatCount, 100, 1000, 100); + + // concurrency level {1, 2, 4, ..., 65536} + RefCount_ConcurrencyLevel_Exponential(msgCount, repeatCount, 1, 65536, 2); + } + + foreach (var maxMsgCount in new[] { 10, 100, 1000, 10000, 100000 }) + { + foreach (var maxConcurrency in new[] { 10, 100, 1000, 10000, 100000 }) + { + RefCount_Rand(repeatCount, maxMsgCount, maxConcurrency); + } + } + + Console.Title = MethodInfo.GetCurrentMethod().Name + " - " + i + "% complete"; + } + } + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps. + /// + /// Runs a set of combinations of the RefCount_* tests. + /// </summary> + public static void RefCountWithPost_Mix() + { + Console.Title = MethodInfo.GetCurrentMethod().Name + " - 0% complete"; + + for (int i = 1; i <= 100; i++) + { + var repeatCount = 10; + + foreach (var msgCount in new[] { 100, 1000, 10000, 100000 }) + { + // concurrency level {10, 20, ..., 100} + RefCountWithPost_ConcurrencyLevel_Linear(msgCount, repeatCount, 10, 100, 10); + + // concurrency level {100, 200, ..., 1000} + RefCountWithPost_ConcurrencyLevel_Linear(msgCount, repeatCount, 100, 1000, 100); + + // concurrency level {1, 2, 4, ..., 65536} + RefCountWithPost_ConcurrencyLevel_Exponential(msgCount, repeatCount, 1, 65536, 2); + } + + foreach (var maxMsgCount in new[] { 10, 100, 1000, 10000, 100000 }) + { + foreach (var maxConcurrency in new[] { 10, 100, 1000, 10000, 100000 }) + { + RefCountWithPost_Rand(repeatCount, maxMsgCount, maxConcurrency); + } + } + + Console.Title = MethodInfo.GetCurrentMethod().Name + " - " + i + "% complete"; + } + } + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// + /// Uses random parameters for the number of messages and the level of concurrency. + /// </summary> + /// <param name="n">Number of iterations.</param> + /// <param name="maxN">Maximum number of message.</param> + /// <param name="maxM">Maximum level of concurrency.</param> + public static void RefCount_Rand(int n, int maxN, int maxM) + { + RefCount_(RefCount_Rand_Params(n, maxN, maxM)); + } + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps. + /// + /// Uses random parameters for the number of messages and the level of concurrency. + /// </summary> + /// <param name="n">Number of iterations.</param> + /// <param name="maxN">Maximum number of message.</param> + /// <param name="maxM">Maximum level of concurrency.</param> + public static void RefCountWithPost_Rand(int n, int maxN, int maxM) + { + RefCountWithPost_(RefCount_Rand_Params(n, maxN, maxM)); + } + + private static IEnumerable<Tuple<int, int>> RefCount_Rand_Params(int n, int maxN, int maxM) + { + for (int i = 0; i < n; i++) + { + var N = s_rand.Value.Next(1, maxN); + var M = s_rand.Value.Next(1, maxM); + + yield return new Tuple<int, int>(N, M); + } + } + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// + /// Uses linear increments for the concurrency level. + /// </summary> + /// <param name="N">Number of messages.</param> + /// <param name="n">Number of iterations.</param> + /// <param name="min">Minimum level of concurrency.</param> + /// <param name="max">Maximum level of concurrency.</param> + /// <param name="step">Additive step size to increase level of concurrency.</param> + public static void RefCount_ConcurrencyLevel_Linear(int N, int n, int min, int max, int step) + { + RefCount_(RefCount_ConcurrencyLevel_Linear_Params(N, n, min, max, step)); + } + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps. + /// + /// Uses linear increments for the concurrency level. + /// </summary> + /// <param name="N">Number of messages.</param> + /// <param name="n">Number of iterations.</param> + /// <param name="min">Minimum level of concurrency.</param> + /// <param name="max">Maximum level of concurrency.</param> + /// <param name="step">Additive step size to increase level of concurrency.</param> + public static void RefCountWithPost_ConcurrencyLevel_Linear(int N, int n, int min, int max, int step) + { + RefCountWithPost_(RefCount_ConcurrencyLevel_Linear_Params(N, n, min, max, step)); + } + + private static IEnumerable<Tuple<int, int>> RefCount_ConcurrencyLevel_Linear_Params(int N, int n, int min, int max, int step) + { + for (int i = 0; i < n; i++) + { + for (int M = min; M <= max; M += step) + { + yield return new Tuple<int, int>(N, M); + } + } + } + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// + /// Uses exponential increments for the concurrency level. + /// </summary> + /// <param name="N">Number of messages.</param> + /// <param name="n">Number of iterations.</param> + /// <param name="min">Minimum level of concurrency.</param> + /// <param name="max">Maximum level of concurrency.</param> + /// <param name="step">Multiplicative step size to increase level of concurrency.</param> + public static void RefCount_ConcurrencyLevel_Exponential(int N, int n, int min, int max, int step) + { + RefCount_(RefCount_ConcurrencyLevel_Exponential_Params(N, n, min, max, step)); + } + + /// <summary> + /// Multiple threads are subscribing to a FromEventPattern sequence and disposing their subscriptions. + /// While this is going on, one consumer does not want to be disturbed while receiving the sequence. + /// Subscriptions are happening on the ThreadPool, possibly causing (expected) time gaps. + /// + /// Uses exponential increments for the concurrency level. + /// </summary> + /// <param name="N">Number of messages.</param> + /// <param name="n">Number of iterations.</param> + /// <param name="min">Minimum level of concurrency.</param> + /// <param name="max">Maximum level of concurrency.</param> + /// <param name="step">Multiplicative step size to increase level of concurrency.</param> + public static void RefCountWithPost_ConcurrencyLevel_Exponential(int N, int n, int min, int max, int step) + { + RefCountWithPost_(RefCount_ConcurrencyLevel_Exponential_Params(N, n, min, max, step)); + } + + private static IEnumerable<Tuple<int, int>> RefCount_ConcurrencyLevel_Exponential_Params(int N, int n, int min, int max, int step) + { + for (int i = 0; i < n; i++) + { + for (int M = min; M <= max; M *= step) + { + yield return new Tuple<int, int>(N, M); + } + } + } + + private static void RefCount_(IEnumerable<Tuple<int, int>> parameters) + { + foreach (var p in parameters) + { + var N = p.Item1; + var M = p.Item2; + + Console.Write("N = {0}, M = {1} - ", N, M); + + var bar = new Bar(); + + var foo = Observable.FromEventPattern<FooEventArgs>(h => { Console.Write("+"); bar.Foo += h; }, h => { bar.Foo -= h; Console.Write("-"); }); + + var res = new List<int>(); + var n = 0; + var e = new ManualResetEvent(false); + + var cd = new CountdownEvent(M * 2); + for (int i = 0; i < M; i++) + { + var f = new SingleAssignmentDisposable(); + + ThreadPool.QueueUserWorkItem(_ => + { + f.Disposable = foo.Subscribe(__ => { Console.Write("!"); }); + cd.Signal(); + }); + + ThreadPool.QueueUserWorkItem(_ => + { + f.Dispose(); + cd.Signal(); + }); + } + + Console.Write("{SB}"); + + var d = foo.Subscribe(x => + { + //Console.Write("&"); + + if (++n == N) + e.Set(); + + res.Add(x.EventArgs.Qux); + }); + + Console.Write("{SE}"); + + var t = new Thread(() => + { + Console.Write("{TB}"); + + for (int i = 0; i < N; i++) + bar.OnFoo(i); + + Console.Write("{TE}"); + }); + + t.Start(); + t.Join(); + + cd.Wait(); + + e.WaitOne(); + d.Dispose(); + + if (!res.SequenceEqual(Enumerable.Range(0, N))) + { + Console.WriteLine("Panic!"); + break; + } + + Console.WriteLine("."); + } + } + + private static void RefCountWithPost_(IEnumerable<Tuple<int, int>> parameters) + { + var worker = new Thread(() => + { + SynchronizationContext.SetSynchronizationContext(new MySyncCtx()); + + foreach (var p in parameters) + { + var N = p.Item1; + var M = p.Item2; + + Console.Write("N = {0}, M = {1} - ", N, M); + + var bar = new Bar(); + + var foo = Observable.FromEventPattern<FooEventArgs>(h => { /*Console.Write("+");*/ bar.Foo += h; }, h => { bar.Foo -= h; /*Console.Write("-"); */}); + + var e = new ManualResetEvent(false); + + var cd = new CountdownEvent(M * 2); + for (int i = 0; i < M; i++) + { + var f = new SingleAssignmentDisposable(); + + ThreadPool.QueueUserWorkItem(_ => + { + f.Disposable = foo.Subscribe(__ => { /*Console.Write("!");*/ }); + cd.Signal(); + }); + + ThreadPool.QueueUserWorkItem(_ => + { + f.Dispose(); + cd.Signal(); + }); + } + + var hasObserved = 0; + + Console.Write("{SB}"); + + var d = foo.Subscribe(x => + { + // + // [on BARTDE-M6500 with CPU and RAM pressure] + // + // Up to 8K concurrent observers, we typically don't see a time gap (expected worst-case behavior). + // The code below uses an event to check the desired behavior of eventually tuning in to the event stream. + // + Console.Write("&" + x.EventArgs.Qux); + e.Set(); + Interlocked.Exchange(ref hasObserved, 1); + }); + + Console.Write("{SE}"); + + var t = new Thread(() => + { + Console.Write("{TB}"); + + var i = 0; + while (Thread.VolatileRead(ref hasObserved) == 0) + bar.OnFoo(i++); + + Console.Write("{TE}"); + }); + + t.Start(); + t.Join(); + + cd.Wait(); + + e.WaitOne(); + d.Dispose(); + + Console.WriteLine("."); + } + }); + + worker.Start(); + worker.Join(); + } + + class Bar + { + public event EventHandler<FooEventArgs> Foo; + + public void OnFoo(int x) + { + var foo = Foo; + if (foo != null) + foo(this, new FooEventArgs { Qux = x }); + } + } + + class FooEventArgs : EventArgs + { + public int Qux { get; set; } + } + + class MySyncCtx : SynchronizationContext + { + public override void Post(SendOrPostCallback d, object state) + { + ThreadPool.QueueUserWorkItem(_ => + { + d(state); + }); + } + } + } +} +#endif
\ No newline at end of file |