From 13f83a22a110090de76b37b955e6cd7f89ca40fc Mon Sep 17 00:00:00 2001 From: Sean Templeton Date: Sat, 23 Feb 2019 09:19:20 -0600 Subject: Update Mega, OneDrive, and Rclone backends for async Put --- Duplicati/Library/Backend/Mega/MegaBackend.cs | 6 ++--- .../Backend/OneDrive/MicrosoftGraphBackend.cs | 12 ++++------ Duplicati/Library/Backend/Rclone/Rclone.cs | 27 ++++++++++++++-------- 3 files changed, 25 insertions(+), 20 deletions(-) (limited to 'Duplicati/Library') diff --git a/Duplicati/Library/Backend/Mega/MegaBackend.cs b/Duplicati/Library/Backend/Mega/MegaBackend.cs index 52789316c..2eab7059c 100644 --- a/Duplicati/Library/Backend/Mega/MegaBackend.cs +++ b/Duplicati/Library/Backend/Mega/MegaBackend.cs @@ -142,14 +142,14 @@ namespace Duplicati.Library.Backend.Mega #region IStreamingBackend implementation - public Task Put(string remotename, System.IO.Stream stream, CancellationToken cancelToken) + public async Task Put(string remotename, System.IO.Stream stream, CancellationToken cancelToken) { try { if (m_filecache == null) ResetFileCache(); - var el = Client.Upload(stream, remotename, CurrentFolder); + var el = await Client.UploadAsync(stream, remotename, CurrentFolder, null, null, cancelToken); if (m_filecache.ContainsKey(remotename)) Delete(remotename); @@ -161,8 +161,6 @@ namespace Duplicati.Library.Backend.Mega m_filecache = null; throw; } - - return Task.FromResult(true); } public void Get(string remotename, System.IO.Stream stream) diff --git a/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs b/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs index ac37027c1..95cf218d0 100644 --- a/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs +++ b/Duplicati/Library/Backend/OneDrive/MicrosoftGraphBackend.cs @@ -334,14 +334,14 @@ namespace Duplicati.Library.Backend } } - public Task Put(string remotename, Stream stream, CancellationToken cancelToken) + public async Task Put(string remotename, Stream stream, CancellationToken cancelToken) { // PUT only supports up to 4 MB file uploads. There's a separate process for larger files. if (stream.Length < PUT_MAX_SIZE) { StreamContent streamContent = new StreamContent(stream); streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); - var response = this.m_client.PutAsync(string.Format("{0}/root:{1}{2}:/content", this.DrivePrefix, this.RootPath, NormalizeSlashes(remotename)), streamContent).Await(); + var response = await m_client.PutAsync(string.Format("{0}/root:{1}{2}:/content", this.DrivePrefix, this.RootPath, NormalizeSlashes(remotename)), streamContent); // Make sure this response is a valid drive item, though we don't actually use it for anything currently. this.ParseResponse(response); @@ -358,7 +358,7 @@ namespace Duplicati.Library.Backend // Indicate that we want to replace any existing content with this new data we're uploading this.PrepareContent(new UploadSession() { Item = new DriveItem() { ConflictBehavior = ConflictBehavior.Replace } }); - HttpResponseMessage createSessionResponse = this.m_client.SendAsync(createSessionRequest).Await(); + HttpResponseMessage createSessionResponse = await m_client.SendAsync(createSessionRequest); UploadSession uploadSession = this.ParseResponse(createSessionResponse); // If the stream's total length is less than the chosen fragment size, then we should make the buffer only as large as the stream. @@ -368,7 +368,7 @@ namespace Duplicati.Library.Backend int read = 0; for (int offset = 0; offset < stream.Length; offset += read) { - read = stream.Read(fragmentBuffer, 0, bufferSize); + read = await stream.ReadAsync(fragmentBuffer, 0, bufferSize, cancelToken); int retryCount = this.fragmentRetryCount; for (int attempt = 0; attempt < retryCount; attempt++) @@ -384,7 +384,7 @@ namespace Duplicati.Library.Backend try { // The uploaded put requests will error if they are authenticated - response = this.m_client.SendAsync(request, false).Await(); + response = await m_client.SendAsync(request, false); // Note: On the last request, the json result includes the default properties of the item that was uploaded this.ParseResponse(response); @@ -428,8 +428,6 @@ namespace Duplicati.Library.Backend } } } - - return Task.FromResult(true); } public void Delete(string remotename) diff --git a/Duplicati/Library/Backend/Rclone/Rclone.cs b/Duplicati/Library/Backend/Rclone/Rclone.cs index be5b0b0b7..df128554b 100644 --- a/Duplicati/Library/Backend/Rclone/Rclone.cs +++ b/Duplicati/Library/Backend/Rclone/Rclone.cs @@ -19,6 +19,7 @@ #endregion using Duplicati.Library.Common.IO; using Duplicati.Library.Interface; +using Duplicati.Library.Utility; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; @@ -97,7 +98,7 @@ namespace Duplicati.Library.Backend get { return false; } } - private string RcloneCommandExecuter(String command, String arguments) + private async Task RcloneCommandExecuter(String command, String arguments, CancellationToken cancelToken) { StringBuilder outputBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder(); @@ -162,7 +163,17 @@ namespace Duplicati.Library.Backend process.Start(); process.BeginOutputReadLine(); process.BeginErrorReadLine(); - process.WaitForExit(); + + while(!process.HasExited) + { + await Task.Delay(500).ConfigureAwait(false); + if (cancelToken.IsCancellationRequested) + { + process.Kill(); + process.WaitForExit(); + } + } + process.CancelOutputRead(); process.CancelErrorRead(); @@ -191,7 +202,7 @@ namespace Duplicati.Library.Backend try { - str_result = RcloneCommandExecuter(rclone_executable, String.Format("lsjson {0}:{1}", remote_repo, remote_path)); + str_result = RcloneCommandExecuter(rclone_executable, String.Format("lsjson {0}:{1}", remote_repo, remote_path), CancellationToken.None).Await(); // this will give an error if the executable does not exist. } @@ -229,21 +240,19 @@ namespace Duplicati.Library.Backend { try { - RcloneCommandExecuter(rclone_executable, String.Format("copyto {0}:{1} {2}:{3}/{4}", local_repo, filename, remote_repo, remote_path, remotename)); + return RcloneCommandExecuter(rclone_executable, String.Format("copyto {0}:{1} {2}:{3}/{4}", local_repo, filename, remote_repo, remote_path, remotename), cancelToken); } catch (FolderMissingException ex) { throw new FileMissingException(ex); } - - return Task.FromResult(true); } public void Get(string remotename, string filename) { try { - RcloneCommandExecuter(rclone_executable, String.Format("copyto {2}:{3}/{4} {0}:{1}", local_repo, filename, remote_repo, remote_path, remotename)); + RcloneCommandExecuter(rclone_executable, String.Format("copyto {2}:{3}/{4} {0}:{1}", local_repo, filename, remote_repo, remote_path, remotename), CancellationToken.None).Await(); } catch (FolderMissingException ex) { throw new FileMissingException(ex); @@ -256,7 +265,7 @@ namespace Duplicati.Library.Backend // Will give a "directory not found" error if the file does not exist, need to change that to a missing file exception try { - RcloneCommandExecuter(rclone_executable, String.Format("delete {0}:{1}/{2}", remote_repo, remote_path, remotename)); + RcloneCommandExecuter(rclone_executable, String.Format("delete {0}:{1}/{2}", remote_repo, remote_path, remotename), CancellationToken.None).Await(); } catch (FolderMissingException ex) { throw new FileMissingException(ex); @@ -302,7 +311,7 @@ namespace Duplicati.Library.Backend public void CreateFolder() { - RcloneCommandExecuter(rclone_executable, String.Format("mkdir {0}:{1}", remote_repo, remote_path)); + RcloneCommandExecuter(rclone_executable, String.Format("mkdir {0}:{1}", remote_repo, remote_path), CancellationToken.None).Await(); } #endregion -- cgit v1.2.3