diff options
Diffstat (limited to 'Rx/NET/Source/System.Reactive.Linq/Reactive/Internal')
17 files changed, 1082 insertions, 0 deletions
diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/AnonymousEnumerable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/AnonymousEnumerable.cs new file mode 100644 index 0000000..b14b89b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/AnonymousEnumerable.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; + +namespace System.Reactive +{ + internal sealed class AnonymousEnumerable<T> : IEnumerable<T> + { + private readonly Func<IEnumerator<T>> getEnumerator; + + public AnonymousEnumerable(Func<IEnumerator<T>> getEnumerator) + { + this.getEnumerator = getEnumerator; + } + + public IEnumerator<T> GetEnumerator() + { + return getEnumerator(); + } + + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() + { + return this.GetEnumerator(); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/BinaryObserver.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/BinaryObserver.cs new file mode 100644 index 0000000..870b6a9 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/BinaryObserver.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ +#if NO_PERF + class BinaryObserver<TLeft, TRight> : IObserver<Either<Notification<TLeft>, Notification<TRight>>> + { + public BinaryObserver(IObserver<TLeft> leftObserver, IObserver<TRight> rightObserver) + { + LeftObserver = leftObserver; + RightObserver = rightObserver; + } + + public BinaryObserver(Action<Notification<TLeft>> left, Action<Notification<TRight>> right) + : this(left.ToObserver(), right.ToObserver()) + { + } + + public IObserver<TLeft> LeftObserver { get; private set; } + public IObserver<TRight> RightObserver { get; private set; } + + void IObserver<Either<Notification<TLeft>, Notification<TRight>>>.OnNext(Either<Notification<TLeft>, Notification<TRight>> value) + { + value.Switch(left => left.Accept(LeftObserver), right => right.Accept(RightObserver)); + } + + void IObserver<Either<Notification<TLeft>, Notification<TRight>>>.OnError(Exception exception) + { + } + + void IObserver<Either<Notification<TLeft>, Notification<TRight>>>.OnCompleted() + { + } + } +#endif +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ConcatSink.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ConcatSink.cs new file mode 100644 index 0000000..5f4a12f --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ConcatSink.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; + +namespace System.Reactive +{ + abstract class ConcatSink<TSource> : TailRecursiveSink<TSource> + { + public ConcatSink(IObserver<TSource> observer, IDisposable cancel) + : base(observer, cancel) + { + } + + protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source) + { + var concat = source as IConcatenatable<TSource>; + if (concat != null) + return concat.GetSources(); + + return null; + } + + public override void OnCompleted() + { + _recurse(); + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Constants.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Constants.cs new file mode 100644 index 0000000..3e6759b --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Constants.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +// +// NOTE: Identical copies of this file are kept in System.Reactive.Linq and System.Reactive.Providers. +// + +namespace System.Reactive +{ + // We can't make those based on the Strings_*.resx file, because the ObsoleteAttribute needs a compile-time constant. + + class Constants_Linq + { +#if PREFER_ASYNC + public const string USE_ASYNC = "This blocking operation is no longer supported. Instead, use the async version in combination with C# and Visual Basic async/await support. In case you need a blocking operation, use Wait or convert the resulting observable sequence to a Task object and block. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; + public const string USE_TASK_FROMASYNCPATTERN = "This conversion is no longer supported. Replace use of the Begin/End asynchronous method pair with a new Task-based async method, and convert the result using ToObservable. If no Task-based async method is available, use Task.Factory.FromAsync to obtain a Task object. See http://go.microsoft.com/fwlink/?LinkID=260866 for more information."; +#endif + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Either.Generic.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Either.Generic.cs new file mode 100644 index 0000000..6bb8372 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Either.Generic.cs @@ -0,0 +1,115 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Globalization; + +namespace System.Reactive +{ + abstract class Either<TLeft, TRight> + { + Either() + { + } + + public static Either<TLeft, TRight> CreateLeft(TLeft value) + { + return new Either<TLeft, TRight>.Left(value); + } + + public static Either<TLeft, TRight> CreateRight(TRight value) + { + return new Either<TLeft, TRight>.Right(value); + } + + public abstract TResult Switch<TResult>(Func<TLeft, TResult> caseLeft, Func<TRight, TResult> caseRight); + public abstract void Switch(Action<TLeft> caseLeft, Action<TRight> caseRight); + + public sealed class Left : Either<TLeft, TRight>, IEquatable<Left> + { + public TLeft Value { get; private set; } + + public Left(TLeft value) + { + Value = value; + } + + public override TResult Switch<TResult>(Func<TLeft, TResult> caseLeft, Func<TRight, TResult> caseRight) + { + return caseLeft(Value); + } + + public override void Switch(Action<TLeft> caseLeft, Action<TRight> caseRight) + { + caseLeft(Value); + } + + public bool Equals(Left other) + { + if (other == this) + return true; + if (other == null) + return false; + return EqualityComparer<TLeft>.Default.Equals(Value, other.Value); + } + + public override bool Equals(object obj) + { + return Equals(obj as Left); + } + + public override int GetHashCode() + { + return EqualityComparer<TLeft>.Default.GetHashCode(Value); + } + + public override string ToString() + { + return string.Format(CultureInfo.CurrentCulture, "Left({0})", Value); + } + } + + public sealed class Right : Either<TLeft, TRight>, IEquatable<Right> + { + public TRight Value { get; private set; } + + public Right(TRight value) + { + Value = value; + } + + public override TResult Switch<TResult>(Func<TLeft, TResult> caseLeft, Func<TRight, TResult> caseRight) + { + return caseRight(Value); + } + + public override void Switch(Action<TLeft> caseLeft, Action<TRight> caseRight) + { + caseRight(Value); + } + + public bool Equals(Right other) + { + if (other == this) + return true; + if (other == null) + return false; + return EqualityComparer<TRight>.Default.Equals(Value, other.Value); + } + + public override bool Equals(object obj) + { + return Equals(obj as Right); + } + + public override int GetHashCode() + { + return EqualityComparer<TRight>.Default.GetHashCode(Value); + } + + public override string ToString() + { + return string.Format(CultureInfo.CurrentCulture, "Right({0})", Value); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/HashSet.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/HashSet.cs new file mode 100644 index 0000000..f1fce24 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/HashSet.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_HASHSET +using System; +using System.Collections.Generic; + +namespace System.Reactive +{ + class HashSet<T> + { + private readonly Dictionary<T, object> _set; + private bool _hasNull; + + public HashSet(IEqualityComparer<T> comparer) + { + _set = new Dictionary<T, object>(comparer); + _hasNull = false; + } + + public bool Add(T value) + { + // + // Note: The box instruction in the IL will be erased by the JIT in case T is + // a value type. See GroupBy for more information. + // + if (value == null) + { + if (_hasNull) + return false; + + _hasNull = true; + return true; + } + else + { + if (_set.ContainsKey(value)) + return false; + + _set[value] = null; + return true; + } + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs new file mode 100644 index 0000000..dee33fb --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Helpers.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System; +using System.Collections.Generic; +using System.Reactive.Linq.Observαble; + +namespace System.Reactive +{ + internal static class Helpers + { + public static int? GetLength<T>(IEnumerable<T> source) + { + var array = source as T[]; + if (array != null) + return array.Length; + + var list = source as IList<T>; + if (list != null) + return list.Count; + + return null; + } + + public static IObservable<T> Unpack<T>(IObservable<T> source) + { + var hasOpt = default(bool); + + do + { + hasOpt = false; + + var eval = source as IEvaluatableObservable<T>; + if (eval != null) + { + source = eval.Eval(); + hasOpt = true; + } + } while (hasOpt); + + return source; + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IConcatenatable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IConcatenatable.cs new file mode 100644 index 0000000..88a89c8 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IConcatenatable.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; + +namespace System.Reactive +{ + interface IConcatenatable<TSource> + { + IEnumerable<IObservable<TSource>> GetSources(); + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IEvaluatableObservable.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IEvaluatableObservable.cs new file mode 100644 index 0000000..d85c226 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/IEvaluatableObservable.cs @@ -0,0 +1,9 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + interface IEvaluatableObservable<T> + { + IObservable<T> Eval(); + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ImmutableList.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ImmutableList.cs new file mode 100644 index 0000000..49c79d9 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ImmutableList.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + class ImmutableList<T> + { + T[] data; + + public ImmutableList() + { + data = new T[0]; + } + + public ImmutableList(T[] data) + { + this.data = data; + } + + public ImmutableList<T> Add(T value) + { + var newData = new T[data.Length + 1]; + Array.Copy(data, newData, data.Length); + newData[data.Length] = value; + return new ImmutableList<T>(newData); + } + + public ImmutableList<T> Remove(T value) + { + var i = IndexOf(value); + if (i < 0) + return this; + var newData = new T[data.Length - 1]; + Array.Copy(data, 0, newData, 0, i); + Array.Copy(data, i + 1, newData, i, data.Length - i - 1); + return new ImmutableList<T>(newData); + } + + public int IndexOf(T value) + { + for (var i = 0; i < data.Length; ++i) + if (data[i].Equals(value)) + return i; + return -1; + } + + public T[] Data + { + get { return data; } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Lookup.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Lookup.cs new file mode 100644 index 0000000..239f10d --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Lookup.cs @@ -0,0 +1,83 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; + +namespace System.Reactive +{ + class Lookup<K, E> : ILookup<K, E> + { + Dictionary<K, List<E>> d; + + public Lookup(IEqualityComparer<K> comparer) + { + d = new Dictionary<K, List<E>>(comparer); + } + + public void Add(K key, E element) + { + var list = default(List<E>); + if (!d.TryGetValue(key, out list)) + d[key] = list = new List<E>(); + list.Add(element); + } + + public bool Contains(K key) + { + return d.ContainsKey(key); + } + + public int Count + { + get { return d.Count; } + } + + public IEnumerable<E> this[K key] + { + get { return Hide(d[key]); } + } + + private IEnumerable<E> Hide(List<E> elements) + { + foreach (var x in elements) + yield return x; + } + + public IEnumerator<IGrouping<K, E>> GetEnumerator() + { + foreach (var kv in d) + yield return new Grouping(kv); + } + + class Grouping : IGrouping<K, E> + { + KeyValuePair<K, List<E>> kv; + + public Grouping(KeyValuePair<K, List<E>> kv) + { + this.kv = kv; + } + + public K Key + { + get { return kv.Key; } + } + + public IEnumerator<E> GetEnumerator() + { + return kv.Value.GetEnumerator(); + } + + Collections.IEnumerator Collections.IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + } + + Collections.IEnumerator Collections.IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + } +}
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Observers.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Observers.cs new file mode 100644 index 0000000..d7a49c1 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Observers.cs @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +namespace System.Reactive +{ + class NopObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Instance = new NopObserver<T>(); + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + } + + class DoneObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Completed = new DoneObserver<T>(); + + public Exception Exception { get; set; } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + } + + class DisposedObserver<T> : IObserver<T> + { + public static readonly IObserver<T> Instance = new DisposedObserver<T>(); + + public void OnCompleted() + { + throw new ObjectDisposedException(""); + } + + public void OnError(Exception error) + { + throw new ObjectDisposedException(""); + } + + public void OnNext(T value) + { + throw new ObjectDisposedException(""); + } + } + + class Observer<T> : IObserver<T> + { + private readonly ImmutableList<IObserver<T>> _observers; + + public Observer(ImmutableList<IObserver<T>> observers) + { + _observers = observers; + } + + public void OnCompleted() + { + foreach (var observer in _observers.Data) + observer.OnCompleted(); + } + + public void OnError(Exception error) + { + foreach (var observer in _observers.Data) + observer.OnError(error); + } + + public void OnNext(T value) + { + foreach (var observer in _observers.Data) + observer.OnNext(value); + } + + internal IObserver<T> Add(IObserver<T> observer) + { + return new Observer<T>(_observers.Add(observer)); + } + + internal IObserver<T> Remove(IObserver<T> observer) + { + var i = Array.IndexOf(_observers.Data, observer); + if (i < 0) + return this; + + if (_observers.Data.Length == 2) + { + return _observers.Data[1 - i]; + } + else + { + return new Observer<T>(_observers.Remove(observer)); + } + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Producer.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Producer.cs new file mode 100644 index 0000000..85ed55f --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/Producer.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System.Reactive.Concurrency; +using System.Reactive.Disposables; + +namespace System.Reactive +{ + /// <summary> + /// Base class for implementation of query operators, providing performance benefits over the use of <see cref="System.Reactive.Linq.Observable.Create">Observable.Create</see>. + /// </summary> + /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam> + abstract class Producer<TSource> : IObservable<TSource> + { + /// <summary> + /// Publicly visible Subscribe method. + /// </summary> + /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param> + /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns> + public IDisposable Subscribe(IObserver<TSource> observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + var sink = new SingleAssignmentDisposable(); + var subscription = new SingleAssignmentDisposable(); + + if (CurrentThreadScheduler.Instance.ScheduleRequired) + { + CurrentThreadScheduler.Instance.Schedule(this, (_, me) => subscription.Disposable = me.Run(observer, subscription, s => sink.Disposable = s)); + } + else + { + subscription.Disposable = this.Run(observer, subscription, s => sink.Disposable = s); + } + + return new CompositeDisposable(2) { sink, subscription }; + } + + /// <summary> + /// Core implementation of the query operator, called upon a new subscription to the producer object. + /// </summary> + /// <param name="observer">Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param> + /// <param name="cancel">The subscription disposable object returned from the Run call, passed in such that it can be forwarded to the sink, allowing it to dispose the subscription upon sending a final message (or prematurely for other reasons).</param> + /// <param name="setSink">Callback to communicate the sink object to the subscriber, allowing consumers to tunnel a Dispose call into the sink, which can stop the processing.</param> + /// <returns>Disposable representing all the resources and/or subscriptions the operator uses to process events.</returns> + /// <remarks>The <paramref name="observer">observer</paramref> passed in to this method is not protected using auto-detach behavior upon an OnError or OnCompleted call. The implementation must ensure proper resource disposal and enforce the message grammar.</remarks> + protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink); + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/PushPullAdapter.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/PushPullAdapter.cs new file mode 100644 index 0000000..5804538 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/PushPullAdapter.cs @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if NO_CDS || NO_PERF +using System.Collections.Generic; + +namespace System.Reactive +{ + sealed class PushPullAdapter<T, R> : IObserver<T>, IEnumerator<R> + { + Action<Notification<T>> yield; + Action dispose; + Func<Notification<R>> moveNext; + Notification<R> current; + bool done = false; + bool disposed; + + public PushPullAdapter(Action<Notification<T>> yield, Func<Notification<R>> moveNext, Action dispose) + { + this.yield = yield; + this.moveNext = moveNext; + this.dispose = dispose; + } + + public void OnNext(T value) + { + yield(Notification.CreateOnNext<T>(value)); + } + + public void OnError(Exception exception) + { + yield(Notification.CreateOnError<T>(exception)); + dispose(); + } + + public void OnCompleted() + { + yield(Notification.CreateOnCompleted<T>()); + dispose(); + } + + public R Current + { + get { return current.Value; } + } + + public void Dispose() + { + disposed = true; + dispose(); + } + + object System.Collections.IEnumerator.Current + { + get { return this.Current; } + } + + public bool MoveNext() + { + if (disposed) + throw new ObjectDisposedException(""); + + if (!done) + { + current = moveNext(); + done = current.Kind != NotificationKind.OnNext; + } + + current.Exception.ThrowIfNotNull(); + + return current.HasValue; + } + + public void Reset() + { + throw new NotSupportedException(); + } + } +} +#endif
\ No newline at end of file diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/QueryServices.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/QueryServices.cs new file mode 100644 index 0000000..926f612 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/QueryServices.cs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Reactive.PlatformServices; + +namespace System.Reactive.Linq +{ + internal static class QueryServices + { + private static Lazy<IQueryServices> s_services = new Lazy<IQueryServices>(Initialize); + + public static T GetQueryImpl<T>(T defaultInstance) + { + return s_services.Value.Extend(defaultInstance); + } + + private static IQueryServices Initialize() + { + return PlatformEnlightenmentProvider.Current.GetService<IQueryServices>() ?? new DefaultQueryServices(); + } + } + + internal interface IQueryServices + { + T Extend<T>(T baseImpl); + } + + class DefaultQueryServices : IQueryServices + { + public T Extend<T>(T baseImpl) + { + return baseImpl; + } + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs new file mode 100644 index 0000000..effdd08 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/ReflectionUtils.cs @@ -0,0 +1,152 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Reflection; + +#if HAS_WINRT +using System.Runtime.InteropServices.WindowsRuntime; +#endif + +namespace System.Reactive +{ + static class ReflectionUtils + { + public static TDelegate CreateDelegate<TDelegate>(object o, MethodInfo method) + { +#if CRIPPLED_REFLECTION + return (TDelegate)(object)method.CreateDelegate(typeof(TDelegate), o); +#else + return (TDelegate)(object)Delegate.CreateDelegate(typeof(TDelegate), o, method); +#endif + } + + public static Delegate CreateDelegate(Type delegateType, object o, MethodInfo method) + { +#if CRIPPLED_REFLECTION + return method.CreateDelegate(delegateType, o); +#else + return Delegate.CreateDelegate(delegateType, o, method); +#endif + } + + public static void GetEventMethods<TSender, TEventArgs>(Type targetType, object target, string eventName, out MethodInfo addMethod, out MethodInfo removeMethod, out Type delegateType, out bool isWinRT) +#if !NO_EVENTARGS_CONSTRAINT + where TEventArgs : EventArgs +#endif + { + var e = default(EventInfo); + + if (target == null) + { + e = targetType.GetEventEx(eventName, true); + if (e == null) + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.COULD_NOT_FIND_STATIC_EVENT, eventName, targetType.FullName)); + } + else + { + e = targetType.GetEventEx(eventName, false); + if (e == null) + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.COULD_NOT_FIND_INSTANCE_EVENT, eventName, targetType.FullName)); + } + + addMethod = e.GetAddMethod(); + removeMethod = e.GetRemoveMethod(); + + if (addMethod == null) + throw new InvalidOperationException(Strings_Linq.EVENT_MISSING_ADD_METHOD); + if (removeMethod == null) + throw new InvalidOperationException(Strings_Linq.EVENT_MISSING_REMOVE_METHOD); + + var psa = addMethod.GetParameters(); + if (psa.Length != 1) + throw new InvalidOperationException(Strings_Linq.EVENT_ADD_METHOD_SHOULD_TAKE_ONE_PARAMETER); + + var psr = removeMethod.GetParameters(); + if (psr.Length != 1) + throw new InvalidOperationException(Strings_Linq.EVENT_REMOVE_METHOD_SHOULD_TAKE_ONE_PARAMETER); + + isWinRT = false; + +#if HAS_WINRT + if (addMethod.ReturnType == typeof(EventRegistrationToken)) + { + isWinRT = true; + + var pet = psr[0]; + if (pet.ParameterType != typeof(EventRegistrationToken)) + throw new InvalidOperationException(Strings_Linq.EVENT_WINRT_REMOVE_METHOD_SHOULD_TAKE_ERT); + } +#endif + + delegateType = psa[0].ParameterType; + + var invokeMethod = delegateType.GetMethod("Invoke"); + + var parameters = invokeMethod.GetParameters(); + + if (parameters.Length != 2) + throw new InvalidOperationException(Strings_Linq.EVENT_PATTERN_REQUIRES_TWO_PARAMETERS); + + if (!typeof(TSender).IsAssignableFrom(parameters[0].ParameterType)) + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.EVENT_SENDER_NOT_ASSIGNABLE, typeof(TSender).FullName)); + + if (!typeof(TEventArgs).IsAssignableFrom(parameters[1].ParameterType)) + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.EVENT_ARGS_NOT_ASSIGNABLE, typeof(TEventArgs).FullName)); + + if (invokeMethod.ReturnType != typeof(void)) + throw new InvalidOperationException(Strings_Linq.EVENT_MUST_RETURN_VOID); + } + + public static EventInfo GetEventEx(this Type type, string name, bool isStatic) + { +#if CRIPPLED_REFLECTION + // TODO: replace in the future by System.Reflection.RuntimeExtensions extension methods + var q = new Queue<TypeInfo>(); + q.Enqueue(type.GetTypeInfo()); + + while (q.Count > 0) + { + var t = q.Dequeue(); + + var e = t.GetDeclaredEvent(name); + if (e != null) + return e; + + foreach (var i in t.ImplementedInterfaces) + q.Enqueue(i.GetTypeInfo()); + + if (t.BaseType != null) + q.Enqueue(t.BaseType.GetTypeInfo()); + } + + return null; +#else + return type.GetEvent(name, isStatic ? BindingFlags.Public | BindingFlags.Static : BindingFlags.Public | BindingFlags.Instance); +#endif + } + +#if CRIPPLED_REFLECTION + public static MethodInfo GetMethod(this Type type, string name) + { + return type.GetTypeInfo().GetDeclaredMethod(name); + } + + public static MethodInfo GetAddMethod(this EventInfo eventInfo) + { + return eventInfo.AddMethod; + } + + public static MethodInfo GetRemoveMethod(this EventInfo eventInfo) + { + return eventInfo.RemoveMethod; + } + + public static bool IsAssignableFrom(this Type type1, Type type2) + { + return type1.GetTypeInfo().IsAssignableFrom(type2.GetTypeInfo()); + } +#endif + } +} diff --git a/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs new file mode 100644 index 0000000..3b974e6 --- /dev/null +++ b/Rx/NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs @@ -0,0 +1,187 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#if !NO_PERF +using System.Collections.Generic; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; + +namespace System.Reactive +{ + abstract class TailRecursiveSink<TSource> : Sink<TSource>, IObserver<TSource> + { + public TailRecursiveSink(IObserver<TSource> observer, IDisposable cancel) + : base(observer, cancel) + { + } + + private bool _isDisposed; + private SerialDisposable _subscription; + private AsyncLock _gate; + private Stack<IEnumerator<IObservable<TSource>>> _stack; + private Stack<int?> _length; + protected Action _recurse; + + public IDisposable Run(IEnumerable<IObservable<TSource>> sources) + { + _isDisposed = false; + _subscription = new SerialDisposable(); + _gate = new AsyncLock(); + _stack = new Stack<IEnumerator<IObservable<TSource>>>(); + _length = new Stack<int?>(); + + var e = default(IEnumerator<IObservable<TSource>>); + if (!TryGetEnumerator(sources, out e)) + return Disposable.Empty; + + _stack.Push(e); + _length.Push(Helpers.GetLength(sources)); + + var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => + { + _recurse = self; + _gate.Wait(MoveNext); + }); + + return new CompositeDisposable(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose))); + } + + protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source); + + private void MoveNext() + { + var hasNext = false; + var next = default(IObservable<TSource>); + + do + { + if (_stack.Count == 0) + break; + + if (_isDisposed) + return; + + var e = _stack.Peek(); + var l = _length.Peek(); + + var current = default(IObservable<TSource>); + try + { + hasNext = e.MoveNext(); + if (hasNext) + current = e.Current; + } + catch (Exception ex) + { + e.Dispose(); + + base._observer.OnError(ex); + base.Dispose(); + return; + } + + if (!hasNext) + { + e.Dispose(); + + _stack.Pop(); + _length.Pop(); + } + else + { + var r = l - 1; + _length.Pop(); + _length.Push(r); + + try + { + next = Helpers.Unpack(current); + } + catch (Exception exception) + { + e.Dispose(); + base._observer.OnError(exception); + base.Dispose(); + return; + } + + // + // Tail recursive case; drop the current frame. + // + if (r == 0) + { + e.Dispose(); + _stack.Pop(); + _length.Pop(); + } + + // + // Flattening of nested sequences. Prevents stack overflow in observers. + // + var nextSeq = Extract(next); + if (nextSeq != null) + { + var nextEnumerator = default(IEnumerator<IObservable<TSource>>); + if (!TryGetEnumerator(nextSeq, out nextEnumerator)) + return; + + _stack.Push(nextEnumerator); + _length.Push(Helpers.GetLength(nextSeq)); + + hasNext = false; + } + } + } while (!hasNext); + + if (!hasNext) + { + Done(); + return; + } + + var d = new SingleAssignmentDisposable(); + _subscription.Disposable = d; + d.Disposable = next.SubscribeSafe(this); + } + + private new void Dispose() + { + while (_stack.Count > 0) + { + var e = _stack.Pop(); + _length.Pop(); + + e.Dispose(); + } + + _isDisposed = true; + } + + private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, out IEnumerator<IObservable<TSource>> result) + { + try + { + result = sources.GetEnumerator(); + return true; + } + catch (Exception exception) + { + base._observer.OnError(exception); + base.Dispose(); + + result = null; + return false; + } + } + + public abstract void OnCompleted(); + public abstract void OnError(Exception error); + public abstract void OnNext(TSource value); + + protected virtual void Done() + { + base._observer.OnCompleted(); + base.Dispose(); + } + } +} +#endif
\ No newline at end of file |