// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;

namespace System.Reactive.Linq.Observαble
{
    /// <summary>
    /// Producer that splits a source sequence into windows, each surfaced as an inner observable.
    /// Depending on the constructor used, windows are delimited by element count (count/skip),
    /// by time (span/shift), or by time span combined with a maximum element count.
    /// </summary>
    class Window<TSource> : Producer<IObservable<TSource>>
    {
        private readonly IObservable<TSource> _source;
        private readonly int _count;
        private readonly int _skip;

        private readonly TimeSpan _timeSpan;
        private readonly TimeSpan _timeShift;
        private readonly IScheduler _scheduler;

        /// <summary>Count-based windows: no scheduler is stored, which is what Run keys on.</summary>
        public Window(IObservable<TSource> source, int count, int skip)
        {
            _source = source;
            _count = count;
            _skip = skip;
        }

        /// <summary>Time-based windows with a span and a shift, timed on the given scheduler.</summary>
        public Window(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            _source = source;
            _timeSpan = timeSpan;
            _timeShift = timeShift;
            _scheduler = scheduler;
        }

        /// <summary>Windows closed by whichever comes first: the time span elapsing or the count being reached.</summary>
        public Window(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            _source = source;
            _timeSpan = timeSpan;
            _count = count;
            _scheduler = scheduler;
        }

