Welcome to mirror list, hosted at ThFree Co, Russian Federation.

unlock_pipeline_service.rb « ci « services « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: bd42871ffbeb2899d1a2ddb46327e281f24d5510 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# frozen_string_literal: true

module Ci
  # Unlocks every artifact that belongs to one pipeline and, once that is done,
  # flags the pipeline row itself as unlocked.
  #
  # The whole operation runs inside an exclusive lease keyed by the pipeline ID.
  # Job artifacts are unlocked batch by batch, and the run is aborted as soon as
  # it has been going for longer than MAX_EXEC_DURATION. Whenever the pipeline
  # is still locked at the end of a run it is handed back to
  # Ci::UnlockPipelineRequest so that a later run can continue.
  class UnlockPipelineService
    include BaseServiceUtility
    include ::Gitlab::ExclusiveLeaseHelpers

    # Raised while unlocking job artifacts once MAX_EXEC_DURATION is exceeded.
    ExecutionTimeoutError = Class.new(StandardError)

    # Batch size used for both the builds and the job artifacts iteration.
    BATCH_SIZE = 100
    # Upper bound for the time spent unlocking job artifacts in a single run.
    MAX_EXEC_DURATION = 10.minutes.freeze
    # TTL of the exclusive lease; one minute longer than MAX_EXEC_DURATION.
    LOCK_TIMEOUT = (MAX_EXEC_DURATION + 1.minute).freeze

    # pipeline - the pipeline whose artifacts should be unlocked.
    def initialize(pipeline)
      @pipeline = pipeline

      # Outcome flags reported by #execute.
      @already_leased = false
      @already_unlocked = false
      @exec_timeout = false

      # Counters reported by #execute.
      @unlocked_job_artifacts_count = 0
      @unlocked_pipeline_artifacts_count = 0
    end

    # Runs the unlock process and returns the value of `success` called with
    # the outcome flags and the number of artifacts unlocked by this run.
    def execute
      unlock_pipeline_exclusively

      success(
        skipped_already_leased: already_leased,
        skipped_already_unlocked: already_unlocked,
        exec_timeout: exec_timeout,
        unlocked_job_artifacts: unlocked_job_artifacts_count,
        unlocked_pipeline_artifacts: unlocked_pipeline_artifacts_count
      )
    end

    private

    attr_reader :pipeline, :already_leased, :already_unlocked, :exec_timeout,
      :unlocked_job_artifacts_count, :unlocked_pipeline_artifacts_count

    # Performs the unlock under the exclusive lease and records how the run
    # ended: a lease that could not be obtained sets `already_leased`, hitting
    # the execution timeout sets `exec_timeout`.
    def unlock_pipeline_exclusively
      in_lock(lock_key, ttl: LOCK_TIMEOUT, retries: 0) do
        # Uniqueness is enforced when pipelines are enqueued, yet a rare race
        # remains: the same pipeline ID can be re-enqueued right after a worker
        # popped it off the queue, and a second worker may pick it up just after
        # the first one finished unlocking and released the lock. Hence the
        # check that the pipeline has not been unlocked already.
        if pipeline.unlocked?
          @already_unlocked = true
          break
        end

        unlock_job_artifacts
        unlock_pipeline_artifacts

        # Must stay the last step: an unlocked row in `ci_pipelines` means that
        # all of its artifacts have been unlocked already.
        unlock_pipeline
      end
    rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
      @already_leased = true
    rescue ExecutionTimeoutError
      @exec_timeout = true
    ensure
      finalize_unlock_request
    end

    # Bookkeeping executed at the end of every run, whether or not it raised.
    def finalize_unlock_request
      unless pipeline.unlocked?
        # The pipeline gets re-enqueued on two occasions:
        # 1. An unexpected error happened.
        # 2. The execution timeout was reached because the pipeline has a lot
        #    of job artifacts. Re-enqueueing lets a later run continue with the
        #    remaining artifacts, which is why the pipeline is unlocked last.
        Ci::UnlockPipelineRequest.enqueue(pipeline.id)
        Ci::UnlockPipelineRequest.log_event(:re_enqueued, pipeline.id)
        return
      end

      # A pipeline that was found unlocked up front was not completed by this run.
      return if already_unlocked

      Ci::UnlockPipelineRequest.log_event(:completed, pipeline.id)
    end

    # Key of the exclusive lease, unique per pipeline ID.
    def lock_key
      "ci:unlock_pipeline_service:lock:#{pipeline.id}"
    end

    # Writes the unlocked state straight into the pipeline's `locked` column.
    def unlock_pipeline
      unlocked_value = Ci::Pipeline.lockeds[:unlocked]

      pipeline.update_column(:locked, unlocked_value)
    end

    # Walks the pipeline's builds in batches, unlocks the job artifacts of each
    # batch (again in batches) and adds the number of updated rows to the
    # running counter.
    #
    # Raises ExecutionTimeoutError after an artifact batch when the run has
    # been going for longer than MAX_EXEC_DURATION.
    def unlock_job_artifacts
      started_at = Time.current

      builds_relation.each_batch(of: BATCH_SIZE) do |build_batch|
        # rubocop: disable CodeReuse/ActiveRecord
        artifacts_of_builds = Ci::JobArtifact.where(job_id: build_batch.pluck(:id))

        artifacts_of_builds.each_batch(of: BATCH_SIZE) do |artifact_batch|
          newly_unlocked = Ci::JobArtifact
            .where(id: artifact_batch.pluck(:id))
            .update_all(locked: :unlocked)

          @unlocked_job_artifacts_count += newly_unlocked

          raise ExecutionTimeoutError if exec_duration_exceeded?(started_at)
        end
        # rubocop: enable CodeReuse/ActiveRecord
      end
    end

    # True once more than MAX_EXEC_DURATION has passed since `started_at`.
    def exec_duration_exceeded?(started_at)
      (Time.current - started_at) > MAX_EXEC_DURATION
    end

    # Builds to iterate over. The partition_id filter is left out of the query
    # until there is more data in the second partition.
    def builds_relation
      return pipeline.builds unless Feature.enabled?(:disable_ci_partition_pruning, pipeline.project, type: :wip)

      Ci::Build.in_pipelines(pipeline)
    end

    # Unlocks all pipeline artifacts with a single update and stores the
    # number of affected rows.
    def unlock_pipeline_artifacts
      pipeline_artifacts = pipeline.pipeline_artifacts

      @unlocked_pipeline_artifacts_count = pipeline_artifacts.update_all(locked: :unlocked)
    end
  end
end