diff options
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs | 1907 |
1 files changed, 0 insertions, 1907 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs deleted file mode 100644 index 161e1c1..0000000 --- a/Rx.NET/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Time.cs +++ /dev/null @@ -1,1907 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -using System.Collections.Generic; -using System.Reactive.Concurrency; -using System.Reactive.Disposables; -using System.Reactive.Subjects; - -namespace System.Reactive.Linq -{ -#if !NO_PERF - using Observαble; -#endif - - internal partial class QueryLanguage - { - #region + Buffer + - - #region TimeSpan only - - public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan) - { - return Buffer_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler) - { - return Buffer_<TSource>(source, timeSpan, timeSpan, scheduler); - } - - public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift) - { - return Buffer_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) - { - return Buffer_<TSource>(source, timeSpan, timeShift, scheduler); - } - - private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) - { -#if !NO_PERF - return new Buffer<TSource>(source, timeSpan, timeShift, scheduler); -#else - return source.Window(timeSpan, timeShift, scheduler).SelectMany(Observable.ToList); -#endif - } - - #endregion - - #region TimeSpan + int - - public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count) - { - return Buffer_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) - { - return Buffer_<TSource>(source, timeSpan, count, scheduler); - } - - private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) - { -#if !NO_PERF - return new Buffer<TSource>(source, timeSpan, count, scheduler); -#else - return source.Window(timeSpan, count, scheduler).SelectMany(Observable.ToList); -#endif - } - - #endregion - - #endregion - - #region + Delay + - - #region TimeSpan - - public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime) - { - return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) - { - return Delay_<TSource>(source, dueTime, scheduler); - } - - private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) - { -#if !NO_PERF - return new Delay<TSource>(source, dueTime, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var gate = new object(); - var q = new Queue<Timestamped<Notification<TSource>>>(); - var active = false; - var running = false; - var cancelable = new SerialDisposable(); - var exception = default(Exception); - - var subscription = source.Materialize().Timestamp(scheduler).Subscribe(notification => - { - var shouldRun = false; - - lock (gate) - { - if (notification.Value.Kind == NotificationKind.OnError) - { - q.Clear(); - q.Enqueue(notification); - exception = notification.Value.Exception; - shouldRun = !running; - } - else - { - q.Enqueue(new Timestamped<Notification<TSource>>(notification.Value, notification.Timestamp.Add(dueTime))); - shouldRun = !active; - active = true; - } - } - - if (shouldRun) - { - if (exception != null) - observer.OnError(exception); - else - { - var d = new SingleAssignmentDisposable(); - cancelable.Disposable = d; - d.Disposable = scheduler.Schedule(dueTime, self => - { - lock (gate) - { - if (exception != null) - return; - running = true; - } - Notification<TSource> result; - - do - { - result = null; - lock (gate) - { - if (q.Count > 0 && q.Peek().Timestamp.CompareTo(scheduler.Now) <= 0) - result = q.Dequeue().Value; - } - - if (result != null) - result.Accept(observer); - } while (result != null); - - var shouldRecurse = false; - var recurseDueTime = TimeSpan.Zero; - var e = default(Exception); - lock (gate) - { - if (q.Count > 0) - { - shouldRecurse = true; - recurseDueTime = TimeSpan.FromTicks(Math.Max(0, q.Peek().Timestamp.Subtract(scheduler.Now).Ticks)); - } - else - active = false; - e = exception; - running = false; - } - if (e != null) - observer.OnError(e); - else if (shouldRecurse) - self(recurseDueTime); - }); - } - } - }); - - return new CompositeDisposable(subscription, cancelable); - }); -#endif - } - - #endregion - - #region DateTimeOffset - - public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime) - { - return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler) - { - return Delay_<TSource>(source, dueTime, scheduler); - } - - private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler) - { -#if !NO_PERF - return new Delay<TSource>(source, dueTime, scheduler); -#else - return Observable.Defer(() => - { - var timeSpan = dueTime.Subtract(scheduler.Now); - return Delay_<TSource>(source, timeSpan, scheduler); - }); -#endif - } - - #endregion - - #region Duration selector - - public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delayDurationSelector) - { - return Delay_<TSource, TDelay>(source, null, delayDurationSelector); - } - - public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector) - { - return Delay_<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector); - } - - private static IObservable<TSource> Delay_<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector) - { -#if !NO_PERF - return new Delay<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector); -#else - return new AnonymousObservable<TSource>(observer => - { - var delays = new CompositeDisposable(); - - var gate = new object(); - var atEnd = false; - - var done = new Action(() => - { - if (atEnd && delays.Count == 0) - { - observer.OnCompleted(); - } - }); - - var subscription = new SerialDisposable(); - - var start = new Action(() => - { - subscription.Disposable = source.Subscribe( - x => - { - var delay = default(IObservable<TDelay>); - try - { - delay = delayDurationSelector(x); - } - catch (Exception error) - { - lock (gate) - observer.OnError(error); - - return; - } - - var d = new SingleAssignmentDisposable(); - delays.Add(d); - d.Disposable = delay.Subscribe( - _ => - { - lock (gate) - { - observer.OnNext(x); - - delays.Remove(d); - done(); - } - }, - exception => - { - lock (gate) - observer.OnError(exception); - }, - () => - { - lock (gate) - { - observer.OnNext(x); - - delays.Remove(d); - done(); - } - } - ); - }, - exception => - { - lock (gate) - { - observer.OnError(exception); - } - }, - () => - { - lock (gate) - { - atEnd = true; - subscription.Dispose(); - - done(); - } - } - ); - }); - - if (subscriptionDelay == null) - { - start(); - } - else - { - subscription.Disposable = subscriptionDelay.Subscribe( - _ => - { - start(); - }, - observer.OnError, - start - ); - } - - return new CompositeDisposable(subscription, delays); - }); -#endif - } - - #endregion - - #endregion - - #region + DelaySubscription + - - public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime) - { - return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) - { - return DelaySubscription_<TSource>(source, dueTime, scheduler); - } - - private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) - { -#if !NO_PERF - return new DelaySubscription<TSource>(source, dueTime, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var d = new MultipleAssignmentDisposable(); - - var dt = Normalize(dueTime); - - d.Disposable = scheduler.Schedule(dt, () => - { - d.Disposable = source.Subscribe(observer); - }); - - return d; - }); -#endif - } - - public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime) - { - return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler) - { - return DelaySubscription_<TSource>(source, dueTime, scheduler); - } - - private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler) - { -#if !NO_PERF - return new DelaySubscription<TSource>(source, dueTime, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var d = new MultipleAssignmentDisposable(); - - d.Disposable = scheduler.Schedule(dueTime, () => - { - d.Disposable = source.Subscribe(observer); - }); - - return d; - }); -#endif - } - - #endregion - - #region + Generate + - - public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector) - { - return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler) - { - return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler); - } - - private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler) - { -#if !NO_PERF - return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler); -#else - return new AnonymousObservable<TResult>(observer => - { - var state = initialState; - var first = true; - var hasResult = false; - var result = default(TResult); - var time = default(TimeSpan); - return scheduler.Schedule(TimeSpan.Zero, self => - { - if (hasResult) - observer.OnNext(result); - try - { - if (first) - first = false; - else - state = iterate(state); - hasResult = condition(state); - if (hasResult) - { - result = resultSelector(state); - time = timeSelector(state); - } - } - catch (Exception exception) - { - observer.OnError(exception); - return; - } - - if (hasResult) - self(time); - else - observer.OnCompleted(); - }); - }); -#endif - } - - public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector) - { - return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler) - { - return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler); - } - - private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler) - { -#if !NO_PERF - return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler); -#else - return new AnonymousObservable<TResult>(observer => - { - var state = initialState; - var first = true; - var hasResult = false; - var result = default(TResult); - var time = default(DateTimeOffset); - return scheduler.Schedule(scheduler.Now, self => - { - if (hasResult) - observer.OnNext(result); - try - { - if (first) - first = false; - else - state = iterate(state); - hasResult = condition(state); - if (hasResult) - { - result = resultSelector(state); - time = timeSelector(state); - } - } - catch (Exception exception) - { - observer.OnError(exception); - return; - } - - if (hasResult) - self(time); - else - observer.OnCompleted(); - }); - }); -#endif - } - - #endregion - - #region + Interval + - - public virtual IObservable<long> Interval(TimeSpan period) - { - return Timer_(period, period, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<long> Interval(TimeSpan period, IScheduler scheduler) - { - return Timer_(period, period, scheduler); - } - - #endregion - - #region + Sample + - - public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval) - { - return Sample_<TSource>(source, interval, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler) - { - return Sample_<TSource>(source, interval, scheduler); - } - - private static IObservable<TSource> Sample_<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler) - { -#if !NO_PERF - return new Sample<TSource>(source, interval, scheduler); -#else - var sampler = Observable.Interval(interval, scheduler); - return Sample_<TSource, long>(source, sampler); -#endif - } - - public virtual IObservable<TSource> Sample<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler) - { - return Sample_<TSource, TSample>(source, sampler); - } - - private static IObservable<TSource> Sample_<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler) - { -#if !NO_PERF - return new Sample<TSource, TSample>(source, sampler); -#else - return Combine(source, sampler, (IObserver<TSource> observer, IDisposable leftSubscription, IDisposable rightSubscription) => - { - var value = default(Notification<TSource>); - var atEnd = false; - return new BinaryObserver<TSource, TSample>( - newValue => - { - switch (newValue.Kind) - { - case NotificationKind.OnNext: - value = newValue; - break; - case NotificationKind.OnError: - newValue.Accept(observer); - break; - case NotificationKind.OnCompleted: - atEnd = true; - break; - } - }, - _ => - { - var myValue = value; - value = null; - if (myValue != null) - myValue.Accept(observer); - if (atEnd) - observer.OnCompleted(); - }); - }); -#endif - } - - #endregion - - #region + Skip + - - public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration) - { - return Skip_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { - return Skip_<TSource>(source, duration, scheduler); - } - - private static IObservable<TSource> Skip_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { -#if !NO_PERF - var skip = source as Skip<TSource>; - if (skip != null && skip._scheduler == scheduler) - return skip.Ω(duration); - - return new Skip<TSource>(source, duration, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var open = false; - - var t = scheduler.Schedule(duration, () => open = true); - - var d = source.Subscribe( - x => - { - if (open) - observer.OnNext(x); - }, - observer.OnError, - observer.OnCompleted - ); - - return new CompositeDisposable(t, d); - }); -#endif - } - - #endregion - - #region + SkipLast + - - public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration) - { - return SkipLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { - return SkipLast_<TSource>(source, duration, scheduler); - } - - private static IObservable<TSource> SkipLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { -#if !NO_PERF - return new SkipLast<TSource>(source, duration, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var q = new Queue<System.Reactive.TimeInterval<TSource>>(); - - var swp = scheduler.AsStopwatchProvider(); - var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch(); - - return source.Subscribe( - x => - { - var now = sw.Elapsed; - q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now)); - while (q.Count > 0 && now - q.Peek().Interval >= duration) - observer.OnNext(q.Dequeue().Value); - }, - observer.OnError, - () => - { - var now = sw.Elapsed; - while (q.Count > 0 && now - q.Peek().Interval >= duration) - observer.OnNext(q.Dequeue().Value); - observer.OnCompleted(); - } - ); - }); -#endif - } - - #endregion - - #region + SkipUntil + - - public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime) - { - return SkipUntil_<TSource>(source, startTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler) - { - return SkipUntil_<TSource>(source, startTime, scheduler); - } - - private static IObservable<TSource> SkipUntil_<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler) - { -#if !NO_PERF - var skipUntil = source as SkipUntil<TSource>; - if (skipUntil != null && skipUntil._scheduler == scheduler) - return skipUntil.Ω(startTime); - - return new SkipUntil<TSource>(source, startTime, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var open = false; - - var t = scheduler.Schedule(startTime, () => open = true); - - var d = source.Subscribe( - x => - { - if (open) - observer.OnNext(x); - }, - observer.OnError, - observer.OnCompleted - ); - - return new CompositeDisposable(t, d); - }); -#endif - } - - #endregion - - #region + Take + - - public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration) - { - return Take_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { - return Take_<TSource>(source, duration, scheduler); - } - - private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { -#if !NO_PERF - var take = source as Take<TSource>; - if (take != null && take._scheduler == scheduler) - return take.Ω(duration); - - return new Take<TSource>(source, duration, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var gate = new object(); - - var t = scheduler.Schedule(duration, () => - { - lock (gate) - { - observer.OnCompleted(); - } - }); - - var d = source.Synchronize(gate).Subscribe(observer); - - return new CompositeDisposable(t, d); - }); -#endif - } - - #endregion - - #region + TakeLast + - - public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration) - { - return TakeLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations, SchedulerDefaults.Iteration); - } - - public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { - return TakeLast_<TSource>(source, duration, scheduler, SchedulerDefaults.Iteration); - } - - public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler) - { - return TakeLast_<TSource>(source, duration, timerScheduler, loopScheduler); - } - - private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler) - { -#if !NO_PERF - return new TakeLast<TSource>(source, duration, timerScheduler, loopScheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var q = new Queue<System.Reactive.TimeInterval<TSource>>(); - - var swp = timerScheduler.AsStopwatchProvider(); - var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch(); - - var trim = new Action<TimeSpan>(now => - { - while (q.Count > 0 && now - q.Peek().Interval >= duration) - q.Dequeue(); - }); - - var g = new CompositeDisposable(); - - g.Add(source.Subscribe( - x => - { - var now = sw.Elapsed; - - q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now)); - trim(now); - }, - observer.OnError, - () => - { - var now = sw.Elapsed; - trim(now); - - g.Add(loopScheduler.Schedule(rec => - { - if (q.Count > 0) - { - observer.OnNext(q.Dequeue().Value); - rec(); - } - else - { - observer.OnCompleted(); - } - })); - } - )); - - return g; - }); -#endif - } - - public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration) - { - return TakeLastBuffer_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { - return TakeLastBuffer_<TSource>(source, duration, scheduler); - } - - private static IObservable<IList<TSource>> TakeLastBuffer_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler) - { -#if !NO_PERF - return new TakeLastBuffer<TSource>(source, duration, scheduler); -#else - return new AnonymousObservable<IList<TSource>>(observer => - { - var q = new Queue<System.Reactive.TimeInterval<TSource>>(); - - var swp = scheduler.AsStopwatchProvider(); - var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch(); - - return source.Subscribe( - x => - { - var now = sw.Elapsed; - - q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now)); - while (q.Count > 0 && now - q.Peek().Interval >= duration) - q.Dequeue(); - }, - observer.OnError, - () => - { - var now = sw.Elapsed; - - var res = new List<TSource>(); - while (q.Count > 0) - { - var next = q.Dequeue(); - if (now - next.Interval <= duration) - res.Add(next.Value); - } - - observer.OnNext(res); - observer.OnCompleted(); - } - ); - }); -#endif - } - - #endregion - - #region + TakeUntil + - - public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime) - { - return TakeUntil_<TSource>(source, endTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler) - { - return TakeUntil_<TSource>(source, endTime, scheduler); - } - - private static IObservable<TSource> TakeUntil_<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler) - { -#if !NO_PERF - var takeUntil = source as TakeUntil<TSource>; - if (takeUntil != null && takeUntil._scheduler == scheduler) - return takeUntil.Ω(endTime); - - return new TakeUntil<TSource>(source, endTime, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var gate = new object(); - - var t = scheduler.Schedule(endTime, () => - { - lock (gate) - { - observer.OnCompleted(); - } - }); - - var d = source.Synchronize(gate).Subscribe(observer); - - return new CompositeDisposable(t, d); - }); -#endif - } - - #endregion - - #region + Throttle + - - public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime) - { - return Throttle_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) - { - return Throttle_<TSource>(source, dueTime, scheduler); - } - - private static IObservable<TSource> Throttle_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) - { -#if !NO_PERF - return new Throttle<TSource>(source, dueTime, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var gate = new object(); - var value = default(TSource); - var hasValue = false; - var cancelable = new SerialDisposable(); - var id = 0UL; - - var subscription = source.Subscribe(x => - { - ulong currentid; - lock (gate) - { - hasValue = true; - value = x; - id = unchecked(id + 1); - currentid = id; - } - var d = new SingleAssignmentDisposable(); - cancelable.Disposable = d; - d.Disposable = scheduler.Schedule(dueTime, () => - { - lock (gate) - { - if (hasValue && id == currentid) - observer.OnNext(value); - hasValue = false; - } - }); - }, - exception => - { - cancelable.Dispose(); - - lock (gate) - { - observer.OnError(exception); - hasValue = false; - id = unchecked(id + 1); - } - }, - () => - { - cancelable.Dispose(); - - lock (gate) - { - if (hasValue) - observer.OnNext(value); - observer.OnCompleted(); - hasValue = false; - id = unchecked(id + 1); - } - }); - - return new CompositeDisposable(subscription, cancelable); - }); -#endif - } - - public virtual IObservable<TSource> Throttle<TSource, TThrottle>(IObservable<TSource> source, Func<TSource, IObservable<TThrottle>> throttleDurationSelector) - { -#if !NO_PERF - return new Throttle<TSource, TThrottle>(source, throttleDurationSelector); -#else - return new AnonymousObservable<TSource>(observer => - { - var gate = new object(); - var value = default(TSource); - var hasValue = false; - var cancelable = new SerialDisposable(); - var id = 0UL; - - var subscription = source.Subscribe( - x => - { - var throttle = default(IObservable<TThrottle>); - try - { - throttle = throttleDurationSelector(x); - } - catch (Exception error) - { - lock (gate) - observer.OnError(error); - - return; - } - - ulong currentid; - lock (gate) - { - hasValue = true; - value = x; - id = unchecked(id + 1); - currentid = id; - } - - var d = new SingleAssignmentDisposable(); - cancelable.Disposable = d; - d.Disposable = throttle.Subscribe( - _ => - { - lock (gate) - { - if (hasValue && id == currentid) - observer.OnNext(value); - - hasValue = false; - d.Dispose(); - } - }, - exception => - { - lock (gate) - { - observer.OnError(exception); - } - }, - () => - { - lock (gate) - { - if (hasValue && id == currentid) - observer.OnNext(value); - - hasValue = false; - d.Dispose(); - } - } - ); - }, - exception => - { - cancelable.Dispose(); - - lock (gate) - { - observer.OnError(exception); - hasValue = false; - id = unchecked(id + 1); - } - }, - () => - { - cancelable.Dispose(); - - lock (gate) - { - if (hasValue) - observer.OnNext(value); - observer.OnCompleted(); - hasValue = false; - id = unchecked(id + 1); - } - }); - - return new CompositeDisposable(subscription, cancelable); - }); -#endif - } - - #endregion - - #region + TimeInterval + - - public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source) - { - return TimeInterval_<TSource>(source, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source, IScheduler scheduler) - { - return TimeInterval_<TSource>(source, scheduler); - } - -#if !NO_PERF - private static IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler) - { - return new TimeInterval<TSource>(source, scheduler); - } -#else - private IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler) - { - return Defer(() => - { - var last = scheduler.Now; - return source.Select(x => - { - var now = scheduler.Now; - var span = now.Subtract(last); - last = now; - return new System.Reactive.TimeInterval<TSource>(x, span); - }); - }); - } -#endif - - #endregion - - #region + Timeout + - - #region TimeSpan - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime) - { - return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) - { - return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler); - } - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other) - { - return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler) - { - return Timeout_<TSource>(source, dueTime, other, scheduler); - } - - private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler) - { -#if !NO_PERF - return new Timeout<TSource>(source, dueTime, other, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var subscription = new SerialDisposable(); - var timer = new SerialDisposable(); - var original = new SingleAssignmentDisposable(); - - subscription.Disposable = original; - - var gate = new object(); - var id = 0UL; - var switched = false; - - Action createTimer = () => - { - var myid = id; - timer.Disposable = scheduler.Schedule(dueTime, () => - { - var timerWins = false; - - lock (gate) - { - switched = (id == myid); - timerWins = switched; - } - - if (timerWins) - subscription.Disposable = other.Subscribe(observer); - }); - }; - - createTimer(); - - original.Disposable = source.Subscribe( - x => - { - var onNextWins = false; - - lock (gate) - { - onNextWins = !switched; - if (onNextWins) - { - id = unchecked(id + 1); - } - } - - if (onNextWins) - { - observer.OnNext(x); - createTimer(); - } - }, - exception => - { - var onErrorWins = false; - - lock (gate) - { - onErrorWins = !switched; - if (onErrorWins) - { - id = unchecked(id + 1); - } - } - - if (onErrorWins) - observer.OnError(exception); - }, - () => - { - var onCompletedWins = false; - - lock (gate) - { - onCompletedWins = !switched; - if (onCompletedWins) - { - id = unchecked(id + 1); - } - } - - if (onCompletedWins) - observer.OnCompleted(); - }); - - return new CompositeDisposable(subscription, timer); - }); -#endif - } - - #endregion - - #region DateTimeOffset - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime) - { - return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler) - { - return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler); - } - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other) - { - return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler) - { - return Timeout_<TSource>(source, dueTime, other, scheduler); - } - - private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler) - { -#if !NO_PERF - return new Timeout<TSource>(source, dueTime, other, scheduler); -#else - return new AnonymousObservable<TSource>(observer => - { - var subscription = new SerialDisposable(); - var original = new SingleAssignmentDisposable(); - - subscription.Disposable = original; - - var gate = new object(); - var switched = false; - - var timer = scheduler.Schedule(dueTime, () => - { - var timerWins = false; - - lock (gate) - { - timerWins = !switched; - switched = true; - } - - if (timerWins) - subscription.Disposable = other.Subscribe(observer); - }); - - original.Disposable = source.Subscribe( - x => - { - lock (gate) - { - if (!switched) - observer.OnNext(x); - } - }, - exception => - { - var onErrorWins = false; - - lock (gate) - { - onErrorWins = !switched; - switched = true; - } - - if (onErrorWins) - observer.OnError(exception); - }, - () => - { - var onCompletedWins = false; - - lock (gate) - { - onCompletedWins = !switched; - switched = true; - } - - if (onCompletedWins) - observer.OnCompleted(); - }); - - return new CompositeDisposable(subscription, timer); - }); -#endif - } - - #endregion - - #region Duration selector - - public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector) - { - return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException())); - } - - public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other) - { - return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, other); - } - - public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector) - { - return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException())); - } - - public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other) - { - return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other); - } - - private static IObservable<TSource> Timeout_<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other) - { -#if !NO_PERF - return new Timeout<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other); -#else - return new AnonymousObservable<TSource>(observer => - { - var subscription = new SerialDisposable(); - var timer = new SerialDisposable(); - var original = new SingleAssignmentDisposable(); - - subscription.Disposable = original; - - var gate = new object(); - var id = 0UL; - var switched = false; - - Action<IObservable<TTimeout>> setTimer = timeout => - { - var myid = id; - - Func<bool> timerWins = () => - { - var res = false; - - lock (gate) - { - switched = (id == myid); - res = switched; - } - - return res; - }; - - var d = new SingleAssignmentDisposable(); - timer.Disposable = d; - d.Disposable = timeout.Subscribe( - _ => - { - if (timerWins()) - subscription.Disposable = other.Subscribe(observer); - - d.Dispose(); - }, - error => - { - if (timerWins()) - observer.OnError(error); - }, - () => - { - if (timerWins()) - subscription.Disposable = other.Subscribe(observer); - } - ); - }; - - setTimer(firstTimeout); - - Func<bool> observerWins = () => - { - var res = false; - - lock (gate) - { - res = !switched; - if (res) - { - id = unchecked(id + 1); - } - } - - return res; - }; - - original.Disposable = source.Subscribe( - x => - { - if (observerWins()) - { - observer.OnNext(x); - - var timeout = default(IObservable<TTimeout>); - try - { - timeout = timeoutDurationSelector(x); - } - catch (Exception error) - { - observer.OnError(error); - return; - } - - setTimer(timeout); - } - }, - exception => - { - if (observerWins()) - observer.OnError(exception); - }, - () => - { - if (observerWins()) - observer.OnCompleted(); - } - ); - - return new CompositeDisposable(subscription, timer); - }); -#endif - } - - #endregion - - #endregion - - #region + Timer + - - public virtual IObservable<long> Timer(TimeSpan dueTime) - { - return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<long> Timer(DateTimeOffset dueTime) - { - return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period) - { - return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period) - { - return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<long> Timer(TimeSpan dueTime, IScheduler scheduler) - { - return Timer_(dueTime, scheduler); - } - - public virtual IObservable<long> Timer(DateTimeOffset dueTime, IScheduler scheduler) - { - return Timer_(dueTime, scheduler); - } - - public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IScheduler scheduler) - { - return Timer_(dueTime, period, scheduler); - } - - public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler) - { - return Timer_(dueTime, period, scheduler); - } - - private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler) - { -#if !NO_PERF - return new Timer(dueTime, null, scheduler); -#else - var d = Normalize(dueTime); - - return new AnonymousObservable<long>(observer => - scheduler.Schedule(d, () => - { - observer.OnNext(0); - observer.OnCompleted(); - })); -#endif - } - -#if !NO_PERF - private static IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler) - { - return new Timer(dueTime, period, scheduler); - } -#else - private IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler) - { - var p = Normalize(period); - - return Defer(() => Timer(scheduler.Now + dueTime, p, scheduler)); - } -#endif - - private static IObservable<long> Timer_(DateTimeOffset dueTime, IScheduler scheduler) - { -#if !NO_PERF - return new Timer(dueTime, null, scheduler); -#else - return new AnonymousObservable<long>(observer => - scheduler.Schedule(dueTime, () => - { - observer.OnNext(0); - observer.OnCompleted(); - })); -#endif - } - - private static IObservable<long> Timer_(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler) - { -#if !NO_PERF - return new Timer(dueTime, period, scheduler); -#else - var p = Normalize(period); - - return new AnonymousObservable<long>(observer => - { - var d = dueTime; - var count = 0L; - - return scheduler.Schedule(d, self => - { - if (p > TimeSpan.Zero) - { - var now = scheduler.Now; - d = d + p; - if (d <= now) - d = now + p; - } - - observer.OnNext(count); - count = unchecked(count + 1); - self(d); - }); - }); -#endif - } - - #endregion - - #region + Timestamp + - - public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source) - { - return Timestamp_<TSource>(source, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source, IScheduler scheduler) - { - return Timestamp_<TSource>(source, scheduler); - } - - private static IObservable<Timestamped<TSource>> Timestamp_<TSource>(IObservable<TSource> source, IScheduler scheduler) - { -#if !NO_PERF - return new Timestamp<TSource>(source, scheduler); -#else - return source.Select(x => new Timestamped<TSource>(x, scheduler.Now)); -#endif - } - - #endregion - - #region + Window + - - #region TimeSpan only - - public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan) - { - return Window_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler) - { - return Window_<TSource>(source, timeSpan, timeSpan, scheduler); - } - - public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift) - { - return Window_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) - { - return Window_<TSource>(source, timeSpan, timeShift, scheduler); - } - - private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler) - { -#if !NO_PERF - return new Window<TSource>(source, timeSpan, timeShift, scheduler); -#else - return new AnonymousObservable<IObservable<TSource>>(observer => - { - var totalTime = TimeSpan.Zero; - var nextShift = timeShift; - var nextSpan = timeSpan; - - var gate = new object(); - var q = new Queue<ISubject<TSource>>(); - - var timerD = new SerialDisposable(); - var groupDisposable = new CompositeDisposable(2) { timerD }; - var refCountDisposable = new RefCountDisposable(groupDisposable); - - var createTimer = default(Action); - createTimer = () => - { - var m = new SingleAssignmentDisposable(); - timerD.Disposable = m; - - var isSpan = false; - var isShift = false; - if (nextSpan == nextShift) - { - isSpan = true; - isShift = true; - } - else if (nextSpan < nextShift) - isSpan = true; - else - isShift = true; - - var newTotalTime = isSpan ? nextSpan : nextShift; - var ts = newTotalTime - totalTime; - totalTime = newTotalTime; - - if (isSpan) - nextSpan += timeShift; - if (isShift) - nextShift += timeShift; - - m.Disposable = scheduler.Schedule(ts, () => - { - lock (gate) - { - if (isShift) - { - var s = new Subject<TSource>(); - q.Enqueue(s); - observer.OnNext(s.AddRef(refCountDisposable)); - } - if (isSpan) - { - var s = q.Dequeue(); - s.OnCompleted(); - } - } - - createTimer(); - }); - }; - - q.Enqueue(new Subject<TSource>()); - observer.OnNext(q.Peek().AddRef(refCountDisposable)); - createTimer(); - - groupDisposable.Add(source.Subscribe( - x => - { - lock (gate) - { - foreach (var s in q) - s.OnNext(x); - } - }, - exception => - { - lock (gate) - { - foreach (var s in q) - s.OnError(exception); - observer.OnError(exception); - } - }, - () => - { - lock (gate) - { - foreach (var s in q) - s.OnCompleted(); - observer.OnCompleted(); - } - } - )); - - return refCountDisposable; - }); -#endif - } - - #endregion - - #region TimeSpan + int - - public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count) - { - return Window_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations); - } - - public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) - { - return Window_<TSource>(source, timeSpan, count, scheduler); - } - - private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler) - { -#if !NO_PERF - return new Window<TSource>(source, timeSpan, count, scheduler); -#else - return new AnonymousObservable<IObservable<TSource>>(observer => - { - var gate = new object(); - var s = default(ISubject<TSource>); - var n = 0; - var windowId = 0; - - var timerD = new SerialDisposable(); - var groupDisposable = new CompositeDisposable(2) { timerD }; - var refCountDisposable = new RefCountDisposable(groupDisposable); - - var createTimer = default(Action<int>); - createTimer = id => - { - var m = new SingleAssignmentDisposable(); - timerD.Disposable = m; - - m.Disposable = scheduler.Schedule(timeSpan, () => - { - var newId = 0; - lock (gate) - { - if (id != windowId) - return; - n = 0; - newId = ++windowId; - - s.OnCompleted(); - s = new Subject<TSource>(); - observer.OnNext(s.AddRef(refCountDisposable)); - } - - createTimer(newId); - }); - }; - - s = new Subject<TSource>(); - observer.OnNext(s.AddRef(refCountDisposable)); - createTimer(0); - - groupDisposable.Add(source.Subscribe( - x => - { - var newWindow = false; - var newId = 0; - - lock (gate) - { - s.OnNext(x); - - n++; - if (n == count) - { - newWindow = true; - n = 0; - newId = ++windowId; - - s.OnCompleted(); - s = new Subject<TSource>(); - observer.OnNext(s.AddRef(refCountDisposable)); - } - } - - if (newWindow) - createTimer(newId); - }, - exception => - { - lock (gate) - { - s.OnError(exception); - observer.OnError(exception); - } - }, - () => - { - lock (gate) - { - s.OnCompleted(); - observer.OnCompleted(); - } - } - )); - - return refCountDisposable; - }); -#endif - } - - #endregion - - #endregion - - #region |> Helpers <| - -#if NO_PERF - private static TimeSpan Normalize(TimeSpan timeSpan) - { - if (timeSpan.CompareTo(TimeSpan.Zero) < 0) - return TimeSpan.Zero; - return timeSpan; - } -#endif - - #endregion - } -} |