        /// <summary>
        /// Picks the sink matching the constructor that was used and starts it.
        /// The selection relies on which fields were left at their defaults.
        /// </summary>
        protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            if (_scheduler == null)
            {
                // Only the (count, skip) constructor leaves the scheduler unset.
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else if (_count > 0)
            {
                // Scheduler plus a positive count: the (timeSpan, count) constructor.
                var sink = new μ(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else
            {
                if (_timeSpan == _timeShift)
                {
                    // Span equals shift: exactly one window is open at a time, so a periodic timer suffices.
                    var sink = new π(this, observer, cancel);
                    setSink(sink);
                    return sink.Run();
                }
                else
                {
                    var sink = new τ(this, observer, cancel);
                    setSink(sink);
                    return sink.Run();
                }
            }
        }

        /// <summary>
        /// Count-based sink. Keeps a queue of the currently open windows; every element goes to all of them.
        /// </summary>
        class _ : Sink<IObservable<TSource>>, IObserver<TSource>
        {
            private readonly Window<TSource> _parent;

            public _(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private Queue<ISubject<TSource>> _queue;
            private int _n;
            private SingleAssignmentDisposable _m;
            private RefCountDisposable _refCountDisposable;

            /// <summary>Emits the first window, then subscribes to the source.</summary>
            public IDisposable Run()
            {
                _queue = new Queue<ISubject<TSource>>();
                _n = 0;
                _m = new SingleAssignmentDisposable();
                _refCountDisposable = new RefCountDisposable(_m);

                // The first window is handed out before subscribing so that no source element can precede it.
                var firstWindow = CreateWindow();
                base._observer.OnNext(firstWindow);

                _m.Disposable = _parent._source.SubscribeSafe(this);

                return _refCountDisposable;
            }

            /// <summary>Creates a subject for a new window, enqueues it, and wraps it for the observer.</summary>
            private IObservable<TSource> CreateWindow()
            {
                var s = new Subject<TSource>();
                _queue.Enqueue(s);
                // NOTE(review): WindowObservable derives from AddRef (declared elsewhere); presumably it ties
                // window subscriptions to _refCountDisposable - confirm against AddRef.
                return new WindowObservable<TSource>(s, _refCountDisposable);
            }

            public void OnNext(TSource value)
            {
                foreach (var s in _queue)
                    s.OnNext(value);

                // c >= 0 once the oldest window has received _count elements; thereafter one window
                // is completed every _skip elements.
                var c = _n - _parent._count + 1;
                if (c >= 0 && c % _parent._skip == 0)
                {
                    var s = _queue.Dequeue();
                    s.OnCompleted();
                }

                // A new window is opened every _skip elements.
                _n++;
                if (_n % _parent._skip == 0)
                {
                    var newWindow = CreateWindow();
                    base._observer.OnNext(newWindow);
                }
            }

            public void OnError(Exception error)
            {
                // Drain the queue so every open window sees the error before the outer observer does.
                while (_queue.Count > 0)
                    _queue.Dequeue().OnError(error);

                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                while (_queue.Count > 0)
                    _queue.Dequeue().OnCompleted();

                base._observer.OnCompleted();
                base.Dispose();
            }
        }

        /// <summary>
        /// Time-based sink for span != shift. Windows may overlap or leave gaps, so a queue of open
        /// windows is kept and a one-shot timer is re-armed for each upcoming open/close event.
        /// </summary>
        class τ : Sink<IObservable<TSource>>, IObserver<TSource>
        {
            private readonly Window<TSource> _parent;

            public τ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private TimeSpan _totalTime;
            private TimeSpan _nextShift;
            private TimeSpan _nextSpan;

            private object _gate;
            private Queue<ISubject<TSource>> _q;

            private SerialDisposable _timerD;
            private RefCountDisposable _refCountDisposable;

            /// <summary>Opens the first window, arms the first timer, then subscribes to the source.</summary>
            public IDisposable Run()
            {
                _totalTime = TimeSpan.Zero;
                _nextShift = _parent._timeShift;
                _nextSpan = _parent._timeSpan;

                _gate = new object();
                _q = new Queue<ISubject<TSource>>();

                _timerD = new SerialDisposable();

                var groupDisposable = new CompositeDisposable(2) { _timerD };
                _refCountDisposable = new RefCountDisposable(groupDisposable);

                CreateWindow();
                CreateTimer();

                groupDisposable.Add(_parent._source.SubscribeSafe(this));

                return _refCountDisposable;
            }

            /// <summary>Opens a new window, enqueues its subject, and emits it to the observer.</summary>
            private void CreateWindow()
            {
                var s = new Subject<TSource>();
                _q.Enqueue(s);
                base._observer.OnNext(new WindowObservable<TSource>(s, _refCountDisposable));
            }

            /// <summary>
            /// Schedules the next event: a window close (span), a window open (shift), or both when
            /// they coincide. Times are tracked as offsets from the start (_totalTime).
            /// </summary>
            private void CreateTimer()
            {
                var m = new SingleAssignmentDisposable();
                _timerD.Disposable = m;

                var isSpan = false;
                var isShift = false;
                if (_nextSpan == _nextShift)
                {
                    isSpan = true;
                    isShift = true;
                }
                else if (_nextSpan < _nextShift)
                    isSpan = true;
                else
                    isShift = true;

                // Relative delay until the earlier of the two pending events.
                var newTotalTime = isSpan ? _nextSpan : _nextShift;
                var ts = newTotalTime - _totalTime;
                _totalTime = newTotalTime;

                // Both next-close and next-open advance by the shift: windows open every _timeShift
                // and each closes _timeSpan after it opened.
                if (isSpan)
                    _nextSpan += _parent._timeShift;
                if (isShift)
                    _nextShift += _parent._timeShift;

                m.Disposable = _parent._scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
            }

            /// <summary>Flags carried to Tick telling it which event(s) the timer represents.</summary>
            struct State
            {
                public bool isSpan;
                public bool isShift;
            }

            /// <summary>Timer callback: closes the oldest window and/or opens a new one, then re-arms the timer.</summary>
            private IDisposable Tick(IScheduler self, State state)
            {
                lock (_gate)
                {
                    //
                    // BREAKING CHANGE v2 > v1.x - Making behavior of sending OnCompleted to the window
                    //                             before sending out a new window consistent across all
                    //                             overloads of Window and Buffer. Before v2, the two
                    //                             operations below were reversed.
                    //
                    if (state.isSpan)
                    {
                        var s = _q.Dequeue();
                        s.OnCompleted();
                    }

                    if (state.isShift)
                    {
                        CreateWindow();
                    }
                }

                // Re-armed outside the lock.
                CreateTimer();

                return Disposable.Empty;
            }

            public void OnNext(TSource value)
            {
                lock (_gate)
                {
                    foreach (var s in _q)
                        s.OnNext(value);
                }
            }

            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    foreach (var s in _q)
                        s.OnError(error);

                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            public void OnCompleted()
            {
                lock (_gate)
                {
                    foreach (var s in _q)
                        s.OnCompleted();

                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }

        /// <summary>
        /// Time-based sink for span == shift. Exactly one window is open at any time; a periodic
        /// timer swaps it for a fresh one.
        /// </summary>
        class π : Sink<IObservable<TSource>>, IObserver<TSource>
        {
            private readonly Window<TSource> _parent;

            public π(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;

            private Subject<TSource> _subject;
            private RefCountDisposable _refCountDisposable;

            /// <summary>Opens the first window, starts the periodic timer, then subscribes to the source.</summary>
            public IDisposable Run()
            {
                _gate = new object();

                var groupDisposable = new CompositeDisposable(2);
                _refCountDisposable = new RefCountDisposable(groupDisposable);

                CreateWindow();

                groupDisposable.Add(_parent._scheduler.SchedulePeriodic(_parent._timeSpan, Tick));
                groupDisposable.Add(_parent._source.SubscribeSafe(this));

                return _refCountDisposable;
            }

            /// <summary>Periodic callback: completes the current window, then opens the next one.</summary>
            private void Tick()
            {
                lock (_gate)
                {
                    _subject.OnCompleted();
                    CreateWindow();
                }
            }

            /// <summary>Replaces the current subject with a fresh one and emits it as a new window.</summary>
            private void CreateWindow()
            {
                _subject = new Subject<TSource>();
                base._observer.OnNext(new WindowObservable<TSource>(_subject, _refCountDisposable));
            }

            public void OnNext(TSource value)
            {
                lock (_gate)
                {
                    _subject.OnNext(value);
                }
            }

            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    _subject.OnError(error);

                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            public void OnCompleted()
            {
                lock (_gate)
                {
                    _subject.OnCompleted();

                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }

        /// <summary>
        /// Time-and-count sink. The current window is closed by whichever happens first: its timer
        /// firing or it receiving _count elements. _windowId lets a timer detect that its window
        /// was already closed by the count path.
        /// </summary>
        class μ : Sink<IObservable<TSource>>, IObserver<TSource>
        {
            private readonly Window<TSource> _parent;

            public μ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;
            private ISubject<TSource> _s;
            private int _n;
            private int _windowId;

            private SerialDisposable _timerD;
            private RefCountDisposable _refCountDisposable;

            /// <summary>Opens window 0, arms its timer, then subscribes to the source.</summary>
            public IDisposable Run()
            {
                _gate = new object();
                _s = default(ISubject<TSource>);
                _n = 0;
                _windowId = 0;

                _timerD = new SerialDisposable();
                var groupDisposable = new CompositeDisposable(2) { _timerD };
                _refCountDisposable = new RefCountDisposable(groupDisposable);

                _s = new Subject<TSource>();
                base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
                CreateTimer(0);

                groupDisposable.Add(_parent._source.SubscribeSafe(this));

                return _refCountDisposable;
            }

            /// <summary>Arms a one-shot timer for the window with the given id, replacing any previous timer.</summary>
            private void CreateTimer(int id)
            {
                var m = new SingleAssignmentDisposable();
                _timerD.Disposable = m;

                m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
            }

            /// <summary>Timer callback: if the window it was armed for is still current, rolls over to a new window.</summary>
            private IDisposable Tick(IScheduler self, int id)
            {
                var d = Disposable.Empty;

                var newId = 0;
                lock (_gate)
                {
                    // Stale timer: the window was already replaced (by the count path in OnNext).
                    if (id != _windowId)
                        return d;

                    _n = 0;
                    newId = ++_windowId;

                    _s.OnCompleted();
                    _s = new Subject<TSource>();
                    base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
                }

                CreateTimer(newId);

                return d;
            }

            public void OnNext(TSource value)
            {
                var newWindow = false;
                var newId = 0;

                lock (_gate)
                {
                    _s.OnNext(value);

                    _n++;
                    if (_n == _parent._count)
                    {
                        // Count reached: roll over now and invalidate the pending timer via the id bump.
                        newWindow = true;
                        _n = 0;
                        newId = ++_windowId;

                        _s.OnCompleted();
                        _s = new Subject<TSource>();
                        base._observer.OnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
                    }
                }

                // The timer for the new window is armed outside the lock.
                if (newWindow)
                    CreateTimer(newId);
            }

            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    _s.OnError(error);
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            public void OnCompleted()
            {
                lock (_gate)
                {
                    _s.OnCompleted();
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Producer that splits a source sequence into consecutive, non-overlapping windows whose
    /// boundaries come from another observable: either one obtained per window from a selector
    /// function, or a single shared boundary sequence.
    /// </summary>
    class Window<TSource, TWindowClosing> : Producer<IObservable<TSource>>
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<IObservable<TWindowClosing>> _windowClosingSelector;
        private readonly IObservable<TWindowClosing> _windowBoundaries;

        /// <summary>Windows closed by a closing sequence requested anew (via the selector) for each window.</summary>
        public Window(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)
        {
            _source = source;
            _windowClosingSelector = windowClosingSelector;
        }

        /// <summary>Windows delimited by the elements of a single boundary sequence.</summary>
        public Window(IObservable<TSource> source, IObservable<TWindowClosing> windowBoundaries)
        {
            _source = source;
            _windowBoundaries = windowBoundaries;
        }

        /// <summary>Picks the sink according to which constructor populated the fields, then starts it.</summary>
        protected override IDisposable Run(IObserver<IObservable<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            if (_windowClosingSelector != null)
            {
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else
            {
                var sink = new β(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
        }

        /// <summary>
        /// Selector-based sink. For each window, invokes the closing selector and subscribes to the
        /// result; the first notification (element or completion) from it closes the window.
        /// </summary>
        class _ : Sink<IObservable<TSource>>, IObserver<TSource>
        {
            private readonly Window<TSource, TWindowClosing> _parent;

            public _(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private ISubject<TSource> _window;
            private object _gate;
            private AsyncLock _windowGate;

            private SerialDisposable _m;
            private RefCountDisposable _refCountDisposable;

            /// <summary>Emits the first window, subscribes to the source, then sets up the first closing subscription.</summary>
            public IDisposable Run()
            {
                _window = new Subject<TSource>();
                _gate = new object();
                _windowGate = new AsyncLock();

                _m = new SerialDisposable();
                var groupDisposable = new CompositeDisposable(2) { _m };
                _refCountDisposable = new RefCountDisposable(groupDisposable);

                var window = new WindowObservable<TSource>(_window, _refCountDisposable);
                base._observer.OnNext(window);

                groupDisposable.Add(_parent._source.SubscribeSafe(this));

                // NOTE(review): AsyncLock is declared elsewhere; presumably Wait queues the action so that a
                // closing sequence firing synchronously during subscription does not recurse - confirm.
                _windowGate.Wait(CreateWindowClose);

                return _refCountDisposable;
            }

            /// <summary>
            /// Obtains the closing sequence for the current window and subscribes to it.
            /// An exception thrown by the selector is forwarded to the outer observer and ends the sink.
            /// </summary>
            private void CreateWindowClose()
            {
                var windowClose = default(IObservable<TWindowClosing>);
                try
                {
                    windowClose = _parent._windowClosingSelector();
                }
                catch (Exception exception)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(exception);
                        base.Dispose();
                    }
                    return;
                }

                // The disposable is assigned to _m before subscribing so it is already tracked if the
                // closing sequence signals during SubscribeSafe.
                var closingSubscription = new SingleAssignmentDisposable();
                _m.Disposable = closingSubscription;
                closingSubscription.Disposable = windowClose.SubscribeSafe(new ω(this, closingSubscription));
            }

            /// <summary>Completes the current window, emits a new one, and requests the next closing sequence.</summary>
            private void CloseWindow(IDisposable closingSubscription)
            {
                // Drop the closing subscription first so it signals at most one close.
                closingSubscription.Dispose();

                lock (_gate)
                {
                    _window.OnCompleted();
                    _window = new Subject<TSource>();

                    var window = new WindowObservable<TSource>(_window, _refCountDisposable);
                    base._observer.OnNext(window);
                }

                _windowGate.Wait(CreateWindowClose);
            }

            /// <summary>Observer of a closing sequence; both an element and completion close the current window.</summary>
            class ω : IObserver<TWindowClosing>
            {
                private readonly _ _parent;
                private readonly IDisposable _self;

                public ω(_ parent, IDisposable self)
                {
                    _parent = parent;
                    _self = self;
                }

                public void OnNext(TWindowClosing value)
                {
                    _parent.CloseWindow(_self);
                }

                public void OnError(Exception error)
                {
                    _parent.OnError(error);
                }

                public void OnCompleted()
                {
                    _parent.CloseWindow(_self);
                }
            }

            public void OnNext(TSource value)
            {
                lock (_gate)
                {
                    _window.OnNext(value);
                }
            }

            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    _window.OnError(error);
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            public void OnCompleted()
            {
                lock (_gate)
                {
                    _window.OnCompleted();
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }

        /// <summary>
        /// Boundary-based sink. Every element of the boundary sequence completes the current window
        /// and opens the next; error or completion of the boundary sequence ends the whole operator.
        /// </summary>
        class β : Sink<IObservable<TSource>>, IObserver<TSource>
        {
            private readonly Window<TSource, TWindowClosing> _parent;

            public β(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private ISubject<TSource> _window;
            private object _gate;

            private RefCountDisposable _refCountDisposable;

            /// <summary>Emits the first window, then subscribes to the source and to the boundary sequence.</summary>
            public IDisposable Run()
            {
                _window = new Subject<TSource>();
                _gate = new object();

                var d = new CompositeDisposable(2);
                _refCountDisposable = new RefCountDisposable(d);

                var window = new WindowObservable<TSource>(_window, _refCountDisposable);
                base._observer.OnNext(window);

                d.Add(_parent._source.SubscribeSafe(this));
                d.Add(_parent._windowBoundaries.SubscribeSafe(new ω(this)));

                return _refCountDisposable;
            }

            /// <summary>Observer of the boundary sequence.</summary>
            class ω : IObserver<TWindowClosing>
            {
                private readonly β _parent;

                public ω(β parent)
                {
                    _parent = parent;
                }

                public void OnNext(TWindowClosing value)
                {
                    // Same gate as the source-side callbacks, so a window swap never interleaves with an element.
                    lock (_parent._gate)
                    {
                        _parent._window.OnCompleted();
                        _parent._window = new Subject<TSource>();

                        var window = new WindowObservable<TSource>(_parent._window, _parent._refCountDisposable);
                        _parent._observer.OnNext(window);
                    }
                }

                public void OnError(Exception error)
                {
                    _parent.OnError(error);
                }

                public void OnCompleted()
                {
                    _parent.OnCompleted();
                }
            }

            public void OnNext(TSource value)
            {
                lock (_gate)
                {
                    _window.OnNext(value);
                }
            }

            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    _window.OnError(error);
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            public void OnCompleted()
            {
                lock (_gate)
                {
                    _window.OnCompleted();
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Observable wrapper handed to consumers for each window. It only forwards its arguments to
    /// AddRef, which is declared elsewhere.
    /// </summary>
    class WindowObservable<TSource> : AddRef<TSource>
    {
        public WindowObservable(IObservable<TSource> source, RefCountDisposable refCount)
            : base(source, refCount)
        {
        }
    }
}
#endif