diff options
author | Sean Templeton <seantempleton@users.noreply.github.com> | 2020-10-29 03:03:09 +0300 |
---|---|---|
committer | Sean Templeton <seantempleton@users.noreply.github.com> | 2020-10-30 05:19:15 +0300 |
commit | a9c16fcefa887752fa865eb49954f81598a22cfb (patch) | |
tree | 4aa520b6dfb5b04b3e36707da01bbd65054c3db4 /Duplicati/Library | |
parent | 40416560e605d7b340d57a699b2776ba393275a6 (diff) |
Upload filesets last during a backup
If parallel uploads are enabled it is possible for the fileset to be uploaded before all dblocks are uploaded.
When the fileset is placed in the queue to be uploaded place it in a separate list and only upload it after all dblocks have been uploaded.
Fixes #4341
Diffstat (limited to 'Duplicati/Library')
-rw-r--r-- | Duplicati/Library/Main/Operation/Backup/BackendUploader.cs | 76 |
1 files changed, 50 insertions, 26 deletions
diff --git a/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs b/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs index b7639fa6c..307d60a52 100644 --- a/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs +++ b/Duplicati/Library/Main/Operation/Backup/BackendUploader.cs @@ -39,12 +39,12 @@ namespace Duplicati.Library.Main.Operation.Backup {
public Task<long> LastWriteSizeAsync { get { return m_tcs.Task; } }
private readonly TaskCompletionSource<long> m_tcs = new TaskCompletionSource<long>();
-
+
public void SetFlushed(long size)
{
m_tcs.TrySetResult(size);
}
-
+
public void TrySetCanceled()
{
m_tcs.TrySetCanceled();
@@ -108,7 +108,6 @@ namespace Duplicati.Library.Main.Operation.Backup private readonly StatsCollector m_stats;
private readonly ITaskReader m_taskReader;
-
public BackendUploader(Func<IBackend> backendFactory, Options options, DatabaseCommon database, ITaskReader taskReader, StatsCollector stats)
{
m_backendFactory = backendFactory;
@@ -135,6 +134,7 @@ namespace Duplicati.Library.Main.Operation.Backup var uploadsInProgress = 0;
m_cancelTokenSource = new CancellationTokenSource();
m_progressUpdater.Run(m_cancelTokenSource.Token);
+ var filesetsToUpload = new List<FilesetUploadRequest>();
try
{
@@ -145,12 +145,7 @@ namespace Duplicati.Library.Main.Operation.Backup if (!await m_taskReader.ProgressAsync)
break;
- var worker = workers.FirstOrDefault(w => w.Task.IsCompleted && !w.Task.IsFaulted);
- if (worker == null)
- {
- worker = new Worker(m_backendFactory());
- workers.Add(worker);
- }
+ Worker worker = GetWorker(workers);
if (req is VolumeUploadRequest volumeUpload)
{
@@ -164,8 +159,7 @@ namespace Duplicati.Library.Main.Operation.Backup }
else if (req is FilesetUploadRequest filesetUpload)
{
- worker.Task = Task.Run(() => UploadVolumeWriter(filesetUpload.Fileset, worker, m_cancelTokenSource.Token));
- uploadsInProgress++;
+ filesetsToUpload.Add(filesetUpload);
}
else if (req is IndexVolumeUploadRequest indexUpload)
{
@@ -174,20 +168,10 @@ namespace Duplicati.Library.Main.Operation.Backup }
else if (req is FlushRequest flush)
{
- while (workers.Any())
- {
- Task finishedTask = await Task.WhenAny(workers.Select(w => w.Task)).ConfigureAwait(false);
- if (finishedTask.IsFaulted)
- {
- flush.TrySetCanceled();
- ExceptionDispatchInfo.Capture(finishedTask.Exception).Throw();
- }
-
- Worker finishedWorker = workers.Single(w => w.Task == finishedTask);
- workers.Remove(finishedWorker);
- finishedWorker.Dispose();
- }
-
+ await WaitForWorkers(workers, flush).ConfigureAwait(false);
+ UploadFilesets(workers, filesetsToUpload);
+ await WaitForWorkers(workers, flush).ConfigureAwait(false);
+
flush.SetFlushed(lastSize);
uploadsInProgress = 0;
break;
@@ -240,6 +224,46 @@ namespace Duplicati.Library.Main.Operation.Backup });
}
+ private Worker GetWorker(List<Worker> workers)
+ {
+ var worker = workers.FirstOrDefault(w => w.Task.IsCompleted && !w.Task.IsFaulted);
+ if (worker == null)
+ {
+ worker = new Worker(m_backendFactory());
+ workers.Add(worker);
+ }
+
+ return worker;
+ }
+
+ private async Task WaitForWorkers(List<Worker> workers, FlushRequest flush)
+ {
+ while (workers.Any())
+ {
+ Task finishedTask = await Task.WhenAny(workers.Select(w => w.Task)).ConfigureAwait(false);
+ if (finishedTask.IsFaulted)
+ {
+ flush.TrySetCanceled();
+ ExceptionDispatchInfo.Capture(finishedTask.Exception).Throw();
+ }
+
+ Worker finishedWorker = workers.Single(w => w.Task == finishedTask);
+ workers.Remove(finishedWorker);
+ finishedWorker.Dispose();
+ }
+ }
+
+ private void UploadFilesets(List<Worker> workers, List<FilesetUploadRequest> filesets)
+ {
+ foreach (var fileset in filesets)
+ {
+ Worker worker = GetWorker(workers);
+ worker.Task = Task.Run(() => UploadVolumeWriter(fileset.Fileset, worker, m_cancelTokenSource.Token));
+ }
+
+ filesets.Clear();
+ }
+
private static Exception GetInnerMostException(Exception ex)
{
while (ex.InnerException != null)
@@ -481,4 +505,4 @@ namespace Duplicati.Library.Main.Operation.Backup }
}
}
-}
+}
\ No newline at end of file |