Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

github.com/owncloud/client.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOlivier Goffart <ogoffart@woboq.com>2018-10-12 15:44:33 +0300
committerOlivier Goffart <ogoffart@woboq.com>2018-10-12 15:46:46 +0300
commit84c871c08700f2b43659446fedaa23e3fa616724 (patch)
tree6983207f46dfdc1b3947920b7e7bcb48c3586bd6
parentd62027db4ae8b2b8eb89a327ab7cdaa7ba0d0c55 (diff)
New discovery algorithm: Parallel PROPFIND
-rw-r--r--src/gui/folder.cpp3
-rw-r--r--src/libsync/discovery.cpp48
-rw-r--r--src/libsync/discovery.h3
-rw-r--r--src/libsync/discoveryphase.cpp10
-rw-r--r--src/libsync/discoveryphase.h5
-rw-r--r--src/libsync/owncloudpropagator.cpp9
-rw-r--r--src/libsync/syncoptions.h4
-rw-r--r--test/testsyncengine.cpp2
8 files changed, 53 insertions, 31 deletions
diff --git a/src/gui/folder.cpp b/src/gui/folder.cpp
index cff5aa17d..b942ce719 100644
--- a/src/gui/folder.cpp
+++ b/src/gui/folder.cpp
@@ -740,6 +740,9 @@ void Folder::setSyncOptions()
opt._maxChunkSize = cfgFile.maxChunkSize();
}
+ int maxParallel = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt();
+ opt._parallelNetworkJobs = maxParallel ? maxParallel : _accountState->account()->isHttp2Supported() ? 20 : 6;
+
// Previously min/max chunk size values didn't exist, so users might
// have setups where the chunk size exceeds the new min/max default
// values. To cope with this, adjust min/max to always include the
diff --git a/src/libsync/discovery.cpp b/src/libsync/discovery.cpp
index 66fabbbe7..81e74ee0c 100644
--- a/src/libsync/discovery.cpp
+++ b/src/libsync/discovery.cpp
@@ -38,7 +38,11 @@ void ProcessDirectoryJob::start()
serverJob = new DiscoverySingleDirectoryJob(_discoveryData->_account,
_discoveryData->_remoteFolder + _currentFolder._server, this);
connect(serverJob, &DiscoverySingleDirectoryJob::etag, this, &ProcessDirectoryJob::etag);
+ _discoveryData->_currentlyActiveJobs++;
+ _pendingAsyncJobs++;
connect(serverJob, &DiscoverySingleDirectoryJob::finished, this, [this, serverJob](const auto &results) {
+ _discoveryData->_currentlyActiveJobs--;
+ _pendingAsyncJobs--;
if (results) {
_serverEntries = *results;
_hasServerEntries = true;
@@ -252,8 +256,7 @@ void ProcessDirectoryJob::process()
}
processFile(std::move(path), localEntry, serverEntry, record);
}
-
- progress();
+ QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
}
bool ProcessDirectoryJob::handleExcluded(const QString &path, bool isDirectory, bool isHidden, bool isSymlink)
@@ -444,7 +447,7 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
if (!result) {
processFileAnalyzeLocalInfo(item, path, localEntry, serverEntry, dbEntry, _queryServer);
}
- progress();
+ QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
});
return;
}
@@ -558,12 +561,12 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
auto job = new RequestEtagJob(_discoveryData->_account, originalPath, this);
connect(job, &RequestEtagJob::finishedWithResult, this, [=](const Result<QString> &etag) mutable {
_pendingAsyncJobs--;
+ QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
if (etag.errorCode() != 404 ||
// Somehow another item claimed this original path, consider as if it existed
_discoveryData->_renamedItems.contains(originalPath)) {
// If the file exist or if there is another error, consider it is a new file.
postProcessServerNew();
- progress();
return;
}
@@ -579,7 +582,6 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
postProcessRename(path);
processFileFinalize(item, path, item->isDirectory(), item->_instruction == CSYNC_INSTRUCTION_RENAME ? NormalQuery : ParentDontExist, _queryServer);
- progress();
});
job->start();
done = true; // Ideally, if the origin still exist on the server, we should continue searching... but that'd be difficult
@@ -930,7 +932,7 @@ void ProcessDirectoryJob::processFileAnalyzeLocalInfo(
}
processFileFinalize(item, path, item->isDirectory(), NormalQuery, recurseQueryServer);
_pendingAsyncJobs--;
- progress();
+ QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
});
job->start();
return;
@@ -1164,23 +1166,13 @@ void ProcessDirectoryJob::subJobFinished()
int count = _runningJobs.removeAll(job);
ASSERT(count == 1);
job->deleteLater();
- progress();
+ QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
}
-void ProcessDirectoryJob::progress()
+int ProcessDirectoryJob::progress(int nbJobs)
{
- int maxRunning = 3; // FIXME
- if (_pendingAsyncJobs + _runningJobs.size() > maxRunning)
- return;
-
- if (!_queuedJobs.empty()) {
- auto f = _queuedJobs.front();
- _queuedJobs.pop_front();
- _runningJobs.push_back(f);
- f->start();
- return;
- }
- if (_runningJobs.empty() && _pendingAsyncJobs == 0) {
+ if (_queuedJobs.empty() && _runningJobs.empty() && _pendingAsyncJobs == 0) {
+ _pendingAsyncJobs = -1; // We're finished, we don't want to emit finished again
if (_dirItem) {
if (_childModified && _dirItem->_instruction == CSYNC_INSTRUCTION_REMOVE) {
// re-create directory that has modified contents
@@ -1200,6 +1192,22 @@ void ProcessDirectoryJob::progress()
}
emit finished();
}
+
+ int started = 0;
+ foreach (auto *rj, _runningJobs) {
+ started += rj->progress(nbJobs - started);
+ if (started >= nbJobs)
+ return started;
+ }
+
+ while (started < nbJobs && !_queuedJobs.empty()) {
+ auto f = _queuedJobs.front();
+ _queuedJobs.pop_front();
+ _runningJobs.push_back(f);
+ f->start();
+ started++;
+ }
+ return started;
}
void ProcessDirectoryJob::dbError()
diff --git a/src/libsync/discovery.h b/src/libsync/discovery.h
index 783280f77..c914d5c87 100644
--- a/src/libsync/discovery.h
+++ b/src/libsync/discovery.h
@@ -46,6 +46,8 @@ public:
{
}
void start();
+ /** Start up to nbJobs, return the number of job started */
+ int progress(int nbJobs);
SyncFileItemPtr _dirItem;
@@ -83,7 +85,6 @@ private:
bool checkPermissions(const SyncFileItemPtr &item);
void processBlacklisted(const PathTuple &, const LocalInfo &, const SyncJournalFileRecord &dbEntry);
void subJobFinished();
- void progress();
/** An DB operation failed */
void dbError();
diff --git a/src/libsync/discoveryphase.cpp b/src/libsync/discoveryphase.cpp
index a7673fd90..4d977b0ed 100644
--- a/src/libsync/discoveryphase.cpp
+++ b/src/libsync/discoveryphase.cpp
@@ -148,6 +148,7 @@ QString DiscoveryPhase::adjustRenamedPath(const QString &original) const
void DiscoveryPhase::startJob(ProcessDirectoryJob *job)
{
connect(job, &ProcessDirectoryJob::finished, this, [this, job] {
+ _currentRootJob = nullptr;
if (job->_dirItem)
emit itemDiscovered(job->_dirItem);
job->deleteLater();
@@ -158,9 +159,18 @@ void DiscoveryPhase::startJob(ProcessDirectoryJob *job)
emit finished();
}
});
+ _currentRootJob = job;
job->start();
}
+void DiscoveryPhase::scheduleMoreJobs()
+{
+ auto limit = qMax(1, _syncOptions._parallelNetworkJobs);
+ if (_currentRootJob && _currentlyActiveJobs < limit) {
+ _currentRootJob->progress(limit - _currentlyActiveJobs);
+ }
+}
+
DiscoverySingleDirectoryJob::DiscoverySingleDirectoryJob(const AccountPtr &account, const QString &path, QObject *parent)
: QObject(parent)
, _subPath(path)
diff --git a/src/libsync/discoveryphase.h b/src/libsync/discoveryphase.h
index f6ec65ee7..e5afd74b4 100644
--- a/src/libsync/discoveryphase.h
+++ b/src/libsync/discoveryphase.h
@@ -123,6 +123,9 @@ public:
class DiscoveryPhase : public QObject
{
Q_OBJECT
+
+ ProcessDirectoryJob *_currentRootJob = nullptr;
+
public:
QString _localDir; // absolute path to the local directory. ends with '/'
QString _remoteFolder; // remote folder, ends with '/'
@@ -133,6 +136,7 @@ public:
QStringList _selectiveSyncWhiteList;
ExcludedFiles *_excludes;
QString _invalidFilenamePattern; // FIXME: maybe move in ExcludedFiles
+ int _currentlyActiveJobs = 0;
bool _ignoreHiddenFiles = false;
std::function<bool(const QString &)> _shouldDiscoverLocaly;
@@ -152,6 +156,7 @@ public:
QByteArray _dataFingerprint;
+ void scheduleMoreJobs();
signals:
void fatalError(const QString &errorString);
void itemDiscovered(const SyncFileItemPtr &item);
diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp
index 8950c5284..f7e6560dc 100644
--- a/src/libsync/owncloudpropagator.cpp
+++ b/src/libsync/owncloudpropagator.cpp
@@ -86,7 +86,7 @@ int OwncloudPropagator::maximumActiveTransferJob()
// disable parallelism when there is a network limit.
return 1;
}
- return qMin(3, qCeil(hardMaximumActiveJob() / 2.));
+ return qMin(3, qCeil(_syncOptions._parallelNetworkJobs / 2.));
}
/* The maximum number of active jobs in parallel */
@@ -94,12 +94,7 @@ int OwncloudPropagator::hardMaximumActiveJob()
{
if (!_syncOptions._parallelNetworkJobs)
return 1;
- static int max = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt();
- if (max)
- return max;
- if (_account->isHttp2Supported())
- return 20;
- return 6; // (Qt cannot do more anyway)
+ return _syncOptions._parallelNetworkJobs;
}
PropagateItemJob::~PropagateItemJob()
diff --git a/src/libsync/syncoptions.h b/src/libsync/syncoptions.h
index 680fede87..65ba8d611 100644
--- a/src/libsync/syncoptions.h
+++ b/src/libsync/syncoptions.h
@@ -61,8 +61,8 @@ struct SyncOptions
*/
std::chrono::milliseconds _targetChunkUploadDuration = std::chrono::minutes(1);
- /** Whether parallel network jobs are allowed. */
- bool _parallelNetworkJobs = true;
+ /** The maximum number of active jobs in parallel */
+ int _parallelNetworkJobs = 6;
/** Whether delta-synchronization is enabled */
bool _deltaSyncEnabled = false;
diff --git a/test/testsyncengine.cpp b/test/testsyncengine.cpp
index 5628befdc..1c4dd7e02 100644
--- a/test/testsyncengine.cpp
+++ b/test/testsyncengine.cpp
@@ -433,7 +433,7 @@ private slots:
// Disable parallel uploads
SyncOptions syncOptions;
- syncOptions._parallelNetworkJobs = false;
+ syncOptions._parallelNetworkJobs = 0;
fakeFolder.syncEngine().setSyncOptions(syncOptions);
// Produce an error based on upload size