Welcome to mirror list, hosted at ThFree Co, Russian Federation.

BufferedStream2.cs « IO « System « System.Core « referencesource « class « mcs - github.com/mono/mono.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: d958460110f82524a2b64e4677aed157d01e4167 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
// ==++==
// 
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
/*============================================================
**
** Class:  BufferedStream2
**
**
===========================================================*/
using System;
using System.Diagnostics;
using System.Globalization;
using System.Runtime.Versioning;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Runtime.CompilerServices;

using Microsoft.Win32;
using Microsoft.Win32.SafeHandles;
using System.Security.Permissions;

namespace System.IO {

// This abstract implementation adds thread safe buffering on top 
// of the underlying stream. For most streams, having this intermediate
// buffer translates to better performance due to the costly nature of 
// underlying IO, P/Invoke (such as disk IO). This also improves the locking 
// efficiency when operating under heavy concurrency. The synchronization 
// technique used in this implementation is specifically optimized for IO


// The main differences between this implementation and the existing System.IO.BufferedStream
//  - the design allows for inheritance as opposed to wrapping streams
//  - it is thread safe, though currently only synchronous Write is optimized 


[HostProtection(Synchronization=true)]
internal abstract class BufferedStream2 : Stream
{
    protected internal const int DefaultBufferSize = 32*1024; //32KB or 64KB seems to give the best throughput 

    protected int bufferSize;                       // Length of internal buffer, if it's allocated.
    private byte[] _buffer;                         // Internal buffer.  Alloc on first use.
    
    // At present only concurrent buffer writing is optimized implicitly 
    // while reading relies on explicit locking. 
    
    // Ideally we want these fields to be volatile
    private /*volatile*/ int _pendingBufferCopy;    // How many buffer writes are pending.
    private /*volatile*/ int _writePos;             // Write pointer within shared buffer.
    
    // Should we use a separate buffer for reading Vs writing?
    private /*volatile*/ int _readPos;              // Read pointer within shared buffer.
    private /*volatile*/ int _readLen;              // Number of bytes read in buffer from file.

    // Ideally we want this field to be volatile but Interlocked operations 
    // on 64bit int is not guaranteed to be atomic especially on 32bit platforms
    protected long pos;                             // Cache current location in the underlying stream.

    // Derived streams should override CanRead/CanWrite/CanSeek to enable/disable functionality as desired

