1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# frozen_string_literal: true
module Gitlab
module Import
module AdvanceStage
# Delay, in seconds, handed to `perform_in` when the worker reschedules
# itself because jobs are still outstanding.
INTERVAL = 30.seconds.to_i

# The number of seconds to block the thread on each waiter before moving on
# to the next one.
BLOCKING_WAIT_TIME = 5

# How long the remaining job count may stay unchanged before the stage is
# considered timed out.
TIMEOUT_DURATION = 2.hours

# Raised (and rescued right away) when a stage times out, so the failure can
# be tracked as an exception.
class AdvanceStageTimeoutError < StandardError; end
# Checks the outstanding jobs of the current import stage and then either
# starts the next stage, gives up because of a timeout, or reschedules
# itself to check again later.
#
# project_id - The ID of the project being imported.
# waiters - A Hash mapping Gitlab::JobWaiter keys to the number of
#           remaining jobs.
# next_stage - The name of the next stage to start once all jobs have
#              completed.
# timeout_timer - The time (a Time, or its String form) at which the worker
#                 first ran with the current job count.
# previous_job_count - The number of jobs that remained on the previous run
#                      of this worker (nil on the first run).
def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now.to_s, previous_job_count = nil)
  import_state_jid = find_import_state_jid(project_id)

  # No import state means the project may have been deleted, or the import
  # may have failed or been canceled. Tidy up the cached waiter data and
  # stop trying to advance.
  if import_state_jid.nil?
    clear_waiter_caches(waiters)
    return
  end

  pending_waiters = wait_for_jobs(waiters)
  pending_job_count = pending_waiters.values.sum

  # The job count moved since the last run, so restart the timeout window.
  unless pending_job_count == previous_job_count
    timeout_timer = Time.zone.now
    previous_job_count = pending_job_count
  end

  return proceed_to_next_stage(import_state_jid, next_stage, project_id) if pending_waiters.empty?

  stalled = timeout_reached?(timeout_timer) && pending_job_count == previous_job_count
  return handle_timeout(import_state_jid, next_stage, project_id, pending_waiters, pending_job_count) if stalled

  # Still waiting: check again after INTERVAL seconds. Arguments are
  # converted to plain strings/string keys before being enqueued.
  self.class.perform_in(
    INTERVAL,
    project_id,
    pending_waiters.deep_stringify_keys,
    next_stage.to_s,
    timeout_timer.to_s,
    previous_job_count
  )
end
# Blocks briefly on each waiter and collects the ones that are not done yet.
#
# waiters - A Hash mapping JobWaiter keys to the number of remaining jobs.
#
# Returns a new Hash of the same shape that only contains the waiters which
# still report remaining jobs after the wait.
def wait_for_jobs(waiters)
  still_pending = {}

  waiters.each do |key, remaining|
    job_waiter = JobWaiter.new(remaining, key)

    # Wait a short moment so we can skip rescheduling when the work
    # completes quickly enough.
    job_waiter.wait(BLOCKING_WAIT_TIME)

    still_pending[job_waiter.key] = job_waiter.jobs_remaining if job_waiter.jobs_remaining > 0
  end

  still_pending
end
# Abstract hook: looks up the import state used to track the import's job ID.
#
# Classes including this module must override it. `perform` treats a nil
# result as "stop advancing"; a non-nil result must respond to
# `refresh_jid_expiration` and `id`, which are called on it by
# `proceed_to_next_stage` and `handle_timeout`.
#
# project_id - The ID of the project being imported.
#
# Raises NotImplementedError unless overridden.
def find_import_state_jid(project_id)
  raise NotImplementedError, "#{self.class} must implement #{__method__}"
end
# Abstract hook: loads the import state record for the given ID.
#
# Classes including this module must override it. `handle_timeout` calls it
# with the `id` of the object returned by `find_import_state_jid` and passes
# the result on as the `import_state` of the tracked failure.
#
# id - The ID of the import state to load.
#
# Raises NotImplementedError unless overridden.
def find_import_state(id)
  raise NotImplementedError, "#{self.class} must implement #{__method__}"
