diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable')
98 files changed, 1213 insertions, 417 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs index 3fc6962..46e66c6 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class AddRef<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs index d1b6ef8..6ae63c0 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Aggregate<TSource, TAccumulate, TResult> : Producer<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs index 09c825d..49a0933 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class All<TSource> : Producer<bool> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs index 7272cee..ea110c5 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Amb<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs index 1414bdd..c8eb37a 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Any<TSource> : Producer<bool> { @@ -25,7 +25,7 @@ namespace System.Reactive.Linq.Observαble { if (_predicate != null) { - var sink = new π(this, observer, cancel); + var sink = new AnyImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -65,11 +65,11 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<bool>, IObserver<TSource> + class AnyImpl : Sink<bool>, IObserver<TSource> { private readonly Any<TSource> _parent; - public π(Any<TSource> parent, IObserver<bool> observer, IDisposable cancel) + public AnyImpl(Any<TSource> parent, IObserver<bool> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs index 09d5f14..c4e9d2f 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class AsObservable<TSource> : Producer<TSource>, IEvaluatableObservable<TSource> { @@ -14,7 +14,7 @@ namespace System.Reactive.Linq.Observαble _source = source; } - public IObservable<TSource> Ω() + public IObservable<TSource> Omega() { return this; } diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs index fb2d6b7..897da02 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class AverageDouble : Producer<double> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs index a82b550..ce6ced8 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs @@ -8,7 +8,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Buffer<TSource> : Producer<IList<TSource>> { @@ -53,7 +53,7 @@ namespace System.Reactive.Linq.Observαble } else if (_count > 0) { - var sink = new μ(this, observer, cancel); + var sink = new Impl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -61,13 +61,13 @@ namespace System.Reactive.Linq.Observαble { if (_timeSpan == _timeShift) { - var sink = new π(this, observer, cancel); + var sink = new BufferTimeShift(this, observer, cancel); setSink(sink); return sink.Run(); } else { - var sink = new τ(this, observer, cancel); + var sink = new BufferImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -143,11 +143,11 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<IList<TSource>>, IObserver<TSource> + class BufferImpl : Sink<IList<TSource>>, IObserver<TSource> { private readonly Buffer<TSource> _parent; - public τ(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) + public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -283,11 +283,11 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<IList<TSource>>, IObserver<TSource> + class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource> { private readonly Buffer<TSource> _parent; - public π(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) + public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -346,11 +346,11 @@ namespace System.Reactive.Linq.Observαble } } - class μ : Sink<IList<TSource>>, IObserver<TSource> + class Impl : Sink<IList<TSource>>, IObserver<TSource> { private readonly Buffer<TSource> _parent; - public μ(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) + public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -487,7 +487,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new β(this, observer, cancel); + var sink = new Beta(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -544,7 +544,7 @@ namespace System.Reactive.Linq.Observαble var closingSubscription = new SingleAssignmentDisposable(); _m.Disposable = closingSubscription; - closingSubscription.Disposable = bufferClose.SubscribeSafe(new ω(this, closingSubscription)); + closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription)); } private void CloseBuffer(IDisposable closingSubscription) @@ -561,12 +561,12 @@ namespace System.Reactive.Linq.Observαble _bufferGate.Wait(CreateBufferClose); } - class ω : IObserver<TBufferClosing> + class Omega : IObserver<TBufferClosing> { private readonly _ _parent; private readonly IDisposable _self; - public ω(_ parent, IDisposable self) + public Omega(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -617,11 +617,11 @@ namespace System.Reactive.Linq.Observαble } } - class β : Sink<IList<TSource>>, IObserver<TSource> + class Beta : Sink<IList<TSource>>, IObserver<TSource> { private readonly Buffer<TSource, TBufferClosing> _parent; - public β(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel) + public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -641,16 +641,16 @@ namespace System.Reactive.Linq.Observαble _refCountDisposable = new RefCountDisposable(d); d.Add(_parent._source.SubscribeSafe(this)); - d.Add(_parent._bufferBoundaries.SubscribeSafe(new ω(this))); + d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this))); return _refCountDisposable; } - class ω : IObserver<TBufferClosing> + class Omega : IObserver<TBufferClosing> { - private readonly β _parent; + private readonly Beta _parent; - public ω(β parent) + public Omega(Beta parent) { _parent = parent; } diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs index b104050..b3ba37c 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Generic; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Case<TValue, TResult> : Producer<TResult>, IEvaluatableObservable<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs index 48820b3..b123357 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs @@ -3,9 +3,9 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { - class Cast<TSource, TResult> : Producer<TResult> /* Could optimize further by deriving from Select<TResult> and providing Ω<TResult2>. We're not doing this (yet) for debuggability. */ + class Cast<TSource, TResult> : Producer<TResult> /* Could optimize further by deriving from Select<TResult> and providing Omega<TResult2>. We're not doing this (yet) for debuggability. */ { private readonly IObservable<TSource> _source; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs index 71e0037..06bf911 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Catch<TSource> : Producer<TSource> { @@ -136,7 +136,7 @@ namespace System.Reactive.Linq.Observαble var d = new SingleAssignmentDisposable(); _subscription.Disposable = d; - d.Disposable = result.SubscribeSafe(new ε(this)); + d.Disposable = result.SubscribeSafe(new Impl(this)); } else { @@ -151,11 +151,11 @@ namespace System.Reactive.Linq.Observαble base.Dispose(); } - class ε : IObserver<TSource> + class Impl : IObserver<TSource> { private readonly _ _parent; - public ε(_ parent) + public Impl(_ parent) { _parent = parent; } diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs index 2cfd240..8f0bf24 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs @@ -7,7 +7,7 @@ using System.Reactive; using System.Reactive.Threading; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Collect<TSource, TResult> : PushToPullAdapter<TSource, TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs index 2188307..9748e24 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs @@ -7,7 +7,7 @@ using System.Collections.ObjectModel; using System.Linq; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { #region Binary diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs index 5a9b2e4..cd8c78a 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Concat<TSource> : Producer<TSource>, IConcatenatable<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs index b26e2d4..69e30dc 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Contains<TSource> : Producer<bool> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs index 7432816..0c2f389 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Count<TSource> : Producer<int> { @@ -31,7 +31,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new π(this, observer, cancel); + var sink = new CountImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -77,12 +77,12 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<int>, IObserver<TSource> + class CountImpl : Sink<int>, IObserver<TSource> { private readonly Count<TSource> _parent; private int _count; - public π(Count<TSource> parent, IObserver<int> observer, IDisposable cancel) + public CountImpl(Count<TSource> parent, IObserver<int> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs index ed4bb89..551e536 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class DefaultIfEmpty<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs index ba3dcc4..aa49c12 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Defer<TValue> : Producer<TValue>, IEvaluatableObservable<TValue> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs index 3e2cbba..5ea3d25 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs @@ -10,7 +10,7 @@ using System.Threading; using System.Reactive.Threading; #endif -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Delay<TSource> : Producer<TSource> { @@ -37,7 +37,7 @@ namespace System.Reactive.Linq.Observαble { if (_scheduler.AsLongRunning() != null) { - var sink = new λ(this, observer, cancel); + var sink = new LongRunningImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -227,9 +227,9 @@ namespace System.Reactive.Linq.Observαble // implementation, the loop below kept running while there was work for immediate dispatch, // potentially causing a long running work item on the target scheduler. With the addition // of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this - // interface and perform different processing (see λ). To reduce the code churn in the old - // loop code here, we set the shouldYield flag to true after the first dispatch iteration, - // in order to break from the loop and enter the recursive scheduling path. + // interface and perform different processing (see LongRunningImpl). To reduce the code + // churn in the old loop code here, we set the shouldYield flag to true after the first + // dispatch iteration, in order to break from the loop and enter the recursive scheduling path. // var shouldYield = false; @@ -322,11 +322,11 @@ namespace System.Reactive.Linq.Observαble } } - class λ : Sink<TSource>, IObserver<TSource> + class LongRunningImpl : Sink<TSource>, IObserver<TSource> { private readonly Delay<TSource> _parent; - public λ(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -629,7 +629,7 @@ namespace System.Reactive.Linq.Observαble } else { - _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new σ(this)); + _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this)); } return new CompositeDisposable(_subscription, _delays); @@ -660,7 +660,7 @@ namespace System.Reactive.Linq.Observαble var d = new SingleAssignmentDisposable(); _delays.Add(d); - d.Disposable = delay.SubscribeSafe(new δ(this, value, d)); + d.Disposable = delay.SubscribeSafe(new Delta(this, value, d)); } public void OnError(Exception error) @@ -692,11 +692,11 @@ namespace System.Reactive.Linq.Observαble } } - class σ : IObserver<TDelay> + class SubscriptionDelay : IObserver<TDelay> { private readonly _ _parent; - public σ(_ parent) + public SubscriptionDelay(_ parent) { _parent = parent; } @@ -718,13 +718,13 @@ namespace System.Reactive.Linq.Observαble } } - class δ : IObserver<TDelay> + class Delta : IObserver<TDelay> { private readonly _ _parent; private readonly TSource _value; private readonly IDisposable _self; - public δ(_ parent, TSource value, IDisposable self) + public Delta(_ parent, TSource value, IDisposable self) { _parent = parent; _value = value; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs index c6a05cc..f5a742a 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Concurrency; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class DelaySubscription<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs index 112b156..590ef3b 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Dematerialize<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs index af20277..bd4b0ca 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Distinct<TSource, TKey> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs index 10c7923..1ba8f5f 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class DistinctUntilChanged<TSource, TKey> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs index 2f73cfa..a4b3747 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Do<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs index 3ef40eb..5dc0bc6 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class DoWhile<TSource> : Producer<TSource>, IConcatenatable<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs index b2b1212..0798d3b 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ElementAt<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs index 42c7241..30a2b65 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Concurrency; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Empty<TResult> : Producer<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs index ca9a5d8..a70ac94 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Finally<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs index 8d8bba7..62da120 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class FirstAsync<TSource> : Producer<TSource> { @@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble { if (_predicate != null) { - var sink = new π(this, observer, cancel); + var sink = new FirstAsyncImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -73,11 +73,11 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<TSource>, IObserver<TSource> + class FirstAsyncImpl : Sink<TSource>, IObserver<TSource> { private readonly FirstAsync<TSource> _parent; - public π(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public FirstAsyncImpl(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs index 85b9e03..948f319 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class For<TSource, TResult> : Producer<TResult>, IConcatenatable<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs index 38e112d..9b5c024 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs @@ -4,7 +4,7 @@ using System; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ForEach<TSource> { @@ -62,7 +62,7 @@ namespace System.Reactive.Linq.Observαble } } - public class τ : IObserver<TSource> + public class ForEachImpl : IObserver<TSource> { private readonly Action<TSource, int> _onNext; private readonly Action _done; @@ -71,7 +71,7 @@ namespace System.Reactive.Linq.Observαble private Exception _exception; private int _stopped; - public τ(Action<TSource, int> onNext, Action done) + public ForEachImpl(Action<TSource, int> onNext, Action done) { _onNext = onNext; _done = done; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs index 82c4348..90aa779 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs @@ -156,7 +156,7 @@ using System.Reactive.Subjects; // subject (which is opaque to the event producer). Not to mention that the subject would always be // rooted by the target event (even when the FromEvent[Pattern] observable wrapper is unreachable). // -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class FromEvent<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, TEventArgs> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs index 76da050..ca98290 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs @@ -12,23 +12,23 @@ using System.Threading; // // See FromEvent.cs for more information. // -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class FromEventPattern { - public class τ<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TEventArgs>> + public class Impl<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TEventArgs>> #if !NO_EVENTARGS_CONSTRAINT where TEventArgs : EventArgs #endif { private readonly Func<EventHandler<TEventArgs>, TDelegate> _conversion; - public τ(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler) + public Impl(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler) : base(addHandler, removeHandler, scheduler) { } - public τ(Func<EventHandler<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler) + public Impl(Func<EventHandler<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler) : base(addHandler, removeHandler, scheduler) { _conversion = conversion; @@ -52,12 +52,12 @@ namespace System.Reactive.Linq.Observαble } } - public class τ<TDelegate, TSender, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TSender, TEventArgs>> + public class Impl<TDelegate, TSender, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TSender, TEventArgs>> #if !NO_EVENTARGS_CONSTRAINT where TEventArgs : EventArgs #endif { - public τ(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler) + public Impl(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler) : base(addHandler, removeHandler, scheduler) { } @@ -69,7 +69,7 @@ namespace System.Reactive.Linq.Observαble } } - public class ρ<TSender, TEventArgs, TResult> : EventProducer<Delegate, TResult> + public class Handler<TSender, TEventArgs, TResult> : EventProducer<Delegate, TResult> { private readonly object _target; private readonly Type _delegateType; @@ -80,7 +80,7 @@ namespace System.Reactive.Linq.Observαble private readonly bool _isWinRT; #endif - public ρ(object target, Type delegateType, MethodInfo addMethod, MethodInfo removeMethod, Func<TSender, TEventArgs, TResult> getResult, bool isWinRT, IScheduler scheduler) + public Handler(object target, Type delegateType, MethodInfo addMethod, MethodInfo removeMethod, Func<TSender, TEventArgs, TResult> getResult, bool isWinRT, IScheduler scheduler) : base(scheduler) { #if HAS_WINRT diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs index 89ef862..7e46655 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs @@ -7,7 +7,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Generate<TState, TResult> : Producer<TResult> { @@ -52,13 +52,13 @@ namespace System.Reactive.Linq.Observαble { if (_timeSelectorA != null) { - var sink = new α(this, observer, cancel); + var sink = new SelectorA(this, observer, cancel); setSink(sink); return sink.Run(); } else if (_timeSelectorR != null) { - var sink = new δ(this, observer, cancel); + var sink = new Delta(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -70,11 +70,11 @@ namespace System.Reactive.Linq.Observαble } } - class α : Sink<TResult> + class SelectorA : Sink<TResult> { private readonly Generate<TState, TResult> _parent; - public α(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public SelectorA(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -130,11 +130,11 @@ namespace System.Reactive.Linq.Observαble } } - class δ : Sink<TResult> + class Delta : Sink<TResult> { private readonly Generate<TState, TResult> _parent; - public δ(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public Delta(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs index 05bf20b..a25efa8 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs @@ -8,7 +8,7 @@ using System.Diagnostics; using System.Reactive.Disposables; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs index 1e64100..4f91030 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs @@ -7,20 +7,22 @@ using System.Linq; using System.Reactive.Disposables; using System.Reactive.Subjects; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class GroupBy<TSource, TKey, TElement> : Producer<IGroupedObservable<TKey, TElement>> { private readonly IObservable<TSource> _source; private readonly Func<TSource, TKey> _keySelector; private readonly Func<TSource, TElement> _elementSelector; + private readonly int? _capacity; private readonly IEqualityComparer<TKey> _comparer; - public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer) + public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer) { _source = source; _keySelector = keySelector; _elementSelector = elementSelector; + _capacity = capacity; _comparer = comparer; } @@ -49,7 +51,15 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; - _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer); + + if (_parent._capacity.HasValue) + { + _map = new Dictionary<TKey, ISubject<TElement>>(_parent._capacity.Value, _parent._comparer); + } + else + { + _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer); + } } public void OnNext(TSource value) diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs index 7d91f73..9baf146 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs @@ -1,13 +1,13 @@ // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - #if !NO_PERF +#if !NO_PERF using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; using System.Reactive.Subjects; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>> { @@ -15,14 +15,16 @@ namespace System.Reactive.Linq.Observαble private readonly Func<TSource, TKey> _keySelector; private readonly Func<TSource, TElement> _elementSelector; private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector; + private readonly int? _capacity; private readonly IEqualityComparer<TKey> _comparer; - public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer) + public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer) { _source = source; _keySelector = keySelector; _elementSelector = elementSelector; _durationSelector = durationSelector; + _capacity = capacity; _comparer = comparer; } @@ -52,7 +54,7 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; - _map = new Map<TKey, ISubject<TElement>>(_parent._comparer); + _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer); _nullGate = new object(); } @@ -127,7 +129,7 @@ namespace System.Reactive.Linq.Observαble var md = new SingleAssignmentDisposable(); _parent._groupDisposable.Add(md); - md.Disposable = duration.SubscribeSafe(new δ(this, key, writer, md)); + md.Disposable = duration.SubscribeSafe(new Delta(this, key, writer, md)); } var element = default(TElement); @@ -163,14 +165,14 @@ namespace System.Reactive.Linq.Observαble writer.OnNext(element); } - class δ : IObserver<TDuration> + class Delta : IObserver<TDuration> { private readonly _ _parent; private readonly TKey _key; private readonly ISubject<TElement> _writer; private readonly IDisposable _self; - public δ(_ parent, TKey key, ISubject<TElement> writer, IDisposable self) + public Delta(_ parent, TKey key, ISubject<TElement> writer, IDisposable self) { _parent = parent; _key = key; @@ -272,11 +274,38 @@ namespace System.Reactive.Linq.Observαble #if !NO_CDS class Map<TKey, TValue> { +#if !NO_CDS_COLLECTIONS + // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs + + // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the + // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference + // and blocking, but also the more expensive operations that require all locks become (e.g. table + // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good + // compromise. + private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4; + + private static int DefaultConcurrencyLevel + { + get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; } + } +#endif + private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map; - public Map(IEqualityComparer<TKey> comparer) + public Map(int? capacity, IEqualityComparer<TKey> comparer) { - _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer); + if (capacity.HasValue) + { +#if NO_CDS_COLLECTIONS + _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer); +#else + _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer); +#endif + } + else + { + _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer); + } } public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added) @@ -327,9 +356,16 @@ namespace System.Reactive.Linq.Observαble { private readonly Dictionary<TKey, TValue> _map; - public Map(IEqualityComparer<TKey> comparer) + public Map(int? capacity, IEqualityComparer<TKey> comparer) { - _map = new Dictionary<TKey, TValue>(comparer); + if (capacity.HasValue) + { + _map = new Dictionary<TKey, TValue>(capacity.Value, comparer); + } + else + { + _map = new Dictionary<TKey, TValue>(comparer); + } } public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added) diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs index 66e49be..19c8c45 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Subjects; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult> { @@ -68,18 +68,18 @@ namespace System.Reactive.Linq.Observαble _rightID = 0; _rightMap = new Dictionary<int, TRight>(); - leftSubscription.Disposable = _parent._left.SubscribeSafe(new λ(this, leftSubscription)); - rightSubscription.Disposable = _parent._right.SubscribeSafe(new ρ(this, rightSubscription)); + leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription)); + rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription)); return _refCount; } - class λ : IObserver<TLeft> + class LeftObserver : IObserver<TLeft> { private readonly _ _parent; private readonly IDisposable _self; - public λ(_ parent, IDisposable self) + public LeftObserver(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -122,7 +122,7 @@ namespace System.Reactive.Linq.Observαble } // BREAKING CHANGE v2 > v1.x - The duration sequence is subscribed to before the result sequence is evaluated. - md.Disposable = duration.SubscribeSafe(new δ(this, id, s, md)); + md.Disposable = duration.SubscribeSafe(new Delta(this, id, s, md)); var result = default(TResult); try @@ -146,14 +146,14 @@ namespace System.Reactive.Linq.Observαble } } - class δ : IObserver<TLeftDuration> + class Delta : IObserver<TLeftDuration> { - private readonly λ _parent; + private readonly LeftObserver _parent; private readonly int _id; private readonly IObserver<TRight> _group; private readonly IDisposable _self; - public δ(λ parent, int id, IObserver<TRight> group, IDisposable self) + public Delta(LeftObserver parent, int id, IObserver<TRight> group, IDisposable self) { _parent = parent; _id = id; @@ -201,12 +201,12 @@ namespace System.Reactive.Linq.Observαble } } - class ρ : IObserver<TRight> + class RightObserver : IObserver<TRight> { private readonly _ _parent; private readonly IDisposable _self; - public ρ(_ parent, IDisposable self) + public RightObserver(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -242,7 +242,7 @@ namespace System.Reactive.Linq.Observαble OnError(exception); return; } - md.Disposable = duration.SubscribeSafe(new δ(this, id, md)); + md.Disposable = duration.SubscribeSafe(new Delta(this, id, md)); lock (_parent._gate) { @@ -251,13 +251,13 @@ namespace System.Reactive.Linq.Observαble } } - class δ : IObserver<TRightDuration> + class Delta : IObserver<TRightDuration> { - private readonly ρ _parent; + private readonly RightObserver _parent; private readonly int _id; private readonly IDisposable _self; - public δ(ρ parent, int id, IDisposable self) + public Delta(RightObserver parent, int id, IDisposable self) { _parent = parent; _id = id; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs index 8b1804a..8740c32 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Generic; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class If<TResult> : Producer<TResult>, IEvaluatableObservable<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs index 65441bd..1f932ac 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class IgnoreElements<TSource> : Producer<TSource> { @@ -14,7 +14,7 @@ namespace System.Reactive.Linq.Observαble _source = source; } - public IObservable<TSource> Ω() + public IObservable<TSource> Omega() { return this; } diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs index 674c6f8..722750e 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class IsEmpty<TSource> : Producer<bool> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs index 13e6ad2..4d22d34 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Generic; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult> { @@ -69,18 +69,18 @@ namespace System.Reactive.Linq.Observαble _rightID = 0; _rightMap = new Dictionary<int, TRight>(); - leftSubscription.Disposable = _parent._left.SubscribeSafe(new λ(this, leftSubscription)); - rightSubscription.Disposable = _parent._right.SubscribeSafe(new ρ(this, rightSubscription)); + leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription)); + rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription)); return _group; } - class λ : IObserver<TLeft> + class LeftObserver : IObserver<TLeft> { private readonly _ _parent; private readonly IDisposable _self; - public λ(_ parent, IDisposable self) + public LeftObserver(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -124,7 +124,7 @@ namespace System.Reactive.Linq.Observαble return; } - md.Disposable = duration.SubscribeSafe(new δ(this, id, md)); + md.Disposable = duration.SubscribeSafe(new Delta(this, id, md)); lock (_parent._gate) { @@ -147,13 +147,13 @@ namespace System.Reactive.Linq.Observαble } } - class δ : IObserver<TLeftDuration> + class Delta : IObserver<TLeftDuration> { - private readonly λ _parent; + private readonly LeftObserver _parent; private readonly int _id; private readonly IDisposable _self; - public δ(λ parent, int id, IDisposable self) + public Delta(LeftObserver parent, int id, IDisposable self) { _parent = parent; _id = id; @@ -203,12 +203,12 @@ namespace System.Reactive.Linq.Observαble } } - class ρ : IObserver<TRight> + class RightObserver : IObserver<TRight> { private readonly _ _parent; private readonly IDisposable _self; - public ρ(_ parent, IDisposable self) + public RightObserver(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -252,7 +252,7 @@ namespace System.Reactive.Linq.Observαble return; } - md.Disposable = duration.SubscribeSafe(new δ(this, id, md)); + md.Disposable = duration.SubscribeSafe(new Delta(this, id, md)); lock (_parent._gate) { @@ -275,13 +275,13 @@ namespace System.Reactive.Linq.Observαble } } - class δ : IObserver<TRightDuration> + class Delta : IObserver<TRightDuration> { - private readonly ρ _parent; + private readonly RightObserver _parent; private readonly int _id; private readonly IDisposable _self; - public δ(ρ parent, int id, IDisposable self) + public Delta(RightObserver parent, int id, IDisposable self) { _parent = parent; _id = id; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs index 1ecd930..0a77d32 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class LastAsync<TSource> : Producer<TSource> { @@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble { if (_predicate != null) { - var sink = new π(this, observer, cancel); + var sink = new LastAsyncImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -77,13 +77,13 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<TSource>, IObserver<TSource> + class LastAsyncImpl : Sink<TSource>, IObserver<TSource> { private readonly LastAsync<TSource> _parent; private TSource _value; private bool _seenValue; - public π(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public LastAsyncImpl(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs index 2699797..bc92e6d 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Threading; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Latest<TSource> : PushToPullAdapter<TSource, TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs index 0108e18..63c9f24 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class LongCount<TSource> : Producer<long> { @@ -31,7 +31,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new π(this, observer, cancel); + var sink = new LongCountImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -77,12 +77,12 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<long>, IObserver<TSource> + class LongCountImpl : Sink<long>, IObserver<TSource> { private readonly LongCount<TSource> _parent; private long _count; - public π(LongCount<TSource> parent, IObserver<long> observer, IDisposable cancel) + public LongCountImpl(LongCount<TSource> parent, IObserver<long> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs index 102267d..fa8e1ee 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Materialize<TSource> : Producer<Notification<TSource>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs index da339da..8e86e75 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Max<TSource> : Producer<TSource> { @@ -28,19 +28,19 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new δ(this, observer, cancel); + var sink = new Delta(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } } - class δ : Sink<TSource>, IObserver<TSource> + class Delta : Sink<TSource>, IObserver<TSource> { private readonly Max<TSource> _parent; private bool _hasValue; private TSource _lastValue; - public δ(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public Delta(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs index 52a9a42..4bf97f5 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class MaxBy<TSource, TKey> : Producer<IList<TSource>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs index a27c3c3..dd710cb 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs @@ -10,7 +10,7 @@ using System.Threading; using System.Threading.Tasks; #endif -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Merge<TSource> : Producer<TSource> { @@ -41,14 +41,14 @@ namespace System.Reactive.Linq.Observαble { if (_maxConcurrent > 0) { - var sink = new μ(this, observer, cancel); + var sink = new MergeConcurrent(this, observer, cancel); setSink(sink); return sink.Run(); } #if !NO_TPL else if (_sourcesT != null) { - var sink = new τ(this, observer, cancel); + var sink = new MergeImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -93,7 +93,7 @@ namespace System.Reactive.Linq.Observαble { var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); - innerSubscription.Disposable = value.SubscribeSafe(new ι(this, innerSubscription)); + innerSubscription.Disposable = value.SubscribeSafe(new Iter(this, innerSubscription)); } public void OnError(Exception error) @@ -129,12 +129,12 @@ namespace System.Reactive.Linq.Observαble } } - class ι : IObserver<TSource> + class Iter : IObserver<TSource> { private readonly _ _parent; private readonly IDisposable _self; - public ι(_ parent, IDisposable self) + public Iter(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -177,11 +177,11 @@ namespace System.Reactive.Linq.Observαble } } - class μ : Sink<TSource>, IObserver<IObservable<TSource>> + class MergeConcurrent : Sink<TSource>, IObserver<IObservable<TSource>> { private readonly Merge<TSource> _parent; - public μ(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public MergeConcurrent(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -253,15 +253,15 @@ namespace System.Reactive.Linq.Observαble { var subscription = new SingleAssignmentDisposable(); _group.Add(subscription); - subscription.Disposable = innerSource.SubscribeSafe(new ι(this, subscription)); + subscription.Disposable = innerSource.SubscribeSafe(new Iter(this, subscription)); } - class ι : IObserver<TSource> + class Iter : IObserver<TSource> { - private readonly μ _parent; + private readonly MergeConcurrent _parent; private readonly IDisposable _self; - public ι(μ parent, IDisposable self) + public Iter(MergeConcurrent parent, IDisposable self) { _parent = parent; _self = self; @@ -308,11 +308,11 @@ namespace System.Reactive.Linq.Observαble #if !NO_TPL #pragma warning disable 0420 - class τ : Sink<TSource>, IObserver<Task<TSource>> + class MergeImpl : Sink<TSource>, IObserver<Task<TSource>> { private readonly Merge<TSource> _parent; - public τ(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public MergeImpl(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs index f7b6197..fc08002 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Min<TSource> : Producer<TSource> { @@ -28,19 +28,19 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new δ(this, observer, cancel); + var sink = new Delta(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } } - class δ : Sink<TSource>, IObserver<TSource> + class Delta : Sink<TSource>, IObserver<TSource> { private readonly Min<TSource> _parent; private bool _hasValue; private TSource _lastValue; - public δ(Min<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public Delta(Min<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs index a32bf92..d9c93f6 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class MinBy<TSource, TKey> : Producer<IList<TSource>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs index 937d811..7311a33 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Generic; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class MostRecent<TSource> : PushToPullAdapter<TSource, TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs index 4a77300..ee163a5 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Disposables; using System.Reactive.Subjects; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Multicast<TSource, TIntermediate, TResult> : Producer<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs index a9adc52..eae632f 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Never<TResult> : IObservable<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs index 2d4ec45..4b20276 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Threading; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Next<TSource> : PushToPullAdapter<TSource, TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs index d5d7428..eaf7613 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ObserveOn<TSource> : Producer<TSource> { @@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble #if !NO_SYNCCTX if (_context != null) { - var sink = new ς(this, observer, cancel); + var sink = new ObserveOnImpl(this, observer, cancel); setSink(sink); return _source.Subscribe(sink); } @@ -47,11 +47,11 @@ namespace System.Reactive.Linq.Observαble } #if !NO_SYNCCTX - class ς : Sink<TSource>, IObserver<TSource> + class ObserveOnImpl : Sink<TSource>, IObserver<TSource> { private readonly ObserveOn<TSource> _parent; - public ς(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public ObserveOnImpl(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs index 07e4515..27a0ca3 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class OfType<TSource, TResult> : Producer<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs index 26dfd34..077a23f 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class OnErrorResumeNext<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs index 916d12b..6351365 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs @@ -5,7 +5,7 @@ using System.Collections; using System.Collections.Generic; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { abstract class PushToPullAdapter<TSource, TResult> : IEnumerable<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs index c58d94c..414ae72 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Range : Producer<int> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs index 1762807..4efb917 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Disposables; using System.Reactive.Subjects; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class RefCount<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs index fab0607..4f418cb 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Repeat<TResult> : Producer<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs index cd9ad02..db98f88 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Concurrency; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Return<TResult> : Producer<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs index 0d46c62..6d0e68b 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Sample<TSource, TSample> : Producer<TSource> { @@ -51,7 +51,7 @@ namespace System.Reactive.Linq.Observαble _sourceSubscription = sourceSubscription; sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); - var samplerSubscription = _parent._sampler.SubscribeSafe(new σ(this)); + var samplerSubscription = _parent._sampler.SubscribeSafe(new SampleImpl(this)); return new CompositeDisposable(_sourceSubscription, samplerSubscription); } @@ -83,11 +83,11 @@ namespace System.Reactive.Linq.Observαble } } - class σ : IObserver<TSample> + class SampleImpl : IObserver<TSample> { private readonly _ _parent; - public σ(_ parent) + public SampleImpl(_ parent) { _parent = parent; } diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs index 6c8f2fe..421177f 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Scan<TSource, TAccumulate> : Producer<TAccumulate> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs index e853210..330fe3a 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs @@ -3,11 +3,11 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { abstract class Select<TResult> : Producer<TResult> { - public abstract IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector); + public abstract IObservable<TResult2> Omega<TResult2>(Func<TResult, TResult2> selector); } class Select<TSource, TResult> : Select<TResult> @@ -28,7 +28,7 @@ namespace System.Reactive.Linq.Observαble _selectorI = selector; } - public override IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector) + public override IObservable<TResult2> Omega<TResult2>(Func<TResult, TResult2> selector) { if (_selector != null) return new Select<TSource, TResult2>(_source, x => selector(_selector(x))); @@ -46,7 +46,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new SelectImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -92,12 +92,12 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TResult>, IObserver<TSource> + class SelectImpl : Sink<TResult>, IObserver<TSource> { private readonly Select<TSource, TResult> _parent; private int _index; - public τ(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public SelectImpl(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs index 054f974..49d7cf9 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs @@ -1,9 +1,7 @@ // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !NO_PERF -using System; using System.Collections.Generic; -using System.Reactive; using System.Reactive.Disposables; #if !NO_TPL @@ -11,17 +9,17 @@ using System.Threading; using System.Threading.Tasks; #endif -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SelectMany<TSource, TCollection, TResult> : Producer<TResult> { private readonly IObservable<TSource> _source; private readonly Func<TSource, IObservable<TCollection>> _collectionSelector; - private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex; + private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI; private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE; - private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex; + private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI; private readonly Func<TSource, TCollection, TResult> _resultSelector; - private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex; + private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI; public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) { @@ -33,8 +31,8 @@ namespace System.Reactive.Linq.Observαble public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) { _source = source; - _collectionSelectorWithIndex = collectionSelector; - _resultSelectorWithIndex = resultSelector; + _collectionSelectorI = collectionSelector; + _resultSelectorI = resultSelector; } public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) @@ -47,12 +45,14 @@ namespace System.Reactive.Linq.Observαble public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) { _source = source; - _collectionSelectorEWithIndex = collectionSelector; - _resultSelectorWithIndex = resultSelector; + _collectionSelectorEI = collectionSelector; + _resultSelectorI = resultSelector; } #if !NO_TPL private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT; + private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI; + private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI; public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) { @@ -60,27 +60,52 @@ namespace System.Reactive.Linq.Observαble _collectionSelectorT = collectionSelector; _resultSelector = resultSelector; } + + public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector) + { + _source = source; + _collectionSelectorTI = collectionSelector; + _resultSelectorTI = resultSelector; + } #endif protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink) { - if (_collectionSelector != null || _collectionSelectorWithIndex != null) + if (_collectionSelector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } + else if (_collectionSelectorI != null) + { + var sink = new IndexSelectorImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } #if !NO_TPL else if (_collectionSelectorT != null) { - var sink = new τ(this, observer, cancel); + var sink = new SelectManyImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } + else if (_collectionSelectorTI != null) + { + var sink = new Sigma(this, observer, cancel); setSink(sink); return sink.Run(); } #endif + else if (_collectionSelectorE != null) + { + var sink = new NoSelectorImpl(this, observer, cancel); + setSink(sink); + return _source.SubscribeSafe(sink); + } else { - var sink = new ε(this, observer, cancel); + var sink = new Omega(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -94,14 +119,12 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; - _indexInSource = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; - private int _indexInSource; public IDisposable Run() { @@ -122,13 +145,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._collectionSelector != null) - collection = _parent._collectionSelector(value); - else - { - checked { _indexInSource++; } - collection = _parent._collectionSelectorWithIndex(value, _indexInSource); - } + collection = _parent._collectionSelector(value); } catch (Exception ex) { @@ -142,7 +159,7 @@ namespace System.Reactive.Linq.Observαble var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); - innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource)); + innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription)); } public void OnError(Exception error) @@ -178,21 +195,17 @@ namespace System.Reactive.Linq.Observαble } } - class ι : IObserver<TCollection> + class Iter : IObserver<TCollection> { private readonly _ _parent; private readonly TSource _value; private readonly IDisposable _self; - private int _indexInSource; - private int _indexInIntermediate = -1; - public ι(_ parent, TSource value, IDisposable self, int indexInSource) + public Iter(_ parent, TSource value, IDisposable self) { _parent = parent; _value = value; _self = self; - _indexInSource = indexInSource; - _indexInIntermediate = -1; } public void OnNext(TCollection value) @@ -201,14 +214,164 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._parent._resultSelector != null) - res = _parent._parent._resultSelector(_value, value); - else + res = _parent._parent._resultSelector(_value, value); + } + catch (Exception ex) + { + lock (_parent._gate) + { + _parent._observer.OnError(ex); + _parent.Dispose(); + } + return; + } + + lock (_parent._gate) + _parent._observer.OnNext(res); + } + + public void OnError(Exception error) + { + lock (_parent._gate) + { + _parent._observer.OnError(error); + _parent.Dispose(); + } + } + + public void OnCompleted() + { + _parent._group.Remove(_self); + if (_parent._isStopped && _parent._group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_parent._gate) { - checked { _indexInIntermediate++; } - res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate); + _parent._observer.OnCompleted(); + _parent.Dispose(); } } + } + } + } + + class IndexSelectorImpl : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TCollection, TResult> _parent; + + public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private bool _isStopped; + private CompositeDisposable _group; + private SingleAssignmentDisposable _sourceSubscription; + private int _index; + + public IDisposable Run() + { + _gate = new object(); + _isStopped = false; + _group = new CompositeDisposable(); + + _sourceSubscription = new SingleAssignmentDisposable(); + _group.Add(_sourceSubscription); + _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); + + return _group; + } + + public void OnNext(TSource value) + { + var index = checked(_index++); + var collection = default(IObservable<TCollection>); + + try + { + collection = _parent._collectionSelectorI(value, index); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + var innerSubscription = new SingleAssignmentDisposable(); + _group.Add(innerSubscription); + innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription)); + } + + public void OnError(Exception error) + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + + public void OnCompleted() + { + _isStopped = true; + if (_group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + else + { + _sourceSubscription.Dispose(); + } + } + + class Iter : IObserver<TCollection> + { + private readonly IndexSelectorImpl _parent; + private readonly TSource _value; + private readonly int _valueIndex; + private readonly IDisposable _self; + + public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self) + { + _parent = parent; + _value = value; + _valueIndex = index; + _self = self; + } + + private int _index; + + public void OnNext(TCollection value) + { + var res = default(TResult); + + try + { + res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++)); + } catch (Exception ex) { lock (_parent._gate) @@ -254,16 +417,14 @@ namespace System.Reactive.Linq.Observαble } } - class ε : Sink<TResult>, IObserver<TSource> + class NoSelectorImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TCollection, TResult> _parent; - private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection - public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; - _indexInSource = -1; } public void OnNext(TSource value) @@ -271,14 +432,93 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable<TCollection>); try { - if (_parent._collectionSelectorE != null) - xs = _parent._collectionSelectorE(value); - else + xs = _parent._collectionSelectorE(value); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + var e = default(IEnumerator<TCollection>); + try + { + e = xs.GetEnumerator(); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + try + { + var hasNext = true; + while (hasNext) { - checked { _indexInSource++; } - xs = _parent._collectionSelectorEWithIndex(value, _indexInSource); + hasNext = false; + var current = default(TResult); + + try + { + hasNext = e.MoveNext(); + if (hasNext) + current = _parent._resultSelector(value, e.Current); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + if (hasNext) + base._observer.OnNext(current); } } + finally + { + if (e != null) + e.Dispose(); + } + } + + public void OnError(Exception error) + { + base._observer.OnError(error); + base.Dispose(); + } + + public void OnCompleted() + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + + class Omega : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TCollection, TResult> _parent; + + public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private int _index; + + public void OnNext(TSource value) + { + var index = checked(_index++); + + var xs = default(IEnumerable<TCollection>); + try + { + xs = _parent._collectionSelectorEI(value, index); + } catch (Exception exception) { base._observer.OnError(exception); @@ -300,7 +540,7 @@ namespace System.Reactive.Linq.Observαble try { - int indexInIntermediate = -1; + var eIndex = 0; var hasNext = true; while (hasNext) { @@ -311,15 +551,7 @@ namespace System.Reactive.Linq.Observαble { hasNext = e.MoveNext(); if (hasNext) - { - if (_parent._resultSelector != null) - current = _parent._resultSelector(value, e.Current); - else - { - checked { indexInIntermediate++; } - current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate); - } - } + current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++)); } catch (Exception exception) { @@ -354,11 +586,11 @@ namespace System.Reactive.Linq.Observαble #if !NO_TPL #pragma warning disable 0420 - class τ : Sink<TResult>, IObserver<TSource> + class SelectManyImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TCollection, TResult> _parent; - public τ(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -487,6 +719,143 @@ namespace System.Reactive.Linq.Observαble } } } + + class Sigma : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TCollection, TResult> _parent; + + public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private CancellationDisposable _cancel; + private volatile int _count; + private int _index; + + public IDisposable Run() + { + _gate = new object(); + _cancel = new CancellationDisposable(); + _count = 1; + + return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); + } + + public void OnNext(TSource value) + { + var index = checked(_index++); + + var task = default(Task<TCollection>); + try + { + Interlocked.Increment(ref _count); + task = _parent._collectionSelectorTI(value, index, _cancel.Token); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + + return; + } + + if (task.IsCompleted) + { + OnCompletedTask(value, index, task); + } + else + { + AttachContinuation(value, index, task); + } + } + + private void AttachContinuation(TSource value, int index, Task<TCollection> task) + { + // + // Separate method to avoid closure in synchronous completion case. + // + task.ContinueWith(t => OnCompletedTask(value, index, t)); + } + + private void OnCompletedTask(TSource value, int index, Task<TCollection> task) + { + switch (task.Status) + { + case TaskStatus.RanToCompletion: + { + var res = default(TResult); + try + { + res = _parent._resultSelectorTI(value, index, task.Result); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + + return; + } + + lock (_gate) + base._observer.OnNext(res); + + OnCompleted(); + } + break; + case TaskStatus.Faulted: + { + lock (_gate) + { + base._observer.OnError(task.Exception.InnerException); + base.Dispose(); + } + } + break; + case TaskStatus.Canceled: + { + if (!_cancel.IsDisposed) + { + lock (_gate) + { + base._observer.OnError(new TaskCanceledException(task)); + base.Dispose(); + } + } + } + break; + } + } + + public void OnError(Exception error) + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + + public void OnCompleted() + { + if (Interlocked.Decrement(ref _count) == 0) + { + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + } + } #pragma warning restore 0420 #endif } @@ -495,13 +864,11 @@ namespace System.Reactive.Linq.Observαble { private readonly IObservable<TSource> _source; private readonly Func<TSource, IObservable<TResult>> _selector; + private readonly Func<TSource, int, IObservable<TResult>> _selectorI; private readonly Func<Exception, IObservable<TResult>> _selectorOnError; private readonly Func<IObservable<TResult>> _selectorOnCompleted; - private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex; - private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError; - private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted; private readonly Func<TSource, IEnumerable<TResult>> _selectorE; - private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex; + private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI; public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector) { @@ -509,26 +876,26 @@ namespace System.Reactive.Linq.Observαble _selector = selector; } - public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) + public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) { _source = source; - _selector = selector; - _selectorOnError = selectorOnError; - _selectorOnCompleted = selectorOnCompleted; + _selectorI = selector; } - public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) + public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) { _source = source; - _selectorWithIndex = selector; + _selector = selector; + _selectorOnError = selectorOnError; + _selectorOnCompleted = selectorOnCompleted; } - public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted) + public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) { _source = source; - _selectorWithIndex = selector; - _selectorWithIndexOnError = selectorOnError; - _selectorWithIndexOnCompleted = selectorOnCompleted; + _selectorI = selector; + _selectorOnError = selectorOnError; + _selectorOnCompleted = selectorOnCompleted; } public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector) @@ -540,38 +907,63 @@ namespace System.Reactive.Linq.Observαble public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector) { _source = source; - _selectorEWithIndex = selector; + _selectorEI = selector; } #if !NO_TPL private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT; + private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI; public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector) { _source = source; _selectorT = selector; } + + public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector) + { + _source = source; + _selectorTI = selector; + } #endif protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink) { - if (_selector != null || _selectorWithIndex != null) + if (_selector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } + else if (_selectorI != null) + { + var sink = new IndexSelectorImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } #if !NO_TPL else if (_selectorT != null) { - var sink = new τ(this, observer, cancel); + var sink = new SelectManyImpl(this, observer, cancel); + setSink(sink); + return sink.Run(); + } + else if (_selectorTI != null) + { + var sink = new Sigma(this, observer, cancel); setSink(sink); return sink.Run(); } #endif + else if (_selectorE != null) + { + var sink = new NoSelectorImpl(this, observer, cancel); + setSink(sink); + return _source.SubscribeSafe(sink); + } else { - var sink = new ε(this, observer, cancel); + var sink = new Omega(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -585,14 +977,12 @@ namespace System.Reactive.Linq.Observαble : base(observer, cancel) { _parent = parent; - _index = -1; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; - private int _index; public IDisposable Run() { @@ -613,13 +1003,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._selector != null) - inner = _parent._selector(value); - else - { - checked { _index++; } - inner = _parent._selectorWithIndex(value, _index); - } + inner = _parent._selector(value); } catch (Exception ex) { @@ -642,13 +1026,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._selectorOnError != null) - inner = _parent._selectorOnError(error); - else - { - checked { _index++; } - inner = _parent._selectorWithIndexOnError(error, _index); - } + inner = _parent._selectorOnError(error); } catch (Exception ex) { @@ -682,10 +1060,7 @@ namespace System.Reactive.Linq.Observαble try { - if (_parent._selectorOnCompleted != null) - inner = _parent._selectorOnCompleted(); - else - inner = _parent._selectorWithIndexOnCompleted(_index); + inner = _parent._selectorOnCompleted(); } catch (Exception ex) { @@ -731,15 +1106,15 @@ namespace System.Reactive.Linq.Observαble { var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); - innerSubscription.Disposable = inner.SubscribeSafe(new ι(this, innerSubscription)); + innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription)); } - class ι : IObserver<TResult> + class Iter : IObserver<TResult> { private readonly _ _parent; private readonly IDisposable _self; - public ι(_ parent, IDisposable self) + public Iter(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -782,16 +1157,203 @@ namespace System.Reactive.Linq.Observαble } } - class ε : Sink<TResult>, IObserver<TSource> + class IndexSelectorImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TResult> _parent; + + public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private bool _isStopped; + private CompositeDisposable _group; + private SingleAssignmentDisposable _sourceSubscription; private int _index; - public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public IDisposable Run() + { + _gate = new object(); + _isStopped = false; + _group = new CompositeDisposable(); + + _sourceSubscription = new SingleAssignmentDisposable(); + _group.Add(_sourceSubscription); + _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); + + return _group; + } + + public void OnNext(TSource value) + { + var inner = default(IObservable<TResult>); + + try + { + inner = _parent._selectorI(value, checked(_index++)); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + SubscribeInner(inner); + } + + public void OnError(Exception error) + { + if (_parent._selectorOnError != null) + { + var inner = default(IObservable<TResult>); + + try + { + inner = _parent._selectorOnError(error); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + SubscribeInner(inner); + + Final(); + } + else + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + } + + public void OnCompleted() + { + if (_parent._selectorOnCompleted != null) + { + var inner = default(IObservable<TResult>); + + try + { + inner = _parent._selectorOnCompleted(); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + return; + } + + SubscribeInner(inner); + } + + Final(); + } + + private void Final() + { + _isStopped = true; + if (_group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + else + { + _sourceSubscription.Dispose(); + } + } + + private void SubscribeInner(IObservable<TResult> inner) + { + var innerSubscription = new SingleAssignmentDisposable(); + _group.Add(innerSubscription); + innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription)); + } + + class Iter : IObserver<TResult> + { + private readonly IndexSelectorImpl _parent; + private readonly IDisposable _self; + + public Iter(IndexSelectorImpl parent, IDisposable self) + { + _parent = parent; + _self = self; + } + + public void OnNext(TResult value) + { + lock (_parent._gate) + _parent._observer.OnNext(value); + } + + public void OnError(Exception error) + { + lock (_parent._gate) + { + _parent._observer.OnError(error); + _parent.Dispose(); + } + } + + public void OnCompleted() + { + _parent._group.Remove(_self); + if (_parent._isStopped && _parent._group.Count == 1) + { + // + // Notice there can be a race between OnCompleted of the source and any + // of the inner sequences, where both see _group.Count == 1, and one is + // waiting for the lock. There won't be a double OnCompleted observation + // though, because the call to Dispose silences the observer by swapping + // in a NopObserver<T>. + // + lock (_parent._gate) + { + _parent._observer.OnCompleted(); + _parent.Dispose(); + } + } + } + } + } + + class NoSelectorImpl : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TResult> _parent; + + public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; - _index = -1; } public void OnNext(TSource value) @@ -799,14 +1361,91 @@ namespace System.Reactive.Linq.Observαble var xs = default(IEnumerable<TResult>); try { - if (_parent._selectorE != null) - xs = _parent._selectorE(value); - else + xs = _parent._selectorE(value); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + var e = default(IEnumerator<TResult>); + try + { + e = xs.GetEnumerator(); + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + try + { + var hasNext = true; + while (hasNext) { - checked { _index++; } - xs = _parent._selectorEWithIndex(value, _index); + hasNext = false; + var current = default(TResult); + + try + { + hasNext = e.MoveNext(); + if (hasNext) + current = e.Current; + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + return; + } + + if (hasNext) + base._observer.OnNext(current); } } + finally + { + if (e != null) + e.Dispose(); + } + } + + public void OnError(Exception error) + { + base._observer.OnError(error); + base.Dispose(); + } + + public void OnCompleted() + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + + class Omega : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TResult> _parent; + + public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private int _index; + + public void OnNext(TSource value) + { + var xs = default(IEnumerable<TResult>); + try + { + xs = _parent._selectorEI(value, checked(_index++)); + } catch (Exception exception) { base._observer.OnError(exception); @@ -873,11 +1512,11 @@ namespace System.Reactive.Linq.Observαble #if !NO_TPL #pragma warning disable 0420 - class τ : Sink<TResult>, IObserver<TSource> + class SelectManyImpl : Sink<TResult>, IObserver<TSource> { private readonly SelectMany<TSource, TResult> _parent; - public τ(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -982,6 +1621,117 @@ namespace System.Reactive.Linq.Observαble } } } + + class Sigma : Sink<TResult>, IObserver<TSource> + { + private readonly SelectMany<TSource, TResult> _parent; + + public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + : base(observer, cancel) + { + _parent = parent; + } + + private object _gate; + private CancellationDisposable _cancel; + private volatile int _count; + private int _index; + + public IDisposable Run() + { + _gate = new object(); + _cancel = new CancellationDisposable(); + _count = 1; + + return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); + } + + public void OnNext(TSource value) + { + var task = default(Task<TResult>); + try + { + Interlocked.Increment(ref _count); + task = _parent._selectorTI(value, checked(_index++), _cancel.Token); + } + catch (Exception ex) + { + lock (_gate) + { + base._observer.OnError(ex); + base.Dispose(); + } + + return; + } + + if (task.IsCompleted) + { + OnCompletedTask(task); + } + else + { + task.ContinueWith(OnCompletedTask); + } + } + + private void OnCompletedTask(Task<TResult> task) + { + switch (task.Status) + { + case TaskStatus.RanToCompletion: + { + lock (_gate) + base._observer.OnNext(task.Result); + + OnCompleted(); + } + break; + case TaskStatus.Faulted: + { + lock (_gate) + { + base._observer.OnError(task.Exception.InnerException); + base.Dispose(); + } + } + break; + case TaskStatus.Canceled: + { + if (!_cancel.IsDisposed) + { + lock (_gate) + { + base._observer.OnError(new TaskCanceledException(task)); + base.Dispose(); + } + } + } + break; + } + } + + public void OnError(Exception error) + { + lock (_gate) + { + base._observer.OnError(error); + base.Dispose(); + } + } + + public void OnCompleted() + { + if (Interlocked.Decrement(ref _count) == 0) + { + lock (_gate) + { + base._observer.OnCompleted(); + base.Dispose(); + } + } + } + } #pragma warning restore 0420 #endif } diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs index 4661f82..6b03aba 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Generic; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SequenceEqual<TSource> : Producer<bool> { @@ -38,7 +38,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new ε(this, observer, cancel); + var sink = new SequenceEqualImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -226,11 +226,11 @@ namespace System.Reactive.Linq.Observαble } } - class ε : Sink<bool>, IObserver<TSource> + class SequenceEqualImpl : Sink<bool>, IObserver<TSource> { private readonly SequenceEqual<TSource> _parent; - public ε(SequenceEqual<TSource> parent, IObserver<bool> observer, IDisposable cancel) + public SequenceEqualImpl(SequenceEqual<TSource> parent, IObserver<bool> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs index 19c07e5..c4dce4a 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SingleAsync<TSource> : Producer<TSource> { @@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble { if (_predicate != null) { - var sink = new π(this, observer, cancel); + var sink = new SingleAsyncImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -84,13 +84,13 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<TSource>, IObserver<TSource> + class SingleAsyncImpl : Sink<TSource>, IObserver<TSource> { private readonly SingleAsync<TSource> _parent; private TSource _value; private bool _seenValue; - public π(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public SingleAsyncImpl(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs index a092d99..7222816 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Skip<TSource> : Producer<TSource> { @@ -27,7 +27,7 @@ namespace System.Reactive.Linq.Observαble _scheduler = scheduler; } - public IObservable<TSource> Ω(int count) + public IObservable<TSource> Omega(int count) { // // Sum semantics: @@ -39,7 +39,7 @@ namespace System.Reactive.Linq.Observαble return new Skip<TSource>(_source, _count + count); } - public IObservable<TSource> Ω(TimeSpan duration) + public IObservable<TSource> Omega(TimeSpan duration) { // // Maximum semantics: @@ -66,7 +66,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new SkipImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -105,12 +105,12 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TSource>, IObserver<TSource> + class SkipImpl : Sink<TSource>, IObserver<TSource> { private readonly Skip<TSource> _parent; private volatile bool _open; - public τ(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public SkipImpl(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs index 596f183..6bd771e 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Generic; using System.Reactive.Concurrency; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SkipLast<TSource> : Producer<TSource> { @@ -37,7 +37,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new SkipLastImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -75,12 +75,12 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TSource>, IObserver<TSource> + class SkipLastImpl : Sink<TSource>, IObserver<TSource> { private readonly SkipLast<TSource> _parent; private Queue<System.Reactive.TimeInterval<TSource>> _queue; - public τ(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public SkipLastImpl(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs index 8d4c7a9..8acce09 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs @@ -6,7 +6,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SkipUntil<TSource, TOther> : Producer<TSource> { @@ -140,7 +140,7 @@ namespace System.Reactive.Linq.Observαble _scheduler = scheduler; } - public IObservable<TSource> Ω(DateTimeOffset startTime) + public IObservable<TSource> Omega(DateTimeOffset startTime) { // // Maximum semantics: diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs index e6e379d..b5694dc 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SkipWhile<TSource> : Producer<TSource> { @@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new SkipWhileImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -86,13 +86,13 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TSource>, IObserver<TSource> + class SkipWhileImpl : Sink<TSource>, IObserver<TSource> { private readonly SkipWhile<TSource> _parent; private bool _running; private int _index; - public τ(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public SkipWhileImpl(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs index 6d51153..c5346f8 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class SumDouble : Producer<double> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs index 794b0d4..e27d6eb 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Switch<TSource> : Producer<TSource> { @@ -65,7 +65,7 @@ namespace System.Reactive.Linq.Observαble var d = new SingleAssignmentDisposable(); _innerSubscription.Disposable = d; - d.Disposable = value.SubscribeSafe(new ι(this, id, d)); + d.Disposable = value.SubscribeSafe(new Iter(this, id, d)); } public void OnError(Exception error) @@ -91,13 +91,13 @@ namespace System.Reactive.Linq.Observαble } } - class ι : IObserver<TSource> + class Iter : IObserver<TSource> { private readonly _ _parent; private readonly ulong _id; private readonly IDisposable _self; - public ι(_ parent, ulong id, IDisposable self) + public Iter(_ parent, ulong id, IDisposable self) { _parent = parent; _id = id; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs index 36985e9..ef4e885 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Synchronize<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs index 738e7c0..255e3d2 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Take<TSource> : Producer<TSource> { @@ -27,7 +27,7 @@ namespace System.Reactive.Linq.Observαble _scheduler = scheduler; } - public IObservable<TSource> Ω(int count) + public IObservable<TSource> Omega(int count) { // // Minimum semantics: @@ -42,7 +42,7 @@ namespace System.Reactive.Linq.Observαble return new Take<TSource>(_source, count); } - public IObservable<TSource> Ω(TimeSpan duration) + public IObservable<TSource> Omega(TimeSpan duration) { // // Minimum semantics: @@ -69,7 +69,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new TakeImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -115,11 +115,11 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TSource>, IObserver<TSource> + class TakeImpl : Sink<TSource>, IObserver<TSource> { private readonly Take<TSource> _parent; - public τ(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public TakeImpl(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs index 98bf40e..441a502 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class TakeLast<TSource> : Producer<TSource> { @@ -41,7 +41,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new TakeLastImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -131,12 +131,12 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TSource>, IObserver<TSource> + class TakeLastImpl : Sink<TSource>, IObserver<TSource> { private readonly TakeLast<TSource> _parent; private Queue<System.Reactive.TimeInterval<TSource>> _queue; - public τ(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public TakeLastImpl(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs index 71b36b0..2a15825 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs @@ -7,7 +7,7 @@ using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class TakeLastBuffer<TSource> : Producer<IList<TSource>> { @@ -39,7 +39,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new Impl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -82,12 +82,12 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<IList<TSource>>, IObserver<TSource> + class Impl : Sink<IList<TSource>>, IObserver<TSource> { private readonly TakeLastBuffer<TSource> _parent; private Queue<System.Reactive.TimeInterval<TSource>> _queue; - public τ(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) + public Impl(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs index eccc222..62b450a 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs @@ -6,7 +6,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class TakeUntil<TSource, TOther> : Producer<TSource> { @@ -173,7 +173,7 @@ namespace System.Reactive.Linq.Observαble _scheduler = scheduler; } - public IObservable<TSource> Ω(DateTimeOffset endTime) + public IObservable<TSource> Omega(DateTimeOffset endTime) { // // Minimum semantics: diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs index 30f63ca..dbf6117 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class TakeWhile<TSource> : Producer<TSource> { @@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new TakeWhileImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -91,13 +91,13 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TSource>, IObserver<TSource> + class TakeWhileImpl : Sink<TSource>, IObserver<TSource> { private readonly TakeWhile<TSource> _parent; private bool _running; private int _index; - public τ(TakeWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public TakeWhileImpl(TakeWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs index 7b7ad86..5061114 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Throttle<TSource> : Producer<TSource> { @@ -192,7 +192,7 @@ namespace System.Reactive.Linq.Observαble var d = new SingleAssignmentDisposable(); _cancelable.Disposable = d; - d.Disposable = throttle.SubscribeSafe(new δ(this, value, currentid, d)); + d.Disposable = throttle.SubscribeSafe(new Delta(this, value, currentid, d)); } public void OnError(Exception error) @@ -226,14 +226,14 @@ namespace System.Reactive.Linq.Observαble } } - class δ : IObserver<TThrottle> + class Delta : IObserver<TThrottle> { private readonly _ _parent; private readonly TSource _value; private readonly ulong _currentid; private readonly IDisposable _self; - public δ(_ parent, TSource value, ulong currentid, IDisposable self) + public Delta(_ parent, TSource value, ulong currentid, IDisposable self) { _parent = parent; _value = value; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs index 3b10894..9940a9d 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Concurrency; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Throw<TResult> : Producer<TResult> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs index dacfa03..4c40863 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs @@ -7,7 +7,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class TimeInterval<TSource> : Producer<System.Reactive.TimeInterval<TSource>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs index 4896294..94b78a6 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs @@ -5,7 +5,7 @@ using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Timeout<TSource> : Producer<TSource> { @@ -35,23 +35,23 @@ namespace System.Reactive.Linq.Observαble { if (_dueTimeA.HasValue) { - var sink = new α(this, observer, cancel); + var sink = new TimeA(this, observer, cancel); setSink(sink); return sink.Run(); } else { - var sink = new ρ(this, observer, cancel); + var sink = new TimeR(this, observer, cancel); setSink(sink); return sink.Run(); } } - class α : Sink<TSource>, IObserver<TSource> + class TimeA : Sink<TSource>, IObserver<TSource> { private readonly Timeout<TSource> _parent; - public α(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public TimeA(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -136,11 +136,11 @@ namespace System.Reactive.Linq.Observαble } } - class ρ : Sink<TSource>, IObserver<TSource> + class TimeR : Sink<TSource>, IObserver<TSource> { private readonly Timeout<TSource> _parent; - public ρ(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public TimeR(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -358,16 +358,16 @@ namespace System.Reactive.Linq.Observαble var d = new SingleAssignmentDisposable(); _timer.Disposable = d; - d.Disposable = timeout.SubscribeSafe(new τ(this, myid, d)); + d.Disposable = timeout.SubscribeSafe(new TimeoutImpl(this, myid, d)); } - class τ : IObserver<TTimeout> + class TimeoutImpl : IObserver<TTimeout> { private readonly _ _parent; private readonly ulong _id; private readonly IDisposable _self; - public τ(_ parent, ulong id, IDisposable self) + public TimeoutImpl(_ parent, ulong id, IDisposable self) { _parent = parent; _id = id; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs index 22ee0df..14df757 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs @@ -7,7 +7,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Timer : Producer<long> { @@ -34,7 +34,7 @@ namespace System.Reactive.Linq.Observαble { if (_period.HasValue) { - var sink = new π(this, observer, cancel); + var sink = new TimerImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -76,12 +76,12 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<long> + class TimerImpl : Sink<long> { private readonly Timer _parent; private readonly TimeSpan _period; - public π(Timer parent, IObserver<long> observer, IDisposable cancel) + public TimerImpl(Timer parent, IObserver<long> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs index ed54678..19bf1be 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Concurrency; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Timestamp<TSource> : Producer<Timestamped<TSource>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs index ba039ae..97b9bd4 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ToArray<TSource> : Producer<TSource[]> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs index 919c1a1..5cfd9ec 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ToDictionary<TSource, TKey, TElement> : Producer<IDictionary<TKey, TElement>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs index 34956d4..a265098 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ToList<TSource> : Producer<IList<TSource>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs index 55ff7d3..d86f5ee 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Generic; using System.Linq; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ToLookup<TSource, TKey, TElement> : Producer<ILookup<TKey, TElement>> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs index 795d54d..8de8ff7 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class ToObservable<TSource> : Producer<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs index 996ee0e..680d1f4 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs @@ -4,7 +4,7 @@ using System; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Using<TSource, TResource> : Producer<TSource> where TResource : IDisposable diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs index a8eaa92..3ab5f73 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Where<TSource> : Producer<TSource> { @@ -23,7 +23,7 @@ namespace System.Reactive.Linq.Observαble _predicateI = predicate; } - public IObservable<TSource> Ω(Func<TSource, bool> predicate) + public IObservable<TSource> Omega(Func<TSource, bool> predicate) { if (_predicate != null) return new Where<TSource>(_source, x => _predicate(x) && predicate(x)); @@ -41,7 +41,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new τ(this, observer, cancel); + var sink = new WhereImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } @@ -88,12 +88,12 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<TSource>, IObserver<TSource> + class WhereImpl : Sink<TSource>, IObserver<TSource> { private readonly Where<TSource> _parent; private int _index; - public τ(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel) + public WhereImpl(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs index 178e4c8..2f24adf 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs @@ -4,7 +4,7 @@ using System; using System.Collections.Generic; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class While<TSource> : Producer<TSource>, IConcatenatable<TSource> { diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs index 47059c7..4dad064 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs @@ -9,7 +9,7 @@ using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Threading; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { class Window<TSource> : Producer<IObservable<TSource>> { @@ -54,7 +54,7 @@ namespace System.Reactive.Linq.Observαble } else if (_count > 0) { - var sink = new μ(this, observer, cancel); + var sink = new BoundedWindowImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -62,13 +62,13 @@ namespace System.Reactive.Linq.Observαble { if (_timeSpan == _timeShift) { - var sink = new π(this, observer, cancel); + var sink = new TimeShiftImpl(this, observer, cancel); setSink(sink); return sink.Run(); } else { - var sink = new τ(this, observer, cancel); + var sink = new WindowImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -151,11 +151,11 @@ namespace System.Reactive.Linq.Observαble } } - class τ : Sink<IObservable<TSource>>, IObserver<TSource> + class WindowImpl : Sink<IObservable<TSource>>, IObserver<TSource> { private readonly Window<TSource> _parent; - public τ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) + public WindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -296,11 +296,11 @@ namespace System.Reactive.Linq.Observαble } } - class π : Sink<IObservable<TSource>>, IObserver<TSource> + class TimeShiftImpl : Sink<IObservable<TSource>>, IObserver<TSource> { private readonly Window<TSource> _parent; - public π(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) + public TimeShiftImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -371,11 +371,11 @@ namespace System.Reactive.Linq.Observαble } } - class μ : Sink<IObservable<TSource>>, IObserver<TSource> + class BoundedWindowImpl : Sink<IObservable<TSource>>, IObserver<TSource> { private readonly Window<TSource> _parent; - public μ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) + public BoundedWindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -516,7 +516,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new β(this, observer, cancel); + var sink = new Beta(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -578,7 +578,7 @@ namespace System.Reactive.Linq.Observαble var closingSubscription = new SingleAssignmentDisposable(); _m.Disposable = closingSubscription; - closingSubscription.Disposable = windowClose.SubscribeSafe(new ω(this, closingSubscription)); + closingSubscription.Disposable = windowClose.SubscribeSafe(new Omega(this, closingSubscription)); } private void CloseWindow(IDisposable closingSubscription) @@ -597,12 +597,12 @@ namespace System.Reactive.Linq.Observαble _windowGate.Wait(CreateWindowClose); } - class ω : IObserver<TWindowClosing> + class Omega : IObserver<TWindowClosing> { private readonly _ _parent; private readonly IDisposable _self; - public ω(_ parent, IDisposable self) + public Omega(_ parent, IDisposable self) { _parent = parent; _self = self; @@ -653,11 +653,11 @@ namespace System.Reactive.Linq.Observαble } } - class β : Sink<IObservable<TSource>>, IObserver<TSource> + class Beta : Sink<IObservable<TSource>>, IObserver<TSource> { private readonly Window<TSource, TWindowClosing> _parent; - public β(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) + public Beta(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; @@ -680,16 +680,16 @@ namespace System.Reactive.Linq.Observαble base._observer.OnNext(window); d.Add(_parent._source.SubscribeSafe(this)); - d.Add(_parent._windowBoundaries.SubscribeSafe(new ω(this))); + d.Add(_parent._windowBoundaries.SubscribeSafe(new Omega(this))); return _refCountDisposable; } - class ω : IObserver<TWindowClosing> + class Omega : IObserver<TWindowClosing> { - private readonly β _parent; + private readonly Beta _parent; - public ω(β parent) + public Omega(Beta parent) { _parent = parent; } diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs index bde6e29..77434c7 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs @@ -7,7 +7,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { #region Binary @@ -42,7 +42,7 @@ namespace System.Reactive.Linq.Observαble } else { - var sink = new ε(this, observer, cancel); + var sink = new ZipImpl(this, observer, cancel); setSink(sink); return sink.Run(); } @@ -258,11 +258,11 @@ namespace System.Reactive.Linq.Observαble } } - class ε : Sink<TResult>, IObserver<TFirst> + class ZipImpl : Sink<TResult>, IObserver<TFirst> { private readonly Zip<TFirst, TSecond, TResult> _parent; - public ε(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel) + public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs index d4b0dbc..4489003 100644 --- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs @@ -3,7 +3,7 @@ #if !NO_PERF using System; -namespace System.Reactive.Linq.Observαble +namespace System.Reactive.Linq.ObservableImpl { } #endif |