    [ResourceExposure(ResourceScope.None)]
    [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
    public override void Write(byte[] array, int offset, int count) 
    {
        if (array==null)
            throw new ArgumentNullException("array", SR.GetString(SR.ArgumentNull_Buffer));
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset", SR.GetString(SR.ArgumentOutOfRange_NeedNonNegNum));
        if (count < 0)
            throw new ArgumentOutOfRangeException("count", SR.GetString(SR.ArgumentOutOfRange_NeedNonNegNum));
        if (array.Length - offset < count)
            throw new ArgumentException(SR.GetString(SR.Argument_InvalidOffLen));

        Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");

        if (_writePos==0) {
            // Ensure we can write to the stream, and ready buffer for writing.
            if (!CanWrite) __Error.WriteNotSupported();
            if (_readPos < _readLen) FlushRead();
            _readPos = 0;
            _readLen = 0;
        }

        // Optimization: 
        // Don't allocate a buffer then call memcpy for 0 bytes.
        if (count == 0) 
            return;  

        do {
            // Avoid contention around spilling over the buffer, the locking mechanism here is bit unconventional.
            // Let's call this a YieldLock. It is closer to a spin lock than a semaphore but not quite a spin lock. 
            // Forced thread context switching is better than a tight spin lock here for several reasons. 
            // We utilize less CPU, yield to other threads (potentially the one doing the write, this is 
            // especially important under heavy thread/processor contention environment) and also yield to
            // runtime thread aborts (important when run from a high pri thread like finalizer).
            if (_writePos > bufferSize) {
                Thread.Sleep(1);
                continue;
            }

            // Optimization: 
            // For input chunk larger than internal buffer size, write directly
            // It is okay to have a ---- here with the _writePos check, which means
            // we have a loose order between flushing the intenal cache Vs writing 
            // this larger chunk but that is fine. This step will nicely optimize 
            // repeated writing of larger chunks by skipping the interlocked operation
            if ((_writePos == 0) && (count >= bufferSize)) {
                WriteCore(array, offset, count, true);
                return;
            }

            // We should review whether we need critical region markers for hosts. 
            Thread.BeginCriticalRegion();

            Interlocked.Increment(ref _pendingBufferCopy);
            int newPos = Interlocked.Add(ref _writePos, count);
            int oldPos = (newPos - count);

            // Clear the buffer
            if (newPos > bufferSize) {
                Interlocked.Decrement(ref _pendingBufferCopy);
                Thread.EndCriticalRegion();
                
                // Though the lock below is not necessary for correctness, when operating in a heavy 
                // thread contention environment, augmenting the YieldLock techinique with a critical 
                // section around write seems to be giving slightly better performance while 
                // not having noticable impact in the less contended situations.
                // Perhaps we can build a technique that keeps track of the contention?
                //lock (this) 
                {
                    // Make sure we didn't get pre-empted by another thread
                    if (_writePos  > bufferSize) {
                        if ((oldPos  <= bufferSize) && (oldPos  > 0)) {
                            while (_pendingBufferCopy != 0) {
                                Thread.SpinWait(1);
                            }
                            WriteCore(_buffer, 0, oldPos, true);
                            _writePos = 0;
                        }
                    }
                }
            }
            else {
                if (_buffer == null)
                    Interlocked.CompareExchange(ref _buffer, new byte[bufferSize], null);

                // Copy user data into buffer, to write at a later date.
                Buffer.BlockCopy(array, offset, _buffer, oldPos, count);
                
                Interlocked.Decrement(ref _pendingBufferCopy);
                Thread.EndCriticalRegion();
                return;
            }

        } while (true);
    } 

#if _ENABLE_STREAM_FACTORING
    public override long Position 
    {
        // Making the getter thread safe is not very useful anyways 
        get { 
            if (!CanSeek) __Error.SeekNotSupported();
            Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");

            // Compensate for buffer that we read from the handle (_readLen) Vs what the user
            // read so far from the internel buffer (_readPos). Of course add any unwrittern  
            // buffered data
            return pos + (_readPos - _readLen + _writePos);
        }

        [MethodImplAttribute(MethodImplOptions.Synchronized)]
        set {
            if (value < 0) throw new ArgumentOutOfRangeException("value", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
            
            if (_writePos > 0) FlushWrite(false);
            _readPos = 0;
            _readLen = 0;

            Seek(value, SeekOrigin.Begin);
        }
    }

    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    public override int Read(/*[In, Out]*/ byte[] array, int offset, int count) 
    {
        if (array == null)
            throw new ArgumentNullException("array", Helper.GetResourceString("ArgumentNull_Buffer"));
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
        if (count < 0)
            throw new ArgumentOutOfRangeException("count", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
        if (array.Length - offset < count)
            throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));
        
        Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");

        bool isBlocked = false;
        int n = _readLen - _readPos;

        // If the read buffer is empty, read into either user's array or our
        // buffer, depending on number of bytes user asked for and buffer size.
        if (n == 0) {
            if (!CanRead) __Error.ReadNotSupported();
            if (_writePos > 0) FlushWrite(false);
            if (!CanSeek || (count >= bufferSize)) {
                n = ReadCore(array, offset, count);
                // Throw away read buffer.
                _readPos = 0;
                _readLen = 0;
                return n;
            }
            if (_buffer == null) _buffer = new byte[bufferSize];
            n = ReadCore(_buffer, 0, bufferSize);
            if (n == 0) return 0;
            isBlocked = n < bufferSize;
            _readPos = 0;
            _readLen = n;
        }

        // Now copy min of count or numBytesAvailable (ie, near EOF) to array.
        if (n > count) n = count;
        Buffer.BlockCopy(_buffer, _readPos, array, offset, n);
        _readPos += n;

        // We may have read less than the number of bytes the user asked 
        // for, but that is part of the Stream contract.  Reading again for
        // more data may cause us to block if we're using a device with 
        // no clear end of file, such as a serial port or pipe.  If we
        // blocked here & this code was used with redirected pipes for a
        // process's standard output, this can lead to deadlocks involving
        // two processes. But leave this here for files to avoid what would
        // probably be a breaking change.         -- 

        return n;
    }

    [HostProtection(ExternalThreading=true)]
    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    public override IAsyncResult BeginRead(byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
    {
        if (array==null)
            throw new ArgumentNullException("array");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
        if (numBytes < 0)
            throw new ArgumentOutOfRangeException("numBytes", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
        if (array.Length - offset < numBytes)
            throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));

        if (!CanRead) __Error.ReadNotSupported();

        Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
        
        if (_writePos > 0) FlushWrite(false);
        if (_readPos == _readLen) {
            // I can't see how to handle buffering of async requests when 
            // filling the buffer asynchronously, without a lot of complexity.
            // The problems I see are issuing an async read, we do an async 
            // read to fill the buffer, then someone issues another read 
            // (either synchronously or asynchronously) before the first one 
            // returns.  This would involve some sort of complex buffer locking
            // that we probably don't want to get into, at least not in V1.
            // If we did a [....] read to fill the buffer, we could avoid the
            // problem, and any async read less than 64K gets turned into a
            // synchronous read by NT anyways...       -- 

            if (numBytes < bufferSize) {
                if (_buffer == null) _buffer = new byte[bufferSize];
                IAsyncResult bufferRead = BeginReadCore(_buffer, 0, bufferSize, null, null, 0);
                _readLen = EndRead(bufferRead);
                int n = _readLen;
                if (n > numBytes) n = numBytes;
                Buffer.BlockCopy(_buffer, 0, array, offset, n);
                _readPos = n;

                // Fake async
                return BufferedStreamAsyncResult.Complete(n, userCallback, stateObject, false);
            }

            // Here we're making our position pointer inconsistent
            // with our read buffer.  Throw away the read buffer's contents.
            _readPos = 0;
            _readLen = 0;
            return BeginReadCore(array, offset, numBytes, userCallback, stateObject, 0);
        }
        else {
            int n = _readLen - _readPos;
            if (n > numBytes) n = numBytes;
            Buffer.BlockCopy(_buffer, _readPos, array, offset, n);
            _readPos += n;

            if (n >= numBytes) 
                return BufferedStreamAsyncResult.Complete(n, userCallback, stateObject, false);

            // For streams with no clear EOF like serial ports or pipes
            // we cannot read more data without causing an app to block
            // incorrectly.  Pipes don't go down this path 
            // though.  This code needs to be fixed.
            // Throw away read buffer.
            _readPos = 0;
            _readLen = 0;
            
            // WARNING: all state on asyncResult objects must be set before
            // we call ReadFile in BeginReadCore, since the OS can run our
            // callback & the user's callback before ReadFile returns.
            return BeginReadCore(array, offset + n, numBytes - n, userCallback, stateObject, n);
        }
    }

    public unsafe override int EndRead(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException("asyncResult"); 

        BufferedStreamAsyncResult bsar = asyncResult as BufferedStreamAsyncResult;
        if (bsar != null) {
            if (bsar._isWrite)
                __Error.WrongAsyncResult();
            return bsar._numBytes;
        }
        else {
            return EndReadCore(asyncResult);
        }
    }

    [HostProtection(ExternalThreading=true)]
    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
    {
        if (array==null)
            throw new ArgumentNullException("array");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
        if (numBytes < 0)
            throw new ArgumentOutOfRangeException("numBytes", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
        if (array.Length - offset < numBytes)
            throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));

        if (!CanWrite) __Error.WriteNotSupported();

        Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");

        if (_writePos==0) {
            if (_readPos < _readLen) FlushRead();
            _readPos = 0;
            _readLen = 0;
        }

        int n = bufferSize - _writePos;
        if (numBytes <= n) {
            if (_buffer == null) _buffer = new byte[bufferSize];
            Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
            _writePos += numBytes;

            return BufferedStreamAsyncResult.Complete(numBytes, userCallback, stateObject, true);
        }

        if (_writePos > 0) FlushWrite(false);
        return BeginWriteCore(array, offset, numBytes, userCallback, stateObject);
    }

