// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !NO_PERF using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; using System.Reactive.Subjects; namespace System.Reactive.Linq.Observαble { class GroupByUntil : Producer> { private readonly IObservable _source; private readonly Func _keySelector; private readonly Func _elementSelector; private readonly Func, IObservable> _durationSelector; private readonly IEqualityComparer _comparer; public GroupByUntil(IObservable source, Func keySelector, Func elementSelector, Func, IObservable> durationSelector, IEqualityComparer comparer) { _source = source; _keySelector = keySelector; _elementSelector = elementSelector; _durationSelector = durationSelector; _comparer = comparer; } private CompositeDisposable _groupDisposable; private RefCountDisposable _refCountDisposable; protected override IDisposable Run(IObserver> observer, IDisposable cancel, Action setSink) { _groupDisposable = new CompositeDisposable(); _refCountDisposable = new RefCountDisposable(_groupDisposable); var sink = new _(this, observer, cancel); setSink(sink); _groupDisposable.Add(_source.SubscribeSafe(sink)); return _refCountDisposable; } class _ : Sink>, IObserver { private readonly GroupByUntil _parent; private readonly Map> _map; private ISubject _null; private object _nullGate; public _(GroupByUntil parent, IObserver> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; _map = new Map>(_parent._comparer); _nullGate = new object(); } public void OnNext(TSource value) { var key = default(TKey); try { key = _parent._keySelector(value); } catch (Exception exception) { Error(exception); return; } var fireNewMapEntry = false; var writer = default(ISubject); try { // // Note: The box instruction in the IL will be erased by the JIT in case T is // a value type. In fact, the whole if block will go away and we'll end // up with nothing but the GetOrAdd call below. // // See GroupBy for more information and confirmation of this fact using // the SOS debugger extension. // if (key == null) { lock (_nullGate) { if (_null == null) { _null = new Subject(); fireNewMapEntry = true; } writer = _null; } } else { writer = _map.GetOrAdd(key, () => new Subject(), out fireNewMapEntry); } } catch (Exception exception) { Error(exception); return; } if (fireNewMapEntry) { var group = new GroupedObservable(key, writer, _parent._refCountDisposable); var duration = default(IObservable); var durationGroup = new GroupedObservable(key, writer); try { duration = _parent._durationSelector(durationGroup); } catch (Exception exception) { Error(exception); return; } lock (base._observer) base._observer.OnNext(group); var md = new SingleAssignmentDisposable(); _parent._groupDisposable.Add(md); md.Disposable = duration.SubscribeSafe(new δ(this, key, writer, md)); } var element = default(TElement); try { element = _parent._elementSelector(value); } catch (Exception exception) { Error(exception); return; } // // ISSUE: Rx v1.x shipped without proper handling of the case where the duration // sequence fires concurrently with the OnNext code path here. In such a // case, the subject can be completed before we get a chance to send out // a new element. However, a resurrected group for the same key won't get // to see the element either. To guard against this case, we'd have to // check whether the OnNext call below lost the race, and resurrect a new // group if needed. Unfortunately, this complicates matters when the // duration selector triggers synchronously (e.g. Return or Empty), which // causes the group to terminate immediately. We should not get stuck in // this case, repeatedly trying to resurrect a group that always ends // before we can send the element into it. Also, users may expect this // base case to mean no elements will ever be produced, so sending the // element into the group before starting the duration sequence may not // be a good idea either. For the time being, we'll leave this as-is and // revisit the behavior for vNext. Nonetheless, we'll add synchronization // to ensure no concurrent calls to the subject are made. // lock (writer) writer.OnNext(element); } class δ : IObserver { private readonly _ _parent; private readonly TKey _key; private readonly ISubject _writer; private readonly IDisposable _self; public δ(_ parent, TKey key, ISubject writer, IDisposable self) { _parent = parent; _key = key; _writer = writer; _self = self; } public void OnNext(TDuration value) { OnCompleted(); } public void OnError(Exception error) { _parent.Error(error); _self.Dispose(); } public void OnCompleted() { if (_key == null) { var @null = default(ISubject); lock (_parent._nullGate) { @null = _parent._null; _parent._null = null; } lock (@null) @null.OnCompleted(); } else { if (_parent._map.Remove(_key)) { lock (_writer) _writer.OnCompleted(); } } _parent._parent._groupDisposable.Remove(_self); } } public void OnError(Exception error) { Error(error); } public void OnCompleted() { // // NOTE: A race with OnCompleted triggered by a duration selector is fine when // using Subject. It will transition into a terminal state, making one // of the two calls a no-op by swapping in a DoneObserver. // var @null = default(ISubject); lock (_nullGate) @null = _null; if (@null != null) @null.OnCompleted(); foreach (var w in _map.Values) w.OnCompleted(); lock (base._observer) base._observer.OnCompleted(); base.Dispose(); } private void Error(Exception exception) { // // NOTE: A race with OnCompleted triggered by a duration selector is fine when // using Subject. It will transition into a terminal state, making one // of the two calls a no-op by swapping in a DoneObserver. // var @null = default(ISubject); lock (_nullGate) @null = _null; if (@null != null) @null.OnError(exception); foreach (var w in _map.Values) w.OnError(exception); lock (base._observer) base._observer.OnError(exception); base.Dispose(); } } } #if !NO_CDS class Map { private readonly System.Collections.Concurrent.ConcurrentDictionary _map; public Map(IEqualityComparer comparer) { _map = new System.Collections.Concurrent.ConcurrentDictionary(comparer); } public TValue GetOrAdd(TKey key, Func valueFactory, out bool added) { added = false; var value = default(TValue); var newValue = default(TValue); var hasNewValue = false; while (true) { if (_map.TryGetValue(key, out value)) break; if (!hasNewValue) { newValue = valueFactory(); hasNewValue = true; } if (_map.TryAdd(key, newValue)) { added = true; value = newValue; break; } } return value; } public IEnumerable Values { get { return _map.Values.ToArray(); } } public bool Remove(TKey key) { var value = default(TValue); return _map.TryRemove(key, out value); } } #else class Map { private readonly Dictionary _map; public Map(IEqualityComparer comparer) { _map = new Dictionary(comparer); } public TValue GetOrAdd(TKey key, Func valueFactory, out bool added) { lock (_map) { added = false; var value = default(TValue); if (!_map.TryGetValue(key, out value)) { value = valueFactory(); _map.Add(key, value); added = true; } return value; } } public IEnumerable Values { get { lock (_map) { return _map.Values.ToArray(); } } } public bool Remove(TKey key) { lock (_map) { return _map.Remove(key); } } } #endif } #endif