Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
committerAtsushi Eno <atsushieno@gmail.com>2013-12-16 17:30:03 +0400
commit74a538f6725ebc83efda4bb07d5747e8a6359e19 (patch)
tree7c98de97c88c78b4aca4b25b36db310f82c26865 /Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable
parent50e7bdb4507f7e4c2aefb7772d57d9a80f4d42b0 (diff)
Import Official Rx 2.2 (3ebdd2e09991)HEADmaster
I made changes from the original source tree to match the older tree so that we don't have to make several changes to project tree generator. (There is actually no new sources in Rx so hopefully we can just reuse existing modifications in the tree).
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable')
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs40
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs26
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs6
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs16
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs14
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs16
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs58
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs30
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs30
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs28
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs962
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs4
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs20
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs10
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs2
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs40
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs8
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs2
98 files changed, 1213 insertions, 417 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs
index 3fc6962..46e66c6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AddRef.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class AddRef<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs
index d1b6ef8..6ae63c0 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Aggregate.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Aggregate<TSource, TAccumulate, TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs
index 09c825d..49a0933 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/All.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class All<TSource> : Producer<bool>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs
index 7272cee..ea110c5 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Amb.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Amb<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs
index 1414bdd..c8eb37a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Any.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Any<TSource> : Producer<bool>
{
@@ -25,7 +25,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new AnyImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -65,11 +65,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<bool>, IObserver<TSource>
+ class AnyImpl : Sink<bool>, IObserver<TSource>
{
private readonly Any<TSource> _parent;
- public π(Any<TSource> parent, IObserver<bool> observer, IDisposable cancel)
+ public AnyImpl(Any<TSource> parent, IObserver<bool> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs
index 09d5f14..c4e9d2f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/AsObservable.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class AsObservable<TSource> : Producer<TSource>, IEvaluatableObservable<TSource>
{
@@ -14,7 +14,7 @@ namespace System.Reactive.Linq.Observαble
_source = source;
}
- public IObservable<TSource> Ω()
+ public IObservable<TSource> Omega()
{
return this;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs
index fb2d6b7..897da02 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Average.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class AverageDouble : Producer<double>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs
index a82b550..ce6ced8 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Buffer.cs
@@ -8,7 +8,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Buffer<TSource> : Producer<IList<TSource>>
{
@@ -53,7 +53,7 @@ namespace System.Reactive.Linq.Observαble
}
else if (_count > 0)
{
- var sink = new μ(this, observer, cancel);
+ var sink = new Impl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -61,13 +61,13 @@ namespace System.Reactive.Linq.Observαble
{
if (_timeSpan == _timeShift)
{
- var sink = new π(this, observer, cancel);
+ var sink = new BufferTimeShift(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new BufferImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -143,11 +143,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<IList<TSource>>, IObserver<TSource>
+ class BufferImpl : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource> _parent;
- public τ(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public BufferImpl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -283,11 +283,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<IList<TSource>>, IObserver<TSource>
+ class BufferTimeShift : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource> _parent;
- public π(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public BufferTimeShift(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -346,11 +346,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class μ : Sink<IList<TSource>>, IObserver<TSource>
+ class Impl : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource> _parent;
- public μ(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public Impl(Buffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -487,7 +487,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new β(this, observer, cancel);
+ var sink = new Beta(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -544,7 +544,7 @@ namespace System.Reactive.Linq.Observαble
var closingSubscription = new SingleAssignmentDisposable();
_m.Disposable = closingSubscription;
- closingSubscription.Disposable = bufferClose.SubscribeSafe(new ω(this, closingSubscription));
+ closingSubscription.Disposable = bufferClose.SubscribeSafe(new Omega(this, closingSubscription));
}
private void CloseBuffer(IDisposable closingSubscription)
@@ -561,12 +561,12 @@ namespace System.Reactive.Linq.Observαble
_bufferGate.Wait(CreateBufferClose);
}
- class ω : IObserver<TBufferClosing>
+ class Omega : IObserver<TBufferClosing>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ω(_ parent, IDisposable self)
+ public Omega(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -617,11 +617,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class β : Sink<IList<TSource>>, IObserver<TSource>
+ class Beta : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly Buffer<TSource, TBufferClosing> _parent;
- public β(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public Beta(Buffer<TSource, TBufferClosing> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -641,16 +641,16 @@ namespace System.Reactive.Linq.Observαble
_refCountDisposable = new RefCountDisposable(d);
d.Add(_parent._source.SubscribeSafe(this));
- d.Add(_parent._bufferBoundaries.SubscribeSafe(new ω(this)));
+ d.Add(_parent._bufferBoundaries.SubscribeSafe(new Omega(this)));
return _refCountDisposable;
}
- class ω : IObserver<TBufferClosing>
+ class Omega : IObserver<TBufferClosing>
{
- private readonly β _parent;
+ private readonly Beta _parent;
- public ω(β parent)
+ public Omega(Beta parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs
index b104050..b3ba37c 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Case.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Case<TValue, TResult> : Producer<TResult>, IEvaluatableObservable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs
index 48820b3..b123357 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Cast.cs
@@ -3,9 +3,9 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
- class Cast<TSource, TResult> : Producer<TResult> /* Could optimize further by deriving from Select<TResult> and providing Ω<TResult2>. We're not doing this (yet) for debuggability. */
+ class Cast<TSource, TResult> : Producer<TResult> /* Could optimize further by deriving from Select<TResult> and providing Omega<TResult2>. We're not doing this (yet) for debuggability. */
{
private readonly IObservable<TSource> _source;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs
index 71e0037..06bf911 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Catch<TSource> : Producer<TSource>
{
@@ -136,7 +136,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_subscription.Disposable = d;
- d.Disposable = result.SubscribeSafe(new ε(this));
+ d.Disposable = result.SubscribeSafe(new Impl(this));
}
else
{
@@ -151,11 +151,11 @@ namespace System.Reactive.Linq.Observαble
base.Dispose();
}
- class ε : IObserver<TSource>
+ class Impl : IObserver<TSource>
{
private readonly _ _parent;
- public ε(_ parent)
+ public Impl(_ parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs
index 2cfd240..8f0bf24 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Collect.cs
@@ -7,7 +7,7 @@ using System.Reactive;
using System.Reactive.Threading;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Collect<TSource, TResult> : PushToPullAdapter<TSource, TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs
index 2188307..9748e24 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs
@@ -7,7 +7,7 @@ using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
#region Binary
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs
index 5a9b2e4..cd8c78a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Concat.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Concat<TSource> : Producer<TSource>, IConcatenatable<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs
index b26e2d4..69e30dc 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Contains.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Contains<TSource> : Producer<bool>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs
index 7432816..0c2f389 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Count.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Count<TSource> : Producer<int>
{
@@ -31,7 +31,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new π(this, observer, cancel);
+ var sink = new CountImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -77,12 +77,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<int>, IObserver<TSource>
+ class CountImpl : Sink<int>, IObserver<TSource>
{
private readonly Count<TSource> _parent;
private int _count;
- public π(Count<TSource> parent, IObserver<int> observer, IDisposable cancel)
+ public CountImpl(Count<TSource> parent, IObserver<int> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs
index ed4bb89..551e536 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DefaultIfEmpty.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DefaultIfEmpty<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs
index ba3dcc4..aa49c12 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Defer.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Defer<TValue> : Producer<TValue>, IEvaluatableObservable<TValue>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs
index 3e2cbba..5ea3d25 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Delay.cs
@@ -10,7 +10,7 @@ using System.Threading;
using System.Reactive.Threading;
#endif
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Delay<TSource> : Producer<TSource>
{
@@ -37,7 +37,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_scheduler.AsLongRunning() != null)
{
- var sink = new λ(this, observer, cancel);
+ var sink = new LongRunningImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -227,9 +227,9 @@ namespace System.Reactive.Linq.Observαble
// implementation, the loop below kept running while there was work for immediate dispatch,
// potentially causing a long running work item on the target scheduler. With the addition
// of long-running scheduling in Rx v2.0, we can check whether the scheduler supports this
- // interface and perform different processing (see λ). To reduce the code churn in the old
- // loop code here, we set the shouldYield flag to true after the first dispatch iteration,
- // in order to break from the loop and enter the recursive scheduling path.
+ // interface and perform different processing (see LongRunningImpl). To reduce the code
+ // churn in the old loop code here, we set the shouldYield flag to true after the first
+ // dispatch iteration, in order to break from the loop and enter the recursive scheduling path.
//
var shouldYield = false;
@@ -322,11 +322,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class λ : Sink<TSource>, IObserver<TSource>
+ class LongRunningImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Delay<TSource> _parent;
- public λ(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public LongRunningImpl(Delay<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -629,7 +629,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new σ(this));
+ _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
}
return new CompositeDisposable(_subscription, _delays);
@@ -660,7 +660,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_delays.Add(d);
- d.Disposable = delay.SubscribeSafe(new δ(this, value, d));
+ d.Disposable = delay.SubscribeSafe(new Delta(this, value, d));
}
public void OnError(Exception error)
@@ -692,11 +692,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class σ : IObserver<TDelay>
+ class SubscriptionDelay : IObserver<TDelay>
{
private readonly _ _parent;
- public σ(_ parent)
+ public SubscriptionDelay(_ parent)
{
_parent = parent;
}
@@ -718,13 +718,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TDelay>
+ class Delta : IObserver<TDelay>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;
- public δ(_ parent, TSource value, IDisposable self)
+ public Delta(_ parent, TSource value, IDisposable self)
{
_parent = parent;
_value = value;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs
index c6a05cc..f5a742a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DelaySubscription.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DelaySubscription<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs
index 112b156..590ef3b 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Dematerialize.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Dematerialize<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs
index af20277..bd4b0ca 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Distinct.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Distinct<TSource, TKey> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs
index 10c7923..1ba8f5f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DistinctUntilChanged.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DistinctUntilChanged<TSource, TKey> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs
index 2f73cfa..a4b3747 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Do.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Do<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs
index 3ef40eb..5dc0bc6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/DoWhile.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class DoWhile<TSource> : Producer<TSource>, IConcatenatable<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs
index b2b1212..0798d3b 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ElementAt.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ElementAt<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs
index 42c7241..30a2b65 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Empty.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Empty<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs
index ca9a5d8..a70ac94 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Finally.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Finally<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs
index 8d8bba7..62da120 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FirstAsync.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class FirstAsync<TSource> : Producer<TSource>
{
@@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new FirstAsyncImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -73,11 +73,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<TSource>, IObserver<TSource>
+ class FirstAsyncImpl : Sink<TSource>, IObserver<TSource>
{
private readonly FirstAsync<TSource> _parent;
- public π(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public FirstAsyncImpl(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs
index 85b9e03..948f319 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/For.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class For<TSource, TResult> : Producer<TResult>, IConcatenatable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs
index 38e112d..9b5c024 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ForEach.cs
@@ -4,7 +4,7 @@
using System;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ForEach<TSource>
{
@@ -62,7 +62,7 @@ namespace System.Reactive.Linq.Observαble
}
}
- public class τ : IObserver<TSource>
+ public class ForEachImpl : IObserver<TSource>
{
private readonly Action<TSource, int> _onNext;
private readonly Action _done;
@@ -71,7 +71,7 @@ namespace System.Reactive.Linq.Observαble
private Exception _exception;
private int _stopped;
- public τ(Action<TSource, int> onNext, Action done)
+ public ForEachImpl(Action<TSource, int> onNext, Action done)
{
_onNext = onNext;
_done = done;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs
index 82c4348..90aa779 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs
@@ -156,7 +156,7 @@ using System.Reactive.Subjects;
// subject (which is opaque to the event producer). Not to mention that the subject would always be
// rooted by the target event (even when the FromEvent[Pattern] observable wrapper is unreachable).
//
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class FromEvent<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, TEventArgs>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs
index 76da050..ca98290 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEventPattern.cs
@@ -12,23 +12,23 @@ using System.Threading;
//
// See FromEvent.cs for more information.
//
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class FromEventPattern
{
- public class τ<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TEventArgs>>
+ public class Impl<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TEventArgs>>
#if !NO_EVENTARGS_CONSTRAINT
where TEventArgs : EventArgs
#endif
{
private readonly Func<EventHandler<TEventArgs>, TDelegate> _conversion;
- public τ(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
+ public Impl(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
: base(addHandler, removeHandler, scheduler)
{
}
- public τ(Func<EventHandler<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
+ public Impl(Func<EventHandler<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
: base(addHandler, removeHandler, scheduler)
{
_conversion = conversion;
@@ -52,12 +52,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- public class τ<TDelegate, TSender, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TSender, TEventArgs>>
+ public class Impl<TDelegate, TSender, TEventArgs> : ClassicEventProducer<TDelegate, EventPattern<TSender, TEventArgs>>
#if !NO_EVENTARGS_CONSTRAINT
where TEventArgs : EventArgs
#endif
{
- public τ(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
+ public Impl(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
: base(addHandler, removeHandler, scheduler)
{
}
@@ -69,7 +69,7 @@ namespace System.Reactive.Linq.Observαble
}
}
- public class ρ<TSender, TEventArgs, TResult> : EventProducer<Delegate, TResult>
+ public class Handler<TSender, TEventArgs, TResult> : EventProducer<Delegate, TResult>
{
private readonly object _target;
private readonly Type _delegateType;
@@ -80,7 +80,7 @@ namespace System.Reactive.Linq.Observαble
private readonly bool _isWinRT;
#endif
- public ρ(object target, Type delegateType, MethodInfo addMethod, MethodInfo removeMethod, Func<TSender, TEventArgs, TResult> getResult, bool isWinRT, IScheduler scheduler)
+ public Handler(object target, Type delegateType, MethodInfo addMethod, MethodInfo removeMethod, Func<TSender, TEventArgs, TResult> getResult, bool isWinRT, IScheduler scheduler)
: base(scheduler)
{
#if HAS_WINRT
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs
index 89ef862..7e46655 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs
@@ -7,7 +7,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Generate<TState, TResult> : Producer<TResult>
{
@@ -52,13 +52,13 @@ namespace System.Reactive.Linq.Observαble
{
if (_timeSelectorA != null)
{
- var sink = new α(this, observer, cancel);
+ var sink = new SelectorA(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else if (_timeSelectorR != null)
{
- var sink = new δ(this, observer, cancel);
+ var sink = new Delta(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -70,11 +70,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class α : Sink<TResult>
+ class SelectorA : Sink<TResult>
{
private readonly Generate<TState, TResult> _parent;
- public α(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectorA(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -130,11 +130,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : Sink<TResult>
+ class Delta : Sink<TResult>
{
private readonly Generate<TState, TResult> _parent;
- public δ(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public Delta(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
index 05bf20b..a25efa8 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs
@@ -8,7 +8,7 @@ using System.Diagnostics;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs
index 1e64100..4f91030 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs
@@ -7,20 +7,22 @@ using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GroupBy<TSource, TKey, TElement> : Producer<IGroupedObservable<TKey, TElement>>
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, TKey> _keySelector;
private readonly Func<TSource, TElement> _elementSelector;
+ private readonly int? _capacity;
private readonly IEqualityComparer<TKey> _comparer;
- public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+ public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
_source = source;
_keySelector = keySelector;
_elementSelector = elementSelector;
+ _capacity = capacity;
_comparer = comparer;
}
@@ -49,7 +51,15 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+
+ if (_parent._capacity.HasValue)
+ {
+ _map = new Dictionary<TKey, ISubject<TElement>>(_parent._capacity.Value, _parent._comparer);
+ }
+ else
+ {
+ _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+ }
}
public void OnNext(TSource value)
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
index 7d91f73..9baf146 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs
@@ -1,13 +1,13 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- #if !NO_PERF
+#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GroupByUntil<TSource, TKey, TElement, TDuration> : Producer<IGroupedObservable<TKey, TElement>>
{
@@ -15,14 +15,16 @@ namespace System.Reactive.Linq.Observαble
private readonly Func<TSource, TKey> _keySelector;
private readonly Func<TSource, TElement> _elementSelector;
private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector;
+ private readonly int? _capacity;
private readonly IEqualityComparer<TKey> _comparer;
- public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+ public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
_source = source;
_keySelector = keySelector;
_elementSelector = elementSelector;
_durationSelector = durationSelector;
+ _capacity = capacity;
_comparer = comparer;
}
@@ -52,7 +54,7 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _map = new Map<TKey, ISubject<TElement>>(_parent._comparer);
+ _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer);
_nullGate = new object();
}
@@ -127,7 +129,7 @@ namespace System.Reactive.Linq.Observαble
var md = new SingleAssignmentDisposable();
_parent._groupDisposable.Add(md);
- md.Disposable = duration.SubscribeSafe(new δ(this, key, writer, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, key, writer, md));
}
var element = default(TElement);
@@ -163,14 +165,14 @@ namespace System.Reactive.Linq.Observαble
writer.OnNext(element);
}
- class δ : IObserver<TDuration>
+ class Delta : IObserver<TDuration>
{
private readonly _ _parent;
private readonly TKey _key;
private readonly ISubject<TElement> _writer;
private readonly IDisposable _self;
- public δ(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
+ public Delta(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
{
_parent = parent;
_key = key;
@@ -272,11 +274,38 @@ namespace System.Reactive.Linq.Observαble
#if !NO_CDS
class Map<TKey, TValue>
{
+#if !NO_CDS_COLLECTIONS
+ // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs
+
+ // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the
+ // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference
+ // and blocking, but also the more expensive operations that require all locks become (e.g. table
+ // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good
+ // compromise.
+ private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;
+
+ private static int DefaultConcurrencyLevel
+ {
+ get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; }
+ }
+#endif
+
private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map;
- public Map(IEqualityComparer<TKey> comparer)
+ public Map(int? capacity, IEqualityComparer<TKey> comparer)
{
- _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+ if (capacity.HasValue)
+ {
+#if NO_CDS_COLLECTIONS
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer);
+#else
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
+#endif
+ }
+ else
+ {
+ _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+ }
}
public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
@@ -327,9 +356,16 @@ namespace System.Reactive.Linq.Observαble
{
private readonly Dictionary<TKey, TValue> _map;
- public Map(IEqualityComparer<TKey> comparer)
+ public Map(int? capacity, IEqualityComparer<TKey> comparer)
{
- _map = new Dictionary<TKey, TValue>(comparer);
+ if (capacity.HasValue)
+ {
+ _map = new Dictionary<TKey, TValue>(capacity.Value, comparer);
+ }
+ else
+ {
+ _map = new Dictionary<TKey, TValue>(comparer);
+ }
}
public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs
index 66e49be..19c8c45 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupJoin.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult>
{
@@ -68,18 +68,18 @@ namespace System.Reactive.Linq.Observαble
_rightID = 0;
_rightMap = new Dictionary<int, TRight>();
- leftSubscription.Disposable = _parent._left.SubscribeSafe(new λ(this, leftSubscription));
- rightSubscription.Disposable = _parent._right.SubscribeSafe(new ρ(this, rightSubscription));
+ leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
+ rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
return _refCount;
}
- class λ : IObserver<TLeft>
+ class LeftObserver : IObserver<TLeft>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public λ(_ parent, IDisposable self)
+ public LeftObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -122,7 +122,7 @@ namespace System.Reactive.Linq.Observαble
}
// BREAKING CHANGE v2 > v1.x - The duration sequence is subscribed to before the result sequence is evaluated.
- md.Disposable = duration.SubscribeSafe(new δ(this, id, s, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, s, md));
var result = default(TResult);
try
@@ -146,14 +146,14 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TLeftDuration>
+ class Delta : IObserver<TLeftDuration>
{
- private readonly λ _parent;
+ private readonly LeftObserver _parent;
private readonly int _id;
private readonly IObserver<TRight> _group;
private readonly IDisposable _self;
- public δ(λ parent, int id, IObserver<TRight> group, IDisposable self)
+ public Delta(LeftObserver parent, int id, IObserver<TRight> group, IDisposable self)
{
_parent = parent;
_id = id;
@@ -201,12 +201,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ρ : IObserver<TRight>
+ class RightObserver : IObserver<TRight>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ρ(_ parent, IDisposable self)
+ public RightObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -242,7 +242,7 @@ namespace System.Reactive.Linq.Observαble
OnError(exception);
return;
}
- md.Disposable = duration.SubscribeSafe(new δ(this, id, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
lock (_parent._gate)
{
@@ -251,13 +251,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TRightDuration>
+ class Delta : IObserver<TRightDuration>
{
- private readonly ρ _parent;
+ private readonly RightObserver _parent;
private readonly int _id;
private readonly IDisposable _self;
- public δ(ρ parent, int id, IDisposable self)
+ public Delta(RightObserver parent, int id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs
index 8b1804a..8740c32 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/If.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class If<TResult> : Producer<TResult>, IEvaluatableObservable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs
index 65441bd..1f932ac 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IgnoreElements.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class IgnoreElements<TSource> : Producer<TSource>
{
@@ -14,7 +14,7 @@ namespace System.Reactive.Linq.Observαble
_source = source;
}
- public IObservable<TSource> Ω()
+ public IObservable<TSource> Omega()
{
return this;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs
index 674c6f8..722750e 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/IsEmpty.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class IsEmpty<TSource> : Producer<bool>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs
index 13e6ad2..4d22d34 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Join.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> : Producer<TResult>
{
@@ -69,18 +69,18 @@ namespace System.Reactive.Linq.Observαble
_rightID = 0;
_rightMap = new Dictionary<int, TRight>();
- leftSubscription.Disposable = _parent._left.SubscribeSafe(new λ(this, leftSubscription));
- rightSubscription.Disposable = _parent._right.SubscribeSafe(new ρ(this, rightSubscription));
+ leftSubscription.Disposable = _parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
+ rightSubscription.Disposable = _parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
return _group;
}
- class λ : IObserver<TLeft>
+ class LeftObserver : IObserver<TLeft>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public λ(_ parent, IDisposable self)
+ public LeftObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -124,7 +124,7 @@ namespace System.Reactive.Linq.Observαble
return;
}
- md.Disposable = duration.SubscribeSafe(new δ(this, id, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
lock (_parent._gate)
{
@@ -147,13 +147,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TLeftDuration>
+ class Delta : IObserver<TLeftDuration>
{
- private readonly λ _parent;
+ private readonly LeftObserver _parent;
private readonly int _id;
private readonly IDisposable _self;
- public δ(λ parent, int id, IDisposable self)
+ public Delta(LeftObserver parent, int id, IDisposable self)
{
_parent = parent;
_id = id;
@@ -203,12 +203,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ρ : IObserver<TRight>
+ class RightObserver : IObserver<TRight>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ρ(_ parent, IDisposable self)
+ public RightObserver(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -252,7 +252,7 @@ namespace System.Reactive.Linq.Observαble
return;
}
- md.Disposable = duration.SubscribeSafe(new δ(this, id, md));
+ md.Disposable = duration.SubscribeSafe(new Delta(this, id, md));
lock (_parent._gate)
{
@@ -275,13 +275,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TRightDuration>
+ class Delta : IObserver<TRightDuration>
{
- private readonly ρ _parent;
+ private readonly RightObserver _parent;
private readonly int _id;
private readonly IDisposable _self;
- public δ(ρ parent, int id, IDisposable self)
+ public Delta(RightObserver parent, int id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs
index 1ecd930..0a77d32 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LastAsync.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class LastAsync<TSource> : Producer<TSource>
{
@@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new LastAsyncImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -77,13 +77,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<TSource>, IObserver<TSource>
+ class LastAsyncImpl : Sink<TSource>, IObserver<TSource>
{
private readonly LastAsync<TSource> _parent;
private TSource _value;
private bool _seenValue;
- public π(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public LastAsyncImpl(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs
index 2699797..bc92e6d 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Latest.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Latest<TSource> : PushToPullAdapter<TSource, TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs
index 0108e18..63c9f24 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/LongCount.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class LongCount<TSource> : Producer<long>
{
@@ -31,7 +31,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new π(this, observer, cancel);
+ var sink = new LongCountImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -77,12 +77,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<long>, IObserver<TSource>
+ class LongCountImpl : Sink<long>, IObserver<TSource>
{
private readonly LongCount<TSource> _parent;
private long _count;
- public π(LongCount<TSource> parent, IObserver<long> observer, IDisposable cancel)
+ public LongCountImpl(LongCount<TSource> parent, IObserver<long> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs
index 102267d..fa8e1ee 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Materialize.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Materialize<TSource> : Producer<Notification<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs
index da339da..8e86e75 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Max.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Max<TSource> : Producer<TSource>
{
@@ -28,19 +28,19 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new δ(this, observer, cancel);
+ var sink = new Delta(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
}
- class δ : Sink<TSource>, IObserver<TSource>
+ class Delta : Sink<TSource>, IObserver<TSource>
{
private readonly Max<TSource> _parent;
private bool _hasValue;
private TSource _lastValue;
- public δ(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public Delta(Max<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs
index 52a9a42..4bf97f5 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MaxBy.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class MaxBy<TSource, TKey> : Producer<IList<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs
index a27c3c3..dd710cb 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Merge.cs
@@ -10,7 +10,7 @@ using System.Threading;
using System.Threading.Tasks;
#endif
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Merge<TSource> : Producer<TSource>
{
@@ -41,14 +41,14 @@ namespace System.Reactive.Linq.Observαble
{
if (_maxConcurrent > 0)
{
- var sink = new μ(this, observer, cancel);
+ var sink = new MergeConcurrent(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#if !NO_TPL
else if (_sourcesT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new MergeImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -93,7 +93,7 @@ namespace System.Reactive.Linq.Observαble
{
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = value.SubscribeSafe(new ι(this, innerSubscription));
+ innerSubscription.Disposable = value.SubscribeSafe(new Iter(this, innerSubscription));
}
public void OnError(Exception error)
@@ -129,12 +129,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ι : IObserver<TSource>
+ class Iter : IObserver<TSource>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ι(_ parent, IDisposable self)
+ public Iter(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -177,11 +177,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class μ : Sink<TSource>, IObserver<IObservable<TSource>>
+ class MergeConcurrent : Sink<TSource>, IObserver<IObservable<TSource>>
{
private readonly Merge<TSource> _parent;
- public μ(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public MergeConcurrent(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -253,15 +253,15 @@ namespace System.Reactive.Linq.Observαble
{
var subscription = new SingleAssignmentDisposable();
_group.Add(subscription);
- subscription.Disposable = innerSource.SubscribeSafe(new ι(this, subscription));
+ subscription.Disposable = innerSource.SubscribeSafe(new Iter(this, subscription));
}
- class ι : IObserver<TSource>
+ class Iter : IObserver<TSource>
{
- private readonly μ _parent;
+ private readonly MergeConcurrent _parent;
private readonly IDisposable _self;
- public ι(μ parent, IDisposable self)
+ public Iter(MergeConcurrent parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -308,11 +308,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TSource>, IObserver<Task<TSource>>
+ class MergeImpl : Sink<TSource>, IObserver<Task<TSource>>
{
private readonly Merge<TSource> _parent;
- public τ(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public MergeImpl(Merge<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs
index f7b6197..fc08002 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Min.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Min<TSource> : Producer<TSource>
{
@@ -28,19 +28,19 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new δ(this, observer, cancel);
+ var sink = new Delta(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
}
- class δ : Sink<TSource>, IObserver<TSource>
+ class Delta : Sink<TSource>, IObserver<TSource>
{
private readonly Min<TSource> _parent;
private bool _hasValue;
private TSource _lastValue;
- public δ(Min<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public Delta(Min<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs
index a32bf92..d9c93f6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MinBy.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class MinBy<TSource, TKey> : Producer<IList<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs
index 937d811..7311a33 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/MostRecent.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class MostRecent<TSource> : PushToPullAdapter<TSource, TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs
index 4a77300..ee163a5 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Multicast.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Multicast<TSource, TIntermediate, TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs
index a9adc52..eae632f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Never.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Never<TResult> : IObservable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs
index 2d4ec45..4b20276 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Next<TSource> : PushToPullAdapter<TSource, TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs
index d5d7428..eaf7613 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ObserveOn.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ObserveOn<TSource> : Producer<TSource>
{
@@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble
#if !NO_SYNCCTX
if (_context != null)
{
- var sink = new ς(this, observer, cancel);
+ var sink = new ObserveOnImpl(this, observer, cancel);
setSink(sink);
return _source.Subscribe(sink);
}
@@ -47,11 +47,11 @@ namespace System.Reactive.Linq.Observαble
}
#if !NO_SYNCCTX
- class ς : Sink<TSource>, IObserver<TSource>
+ class ObserveOnImpl : Sink<TSource>, IObserver<TSource>
{
private readonly ObserveOn<TSource> _parent;
- public ς(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public ObserveOnImpl(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs
index 07e4515..27a0ca3 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OfType.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class OfType<TSource, TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs
index 26dfd34..077a23f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class OnErrorResumeNext<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs
index 916d12b..6351365 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/PushToPullAdapter.cs
@@ -5,7 +5,7 @@ using System.Collections;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
abstract class PushToPullAdapter<TSource, TResult> : IEnumerable<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs
index c58d94c..414ae72 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Range.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Range : Producer<int>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs
index 1762807..4efb917 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/RefCount.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class RefCount<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs
index fab0607..4f418cb 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Repeat.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Repeat<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs
index cd9ad02..db98f88 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Return.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Return<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs
index 0d46c62..6d0e68b 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sample.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Sample<TSource, TSample> : Producer<TSource>
{
@@ -51,7 +51,7 @@ namespace System.Reactive.Linq.Observαble
_sourceSubscription = sourceSubscription;
sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
- var samplerSubscription = _parent._sampler.SubscribeSafe(new σ(this));
+ var samplerSubscription = _parent._sampler.SubscribeSafe(new SampleImpl(this));
return new CompositeDisposable(_sourceSubscription, samplerSubscription);
}
@@ -83,11 +83,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class σ : IObserver<TSample>
+ class SampleImpl : IObserver<TSample>
{
private readonly _ _parent;
- public σ(_ parent)
+ public SampleImpl(_ parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs
index 6c8f2fe..421177f 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Scan.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Scan<TSource, TAccumulate> : Producer<TAccumulate>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs
index e853210..330fe3a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Select.cs
@@ -3,11 +3,11 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
abstract class Select<TResult> : Producer<TResult>
{
- public abstract IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector);
+ public abstract IObservable<TResult2> Omega<TResult2>(Func<TResult, TResult2> selector);
}
class Select<TSource, TResult> : Select<TResult>
@@ -28,7 +28,7 @@ namespace System.Reactive.Linq.Observαble
_selectorI = selector;
}
- public override IObservable<TResult2> Ω<TResult2>(Func<TResult, TResult2> selector)
+ public override IObservable<TResult2> Omega<TResult2>(Func<TResult, TResult2> selector)
{
if (_selector != null)
return new Select<TSource, TResult2>(_source, x => selector(_selector(x)));
@@ -46,7 +46,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -92,12 +92,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectImpl : Sink<TResult>, IObserver<TSource>
{
private readonly Select<TSource, TResult> _parent;
private int _index;
- public τ(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectImpl(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
index 054f974..49d7cf9 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SelectMany.cs
@@ -1,9 +1,7 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
-using System;
using System.Collections.Generic;
-using System.Reactive;
using System.Reactive.Disposables;
#if !NO_TPL
@@ -11,17 +9,17 @@ using System.Threading;
using System.Threading.Tasks;
#endif
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SelectMany<TSource, TCollection, TResult> : Producer<TResult>
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
- private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex;
+ private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI;
private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
- private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex;
+ private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI;
private readonly Func<TSource, TCollection, TResult> _resultSelector;
- private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex;
+ private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -33,8 +31,8 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
_source = source;
- _collectionSelectorWithIndex = collectionSelector;
- _resultSelectorWithIndex = resultSelector;
+ _collectionSelectorI = collectionSelector;
+ _resultSelectorI = resultSelector;
}
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
@@ -47,12 +45,14 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
_source = source;
- _collectionSelectorEWithIndex = collectionSelector;
- _resultSelectorWithIndex = resultSelector;
+ _collectionSelectorEI = collectionSelector;
+ _resultSelectorI = resultSelector;
}
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
+ private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI;
+ private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI;
public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
@@ -60,27 +60,52 @@ namespace System.Reactive.Linq.Observαble
_collectionSelectorT = collectionSelector;
_resultSelector = resultSelector;
}
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
+ {
+ _source = source;
+ _collectionSelectorTI = collectionSelector;
+ _resultSelectorTI = resultSelector;
+ }
#endif
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_collectionSelector != null || _collectionSelectorWithIndex != null)
+ if (_collectionSelector != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return sink.Run();
}
+ else if (_collectionSelectorI != null)
+ {
+ var sink = new IndexSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
#if !NO_TPL
else if (_collectionSelectorT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectManyImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
+ else if (_collectionSelectorTI != null)
+ {
+ var sink = new Sigma(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#endif
+ else if (_collectionSelectorE != null)
+ {
+ var sink = new NoSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return _source.SubscribeSafe(sink);
+ }
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new Omega(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -94,14 +119,12 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _indexInSource = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
- private int _indexInSource;
public IDisposable Run()
{
@@ -122,13 +145,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._collectionSelector != null)
- collection = _parent._collectionSelector(value);
- else
- {
- checked { _indexInSource++; }
- collection = _parent._collectionSelectorWithIndex(value, _indexInSource);
- }
+ collection = _parent._collectionSelector(value);
}
catch (Exception ex)
{
@@ -142,7 +159,7 @@ namespace System.Reactive.Linq.Observαble
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource));
+ innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription));
}
public void OnError(Exception error)
@@ -178,21 +195,17 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ι : IObserver<TCollection>
+ class Iter : IObserver<TCollection>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;
- private int _indexInSource;
- private int _indexInIntermediate = -1;
- public ι(_ parent, TSource value, IDisposable self, int indexInSource)
+ public Iter(_ parent, TSource value, IDisposable self)
{
_parent = parent;
_value = value;
_self = self;
- _indexInSource = indexInSource;
- _indexInIntermediate = -1;
}
public void OnNext(TCollection value)
@@ -201,14 +214,164 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._parent._resultSelector != null)
- res = _parent._parent._resultSelector(_value, value);
- else
+ res = _parent._parent._resultSelector(_value, value);
+ }
+ catch (Exception ex)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(ex);
+ _parent.Dispose();
+ }
+ return;
+ }
+
+ lock (_parent._gate)
+ _parent._observer.OnNext(res);
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(error);
+ _parent.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _parent._group.Remove(_self);
+ if (_parent._isStopped && _parent._group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_parent._gate)
{
- checked { _indexInIntermediate++; }
- res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate);
+ _parent._observer.OnCompleted();
+ _parent.Dispose();
}
}
+ }
+ }
+ }
+
+ class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private bool _isStopped;
+ private CompositeDisposable _group;
+ private SingleAssignmentDisposable _sourceSubscription;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _isStopped = false;
+ _group = new CompositeDisposable();
+
+ _sourceSubscription = new SingleAssignmentDisposable();
+ _group.Add(_sourceSubscription);
+ _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
+
+ return _group;
+ }
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+ var collection = default(IObservable<TCollection>);
+
+ try
+ {
+ collection = _parent._collectionSelectorI(value, index);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ var innerSubscription = new SingleAssignmentDisposable();
+ _group.Add(innerSubscription);
+ innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription));
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _isStopped = true;
+ if (_group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ else
+ {
+ _sourceSubscription.Dispose();
+ }
+ }
+
+ class Iter : IObserver<TCollection>
+ {
+ private readonly IndexSelectorImpl _parent;
+ private readonly TSource _value;
+ private readonly int _valueIndex;
+ private readonly IDisposable _self;
+
+ public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self)
+ {
+ _parent = parent;
+ _value = value;
+ _valueIndex = index;
+ _self = self;
+ }
+
+ private int _index;
+
+ public void OnNext(TCollection value)
+ {
+ var res = default(TResult);
+
+ try
+ {
+ res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++));
+ }
catch (Exception ex)
{
lock (_parent._gate)
@@ -254,16 +417,14 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TSource>
+ class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
- private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection
- public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
- _indexInSource = -1;
}
public void OnNext(TSource value)
@@ -271,14 +432,93 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TCollection>);
try
{
- if (_parent._collectionSelectorE != null)
- xs = _parent._collectionSelectorE(value);
- else
+ xs = _parent._collectionSelectorE(value);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ var e = default(IEnumerator<TCollection>);
+ try
+ {
+ e = xs.GetEnumerator();
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ try
+ {
+ var hasNext = true;
+ while (hasNext)
{
- checked { _indexInSource++; }
- xs = _parent._collectionSelectorEWithIndex(value, _indexInSource);
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = _parent._resultSelector(value, e.Current);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ if (hasNext)
+ base._observer.OnNext(current);
}
}
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+
+ class Omega : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private int _index;
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+
+ var xs = default(IEnumerable<TCollection>);
+ try
+ {
+ xs = _parent._collectionSelectorEI(value, index);
+ }
catch (Exception exception)
{
base._observer.OnError(exception);
@@ -300,7 +540,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- int indexInIntermediate = -1;
+ var eIndex = 0;
var hasNext = true;
while (hasNext)
{
@@ -311,15 +551,7 @@ namespace System.Reactive.Linq.Observαble
{
hasNext = e.MoveNext();
if (hasNext)
- {
- if (_parent._resultSelector != null)
- current = _parent._resultSelector(value, e.Current);
- else
- {
- checked { indexInIntermediate++; }
- current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate);
- }
- }
+ current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++));
}
catch (Exception exception)
{
@@ -354,11 +586,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectManyImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TCollection, TResult> _parent;
- public τ(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -487,6 +719,143 @@ namespace System.Reactive.Linq.Observαble
}
}
}
+
+ class Sigma : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TCollection, TResult> _parent;
+
+ public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private CancellationDisposable _cancel;
+ private volatile int _count;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _cancel = new CancellationDisposable();
+ _count = 1;
+
+ return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+ }
+
+ public void OnNext(TSource value)
+ {
+ var index = checked(_index++);
+
+ var task = default(Task<TCollection>);
+ try
+ {
+ Interlocked.Increment(ref _count);
+ task = _parent._collectionSelectorTI(value, index, _cancel.Token);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ if (task.IsCompleted)
+ {
+ OnCompletedTask(value, index, task);
+ }
+ else
+ {
+ AttachContinuation(value, index, task);
+ }
+ }
+
+ private void AttachContinuation(TSource value, int index, Task<TCollection> task)
+ {
+ //
+ // Separate method to avoid closure in synchronous completion case.
+ //
+ task.ContinueWith(t => OnCompletedTask(value, index, t));
+ }
+
+ private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
+ {
+ switch (task.Status)
+ {
+ case TaskStatus.RanToCompletion:
+ {
+ var res = default(TResult);
+ try
+ {
+ res = _parent._resultSelectorTI(value, index, task.Result);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ lock (_gate)
+ base._observer.OnNext(res);
+
+ OnCompleted();
+ }
+ break;
+ case TaskStatus.Faulted:
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(task.Exception.InnerException);
+ base.Dispose();
+ }
+ }
+ break;
+ case TaskStatus.Canceled:
+ {
+ if (!_cancel.IsDisposed)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(new TaskCanceledException(task));
+ base.Dispose();
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Decrement(ref _count) == 0)
+ {
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ }
+ }
#pragma warning restore 0420
#endif
}
@@ -495,13 +864,11 @@ namespace System.Reactive.Linq.Observαble
{
private readonly IObservable<TSource> _source;
private readonly Func<TSource, IObservable<TResult>> _selector;
+ private readonly Func<TSource, int, IObservable<TResult>> _selectorI;
private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
private readonly Func<IObservable<TResult>> _selectorOnCompleted;
- private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex;
- private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError;
- private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted;
private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
- private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex;
+ private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI;
public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
{
@@ -509,26 +876,26 @@ namespace System.Reactive.Linq.Observαble
_selector = selector;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
{
_source = source;
- _selector = selector;
- _selectorOnError = selectorOnError;
- _selectorOnCompleted = selectorOnCompleted;
+ _selectorI = selector;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
+ public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
{
_source = source;
- _selectorWithIndex = selector;
+ _selector = selector;
+ _selectorOnError = selectorOnError;
+ _selectorOnCompleted = selectorOnCompleted;
}
- public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted)
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
{
_source = source;
- _selectorWithIndex = selector;
- _selectorWithIndexOnError = selectorOnError;
- _selectorWithIndexOnCompleted = selectorOnCompleted;
+ _selectorI = selector;
+ _selectorOnError = selectorOnError;
+ _selectorOnCompleted = selectorOnCompleted;
}
public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
@@ -540,38 +907,63 @@ namespace System.Reactive.Linq.Observαble
public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
{
_source = source;
- _selectorEWithIndex = selector;
+ _selectorEI = selector;
}
#if !NO_TPL
private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
+ private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI;
public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
{
_source = source;
_selectorT = selector;
}
+
+ public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
+ {
+ _source = source;
+ _selectorTI = selector;
+ }
#endif
protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
{
- if (_selector != null || _selectorWithIndex != null)
+ if (_selector != null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return sink.Run();
}
+ else if (_selectorI != null)
+ {
+ var sink = new IndexSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
#if !NO_TPL
else if (_selectorT != null)
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SelectManyImpl(this, observer, cancel);
+ setSink(sink);
+ return sink.Run();
+ }
+ else if (_selectorTI != null)
+ {
+ var sink = new Sigma(this, observer, cancel);
setSink(sink);
return sink.Run();
}
#endif
+ else if (_selectorE != null)
+ {
+ var sink = new NoSelectorImpl(this, observer, cancel);
+ setSink(sink);
+ return _source.SubscribeSafe(sink);
+ }
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new Omega(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -585,14 +977,12 @@ namespace System.Reactive.Linq.Observαble
: base(observer, cancel)
{
_parent = parent;
- _index = -1;
}
private object _gate;
private bool _isStopped;
private CompositeDisposable _group;
private SingleAssignmentDisposable _sourceSubscription;
- private int _index;
public IDisposable Run()
{
@@ -613,13 +1003,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selector != null)
- inner = _parent._selector(value);
- else
- {
- checked { _index++; }
- inner = _parent._selectorWithIndex(value, _index);
- }
+ inner = _parent._selector(value);
}
catch (Exception ex)
{
@@ -642,13 +1026,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selectorOnError != null)
- inner = _parent._selectorOnError(error);
- else
- {
- checked { _index++; }
- inner = _parent._selectorWithIndexOnError(error, _index);
- }
+ inner = _parent._selectorOnError(error);
}
catch (Exception ex)
{
@@ -682,10 +1060,7 @@ namespace System.Reactive.Linq.Observαble
try
{
- if (_parent._selectorOnCompleted != null)
- inner = _parent._selectorOnCompleted();
- else
- inner = _parent._selectorWithIndexOnCompleted(_index);
+ inner = _parent._selectorOnCompleted();
}
catch (Exception ex)
{
@@ -731,15 +1106,15 @@ namespace System.Reactive.Linq.Observαble
{
var innerSubscription = new SingleAssignmentDisposable();
_group.Add(innerSubscription);
- innerSubscription.Disposable = inner.SubscribeSafe(new ι(this, innerSubscription));
+ innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
}
- class ι : IObserver<TResult>
+ class Iter : IObserver<TResult>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ι(_ parent, IDisposable self)
+ public Iter(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -782,16 +1157,203 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TSource>
+ class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
+
+ public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private bool _isStopped;
+ private CompositeDisposable _group;
+ private SingleAssignmentDisposable _sourceSubscription;
private int _index;
- public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _isStopped = false;
+ _group = new CompositeDisposable();
+
+ _sourceSubscription = new SingleAssignmentDisposable();
+ _group.Add(_sourceSubscription);
+ _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
+
+ return _group;
+ }
+
+ public void OnNext(TSource value)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorI(value, checked(_index++));
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+ }
+
+ public void OnError(Exception error)
+ {
+ if (_parent._selectorOnError != null)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorOnError(error);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+
+ Final();
+ }
+ else
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (_parent._selectorOnCompleted != null)
+ {
+ var inner = default(IObservable<TResult>);
+
+ try
+ {
+ inner = _parent._selectorOnCompleted();
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+ return;
+ }
+
+ SubscribeInner(inner);
+ }
+
+ Final();
+ }
+
+ private void Final()
+ {
+ _isStopped = true;
+ if (_group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ else
+ {
+ _sourceSubscription.Dispose();
+ }
+ }
+
+ private void SubscribeInner(IObservable<TResult> inner)
+ {
+ var innerSubscription = new SingleAssignmentDisposable();
+ _group.Add(innerSubscription);
+ innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
+ }
+
+ class Iter : IObserver<TResult>
+ {
+ private readonly IndexSelectorImpl _parent;
+ private readonly IDisposable _self;
+
+ public Iter(IndexSelectorImpl parent, IDisposable self)
+ {
+ _parent = parent;
+ _self = self;
+ }
+
+ public void OnNext(TResult value)
+ {
+ lock (_parent._gate)
+ _parent._observer.OnNext(value);
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_parent._gate)
+ {
+ _parent._observer.OnError(error);
+ _parent.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ _parent._group.Remove(_self);
+ if (_parent._isStopped && _parent._group.Count == 1)
+ {
+ //
+ // Notice there can be a race between OnCompleted of the source and any
+ // of the inner sequences, where both see _group.Count == 1, and one is
+ // waiting for the lock. There won't be a double OnCompleted observation
+ // though, because the call to Dispose silences the observer by swapping
+ // in a NopObserver<T>.
+ //
+ lock (_parent._gate)
+ {
+ _parent._observer.OnCompleted();
+ _parent.Dispose();
+ }
+ }
+ }
+ }
+ }
+
+ class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
- _index = -1;
}
public void OnNext(TSource value)
@@ -799,14 +1361,91 @@ namespace System.Reactive.Linq.Observαble
var xs = default(IEnumerable<TResult>);
try
{
- if (_parent._selectorE != null)
- xs = _parent._selectorE(value);
- else
+ xs = _parent._selectorE(value);
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ var e = default(IEnumerator<TResult>);
+ try
+ {
+ e = xs.GetEnumerator();
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ try
+ {
+ var hasNext = true;
+ while (hasNext)
{
- checked { _index++; }
- xs = _parent._selectorEWithIndex(value, _index);
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = e.Current;
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ if (hasNext)
+ base._observer.OnNext(current);
}
}
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+
+ public void OnCompleted()
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+
+ class Omega : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private int _index;
+
+ public void OnNext(TSource value)
+ {
+ var xs = default(IEnumerable<TResult>);
+ try
+ {
+ xs = _parent._selectorEI(value, checked(_index++));
+ }
catch (Exception exception)
{
base._observer.OnError(exception);
@@ -873,11 +1512,11 @@ namespace System.Reactive.Linq.Observαble
#if !NO_TPL
#pragma warning disable 0420
- class τ : Sink<TResult>, IObserver<TSource>
+ class SelectManyImpl : Sink<TResult>, IObserver<TSource>
{
private readonly SelectMany<TSource, TResult> _parent;
- public τ(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -982,6 +1621,117 @@ namespace System.Reactive.Linq.Observαble
}
}
}
+
+ class Sigma : Sink<TResult>, IObserver<TSource>
+ {
+ private readonly SelectMany<TSource, TResult> _parent;
+
+ public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ _parent = parent;
+ }
+
+ private object _gate;
+ private CancellationDisposable _cancel;
+ private volatile int _count;
+ private int _index;
+
+ public IDisposable Run()
+ {
+ _gate = new object();
+ _cancel = new CancellationDisposable();
+ _count = 1;
+
+ return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
+ }
+
+ public void OnNext(TSource value)
+ {
+ var task = default(Task<TResult>);
+ try
+ {
+ Interlocked.Increment(ref _count);
+ task = _parent._selectorTI(value, checked(_index++), _cancel.Token);
+ }
+ catch (Exception ex)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(ex);
+ base.Dispose();
+ }
+
+ return;
+ }
+
+ if (task.IsCompleted)
+ {
+ OnCompletedTask(task);
+ }
+ else
+ {
+ task.ContinueWith(OnCompletedTask);
+ }
+ }
+
+ private void OnCompletedTask(Task<TResult> task)
+ {
+ switch (task.Status)
+ {
+ case TaskStatus.RanToCompletion:
+ {
+ lock (_gate)
+ base._observer.OnNext(task.Result);
+
+ OnCompleted();
+ }
+ break;
+ case TaskStatus.Faulted:
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(task.Exception.InnerException);
+ base.Dispose();
+ }
+ }
+ break;
+ case TaskStatus.Canceled:
+ {
+ if (!_cancel.IsDisposed)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(new TaskCanceledException(task));
+ base.Dispose();
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ base._observer.OnError(error);
+ base.Dispose();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Decrement(ref _count) == 0)
+ {
+ lock (_gate)
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+ }
+ }
#pragma warning restore 0420
#endif
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs
index 4661f82..6b03aba 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SequenceEqual.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SequenceEqual<TSource> : Producer<bool>
{
@@ -38,7 +38,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new SequenceEqualImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -226,11 +226,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<bool>, IObserver<TSource>
+ class SequenceEqualImpl : Sink<bool>, IObserver<TSource>
{
private readonly SequenceEqual<TSource> _parent;
- public ε(SequenceEqual<TSource> parent, IObserver<bool> observer, IDisposable cancel)
+ public SequenceEqualImpl(SequenceEqual<TSource> parent, IObserver<bool> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs
index 19c07e5..c4dce4a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SingleAsync.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SingleAsync<TSource> : Producer<TSource>
{
@@ -22,7 +22,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_predicate != null)
{
- var sink = new π(this, observer, cancel);
+ var sink = new SingleAsyncImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -84,13 +84,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<TSource>, IObserver<TSource>
+ class SingleAsyncImpl : Sink<TSource>, IObserver<TSource>
{
private readonly SingleAsync<TSource> _parent;
private TSource _value;
private bool _seenValue;
- public π(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SingleAsyncImpl(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs
index a092d99..7222816 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Skip.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Skip<TSource> : Producer<TSource>
{
@@ -27,7 +27,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(int count)
+ public IObservable<TSource> Omega(int count)
{
//
// Sum semantics:
@@ -39,7 +39,7 @@ namespace System.Reactive.Linq.Observαble
return new Skip<TSource>(_source, _count + count);
}
- public IObservable<TSource> Ω(TimeSpan duration)
+ public IObservable<TSource> Omega(TimeSpan duration)
{
//
// Maximum semantics:
@@ -66,7 +66,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SkipImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -105,12 +105,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class SkipImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Skip<TSource> _parent;
private volatile bool _open;
- public τ(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SkipImpl(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs
index 596f183..6bd771e 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipLast.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SkipLast<TSource> : Producer<TSource>
{
@@ -37,7 +37,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SkipLastImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -75,12 +75,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class SkipLastImpl : Sink<TSource>, IObserver<TSource>
{
private readonly SkipLast<TSource> _parent;
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
- public τ(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SkipLastImpl(SkipLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs
index 8d4c7a9..8acce09 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipUntil.cs
@@ -6,7 +6,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SkipUntil<TSource, TOther> : Producer<TSource>
{
@@ -140,7 +140,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(DateTimeOffset startTime)
+ public IObservable<TSource> Omega(DateTimeOffset startTime)
{
//
// Maximum semantics:
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs
index e6e379d..b5694dc 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/SkipWhile.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SkipWhile<TSource> : Producer<TSource>
{
@@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new SkipWhileImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -86,13 +86,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class SkipWhileImpl : Sink<TSource>, IObserver<TSource>
{
private readonly SkipWhile<TSource> _parent;
private bool _running;
private int _index;
- public τ(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public SkipWhileImpl(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs
index 6d51153..c5346f8 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Sum.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class SumDouble : Producer<double>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs
index 794b0d4..e27d6eb 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Switch.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Switch<TSource> : Producer<TSource>
{
@@ -65,7 +65,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_innerSubscription.Disposable = d;
- d.Disposable = value.SubscribeSafe(new ι(this, id, d));
+ d.Disposable = value.SubscribeSafe(new Iter(this, id, d));
}
public void OnError(Exception error)
@@ -91,13 +91,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ι : IObserver<TSource>
+ class Iter : IObserver<TSource>
{
private readonly _ _parent;
private readonly ulong _id;
private readonly IDisposable _self;
- public ι(_ parent, ulong id, IDisposable self)
+ public Iter(_ parent, ulong id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs
index 36985e9..ef4e885 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Synchronize.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Synchronize<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs
index 738e7c0..255e3d2 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Take.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Take<TSource> : Producer<TSource>
{
@@ -27,7 +27,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(int count)
+ public IObservable<TSource> Omega(int count)
{
//
// Minimum semantics:
@@ -42,7 +42,7 @@ namespace System.Reactive.Linq.Observαble
return new Take<TSource>(_source, count);
}
- public IObservable<TSource> Ω(TimeSpan duration)
+ public IObservable<TSource> Omega(TimeSpan duration)
{
//
// Minimum semantics:
@@ -69,7 +69,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new TakeImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -115,11 +115,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class TakeImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Take<TSource> _parent;
- public τ(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TakeImpl(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs
index 98bf40e..441a502 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLast.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeLast<TSource> : Producer<TSource>
{
@@ -41,7 +41,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new TakeLastImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -131,12 +131,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class TakeLastImpl : Sink<TSource>, IObserver<TSource>
{
private readonly TakeLast<TSource> _parent;
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
- public τ(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TakeLastImpl(TakeLast<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs
index 71b36b0..2a15825 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeLastBuffer.cs
@@ -7,7 +7,7 @@ using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeLastBuffer<TSource> : Producer<IList<TSource>>
{
@@ -39,7 +39,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new Impl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -82,12 +82,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<IList<TSource>>, IObserver<TSource>
+ class Impl : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly TakeLastBuffer<TSource> _parent;
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
- public τ(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
+ public Impl(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs
index eccc222..62b450a 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeUntil.cs
@@ -6,7 +6,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeUntil<TSource, TOther> : Producer<TSource>
{
@@ -173,7 +173,7 @@ namespace System.Reactive.Linq.Observαble
_scheduler = scheduler;
}
- public IObservable<TSource> Ω(DateTimeOffset endTime)
+ public IObservable<TSource> Omega(DateTimeOffset endTime)
{
//
// Minimum semantics:
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs
index 30f63ca..dbf6117 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TakeWhile.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TakeWhile<TSource> : Producer<TSource>
{
@@ -33,7 +33,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new TakeWhileImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -91,13 +91,13 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class TakeWhileImpl : Sink<TSource>, IObserver<TSource>
{
private readonly TakeWhile<TSource> _parent;
private bool _running;
private int _index;
- public τ(TakeWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TakeWhileImpl(TakeWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs
index 7b7ad86..5061114 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throttle.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Throttle<TSource> : Producer<TSource>
{
@@ -192,7 +192,7 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_cancelable.Disposable = d;
- d.Disposable = throttle.SubscribeSafe(new δ(this, value, currentid, d));
+ d.Disposable = throttle.SubscribeSafe(new Delta(this, value, currentid, d));
}
public void OnError(Exception error)
@@ -226,14 +226,14 @@ namespace System.Reactive.Linq.Observαble
}
}
- class δ : IObserver<TThrottle>
+ class Delta : IObserver<TThrottle>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly ulong _currentid;
private readonly IDisposable _self;
- public δ(_ parent, TSource value, ulong currentid, IDisposable self)
+ public Delta(_ parent, TSource value, ulong currentid, IDisposable self)
{
_parent = parent;
_value = value;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs
index 3b10894..9940a9d 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Throw.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Throw<TResult> : Producer<TResult>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs
index dacfa03..4c40863 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/TimeInterval.cs
@@ -7,7 +7,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class TimeInterval<TSource> : Producer<System.Reactive.TimeInterval<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs
index 4896294..94b78a6 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timeout.cs
@@ -5,7 +5,7 @@ using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Timeout<TSource> : Producer<TSource>
{
@@ -35,23 +35,23 @@ namespace System.Reactive.Linq.Observαble
{
if (_dueTimeA.HasValue)
{
- var sink = new α(this, observer, cancel);
+ var sink = new TimeA(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else
{
- var sink = new ρ(this, observer, cancel);
+ var sink = new TimeR(this, observer, cancel);
setSink(sink);
return sink.Run();
}
}
- class α : Sink<TSource>, IObserver<TSource>
+ class TimeA : Sink<TSource>, IObserver<TSource>
{
private readonly Timeout<TSource> _parent;
- public α(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TimeA(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -136,11 +136,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ρ : Sink<TSource>, IObserver<TSource>
+ class TimeR : Sink<TSource>, IObserver<TSource>
{
private readonly Timeout<TSource> _parent;
- public ρ(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public TimeR(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -358,16 +358,16 @@ namespace System.Reactive.Linq.Observαble
var d = new SingleAssignmentDisposable();
_timer.Disposable = d;
- d.Disposable = timeout.SubscribeSafe(new τ(this, myid, d));
+ d.Disposable = timeout.SubscribeSafe(new TimeoutImpl(this, myid, d));
}
- class τ : IObserver<TTimeout>
+ class TimeoutImpl : IObserver<TTimeout>
{
private readonly _ _parent;
private readonly ulong _id;
private readonly IDisposable _self;
- public τ(_ parent, ulong id, IDisposable self)
+ public TimeoutImpl(_ parent, ulong id, IDisposable self)
{
_parent = parent;
_id = id;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs
index 22ee0df..14df757 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timer.cs
@@ -7,7 +7,7 @@ using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Timer : Producer<long>
{
@@ -34,7 +34,7 @@ namespace System.Reactive.Linq.Observαble
{
if (_period.HasValue)
{
- var sink = new π(this, observer, cancel);
+ var sink = new TimerImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -76,12 +76,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<long>
+ class TimerImpl : Sink<long>
{
private readonly Timer _parent;
private readonly TimeSpan _period;
- public π(Timer parent, IObserver<long> observer, IDisposable cancel)
+ public TimerImpl(Timer parent, IObserver<long> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs
index ed54678..19bf1be 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Timestamp.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Concurrency;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Timestamp<TSource> : Producer<Timestamped<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs
index ba039ae..97b9bd4 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToArray.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToArray<TSource> : Producer<TSource[]>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs
index 919c1a1..5cfd9ec 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToDictionary.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToDictionary<TSource, TKey, TElement> : Producer<IDictionary<TKey, TElement>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs
index 34956d4..a265098 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToList.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToList<TSource> : Producer<IList<TSource>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs
index 55ff7d3..d86f5ee 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToLookup.cs
@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToLookup<TSource, TKey, TElement> : Producer<ILookup<TKey, TElement>>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs
index 795d54d..8de8ff7 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/ToObservable.cs
@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class ToObservable<TSource> : Producer<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs
index 996ee0e..680d1f4 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Using.cs
@@ -4,7 +4,7 @@
using System;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Using<TSource, TResource> : Producer<TSource>
where TResource : IDisposable
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs
index a8eaa92..3ab5f73 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Where.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Where<TSource> : Producer<TSource>
{
@@ -23,7 +23,7 @@ namespace System.Reactive.Linq.Observαble
_predicateI = predicate;
}
- public IObservable<TSource> Ω(Func<TSource, bool> predicate)
+ public IObservable<TSource> Omega(Func<TSource, bool> predicate)
{
if (_predicate != null)
return new Where<TSource>(_source, x => _predicate(x) && predicate(x));
@@ -41,7 +41,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new WhereImpl(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
@@ -88,12 +88,12 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<TSource>, IObserver<TSource>
+ class WhereImpl : Sink<TSource>, IObserver<TSource>
{
private readonly Where<TSource> _parent;
private int _index;
- public τ(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+ public WhereImpl(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs
index 178e4c8..2f24adf 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/While.cs
@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class While<TSource> : Producer<TSource>, IConcatenatable<TSource>
{
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs
index 47059c7..4dad064 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Window.cs
@@ -9,7 +9,7 @@ using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
class Window<TSource> : Producer<IObservable<TSource>>
{
@@ -54,7 +54,7 @@ namespace System.Reactive.Linq.Observαble
}
else if (_count > 0)
{
- var sink = new μ(this, observer, cancel);
+ var sink = new BoundedWindowImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -62,13 +62,13 @@ namespace System.Reactive.Linq.Observαble
{
if (_timeSpan == _timeShift)
{
- var sink = new π(this, observer, cancel);
+ var sink = new TimeShiftImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
else
{
- var sink = new τ(this, observer, cancel);
+ var sink = new WindowImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -151,11 +151,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class τ : Sink<IObservable<TSource>>, IObserver<TSource>
+ class WindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource> _parent;
- public τ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public WindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -296,11 +296,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class π : Sink<IObservable<TSource>>, IObserver<TSource>
+ class TimeShiftImpl : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource> _parent;
- public π(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public TimeShiftImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -371,11 +371,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class μ : Sink<IObservable<TSource>>, IObserver<TSource>
+ class BoundedWindowImpl : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource> _parent;
- public μ(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public BoundedWindowImpl(Window<TSource> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -516,7 +516,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new β(this, observer, cancel);
+ var sink = new Beta(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -578,7 +578,7 @@ namespace System.Reactive.Linq.Observαble
var closingSubscription = new SingleAssignmentDisposable();
_m.Disposable = closingSubscription;
- closingSubscription.Disposable = windowClose.SubscribeSafe(new ω(this, closingSubscription));
+ closingSubscription.Disposable = windowClose.SubscribeSafe(new Omega(this, closingSubscription));
}
private void CloseWindow(IDisposable closingSubscription)
@@ -597,12 +597,12 @@ namespace System.Reactive.Linq.Observαble
_windowGate.Wait(CreateWindowClose);
}
- class ω : IObserver<TWindowClosing>
+ class Omega : IObserver<TWindowClosing>
{
private readonly _ _parent;
private readonly IDisposable _self;
- public ω(_ parent, IDisposable self)
+ public Omega(_ parent, IDisposable self)
{
_parent = parent;
_self = self;
@@ -653,11 +653,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class β : Sink<IObservable<TSource>>, IObserver<TSource>
+ class Beta : Sink<IObservable<TSource>>, IObserver<TSource>
{
private readonly Window<TSource, TWindowClosing> _parent;
- public β(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
+ public Beta(Window<TSource, TWindowClosing> parent, IObserver<IObservable<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
@@ -680,16 +680,16 @@ namespace System.Reactive.Linq.Observαble
base._observer.OnNext(window);
d.Add(_parent._source.SubscribeSafe(this));
- d.Add(_parent._windowBoundaries.SubscribeSafe(new ω(this)));
+ d.Add(_parent._windowBoundaries.SubscribeSafe(new Omega(this)));
return _refCountDisposable;
}
- class ω : IObserver<TWindowClosing>
+ class Omega : IObserver<TWindowClosing>
{
- private readonly β _parent;
+ private readonly Beta _parent;
- public ω(β parent)
+ public Omega(Beta parent)
{
_parent = parent;
}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs
index bde6e29..77434c7 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Zip.cs
@@ -7,7 +7,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
#region Binary
@@ -42,7 +42,7 @@ namespace System.Reactive.Linq.Observαble
}
else
{
- var sink = new ε(this, observer, cancel);
+ var sink = new ZipImpl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
@@ -258,11 +258,11 @@ namespace System.Reactive.Linq.Observαble
}
}
- class ε : Sink<TResult>, IObserver<TFirst>
+ class ZipImpl : Sink<TResult>, IObserver<TFirst>
{
private readonly Zip<TFirst, TSecond, TResult> _parent;
- public ε(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+ public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs
index d4b0dbc..4489003 100644
--- a/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/_.cs
@@ -3,7 +3,7 @@
#if !NO_PERF
using System;
-namespace System.Reactive.Linq.Observαble
+namespace System.Reactive.Linq.ObservableImpl
{
}
#endif