// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.Observαble
{
    // Producer that relays elements of a source sequence until a second ("other")
    // sequence produces an element, at which point the output is completed.
    class TakeUntil<TSource, TOther> : Producer<TSource>
    {
        private readonly IObservable<TSource> _source;
        private readonly IObservable<TOther> _other;

        public TakeUntil(IObservable<TSource> source, IObservable<TOther> other)
        {
            _source = source;
            _other = other;
        }

        // Creates the sink for one subscription, publishes it through setSink and starts it.
        // NOTE(review): the generic arguments in this file were reconstructed from usage;
        // confirm that Action<IDisposable> matches the Run signature declared by Producer<TSource>.
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(this, observer, cancel);
            setSink(sink);
            return sink.Run();
        }

        // Per-subscription state. The sink instance itself is used as the lock object
        // shared by the "source" (T) and "other" (O) observers.
        class _ : Sink<TSource>
        {
            private readonly TakeUntil<TSource, TOther> _parent;

            public _(TakeUntil<TSource, TOther> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            // Subscribes to "other" first and then to "source"; returns a disposable
            // that covers both subscriptions.
            public IDisposable Run()
            {
                var sourceObserver = new T(this);
                var otherObserver = new O(this, sourceObserver);

                // COMPAT - Order of Subscribe calls per v1.0.10621
                var otherSubscription = _parent._other.SubscribeSafe(otherObserver);
                otherObserver.Disposable = otherSubscription;

                var sourceSubscription = _parent._source.SubscribeSafe(sourceObserver);

                return new CompositeDisposable(
                    otherSubscription,
                    sourceSubscription
                );
            }

            /*
             * We tried a more fine-grained synchronization scheme to make TakeUntil more efficient, but
             * this requires several CAS instructions, which quickly add up to being non-beneficial.
             *
             * Notice an approach where the "other" channel performs an Interlocked.Exchange operation on
             * the _parent._observer field to substitute it with a NopObserver<TSource> doesn't work,
             * because the "other" channel still needs to send an OnCompleted message, which could happen
             * concurrently with another message when the "source" channel has already read from the
             * _parent._observer field between making the On* call.
             *
             * Fixing this issue requires an ownership transfer mechanism for channels to get exclusive
             * access to the outgoing observer while dispatching a message. Doing this more fine-grained
             * than using locks turns out to be tricky and doesn't reduce cost.
             */

            // Observer attached to the source sequence.
            class T : IObserver<TSource>
            {
                private readonly _ _parent;

                // Set to true by O.OnCompleted (under the sink lock). Once set, OnNext
                // forwards elements without taking the lock.
                public volatile bool _open;

                public T(_ parent)
                {
                    _parent = parent;
                    _open = false;
                }

                public void OnNext(TSource value)
                {
                    if (_open)
                    {
                        // Lock-free path, taken only after the "other" observer has flagged completion.
                        _parent._observer.OnNext(value);
                    }
                    else
                    {
                        lock (_parent)
                        {
                            _parent._observer.OnNext(value);
                        }
                    }
                }

                public void OnError(Exception error)
                {
                    lock (_parent)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                public void OnCompleted()
                {
                    lock (_parent)
                    {
                        _parent._observer.OnCompleted();
                        _parent.Dispose();
                    }
                }
            }

            // Observer attached to the "other" sequence.
            class O : IObserver<TOther>
            {
                private readonly _ _parent;
                private readonly T _sourceObserver;
                private readonly SingleAssignmentDisposable _subscription;

                public O(_ parent, T sourceObserver)
                {
                    _parent = parent;
                    _sourceObserver = sourceObserver;
                    _subscription = new SingleAssignmentDisposable();
                }

                // Receives the subscription to "other" so that OnCompleted can dispose it.
                public IDisposable Disposable
                {
                    set { _subscription.Disposable = value; }
                }

                // Any element from "other" completes the output and disposes the sink.
                public void OnNext(TOther value)
                {
                    lock (_parent)
                    {
                        _parent._observer.OnCompleted();
                        _parent.Dispose();
                    }
                }

                public void OnError(Exception error)
                {
                    lock (_parent)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                // Completion of "other" does not complete the output: it flips the source
                // observer to its lock-free path and disposes only this observer's subscription.
                public void OnCompleted()
                {
                    lock (_parent)
                    {
                        _sourceObserver._open = true;
                        _subscription.Dispose();
                    }
                }
            }
        }
    }

    // Producer that relays elements of a source sequence until an absolute end time,
    // scheduled on the given scheduler, at which point the output is completed.
    class TakeUntil<TSource> : Producer<TSource>
    {
        private readonly IObservable<TSource> _source;
        private readonly DateTimeOffset _endTime;
        internal readonly IScheduler _scheduler;

        public TakeUntil(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
        {
            _source = source;
            _endTime = endTime;
            _scheduler = scheduler;
        }

        // Combines this operator with another end time: returns this instance when its own
        // end time is not later than endTime, otherwise a new TakeUntil over the same source
        // and scheduler that uses endTime.
        public IObservable<TSource> Ω(DateTimeOffset endTime)
        {
            //
            // Minimum semantics:
            //
            //   t                     0--1--2--3--4--5--6--7->   t                     0--1--2--3--4--5--6--7->
            //
            //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
            //   xs.TU(5AM)            --o--o--o--o--o|           xs.TU(3AM)            --o--o--o|
            //   xs.TU(5AM).TU(3AM)    --o--o--o|                 xs.TU(3AM).TU(5AM)    --o--o--o|
            //
            if (_endTime <= endTime)
                return this;
            else
                return new TakeUntil<TSource>(_source, endTime, _scheduler);
        }

        // Creates the sink for one subscription, publishes it through setSink and starts it.
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(this, observer, cancel);
            setSink(sink);
            return sink.Run();
        }

        // Per-subscription state; observes the source directly and serializes the
        // timer callback and source notifications through _gate.
        class _ : Sink<TSource>, IObserver<TSource>
        {
            private readonly TakeUntil<TSource> _parent;

            public _(TakeUntil<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;

            // Schedules the end-time callback before subscribing to the source; returns a
            // disposable that covers both the scheduled work and the subscription.
            public IDisposable Run()
            {
                _gate = new object();

                var t = _parent._scheduler.Schedule(_parent._endTime, Tick);
                var d = _parent._source.SubscribeSafe(this);

                return new CompositeDisposable(t, d);
            }

            // Scheduler callback for the end time: completes the output and disposes the sink.
            private void Tick()
            {
                lock (_gate)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }

            public void OnNext(TSource value)
            {
                lock (_gate)
                {
                    base._observer.OnNext(value);
                }
            }

            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            public void OnCompleted()
            {
                lock (_gate)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }
    }
}
#endif