// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections.Generic; using System.Reactive.Concurrency; namespace System.Reactive.Linq { public static partial class Observable { #region + AsObservable + /// /// Hides the identity of an observable sequence. /// /// The type of the elements in the source sequence. /// An observable sequence whose identity to hide. /// An observable sequence that hides the identity of the source sequence. /// is null. public static IObservable AsObservable(this IObservable source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.AsObservable(source); } #endregion #region + Buffer + /// /// Projects each element of an observable sequence into consecutive non-overlapping buffers which are produced based on element count information. /// /// The type of the elements in the source sequence, and in the lists in the result sequence. /// Source sequence to produce buffers over. /// Length of each buffer. /// An observable sequence of buffers. /// is null. /// is less than or equal to zero. public static IObservable> Buffer(this IObservable source, int count) { if (source == null) throw new ArgumentNullException("source"); if (count <= 0) throw new ArgumentOutOfRangeException("count"); return s_impl.Buffer(source, count); } /// /// Projects each element of an observable sequence into zero or more buffers which are produced based on element count information. /// /// The type of the elements in the source sequence, and in the lists in the result sequence. /// Source sequence to produce buffers over. /// Length of each buffer. /// Number of elements to skip between creation of consecutive buffers. /// An observable sequence of buffers. /// is null. /// or is less than or equal to zero. public static IObservable> Buffer(this IObservable source, int count, int skip) { if (source == null) throw new ArgumentNullException("source"); if (count <= 0) throw new ArgumentOutOfRangeException("count"); if (skip <= 0) throw new ArgumentOutOfRangeException("skip"); return s_impl.Buffer(source, count, skip); } #endregion #region + Dematerialize + /// /// Dematerializes the explicit notification values of an observable sequence as implicit notifications. /// /// The type of the elements materialized in the source sequence notification objects. /// An observable sequence containing explicit notification values which have to be turned into implicit notifications. /// An observable sequence exhibiting the behavior corresponding to the source sequence's notification values. /// is null. public static IObservable Dematerialize(this IObservable> source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.Dematerialize(source); } #endregion #region + DistinctUntilChanged + /// /// Returns an observable sequence that contains only distinct contiguous elements. /// /// The type of the elements in the source sequence. /// An observable sequence to retain distinct contiguous elements for. /// An observable sequence only containing the distinct contiguous elements from the source sequence. /// is null. public static IObservable DistinctUntilChanged(this IObservable source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.DistinctUntilChanged(source); } /// /// Returns an observable sequence that contains only distinct contiguous elements according to the comparer. /// /// The type of the elements in the source sequence. /// An observable sequence to retain distinct contiguous elements for. /// Equality comparer for source elements. /// An observable sequence only containing the distinct contiguous elements from the source sequence. /// or is null. public static IObservable DistinctUntilChanged(this IObservable source, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException("source"); if (comparer == null) throw new ArgumentNullException("comparer"); return s_impl.DistinctUntilChanged(source, comparer); } /// /// Returns an observable sequence that contains only distinct contiguous elements according to the keySelector. /// /// The type of the elements in the source sequence. /// The type of the discriminator key computed for each element in the source sequence. /// An observable sequence to retain distinct contiguous elements for, based on a computed key value. /// A function to compute the comparison key for each element. /// An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. /// or is null. public static IObservable DistinctUntilChanged(this IObservable source, Func keySelector) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); return s_impl.DistinctUntilChanged(source, keySelector); } /// /// Returns an observable sequence that contains only distinct contiguous elements according to the keySelector and the comparer. /// /// The type of the elements in the source sequence. /// The type of the discriminator key computed for each element in the source sequence. /// An observable sequence to retain distinct contiguous elements for, based on a computed key value. /// A function to compute the comparison key for each element. /// Equality comparer for computed key values. /// An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. /// or or is null. public static IObservable DistinctUntilChanged(this IObservable source, Func keySelector, IEqualityComparer comparer) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); if (comparer == null) throw new ArgumentNullException("comparer"); return s_impl.DistinctUntilChanged(source, keySelector, comparer); } #endregion #region + Do + /// /// Invokes an action for each element in the observable sequence, and propagates all observer messages through the result sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the observable sequence. /// The source sequence with the side-effecting behavior applied. /// or is null. public static IObservable Do(this IObservable source, Action onNext) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); return s_impl.Do(source, onNext); } /// /// Invokes an action for each element in the observable sequence and invokes an action upon graceful termination of the observable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon graceful termination of the observable sequence. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IObservable Do(this IObservable source, Action onNext, Action onCompleted) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return s_impl.Do(source, onNext, onCompleted); } /// /// Invokes an action for each element in the observable sequence and invokes an action upon exceptional termination of the observable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon exceptional termination of the observable sequence. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IObservable Do(this IObservable source, Action onNext, Action onError) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); return s_impl.Do(source, onNext, onError); } /// /// Invokes an action for each element in the observable sequence and invokes an action upon graceful or exceptional termination of the observable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon exceptional termination of the observable sequence. /// Action to invoke upon graceful termination of the observable sequence. /// The source sequence with the side-effecting behavior applied. /// or or or is null. public static IObservable Do(this IObservable source, Action onNext, Action onError, Action onCompleted) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); return s_impl.Do(source, onNext, onError, onCompleted); } /// /// Invokes the observer's methods for each message in the source sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Observer whose methods to invoke as part of the source sequence's observation. /// The source sequence with the side-effecting behavior applied. /// or is null. public static IObservable Do(this IObservable source, IObserver observer) { if (source == null) throw new ArgumentNullException("source"); if (observer == null) throw new ArgumentNullException("observer"); return s_impl.Do(source, observer); } #endregion #region + Finally + /// /// Invokes a specified action after the source observable sequence terminates gracefully or exceptionally. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke after the source observable sequence terminates. /// Source sequence with the action-invoking termination behavior applied. /// or is null. public static IObservable Finally(this IObservable source, Action finallyAction) { if (source == null) throw new ArgumentNullException("source"); if (finallyAction == null) throw new ArgumentNullException("finallyAction"); return s_impl.Finally(source, finallyAction); } #endregion #region + IgnoreElements + /// /// Ignores all elements in an observable sequence leaving only the termination messages. /// /// The type of the elements in the source sequence. /// Source sequence. /// An empty observable sequence that signals termination, successful or exceptional, of the source sequence. /// is null. public static IObservable IgnoreElements(this IObservable source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.IgnoreElements(source); } #endregion #region + Materialize + /// /// Materializes the implicit notifications of an observable sequence as explicit notification values. /// /// The type of the elements in the source sequence. /// An observable sequence to get notification values for. /// An observable sequence containing the materialized notification values from the source sequence. /// is null. public static IObservable> Materialize(this IObservable source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.Materialize(source); } #endregion #region - Repeat - /// /// Repeats the observable sequence indefinitely. /// /// The type of the elements in the source sequence. /// Observable sequence to repeat. /// The observable sequence producing the elements of the given sequence repeatedly and sequentially. /// is null. public static IObservable Repeat(this IObservable source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.Repeat(source); } /// /// Repeats the observable sequence a specified number of times. /// /// The type of the elements in the source sequence. /// Observable sequence to repeat. /// Number of times to repeat the sequence. /// The observable sequence producing the elements of the given sequence repeatedly. /// is null. /// is less than zero. public static IObservable Repeat(this IObservable source, int repeatCount) { if (source == null) throw new ArgumentNullException("source"); if (repeatCount < 0) throw new ArgumentOutOfRangeException("repeatCount"); return s_impl.Repeat(source, repeatCount); } #endregion #region - Retry - /// /// Repeats the source observable sequence until it successfully terminates. /// /// The type of the elements in the source sequence. /// Observable sequence to repeat until it successfully terminates. /// An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully. /// is null. public static IObservable Retry(this IObservable source) { if (source == null) throw new ArgumentNullException("source"); return s_impl.Retry(source); } /// /// Repeats the source observable sequence the specified number of times or until it successfully terminates. /// /// The type of the elements in the source sequence. /// Observable sequence to repeat until it successfully terminates. /// Number of times to repeat the sequence. /// An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully. /// is null. /// is less than zero. public static IObservable Retry(this IObservable source, int retryCount) { if (source == null) throw new ArgumentNullException("source"); if (retryCount < 0) throw new ArgumentOutOfRangeException("retryCount"); return s_impl.Retry(source, retryCount); } #endregion #region + Scan + /// /// Applies an accumulator function over an observable sequence and returns each intermediate result. The specified seed value is used as the initial accumulator value. /// For aggregation behavior with no intermediate results, see . /// /// The type of the elements in the source sequence. /// The type of the result of the aggregation. /// An observable sequence to accumulate over. /// The initial accumulator value. /// An accumulator function to be invoked on each element. /// An observable sequence containing the accumulated values. /// or is null. public static IObservable Scan(this IObservable source, TAccumulate seed, Func accumulator) { if (source == null) throw new ArgumentNullException("source"); if (accumulator == null) throw new ArgumentNullException("accumulator"); return s_impl.Scan(source, seed, accumulator); } /// /// Applies an accumulator function over an observable sequence and returns each intermediate result. /// For aggregation behavior with no intermediate results, see . /// /// The type of the elements in the source sequence and the result of the aggregation. /// An observable sequence to accumulate over. /// An accumulator function to be invoked on each element. /// An observable sequence containing the accumulated values. /// or is null. public static IObservable Scan(this IObservable source, Func accumulator) { if (source == null) throw new ArgumentNullException("source"); if (accumulator == null) throw new ArgumentNullException("accumulator"); return s_impl.Scan(source, accumulator); } #endregion #region + SkipLast + /// /// Bypasses a specified number of elements at the end of an observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence. /// Number of elements to bypass at the end of the source sequence. /// An observable sequence containing the source sequence elements except for the bypassed ones at the end. /// is null. /// is less than zero. /// /// This operator accumulates a queue with a length enough to store the first elements. As more elements are /// received, elements are taken from the front of the queue and produced on the result sequence. This causes elements to be delayed. /// public static IObservable SkipLast(this IObservable source, int count) { if (source == null) throw new ArgumentNullException("source"); if (count < 0) throw new ArgumentOutOfRangeException("count"); return s_impl.SkipLast(source, count); } #endregion #region - StartWith - /// /// Prepends a sequence of values to an observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence to prepend values to. /// Values to prepend to the specified sequence. /// The source sequence prepended with the specified values. /// or is null. public static IObservable StartWith(this IObservable source, params TSource[] values) { if (source == null) throw new ArgumentNullException("source"); if (values == null) throw new ArgumentNullException("values"); return s_impl.StartWith(source, values); } /// /// Prepends a sequence of values to an observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence to prepend values to. /// Values to prepend to the specified sequence. /// The source sequence prepended with the specified values. /// or is null. public static IObservable StartWith(this IObservable source, IEnumerable values) { if (source == null) throw new ArgumentNullException("source"); if (values == null) throw new ArgumentNullException("values"); TSource[] valueArray = values as TSource[]; if (valueArray == null) { List valueList = new List(values); valueArray = valueList.ToArray(); } return s_impl.StartWith(source, valueArray); } /// /// Prepends a sequence of values to an observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence to prepend values to. /// Scheduler to emit the prepended values on. /// Values to prepend to the specified sequence. /// The source sequence prepended with the specified values. /// or or is null. public static IObservable StartWith(this IObservable source, IScheduler scheduler, params TSource[] values) { if (source == null) throw new ArgumentNullException("source"); if (scheduler == null) throw new ArgumentNullException("scheduler"); if (values == null) throw new ArgumentNullException("values"); return s_impl.StartWith(source, scheduler, values); } /// /// Prepends a sequence of values to an observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence to prepend values to. /// Scheduler to emit the prepended values on. /// Values to prepend to the specified sequence. /// The source sequence prepended with the specified values. /// or or is null. public static IObservable StartWith(this IObservable source, IScheduler scheduler, IEnumerable values) { if (source == null) throw new ArgumentNullException("source"); if (scheduler == null) throw new ArgumentNullException("scheduler"); if (values == null) throw new ArgumentNullException("values"); TSource[] valueArray = values as TSource[]; if (valueArray == null) { List valueList = new List(values); valueArray = valueList.ToArray(); } return s_impl.StartWith(source, scheduler, valueArray); } #endregion #region + TakeLast + /// /// Returns a specified number of contiguous elements from the end of an observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence. /// Number of elements to take from the end of the source sequence. /// An observable sequence containing the specified number of elements from the end of the source sequence. /// is null. /// is less than zero. /// /// This operator accumulates a buffer with a length enough to store elements elements. Upon completion of /// the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed. /// public static IObservable TakeLast(this IObservable source, int count) { if (source == null) throw new ArgumentNullException("source"); if (count < 0) throw new ArgumentOutOfRangeException("count"); return s_impl.TakeLast(source, count); } /// /// Returns a specified number of contiguous elements from the end of an observable sequence, using the specified scheduler to drain the queue. /// /// The type of the elements in the source sequence. /// Source sequence. /// Number of elements to take from the end of the source sequence. /// Scheduler used to drain the queue upon completion of the source sequence. /// An observable sequence containing the specified number of elements from the end of the source sequence. /// or is null. /// is less than zero. /// /// This operator accumulates a buffer with a length enough to store elements elements. Upon completion of /// the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed. /// public static IObservable TakeLast(this IObservable source, int count, IScheduler scheduler) { if (source == null) throw new ArgumentNullException("source"); if (count < 0) throw new ArgumentOutOfRangeException("count"); if (scheduler == null) throw new ArgumentNullException("scheduler"); return s_impl.TakeLast(source, count, scheduler); } #endregion #region + TakeLastBuffer + /// /// Returns a list with the specified number of contiguous elements from the end of an observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence. /// Number of elements to take from the end of the source sequence. /// An observable sequence containing a single list with the specified number of elements from the end of the source sequence. /// is null. /// is less than zero. /// /// This operator accumulates a buffer with a length enough to store elements. Upon completion of the /// source sequence, this buffer is produced on the result sequence. /// public static IObservable> TakeLastBuffer(this IObservable source, int count) { if (source == null) throw new ArgumentNullException("source"); if (count < 0) throw new ArgumentOutOfRangeException("count"); return s_impl.TakeLastBuffer(source, count); } #endregion #region + Window + /// /// Projects each element of an observable sequence into consecutive non-overlapping windows which are produced based on element count information. /// /// The type of the elements in the source sequence, and in the windows in the result sequence. /// Source sequence to produce windows over. /// Length of each window. /// An observable sequence of windows. /// is null. /// is less than or equal to zero. public static IObservable> Window(this IObservable source, int count) { if (source == null) throw new ArgumentNullException("source"); if (count <= 0) throw new ArgumentOutOfRangeException("count"); return s_impl.Window(source, count); } /// /// Projects each element of an observable sequence into zero or more windows which are produced based on element count information. /// /// The type of the elements in the source sequence, and in the windows in the result sequence. /// Source sequence to produce windows over. /// Length of each window. /// Number of elements to skip between creation of consecutive windows. /// An observable sequence of windows. /// is null. /// or is less than or equal to zero. public static IObservable> Window(this IObservable source, int count, int skip) { if (source == null) throw new ArgumentNullException("source"); if (count <= 0) throw new ArgumentOutOfRangeException("count"); if (skip <= 0) throw new ArgumentOutOfRangeException("skip"); return s_impl.Window(source, count, skip); } #endregion } }