#region Disclaimer / License
// Copyright (C) 2015, The Duplicati Team
// http://www.duplicati.com, info@duplicati.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
namespace Duplicati.Library.Utility
{
///
/// A link connecting a streaming writer to a streaming reader through a buffer.
/// Should be used to enable multithreading by linking sequential processes avoiding tempfiles (piping).
/// Safe for a single reader and writer to be in different threads. It is possible to pass the written
/// data on to an underlying stream. Use another DirectLinkStream as passthrough to feed multiple spawned
/// threads processing data.
///
public class DirectStreamLink : IDisposable
{
///
/// Main lock to synchronize operations between reader and writer.
/// Used to protect accesses to all state vars and making decisions
/// on blocking thereon.
///
private object m_lock = new object();
/// An event to wake reader.
private ManualResetEventSlim m_signalDataAvailable = new ManualResetEventSlim(false);
/// An event to wake writer.
private ManualResetEventSlim m_signalBufferAvailable = new ManualResetEventSlim(true);
/// Track closes and enable threadsafe self dispose.
private int m_autoDisposeCounter = 2;
/// Number of bytes written to buffer.
private long m_written = 0;
/// Number of bytes read from buffer.
private long m_read = 0;
/// If the writer is closed.
private volatile bool m_writerClosed = false;
/// If the reader is closed.
private volatile bool m_readerClosed = false;
/// The buffer for piping.
private byte[] m_buf;
/// A stream to pass writes to. For stream stacking.
private Stream m_passWriteThrough;
/// Allows to set a length for SubStreams Length property.
private long m_knownLength = -1;
///
/// If set the length is enforced as follows:
/// EndOfStreamException On Write() if writer tries to write more bytes.
/// EndOfStreamException On Read() if reader tries to read more bytes after writer has closed before knownLength.
///
private bool m_enforceKnownLength = false;
///
/// Forces to wait until reader has consumed all bytes from buffer
/// on a Flush operation. Set via constructor.
///
private bool m_blockOnFlush = true;
///
/// Forces to wait until reader has closed its stream
/// on writer's Close operation. This may be used to
/// synchronize worker threads. Set via constructor.
///
private bool m_blockOnClose = true;
/// The helper stream for reader from pipe.
private LinkedReaderStream m_readerStream;
/// The helper stream for writer to pipe.
private LinkedWriterStream m_writerStream;
/// Sets up the DirectStreamLink with a certain behaviour.
/// The size of the internal buffer to use.
/// Specifies to block Flush until reader has read buffer empty. This is not suited for synching.
/// Specifies to block close of writer until reader has closed. Can be used for synching.
/// A stream all written data is just passed to. For stream stacking.
public DirectStreamLink(int bufsize, bool blockOnFlush, bool blockOnClose, Stream passWriteThrough)
{
m_passWriteThrough = passWriteThrough;
m_blockOnFlush = blockOnFlush;
m_blockOnClose = blockOnClose;
if (bufsize <= 0) throw new ArgumentOutOfRangeException(nameof(bufsize), "The size of the buffer must be positive.");
m_buf = new byte[bufsize];
m_readerStream = new LinkedReaderStream(this);
m_writerStream = new LinkedWriterStream(this);
}
/// The Stream to read from the link.
public Stream ReaderStream { get { return m_readerStream; } }
/// The Stream to write to the link.
public Stream WriterStream { get { return m_writerStream; } }
///
/// Allows to set and optionally enforce the length of the piped data if known before.
/// This may help consumers that need a length for correct operation.
/// Set length to negative value if length is not known (default).
///
public void SetKnownLength (long length, bool enforce)
{
lock (m_lock)
{
m_knownLength = length;
m_enforceKnownLength = enforce;
}
}
/// Read bytes from the Pipe. Blocks if none available. Redirected from ReaderStream.
private int read(byte[] buffer, int offset, int count)
{
int bytesRead = 0;
while (count > 0)
{
int readBytes = 0;
int startIndex = 0;
int bufFilled;
lock (m_lock)
{
bufFilled = (int)(m_written - m_read);
if (bufFilled == 0)
{
if (bytesRead > 0) return bytesRead; // we have data, return it instead of blocking.
if (m_writerClosed)
{
if (m_enforceKnownLength && m_read < m_knownLength) throw new EndOfStreamException();
else return bytesRead; // stream is done.
}
m_signalDataAvailable.Reset(); // block and wait for data
}
else
{
startIndex = (int)(m_read % m_buf.Length);
readBytes = m_buf.Length - startIndex; // maximum to end of buffer
if (count < readBytes) readBytes = count;
if (bufFilled < readBytes) { readBytes = bufFilled; count = bufFilled; }
}
}
if (readBytes == 0) m_signalDataAvailable.Wait();
else
{
Array.Copy(m_buf, startIndex, buffer, offset + bytesRead, readBytes);
bytesRead += readBytes;
count -= readBytes;
lock (m_lock)
{
m_read += readBytes;
m_signalBufferAvailable.Set();
}
}
}
return bytesRead;
}
/// Write bytes to the Pipe. Blocks if buffer full. Redirected from WriterStream.
private void write(byte[] buffer, int offset, int count)
{
int orgOffset = offset, orgCount = count;
while (count > 0)
{
int writeBytes = 0;
int startIndex = 0;
int bufFree;
lock (m_lock)
{
if (m_enforceKnownLength && m_knownLength >= 0 && (m_written + count) > m_knownLength)
throw new EndOfStreamException();
if (m_readerClosed) return; // we do not care about writes after reader has closed his stream (Note: PassThrough is still done).
bufFree = (int)(m_buf.Length - (m_written - m_read));
if (bufFree == 0) m_signalBufferAvailable.Reset();
else
{
startIndex = (int)(m_written % m_buf.Length);
writeBytes = m_buf.Length - startIndex; // maximum to end of buffer
if (bufFree < writeBytes) writeBytes = bufFree;
}
}
if (writeBytes == 0) m_signalBufferAvailable.Wait();
else
{
if (count < writeBytes) writeBytes = count;
Array.Copy(buffer, offset, m_buf, startIndex, writeBytes);
offset += writeBytes;
count -= writeBytes;
lock (m_lock)
{
m_written += writeBytes;
m_signalDataAvailable.Set();
}
}
}
// We pass through the data after written to our own buffer.
// This allows our worker to start processing.
// If we block because our worker cannot consume fast enough,
// We are the slower part in the chain anyway.
if (m_passWriteThrough != null)
m_passWriteThrough.Write(buffer, orgOffset, orgCount);
}
private void flush()
{
if (m_passWriteThrough != null)
m_passWriteThrough.Flush();
if (m_blockOnFlush) // wait until reader has consumed last chunk of data
{
bool isEmpty = false;
while (!isEmpty && !m_readerClosed)
{
lock (m_lock)
{
isEmpty = !(m_read < m_written);
if (!isEmpty && !m_readerClosed) m_signalBufferAvailable.Reset();
}
m_signalBufferAvailable.Wait();
}
}
}
private void readerClosed()
{
lock (m_lock)
{
if (m_readerClosed) return;
m_readerClosed = true;
m_signalBufferAvailable.Set(); // unblock potentially waiting writer
m_readerStream = null;
}
if (Interlocked.Decrement(ref m_autoDisposeCounter) == 0)
this.Dispose();
}
private void writerClosed()
{
lock (m_lock)
{
if (m_writerClosed) return;
m_writerClosed = true;
m_signalDataAvailable.Set(); // unblock potentially waiting reader
m_writerStream = null;
}
flush();
// we close m_passWriteThrough before blocking, so if at end of chain (stacked DirectStreamLinks)
// a blocked reader is waiting, it can proceed sooner.
if (m_passWriteThrough != null)
{ m_passWriteThrough.Close(); }
if (m_blockOnClose) // wait until reader has closed its stream before continuing.
{
while (!m_readerClosed)
{
lock (m_lock)
{
if (!m_readerClosed) m_signalBufferAvailable.Reset();
}
m_signalBufferAvailable.Wait();
}
}
if (Interlocked.Decrement(ref m_autoDisposeCounter) == 0)
this.Dispose();
}
/// Disposes class. Is triggered automatically as soon as reader and writer are closed.
public void Dispose()
{
lock (m_lock)
{
if (!m_readerClosed && m_readerStream != null) m_readerStream.Close();
if (!m_writerClosed && m_writerStream != null) m_writerStream.Close();
}
this.m_signalBufferAvailable.Dispose();
this.m_signalDataAvailable.Dispose();
}
#region HelperClasses: Reader, Writer and a DataPump
/// Common base class for reader and writer.
private abstract class LinkedSubStream : Stream
{
protected DirectStreamLink m_linkStream;
protected long m_knownLength = -1;
public LinkedSubStream(DirectStreamLink linkStream)
{ this.m_linkStream = linkStream; }
public override bool CanSeek { get { return false; } }
public override void SetLength(long value) { throw new NotSupportedException(); }
public void SetFakeLength(long value) { m_knownLength = value; }
public override long Length { get { if (m_linkStream.m_knownLength >= 0) return m_linkStream.m_knownLength; else throw new NotSupportedException(); } }
// We fake Seek and Position to at least support dummy operations.
// That mitigates some things like setting Position = 0 on start
// and if callers rely on Position instead of counting themselves.
public override long Seek(long offset, SeekOrigin origin)
{
switch (origin)
{
case SeekOrigin.Begin: return this.Position = offset;
case SeekOrigin.Current: return this.Position = this.Position + offset;
default: throw new NotSupportedException();
}
}
public override long Position
{ set { if (value == this.Position) return; else throw new NotSupportedException(); } }
}
/// The class for readig from DirectStreamLink.
private class LinkedReaderStream : LinkedSubStream
{
public LinkedReaderStream(DirectStreamLink linkStream)
: base(linkStream) { }
public override bool CanRead { get { return true; } }
public override bool CanWrite { get { return false; } }
public override long Position
{
get { return m_linkStream.m_read; }
set { throw new NotSupportedException(); }
}
public override int Read(byte[] buffer, int offset, int count)
{ return m_linkStream.read(buffer, offset, count); }
public override void Write(byte[] buffer, int offset, int count)
{ throw new NotSupportedException(); }
protected override void Dispose(bool disposing)
{
m_linkStream.readerClosed();
base.Dispose(disposing);
}
public override void Flush() { }
}
/// The class for writing to DirectStreamLink.
private class LinkedWriterStream : LinkedSubStream
{
public LinkedWriterStream(DirectStreamLink linkStream)
: base(linkStream) { }
public override bool CanRead { get { return false; } }
public override bool CanWrite { get { return true; } }
public override long Position
{
get { return m_linkStream.m_written; }
set { throw new NotSupportedException(); }
}
public override int Read(byte[] buffer, int offset, int count)
{ throw new NotSupportedException(); }
public override void Write(byte[] buffer, int offset, int count)
{ m_linkStream.write(buffer, offset, count); }
protected override void Dispose(bool disposing)
{
m_linkStream.writerClosed();
base.Dispose(disposing);
}
public override void Flush() { m_linkStream.flush(); }
}
///
/// A helper class to transfer data asynchronously from one stream into another.
/// Handles exceptions, takes care of closes when done and is suited to be used
/// with syncing-on-Close in DirectStreamLink's.
/// When data is transferred, streams are closed by default (see constructor).
///
public class DataPump
{
/// Minimum buffer size for pumping
public const int MINBUFSIZE = 1 << 10; // 1K
/// Default buffer size for pumping
public const int DEFAULTBUFSIZE = 1 << 14; // 16K
private readonly int m_bufsize;
private readonly bool m_closeInputWhenDone, m_closeOutputWhenDone;
private readonly Action m_callbackFinalizePumping = null;
private Stream m_input, m_output;
private long m_count = 0;
private volatile bool m_wasStarted = false;
/// Creates and configures a new DataPump instance.
/// The stream to read data from.
/// The stream to write data to.
/// The internal buffer size for reading/writing.
/// A callback to issue when pumping is done but before streams are closed. e.g. Can add data to output.
/// Disable auto close of input stream when pumping is done.
/// Disable auto close of output stream when pumping is done.
public DataPump(Stream input, Stream output, int bufsize = DEFAULTBUFSIZE
, Action callbackFinalizePumping = null
, bool dontCloseInputWhenDone = false, bool dontCloseOutputWhenDone = false)
{
this.m_input = input;
this.m_output = output;
this.m_bufsize = Math.Max(MINBUFSIZE, bufsize);
this.m_callbackFinalizePumping = callbackFinalizePumping;
this.m_closeInputWhenDone = !dontCloseInputWhenDone;
this.m_closeOutputWhenDone = !dontCloseOutputWhenDone;
}
/// Returns number of bytes currently transferred.
public long BytesPumped
{ get { return System.Threading.Interlocked.Read(ref m_count); } }
/// Returns if the DataPump was started.
public bool WasStarted { get { return m_wasStarted; } }
///
/// Runs DataPump blocking. Can be used with Task class when calling from
/// higher .Net Framework. Returns number of bytes pumped. Rethrows exceptions
/// if any on Read() or Write().
///
public long Run()
{
if (m_wasStarted)
throw new InvalidOperationException();
m_wasStarted = true;
return doRun(true);
}
/// Actually transfers stream data.
private long doRun(bool rethrowException)
{
Exception hadException = null;
byte[] buf = new byte[1 << 14]; int c;
try
{
while ((c = m_input.Read(buf, 0, buf.Length)) > 0)
{
m_output.Write(buf, 0, c);
System.Threading.Interlocked.Add(ref m_count, c);
}
if (m_callbackFinalizePumping != null)
{
try { m_callbackFinalizePumping(this); }
catch { }
}
}
catch (Exception ex)
{
hadException = ex;
if (rethrowException) throw;
}
finally
{
// When done, close streams and clear references.
// We close output first. This is on purpose to mitigate potential race conditions
// when a caller is synchronizing against close of input stream.
// This is commonly the case when DataPump is used to decouple two previously stacked streams
// through DirectLinkStream with BlockOnClose option.
try { if (m_output != null && m_closeOutputWhenDone) m_output.Close(); }
catch { }
try { if (m_input != null && m_closeInputWhenDone) m_input.Close(); }
catch { }
m_input = m_output = null;
}
return m_count;
}
}
#endregion
}
}