    public unsafe override void EndWrite(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException("asyncResult"); 

        BufferedStreamAsyncResult bsar = asyncResult as BufferedStreamAsyncResult;
        if (bsar == null) 
            EndWriteCore(asyncResult);
    }

    // Reads a byte from the file stream.  Returns the byte cast to an int
    // or -1 if reading from the end of the stream.
    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    public override int ReadByte() 
    {
        if (_readLen == 0 && !CanRead) 
            __Error.ReadNotSupported();
        
        Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
        
        if (_readPos == _readLen) {
            if (_writePos > 0) FlushWrite(false);

            Debug.Assert(bufferSize > 0, "bufferSize > 0");
            if (_buffer == null) _buffer = new byte[bufferSize];
            _readLen = ReadCore(_buffer, 0, bufferSize);
            _readPos = 0;
        }

        if (_readPos == _readLen)
            return -1;

        int result = _buffer[_readPos];
        _readPos++;
        return result;
    }

    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    public override void WriteByte(byte value)
    {
        if (_writePos == 0) {
            if (!CanWrite) 
                __Error.WriteNotSupported();
            if (_readPos < _readLen) 
                FlushRead();
            _readPos = 0;
            _readLen = 0;
            Debug.Assert(bufferSize > 0, "bufferSize > 0");
            if (_buffer==null) 
                _buffer = new byte[bufferSize];
        }

        if (_writePos == bufferSize)
            FlushWrite(false);

        _buffer[_writePos] = value;
        _writePos++;
    }


    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    public override long Seek(long offset, SeekOrigin origin) 
    {
        if (origin<SeekOrigin.Begin || origin>SeekOrigin.End)
            throw new ArgumentException(Helper.GetResourceString("Argument_InvalidSeekOrigin"));
        
        if (!CanSeek) __Error.SeekNotSupported();

        Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");

        // If we've got bytes in our buffer to write, write them out.
        // If we've read in and consumed some bytes, we'll have to adjust
        // our seek positions ONLY IF we're seeking relative to the current
        // position in the stream.  This simulates doing a seek to the new
        // position, then a read for the number of bytes we have in our buffer.
        if (_writePos > 0) {
            FlushWrite(false);
        }
        else if (origin == SeekOrigin.Current) {
            // Don't call FlushRead here, which would have caused an infinite
            // loop.  Simply adjust the seek origin.  This isn't necessary
            // if we're seeking relative to the beginning or end of the stream.
            offset -= (_readLen - _readPos);
        }

        long oldPos = pos + (_readPos - _readLen);
        long curPos = SeekCore(offset, origin);

        // We now must update the read buffer.  We can in some cases simply
        // update _readPos within the buffer, copy around the buffer so our 
        // Position property is still correct, and avoid having to do more 
        // reads from the disk.  Otherwise, discard the buffer's contents.
        if (_readLen > 0) {
            // We can optimize the following condition:
            // oldPos - _readPos <= curPos < oldPos + _readLen - _readPos
            if (oldPos == curPos) {
                if (_readPos > 0) {
                    Buffer.BlockCopy(_buffer, _readPos, _buffer, 0, _readLen - _readPos);
                    _readLen -= _readPos;
                    _readPos = 0;
                }
                // If we still have buffered data, we must update the stream's 
                // position so our Position property is correct.
                if (_readLen > 0)
                    SeekCore(_readLen, SeekOrigin.Current);
            }
            else if (oldPos - _readPos < curPos && curPos < oldPos + _readLen - _readPos) {
                int diff = (int)(curPos - oldPos);
                Buffer.BlockCopy(_buffer, _readPos+diff, _buffer, 0, _readLen - (_readPos + diff));
                _readLen -= (_readPos + diff);
                _readPos = 0;
                if (_readLen > 0)
                    SeekCore(_readLen, SeekOrigin.Current);
            }
            else {
                // Lose the read buffer.
                _readPos = 0;
                _readLen = 0;
            }
            Debug.Assert(_readLen >= 0 && _readPos <= _readLen, "_readLen should be nonnegative, and _readPos should be less than or equal _readLen");
            Debug.Assert(curPos == Position, "Seek optimization: curPos != Position!  Buffer math was mangled.");
        }
        return curPos;
    }
#endif //_ENABLE_STREAM_FACTORING

    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    public override void Flush()
    {
        try {
            if (_writePos > 0) 
                FlushWrite(false);
            else if (_readPos < _readLen) 
                FlushRead();
        }
        finally {
            _writePos = 0;
            _readPos = 0;
            _readLen = 0;
        }
    }

