From 84c871c08700f2b43659446fedaa23e3fa616724 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Fri, 12 Oct 2018 14:44:33 +0200 Subject: New discovery algorithm: Parallel PROPFIND --- src/gui/folder.cpp | 3 +++ src/libsync/discovery.cpp | 48 ++++++++++++++++++++++---------------- src/libsync/discovery.h | 3 ++- src/libsync/discoveryphase.cpp | 10 ++++++++ src/libsync/discoveryphase.h | 5 ++++ src/libsync/owncloudpropagator.cpp | 9 ++----- src/libsync/syncoptions.h | 4 ++-- test/testsyncengine.cpp | 2 +- 8 files changed, 53 insertions(+), 31 deletions(-) diff --git a/src/gui/folder.cpp b/src/gui/folder.cpp index cff5aa17d..b942ce719 100644 --- a/src/gui/folder.cpp +++ b/src/gui/folder.cpp @@ -740,6 +740,9 @@ void Folder::setSyncOptions() opt._maxChunkSize = cfgFile.maxChunkSize(); } + int maxParallel = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt(); + opt._parallelNetworkJobs = maxParallel ? maxParallel : _accountState->account()->isHttp2Supported() ? 20 : 6; + // Previously min/max chunk size values didn't exist, so users might // have setups where the chunk size exceeds the new min/max default // values. To cope with this, adjust min/max to always include the diff --git a/src/libsync/discovery.cpp b/src/libsync/discovery.cpp index 66fabbbe7..81e74ee0c 100644 --- a/src/libsync/discovery.cpp +++ b/src/libsync/discovery.cpp @@ -38,7 +38,11 @@ void ProcessDirectoryJob::start() serverJob = new DiscoverySingleDirectoryJob(_discoveryData->_account, _discoveryData->_remoteFolder + _currentFolder._server, this); connect(serverJob, &DiscoverySingleDirectoryJob::etag, this, &ProcessDirectoryJob::etag); + _discoveryData->_currentlyActiveJobs++; + _pendingAsyncJobs++; connect(serverJob, &DiscoverySingleDirectoryJob::finished, this, [this, serverJob](const auto &results) { + _discoveryData->_currentlyActiveJobs--; + _pendingAsyncJobs--; if (results) { _serverEntries = *results; _hasServerEntries = true; @@ -252,8 +256,7 @@ void ProcessDirectoryJob::process() } processFile(std::move(path), localEntry, serverEntry, record); } - - progress(); + QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs); } bool ProcessDirectoryJob::handleExcluded(const QString &path, bool isDirectory, bool isHidden, bool isSymlink) @@ -444,7 +447,7 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo( if (!result) { processFileAnalyzeLocalInfo(item, path, localEntry, serverEntry, dbEntry, _queryServer); } - progress(); + QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs); }); return; } @@ -558,12 +561,12 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo( auto job = new RequestEtagJob(_discoveryData->_account, originalPath, this); connect(job, &RequestEtagJob::finishedWithResult, this, [=](const Result &etag) mutable { _pendingAsyncJobs--; + QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs); if (etag.errorCode() != 404 || // Somehow another item claimed this original path, consider as if it existed _discoveryData->_renamedItems.contains(originalPath)) { // If the file exist or if there is another error, consider it is a new file. postProcessServerNew(); - progress(); return; } @@ -579,7 +582,6 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo( postProcessRename(path); processFileFinalize(item, path, item->isDirectory(), item->_instruction == CSYNC_INSTRUCTION_RENAME ? NormalQuery : ParentDontExist, _queryServer); - progress(); }); job->start(); done = true; // Ideally, if the origin still exist on the server, we should continue searching... but that'd be difficult @@ -930,7 +932,7 @@ void ProcessDirectoryJob::processFileAnalyzeLocalInfo( } processFileFinalize(item, path, item->isDirectory(), NormalQuery, recurseQueryServer); _pendingAsyncJobs--; - progress(); + QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs); }); job->start(); return; @@ -1164,23 +1166,13 @@ void ProcessDirectoryJob::subJobFinished() int count = _runningJobs.removeAll(job); ASSERT(count == 1); job->deleteLater(); - progress(); + QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs); } -void ProcessDirectoryJob::progress() +int ProcessDirectoryJob::progress(int nbJobs) { - int maxRunning = 3; // FIXME - if (_pendingAsyncJobs + _runningJobs.size() > maxRunning) - return; - - if (!_queuedJobs.empty()) { - auto f = _queuedJobs.front(); - _queuedJobs.pop_front(); - _runningJobs.push_back(f); - f->start(); - return; - } - if (_runningJobs.empty() && _pendingAsyncJobs == 0) { + if (_queuedJobs.empty() && _runningJobs.empty() && _pendingAsyncJobs == 0) { + _pendingAsyncJobs = -1; // We're finished, we don't want to emit finished again if (_dirItem) { if (_childModified && _dirItem->_instruction == CSYNC_INSTRUCTION_REMOVE) { // re-create directory that has modified contents @@ -1200,6 +1192,22 @@ void ProcessDirectoryJob::progress() } emit finished(); } + + int started = 0; + foreach (auto *rj, _runningJobs) { + started += rj->progress(nbJobs - started); + if (started >= nbJobs) + return started; + } + + while (started < nbJobs && !_queuedJobs.empty()) { + auto f = _queuedJobs.front(); + _queuedJobs.pop_front(); + _runningJobs.push_back(f); + f->start(); + started++; + } + return started; } void ProcessDirectoryJob::dbError() diff --git a/src/libsync/discovery.h b/src/libsync/discovery.h index 783280f77..c914d5c87 100644 --- a/src/libsync/discovery.h +++ b/src/libsync/discovery.h @@ -46,6 +46,8 @@ public: { } void start(); + /** Start up to nbJobs, return the number of job started */ + int progress(int nbJobs); SyncFileItemPtr _dirItem; @@ -83,7 +85,6 @@ private: bool checkPermissions(const SyncFileItemPtr &item); void processBlacklisted(const PathTuple &, const LocalInfo &, const SyncJournalFileRecord &dbEntry); void subJobFinished(); - void progress(); /** An DB operation failed */ void dbError(); diff --git a/src/libsync/discoveryphase.cpp b/src/libsync/discoveryphase.cpp index a7673fd90..4d977b0ed 100644 --- a/src/libsync/discoveryphase.cpp +++ b/src/libsync/discoveryphase.cpp @@ -148,6 +148,7 @@ QString DiscoveryPhase::adjustRenamedPath(const QString &original) const void DiscoveryPhase::startJob(ProcessDirectoryJob *job) { connect(job, &ProcessDirectoryJob::finished, this, [this, job] { + _currentRootJob = nullptr; if (job->_dirItem) emit itemDiscovered(job->_dirItem); job->deleteLater(); @@ -158,9 +159,18 @@ void DiscoveryPhase::startJob(ProcessDirectoryJob *job) emit finished(); } }); + _currentRootJob = job; job->start(); } +void DiscoveryPhase::scheduleMoreJobs() +{ + auto limit = qMax(1, _syncOptions._parallelNetworkJobs); + if (_currentRootJob && _currentlyActiveJobs < limit) { + _currentRootJob->progress(limit - _currentlyActiveJobs); + } +} + DiscoverySingleDirectoryJob::DiscoverySingleDirectoryJob(const AccountPtr &account, const QString &path, QObject *parent) : QObject(parent) , _subPath(path) diff --git a/src/libsync/discoveryphase.h b/src/libsync/discoveryphase.h index f6ec65ee7..e5afd74b4 100644 --- a/src/libsync/discoveryphase.h +++ b/src/libsync/discoveryphase.h @@ -123,6 +123,9 @@ public: class DiscoveryPhase : public QObject { Q_OBJECT + + ProcessDirectoryJob *_currentRootJob = nullptr; + public: QString _localDir; // absolute path to the local directory. ends with '/' QString _remoteFolder; // remote folder, ends with '/' @@ -133,6 +136,7 @@ public: QStringList _selectiveSyncWhiteList; ExcludedFiles *_excludes; QString _invalidFilenamePattern; // FIXME: maybe move in ExcludedFiles + int _currentlyActiveJobs = 0; bool _ignoreHiddenFiles = false; std::function _shouldDiscoverLocaly; @@ -152,6 +156,7 @@ public: QByteArray _dataFingerprint; + void scheduleMoreJobs(); signals: void fatalError(const QString &errorString); void itemDiscovered(const SyncFileItemPtr &item); diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index 8950c5284..f7e6560dc 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -86,7 +86,7 @@ int OwncloudPropagator::maximumActiveTransferJob() // disable parallelism when there is a network limit. return 1; } - return qMin(3, qCeil(hardMaximumActiveJob() / 2.)); + return qMin(3, qCeil(_syncOptions._parallelNetworkJobs / 2.)); } /* The maximum number of active jobs in parallel */ @@ -94,12 +94,7 @@ int OwncloudPropagator::hardMaximumActiveJob() { if (!_syncOptions._parallelNetworkJobs) return 1; - static int max = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt(); - if (max) - return max; - if (_account->isHttp2Supported()) - return 20; - return 6; // (Qt cannot do more anyway) + return _syncOptions._parallelNetworkJobs; } PropagateItemJob::~PropagateItemJob() diff --git a/src/libsync/syncoptions.h b/src/libsync/syncoptions.h index 680fede87..65ba8d611 100644 --- a/src/libsync/syncoptions.h +++ b/src/libsync/syncoptions.h @@ -61,8 +61,8 @@ struct SyncOptions */ std::chrono::milliseconds _targetChunkUploadDuration = std::chrono::minutes(1); - /** Whether parallel network jobs are allowed. */ - bool _parallelNetworkJobs = true; + /** The maximum number of active jobs in parallel */ + int _parallelNetworkJobs = 6; /** Whether delta-synchronization is enabled */ bool _deltaSyncEnabled = false; diff --git a/test/testsyncengine.cpp b/test/testsyncengine.cpp index 5628befdc..1c4dd7e02 100644 --- a/test/testsyncengine.cpp +++ b/test/testsyncengine.cpp @@ -433,7 +433,7 @@ private slots: // Disable parallel uploads SyncOptions syncOptions; - syncOptions._parallelNetworkJobs = false; + syncOptions._parallelNetworkJobs = 0; fakeFolder.syncEngine().setSyncOptions(syncOptions); // Produce an error based on upload size -- cgit v1.2.3