Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/ForEach.cs')
-rw-r--r--Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/ForEach.cs122
1 files changed, 122 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/ForEach.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/ForEach.cs
new file mode 100644
index 0000000..38e112d
--- /dev/null
+++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/ForEach.cs
@@ -0,0 +1,122 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System;
+using System.Threading;
+
+namespace System.Reactive.Linq.Observαble
+{
+ class ForEach<TSource>
+ {
+ public class _ : IObserver<TSource>
+ {
+ private readonly Action<TSource> _onNext;
+ private readonly Action _done;
+
+ private Exception _exception;
+ private int _stopped;
+
+ public _(Action<TSource> onNext, Action done)
+ {
+ _onNext = onNext;
+ _done = done;
+
+ _stopped = 0;
+ }
+
+ public Exception Error
+ {
+ get { return _exception; }
+ }
+
+ public void OnNext(TSource value)
+ {
+ if (_stopped == 0)
+ {
+ try
+ {
+ _onNext(value);
+ }
+ catch (Exception ex)
+ {
+ OnError(ex);
+ }
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ if (Interlocked.Exchange(ref _stopped, 1) == 0)
+ {
+ _exception = error;
+ _done();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Exchange(ref _stopped, 1) == 0)
+ {
+ _done();
+ }
+ }
+ }
+
+ public class τ : IObserver<TSource>
+ {
+ private readonly Action<TSource, int> _onNext;
+ private readonly Action _done;
+
+ private int _index;
+ private Exception _exception;
+ private int _stopped;
+
+ public τ(Action<TSource, int> onNext, Action done)
+ {
+ _onNext = onNext;
+ _done = done;
+
+ _index = 0;
+ _stopped = 0;
+ }
+
+ public Exception Error
+ {
+ get { return _exception; }
+ }
+
+ public void OnNext(TSource value)
+ {
+ if (_stopped == 0)
+ {
+ try
+ {
+ _onNext(value, checked(_index++));
+ }
+ catch (Exception ex)
+ {
+ OnError(ex);
+ }
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ if (Interlocked.Exchange(ref _stopped, 1) == 0)
+ {
+ _exception = error;
+ _done();
+ }
+ }
+
+ public void OnCompleted()
+ {
+ if (Interlocked.Exchange(ref _stopped, 1) == 0)
+ {
+ _done();
+ }
+ }
+ }
+ }
+}
+#endif \ No newline at end of file