diff options
Diffstat (limited to 'Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs')
-rw-r--r-- | Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs | 6870 |
1 files changed, 6741 insertions, 129 deletions
diff --git a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs index 0670909..dd17353 100644 --- a/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs +++ b/Rx/NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs @@ -2486,7 +2486,7 @@ namespace ReactiveTests.Tests var nullGroup = scheduler.CreateObserver<string>(); var err = default(Exception); - + scheduler.ScheduleAbsolute(200, () => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper()).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_)); scheduler.Start(); @@ -2553,6 +2553,1704 @@ namespace ReactiveTests.Tests #endregion + #region + GroupBy w/capacity + + + private const int _groupByCapacity = 1024; + + [TestMethod] + public void GroupBy_Capacity_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, (Func<int, int>)null, _groupByCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, null)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default).Subscribe(null)); + + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, -1, EqualityComparer<int>.Default)); + } + + [TestMethod] + public void GroupBy_Capacity_KeyEle_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, DummyFunc<int, int>.Instance, _groupByCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, (Func<int, int>)null, _groupByCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, _groupByCapacity).Subscribe(null)); + + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, -1)); + } + + [TestMethod] + public void GroupBy_Capacity_KeyComparer_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, _groupByCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, (IEqualityComparer<int>)null)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity, EqualityComparer<int>.Default).Subscribe(null)); + + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, -1, EqualityComparer<int>.Default)); + } + + [TestMethod] + public void GroupBy_Capacity_Key_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy((Func<int, int>)null, _groupByCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, _groupByCapacity).Subscribe(null)); + + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.GroupBy(DummyFunc<int, int>.Instance, -1)); + } + + [TestMethod] + public void GroupBy_Capacity_WithKeyComparer() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupBy(x => + { + keyInvoked++; + return x.Trim(); + }, _groupByCapacity, comparer).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + Assert.AreEqual(12, keyInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_Complete() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupBy( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + _groupByCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + Assert.AreEqual(12, keyInvoked); + Assert.AreEqual(12, eleInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_Error() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnError<string>(570, ex), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupBy( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + _groupByCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnError<string>(570, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + Assert.AreEqual(12, keyInvoked); + Assert.AreEqual(12, eleInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_Dispose() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupBy( + x => + { + keyInvoked++; + return x.Trim(); + }, x => + { + eleInvoked++; + return Reverse(x); + }, _groupByCapacity, comparer + ).Select(g => g.Key), + 355 + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz") + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 355) + ); + + Assert.AreEqual(5, keyInvoked); + Assert.AreEqual(5, eleInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_KeyThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupBy( + x => + { + keyInvoked++; + if (keyInvoked == 10) + throw ex; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + _groupByCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnError<string>(480, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 480) + ); + + Assert.AreEqual(10, keyInvoked); + Assert.AreEqual(9, eleInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_EleThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupBy( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + if (eleInvoked == 10) + throw ex; + return Reverse(x); + }, + _groupByCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnError<string>(480, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 480) + ); + + Assert.AreEqual(10, keyInvoked); + Assert.AreEqual(10, eleInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_ComparerEqualsThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue); + + var res = scheduler.Start(() => + xs.GroupBy( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + _groupByCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnError<string>(310, comparer.EqualsException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 310) + ); + + Assert.AreEqual(4, keyInvoked); + Assert.AreEqual(3, eleInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_ComparerGetHashCodeThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410); + + var res = scheduler.Start(() => + xs.GroupBy( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + _groupByCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnError<string>(420, comparer.HashCodeException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + + Assert.AreEqual(8, keyInvoked); + Assert.AreEqual(7, eleInvoked); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Complete() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result)); + })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(470, " OOF"), + OnNext(530, " oOf "), + OnCompleted<string>(570) + ); + + res["baR"].Messages.AssertEqual( + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnCompleted<string>(570) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(480, " zab"), + OnNext(510, " ZAb "), + OnCompleted<string>(570) + ); + + res["qux"].Messages.AssertEqual( + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Complete_All() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnNext(470, " OOF"), + OnNext(530, " oOf "), + OnCompleted<string>(570) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnCompleted<string>(570) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnNext(480, " zab"), + OnNext(510, " ZAb "), + OnCompleted<string>(570) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Error() + { + var scheduler = new TestScheduler(); + + var ex1 = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnError<string>(570, ex1), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result)); + }, ex => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(470, " OOF"), + OnNext(530, " oOf "), + OnError<string>(570, ex1) + ); + + res["baR"].Messages.AssertEqual( + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnError<string>(570, ex1) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(480, " zab"), + OnNext(510, " ZAb "), + OnError<string>(570, ex1) + ); + + res["qux"].Messages.AssertEqual( + OnError<string>(570, ex1) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + })); + + scheduler.ScheduleAbsolute(400, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof") + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab ") + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB ") + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq ") + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 400) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_KeyThrow() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + var keyInvoked = 0; + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => + { + keyInvoked++; + if (keyInvoked == 6) + throw ex; + return x.Trim(); + }, x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(3, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnError<string>(360, ex) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnError<string>(360, ex) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(360, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 360) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_EleThrow() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + var eleInvoked = 0; + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => + { + eleInvoked++; + if (eleInvoked == 6) + throw ex; + return Reverse(x); + }, _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnError<string>(360, ex) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnError<string>(360, ex) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(360, ex) + ); + + res["qux"].Messages.AssertEqual( + OnError<string>(360, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 360) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Comparer_EqualsThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnError<string>(420, comparer.EqualsException) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnError<string>(420, comparer.EqualsException) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(420, comparer.EqualsException) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnError<string>(420, comparer.EqualsException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Comparer_GetHashCodeThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnError<string>(420, comparer.HashCodeException) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnError<string>(420, comparer.HashCodeException) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(420, comparer.HashCodeException) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnError<string>(420, comparer.HashCodeException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Outer_Independence() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + var outerResults = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + outerResults.OnNext(group.Key); + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, outerResults.OnError, outerResults.OnCompleted)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose()); + + scheduler.Start(); + + Assert.AreEqual(2, inners.Count); + + outerResults.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR") + ); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnNext(470, " OOF"), + OnNext(530, " oOf "), + OnCompleted<string>(570) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Independence() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + var outerResults = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + outerResults.OnNext(group.Key); + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, outerResults.OnError, outerResults.OnCompleted)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose()); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof") + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnCompleted<string>(570) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnNext(480, " zab"), + OnNext(510, " ZAb "), + OnCompleted<string>(570) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Multiple_Independence() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + var outerResults = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), _groupByCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + outerResults.OnNext(group.Key); + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, outerResults.OnError, outerResults.OnCompleted)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose()); + scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose()); + scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose()); + scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose()); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof") + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab") + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB ") + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq ") + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Escape_Complete() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(310, "foO "), + OnNext(470, "FOO "), + OnNext(530, " fOo "), + OnCompleted<string>(570) + ); + + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inner = default(IObservable<string>); + var innerSubscription = default(IDisposable); + var res = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + inner = group; + })); + + scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + innerSubscription.Dispose(); + }); + + scheduler.Start(); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + res.Messages.AssertEqual( + OnCompleted<string>(600) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Escape_Error() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(310, "foO "), + OnNext(470, "FOO "), + OnNext(530, " fOo "), + OnError<string>(570, ex) + ); + + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inner = default(IObservable<string>); + var innerSubscription = default(IDisposable); + var res = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + inner = group; + }, _ => { })); + + scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + innerSubscription.Dispose(); + }); + + scheduler.Start(); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + res.Messages.AssertEqual( + OnError<string>(600, ex) + ); + } + + [TestMethod] + public void GroupBy_Capacity_Inner_Escape_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(310, "foO "), + OnNext(470, "FOO "), + OnNext(530, " fOo "), + OnError<string>(570, new Exception()) + ); + + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inner = default(IObservable<string>); + var innerSubscription = default(IDisposable); + var res = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), _groupByCapacity)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + inner = group; + })); + + scheduler.ScheduleAbsolute(400, () => outerSubscription.Dispose()); + + scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + innerSubscription.Dispose(); + }); + + scheduler.Start(); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 400) + ); + + res.Messages.AssertEqual( + ); + } + + [TestMethod] + public void GroupBy_Capacity_NullKeys_Simple() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, "bar"), + OnNext(240, "foo"), + OnNext(310, "qux"), + OnNext(470, "baz"), + OnCompleted<string>(500) + ); + + var res = scheduler.Start(() => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper(), _groupByCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x)); + + res.Messages.AssertEqual( + OnNext(220, "(null)bar"), + OnNext(240, "FOOfoo"), + OnNext(310, "QUXqux"), + OnNext(470, "(null)baz"), + OnCompleted<string>(500) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 500) + ); + } + + [TestMethod] + public void GroupBy_Capacity_NullKeys_Error() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, "bar"), + OnNext(240, "foo"), + OnNext(310, "qux"), + OnNext(470, "baz"), + OnError<string>(500, ex) + ); + + var nullGroup = scheduler.CreateObserver<string>(); + var err = default(Exception); + + scheduler.ScheduleAbsolute(200, () => xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper(), _groupByCapacity).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_)); + scheduler.Start(); + + Assert.AreSame(ex, err); + + nullGroup.Messages.AssertEqual( + OnNext(220, "bar"), + OnNext(470, "baz"), + OnError<string>(500, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 500) + ); + } + + #endregion + #region + GroupByUntil + [TestMethod] @@ -4405,6 +6103,1874 @@ namespace ReactiveTests.Tests #endregion + #region + GroupByUntil w/capacity + + + private const int _groupByUntilCapacity = 1024; + + [TestMethod] + public void GroupByUntil_Capacity_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, default(IEqualityComparer<int>))); + + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity)); + + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity, default(IEqualityComparer<int>))); + + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(default(IObservable<int>), DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, default(Func<int, int>), DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, _groupByUntilCapacity)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, default(Func<IGroupedObservable<int, int>, IObservable<int>>), _groupByUntilCapacity)); + + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1)); + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1, EqualityComparer<int>.Default)); + ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(DummyObservable<int>.Instance, DummyFunc<int, int>.Instance, DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance, -1)); + } + + [TestMethod] + public void GroupByUntil_Capacity_WithKeyComparer() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim(); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnNext(470, "FOO"), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + Assert.AreEqual(12, keyInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_Complete() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnNext(470, "FOO"), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + Assert.AreEqual(12, keyInvoked); + Assert.AreEqual(12, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_Error() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnError<string>(570, ex), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnNext(470, "FOO"), + OnError<string>(570, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + Assert.AreEqual(12, keyInvoked); + Assert.AreEqual(12, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_Dispose() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key), + 355 + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz") + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 355) + ); + + Assert.AreEqual(5, keyInvoked); + Assert.AreEqual(5, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_KeyThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + if (keyInvoked == 10) + throw ex; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnNext(470, "FOO"), + OnError<string>(480, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 480) + ); + + Assert.AreEqual(10, keyInvoked); + Assert.AreEqual(9, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_EleThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + if (eleInvoked == 10) + throw ex; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnNext(470, "FOO"), + OnError<string>(480, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 480) + ); + + Assert.AreEqual(10, keyInvoked); + Assert.AreEqual(10, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_ComparerEqualsThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnError<string>(310, comparer.EqualsException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 310) + ); + + Assert.AreEqual(4, keyInvoked); + Assert.AreEqual(3, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_ComparerGetHashCodeThrow() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity, + comparer + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR"), + OnNext(350, "Baz"), + OnNext(360, "qux"), + OnError<string>(420, comparer.HashCodeException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + + Assert.AreEqual(8, keyInvoked); + Assert.AreEqual(7, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Complete() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result)); + })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(5, inners.Count); + + res["foo"].Messages.AssertEqual( + OnCompleted<string>(320) + ); + + res["baR"].Messages.AssertEqual( + OnNext(390, "rab "), + OnCompleted<string>(420) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(480, " zab"), + OnCompleted<string>(510) + ); + + res["qux"].Messages.AssertEqual( + OnCompleted<string>(570) + ); + + res["FOO"].Messages.AssertEqual( + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Complete_All() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(5, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnCompleted<string>(420) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnNext(480, " zab"), + OnNext(510, " ZAb "), + OnCompleted<string>(510) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnCompleted<string>(570) + ); + + res["FOO"].Messages.AssertEqual( + OnNext(470, " OOF"), + OnNext(530, " oOf "), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Error() + { + var scheduler = new TestScheduler(); + + var ex1 = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnError<string>(570, ex1), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + scheduler.ScheduleRelative(100, () => innerSubscriptions[group.Key] = group.Subscribe(result)); + }, ex => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(5, inners.Count); + + res["foo"].Messages.AssertEqual( + OnCompleted<string>(320) + ); + + res["baR"].Messages.AssertEqual( + OnNext(390, "rab "), + OnCompleted<string>(420) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(480, " zab"), + OnCompleted<string>(510) + ); + + res["qux"].Messages.AssertEqual( + OnError<string>(570, ex1) + ); + + res["FOO"].Messages.AssertEqual( + OnError<string>(570, ex1) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + })); + + scheduler.ScheduleAbsolute(400, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab ") + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB ") + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq ") + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 400) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_KeyThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + var keyInvoked = 0; + var ex = new Exception(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => + { + keyInvoked++; + if (keyInvoked == 6) + throw ex; + return x.Trim(); + }, x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(3, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnError<string>(360, ex) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(360, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 360) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_EleThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + var eleInvoked = 0; + var ex = new Exception(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => + { + eleInvoked++; + if (eleInvoked == 6) + throw ex; + return Reverse(x); + }, g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnError<string>(360, ex) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(360, ex) + ); + + res["qux"].Messages.AssertEqual( + OnError<string>(360, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 360) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Comparer_EqualsThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnError<string>(420, comparer.EqualsException) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(420, comparer.EqualsException) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnError<string>(420, comparer.EqualsException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Comparer_GetHashCodeThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, _ => { })); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.Start(); + + Assert.AreEqual(4, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnError<string>(420, comparer.HashCodeException) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnError<string>(420, comparer.HashCodeException) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnError<string>(420, comparer.HashCodeException) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Outer_Independence() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + var outerResults = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + outerResults.OnNext(group.Key); + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, outerResults.OnError, outerResults.OnCompleted)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.ScheduleAbsolute(320, () => outerSubscription.Dispose()); + + scheduler.Start(); + + Assert.AreEqual(2, inners.Count); + + outerResults.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "baR") + ); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnCompleted<string>(420) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Independence() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + var outerResults = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + outerResults.OnNext(group.Key); + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, outerResults.OnError, outerResults.OnCompleted)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose()); + + scheduler.Start(); + + Assert.AreEqual(5, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab"), + OnNext(390, "rab "), + OnNext(420, " RAB "), + OnCompleted<string>(420) + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB "), + OnNext(480, " zab"), + OnNext(510, " ZAb "), + OnCompleted<string>(510) + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq "), + OnCompleted<string>(570) + ); + + res["FOO"].Messages.AssertEqual( + OnNext(470, " OOF"), + OnNext(530, " oOf "), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Multiple_Independence() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var comparer = new GroupByComparer(scheduler); + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inners = new Dictionary<string, IObservable<string>>(); + var innerSubscriptions = new Dictionary<string, IDisposable>(); + var res = new Dictionary<string, ITestableObserver<string>>(); + var outerResults = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + outerResults.OnNext(group.Key); + var result = scheduler.CreateObserver<string>(); + inners[group.Key] = group; + res[group.Key] = result; + innerSubscriptions[group.Key] = group.Subscribe(result); + }, outerResults.OnError, outerResults.OnCompleted)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + foreach (var d in innerSubscriptions.Values) + d.Dispose(); + }); + + scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose()); + scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose()); + scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose()); + scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose()); + + scheduler.Start(); + + Assert.AreEqual(5, inners.Count); + + res["foo"].Messages.AssertEqual( + OnNext(220, "oof "), + OnNext(240, " OoF "), + OnNext(310, " Oof"), + OnCompleted<string>(310) + ); + + res["baR"].Messages.AssertEqual( + OnNext(270, " Rab") + ); + + res["Baz"].Messages.AssertEqual( + OnNext(350, " zaB ") + ); + + res["qux"].Messages.AssertEqual( + OnNext(360, " xuq ") + ); + + res["FOO"].Messages.AssertEqual( + OnNext(470, " OOF"), + OnNext(530, " oOf "), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Escape_Complete() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(310, "foO "), + OnNext(470, "FOO "), + OnNext(530, " fOo "), + OnCompleted<string>(570) + ); + + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inner = default(IObservable<string>); + var innerSubscription = default(IDisposable); + var res = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + inner = group; + })); + + scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + innerSubscription.Dispose(); + }); + + scheduler.Start(); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + res.Messages.AssertEqual( + OnCompleted<string>(600) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Escape_Error() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(310, "foO "), + OnNext(470, "FOO "), + OnNext(530, " fOo "), + OnError<string>(570, ex) + ); + + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inner = default(IObservable<string>); + var innerSubscription = default(IDisposable); + var res = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + inner = group; + }, _ => { })); + + scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + outerSubscription.Dispose(); + innerSubscription.Dispose(); + }); + + scheduler.Start(); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + res.Messages.AssertEqual( + OnError<string>(600, ex) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Inner_Escape_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(310, "foO "), + OnNext(470, "FOO "), + OnNext(530, " fOo "), + OnError<string>(570, new Exception()) + ); + + var outer = default(IObservable<IGroupedObservable<string, string>>); + var outerSubscription = default(IDisposable); + var inner = default(IObservable<string>); + var innerSubscription = default(IDisposable); + var res = scheduler.CreateObserver<string>(); + + scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity)); + + scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group => + { + inner = group; + })); + + scheduler.ScheduleAbsolute(290, () => outerSubscription.Dispose()); + + scheduler.ScheduleAbsolute(600, () => innerSubscription = inner.Subscribe(res)); + + scheduler.ScheduleAbsolute(Disposed, () => + { + innerSubscription.Dispose(); + }); + + scheduler.Start(); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 290) + ); + + res.Messages.AssertEqual( + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_Default() + { + var scheduler = new TestScheduler(); + + var keyInvoked = 0; + var eleInvoked = 0; + + var xs = scheduler.CreateHotObservable( + OnNext(90, "error"), + OnNext(110, "error"), + OnNext(130, "error"), + OnNext(220, " foo"), + OnNext(240, " FoO "), + OnNext(270, "baR "), + OnNext(310, "foO "), + OnNext(350, " Baz "), + OnNext(360, " qux "), + OnNext(390, " bar"), + OnNext(420, " BAR "), + OnNext(470, "FOO "), + OnNext(480, "baz "), + OnNext(510, " bAZ "), + OnNext(530, " fOo "), + OnCompleted<string>(570), + OnNext(580, "error"), + OnCompleted<string>(600), + OnError<string>(650, new Exception()) + ); + + var res = scheduler.Start(() => + xs.GroupByUntil( + x => + { + keyInvoked++; + return x.Trim().ToLower(); + }, + x => + { + eleInvoked++; + return Reverse(x); + }, + g => g.Skip(2), + _groupByUntilCapacity + ).Select(g => g.Key) + ); + + res.Messages.AssertEqual( + OnNext(220, "foo"), + OnNext(270, "bar"), + OnNext(350, "baz"), + OnNext(360, "qux"), + OnNext(470, "foo"), + OnCompleted<string>(570) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 570) + ); + + Assert.AreEqual(12, keyInvoked); + Assert.AreEqual(12, eleInvoked); + } + + [TestMethod] + public void GroupByUntil_Capacity_DurationSelector_Throws() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, "foo") + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.GroupByUntil<string, string, string>(x => x, g => { throw ex; }, _groupByUntilCapacity) + ); + + res.Messages.AssertEqual( + OnError<IGroupedObservable<string, string>>(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_NullKeys_Simple_Never() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, "bar"), + OnNext(240, "foo"), + OnNext(310, "qux"), + OnNext(470, "baz"), + OnCompleted<string>(500) + ); + + var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never<Unit>(), _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x)); + + res.Messages.AssertEqual( + OnNext(220, "(null)bar"), + OnNext(240, "FOOfoo"), + OnNext(310, "QUXqux"), + OnNext(470, "(null)baz"), + OnCompleted<string>(500) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 500) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_NullKeys_Simple_Expire1() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, "bar"), + OnNext(240, "foo"), + OnNext(310, "qux"), + OnNext(470, "baz"), + OnCompleted<string>(500) + ); + + var n = 0; + var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) n++; return Observable.Timer(TimeSpan.FromTicks(50), scheduler); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x)); + + Assert.AreEqual(2, n); + + res.Messages.AssertEqual( + OnNext(220, "(null)bar"), + OnNext(240, "FOOfoo"), + OnNext(310, "QUXqux"), + OnNext(470, "(null)baz"), + OnCompleted<string>(500) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 500) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_NullKeys_Simple_Expire2() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, "bar"), + OnNext(240, "foo"), + OnNext(310, "qux"), + OnNext(470, "baz"), + OnCompleted<string>(500) + ); + + var n = 0; + var res = scheduler.Start(() => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => { if (g.Key == null) n++; return Observable.Timer(TimeSpan.FromTicks(50), scheduler).IgnoreElements(); }, _groupByUntilCapacity).SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x)); + + Assert.AreEqual(2, n); + + res.Messages.AssertEqual( + OnNext(220, "(null)bar"), + OnNext(240, "FOOfoo"), + OnNext(310, "QUXqux"), + OnNext(470, "(null)baz"), + OnCompleted<string>(500) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 500) + ); + } + + [TestMethod] + public void GroupByUntil_Capacity_NullKeys_Error() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, "bar"), + OnNext(240, "foo"), + OnNext(310, "qux"), + OnNext(470, "baz"), + OnError<string>(500, ex) + ); + + var nullGroup = scheduler.CreateObserver<string>(); + var err = default(Exception); + + scheduler.ScheduleAbsolute(200, () => xs.GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never<Unit>(), _groupByUntilCapacity).Where(g => g.Key == null).Subscribe(g => g.Subscribe(nullGroup), ex_ => err = ex_)); + scheduler.Start(); + + Assert.AreSame(ex, err); + + nullGroup.Messages.AssertEqual( + OnNext(220, "bar"), + OnNext(470, "baz"), + OnError<string>(500, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 500) + ); + } + + #endregion + #region + GroupJoin + [TestMethod] @@ -8425,196 +11991,620 @@ namespace ReactiveTests.Tests } [TestMethod] - // Tests this overload: - // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector); - public void SelectMany_WithIndex_Complete() + public void SelectManyWithIndex_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, IObservable<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IObservable<int>>)null)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance).Subscribe(null)); + } + + [TestMethod] + public void SelectManyWithIndex_Index() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 4), + OnNext(220, 3), + OnNext(250, 5), + OnNext(270, 1), + OnCompleted<int>(290) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, i) => Observable.Return(new { x, i })) + ); + + var witness = new { x = 0, i = 0 }; + + res.Messages.AssertEqual( + OnNext(210, new { x = 4, i = 0 }), + OnNext(220, new { x = 3, i = 1 }), + OnNext(250, new { x = 5, i = 2 }), + OnNext(270, new { x = 1, i = 3 }), + OnCompleted(290, witness) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 290) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Complete() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(5, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(105, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(300, scheduler.CreateColdObservable( + OnNext(10, 102), + OnNext(90, 103), + OnNext(110, 104), + OnNext(190, 105), + OnNext(440, 106), + OnCompleted<int>(460))), + OnNext(400, scheduler.CreateColdObservable( + OnNext(180, 202), + OnNext(190, 203), + OnCompleted<int>(205))), + OnNext(550, scheduler.CreateColdObservable( + OnNext(10, 301), + OnNext(50, 302), + OnNext(70, 303), + OnNext(260, 304), + OnNext(310, 305), + OnCompleted<int>(410))), + OnNext(750, scheduler.CreateColdObservable( + OnCompleted<int>(40))), + OnNext(850, scheduler.CreateColdObservable( + OnNext(80, 401), + OnNext(90, 402), + OnCompleted<int>(100))), + OnCompleted<ITestableObservable<int>>(900) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => x) + ); + + res.Messages.AssertEqual( + OnNext(310, 102), + OnNext(390, 103), + OnNext(410, 104), + OnNext(490, 105), + OnNext(560, 301), + OnNext(580, 202), + OnNext(590, 203), + OnNext(600, 302), + OnNext(620, 303), + OnNext(740, 106), + OnNext(810, 304), + OnNext(860, 305), + OnNext(930, 401), + OnNext(940, 402), + OnCompleted<int>(960) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 900)); + + xs.Messages[2].Value.Value.Subscriptions.AssertEqual( + Subscribe(300, 760)); + + xs.Messages[3].Value.Value.Subscriptions.AssertEqual( + Subscribe(400, 605)); + + xs.Messages[4].Value.Value.Subscriptions.AssertEqual( + Subscribe(550, 960)); + + xs.Messages[5].Value.Value.Subscriptions.AssertEqual( + Subscribe(750, 790)); + + xs.Messages[6].Value.Value.Subscriptions.AssertEqual( + Subscribe(850, 950)); + } + + [TestMethod] + public void SelectManyWithIndex_Complete_InnerNotComplete() { var scheduler = new TestScheduler(); - ITestableObservable<char> cs = scheduler.CreateHotObservable( - OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored. - OnNext(250, 'a'), - OnNext(270, 'l'), - OnNext(310, 'o'), - OnCompleted<char>(410) - ); + var xs = scheduler.CreateHotObservable( + OnNext(5, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(105, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(300, scheduler.CreateColdObservable( + OnNext(10, 102), + OnNext(90, 103), + OnNext(110, 104), + OnNext(190, 105), + OnNext(440, 106), + OnCompleted<int>(460))), + OnNext(400, scheduler.CreateColdObservable( + OnNext(180, 202), + OnNext(190, 203))), + OnNext(550, scheduler.CreateColdObservable( + OnNext(10, 301), + OnNext(50, 302), + OnNext(70, 303), + OnNext(260, 304), + OnNext(310, 305), + OnCompleted<int>(410))), + OnNext(750, scheduler.CreateColdObservable( + OnCompleted<int>(40))), + OnNext(850, scheduler.CreateColdObservable( + OnNext(80, 401), + OnNext(90, 402), + OnCompleted<int>(100))), + OnCompleted<ITestableObservable<int>>(900) + ); var res = scheduler.Start(() => - cs.SelectMany( - (x, i) => Observable.Return(new { x, i }, scheduler) - )); + xs.SelectMany((x, _) => x) + ); res.Messages.AssertEqual( - OnNext(251, new { x = 'a', i = 0 }), - OnNext(271, new { x = 'l', i = 1 }), - OnNext(311, new { x = 'o', i = 2 }), - OnCompleted(new { x = default(char), i = default(int) }, 410) + OnNext(310, 102), + OnNext(390, 103), + OnNext(410, 104), + OnNext(490, 105), + OnNext(560, 301), + OnNext(580, 202), + OnNext(590, 203), + OnNext(600, 302), + OnNext(620, 303), + OnNext(740, 106), + OnNext(810, 304), + OnNext(860, 305), + OnNext(930, 401), + OnNext(940, 402) ); - cs.Subscriptions.AssertEqual( - Subscribe(200, 410)); + xs.Subscriptions.AssertEqual( + Subscribe(200, 900)); + + xs.Messages[2].Value.Value.Subscriptions.AssertEqual( + Subscribe(300, 760)); + + xs.Messages[3].Value.Value.Subscriptions.AssertEqual( + Subscribe(400, 1000)); + + xs.Messages[4].Value.Value.Subscriptions.AssertEqual( + Subscribe(550, 960)); + + xs.Messages[5].Value.Value.Subscriptions.AssertEqual( + Subscribe(750, 790)); + + xs.Messages[6].Value.Value.Subscriptions.AssertEqual( + Subscribe(850, 950)); } [TestMethod] - // Tests this overload: - // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector); - public void SelectMany_WithIndex_IEnumerable_Complete() + public void SelectManyWithIndex_Complete_OuterNotComplete() { var scheduler = new TestScheduler(); - ITestableObservable<char> cs = scheduler.CreateHotObservable( - OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored. - OnNext(250, 'a'), - OnNext(270, 'l'), - OnNext(310, 'o'), - OnCompleted<char>(410) - ); + var xs = scheduler.CreateHotObservable( + OnNext(5, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(105, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(300, scheduler.CreateColdObservable( + OnNext(10, 102), + OnNext(90, 103), + OnNext(110, 104), + OnNext(190, 105), + OnNext(440, 106), + OnCompleted<int>(460))), + OnNext(400, scheduler.CreateColdObservable( + OnNext(180, 202), + OnNext(190, 203), + OnCompleted<int>(205))), + OnNext(550, scheduler.CreateColdObservable( + OnNext(10, 301), + OnNext(50, 302), + OnNext(70, 303), + OnNext(260, 304), + OnNext(310, 305), + OnCompleted<int>(410))), + OnNext(750, scheduler.CreateColdObservable( + OnCompleted<int>(40))), + OnNext(850, scheduler.CreateColdObservable( + OnNext(80, 401), + OnNext(90, 402), + OnCompleted<int>(100))) + ); var res = scheduler.Start(() => - cs.SelectMany( - (c, i) => new [] { new { c = c, i = i } } - )); + xs.SelectMany((x, _) => x) + ); - res.Messages.AssertEqual( - OnNext(250, new { c = 'a', i = 0 }), - OnNext(270, new { c = 'l', i = 1 }), - OnNext(310, new { c = 'o', i = 2 }), - OnCompleted(new { c = default(char), i = default(int) }, 410) + OnNext(310, 102), + OnNext(390, 103), + OnNext(410, 104), + OnNext(490, 105), + OnNext(560, 301), + OnNext(580, 202), + OnNext(590, 203), + OnNext(600, 302), + OnNext(620, 303), + OnNext(740, 106), + OnNext(810, 304), + OnNext(860, 305), + OnNext(930, 401), + OnNext(940, 402) ); - cs.Subscriptions.AssertEqual( - Subscribe(200, 410)); + xs.Subscriptions.AssertEqual( + Subscribe(200, 1000)); + + xs.Messages[2].Value.Value.Subscriptions.AssertEqual( + Subscribe(300, 760)); + + xs.Messages[3].Value.Value.Subscriptions.AssertEqual( + Subscribe(400, 605)); + + xs.Messages[4].Value.Value.Subscriptions.AssertEqual( + Subscribe(550, 960)); + + xs.Messages[5].Value.Value.Subscriptions.AssertEqual( + Subscribe(750, 790)); + + xs.Messages[6].Value.Value.Subscriptions.AssertEqual( + Subscribe(850, 950)); } + [TestMethod] + public void SelectManyWithIndex_Error_Outer() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(5, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(105, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(300, scheduler.CreateColdObservable( + OnNext(10, 102), + OnNext(90, 103), + OnNext(110, 104), + OnNext(190, 105), + OnNext(440, 106), + OnCompleted<int>(460))), + OnNext(400, scheduler.CreateColdObservable( + OnNext(180, 202), + OnNext(190, 203), + OnCompleted<int>(205))), + OnNext(550, scheduler.CreateColdObservable( + OnNext(10, 301), + OnNext(50, 302), + OnNext(70, 303), + OnNext(260, 304), + OnNext(310, 305), + OnCompleted<int>(410))), + OnNext(750, scheduler.CreateColdObservable( + OnCompleted<int>(40))), + OnNext(850, scheduler.CreateColdObservable( + OnNext(80, 401), + OnNext(90, 402), + OnCompleted<int>(100))), + OnError<ITestableObservable<int>>(900, ex) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => x) + ); + + res.Messages.AssertEqual( + OnNext(310, 102), + OnNext(390, 103), + OnNext(410, 104), + OnNext(490, 105), + OnNext(560, 301), + OnNext(580, 202), + OnNext(590, 203), + OnNext(600, 302), + OnNext(620, 303), + OnNext(740, 106), + OnNext(810, 304), + OnNext(860, 305), + OnError<int>(900, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 900)); + + xs.Messages[2].Value.Value.Subscriptions.AssertEqual( + Subscribe(300, 760)); + + xs.Messages[3].Value.Value.Subscriptions.AssertEqual( + Subscribe(400, 605)); + + xs.Messages[4].Value.Value.Subscriptions.AssertEqual( + Subscribe(550, 900)); + + xs.Messages[5].Value.Value.Subscriptions.AssertEqual( + Subscribe(750, 790)); + + xs.Messages[6].Value.Value.Subscriptions.AssertEqual( + Subscribe(850, 900)); + } [TestMethod] - // Tests this overload: - // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, int, TResult> resultSelector); - public void SelectMany_WithIndex_IObservable_ResultSelector_Complete() + public void SelectManyWithIndex_Error_Inner() { var scheduler = new TestScheduler(); - ITestableObservable<ITestableObservable<char>> css = scheduler.CreateHotObservable( - OnNext(190, scheduler.CreateColdObservable( - OnNext(1, 'h'), - OnCompleted<char>(2))), - OnNext(250, scheduler.CreateColdObservable( - OnNext(1, 'a'), - OnCompleted<char>(2))), - OnNext(270, scheduler.CreateColdObservable( - OnNext(1, 'l'), - OnCompleted<char>(2))), - OnNext(310, scheduler.CreateColdObservable( - OnNext(1, 'o'), - OnNext(2, ' '), - OnNext(3, 'r'), - OnNext(4, 'u'), - OnNext(5, 'l'), - OnNext(6, 'e'), - OnNext(7, 'z'), - OnCompleted<char>(8))), - OnCompleted<ITestableObservable<char>>(410) - ); + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(5, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(105, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(300, scheduler.CreateColdObservable( + OnNext(10, 102), + OnNext(90, 103), + OnNext(110, 104), + OnNext(190, 105), + OnNext(440, 106), + OnError<int>(460, ex))), + OnNext(400, scheduler.CreateColdObservable( + OnNext(180, 202), + OnNext(190, 203), + OnCompleted<int>(205))), + OnNext(550, scheduler.CreateColdObservable( + OnNext(10, 301), + OnNext(50, 302), + OnNext(70, 303), + OnNext(260, 304), + OnNext(310, 305), + OnCompleted<int>(410))), + OnNext(750, scheduler.CreateColdObservable( + OnCompleted<int>(40))), + OnNext(850, scheduler.CreateColdObservable( + OnNext(80, 401), + OnNext(90, 402), + OnCompleted<int>(100))), + OnCompleted<ITestableObservable<int>>(900) + ); var res = scheduler.Start(() => - css.SelectMany( - (foo, i) => - { - return foo.Select(c => new { c = c, i = i }); - }, - (source, i, cs, j) => new { c = cs.c, i = cs.i, i2 = i, j = j } - )); + xs.SelectMany((x, _) => x) + ); res.Messages.AssertEqual( - OnNext(251, new { c = 'a', i = 0, i2 = 0, j = 0 }), - OnNext(271, new { c = 'l', i = 1, i2 = 1, j = 0 }), - OnNext(311, new { c = 'o', i = 2, i2 = 2, j = 0 }), - OnNext(312, new { c = ' ', i = 2, i2 = 2, j = 1 }), - OnNext(313, new { c = 'r', i = 2, i2 = 2, j = 2 }), - OnNext(314, new { c = 'u', i = 2, i2 = 2, j = 3 }), - OnNext(315, new { c = 'l', i = 2, i2 = 2, j = 4 }), - OnNext(316, new { c = 'e', i = 2, i2 = 2, j = 5 }), - OnNext(317, new { c = 'z', i = 2, i2 = 2, j = 6 }), - OnCompleted(new { c = 'a', i = 0, i2 = 0, j = 0 }, 410) + OnNext(310, 102), + OnNext(390, 103), + OnNext(410, 104), + OnNext(490, 105), + OnNext(560, 301), + OnNext(580, 202), + OnNext(590, 203), + OnNext(600, 302), + OnNext(620, 303), + OnNext(740, 106), + OnError<int>(760, ex) ); - css.Subscriptions.AssertEqual( - Subscribe(200, 410)); + xs.Subscriptions.AssertEqual( + Subscribe(200, 760)); + + xs.Messages[2].Value.Value.Subscriptions.AssertEqual( + Subscribe(300, 760)); + + xs.Messages[3].Value.Value.Subscriptions.AssertEqual( + Subscribe(400, 605)); + + xs.Messages[4].Value.Value.Subscriptions.AssertEqual( + Subscribe(550, 760)); + + xs.Messages[5].Value.Value.Subscriptions.AssertEqual( + Subscribe(750, 760)); + + xs.Messages[6].Value.Value.Subscriptions.AssertEqual( + ); } - [TestMethod] - // Tests this overload: - // IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector); - public void SelectMany_WithIndex_IEnumerable_ResultSelector_Complete() + public void SelectManyWithIndex_Dispose() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( - OnNext(210, 5), - OnNext(340, 4), - OnNext(420, 3), - OnCompleted<int>(600) + OnNext(5, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(105, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(300, scheduler.CreateColdObservable( + OnNext(10, 102), + OnNext(90, 103), + OnNext(110, 104), + OnNext(190, 105), + OnNext(440, 106), + OnCompleted<int>(460))), + OnNext(400, scheduler.CreateColdObservable( + OnNext(180, 202), + OnNext(190, 203), + OnCompleted<int>(205))), + OnNext(550, scheduler.CreateColdObservable( + OnNext(10, 301), + OnNext(50, 302), + OnNext(70, 303), + OnNext(260, 304), + OnNext(310, 305), + OnCompleted<int>(410))), + OnNext(750, scheduler.CreateColdObservable( + OnCompleted<int>(40))), + OnNext(850, scheduler.CreateColdObservable( + OnNext(80, 401), + OnNext(90, 402), + OnCompleted<int>(100))), + OnCompleted<ITestableObservable<int>>(900) ); var res = scheduler.Start(() => - xs.SelectMany( - (x, i) => new[] { new { x = x + 1, i = i }, new { x = -x, i = i }, new { x = x * x, i = i } }, - (x, i, y, j) => new { x = x, i = i, y = y.x, y_i = y.i, j = j }) + xs.SelectMany((x, _) => x), + 700 ); - + res.Messages.AssertEqual( - OnNext(210, new { x = 5, i = 0, y = 6, y_i = 0, j = 0 }), - OnNext(210, new { x = 5, i = 0, y = -5, y_i = 0, j = 1 }), - OnNext(210, new { x = 5, i = 0, y = 25, y_i = 0, j = 2 }), - OnNext(340, new { x = 4, i = 1, y = 5, y_i = 1, j = 0 }), - OnNext(340, new { x = 4, i = 1, y = -4, y_i = 1, j = 1 }), - OnNext(340, new { x = 4, i = 1, y = 16, y_i = 1, j = 2 }), - OnNext(420, new { x = 3, i = 2, y = 4, y_i = 2, j = 0 }), - OnNext(420, new { x = 3, i = 2, y = -3, y_i = 2, j = 1 }), - OnNext(420, new { x = 3, i = 2, y = 9, y_i = 2, j = 2 }), - OnCompleted(new { x = default(int), i = default(int), y = default(int), y_i = default(int), j = default(int) }, 600) + OnNext(310, 102), + OnNext(390, 103), + OnNext(410, 104), + OnNext(490, 105), + OnNext(560, 301), + OnNext(580, 202), + OnNext(590, 203), + OnNext(600, 302), + OnNext(620, 303) ); xs.Subscriptions.AssertEqual( - Subscribe(200, 600) + Subscribe(200, 700)); + + xs.Messages[2].Value.Value.Subscriptions.AssertEqual( + Subscribe(300, 700)); + + xs.Messages[3].Value.Value.Subscriptions.AssertEqual( + Subscribe(400, 605)); + + xs.Messages[4].Value.Value.Subscriptions.AssertEqual( + Subscribe(550, 700)); + + xs.Messages[5].Value.Value.Subscriptions.AssertEqual( + ); + + xs.Messages[6].Value.Value.Subscriptions.AssertEqual( ); } [TestMethod] - // Tests this overload: - // IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted); - public void SelectMany_WithIndex_Triple_Complete() + public void SelectManyWithIndex_Throw() { var scheduler = new TestScheduler(); - ITestableObservable<char> cs = scheduler.CreateHotObservable( - OnNext(190, 'h'), // Test scheduler starts pushing events at time 200, so this is ignored. - OnNext(250, 'a'), - OnNext(270, 'l'), - OnNext(310, 'o'), - OnCompleted<char>(410) - ); + var xs = scheduler.CreateHotObservable( + OnNext(5, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(105, scheduler.CreateColdObservable( + OnError<int>(1, new InvalidOperationException()))), + OnNext(300, scheduler.CreateColdObservable( + OnNext(10, 102), + OnNext(90, 103), + OnNext(110, 104), + OnNext(190, 105), + OnNext(440, 106), + OnCompleted<int>(460))), + OnNext(400, scheduler.CreateColdObservable( + OnNext(180, 202), + OnNext(190, 203), + OnCompleted<int>(205))), + OnNext(550, scheduler.CreateColdObservable( + OnNext(10, 301), + OnNext(50, 302), + OnNext(70, 303), + OnNext(260, 304), + OnNext(310, 305), + OnCompleted<int>(410))), + OnNext(750, scheduler.CreateColdObservable( + OnCompleted<int>(40))), + OnNext(850, scheduler.CreateColdObservable( + OnNext(80, 401), + OnNext(90, 402), + OnCompleted<int>(100))), + OnCompleted<ITestableObservable<int>>(900) + ); + + var invoked = 0; + + var ex = new Exception(); var res = scheduler.Start(() => - cs.SelectMany( - (c, i) => Observable.Return(new { c = c, i = i }, scheduler), - (ex, i) => { throw ex; }, - (i) => Observable.Repeat(new { c = 'x', i = -1 }, i, scheduler) - )); + xs.SelectMany((x, _) => + { + invoked++; + if (invoked == 3) + throw ex; + return x; + }) + ); res.Messages.AssertEqual( - OnNext(251, new { c = 'a', i = 0 }), - OnNext(271, new { c = 'l', i = 1 }), - OnNext(311, new { c = 'o', i = 2 }), - OnCompleted(new { c = default(char), i = default(int) }, 410) + OnNext(310, 102), + OnNext(390, 103), + OnNext(410, 104), + OnNext(490, 105), + OnError<int>(550, ex) ); - cs.Subscriptions.AssertEqual( - Subscribe(200, 410)); + xs.Subscriptions.AssertEqual( + Subscribe(200, 550)); + + xs.Messages[2].Value.Value.Subscriptions.AssertEqual( + Subscribe(300, 550)); + + xs.Messages[3].Value.Value.Subscriptions.AssertEqual( + Subscribe(400, 550)); + + xs.Messages[4].Value.Value.Subscriptions.AssertEqual( + ); + + xs.Messages[5].Value.Value.Subscriptions.AssertEqual( + ); + + xs.Messages[6].Value.Value.Subscriptions.AssertEqual( + ); + + Assert.AreEqual(3, invoked); } + [TestMethod] + public void SelectManyWithIndex_UseFunction() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 4), + OnNext(220, 3), + OnNext(250, 5), + OnNext(270, 1), + OnCompleted<int>(290) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(10), scheduler).Select(__ => x).Take(x)) + ); + + res.Messages.AssertEqual( + OnNext(220, 4), + OnNext(230, 3), + OnNext(230, 4), + OnNext(240, 3), + OnNext(240, 4), + OnNext(250, 3), + OnNext(250, 4), + OnNext(260, 5), + OnNext(270, 5), + OnNext(280, 1), + OnNext(280, 5), + OnNext(290, 5), + OnNext(300, 5), + OnCompleted<int>(300) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 290) + ); + } [TestMethod] public void SelectMany_Enumerable_ArgumentChecking() @@ -9311,6 +13301,652 @@ namespace ReactiveTests.Tests } [TestMethod] + public void SelectManyWithIndex_Enumerable_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, IEnumerable<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IEnumerable<int>>)null)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IEnumerable<int>>.Instance).Subscribe(null)); + + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, IEnumerable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IEnumerable<int>>)null, DummyFunc<int, int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IEnumerable<int>>.Instance, (Func<int, int, int, int, int>)null)); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_Index() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 4), + OnNext(220, 3), + OnNext(250, 5), + OnNext(270, 1), + OnCompleted<int>(290) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, i) => new[] { new { x, i } }) + ); + + var witness = new { x = 0, i = 0 }; + + res.Messages.AssertEqual( + OnNext(210, new { x = 4, i = 0 }), + OnNext(220, new { x = 3, i = 1 }), + OnNext(250, new { x = 5, i = 2 }), + OnNext(270, new { x = 1, i = 3 }), + OnCompleted(290, witness) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 290) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_ResultSelector_Index() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 4), + OnNext(220, 3), + OnNext(250, 5), + OnNext(270, 1), + OnCompleted<int>(290) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, i) => Enumerable.Range(10, i + 1), (x, i, y, j) => new { x, i, y, j }) + ); + + var witness = new { x = 0, i = 0, y = 0, j = 0 }; + + res.Messages.AssertEqual( + OnNext(210, new { x = 4, i = 0, y = 10, j = 0 }), + OnNext(220, new { x = 3, i = 1, y = 10, j = 0 }), + OnNext(220, new { x = 3, i = 1, y = 11, j = 1 }), + OnNext(250, new { x = 5, i = 2, y = 10, j = 0 }), + OnNext(250, new { x = 5, i = 2, y = 11, j = 1 }), + OnNext(250, new { x = 5, i = 2, y = 12, j = 2 }), + OnNext(270, new { x = 1, i = 3, y = 10, j = 0 }), + OnNext(270, new { x = 1, i = 3, y = 11, j = 1 }), + OnNext(270, new { x = 1, i = 3, y = 12, j = 2 }), + OnNext(270, new { x = 1, i = 3, y = 13, j = 3 }), + OnCompleted(290, witness) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 290) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_Complete() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var inners = new List<MockEnumerable<int>>(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => + { + var ys = new MockEnumerable<int>(scheduler, Enumerable.Repeat(x, x)); + inners.Add(ys); + return ys; + }) + ); + + res.Messages.AssertEqual( + OnNext(210, 2), + OnNext(210, 2), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4), + OnNext(420, 3), + OnNext(420, 3), + OnNext(420, 3), + OnNext(510, 2), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 600) + ); + + Assert.AreEqual(4, inners.Count); + + inners[0].Subscriptions.AssertEqual( + Subscribe(210, 210) + ); + + inners[1].Subscriptions.AssertEqual( + Subscribe(340, 340) + ); + + inners[2].Subscriptions.AssertEqual( + Subscribe(420, 420) + ); + + inners[3].Subscriptions.AssertEqual( + Subscribe(510, 510) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_Complete_ResultSelector() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y) + ); + + res.Messages.AssertEqual( + OnNext(210, 4), + OnNext(210, 4), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnNext(420, 6), + OnNext(420, 6), + OnNext(420, 6), + OnNext(510, 4), + OnNext(510, 4), + OnCompleted<int>(600) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 600) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_Error() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnError<int>(600, ex) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Enumerable.Repeat(x, x)) + ); + + res.Messages.AssertEqual( + OnNext(210, 2), + OnNext(210, 2), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4), + OnNext(420, 3), + OnNext(420, 3), + OnNext(420, 3), + OnNext(510, 2), + OnNext(510, 2), + OnError<int>(600, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 600) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_Error_ResultSelector() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnError<int>(600, ex) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y) + ); + + res.Messages.AssertEqual( + OnNext(210, 4), + OnNext(210, 4), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnNext(420, 6), + OnNext(420, 6), + OnNext(420, 6), + OnNext(510, 4), + OnNext(510, 4), + OnError<int>(600, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 600) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Enumerable.Repeat(x, x)), + 350 + ); + + res.Messages.AssertEqual( + OnNext(210, 2), + OnNext(210, 2), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 350) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_Dispose_ResultSelector() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Enumerable.Repeat(x, x), (x, _, y, __) => x + y), + 350 + ); + + res.Messages.AssertEqual( + OnNext(210, 4), + OnNext(210, 4), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 350) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_SelectorThrows() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var invoked = 0; + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => + { + invoked++; + if (invoked == 3) + throw ex; + + return Enumerable.Repeat(x, x); + }) + ); + + res.Messages.AssertEqual( + OnNext(210, 2), + OnNext(210, 2), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4), + OnNext(340, 4), + OnError<int>(420, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + + Assert.AreEqual(3, invoked); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_ResultSelectorThrows() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var ex = new Exception(); + + var inners = new List<MockEnumerable<int>>(); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => + { + var ys = new MockEnumerable<int>(scheduler, Enumerable.Repeat(x, x)); + inners.Add(ys); + return ys; + }, + (x, _, y, __) => + { + if (x == 3) + throw ex; + + return x + y; + } + ) + ); + + res.Messages.AssertEqual( + OnNext(210, 4), + OnNext(210, 4), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnError<int>(420, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + + Assert.AreEqual(3, inners.Count); + + inners[0].Subscriptions.AssertEqual( + Subscribe(210, 210) + ); + + inners[1].Subscriptions.AssertEqual( + Subscribe(340, 340) + ); + + inners[2].Subscriptions.AssertEqual( + Subscribe(420, 420) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_ResultSelector_GetEnumeratorThrows() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => new RogueEnumerable<int>(ex), (x, _, y, __) => x + y) + ); + + res.Messages.AssertEqual( + OnError<int>(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_SelectorThrows_ResultSelector() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var invoked = 0; + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => + { + invoked++; + if (invoked == 3) + throw ex; + + return Enumerable.Repeat(x, x); + }, + (x, _, y, __) => x + y + ) + ); + + res.Messages.AssertEqual( + OnNext(210, 4), + OnNext(210, 4), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnNext(340, 8), + OnError<int>(420, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 420) + ); + + Assert.AreEqual(3, invoked); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_CurrentThrows() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => new CurrentThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex)) + ); + + res.Messages.AssertEqual( + OnError<int>(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_CurrentThrows_ResultSelector() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => new CurrentThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex), (x, _, y, __) => x + y) + ); + + res.Messages.AssertEqual( + OnError<int>(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_GetEnumeratorThrows() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => new RogueEnumerable<int>(ex)) + ); + + res.Messages.AssertEqual( + OnError<int>(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_MoveNextThrows() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => new MoveNextThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex)) + ); + + res.Messages.AssertEqual( + OnError<int>(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Enumerable_MoveNextThrows_ResultSelector() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 2), + OnNext(340, 4), + OnNext(420, 3), + OnNext(510, 2), + OnCompleted<int>(600) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => new MoveNextThrowsEnumerable<int>(Enumerable.Repeat(x, x), ex), (x, _, y, __) => x + y) + ); + + res.Messages.AssertEqual( + OnError<int>(210, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 210) + ); + } + + [TestMethod] public void SelectMany_QueryOperator_ArgumentChecking() { ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, int, int>.Instance)); @@ -9585,12 +14221,294 @@ namespace ReactiveTests.Tests Subscribe(200, 221) ); } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, IObservable<int>>)null, DummyFunc<int, int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance, ((Func<int, int, int, int, int>)null))); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<int, int, int, int, int>.Instance).Subscribe(null)); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_Index() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(210, 4), + OnNext(220, 3), + OnNext(250, 5), + OnNext(270, 1), + OnCompleted<int>(290) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, i) => Observable.Range(10, i + 1), (x, i, y, j) => new { x, i, y, j }) + ); + + var witness = new { x = 0, i = 0, y = 0, j = 0 }; + + res.Messages.AssertEqual( + OnNext(210, new { x = 4, i = 0, y = 10, j = 0 }), + OnNext(220, new { x = 3, i = 1, y = 10, j = 0 }), + OnNext(220, new { x = 3, i = 1, y = 11, j = 1 }), + OnNext(250, new { x = 5, i = 2, y = 10, j = 0 }), + OnNext(250, new { x = 5, i = 2, y = 11, j = 1 }), + OnNext(250, new { x = 5, i = 2, y = 12, j = 2 }), + OnNext(270, new { x = 1, i = 3, y = 10, j = 0 }), + OnNext(270, new { x = 1, i = 3, y = 11, j = 1 }), + OnNext(270, new { x = 1, i = 3, y = 12, j = 2 }), + OnNext(270, new { x = 1, i = 3, y = 13, j = 3 }), + OnCompleted(290, witness) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 290) + ); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_CompleteOuterFirst() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, 4), + OnNext(221, 3), + OnNext(222, 2), + OnNext(223, 5), + OnCompleted<int>(224) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y) + ); + + res.Messages.AssertEqual( + OnNext(221, 40), + OnNext(222, 30), + OnNext(222, 41), + OnNext(223, 20), + OnNext(223, 31), + OnNext(223, 42), + OnNext(224, 50), + OnNext(224, 21), + OnNext(224, 32), + OnNext(224, 43), + OnNext(225, 51), + OnNext(226, 52), + OnNext(227, 53), + OnNext(228, 54), + OnCompleted<int>(228) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 224) + ); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_CompleteInnerFirst() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, 4), + OnNext(221, 3), + OnNext(222, 2), + OnNext(223, 5), + OnCompleted<int>(300) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y) + ); + + res.Messages.AssertEqual( + OnNext(221, 40), + OnNext(222, 30), + OnNext(222, 41), + OnNext(223, 20), + OnNext(223, 31), + OnNext(223, 42), + OnNext(224, 50), + OnNext(224, 21), + OnNext(224, 32), + OnNext(224, 43), + OnNext(225, 51), + OnNext(226, 52), + OnNext(227, 53), + OnNext(228, 54), + OnCompleted<int>(300) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 300) + ); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_ErrorOuter() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, 4), + OnNext(221, 3), + OnNext(222, 2), + OnNext(223, 5), + OnError<int>(224, ex) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y) + ); + + res.Messages.AssertEqual( + OnNext(221, 40), + OnNext(222, 30), + OnNext(222, 41), + OnNext(223, 20), + OnNext(223, 31), + OnNext(223, 42), + OnError<int>(224, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 224) + ); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_ErrorInner() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, 4), + OnNext(221, 3), + OnNext(222, 2), + OnNext(223, 5), + OnCompleted<int>(224) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => x == 2 + ? Observable.Throw<long>(ex, scheduler) + : Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), + (x, _, y, __) => x * 10 + (int)y) + ); + + res.Messages.AssertEqual( + OnNext(221, 40), + OnNext(222, 30), + OnNext(222, 41), + OnError<int>(223, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 223) + ); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, 4), + OnNext(221, 3), + OnNext(222, 2), + OnNext(223, 5), + OnCompleted<int>(224) + ); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => x * 10 + (int)y), + 223 + ); + + res.Messages.AssertEqual( + OnNext(221, 40), + OnNext(222, 30), + OnNext(222, 41) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 223) + ); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_ThrowSelector() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, 4), + OnNext(221, 3), + OnNext(222, 2), + OnNext(223, 5), + OnCompleted<int>(224) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Throw<IObservable<long>>(ex), (x, _, y, __) => x * 10 + (int)y) + ); + + res.Messages.AssertEqual( + OnError<int>(220, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 220) + ); + } + + [TestMethod] + public void SelectManyWithIndex_QueryOperator_ThrowResult() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(220, 4), + OnNext(221, 3), + OnNext(222, 2), + OnNext(223, 5), + OnCompleted<int>(224) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany((x, _) => Observable.Interval(TimeSpan.FromTicks(1), scheduler).Take(x), (x, _, y, __) => Throw<int>(ex)) + ); + + res.Messages.AssertEqual( + OnError<int>(221, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 221) + ); + } + [TestMethod] public void SelectMany_Triple_ArgumentChecking() { ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(null, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance)); - ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, (Func<int, IObservable<int>>)null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance)); ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, null, DummyFunc<IObservable<int>>.Instance)); ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, null)); ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance).Subscribe(null)); @@ -10329,6 +15247,787 @@ namespace ReactiveTests.Tests ); } + [TestMethod] + public void SelectManyWithIndex_Triple_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(null, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, (Func<int, int, IObservable<int>>)null, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, null, DummyFunc<IObservable<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, null)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, DummyFunc<int, int, IObservable<int>>.Instance, DummyFunc<Exception, IObservable<int>>.Instance, DummyFunc<IObservable<int>>.Instance).Subscribe(null)); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_Index() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var witness = new { x = 0, i = 0 }; + + var res = scheduler.Start(() => + xs.SelectMany( + (x, i) => Observable.Return(new { x, i }, scheduler), + ex => Observable.Throw(ex, scheduler, witness), + () => Observable.Empty(scheduler, witness) + ) + ); + + res.Messages.AssertEqual( + OnNext(301, new { x = 0, i = 0 }), + OnNext(302, new { x = 1, i = 1 }), + OnNext(303, new { x = 2, i = 2 }), + OnNext(304, new { x = 3, i = 3 }), + OnNext(305, new { x = 4, i = 4 }), + OnCompleted(306, witness) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_Identity() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Return(x, scheduler), + ex => Observable.Throw<int>(ex, scheduler), + () => Observable.Empty<int>(scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(301, 0), + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(305, 4), + OnCompleted<int>(306) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_InnersWithTiming1() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var ysn = scheduler.CreateColdObservable( + OnNext(10, 10), + OnNext(20, 11), + OnNext(30, 12), + OnCompleted<int>(40) + ); + + var yse = scheduler.CreateColdObservable( + OnNext(0, 99), + OnCompleted<int>(10) + ); + + var ysc = scheduler.CreateColdObservable( + OnNext(10, 42), + OnCompleted<int>(20) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => ysn, + ex => yse, + () => ysc + ) + ); + + res.Messages.AssertEqual( + OnNext(310, 10), + OnNext(311, 10), + OnNext(312, 10), + OnNext(313, 10), + OnNext(314, 10), + OnNext(315, 42), + OnNext(320, 11), + OnNext(321, 11), + OnNext(322, 11), + OnNext(323, 11), + OnNext(324, 11), + OnNext(330, 12), + OnNext(331, 12), + OnNext(332, 12), + OnNext(333, 12), + OnNext(334, 12), + OnCompleted<int>(344) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + + ysn.Subscriptions.AssertEqual( + Subscribe(300, 340), + Subscribe(301, 341), + Subscribe(302, 342), + Subscribe(303, 343), + Subscribe(304, 344) + ); + + yse.Subscriptions.AssertEqual( + ); + + ysc.Subscriptions.AssertEqual( + Subscribe(305, 325) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_InnersWithTiming2() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var ysn = scheduler.CreateColdObservable( + OnNext(10, 10), + OnNext(20, 11), + OnNext(30, 12), + OnCompleted<int>(40) + ); + + var yse = scheduler.CreateColdObservable( + OnNext(0, 99), + OnCompleted<int>(10) + ); + + var ysc = scheduler.CreateColdObservable( + OnNext(10, 42), + OnCompleted<int>(50) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => ysn, + ex => yse, + () => ysc + ) + ); + + res.Messages.AssertEqual( + OnNext(310, 10), + OnNext(311, 10), + OnNext(312, 10), + OnNext(313, 10), + OnNext(314, 10), + OnNext(315, 42), + OnNext(320, 11), + OnNext(321, 11), + OnNext(322, 11), + OnNext(323, 11), + OnNext(324, 11), + OnNext(330, 12), + OnNext(331, 12), + OnNext(332, 12), + OnNext(333, 12), + OnNext(334, 12), + OnCompleted<int>(355) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + + ysn.Subscriptions.AssertEqual( + Subscribe(300, 340), + Subscribe(301, 341), + Subscribe(302, 342), + Subscribe(303, 343), + Subscribe(304, 344) + ); + + yse.Subscriptions.AssertEqual( + ); + + ysc.Subscriptions.AssertEqual( + Subscribe(305, 355) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_InnersWithTiming3() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(400, 1), + OnNext(500, 2), + OnNext(600, 3), + OnNext(700, 4), + OnCompleted<int>(800) + ); + + var ysn = scheduler.CreateColdObservable( + OnNext(10, 10), + OnNext(20, 11), + OnNext(30, 12), + OnCompleted<int>(40) + ); + + var yse = scheduler.CreateColdObservable( + OnNext(0, 99), + OnCompleted<int>(10) + ); + + var ysc = scheduler.CreateColdObservable( + OnNext(10, 42), + OnCompleted<int>(100) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => ysn, + ex => yse, + () => ysc + ) + ); + + res.Messages.AssertEqual( + OnNext(310, 10), + OnNext(320, 11), + OnNext(330, 12), + OnNext(410, 10), + OnNext(420, 11), + OnNext(430, 12), + OnNext(510, 10), + OnNext(520, 11), + OnNext(530, 12), + OnNext(610, 10), + OnNext(620, 11), + OnNext(630, 12), + OnNext(710, 10), + OnNext(720, 11), + OnNext(730, 12), + OnNext(810, 42), + OnCompleted<int>(900) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 800) + ); + + ysn.Subscriptions.AssertEqual( + Subscribe(300, 340), + Subscribe(400, 440), + Subscribe(500, 540), + Subscribe(600, 640), + Subscribe(700, 740) + ); + + yse.Subscriptions.AssertEqual( + ); + + ysc.Subscriptions.AssertEqual( + Subscribe(800, 900) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_Error_Identity() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnError<int>(305, ex) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Return(x, scheduler), + ex1 => Observable.Throw<int>(ex1, scheduler), + () => Observable.Empty<int>(scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(301, 0), + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(305, 4), + OnError<int>(306, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_SelectMany() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Repeat(x, x, scheduler), + ex => Observable.Throw<int>(ex, scheduler), + () => Observable.Empty<int>(scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(304, 2), + OnNext(305, 4), + OnNext(305, 3), + OnNext(306, 4), + OnNext(306, 3), + OnNext(307, 4), + OnNext(308, 4), + OnCompleted<int>(308) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + + [TestMethod] + public void SelectManyWithIndex_Triple_Concat() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Return(x, scheduler), + ex => Observable.Throw<int>(ex, scheduler), + () => Observable.Range(1, 3, scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(301, 0), + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(305, 4), + OnNext(306, 1), + OnNext(307, 2), + OnNext(308, 3), + OnCompleted<int>(309) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_Catch() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Return(x, scheduler), + ex => Observable.Range(1, 3, scheduler), + () => Observable.Empty<int>(scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(301, 0), + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(305, 4), + OnCompleted<int>(306) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_Error_Catch() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnError<int>(305, new Exception()) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Return(x, scheduler), + ex => Observable.Range(1, 3, scheduler), + () => Observable.Empty<int>(scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(301, 0), + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(305, 4), + OnNext(306, 1), + OnNext(307, 2), + OnNext(308, 3), + OnCompleted<int>(309) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_All() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Repeat(x, x, scheduler), + ex => Observable.Repeat(0, 2, scheduler), + () => Observable.Repeat(-1, 2, scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(304, 2), + OnNext(305, 4), + OnNext(305, 3), + OnNext(306, -1), + OnNext(306, 4), + OnNext(306, 3), + OnNext(307, -1), + OnNext(307, 4), + OnNext(308, 4), + OnCompleted<int>(308) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_Error_All() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnError<int>(305, new Exception()) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Repeat(x, x, scheduler), + ex => Observable.Repeat(0, 2, scheduler), + () => Observable.Repeat(-1, 2, scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(304, 2), + OnNext(305, 4), + OnNext(305, 3), + OnNext(306, 0), + OnNext(306, 4), + OnNext(306, 3), + OnNext(307, 0), + OnNext(307, 4), + OnNext(308, 4), + OnCompleted<int>(308) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_All_Dispose() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Repeat(x, x, scheduler), + ex => Observable.Repeat(0, 2, scheduler), + () => Observable.Repeat(-1, 2, scheduler) + ), + 307 + ); + + res.Messages.AssertEqual( + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(304, 2), + OnNext(305, 4), + OnNext(305, 3), + OnNext(306, -1), + OnNext(306, 4), + OnNext(306, 3) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_All_Dispose_Before_First() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Repeat(x, x, scheduler), + ex => Observable.Repeat(0, 2, scheduler), + () => Observable.Repeat(-1, 2, scheduler) + ), + 304 + ); + + res.Messages.AssertEqual( + OnNext(302, 1), + OnNext(303, 2) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 304) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_OnNextThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Throw<IObservable<int>>(ex), + ex1 => Observable.Repeat(0, 2, scheduler), + () => Observable.Repeat(-1, 2, scheduler) + ) + ); + + res.Messages.AssertEqual( + OnError<int>(300, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 300) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_OnErrorThrow() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnError<int>(305, new Exception()) + ); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Repeat(x, x, scheduler), + ex1 => Throw<IObservable<int>>(ex), + () => Observable.Repeat(-1, 2, scheduler) + ) + ); + + res.Messages.AssertEqual( + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(304, 2), + OnError<int>(305, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + + [TestMethod] + public void SelectManyWithIndex_Triple_OnCompletedThrow() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(300, 0), + OnNext(301, 1), + OnNext(302, 2), + OnNext(303, 3), + OnNext(304, 4), + OnCompleted<int>(305) + ); + + var ex = new Exception(); + + var res = scheduler.Start(() => + xs.SelectMany( + (x, _) => Observable.Repeat(x, x, scheduler), + ex1 => Observable.Repeat(0, 2, scheduler), + () => Throw<IObservable<int>>(ex) + ) + ); + + res.Messages.AssertEqual( + OnNext(302, 1), + OnNext(303, 2), + OnNext(304, 3), + OnNext(304, 2), + OnError<int>(305, ex) + ); + + xs.Subscriptions.AssertEqual( + Subscribe(200, 305) + ); + } + #if !NO_TPL [TestMethod] @@ -10339,7 +16038,7 @@ namespace ReactiveTests.Tests ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), x => t)); ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, default(Func<int, Task<int>>))); - ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), (x, ct) => t)); + ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), (int x, CancellationToken ct) => t)); ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(DummyObservable<int>.Instance, default(Func<int, CancellationToken, Task<int>>))); ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SelectMany(default(IObservable<int>), x => t, (x, y) => x)); @@ -11218,6 +16917,919 @@ namespace ReactiveTests.Tests Assert.AreEqual(0, m); } + [TestMethod] + public void SelectManyWithIndex_Task_ArgumentChecking() + { + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, Task<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, Task<int>>)null)); + + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int>(DummyFunc<int, int, CancellationToken, Task<int>>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, CancellationToken, Task<int>>)null)); + + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, Task<int>>.Instance, DummyFunc<int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, Task<int>>)null, DummyFunc<int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, Task<int>>.Instance, ((Func<int, int, int, int>)null))); + + ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).SelectMany<int, int, int>(DummyFunc<int, int, CancellationToken, Task<int>>.Instance, DummyFunc<int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany((Func<int, int, CancellationToken, Task<int>>)null, DummyFunc<int, int, int, int>.Instance)); + ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.SelectMany(DummyFunc<int, int, CancellationToken, Task<int>>.Instance, ((Func<int, int, int, int>)null))); + } + + [TestMethod] + public void SelectManyWithIndex_Task_Index() + { + var res = Observable.Range(0, 10).SelectMany((int x, int i) => Task.Factory.StartNew(() => new { x, i })).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i))); + } + + [TestMethod] + public void SelectManyWithIndex_Task_Cancellation_Index() + { + var res = Observable.Range(0, 10).SelectMany((x, i, ctx) => Task.Factory.StartNew(() => new { x, i }, ctx)).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i))); + } + + [TestMethod] + public void SelectManyWithIndex_Task_ResultSelector_Index() + { + var res = Observable.Range(0, 10).SelectMany((int x, int i) => Task.Factory.StartNew(() => new { x, i }), (x, i, r) => r).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i))); + } + + [TestMethod] + public void SelectManyWithIndex_Task_ResultSelector_Cancellation_Index() + { + var res = Observable.Range(0, 10).SelectMany((x, i, ctx) => Task.Factory.StartNew(() => new { x, i }, ctx), (x, i, r) => r).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany((x, i) => new[] { new { x, i } }).SequenceEqual(res.OrderBy(v => v.i))); + } + + [TestMethod] + public void SelectManyWithIndex_Task1() + { + var res = Observable.Range(0, 10).SelectMany((int x, int _) => Task.Factory.StartNew(() => x + 1)).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { x + 1 }).SequenceEqual(res.OrderBy(x => x))); + } + + [TestMethod] + public void SelectManyWithIndex_Task2() + { + var res = Observable.Range(0, 10).SelectMany((x, _, ct) => Task.Factory.StartNew(() => x + 1, ct)).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { x + 1 }).SequenceEqual(res.OrderBy(x => x))); + } + + [TestMethod] + public void SelectManyWithIndex_Task_TaskThrows() + { + var ex = new Exception(); + + var res = Observable.Range(0, 10).SelectMany((int x, int _) => Task.Factory.StartNew(() => + { + if (x > 5) + throw ex; + return x + 1; + })).ToEnumerable(); + + ReactiveAssert.Throws(ex, () => + { + foreach (var x in res) + ; + }); + } + + [TestMethod] + public void SelectManyWithIndex_Task_SelectorThrows() + { + var ex = new Exception(); + + var res = Observable.Range(0, 10).SelectMany((int x, int _) => + { + if (x > 5) + throw ex; + return Task.Factory.StartNew(() => x + 1); + }).ToEnumerable(); + + ReactiveAssert.Throws(ex, () => + { + foreach (var x in res) + ; + }); + } + + [TestMethod] + public void SelectManyWithIndex_Task_ResultSelector1() + { + var res = Observable.Range(0, 10).SelectMany((x, _) => Task.Factory.StartNew(() => x + 1), (x, _, y) => x + y).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { 2 * x + 1 }).SequenceEqual(res.OrderBy(x => x))); + } + + [TestMethod] + public void SelectManyWithIndex_Task_ResultSelector2() + { + var res = Observable.Range(0, 10).SelectMany((x, _, ct) => Task.Factory.StartNew(() => x + 1, ct), (x, _, y) => x + y).ToEnumerable(); + Assert.IsTrue(Enumerable.Range(0, 10).SelectMany(x => new[] { 2 * x + 1 }).SequenceEqual(res.OrderBy(x => x))); + } + + [TestMethod] + public void SelectManyWithIndex_Task_ResultSelectorThrows() + { + var ex = new Exception(); + + var res = Observable.Range(0, 10).SelectMany((x, _) => Task.Factory.StartNew(() => x + 1), (x, _, y) => + { + if (x > 5) + throw ex; + return x + y; + }).ToEnumerable(); + + ReactiveAssert.Throws(ex, () => + { + foreach (var x in res) + ; + }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_RanToCompletion_Async() + { + var tcss = new TaskCompletionSource<int>[2]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(Observable.Range(0, 2), (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + tcss[0].SetResult(42); + tcss[1].SetResult(43); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42, 43 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_RanToCompletion_Sync() + { + var tcss = new TaskCompletionSource<int>[2]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + + tcss[0].SetResult(42); + tcss[1].SetResult(43); + + var res = Observable.SelectMany(Observable.Range(0, 2), (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42, 43 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Faulted_Async() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + var ex = new Exception(); + tcss[1].SetException(ex); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.AreSame(ex, err); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Faulted_Sync() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var ex = new Exception(); + tcss[1].SetException(ex); + + var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.AreSame(ex, err); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Canceled_Async() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + tcss[1].SetCanceled(); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Canceled_Sync() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + tcss[1].SetCanceled(); + + var res = Observable.SelectMany(Observable.Range(0, 3), (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_InnerCompleteBeforeOuter() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(xs, (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + xs.OnNext(2); + + tcss[0].SetResult(43); + tcss[2].SetResult(44); + + xs.OnCompleted(); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_OuterCompleteBeforeInner() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(xs, (int x, int _) => tcss[x].Task); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + xs.OnNext(2); + xs.OnCompleted(); + + tcss[0].SetResult(43); + tcss[2].SetResult(44); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_NeverInvoked() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(xs, (x, _, token) => + { + var tcs = tcss[x]; + + token.Register(() => tcs.SetCanceled()); + + return tcs.Task; + }); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + var d = res.Subscribe(lst.Add, () => done.Set()); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + xs.OnNext(2); + xs.OnCompleted(); + + tcss[0].SetResult(43); + tcss[2].SetResult(44); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42, 43, 44 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_Invoked() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var n = 0; + var m = 0; + + var res = Observable.SelectMany(xs, (x, _, token) => + { + var tcs = tcss[x]; + + token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; }); + + return tcs.Task; + }); + + var lst = new List<int>(); + + var done = false; + var d = res.Subscribe(lst.Add, () => done = true); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + + d.Dispose(); + + xs.OnNext(2); + xs.OnCompleted(); + + Assert.IsFalse(tcss[0].TrySetResult(43)); + tcss[2].SetResult(44); // never observed because xs.OnNext(2) happened after dispose + + lst.AssertEqual(new[] { 42 }); + Assert.IsFalse(done); + Assert.AreEqual(2, n); + Assert.AreEqual(1, m); // tcss[1] was already finished + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_AfterOuterError() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var n = 0; + var m = 0; + + var res = Observable.SelectMany(xs, (x, _, token) => + { + var tcs = tcss[x]; + + token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; }); + + return tcs.Task; + }); + + var lst = new List<int>(); + + var done = false; + var err = default(Exception); + res.Subscribe(lst.Add, ex_ => err = ex_, () => done = true); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + + var ex = new Exception(); + xs.OnError(ex); + + Assert.IsFalse(tcss[0].TrySetResult(43)); + tcss[2].SetResult(44); // no-op + + lst.AssertEqual(new[] { 42 }); + Assert.AreSame(ex, err); + Assert.IsFalse(done); + Assert.AreEqual(2, n); + Assert.AreEqual(1, m); // tcss[1] was already finished + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_Simple_Cancellation_AfterSelectorThrows() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[4]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + tcss[3] = new TaskCompletionSource<int>(); + + var n = 0; + var m = 0; + + var ex = new Exception(); + + var res = Observable.SelectMany(xs, (x, _, token) => + { + if (x == 2) + throw ex; + + var tcs = tcss[x]; + + token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; }); + + return tcs.Task; + }); + + var lst = new List<int>(); + + var done = false; + var evt = new ManualResetEvent(false); + var err = default(Exception); + res.Subscribe(lst.Add, ex_ => { err = ex_; evt.Set(); }, () => { done = true; evt.Set(); }); + + tcss[1].SetResult(43); + + xs.OnNext(0); + xs.OnNext(1); + + tcss[0].SetResult(42); + + xs.OnNext(2); // causes error + xs.OnCompleted(); + + evt.WaitOne(); + + Assert.IsFalse(done); + Assert.AreSame(ex, err); + Assert.AreEqual(2, n); + Assert.AreEqual(0, m); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_RanToCompletion_Async() + { + var tcss = new TaskCompletionSource<int>[2]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(Observable.Range(0, 2), (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + tcss[0].SetResult(42); + tcss[1].SetResult(43); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42 + 0, 43 + 1 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_RanToCompletion_Sync() + { + var tcss = new TaskCompletionSource<int>[2]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + + tcss[0].SetResult(42); + tcss[1].SetResult(43); + + var res = Observable.SelectMany(Observable.Range(0, 2), (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42 + 0, 43 + 1 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Faulted_Async() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + var ex = new Exception(); + tcss[1].SetException(ex); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.AreSame(ex, err); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Faulted_Sync() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var ex = new Exception(); + tcss[1].SetException(ex); + + var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.AreSame(ex, err); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Canceled_Async() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + tcss[1].SetCanceled(); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Canceled_Sync() + { + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + tcss[1].SetCanceled(); + + var res = Observable.SelectMany(Observable.Range(0, 3), (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var err = default(Exception); + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, ex_ => { err = ex_; done.Set(); }, () => done.Set()); + + done.WaitOne(); + + lst.AssertEqual(new int[0]); + Assert.IsTrue(err is TaskCanceledException && ((TaskCanceledException)err).Task == tcss[1].Task); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_InnerCompleteBeforeOuter() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(xs, (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + xs.OnNext(2); + + tcss[0].SetResult(43); + tcss[2].SetResult(44); + + xs.OnCompleted(); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_OuterCompleteBeforeInner() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(xs, (x, _) => tcss[x].Task, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + res.Subscribe(lst.Add, () => done.Set()); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + xs.OnNext(2); + xs.OnCompleted(); + + tcss[0].SetResult(43); + tcss[2].SetResult(44); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_NeverInvoked() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var res = Observable.SelectMany(xs, (x, _, token) => + { + var tcs = tcss[x]; + + token.Register(() => tcs.SetCanceled()); + + return tcs.Task; + }, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = new ManualResetEvent(false); + var d = res.Subscribe(lst.Add, () => done.Set()); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + xs.OnNext(2); + xs.OnCompleted(); + + tcss[0].SetResult(43); + tcss[2].SetResult(44); + + done.WaitOne(); + + lst.OrderBy(x => x).AssertEqual(new[] { 42 + 1, 43 + 0, 44 + 2 }); + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_Invoked() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var n = 0; + var m = 0; + + var res = Observable.SelectMany(xs, (x, _, token) => + { + var tcs = tcss[x]; + + token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; }); + + return tcs.Task; + }, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = false; + var d = res.Subscribe(lst.Add, () => done = true); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + + d.Dispose(); + + xs.OnNext(2); + xs.OnCompleted(); + + Assert.IsFalse(tcss[0].TrySetResult(43)); + tcss[2].SetResult(44); // never observed because xs.OnNext(2) happened after dispose + + lst.AssertEqual(new[] { 42 + 1 }); + Assert.IsFalse(done); + Assert.AreEqual(2, n); + Assert.AreEqual(1, m); // tcss[1] was already finished + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_AfterOuterError() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[3]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + + var n = 0; + var m = 0; + + var res = Observable.SelectMany(xs, (x, _, token) => + { + var tcs = tcss[x]; + + token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; }); + + return tcs.Task; + }, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = false; + var err = default(Exception); + res.Subscribe(lst.Add, ex_ => err = ex_, () => done = true); + + tcss[1].SetResult(42); + + xs.OnNext(0); + xs.OnNext(1); + + var ex = new Exception(); + xs.OnError(ex); + + Assert.IsFalse(tcss[0].TrySetResult(43)); + tcss[2].SetResult(44); // no-op + + lst.AssertEqual(new[] { 42 + 1 }); + Assert.AreSame(ex, err); + Assert.IsFalse(done); + Assert.AreEqual(2, n); + Assert.AreEqual(1, m); // tcss[1] was already finished + } + + [TestMethod] + public void SelectManyWithIndex_TaskWithCompletionSource_WithResultSelector_Cancellation_AfterSelectorThrows() + { + var xs = new Subject<int>(); + + var tcss = new TaskCompletionSource<int>[4]; + tcss[0] = new TaskCompletionSource<int>(); + tcss[1] = new TaskCompletionSource<int>(); + tcss[2] = new TaskCompletionSource<int>(); + tcss[3] = new TaskCompletionSource<int>(); + + var n = 0; + var m = 0; + + var ex = new Exception(); + + var res = Observable.SelectMany(xs, (x, _, token) => + { + if (x == 2) + throw ex; + + var tcs = tcss[x]; + + token.Register(() => { n++; m += tcs.TrySetCanceled() ? 1 : 0; }); + + return tcs.Task; + }, (x, _, y) => x + y); + + var lst = new List<int>(); + + var done = false; + var evt = new ManualResetEvent(false); + var err = default(Exception); + res.Subscribe(lst.Add, ex_ => { err = ex_; evt.Set(); }, () => { done = true; evt.Set(); }); + + tcss[1].SetResult(43); + + xs.OnNext(0); + xs.OnNext(1); + + tcss[0].SetResult(42); + + xs.OnNext(2); // causes error + xs.OnCompleted(); + + evt.WaitOne(); + + Assert.IsFalse(done); + Assert.AreSame(ex, err); + Assert.AreEqual(2, n); + Assert.AreEqual(0, m); + } + #endif #endregion |