// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !NO_PERF using System; namespace System.Reactive.Linq.ObservableImpl { class AverageDouble : Producer { private readonly IObservable _source; public AverageDouble(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private double _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0.0; _count = 0L; } public void OnNext(double value) { try { checked { _sum += value; _count++; } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext(_sum / _count); base._observer.OnCompleted(); } else { base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS)); } base.Dispose(); } } } class AverageSingle : Producer { private readonly IObservable _source; public AverageSingle(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private double _sum; // NOTE: Uses a different accumulator type (double), conform LINQ to Objects. private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0.0; _count = 0L; } public void OnNext(float value) { try { checked { _sum += value; _count++; } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext((float)(_sum / _count)); base._observer.OnCompleted(); } else { base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS)); } base.Dispose(); } } } class AverageDecimal : Producer { private readonly IObservable _source; public AverageDecimal(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private decimal _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0M; _count = 0L; } public void OnNext(decimal value) { try { checked { _sum += value; _count++; } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext(_sum / _count); base._observer.OnCompleted(); } else { base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS)); } base.Dispose(); } } } class AverageInt32 : Producer { private readonly IObservable _source; public AverageInt32(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private long _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0L; _count = 0L; } public void OnNext(int value) { try { checked { _sum += value; _count++; } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext((double)_sum / _count); base._observer.OnCompleted(); } else { base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS)); } base.Dispose(); } } } class AverageInt64 : Producer { private readonly IObservable _source; public AverageInt64(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private long _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0L; _count = 0L; } public void OnNext(long value) { try { checked { _sum += value; _count++; } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext((double)_sum / _count); base._observer.OnCompleted(); } else { base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS)); } base.Dispose(); } } } class AverageDoubleNullable : Producer { private readonly IObservable _source; public AverageDoubleNullable(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private double _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0.0; _count = 0L; } public void OnNext(double? value) { try { checked { if (value != null) { _sum += value.Value; _count++; } } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext(_sum / _count); } else { base._observer.OnNext(null); } base._observer.OnCompleted(); base.Dispose(); } } } class AverageSingleNullable : Producer { private readonly IObservable _source; public AverageSingleNullable(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private double _sum; // NOTE: Uses a different accumulator type (double), conform LINQ to Objects. private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0.0; _count = 0L; } public void OnNext(float? value) { try { checked { if (value != null) { _sum += value.Value; _count++; } } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext((float)(_sum / _count)); } else { base._observer.OnNext(null); } base._observer.OnCompleted(); base.Dispose(); } } } class AverageDecimalNullable : Producer { private readonly IObservable _source; public AverageDecimalNullable(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private decimal _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0M; _count = 0L; } public void OnNext(decimal? value) { try { checked { if (value != null) { _sum += value.Value; _count++; } } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext(_sum / _count); } else { base._observer.OnNext(null); } base._observer.OnCompleted(); base.Dispose(); } } } class AverageInt32Nullable : Producer { private readonly IObservable _source; public AverageInt32Nullable(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private long _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0L; _count = 0L; } public void OnNext(int? value) { try { checked { if (value != null) { _sum += value.Value; _count++; } } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext((double)_sum / _count); } else { base._observer.OnNext(null); } base._observer.OnCompleted(); base.Dispose(); } } } class AverageInt64Nullable : Producer { private readonly IObservable _source; public AverageInt64Nullable(IObservable source) { _source = source; } protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { var sink = new _(observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink, IObserver { private long _sum; private long _count; public _(IObserver observer, IDisposable cancel) : base(observer, cancel) { _sum = 0L; _count = 0L; } public void OnNext(long? value) { try { checked { if (value != null) { _sum += value.Value; _count++; } } } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { if (_count > 0) { base._observer.OnNext((double)_sum / _count); } else { base._observer.OnNext(null); } base._observer.OnCompleted(); base.Dispose(); } } } } #endif