    // Reading is done by blocks from the file, but someone could read
    // 1 byte from the buffer then write.  At that point, the OS's file
    // pointer is out of [....] with the stream's position.  All write 
    // functions should call this function to preserve the position in the file.
    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    protected void FlushRead() {
#if _ENABLE_STREAM_FACTORING
        Debug.Assert(_writePos == 0, "BufferedStream: Write buffer must be empty in FlushRead!");
        
        if (_readPos - _readLen != 0) {
            Debug.Assert(CanSeek, "BufferedStream will lose buffered read data now.");
            if (CanSeek)
                SeekCore(_readPos - _readLen, SeekOrigin.Current);
        }
        _readPos = 0;
        _readLen = 0;
#endif //_ENABLE_STREAM_FACTORING
    }

    // Writes are buffered.  Anytime the buffer fills up 
    // (_writePos + delta > bufferSize) or the buffer switches to reading
    // and there is left over data (_writePos > 0), this function must be called.
    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    protected void FlushWrite(bool blockForWrite) {
        Debug.Assert(_readPos == 0 && _readLen == 0, "BufferedStream: Read buffer must be empty in FlushWrite!");
        
        if (_writePos > 0) 
            WriteCore(_buffer, 0, _writePos, blockForWrite);
        _writePos = 0;
    }