end
private
# Refreshes the import JID expiration and enqueues the worker of the next
# stage for the project.
#
# import_state_jid - The object returned by `find_import_state_jid`.
# next_stage - The name of the stage to start.
# project_id - The ID of the project being imported.
#
# Returns whatever the stage worker's `perform_async` returns.
def proceed_to_next_stage(import_state_jid, next_stage, project_id)
  # The import JID is refreshed here so that the workers importing
  # individual resources (e.g. notes) don't each have to do it, which keeps
  # pressure off Redis. It _only_ happens once all jobs are done, so we
  # don't get stuck forever when one or more jobs failed to notify the
  # JobWaiter.
  import_state_jid.refresh_jid_expiration

  stage_worker = next_stage_worker(next_stage)
  stage_worker.perform_async(project_id)
end
# Gives up waiting for the remaining jobs of a stage: logs the timeout,
# clears the waiter caches and then applies the project's timeout strategy.
#
# With the optimistic strategy the import moves on to the next stage. With
# the pessimistic strategy (also the default when none is configured) the
# import is failed.
#
# import_state_jid - The object returned by `find_import_state_jid`.
# next_stage - The name of the stage that would start next.
# project_id - The ID of the project being imported.
# new_waiters - A Hash mapping JobWaiter keys to the jobs still remaining.
# new_job_count - The total number of jobs still remaining.
def handle_timeout(import_state_jid, next_stage, project_id, new_waiters, new_job_count)
  # The project may no longer exist by the time we get here, and it may
  # have no import data, so every step of the lookup is nil-safe and falls
  # back to the pessimistic strategy.
  project = Project.find_by_id(project_id)
  strategy = project&.import_data&.data&.dig("timeout_strategy") || ProjectImportData::PESSIMISTIC_TIMEOUT

  Gitlab::Import::Logger.info(
    message: 'Timeout reached, no longer retrying',
    project_id: project_id,
    jobs_remaining: new_job_count,
    waiters: new_waiters,
    timeout_strategy: strategy
  )

  clear_waiter_caches(new_waiters)

  case strategy
  when ProjectImportData::OPTIMISTIC_TIMEOUT
    proceed_to_next_stage(import_state_jid, next_stage, project_id)
  else
    # Pessimistic strategy. An unrecognised strategy value is handled the
    # same way: the waiter caches are already gone and nothing reschedules
    # this worker, so doing nothing would leave the import neither advanced
    # nor failed.
    import_state = find_import_state(import_state_jid.id)
    fail_import_and_log_status(import_state)
  end
end
# Records a pessimistic-strategy timeout as an import failure.
#
# The error is raised and rescued on the spot so that a real, raised
# exception object is what gets handed to the failure tracker.
#
# import_state - The import state the failure is tracked against.
#
# Returns whatever `ImportFailureService.track` returns.
def fail_import_and_log_status(import_state)
  failure_message = "Failing advance stage, timeout reached with pessimistic strategy"

  raise AdvanceStageTimeoutError, failure_message
rescue AdvanceStageTimeoutError => timeout_error
  Gitlab::Import::ImportFailureService.track(
    import_state: import_state,
    exception: timeout_error,
    error_source: self.class.name,
    fail_import: true
  )
end
# Tells whether more than TIMEOUT_DURATION has passed since the given time.
#
# timeout_timer - The start of the timeout window, either as a time object
#                 or as a String that `Time.zone.parse` understands.
#
# Returns true once the current time is past the deadline, false otherwise.
def timeout_reached?(timeout_timer)
  window_start = timeout_timer.is_a?(String) ? Time.zone.parse(timeout_timer) : timeout_timer
  deadline = window_start + TIMEOUT_DURATION

  Time.zone.now > deadline
end
# Abstract hook: resolves the worker that runs the given stage.
#
# Classes including this module must override it. The result must respond
# to `perform_async`, which `proceed_to_next_stage` calls with the project
# ID.
#
# next_stage - The name of the stage to start.
#
# Raises NotImplementedError unless overridden.
def next_stage_worker(next_stage)
  raise NotImplementedError, "#{self.class} must implement #{__method__}"
end
# Deletes the JobWaiter key of every given waiter.
#
# waiters - A Hash whose keys are JobWaiter keys; the values are ignored.
#
# Returns the waiters Hash that was passed in.
def clear_waiter_caches(waiters)
  waiters.each_key { |waiter_key| JobWaiter.delete_key(waiter_key) }
end
end
end
end
|