using System.Reactive.Concurrency; using System.Threading; namespace System.Reactive { /// /// Provides a set of static methods for creating observers. /// public static class Observer { /// /// Creates an observer from a notification callback. /// /// Action that handles a notification. /// The observer object that invokes the specified handler using a notification corresponding to each message it receives. public static IObserver ToObserver(this Action> handler) { if (handler == null) throw new ArgumentNullException("handler"); return new AnonymousObserver( x => handler(Notification.CreateOnNext(x)), exception => handler(Notification.CreateOnError(exception)), () => handler(Notification.CreateOnCompleted())); } /// /// Creates a notification callback from an observer. /// /// Observer object. /// The action that forwards its input notification to the underlying observer. public static Action> ToNotifier(this IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return n => n.Accept(observer); } /// /// Creates an observer from the specified OnNext action. /// /// Observer's OnNext action implementation. /// The observer object implemented using the given actions. public static IObserver Create(Action onNext) { if (onNext == null) throw new ArgumentNullException("onNext"); return new AnonymousObserver(onNext); } /// /// Creates an observer from the specified OnNext and OnError actions. /// /// Observer's OnNext action implementation. /// Observer's OnError action implementation. /// The observer object implemented using the given actions. public static IObserver Create(Action onNext, Action onError) { if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); return new AnonymousObserver(onNext, onError); } /// /// Creates an observer from the specified OnNext and OnCompleted actions. /// /// Observer's OnNext action implementation. /// Observer's OnCompleted action implementation. /// The observer object implemented using the given actions. public static IObserver Create(Action onNext, Action onCompleted) { if (onNext == null) throw new ArgumentNullException("onNext"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return new AnonymousObserver(onNext, onCompleted); } /// /// Creates an observer from the specified OnNext, OnError, and OnCompleted actions. /// /// Observer's OnNext action implementation. /// Observer's OnError action implementation. /// Observer's OnCompleted action implementation. /// The observer object implemented using the given actions. public static IObserver Create(Action onNext, Action onError, Action onCompleted) { if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return new AnonymousObserver(onNext, onError, onCompleted); } /// /// Hides the identity of an observer. /// /// An observer whose identity to hide. /// An observer that hides the identity of the specified observer. public static IObserver AsObserver(this IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return new AnonymousObserver(observer.OnNext, observer.OnError, observer.OnCompleted); } /// /// Synchronizes the observer messages. /// /// The observer to synchronize. /// Gate object to synchronize each observer call on. /// The observer whose messages are synchronized on the given gate object. public static IObserver Synchronize(IObserver observer, object gate) { if (observer == null) throw new ArgumentNullException("observer"); if (gate == null) throw new ArgumentNullException("gate"); return new SynchronizedObserver(observer, gate); } /// /// Synchronizes the observer messages. /// /// The observer to synchronize. /// The observer whose messages are synchronized. public static IObserver Synchronize(IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return Synchronize(observer, new object()); } /// /// Schedules the observer messages on the given scheduler. /// /// The observer to schedule messages for. /// Scheduler to schedule observer messages on. /// Observer whose messages are scheduled on the given scheduler. public static IObserver NotifyOn(this IObserver observer, IScheduler scheduler) { if (observer == null) throw new ArgumentNullException("observer"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return new ObserveOnObserver(scheduler, observer, null); } #if !NO_SYNCCTX /// /// Schedules the observer messages on the given synchonization context. /// /// The observer to schedule messages for. /// Synchonization context to schedule observer messages on. /// Observer whose messages are scheduled on the given synchonization context. public static IObserver NotifyOn(this IObserver observer, SynchronizationContext context) { if (observer == null) throw new ArgumentNullException("observer"); if (context == null) throw new ArgumentNullException("context"); return new ObserveOnObserver(new SynchronizationContextScheduler(context), observer, null); } #endif #if HAS_PROGRESS /// /// Converts an observer to a progress object. /// /// The observer to convert. /// Progress object whose Report messages correspond to the observer's OnNext messages. public static IProgress ToProgress(this IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); return new AnonymousProgress(observer.OnNext); } /// /// Converts an observer to a progress object. /// /// The observer to convert. /// Scheduler to report progress on. /// Progress object whose Report messages correspond to the observer's OnNext messages. public static IProgress ToProgress(this IObserver observer, IScheduler scheduler) { if (observer == null) throw new ArgumentNullException("observer"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return new AnonymousProgress(new ObserveOnObserver(scheduler, observer, null).OnNext); } class AnonymousProgress : IProgress { private readonly Action _progress; public AnonymousProgress(Action progress) { _progress = progress; } public void Report(T value) { _progress(value); } } /// /// Converts a progress object to an observer. /// /// The progress object to convert. /// Observer whose OnNext messages correspond to the progress object's Report messages. public static IObserver ToObserver(this IProgress progress) { if (progress == null) throw new ArgumentNullException("progress"); return Create(progress.Report); } #endif } }