Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Experimental/Reactive/Linq/QueryLanguageEx.cs')
-rw-r--r--Rx.NET/System.Reactive.Experimental/Reactive/Linq/QueryLanguageEx.cs445
1 files changed, 445 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Experimental/Reactive/Linq/QueryLanguageEx.cs b/Rx.NET/System.Reactive.Experimental/Reactive/Linq/QueryLanguageEx.cs
new file mode 100644
index 0000000..bc4cdc6
--- /dev/null
+++ b/Rx.NET/System.Reactive.Experimental/Reactive/Linq/QueryLanguageEx.cs
@@ -0,0 +1,445 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Subjects;
+
+namespace System.Reactive.Linq
+{
+ internal class QueryLanguageEx : IQueryLanguageEx
+ {
+ #region Create
+
+ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
+ {
+ return new AnonymousObservable<TResult>(observer =>
+ iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
+ }
+
+ public virtual IObservable<Unit> Create(Func<IEnumerable<IObservable<object>>> iteratorMethod)
+ {
+ return new AnonymousObservable<Unit>(observer =>
+ iteratorMethod().Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
+ }
+
+ #endregion
+
+ #region Expand
+
+ public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
+ {
+ return new AnonymousObservable<TSource>(observer =>
+ {
+ var outGate = new object();
+ var q = new Queue<IObservable<TSource>>();
+ var m = new SerialDisposable();
+ var d = new CompositeDisposable { m };
+ var activeCount = 0;
+ var isAcquired = false;
+
+ var ensureActive = default(Action);
+
+ ensureActive = () =>
+ {
+ var isOwner = false;
+
+ lock (q)
+ {
+ if (q.Count > 0)
+ {
+ isOwner = !isAcquired;
+ isAcquired = true;
+ }
+ }
+
+ if (isOwner)
+ {
+ m.Disposable = scheduler.Schedule(self =>
+ {
+ var work = default(IObservable<TSource>);
+
+ lock (q)
+ {
+ if (q.Count > 0)
+ work = q.Dequeue();
+ else
+ {
+ isAcquired = false;
+ return;
+ }
+ }
+
+ var m1 = new SingleAssignmentDisposable();
+ d.Add(m1);
+ m1.Disposable = work.Subscribe(
+ x =>
+ {
+ lock (outGate)
+ observer.OnNext(x);
+
+ var result = default(IObservable<TSource>);
+ try
+ {
+ result = selector(x);
+ }
+ catch (Exception exception)
+ {
+ lock (outGate)
+ observer.OnError(exception);
+ }
+
+ lock (q)
+ {
+ q.Enqueue(result);
+ activeCount++;
+ }
+
+ ensureActive();
+ },
+ exception =>
+ {
+ lock (outGate)
+ observer.OnError(exception);
+ },
+ () =>
+ {
+ d.Remove(m1);
+
+ var done = false;
+ lock (q)
+ {
+ activeCount--;
+ if (activeCount == 0)
+ done = true;
+ }
+ if (done)
+ lock (outGate)
+ observer.OnCompleted();
+ });
+ self();
+ });
+ }
+ };
+
+ lock (q)
+ {
+ q.Enqueue(source);
+ activeCount++;
+ }
+ ensureActive();
+
+ return d;
+ });
+ }
+
+ public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector)
+ {
+ return source.Expand(selector, SchedulerDefaults.Iteration);
+ }
+
+ #endregion
+
+ #region ForkJoin
+
+ public virtual IObservable<TResult> ForkJoin<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
+ {
+ return Combine<TFirst, TSecond, TResult>(first, second, (observer, leftSubscription, rightSubscription) =>
+ {
+ var leftStopped = false;
+ var rightStopped = false;
+ var hasLeft = false;
+ var hasRight = false;
+ var lastLeft = default(TFirst);
+ var lastRight = default(TSecond);
+
+ return new BinaryObserver<TFirst, TSecond>(
+ left =>
+ {
+ switch (left.Kind)
+ {
+ case NotificationKind.OnNext:
+ hasLeft = true;
+ lastLeft = left.Value;
+ break;
+ case NotificationKind.OnError:
+ rightSubscription.Dispose();
+ observer.OnError(left.Exception);
+ break;
+ case NotificationKind.OnCompleted:
+ leftStopped = true;
+ if (rightStopped)
+ {
+ if (!hasLeft)
+ observer.OnCompleted();
+ else if (!hasRight)
+ observer.OnCompleted();
+ else
+ {
+ TResult result;
+ try
+ {
+ result = resultSelector(lastLeft, lastRight);
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+ observer.OnNext(result);
+ observer.OnCompleted();
+ }
+ }
+ break;
+ }
+ },
+ right =>
+ {
+ switch (right.Kind)
+ {
+ case NotificationKind.OnNext:
+ hasRight = true;
+ lastRight = right.Value;
+ break;
+ case NotificationKind.OnError:
+ leftSubscription.Dispose();
+ observer.OnError(right.Exception);
+ break;
+ case NotificationKind.OnCompleted:
+ rightStopped = true;
+ if (leftStopped)
+ {
+ if (!hasLeft)
+ observer.OnCompleted();
+ else if (!hasRight)
+ observer.OnCompleted();
+ else
+ {
+ TResult result;
+ try
+ {
+ result = resultSelector(lastLeft, lastRight);
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+ observer.OnNext(result);
+ observer.OnCompleted();
+ }
+ }
+ break;
+ }
+ });
+ });
+ }
+
+ public virtual IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources)
+ {
+ return sources.ForkJoin();
+ }
+
+ public virtual IObservable<TSource[]> ForkJoin<TSource>(IEnumerable<IObservable<TSource>> sources)
+ {
+ return new AnonymousObservable<TSource[]>(subscriber =>
+ {
+ var allSources = sources.ToArray();
+ var count = allSources.Length;
+
+ if (count == 0)
+ {
+ subscriber.OnCompleted();
+ return Disposable.Empty;
+ }
+
+ var group = new CompositeDisposable(allSources.Length);
+ var gate = new object();
+
+ var finished = false;
+ var hasResults = new bool[count];
+ var hasCompleted = new bool[count];
+ var results = new List<TSource>(count);
+
+ lock (gate)
+ {
+ for (var index = 0; index < count; index++)
+ {
+ var currentIndex = index;
+ var source = allSources[index];
+ results.Add(default(TSource));
+ group.Add(source.Subscribe(
+ value =>
+ {
+ lock (gate)
+ {
+ if (!finished)
+ {
+ hasResults[currentIndex] = true;
+ results[currentIndex] = value;
+ }
+ }
+ },
+ error =>
+ {
+ lock (gate)
+ {
+ finished = true;
+ subscriber.OnError(error);
+ group.Dispose();
+ }
+ },
+ () =>
+ {
+ lock (gate)
+ {
+ if (!finished)
+ {
+ if (!hasResults[currentIndex])
+ {
+ subscriber.OnCompleted();
+ return;
+ }
+ hasCompleted[currentIndex] = true;
+ foreach (var completed in hasCompleted)
+ {
+ if (!completed)
+ return;
+ }
+ finished = true;
+ subscriber.OnNext(results.ToArray());
+ subscriber.OnCompleted();
+ }
+ }
+ }));
+ }
+ }
+ return group;
+ });
+ }
+
+ #endregion
+
+ #region Let
+
+ public virtual IObservable<TResult> Let<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> function)
+ {
+ return function(source);
+ }
+
+ #endregion
+
+ #region ManySelect
+
+ public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector)
+ {
+ return ManySelect(source, selector, DefaultScheduler.Instance);
+ }
+
+ public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector, IScheduler scheduler)
+ {
+ return Observable.Defer<TResult>(() =>
+ {
+ var chain = default(ChainObservable<TSource>);
+
+ return source
+ .Select(
+ x =>
+ {
+ var curr = new ChainObservable<TSource>(x);
+
+ if (chain != null)
+ chain.OnNext(curr);
+ chain = curr;
+
+ return (IObservable<TSource>)curr;
+ })
+ .Do(
+ _ => { },
+ exception =>
+ {
+ if (chain != null)
+ chain.OnError(exception);
+ },
+ () =>
+ {
+ if (chain != null)
+ chain.OnCompleted();
+ })
+ .ObserveOn(scheduler)
+ .Select(selector);
+ });
+ }
+
+ class ChainObservable<T> : ISubject<IObservable<T>, T>
+ {
+ T head;
+ AsyncSubject<IObservable<T>> tail = new AsyncSubject<IObservable<T>>();
+
+ public ChainObservable(T head)
+ {
+ this.head = head;
+ }
+
+ public IDisposable Subscribe(IObserver<T> observer)
+ {
+ var g = new CompositeDisposable();
+ g.Add(CurrentThreadScheduler.Instance.Schedule(() =>
+ {
+ observer.OnNext(head);
+ g.Add(tail.Merge().Subscribe(observer));
+ }));
+ return g;
+ }
+
+ public void OnCompleted()
+ {
+ OnNext(Observable.Empty<T>());
+ }
+
+ public void OnError(Exception error)
+ {
+ OnNext(Observable.Throw<T>(error));
+ }
+
+ public void OnNext(IObservable<T> value)
+ {
+ tail.OnNext(value);
+ tail.OnCompleted();
+ }
+ }
+
+ #endregion
+
+ #region ToListObservable
+
+ public virtual ListObservable<TSource> ToListObservable<TSource>(IObservable<TSource> source)
+ {
+ return new ListObservable<TSource>(source);
+ }
+
+ #endregion
+
+ #region |> Helpers <|
+
+ private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
+ {
+ return new AnonymousObservable<TResult>(observer =>
+ {
+ var leftSubscription = new SingleAssignmentDisposable();
+ var rightSubscription = new SingleAssignmentDisposable();
+
+ var combiner = combinerSelector(observer, leftSubscription, rightSubscription);
+ var gate = new object();
+
+ leftSubscription.Disposable = leftSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner);
+ rightSubscription.Disposable = rightSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x)).Synchronize(gate).Subscribe(combiner);
+
+ return new CompositeDisposable(leftSubscription, rightSubscription);
+ });
+ }
+
+ #endregion
+ }
+}