// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Concurrency;
using System.Threading;
namespace System.Reactive
{
/// <summary>
/// Provides a set of static methods for creating observers.
/// </summary>
public static class Observer
{
/// <summary>
/// Creates an observer from a notification callback.
/// </summary>
/// <typeparam name="T">The type of the elements received by the observer.</typeparam>
/// <param name="handler">Action that handles a notification.</param>
/// <returns>The observer object that invokes the specified handler using a notification corresponding to each message it receives.</returns>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
public static IObserver<T> ToObserver<T>(this Action<Notification<T>> handler)
{
    if (handler == null)
        throw new ArgumentNullException("handler");

    // Each observer callback is materialized into the matching notification
    // kind and handed to the single handler delegate.
    return new AnonymousObserver<T>(
        x => handler(Notification.CreateOnNext<T>(x)),
        exception => handler(Notification.CreateOnError<T>(exception)),
        () => handler(Notification.CreateOnCompleted<T>())
    );
}
/// <summary>
/// Creates a notification callback from an observer.
/// </summary>
/// <typeparam name="T">The type of the elements received by the observer.</typeparam>
/// <param name="observer">Observer object.</param>
/// <returns>The action that forwards its input notification to the underlying observer.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1704:IdentifiersShouldBeSpelledCorrectly", MessageId = "Notifier", Justification = "Backward compat.")]
public static Action<Notification<T>> ToNotifier<T>(this IObserver<T> observer)
{
    if (observer == null)
        throw new ArgumentNullException("observer");

    // The notification itself dispatches to the observer via Accept.
    return n => n.Accept(observer);
}
/// <summary>
/// Creates an observer from the specified OnNext action.
/// </summary>
/// <typeparam name="T">The type of the elements received by the observer.</typeparam>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <returns>The observer object implemented using the given actions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> is null.</exception>
public static IObserver<T> Create<T>(Action<T> onNext)
{
    if (onNext == null)
        throw new ArgumentNullException("onNext");

    return new AnonymousObserver<T>(onNext);
}
/// <summary>
/// Creates an observer from the specified OnNext and OnError actions.
/// </summary>
/// <typeparam name="T">The type of the elements received by the observer.</typeparam>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <param name="onError">Observer's OnError action implementation.</param>
/// <returns>The observer object implemented using the given actions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
{
    if (onNext == null)
        throw new ArgumentNullException("onNext");
    if (onError == null)
        throw new ArgumentNullException("onError");

    return new AnonymousObserver<T>(onNext, onError);
}
/// <summary>
/// Creates an observer from the specified OnNext and OnCompleted actions.
/// </summary>
/// <typeparam name="T">The type of the elements received by the observer.</typeparam>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
/// <returns>The observer object implemented using the given actions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
{
    if (onNext == null)
        throw new ArgumentNullException("onNext");
    if (onCompleted == null)
        throw new ArgumentNullException("onCompleted");

    return new AnonymousObserver<T>(onNext, onCompleted);
}
/// <summary>
/// Creates an observer from the specified OnNext, OnError, and OnCompleted actions.
/// </summary>
/// <typeparam name="T">The type of the elements received by the observer.</typeparam>
/// <param name="onNext">Observer's OnNext action implementation.</param>
/// <param name="onError">Observer's OnError action implementation.</param>
/// <param name="onCompleted">Observer's OnCompleted action implementation.</param>
/// <returns>The observer object implemented using the given actions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    if (onNext == null)
        throw new ArgumentNullException("onNext");
    if (onError == null)
        throw new ArgumentNullException("onError");
    if (onCompleted == null)
        throw new ArgumentNullException("onCompleted");

