
github.com/matomo-org/matomo.git
author    Thomas Steur <thomas.steur@googlemail.com>  2014-03-25 08:06:54 +0400
committer Thomas Steur <thomas.steur@googlemail.com>  2014-03-25 08:06:54 +0400
commit    5fcbc094bffe8f93a44c10bea514e47ccf5330c2 (patch)
tree      5b3f92f130eb04bde40f6b215b5d5171b46181fa
parent    92256d5b2752b85430dfe70a30dabc2afa03022e (diff)
refs #4903 Started work on the possibility to run multiple archivers in parallel for faster archiving. There were multiple issues. For instance, arrays of site IDs are read and written via Option, but Option caches all values in a class property, so an update written by one archiver is not seen by another running archiver. Also, all sites were reprocessed because of the time_before_today_archive_considered_outdated setting whenever the last archiving by another archiver was 10 seconds or longer ago. So far, the only thing that helps prevent this is maintaining a list of still-to-be-processed site IDs in the DB / filesystem.
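To make that concrete, here is a minimal sketch (not part of the commit) of the shared queue idea: the list of site IDs still to be archived lives in the option table, and each archiver pops the next ID from it. 'SiteIdsToArchive', Option::clearCachedOption() and Option::delete() are the names used in the diff below; the helper function itself is illustrative, and the real SharedSiteIds::getNextSiteId() additionally wraps this read-modify-write in a process lock (runExclusive()).

<?php
use Piwik\Option;

// Illustrative helper: pop the next site ID from a queue shared by all archivers.
function getNextSharedSiteId()
{
    // Without this, Option::get() would return the value cached in this process
    // and miss updates written by other running archivers.
    Option::clearCachedOption('SiteIdsToArchive');

    $ids = array_filter(explode(',', (string) Option::get('SiteIdsToArchive')));

    if (empty($ids)) {
        return null; // nothing left to archive
    }

    $next = array_shift($ids);

    if (empty($ids)) {
        Option::delete('SiteIdsToArchive'); // queue drained
    } else {
        Option::set('SiteIdsToArchive', implode(',', $ids));
    }

    return $next;
}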
-rw-r--r--  core/ArchiveProcessor/FixedSiteIds.php   |  48
-rw-r--r--  core/ArchiveProcessor/SharedSiteIds.php  | 136
-rw-r--r--  core/CliMulti/Process.php                |   3
-rw-r--r--  core/CronArchive.php                     |  95
-rw-r--r--  core/Option.php                          |  12
-rw-r--r--  plugins/CoreAdminHome/API.php            |   2
6 files changed, 272 insertions(+), 24 deletions(-)
diff --git a/core/ArchiveProcessor/FixedSiteIds.php b/core/ArchiveProcessor/FixedSiteIds.php
new file mode 100644
index 0000000000..a9d44d9aec
--- /dev/null
+++ b/core/ArchiveProcessor/FixedSiteIds.php
@@ -0,0 +1,48 @@
+<?php
+/**
+ * Piwik - Open source web analytics
+ *
+ * @link http://piwik.org
+ * @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
+ *
+ */
+namespace Piwik\ArchiveProcessor;
+
+use Piwik\CronArchive;
+use Exception;
+use Piwik\Option;
+use Piwik\CliMulti\Process;
+
+class FixedSiteIds
+{
+ private $siteIds = array();
+ private $index = -1;
+
+ public function __construct($websiteIds)
+ {
+ $this->siteIds = $websiteIds;
+ }
+
+ public function getNumSites()
+ {
+ return count($this->siteIds);
+ }
+
+ public function getNumProcessedWebsites()
+ {
+ return $this->index + 1;
+ }
+
+ public function getNextSiteId()
+ {
+ $this->index++;
+
+ if (!empty($this->siteIds[$this->index])) {
+ return $this->siteIds[$this->index];
+ }
+
+ return null;
+ }
+
+}
+
diff --git a/core/ArchiveProcessor/SharedSiteIds.php b/core/ArchiveProcessor/SharedSiteIds.php
new file mode 100644
index 0000000000..0ae998389d
--- /dev/null
+++ b/core/ArchiveProcessor/SharedSiteIds.php
@@ -0,0 +1,136 @@
+<?php
+/**
+ * Piwik - Open source web analytics
+ *
+ * @link http://piwik.org
+ * @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
+ *
+ */
+namespace Piwik\ArchiveProcessor;
+
+use Exception;
+use Piwik\Option;
+use Piwik\CliMulti\Process;
+
+class SharedSiteIds
+{
+ private $siteIds = array();
+ private $currentSiteId;
+
+ public function __construct($websiteIds)
+ {
+ $self = $this;
+ $this->siteIds = $this->runExclusive(function () use ($self, $websiteIds) {
+ $existingWebsiteIds = $self->getAllSiteIdsToArchive();
+
+ if (!empty($existingWebsiteIds)) {
+ return $existingWebsiteIds;
+ }
+
+ $self->setSiteIdsToArchive($websiteIds);
+
+ return $websiteIds;
+ });
+ }
+
+ public function getNumSites()
+ {
+ return count($this->siteIds);
+ }
+
+ public function getNumProcessedWebsites()
+ {
+ if (empty($this->currentSiteId)) {
+ return 0;
+ }
+
+ $index = array_search($this->currentSiteId, $this->siteIds);
+
+ if (false === $index) {
+ return 0;
+ }
+
+ return $index + 1;
+ }
+
+ public function setSiteIdsToArchive($siteIds)
+ {
+ if (!empty($siteIds)) {
+ Option::set('SiteIdsToArchive', implode(',', $siteIds));
+ } else {
+ Option::delete('SiteIdsToArchive');
+ }
+ }
+
+ public function getAllSiteIdsToArchive()
+ {
+ Option::clearCachedOption('SiteIdsToArchive');
+ $siteIdsToArchive = Option::get('SiteIdsToArchive');
+
+ if (empty($siteIdsToArchive)) {
+ return array();
+ }
+
+ return explode(',', trim($siteIdsToArchive));
+ }
+
+ /**
+ * If there are multiple archivers running on the same node, this makes sure only one of them performs an action at a
+ * time and waits until any other one has finished. Any closure you pass here should be very fast, as other processes
+ * otherwise wait for this closure to finish. Currently only used to make multiple archivers running at the same time work.
+ * If a closure takes more than 5 seconds we assume it is dead and simply continue.
+ *
+ * @param \Closure $closure
+ * @return mixed
+ * @throws \Exception
+ */
+ private function runExclusive($closure)
+ {
+ $process = new Process('archive.lock');
+ while ($process->isRunning() && $process->getSecondsSinceCreation() < 5) {
+ // wait max 5 seconds, such an operation should not take longer
+ usleep(25);
+ }
+
+ $process->startProcess();
+
+ try {
+ $result = $closure();
+ } catch (Exception $e) {
+ $process->finishProcess();
+ throw $e;
+ }
+
+ $process->finishProcess();
+
+ return $result;
+ }
+
+ public function getNextSiteId()
+ {
+ $self = $this;
+
+ $this->currentSiteId = $this->runExclusive(function () use ($self) {
+
+ $siteIds = $self->getAllSiteIdsToArchive();
+
+ if (empty($siteIds)) {
+ return null;
+ }
+
+ $nextSiteId = array_shift($siteIds);
+ $self->setSiteIdsToArchive($siteIds);
+
+ return $nextSiteId;
+ });
+
+ return $this->currentSiteId;
+ }
+
+ public static function isSupported()
+ {
+ return Process::isSupported();
+ }
+
+}
+
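A brief usage sketch (not part of the commit) of the two classes added above, mirroring how CronArchive::run() consumes them further down in this diff; the site IDs and the progress output are placeholders.

<?php
use Piwik\ArchiveProcessor\FixedSiteIds;
use Piwik\ArchiveProcessor\SharedSiteIds;

$websiteIds = array(1, 2, 3); // placeholder list of sites to archive

// Fall back to the fixed list when the shared queue cannot be used,
// e.g. because the CliMulti Process lock is not supported on this host.
$websites = SharedSiteIds::isSupported()
    ? new SharedSiteIds($websiteIds)
    : new FixedSiteIds($websiteIds);

while (null !== ($idSite = $websites->getNextSiteId())) {
    // ... archive reports for $idSite here (placeholder) ...

    printf("%d/%d websites done\n",
        $websites->getNumProcessedWebsites(),
        $websites->getNumSites());
}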
diff --git a/core/CliMulti/Process.php b/core/CliMulti/Process.php
index fe30c6d0e0..fd9b9c4087 100644
--- a/core/CliMulti/Process.php
+++ b/core/CliMulti/Process.php
@@ -146,9 +146,10 @@ class Process
return false;
}
- if(self::shellExecFunctionIsDisabled()) {
+ if (self::shellExecFunctionIsDisabled()) {
return false;
}
+
if (static::commandExists('ps') && self::returnsSuccessCode('ps') && self::commandExists('awk')) {
return true;
}
diff --git a/core/CronArchive.php b/core/CronArchive.php
index 9ce21c1d56..50f8c4aebb 100644
--- a/core/CronArchive.php
+++ b/core/CronArchive.php
@@ -8,7 +8,9 @@
*/
namespace Piwik;
+use Piwik\ArchiveProcessor\FixedSiteIds;
use Piwik\ArchiveProcessor\Rules;
+use Piwik\ArchiveProcessor\SharedSiteIds;
use Piwik\Plugins\SitesManager\API as APISitesManager;
use Piwik\Plugins\CoreAdminHome\API as APICoreAdminHome;
use Exception;
@@ -105,6 +107,10 @@ Notes:
private $idSitesInvalidatedOldReports = array();
private $shouldArchiveSpecifiedSites = array();
private $shouldSkipSpecifiedSites = array();
+
+ /**
+ * @var SharedSiteIds|FixedSiteIds
+ */
private $websites = array();
private $allWebsites = array();
private $segments = array();
@@ -153,9 +159,17 @@ Notes:
$this->segments = $this->initSegmentsToArchive();
$this->allWebsites = APISitesManager::getInstance()->getAllSitesId();
+
$websitesIds = $this->initWebsiteIds();
$this->filterWebsiteIds($websitesIds);
- $this->websites = $websitesIds;
+
+ if (!empty($this->shouldArchiveSpecifiedSites)
+ || !empty($this->shouldArchiveAllSites)
+ || !SharedSiteIds::isSupported()) {
+ $this->websites = new FixedSiteIds($websitesIds);
+ } else {
+ $this->websites = new SharedSiteIds($websitesIds);
+ }
if($this->shouldStartProfiler) {
\Piwik\Profiler::setupProfilerXHProf($mainRun = true);
@@ -189,7 +203,14 @@ Notes:
$this->logSection("START");
$this->log("Starting Piwik reports archiving...");
- foreach ($this->websites as $idsite) {
+
+ do {
+ $idsite = $this->websites->getNextSiteId();
+
+ if (null === $idsite) {
+ break;
+ }
+
flush();
$requestsBefore = $this->requests;
if ($idsite <= 0) {
@@ -207,10 +228,15 @@ Notes:
$lastTimestampWebsiteProcessedPeriods = $lastTimestampWebsiteProcessedDay = false;
if ($this->archiveAndRespectTTL) {
+ Option::clearCachedOption($this->lastRunKey($idsite, "periods"));
$lastTimestampWebsiteProcessedPeriods = Option::get($this->lastRunKey($idsite, "periods"));
+
+ Option::clearCachedOption($this->lastRunKey($idsite, "day"));
$lastTimestampWebsiteProcessedDay = Option::get($this->lastRunKey($idsite, "day"));
}
+ $this->updateIdSitesInvalidatedOldReports();
+
// For period other than days, we only re-process the reports at most
// 1) every $processPeriodsMaximumEverySeconds
$secondsSinceLastExecution = time() - $lastTimestampWebsiteProcessedPeriods;
@@ -235,6 +261,7 @@ Notes:
// (*) If there was some old reports invalidated for this website
// we make sure all these old reports are triggered at least once
$websiteIsOldDataInvalidate = in_array($idsite, $this->idSitesInvalidatedOldReports);
+
if ($websiteIsOldDataInvalidate) {
$shouldArchivePeriods = true;
}
@@ -256,12 +283,13 @@ Notes:
$skipDayArchive = $skipDayArchive && !$websiteIsOldDataInvalidate;
// Also reprocess when day has ended since last run
- if($dayHasEndedMustReprocess
+ if ($dayHasEndedMustReprocess
+ && !$this->hasBeenProcessedSinceMidnight($idsite, $lastTimestampWebsiteProcessedDay) // it might have reprocessed for that day by another cron
&& !$existingArchiveIsValid) {
$skipDayArchive = false;
}
- if($websiteIdIsForced) {
+ if ($websiteIdIsForced) {
$skipDayArchive = false;
}
@@ -278,6 +306,13 @@ Notes:
// running do not grab the same website from the queue
Option::set($this->lastRunKey($idsite, "day"), time());
+ // Remove this website from the list of websites to be invalidated
+ // since it's now just about to be re-processed; this makes sure another running cron archiving process
+ // does not archive the same idsite
+ if ($websiteIsOldDataInvalidate) {
+ $this->setSiteIsArchived($idsite);
+ }
+
// when some data was purged from this website
// we make sure we query all previous days/weeks/months
$processDaysSince = $lastTimestampWebsiteProcessedDay;
@@ -347,12 +382,6 @@ Notes:
// Record successful run of this website's periods archiving
if ($success) {
Option::set($this->lastRunKey($idsite, "periods"), time());
-
- // Remove this website from the list of websites to be invalidated
- // since it's now just been re-processing the reports, job is done!
- if ($websiteIsOldDataInvalidate) {
- $this->setSiteIsArchived($idsite);
- }
}
$archivedPeriodsArchivesWebsite++;
@@ -361,12 +390,11 @@ Notes:
Log::info("Archived website id = $idsite, today = $visitsToday visits"
. $debug . ", $requestsWebsite API requests, "
. $timerWebsite->__toString()
- . " [" . ($websitesWithVisitsSinceLastRun + $skipped) . "/"
- . count($this->websites)
+ . " [" . $this->websites->getNumProcessedWebsites() . "/"
+ . $this->websites->getNumSites()
. " done]");
- }
-
+ } while (!empty($idsite));
$this->log("Done archiving!");
@@ -383,11 +411,11 @@ Notes:
$this->log("Total API requests: $this->requests");
//DONE: done/total, visits, wtoday, wperiods, reqs, time, errors[count]: first eg.
- $percent = count($this->websites) == 0
+ $percent = $this->websites->getNumSites() == 0
? ""
- : " " . round($processed * 100 / count($this->websites), 0) . "%";
+ : " " . round($processed * 100 / $this->websites->getNumSites(), 0) . "%";
$this->log("done: " .
- $processed . "/" . count($this->websites) . "" . $percent . ", " .
+ $processed . "/" . $this->websites->getNumSites() . "" . $percent . ", " .
$this->visits . " v, $websitesWithVisitsSinceLastRun wtoday, $archivedPeriodsArchivesWebsite wperiods, " .
$this->requests . " req, " . round($timer->getTimeMs()) . " ms, " .
(empty($this->errors)
@@ -782,7 +810,7 @@ Notes:
}
}
- private function filterWebsiteIds(&$websiteIds)
+ public function filterWebsiteIds(&$websiteIds)
{
// Keep only the websites that do exist
$websiteIds = array_intersect($websiteIds, $this->allWebsites);
@@ -803,7 +831,7 @@ Notes:
* Returns the list of sites to loop over and archive.
* @return array
*/
- private function initWebsiteIds()
+ public function initWebsiteIds()
{
if(count($this->shouldArchiveSpecifiedSites) > 0) {
$this->log("- Will process " . count($this->shouldArchiveSpecifiedSites) . " websites (--force-idsites)");
@@ -817,7 +845,7 @@ Notes:
$websiteIds = array_merge(
$this->addWebsiteIdsWithVisitsSinceLastRun(),
- $this->addWebsiteIdsToReprocess()
+ $this->getWebsiteIdsToInvalidate()
);
$websiteIds = array_merge($websiteIds, $this->addWebsiteIdsInTimezoneWithNewDay($websiteIds));
return array_unique($websiteIds);
@@ -921,6 +949,11 @@ Notes:
return false;
}
+ private function updateIdSitesInvalidatedOldReports()
+ {
+ $this->idSitesInvalidatedOldReports = APICoreAdminHome::getWebsiteIdsToInvalidate();
+ }
+
/**
* Return All websites that had reports in the past which were invalidated recently
* (see API CoreAdminHome.invalidateArchivedReports)
@@ -928,9 +961,9 @@ Notes:
*
* @return array
*/
- private function addWebsiteIdsToReprocess()
+ private function getWebsiteIdsToInvalidate()
{
- $this->idSitesInvalidatedOldReports = APICoreAdminHome::getWebsiteIdsToInvalidate();
+ $this->updateIdSitesInvalidatedOldReports();
if (count($this->idSitesInvalidatedOldReports) > 0) {
$ids = ", IDs: " . implode(", ", $this->idSitesInvalidatedOldReports);
@@ -938,6 +971,7 @@ Notes:
. " other websites because some old data reports have been invalidated (eg. using the Log Import script) "
. $ids);
}
+
return $this->idSitesInvalidatedOldReports;
}
@@ -980,6 +1014,22 @@ Notes:
return $timezoneToProcess;
}
+ private function hasBeenProcessedSinceMidnight($idsite, $lastTimestampWebsiteProcessedDay)
+ {
+ if (false === $lastTimestampWebsiteProcessedDay) {
+ return true;
+ }
+
+ $timezone = Site::getTimezoneFor($idsite);
+
+ $dateInTimezone = Date::factory('now', $timezone);
+ $midnightInTimezone = $dateInTimezone->setTime('00:00:00');
+
+ $lastProcessedDateInTimezone = Date::factory((int) $lastTimestampWebsiteProcessedDay, $timezone);
+
+ return $lastProcessedDateInTimezone->getTimestamp() >= $midnightInTimezone->getTimestamp();
+ }
+
/**
* Returns the list of websites in which timezones today is a new day
* (compared to the last time archiving was executed)
@@ -1092,7 +1142,6 @@ Notes:
$found = array_search($idsite, $websiteIdsInvalidated);
if ($found !== false) {
unset($websiteIdsInvalidated[$found]);
-// $this->log("Websites left to invalidate: " . implode(", ", $websiteIdsInvalidated));
Option::set(APICoreAdminHome::OPTION_INVALIDATED_IDSITES, serialize($websiteIdsInvalidated));
}
}
diff --git a/core/Option.php b/core/Option.php
index 63910cf7e5..4a98138da0 100644
--- a/core/Option.php
+++ b/core/Option.php
@@ -94,6 +94,11 @@ class Option
return self::getInstance()->deleteNameLike($namePattern, $value);
}
+ public static function clearCachedOption($name)
+ {
+ self::getInstance()->clearCachedOptionByName($name);
+ }
+
/**
* Clears the option value cache and forces a reload from the Database.
* Used in unit tests to reset the state of the object between tests.
@@ -144,6 +149,13 @@ class Option
{
}
+ protected function clearCachedOptionByName($name)
+ {
+ if (isset($this->all[$name])) {
+ unset($this->all[$name]);
+ }
+ }
+
protected function getValue($name)
{
$this->autoload();
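The same stale-cache problem is why CronArchive (above) now calls Option::clearCachedOption() on the lastRunKey() timestamps before re-reading them. A minimal sketch of the scenario, using an illustrative key name (the real name comes from CronArchive::lastRunKey()):

<?php
use Piwik\Option;

$idSite = 1; // placeholder
$key = 'lastRunArchive_periods_' . $idSite; // illustrative; not the exact key format

// The first read fills Option's in-memory cache for this process.
$lastRun = Option::get($key);

// ... meanwhile a second archiver process calls Option::set($key, time()) ...

// Without clearing, Option::get($key) would keep returning the stale cached
// value; clearing the entry forces a reload from the database.
Option::clearCachedOption($key);
$lastRun = Option::get($key);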
diff --git a/plugins/CoreAdminHome/API.php b/plugins/CoreAdminHome/API.php
index 7336a50f7b..d3d8f9adf7 100644
--- a/plugins/CoreAdminHome/API.php
+++ b/plugins/CoreAdminHome/API.php
@@ -200,6 +200,8 @@ class API extends \Piwik\Plugin\API
static public function getWebsiteIdsToInvalidate()
{
Piwik::checkUserHasSomeAdminAccess();
+
+ Option::clearCachedOption(self::OPTION_INVALIDATED_IDSITES);
$invalidatedIdSites = Option::get(self::OPTION_INVALIDATED_IDSITES);
if ($invalidatedIdSites
&& ($invalidatedIdSites = unserialize($invalidatedIdSites))