Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
committerAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
commit74a538f6725ebc83efda4bb07d5747e8a6359e19 (patch)
tree7c98de97c88c78b4aca4b25b36db310f82c26865 /Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
parent50e7bdb4507f7e4c2aefb7772d57d9a80f4d42b0 (diff)
Import Official Rx 2.2 (3ebdd2e09991)HEADmaster
I made changes from the original source tree to match the older tree so that we don't have to make several changes to project tree generator. (There is actually no new sources in Rx so hopefully we can just reuse existing modifications in the tree).
Diffstat (limited to 'Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs')
-rw-r--r--Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs6870
1 files changed, 6741 insertions, 129 deletions
diff --git a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
index 0670909..dd17353 100644
--- a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
+++ b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
@@ -2486,7 +2486,7 @@ namespace ReactiveTests.Tests
var nullGroup = scheduler.CreateObserver<string>();
var err = default(Exception);
-
+
scheduler.ScheduleAbsolute(200, () => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper()).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_));
scheduler.Start();
@@ -2553,6 +2553,1704 @@ namespace ReactiveTests.Tests
#endregion
+ #region + GroupBy w/capacity +
+
+ private const int _groupByCapacity = 1024;
+
+ [TestMethod]
+ public void GroupBy_Capacity_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, (Func<int, int>)null, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, -1, EqualityComparer<int>.Default));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_KeyEle_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, DummyFunc<int, int>.Instance, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, (Func<int, int>)null, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, -1));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_KeyComparer_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, _groupByCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, (IEqualityComparer<int>)null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, -1, EqualityComparer<int>.Default));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Key_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, _groupByCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, -1));
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_WithKeyComparer()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ }, _groupByCapacity, comparer).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(570, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ }, x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ }, _groupByCapacity, comparer
+ ).Select(g => g.Key),
+ 355
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 355)
+ );
+
+ Assert.AreEqual(5, keyInvoked);
+ Assert.AreEqual(5, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 10)
+ throw ex;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(9, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 10)
+ throw ex;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(10, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_ComparerEqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnError<string>(310, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 310)
+ );
+
+ Assert.AreEqual(4, keyInvoked);
+ Assert.AreEqual(3, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_ComparerGetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410);
+
+ var res = scheduler.Start(() =>
+ xs.GroupBy(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ _groupByCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(8, keyInvoked);
+ Assert.AreEqual(7, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(570)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Complete_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(570)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex1 = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex1),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }, ex => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnError<string>(570, ex1)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnError<string>(570, ex1)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnError<string>(570, ex1)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(570, ex1)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(400, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof")
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab ")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 400)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var keyInvoked = 0;
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 6)
+ throw ex;
+ return x.Trim();
+ }, x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(3, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(360, ex)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var eleInvoked = 0;
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 6)
+ throw ex;
+ return Reverse(x);
+ }, _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(360, ex)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Comparer_EqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Comparer_GetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Outer_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(2, inners.Count);
+
+ outerResults.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR")
+ );
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof")
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(570)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(570)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Multiple_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+ scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose());
+ scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose());
+ scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof")
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Escape_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnCompleted<string>(600)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Escape_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<string>(600, ex)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_Inner_Escape_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, new Exception())
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(400, () => outerSubscription.Dispose());
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 400)
+ );
+
+ res.Messages.AssertEqual(
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_NullKeys_Simple()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var res = scheduler.Start(() => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper(), _groupByCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupBy_Capacity_NullKeys_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ var nullGroup = scheduler.CreateObserver<string>();
+ var err = default(Exception);
+
+ scheduler.ScheduleAbsolute(200, () => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper(), _groupByCapacity).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_));
+ scheduler.Start();
+
+ Assert.AreSame(ex, err);
+
+ nullGroup.Messages.AssertEqual(
+ OnNext(220, "bar"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ #endregion
+
#region + GroupByUntil +
[TestMethod]
@@ -4405,6 +6103,1874 @@ namespace ReactiveTests.Tests
#endregion
+ #region + GroupByUntil w/capacity +
+
+ private const int _groupByUntilCapacity = 1024;
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, default(IEqualityComparer<int>)));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, default(IEqualityComparer<int>)));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity));
+
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1));
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1, EqualityComparer<int>.Default));
+ ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1));
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_WithKeyComparer()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnError<string>(570, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key),
+ 355
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 355)
+ );
+
+ Assert.AreEqual(5, keyInvoked);
+ Assert.AreEqual(5, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 10)
+ throw ex;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(9, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 10)
+ throw ex;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "FOO"),
+ OnError<string>(480, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 480)
+ );
+
+ Assert.AreEqual(10, keyInvoked);
+ Assert.AreEqual(10, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_ComparerEqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnError<string>(310, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 310)
+ );
+
+ Assert.AreEqual(4, keyInvoked);
+ Assert.AreEqual(3, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_ComparerGetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410);
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity,
+ comparer
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR"),
+ OnNext(350, "Baz"),
+ OnNext(360, "qux"),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(8, keyInvoked);
+ Assert.AreEqual(7, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnCompleted<string>(320)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnCompleted<string>(570)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Complete_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex1 = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex1),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result));
+ }, ex => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnCompleted<string>(320)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(390, "rab "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(480, " zab"),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(570, ex1)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnError<string>(570, ex1)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }));
+
+ scheduler.ScheduleAbsolute(400, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab ")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 400)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_KeyThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var keyInvoked = 0;
+ var ex = new Exception();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x =>
+ {
+ keyInvoked++;
+ if (keyInvoked == 6)
+ throw ex;
+ return x.Trim();
+ }, x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(3, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_EleThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ var eleInvoked = 0;
+ var ex = new Exception();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x =>
+ {
+ eleInvoked++;
+ if (eleInvoked == 6)
+ throw ex;
+ return Reverse(x);
+ }, g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnError<string>(360, ex)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(360, ex)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnError<string>(360, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 360)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Comparer_EqualsThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.EqualsException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Comparer_GetHashCodeThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.Start();
+
+ Assert.AreEqual(4, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnError<string>(420, comparer.HashCodeException)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Outer_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(2, inners.Count);
+
+ outerResults.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "baR")
+ );
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(420)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab"),
+ OnNext(390, "rab "),
+ OnNext(420, " RAB "),
+ OnCompleted<string>(420)
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB "),
+ OnNext(480, " zab"),
+ OnNext(510, " ZAb "),
+ OnCompleted<string>(510)
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq "),
+ OnCompleted<string>(570)
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Multiple_Independence()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var comparer = new GroupByComparer(scheduler);
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inners = new Dictionary<string, IObservable<string>>();
+ var innerSubscriptions = new Dictionary<string, IDisposable>();
+ var res = new Dictionary<string, ITestableObserver<string>>();
+ var outerResults = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ outerResults.OnNext(group.Key);
+ var result = scheduler.CreateObserver<string>();
+ inners[group.Key] = group;
+ res[group.Key] = result;
+ innerSubscriptions[group.Key] = group.Subscribe(result);
+ }, outerResults.OnError, outerResults.OnCompleted));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ foreach (var d in innerSubscriptions.Values)
+ d.Dispose();
+ });
+
+ scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
+ scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose());
+ scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose());
+ scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose());
+
+ scheduler.Start();
+
+ Assert.AreEqual(5, inners.Count);
+
+ res["foo"].Messages.AssertEqual(
+ OnNext(220, "oof "),
+ OnNext(240, " OoF "),
+ OnNext(310, " Oof"),
+ OnCompleted<string>(310)
+ );
+
+ res["baR"].Messages.AssertEqual(
+ OnNext(270, " Rab")
+ );
+
+ res["Baz"].Messages.AssertEqual(
+ OnNext(350, " zaB ")
+ );
+
+ res["qux"].Messages.AssertEqual(
+ OnNext(360, " xuq ")
+ );
+
+ res["FOO"].Messages.AssertEqual(
+ OnNext(470, " OOF"),
+ OnNext(530, " oOf "),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Escape_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnCompleted<string>(600)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Escape_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, ex)
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }, _ => { }));
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ outerSubscription.Dispose();
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<string>(600, ex)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Inner_Escape_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(310, "foO "),
+ OnNext(470, "FOO "),
+ OnNext(530, " fOo "),
+ OnError<string>(570, new Exception())
+ );
+
+ var outer = default(IObservable<IGroupedObservable<string, string>>);
+ var outerSubscription = default(IDisposable);
+ var inner = default(IObservable<string>);
+ var innerSubscription = default(IDisposable);
+ var res = scheduler.CreateObserver<string>();
+
+ scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity));
+
+ scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
+ {
+ inner = group;
+ }));
+
+ scheduler.ScheduleAbsolute(290, () => outerSubscription.Dispose());
+
+ scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res));
+
+ scheduler.ScheduleAbsolute(Disposed, () =>
+ {
+ innerSubscription.Dispose();
+ });
+
+ scheduler.Start();
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+
+ res.Messages.AssertEqual(
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_Default()
+ {
+ var scheduler = new TestScheduler();
+
+ var keyInvoked = 0;
+ var eleInvoked = 0;
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(90, "error"),
+ OnNext(110, "error"),
+ OnNext(130, "error"),
+ OnNext(220, " foo"),
+ OnNext(240, " FoO "),
+ OnNext(270, "baR "),
+ OnNext(310, "foO "),
+ OnNext(350, " Baz "),
+ OnNext(360, " qux "),
+ OnNext(390, " bar"),
+ OnNext(420, " BAR "),
+ OnNext(470, "FOO "),
+ OnNext(480, "baz "),
+ OnNext(510, " bAZ "),
+ OnNext(530, " fOo "),
+ OnCompleted<string>(570),
+ OnNext(580, "error"),
+ OnCompleted<string>(600),
+ OnError<string>(650, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil(
+ x =>
+ {
+ keyInvoked++;
+ return x.Trim().ToLower();
+ },
+ x =>
+ {
+ eleInvoked++;
+ return Reverse(x);
+ },
+ g => g.Skip(2),
+ _groupByUntilCapacity
+ ).Select(g => g.Key)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, "foo"),
+ OnNext(270, "bar"),
+ OnNext(350, "baz"),
+ OnNext(360, "qux"),
+ OnNext(470, "foo"),
+ OnCompleted<string>(570)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 570)
+ );
+
+ Assert.AreEqual(12, keyInvoked);
+ Assert.AreEqual(12, eleInvoked);
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_DurationSelector_Throws()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, "foo")
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.GroupByUntil<string, string, string>(x => x, g => { throw ex; }, _groupByUntilCapacity)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<IGroupedObservable<string, string>>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Simple_Never()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never<Unit>(), _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Simple_Expire1()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var n = 0;
+ var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) n++; return Observable.Timer(TimeSpan.FromTicks(50), scheduler); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ Assert.AreEqual(2, n);
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Simple_Expire2()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnCompleted<string>(500)
+ );
+
+ var n = 0;
+ var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) n++; return Observable.Timer(TimeSpan.FromTicks(50), scheduler).IgnoreElements(); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x));
+
+ Assert.AreEqual(2, n);
+
+ res.Messages.AssertEqual(
+ OnNext(220, "(null)bar"),
+ OnNext(240, "FOOfoo"),
+ OnNext(310, "QUXqux"),
+ OnNext(470, "(null)baz"),
+ OnCompleted<string>(500)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ [TestMethod]
+ public void GroupByUntil_Capacity_NullKeys_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, "bar"),
+ OnNext(240, "foo"),
+ OnNext(310, "qux"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ var nullGroup = scheduler.CreateObserver<string>();
+ var err = default(Exception);
+
+ scheduler.ScheduleAbsolute(200, () => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never<Unit>(), _groupByUntilCapacity).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_));
+ scheduler.Start();
+
+ Assert.AreSame(ex, err);
+
+ nullGroup.Messages.AssertEqual(
+ OnNext(220, "bar"),
+ OnNext(470, "baz"),
+ OnError<string>(500, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 500)
+ );
+ }
+
+ #endregion
+
#region + GroupJoin +
[TestMethod]
@@ -8425,196 +11991,620 @@ namespace ReactiveTests.Tests
}
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector);
- public void SelectMany_WithIndex_Complete()
+ public void SelectManyWithIndex_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IObservable<int>>)null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance).Subscribe(null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => Observable.Return(new { x, i }))
+ );
+
+ var witness = new { x = 0, i = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0 }),
+ OnNext(220, new { x = 3, i = 1 }),
+ OnNext(250, new { x = 5, i = 2 }),
+ OnNext(270, new { x = 1, i = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => x)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnNext(930, 401),
+ OnNext(940, 402),
+ OnCompleted<int>(960)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 900));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 960));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 950));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Complete_InnerNotComplete()
{
var scheduler = new TestScheduler();
- ITestableObservable<char> cs = scheduler.CreateHotObservable(
- OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
- OnNext(250, 'a'),
- OnNext(270, 'l'),
- OnNext(310, 'o'),
- OnCompleted<char>(410)
- );
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
var res = scheduler.Start(() =>
- cs.SelectMany(
- (x, i) => Observable.Return(new { x, i }, scheduler)
- ));
+ xs.SelectMany((x, _) => x)
+ );
res.Messages.AssertEqual(
- OnNext(251, new { x = 'a', i = 0 }),
- OnNext(271, new { x = 'l', i = 1 }),
- OnNext(311, new { x = 'o', i = 2 }),
- OnCompleted(new { x = default(char), i = default(int) }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnNext(930, 401),
+ OnNext(940, 402)
);
- cs.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 900));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 1000));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 960));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 950));
}
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector);
- public void SelectMany_WithIndex_IEnumerable_Complete()
+ public void SelectManyWithIndex_Complete_OuterNotComplete()
{
var scheduler = new TestScheduler();
- ITestableObservable<char> cs = scheduler.CreateHotObservable(
- OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
- OnNext(250, 'a'),
- OnNext(270, 'l'),
- OnNext(310, 'o'),
- OnCompleted<char>(410)
- );
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100)))
+ );
var res = scheduler.Start(() =>
- cs.SelectMany(
- (c, i) => new [] { new { c = c, i = i } }
- ));
+ xs.SelectMany((x, _) => x)
+ );
-
res.Messages.AssertEqual(
- OnNext(250, new { c = 'a', i = 0 }),
- OnNext(270, new { c = 'l', i = 1 }),
- OnNext(310, new { c = 'o', i = 2 }),
- OnCompleted(new { c = default(char), i = default(int) }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnNext(930, 401),
+ OnNext(940, 402)
);
- cs.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 1000));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 960));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 950));
}
+ [TestMethod]
+ public void SelectManyWithIndex_Error_Outer()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnError<ITestableObservable<int>>(900, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => x)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnNext(810, 304),
+ OnNext(860, 305),
+ OnError<int>(900, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 900));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 900));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 790));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(850, 900));
+ }
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, int, TResult> resultSelector);
- public void SelectMany_WithIndex_IObservable_ResultSelector_Complete()
+ public void SelectManyWithIndex_Error_Inner()
{
var scheduler = new TestScheduler();
- ITestableObservable<ITestableObservable<char>> css = scheduler.CreateHotObservable(
- OnNext(190, scheduler.CreateColdObservable(
- OnNext(1, 'h'),
- OnCompleted<char>(2))),
- OnNext(250, scheduler.CreateColdObservable(
- OnNext(1, 'a'),
- OnCompleted<char>(2))),
- OnNext(270, scheduler.CreateColdObservable(
- OnNext(1, 'l'),
- OnCompleted<char>(2))),
- OnNext(310, scheduler.CreateColdObservable(
- OnNext(1, 'o'),
- OnNext(2, ' '),
- OnNext(3, 'r'),
- OnNext(4, 'u'),
- OnNext(5, 'l'),
- OnNext(6, 'e'),
- OnNext(7, 'z'),
- OnCompleted<char>(8))),
- OnCompleted<ITestableObservable<char>>(410)
- );
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnError<int>(460, ex))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
var res = scheduler.Start(() =>
- css.SelectMany(
- (foo, i) =>
- {
- return foo.Select(c => new { c = c, i = i });
- },
- (source, i, cs, j) => new { c = cs.c, i = cs.i, i2 = i, j = j }
- ));
+ xs.SelectMany((x, _) => x)
+ );
res.Messages.AssertEqual(
- OnNext(251, new { c = 'a', i = 0, i2 = 0, j = 0 }),
- OnNext(271, new { c = 'l', i = 1, i2 = 1, j = 0 }),
- OnNext(311, new { c = 'o', i = 2, i2 = 2, j = 0 }),
- OnNext(312, new { c = ' ', i = 2, i2 = 2, j = 1 }),
- OnNext(313, new { c = 'r', i = 2, i2 = 2, j = 2 }),
- OnNext(314, new { c = 'u', i = 2, i2 = 2, j = 3 }),
- OnNext(315, new { c = 'l', i = 2, i2 = 2, j = 4 }),
- OnNext(316, new { c = 'e', i = 2, i2 = 2, j = 5 }),
- OnNext(317, new { c = 'z', i = 2, i2 = 2, j = 6 }),
- OnCompleted(new { c = 'a', i = 0, i2 = 0, j = 0 }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303),
+ OnNext(740, 106),
+ OnError<int>(760, ex)
);
- css.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 760));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 760));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 760));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(750, 760));
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ );
}
-
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector);
- public void SelectMany_WithIndex_IEnumerable_ResultSelector_Complete()
+ public void SelectManyWithIndex_Dispose()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateHotObservable(
- OnNext(210, 5),
- OnNext(340, 4),
- OnNext(420, 3),
- OnCompleted<int>(600)
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
);
var res = scheduler.Start(() =>
- xs.SelectMany(
- (x, i) => new[] { new { x = x + 1, i = i }, new { x = -x, i = i }, new { x = x * x, i = i } },
- (x, i, y, j) => new { x = x, i = i, y = y.x, y_i = y.i, j = j })
+ xs.SelectMany((x, _) => x),
+ 700
);
-
+
res.Messages.AssertEqual(
- OnNext(210, new { x = 5, i = 0, y = 6, y_i = 0, j = 0 }),
- OnNext(210, new { x = 5, i = 0, y = -5, y_i = 0, j = 1 }),
- OnNext(210, new { x = 5, i = 0, y = 25, y_i = 0, j = 2 }),
- OnNext(340, new { x = 4, i = 1, y = 5, y_i = 1, j = 0 }),
- OnNext(340, new { x = 4, i = 1, y = -4, y_i = 1, j = 1 }),
- OnNext(340, new { x = 4, i = 1, y = 16, y_i = 1, j = 2 }),
- OnNext(420, new { x = 3, i = 2, y = 4, y_i = 2, j = 0 }),
- OnNext(420, new { x = 3, i = 2, y = -3, y_i = 2, j = 1 }),
- OnNext(420, new { x = 3, i = 2, y = 9, y_i = 2, j = 2 }),
- OnCompleted(new { x = default(int), i = default(int), y = default(int), y_i = default(int), j = default(int) }, 600)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnNext(560, 301),
+ OnNext(580, 202),
+ OnNext(590, 203),
+ OnNext(600, 302),
+ OnNext(620, 303)
);
xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
+ Subscribe(200, 700));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 700));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 605));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(550, 700));
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
);
}
[TestMethod]
- // Tests this overload:
- // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted);
- public void SelectMany_WithIndex_Triple_Complete()
+ public void SelectManyWithIndex_Throw()
{
var scheduler = new TestScheduler();
- ITestableObservable<char> cs = scheduler.CreateHotObservable(
- OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored.
- OnNext(250, 'a'),
- OnNext(270, 'l'),
- OnNext(310, 'o'),
- OnCompleted<char>(410)
- );
+ var xs = scheduler.CreateHotObservable(
+ OnNext(5, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(105, scheduler.CreateColdObservable(
+ OnError<int>(1, new InvalidOperationException()))),
+ OnNext(300, scheduler.CreateColdObservable(
+ OnNext(10, 102),
+ OnNext(90, 103),
+ OnNext(110, 104),
+ OnNext(190, 105),
+ OnNext(440, 106),
+ OnCompleted<int>(460))),
+ OnNext(400, scheduler.CreateColdObservable(
+ OnNext(180, 202),
+ OnNext(190, 203),
+ OnCompleted<int>(205))),
+ OnNext(550, scheduler.CreateColdObservable(
+ OnNext(10, 301),
+ OnNext(50, 302),
+ OnNext(70, 303),
+ OnNext(260, 304),
+ OnNext(310, 305),
+ OnCompleted<int>(410))),
+ OnNext(750, scheduler.CreateColdObservable(
+ OnCompleted<int>(40))),
+ OnNext(850, scheduler.CreateColdObservable(
+ OnNext(80, 401),
+ OnNext(90, 402),
+ OnCompleted<int>(100))),
+ OnCompleted<ITestableObservable<int>>(900)
+ );
+
+ var invoked = 0;
+
+ var ex = new Exception();
var res = scheduler.Start(() =>
- cs.SelectMany(
- (c, i) => Observable.Return(new { c = c, i = i }, scheduler),
- (ex, i) => { throw ex; },
- (i) => Observable.Repeat(new { c = 'x', i = -1 }, i, scheduler)
- ));
+ xs.SelectMany((x, _) =>
+ {
+ invoked++;
+ if (invoked == 3)
+ throw ex;
+ return x;
+ })
+ );
res.Messages.AssertEqual(
- OnNext(251, new { c = 'a', i = 0 }),
- OnNext(271, new { c = 'l', i = 1 }),
- OnNext(311, new { c = 'o', i = 2 }),
- OnCompleted(new { c = default(char), i = default(int) }, 410)
+ OnNext(310, 102),
+ OnNext(390, 103),
+ OnNext(410, 104),
+ OnNext(490, 105),
+ OnError<int>(550, ex)
);
- cs.Subscriptions.AssertEqual(
- Subscribe(200, 410));
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 550));
+
+ xs.Messages[2].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(300, 550));
+
+ xs.Messages[3].Value.Value.Subscriptions.AssertEqual(
+ Subscribe(400, 550));
+
+ xs.Messages[4].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ xs.Messages[5].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ xs.Messages[6].Value.Value.Subscriptions.AssertEqual(
+ );
+
+ Assert.AreEqual(3, invoked);
}
+ [TestMethod]
+ public void SelectManyWithIndex_UseFunction()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(10), scheduler).Select(__ => x).Take(x))
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(220, 4),
+ OnNext(230, 3),
+ OnNext(230, 4),
+ OnNext(240, 3),
+ OnNext(240, 4),
+ OnNext(250, 3),
+ OnNext(250, 4),
+ OnNext(260, 5),
+ OnNext(270, 5),
+ OnNext(280, 1),
+ OnNext(280, 5),
+ OnNext(290, 5),
+ OnNext(300, 5),
+ OnCompleted<int>(300)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
[TestMethod]
public void SelectMany_Enumerable_ArgumentChecking()
@@ -9311,6 +13301,652 @@ namespace ReactiveTests.Tests
}
[TestMethod]
+ public void SelectManyWithIndex_Enumerable_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, IEnumerable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IEnumerable<int>>)null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IEnumerable<int>>.Instance).Subscribe(null));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, IEnumerable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IEnumerable<int>>)null, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IEnumerable<int>>.Instance, (Func<int, int, int, int, int>)null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => new[] { new { x, i } })
+ );
+
+ var witness = new { x = 0, i = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0 }),
+ OnNext(220, new { x = 3, i = 1 }),
+ OnNext(250, new { x = 5, i = 2 }),
+ OnNext(270, new { x = 1, i = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_ResultSelector_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => Enumerable.Range(10, i + 1), (x, i, y, j) => new { x, i, y, j })
+ );
+
+ var witness = new { x = 0, i = 0, y = 0, j = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 10, j = 0 }),
+ OnNext(250, new { x = 5, i = 2, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 10, j = 0 }),
+ OnNext(270, new { x = 1, i = 3, y = 11, j = 1 }),
+ OnNext(270, new { x = 1, i = 3, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 13, j = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Complete()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var inners = new List<MockEnumerable<int>>();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) =>
+ {
+ var ys = new MockEnumerable<int>(scheduler, Enumerable.Repeat(x, x));
+ inners.Add(ys);
+ return ys;
+ })
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+
+ Assert.AreEqual(4, inners.Count);
+
+ inners[0].Subscriptions.AssertEqual(
+ Subscribe(210, 210)
+ );
+
+ inners[1].Subscriptions.AssertEqual(
+ Subscribe(340, 340)
+ );
+
+ inners[2].Subscriptions.AssertEqual(
+ Subscribe(420, 420)
+ );
+
+ inners[3].Subscriptions.AssertEqual(
+ Subscribe(510, 510)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Complete_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(510, 4),
+ OnNext(510, 4),
+ OnCompleted<int>(600)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Error()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnError<int>(600, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x))
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnNext(510, 2),
+ OnError<int>(600, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Error_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnError<int>(600, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(420, 6),
+ OnNext(510, 4),
+ OnNext(510, 4),
+ OnError<int>(600, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 600)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x)),
+ 350
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 350)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_Dispose_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y),
+ 350
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 350)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_SelectorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var invoked = 0;
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) =>
+ {
+ invoked++;
+ if (invoked == 3)
+ throw ex;
+
+ return Enumerable.Repeat(x, x);
+ })
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 2),
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnNext(340, 4),
+ OnError<int>(420, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(3, invoked);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_ResultSelectorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var inners = new List<MockEnumerable<int>>();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) =>
+ {
+ var ys = new MockEnumerable<int>(scheduler, Enumerable.Repeat(x, x));
+ inners.Add(ys);
+ return ys;
+ },
+ (x, _, y, __) =>
+ {
+ if (x == 3)
+ throw ex;
+
+ return x + y;
+ }
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnError<int>(420, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(3, inners.Count);
+
+ inners[0].Subscriptions.AssertEqual(
+ Subscribe(210, 210)
+ );
+
+ inners[1].Subscriptions.AssertEqual(
+ Subscribe(340, 340)
+ );
+
+ inners[2].Subscriptions.AssertEqual(
+ Subscribe(420, 420)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_ResultSelector_GetEnumeratorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new RogueEnumerable<int>(ex), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_SelectorThrows_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var invoked = 0;
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) =>
+ {
+ invoked++;
+ if (invoked == 3)
+ throw ex;
+
+ return Enumerable.Repeat(x, x);
+ },
+ (x, _, y, __) => x + y
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(210, 4),
+ OnNext(210, 4),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnNext(340, 8),
+ OnError<int>(420, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 420)
+ );
+
+ Assert.AreEqual(3, invoked);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_CurrentThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new CurrentThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_CurrentThrows_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new CurrentThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_GetEnumeratorThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new RogueEnumerable<int>(ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_MoveNextThrows()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new MoveNextThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Enumerable_MoveNextThrows_ResultSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 2),
+ OnNext(340, 4),
+ OnNext(420, 3),
+ OnNext(510, 2),
+ OnCompleted<int>(600)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => new MoveNextThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex), (x, _, y, __) => x + y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(210, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 210)
+ );
+ }
+
+ [TestMethod]
public void SelectMany_QueryOperator_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, int, int>.Instance));
@@ -9585,12 +14221,294 @@ namespace ReactiveTests.Tests
Subscribe(200, 221)
);
}
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IObservable<int>>)null, DummyFunc<int, int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance, ((Func<int, int, int, int, int>)null)));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance).Subscribe(null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(210, 4),
+ OnNext(220, 3),
+ OnNext(250, 5),
+ OnNext(270, 1),
+ OnCompleted<int>(290)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, i) => Observable.Range(10, i + 1), (x, i, y, j) => new { x, i, y, j })
+ );
+
+ var witness = new { x = 0, i = 0, y = 0, j = 0 };
+
+ res.Messages.AssertEqual(
+ OnNext(210, new { x = 4, i = 0, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 10, j = 0 }),
+ OnNext(220, new { x = 3, i = 1, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 10, j = 0 }),
+ OnNext(250, new { x = 5, i = 2, y = 11, j = 1 }),
+ OnNext(250, new { x = 5, i = 2, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 10, j = 0 }),
+ OnNext(270, new { x = 1, i = 3, y = 11, j = 1 }),
+ OnNext(270, new { x = 1, i = 3, y = 12, j = 2 }),
+ OnNext(270, new { x = 1, i = 3, y = 13, j = 3 }),
+ OnCompleted(290, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 290)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_CompleteOuterFirst()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnNext(223, 20),
+ OnNext(223, 31),
+ OnNext(223, 42),
+ OnNext(224, 50),
+ OnNext(224, 21),
+ OnNext(224, 32),
+ OnNext(224, 43),
+ OnNext(225, 51),
+ OnNext(226, 52),
+ OnNext(227, 53),
+ OnNext(228, 54),
+ OnCompleted<int>(228)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 224)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_CompleteInnerFirst()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(300)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnNext(223, 20),
+ OnNext(223, 31),
+ OnNext(223, 42),
+ OnNext(224, 50),
+ OnNext(224, 21),
+ OnNext(224, 32),
+ OnNext(224, 43),
+ OnNext(225, 51),
+ OnNext(226, 52),
+ OnNext(227, 53),
+ OnNext(228, 54),
+ OnCompleted<int>(300)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 300)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ErrorOuter()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnError<int>(224, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnNext(223, 20),
+ OnNext(223, 31),
+ OnNext(223, 42),
+ OnError<int>(224, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 224)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ErrorInner()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => x == 2
+ ? Observable.Throw<long>(ex, scheduler)
+ : Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x),
+ (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41),
+ OnError<int>(223, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 223)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y),
+ 223
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(221, 40),
+ OnNext(222, 30),
+ OnNext(222, 41)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 223)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ThrowSelector()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Throw<IObservable<long>>(ex), (x, _, y, __) => x * 10 + (int)y)
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(220, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 220)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_QueryOperator_ThrowResult()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(220, 4),
+ OnNext(221, 3),
+ OnNext(222, 2),
+ OnNext(223, 5),
+ OnCompleted<int>(224)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => Throw<int>(ex))
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(221, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 221)
+ );
+ }
+
[TestMethod]
public void SelectMany_Triple_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(null, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, (Func<int, IObservable<int>>)null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, null, DummyFunc<IObservable<int>>.Instance));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, null));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance).Subscribe(null));
@@ -10329,6 +15247,787 @@ namespace ReactiveTests.Tests
);
}
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(null, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, (Func<int, int, IObservable<int>>)null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, null, DummyFunc<IObservable<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, null));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance).Subscribe(null));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Index()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var witness = new { x = 0, i = 0 };
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, i) => Observable.Return(new { x, i }, scheduler),
+ ex => Observable.Throw(ex, scheduler, witness),
+ () => Observable.Empty(scheduler, witness)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, new { x = 0, i = 0 }),
+ OnNext(302, new { x = 1, i = 1 }),
+ OnNext(303, new { x = 2, i = 2 }),
+ OnNext(304, new { x = 3, i = 3 }),
+ OnNext(305, new { x = 4, i = 4 }),
+ OnCompleted(306, witness)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Identity()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Throw<int>(ex, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnCompleted<int>(306)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_InnersWithTiming1()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ysn = scheduler.CreateColdObservable(
+ OnNext(10, 10),
+ OnNext(20, 11),
+ OnNext(30, 12),
+ OnCompleted<int>(40)
+ );
+
+ var yse = scheduler.CreateColdObservable(
+ OnNext(0, 99),
+ OnCompleted<int>(10)
+ );
+
+ var ysc = scheduler.CreateColdObservable(
+ OnNext(10, 42),
+ OnCompleted<int>(20)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => ysn,
+ ex => yse,
+ () => ysc
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 10),
+ OnNext(311, 10),
+ OnNext(312, 10),
+ OnNext(313, 10),
+ OnNext(314, 10),
+ OnNext(315, 42),
+ OnNext(320, 11),
+ OnNext(321, 11),
+ OnNext(322, 11),
+ OnNext(323, 11),
+ OnNext(324, 11),
+ OnNext(330, 12),
+ OnNext(331, 12),
+ OnNext(332, 12),
+ OnNext(333, 12),
+ OnNext(334, 12),
+ OnCompleted<int>(344)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+
+ ysn.Subscriptions.AssertEqual(
+ Subscribe(300, 340),
+ Subscribe(301, 341),
+ Subscribe(302, 342),
+ Subscribe(303, 343),
+ Subscribe(304, 344)
+ );
+
+ yse.Subscriptions.AssertEqual(
+ );
+
+ ysc.Subscriptions.AssertEqual(
+ Subscribe(305, 325)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_InnersWithTiming2()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ysn = scheduler.CreateColdObservable(
+ OnNext(10, 10),
+ OnNext(20, 11),
+ OnNext(30, 12),
+ OnCompleted<int>(40)
+ );
+
+ var yse = scheduler.CreateColdObservable(
+ OnNext(0, 99),
+ OnCompleted<int>(10)
+ );
+
+ var ysc = scheduler.CreateColdObservable(
+ OnNext(10, 42),
+ OnCompleted<int>(50)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => ysn,
+ ex => yse,
+ () => ysc
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 10),
+ OnNext(311, 10),
+ OnNext(312, 10),
+ OnNext(313, 10),
+ OnNext(314, 10),
+ OnNext(315, 42),
+ OnNext(320, 11),
+ OnNext(321, 11),
+ OnNext(322, 11),
+ OnNext(323, 11),
+ OnNext(324, 11),
+ OnNext(330, 12),
+ OnNext(331, 12),
+ OnNext(332, 12),
+ OnNext(333, 12),
+ OnNext(334, 12),
+ OnCompleted<int>(355)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+
+ ysn.Subscriptions.AssertEqual(
+ Subscribe(300, 340),
+ Subscribe(301, 341),
+ Subscribe(302, 342),
+ Subscribe(303, 343),
+ Subscribe(304, 344)
+ );
+
+ yse.Subscriptions.AssertEqual(
+ );
+
+ ysc.Subscriptions.AssertEqual(
+ Subscribe(305, 355)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_InnersWithTiming3()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(400, 1),
+ OnNext(500, 2),
+ OnNext(600, 3),
+ OnNext(700, 4),
+ OnCompleted<int>(800)
+ );
+
+ var ysn = scheduler.CreateColdObservable(
+ OnNext(10, 10),
+ OnNext(20, 11),
+ OnNext(30, 12),
+ OnCompleted<int>(40)
+ );
+
+ var yse = scheduler.CreateColdObservable(
+ OnNext(0, 99),
+ OnCompleted<int>(10)
+ );
+
+ var ysc = scheduler.CreateColdObservable(
+ OnNext(10, 42),
+ OnCompleted<int>(100)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => ysn,
+ ex => yse,
+ () => ysc
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(310, 10),
+ OnNext(320, 11),
+ OnNext(330, 12),
+ OnNext(410, 10),
+ OnNext(420, 11),
+ OnNext(430, 12),
+ OnNext(510, 10),
+ OnNext(520, 11),
+ OnNext(530, 12),
+ OnNext(610, 10),
+ OnNext(620, 11),
+ OnNext(630, 12),
+ OnNext(710, 10),
+ OnNext(720, 11),
+ OnNext(730, 12),
+ OnNext(810, 42),
+ OnCompleted<int>(900)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 800)
+ );
+
+ ysn.Subscriptions.AssertEqual(
+ Subscribe(300, 340),
+ Subscribe(400, 440),
+ Subscribe(500, 540),
+ Subscribe(600, 640),
+ Subscribe(700, 740)
+ );
+
+ yse.Subscriptions.AssertEqual(
+ );
+
+ ysc.Subscriptions.AssertEqual(
+ Subscribe(800, 900)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Error_Identity()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, ex)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex1 => Observable.Throw<int>(ex1, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnError<int>(306, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_SelectMany()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Throw<int>(ex, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, 4),
+ OnNext(306, 3),
+ OnNext(307, 4),
+ OnNext(308, 4),
+ OnCompleted<int>(308)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Concat()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Throw<int>(ex, scheduler),
+ () => Observable.Range(1, 3, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnNext(306, 1),
+ OnNext(307, 2),
+ OnNext(308, 3),
+ OnCompleted<int>(309)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Catch()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Range(1, 3, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnCompleted<int>(306)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Error_Catch()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Return(x, scheduler),
+ ex => Observable.Range(1, 3, scheduler),
+ () => Observable.Empty<int>(scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(301, 0),
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(305, 4),
+ OnNext(306, 1),
+ OnNext(307, 2),
+ OnNext(308, 3),
+ OnCompleted<int>(309)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, -1),
+ OnNext(306, 4),
+ OnNext(306, 3),
+ OnNext(307, -1),
+ OnNext(307, 4),
+ OnNext(308, 4),
+ OnCompleted<int>(308)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_Error_All()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, 0),
+ OnNext(306, 4),
+ OnNext(306, 3),
+ OnNext(307, 0),
+ OnNext(307, 4),
+ OnNext(308, 4),
+ OnCompleted<int>(308)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_All_Dispose()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ ),
+ 307
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnNext(305, 4),
+ OnNext(305, 3),
+ OnNext(306, -1),
+ OnNext(306, 4),
+ OnNext(306, 3)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_All_Dispose_Before_First()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ ),
+ 304
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 304)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_OnNextThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Throw<IObservable<int>>(ex),
+ ex1 => Observable.Repeat(0, 2, scheduler),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnError<int>(300, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 300)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_OnErrorThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnError<int>(305, new Exception())
+ );
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex1 => Throw<IObservable<int>>(ex),
+ () => Observable.Repeat(-1, 2, scheduler)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnError<int>(305, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Triple_OnCompletedThrow()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(300, 0),
+ OnNext(301, 1),
+ OnNext(302, 2),
+ OnNext(303, 3),
+ OnNext(304, 4),
+ OnCompleted<int>(305)
+ );
+
+ var ex = new Exception();
+
+ var res = scheduler.Start(() =>
+ xs.SelectMany(
+ (x, _) => Observable.Repeat(x, x, scheduler),
+ ex1 => Observable.Repeat(0, 2, scheduler),
+ () => Throw<IObservable<int>>(ex)
+ )
+ );
+
+ res.Messages.AssertEqual(
+ OnNext(302, 1),
+ OnNext(303, 2),
+ OnNext(304, 3),
+ OnNext(304, 2),
+ OnError<int>(305, ex)
+ );
+
+ xs.Subscriptions.AssertEqual(
+ Subscribe(200, 305)
+ );
+ }
+
#if !NO_TPL
[TestMethod]
@@ -10339,7 +16038,7 @@ namespace ReactiveTests.Tests
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), x => t));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, default(Func<int, Task<int>>)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), (x, ct) => t));
+ ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), (int x, CancellationToken ct) => t));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, default(Func<int, CancellationToken, Task<int>>)));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), x => t, (x, y) => x));
@@ -11218,6 +16917,919 @@ namespace ReactiveTests.Tests
Assert.AreEqual(0, m);
}
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ArgumentChecking()
+ {
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, Task<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, Task<int>>)null));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, CancellationToken, Task<int>>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, CancellationToken, Task<int>>)null));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, Task<int>>.Instance, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, Task<int>>)null, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, Task<int>>.Instance, ((Func<int, int, int, int>)null)));
+
+ ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, CancellationToken, Task<int>>.Instance, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, CancellationToken, Task<int>>)null, DummyFunc<int, int, int, int>.Instance));
+ ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, CancellationToken, Task<int>>.Instance, ((Func<int, int, int, int>)null)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((int x, int i) => Task.Factory.StartNew(() => new { x, i })).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_Cancellation_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, i, ctx) => Task.Factory.StartNew(() => new { x, i }, ctx)).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((int x, int i) => Task.Factory.StartNew(() => new { x, i }), (x, i, r) => r).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector_Cancellation_Index()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, i, ctx) => Task.Factory.StartNew(() => new { x, i }, ctx), (x, i, r) => r).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task1()
+ {
+ var res = Observable.Range(0, 10).SelectMany((int x, int _) => Task.Factory.StartNew(() => x + 1)).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task2()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, _, ct) => Task.Factory.StartNew(() => x + 1, ct)).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_TaskThrows()
+ {
+ var ex = new Exception();
+
+ var res = Observable.Range(0, 10).SelectMany((int x, int _) => Task.Factory.StartNew(() =>
+ {
+ if (x > 5)
+ throw ex;
+ return x + 1;
+ })).ToEnumerable();
+
+ ReactiveAssert.Throws(ex, () =>
+ {
+ foreach (var x in res)
+ ;
+ });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_SelectorThrows()
+ {
+ var ex = new Exception();
+
+ var res = Observable.Range(0, 10).SelectMany((int x, int _) =>
+ {
+ if (x > 5)
+ throw ex;
+ return Task.Factory.StartNew(() => x + 1);
+ }).ToEnumerable();
+
+ ReactiveAssert.Throws(ex, () =>
+ {
+ foreach (var x in res)
+ ;
+ });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector1()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, _) => Task.Factory.StartNew(() => x + 1), (x, _, y) => x + y).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { 2 * x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelector2()
+ {
+ var res = Observable.Range(0, 10).SelectMany((x, _, ct) => Task.Factory.StartNew(() => x + 1, ct), (x, _, y) => x + y).ToEnumerable();
+ Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { 2 * x + 1 }).SequenceEqual(res.OrderBy(x => x)));
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_Task_ResultSelectorThrows()
+ {
+ var ex = new Exception();
+
+ var res = Observable.Range(0, 10).SelectMany((x, _) => Task.Factory.StartNew(() => x + 1), (x, _, y) =>
+ {
+ if (x > 5)
+ throw ex;
+ return x + y;
+ }).ToEnumerable();
+
+ ReactiveAssert.Throws(ex, () =>
+ {
+ foreach (var x in res)
+ ;
+ });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_RanToCompletion_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_RanToCompletion_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Faulted_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Faulted_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Canceled_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ tcss[1].SetCanceled();
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Canceled_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ tcss[1].SetCanceled();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_InnerCompleteBeforeOuter()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ xs.OnCompleted();
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_OuterCompleteBeforeInner()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (int x, int _) => tcss[x].Task);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_NeverInvoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => tcs.SetCanceled());
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ var d = res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_Invoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = false;
+ var d = res.Subscribe(lst.Add, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ d.Dispose();
+
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // never observed because xs.OnNext(2) happened after dispose
+
+ lst.AssertEqual(new[] { 42 });
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_AfterOuterError()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = false;
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => err = ex_, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ var ex = new Exception();
+ xs.OnError(ex);
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // no-op
+
+ lst.AssertEqual(new[] { 42 });
+ Assert.AreSame(ex, err);
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_AfterSelectorThrows()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[4];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+ tcss[3] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var ex = new Exception();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ if (x == 2)
+ throw ex;
+
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ });
+
+ var lst = new List<int>();
+
+ var done = false;
+ var evt = new ManualResetEvent(false);
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; evt.Set(); }, () => { done = true; evt.Set(); });
+
+ tcss[1].SetResult(43);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ tcss[0].SetResult(42);
+
+ xs.OnNext(2); // causes error
+ xs.OnCompleted();
+
+ evt.WaitOne();
+
+ Assert.IsFalse(done);
+ Assert.AreSame(ex, err);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(0, m);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_RanToCompletion_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 0, 43 + 1 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_RanToCompletion_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[2];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+
+ tcss[0].SetResult(42);
+ tcss[1].SetResult(43);
+
+ var res = Observable.SelectMany(Observable.Range(0, 2), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 0, 43 + 1 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Faulted_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Faulted_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var ex = new Exception();
+ tcss[1].SetException(ex);
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.AreSame(ex, err);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Canceled_Async()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ tcss[1].SetCanceled();
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Canceled_Sync()
+ {
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ tcss[1].SetCanceled();
+
+ var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var err = default(Exception);
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set());
+
+ done.WaitOne();
+
+ lst.AssertEqual(new int[0]);
+ Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task);
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_InnerCompleteBeforeOuter()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ xs.OnCompleted();
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_OuterCompleteBeforeInner()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _) => tcss[x].Task, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_NeverInvoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => tcs.SetCanceled());
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = new ManualResetEvent(false);
+ var d = res.Subscribe(lst.Add, () => done.Set());
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ tcss[0].SetResult(43);
+ tcss[2].SetResult(44);
+
+ done.WaitOne();
+
+ lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 });
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_Invoked()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = false;
+ var d = res.Subscribe(lst.Add, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ d.Dispose();
+
+ xs.OnNext(2);
+ xs.OnCompleted();
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // never observed because xs.OnNext(2) happened after dispose
+
+ lst.AssertEqual(new[] { 42 + 1 });
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_AfterOuterError()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[3];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = false;
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => err = ex_, () => done = true);
+
+ tcss[1].SetResult(42);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ var ex = new Exception();
+ xs.OnError(ex);
+
+ Assert.IsFalse(tcss[0].TrySetResult(43));
+ tcss[2].SetResult(44); // no-op
+
+ lst.AssertEqual(new[] { 42 + 1 });
+ Assert.AreSame(ex, err);
+ Assert.IsFalse(done);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(1, m); // tcss[1] was already finished
+ }
+
+ [TestMethod]
+ public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_AfterSelectorThrows()
+ {
+ var xs = new Subject<int>();
+
+ var tcss = new TaskCompletionSource<int>[4];
+ tcss[0] = new TaskCompletionSource<int>();
+ tcss[1] = new TaskCompletionSource<int>();
+ tcss[2] = new TaskCompletionSource<int>();
+ tcss[3] = new TaskCompletionSource<int>();
+
+ var n = 0;
+ var m = 0;
+
+ var ex = new Exception();
+
+ var res = Observable.SelectMany(xs, (x, _, token) =>
+ {
+ if (x == 2)
+ throw ex;
+
+ var tcs = tcss[x];
+
+ token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; });
+
+ return tcs.Task;
+ }, (x, _, y) => x + y);
+
+ var lst = new List<int>();
+
+ var done = false;
+ var evt = new ManualResetEvent(false);
+ var err = default(Exception);
+ res.Subscribe(lst.Add, ex_ => { err = ex_; evt.Set(); }, () => { done = true; evt.Set(); });
+
+ tcss[1].SetResult(43);
+
+ xs.OnNext(0);
+ xs.OnNext(1);
+
+ tcss[0].SetResult(42);
+
+ xs.OnNext(2); // causes error
+ xs.OnCompleted();
+
+ evt.WaitOne();
+
+ Assert.IsFalse(done);
+ Assert.AreSame(ex, err);
+ Assert.AreEqual(2, n);
+ Assert.AreEqual(0, m);
+ }
+
#endif
#endregion