# frozen_string_literal: true
module Gitlab
module GithubGistsImport
module Importer
class GistsImporter
attr_reader :user, :client, :already_imported_cache_key
ALREADY_IMPORTED_CACHE_KEY = 'github-gists-importer/already-imported/%{user}'
RESULT_CONTEXT = Struct.new(:success?, :error, :waiter, :next_attempt_in, keyword_init: true)
def initialize(user, token)
@user = user
@client = Gitlab::GithubImport::Client.new(token, parallel: true)
@already_imported_cache_key = format(ALREADY_IMPORTED_CACHE_KEY, user: user.id)
end
def execute
waiter = spread_parallel_import
expire_already_imported_cache!
RESULT_CONTEXT.new(success?: true, waiter: waiter)
rescue Gitlab::GithubImport::RateLimitError => e
RESULT_CONTEXT.new(success?: false, error: e, next_attempt_in: client.rate_limit_resets_in)
rescue StandardError => e
RESULT_CONTEXT.new(success?: false, error: e)
end
private
def spread_parallel_import
waiter = JobWaiter.new
worker_arguments = fetch_gists_to_import.map { |gist_hash| [user.id, gist_hash, waiter.key] }
waiter.jobs_remaining = worker_arguments.size
schedule_bulk_perform(worker_arguments)
waiter
end
def fetch_gists_to_import
page_counter = Gitlab::GithubImport::PageCounter.new(user, :gists, 'github-gists-importer')
collection = []
client.each_page(:gists, nil, page: page_counter.current) do |page|
next unless page_counter.set(page.number)
collection += gists_from(page)
end
page_counter.expire!
collection
end
def gists_from(page)
page.objects.each.with_object([]) do |gist, page_collection|
gist = gist.to_h
next if already_imported?(gist)
page_collection << ::Gitlab::GithubGistsImport::Representation::Gist.from_api_response(gist).to_hash
mark_as_imported(gist)
end
end
def schedule_bulk_perform(worker_arguments)
# rubocop:disable Scalability/BulkPerformWithContext
Gitlab::ApplicationContext.with_context(user: user) do
Gitlab::GithubGistsImport::ImportGistWorker.bulk_perform_in(
1.second,
worker_arguments,
batch_size: 1000,
batch_delay: 1.minute
)
end
# rubocop:enable Scalability/BulkPerformWithContext
end
def already_imported?(gist)
Gitlab::Cache::Import::Caching.set_includes?(already_imported_cache_key, gist[:id])
end
def mark_as_imported(gist)
Gitlab::Cache::Import::Caching.set_add(already_imported_cache_key, gist[:id])
end
def expire_already_imported_cache!
Gitlab::Cache::Import::Caching
.expire(already_imported_cache_key, Gitlab::Cache::Import::Caching::SHORTER_TIMEOUT)
end
end
end
end
end