// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System; using System.Collections.Generic; using System.Linq; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Linq; using Microsoft.Reactive.Testing; using Microsoft.VisualStudio.TestTools.UnitTesting; using ReactiveTests.Dummies; namespace ReactiveTests.Tests { [TestClass] public class ObservableSingleTest : ReactiveTest { #region + AsObservable + [TestMethod] public void AsObservable_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.AsObservable(null)); } [TestMethod] public void AsObservable_AsObservable() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 2), OnCompleted(250) ); var ys = xs.AsObservable().AsObservable(); Assert.AreNotSame(xs, ys); var res = scheduler.Start(() => ys ); res.Messages.AssertEqual( OnNext(220, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void AsObservable_Hides() { var xs = Observable.Empty(); var res = xs.AsObservable(); Assert.AreNotSame(xs, res); } [TestMethod] public void AsObservable_Never() { var scheduler = new TestScheduler(); var xs = Observable.Never(); var res = scheduler.Start(() => xs.AsObservable() ); res.Messages.AssertEqual( ); } [TestMethod] public void AsObservable_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(250) ); var res = scheduler.Start(() => xs.AsObservable() ); res.Messages.AssertEqual( OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void AsObservable_Throw() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(250, ex) ); var res = scheduler.Start(() => xs.AsObservable() ); res.Messages.AssertEqual( OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void AsObservable_Return() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.AsObservable() ); res.Messages.AssertEqual( OnNext(220, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void AsObservable_IsNotEager() { var scheduler = new TestScheduler(); bool subscribed = false; var xs = Observable.Create(obs => { subscribed = true; var disp = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 2), OnCompleted(250) ).Subscribe(obs); return disp.Dispose; }); xs.AsObservable(); Assert.IsFalse(subscribed); var res = scheduler.Start(() => xs.AsObservable() ); Assert.IsTrue(subscribed); } #endregion #region + Buffer + [TestMethod] public void Buffer_Single_ArgumentChecking() { var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), 1)); ReactiveAssert.Throws(() => Observable.Buffer(someObservable, 0)); ReactiveAssert.Throws(() => Observable.Buffer(someObservable, -1)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), 1, 1)); ReactiveAssert.Throws(() => Observable.Buffer(someObservable, 1, 0)); ReactiveAssert.Throws(() => Observable.Buffer(someObservable, 0, 1)); } [TestMethod] public void Buffer_Count_PartialWindow() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.Buffer(5) ); res.Messages.AssertEqual( OnNext>(250, l => l.SequenceEqual(new[] { 2, 3, 4, 5 })), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Buffer_Count_FullWindows() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.Buffer(2) ); res.Messages.AssertEqual( OnNext>(220, l => l.SequenceEqual(new[] { 2, 3 })), OnNext>(240, l => l.SequenceEqual(new[] { 4, 5 })), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Buffer_Count_FullAndPartialWindows() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.Buffer(3) ); res.Messages.AssertEqual( OnNext>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })), OnNext>(250, l => l.SequenceEqual(new int[] { 5 })), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Buffer_Count_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnError(250, ex) ); var res = scheduler.Start(() => xs.Buffer(5) ); res.Messages.AssertEqual( OnError>(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Buffer_Count_Skip_Less() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.Buffer(3, 1) ); res.Messages.AssertEqual( OnNext>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })), OnNext>(240, l => l.SequenceEqual(new int[] { 3, 4, 5 })), OnNext>(250, l => l.SequenceEqual(new int[] { 4, 5 })), OnNext>(250, l => l.SequenceEqual(new int[] { 5 })), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Buffer_Count_Skip_More() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.Buffer(2, 3) ); res.Messages.AssertEqual( OnNext>(220, l => l.SequenceEqual(new int[] { 2, 3 })), OnNext>(250, l => l.SequenceEqual(new int[] { 5 })), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void BufferWithCount_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), 1, 1)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, 0, 1)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, 1, 0)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), 1)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, 0)); } [TestMethod] public void BufferWithCount_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(280, "2,3,4"), OnNext(350, "4,5,6"), OnNext(420, "6,7,8"), OnNext(600, "8,9"), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithCount_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())), 370 ); res.Messages.AssertEqual( OnNext(280, "2,3,4"), OnNext(350, "4,5,6") ); xs.Subscriptions.AssertEqual( Subscribe(200, 370) ); } [TestMethod] public void BufferWithCount_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnError(600, ex) ); var res = scheduler.Start(() => xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(280, "2,3,4"), OnNext(350, "4,5,6"), OnNext(420, "6,7,8"), OnError(600, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithCount_Default() { Observable.Range(1, 10).Buffer(3).Skip(1).First().AssertEqual(4, 5, 6); Observable.Range(1, 10).Buffer(3, 2).Skip(1).First().AssertEqual(3, 4, 5); } #endregion #region + Dematerialize + [TestMethod] public void Dematerialize_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Dematerialize(null)); } [TestMethod] public void Dematerialize_Range1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, Notification.CreateOnNext(41)), OnNext(210, Notification.CreateOnNext(42)), OnNext(220, Notification.CreateOnNext(43)), OnCompleted>(250) ); var res = scheduler.Start(() => xs.Dematerialize() ); res.Messages.AssertEqual( OnNext(210, 42), OnNext(220, 43), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Dematerialize_Range2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, Notification.CreateOnNext(41)), OnNext(210, Notification.CreateOnNext(42)), OnNext(220, Notification.CreateOnNext(43)), OnNext(230, Notification.CreateOnCompleted()) ); var res = scheduler.Start(() => xs.Dematerialize() ); res.Messages.AssertEqual( OnNext(210, 42), OnNext(220, 43), OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void Dematerialize_Error1() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, Notification.CreateOnNext(41)), OnNext(210, Notification.CreateOnNext(42)), OnNext(220, Notification.CreateOnNext(43)), OnError>(230, ex) ); var res = scheduler.Start(() => xs.Dematerialize() ); res.Messages.AssertEqual( OnNext(210, 42), OnNext(220, 43), OnError(230, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void Dematerialize_Error2() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, Notification.CreateOnNext(41)), OnNext(210, Notification.CreateOnNext(42)), OnNext(220, Notification.CreateOnNext(43)), OnNext(230, Notification.CreateOnError(ex)) ); var res = scheduler.Start(() => xs.Dematerialize() ); res.Messages.AssertEqual( OnNext(210, 42), OnNext(220, 43), OnError(230, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void Materialize_Dematerialize_Never() { var scheduler = new TestScheduler(); var xs = Observable.Never(); var res = scheduler.Start(() => xs.Materialize().Dematerialize() ); res.Messages.AssertEqual( ); } [TestMethod] public void Materialize_Dematerialize_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(250) ); var res = scheduler.Start(() => xs.Materialize().Dematerialize() ); res.Messages.AssertEqual( OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Materialize_Dematerialize_Return() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Materialize().Dematerialize() ); res.Messages.AssertEqual( OnNext(210, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Materialize_Dematerialize_Throw() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(250, ex) ); var res = scheduler.Start(() => xs.Materialize().Dematerialize() ); res.Messages.AssertEqual( OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } #endregion #region + DistinctUntilChanged + [TestMethod] public void DistinctUntilChanged_ArgumentChecking() { var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(null)); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(null, EqualityComparer.Default)); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(someObservable, null)); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(null, _ => _)); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(someObservable, null)); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(someObservable, _ => _, null)); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(null, _ => _, EqualityComparer.Default)); ReactiveAssert.Throws(() => Observable.DistinctUntilChanged(someObservable, null, EqualityComparer.Default)); } [TestMethod] public void DistinctUntilChanged_Never() { var scheduler = new TestScheduler(); var xs = Observable.Never(); var res = scheduler.Start(() => xs.DistinctUntilChanged() ); res.Messages.AssertEqual( ); } [TestMethod] public void DistinctUntilChanged_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged() ); res.Messages.AssertEqual( OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_Return() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged() ); res.Messages.AssertEqual( OnNext(220, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_Throw() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(250, ex) ); var res = scheduler.Start(() => xs.DistinctUntilChanged() ); res.Messages.AssertEqual( OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_AllChanges() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged() ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_AllSame() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 2), OnNext(230, 2), OnNext(240, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged() ); res.Messages.AssertEqual( OnNext(210, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_SomeChanges() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), //* OnNext(215, 3), //* OnNext(220, 3), OnNext(225, 2), //* OnNext(230, 2), OnNext(230, 1), //* OnNext(240, 2), //* OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged() ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(215, 3), OnNext(225, 2), OnNext(230, 1), OnNext(240, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_Comparer_AllEqual() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged(new FuncComparer((x, y) => true)) ); res.Messages.AssertEqual( OnNext(210, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_Comparer_AllDifferent() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 2), OnNext(230, 2), OnNext(240, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged(new FuncComparer((x, y) => false)) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 2), OnNext(230, 2), OnNext(240, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void DistinctUntilChanged_KeySelector_Div2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), //* OnNext(220, 4), OnNext(230, 3), //* OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged(x => x % 2) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(230, 3), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } class FuncComparer : IEqualityComparer { private Func _equals; public FuncComparer(Func equals) { _equals = equals; } public bool Equals(T x, T y) { return _equals(x, y); } public int GetHashCode(T obj) { return 0; } } [TestMethod] public void DistinctUntilChanged_KeySelectorThrows() { var ex = new Exception(); var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged(new Func(x => { throw ex; })) ); res.Messages.AssertEqual( OnError(210, ex) ); } [TestMethod] public void DistinctUntilChanged_ComparerThrows() { var ex = new Exception(); var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged(new ThrowComparer(ex)) ); res.Messages.AssertEqual( OnNext(210, 2), OnError(220, ex) ); } class ThrowComparer : IEqualityComparer { private Exception _ex; public ThrowComparer(Exception ex) { _ex = ex; } public bool Equals(T x, T y) { throw _ex; } public int GetHashCode(T obj) { return 0; } } [TestMethod] public void DistinctUntilChanged_KeySelector_Comparer() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), // * key = 1 % 3 = 1 OnNext(220, 8), // key = 4 % 3 = 1 same OnNext(230, 2), // key = 1 % 3 = 1 same OnNext(240, 5), // * key = 2 % 3 = 2 OnCompleted(250) ); var res = scheduler.Start(() => xs.DistinctUntilChanged(x => x / 2, new FuncComparer((x, y) => x % 3 == y % 3)) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } #endregion #region + Do + [TestMethod] public void Do_ArgumentChecking() { var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Do(someObservable, (Action)null)); ReactiveAssert.Throws(() => Observable.Do(null, _ => { })); ReactiveAssert.Throws(() => Observable.Do(someObservable, x => { }, (Action)null)); ReactiveAssert.Throws(() => Observable.Do(someObservable, (Action)null, () => { })); ReactiveAssert.Throws(() => Observable.Do(null, x => { }, () => { })); ReactiveAssert.Throws(() => Observable.Do(someObservable, x => { }, (Action)null)); ReactiveAssert.Throws(() => Observable.Do(someObservable, (Action)null, (Exception _) => { })); ReactiveAssert.Throws(() => Observable.Do(null, x => { }, (Exception _) => { })); ReactiveAssert.Throws(() => Observable.Do(someObservable, x => { }, (Exception _) => { }, null)); ReactiveAssert.Throws(() => Observable.Do(someObservable, x => { }, (Action)null, () => { })); ReactiveAssert.Throws(() => Observable.Do(someObservable, (Action)null, (Exception _) => { }, () => { })); ReactiveAssert.Throws(() => Observable.Do(null, x => { }, (Exception _) => { }, () => { })); ReactiveAssert.Throws(() => Observable.Do(null, Observer.Create(i => { }))); ReactiveAssert.Throws(() => Observable.Do(someObservable, default(IObserver))); } [TestMethod] public void Do_ShouldSeeAllValues() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); int i = 0; int sum = 2 + 3 + 4 + 5; var res = scheduler.Start(() => xs.Do(x => { i++; sum -= x; }) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_PlainAction() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); int i = 0; var res = scheduler.Start(() => xs.Do(_ => { i++; }) ); Assert.AreEqual(4, i); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_NextCompleted() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); int i = 0; int sum = 2 + 3 + 4 + 5; bool completed = false; var res = scheduler.Start(() => xs.Do(x => { i++; sum -= x; }, () => { completed = true; }) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); Assert.IsTrue(completed); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_NextCompleted_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable(); int i = 0; bool completed = false; var res = scheduler.Start(() => xs.Do(x => { i++; }, () => { completed = true; }) ); Assert.AreEqual(0, i); Assert.IsFalse(completed); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Do_NextError() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnError(250, ex) ); int i = 0; int sum = 2 + 3 + 4 + 5; bool sawError = false; var res = scheduler.Start(() => xs.Do(x => { i++; sum -= x; }, e => { sawError = e == ex; }) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); Assert.IsTrue(sawError); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_NextErrorNot() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); int i = 0; int sum = 2 + 3 + 4 + 5; bool sawError = false; var res = scheduler.Start(() => xs.Do(x => { i++; sum -= x; }, _ => { sawError = true; }) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); Assert.IsFalse(sawError); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_NextErrorCompleted() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); int i = 0; int sum = 2 + 3 + 4 + 5; bool sawError = false; bool hasCompleted = false; var res = scheduler.Start(() => xs.Do(x => { i++; sum -= x; }, e => { sawError = true; }, () => { hasCompleted = true; }) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); Assert.IsFalse(sawError); Assert.IsTrue(hasCompleted); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_NextErrorCompletedError() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnError(250, ex) ); int i = 0; int sum = 2 + 3 + 4 + 5; bool sawError = false; bool hasCompleted = false; var res = scheduler.Start(() => xs.Do(x => { i++; sum -= x; }, e => { sawError = e == ex; }, () => { hasCompleted = true; }) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); Assert.IsTrue(sawError); Assert.IsFalse(hasCompleted); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_NextErrorCompletedNever() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable(); int i = 0; bool sawError = false; bool hasCompleted = false; var res = scheduler.Start(() => xs.Do(x => { i++; }, e => { sawError = true; }, () => { hasCompleted = true; }) ); Assert.AreEqual(0, i); Assert.IsFalse(sawError); Assert.IsFalse(hasCompleted); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Do_Observer_SomeDataWithError() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnError(250, ex) ); int i = 0; int sum = 2 + 3 + 4 + 5; bool sawError = false; bool hasCompleted = false; var res = scheduler.Start(() => xs.Do(Observer.Create(x => { i++; sum -= x; }, e => { sawError = e == ex; }, () => { hasCompleted = true; })) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); Assert.IsTrue(sawError); Assert.IsFalse(hasCompleted); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do_Observer_SomeDataWithoutError() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); int i = 0; int sum = 2 + 3 + 4 + 5; bool sawError = false; bool hasCompleted = false; var res = scheduler.Start(() => xs.Do(Observer.Create(x => { i++; sum -= x; }, e => { sawError = true; }, () => { hasCompleted = true; })) ); Assert.AreEqual(4, i); Assert.AreEqual(0, sum); Assert.IsFalse(sawError); Assert.IsTrue(hasCompleted); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do1422_Next_NextThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(x => { throw ex; }) ); res.Messages.AssertEqual( OnError(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_NextCompleted_NextThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(x => { throw ex; }, () => { }) ); res.Messages.AssertEqual( OnError(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_NextCompleted_CompletedThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(x => { }, () => { throw ex; }) ); res.Messages.AssertEqual( OnNext(210, 2), OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do1422_NextError_NextThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(x => { throw ex; }, _ => { }) ); res.Messages.AssertEqual( OnError(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_NextError_ErrorThrows() { var scheduler = new TestScheduler(); var ex1 = new Exception(); var ex2 = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(210, ex1) ); var res = scheduler.Start(() => xs.Do(x => { }, _ => { throw ex2; }) ); res.Messages.AssertEqual( OnError(210, ex2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_NextErrorCompleted_NextThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(x => { throw ex; }, _ => { }, () => { }) ); res.Messages.AssertEqual( OnError(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_NextErrorCompleted_ErrorThrows() { var scheduler = new TestScheduler(); var ex1 = new Exception(); var ex2 = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(210, ex1) ); var res = scheduler.Start(() => xs.Do(x => { }, _ => { throw ex2; }, () => { }) ); res.Messages.AssertEqual( OnError(210, ex2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_NextErrorCompleted_CompletedThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(x => { }, _ => { }, () => { throw ex; }) ); res.Messages.AssertEqual( OnNext(210, 2), OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Do1422_Observer_NextThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(Observer.Create(x => { throw ex; }, _ => { }, () => { })) ); res.Messages.AssertEqual( OnError(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_Observer_ErrorThrows() { var scheduler = new TestScheduler(); var ex1 = new Exception(); var ex2 = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(210, ex1) ); var res = scheduler.Start(() => xs.Do(Observer.Create(x => { }, _ => { throw ex2; }, () => { })) ); res.Messages.AssertEqual( OnError(210, ex2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Do1422_Observer_CompletedThrows() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Do(Observer.Create(x => { }, _ => { }, () => { throw ex; })) ); res.Messages.AssertEqual( OnNext(210, 2), OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } #endregion #region + Finally + [TestMethod] public void Finally_ArgumentChecking() { var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Finally(null, () => { })); ReactiveAssert.Throws(() => Observable.Finally(someObservable, null)); } [TestMethod] public void Finally_Never() { var scheduler = new TestScheduler(); bool invoked = false; var res = scheduler.Start(() => Observable.Never().Finally(() => { invoked = true; }) ); res.Messages.AssertEqual( ); Assert.IsTrue(invoked); // due to unsubscribe; see 1356 } [TestMethod] public void Finally_OnlyCalledOnce_Never() { int invokeCount = 0; var someObservable = Observable.Never().Finally(() => { invokeCount++; }); var d = someObservable.Subscribe(); d.Dispose(); d.Dispose(); Assert.AreEqual(1, invokeCount); } [TestMethod] public void Finally_OnlyCalledOnce_Empty() { var invokeCount = 0; var someObservable = Observable.Empty().Finally(() => { invokeCount++; }); var d = someObservable.Subscribe(); d.Dispose(); d.Dispose(); Assert.AreEqual(1, invokeCount); } [TestMethod] public void Finally_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(250) ); var invoked = false; var res = scheduler.Start(() => xs.Finally(() => { invoked = true; }) ); Assert.IsTrue(invoked); res.Messages.AssertEqual( OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Finally_Return() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var invoked = false; var res = scheduler.Start(() => xs.Finally(() => { invoked = true; }) ); Assert.IsTrue(invoked); res.Messages.AssertEqual( OnNext(210, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Finally_Throw() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(250, ex) ); var invoked = false; var res = scheduler.Start(() => xs.Finally(() => { invoked = true; }) ); Assert.IsTrue(invoked); res.Messages.AssertEqual( OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } #endregion #region + IgnoreElements + [TestMethod] public void IgnoreElements_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.IgnoreElements(null)); } [TestMethod] public void IgnoreElements_IgnoreElements() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnCompleted(300) ); var res = scheduler.Start(() => xs.IgnoreElements().IgnoreElements() ); res.Messages.AssertEqual( OnCompleted(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void IgnoreElements_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.IgnoreElements() ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void IgnoreElements_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(610) ); var res = scheduler.Start(() => xs.IgnoreElements() ); res.Messages.AssertEqual( OnCompleted(610) ); xs.Subscriptions.AssertEqual( Subscribe(200, 610) ); } [TestMethod] public void IgnoreElements_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(610, ex) ); var res = scheduler.Start(() => xs.IgnoreElements() ); res.Messages.AssertEqual( OnError(610, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 610) ); } #endregion #region + Materialize + [TestMethod] public void Materialize_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Materialize(null)); } [TestMethod] public void Materialize_Never() { var scheduler = new TestScheduler(); var res = scheduler.Start(() => Observable.Never().Materialize() ); res.Messages.AssertEqual( ); } [TestMethod] public void Materialize_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(250) ); var res = scheduler.Start(() => xs.Materialize() ); res.Messages.AssertEqual( OnNext(250, Notification.CreateOnCompleted()), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Materialize_Return() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Materialize() ); res.Messages.AssertEqual( OnNext(210, Notification.CreateOnNext(2)), OnNext(250, Notification.CreateOnCompleted()), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Materialize_Throw() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(250, ex) ); var res = scheduler.Start(() => xs.Materialize() ); res.Messages.AssertEqual( OnNext(250, Notification.CreateOnError(ex)), OnCompleted>(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } #endregion #region - Repeat - [TestMethod] public void Repeat_Observable_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Repeat(null)); ReactiveAssert.Throws(() => DummyObservable.Instance.Repeat().Subscribe(null)); } [TestMethod] public void Repeat_Observable_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3), OnCompleted(250) ); var res = scheduler.Start(() => xs.Repeat() ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3), OnNext(550, 1), OnNext(600, 2), OnNext(650, 3), OnNext(800, 1), OnNext(850, 2), OnNext(900, 3) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450), Subscribe(450, 700), Subscribe(700, 950), Subscribe(950, 1000) ); } [TestMethod] public void Repeat_Observable_Infinite() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3) ); var res = scheduler.Start(() => xs.Repeat() ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3) ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Repeat_Observable_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3), OnError(250, ex) ); var res = scheduler.Start(() => xs.Repeat() ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3), OnError(450, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450) ); } [TestMethod] public void Repeat_Observable_Throws() { var scheduler1 = new TestScheduler(); var xs = Observable.Return(1, scheduler1).Repeat(); xs.Subscribe(x => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler1.Start()); var scheduler2 = new TestScheduler(); var ys = Observable.Throw(new Exception(), scheduler2).Repeat(); ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler2.Start()); var scheduler3 = new TestScheduler(); var zs = Observable.Return(1, scheduler3).Repeat(); var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }); scheduler3.ScheduleAbsolute(210, () => d.Dispose()); scheduler3.Start(); var xss = Observable.Create(new Func, Action>(o => { throw new InvalidOperationException(); })).Repeat(); ReactiveAssert.Throws(() => xss.Subscribe()); } [TestMethod] public void Repeat_Observable_Default_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Repeat((IObservable)null)); ReactiveAssert.Throws(() => DummyObservable.Instance.Repeat().Subscribe(null)); } [TestMethod] public void Repeat_Observable_RepeatCount_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Repeat(null, 0)); ReactiveAssert.Throws(() => DummyObservable.Instance.Repeat(-1)); ReactiveAssert.Throws(() => DummyObservable.Instance.Repeat(0).Subscribe(null)); } [TestMethod] public void Repeat_Observable_RepeatCount_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(5, 1), OnNext(10, 2), OnNext(15, 3), OnCompleted(20) ); var res = scheduler.Start(() => xs.Repeat(3) ); res.Messages.AssertEqual( OnNext(205, 1), OnNext(210, 2), OnNext(215, 3), OnNext(225, 1), OnNext(230, 2), OnNext(235, 3), OnNext(245, 1), OnNext(250, 2), OnNext(255, 3), OnCompleted(260) ); xs.Subscriptions.AssertEqual( Subscribe(200, 220), Subscribe(220, 240), Subscribe(240, 260) ); } [TestMethod] public void Repeat_Observable_RepeatCount_Dispose() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(5, 1), OnNext(10, 2), OnNext(15, 3), OnCompleted(20) ); var res = scheduler.Start(() => xs.Repeat(3), 231 ); res.Messages.AssertEqual( OnNext(205, 1), OnNext(210, 2), OnNext(215, 3), OnNext(225, 1), OnNext(230, 2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 220), Subscribe(220, 231) ); } [TestMethod] public void Repeat_Observable_RepeatCount_Infinite() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3) ); var res = scheduler.Start(() => xs.Repeat(3) ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3) ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Repeat_Observable_RepeatCount_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3), OnError(250, ex) ); var res = scheduler.Start(() => xs.Repeat(3) ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3), OnError(450, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450) ); } [TestMethod] public void Repeat_Observable_RepeatCount_Throws() { var scheduler1 = new TestScheduler(); var xs = Observable.Return(1, scheduler1).Repeat(3); xs.Subscribe(x => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler1.Start()); var scheduler2 = new TestScheduler(); var ys = Observable.Throw(new Exception(), scheduler2).Repeat(3); ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler2.Start()); var scheduler3 = new TestScheduler(); var zs = Observable.Return(1, scheduler3).Repeat(100); var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }); scheduler3.ScheduleAbsolute(10, () => d.Dispose()); scheduler3.Start(); var xss = Observable.Create(new Func, Action>(o => { throw new InvalidOperationException(); })).Repeat(3); ReactiveAssert.Throws(() => xss.Subscribe()); } [TestMethod] public void Repeat_Observable_RepeatCount_Default_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Repeat(default(IObservable), 0)); ReactiveAssert.Throws(() => DummyObservable.Instance.Repeat(-1)); ReactiveAssert.Throws(() => DummyObservable.Instance.Repeat(0).Subscribe(null)); } #endregion #region - Retry - [TestMethod] public void Retry_Observable_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Retry(null)); ReactiveAssert.Throws(() => DummyObservable.Instance.Retry().Subscribe(null)); } [TestMethod] public void Retry_Observable_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3), OnCompleted(250) ); var res = scheduler.Start(() => xs.Retry() ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3), OnCompleted(450) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450) ); } [TestMethod] public void Retry_Observable_Infinite() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3) ); var res = scheduler.Start(() => xs.Retry() ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3) ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Retry_Observable_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3), OnError(250, ex) ); var res = scheduler.Start(() => xs.Retry(), 1100 ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3), OnNext(550, 1), OnNext(600, 2), OnNext(650, 3), OnNext(800, 1), OnNext(850, 2), OnNext(900, 3), OnNext(1050, 1) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450), Subscribe(450, 700), Subscribe(700, 950), Subscribe(950, 1100) ); } [TestMethod] public void Retry_Observable_Throws1() { var scheduler1 = new TestScheduler(); var xs = Observable.Return(1, scheduler1).Retry(); xs.Subscribe(x => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler1.Start()); } [TestMethod] public void Retry_Observable_Throws2() { var scheduler2 = new TestScheduler(); var ys = Observable.Throw(new Exception(), scheduler2).Retry(); var d = ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); }); scheduler2.ScheduleAbsolute(210, () => d.Dispose()); scheduler2.Start(); } [TestMethod] public void Retry_Observable_Throws3() { var scheduler3 = new TestScheduler(); var zs = Observable.Return(1, scheduler3).Retry(); zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler3.Start()); } /* * BREAKING CHANGE v2.0 > v1.x - The code below will loop endlessly, trying to repeat the failing subscription, * whose exception is propagated through OnError starting from v2.0. * [TestMethod] public void Retry_Observable_Throws4() { var xss = Observable.Create(new Func, Action>(o => { throw new InvalidOperationException(); })).Retry(); ReactiveAssert.Throws(() => xss.Subscribe()); } */ [TestMethod] public void Retry_Observable_Default_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Retry((IObservable)null)); ReactiveAssert.Throws(() => DummyObservable.Instance.Retry().Subscribe(null)); } [TestMethod] public void Retry_Observable_RetryCount_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Retry(null, 0)); ReactiveAssert.Throws(() => DummyObservable.Instance.Retry(-1)); ReactiveAssert.Throws(() => DummyObservable.Instance.Retry(0).Subscribe(null)); } [TestMethod] public void Retry_Observable_RetryCount_Basic() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateColdObservable( OnNext(5, 1), OnNext(10, 2), OnNext(15, 3), OnError(20, ex) ); var res = scheduler.Start(() => xs.Retry(3) ); res.Messages.AssertEqual( OnNext(205, 1), OnNext(210, 2), OnNext(215, 3), OnNext(225, 1), OnNext(230, 2), OnNext(235, 3), OnNext(245, 1), OnNext(250, 2), OnNext(255, 3), OnError(260, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 220), Subscribe(220, 240), Subscribe(240, 260) ); } [TestMethod] public void Retry_Observable_RetryCount_Dispose() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(5, 1), OnNext(10, 2), OnNext(15, 3), OnError(20, new Exception()) ); var res = scheduler.Start(() => xs.Retry(3), 231 ); res.Messages.AssertEqual( OnNext(205, 1), OnNext(210, 2), OnNext(215, 3), OnNext(225, 1), OnNext(230, 2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 220), Subscribe(220, 231) ); } [TestMethod] public void Retry_Observable_RetryCount_Infinite() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3) ); var res = scheduler.Start(() => xs.Retry(3) ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3) ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Retry_Observable_RetryCount_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3), OnCompleted(250) ); var res = scheduler.Start(() => xs.Retry(3) ); res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3), OnCompleted(450) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450) ); } [TestMethod] public void Retry_Observable_RetryCount_Throws() { var scheduler1 = new TestScheduler(); var xs = Observable.Return(1, scheduler1).Retry(3); xs.Subscribe(x => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler1.Start()); var scheduler2 = new TestScheduler(); var ys = Observable.Throw(new Exception(), scheduler2).Retry(100); var d = ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); }); scheduler2.ScheduleAbsolute(10, () => d.Dispose()); scheduler2.Start(); var scheduler3 = new TestScheduler(); var zs = Observable.Return(1, scheduler3).Retry(100); zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }); ReactiveAssert.Throws(() => scheduler3.Start()); var xss = Observable.Create(new Func, Action>(o => { throw new InvalidOperationException(); })).Retry(3); ReactiveAssert.Throws(() => xss.Subscribe()); } [TestMethod] public void Retry_Observable_RetryCount_Default_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Retry(default(IObservable), 0)); ReactiveAssert.Throws(() => DummyObservable.Instance.Retry(-1)); ReactiveAssert.Throws(() => DummyObservable.Instance.Retry(0).Subscribe(null)); } [TestMethod] public void Retry_Observable_RetryCount_Default() { Observable.Range(1, 3).Retry(3).AssertEqual(Observable.Range(1, 3).Retry(3)); } #endregion #region + Scan + [TestMethod] public void Scan_ArgumentChecking() { var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Scan(null, (_, __) => 0)); ReactiveAssert.Throws(() => Observable.Scan(someObservable, null)); ReactiveAssert.Throws(() => Observable.Scan(null, 0, (_, __) => 0)); ReactiveAssert.Throws(() => Observable.Scan(someObservable, 0, null)); } [TestMethod] public void Scan_Seed_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable(); var seed = 42; var res = scheduler.Start(() => xs.Scan(seed, (acc, x) => acc + x) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Scan_Seed_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(250) ); var seed = 42; var res = scheduler.Start(() => xs.Scan(seed, (acc, x) => acc + x) ); res.Messages.AssertEqual( OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_Seed_Return() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 2), OnCompleted(250) ); var seed = 42; var res = scheduler.Start(() => xs.Scan(seed, (acc, x) => acc + x) ); res.Messages.AssertEqual( OnNext(220, seed + 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_Seed_Throw() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(250, ex) ); var seed = 42; var res = scheduler.Start(() => xs.Scan(seed, (acc, x) => acc + x) ); res.Messages.AssertEqual( OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_Seed_SomeData() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var seed = 1; var res = scheduler.Start(() => xs.Scan(seed, (acc, x) => acc + x) ); res.Messages.AssertEqual( OnNext(210, seed + 2), OnNext(220, seed + 2 + 3), OnNext(230, seed + 2 + 3 + 4), OnNext(240, seed + 2 + 3 + 4 + 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_Seed_AccumulatorThrows() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var ex = new Exception(); var seed = 1; var res = scheduler.Start(() => xs.Scan(seed, (acc, x) => { if (x == 4) throw ex; return acc + x; }) ); res.Messages.AssertEqual( OnNext(210, seed + 2), OnNext(220, seed + 2 + 3), OnError(230, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void Scan_NoSeed_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable(); var res = scheduler.Start(() => xs.Scan((acc, x) => acc + x) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Scan_NoSeed_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(250) ); var res = scheduler.Start(() => xs.Scan((acc, x) => acc + x) ); res.Messages.AssertEqual( OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_NoSeed_Return() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.Scan((acc, x) => acc + x) ); res.Messages.AssertEqual( OnNext(220, 2), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_NoSeed_Throw() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(250, ex) ); var res = scheduler.Start(() => xs.Scan((acc, x) => acc + x) ); res.Messages.AssertEqual( OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_NoSeed_SomeData() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var res = scheduler.Start(() => xs.Scan((acc, x) => acc + x) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 2 + 3), OnNext(230, 2 + 3 + 4), OnNext(240, 2 + 3 + 4 + 5), OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); } [TestMethod] public void Scan_NoSeed_AccumulatorThrows() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnCompleted(250) ); var ex = new Exception(); var res = scheduler.Start(() => xs.Scan((acc, x) => { if (x == 4) throw ex; return acc + x; }) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(220, 2 + 3), OnError(230, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } #endregion #region + SkipLast + [TestMethod] public void SkipLast_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.SkipLast(null, 0)); ReactiveAssert.Throws(() => Observable.SkipLast(DummyObservable.Instance, -1)); } [TestMethod] public void SkipLast_Zero_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.SkipLast(0) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void SkipLast_Zero_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.SkipLast(0) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void SkipLast_Zero_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.SkipLast(0) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void SkipLast_One_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.SkipLast(1) ); res.Messages.AssertEqual( OnNext(250, 2), OnNext(270, 3), OnNext(310, 4), OnNext(360, 5), OnNext(380, 6), OnNext(410, 7), OnNext(590, 8), OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void SkipLast_One_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.SkipLast(1) ); res.Messages.AssertEqual( OnNext(250, 2), OnNext(270, 3), OnNext(310, 4), OnNext(360, 5), OnNext(380, 6), OnNext(410, 7), OnNext(590, 8), OnError(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void SkipLast_One_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.SkipLast(1) ); res.Messages.AssertEqual( OnNext(250, 2), OnNext(270, 3), OnNext(310, 4), OnNext(360, 5), OnNext(380, 6), OnNext(410, 7), OnNext(590, 8) ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void SkipLast_Three_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.SkipLast(3) ); res.Messages.AssertEqual( OnNext(310, 2), OnNext(360, 3), OnNext(380, 4), OnNext(410, 5), OnNext(590, 6), OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void SkipLast_Three_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.SkipLast(3) ); res.Messages.AssertEqual( OnNext(310, 2), OnNext(360, 3), OnNext(380, 4), OnNext(410, 5), OnNext(590, 6), OnError(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void SkipLast_Three_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.SkipLast(3) ); res.Messages.AssertEqual( OnNext(310, 2), OnNext(360, 3), OnNext(380, 4), OnNext(410, 5), OnNext(590, 6) ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } #endregion #region StartWith [TestMethod] public void StartWith_ArgumentChecking() { var scheduler = new TestScheduler(); var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.StartWith(default(IObservable), 1)); ReactiveAssert.Throws(() => Observable.StartWith(someObservable, default(int[]))); ReactiveAssert.Throws(() => Observable.StartWith(default(IObservable), scheduler, 1)); ReactiveAssert.Throws(() => Observable.StartWith(someObservable, default(IScheduler), 1)); ReactiveAssert.Throws(() => Observable.StartWith(someObservable, scheduler, default(int[]))); } [TestMethod] public void StartWith() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 2), OnCompleted(250) ); var res = scheduler.Start(() => xs.StartWith(1) ); res.Messages.AssertEqual( OnNext(200, 1), OnNext(220, 2), OnCompleted(250) ); } [TestMethod] public void StartWith_Scheduler() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(220, 4), OnCompleted(250) ); var res = scheduler.Start(() => xs.StartWith(scheduler, 1, 2, 3) ); res.Messages.AssertEqual( OnNext(201, 1), OnNext(202, 2), OnNext(203, 3), OnNext(220, 4), OnCompleted(250) ); } #endregion #region + TakeLast + [TestMethod] public void TakeLast_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.TakeLast(null, 0)); ReactiveAssert.Throws(() => Observable.TakeLast(DummyObservable.Instance, -1)); ReactiveAssert.Throws(() => Observable.TakeLast(null, 0, Scheduler.Default)); ReactiveAssert.Throws(() => Observable.TakeLast(DummyObservable.Instance, -1, Scheduler.Default)); ReactiveAssert.Throws(() => Observable.TakeLast(DummyObservable.Instance, 0, default(IScheduler))); } [TestMethod] public void TakeLast_Zero_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLast(0) ); res.Messages.AssertEqual( OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLast_Zero_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLast(0) ); res.Messages.AssertEqual( OnError(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLast_Zero_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLast(0) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void TakeLast_One_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLast(1) ); res.Messages.AssertEqual( OnNext(650, 9), OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLast_One_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLast(1) ); res.Messages.AssertEqual( OnError(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLast_One_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLast(1) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void TakeLast_Three_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLast(3) ); res.Messages.AssertEqual( OnNext(650, 7), OnNext(650, 8), OnNext(650, 9), OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLast_Three_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLast(3) ); res.Messages.AssertEqual( OnError(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLast_Three_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLast(3) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void TakeLast_LongRunning_Regular() { var res = Observable.Range(0, 100, Scheduler.Default).TakeLast(10, NewThreadScheduler.Default); var lst = new List(); res.ForEach(lst.Add); Assert.IsTrue(Enumerable.Range(90, 10).SequenceEqual(lst)); } #endregion #region + TakeLastBuffer + [TestMethod] public void TakeLastBuffer_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.TakeLastBuffer(null, 0)); ReactiveAssert.Throws(() => Observable.TakeLastBuffer(DummyObservable.Instance, -1)); } [TestMethod] public void TakeLastBuffer_Zero_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLastBuffer(0) ); res.Messages.AssertEqual( OnNext>(650, lst => lst.Count == 0), OnCompleted>(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLastBuffer_Zero_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLastBuffer(0) ); res.Messages.AssertEqual( OnError>(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLastBuffer_Zero_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLastBuffer(0) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void TakeLastBuffer_One_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLastBuffer(1) ); res.Messages.AssertEqual( OnNext>(650, lst => lst.SequenceEqual(new[] { 9 })), OnCompleted>(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLastBuffer_One_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLastBuffer(1) ); res.Messages.AssertEqual( OnError>(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLastBuffer_One_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLastBuffer(1) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void TakeLastBuffer_Three_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLastBuffer(3) ); res.Messages.AssertEqual( OnNext>(650, lst => lst.SequenceEqual(new[] { 7, 8, 9 })), OnCompleted>(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLastBuffer_Three_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLastBuffer(3) ); res.Messages.AssertEqual( OnError>(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [TestMethod] public void TakeLastBuffer_Three_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLastBuffer(3) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } #endregion #region + Window + [TestMethod] public void WindowWithCount_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Window(default(IObservable), 1, 1)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, 0, 1)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, 1, 0)); ReactiveAssert.Throws(() => Observable.Window(default(IObservable), 1)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, 0)); } [TestMethod] public void WindowWithCount_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge() ); res.Messages.AssertEqual( OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "0 4"), OnNext(280, "1 4"), OnNext(320, "1 5"), OnNext(350, "1 6"), OnNext(350, "2 6"), OnNext(380, "2 7"), OnNext(420, "2 8"), OnNext(420, "3 8"), OnNext(470, "3 9"), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void WindowWithCount_InnerTimings() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = default(IObservable>); var outerSubscription = default(IDisposable); var innerSubscriptions = new List(); var windows = new List>(); var observers = new List>(); scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2)); scheduler.ScheduleAbsolute(Subscribed, () => { outerSubscription = res.Subscribe( window => { var result = scheduler.CreateObserver(); windows.Add(window); observers.Add(result); scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result))); } ); }); scheduler.Start(); Assert.AreEqual(5, observers.Count); observers[0].Messages.AssertEqual( OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnCompleted(280) ); observers[1].Messages.AssertEqual( OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnCompleted(350) ); observers[2].Messages.AssertEqual( OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnCompleted(420) ); observers[3].Messages.AssertEqual( OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); observers[4].Messages.AssertEqual( OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void WindowWithCount_InnerTimings_DisposeOuter() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = default(IObservable>); var outerSubscription = default(IDisposable); var innerSubscriptions = new List(); var windows = new List>(); var observers = new List>(); var windowCreationTimes = new List(); scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2)); scheduler.ScheduleAbsolute(Subscribed, () => { outerSubscription = res.Subscribe( window => { windowCreationTimes.Add(scheduler.Clock); var result = scheduler.CreateObserver(); windows.Add(window); observers.Add(result); scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result))); } ); }); scheduler.ScheduleAbsolute(400, () => { outerSubscription.Dispose(); }); scheduler.Start(); Assert.IsTrue(windowCreationTimes.Last() < 400); Assert.AreEqual(4, observers.Count); observers[0].Messages.AssertEqual( OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnCompleted(280) ); observers[1].Messages.AssertEqual( OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnCompleted(350) ); observers[2].Messages.AssertEqual( OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnCompleted(420) ); observers[3].Messages.AssertEqual( OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void WindowWithCount_InnerTimings_DisposeOuterAndInners() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = default(IObservable>); var outerSubscription = default(IDisposable); var innerSubscriptions = new List(); var windows = new List>(); var observers = new List>(); var windowCreationTimes = new List(); scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2)); scheduler.ScheduleAbsolute(Subscribed, () => { outerSubscription = res.Subscribe( window => { windowCreationTimes.Add(scheduler.Clock); var result = scheduler.CreateObserver(); windows.Add(window); observers.Add(result); scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result))); } ); }); scheduler.ScheduleAbsolute(400, () => { outerSubscription.Dispose(); foreach (var d in innerSubscriptions) d.Dispose(); }); scheduler.Start(); Assert.IsTrue(windowCreationTimes.Last() < 400); Assert.AreEqual(4, observers.Count); observers[0].Messages.AssertEqual( OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnCompleted(280) ); observers[1].Messages.AssertEqual( OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnCompleted(350) ); observers[2].Messages.AssertEqual( OnNext(350, 6), OnNext(380, 7) ); observers[3].Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); } [TestMethod] public void WindowWithCount_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(), 370 ); res.Messages.AssertEqual( OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "0 4"), OnNext(280, "1 4"), OnNext(320, "1 5"), OnNext(350, "1 6"), OnNext(350, "2 6") ); xs.Subscriptions.AssertEqual( Subscribe(200, 370) ); } [TestMethod] public void WindowWithCount_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnError(600, ex) ); var res = scheduler.Start(() => xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge() ); res.Messages.AssertEqual( OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "0 4"), OnNext(280, "1 4"), OnNext(320, "1 5"), OnNext(350, "1 6"), OnNext(350, "2 6"), OnNext(380, "2 7"), OnNext(420, "2 8"), OnNext(420, "3 8"), OnNext(470, "3 9"), OnError(600, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void WindowWithCount_Default() { Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable()); Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable()); Observable.Range(1, 10).Window(3, 2).Skip(1).First().SequenceEqual(new[] { 3, 4, 5 }.ToObservable()); } #endregion } }