Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs')
-rw-r--r--Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs6870
1 files changed, 6741 insertions, 129 deletions
diff --git a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
index 0670909..dd17353 100644
--- a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
+++ b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
@@ -2486,7 +2486,7 @@ namespace ReactiveTests.Tests
var nullGroup = scheduler.CreateObserver<string>();
var err = default(Exception);
-
+
scheduler.ScheduleAbsolute(200, () => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper()).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_));
scheduler.Start();
@@ -2553,6 +2553,1704 @@ namespace ReactiveTests.Tests
#endregion
+ #region + GroupBy w/capacity +
+
+ private const int _groupByCapacity = 1024;
+
+ [TestMethod]
+ public void GroupBy_Capacity_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, (Func<int, int>)null, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, -1, EqualityComparer<int>.Default));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_KeyEle_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, DummyFunc<int, int>.Instance, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, (Func<int, int>)null, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, -1));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_KeyComparer_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, (IEqualityComparer<int>)null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, -1, EqualityComparer<int>.Default));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Key_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, -1));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_WithKeyComparer()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ }, _groupByCapacity, comparer).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(570, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ }, x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ }, _groupByCapacity, comparer
+ ).Select(g => g.Key),
+ 355
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 355)
+ );
+
+ Assert.AreEqual(5, keyInvoked);
+ Assert.AreEqual(5, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 10)
+ throw ex;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(9, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 10)
+ throw ex;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(10, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_ComparerEqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnError<string>(310, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 310)
+ );
+
+ Assert.AreEqual(4, keyInvoked);
+ Assert.AreEqual(3, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_ComparerGetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(8, keyInvoked);
+ Assert.AreEqual(7, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(570)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Complete_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(570)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex1 = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex1),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }, ex => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnError<string>(570, ex1)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnError<string>(570, ex1)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnError<string>(570, ex1)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(570, ex1)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(400, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof")
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab ")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 400)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var keyInvoked = 0;
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 6)
+ throw ex;
+ return x.Trim();
+ }, x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(3, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(360, ex)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var eleInvoked = 0;
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 6)
+ throw ex;
+ return Reverse(x);
+ }, _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(360, ex)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Comparer_EqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Comparer_GetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(2, inners.Count);
+
+ outerResults.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR")
+ );
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof")
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(570)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Multiple_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+ scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose());
+ scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose());
+ scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof")
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Escape_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnCompleted<string>(600)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Escape_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<string>(600, ex)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Escape_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, new Exception())
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(400, () => outerSubscription.Dispose());
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 400)
+ );
+
+ res.Messages.AssertEqual(
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_NullKeys_Simple()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var res = scheduler.Start(() => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper(), _groupByCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_NullKeys_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ var nullGroup = scheduler.CreateObserver<string>();
+ var err = default(Exception);
+
+ scheduler.ScheduleAbsolute(200, () => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper(), _groupByCapacity).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_));
+ scheduler.Start();
+
+ Assert.AreSame(ex, err);
+
+ nullGroup.Messages.AssertEqual(
+ OnNext(220, "bar"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ #endregion
+
#region + GroupByUntil +
[TestMethod]
@@ -4405,6 +6103,1874 @@ namespace ReactiveTests.Tests
#endregion
+ #region + GroupByUntil w/capacity +
+
+ private const int _groupByUntilCapacity = 1024;
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, default(IEqualityComparer<int>)));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, default(IEqualityComparer<int>)));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1));
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1));
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_WithKeyComparer()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnError<string>(570, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key),
+ 355
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 355)
+ );
+
+ Assert.AreEqual(5, keyInvoked);
+ Assert.AreEqual(5, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 10)
+ throw ex;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(9, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 10)
+ throw ex;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(10, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_ComparerEqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnError<string>(310, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 310)
+ );
+
+ Assert.AreEqual(4, keyInvoked);
+ Assert.AreEqual(3, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_ComparerGetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(8, keyInvoked);
+ Assert.AreEqual(7, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnCompleted<string>(320)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnCompleted<string>(570)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Complete_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex1 = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex1),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }, ex => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnCompleted<string>(320)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(570, ex1)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnError<string>(570, ex1)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(400, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab ")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 400)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var keyInvoked = 0;
+ var ex = new Exception();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 6)
+ throw ex;
+ return x.Trim();
+ }, x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(3, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 6)
+ throw ex;
+ return Reverse(x);
+ }, g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Comparer_EqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Comparer_GetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(2, inners.Count);
+
+ outerResults.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR")
+ );
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(420)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Multiple_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+ scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose());
+ scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose());
+ scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Escape_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnCompleted<string>(600)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Escape_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<string>(600, ex)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Escape_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, new Exception())
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(290, () => outerSubscription.Dispose());
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+
+ res.Messages.AssertEqual(
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Default()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim().ToLower();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "bar"),
+ OnNext(350, "baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "foo"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_DurationSelector_Throws()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, "foo")
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil<string, string, string>(x => x, g => { throw ex; }, _groupByUntilCapacity)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<IGroupedObservable<string, string>>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Simple_Never()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never<Unit>(), _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Simple_Expire1()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var n = 0;
+ var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) n++; return Observable.Timer(TimeSpan.FromTicks(50), scheduler); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ Assert.AreEqual(2, n);
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Simple_Expire2()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var n = 0;
+ var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) n++; return Observable.Timer(TimeSpan.FromTicks(50), scheduler).IgnoreElements(); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ Assert.AreEqual(2, n);
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ var nullGroup = scheduler.CreateObserver<string>();
+ var err = default(Exception);
+
+ scheduler.ScheduleAbsolute(200, () => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never<Unit>(), _groupByUntilCapacity).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_));
+ scheduler.Start();
+
+ Assert.AreSame(ex, err);
+
+ nullGroup.Messages.AssertEqual(
+ OnNext(220, "bar"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ #endregion
+
#region + GroupJoin +
[TestMethod]
@@ -8425,196 +11991,620 @@ namespace ReactiveTests.Tests
}
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector);
- public void SelectMany_WithIndex_Complete()
+ public void SelectManyWithIndex_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IObservable<int>>)null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance).Subscribe(null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => Observable.Return(new { x, i }))
+ );
+
+ var witness = new { x = 0, i = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0 }),
+ OnNext(220, new { x = 3, i = 1 }),
+ OnNext(250, new { x = 5, i = 2 }),
+ OnNext(270, new { x = 1, i = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => x)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnNext(930, 401),
+ OnNext(940, 402),
+ OnCompleted<int>(960)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 900));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 960));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 950));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Complete_InnerNotComplete()
{
var scheduler = new TestScheduler();
- ITestableObservable<char> cs = scheduler.CreateHotObservable(
- OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
- OnNext(250, 'a'),
- OnNext(270, 'l'),
- OnNext(310, 'o'),
- OnCompleted<char>(410)
- );
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
var res = scheduler.Start(() =>
- cs.SelectMany(
- (x, i) => Observable.Return(new { x, i }, scheduler)
- ));
+ xs.SelectMany((x, _) => x)
+ );
res.Messages.AssertEqual(
- OnNext(251, new { x = 'a', i = 0 }),
- OnNext(271, new { x = 'l', i = 1 }),
- OnNext(311, new { x = 'o', i = 2 }),
- OnCompleted(new { x = default(char), i = default(int) }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnNext(930, 401),
+ OnNext(940, 402)
);
- cs.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 900));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 1000));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 960));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 950));
}
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector);
- public void SelectMany_WithIndex_IEnumerable_Complete()
+ public void SelectManyWithIndex_Complete_OuterNotComplete()
{
var scheduler = new TestScheduler();
- ITestableObservable<char> cs = scheduler.CreateHotObservable(
- OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
- OnNext(250, 'a'),
- OnNext(270, 'l'),
- OnNext(310, 'o'),
- OnCompleted<char>(410)
- );
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100)))
+ );
var res = scheduler.Start(() =>
- cs.SelectMany(
- (c, i) => new [] { new { c = c, i = i } }
- ));
+ xs.SelectMany((x, _) => x)
+ );
-
res.Messages.AssertEqual(
- OnNext(250, new { c = 'a', i = 0 }),
- OnNext(270, new { c = 'l', i = 1 }),
- OnNext(310, new { c = 'o', i = 2 }),
- OnCompleted(new { c = default(char), i = default(int) }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnNext(930, 401),
+ OnNext(940, 402)
);
- cs.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 1000));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 960));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 950));
}
+ [TestMethod]
+ public void SelectManyWithIndex_Error_Outer()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnError<ITestableObservable<int>>(900, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => x)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnError<int>(900, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 900));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 900));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 900));
+ }
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, int, TResult> resultSelector);
- public void SelectMany_WithIndex_IObservable_ResultSelector_Complete()
+ public void SelectManyWithIndex_Error_Inner()
{
var scheduler = new TestScheduler();
- ITestableObservable<ITestableObservable<char>> css = scheduler.CreateHotObservable(
- OnNext(190, scheduler.CreateColdObservable(
- OnNext(1, 'h'),
- OnCompleted<char>(2))),
- OnNext(250, scheduler.CreateColdObservable(
- OnNext(1, 'a'),
- OnCompleted<char>(2))),
- OnNext(270, scheduler.CreateColdObservable(
- OnNext(1, 'l'),
- OnCompleted<char>(2))),
- OnNext(310, scheduler.CreateColdObservable(
- OnNext(1, 'o'),
- OnNext(2, ' '),
- OnNext(3, 'r'),
- OnNext(4, 'u'),
- OnNext(5, 'l'),
- OnNext(6, 'e'),
- OnNext(7, 'z'),
- OnCompleted<char>(8))),
- OnCompleted<ITestableObservable<char>>(410)
- );
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnError<int>(460, ex))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
var res = scheduler.Start(() =>
- css.SelectMany(
- (foo, i) =>
- {
- return foo.Select(c => new { c = c, i = i });
- },
- (source, i, cs, j) => new { c = cs.c, i = cs.i, i2 = i, j = j }
- ));
+ xs.SelectMany((x, _) => x)
+ );
res.Messages.AssertEqual(
- OnNext(251, new { c = 'a', i = 0, i2 = 0, j = 0 }),
- OnNext(271, new { c = 'l', i = 1, i2 = 1, j = 0 }),
- OnNext(311, new { c = 'o', i = 2, i2 = 2, j = 0 }),
- OnNext(312, new { c = ' ', i = 2, i2 = 2, j = 1 }),
- OnNext(313, new { c = 'r', i = 2, i2 = 2, j = 2 }),
- OnNext(314, new { c = 'u', i = 2, i2 = 2, j = 3 }),
- OnNext(315, new { c = 'l', i = 2, i2 = 2, j = 4 }),
- OnNext(316, new { c = 'e', i = 2, i2 = 2, j = 5 }),
- OnNext(317, new { c = 'z', i = 2, i2 = 2, j = 6 }),
- OnCompleted(new { c = 'a', i = 0, i2 = 0, j = 0 }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnError<int>(760, ex)
);
- css.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 760));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 760));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 760));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ );
}
-
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector);
- public void SelectMany_WithIndex_IEnumerable_ResultSelector_Complete()
+ public void SelectManyWithIndex_Dispose()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateHotObservable(
- OnNext(210, 5),
- OnNext(340, 4),
- OnNext(420, 3),
- OnCompleted<int>(600)
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
);
var res = scheduler.Start(() =>
- xs.SelectMany(
- (x, i) => new[] { new { x = x + 1, i = i }, new { x = -x, i = i }, new { x = x * x, i = i } },
- (x, i, y, j) => new { x = x, i = i, y = y.x, y_i = y.i, j = j })
+ xs.SelectMany((x, _) => x),
+ 700
);
-
+
res.Messages.AssertEqual(
- OnNext(210, new { x = 5, i = 0, y = 6, y_i = 0, j = 0 }),
- OnNext(210, new { x = 5, i = 0, y = -5, y_i = 0, j = 1 }),
- OnNext(210, new { x = 5, i = 0, y = 25, y_i = 0, j = 2 }),
- OnNext(340, new { x = 4, i = 1, y = 5, y_i = 1, j = 0 }),
- OnNext(340, new { x = 4, i = 1, y = -4, y_i = 1, j = 1 }),
- OnNext(340, new { x = 4, i = 1, y = 16, y_i = 1, j = 2 }),
- OnNext(420, new { x = 3, i = 2, y = 4, y_i = 2, j = 0 }),
- OnNext(420, new { x = 3, i = 2, y = -3, y_i = 2, j = 1 }),
- OnNext(420, new { x = 3, i = 2, y = 9, y_i = 2, j = 2 }),
- OnCompleted(new { x = default(int), i = default(int), y = default(int), y_i = default(int), j = default(int) }, 600)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303)
);
xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
+ Subscribe(200, 700));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 700));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 700));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
);
}
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted);
- public void SelectMany_WithIndex_Triple_Complete()
+ public void SelectManyWithIndex_Throw()
{
var scheduler = new TestScheduler();
- ITestableObservable<char> cs = scheduler.CreateHotObservable(
- OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
- OnNext(250, 'a'),
- OnNext(270, 'l'),
- OnNext(310, 'o'),
- OnCompleted<char>(410)
- );
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
+
+ var invoked = 0;
+
+ var ex = new Exception();
var res = scheduler.Start(() =>
- cs.SelectMany(
- (c, i) => Observable.Return(new { c = c, i = i }, scheduler),
- (ex, i) => { throw ex; },
- (i) => Observable.Repeat(new { c = 'x', i = -1 }, i, scheduler)
- ));
+ xs.SelectMany((x, _) =>
+ {
+ invoked++;
+ if (invoked == 3)
+ throw ex;
+ return x;
+ })
+ );
res.Messages.AssertEqual(
- OnNext(251, new { c = 'a', i = 0 }),
- OnNext(271, new { c = 'l', i = 1 }),
- OnNext(311, new { c = 'o', i = 2 }),
- OnCompleted(new { c = default(char), i = default(int) }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnError<int>(550, ex)
);
- cs.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 550));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 550));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 550));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ Assert.AreEqual(3, invoked);
}
+ [TestMethod]
+ public void SelectManyWithIndex_UseFunction()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(10), scheduler).Select(__ => x).Take(x))
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, 4),
+ OnNext(230, 3),
+ OnNext(230, 4),
+ OnNext(240, 3),
+ OnNext(240, 4),
+ OnNext(250, 3),
+ OnNext(250, 4),
+ OnNext(260, 5),
+ OnNext(270, 5),
+ OnNext(280, 1),
+ OnNext(280, 5),
+ OnNext(290, 5),
+ OnNext(300, 5),
+ OnCompleted<int>(300)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
[TestMethod]
public void SelectMany_Enumerable_ArgumentChecking()
@@ -9311,6 +13301,652 @@ namespace ReactiveTests.Tests
}
[TestMethod]
+ public void SelectManyWithIndex_Enumerable_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, IEnumerable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IEnumerable<int>>)null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IEnumerable<int>>.Instance).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, IEnumerable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IEnumerable<int>>)null, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IEnumerable<int>>.Instance, (Func<int, int, int, int, int>)null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => new[] { new { x, i } })
+ );
+
+ var witness = new { x = 0, i = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0 }),
+ OnNext(220, new { x = 3, i = 1 }),
+ OnNext(250, new { x = 5, i = 2 }),
+ OnNext(270, new { x = 1, i = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_ResultSelector_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => Enumerable.Range(10, i + 1), (x, i, y, j) => new { x, i, y, j })
+ );
+
+ var witness = new { x = 0, i = 0, y = 0, j = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 10, j = 0 }),
+ OnNext(250, new { x = 5, i = 2, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 10, j = 0 }),
+ OnNext(270, new { x = 1, i = 3, y = 11, j = 1 }),
+ OnNext(270, new { x = 1, i = 3, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 13, j = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var inners = new List<MockEnumerable<int>>();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) =>
+ {
+ var ys = new MockEnumerable<int>(scheduler, Enumerable.Repeat(x, x));
+ inners.Add(ys);
+ return ys;
+ })
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+
+ Assert.AreEqual(4, inners.Count);
+
+ inners[0].Subscriptions.AssertEqual(
+ Subscribe(210, 210)
+ );
+
+ inners[1].Subscriptions.AssertEqual(
+ Subscribe(340, 340)
+ );
+
+ inners[2].Subscriptions.AssertEqual(
+ Subscribe(420, 420)
+ );
+
+ inners[3].Subscriptions.AssertEqual(
+ Subscribe(510, 510)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Complete_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(510, 4),
+ OnNext(510, 4),
+ OnCompleted<int>(600)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnError<int>(600, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x))
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnNext(510, 2),
+ OnError<int>(600, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Error_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnError<int>(600, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(510, 4),
+ OnNext(510, 4),
+ OnError<int>(600, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x)),
+ 350
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 350)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Dispose_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y),
+ 350
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 350)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_SelectorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var invoked = 0;
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) =>
+ {
+ invoked++;
+ if (invoked == 3)
+ throw ex;
+
+ return Enumerable.Repeat(x, x);
+ })
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnError<int>(420, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(3, invoked);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_ResultSelectorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var inners = new List<MockEnumerable<int>>();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) =>
+ {
+ var ys = new MockEnumerable<int>(scheduler, Enumerable.Repeat(x, x));
+ inners.Add(ys);
+ return ys;
+ },
+ (x, _, y, __) =>
+ {
+ if (x == 3)
+ throw ex;
+
+ return x + y;
+ }
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnError<int>(420, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(3, inners.Count);
+
+ inners[0].Subscriptions.AssertEqual(
+ Subscribe(210, 210)
+ );
+
+ inners[1].Subscriptions.AssertEqual(
+ Subscribe(340, 340)
+ );
+
+ inners[2].Subscriptions.AssertEqual(
+ Subscribe(420, 420)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_ResultSelector_GetEnumeratorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new RogueEnumerable<int>(ex), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_SelectorThrows_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var invoked = 0;
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) =>
+ {
+ invoked++;
+ if (invoked == 3)
+ throw ex;
+
+ return Enumerable.Repeat(x, x);
+ },
+ (x, _, y, __) => x + y
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnError<int>(420, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(3, invoked);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_CurrentThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new CurrentThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_CurrentThrows_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new CurrentThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_GetEnumeratorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new RogueEnumerable<int>(ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_MoveNextThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new MoveNextThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_MoveNextThrows_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new MoveNextThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
public void SelectMany_QueryOperator_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, int, int>.Instance));
@@ -9585,12 +14221,294 @@ namespace ReactiveTests.Tests
Subscribe(200, 221)
);
}
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IObservable<int>>)null, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance, ((Func<int, int, int, int, int>)null)));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance).Subscribe(null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => Observable.Range(10, i + 1), (x, i, y, j) => new { x, i, y, j })
+ );
+
+ var witness = new { x = 0, i = 0, y = 0, j = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 10, j = 0 }),
+ OnNext(250, new { x = 5, i = 2, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 10, j = 0 }),
+ OnNext(270, new { x = 1, i = 3, y = 11, j = 1 }),
+ OnNext(270, new { x = 1, i = 3, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 13, j = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_CompleteOuterFirst()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnNext(223, 20),
+ OnNext(223, 31),
+ OnNext(223, 42),
+ OnNext(224, 50),
+ OnNext(224, 21),
+ OnNext(224, 32),
+ OnNext(224, 43),
+ OnNext(225, 51),
+ OnNext(226, 52),
+ OnNext(227, 53),
+ OnNext(228, 54),
+ OnCompleted<int>(228)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 224)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_CompleteInnerFirst()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(300)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnNext(223, 20),
+ OnNext(223, 31),
+ OnNext(223, 42),
+ OnNext(224, 50),
+ OnNext(224, 21),
+ OnNext(224, 32),
+ OnNext(224, 43),
+ OnNext(225, 51),
+ OnNext(226, 52),
+ OnNext(227, 53),
+ OnNext(228, 54),
+ OnCompleted<int>(300)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 300)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ErrorOuter()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnError<int>(224, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnNext(223, 20),
+ OnNext(223, 31),
+ OnNext(223, 42),
+ OnError<int>(224, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 224)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ErrorInner()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => x == 2
+ ? Observable.Throw<long>(ex, scheduler)
+ : Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x),
+ (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnError<int>(223, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 223)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y),
+ 223
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 223)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ThrowSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Throw<IObservable<long>>(ex), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(220, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 220)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ThrowResult()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => Throw<int>(ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(221, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 221)
+ );
+ }
+
[TestMethod]
public void SelectMany_Triple_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(null, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, (Func<int, IObservable<int>>)null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, null, DummyFunc<IObservable<int>>.Instance));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, null));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance).Subscribe(null));
@@ -10329,6 +15247,787 @@ namespace ReactiveTests.Tests
);
}
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(null, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, (Func<int, int, IObservable<int>>)null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, null, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance).Subscribe(null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var witness = new { x = 0, i = 0 };
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, i) => Observable.Return(new { x, i }, scheduler),
+ ex => Observable.Throw(ex, scheduler, witness),
+ () => Observable.Empty(scheduler, witness)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, new { x = 0, i = 0 }),
+ OnNext(302, new { x = 1, i = 1 }),
+ OnNext(303, new { x = 2, i = 2 }),
+ OnNext(304, new { x = 3, i = 3 }),
+ OnNext(305, new { x = 4, i = 4 }),
+ OnCompleted(306, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Identity()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Throw<int>(ex, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnCompleted<int>(306)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_InnersWithTiming1()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ysn = scheduler.CreateColdObservable(
+ OnNext(10, 10),
+ OnNext(20, 11),
+ OnNext(30, 12),
+ OnCompleted<int>(40)
+ );
+
+ var yse = scheduler.CreateColdObservable(
+ OnNext(0, 99),
+ OnCompleted<int>(10)
+ );
+
+ var ysc = scheduler.CreateColdObservable(
+ OnNext(10, 42),
+ OnCompleted<int>(20)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => ysn,
+ ex => yse,
+ () => ysc
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 10),
+ OnNext(311, 10),
+ OnNext(312, 10),
+ OnNext(313, 10),
+ OnNext(314, 10),
+ OnNext(315, 42),
+ OnNext(320, 11),
+ OnNext(321, 11),
+ OnNext(322, 11),
+ OnNext(323, 11),
+ OnNext(324, 11),
+ OnNext(330, 12),
+ OnNext(331, 12),
+ OnNext(332, 12),
+ OnNext(333, 12),
+ OnNext(334, 12),
+ OnCompleted<int>(344)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+
+ ysn.Subscriptions.AssertEqual(
+ Subscribe(300, 340),
+ Subscribe(301, 341),
+ Subscribe(302, 342),
+ Subscribe(303, 343),
+ Subscribe(304, 344)
+ );
+
+ yse.Subscriptions.AssertEqual(
+ );
+
+ ysc.Subscriptions.AssertEqual(
+ Subscribe(305, 325)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_InnersWithTiming2()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ysn = scheduler.CreateColdObservable(
+ OnNext(10, 10),
+ OnNext(20, 11),
+ OnNext(30, 12),
+ OnCompleted<int>(40)
+ );
+
+ var yse = scheduler.CreateColdObservable(
+ OnNext(0, 99),
+ OnCompleted<int>(10)
+ );
+
+ var ysc = scheduler.CreateColdObservable(
+ OnNext(10, 42),
+ OnCompleted<int>(50)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => ysn,
+ ex => yse,
+ () => ysc
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 10),
+ OnNext(311, 10),
+ OnNext(312, 10),
+ OnNext(313, 10),
+ OnNext(314, 10),
+ OnNext(315, 42),
+ OnNext(320, 11),
+ OnNext(321, 11),
+ OnNext(322, 11),
+ OnNext(323, 11),
+ OnNext(324, 11),
+ OnNext(330, 12),
+ OnNext(331, 12),
+ OnNext(332, 12),
+ OnNext(333, 12),
+ OnNext(334, 12),
+ OnCompleted<int>(355)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+
+ ysn.Subscriptions.AssertEqual(
+ Subscribe(300, 340),
+ Subscribe(301, 341),
+ Subscribe(302, 342),
+ Subscribe(303, 343),
+ Subscribe(304, 344)
+ );
+
+ yse.Subscriptions.AssertEqual(
+ );
+
+ ysc.Subscriptions.AssertEqual(
+ Subscribe(305, 355)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_InnersWithTiming3()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(400, 1),
+ OnNext(500, 2),
+ OnNext(600, 3),
+ OnNext(700, 4),
+ OnCompleted<int>(800)
+ );
+
+ var ysn = scheduler.CreateColdObservable(
+ OnNext(10, 10),
+ OnNext(20, 11),
+ OnNext(30, 12),
+ OnCompleted<int>(40)
+ );
+
+ var yse = scheduler.CreateColdObservable(
+ OnNext(0, 99),
+ OnCompleted<int>(10)
+ );
+
+ var ysc = scheduler.CreateColdObservable(
+ OnNext(10, 42),
+ OnCompleted<int>(100)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => ysn,
+ ex => yse,
+ () => ysc
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 10),
+ OnNext(320, 11),
+ OnNext(330, 12),
+ OnNext(410, 10),
+ OnNext(420, 11),
+ OnNext(430, 12),
+ OnNext(510, 10),
+ OnNext(520, 11),
+ OnNext(530, 12),
+ OnNext(610, 10),
+ OnNext(620, 11),
+ OnNext(630, 12),
+ OnNext(710, 10),
+ OnNext(720, 11),
+ OnNext(730, 12),
+ OnNext(810, 42),
+ OnCompleted<int>(900)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 800)
+ );
+
+ ysn.Subscriptions.AssertEqual(
+ Subscribe(300, 340),
+ Subscribe(400, 440),
+ Subscribe(500, 540),
+ Subscribe(600, 640),
+ Subscribe(700, 740)
+ );
+
+ yse.Subscriptions.AssertEqual(
+ );
+
+ ysc.Subscriptions.AssertEqual(
+ Subscribe(800, 900)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Error_Identity()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex1 => Observable.Throw<int>(ex1, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnError<int>(306, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_SelectMany()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Throw<int>(ex, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, 4),
+ OnNext(306, 3),
+ OnNext(307, 4),
+ OnNext(308, 4),
+ OnCompleted<int>(308)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Concat()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Throw<int>(ex, scheduler),
+ () => Observable.Range(1, 3, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnNext(306, 1),
+ OnNext(307, 2),
+ OnNext(308, 3),
+ OnCompleted<int>(309)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Catch()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Range(1, 3, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnCompleted<int>(306)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Error_Catch()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Range(1, 3, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnNext(306, 1),
+ OnNext(307, 2),
+ OnNext(308, 3),
+ OnCompleted<int>(309)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, -1),
+ OnNext(306, 4),
+ OnNext(306, 3),
+ OnNext(307, -1),
+ OnNext(307, 4),
+ OnNext(308, 4),
+ OnCompleted<int>(308)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Error_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, 0),
+ OnNext(306, 4),
+ OnNext(306, 3),
+ OnNext(307, 0),
+ OnNext(307, 4),
+ OnNext(308, 4),
+ OnCompleted<int>(308)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_All_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ ),
+ 307
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, -1),
+ OnNext(306, 4),
+ OnNext(306, 3)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_All_Dispose_Before_First()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ ),
+ 304
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 304)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_OnNextThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Throw<IObservable<int>>(ex),
+ ex1 => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(300, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 300)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_OnErrorThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex1 => Throw<IObservable<int>>(ex),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnError<int>(305, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_OnCompletedThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex1 => Observable.Repeat(0, 2, scheduler),
+ () => Throw<IObservable<int>>(ex)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnError<int>(305, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
#if !NO_TPL
[TestMethod]
@@ -10339,7 +16038,7 @@ namespace ReactiveTests.Tests
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), x => t));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, default(Func<int, Task<int>>)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), (x, ct) => t));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), (int x, CancellationToken ct) => t));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, default(Func<int, CancellationToken, Task<int>>)));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), x => t, (x, y) => x));
@@ -11218,6 +16917,919 @@ namespace ReactiveTests.Tests
Assert.AreEqual(0, m);
}
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, Task<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, Task<int>>)null));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, CancellationToken, Task<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, CancellationToken, Task<int>>)null));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, Task<int>>.Instance, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, Task<int>>)null, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, Task<int>>.Instance, ((Func<int, int, int, int>)null)));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, CancellationToken, Task<int>>.Instance, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, CancellationToken, Task<int>>)null, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, CancellationToken, Task<int>>.Instance, ((Func<int, int, int, int>)null)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((int x, int i) => Task.Factory.StartNew(() => new { x, i })).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_Cancellation_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, i, ctx) => Task.Factory.StartNew(() => new { x, i }, ctx)).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((int x, int i) => Task.Factory.StartNew(() => new { x, i }), (x, i, r) => r).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector_Cancellation_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, i, ctx) => Task.Factory.StartNew(() => new { x, i }, ctx), (x, i, r) => r).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task1()
+ {
+ var res = Observable.Range(0, 10).SelectMany((int x, int _) => Task.Factory.StartNew(() => x + 1)).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task2()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, _, ct) => Task.Factory.StartNew(() => x + 1, ct)).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_TaskThrows()
+ {
+ var ex = new Exception();
+
+ var res = Observable.Range(0, 10).SelectMany((int x, int _) => Task.Factory.StartNew(() =>
+ {
+ if (x > 5)
+ throw ex;
+ return x + 1;
+ })).ToEnumerable();
+
+ ReactiveAssert.Throws(ex, () =>
+ {
+ foreach (var x in res)
+ ;
+ });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_SelectorThrows()
+ {
+ var ex = new Exception();
+
+ var res = Observable.Range(0, 10).SelectMany((int x, int _) =>
+ {
+ if (x > 5)
+ throw ex;
+ return Task.Factory.StartNew(() => x + 1);
+ }).ToEnumerable();
+
+ ReactiveAssert.Throws(ex, () =>
+ {
+ foreach (var x in res)
+ ;
+ });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector1()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, _) => Task.Factory.StartNew(() => x + 1), (x, _, y) => x + y).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { 2 * x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector2()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, _, ct) => Task.Factory.StartNew(() => x + 1, ct), (x, _, y) => x + y).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { 2 * x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelectorThrows()
+ {
+ var ex = new Exception();
+
+ var res = Observable.Range(0, 10).SelectMany((x, _) => Task.Factory.StartNew(() => x + 1), (x, _, y) =>
+ {
+ if (x > 5)
+ throw ex;
+ return x + y;
+ }).ToEnumerable();
+
+ ReactiveAssert.Throws(ex, () =>
+ {
+ foreach (var x in res)
+ ;
+ });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_RanToCompletion_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_RanToCompletion_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Faulted_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Faulted_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Canceled_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ tcss[1].SetCanceled();
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Canceled_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ tcss[1].SetCanceled();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_InnerCompleteBeforeOuter()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ xs.OnCompleted();
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_OuterCompleteBeforeInner()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_NeverInvoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => tcs.SetCanceled());
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ var d = res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_Invoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = false;
+ var d = res.Subscribe(lst.Add, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ d.Dispose();
+
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // never observed because xs.OnNext(2) happened after dispose
+
+ lst.AssertEqual(new[] { 42 });
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_AfterOuterError()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = false;
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => err = ex_, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ var ex = new Exception();
+ xs.OnError(ex);
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // no-op
+
+ lst.AssertEqual(new[] { 42 });
+ Assert.AreSame(ex, err);
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_AfterSelectorThrows()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[4];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+ tcss[3] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var ex = new Exception();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ if (x == 2)
+ throw ex;
+
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = false;
+ var evt = new ManualResetEvent(false);
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; evt.Set(); }, () => { done = true; evt.Set(); });
+
+ tcss[1].SetResult(43);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ tcss[0].SetResult(42);
+
+ xs.OnNext(2); // causes error
+ xs.OnCompleted();
+
+ evt.WaitOne();
+
+ Assert.IsFalse(done);
+ Assert.AreSame(ex, err);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(0, m);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_RanToCompletion_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 0, 43 + 1 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_RanToCompletion_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 0, 43 + 1 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Faulted_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Faulted_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Canceled_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ tcss[1].SetCanceled();
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Canceled_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ tcss[1].SetCanceled();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_InnerCompleteBeforeOuter()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ xs.OnCompleted();
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_OuterCompleteBeforeInner()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_NeverInvoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => tcs.SetCanceled());
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ var d = res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_Invoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = false;
+ var d = res.Subscribe(lst.Add, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ d.Dispose();
+
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // never observed because xs.OnNext(2) happened after dispose
+
+ lst.AssertEqual(new[] { 42 + 1 });
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_AfterOuterError()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = false;
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => err = ex_, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ var ex = new Exception();
+ xs.OnError(ex);
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // no-op
+
+ lst.AssertEqual(new[] { 42 + 1 });
+ Assert.AreSame(ex, err);
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_AfterSelectorThrows()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[4];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+ tcss[3] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var ex = new Exception();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ if (x == 2)
+ throw ex;
+
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = false;
+ var evt = new ManualResetEvent(false);
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; evt.Set(); }, () => { done = true; evt.Set(); });
+
+ tcss[1].SetResult(43);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ tcss[0].SetResult(42);
+
+ xs.OnNext(2); // causes error
+ xs.OnCompleted();
+
+ evt.WaitOne();
+
+ Assert.IsFalse(done);
+ Assert.AreSame(ex, err);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(0, m);
+ }
+
#endif
#endregion