Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Internal')
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/AnonymousEnumerable.cs26
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/BinaryObserver.cs36
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ConcatSink.cs29
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Constants.cs18
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Either.Generic.cs115
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/HashSet.cs45
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs45
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IConcatenatable.cs12
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IEvaluatableObservable.cs9
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ImmutableList.cs51
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Lookup.cs83
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Observers.cs109
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Producer.cs51
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/PushPullAdapter.cs79
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/QueryServices.cs35
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs152
-rw-r--r--Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs187
17 files changed, 1082 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/AnonymousEnumerable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/AnonymousEnumerable.cs
new file mode 100644
index 0000000..b14b89b
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/AnonymousEnumerable.cs
@@ -0,0 +1,26 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+
+namespace System.Reactive
+{
+ internal sealed class AnonymousEnumerable<T> : IEnumerable<T>
+ {
+ private readonly Func<IEnumerator<T>> getEnumerator;
+
+ public AnonymousEnumerable(Func<IEnumerator<T>> getEnumerator)
+ {
+ this.getEnumerator = getEnumerator;
+ }
+
+ public IEnumerator<T> GetEnumerator()
+ {
+ return getEnumerator();
+ }
+
+ System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
+ {
+ return this.GetEnumerator();
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/BinaryObserver.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/BinaryObserver.cs
new file mode 100644
index 0000000..870b6a9
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/BinaryObserver.cs
@@ -0,0 +1,36 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+#if NO_PERF
+ class BinaryObserver<TLeft, TRight> : IObserver<Either<Notification<TLeft>, Notification<TRight>>>
+ {
+ public BinaryObserver(IObserver<TLeft> leftObserver, IObserver<TRight> rightObserver)
+ {
+ LeftObserver = leftObserver;
+ RightObserver = rightObserver;
+ }
+
+ public BinaryObserver(Action<Notification<TLeft>> left, Action<Notification<TRight>> right)
+ : this(left.ToObserver(), right.ToObserver())
+ {
+ }
+
+ public IObserver<TLeft> LeftObserver { get; private set; }
+ public IObserver<TRight> RightObserver { get; private set; }
+
+ void IObserver<Either<Notification<TLeft>, Notification<TRight>>>.OnNext(Either<Notification<TLeft>, Notification<TRight>> value)
+ {
+ value.Switch(left => left.Accept(LeftObserver), right => right.Accept(RightObserver));
+ }
+
+ void IObserver<Either<Notification<TLeft>, Notification<TRight>>>.OnError(Exception exception)
+ {
+ }
+
+ void IObserver<Either<Notification<TLeft>, Notification<TRight>>>.OnCompleted()
+ {
+ }
+ }
+#endif
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ConcatSink.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ConcatSink.cs
new file mode 100644
index 0000000..5f4a12f
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ConcatSink.cs
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+
+namespace System.Reactive
+{
+ abstract class ConcatSink<TSource> : TailRecursiveSink<TSource>
+ {
+ public ConcatSink(IObserver<TSource> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ }
+
+ protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
+ {
+ var concat = source as IConcatenatable<TSource>;
+ if (concat != null)
+ return concat.GetSources();
+
+ return null;
+ }
+
+ public override void OnCompleted()
+ {
+ _recurse();
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Constants.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Constants.cs
new file mode 100644
index 0000000..3e6759b
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Constants.cs
@@ -0,0 +1,18 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+//
+// NOTE: Identical copies of this file are kept in System.Reactive.Linq and System.Reactive.Providers.
+//
+
+namespace System.Reactive
+{
+ // We can't make those based on the Strings_*.resx file, because the ObsoleteAttribute needs a compile-time constant.
+
+ class Constants_Linq
+ {
+#if PREFER_ASYNC
+ public const string USE_ASYNC = "This blocking operation is no longer supported. Instead, use the async version in combination with C# and Visual Basic async/await support. In case you need a blocking operation, use Wait or convert the resulting observable sequence to a Task object and block. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information.";
+ public const string USE_TASK_FROMASYNCPATTERN = "This conversion is no longer supported. Replace use of the Begin/End asynchronous method pair with a new Task-based async method, and convert the result using ToObservable. If no Task-based async method is available, use Task.Factory.FromAsync to obtain a Task object. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information.";
+#endif
+ }
+} \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Either.Generic.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Either.Generic.cs
new file mode 100644
index 0000000..6bb8372
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Either.Generic.cs
@@ -0,0 +1,115 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace System.Reactive
+{
+ abstract class Either<TLeft, TRight>
+ {
+ Either()
+ {
+ }
+
+ public static Either<TLeft, TRight> CreateLeft(TLeft value)
+ {
+ return new Either<TLeft, TRight>.Left(value);
+ }
+
+ public static Either<TLeft, TRight> CreateRight(TRight value)
+ {
+ return new Either<TLeft, TRight>.Right(value);
+ }
+
+ public abstract TResult Switch<TResult>(Func<TLeft, TResult> caseLeft, Func<TRight, TResult> caseRight);
+ public abstract void Switch(Action<TLeft> caseLeft, Action<TRight> caseRight);
+
+ public sealed class Left : Either<TLeft, TRight>, IEquatable<Left>
+ {
+ public TLeft Value { get; private set; }
+
+ public Left(TLeft value)
+ {
+ Value = value;
+ }
+
+ public override TResult Switch<TResult>(Func<TLeft, TResult> caseLeft, Func<TRight, TResult> caseRight)
+ {
+ return caseLeft(Value);
+ }
+
+ public override void Switch(Action<TLeft> caseLeft, Action<TRight> caseRight)
+ {
+ caseLeft(Value);
+ }
+
+ public bool Equals(Left other)
+ {
+ if (other == this)
+ return true;
+ if (other == null)
+ return false;
+ return EqualityComparer<TLeft>.Default.Equals(Value, other.Value);
+ }
+
+ public override bool Equals(object obj)
+ {
+ return Equals(obj as Left);
+ }
+
+ public override int GetHashCode()
+ {
+ return EqualityComparer<TLeft>.Default.GetHashCode(Value);
+ }
+
+ public override string ToString()
+ {
+ return string.Format(CultureInfo.CurrentCulture, "Left({0})", Value);
+ }
+ }
+
+ public sealed class Right : Either<TLeft, TRight>, IEquatable<Right>
+ {
+ public TRight Value { get; private set; }
+
+ public Right(TRight value)
+ {
+ Value = value;
+ }
+
+ public override TResult Switch<TResult>(Func<TLeft, TResult> caseLeft, Func<TRight, TResult> caseRight)
+ {
+ return caseRight(Value);
+ }
+
+ public override void Switch(Action<TLeft> caseLeft, Action<TRight> caseRight)
+ {
+ caseRight(Value);
+ }
+
+ public bool Equals(Right other)
+ {
+ if (other == this)
+ return true;
+ if (other == null)
+ return false;
+ return EqualityComparer<TRight>.Default.Equals(Value, other.Value);
+ }
+
+ public override bool Equals(object obj)
+ {
+ return Equals(obj as Right);
+ }
+
+ public override int GetHashCode()
+ {
+ return EqualityComparer<TRight>.Default.GetHashCode(Value);
+ }
+
+ public override string ToString()
+ {
+ return string.Format(CultureInfo.CurrentCulture, "Right({0})", Value);
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/HashSet.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/HashSet.cs
new file mode 100644
index 0000000..f1fce24
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/HashSet.cs
@@ -0,0 +1,45 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if NO_HASHSET
+using System;
+using System.Collections.Generic;
+
+namespace System.Reactive
+{
+ class HashSet<T>
+ {
+ private readonly Dictionary<T, object> _set;
+ private bool _hasNull;
+
+ public HashSet(IEqualityComparer<T> comparer)
+ {
+ _set = new Dictionary<T, object>(comparer);
+ _hasNull = false;
+ }
+
+ public bool Add(T value)
+ {
+ //
+ // Note: The box instruction in the IL will be erased by the JIT in case T is
+ // a value type. See GroupBy for more information.
+ //
+ if (value == null)
+ {
+ if (_hasNull)
+ return false;
+
+ _hasNull = true;
+ return true;
+ }
+ else
+ {
+ if (_set.ContainsKey(value))
+ return false;
+
+ _set[value] = null;
+ return true;
+ }
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs
new file mode 100644
index 0000000..dee33fb
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs
@@ -0,0 +1,45 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System;
+using System.Collections.Generic;
+using System.Reactive.Linq.Observαble;
+
+namespace System.Reactive
+{
+ internal static class Helpers
+ {
+ public static int? GetLength<T>(IEnumerable<T> source)
+ {
+ var array = source as T[];
+ if (array != null)
+ return array.Length;
+
+ var list = source as IList<T>;
+ if (list != null)
+ return list.Count;
+
+ return null;
+ }
+
+ public static IObservable<T> Unpack<T>(IObservable<T> source)
+ {
+ var hasOpt = default(bool);
+
+ do
+ {
+ hasOpt = false;
+
+ var eval = source as IEvaluatableObservable<T>;
+ if (eval != null)
+ {
+ source = eval.Eval();
+ hasOpt = true;
+ }
+ } while (hasOpt);
+
+ return source;
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IConcatenatable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IConcatenatable.cs
new file mode 100644
index 0000000..88a89c8
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IConcatenatable.cs
@@ -0,0 +1,12 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+
+namespace System.Reactive
+{
+ interface IConcatenatable<TSource>
+ {
+ IEnumerable<IObservable<TSource>> GetSources();
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IEvaluatableObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IEvaluatableObservable.cs
new file mode 100644
index 0000000..d85c226
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IEvaluatableObservable.cs
@@ -0,0 +1,9 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ interface IEvaluatableObservable<T>
+ {
+ IObservable<T> Eval();
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ImmutableList.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ImmutableList.cs
new file mode 100644
index 0000000..49c79d9
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ImmutableList.cs
@@ -0,0 +1,51 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ class ImmutableList<T>
+ {
+ T[] data;
+
+ public ImmutableList()
+ {
+ data = new T[0];
+ }
+
+ public ImmutableList(T[] data)
+ {
+ this.data = data;
+ }
+
+ public ImmutableList<T> Add(T value)
+ {
+ var newData = new T[data.Length + 1];
+ Array.Copy(data, newData, data.Length);
+ newData[data.Length] = value;
+ return new ImmutableList<T>(newData);
+ }
+
+ public ImmutableList<T> Remove(T value)
+ {
+ var i = IndexOf(value);
+ if (i < 0)
+ return this;
+ var newData = new T[data.Length - 1];
+ Array.Copy(data, 0, newData, 0, i);
+ Array.Copy(data, i + 1, newData, i, data.Length - i - 1);
+ return new ImmutableList<T>(newData);
+ }
+
+ public int IndexOf(T value)
+ {
+ for (var i = 0; i < data.Length; ++i)
+ if (data[i].Equals(value))
+ return i;
+ return -1;
+ }
+
+ public T[] Data
+ {
+ get { return data; }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Lookup.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Lookup.cs
new file mode 100644
index 0000000..239f10d
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Lookup.cs
@@ -0,0 +1,83 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace System.Reactive
+{
+ class Lookup<K, E> : ILookup<K, E>
+ {
+ Dictionary<K, List<E>> d;
+
+ public Lookup(IEqualityComparer<K> comparer)
+ {
+ d = new Dictionary<K, List<E>>(comparer);
+ }
+
+ public void Add(K key, E element)
+ {
+ var list = default(List<E>);
+ if (!d.TryGetValue(key, out list))
+ d[key] = list = new List<E>();
+ list.Add(element);
+ }
+
+ public bool Contains(K key)
+ {
+ return d.ContainsKey(key);
+ }
+
+ public int Count
+ {
+ get { return d.Count; }
+ }
+
+ public IEnumerable<E> this[K key]
+ {
+ get { return Hide(d[key]); }
+ }
+
+ private IEnumerable<E> Hide(List<E> elements)
+ {
+ foreach (var x in elements)
+ yield return x;
+ }
+
+ public IEnumerator<IGrouping<K, E>> GetEnumerator()
+ {
+ foreach (var kv in d)
+ yield return new Grouping(kv);
+ }
+
+ class Grouping : IGrouping<K, E>
+ {
+ KeyValuePair<K, List<E>> kv;
+
+ public Grouping(KeyValuePair<K, List<E>> kv)
+ {
+ this.kv = kv;
+ }
+
+ public K Key
+ {
+ get { return kv.Key; }
+ }
+
+ public IEnumerator<E> GetEnumerator()
+ {
+ return kv.Value.GetEnumerator();
+ }
+
+ Collections.IEnumerator Collections.IEnumerable.GetEnumerator()
+ {
+ return GetEnumerator();
+ }
+ }
+
+ Collections.IEnumerator Collections.IEnumerable.GetEnumerator()
+ {
+ return GetEnumerator();
+ }
+ }
+} \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Observers.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Observers.cs
new file mode 100644
index 0000000..d7a49c1
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Observers.cs
@@ -0,0 +1,109 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+namespace System.Reactive
+{
+ class NopObserver<T> : IObserver<T>
+ {
+ public static readonly IObserver<T> Instance = new NopObserver<T>();
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(T value)
+ {
+ }
+ }
+
+ class DoneObserver<T> : IObserver<T>
+ {
+ public static readonly IObserver<T> Completed = new DoneObserver<T>();
+
+ public Exception Exception { get; set; }
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(T value)
+ {
+ }
+ }
+
+ class DisposedObserver<T> : IObserver<T>
+ {
+ public static readonly IObserver<T> Instance = new DisposedObserver<T>();
+
+ public void OnCompleted()
+ {
+ throw new ObjectDisposedException("");
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new ObjectDisposedException("");
+ }
+
+ public void OnNext(T value)
+ {
+ throw new ObjectDisposedException("");
+ }
+ }
+
+ class Observer<T> : IObserver<T>
+ {
+ private readonly ImmutableList<IObserver<T>> _observers;
+
+ public Observer(ImmutableList<IObserver<T>> observers)
+ {
+ _observers = observers;
+ }
+
+ public void OnCompleted()
+ {
+ foreach (var observer in _observers.Data)
+ observer.OnCompleted();
+ }
+
+ public void OnError(Exception error)
+ {
+ foreach (var observer in _observers.Data)
+ observer.OnError(error);
+ }
+
+ public void OnNext(T value)
+ {
+ foreach (var observer in _observers.Data)
+ observer.OnNext(value);
+ }
+
+ internal IObserver<T> Add(IObserver<T> observer)
+ {
+ return new Observer<T>(_observers.Add(observer));
+ }
+
+ internal IObserver<T> Remove(IObserver<T> observer)
+ {
+ var i = Array.IndexOf(_observers.Data, observer);
+ if (i < 0)
+ return this;
+
+ if (_observers.Data.Length == 2)
+ {
+ return _observers.Data[1 - i];
+ }
+ else
+ {
+ return new Observer<T>(_observers.Remove(observer));
+ }
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Producer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Producer.cs
new file mode 100644
index 0000000..85ed55f
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Producer.cs
@@ -0,0 +1,51 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+
+namespace System.Reactive
+{
+ /// <summary>
+ /// Base class for implementation of query operators, providing performance benefits over the use of <see cref="System.Reactive.Linq.Observable.Create">Observable.Create</see>.
+ /// </summary>
+ /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
+ abstract class Producer<TSource> : IObservable<TSource>
+ {
+ /// <summary>
+ /// Publicly visible Subscribe method.
+ /// </summary>
+ /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
+ /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns>
+ public IDisposable Subscribe(IObserver<TSource> observer)
+ {
+ if (observer == null)
+ throw new ArgumentNullException("observer");
+
+ var sink = new SingleAssignmentDisposable();
+ var subscription = new SingleAssignmentDisposable();
+
+ if (CurrentThreadScheduler.Instance.ScheduleRequired)
+ {
+ CurrentThreadScheduler.Instance.Schedule(this, (_, me) => subscription.Disposable = me.Run(observer, subscription, s => sink.Disposable = s));
+ }
+ else
+ {
+ subscription.Disposable = this.Run(observer, subscription, s => sink.Disposable = s);
+ }
+
+ return new CompositeDisposable(2) { sink, subscription };
+ }
+
+ /// <summary>
+ /// Core implementation of the query operator, called upon a new subscription to the producer object.
+ /// </summary>
+ /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
+ /// <param name="cancel">The subscription disposable object returned from the Run call, passed in such that it can be forwarded to the sink, allowing it to dispose the subscription upon sending a final message (or prematurely for other reasons).</param>
+ /// <param name="setSink">Callback to communicate the sink object to the subscriber, allowing consumers to tunnel a Dispose call into the sink, which can stop the processing.</param>
+ /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns>
+ /// <remarks>The <paramref name="observer">observer</paramref> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks>
+ protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink);
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/PushPullAdapter.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/PushPullAdapter.cs
new file mode 100644
index 0000000..5804538
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/PushPullAdapter.cs
@@ -0,0 +1,79 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if NO_CDS || NO_PERF
+using System.Collections.Generic;
+
+namespace System.Reactive
+{
+ sealed class PushPullAdapter<T, R> : IObserver<T>, IEnumerator<R>
+ {
+ Action<Notification<T>> yield;
+ Action dispose;
+ Func<Notification<R>> moveNext;
+ Notification<R> current;
+ bool done = false;
+ bool disposed;
+
+ public PushPullAdapter(Action<Notification<T>> yield, Func<Notification<R>> moveNext, Action dispose)
+ {
+ this.yield = yield;
+ this.moveNext = moveNext;
+ this.dispose = dispose;
+ }
+
+ public void OnNext(T value)
+ {
+ yield(Notification.CreateOnNext<T>(value));
+ }
+
+ public void OnError(Exception exception)
+ {
+ yield(Notification.CreateOnError<T>(exception));
+ dispose();
+ }
+
+ public void OnCompleted()
+ {
+ yield(Notification.CreateOnCompleted<T>());
+ dispose();
+ }
+
+ public R Current
+ {
+ get { return current.Value; }
+ }
+
+ public void Dispose()
+ {
+ disposed = true;
+ dispose();
+ }
+
+ object System.Collections.IEnumerator.Current
+ {
+ get { return this.Current; }
+ }
+
+ public bool MoveNext()
+ {
+ if (disposed)
+ throw new ObjectDisposedException("");
+
+ if (!done)
+ {
+ current = moveNext();
+ done = current.Kind != NotificationKind.OnNext;
+ }
+
+ current.Exception.ThrowIfNotNull();
+
+ return current.HasValue;
+ }
+
+ public void Reset()
+ {
+ throw new NotSupportedException();
+ }
+ }
+}
+#endif \ No newline at end of file
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/QueryServices.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/QueryServices.cs
new file mode 100644
index 0000000..926f612
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/QueryServices.cs
@@ -0,0 +1,35 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Reactive.PlatformServices;
+
+namespace System.Reactive.Linq
+{
+ internal static class QueryServices
+ {
+ private static Lazy<IQueryServices> s_services = new Lazy<IQueryServices>(Initialize);
+
+ public static T GetQueryImpl<T>(T defaultInstance)
+ {
+ return s_services.Value.Extend(defaultInstance);
+ }
+
+ private static IQueryServices Initialize()
+ {
+ return PlatformEnlightenmentProvider.Current.GetService<IQueryServices>() ?? new DefaultQueryServices();
+ }
+ }
+
+ internal interface IQueryServices
+ {
+ T Extend<T>(T baseImpl);
+ }
+
+ class DefaultQueryServices : IQueryServices
+ {
+ public T Extend<T>(T baseImpl)
+ {
+ return baseImpl;
+ }
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs
new file mode 100644
index 0000000..effdd08
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs
@@ -0,0 +1,152 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Reflection;
+
+#if HAS_WINRT
+using System.Runtime.InteropServices.WindowsRuntime;
+#endif
+
+namespace System.Reactive
+{
+ static class ReflectionUtils
+ {
+ public static TDelegate CreateDelegate<TDelegate>(object o, MethodInfo method)
+ {
+#if CRIPPLED_REFLECTION
+ return (TDelegate)(object)method.CreateDelegate(typeof(TDelegate), o);
+#else
+ return (TDelegate)(object)Delegate.CreateDelegate(typeof(TDelegate), o, method);
+#endif
+ }
+
+ public static Delegate CreateDelegate(Type delegateType, object o, MethodInfo method)
+ {
+#if CRIPPLED_REFLECTION
+ return method.CreateDelegate(delegateType, o);
+#else
+ return Delegate.CreateDelegate(delegateType, o, method);
+#endif
+ }
+
+ public static void GetEventMethods<TSender, TEventArgs>(Type targetType, object target, string eventName, out MethodInfo addMethod, out MethodInfo removeMethod, out Type delegateType, out bool isWinRT)
+#if !NO_EVENTARGS_CONSTRAINT
+ where TEventArgs : EventArgs
+#endif
+ {
+ var e = default(EventInfo);
+
+ if (target == null)
+ {
+ e = targetType.GetEventEx(eventName, true);
+ if (e == null)
+ throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.COULD_NOT_FIND_STATIC_EVENT, eventName, targetType.FullName));
+ }
+ else
+ {
+ e = targetType.GetEventEx(eventName, false);
+ if (e == null)
+ throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.COULD_NOT_FIND_INSTANCE_EVENT, eventName, targetType.FullName));
+ }
+
+ addMethod = e.GetAddMethod();
+ removeMethod = e.GetRemoveMethod();
+
+ if (addMethod == null)
+ throw new InvalidOperationException(Strings_Linq.EVENT_MISSING_ADD_METHOD);
+ if (removeMethod == null)
+ throw new InvalidOperationException(Strings_Linq.EVENT_MISSING_REMOVE_METHOD);
+
+ var psa = addMethod.GetParameters();
+ if (psa.Length != 1)
+ throw new InvalidOperationException(Strings_Linq.EVENT_ADD_METHOD_SHOULD_TAKE_ONE_PARAMETER);
+
+ var psr = removeMethod.GetParameters();
+ if (psr.Length != 1)
+ throw new InvalidOperationException(Strings_Linq.EVENT_REMOVE_METHOD_SHOULD_TAKE_ONE_PARAMETER);
+
+ isWinRT = false;
+
+#if HAS_WINRT
+ if (addMethod.ReturnType == typeof(EventRegistrationToken))
+ {
+ isWinRT = true;
+
+ var pet = psr[0];
+ if (pet.ParameterType != typeof(EventRegistrationToken))
+ throw new InvalidOperationException(Strings_Linq.EVENT_WINRT_REMOVE_METHOD_SHOULD_TAKE_ERT);
+ }
+#endif
+
+ delegateType = psa[0].ParameterType;
+
+ var invokeMethod = delegateType.GetMethod("Invoke");
+
+ var parameters = invokeMethod.GetParameters();
+
+ if (parameters.Length != 2)
+ throw new InvalidOperationException(Strings_Linq.EVENT_PATTERN_REQUIRES_TWO_PARAMETERS);
+
+ if (!typeof(TSender).IsAssignableFrom(parameters[0].ParameterType))
+ throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.EVENT_SENDER_NOT_ASSIGNABLE, typeof(TSender).FullName));
+
+ if (!typeof(TEventArgs).IsAssignableFrom(parameters[1].ParameterType))
+ throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.EVENT_ARGS_NOT_ASSIGNABLE, typeof(TEventArgs).FullName));
+
+ if (invokeMethod.ReturnType != typeof(void))
+ throw new InvalidOperationException(Strings_Linq.EVENT_MUST_RETURN_VOID);
+ }
+
+ public static EventInfo GetEventEx(this Type type, string name, bool isStatic)
+ {
+#if CRIPPLED_REFLECTION
+ // TODO: replace in the future by System.Reflection.RuntimeExtensions extension methods
+ var q = new Queue<TypeInfo>();
+ q.Enqueue(type.GetTypeInfo());
+
+ while (q.Count > 0)
+ {
+ var t = q.Dequeue();
+
+ var e = t.GetDeclaredEvent(name);
+ if (e != null)
+ return e;
+
+ foreach (var i in t.ImplementedInterfaces)
+ q.Enqueue(i.GetTypeInfo());
+
+ if (t.BaseType != null)
+ q.Enqueue(t.BaseType.GetTypeInfo());
+ }
+
+ return null;
+#else
+ return type.GetEvent(name, isStatic ? BindingFlags.Public | BindingFlags.Static : BindingFlags.Public | BindingFlags.Instance);
+#endif
+ }
+
+#if CRIPPLED_REFLECTION
+ public static MethodInfo GetMethod(this Type type, string name)
+ {
+ return type.GetTypeInfo().GetDeclaredMethod(name);
+ }
+
+ public static MethodInfo GetAddMethod(this EventInfo eventInfo)
+ {
+ return eventInfo.AddMethod;
+ }
+
+ public static MethodInfo GetRemoveMethod(this EventInfo eventInfo)
+ {
+ return eventInfo.RemoveMethod;
+ }
+
+ public static bool IsAssignableFrom(this Type type1, Type type2)
+ {
+ return type1.GetTypeInfo().IsAssignableFrom(type2.GetTypeInfo());
+ }
+#endif
+ }
+}
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs
new file mode 100644
index 0000000..3b974e6
--- /dev/null
+++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs
@@ -0,0 +1,187 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System.Collections.Generic;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+
+namespace System.Reactive
+{
+ abstract class TailRecursiveSink<TSource> : Sink<TSource>, IObserver<TSource>
+ {
+ public TailRecursiveSink(IObserver<TSource> observer, IDisposable cancel)
+ : base(observer, cancel)
+ {
+ }
+
+ private bool _isDisposed;
+ private SerialDisposable _subscription;
+ private AsyncLock _gate;
+ private Stack<IEnumerator<IObservable<TSource>>> _stack;
+ private Stack<int?> _length;
+ protected Action _recurse;
+
+ public IDisposable Run(IEnumerable<IObservable<TSource>> sources)
+ {
+ _isDisposed = false;
+ _subscription = new SerialDisposable();
+ _gate = new AsyncLock();
+ _stack = new Stack<IEnumerator<IObservable<TSource>>>();
+ _length = new Stack<int?>();
+
+ var e = default(IEnumerator<IObservable<TSource>>);
+ if (!TryGetEnumerator(sources, out e))
+ return Disposable.Empty;
+
+ _stack.Push(e);
+ _length.Push(Helpers.GetLength(sources));
+
+ var cancelable = SchedulerDefaults.TailRecursion.Schedule(self =>
+ {
+ _recurse = self;
+ _gate.Wait(MoveNext);
+ });
+
+ return new CompositeDisposable(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose)));
+ }
+
+ protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);
+
+ private void MoveNext()
+ {
+ var hasNext = false;
+ var next = default(IObservable<TSource>);
+
+ do
+ {
+ if (_stack.Count == 0)
+ break;
+
+ if (_isDisposed)
+ return;
+
+ var e = _stack.Peek();
+ var l = _length.Peek();
+
+ var current = default(IObservable<TSource>);
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = e.Current;
+ }
+ catch (Exception ex)
+ {
+ e.Dispose();
+
+ base._observer.OnError(ex);
+ base.Dispose();
+ return;
+ }
+
+ if (!hasNext)
+ {
+ e.Dispose();
+
+ _stack.Pop();
+ _length.Pop();
+ }
+ else
+ {
+ var r = l - 1;
+ _length.Pop();
+ _length.Push(r);
+
+ try
+ {
+ next = Helpers.Unpack(current);
+ }
+ catch (Exception exception)
+ {
+ e.Dispose();
+ base._observer.OnError(exception);
+ base.Dispose();
+ return;
+ }
+
+ //
+ // Tail recursive case; drop the current frame.
+ //
+ if (r == 0)
+ {
+ e.Dispose();
+ _stack.Pop();
+ _length.Pop();
+ }
+
+ //
+ // Flattening of nested sequences. Prevents stack overflow in observers.
+ //
+ var nextSeq = Extract(next);
+ if (nextSeq != null)
+ {
+ var nextEnumerator = default(IEnumerator<IObservable<TSource>>);
+ if (!TryGetEnumerator(nextSeq, out nextEnumerator))
+ return;
+
+ _stack.Push(nextEnumerator);
+ _length.Push(Helpers.GetLength(nextSeq));
+
+ hasNext = false;
+ }
+ }
+ } while (!hasNext);
+
+ if (!hasNext)
+ {
+ Done();
+ return;
+ }
+
+ var d = new SingleAssignmentDisposable();
+ _subscription.Disposable = d;
+ d.Disposable = next.SubscribeSafe(this);
+ }
+
+ private new void Dispose()
+ {
+ while (_stack.Count > 0)
+ {
+ var e = _stack.Pop();
+ _length.Pop();
+
+ e.Dispose();
+ }
+
+ _isDisposed = true;
+ }
+
+ private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, out IEnumerator<IObservable<TSource>> result)
+ {
+ try
+ {
+ result = sources.GetEnumerator();
+ return true;
+ }
+ catch (Exception exception)
+ {
+ base._observer.OnError(exception);
+ base.Dispose();
+
+ result = null;
+ return false;
+ }
+ }
+
+ public abstract void OnCompleted();
+ public abstract void OnError(Exception error);
+ public abstract void OnNext(TSource value);
+
+ protected virtual void Done()
+ {
+ base._observer.OnCompleted();
+ base.Dispose();
+ }
+ }
+}
+#endif \ No newline at end of file