Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Latest.cs')
-rw-r--r--Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Latest.cs145
1 files changed, 145 insertions, 0 deletions
diff --git a/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Latest.cs b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Latest.cs
new file mode 100644
index 0000000..2699797
--- /dev/null
+++ b/Rx.NET/System.Reactive.Linq/Reactive/Linq/Observαble/Latest.cs
@@ -0,0 +1,145 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#if !NO_PERF
+using System;
+using System.Collections.Generic;
+using System.Reactive.Threading;
+using System.Threading;
+
+namespace System.Reactive.Linq.Observαble
+{
+ class Latest<TSource> : PushToPullAdapter<TSource, TSource>
+ {
+ public Latest(IObservable<TSource> source)
+ : base(source)
+ {
+ }
+
+ protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
+ {
+ return new _(subscription);
+ }
+
+ class _ : PushToPullSink<TSource, TSource>
+ {
+ private readonly object _gate;
+
+#if !NO_CDS
+ private readonly SemaphoreSlim _semaphore;
+#else
+ private readonly Semaphore _semaphore;
+#endif
+
+ public _(IDisposable subscription)
+ : base(subscription)
+ {
+ _gate = new object();
+
+#if !NO_CDS
+ _semaphore = new SemaphoreSlim(0, 1);
+#else
+ _semaphore = new Semaphore(0, 1);
+#endif
+ }
+
+ private bool _notificationAvailable;
+ private NotificationKind _kind;
+ private TSource _value;
+ private Exception _error;
+
+ public override void OnNext(TSource value)
+ {
+ var lackedValue = false;
+ lock (_gate)
+ {
+ lackedValue = !_notificationAvailable;
+ _notificationAvailable = true;
+ _kind = NotificationKind.OnNext;
+ _value = value;
+ }
+
+ if (lackedValue)
+ _semaphore.Release();
+ }
+
+ public override void OnError(Exception error)
+ {
+ base.Dispose();
+
+ var lackedValue = false;
+ lock (_gate)
+ {
+ lackedValue = !_notificationAvailable;
+ _notificationAvailable = true;
+ _kind = NotificationKind.OnError;
+ _error = error;
+ }
+
+ if (lackedValue)
+ _semaphore.Release();
+ }
+
+ public override void OnCompleted()
+ {
+ base.Dispose();
+
+ var lackedValue = false;
+ lock (_gate)
+ {
+ lackedValue = !_notificationAvailable;
+ _notificationAvailable = true;
+ _kind = NotificationKind.OnCompleted;
+ }
+
+ if (lackedValue)
+ _semaphore.Release();
+ }
+
+ public override bool TryMoveNext(out TSource current)
+ {
+ var kind = default(NotificationKind);
+ var value = default(TSource);
+ var error = default(Exception);
+
+#if !NO_CDS
+ _semaphore.Wait();
+#else
+ _semaphore.WaitOne();
+#endif
+
+ lock (_gate)
+ {
+ kind = _kind;
+
+ switch (kind)
+ {
+ case NotificationKind.OnNext:
+ value = _value;
+ break;
+ case NotificationKind.OnError:
+ error = _error;
+ break;
+ }
+
+ _notificationAvailable = false;
+ }
+
+ switch (kind)
+ {
+ case NotificationKind.OnNext:
+ current = _value;
+ return true;
+ case NotificationKind.OnError:
+ error.Throw();
+ break;
+ case NotificationKind.OnCompleted:
+ break;
+ }
+
+ current = default(TSource);
+ return false;
+ }
+ }
+ }
+}
+#endif