    protected override void Dispose(bool disposing)
    {
        try {
            // Flush data to disk iff we were writing.  
            if (_writePos > 0) {
                // With our Whidbey async IO & overlapped support for AD unloads,
                // we don't strictly need to block here to release resources 
                // if the underlying IO is overlapped since that support 
                // takes care of the pinning & freeing the 
                // overlapped struct.  We need to do this when called from
                // Close so that the handle is closed when Close returns, but
                // we do't need to call EndWrite from the finalizer.  
                // Additionally, if we do call EndWrite, we block forever 
                // because AD unloads prevent us from running the managed 
                // callback from the IO completion port.  Blocking here when 
                // called from the finalizer during AD unload is clearly wrong, 
                // but we can't use any sort of test for whether the AD is 
                // unloading because if we weren't unloading, an AD unload 
                // could happen on a separate thread before we call EndWrite.
                FlushWrite(disposing);
            }
        }
        finally {
            // Don't set the buffer to null, to avoid a NullReferenceException
            // when users have a race condition in their code (ie, they call
            // Close when calling another method on Stream like Read).
            //_buffer = null;
            _readPos = 0;
            _readLen = 0;
            _writePos = 0;

            base.Dispose(disposing);
        }
    }

    //
    // Helper methods
    //

#if  _ENABLE_STREAM_FACTORING
    protected int BufferedWritePosition 
    {
        // Making the getter thread safe is not very useful anyways 
        get { 
            return _writePos;
        }
        //set {
        //    Interlocked.Exchange(ref _writePos, value);
        //}
    }

