Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb127
1 files changed, 50 insertions, 77 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 5716f6e3f31..62e85d38e61 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -3,6 +3,7 @@
module BulkImports
class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
+ include ExclusiveLeaseGuard
FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds
@@ -10,44 +11,24 @@ module BulkImports
feature_category :importers
sidekiq_options retry: false, dead: false
worker_has_external_dependencies!
+ deduplicate :until_executing
def perform(pipeline_tracker_id, stage, entity_id)
- @pipeline_tracker = ::BulkImports::Tracker
- .with_status(:enqueued)
- .find_by_id(pipeline_tracker_id)
-
- if pipeline_tracker.present?
- @entity = @pipeline_tracker.entity
-
- logger.info(
- structured_payload(
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_name: pipeline_tracker.pipeline_name,
- message: 'Pipeline starting',
- source_version: source_version,
- importer: 'gitlab_migration'
- )
- )
-
- run
- else
- @entity = ::BulkImports::Entity.find(entity_id)
-
- logger.error(
- structured_payload(
- bulk_import_entity_id: entity_id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_tracker_id: pipeline_tracker_id,
- message: 'Unstarted pipeline not found',
- source_version: source_version,
- importer: 'gitlab_migration'
- )
- )
+ @entity = ::BulkImports::Entity.find(entity_id)
+ @pipeline_tracker = ::BulkImports::Tracker.find(pipeline_tracker_id)
+
+ try_obtain_lease do
+ if pipeline_tracker.enqueued?
+ logger.info(log_attributes(message: 'Pipeline starting'))
+
+ run
+ else
+ message = "Pipeline in #{pipeline_tracker.human_status_name} state instead of expected enqueued state"
+
+ logger.error(log_attributes(message: message))
+
+ fail_tracker(StandardError.new(message)) unless pipeline_tracker.finished? || pipeline_tracker.skipped?
+ end
end
ensure
@@ -63,6 +44,7 @@ module BulkImports
raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?
raise(Pipeline::FailedError, "Export from source instance failed: #{export_status.error}") if export_failed?
+ raise(Pipeline::ExpiredError, 'Empty export status on source instance') if empty_export_timeout?
return re_enqueue if export_empty? || export_started?
@@ -82,29 +64,9 @@ module BulkImports
def fail_tracker(exception)
pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
- log_exception(exception,
- {
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_name: pipeline_tracker.pipeline_name,
- message: 'Pipeline failed',
- source_version: source_version,
- importer: 'gitlab_migration'
- }
- )
+ log_exception(exception, log_attributes(message: 'Pipeline failed'))
- Gitlab::ErrorTracking.track_exception(
- exception,
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_name: pipeline_tracker.pipeline_name,
- source_version: source_version,
- importer: 'gitlab_migration'
- )
+ Gitlab::ErrorTracking.track_exception(exception, log_attributes)
BulkImports::Failure.create(
bulk_import_entity_id: entity.id,
@@ -144,7 +106,11 @@ module BulkImports
def job_timeout?
return false unless file_extraction_pipeline?
- (Time.zone.now - entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
+ time_since_entity_created > Pipeline::NDJSON_EXPORT_TIMEOUT
+ end
+
+ def empty_export_timeout?
+ export_empty? && time_since_entity_created > Pipeline::EMPTY_EXPORT_STATUS_TIMEOUT
end
def export_failed?
@@ -166,18 +132,7 @@ module BulkImports
end
def retry_tracker(exception)
- log_exception(exception,
- {
- bulk_import_entity_id: entity.id,
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- source_full_path: entity.source_full_path,
- pipeline_name: pipeline_tracker.pipeline_name,
- message: "Retrying pipeline",
- source_version: source_version,
- importer: 'gitlab_migration'
- }
- )
+ log_exception(exception, log_attributes(message: "Retrying pipeline"))
pipeline_tracker.update!(status_event: 'retry', jid: jid)
@@ -185,25 +140,43 @@ module BulkImports
end
def skip_tracker
- logger.info(
- structured_payload(
+ logger.info(log_attributes(message: 'Skipping pipeline due to failed entity'))
+
+ pipeline_tracker.update!(status_event: 'skip', jid: jid)
+ end
+
+ def log_attributes(extra = {})
+ structured_payload(
+ {
bulk_import_entity_id: entity.id,
bulk_import_id: entity.bulk_import_id,
bulk_import_entity_type: entity.source_type,
source_full_path: entity.source_full_path,
+ pipeline_tracker_id: pipeline_tracker.id,
pipeline_name: pipeline_tracker.pipeline_name,
- message: 'Skipping pipeline due to failed entity',
+ pipeline_tracker_state: pipeline_tracker.human_status_name,
source_version: source_version,
importer: 'gitlab_migration'
- )
+ }.merge(extra)
)
-
- pipeline_tracker.update!(status_event: 'skip', jid: jid)
end
def log_exception(exception, payload)
Gitlab::ExceptionLogFormatter.format!(exception, payload)
+
logger.error(structured_payload(payload))
end
+
+ def time_since_entity_created
+ Time.zone.now - entity.created_at
+ end
+
+ def lease_timeout
+ 30
+ end
+
+ def lease_key
+ "gitlab:bulk_imports:pipeline_worker:#{pipeline_tracker.id}"
+ end
end
end