Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/duplicati/duplicati.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Duplicati/Library/Backend/Jottacloud/Jottacloud.cs')
-rw-r--r--Duplicati/Library/Backend/Jottacloud/Jottacloud.cs192
1 files changed, 160 insertions, 32 deletions
diff --git a/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs b/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs
index c9082b35f..c3dab0358 100644
--- a/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs
+++ b/Duplicati/Library/Backend/Jottacloud/Jottacloud.cs
@@ -19,11 +19,11 @@
#endregion
using Duplicati.Library.Common.IO;
using Duplicati.Library.Interface;
+using Duplicati.Library.Localization.Short;
using System;
using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-
+using System.Threading;
+using System.Threading.Tasks;
namespace Duplicati.Library.Backend
{
public class Jottacloud : IBackend, IStreamingBackend
@@ -38,6 +38,8 @@ namespace Duplicati.Library.Backend
private static readonly string[] JFS_BUILTIN_ILLEGAL_MOUNT_POINTS = { "Trash", "Links", "Latest", "Shared" }; // Name of built-in mount points that we can not use. These are treated as mount points in the API, but they are for used for special functionality and we cannot upload files to them!
private const string JFS_DEVICE_OPTION = "jottacloud-device";
private const string JFS_MOUNT_POINT_OPTION = "jottacloud-mountpoint";
+ private const string JFS_THREADS = "jottacloud-threads";
+ private const string JFS_CHUNKSIZE = "jottacloud-chunksize";
private const string JFS_DATE_FORMAT = "yyyy'-'MM'-'dd-'T'HH':'mm':'ssK";
private readonly string m_device;
private readonly bool m_device_builtin;
@@ -49,6 +51,21 @@ namespace Duplicati.Library.Backend
private readonly System.Net.NetworkCredential m_userInfo;
private readonly byte[] m_copybuffer = new byte[Duplicati.Library.Utility.Utility.DEFAULT_BUFFER_SIZE];
+ private static readonly string JFS_DEFAULT_CHUNKSIZE = "5mb";
+ private static readonly string JFS_DEFAULT_THREADS = "4";
+ private readonly int m_threads;
+ private readonly long m_chunksize;
+
+ /// <summary>
+ /// The default maximum number of concurrent connections allowed by a ServicePoint object is 2.
+ /// It should be increased to allow multiple download threads.
+ /// https://stackoverflow.com/a/44637423/1105812
+ /// </summary>
+ static Jottacloud()
+ {
+ System.Net.ServicePointManager.DefaultConnectionLimit = 1000;
+ }
+
public Jottacloud()
{
}
@@ -146,6 +163,24 @@ namespace Duplicati.Library.Backend
m_url_device = JFS_ROOT + "/" + m_userInfo.UserName + "/" + m_device;
m_url = m_url_device + "/" + m_mountPoint + "/" + m_path;
m_url_upload = JFS_ROOT_UPLOAD + "/" + m_userInfo.UserName + "/" + m_device + "/" + m_mountPoint + "/" + m_path; // Different hostname, else identical to m_url.
+
+ m_threads = int.Parse(options.ContainsKey(JFS_THREADS) ? options[JFS_THREADS] : JFS_DEFAULT_THREADS);
+
+ if (!options.TryGetValue(JFS_CHUNKSIZE, out var tmp))
+ {
+ tmp = JFS_DEFAULT_CHUNKSIZE;
+ }
+
+ var chunksize = Utility.Sizeparser.ParseSize(tmp, "mb");
+
+ // Chunk size is bound by BinaryReader.ReadBytes(length) where length is an int.
+
+ if (chunksize > int.MaxValue || chunksize < 1024)
+ {
+ throw new ArgumentOutOfRangeException(nameof(chunksize), string.Format("The chunk size cannot be less than {0}, nor larger than {1}", Utility.Utility.FormatSizeString(1024), Utility.Utility.FormatSizeString(int.MaxValue)));
+ }
+
+ m_chunksize = chunksize;
}
#region IBackend Members
@@ -168,7 +203,6 @@ namespace Duplicati.Library.Backend
// Send request and load XML response.
var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, "", "", false);
var areq = new Utility.AsyncHttpRequest(req);
- using (var resp = (System.Net.HttpWebResponse)areq.GetResponse())
using (var rs = areq.GetResponseStream())
doc.Load(rs);
}
@@ -196,32 +230,77 @@ namespace Duplicati.Library.Backend
}
foreach (System.Xml.XmlNode xFile in xRoot.SelectNodes("files/file[not(@deleted)]"))
{
- string name = xFile.Attributes["name"].Value;
- // Normal files have an "currentRevision", which represent the most recent successfully upload
- // (could also checked that currentRevision/state is "COMPLETED", but should not be necessary).
- // There might also be a newer "latestRevision" coming from an incomplete or corrupt upload,
- // but we ignore that here and use the information about the last valid version.
- System.Xml.XmlNode xRevision = xFile.SelectSingleNode("currentRevision");
- if (xRevision != null)
+ var fe = ToFileEntry(xFile);
+ if (fe != null)
{
- System.Xml.XmlNode xNode = xRevision.SelectSingleNode("state");
- if (xNode.InnerText == "COMPLETED") // Think "currentRevision" always is a complete version, but just to be on the safe side..
- {
- xNode = xRevision.SelectSingleNode("size");
- long size;
- if (xNode == null || !long.TryParse(xNode.InnerText, out size))
- size = -1;
- DateTime lastModified;
- xNode = xRevision.SelectSingleNode("modified"); // There is created, modified and updated time stamps, but not last accessed.
- if (xNode == null || !DateTime.TryParseExact(xNode.InnerText, JFS_DATE_FORMAT, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.AdjustToUniversal, out lastModified))
- lastModified = new DateTime();
- FileEntry fe = new FileEntry(name, size, lastModified, lastModified);
- yield return fe;
- }
+ yield return fe;
}
}
}
+ public static IFileEntry ToFileEntry(System.Xml.XmlNode xFile)
+ {
+ string name = xFile.Attributes["name"].Value;
+ // Normal files have an "currentRevision", which represent the most recent successfully upload
+ // (could also checked that currentRevision/state is "COMPLETED", but should not be necessary).
+ // There might also be a newer "latestRevision" coming from an incomplete or corrupt upload,
+ // but we ignore that here and use the information about the last valid version.
+ System.Xml.XmlNode xRevision = xFile.SelectSingleNode("currentRevision");
+ if (xRevision != null)
+ {
+ System.Xml.XmlNode xState = xRevision.SelectSingleNode("state");
+ if (xState != null && xState.InnerText == "COMPLETED") // Think "currentRevision" always is a complete version, but just to be on the safe side..
+ {
+ System.Xml.XmlNode xSize = xRevision.SelectSingleNode("size");
+ long size;
+ if (xSize == null || !long.TryParse(xSize.InnerText, out size))
+ size = -1;
+ DateTime lastModified;
+ System.Xml.XmlNode xModified = xRevision.SelectSingleNode("modified"); // There is created, modified and updated time stamps, but not last accessed.
+ if (xModified == null || !DateTime.TryParseExact(xModified.InnerText, JFS_DATE_FORMAT, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.AdjustToUniversal, out lastModified))
+ lastModified = new DateTime();
+ FileEntry fe = new FileEntry(name, size, lastModified, lastModified);
+ return fe;
+ }
+ }
+ return null;
+ }
+
+ /// <summary>
+ /// Retrieves info for a single file (used to determine file size for chunking)
+ /// </summary>
+ /// <param name="remotename"></param>
+ /// <returns></returns>
+ public IFileEntry Info(string remotename)
+ {
+ var doc = new System.Xml.XmlDocument();
+ try
+ {
+ // Send request and load XML response.
+ var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, remotename, "", false);
+ var areq = new Utility.AsyncHttpRequest(req);
+ using (var rs = areq.GetResponseStream())
+ doc.Load(rs);
+ }
+ catch (System.Net.WebException wex)
+ {
+ if (wex.Response is System.Net.HttpWebResponse && ((System.Net.HttpWebResponse)wex.Response).StatusCode == System.Net.HttpStatusCode.NotFound)
+ throw new FileMissingException(wex);
+ throw;
+ }
+ // Handle XML response. Since we in the constructor demand a folder below the mount point we know the root
+ // element must be a "folder", else it could also have been a "mountPoint" (which has a very similar structure).
+ // We must check for "deleted" attribute, because files/folders which has it is deleted (attribute contains the timestamp of deletion)
+ // so we treat them as non-existant here.
+ var xFile = doc.DocumentElement;
+ if (xFile.Attributes["deleted"] != null)
+ {
+ throw new FileMissingException(string.Format("{0}: {1}", LC.L("The requested file does not exist"), remotename));
+ }
+
+ return ToFileEntry(xFile);
+ }
+
public Task PutAsync(string remotename, string filename, CancellationToken cancelToken)
{
using (System.IO.FileStream fs = System.IO.File.OpenRead(filename))
@@ -251,6 +330,8 @@ namespace Duplicati.Library.Backend
new CommandLineArgument("auth-username", CommandLineArgument.ArgumentType.String, Strings.Jottacloud.DescriptionAuthUsernameShort, Strings.Jottacloud.DescriptionAuthUsernameLong),
new CommandLineArgument(JFS_DEVICE_OPTION, CommandLineArgument.ArgumentType.String, Strings.Jottacloud.DescriptionDeviceShort, Strings.Jottacloud.DescriptionDeviceLong(JFS_MOUNT_POINT_OPTION)),
new CommandLineArgument(JFS_MOUNT_POINT_OPTION, CommandLineArgument.ArgumentType.String, Strings.Jottacloud.DescriptionMountPointShort, Strings.Jottacloud.DescriptionMountPointLong(JFS_DEVICE_OPTION)),
+ new CommandLineArgument(JFS_THREADS, CommandLineArgument.ArgumentType.Integer, Strings.Jottacloud.ThreadsShort, Strings.Jottacloud.ThreadsLong, JFS_DEFAULT_THREADS),
+ new CommandLineArgument(JFS_CHUNKSIZE, CommandLineArgument.ArgumentType.Size, Strings.Jottacloud.ChunksizeShort, Strings.Jottacloud.ChunksizeLong, JFS_DEFAULT_CHUNKSIZE),
});
}
}
@@ -317,25 +398,72 @@ namespace Duplicati.Library.Backend
public bool SupportsStreaming
{
get { return true; }
- }
-
+ }
+
public string[] DNSName
{
get { return new string[] { new Uri(JFS_ROOT).Host, new Uri(JFS_ROOT_UPLOAD).Host }; }
- }
-
+ }
+
public void Get(string remotename, System.IO.Stream stream)
{
+ if (m_threads > 1)
+ {
+ ParallelGet(remotename, stream);
+ return;
+ }
// Downloading from Jottacloud: Will only succeed if the file has a completed revision,
// and if there are multiple versions of the file we will only get the latest completed version,
// ignoring any incomplete or corrupt versions.
var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, remotename, "mode=bin", false);
var areq = new Utility.AsyncHttpRequest(req);
- using (var resp = (System.Net.HttpWebResponse)areq.GetResponse())
using (var s = areq.GetResponseStream())
Utility.Utility.CopyStream(s, stream, true, m_copybuffer);
}
+ /// <summary>
+ /// Fetches the file in chunks (parallelized)
+ /// </summary>
+ public void ParallelGet(string remotename, System.IO.Stream stream)
+ {
+ var size = Info(remotename).Size;
+
+ var chunks = new Queue<Tuple<long, long>>(); // Tuple => Position (from), Position (to)
+
+ long position = 0;
+
+ while (position < size)
+ {
+ var length = Math.Min(m_chunksize, size - position);
+ chunks.Enqueue(new Tuple<long, long>(position, position + length));
+ position += length;
+ }
+
+ var tasks = new Queue<Task<byte[]>>();
+
+ while (tasks.Count > 0 || chunks.Count > 0)
+ {
+ while (chunks.Count > 0 && tasks.Count < m_threads)
+ {
+ var item = chunks.Dequeue();
+ tasks.Enqueue(Task.Run(() =>
+ {
+ var req = CreateRequest(System.Net.WebRequestMethods.Http.Get, remotename, "mode=bin", false);
+ req.AddRange(item.Item1, item.Item2 - 1);
+ var areq = new Utility.AsyncHttpRequest(req);
+ using (var s = areq.GetResponseStream())
+ using (var reader = new System.IO.BinaryReader(s))
+ {
+ var length = item.Item2 - item.Item1;
+ return reader.ReadBytes((int)length);
+ }
+ }));
+ }
+ var buffer = tasks.Dequeue().Result;
+ stream.Write(buffer, 0, buffer.Length);
+ }
+ }
+
public async Task PutAsync(string remotename, System.IO.Stream stream, CancellationToken cancelToken)
{
// Some challenges with uploading to Jottacloud:
@@ -396,9 +524,9 @@ namespace Duplicati.Library.Backend
//req.Headers.Add("JCreated", timeCreated);
//req.Headers.Add("JModified", timeModified);
req.ContentType = "application/octet-stream";
- req.ContentLength = fileSize;
+ req.ContentLength = fileSize;
- // Write post data request
+ // Write post data request
var areq = new Utility.AsyncHttpRequest(req);
using (var rs = areq.GetRequestStream())
await Utility.Utility.CopyStreamAsync(stream, rs, true, cancelToken, m_copybuffer);