// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that adds up every <c>double</c> pushed by the source and emits the
    /// running total as a single element when the source completes.
    /// </summary>
    class SumDouble : Producer<double>
    {
        private readonly IObservable<double> _source;

        /// <param name="source">Sequence whose elements are summed.</param>
        public SumDouble(IObservable<double> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<double> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates the sum and forwards it downstream on completion.</summary>
        class _ : Sink<double>, IObserver<double>
        {
            private double _sum;

            public _(IObserver<double> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0.0;
            }

            /// <summary>Adds <paramref name="value"/> to the running total; nothing is emitted yet.</summary>
            public void OnNext(double value)
            {
                _sum += value;
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0.0 if no element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up every <c>float</c> pushed by the source and emits the total
    /// as a single <c>float</c> when the source completes.
    /// </summary>
    class SumSingle : Producer<float>
    {
        private readonly IObservable<float> _source;

        /// <param name="source">Sequence whose elements are summed.</param>
        public SumSingle(IObservable<float> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<float> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>
        /// Observer that accumulates in a <c>double</c> and narrows to <c>float</c> only when
        /// emitting the final result.
        /// </summary>
        class _ : Sink<float>, IObserver<float>
        {
            private double _sum; // This is what LINQ to Objects does!

            public _(IObserver<float> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0.0; // This is what LINQ to Objects does!
            }

            /// <summary>Adds <paramref name="value"/> (widened to double) to the running total.</summary>
            public void OnNext(float value)
            {
                _sum += value; // This is what LINQ to Objects does!
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total cast to float, completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext((float)_sum); // This is what LINQ to Objects does!
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up every <c>decimal</c> pushed by the source and emits the total
    /// as a single element when the source completes.
    /// </summary>
    class SumDecimal : Producer<decimal>
    {
        private readonly IObservable<decimal> _source;

        /// <param name="source">Sequence whose elements are summed.</param>
        public SumDecimal(IObservable<decimal> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<decimal> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates the sum and forwards it downstream on completion.</summary>
        class _ : Sink<decimal>, IObserver<decimal>
        {
            private decimal _sum;

            public _(IObserver<decimal> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0M;
            }

            /// <summary>Adds <paramref name="value"/> to the running total; nothing is emitted yet.</summary>
            public void OnNext(decimal value)
            {
                _sum += value;
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0 if no element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up every <c>int</c> pushed by the source using checked arithmetic
    /// and emits the total as a single element when the source completes.
    /// </summary>
    class SumInt32 : Producer<int>
    {
        private readonly IObservable<int> _source;

        /// <param name="source">Sequence whose elements are summed.</param>
        public SumInt32(IObservable<int> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates the sum and forwards it downstream on completion.</summary>
        class _ : Sink<int>, IObserver<int>
        {
            private int _sum;

            public _(IObserver<int> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0;
            }

            /// <summary>
            /// Adds <paramref name="value"/> to the running total. The addition is checked, so an
            /// overflow is reported downstream through OnError instead of wrapping silently.
            /// </summary>
            public void OnNext(int value)
            {
                try
                {
                    checked
                    {
                        _sum += value;
                    }
                }
                catch (Exception exception)
                {
                    // Surface the arithmetic failure to the observer and stop the sink.
                    base._observer.OnError(exception);
                    base.Dispose();
                }
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0 if no element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up every <c>long</c> pushed by the source using checked arithmetic
    /// and emits the total as a single element when the source completes.
    /// </summary>
    class SumInt64 : Producer<long>
    {
        private readonly IObservable<long> _source;

        /// <param name="source">Sequence whose elements are summed.</param>
        public SumInt64(IObservable<long> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates the sum and forwards it downstream on completion.</summary>
        class _ : Sink<long>, IObserver<long>
        {
            private long _sum;

            public _(IObserver<long> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0L;
            }

            /// <summary>
            /// Adds <paramref name="value"/> to the running total. The addition is checked, so an
            /// overflow is reported downstream through OnError instead of wrapping silently.
            /// </summary>
            public void OnNext(long value)
            {
                try
                {
                    checked
                    {
                        _sum += value;
                    }
                }
                catch (Exception exception)
                {
                    // Surface the arithmetic failure to the observer and stop the sink.
                    base._observer.OnError(exception);
                    base.Dispose();
                }
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0 if no element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up the non-null <c>double?</c> values pushed by the source and
    /// emits the total as a single element when the source completes.
    /// </summary>
    class SumDoubleNullable : Producer<double?>
    {
        private readonly IObservable<double?> _source;

        /// <param name="source">Sequence whose non-null elements are summed.</param>
        public SumDoubleNullable(IObservable<double?> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<double?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates non-null values; the emitted total is never null.</summary>
        class _ : Sink<double?>, IObserver<double?>
        {
            private double _sum;

            public _(IObserver<double?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0.0;
            }

            /// <summary>Adds <paramref name="value"/> to the running total; null values are skipped.</summary>
            public void OnNext(double? value)
            {
                if (value != null)
                {
                    _sum += value.Value;
                }
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0.0 if no non-null element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up the non-null <c>float?</c> values pushed by the source and
    /// emits the total as a single element when the source completes.
    /// </summary>
    class SumSingleNullable : Producer<float?>
    {
        private readonly IObservable<float?> _source;

        /// <param name="source">Sequence whose non-null elements are summed.</param>
        public SumSingleNullable(IObservable<float?> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<float?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>
        /// Observer that accumulates non-null values in a <c>double</c> and narrows to
        /// <c>float</c> only when emitting the final result.
        /// </summary>
        class _ : Sink<float?>, IObserver<float?>
        {
            private double _sum; // This is what LINQ to Objects does!

            public _(IObserver<float?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0.0; // This is what LINQ to Objects does!
            }

            /// <summary>Adds <paramref name="value"/> (widened to double) to the running total; null values are skipped.</summary>
            public void OnNext(float? value)
            {
                if (value != null)
                {
                    _sum += value.Value; // This is what LINQ to Objects does!
                }
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total cast to float, completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext((float)_sum); // This is what LINQ to Objects does!
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up the non-null <c>decimal?</c> values pushed by the source and
    /// emits the total as a single element when the source completes.
    /// </summary>
    class SumDecimalNullable : Producer<decimal?>
    {
        private readonly IObservable<decimal?> _source;

        /// <param name="source">Sequence whose non-null elements are summed.</param>
        public SumDecimalNullable(IObservable<decimal?> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<decimal?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates non-null values; the emitted total is never null.</summary>
        class _ : Sink<decimal?>, IObserver<decimal?>
        {
            private decimal _sum;

            public _(IObserver<decimal?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0M;
            }

            /// <summary>Adds <paramref name="value"/> to the running total; null values are skipped.</summary>
            public void OnNext(decimal? value)
            {
                if (value != null)
                {
                    _sum += value.Value;
                }
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0 if no non-null element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up the non-null <c>int?</c> values pushed by the source using
    /// checked arithmetic and emits the total as a single element when the source completes.
    /// </summary>
    class SumInt32Nullable : Producer<int?>
    {
        private readonly IObservable<int?> _source;

        /// <param name="source">Sequence whose non-null elements are summed.</param>
        public SumInt32Nullable(IObservable<int?> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<int?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates non-null values; the emitted total is never null.</summary>
        class _ : Sink<int?>, IObserver<int?>
        {
            private int _sum;

            public _(IObserver<int?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0;
            }

            /// <summary>
            /// Adds <paramref name="value"/> to the running total; null values are skipped. The
            /// addition is checked, so an overflow is reported downstream through OnError.
            /// </summary>
            public void OnNext(int? value)
            {
                try
                {
                    checked
                    {
                        if (value != null)
                        {
                            _sum += value.Value;
                        }
                    }
                }
                catch (Exception exception)
                {
                    // Surface the arithmetic failure to the observer and stop the sink.
                    base._observer.OnError(exception);
                    base.Dispose();
                }
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0 if no non-null element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that adds up the non-null <c>long?</c> values pushed by the source using
    /// checked arithmetic and emits the total as a single element when the source completes.
    /// </summary>
    class SumInt64Nullable : Producer<long?>
    {
        private readonly IObservable<long?> _source;

        /// <param name="source">Sequence whose non-null elements are summed.</param>
        public SumInt64Nullable(IObservable<long?> source)
        {
            _source = source;
        }

        /// <summary>
        /// Creates the summing sink, publishes it through <paramref name="setSink"/>, and
        /// subscribes it to the source.
        /// </summary>
        /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
        protected override IDisposable Run(IObserver<long?> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return _source.SubscribeSafe(sink);
        }

        /// <summary>Observer that accumulates non-null values; the emitted total is never null.</summary>
        class _ : Sink<long?>, IObserver<long?>
        {
            private long _sum;

            public _(IObserver<long?> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _sum = 0L;
            }

            /// <summary>
            /// Adds <paramref name="value"/> to the running total; null values are skipped. The
            /// addition is checked, so an overflow is reported downstream through OnError.
            /// </summary>
            public void OnNext(long? value)
            {
                try
                {
                    checked
                    {
                        if (value != null)
                        {
                            _sum += value.Value;
                        }
                    }
                }
                catch (Exception exception)
                {
                    // Surface the arithmetic failure to the observer and stop the sink.
                    base._observer.OnError(exception);
                    base.Dispose();
                }
            }

            /// <summary>Forwards the error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Emits the total (0 if no non-null element arrived), completes downstream, and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnNext(_sum);
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }
}
#endif