Welcome to mirror list, hosted at ThFree Co, Russian Federation.

ConcurrencyAbstractionLayerImpl.cs « Concurrency « Reactive « System.Reactive.PlatformServices « Source « NET « Rx - github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 4c91463d155fd617ebe041e42193caa616be68c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_THREAD
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    //
    // WARNING: This code is kept *identically* in two places. One copy is kept in System.Reactive.Core for non-PLIB platforms.
    //          Another copy is kept in System.Reactive.PlatformServices to enlighten the default lowest common denominator
    //          behavior of Rx for PLIB when used on a more capable platform.
    //
    internal class /*Default*/ConcurrencyAbstractionLayerImpl : IConcurrencyAbstractionLayer
    {
        public IDisposable StartTimer(Action<object> action, object state, TimeSpan dueTime)
        {
            return new Timer(action, state, Normalize(dueTime));
        }

        public IDisposable StartPeriodicTimer(Action action, TimeSpan period)
        {
            //
            // MSDN documentation states the following:
            //
            //    "If period is zero (0) or negative one (-1) milliseconds and dueTime is positive, callback is invoked once;
            //     the periodic behavior of the timer is disabled, but can be re-enabled using the Change method."
            //
            if (period <= TimeSpan.Zero)
                throw new ArgumentOutOfRangeException("period");

            return new PeriodicTimer(action, period);
        }

        public IDisposable QueueUserWorkItem(Action<object> action, object state)
        {
            System.Threading.ThreadPool.QueueUserWorkItem(_ => action(_), state);
            return Disposable.Empty;
        }

#if USE_SLEEP_MS
        public void Sleep(TimeSpan timeout)
        {
            System.Threading.Thread.Sleep((int)Normalize(timeout).TotalMilliseconds);
        }
#else
        public void Sleep(TimeSpan timeout)
        {
            System.Threading.Thread.Sleep(Normalize(timeout));
        }
#endif

        public IStopwatch StartStopwatch()
        {
#if !NO_STOPWATCH
            return new StopwatchImpl();
#else
            return new DefaultStopwatch();
#endif
        }

        public bool SupportsLongRunning
        {
            get { return true; }
        }

        public void StartThread(Action<object> action, object state)
        {
            new Thread(() =>
            {
                action(state);
            }) { IsBackground = true }.Start();
        }

        private static TimeSpan Normalize(TimeSpan dueTime)
        {
            if (dueTime < TimeSpan.Zero)
                return TimeSpan.Zero;

            return dueTime;
        }

#if USE_TIMER_SELF_ROOT

        //
        // Some historical context. In the early days of Rx, we discovered an issue with
        // the rooting of timers, causing them to get GC'ed even when the IDisposable of
        // a scheduled activity was kept alive. The original code simply created a timer
        // as follows:
        //
        //   var t = default(Timer);
        //   t = new Timer(_ =>
        //   {
        //       t = null;
        //       Debug.WriteLine("Hello!");
        //   }, null, 5000, Timeout.Infinite);
        //
        // IIRC the reference to "t" captured by the closure wasn't sufficient on .NET CF
        // to keep the timer rooted, causing problems on Windows Phone 7. As a result, we
        // added rooting code using a dictionary (SD 7280), which we carried forward all
        // the way to Rx v2.0 RTM.
        //
        // However, the desktop CLR's implementation of System.Threading.Timer exhibits
        // other characteristics where a timer can root itself when the timer is still
        // reachable through the state or callback parameters. To illustrate this, run
        // the following piece of code:
        //
        //   static void Main()
        //   {
        //       Bar();
        //   
        //       while (true)
        //       {
        //           GC.Collect();
        //           GC.WaitForPendingFinalizers();
        //           Thread.Sleep(100);
        //       }
        //   }
        //   
        //   static void Bar()
        //   {
        //       var t = default(Timer);
        //       t = new Timer(_ =>
        //       {
        //           t = null; // Comment out this line to see the timer stop
        //           Console.WriteLine("Hello!");
        //       }, null, 5000, Timeout.Infinite);
        //   }
        //
        // When the closure over "t" is removed, the timer will stop automatically upon
        // garbage collection. However, when retaining the reference, this problem does
        // not exist. The code below exploits this behavior, avoiding unnecessary costs
        // to root timers in a thread-safe manner.
        //
        // Below is a fragment of SOS output, proving the proper rooting:
        //
        //   !gcroot 02492440
        //    HandleTable:
        //        005a13fc (pinned handle)
        //        -> 03491010 System.Object[]
        //        -> 024924dc System.Threading.TimerQueue
        //        -> 02492450 System.Threading.TimerQueueTimer
        //        -> 02492420 System.Threading.TimerCallback
        //        -> 02492414 TimerRootingExperiment.Program+<>c__DisplayClass1
        //        -> 02492440 System.Threading.Timer
        //
        // With the USE_TIMER_SELF_ROOT symbol, we shake off this additional rooting code
        // for newer platforms where this no longer needed. We checked this on .NET Core
        // as well as .NET 4.0, and only #define this symbol for those platforms.
        //

        class Timer : IDisposable
        {
            private Action<object> _action;
            private volatile System.Threading.Timer _timer;

            public Timer(Action<object> action, object state, TimeSpan dueTime)
            {
                _action = action;

                // Don't want the spin wait in Tick to get stuck if this thread gets aborted.
                try { }
                finally
                {
                    //
                    // Rooting of the timer happens through the this.Tick delegate's target object,
                    // which is the current instance and has a field to store the Timer instance.
                    //
                    _timer = new System.Threading.Timer(this.Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
                }
            }

            private void Tick(object state)
            {
                try
                {
                    _action(state);
                }
                finally
                {
                    SpinWait.SpinUntil(IsTimerAssigned);
                    Dispose();
                }
            }

            private bool IsTimerAssigned()
            {
                return _timer != null;
            }

            public void Dispose()
            {
                var timer = _timer;
                if (timer != TimerStubs.Never)
                {
                    _action = Stubs<object>.Ignore;
                    _timer = TimerStubs.Never;

                    timer.Dispose();
                }
            }
        }

        class PeriodicTimer : IDisposable
        {
            private Action _action;
            private volatile System.Threading.Timer _timer;

            public PeriodicTimer(Action action, TimeSpan period)
            {
                _action = action;

                //
                // Rooting of the timer happens through the this.Tick delegate's target object,
                // which is the current instance and has a field to store the Timer instance.
                //
                _timer = new System.Threading.Timer(this.Tick, null, period, period);
            }

            private void Tick(object state)
            {
                _action();
            }

            public void Dispose()
            {
                var timer = _timer;
                if (timer != null)
                {
                    _action = Stubs.Nop;
                    _timer = null;

                    timer.Dispose();
                }
            }
        }
#else
        class Timer : IDisposable
        {
            //
            // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running.
            //
#if !NO_HASHSET
            private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>();
#else
            private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>();
#endif

            private Action<object> _action;
            private System.Threading.Timer _timer;

            private bool _hasAdded;
            private bool _hasRemoved;

            public Timer(Action<object> action, object state, TimeSpan dueTime)
            {
                _action = action;
                _timer = new System.Threading.Timer(Tick, state, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));

                lock (s_timers)
                {
                    if (!_hasRemoved)
                    {
#if !NO_HASHSET
                        s_timers.Add(_timer);
#else
                        s_timers.Add(_timer, null);
#endif

                        _hasAdded = true;
                    }
                }
            }

            private void Tick(object state)
            {
                try
                {
                    _action(state);
                }
                finally
                {
                    Dispose();
                }
            }

            public void Dispose()
            {
                _action = Stubs<object>.Ignore;

                var timer = default(System.Threading.Timer);

                lock (s_timers)
                {
                    if (!_hasRemoved)
                    {
                        timer = _timer;
                        _timer = null;

                        if (_hasAdded && timer != null)
                            s_timers.Remove(timer);

                        _hasRemoved = true;
                    }
                }

                if (timer != null)
                    timer.Dispose();
            }
        }

        class PeriodicTimer : IDisposable
        {
            //
            // Note: the dictionary exists to "root" the timers so that they are not garbage collected and finalized while they are running.
            //
#if !NO_HASHSET
            private static readonly HashSet<System.Threading.Timer> s_timers = new HashSet<System.Threading.Timer>();
#else
            private static readonly Dictionary<System.Threading.Timer, object> s_timers = new Dictionary<System.Threading.Timer, object>();
#endif

            private Action _action;
            private System.Threading.Timer _timer;

            public PeriodicTimer(Action action, TimeSpan period)
            {
                _action = action;
                _timer = new System.Threading.Timer(Tick, null, period, period);

                lock (s_timers)
                {
#if !NO_HASHSET
                    s_timers.Add(_timer);
#else
                    s_timers.Add(_timer, null);
#endif
                }
            }

            private void Tick(object state)
            {
                _action();
            }

            public void Dispose()
            {
                var timer = default(System.Threading.Timer);

                lock (s_timers)
                {
                    timer = _timer;
                    _timer = null;

                    if (timer != null)
                        s_timers.Remove(timer);
                }

                if (timer != null)
                {
                    timer.Dispose();
                    _action = Stubs.Nop;
                }
            }
        }
#endif
    }
}
#endif