// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !NO_PERF using System; using System.Collections.Generic; namespace System.Reactive.Linq.Observαble { class MaxBy : Producer> { private readonly IObservable _source; private readonly Func _keySelector; private readonly IComparer _comparer; public MaxBy(IObservable source, Func keySelector, IComparer comparer) { _source = source; _keySelector = keySelector; _comparer = comparer; } protected override IDisposable Run(IObserver> observer, IDisposable cancel, Action setSink) { var sink = new _(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } class _ : Sink>, IObserver { private readonly MaxBy _parent; private bool _hasValue; private TKey _lastKey; private List _list; public _(MaxBy parent, IObserver> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; _hasValue = false; _lastKey = default(TKey); _list = new List(); } public void OnNext(TSource value) { var key = default(TKey); try { key = _parent._keySelector(value); } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); return; } var comparison = 0; if (!_hasValue) { _hasValue = true; _lastKey = key; } else { try { comparison = _parent._comparer.Compare(key, _lastKey); } catch (Exception ex) { base._observer.OnError(ex); base.Dispose(); return; } } if (comparison > 0) { _lastKey = key; _list.Clear(); } if (comparison >= 0) { _list.Add(value); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { base._observer.OnNext(_list); base._observer.OnCompleted(); base.Dispose(); } } } } #endif