Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Ix/NET/System.Interactive.Async/AsyncEnumerable.Creation.cs')
-rw-r--r--Ix/NET/System.Interactive.Async/AsyncEnumerable.Creation.cs283
1 files changed, 283 insertions, 0 deletions
diff --git a/Ix/NET/System.Interactive.Async/AsyncEnumerable.Creation.cs b/Ix/NET/System.Interactive.Async/AsyncEnumerable.Creation.cs
new file mode 100644
index 0000000..f930293
--- /dev/null
+++ b/Ix/NET/System.Interactive.Async/AsyncEnumerable.Creation.cs
@@ -0,0 +1,283 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Linq
+{
+ public static partial class AsyncEnumerable
+ {
+ static IAsyncEnumerable<T> Create<T>(Func<IAsyncEnumerator<T>> getEnumerator)
+ {
+ return new AnonymousAsyncEnumerable<T>(getEnumerator);
+ }
+
+ class AnonymousAsyncEnumerable<T> : IAsyncEnumerable<T>
+ {
+ Func<IAsyncEnumerator<T>> getEnumerator;
+
+ public AnonymousAsyncEnumerable(Func<IAsyncEnumerator<T>> getEnumerator)
+ {
+ this.getEnumerator = getEnumerator;
+ }
+
+ public IAsyncEnumerator<T> GetEnumerator()
+ {
+ return getEnumerator();
+ }
+ }
+
+ static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, Task<bool>> moveNext, Func<T> current, Action dispose)
+ {
+ return new AnonymousAsyncEnumerator<T>(moveNext, current, dispose);
+ }
+
+ static IAsyncEnumerator<T> Create<T>(Func<CancellationToken, TaskCompletionSource<bool>, Task<bool>> moveNext, Func<T> current, Action dispose)
+ {
+ var self = default(IAsyncEnumerator<T>);
+ self = new AnonymousAsyncEnumerator<T>(
+ ct =>
+ {
+ var tcs = new TaskCompletionSource<bool>();
+
+ var stop = new Action(() =>
+ {
+ self.Dispose();
+ tcs.TrySetCanceled();
+ });
+
+ var ctr = ct.Register(stop);
+
+ var res = moveNext(ct, tcs).Finally(ctr.Dispose);
+ return res;
+ },
+ current,
+ dispose
+ );
+ return self;
+ }
+
+ class AnonymousAsyncEnumerator<T> : IAsyncEnumerator<T>
+ {
+ private readonly Func<CancellationToken, Task<bool>> _moveNext;
+ private readonly Func<T> _current;
+ private readonly Action _dispose;
+ private bool _disposed;
+
+ public AnonymousAsyncEnumerator(Func<CancellationToken, Task<bool>> moveNext, Func<T> current, Action dispose)
+ {
+ _moveNext = moveNext;
+ _current = current;
+ _dispose = dispose;
+ }
+
+ public Task<bool> MoveNext(CancellationToken cancellationToken)
+ {
+ if (_disposed)
+ return TaskExt.Return(false, CancellationToken.None);
+
+ return _moveNext(cancellationToken);
+ }
+
+ public T Current
+ {
+ get
+ {
+ return _current();
+ }
+ }
+
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ _disposed = true;
+ _dispose();
+ }
+ }
+ }
+
+ public static IAsyncEnumerable<TValue> Return<TValue>(TValue value)
+ {
+ return new[] { value }.ToAsyncEnumerable();
+ }
+
+ public static IAsyncEnumerable<TValue> Throw<TValue>(Exception exception)
+ {
+ if (exception == null)
+ throw new ArgumentNullException("exception");
+
+ return Create(() => Create<TValue>(
+ ct => TaskExt.Throw<bool>(exception, ct),
+ () => { throw new InvalidOperationException(); },
+ () => { })
+ );
+ }
+
+ public static IAsyncEnumerable<TValue> Never<TValue>()
+ {
+ return Create(() => Create<TValue>(
+ (ct, tcs) => tcs.Task,
+ () => { throw new InvalidOperationException(); },
+ () => { })
+ );
+ }
+
+ public static IAsyncEnumerable<TValue> Empty<TValue>()
+ {
+ return Create(() => Create<TValue>(
+ ct => TaskExt.Return(false, ct),
+ () => { throw new InvalidOperationException(); },
+ () => { })
+ );
+ }
+
+ public static IAsyncEnumerable<int> Range(int start, int count)
+ {
+ if (count < 0)
+ throw new ArgumentOutOfRangeException("count");
+
+ return Enumerable.Range(start, count).ToAsyncEnumerable();
+ }
+
+ public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element, int count)
+ {
+ if (count < 0)
+ throw new ArgumentOutOfRangeException("count");
+
+ return Enumerable.Repeat(element, count).ToAsyncEnumerable();
+ }
+
+ public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element)
+ {
+ return Create(() =>
+ {
+ return Create(
+ ct => TaskExt.Return(true, ct),
+ () => element,
+ () => { }
+ );
+ });
+ }
+
+ public static IAsyncEnumerable<TSource> Defer<TSource>(Func<IAsyncEnumerable<TSource>> factory)
+ {
+ if (factory == null)
+ throw new ArgumentNullException("factory");
+
+ return Create(() => factory().GetEnumerator());
+ }
+
+ public static IAsyncEnumerable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
+ {
+ if (condition == null)
+ throw new ArgumentNullException("condition");
+ if (iterate == null)
+ throw new ArgumentNullException("iterate");
+ if (resultSelector == null)
+ throw new ArgumentNullException("resultSelector");
+
+ return Create(() =>
+ {
+ var i = initialState;
+ var started = false;
+ var current = default(TResult);
+
+ return Create(
+ ct =>
+ {
+ var b = false;
+ try
+ {
+ if (started)
+ i = iterate(i);
+
+ b = condition(i);
+
+ if (b)
+ current = resultSelector(i);
+ }
+ catch (Exception ex)
+ {
+ return TaskExt.Throw<bool>(ex, ct);
+ }
+
+ if (!b)
+ return TaskExt.Return(false, ct);
+
+ if (!started)
+ started = true;
+
+ return TaskExt.Return(true, ct);
+ },
+ () => current,
+ () => { }
+ );
+ });
+ }
+
+ public static IAsyncEnumerable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory) where TResource : IDisposable
+ {
+ if (resourceFactory == null)
+ throw new ArgumentNullException("resourceFactory");
+ if (enumerableFactory == null)
+ throw new ArgumentNullException("enumerableFactory");
+
+ return Create(() =>
+ {
+ var resource = resourceFactory();
+ var e = default(IAsyncEnumerator<TSource>);
+
+ try
+ {
+ e = enumerableFactory(resource).GetEnumerator();
+ }
+ catch (Exception)
+ {
+ resource.Dispose();
+ throw;
+ }
+
+ var cts = new CancellationTokenDisposable();
+ var d = new CompositeDisposable(cts, resource, e);
+
+ var current = default(TSource);
+
+ return Create(
+ (ct, tcs) =>
+ {
+ e.MoveNext(cts.Token).ContinueWith(t =>
+ {
+ t.Handle(tcs,
+ res =>
+ {
+ if (res)
+ {
+ current = e.Current;
+ tcs.TrySetResult(true);
+ }
+ else
+ {
+ d.Dispose();
+ tcs.TrySetResult(false);
+ }
+ },
+ ex =>
+ {
+ d.Dispose();
+ tcs.TrySetException(ex);
+ }
+ );
+ });
+
+ return tcs.Task;
+ },
+ () => current,
+ d.Dispose
+ );
+ });
+ }
+ }
+}