// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Reactive.Concurrency; using System.Threading; namespace System.Reactive { /// /// Provides a set of static methods for creating observers. /// public static class Observer { /// /// Creates an observer from a notification callback. /// /// The type of the elements received by the observer. /// Action that handles a notification. /// The observer object that invokes the specified handler using a notification corresponding to each message it receives. /// is null. public static IObserver ToObserver(this Action> handler) { if (handler == null) throw new ArgumentNullException("handler"); return new AnonymousObserver( x => handler(Notification.CreateOnNext(x)), exception => handler(Notification.CreateOnError(exception)), () => handler(Notification.CreateOnCompleted()) ); } /// /// Creates a notification callback from an observer. /// /// The type of the elements received by the observer. /// Observer object. /// The action that forwards its input notification to the underlying observer. /// is null. [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1704:IdentifiersShouldBeSpelledCorrectly", MessageId = "Notifier", Justification = "Backward compat.")] public static Action> ToNotifier(this IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return n => n.Accept(observer); } /// /// Creates an observer from the specified OnNext action. /// /// The type of the elements received by the observer. /// Observer's OnNext action implementation. /// The observer object implemented using the given actions. /// is null. public static IObserver Create(Action onNext) { if (onNext == null) throw new ArgumentNullException("onNext"); return new AnonymousObserver(onNext); } /// /// Creates an observer from the specified OnNext and OnError actions. /// /// The type of the elements received by the observer. /// Observer's OnNext action implementation. /// Observer's OnError action implementation. /// The observer object implemented using the given actions. /// or is null. public static IObserver Create(Action onNext, Action onError) { if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); return new AnonymousObserver(onNext, onError); } /// /// Creates an observer from the specified OnNext and OnCompleted actions. /// /// The type of the elements received by the observer. /// Observer's OnNext action implementation. /// Observer's OnCompleted action implementation. /// The observer object implemented using the given actions. /// or is null. public static IObserver Create(Action onNext, Action onCompleted) { if (onNext == null) throw new ArgumentNullException("onNext"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return new AnonymousObserver(onNext, onCompleted); } /// /// Creates an observer from the specified OnNext, OnError, and OnCompleted actions. /// /// The type of the elements received by the observer. /// Observer's OnNext action implementation. /// Observer's OnError action implementation. /// Observer's OnCompleted action implementation. /// The observer object implemented using the given actions. /// or or is null. public static IObserver Create(Action onNext, Action onError, Action onCompleted) { if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return new AnonymousObserver(onNext, onError, onCompleted); } /// /// Hides the identity of an observer. /// /// The type of the elements received by the source observer. /// An observer whose identity to hide. /// An observer that hides the identity of the specified observer. /// is null. public static IObserver AsObserver(this IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return new AnonymousObserver(observer.OnNext, observer.OnError, observer.OnCompleted); } /// /// Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. /// If a violation is detected, an InvalidOperationException is thrown from the offending observer method call. /// /// The type of the elements received by the source observer. /// The observer whose callback invocations should be checked for grammar violations. /// An observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer. /// is null. public static IObserver Checked(this IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return new CheckedObserver(observer); } /// /// Synchronizes access to the observer such that its callback methods cannot be called concurrently from multiple threads. This overload is useful when coordinating access to an observer. /// Notice reentrant observer callbacks on the same thread are still possible. /// /// The type of the elements received by the source observer. /// The observer whose callbacks should be synchronized. /// An observer that delivers callbacks to the specified observer in a synchronized manner. /// is null. /// /// Because a Monitor is used to perform the synchronization, there's no protection against reentrancy from the same thread. /// Hence, overlapped observer callbacks are still possible, which is invalid behavior according to the observer grammar. In order to protect against this behavior as /// well, use the overload, passing true for the second parameter. /// public static IObserver Synchronize(IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return new SynchronizedObserver(observer, new object()); } /// /// Synchronizes access to the observer such that its callback methods cannot be called concurrently. This overload is useful when coordinating access to an observer. /// The parameter configures the type of lock used for synchronization. /// /// The type of the elements received by the source observer. /// The observer whose callbacks should be synchronized. /// If set to true, reentrant observer callbacks will be queued up and get delivered to the observer in a sequential manner. /// An observer that delivers callbacks to the specified observer in a synchronized manner. /// is null. /// /// When the parameter is set to false, behavior is identical to the overload which uses /// a Monitor for synchronization. When the parameter is set to true, an /// is used to queue up callbacks to the specified observer if a reentrant call is made. /// public static IObserver Synchronize(IObserver observer, bool preventReentrancy) { if (observer == null) throw new ArgumentNullException("observer"); if (preventReentrancy) return new AsyncLockObserver(observer, new AsyncLock()); else return new SynchronizedObserver(observer, new object()); } /// /// Synchronizes access to the observer such that its callback methods cannot be called concurrently by multiple threads, using the specified gate object for use by a Monitor-based lock. /// This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common gate object. /// Notice reentrant observer callbacks on the same thread are still possible. /// /// The type of the elements received by the source observer. /// The observer whose callbacks should be synchronized. /// Gate object to synchronize each observer call on. /// An observer that delivers callbacks to the specified observer in a synchronized manner. /// or is null. /// /// Because a Monitor is used to perform the synchronization, there's no protection against reentrancy from the same thread. /// Hence, overlapped observer callbacks are still possible, which is invalid behavior according to the observer grammar. In order to protect against this behavior as /// well, use the overload. /// public static IObserver Synchronize(IObserver observer, object gate) { if (observer == null) throw new ArgumentNullException("observer"); if (gate == null) throw new ArgumentNullException("gate"); return new SynchronizedObserver(observer, gate); } /// /// Synchronizes access to the observer such that its callback methods cannot be called concurrently, using the specified asynchronous lock to protect against concurrent and reentrant access. /// This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common asynchronous lock. /// /// The type of the elements received by the source observer. /// The observer whose callbacks should be synchronized. /// Gate object to synchronize each observer call on. /// An observer that delivers callbacks to the specified observer in a synchronized manner. /// or is null. public static IObserver Synchronize(IObserver observer, AsyncLock asyncLock) { if (observer == null) throw new ArgumentNullException("observer"); if (asyncLock == null) throw new ArgumentNullException("asyncLock"); return new AsyncLockObserver(observer, asyncLock); } /// /// Schedules the invocation of observer methods on the given scheduler. /// /// The type of the elements received by the source observer. /// The observer to schedule messages for. /// Scheduler to schedule observer messages on. /// Observer whose messages are scheduled on the given scheduler. /// or is null. public static IObserver NotifyOn(this IObserver observer, IScheduler scheduler) { if (observer == null) throw new ArgumentNullException("observer"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return new ObserveOnObserver(scheduler, observer, null); } #if !NO_SYNCCTX /// /// Schedules the invocation of observer methods on the given synchonization context. /// /// The type of the elements received by the source observer. /// The observer to schedule messages for. /// Synchonization context to schedule observer messages on. /// Observer whose messages are scheduled on the given synchonization context. /// or is null. public static IObserver NotifyOn(this IObserver observer, SynchronizationContext context) { if (observer == null) throw new ArgumentNullException("observer"); if (context == null) throw new ArgumentNullException("context"); return new ObserveOnObserver(new SynchronizationContextScheduler(context), observer, null); } #endif #if HAS_PROGRESS /// /// Converts an observer to a progress object. /// /// The type of the progress objects received by the source observer. /// The observer to convert. /// Progress object whose Report messages correspond to the observer's OnNext messages. /// is null. public static IProgress ToProgress(this IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return new AnonymousProgress(observer.OnNext); } /// /// Converts an observer to a progress object, using the specified scheduler to invoke the progress reporting method. /// /// The type of the progress objects received by the source observer. /// The observer to convert. /// Scheduler to report progress on. /// Progress object whose Report messages correspond to the observer's OnNext messages. /// or is null. public static IProgress ToProgress(this IObserver observer, IScheduler scheduler) { if (observer == null) throw new ArgumentNullException("observer"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return new AnonymousProgress(new ObserveOnObserver(scheduler, observer, null).OnNext); } class AnonymousProgress : IProgress { private readonly Action _progress; public AnonymousProgress(Action progress) { _progress = progress; } public void Report(T value) { _progress(value); } } /// /// Converts a progress object to an observer. /// /// The type of the progress objects received by the progress reporter. /// The progress object to convert. /// Observer whose OnNext messages correspond to the progress object's Report messages. /// is null. public static IObserver ToObserver(this IProgress progress) { if (progress == null) throw new ArgumentNullException("progress"); return new AnonymousObserver(progress.Report); } #endif } }