blob: aa9ff7000f19e1db605002c1fa06ecb81fa4bc3a (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# frozen_string_literal: true
module Gitlab
  module BitbucketImport
    module Importers
      # Schedules Sidekiq workers that download the project's LFS objects
      # during a Bitbucket import. Scheduling plumbing (job_waiter,
      # already_enqueued?, mark_as_enqueued, calculate_job_delay, log_info,
      # track_import_failure!) is provided by the ParallelScheduling mixin.
      class LfsObjectsImporter
        include ParallelScheduling

        # Lists the project's LFS pointers and enqueues one worker per
        # object (skipped entirely when LFS is disabled for the project).
        # Failures while listing/enqueueing are recorded as import failures
        # rather than aborting the stage.
        #
        # Returns the JobWaiter used to wait on the enqueued workers.
        def execute
          log_info(import_stage: 'import_lfs_objects', message: 'starting')

          service = Projects::LfsPointers::LfsObjectDownloadListService.new(project)

          begin
            queue_workers(service) if project.lfs_enabled?
          rescue StandardError => e
            track_import_failure!(project, exception: e)
          end

          log_info(import_stage: 'import_lfs_objects', message: 'finished')

          job_waiter
        end

        private

        # Worker class enqueued by ParallelScheduling for each LFS object.
        def sidekiq_worker_class
          ImportLfsObjectWorker
        end

        # Collection name used by ParallelScheduling for cache keys/metrics.
        def collection_method
          :lfs_objects
        end

        # LFS objects are deduplicated by their content OID.
        def id_for_already_enqueued_cache(object)
          object.oid
        end

        # Enqueues one ImportLfsObjectWorker per listed LFS object, spacing
        # the jobs out with a calculated delay.
        def queue_workers(download_service)
          download_service.each_list_item do |download_object|
            # Increment before the already_enqueued? check: jobs_remaining
            # resets to zero when the job restarts, and it must reflect the
            # total number of enqueued jobs.
            job_waiter.jobs_remaining += 1

            next if already_enqueued?(download_object)

            sidekiq_worker_class.perform_in(
              calculate_job_delay(job_waiter.jobs_remaining),
              project.id,
              download_object.to_hash,
              job_waiter.key
            )

            mark_as_enqueued(download_object)
          end
        end
      end
    end
  end
end
|