diff options
Diffstat (limited to 'app/workers/gitlab/import/advance_stage.rb')
-rw-r--r-- | app/workers/gitlab/import/advance_stage.rb | 88 |
1 files changed, 75 insertions, 13 deletions
diff --git a/app/workers/gitlab/import/advance_stage.rb b/app/workers/gitlab/import/advance_stage.rb index 5d5abc88388..180c08905ff 100644 --- a/app/workers/gitlab/import/advance_stage.rb +++ b/app/workers/gitlab/import/advance_stage.rb @@ -4,6 +4,9 @@ module Gitlab module Import module AdvanceStage INTERVAL = 30.seconds.to_i + TIMEOUT_DURATION = 2.hours + + AdvanceStageTimeoutError = Class.new(StandardError) # The number of seconds to wait (while blocking the thread) before # continuing to the next waiter. @@ -14,30 +17,35 @@ module Gitlab # remaining jobs. # next_stage - The name of the next stage to start when all jobs have been # completed. - def perform(project_id, waiters, next_stage) - import_state = find_import_state(project_id) + # timeout_timer - Time the sidekiq worker was first initiated with the current job_count + # previous_job_count - Number of jobs remaining on last invocation of this worker + def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil) + import_state_jid = find_import_state_jid(project_id) # If the import state is nil the project may have been deleted or the import # may have failed or been canceled. In this case we tidy up the cache data and no # longer attempt to advance to the next stage. - if import_state.nil? + if import_state_jid.nil? clear_waiter_caches(waiters) return end new_waiters = wait_for_jobs(waiters) + new_job_count = new_waiters.values.sum + + # Reset the timeout timer as some jobs finished processing + if new_job_count != previous_job_count + timeout_timer = Time.zone.now + previous_job_count = new_job_count + end if new_waiters.empty? - # We refresh the import JID here so workers importing individual - # resources (e.g. notes) don't have to do this all the time, reducing - # the pressure on Redis. We _only_ do this once all jobs are done so - # we don't get stuck forever if one or more jobs failed to notify the - # JobWaiter. - import_state.refresh_jid_expiration - - next_stage_worker(next_stage).perform_async(project_id) + proceed_to_next_stage(import_state_jid, next_stage, project_id) + elsif timeout_reached?(timeout_timer) && new_job_count == previous_job_count + + handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count) else - self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage) + self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage, timeout_timer, previous_job_count) end end @@ -55,12 +63,66 @@ module Gitlab end end - def find_import_state(project_id) + def find_import_state_jid(project_id) + raise NotImplementedError + end + + def find_import_state(id) raise NotImplementedError end private + def proceed_to_next_stage(import_state_jid, next_stage, project_id) + # We refresh the import JID here so workers importing individual + # resources (e.g. notes) don't have to do this all the time, reducing + # the pressure on Redis. We _only_ do this once all jobs are done so + # we don't get stuck forever if one or more jobs failed to notify the + # JobWaiter. + import_state_jid.refresh_jid_expiration + + next_stage_worker(next_stage).perform_async(project_id) + end + + def handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count) + project = Project.find_by_id(project_id) + strategy = project.import_data&.data&.dig("timeout_strategy") || ProjectImportData::PESSIMISTIC_TIMEOUT + + Gitlab::Import::Logger.info( + message: 'Timeout reached, no longer retrying', + project_id: project_id, + jobs_remaining: new_job_count, + waiters: new_waiters, + timeout_strategy: strategy + ) + + clear_waiter_caches(new_waiters) + + case strategy + when ProjectImportData::OPTIMISTIC_TIMEOUT + proceed_to_next_stage(import_state_jid, next_stage, project_id) + when ProjectImportData::PESSIMISTIC_TIMEOUT + import_state = find_import_state(import_state_jid.id) + fail_import_and_log_status(import_state) + end + end + + def fail_import_and_log_status(import_state) + raise AdvanceStageTimeoutError, "Failing advance stage, timeout reached with pessimistic strategy" + rescue AdvanceStageTimeoutError => e + Gitlab::Import::ImportFailureService.track( + import_state: import_state, + exception: e, + error_source: self.class.name, + fail_import: true + ) + end + + def timeout_reached?(timeout_timer) + timeout_timer = Time.zone.parse(timeout_timer) if timeout_timer.is_a?(String) + Time.zone.now > timeout_timer + TIMEOUT_DURATION + end + def next_stage_worker(next_stage) raise NotImplementedError end |