diff options
Diffstat (limited to 'app/services/ci/unlock_pipeline_service.rb')
-rw-r--r-- | app/services/ci/unlock_pipeline_service.rb | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/app/services/ci/unlock_pipeline_service.rb b/app/services/ci/unlock_pipeline_service.rb new file mode 100644 index 00000000000..88d4a8fd0be --- /dev/null +++ b/app/services/ci/unlock_pipeline_service.rb @@ -0,0 +1,107 @@ +# frozen_string_literal: true + +module Ci + class UnlockPipelineService + include BaseServiceUtility + include ::Gitlab::ExclusiveLeaseHelpers + + ExecutionTimeoutError = Class.new(StandardError) + + BATCH_SIZE = 100 + MAX_EXEC_DURATION = 10.minutes.freeze + LOCK_TIMEOUT = (MAX_EXEC_DURATION + 1.minute).freeze + + def initialize(pipeline) + @pipeline = pipeline + @already_leased = false + @already_unlocked = false + @exec_timeout = false + @unlocked_job_artifacts_count = 0 + @unlocked_pipeline_artifacts_count = 0 + end + + def execute + unlock_pipeline_exclusively + + success( + skipped_already_leased: already_leased, + skipped_already_unlocked: already_unlocked, + exec_timeout: exec_timeout, + unlocked_job_artifacts: unlocked_job_artifacts_count, + unlocked_pipeline_artifacts: unlocked_pipeline_artifacts_count + ) + end + + private + + attr_reader :pipeline, :already_leased, :already_unlocked, :exec_timeout, + :unlocked_job_artifacts_count, :unlocked_pipeline_artifacts_count + + def unlock_pipeline_exclusively + in_lock(lock_key, ttl: LOCK_TIMEOUT, retries: 0) do + # Even though we enforce uniqueness when enqueueing pipelines, there is still a rare race condition chance that + # a pipeline can be re-enqueued right after a worker pops off the same pipeline ID from the queue, and then just + # after it completing the unlock process and releasing the lock, another worker picks up the re-enqueued + # pipeline ID. So let's make sure to only unlock artifacts if the pipeline has not been unlocked. + if pipeline.unlocked? + @already_unlocked = true + break + end + + unlock_job_artifacts + unlock_pipeline_artifacts + + # Marking the row in `ci_pipelines` to unlocked signifies that all artifacts have + # already been unlocked. This must always happen last. + unlock_pipeline + end + rescue ExecutionTimeoutError + @exec_timeout = true + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError + @already_leased = true + ensure + if pipeline.unlocked? + Ci::UnlockPipelineRequest.log_event(:completed, pipeline.id) unless already_unlocked + else + # This is to ensure to re-enqueue the pipeline in 2 occasions: + # 1. When an unexpected error happens. + # 2. When the execution timeout has been reached in the case of a pipeline having a lot of + # job artifacts. This allows us to continue unlocking the rest of the artifacts from + # where we left off. This is why we unlock the pipeline last. + Ci::UnlockPipelineRequest.enqueue(pipeline.id) + Ci::UnlockPipelineRequest.log_event(:re_enqueued, pipeline.id) + end + end + + def lock_key + "ci:unlock_pipeline_service:lock:#{pipeline.id}" + end + + def unlock_pipeline + pipeline.update_column(:locked, Ci::Pipeline.lockeds[:unlocked]) + end + + def unlock_job_artifacts + start = Time.current + + pipeline.builds.each_batch(of: BATCH_SIZE) do |builds| + # rubocop: disable CodeReuse/ActiveRecord + Ci::JobArtifact.where(job_id: builds.pluck(:id)).each_batch(of: BATCH_SIZE) do |job_artifacts| + unlocked_count = Ci::JobArtifact + .where(id: job_artifacts.pluck(:id)) + .update_all(locked: :unlocked) + + @unlocked_job_artifacts_count ||= 0 + @unlocked_job_artifacts_count += unlocked_count + + raise ExecutionTimeoutError if (Time.current - start) > MAX_EXEC_DURATION + end + # rubocop: enable CodeReuse/ActiveRecord + end + end + + def unlock_pipeline_artifacts + @unlocked_pipeline_artifacts_count = pipeline.pipeline_artifacts.update_all(locked: :unlocked) + end + end +end |