1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# frozen_string_literal: true
module Ci
  # Unlocks every job artifact and pipeline artifact that belongs to a single
  # pipeline and, once that is done, flags the pipeline row itself as unlocked.
  #
  # The work runs inside an exclusive lease keyed by the pipeline ID (see
  # #lock_key) and is bounded by MAX_EXEC_DURATION. Whenever the pipeline is
  # still locked when #execute finishes, the pipeline ID is re-enqueued through
  # Ci::UnlockPipelineRequest so that a later run can pick it up again.
  class UnlockPipelineService
    include BaseServiceUtility
    include ::Gitlab::ExclusiveLeaseHelpers

    # Raised internally when unlocking job artifacts exceeds MAX_EXEC_DURATION.
    # It is rescued inside this class and surfaced as `exec_timeout: true`.
    ExecutionTimeoutError = Class.new(StandardError)

    # Batch size used for both the builds iteration and the job artifacts iteration.
    BATCH_SIZE = 100

    # Upper bound on the time spent unlocking job artifacts in a single run.
    MAX_EXEC_DURATION = 10.minutes.freeze

    # Lease TTL: one minute longer than the execution budget.
    # NOTE(review): presumably so the lease outlives a run that hits the timeout - confirm.
    LOCK_TIMEOUT = (MAX_EXEC_DURATION + 1.minute).freeze

    # @param pipeline [Ci::Pipeline] the pipeline whose artifacts should be unlocked
    def initialize(pipeline)
      @pipeline = pipeline
      @already_leased = false
      @already_unlocked = false
      @exec_timeout = false
      @unlocked_job_artifacts_count = 0
      @unlocked_pipeline_artifacts_count = 0
    end

    # Runs the unlock and reports what happened.
    #
    # The result is built with `success`, which is not defined in this file
    # (presumably provided by BaseServiceUtility). Its payload carries:
    #
    # - skipped_already_leased: the exclusive lease could not be obtained
    # - skipped_already_unlocked: the pipeline was already unlocked under the lease
    # - exec_timeout: MAX_EXEC_DURATION was exceeded while unlocking job artifacts
    # - unlocked_job_artifacts: number of job artifact rows updated in this run
    # - unlocked_pipeline_artifacts: number of pipeline artifact rows updated in this run
    #
    # Errors other than the two rescued in #unlock_pipeline_exclusively propagate
    # to the caller.
    def execute
      unlock_pipeline_exclusively

      success(
        skipped_already_leased: already_leased,
        skipped_already_unlocked: already_unlocked,
        exec_timeout: exec_timeout,
        unlocked_job_artifacts: unlocked_job_artifacts_count,
        unlocked_pipeline_artifacts: unlocked_pipeline_artifacts_count
      )
    end

    private

    attr_reader :pipeline, :already_leased, :already_unlocked, :exec_timeout,
      :unlocked_job_artifacts_count, :unlocked_pipeline_artifacts_count

    # Performs the unlock while holding the exclusive lease, translating the
    # timeout and lease-contention errors into the corresponding result flags.
    #
    # The `ensure` section runs on every exit path, including unexpected errors
    # (which still propagate afterwards).
    def unlock_pipeline_exclusively
      in_lock(lock_key, ttl: LOCK_TIMEOUT, retries: 0) do
        # Even though we enforce uniqueness when enqueueing pipelines, there is still a rare race condition chance that
        # a pipeline can be re-enqueued right after a worker pops off the same pipeline ID from the queue, and then just
        # after it completing the unlock process and releasing the lock, another worker picks up the re-enqueued
        # pipeline ID. So let's make sure to only unlock artifacts if the pipeline has not been unlocked.
        if pipeline.unlocked?
          @already_unlocked = true
          break
        end

        unlock_job_artifacts
        unlock_pipeline_artifacts

        # Marking the row in `ci_pipelines` to unlocked signifies that all artifacts have
        # already been unlocked. This must always happen last.
        unlock_pipeline
      end
    rescue ExecutionTimeoutError
      @exec_timeout = true
    rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
      @already_leased = true
    ensure
      if pipeline.unlocked?
        # Only log completion when this run did the unlocking itself.
        Ci::UnlockPipelineRequest.log_event(:completed, pipeline.id) unless already_unlocked
      else
        # The pipeline is still locked, so re-enqueue it. This happens when:
        # 1. An unexpected error happens.
        # 2. The execution timeout has been reached in the case of a pipeline having a lot of
        #    job artifacts. This allows us to continue unlocking the rest of the artifacts from
        #    where we left off. This is why we unlock the pipeline last.
        # 3. The exclusive lease could not be obtained (`already_leased`), because the
        #    rescue above leaves the pipeline untouched.
        Ci::UnlockPipelineRequest.enqueue(pipeline.id)
        Ci::UnlockPipelineRequest.log_event(:re_enqueued, pipeline.id)
      end
    end

    # Key of the exclusive lease; unique per pipeline ID.
    def lock_key
      "ci:unlock_pipeline_service:lock:#{pipeline.id}"
    end

    # Flags the pipeline row as unlocked by writing the `locked` column directly.
    def unlock_pipeline
      pipeline.update_column(:locked, Ci::Pipeline.lockeds[:unlocked])
    end

    # Unlocks the job artifacts of the pipeline's builds, batch by batch, and
    # accumulates the number of updated rows in @unlocked_job_artifacts_count.
    #
    # The elapsed time is checked after each artifact batch has been updated, so
    # the batch that crosses the limit is still counted before the error is raised.
    #
    # @raise [ExecutionTimeoutError] when more than MAX_EXEC_DURATION has elapsed
    #   since this method started
    def unlock_job_artifacts
      started_at = Time.current

      builds_relation.each_batch(of: BATCH_SIZE) do |builds|
        # rubocop: disable CodeReuse/ActiveRecord
        Ci::JobArtifact.where(job_id: builds.pluck(:id)).each_batch(of: BATCH_SIZE) do |job_artifacts|
          unlocked_count = Ci::JobArtifact
            .where(id: job_artifacts.pluck(:id))
            .update_all(locked: :unlocked)

          # The counter is initialised to 0 in #initialize, so no nil-guard is needed.
          @unlocked_job_artifacts_count += unlocked_count

          raise ExecutionTimeoutError if execution_timed_out?(started_at)
        end
        # rubocop: enable CodeReuse/ActiveRecord
      end
    end

    # Whether more than MAX_EXEC_DURATION has passed since `started_at`.
    def execution_timed_out?(started_at)
      (Time.current - started_at) > MAX_EXEC_DURATION
    end

    # Removes the partition_id filter from the query until we get more data in the
    # second partition.
    def builds_relation
      if Feature.enabled?(:disable_ci_partition_pruning, pipeline.project, type: :wip)
        Ci::Build.in_pipelines(pipeline)
      else
        pipeline.builds
      end
    end

    # Unlocks all pipeline artifacts in a single UPDATE and records the number of
    # affected rows.
    def unlock_pipeline_artifacts
      @unlocked_pipeline_artifacts_count = pipeline.pipeline_artifacts.update_all(locked: :unlocked)
    end
  end
end
|