    protected int BufferedReadPosition 
    {
        // Making the getter thread safe is not very useful anyways 
        get { 
            return _readPos;
        }
        //set {
        //    Interlocked.Exchange(ref _readPos, value);
        //}
    }
#endif  //_ENABLE_STREAM_FACTORING

    protected long UnderlyingStreamPosition 
    {
        // Making the getter thread safe is not very useful anyways 
        get { 
            return pos;
        }
        set {
            Interlocked.Exchange(ref pos, value);
        }
    }

    protected long AddUnderlyingStreamPosition(long posDelta)
    {
        return Interlocked.Add(ref pos, posDelta);
    }
    
    [MethodImplAttribute(MethodImplOptions.Synchronized)]
    protected internal void DiscardBuffer()
    {
        _readPos = 0;
        _readLen = 0;
        _writePos = 0;
    }

    private void WriteCore(byte[] buffer, int offset, int count, bool blockForWrite) 
    {
        long streamPos;
        WriteCore(buffer, offset, count, blockForWrite, out streamPos);
    }
    protected abstract void WriteCore(byte[] buffer, int offset, int count, bool blockForWrite, out long streamPos);

#if _ENABLE_STREAM_FACTORING
    private int ReadCore(byte[] buffer, int offset, int count) 
    {
        long streamPos;
        return ReadCore(buffer, offset, count, out streamPos);
    }
    
    private int EndReadCore(IAsyncResult asyncResult)
    {
        long streamPos;
        return EndReadCore(asyncResult, out streamPos);
    }

    private void EndWriteCore(IAsyncResult asyncResult)
    {
        long streamPos;
        EndWriteCore(asyncResult, out streamPos);
    }

    // Derived streams should implement the following core methods
    protected abstract int ReadCore(byte[] buffer, int offset, int count, out long streamPos);
    [ResourceExposure(ResourceScope.None)]
    [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
    protected abstract IAsyncResult BeginReadCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, Object stateObject, int numBufferedBytesRead);
    protected abstract int EndReadCore(IAsyncResult asyncResult, out long streamPos);
    [ResourceExposure(ResourceScope.None)]
    [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
    protected abstract IAsyncResult BeginWriteCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, Object stateObject);
    protected abstract void EndWriteCore(IAsyncResult asyncResult, out long streamPos);
    protected abstract long SeekCore(long offset, SeekOrigin origin);
#endif //_ENABLE_STREAM_FACTORING
}

#if _ENABLE_STREAM_FACTORING
// Fake async result 
unsafe internal sealed class BufferedStreamAsyncResult : IAsyncResult
{
    // User code callback
    internal AsyncCallback _userCallback;
    internal Object _userStateObject;
    internal int _numBytes;     // number of bytes read OR written
    //internal int _errorCode;

    internal bool _isWrite;     // Whether this is a read or a write

    public Object AsyncState
    {
        get { return _userStateObject; }
    }

    public bool IsCompleted
    {
        get { return true; }
    }

    public WaitHandle AsyncWaitHandle
    {
        get { return null; }
    }

    public bool CompletedSynchronously
    {
        get { return true; }
    }

    internal static IAsyncResult Complete(int numBufferedBytes, AsyncCallback userCallback, Object userStateObject, bool isWrite)
    {
        // Fake async
        BufferedStreamAsyncResult asyncResult = new BufferedStreamAsyncResult();
        asyncResult._numBytes = numBufferedBytes;
        asyncResult._userCallback = userCallback;
        asyncResult._userStateObject = userStateObject;
        asyncResult._isWrite = isWrite;
        if (userCallback != null) {
            userCallback(asyncResult);
        }
        return asyncResult;
    }
}
#endif //_ENABLE_STREAM_FACTORING
}