Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/duplicati/duplicati.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwarwickmm <warwickmm@users.noreply.github.com>2019-03-27 04:59:59 +0300
committerGitHub <noreply@github.com>2019-03-27 04:59:59 +0300
commit5b54c265714c51cad9c7250981b709e6979d8978 (patch)
tree47dd4fb3e89cc8c26af93640c93febc079277775
parent762cc8e5557c7cd4ba91558112378f5cfc40724d (diff)
parentd1161f9872059a8033c90fe2f60df8dd5dd55639 (diff)
Merge pull request #3691 from nescafe2002/master
Implement Jottacloud multithreaded downloading support
-rw-r--r--Duplicati/Library/Backend/Jottacloud/Jottacloud.cs192
-rw-r--r--Duplicati/Library/Backend/Jottacloud/Strings.cs4
2 files changed, 164 insertions, 32 deletions
diff --git a/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs b/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs
index c9082b35f..c3dab0358 100644
--- a/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs
+++ b/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs
@@ -19,11 +19,11 @@
#endregion
using Duplicati.Library.Common.IO;
using Duplicati.Library.Interface;
+using Duplicati.Library.Localization.Short;
using System;
using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
+using System.Threading;
+using System.Threading.Tasks;
namespace Duplicati.Library.Backend
{
public class Jottacloud : IBackend, IStreamingBackend
@@ -38,6 +38,8 @@ namespace Duplicati.Library.Backend
private static readonly string[] JFS_BUILTIN_ILLEGAL_MOUNT_POINTS = { "Trash", "Links", "Latest", "Shared" }; // Name of built-in mount points that we can not use. These are treated as mount points in the API, but they are for used for special functionality and we cannot upload files to them!
private const string JFS_DEVICE_OPTION = "jottacloud-device";
private const string JFS_MOUNT_POINT_OPTION = "jottacloud-mountpoint";
+ private const string JFS_THREADS = "jottacloud-threads";
+ private const string JFS_CHUNKSIZE = "jottacloud-chunksize";
private const string JFS_DATE_FORMAT = "yyyy'-'MM'-'dd-'T'HH':'mm':'ssK";
private readonly string m_device;
private readonly bool m_device_builtin;
@@ -49,6 +51,21 @@ namespace Duplicati.Library.Backend
private readonly System.Net.NetworkCredential m_userInfo;
private readonly byte[] m_copybuffer = new byte[Duplicati.Library.Utility.Utility.DEFAULT_BUFFER_SIZE];
+ private static readonly string JFS_DEFAULT_CHUNKSIZE = "5mb";
+ private static readonly string JFS_DEFAULT_THREADS = "4";
+ private readonly int m_threads;
+ private readonly long m_chunksize;
+
+ /// <summary>
+ /// The default maximum number of concurrent connections allowed by a ServicePoint object is 2.
+ /// It should be increased to allow multiple download threads.
+ /// https://stackoverflow.com/a/44637423/1105812
+ /// </summary>
+ static Jottacloud()
+ {
+ System.Net.ServicePointManager.DefaultConnectionLimit = 1000;
+ }
+
public Jottacloud()
{
}
@@ -146,6 +163,24 @@ namespace Duplicati.Library.Backend
m_url_device = JFS_ROOT + "/" + m_userInfo.UserName + "/" + m_device;
m_url = m_url_device + "/" + m_mountPoint + "/" + m_path;
m_url_upload = JFS_ROOT_UPLOAD + "/" + m_userInfo.UserName + "/" + m_device + "/" + m_mountPoint + "/" + m_path; // Different hostname, else identical to m_url.
+
+ m_threads = int.Parse(options.ContainsKey(JFS_THREADS) ? options[JFS_THREADS] : JFS_DEFAULT_THREADS);
+
+ if (!options.TryGetValue(JFS_CHUNKSIZE, out var tmp))
+ {
+ tmp = JFS_DEFAULT_CHUNKSIZE;
+ }
+
+ var chunksize = Utility.Sizeparser.ParseSize(tmp, "mb");
+
+ // Chunk size is bound by BinaryReader.ReadBytes(length) where length is an int.
+
+ if (chunksize > int.MaxValue || chunksize < 1024)
+ {
+ throw new ArgumentOutOfRangeException(nameof(chunksize), string.Format("The chunk size cannot be less than {0}, nor larger than {1}", Utility.Utility.FormatSizeString(1024), Utility.Utility.FormatSizeString(int.MaxValue)));
+ }
+
+ m_chunksize = chunksize;
}
#region IBackend Members
@@ -168,7 +203,6 @@ namespace Duplicati.Library.Backend
// Send request and load XML response.
var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, "", "", false);
var areq = new Utility.AsyncHttpRequest(req);
- using (var resp = (System.Net.HttpWebResponse)areq.GetResponse())
using (var rs = areq.GetResponseStream())
doc.Load(rs);
}
@@ -196,32 +230,77 @@ namespace Duplicati.Library.Backend
}
foreach (System.Xml.XmlNode xFile in xRoot.SelectNodes("files/file[not(@deleted)]"))
{
- string name = xFile.Attributes["name"].Value;
- // Normal files have an "currentRevision", which represent the most recent successfully upload
- // (could also checked that currentRevision/state is "COMPLETED", but should not be necessary).
- // There might also be a newer "latestRevision" coming from an incomplete or corrupt upload,
- // but we ignore that here and use the information about the last valid version.
- System.Xml.XmlNode xRevision = xFile.SelectSingleNode("currentRevision");
- if (xRevision != null)
+ var fe = ToFileEntry(xFile);
+ if (fe != null)
{
- System.Xml.XmlNode xNode = xRevision.SelectSingleNode("state");
- if (xNode.InnerText == "COMPLETED") // Think "currentRevision" always is a complete version, but just to be on the safe side..
- {
- xNode = xRevision.SelectSingleNode("size");
- long size;
- if (xNode == null || !long.TryParse(xNode.InnerText, out size))
- size = -1;
- DateTime lastModified;
- xNode = xRevision.SelectSingleNode("modified"); // There is created, modified and updated time stamps, but not last accessed.
- if (xNode == null || !DateTime.TryParseExact(xNode.InnerText, JFS_DATE_FORMAT, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.AdjustToUniversal, out lastModified))
- lastModified = new DateTime();
- FileEntry fe = new FileEntry(name, size, lastModified, lastModified);
- yield return fe;
- }
+ yield return fe;
}
}
}
+ public static IFileEntry ToFileEntry(System.Xml.XmlNode xFile)
+ {
+ string name = xFile.Attributes["name"].Value;
+ // Normal files have an "currentRevision", which represent the most recent successfully upload
+ // (could also checked that currentRevision/state is "COMPLETED", but should not be necessary).
+ // There might also be a newer "latestRevision" coming from an incomplete or corrupt upload,
+ // but we ignore that here and use the information about the last valid version.
+ System.Xml.XmlNode xRevision = xFile.SelectSingleNode("currentRevision");
+ if (xRevision != null)
+ {
+ System.Xml.XmlNode xState = xRevision.SelectSingleNode("state");
+ if (xState != null && xState.InnerText == "COMPLETED") // Think "currentRevision" always is a complete version, but just to be on the safe side..
+ {
+ System.Xml.XmlNode xSize = xRevision.SelectSingleNode("size");
+ long size;
+ if (xSize == null || !long.TryParse(xSize.InnerText, out size))
+ size = -1;
+ DateTime lastModified;
+ System.Xml.XmlNode xModified = xRevision.SelectSingleNode("modified"); // There is created, modified and updated time stamps, but not last accessed.
+ if (xModified == null || !DateTime.TryParseExact(xModified.InnerText, JFS_DATE_FORMAT, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.AdjustToUniversal, out lastModified))
+ lastModified = new DateTime();
+ FileEntry fe = new FileEntry(name, size, lastModified, lastModified);
+ return fe;
+ }
+ }
+ return null;
+ }
+
+ /// <summary>
+ /// Retrieves info for a single file (used to determine file size for chunking)
+ /// </summary>
+ /// <param name="remotename"></param>
+ /// <returns></returns>
+ public IFileEntry Info(string remotename)
+ {
+ var doc = new System.Xml.XmlDocument();
+ try
+ {
+ // Send request and load XML response.
+ var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, remotename, "", false);
+ var areq = new Utility.AsyncHttpRequest(req);
+ using (var rs = areq.GetResponseStream())
+ doc.Load(rs);
+ }
+ catch (System.Net.WebException wex)
+ {
+ if (wex.Response is System.Net.HttpWebResponse && ((System.Net.HttpWebResponse)wex.Response).StatusCode == System.Net.HttpStatusCode.NotFound)
+ throw new FileMissingException(wex);
+ throw;
+ }
+ // Handle XML response. Since we in the constructor demand a folder below the mount point we know the root
+ // element must be a "folder", else it could also have been a "mountPoint" (which has a very similar structure).
+ // We must check for "deleted" attribute, because files/folders which has it is deleted (attribute contains the timestamp of deletion)
+ // so we treat them as non-existant here.
+ var xFile = doc.DocumentElement;
+ if (xFile.Attributes["deleted"] != null)
+ {
+ throw new FileMissingException(string.Format("{0}: {1}", LC.L("The requested file does not exist"), remotename));
+ }
+
+ return ToFileEntry(xFile);
+ }
+
public Task PutAsync(string remotename, string filename, CancellationToken cancelToken)
{
using (System.IO.FileStream fs = System.IO.File.OpenRead(filename))
@@ -251,6 +330,8 @@ namespace Duplicati.Library.Backend
new CommandLineArgument("auth-username", CommandLineArgument.ArgumentType.String, Strings.Jottacloud.DescriptionAuthUsernameShort, Strings.Jottacloud.DescriptionAuthUsernameLong),
new CommandLineArgument(JFS_DEVICE_OPTION, CommandLineArgument.ArgumentType.String, Strings.Jottacloud.DescriptionDeviceShort, Strings.Jottacloud.DescriptionDeviceLong(JFS_MOUNT_POINT_OPTION)),
new CommandLineArgument(JFS_MOUNT_POINT_OPTION, CommandLineArgument.ArgumentType.String, Strings.Jottacloud.DescriptionMountPointShort, Strings.Jottacloud.DescriptionMountPointLong(JFS_DEVICE_OPTION)),
+ new CommandLineArgument(JFS_THREADS, CommandLineArgument.ArgumentType.Integer, Strings.Jottacloud.ThreadsShort, Strings.Jottacloud.ThreadsLong, JFS_DEFAULT_THREADS),
+ new CommandLineArgument(JFS_CHUNKSIZE, CommandLineArgument.ArgumentType.Size, Strings.Jottacloud.ChunksizeShort, Strings.Jottacloud.ChunksizeLong, JFS_DEFAULT_CHUNKSIZE),
});
}
}
@@ -317,25 +398,72 @@ namespace Duplicati.Library.Backend
public bool SupportsStreaming
{
get { return true; }
- }
-
+ }
+
public string[] DNSName
{
get { return new string[] { new Uri(JFS_ROOT).Host, new Uri(JFS_ROOT_UPLOAD).Host }; }
- }
-
+ }
+
public void Get(string remotename, System.IO.Stream stream)
{
+ if (m_threads > 1)
+ {
+ ParallelGet(remotename, stream);
+ return;
+ }
// Downloading from Jottacloud: Will only succeed if the file has a completed revision,
// and if there are multiple versions of the file we will only get the latest completed version,
// ignoring any incomplete or corrupt versions.
var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, remotename, "mode=bin", false);
var areq = new Utility.AsyncHttpRequest(req);
- using (var resp = (System.Net.HttpWebResponse)areq.GetResponse())
using (var s = areq.GetResponseStream())
Utility.Utility.CopyStream(s, stream, true, m_copybuffer);
}
+ /// <summary>
+ /// Fetches the file in chunks (parallelized)
+ /// </summary>
+ public void ParallelGet(string remotename, System.IO.Stream stream)
+ {
+ var size = Info(remotename).Size;
+
+ var chunks = new Queue<Tuple<long, long>>(); // Tuple => Position (from), Position (to)
+
+ long position = 0;
+
+ while (position < size)
+ {
+ var length = Math.Min(m_chunksize, size - position);
+ chunks.Enqueue(new Tuple<long, long>(position, position + length));
+ position += length;
+ }
+
+ var tasks = new Queue<Task<byte[]>>();
+
+ while (tasks.Count > 0 || chunks.Count > 0)
+ {
+ while (chunks.Count > 0 && tasks.Count < m_threads)
+ {
+ var item = chunks.Dequeue();
+ tasks.Enqueue(Task.Run(() =>
+ {
+ var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, remotename, "mode=bin", false);
+ req.AddRange(item.Item1, item.Item2 - 1);
+ var areq = new Utility.AsyncHttpRequest(req);
+ using (var s = areq.GetResponseStream())
+ using (var reader = new System.IO.BinaryReader(s))
+ {
+ var length = item.Item2 - item.Item1;
+ return reader.ReadBytes((int)length);
+ }
+ }));
+ }
+ var buffer = tasks.Dequeue().Result;
+ stream.Write(buffer, 0, buffer.Length);
+ }
+ }
+
public async Task PutAsync(string remotename, System.IO.Stream stream, CancellationToken cancelToken)
{
// Some challenges with uploading to Jottacloud:
@@ -396,9 +524,9 @@ namespace Duplicati.Library.Backend
//req.Headers.Add("JCreated", timeCreated);
//req.Headers.Add("JModified", timeModified);
req.ContentType = "application/octet-stream";
- req.ContentLength = fileSize;
+ req.ContentLength = fileSize;
- // Write post data request
+ // Write post data request
var areq = new Utility.AsyncHttpRequest(req);
using (var rs = areq.GetRequestStream())
await Utility.Utility.CopyStreamAsync(stream, rs, true, cancelToken, m_copybuffer);
diff --git a/Duplicati/Library/Backend/Jottacloud/Strings.cs b/Duplicati/Library/Backend/Jottacloud/Strings.cs
index 871ddb815..08ba011b8 100644
--- a/Duplicati/Library/Backend/Jottacloud/Strings.cs
+++ b/Duplicati/Library/Backend/Jottacloud/Strings.cs
@@ -17,5 +17,9 @@ namespace Duplicati.Library.Backend.Strings {
public static string DescriptionDeviceLong(string mountPointOption) { return LC.L(@"The backup device to use. Will be created if not already exists. You can manage your devices from the backup panel in the Jottacloud web interface. When you specify a custom device you should also specify the mount point to use on this device with the ""{0}"" option.", mountPointOption); }
public static string DescriptionMountPointShort { get { return LC.L(@"Supplies the mount point to use on the server"); } }
public static string DescriptionMountPointLong(string deviceOptionName) { return LC.L(@"The mount point to use on the server. The default is ""Archive"" for using the built-in archive mount point. Set this option to ""Sync"" to use the built-in synchronization mount point instead, or if you have specified a custom device with option ""{0}"" you are free to name the mount point as you like.", deviceOptionName); }
+ public static string ThreadsShort { get { return LC.L(@"Number of threads for restore operations."); } }
+ public static string ThreadsLong { get { return LC.L(@"Number of threads for restore operations. In some cases the download rate is limited to 18.5 Mbps per stream. Use multiple threads to increase throughput."); } }
+ public static string ChunksizeShort { get { return LC.L(@"The chunk size for simultaneous downloading."); } }
+ public static string ChunksizeLong { get { return LC.L(@"The chunk size for simultaneous downloading. These chunks will be held in memory, so keep it as low as possible."); } }
}
}