Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports')
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb58
-rw-r--r--app/workers/bulk_imports/finish_batched_pipeline_worker.rb45
-rw-r--r--app/workers/bulk_imports/finish_batched_relation_export_worker.rb2
-rw-r--r--app/workers/bulk_imports/pipeline_batch_worker.rb81
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb23
-rw-r--r--app/workers/bulk_imports/relation_export_worker.rb4
6 files changed, 179 insertions, 34 deletions
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index 530419dac26..44759916f99 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -15,35 +15,40 @@ module BulkImports
end
def perform(entity_id)
- entity = BulkImports::Entity.find(entity_id)
+ @entity = BulkImports::Entity.find(entity_id)
- entity.update!(source_xid: entity_source_xid(entity)) if entity.source_xid.nil?
-
- request_export(entity)
+ set_source_xid
+ request_export
BulkImports::EntityWorker.perform_async(entity_id)
end
def perform_failure(exception, entity_id)
- entity = BulkImports::Entity.find(entity_id)
+ @entity = BulkImports::Entity.find(entity_id)
- log_and_fail(exception, entity)
+ log_and_fail(exception)
end
private
- def request_export(entity)
- http_client(entity).post(entity.export_relations_url_path)
+ attr_reader :entity
+
+ def set_source_xid
+ entity.update!(source_xid: entity_source_xid) if entity.source_xid.nil?
+ end
+
+ def request_export
+ http_client.post(export_url)
end
- def http_client(entity)
+ def http_client
@client ||= Clients::HTTP.new(
url: entity.bulk_import.configuration.url,
token: entity.bulk_import.configuration.access_token
)
end
- def failure_attributes(exception, entity)
+ def failure_attributes(exception)
{
bulk_import_entity_id: entity.id,
pipeline_class: 'ExportRequestWorker',
@@ -53,23 +58,20 @@ module BulkImports
}
end
- def graphql_client(entity)
+ def graphql_client
@graphql_client ||= BulkImports::Clients::Graphql.new(
url: entity.bulk_import.configuration.url,
token: entity.bulk_import.configuration.access_token
)
end
- def entity_source_xid(entity)
- query = entity_query(entity)
- client = graphql_client(entity)
-
- response = client.execute(
- client.parse(query.to_s),
+ def entity_source_xid
+ response = graphql_client.execute(
+ graphql_client.parse(entity_query.to_s),
{ full_path: entity.source_full_path }
).original_hash
- ::GlobalID.parse(response.dig(*query.data_path, 'id')).model_id
+ ::GlobalID.parse(response.dig(*entity_query.data_path, 'id')).model_id
rescue StandardError => e
log_exception(e,
{
@@ -86,12 +88,12 @@ module BulkImports
nil
end
- def entity_query(entity)
- if entity.group?
- BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil)
- else
- BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil)
- end
+ def entity_query
+ @entity_query ||= if entity.group?
+ BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil)
+ else
+ BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil)
+ end
end
def logger
@@ -104,7 +106,7 @@ module BulkImports
logger.error(structured_payload(payload))
end
- def log_and_fail(exception, entity)
+ def log_and_fail(exception)
log_exception(exception,
{
bulk_import_entity_id: entity.id,
@@ -117,9 +119,13 @@ module BulkImports
}
)
- BulkImports::Failure.create(failure_attributes(exception, entity))
+ BulkImports::Failure.create(failure_attributes(exception))
entity.fail_op!
end
+
+ def export_url
+ entity.export_relations_url_path(batched: Feature.enabled?(:bulk_imports_batched_import_export))
+ end
end
end
diff --git a/app/workers/bulk_imports/finish_batched_pipeline_worker.rb b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
new file mode 100644
index 00000000000..4200d0e4a0f
--- /dev/null
+++ b/app/workers/bulk_imports/finish_batched_pipeline_worker.rb
@@ -0,0 +1,45 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class FinishBatchedPipelineWorker
+ include ApplicationWorker
+ include ExceptionBacktrace
+
+ REQUEUE_DELAY = 5.seconds
+
+ idempotent!
+ deduplicate :until_executing
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ feature_category :importers
+
+ def perform(pipeline_tracker_id)
+ @tracker = Tracker.find(pipeline_tracker_id)
+
+ return unless tracker.batched?
+ return unless tracker.started?
+ return re_enqueue if import_in_progress?
+
+ if tracker.stale?
+ tracker.batches.map(&:fail_op!)
+ tracker.fail_op!
+ else
+ tracker.finish!
+ end
+
+ ensure
+ ::BulkImports::EntityWorker.perform_async(tracker.entity.id, tracker.stage)
+ end
+
+ private
+
+ attr_reader :tracker
+
+ def re_enqueue
+ self.class.perform_in(REQUEUE_DELAY, tracker.id)
+ end
+
+ def import_in_progress?
+ tracker.batches.any?(&:started?)
+ end
+ end
+end
diff --git a/app/workers/bulk_imports/finish_batched_relation_export_worker.rb b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
index aa7bbffa732..92a33a971e7 100644
--- a/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
+++ b/app/workers/bulk_imports/finish_batched_relation_export_worker.rb
@@ -5,7 +5,7 @@ module BulkImports
include ApplicationWorker
idempotent!
- data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ data_consistency :sticky
feature_category :importers
REENQUEUE_DELAY = 5.seconds
diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb
new file mode 100644
index 00000000000..378eff99b52
--- /dev/null
+++ b/app/workers/bulk_imports/pipeline_batch_worker.rb
@@ -0,0 +1,81 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class PipelineBatchWorker # rubocop:disable Scalability/IdempotentWorker
+ include ApplicationWorker
+ include ExclusiveLeaseGuard
+
+ data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
+ feature_category :importers
+ sidekiq_options retry: false, dead: false
+ worker_has_external_dependencies!
+
+ def perform(batch_id)
+ @batch = ::BulkImports::BatchTracker.find(batch_id)
+ @tracker = @batch.tracker
+
+ try_obtain_lease { run }
+ ensure
+ ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
+ end
+
+ private
+
+ attr_reader :batch, :tracker
+
+ def run
+ return batch.skip! if tracker.failed? || tracker.finished?
+
+ batch.start!
+ tracker.pipeline_class.new(context).run
+ batch.finish!
+ rescue BulkImports::RetryPipelineError => e
+ retry_batch(e)
+ rescue StandardError => e
+ fail_batch(e)
+ end
+
+ def fail_batch(exception)
+ batch.fail_op!
+
+ Gitlab::ErrorTracking.track_exception(
+ exception,
+ batch_id: batch.id,
+ tracker_id: tracker.id,
+ pipeline_class: tracker.pipeline_name,
+ pipeline_step: 'pipeline_batch_worker_run'
+ )
+
+ BulkImports::Failure.create(
+ bulk_import_entity_id: batch.tracker.entity.id,
+ pipeline_class: tracker.pipeline_name,
+ pipeline_step: 'pipeline_batch_worker_run',
+ exception_class: exception.class.to_s,
+ exception_message: exception.message.truncate(255),
+ correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
+ )
+ end
+
+ def context
+ @context ||= ::BulkImports::Pipeline::Context.new(tracker, batch_number: batch.batch_number)
+ end
+
+ def retry_batch(exception)
+ batch.retry!
+
+ re_enqueue(exception.retry_delay)
+ end
+
+ def lease_timeout
+ 30
+ end
+
+ def lease_key
+ "gitlab:bulk_imports:pipeline_batch_worker:#{batch.id}"
+ end
+
+ def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
+ self.class.perform_in(delay, batch.id)
+ end
+ end
+end
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index f03e0bc0656..e0db18cb987 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -31,7 +31,6 @@ module BulkImports
fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped?
end
end
-
ensure
::BulkImports::EntityWorker.perform_async(entity_id, stage)
end
@@ -49,9 +48,17 @@ module BulkImports
return re_enqueue if export_empty? || export_started?
- pipeline_tracker.update!(status_event: 'start', jid: jid)
- pipeline_tracker.pipeline_class.new(context).run
- pipeline_tracker.finish!
+ if file_extraction_pipeline? && export_status.batched?
+ pipeline_tracker.update!(status_event: 'start', jid: jid, batched: true)
+
+ return pipeline_tracker.finish! if export_status.batches_count < 1
+
+ enqueue_batches
+ else
+ pipeline_tracker.update!(status_event: 'start', jid: jid)
+ pipeline_tracker.pipeline_class.new(context).run
+ pipeline_tracker.finish!
+ end
rescue BulkImports::RetryPipelineError => e
retry_tracker(e)
rescue StandardError => e
@@ -179,5 +186,13 @@ module BulkImports
time_since_tracker_created > Pipeline::NDJSON_EXPORT_TIMEOUT
end
+
+ def enqueue_batches
+ 1.upto(export_status.batches_count) do |batch_number|
+ batch = pipeline_tracker.batches.find_or_create_by!(batch_number: batch_number) # rubocop:disable CodeReuse/ActiveRecord
+
+ ::BulkImports::PipelineBatchWorker.perform_async(batch.id)
+ end
+ end
end
end
diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb
index b6693f0b07d..531edc6c7a7 100644
--- a/app/workers/bulk_imports/relation_export_worker.rb
+++ b/app/workers/bulk_imports/relation_export_worker.rb
@@ -18,9 +18,7 @@ module BulkImports
portable = portable(portable_id, portable_class)
config = BulkImports::FileTransfer.config_for(portable)
- if Feature.enabled?(:bulk_imports_batched_import_export) &&
- Gitlab::Utils.to_boolean(batched) &&
- config.batchable_relation?(relation)
+ if Gitlab::Utils.to_boolean(batched) && config.batchable_relation?(relation)
BatchedRelationExportService.new(user, portable, relation, jid).execute
else
RelationExportService.new(user, portable, relation, jid).execute