    return new AnonymousObserver<T>(onNext, onError, onCompleted);
}
/// <summary>
/// Hides the identity of an observer.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">An observer whose identity to hide.</param>
/// <returns>An observer that hides the identity of the specified observer.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public static IObserver<T> AsObserver<T>(this IObserver<T> observer)
{
    if (observer == null)
        throw new ArgumentNullException("observer");

    // A fresh observer built from the three callbacks is returned instead of
    // the original instance.
    return new AnonymousObserver<T>(observer.OnNext, observer.OnError, observer.OnCompleted);
}
/// <summary>
/// Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods.
/// If a violation is detected, an InvalidOperationException is thrown from the offending observer method call.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">The observer whose callback invocations should be checked for grammar violations.</param>
/// <returns>An observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public static IObserver<T> Checked<T>(this IObserver<T> observer)
{
    if (observer == null)
        throw new ArgumentNullException("observer");

    return new CheckedObserver<T>(observer);
}
/// <summary>
/// Synchronizes access to the observer such that its callback methods cannot be called concurrently from multiple threads. This overload is useful when coordinating access to an observer.
/// Notice reentrant observer callbacks on the same thread are still possible.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">The observer whose callbacks should be synchronized.</param>
/// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
/// <remarks>
/// Because a <see cref="System.Threading.Monitor">Monitor</see> is used to perform the synchronization, there's no protection against reentrancy from the same thread.
/// Hence, overlapped observer callbacks are still possible, which is invalid behavior according to the observer grammar. In order to protect against this behavior as
/// well, use the <see cref="Synchronize{T}(IObserver{T}, bool)"/> overload, passing true for the second parameter.
/// </remarks>
public static IObserver<T> Synchronize<T>(IObserver<T> observer)
{
    if (observer == null)
        throw new ArgumentNullException("observer");

    // A private gate object is created per call, so the returned observer
    // does not share its lock with any other observer.
    return new SynchronizedObserver<T>(observer, new object());
}
/// <summary>
/// Synchronizes access to the observer such that its callback methods cannot be called concurrently. This overload is useful when coordinating access to an observer.
/// The <paramref name="preventReentrancy"/> parameter configures the type of lock used for synchronization.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">The observer whose callbacks should be synchronized.</param>
/// <param name="preventReentrancy">If set to true, reentrant observer callbacks will be queued up and get delivered to the observer in a sequential manner.</param>
/// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
/// <remarks>
/// When the <paramref name="preventReentrancy"/> parameter is set to false, behavior is identical to the <see cref="Synchronize{T}(IObserver{T})"/> overload which uses
/// a <see cref="System.Threading.Monitor">Monitor</see> for synchronization. When the <paramref name="preventReentrancy"/> parameter is set to true, an <see cref="AsyncLock"/>
/// is used to queue up callbacks to the specified observer if a reentrant call is made.
/// </remarks>
public static IObserver<T> Synchronize<T>(IObserver<T> observer, bool preventReentrancy)
{
    if (observer == null)
        throw new ArgumentNullException("observer");

    if (preventReentrancy)
        return new AsyncLockObserver<T>(observer, new AsyncLock());
    else
        return new SynchronizedObserver<T>(observer, new object());
}
/// <summary>
/// Synchronizes access to the observer such that its callback methods cannot be called concurrently by multiple threads, using the specified gate object for use by a Monitor-based lock.
/// This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common gate object.
/// Notice reentrant observer callbacks on the same thread are still possible.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">The observer whose callbacks should be synchronized.</param>
/// <param name="gate">Gate object to synchronize each observer call on.</param>
/// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="gate"/> is null.</exception>
/// <remarks>
/// Because a <see cref="System.Threading.Monitor">Monitor</see> is used to perform the synchronization, there's no protection against reentrancy from the same thread.
/// Hence, overlapped observer callbacks are still possible, which is invalid behavior according to the observer grammar. In order to protect against this behavior as
/// well, use the <see cref="Synchronize{T}(IObserver{T}, AsyncLock)"/> overload.
/// </remarks>
public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
{
    if (observer == null)
        throw new ArgumentNullException("observer");
    if (gate == null)
        throw new ArgumentNullException("gate");

    return new SynchronizedObserver<T>(observer, gate);
}
/// <summary>
/// Synchronizes access to the observer such that its callback methods cannot be called concurrently, using the specified asynchronous lock to protect against concurrent and reentrant access.
/// This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common asynchronous lock.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">The observer whose callbacks should be synchronized.</param>
/// <param name="asyncLock">Gate object to synchronize each observer call on.</param>
/// <returns>An observer that delivers callbacks to the specified observer in a synchronized manner.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="asyncLock"/> is null.</exception>
public static IObserver<T> Synchronize<T>(IObserver<T> observer, AsyncLock asyncLock)
{
    if (observer == null)
        throw new ArgumentNullException("observer");
    if (asyncLock == null)
        throw new ArgumentNullException("asyncLock");

    return new AsyncLockObserver<T>(observer, asyncLock);
}
/// <summary>
/// Schedules the invocation of observer methods on the given scheduler.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">The observer to schedule messages for.</param>
/// <param name="scheduler">Scheduler to schedule observer messages on.</param>
/// <returns>Observer whose messages are scheduled on the given scheduler.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, IScheduler scheduler)
{
    if (observer == null)
        throw new ArgumentNullException("observer");
    if (scheduler == null)
        throw new ArgumentNullException("scheduler");

    return new ObserveOnObserver<T>(scheduler, observer, null);
}
#if !NO_SYNCCTX
/// <summary>
/// Schedules the invocation of observer methods on the given synchronization context.
/// </summary>
/// <typeparam name="T">The type of the elements received by the source observer.</typeparam>
/// <param name="observer">The observer to schedule messages for.</param>
/// <param name="context">Synchronization context to schedule observer messages on.</param>
/// <returns>Observer whose messages are scheduled on the given synchronization context.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="context"/> is null.</exception>
public static IObserver<T> NotifyOn<T>(this IObserver<T> observer, SynchronizationContext context)
{
    if (observer == null)
        throw new ArgumentNullException("observer");
    if (context == null)
        throw new ArgumentNullException("context");

    // The context is wrapped in a scheduler so the same ObserveOnObserver
    // type is used as in the IScheduler-based overload.
    return new ObserveOnObserver<T>(new SynchronizationContextScheduler(context), observer, null);
}
#endif
#if HAS_PROGRESS
/// <summary>
/// Converts an observer to a progress object.
/// </summary>
/// <typeparam name="T">The type of the progress objects received by the source observer.</typeparam>
/// <param name="observer">The observer to convert.</param>
/// <returns>Progress object whose Report messages correspond to the observer's OnNext messages.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public static IProgress<T> ToProgress<T>(this IObserver<T> observer)
{
    if (observer == null)
        throw new ArgumentNullException("observer");

    // Only OnNext is wired up; the returned progress object is not given
    // the observer's OnError or OnCompleted.
    return new AnonymousProgress<T>(observer.OnNext);
}
/// <summary>
/// Converts an observer to a progress object, using the specified scheduler to invoke the progress reporting method.
/// </summary>
/// <typeparam name="T">The type of the progress objects received by the source observer.</typeparam>
/// <param name="observer">The observer to convert.</param>
/// <param name="scheduler">Scheduler to report progress on.</param>
/// <returns>Progress object whose Report messages correspond to the observer's OnNext messages.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
public static IProgress<T> ToProgress<T>(this IObserver<T> observer, IScheduler scheduler)
{
    if (observer == null)
        throw new ArgumentNullException("observer");
    if (scheduler == null)
        throw new ArgumentNullException("scheduler");

    // Report calls go through the OnNext of a scheduler-bound observer
    // rather than straight to the supplied observer.
    return new AnonymousProgress<T>(new ObserveOnObserver<T>(scheduler, observer, null).OnNext);
}
/// <summary>
/// Progress object that forwards every reported value to a delegate.
/// </summary>
/// <typeparam name="T">The type of the reported progress values.</typeparam>
class AnonymousProgress<T> : IProgress<T>
{
    private readonly Action<T> _progress;

    /// <summary>
    /// Creates a progress object around the given delegate.
    /// </summary>
    /// <param name="progress">Action invoked for each reported value. Not checked for null here.</param>
    public AnonymousProgress(Action<T> progress)
    {
        _progress = progress;
    }

    /// <summary>
    /// Passes the reported value to the wrapped delegate.
    /// </summary>
    /// <param name="value">The progress value to forward.</param>
    public void Report(T value)
    {
        _progress(value);
    }
}
/// <summary>
/// Converts a progress object to an observer.
/// </summary>
/// <typeparam name="T">The type of the progress objects received by the progress reporter.</typeparam>
/// <param name="progress">The progress object to convert.</param>
/// <returns>Observer whose OnNext messages correspond to the progress object's Report messages.</returns>
/// <exception cref="ArgumentNullException"><paramref name="progress"/> is null.</exception>
public static IObserver<T> ToObserver<T>(this IProgress<T> progress)
{
    if (progress == null)
        throw new ArgumentNullException("progress");

    return new AnonymousObserver<T>(progress.Report);
}
#endif
}
}