Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/duplicati/duplicati.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwarwickmm <warwickmm@users.noreply.github.com>2021-05-04 19:42:52 +0300
committerGitHub <noreply@github.com>2021-05-04 19:42:52 +0300
commit50ed059e11efeb73a5d0943f5349f64064e19c8c (patch)
tree54aa0ded9bdf1a6fa361c98a263d4e8b50410d3d
parent90bcedbd52ca1e7d6dd622cf655684a5ea53b0c3 (diff)
parentfa33e5f135b09a3c464a492b21affea51b397a3c (diff)
Merge pull request #4469 from tygill/fix/ms-graph-throttling
Fix OneDrive throttling and respect retry-after header.
-rw-r--r--Duplicati/CommandLine/BackendTester/NonSeekableStream.cs10
-rw-r--r--Duplicati/CommandLine/BackendTester/Program.cs38
-rw-r--r--Duplicati/Library/Backend/OAuthHelper/Duplicati.Library.OAuthHelper.csproj139
-rw-r--r--Duplicati/Library/Backend/OAuthHelper/OAuthHttpClient.cs5
-rw-r--r--Duplicati/Library/Backend/OAuthHelper/RetryAfterHelper.cs125
-rw-r--r--Duplicati/Library/Backend/OneDrive/Exceptions.cs11
-rw-r--r--Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs445
-rw-r--r--Duplicati/Library/Utility/ReadLimitLengthStream.cs153
-rw-r--r--Duplicati/Library/Utility/Utility.cs14
-rw-r--r--Duplicati/UnitTest/UtilityTests.cs169
10 files changed, 899 insertions, 210 deletions
diff --git a/Duplicati/CommandLine/BackendTester/NonSeekableStream.cs b/Duplicati/CommandLine/BackendTester/NonSeekableStream.cs
index f47f124c4..f60dc495d 100644
--- a/Duplicati/CommandLine/BackendTester/NonSeekableStream.cs
+++ b/Duplicati/CommandLine/BackendTester/NonSeekableStream.cs
@@ -42,14 +42,20 @@ namespace Duplicati.CommandLine.BackendTester
}
}
+ public override long Position
+ {
+ get => base.Position;
+ set => throw new NotSupportedException();
+ }
+
public override long Seek(long offset, System.IO.SeekOrigin origin)
{
- throw new NotImplementedException();
+ throw new NotSupportedException();
}
public override void SetLength(long value)
{
- throw new NotImplementedException();
+ throw new NotSupportedException();
}
}
}
diff --git a/Duplicati/CommandLine/BackendTester/Program.cs b/Duplicati/CommandLine/BackendTester/Program.cs
index 3537472a4..7ec611cb4 100644
--- a/Duplicati/CommandLine/BackendTester/Program.cs
+++ b/Duplicati/CommandLine/BackendTester/Program.cs
@@ -225,6 +225,28 @@ namespace Duplicati.CommandLine.BackendTester
bool skipOverwriteTest = Library.Utility.Utility.ParseBoolOption(options, "skip-overwrite-test");
bool trimFilenameSpaces = Library.Utility.Utility.ParseBoolOption(options, "trim-filename-spaces");
+ long throttleUpload = 0;
+ if (options.TryGetValue("throttle-upload", out string throttleUploadString))
+ {
+ if (!(backend is IStreamingBackend) || disableStreaming)
+ {
+ Console.WriteLine("Warning: Throttling is only supported in this tool on streaming backends");
+ }
+
+ throttleUpload = Duplicati.Library.Utility.Sizeparser.ParseSize(throttleUploadString, "kb");
+ }
+
+ long throttleDownload = 0;
+ if (options.TryGetValue("throttle-download", out string throttleDownloadString))
+ {
+ if (!(backend is IStreamingBackend) || disableStreaming)
+ {
+ Console.WriteLine("Warning: Throttling is only supported in this tool on streaming backends");
+ }
+
+ throttleDownload = Duplicati.Library.Utility.Sizeparser.ParseSize(throttleDownloadString, "kb");
+ }
+
if (options.ContainsKey("number-of-files"))
number_of_files = int.Parse(options["number-of-files"]);
if (options.ContainsKey("min-file-size"))
@@ -266,9 +288,9 @@ namespace Duplicati.CommandLine.BackendTester
//Upload a dummy file for entry 0 and the last one, they will be replaced by the real files afterwards
//We upload entry 0 twice just to try to freak any internal cache list
- Uploadfile(dummy, 0, files[0].remotefilename, backend, disableStreaming);
- Uploadfile(dummy, 0, files[0].remotefilename, backend, disableStreaming);
- Uploadfile(dummy, files.Count - 1, files[files.Count - 1].remotefilename, backend, disableStreaming);
+ Uploadfile(dummy, 0, files[0].remotefilename, backend, disableStreaming, throttleUpload);
+ Uploadfile(dummy, 0, files[0].remotefilename, backend, disableStreaming, throttleUpload);
+ Uploadfile(dummy, files.Count - 1, files[files.Count - 1].remotefilename, backend, disableStreaming, throttleUpload);
}
}
@@ -276,7 +298,7 @@ namespace Duplicati.CommandLine.BackendTester
Console.WriteLine("Uploading files ...");
for (int i = 0; i < files.Count; i++)
- Uploadfile(files[i].localfilename, i, files[i].remotefilename, backend, disableStreaming);
+ Uploadfile(files[i].localfilename, i, files[i].remotefilename, backend, disableStreaming, throttleUpload);
TempFile originalRenamedFile = null;
string renamedFileNewName = null;
@@ -345,7 +367,8 @@ namespace Duplicati.CommandLine.BackendTester
if (backend is IStreamingBackend streamingBackend && !disableStreaming)
{
using (System.IO.FileStream fs = new System.IO.FileStream(cf, System.IO.FileMode.Create, System.IO.FileAccess.Write, System.IO.FileShare.None))
- using (NonSeekableStream nss = new NonSeekableStream(fs))
+ using (Library.Utility.ThrottledStream ts = new Library.Utility.ThrottledStream(fs, throttleDownload, throttleDownload))
+ using (NonSeekableStream nss = new NonSeekableStream(ts))
streamingBackend.Get(files[i].remotefilename, nss);
}
else
@@ -484,7 +507,7 @@ namespace Duplicati.CommandLine.BackendTester
return true;
}
- private static void Uploadfile(string localfilename, int i, string remotefilename, IBackend backend, bool disableStreaming)
+ private static void Uploadfile(string localfilename, int i, string remotefilename, IBackend backend, bool disableStreaming, long throttle)
{
Console.Write("Uploading file {0}, {1} ... ", i, Duplicati.Library.Utility.Utility.FormatSizeString(new System.IO.FileInfo(localfilename).Length));
Exception e = null;
@@ -494,7 +517,8 @@ namespace Duplicati.CommandLine.BackendTester
if (backend is IStreamingBackend streamingBackend && !disableStreaming)
{
using (System.IO.FileStream fs = new System.IO.FileStream(localfilename, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.Read))
- using (NonSeekableStream nss = new NonSeekableStream(fs))
+ using (Library.Utility.ThrottledStream ts = new Library.Utility.ThrottledStream(fs, throttle, throttle))
+ using (NonSeekableStream nss = new NonSeekableStream(ts))
streamingBackend.PutAsync(remotefilename, nss, CancellationToken.None).Wait();
}
else
diff --git a/Duplicati/Library/Backend/OAuthHelper/Duplicati.Library.OAuthHelper.csproj b/Duplicati/Library/Backend/OAuthHelper/Duplicati.Library.OAuthHelper.csproj
index 3d3d46406..26538d17a 100644
--- a/Duplicati/Library/Backend/OAuthHelper/Duplicati.Library.OAuthHelper.csproj
+++ b/Duplicati/Library/Backend/OAuthHelper/Duplicati.Library.OAuthHelper.csproj
@@ -1,68 +1,73 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
- <PropertyGroup>
- <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
- <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProjectGuid>{D4C37C33-5E73-4B56-B2C3-DC4A6BAA36BB}</ProjectGuid>
- <OutputType>Library</OutputType>
- <RootNamespace>Duplicati.Library.OAuthHelper</RootNamespace>
- <AssemblyName>Duplicati.Library.OAuthHelper</AssemblyName>
- <AssemblyOriginatorKeyFile>Duplicati.snk</AssemblyOriginatorKeyFile>
- <TargetFrameworkVersion>v4.7.1</TargetFrameworkVersion>
- <UseMSBuildEngine>false</UseMSBuildEngine>
- <TargetFrameworkProfile />
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
- <DebugSymbols>true</DebugSymbols>
- <DebugType>full</DebugType>
- <Optimize>false</Optimize>
- <OutputPath>bin\Debug</OutputPath>
- <DefineConstants>DEBUG;</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- <ConsolePause>false</ConsolePause>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
- <DebugType>full</DebugType>
- <Optimize>true</Optimize>
- <OutputPath>bin\Release</OutputPath>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- <ConsolePause>false</ConsolePause>
- </PropertyGroup>
- <ItemGroup>
- <Reference Include="Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
- <HintPath>..\..\..\..\packages\Newtonsoft.Json.12.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
- </Reference>
- <Reference Include="System" />
- <Reference Include="System.Net.Http" />
- <Reference Include="System.Net.Http.WebRequest" />
- </ItemGroup>
- <ItemGroup>
- <Compile Include="OAuthHttpClient.cs" />
- <Compile Include="OAuthHttpMessageHandler.cs" />
- <Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="OAuthHelper.cs" />
- <Compile Include="Strings.cs" />
- <Compile Include="JSONWebHelper.cs" />
- <Compile Include="MultipartItem.cs" />
- </ItemGroup>
- <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
- <ItemGroup>
- <ProjectReference Include="..\..\Utility\Duplicati.Library.Utility.csproj">
- <Project>{DE3E5D4C-51AB-4E5E-BEE8-E636CEBFBA65}</Project>
- <Name>Duplicati.Library.Utility</Name>
- </ProjectReference>
- <ProjectReference Include="..\..\Localization\Duplicati.Library.Localization.csproj">
- <Project>{B68F2214-951F-4F78-8488-66E1ED3F50BF}</Project>
- <Name>Duplicati.Library.Localization</Name>
- </ProjectReference>
- <ProjectReference Include="..\..\Interface\Duplicati.Library.Interface.csproj">
- <Project>{C5899F45-B0FF-483C-9D38-24A9FCAAB237}</Project>
- <Name>Duplicati.Library.Interface</Name>
- </ProjectReference>
- </ItemGroup>
- <ItemGroup>
- <None Include="packages.config" />
- </ItemGroup>
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{D4C37C33-5E73-4B56-B2C3-DC4A6BAA36BB}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <RootNamespace>Duplicati.Library.OAuthHelper</RootNamespace>
+ <AssemblyName>Duplicati.Library.OAuthHelper</AssemblyName>
+ <AssemblyOriginatorKeyFile>Duplicati.snk</AssemblyOriginatorKeyFile>
+ <TargetFrameworkVersion>v4.7.1</TargetFrameworkVersion>
+ <UseMSBuildEngine>false</UseMSBuildEngine>
+ <TargetFrameworkProfile />
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug</OutputPath>
+ <DefineConstants>DEBUG;</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <ConsolePause>false</ConsolePause>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>full</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release</OutputPath>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <ConsolePause>false</ConsolePause>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
+ <HintPath>..\..\..\..\packages\Newtonsoft.Json.12.0.2\lib\net45\Newtonsoft.Json.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Net.Http" />
+ <Reference Include="System.Net.Http.WebRequest" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="OAuthHttpClient.cs" />
+ <Compile Include="OAuthHttpMessageHandler.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="OAuthHelper.cs" />
+ <Compile Include="RetryAfterHelper.cs" />
+ <Compile Include="Strings.cs" />
+ <Compile Include="JSONWebHelper.cs" />
+ <Compile Include="MultipartItem.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+ <ItemGroup>
+ <ProjectReference Include="..\..\Logging\Duplicati.Library.Logging.csproj">
+ <Project>{d10a5fc0-11b4-4e70-86aa-8aea52bd9798}</Project>
+ <Name>Duplicati.Library.Logging</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\Utility\Duplicati.Library.Utility.csproj">
+ <Project>{DE3E5D4C-51AB-4E5E-BEE8-E636CEBFBA65}</Project>
+ <Name>Duplicati.Library.Utility</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\Localization\Duplicati.Library.Localization.csproj">
+ <Project>{B68F2214-951F-4F78-8488-66E1ED3F50BF}</Project>
+ <Name>Duplicati.Library.Localization</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\Interface\Duplicati.Library.Interface.csproj">
+ <Project>{C5899F45-B0FF-483C-9D38-24A9FCAAB237}</Project>
+ <Name>Duplicati.Library.Interface</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
</Project> \ No newline at end of file
diff --git a/Duplicati/Library/Backend/OAuthHelper/OAuthHttpClient.cs b/Duplicati/Library/Backend/OAuthHelper/OAuthHttpClient.cs
index 8937f4a7d..58ad15d9b 100644
--- a/Duplicati/Library/Backend/OAuthHelper/OAuthHttpClient.cs
+++ b/Duplicati/Library/Backend/OAuthHelper/OAuthHttpClient.cs
@@ -59,6 +59,11 @@ namespace Duplicati.Library
{
this.Timeout = HttpContextSettings.OperationTimeout;
}
+ else
+ {
+ // If no timeout is set, default to infinite
+ this.Timeout = System.Threading.Timeout.InfiniteTimeSpan;
+ }
// Set the default user agent
this.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("Duplicati", USER_AGENT_VERSION));
diff --git a/Duplicati/Library/Backend/OAuthHelper/RetryAfterHelper.cs b/Duplicati/Library/Backend/OAuthHelper/RetryAfterHelper.cs
new file mode 100644
index 000000000..ce1e60f8b
--- /dev/null
+++ b/Duplicati/Library/Backend/OAuthHelper/RetryAfterHelper.cs
@@ -0,0 +1,125 @@
+// Copyright (C) 2018, The Duplicati Team
+// http://www.duplicati.com, info@duplicati.com
+//
+// This library is free software; you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation; either version 2.1 of the
+// License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+using Duplicati.Library.Logging;
+using Duplicati.Library.Utility;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net.Http.Headers;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Duplicati.Library
+{
+ public class RetryAfterHelper
+ {
+ private static readonly string LOGTAG = Log.LogTagFromType<RetryAfterHelper>();
+
+ // Whenever a response includes a Retry-After header, we'll update this timestamp with when we can next
+ // send a request. And before sending any requests, we'll make sure to wait until at least this time.
+ // Since this may be read and written by multiple threads, it is stored as a long and updated using Interlocked.Exchange.
+ private long retryAfter = DateTimeOffset.MinValue.UtcTicks;
+
+ public void SetRetryAfter(RetryConditionHeaderValue retryAfter)
+ {
+ if (retryAfter != null)
+ {
+ DateTimeOffset? delayUntil = null;
+ if (retryAfter.Delta.HasValue)
+ {
+ delayUntil = DateTimeOffset.UtcNow + retryAfter.Delta.Value;
+ }
+ else if (retryAfter.Date.HasValue)
+ {
+ delayUntil = retryAfter.Date.Value;
+ }
+
+ if (delayUntil.HasValue)
+ {
+ // Set the retry timestamp to the UTC version of the timestamp.
+ long newRetryAfter = delayUntil.Value.UtcTicks;
+
+ // Update the persisted retry after timestamp
+ long replacedRetryAfter;
+ long currentRetryAfter;
+ do
+ {
+ currentRetryAfter = Interlocked.Read(ref this.retryAfter);
+
+ if (newRetryAfter < currentRetryAfter)
+ {
+ // If the current retry after is already past the new value, then no need to update it again.
+ break;
+ }
+ else
+ {
+ replacedRetryAfter = Interlocked.CompareExchange(ref this.retryAfter, newRetryAfter, currentRetryAfter);
+ }
+ }
+ while (replacedRetryAfter != currentRetryAfter);
+ }
+ }
+ }
+
+ public void WaitForRetryAfter()
+ {
+ this.WaitForRetryAfterAsync(CancellationToken.None).Await();
+ }
+
+ public async Task WaitForRetryAfterAsync(CancellationToken cancelToken)
+ {
+ TimeSpan delay = this.GetDelayTime();
+
+ if (delay > TimeSpan.Zero)
+ {
+ Log.WriteProfilingMessage(
+ LOGTAG,
+ "RetryAfterWait",
+ "Waiting for {0} to respect Retry-After header",
+ delay);
+
+ await Task.Delay(delay).ConfigureAwait(false);
+ }
+ }
+
+ private TimeSpan GetDelayTime()
+ {
+ // Make sure this is thread safe in case multiple calls are made concurrently to this backend
+ // This is done by reading the value into a local value which is then parsed and operated on locally.
+ long retryAfterTicks = Interlocked.Read(ref this.retryAfter);
+ DateTimeOffset delayUntil = new DateTimeOffset(retryAfterTicks, TimeSpan.Zero);
+
+ TimeSpan delay;
+ DateTimeOffset now = DateTimeOffset.UtcNow;
+
+ // Make sure delayUntil is in the future and delay until then if so
+ if (delayUntil >= now)
+ {
+ delay = delayUntil - now;
+ }
+ else
+ {
+ // If the date given was in the past then don't wait at all
+ delay = TimeSpan.Zero;
+ }
+
+ return delay;
+ }
+ }
+}
diff --git a/Duplicati/Library/Backend/OneDrive/Exceptions.cs b/Duplicati/Library/Backend/OneDrive/Exceptions.cs
index 0eb50882d..a7685c516 100644
--- a/Duplicati/Library/Backend/OneDrive/Exceptions.cs
+++ b/Duplicati/Library/Backend/OneDrive/Exceptions.cs
@@ -2,6 +2,7 @@
using System.IO;
using System.Net;
using System.Net.Http;
+using System.Net.Http.Headers;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
@@ -80,8 +81,6 @@ namespace Duplicati.Library.Backend.MicrosoftGraph
}
}
- public HttpResponseMessage Response { get; private set; }
-
protected static string ResponseToString(HttpResponseMessage response)
{
if (response != null)
@@ -188,7 +187,7 @@ namespace Duplicati.Library.Backend.MicrosoftGraph
HttpResponseMessage originalResponse,
int fragment,
int fragmentCount,
- MicrosoftGraphException fragmentException)
+ Exception fragmentException)
: base(
string.Format("Error uploading fragment {0} of {1} for {2}", fragment, fragmentCount, originalResponse?.RequestMessage?.RequestUri?.ToString() ?? "<unknown>"),
originalResponse,
@@ -196,14 +195,13 @@ namespace Duplicati.Library.Backend.MicrosoftGraph
{
this.Fragment = fragment;
this.FragmentCount = fragmentCount;
- this.InnerException = fragmentException;
}
public UploadSessionException(
HttpWebResponse originalResponse,
int fragment,
int fragmentCount,
- MicrosoftGraphException fragmentException)
+ Exception fragmentException)
: base(
string.Format("Error uploading fragment {0} of {1} for {2}", fragment, fragmentCount, originalResponse?.ResponseUri?.ToString() ?? "<unknown>"),
originalResponse,
@@ -211,12 +209,9 @@ namespace Duplicati.Library.Backend.MicrosoftGraph
{
this.Fragment = fragment;
this.FragmentCount = fragmentCount;
- this.InnerException = fragmentException;
}
public int Fragment { get; private set; }
public int FragmentCount { get; private set; }
-
- public new MicrosoftGraphException InnerException { get; private set; }
}
}
diff --git a/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs b/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs
index c8e533fc7..6bf0bf02f 100644
--- a/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs
+++ b/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs
@@ -1,6 +1,7 @@
using Duplicati.Library.Backend.MicrosoftGraph;
using Duplicati.Library.Common.IO;
using Duplicati.Library.Interface;
+using Duplicati.Library.Logging;
using Duplicati.Library.Utility;
using Newtonsoft.Json;
using System;
@@ -30,6 +31,8 @@ namespace Duplicati.Library.Backend
/// </remarks>
public abstract class MicrosoftGraphBackend : IBackend, IStreamingBackend, IQuotaEnabledBackend, IRenameEnabledBackend
{
+ private static readonly string LOGTAG = Log.LogTagFromType<MicrosoftGraphBackend>();
+
private const string SERVICES_AGREEMENT = "https://www.microsoft.com/en-us/servicesagreement";
private const string PRIVACY_STATEMENT = "https://privacy.microsoft.com/en-us/privacystatement";
@@ -94,6 +97,11 @@ namespace Duplicati.Library.Backend
private readonly int fragmentRetryCount;
private readonly int fragmentRetryDelay; // In milliseconds
+ // Whenever a response includes a Retry-After header, we'll update this timestamp with when we can next
+ // send a request. And before sending any requests, we'll make sure to wait until at least this time.
+ // Since this may be read and written by multiple threads, it is stored as a long and updated using Interlocked.Exchange.
+ private readonly RetryAfterHelper m_retryAfter;
+
private string[] dnsNames = null;
private readonly Lazy<string> rootPathFromURL;
@@ -157,6 +165,8 @@ namespace Duplicati.Library.Backend
this.m_oAuthHelper.AutoAuthHeader = true;
}
+ this.m_retryAfter = new RetryAfterHelper();
+
// Extract out the path to the backup root folder from the given URI. Since this can be an expensive operation,
// we will cache the value using a lazy initializer.
this.rootPathFromURL = new Lazy<string>(() => MicrosoftGraphBackend.NormalizeSlashes(this.GetRootPathFromUrl(url)));
@@ -209,6 +219,7 @@ namespace Duplicati.Library.Backend
UploadSession uploadSession = this.Post<UploadSession>(string.Format("{0}/root:{1}{2}:/createUploadSession", this.DrivePrefix, this.RootPath, NormalizeSlashes(dnsTestFile)), MicrosoftGraphBackend.dummyUploadSession);
// Canceling an upload session is done by sending a DELETE to the upload URL
+ m_retryAfter.WaitForRetryAfter();
if (this.m_client != null)
{
using (var request = new HttpRequestMessage(HttpMethod.Delete, uploadSession.UploadUrl))
@@ -354,6 +365,7 @@ namespace Duplicati.Library.Backend
{
try
{
+ m_retryAfter.WaitForRetryAfter();
string getUrl = string.Format("{0}/root:{1}{2}:/content", this.DrivePrefix, this.RootPath, NormalizeSlashes(remotename));
if (this.m_client != null)
{
@@ -411,6 +423,7 @@ namespace Duplicati.Library.Backend
// PUT only supports up to 4 MB file uploads. There's a separate process for larger files.
if (stream.Length < PUT_MAX_SIZE)
{
+ await m_retryAfter.WaitForRetryAfterAsync(cancelToken).ConfigureAwait(false);
string putUrl = string.Format("{0}/root:{1}{2}:/content", this.DrivePrefix, this.RootPath, NormalizeSlashes(remotename));
if (this.m_client != null)
{
@@ -443,6 +456,7 @@ namespace Duplicati.Library.Backend
string createSessionUrl = string.Format("{0}/root:{1}{2}:/createUploadSession", this.DrivePrefix, this.RootPath, NormalizeSlashes(remotename));
if (this.m_client != null)
{
+ await m_retryAfter.WaitForRetryAfterAsync(cancelToken).ConfigureAwait(false);
using (HttpRequestMessage createSessionRequest = new HttpRequestMessage(HttpMethod.Post, createSessionUrl))
using (HttpResponseMessage createSessionResponse = await this.m_client.SendAsync(createSessionRequest, cancelToken).ConfigureAwait(false))
{
@@ -451,27 +465,220 @@ namespace Duplicati.Library.Backend
// If the stream's total length is less than the chosen fragment size, then we should make the buffer only as large as the stream.
int bufferSize = (int)Math.Min(this.fragmentSize, stream.Length);
- byte[] fragmentBuffer = new byte[bufferSize];
- int read = 0;
- for (int offset = 0; offset < stream.Length; offset += read)
+ long read = 0;
+ for (long offset = 0; offset < stream.Length; offset += read)
{
- read = await stream.ReadAsync(fragmentBuffer, 0, bufferSize, cancelToken).ConfigureAwait(false);
+ // If the stream isn't long enough for this to be a full buffer, then limit the length
+ long currentBufferSize = bufferSize;
+ if (stream.Length < offset + bufferSize)
+ {
+ currentBufferSize = stream.Length - offset;
+ }
- int retryCount = this.fragmentRetryCount;
- for (int attempt = 0; attempt < retryCount; attempt++)
+ using (Stream subStream = new ReadLimitLengthStream(stream, offset, currentBufferSize))
{
- using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Put, uploadSession.UploadUrl))
- using (ByteArrayContent fragmentContent = new ByteArrayContent(fragmentBuffer, 0, read))
+ read = subStream.Length;
+
+ int fragmentCount = (int)Math.Ceiling((double)stream.Length / bufferSize);
+ int retryCount = this.fragmentRetryCount;
+ for (int attempt = 0; attempt < retryCount; attempt++)
{
- fragmentContent.Headers.ContentLength = read;
- fragmentContent.Headers.ContentRange = new ContentRangeHeaderValue(offset, offset + read - 1, stream.Length);
+ await m_retryAfter.WaitForRetryAfterAsync(cancelToken).ConfigureAwait(false);
+
+ int fragmentNumber = (int)(offset / bufferSize);
+ Log.WriteVerboseMessage(
+ LOGTAG,
+ "MicrosoftGraphFragmentUpload",
+ "Uploading fragment {0}/{1} of remote file {2}",
+ fragmentNumber,
+ fragmentCount,
+ remotename);
+
+ using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Put, uploadSession.UploadUrl))
+ using (StreamContent fragmentContent = new StreamContent(subStream))
+ {
+ fragmentContent.Headers.ContentLength = read;
+ fragmentContent.Headers.ContentRange = new ContentRangeHeaderValue(offset, offset + read - 1, stream.Length);
+
+ request.Content = fragmentContent;
+
+ try
+ {
+ // The uploaded put requests will error if they are authenticated
+ using (HttpResponseMessage response = await this.m_client.SendAsync(request, false, cancelToken).ConfigureAwait(false))
+ {
+ // Note: On the last request, the json result includes the default properties of the item that was uploaded
+ this.ParseResponse<UploadSession>(response);
+ }
+ }
+ catch (MicrosoftGraphException ex)
+ {
+ if (subStream.Position != 0)
+ {
+ if (subStream.CanSeek)
+ {
+ // Make sure to reset the substream to its start in case this is a retry
+ subStream.Seek(0, SeekOrigin.Begin);
+ }
+ else
+ {
+ // If any of the source stream was read and the substream can't be seeked back to the beginning,
+ // then the internal retry mechanism can't be used and the caller will have to retry this whole file.
+ // Should we consider signaling to the graph API that we're abandoning this upload session?
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
+ }
+ }
+
+ // Error handling based on recommendations here:
+ // https://docs.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession#best-practices
+ if (attempt >= retryCount - 1)
+ {
+ // We've used up all our retry attempts
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
+ }
+ else if ((int)ex.StatusCode >= 500 && (int)ex.StatusCode < 600)
+ {
+ // If a 5xx error code is hit, we should use an exponential backoff strategy before retrying.
+ // To make things simpler, we just use the current attempt number as the exponential factor.
+ // If there was a Retry-After header, we'll wait for that right before sending the next request as well.
+ TimeSpan delay = TimeSpan.FromMilliseconds((int)Math.Pow(2, attempt) * this.fragmentRetryDelay);
+
+ Log.WriteRetryMessage(
+ LOGTAG,
+ "MicrosoftGraphFragmentRetryIn",
+ ex,
+ "Uploading fragment {0}/{1} of remote file {2} failed and will be retried in {3}",
+ fragmentNumber,
+ fragmentCount,
+ remotename,
+ delay);
+
+ await Task.Delay(delay).ConfigureAwait(false);
+ continue;
+ }
+ else if (ex.StatusCode == HttpStatusCode.NotFound)
+ {
+ // 404 is a special case indicating the upload session no longer exists, so the fragment shouldn't be retried.
+ // Instead we'll let the caller re-attempt the whole file.
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
+ }
+ else if ((int)ex.StatusCode >= 400 && (int)ex.StatusCode < 500)
+ {
+ // If a 4xx error code is hit, we should retry without the exponential backoff attempt.
+ Log.WriteRetryMessage(
+ LOGTAG,
+ "MicrosoftGraphFragmentRetry",
+ ex,
+ "Uploading fragment {0}/{1} of remote file {2} failed and will be retried",
+ fragmentNumber,
+ fragmentCount,
+ remotename);
+
+ continue;
+ }
+ else
+ {
+ // Other errors should be rethrown
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
+ }
+ }
+ catch (Exception ex)
+ {
+ // Any other exceptions should also cause the upload session to be canceled.
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
+ }
+
+ // If we successfully sent this piece, then we can break out of the retry loop
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ await m_retryAfter.WaitForRetryAfterAsync(cancelToken).ConfigureAwait(false);
+ using (HttpWebResponse createSessionResponse = await this.m_oAuthHelper.GetResponseWithoutExceptionAsync(createSessionUrl, cancelToken, MicrosoftGraphBackend.dummyUploadSession, HttpMethod.Post.ToString()).ConfigureAwait(false))
+ {
+ UploadSession uploadSession = this.ParseResponse<UploadSession>(createSessionResponse);
+
+ // If the stream's total length is less than the chosen fragment size, then we should make the buffer only as large as the stream.
+ int bufferSize = (int)Math.Min(this.fragmentSize, stream.Length);
+
+ long read = 0;
+ for (long offset = 0; offset < stream.Length; offset += read)
+ {
+ // If the stream isn't long enough for this to be a full buffer, then limit the length
+ long currentBufferSize = bufferSize;
+ if (stream.Length < offset + bufferSize)
+ {
+ currentBufferSize = stream.Length - offset;
+ }
- request.Content = fragmentContent;
+ using (Stream subStream = new ReadLimitLengthStream(stream, offset, currentBufferSize))
+ {
+ read = subStream.Length;
+
+ int fragmentCount = (int)Math.Ceiling((double)stream.Length / bufferSize);
+ int retryCount = this.fragmentRetryCount;
+ for (int attempt = 0; attempt < retryCount; attempt++)
+ {
+ await m_retryAfter.WaitForRetryAfterAsync(cancelToken).ConfigureAwait(false);
+
+ int fragmentNumber = (int)(offset / bufferSize);
+ Log.WriteVerboseMessage(
+ LOGTAG,
+ "MicrosoftGraphFragmentUpload",
+ "Uploading fragment {0}/{1} of remote file {2}",
+ fragmentNumber,
+ fragmentCount,
+ remotename);
+
+ // The uploaded put requests will error if they are authenticated
+ var request = new AsyncHttpRequest(this.m_oAuthHelper.CreateRequest(uploadSession.UploadUrl, HttpMethod.Put.ToString(), true));
+ request.Request.ContentLength = read;
+ request.Request.Headers.Set(HttpRequestHeader.ContentRange, new ContentRangeHeaderValue(offset, offset + read - 1, stream.Length).ToString());
+ request.Request.ContentType = "application/octet-stream";
+
+ using (var requestStream = request.GetRequestStream(read))
+ {
+ await Utility.Utility.CopyStreamAsync(subStream, requestStream, cancelToken).ConfigureAwait(false);
+ }
try
{
- // The uploaded put requests will error if they are authenticated
- using (HttpResponseMessage response = await this.m_client.SendAsync(request, false, cancelToken).ConfigureAwait(false))
+ using (var response = await this.m_oAuthHelper.GetResponseWithoutExceptionAsync(request, cancelToken).ConfigureAwait(false))
{
// Note: On the last request, the json result includes the default properties of the item that was uploaded
this.ParseResponse<UploadSession>(response);
@@ -479,37 +686,110 @@ namespace Duplicati.Library.Backend
}
catch (MicrosoftGraphException ex)
{
+ if (subStream.Position != 0)
+ {
+ if (subStream.CanSeek)
+ {
+ // Make sure to reset the substream to its start in case this is a retry
+ subStream.Seek(0, SeekOrigin.Begin);
+ }
+ else
+ {
+ // If any of the source stream was read and the substream can't be seeked back to the beginning,
+ // then the internal retry mechanism can't be used and the caller will have to retry this whole file.
+ // Should we consider signaling to the graph API that we're abandoning this upload session?
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
+ }
+ }
+
// Error handling based on recommendations here:
// https://docs.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession#best-practices
if (attempt >= retryCount - 1)
{
// We've used up all our retry attempts
- throw new UploadSessionException(createSessionResponse, offset / bufferSize, (int)Math.Ceiling((double)stream.Length / bufferSize), ex);
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
}
else if ((int)ex.StatusCode >= 500 && (int)ex.StatusCode < 600)
{
// If a 5xx error code is hit, we should use an exponential backoff strategy before retrying.
// To make things simpler, we just use the current attempt number as the exponential factor.
- Thread.Sleep((int)Math.Pow(2, attempt) * this.fragmentRetryDelay); // If this is changed to use tasks, this should be changed to Task.Await()
+ // If there was a Retry-After header, we'll wait for that right before sending the next request as well.
+ TimeSpan delay = TimeSpan.FromMilliseconds((int)Math.Pow(2, attempt) * this.fragmentRetryDelay);
+
+ Log.WriteRetryMessage(
+ LOGTAG,
+ "MicrosoftGraphFragmentRetryIn",
+ ex,
+ "Uploading fragment {0}/{1} of remote file {2} failed and will be retried in {3}",
+ fragmentNumber,
+ fragmentCount,
+ remotename,
+ delay);
+
+ await Task.Delay(delay).ConfigureAwait(false);
continue;
}
else if (ex.StatusCode == HttpStatusCode.NotFound)
{
// 404 is a special case indicating the upload session no longer exists, so the fragment shouldn't be retried.
// Instead we'll let the caller re-attempt the whole file.
- throw new UploadSessionException(createSessionResponse, offset / bufferSize, (int)Math.Ceiling((double)stream.Length / bufferSize), ex);
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
}
else if ((int)ex.StatusCode >= 400 && (int)ex.StatusCode < 500)
{
- // If a 4xx error code is hit, we should retry without the backoff attempt
+ // If a 4xx error code is hit, we should retry without the exponential backoff attempt.
+ Log.WriteRetryMessage(
+ LOGTAG,
+ "MicrosoftGraphFragmentRetry",
+ ex,
+ "Uploading fragment {0}/{1} of remote file {2} failed and will be retried",
+ fragmentNumber,
+ fragmentCount,
+ remotename);
+
continue;
}
else
{
// Other errors should be rethrown
- throw new UploadSessionException(createSessionResponse, offset / bufferSize, (int)Math.Ceiling((double)stream.Length / bufferSize), ex);
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
}
}
+ catch (Exception ex)
+ {
+ // Any other exceptions should also cause the upload session to be canceled.
+ await this.ThrowUploadSessionException(
+ uploadSession,
+ createSessionResponse,
+ fragmentNumber,
+ fragmentCount,
+ ex,
+ cancelToken).ConfigureAwait(false);
+ }
// If we successfully sent this piece, then we can break out of the retry loop
break;
@@ -518,83 +798,6 @@ namespace Duplicati.Library.Backend
}
}
}
- else
- {
- using (HttpWebResponse createSessionResponse = await this.m_oAuthHelper.GetResponseWithoutExceptionAsync(createSessionUrl, cancelToken, MicrosoftGraphBackend.dummyUploadSession, HttpMethod.Post.ToString()))
- {
- UploadSession uploadSession = this.ParseResponse<UploadSession>(createSessionResponse);
-
- // If the stream's total length is less than the chosen fragment size, then we should make the buffer only as large as the stream.
- int bufferSize = (int)Math.Min(this.fragmentSize, stream.Length);
-
- byte[] fragmentBuffer = new byte[bufferSize];
- int read = 0;
- for (int offset = 0; offset < stream.Length; offset += read)
- {
- read = await stream.ReadAsync(fragmentBuffer, 0, bufferSize, cancelToken).ConfigureAwait(false);
-
- int retryCount = this.fragmentRetryCount;
- for (int attempt = 0; attempt < retryCount; attempt++)
- {
- // The uploaded put requests will error if they are authenticated
- var request = new AsyncHttpRequest(this.m_oAuthHelper.CreateRequest(uploadSession.UploadUrl, HttpMethod.Put.ToString(), true));
- request.Request.ContentLength = read;
- request.Request.Headers.Set(HttpRequestHeader.ContentRange, new ContentRangeHeaderValue(offset, offset + read - 1, stream.Length).ToString());
- request.Request.ContentType = "application/octet-stream";
-
- using (var requestStream = request.GetRequestStream(read))
- {
- await requestStream.WriteAsync(fragmentBuffer, 0, read, cancelToken);
- }
-
- try
- {
- using (var response = await this.m_oAuthHelper.GetResponseWithoutExceptionAsync(request, cancelToken))
- {
- // Note: On the last request, the json result includes the default properties of the item that was uploaded
- this.ParseResponse<UploadSession>(response);
- }
- }
- catch (MicrosoftGraphException ex)
- {
- // Error handling based on recommendations here:
- // https://docs.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession#best-practices
- if (attempt >= retryCount - 1)
- {
- // We've used up all our retry attempts
- throw new UploadSessionException(createSessionResponse, offset / bufferSize, (int)Math.Ceiling((double)stream.Length / bufferSize), ex);
- }
- else if ((int)ex.StatusCode >= 500 && (int)ex.StatusCode < 600)
- {
- // If a 5xx error code is hit, we should use an exponential backoff strategy before retrying.
- // To make things simpler, we just use the current attempt number as the exponential factor.
- Thread.Sleep((int)Math.Pow(2, attempt) * this.fragmentRetryDelay); // If this is changed to use tasks, this should be changed to Task.Await()
- continue;
- }
- else if (ex.StatusCode == HttpStatusCode.NotFound)
- {
- // 404 is a special case indicating the upload session no longer exists, so the fragment shouldn't be retried.
- // Instead we'll let the caller re-attempt the whole file.
- throw new UploadSessionException(createSessionResponse, offset / bufferSize, (int)Math.Ceiling((double)stream.Length / bufferSize), ex);
- }
- else if ((int)ex.StatusCode >= 400 && (int)ex.StatusCode < 500)
- {
- // If a 4xx error code is hit, we should retry without the backoff attempt
- continue;
- }
- else
- {
- // Other errors should be rethrown
- throw new UploadSessionException(createSessionResponse, offset / bufferSize, (int)Math.Ceiling((double)stream.Length / bufferSize), ex);
- }
- }
-
- // If we successfully sent this piece, then we can break out of the retry loop
- break;
- }
- }
- }
- }
}
}
@@ -602,6 +805,7 @@ namespace Duplicati.Library.Backend
{
try
{
+ m_retryAfter.WaitForRetryAfter();
string deleteUrl = string.Format("{0}/root:{1}{2}", this.DrivePrefix, this.RootPath, NormalizeSlashes(remotename));
if (this.m_client != null)
{
@@ -681,6 +885,7 @@ namespace Duplicati.Library.Backend
}
else
{
+ m_retryAfter.WaitForRetryAfter();
using (var response = this.m_oAuthHelper.GetResponseWithoutException(url, null, method.ToString()))
{
return this.ParseResponse<T>(response);
@@ -700,6 +905,7 @@ namespace Duplicati.Library.Backend
}
else
{
+ m_retryAfter.WaitForRetryAfter();
using (var response = this.m_oAuthHelper.GetResponseWithoutException(url, body, method.ToString()))
{
return this.ParseResponse<T>(response);
@@ -709,6 +915,7 @@ namespace Duplicati.Library.Backend
private T SendRequest<T>(HttpRequestMessage request)
{
+ m_retryAfter.WaitForRetryAfter();
using (var response = this.m_client.SendAsync(request).Await())
{
return this.ParseResponse<T>(response);
@@ -742,6 +949,8 @@ namespace Duplicati.Library.Backend
private void CheckResponse(HttpResponseMessage response)
{
+ m_retryAfter.SetRetryAfter(response.Headers.RetryAfter);
+
if (!response.IsSuccessStatusCode)
{
if (response.StatusCode == HttpStatusCode.NotFound)
@@ -759,6 +968,12 @@ namespace Duplicati.Library.Backend
private void CheckResponse(HttpWebResponse response)
{
+ string retryAfterHeader = response.Headers[HttpResponseHeader.RetryAfter];
+ if (retryAfterHeader != null && RetryConditionHeaderValue.TryParse(retryAfterHeader, out RetryConditionHeaderValue retryAfter))
+ {
+ m_retryAfter.SetRetryAfter(retryAfter);
+ }
+
if (!((int)response.StatusCode >= 200 && (int)response.StatusCode < 300))
{
if (response.StatusCode == HttpStatusCode.NotFound)
@@ -796,6 +1011,46 @@ namespace Duplicati.Library.Backend
}
}
+ private async Task ThrowUploadSessionException(
+ UploadSession uploadSession,
+ HttpResponseMessage createSessionResponse,
+ int fragment,
+ int fragmentCount,
+ Exception ex,
+ CancellationToken cancelToken)
+ {
+ // Before throwing the exception, cancel the upload session
+ // The uploaded delete request will error if it is authenticated
+ using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Delete, uploadSession.UploadUrl))
+ using (HttpResponseMessage response = await this.m_client.SendAsync(request, false, cancelToken).ConfigureAwait(false))
+ {
+ // Note that the response body should always be empty in this case.
+ this.ParseResponse<UploadSession>(response);
+ }
+
+ throw new UploadSessionException(createSessionResponse, fragment, fragmentCount, ex);
+ }
+
+ private async Task ThrowUploadSessionException(
+ UploadSession uploadSession,
+ HttpWebResponse createSessionResponse,
+ int fragment,
+ int fragmentCount,
+ Exception ex,
+ CancellationToken cancelToken)
+ {
+ // Before throwing the exception, cancel the upload session
+ // The uploaded delete request will error if it is authenticated
+ var request = new AsyncHttpRequest(this.m_oAuthHelper.CreateRequest(uploadSession.UploadUrl, HttpMethod.Delete.ToString(), true));
+ using (var response = await this.m_oAuthHelper.GetResponseWithoutExceptionAsync(request, cancelToken).ConfigureAwait(false))
+ {
+ // Note that the response body should always be empty in this case.
+ this.ParseResponse<UploadSession>(response);
+ }
+
+ throw new UploadSessionException(createSessionResponse, fragment, fragmentCount, ex);
+ }
+
/// <summary>
/// Normalizes the slashes in a url fragment. For example:
/// "" => ""
diff --git a/Duplicati/Library/Utility/ReadLimitLengthStream.cs b/Duplicati/Library/Utility/ReadLimitLengthStream.cs
index 7aab7f2d8..7333be9e8 100644
--- a/Duplicati/Library/Utility/ReadLimitLengthStream.cs
+++ b/Duplicati/Library/Utility/ReadLimitLengthStream.cs
@@ -4,24 +4,73 @@ using System.Threading;
using System.Threading.Tasks;
namespace Duplicati.Library.Utility
-{
- // StreamReadLimitLengthWrapper() based on code from Matt Smith (Oct 26 '15 at 20:38)
- // https://stackoverflow.com/questions/33354822/how-to-set-length-in-stream-without-truncated
-
+{
+ // StreamReadLimitLengthWrapper() based on code from Matt Smith (Oct 26 '15 at 20:38)
+ // https://stackoverflow.com/questions/33354822/how-to-set-length-in-stream-without-truncated
+
+ /// <summary>
+ /// This class wraps a Stream but only allows reading a limited number of bytes from the underlying stream.
+ /// This can be used to create a stream that exposes only a small window of a source stream.
+ /// Having multiple copies of this wrapper on the same base stream at the same time is not ideal
+ /// (as the position will move around), but creating multiple of these streams one after another
+ /// can be used to create a sliding window over a base stream.
+ /// </summary>
public class ReadLimitLengthStream : Stream
{
- readonly Stream m_innerStream;
- readonly long m_endPosition;
+ private readonly Stream m_innerStream;
+ private readonly long m_start;
+ private readonly long m_length;
- public ReadLimitLengthStream(Stream innerStream, long size)
+ public ReadLimitLengthStream(Stream innerStream, long length)
+ : this(innerStream, 0, length)
+ {
+ }
+
+ public ReadLimitLengthStream(Stream innerStream, long start, long length)
{
- if (size < 0)
+ if (innerStream == null)
+ {
+ throw new ArgumentNullException(nameof(innerStream));
+ }
+ if (start < 0 || start > innerStream.Length)
+ {
+ throw new ArgumentOutOfRangeException(nameof(start));
+ }
+ if (length < 0 || innerStream.Length < start + length)
{
- throw new ArgumentOutOfRangeException(nameof(size));
+ throw new ArgumentOutOfRangeException(nameof(length));
}
- m_innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
- m_endPosition = m_innerStream.Position + size;
+ m_innerStream = innerStream;
+ m_start = start;
+ m_length = length;
+
+ if (m_start != 0)
+ {
+ // Make sure the stream is starting at the expected point
+ if (m_innerStream.Position != m_start)
+ {
+ if (m_innerStream.CanSeek)
+ {
+ m_innerStream.Seek(m_start, SeekOrigin.Begin);
+ }
+ else if (m_innerStream.Position < m_start)
+ {
+ // If the underlying stream doesn't support seeking,
+ // this will instead simulate the seek by reading until
+ // the underlying stream is at the start position.
+ long bytesToRead = m_start - m_innerStream.Position;
+ for (long i = 0; i < bytesToRead; i++)
+ {
+ m_innerStream.ReadByte();
+ }
+ }
+ else
+ {
+ throw new ArgumentException("Cannot seek stream to starting position", nameof(innerStream));
+ }
+ }
+ }
}
public override bool CanRead => m_innerStream.CanRead;
@@ -35,12 +84,12 @@ namespace Duplicati.Library.Utility
// NOOP
}
- public override long Length => m_endPosition;
+ public override long Length => m_length;
public override long Position
{
- get => m_innerStream.Position;
- set => m_innerStream.Position = value;
+ get => ClampInnerStreamPosition(m_innerStream.Position) - m_start;
+ set => m_innerStream.Position = ClampInnerStreamPosition(value + m_start);
}
public override int Read(byte[] buffer, int offset, int count)
@@ -50,8 +99,23 @@ namespace Duplicati.Library.Utility
}
public override long Seek(long offset, SeekOrigin origin)
- {
- return m_innerStream.Seek(offset, origin);
+ {
+ switch (origin)
+ {
+ case SeekOrigin.Begin:
+ m_innerStream.Seek(m_start + offset, SeekOrigin.Begin);
+ break;
+ case SeekOrigin.Current:
+ m_innerStream.Seek(offset, SeekOrigin.Current);
+ break;
+ case SeekOrigin.End:
+ m_innerStream.Seek(m_start + m_length + offset, SeekOrigin.Begin);
+ break;
+ default:
+ throw new ArgumentException("Unknown SeekOrigin", nameof(origin));
+ }
+
+ return this.Position;
}
public override void SetLength(long value)
@@ -94,11 +158,6 @@ namespace Duplicati.Library.Utility
// Since this wrapper does not own the underlying stream, we do not want it to close the underlying stream
}
- public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
- {
- return m_innerStream.CopyToAsync(destination, bufferSize, cancellationToken);
- }
-
public override int EndRead(IAsyncResult asyncResult)
{
return m_innerStream.EndRead(asyncResult);
@@ -106,7 +165,7 @@ namespace Duplicati.Library.Utility
public override Task FlushAsync(CancellationToken cancellationToken)
{
- return m_innerStream.FlushAsync(cancellationToken);
+ return Task.CompletedTask;
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -138,8 +197,36 @@ namespace Duplicati.Library.Utility
private int GetAllowedCount(int count)
{
- long pos = m_innerStream.Position;
- long maxCount = m_endPosition - pos;
+ if (m_innerStream.Position < m_start)
+ {
+ // The stream is positioned before the starting point.
+ // This state is exposed externally as having Position==0,
+ // so if possible, seek to the start and then read from there.
+ if (CanSeek)
+ {
+ this.Position = 0;
+ }
+ else
+ {
+ // If the underlying stream doesn't support seeking though,
+ // this will instead simulate the seek by reading until the underlying stream is at the start position.
+ long bytesToRead = m_start - m_innerStream.Position;
+ for (long i = 0; i < bytesToRead; i++)
+ {
+ m_innerStream.ReadByte();
+ }
+ }
+ }
+
+ long pos = this.Position;
+ if (pos >= m_length)
+ {
+ // The stream is at or past the end of the limit.
+ // Nothing should be read.
+ return 0;
+ }
+
+ long maxCount = m_length - pos;
if (count > maxCount)
{
return (int)maxCount;
@@ -147,5 +234,23 @@ namespace Duplicati.Library.Utility
return count;
}
+
+ private long ClampInnerStreamPosition(long position)
+ {
+ // Note that this allows this stream to have positions in the range 0 to m_length.
+ // Reading at m_length should return nothing.
+ if (position < m_start)
+ {
+ return m_start;
+ }
+
+ long maxPosition = m_start + m_length;
+ if (position > maxPosition)
+ {
+ return maxPosition;
+ }
+
+ return position;
+ }
}
}
diff --git a/Duplicati/Library/Utility/Utility.cs b/Duplicati/Library/Utility/Utility.cs
index e8280231f..ae6cc8a75 100644
--- a/Duplicati/Library/Utility/Utility.cs
+++ b/Duplicati/Library/Utility/Utility.cs
@@ -18,13 +18,13 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using System.Text;
using System.Text.RegularExpressions;
using Duplicati.Library.Common.IO;
using Duplicati.Library.Common;
using System.Globalization;
-using System.Threading;
namespace Duplicati.Library.Utility
{
@@ -34,7 +34,7 @@ namespace Duplicati.Library.Utility
/// Size of buffers for copying stream
/// </summary>
public static long DEFAULT_BUFFER_SIZE => SystemContextSettings.Buffersize;
-
+
/// <summary>
/// A cache of the FileSystemCaseSensitive property, which is computed upon the first access.
/// </summary>
@@ -91,16 +91,16 @@ namespace Duplicati.Library.Utility
}
buf = buf ?? new byte[DEFAULT_BUFFER_SIZE];
-
+
int read;
long total = 0;
while ((read = source.Read(buf, 0, buf.Length)) != 0)
{
target.Write(buf, 0, read);
total += read;
- }
-
- return total;
+ }
+
+ return total;
}
/// <summary>
@@ -139,7 +139,7 @@ namespace Duplicati.Library.Utility
await target.WriteAsync(buf, 0, read, cancelToken).ConfigureAwait(false);
total += read;
}
-
+
return total;
}
diff --git a/Duplicati/UnitTest/UtilityTests.cs b/Duplicati/UnitTest/UtilityTests.cs
index 66823f16d..f45ad3d32 100644
--- a/Duplicati/UnitTest/UtilityTests.cs
+++ b/Duplicati/UnitTest/UtilityTests.cs
@@ -127,6 +127,175 @@ namespace Duplicati.UnitTest
[Test]
[Category("Utility")]
+ public static void ReadLimitLengthStream()
+ {
+ Action<IEnumerable<int>, IEnumerable<byte>> assertArray = (expected, actual) =>
+ {
+ List<int> expectedList = expected.ToList();
+ List<byte> actualList = actual.ToList();
+ Assert.AreEqual(expectedList.Count, actualList.Count, "Count");
+ for (int i = 0; i < expectedList.Count; i++)
+ {
+ Assert.AreEqual((byte)expectedList[i], actualList[i], "Index {0}", i);
+ }
+ };
+
+ byte[] readBuffer = new byte[10];
+ Action<Stream, int, int> testSeek = (stream, position, readByte) =>
+ {
+ stream.Position = position;
+ Assert.AreEqual(position, stream.Position);
+ Assert.AreEqual(readByte, stream.ReadByte());
+
+ Assert.AreEqual(position, stream.Seek(position, SeekOrigin.Begin));
+ Assert.AreEqual(position, stream.Position);
+ Array.Clear(readBuffer, 0, 10);
+ Assert.AreEqual(readByte >= 0 ? 1 : 0, stream.Read(readBuffer, 1, 1), "Read count");
+ if (readByte >= 0)
+ {
+ // Make sure nothing was read before or after the offset byte given as well.
+ Assert.AreEqual(readBuffer[0], 0);
+ Assert.AreEqual(readBuffer[1], readByte);
+ Assert.AreEqual(readBuffer[2], 0);
+ }
+ else
+ {
+ // Make sure nothing was read
+ Assert.AreEqual(readBuffer[0], 0);
+ Assert.AreEqual(readBuffer[1], 0);
+ Assert.AreEqual(readBuffer[2], 0);
+ }
+
+ Assert.AreEqual(position, stream.Seek(position - stream.Length, SeekOrigin.End));
+ Assert.AreEqual(position, stream.Position);
+ Assert.AreEqual(readByte, stream.ReadByte());
+ };
+
+ // Base stream is a sequence from 0..9
+ using (MemoryStream baseStream = new MemoryStream(Enumerable.Range(0, 10).Select(i => (byte)i).ToArray()))
+ {
+ using (ReadLimitLengthStream stream = new ReadLimitLengthStream(baseStream, 5))
+ {
+ Assert.AreEqual(5, stream.Length);
+
+ // Test reading past array bounds
+ Assert.AreEqual(5, stream.Read(readBuffer, 0, 10), "Read count");
+ assertArray(Enumerable.Range(0, 5), readBuffer.Take(5));
+
+ // Set the position directly and read a shorter range
+ stream.Position = 2;
+ Assert.AreEqual(2, stream.ReadAsync(readBuffer, 0, 2).Await(), "Read count");
+ assertArray(Enumerable.Range(2, 2), readBuffer.Take(2));
+ }
+
+ // Make sure the stream will be seeked if the start does not match the current position.
+ baseStream.Position = 0;
+ using (ReadLimitLengthStream stream = new ReadLimitLengthStream(baseStream, 2, 4))
+ {
+ // Make sure the position is updated when the stream is created
+ Assert.AreEqual(0, stream.Position);
+
+ // Test basic read
+ Assert.AreEqual(4, stream.Read(readBuffer, 0, 4), "Read count");
+ assertArray(Enumerable.Range(2, 4), readBuffer.Take(4));
+
+ // Test CopyTo
+ using (MemoryStream destination = new MemoryStream())
+ {
+ stream.Position = 0;
+ stream.CopyTo(destination);
+ assertArray(Enumerable.Range(2, 4), destination.ToArray());
+ }
+
+ // Test CopyToAsync
+ using (MemoryStream destination = new MemoryStream())
+ {
+ stream.Position = 0;
+ stream.CopyToAsync(destination).Await();
+ assertArray(Enumerable.Range(2, 4), destination.ToArray());
+ }
+
+ // Test seeking
+ testSeek(stream, 0, 2);
+ testSeek(stream, 1, 3);
+ testSeek(stream, 2, 4);
+ testSeek(stream, 3, 5);
+ testSeek(stream, 4, -1);
+
+ // Test seeking from current
+ stream.Position = 0;
+ Assert.AreEqual(2, stream.Seek(2, SeekOrigin.Current));
+ Assert.AreEqual(2, stream.Position);
+
+ // Test clamping of position
+ stream.Position = -2;
+ Assert.AreEqual(0, stream.Position);
+ Assert.AreEqual(2, baseStream.Position);
+
+ stream.Position = 9;
+ Assert.AreEqual(4, stream.Position);
+ Assert.AreEqual(6, baseStream.Position);
+
+ // Make sure changing the base stream still shows clamped positions
+ baseStream.Position = 0;
+ Assert.AreEqual(0, stream.Position);
+
+ baseStream.Position = 4;
+ Assert.AreEqual(2, stream.Position);
+
+ baseStream.Position = 10;
+ Assert.AreEqual(4, stream.Position);
+
+ // Reading when the baseStream is positioned before the start of the limit
+ // should still be possible, and should seek to return the correct values.
+ baseStream.Position = 0;
+ Assert.AreEqual(0, stream.Position);
+ Assert.AreEqual(2, stream.ReadByte());
+ }
+
+ // Check a stream with length 0
+ baseStream.Position = 0;
+ using (ReadLimitLengthStream stream = new ReadLimitLengthStream(baseStream, 2, 0))
+ {
+ // Make sure the position is updated when the stream is created
+ Assert.AreEqual(0, stream.Position);
+
+ // Test basic read
+ Assert.AreEqual(0, stream.Read(readBuffer, 0, 4), "Read count");
+
+ // Test seeking
+ testSeek(stream, 0, -1);
+ testSeek(stream, 0, -1);
+
+ // Test seeking from current
+ stream.Position = 0;
+ Assert.AreEqual(0, stream.Seek(2, SeekOrigin.Current));
+ Assert.AreEqual(0, stream.Position);
+
+ // Test clamping of position
+ stream.Position = -2;
+ Assert.AreEqual(0, stream.Position);
+ Assert.AreEqual(2, baseStream.Position);
+
+ stream.Position = 9;
+ Assert.AreEqual(0, stream.Position);
+ Assert.AreEqual(2, baseStream.Position);
+
+ // Make sure changing the base stream still shows clamped positions
+ baseStream.Position = 0;
+ Assert.AreEqual(0, stream.Position);
+
+ baseStream.Position = 4;
+ Assert.AreEqual(0, stream.Position);
+
+ baseStream.Position = 10;
+ Assert.AreEqual(0, stream.Position);
+ }
+ }
+ }
+
+ [Test]
+ [Category("Utility")]
public static void ForceStreamRead()
{
byte[] source = { 0x10, 0x20, 0x30, 0x40, 0x50 };