github.com/duplicati/duplicati.git

author    Sean Templeton <seantempleton@outlook.com>  2019-02-23 05:46:12 +0300
committer Sean Templeton <seantempleton@outlook.com>  2019-03-04 06:42:58 +0300
commit    1029d99e102b01f447fd9b987d646e9b76f6b47d (patch)
tree      2abc6233d3fd35a0849e3492b33f64f2879a34fd /Duplicati/Library/Main/Operation/Backup/BackendUploader.cs
parent    05c3f4808cbd75a24ce2363a290f22056da47098 (diff)

Simplify BackendUploader and remove unnecessary code

Diffstat (limited to 'Duplicati/Library/Main/Operation/Backup/BackendUploader.cs')
-rw-r--r--  Duplicati/Library/Main/Operation/Backup/BackendUploader.cs  |  127
1 file changed, 47 insertions(+), 80 deletions(-)
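
The core of the change replaces the old Queue<KeyValuePair<int, Task>> plus a hand-maintained "active" counter with a plain List<Task>: Task.WhenAny throttles the loop once max_pending uploads are in flight, and Task.WhenAll drains the list on flush and at the end. A minimal sketch of that pattern follows, assuming a hypothetical UploadAsync stand-in rather than the real UploadFileAsync/UploadBlockAndIndexAsync helpers:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

internal static class UploadThrottleSketch
{
    public static async Task RunAsync(IEnumerable<string> files, int maxPending)
    {
        var inProgress = new List<Task>();

        foreach (var file in files)
        {
            inProgress.Add(UploadAsync(file));

            // Throttle: once maxPending uploads are in flight, wait for any one
            // to finish before starting the next (replaces the Queue + "active" counter).
            while (inProgress.Count >= maxPending)
            {
                var completed = await Task.WhenAny(inProgress);
                inProgress.Remove(completed);
            }
        }

        // Drain: wait for everything still in flight
        // (replaces the dequeue-and-await loops in the old code).
        await Task.WhenAll(inProgress);
    }

    // Hypothetical stand-in for an upload; not a Duplicati API.
    private static async Task UploadAsync(string file)
    {
        await Task.Delay(100);
        Console.WriteLine($"uploaded {file}");
    }
}

With Task.WhenAny doing the waiting, the per-request weight of 1 or 2 that the old code tracked through the KeyValuePair key is no longer needed; a combined block-plus-index upload is simply one task in the list.
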
diff --git a/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs b/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs
index 74a939071..17f49f017 100644
--- a/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs
+++ b/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs
@@ -14,18 +14,18 @@
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-using System;
using CoCoL;
-using System.Threading.Tasks;
+using Duplicati.Library.Interface;
using Duplicati.Library.Main.Operation.Common;
using Duplicati.Library.Main.Volumes;
-using System.Collections.Generic;
-using static Duplicati.Library.Main.Operation.Common.BackendHandler;
-using Duplicati.Library.Interface;
-using Newtonsoft.Json;
using Duplicati.Library.Utility;
-using System.Linq;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using static Duplicati.Library.Main.Operation.Common.BackendHandler;
namespace Duplicati.Library.Main.Operation.Backup
{
@@ -41,7 +41,6 @@ namespace Duplicati.Library.Main.Operation.Backup
{
m_tcs.TrySetResult(size);
}
-
}
internal class IndexVolumeUploadRequest : IUploadRequest
@@ -88,21 +87,19 @@ namespace Duplicati.Library.Main.Operation.Backup
{
private static readonly string LOGTAG = Logging.Log.LogTagFromType<BackendUploader>();
- private Options m_options;
private IBackend m_backend;
- private BackendHandler m_backendHandler;
- private Common.ITaskReader m_taskreader;
+ private Options m_options;
+ private ITaskReader m_taskreader;
private StatsCollector m_stats;
private DatabaseCommon m_database;
private readonly BackupResults m_results;
private string m_lastThrottleUploadValue;
private string m_lastThrottleDownloadValue;
- public BackendUploader(Common.BackendHandler backendHandler, IBackend backend, Options options, Common.DatabaseCommon database, BackupResults results, Common.ITaskReader taskreader, StatsCollector stats)
+ public BackendUploader(IBackend backend, Options options, DatabaseCommon database, BackupResults results, ITaskReader taskreader, StatsCollector stats)
{
this.m_options = options;
this.m_backend = backend;
- this.m_backendHandler = backendHandler;
this.m_taskreader = taskreader;
this.m_stats = stats;
this.m_database = database;
@@ -118,13 +115,11 @@ namespace Duplicati.Library.Main.Operation.Backup
async self =>
{
- var inProgress = new Queue<KeyValuePair<int, Task>>();
+ var inProgress = new List<Task>();
var max_pending = m_options.AsynchronousUploadLimit == 0 ? long.MaxValue : m_options.AsynchronousUploadLimit;
- var noIndexFiles = m_options.IndexfilePolicy == Options.IndexFileStrategy.None;
- var active = 0;
var lastSize = -1L;
-
- while(!await self.Input.IsRetiredAsync && await m_taskreader.ProgressAsync)
+
+ while (!await self.Input.IsRetiredAsync && await m_taskreader.ProgressAsync)
{
try
{
@@ -132,66 +127,43 @@ namespace Duplicati.Library.Main.Operation.Backup
if (!await m_taskreader.ProgressAsync)
continue;
-
- var task = default(KeyValuePair<int, Task>);
- if (req is VolumeUploadRequest)
+
+ if (req is VolumeUploadRequest volumeUpload)
{
- lastSize = ((VolumeUploadRequest)req).BlockVolume.SourceSize;
- if(((VolumeUploadRequest)req).IndexVolume == null)
- task = new KeyValuePair<int, Task>(1, UploadFileAsync(((VolumeUploadRequest)req).BlockEntry, null));
+ lastSize = volumeUpload.BlockVolume.SourceSize;
+ if (volumeUpload.IndexVolume == null)
+ inProgress.Add(UploadFileAsync(volumeUpload.BlockEntry));
else
- task = new KeyValuePair<int, Task>(2, UploadBlockAndIndexAsync((VolumeUploadRequest)req));
+ inProgress.Add(UploadBlockAndIndexAsync(volumeUpload));
}
- else if (req is FilesetUploadRequest)
- task = new KeyValuePair<int, Task>(1, UploadVolumeWriter(((FilesetUploadRequest)req).Fileset));
- else if (req is IndexVolumeUploadRequest)
- task = new KeyValuePair<int, Task>(1, UploadVolumeWriter(((IndexVolumeUploadRequest)req).IndexVolume));
- else if (req is FlushRequest)
+ else if (req is FilesetUploadRequest filesetUpload)
+ inProgress.Add(UploadVolumeWriter(filesetUpload.Fileset));
+ else if (req is IndexVolumeUploadRequest indexUpload)
+ inProgress.Add(UploadVolumeWriter(indexUpload.IndexVolume));
+ else if (req is FlushRequest flush)
{
try
{
- while(inProgress.Count > 0)
- await inProgress.Dequeue().Value;
- active = 0;
+ await Task.WhenAll(inProgress);
+ inProgress.Clear();
}
finally
{
- ((FlushRequest)req).SetFlushed(lastSize);
+ flush.SetFlushed(lastSize);
}
break;
}
-
- if (task.Value != null)
- {
- inProgress.Enqueue(task);
- active += task.Key;
- }
}
- catch(Exception ex)
+ catch (Exception ex)
{
if (!ex.IsRetiredException())
throw;
}
- while(active >= max_pending)
+ while (inProgress.Count >= max_pending)
{
- var top = inProgress.Dequeue();
-
- // See if we are done
- if (await Task.WhenAny(top.Value, Task.Delay(500)) != top.Value)
- {
- try
- {
- m_stats.SetBlocking(true);
- await top.Value;
- }
- finally
- {
- m_stats.SetBlocking(false);
- }
- }
-
- active -= top.Key;
+ var completedTask = await Task.WhenAny(inProgress);
+ inProgress.Remove(completedTask);
}
}
@@ -200,8 +172,7 @@ namespace Duplicati.Library.Main.Operation.Backup
try
{
m_stats.SetBlocking(true);
- while (inProgress.Count > 0)
- await inProgress.Dequeue().Value;
+ await Task.WhenAll(inProgress);
}
finally
{
@@ -214,10 +185,7 @@ namespace Duplicati.Library.Main.Operation.Backup
{
await UploadFileAsync(upload.BlockEntry).ConfigureAwait(false);
await UploadFileAsync(upload.IndexEntry).ConfigureAwait(false);
-
- // Register that the index file is tracking the block file
- var blockVolumeId = await m_database.GetRemoteVolumeIDAsync(upload.BlockVolume.RemoteFilename).ConfigureAwait(false);
- await m_database.AddIndexBlockLinkAsync(upload.IndexVolume.VolumeID, blockVolumeId).ConfigureAwait(false);
+ await m_database.AddIndexBlockLinkAsync(upload.IndexVolume.VolumeID, upload.BlockVolume.VolumeID).ConfigureAwait(false);
}
private async Task UploadVolumeWriter(VolumeWriterBase volumeWriter)
@@ -230,23 +198,23 @@ namespace Duplicati.Library.Main.Operation.Backup
await UploadFileAsync(fileEntry).ConfigureAwait(false);
}
- private async Task<bool> UploadFileAsync(FileEntryItem item, Func<string, Task<IndexVolumeWriter>> createIndexFile = null)
+ private async Task UploadFileAsync(FileEntryItem item)
{
- return await DoWithRetry(item, async () =>
+ await DoWithRetry(item, async () =>
{
if (item.IsRetry)
await RenameFileAfterErrorAsync(item).ConfigureAwait(false);
- return await DoPut(item, false).ConfigureAwait(false);
+ await DoPut(item).ConfigureAwait(false);
}).ConfigureAwait(false);
}
- private async Task<T> DoWithRetry<T>(FileEntryItem item, Func<Task<T>> method)
+ private async Task DoWithRetry(FileEntryItem item, Func<Task> method)
{
item.IsRetry = false;
Exception lastException = null;
- if(!await m_taskreader.ProgressAsync)
+ if (!await m_taskreader.ProgressAsync)
throw new OperationCanceledException();
for (var i = 0; i < m_options.NumberOfRetries; i++)
@@ -259,7 +227,8 @@ namespace Duplicati.Library.Main.Operation.Backup
try
{
- return await method().ConfigureAwait(false);
+ await method().ConfigureAwait(false);
+ return;
}
catch (Exception ex)
{
@@ -273,7 +242,7 @@ namespace Duplicati.Library.Main.Operation.Backup
await m_stats.SendEventAsync(item.Operation, i < m_options.NumberOfRetries ? BackendEventType.Retrying : BackendEventType.Failed, item.RemoteFilename, item.Size);
bool recovered = false;
- if (ex is Duplicati.Library.Interface.FolderMissingException && m_options.AutocreateFolders)
+ if (m_options.AutocreateFolders && ex is FolderMissingException)
{
try
{
@@ -328,7 +297,7 @@ namespace Duplicati.Library.Main.Operation.Backup
item.RemoteFilename = newname;
}
- private async Task<bool> DoPut(FileEntryItem item, bool updatedHash = false)
+ private async Task DoPut(FileEntryItem item)
{
if (item.TrackedInDb)
await m_database.UpdateRemoteVolumeAsync(item.RemoteFilename, RemoteVolumeState.Uploading, item.Size, item.Hash);
@@ -337,7 +306,7 @@ namespace Duplicati.Library.Main.Operation.Backup
{
Logging.Log.WriteDryrunMessage(LOGTAG, "WouldUploadVolume", "Would upload volume: {0}, size: {1}", item.RemoteFilename, Library.Utility.Utility.FormatSizeString(new FileInfo(item.LocalFilename).Length));
item.DeleteLocalFile();
- return true;
+ return;
}
await m_database.LogRemoteOperationAsync("put", item.RemoteFilename, JsonConvert.SerializeObject(new { Size = item.Size, Hash = item.Hash }));
@@ -345,12 +314,12 @@ namespace Duplicati.Library.Main.Operation.Backup
var begin = DateTime.Now;
- if (m_backend is Library.Interface.IStreamingBackend && !m_options.DisableStreamingTransfers)
+ if (!m_options.DisableStreamingTransfers && m_backend is IStreamingBackend backend)
{
- using (var fs = System.IO.File.OpenRead(item.LocalFilename))
+ using (var fs = File.OpenRead(item.LocalFilename))
using (var ts = new ThrottledStream(fs, m_options.MaxUploadPrSecond, m_options.MaxDownloadPrSecond))
- using (var pgs = new Library.Utility.ProgressReportingStream(ts, pg => HandleProgress(ts, pg)))
- ((Library.Interface.IStreamingBackend)m_backend).Put(item.RemoteFilename, pgs);
+ using (var pgs = new ProgressReportingStream(ts, pg => HandleProgress(ts, pg)))
+ backend.Put(item.RemoteFilename, pgs);
}
else
m_backend.Put(item.RemoteFilename, item.LocalFilename);
@@ -374,8 +343,6 @@ namespace Duplicati.Library.Main.Operation.Backup
item.DeleteLocalFile();
await m_database.CommitTransactionAsync("CommitAfterUpload");
-
- return true;
}
private void HandleProgress(ThrottledStream ts, long pg)
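
Beyond the upload loop, the diff also drops the generic result from the retry helper: DoWithRetry now accepts a Func<Task> and returns Task, so UploadFileAsync and DoPut no longer pass back a bool that callers did not use. A minimal sketch of that shape; the rename-on-retry, event reporting, and FolderMissingException/autocreate handling from the real method are omitted, and the delay parameter here is an assumption:

using System;
using System.Threading.Tasks;

internal static class RetrySketch
{
    public static async Task DoWithRetry(Func<Task> method, int numberOfRetries, TimeSpan delayBetweenRetries)
    {
        Exception lastException = null;

        for (var i = 0; i < numberOfRetries; i++)
        {
            try
            {
                await method().ConfigureAwait(false);
                return; // success; there is no result to hand back any more
            }
            catch (Exception ex)
            {
                lastException = ex;
                await Task.Delay(delayBetweenRetries).ConfigureAwait(false);
            }
        }

        throw lastException ?? new InvalidOperationException("No upload attempts were made.");
    }
}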