1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
namespace System.Reactive
{
/// <summary>
/// Class to create an IObserver<T> instance from delegate-based implementations of the On* methods.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
public sealed class AnonymousObserver<T> : ObserverBase<T>
{
private readonly Action<T> _onNext;
private readonly Action<Exception> _onError;
private readonly Action _onCompleted;
/// <summary>
/// Creates an observer from the specified OnNext, OnError, and OnCompleted actions.
/// </summary>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <param name="onError">Observer's OnError action implementation.</param>
/// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
_onNext = onNext;
_onError = onError;
_onCompleted = onCompleted;
}
/// <summary>
/// Creates an observer from the specified OnNext action.
/// </summary>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception>
public AnonymousObserver(Action<T> onNext)
: this(onNext, Stubs.Throw, Stubs.Nop)
{
}
/// <summary>
/// Creates an observer from the specified OnNext and OnError actions.
/// </summary>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <param name="onError">Observer's OnError action implementation.</param>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public AnonymousObserver(Action<T> onNext, Action<Exception> onError)
: this(onNext, onError, Stubs.Nop)
{
}
/// <summary>
/// Creates an observer from the specified OnNext and OnCompleted actions.
/// </summary>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public AnonymousObserver(Action<T> onNext, Action onCompleted)
: this(onNext, Stubs.Throw, onCompleted)
{
}
/// <summary>
/// Calls the onNext action.
/// </summary>
/// <param name="value">Next element in the sequence.</param>
protected override void OnNextCore(T value)
{
_onNext(value);
}
/// <summary>
/// Calls the onError action.
/// </summary>
/// <param name="error">The error that has occurred.</param>
protected override void OnErrorCore(Exception error)
{
_onError(error);
}
/// <summary>
/// Calls the onCompleted action.
/// </summary>
protected override void OnCompletedCore()
{
_onCompleted();
}
internal IObserver<T> MakeSafe(IDisposable disposable)
{
return new AnonymousSafeObserver<T>(_onNext, _onError, _onCompleted, disposable);
}
}
}
|