Welcome to mirror list, hosted at ThFree Co, Russian Federation.

ReplaySubject.cs « Subjects « Reactive « System.Reactive.Linq « Rx.NET - github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 4c1412494f991511746f76c6baaf8654c97e346b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
    /// </summary>
    /// <remarks>
    /// The replay buffer is bounded both by element count and by element age. All mutable state is
    /// guarded by a single private lock; notifications are handed to per-subscriber
    /// <c>ScheduledObserver</c> wrappers while the lock is held, and those wrappers are activated
    /// after the lock has been released.
    /// </remarks>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
    {
        // Buffer size used by the constructors that do not take an explicit element count.
        private const int InfiniteBufferSize = int.MaxValue;

        // Maximum number of elements retained in the replay buffer.
        private readonly int _bufferSize;
        // Maximum age of an element retained in the replay buffer.
        private readonly TimeSpan _window;
        // Scheduler handed to every ScheduledObserver created by Subscribe.
        private readonly IScheduler _scheduler;
        // Stopwatch started on the scheduler at construction; its elapsed time timestamps buffered elements.
        private readonly IStopwatch _stopwatch;

        // Replay buffer. Each entry pairs a value with the stopwatch reading at the time it was enqueued.
        private readonly Queue<TimeInterval<T>> _queue;
        // True once OnError or OnCompleted has been accepted.
        private bool _isStopped;
        // The terminal error, or null when the subject is active or completed normally.
        private Exception _error;

        // Current subscribers. Replaced wholesale on every change; set to null by Dispose.
        private ImmutableList<ScheduledObserver<T>> _observers;
        private bool _isDisposed;

        // Guards every mutable field above.
        private readonly object _gate = new object();

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size, window and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            if (bufferSize < 0)
            {
                throw new ArgumentOutOfRangeException("bufferSize");
            }

            if (window < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("window");
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }

            _bufferSize = bufferSize;
            _window = window;
            _scheduler = scheduler;

            _stopwatch = _scheduler.StartStopwatch();
            _queue = new Queue<TimeInterval<T>>();
            _isStopped = false;
            _error = null;

            _observers = new ImmutableList<ScheduledObserver<T>>();
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and window.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window)
            : this(bufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class.
        /// </summary>
        public ReplaySubject()
            : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(IScheduler scheduler)
            : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize, IScheduler scheduler)
            : this(bufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified buffer size.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize)
            : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window and scheduler.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
            : this(InfiniteBufferSize, window, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;" /> class with the specified window.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window)
            : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Applies both trimming policies to the replay buffer: first drops the oldest elements until
        /// at most <c>_bufferSize</c> remain, then drops elements from the front whose age relative to
        /// <paramref name="now"/> exceeds <c>_window</c>. Every caller holds <c>_gate</c>.
        /// </summary>
        /// <param name="now">Current stopwatch reading to measure element age against.</param>
        void Trim(TimeSpan now)
        {
            while (_queue.Count > _bufferSize)
            {
                _queue.Dequeue();
            }

            // Entries are enqueued in timestamp order, so the scan stops at the first one still inside the window.
            while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
            {
                _queue.Dequeue();
            }
        }

        /// <summary>
        /// Calls <c>EnsureActive</c> on every observer in the given snapshot. Invoked by the
        /// notification methods after <c>_gate</c> has been released.
        /// </summary>
        /// <param name="observers">Snapshot of observers that received a notification, or null when the notification was ignored.</param>
        static void Activate(ScheduledObserver<T>[] observers)
        {
            if (observers == null)
            {
                return;
            }

            foreach (var observer in observers)
            {
                observer.EnsureActive();
            }
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            var snapshot = default(ScheduledObserver<T>[]);
            lock (_gate)
            {
                CheckDisposed();

                // Notifications after a terminal message are silently ignored.
                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _queue.Enqueue(new TimeInterval<T>(value, now));
                    Trim(now);

                    snapshot = _observers.Data;
                    foreach (var observer in snapshot)
                    {
                        observer.OnNext(value);
                    }
                }
            }

            Activate(snapshot);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException("error");
            }

            var snapshot = default(ScheduledObserver<T>[]);
            lock (_gate)
            {
                CheckDisposed();

                // Only the first terminal notification is accepted.
                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _isStopped = true;
                    _error = error;
                    Trim(now);

                    snapshot = _observers.Data;
                    foreach (var observer in snapshot)
                    {
                        observer.OnError(error);
                    }

                    // Current subscribers have received their terminal message; start over with an empty list.
                    _observers = new ImmutableList<ScheduledObserver<T>>();
                }
            }

            Activate(snapshot);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the end of the sequence.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            var snapshot = default(ScheduledObserver<T>[]);
            lock (_gate)
            {
                CheckDisposed();

                // Only the first terminal notification is accepted.
                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _isStopped = true;
                    Trim(now);

                    snapshot = _observers.Data;
                    foreach (var observer in snapshot)
                    {
                        observer.OnCompleted();
                    }

                    // Current subscribers have received their terminal message; start over with an empty list.
                    _observers = new ImmutableList<ScheduledObserver<T>>();
                }
            }

            Activate(snapshot);
        }

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            var so = new ScheduledObserver<T>(_scheduler, observer);

            // Number of notifications replayed into the new observer while holding the lock.
            var n = 0;

            var subscription = new RemovableDisposable(this, so);
            lock (_gate)
            {
                CheckDisposed();

                //
                // Notice the v1.x behavior of always calling Trim is preserved here.
                //
                // This may be subject (pun intended) of debate: should this policy
                // only be applied while the sequence is active? With the current
                // behavior, a sequence will "die out" after it has terminated by
                // continuing to drop OnNext notifications from the queue.
                //
                // In v1.x, this behavior was due to trimming based on the clock value
                // returned by scheduler.Now, applied to all but the terminal message
                // in the queue. Using the IStopwatch has the same effect. Either way,
                // we guarantee the final notification will be observed, but there's
                // no way to retain the buffer directly. One approach is to use the
                // time-based TakeLast operator and apply an unbounded ReplaySubject
                // to it.
                //
                // To conclude, we're keeping the behavior as-is for compatibility
                // reasons with v1.x.
                //
                Trim(_stopwatch.Elapsed);
                _observers = _observers.Add(so);

                // Replay whatever survived trimming, followed by the terminal message if there is one.
                n = _queue.Count;
                foreach (var item in _queue)
                {
                    so.OnNext(item.Value);
                }

                if (_error != null)
                {
                    n++;
                    so.OnError(_error);
                }
                else if (_isStopped)
                {
                    n++;
                    so.OnCompleted();
                }
            }

            // NOTE(review): presumably tells the observer how many replayed notifications to drain - confirm against ScheduledObserver.
            so.EnsureActive(n);

            return subscription;
        }

        /// <summary>
        /// Removes the given observer from the subscriber list. Does nothing once the subject has
        /// been disposed, because the list is null from that point on.
        /// </summary>
        /// <param name="observer">The scheduled observer to remove.</param>
        void Unsubscribe(ScheduledObserver<T> observer)
        {
            lock (_gate)
            {
                if (!_isDisposed)
                {
                    _observers = _observers.Remove(observer);
                }
            }
        }

        /// <summary>
        /// Subscription handle returned by <c>Subscribe</c>. Disposing it disposes the scheduled
        /// observer and removes it from the owning subject.
        /// </summary>
        sealed class RemovableDisposable : IDisposable
        {
            private readonly ReplaySubject<T> _subject;
            private readonly ScheduledObserver<T> _observer;

            public RemovableDisposable(ReplaySubject<T> subject, ScheduledObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                _observer.Dispose();
                _subject.Unsubscribe(_observer);
            }
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> when the subject has been disposed. Every caller holds <c>_gate</c>.
        /// </summary>
        void CheckDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(string.Empty);
            }
        }

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.ReplaySubject&lt;T&gt;"/> class and unsubscribes all observers.
        /// </summary>
        public void Dispose()
        {
            lock (_gate)
            {
                _isDisposed = true;
                _observers = null;

                // Every entry point throws once disposed, so the buffer can never be replayed again;
                // clear it so the buffered elements are not kept alive by a lingering subject reference.
                _queue.Clear();
            }
        }
    }
}