diff options
author | Thomas Steur <tsteur@users.noreply.github.com> | 2018-04-23 05:01:28 +0300 |
---|---|---|
committer | Matthieu Aubry <mattab@users.noreply.github.com> | 2018-04-23 05:01:28 +0300 |
commit | f0ced619613b32d29d6534388ad6a9a0240f4cc9 (patch) | |
tree | ab427d28b70f5bf3ce7ad4d6ec20c909f1f4be6f | |
parent | 9bed2959276b4b2cb9959ba0a5aab23a1297f488 (diff) |
Do not run the same archive command twice (#12702)
* Do not run the same archive command twice
* simplify
* During core:archive when a day or period archiving fails for a website, abort the website archiving (#12708)
* If one of the period archiving fails, we now cancel the remaining archiving processes
* Increment the skipped counter
* also exit when a day period is already active
* undo last commit
* if day archive fails, then do not proceed archiving periods
* Minor changes
* Archive: mark the daily archive as successful, only after it was really successful (#12720)
+ Simplify & fix some logic
* do not archive a period when a sub period is processing (#12719)
-rw-r--r-- | core/CliMulti.php | 37 | ||||
-rw-r--r-- | core/CronArchive.php | 122 |
2 files changed, 132 insertions, 27 deletions
diff --git a/core/CliMulti.php b/core/CliMulti.php index ef360f0214..3e092d84b4 100644 --- a/core/CliMulti.php +++ b/core/CliMulti.php @@ -148,13 +148,18 @@ class CliMulti $this->outputs[] = $output; } - private function buildCommand($hostname, $query, $outputFile) + private function buildCommand($hostname, $query, $outputFile, $doEsacpeArg = true) { $bin = $this->findPhpBinary(); $superuserCommand = $this->runAsSuperUser ? "--superuser" : ""; + if ($doEsacpeArg) { + $hostname = escapeshellarg($hostname); + $query = escapeshellarg($query); + } + return sprintf('%s %s %s/console climulti:request -q --piwik-domain=%s %s %s > %s 2>&1 &', - $bin, $this->phpCliOptions, PIWIK_INCLUDE_PATH, escapeshellarg($hostname), $superuserCommand, escapeshellarg($query), $outputFile); + $bin, $this->phpCliOptions, PIWIK_INCLUDE_PATH, $hostname, $superuserCommand, $query, $outputFile); } private function getResponse() @@ -266,6 +271,34 @@ class CliMulti return StaticContainer::get('path.tmp') . '/climulti'; } + public function isCommandAlreadyRunning($url) + { + if (!$this->supportsAsync) { + // we cannot detect if web archive is still running + return false; + } + + $query = UrlHelper::getQueryFromUrl($url, array('pid' => 'removeme')); + $hostname = Url::getHost($checkIfTrusted = false); + $commandToCheck = $this->buildCommand($hostname, $query, $output = '', $escape = false); + + $currentlyRunningJobs = `ps aux`; + + $posStart = strpos($commandToCheck, 'console climulti'); + $posPid = strpos($commandToCheck, '&pid='); // the pid is random each time so we need to ignore it. + $shortendCommand = substr($commandToCheck, $posStart, $posPid - $posStart); + // equals eg console climulti:request -q --piwik-domain= --superuser module=API&method=API.get&idSite=1&period=month&date=2018-04-08,2018-04-30&format=php&trigger=archivephp + $shortendCommand = preg_replace("/([&])date=.*?(&|$)/", "", $shortendCommand); + $currentlyRunningJobs = preg_replace("/([&])date=.*?(&|$)/", "", $currentlyRunningJobs); + + if (strpos($currentlyRunningJobs, $shortendCommand) !== false) { + Log::debug($shortendCommand . ' is already running'); + return true; + } + + return false; + } + private function executeAsyncCli($url, Output $output, $cmdId) { $this->processes[] = new Process($cmdId); diff --git a/core/CronArchive.php b/core/CronArchive.php index 1b40c63a01..e36ec81764 100644 --- a/core/CronArchive.php +++ b/core/CronArchive.php @@ -629,21 +629,13 @@ class CronArchive // Skip this day archive if last archive was older than TTL $existingArchiveIsValid = ($elapsedSinceLastArchiving < $this->todayArchiveTimeToLive); - $skipDayArchive = $existingArchiveIsValid; - - // Invalidate old website forces the archiving for this site - $skipDayArchive = $skipDayArchive && !$websiteInvalidatedShouldReprocess; - - // Also reprocess when day has ended since last run - if ($dayHasEndedMustReprocess - // it might have reprocessed for that day by another cron - && !$this->hasBeenProcessedSinceMidnight($idSite, $lastTimestampWebsiteProcessedDay) - && !$existingArchiveIsValid) { - $skipDayArchive = false; - } - - if ($websiteIdIsForced) { - $skipDayArchive = false; + $skipDayArchive = false; + if($existingArchiveIsValid + && !$websiteIdIsForced + && !$websiteInvalidatedShouldReprocess + && !$dayHasEndedMustReprocess + && $this->hasBeenProcessedSinceMidnight($idSite, $lastTimestampWebsiteProcessedDay)) { + $skipDayArchive = true; } if ($skipDayArchive) { @@ -731,6 +723,10 @@ class CronArchive $date = $this->getApiDateParameter($idSite, $period, $lastTimestampWebsiteProcessedPeriods); $periodArchiveWasSuccessful = $this->archiveReportsFor($idSite, $period, $date, $archiveSegments = true, $timer); $success = $periodArchiveWasSuccessful && $success; + if(!$success) { + // if it failed, we abort the current website processing + return $success; + } } if ($this->shouldProcessPeriod('range')) { @@ -794,10 +790,6 @@ class CronArchive $timer = new Timer(); - // Fake that the request is already done, so that other core:archive commands - // running do not grab the same website from the queue - Option::set($this->lastRunKey($idSite, "day"), time()); - // Remove this website from the list of websites to be invalidated // since it's now just about to being re-processed, makes sure another running cron archiving process // does not archive the same idSite @@ -822,6 +814,13 @@ class CronArchive $this->logArchiveWebsite($idSite, "day", $date); + $cliMulti = $this->makeCliMulti(); + if ($cliMulti->isCommandAlreadyRunning($this->makeRequestUrl($url))) { + $this->logger->info("Skipped website id $idSite, such a process is already in progress, " . $timerWebsite->__toString()); + $this->skipped++; + return false; + } + $content = $this->request($url); $daysResponse = @unserialize($content); @@ -829,9 +828,6 @@ class CronArchive || !is_array($daysResponse) || count($daysResponse) == 0 ) { - // cancel the successful run flag - Option::set($this->lastRunKey($idSite, "day"), 0); - // cancel marking the site as reprocessed if ($websiteInvalidatedShouldReprocess) { $store = new SitesToReprocessDistributedList(); @@ -877,9 +873,12 @@ class CronArchive $this->visitsToday += $visitsToday; $this->websitesWithVisitsSinceLastRun++; - $this->archiveReportsFor($idSite, "day", $this->getApiDateParameter($idSite, "day", $processDaysSince), $archiveSegments = true, $timer, $visitsToday, $visitsLastDays); + $dayArchiveWasSuccessful = $this->archiveReportsFor($idSite, "day", $this->getApiDateParameter($idSite, "day", $processDaysSince), $archiveSegments = true, $timer, $visitsToday, $visitsLastDays); - return true; + if($dayArchiveWasSuccessful) { + Option::set($this->lastRunKey($idSite, "day"), time()); + } + return $dayArchiveWasSuccessful; } /** @@ -932,9 +931,26 @@ class CronArchive $urls = array(); + $cliMulti = $this->makeCliMulti(); + $noSegmentUrl = $url; + // already processed above for "day" if ($period != "day") { + + $periodInProgress = $this->isAlreadyArchivingAnyLowerPeriod($idSite, $period); + if ($periodInProgress) { + $this->logger->info("- skipping archiving for period '{period}' because processing the period '{periodcheck}' is already in progress.", array('period' => $period, 'periodcheck' => $periodInProgress)); + $success = false; + return $success; + } + + if ($cliMulti->isCommandAlreadyRunning($url)) { + $this->logArchiveWebsiteAlreadyInProcess($idSite, $period, $date); + $success = false; + return $success; + } + $urls[] = $url; $this->logArchiveWebsite($idSite, $period, $date); } @@ -951,7 +967,6 @@ class CronArchive $this->requests += count($urls); - $cliMulti = $this->makeCliMulti(); $cliMulti->setConcurrentProcessesLimit($this->getConcurrentRequestsPerWebsite()); $response = $cliMulti->request($urls); @@ -961,8 +976,10 @@ class CronArchive if ($noSegmentUrl === $url && $success) { $stats = @unserialize($content); + if (!is_array($stats)) { $this->logError("Error unserializing the following response from $url: " . $content); + $success = false; } if ($period == 'range') { @@ -1708,14 +1725,28 @@ class CronArchive $segments[] = $segment; } + $cliMulti = $this->makeCliMulti(); + $segmentCount = count($segments); $processedSegmentCount = 0; + foreach ($segments as $segment) { $dateParamForSegment = $this->segmentArchivingRequestUrlProvider->getUrlParameterDateString($idSite, $period, $date, $segment); $urlWithSegment = $this->getVisitsRequestUrl($idSite, $period, $dateParamForSegment, $segment); $urlWithSegment = $this->makeRequestUrl($urlWithSegment); + $periodInProgress = $this->isAlreadyArchivingAnyLowerPeriod($idSite, $period); + if ($periodInProgress) { + $this->logger->info("- skipping segment archiving for period '{period}' with segment '{segment}' because processing the period '{periodcheck}' is already in progress.", array('segment' => $segment, 'period' => $period, 'periodcheck' => $periodInProgress)); + continue; + } + + if ($cliMulti->isCommandAlreadyRunning($urlWithSegment)) { + $this->logger->info("- skipping segment archiving for '{segment}' because such a process is already in progress.", array('segment' => $segment)); + continue; + } + $request = new Request($urlWithSegment); $logger = $this->logger; $request->before(function () use ($logger, $segment, $segmentCount, &$processedSegmentCount) { @@ -1734,6 +1765,32 @@ class CronArchive return $urlsWithSegment; } + private function isAlreadyArchivingAnyLowerPeriod($idSite, $period, $segment = false) + { + $periodOrder = array('day', 'week', 'month', 'year'); + $cliMulti = $this->makeCliMulti(); + + $index = array_search($period, $periodOrder); + if ($index > 0) { + // we only need to check for week, month, year if any earlier period is already running + // so when period = month, then we check for day and week + + for ($i = 0; $i < $index; $i++) { + $periodToCheck = $periodOrder[$i]; + + // the date will be ignored in isCommandAlreadyRunning() because it could be any date + $urlCheck = $this->getVisitsRequestUrl($idSite, $periodToCheck, 'last2', $segment); + $urlCheck = $this->makeRequestUrl($urlCheck); + + if ($cliMulti->isCommandAlreadyRunning($urlCheck)) { + return $periodToCheck; + } + } + } + + return false; + } + private function createSitesToArchiveQueue($websitesIds) { // use synchronous, single process queue if --force-idsites is used or sharing site IDs isn't supported @@ -1765,6 +1822,21 @@ class CronArchive $this->logger->info('- pre-processing all visits'); } + /** + * @param $idSite + * @param $period + * @param $date + */ + private function logArchiveWebsiteAlreadyInProcess($idSite, $period, $date) + { + $this->logger->info(sprintf( + "Will not pre-process for website id = %s, period = %s, date = %s because such a process is already in progress.", + $idSite, + $period, + $date + )); + } + private function shouldSkipSegmentArchiving($segment) { if ($this->disableSegmentsArchiving) { |