diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2022-06-20 14:10:13 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2022-06-20 14:10:13 +0300 |
commit | 0ea3fcec397b69815975647f5e2aa5fe944a8486 (patch) | |
tree | 7979381b89d26011bcf9bdc989a40fcc2f1ed4ff /app/workers/bulk_imports | |
parent | 72123183a20411a36d607d70b12d57c484394c8e (diff) |
Add latest changes from gitlab-org/gitlab@15-1-stable-eev15.1.0-rc42
Diffstat (limited to 'app/workers/bulk_imports')
-rw-r--r-- | app/workers/bulk_imports/pipeline_worker.rb | 112 |
1 files changed, 72 insertions, 40 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index b515f0fa202..9c95e25e2e8 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -12,7 +12,7 @@ module BulkImports worker_has_external_dependencies! def perform(pipeline_tracker_id, stage, entity_id) - pipeline_tracker = ::BulkImports::Tracker + @pipeline_tracker = ::BulkImports::Tracker .with_status(:enqueued) .find_by_id(pipeline_tracker_id) @@ -24,7 +24,7 @@ module BulkImports ) ) - run(pipeline_tracker) + run else logger.error( structured_payload( @@ -41,48 +41,29 @@ module BulkImports private - def run(pipeline_tracker) - if pipeline_tracker.entity.failed? - raise(Entity::FailedError, 'Failed entity status') - end - - if file_extraction_pipeline?(pipeline_tracker) - export_status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation) + attr_reader :pipeline_tracker - raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker) - raise(Pipeline::FailedError, export_status.error) if export_status.failed? + def run + raise(Entity::FailedError, 'Failed entity status') if pipeline_tracker.entity.failed? + raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout? + raise(Pipeline::FailedError, export_status.error) if export_failed? - return reenqueue(pipeline_tracker) if export_status.started? - end + return re_enqueue if export_empty? || export_started? pipeline_tracker.update!(status_event: 'start', jid: jid) - - context = ::BulkImports::Pipeline::Context.new(pipeline_tracker) - pipeline_tracker.pipeline_class.new(context).run - pipeline_tracker.finish! rescue BulkImports::NetworkError => e if e.retriable?(pipeline_tracker) - logger.error( - structured_payload( - entity_id: pipeline_tracker.entity.id, - pipeline_name: pipeline_tracker.pipeline_name, - message: "Retrying error: #{e.message}" - ) - ) - - pipeline_tracker.update!(status_event: 'retry', jid: jid) - - reenqueue(pipeline_tracker, delay: e.retry_delay) + retry_tracker(e) else - fail_tracker(pipeline_tracker, e) + fail_tracker(e) end rescue StandardError => e - fail_tracker(pipeline_tracker, e) + fail_tracker(e) end - def fail_tracker(pipeline_tracker, exception) + def fail_tracker(exception) pipeline_tracker.update!(status_event: 'fail_op', jid: jid) logger.error( @@ -98,21 +79,22 @@ module BulkImports entity_id: pipeline_tracker.entity.id, pipeline_name: pipeline_tracker.pipeline_name ) + + BulkImports::Failure.create( + bulk_import_entity_id: context.entity.id, + pipeline_class: pipeline_tracker.pipeline_name, + pipeline_step: 'pipeline_worker_run', + exception_class: exception.class.to_s, + exception_message: exception.message.truncate(255), + correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id + ) end def logger @logger ||= Gitlab::Import::Logger.build end - def file_extraction_pipeline?(pipeline_tracker) - pipeline_tracker.pipeline_class.file_extraction_pipeline? - end - - def job_timeout?(pipeline_tracker) - (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT - end - - def reenqueue(pipeline_tracker, delay: FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) + def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY) self.class.perform_in( delay, pipeline_tracker.id, @@ -120,5 +102,55 @@ module BulkImports pipeline_tracker.entity.id ) end + + def context + @context ||= ::BulkImports::Pipeline::Context.new(pipeline_tracker) + end + + def export_status + @export_status ||= ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation) + end + + def file_extraction_pipeline? + pipeline_tracker.file_extraction_pipeline? + end + + def job_timeout? + return false unless file_extraction_pipeline? + + (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT + end + + def export_failed? + return false unless file_extraction_pipeline? + + export_status.failed? + end + + def export_started? + return false unless file_extraction_pipeline? + + export_status.started? + end + + def export_empty? + return false unless file_extraction_pipeline? + + export_status.empty? + end + + def retry_tracker(exception) + logger.error( + structured_payload( + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name, + message: "Retrying error: #{exception.message}" + ) + ) + + pipeline_tracker.update!(status_event: 'retry', jid: jid) + + re_enqueue(exception.retry_delay) + end end end |