gitlab.com/gitlab-org/gitlab-foss.git
author     GitLab Bot <gitlab-bot@gitlab.com>  2022-06-20 14:10:13 +0300
committer  GitLab Bot <gitlab-bot@gitlab.com>  2022-06-20 14:10:13 +0300
commit     0ea3fcec397b69815975647f5e2aa5fe944a8486 (patch)
tree       7979381b89d26011bcf9bdc989a40fcc2f1ed4ff /app/workers/bulk_imports
parent     72123183a20411a36d607d70b12d57c484394c8e (diff)
Add latest changes from gitlab-org/gitlab@15-1-stable-ee (tag: v15.1.0-rc42)
Diffstat (limited to 'app/workers/bulk_imports')
-rw-r--r--  app/workers/bulk_imports/pipeline_worker.rb  112
1 file changed, 72 insertions(+), 40 deletions(-)
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index b515f0fa202..9c95e25e2e8 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -12,7 +12,7 @@ module BulkImports
     worker_has_external_dependencies!
 
     def perform(pipeline_tracker_id, stage, entity_id)
-      pipeline_tracker = ::BulkImports::Tracker
+      @pipeline_tracker = ::BulkImports::Tracker
         .with_status(:enqueued)
         .find_by_id(pipeline_tracker_id)
@@ -24,7 +24,7 @@ module BulkImports
           )
         )
 
-        run(pipeline_tracker)
+        run
       else
         logger.error(
           structured_payload(
@@ -41,48 +41,29 @@ module BulkImports
 
     private
 
-    def run(pipeline_tracker)
-      if pipeline_tracker.entity.failed?
-        raise(Entity::FailedError, 'Failed entity status')
-      end
-
-      if file_extraction_pipeline?(pipeline_tracker)
-        export_status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation)
+    attr_reader :pipeline_tracker
 
-        raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker)
-        raise(Pipeline::FailedError, export_status.error) if export_status.failed?
+    def run
+      raise(Entity::FailedError, 'Failed entity status') if pipeline_tracker.entity.failed?
+      raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?
+      raise(Pipeline::FailedError, export_status.error) if export_failed?
 
-        return reenqueue(pipeline_tracker) if export_status.started?
-      end
+      return re_enqueue if export_empty? || export_started?
 
       pipeline_tracker.update!(status_event: 'start', jid: jid)
-
-      context = ::BulkImports::Pipeline::Context.new(pipeline_tracker)
-
       pipeline_tracker.pipeline_class.new(context).run
-
       pipeline_tracker.finish!
     rescue BulkImports::NetworkError => e
       if e.retriable?(pipeline_tracker)
-        logger.error(
-          structured_payload(
-            entity_id: pipeline_tracker.entity.id,
-            pipeline_name: pipeline_tracker.pipeline_name,
-            message: "Retrying error: #{e.message}"
-          )
-        )
-
-        pipeline_tracker.update!(status_event: 'retry', jid: jid)
-
-        reenqueue(pipeline_tracker, delay: e.retry_delay)
+        retry_tracker(e)
       else
-        fail_tracker(pipeline_tracker, e)
+        fail_tracker(e)
       end
     rescue StandardError => e
-      fail_tracker(pipeline_tracker, e)
+      fail_tracker(e)
     end
 
-    def fail_tracker(pipeline_tracker, exception)
+    def fail_tracker(exception)
       pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
 
       logger.error(
@@ -98,21 +79,22 @@ module BulkImports
         entity_id: pipeline_tracker.entity.id,
         pipeline_name: pipeline_tracker.pipeline_name
       )
+
+      BulkImports::Failure.create(
+        bulk_import_entity_id: context.entity.id,
+        pipeline_class: pipeline_tracker.pipeline_name,
+        pipeline_step: 'pipeline_worker_run',
+        exception_class: exception.class.to_s,
+        exception_message: exception.message.truncate(255),
+        correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
+      )
     end
 
     def logger
       @logger ||= Gitlab::Import::Logger.build
     end
 
-    def file_extraction_pipeline?(pipeline_tracker)
-      pipeline_tracker.pipeline_class.file_extraction_pipeline?
-    end
-
-    def job_timeout?(pipeline_tracker)
-      (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
-    end
-
-    def reenqueue(pipeline_tracker, delay: FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
+    def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
       self.class.perform_in(
         delay,
         pipeline_tracker.id,
@@ -120,5 +102,55 @@ module BulkImports
         pipeline_tracker.entity.id
       )
     end
+
+    def context
+      @context ||= ::BulkImports::Pipeline::Context.new(pipeline_tracker)
+    end
+
+    def export_status
+      @export_status ||= ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation)
+    end
+
+    def file_extraction_pipeline?
+      pipeline_tracker.file_extraction_pipeline?
+    end
+
+    def job_timeout?
+      return false unless file_extraction_pipeline?
+
+      (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
+    end
+
+    def export_failed?
+      return false unless file_extraction_pipeline?
+
+      export_status.failed?
+    end
+
+    def export_started?
+      return false unless file_extraction_pipeline?
+
+      export_status.started?
+    end
+
+    def export_empty?
+      return false unless file_extraction_pipeline?
+
+      export_status.empty?
+    end
+
+    def retry_tracker(exception)
+      logger.error(
+        structured_payload(
+          entity_id: pipeline_tracker.entity.id,
+          pipeline_name: pipeline_tracker.pipeline_name,
+          message: "Retrying error: #{exception.message}"
+        )
+      )
+
+      pipeline_tracker.update!(status_event: 'retry', jid: jid)
+
+      re_enqueue(exception.retry_delay)
+    end
   end
 end
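
Two Sidekiq idioms carry most of the refactor above: per-job instance state with ||= memoization (safe because Sidekiq builds a fresh worker object for each job execution) and re-enqueueing the same job via self.class.perform_in. Below is a minimal, self-contained sketch of those idioms only; the class, model, and argument names are hypothetical stand-ins, not code from this commit.

# Sketch only: hypothetical worker illustrating the idioms used by
# BulkImports::PipelineWorker; Tracker and ExportStatus stand in for the
# real collaborators and are not defined here.
require 'sidekiq'

class ExampleTrackerWorker
  include Sidekiq::Worker

  PERFORM_DELAY = 10 # seconds; stand-in for FILE_EXTRACTION_PIPELINE_PERFORM_DELAY

  def perform(tracker_id)
    # Sidekiq creates a new worker instance for every job execution, so
    # @tracker and @export_status only live for this one run.
    @tracker = Tracker.find_by_id(tracker_id)
    return if tracker.nil?

    # Re-enqueue instead of blocking while the upstream export is still running.
    return re_enqueue if export_started?

    tracker.run!
  end

  private

  attr_reader :tracker

  def export_status
    # Memoized so the started?/failed?/empty? predicates query at most once per job.
    @export_status ||= ExportStatus.new(tracker)
  end

  def export_started?
    export_status.started?
  end

  def re_enqueue(delay = PERFORM_DELAY)
    # Sidekiq's perform_in(interval, *args) schedules the same job again
    # after `delay` seconds.
    self.class.perform_in(delay, tracker.id)
  end
end

In the real worker above, the export_* predicates additionally return false unless the pipeline is a file-extraction pipeline, and retry_tracker reuses re_enqueue with the delay suggested by the retriable network error.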