diff options
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Merge.cs')
-rw-r--r-- | Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Merge.cs | 288 |
1 files changed, 0 insertions, 288 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Merge.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Merge.cs deleted file mode 100644 index b051a4b..0000000 --- a/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Merge.cs +++ /dev/null @@ -1,288 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#if !NO_PERF -using System; -using System.Collections.Generic; -using System.Reactive.Disposables; - -namespace System.Reactive.Linq.Observαble -{ - class Merge<TSource> : Producer<TSource> - { - private readonly IObservable<IObservable<TSource>> _sources; - private readonly int _maxConcurrent; - - public Merge(IObservable<IObservable<TSource>> sources) - { - _sources = sources; - } - - public Merge(IObservable<IObservable<TSource>> sources, int maxConcurrent) - { - _sources = sources; - _maxConcurrent = maxConcurrent; - } - - protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink) - { - if (_maxConcurrent > 0) - { - var sink = new μ(this, observer, cancel); - setSink(sink); - return sink.Run(); - } - else - { - var sink = new _(this, observer, cancel); - setSink(sink); - return sink.Run(); - } - } - - class _ : Sink<TSource>, IObserver<IObservable<TSource>> - { - private readonly Merge<TSource> _parent; - - public _(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel) - : base(observer, cancel) - { - _parent = parent; - } - - private object _gate; - private bool _isStopped; - private CompositeDisposable _group; - private SingleAssignmentDisposable _sourceSubscription; - - public IDisposable Run() - { - _gate = new object(); - _isStopped = false; - _group = new CompositeDisposable(); - - _sourceSubscription = new SingleAssignmentDisposable(); - _group.Add(_sourceSubscription); - _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this); - - return _group; - } - - public void OnNext(IObservable<TSource> value) - { - var innerSubscription = new SingleAssignmentDisposable(); - _group.Add(innerSubscription); - innerSubscription.Disposable = value.SubscribeSafe(new ι(this, innerSubscription)); - } - - public void OnError(Exception error) - { - lock (_gate) - { - base._observer.OnError(error); - base.Dispose(); - } - } - - public void OnCompleted() - { - _isStopped = true; - if (_group.Count == 1) - { - // - // Notice there can be a race between OnCompleted of the source and any - // of the inner sequences, where both see _group.Count == 1, and one is - // waiting for the lock. There won't be a double OnCompleted observation - // though, because the call to Dispose silences the observer by swapping - // in a NopObserver<T>. - // - lock (_gate) - { - base._observer.OnCompleted(); - base.Dispose(); - } - } - else - { - _sourceSubscription.Dispose(); - } - } - - class ι : IObserver<TSource> - { - private readonly _ _parent; - private readonly IDisposable _self; - - public ι(_ parent, IDisposable self) - { - _parent = parent; - _self = self; - } - - public void OnNext(TSource value) - { - lock (_parent._gate) - _parent._observer.OnNext(value); - } - - public void OnError(Exception error) - { - lock (_parent._gate) - { - _parent._observer.OnError(error); - _parent.Dispose(); - } - } - - public void OnCompleted() - { - _parent._group.Remove(_self); - if (_parent._isStopped && _parent._group.Count == 1) - { - // - // Notice there can be a race between OnCompleted of the source and any - // of the inner sequences, where both see _group.Count == 1, and one is - // waiting for the lock. There won't be a double OnCompleted observation - // though, because the call to Dispose silences the observer by swapping - // in a NopObserver<T>. - // - lock (_parent._gate) - { - _parent._observer.OnCompleted(); - _parent.Dispose(); - } - } - } - } - } - - class μ : Sink<TSource>, IObserver<IObservable<TSource>> - { - private readonly Merge<TSource> _parent; - - public μ(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel) - : base(observer, cancel) - { - _parent = parent; - } - - private object _gate; - private Queue<IObservable<TSource>> _q; - private bool _isStopped; - private SingleAssignmentDisposable _sourceSubscription; - private CompositeDisposable _group; - private int _activeCount = 0; - - public IDisposable Run() - { - _gate = new object(); - _q = new Queue<IObservable<TSource>>(); - _isStopped = false; - _activeCount = 0; - - _group = new CompositeDisposable(); - _sourceSubscription = new SingleAssignmentDisposable(); - _sourceSubscription.Disposable = _parent._sources.SubscribeSafe(this); - _group.Add(_sourceSubscription); - - return _group; - } - - public void OnNext(IObservable<TSource> value) - { - lock (_gate) - { - if (_activeCount < _parent._maxConcurrent) - { - _activeCount++; - Subscribe(value); - } - else - _q.Enqueue(value); - } - } - - public void OnError(Exception error) - { - lock (_gate) - { - base._observer.OnError(error); - base.Dispose(); - } - } - - public void OnCompleted() - { - lock (_gate) - { - _isStopped = true; - if (_activeCount == 0) - { - base._observer.OnCompleted(); - base.Dispose(); - } - else - { - _sourceSubscription.Dispose(); - } - } - } - - private void Subscribe(IObservable<TSource> innerSource) - { - var subscription = new SingleAssignmentDisposable(); - _group.Add(subscription); - subscription.Disposable = innerSource.SubscribeSafe(new ι(this, subscription)); - } - - class ι : IObserver<TSource> - { - private readonly μ _parent; - private readonly IDisposable _self; - - public ι(μ parent, IDisposable self) - { - _parent = parent; - _self = self; - } - - public void OnNext(TSource value) - { - lock (_parent._gate) - _parent._observer.OnNext(value); - } - - public void OnError(Exception error) - { - lock (_parent._gate) - { - _parent._observer.OnError(error); - _parent.Dispose(); - } - } - - public void OnCompleted() - { - _parent._group.Remove(_self); - lock (_parent._gate) - { - if (_parent._q.Count > 0) - { - var s = _parent._q.Dequeue(); - _parent.Subscribe(s); - } - else - { - _parent._activeCount--; - if (_parent._isStopped && _parent._activeCount == 0) - { - _parent._observer.OnCompleted(); - _parent.Dispose(); - } - } - } - } - } - } - } -} -#endif
\ No newline at end of file |