Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports')
-rw-r--r--app/workers/bulk_imports/entity_worker.rb37
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb10
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb41
-rw-r--r--app/workers/bulk_imports/relation_export_worker.rb4
-rw-r--r--app/workers/bulk_imports/stuck_import_worker.rb31
5 files changed, 83 insertions, 40 deletions
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index 70d6626df91..f6b1c693fe4 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -4,24 +4,32 @@ module BulkImports
class EntityWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
+ idempotent!
+ deduplicate :until_executing
data_consistency :always
-
feature_category :importers
-
sidekiq_options retry: false, dead: false
-
worker_has_external_dependencies!
- idempotent!
- deduplicate :until_executed, including_scheduled: true
-
def perform(entity_id, current_stage = nil)
- return if stage_running?(entity_id, current_stage)
+ if stage_running?(entity_id, current_stage)
+ logger.info(
+ structured_payload(
+ entity_id: entity_id,
+ current_stage: current_stage,
+ message: 'Stage running'
+ )
+ )
+
+ return
+ end
logger.info(
- worker: self.class.name,
- entity_id: entity_id,
- current_stage: current_stage
+ structured_payload(
+ entity_id: entity_id,
+ current_stage: current_stage,
+ message: 'Stage starting'
+ )
)
next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
@@ -33,10 +41,11 @@ module BulkImports
end
rescue StandardError => e
logger.error(
- worker: self.class.name,
- entity_id: entity_id,
- current_stage: current_stage,
- error_message: e.message
+ structured_payload(
+ entity_id: entity_id,
+ current_stage: current_stage,
+ message: e.message
+ )
)
Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id)
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index 21040178cee..0d3e4f013dd 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -42,10 +42,12 @@ module BulkImports
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
}
- Gitlab::Import::Logger.warn(
- attributes.merge(
- bulk_import_id: entity.bulk_import.id,
- bulk_import_entity_type: entity.source_type
+ Gitlab::Import::Logger.error(
+ structured_payload(
+ attributes.merge(
+ bulk_import_id: entity.bulk_import.id,
+ bulk_import_entity_type: entity.source_type
+ )
)
)
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 03ec2f058ca..1a98705c151 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -4,14 +4,11 @@ module BulkImports
class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
- data_consistency :always
-
- NDJSON_PIPELINE_PERFORM_DELAY = 1.minute
+ NDJSON_PIPELINE_PERFORM_DELAY = 10.seconds
+ data_consistency :always
feature_category :importers
-
sidekiq_options retry: false, dead: false
-
worker_has_external_dependencies!
def perform(pipeline_tracker_id, stage, entity_id)
@@ -21,18 +18,20 @@ module BulkImports
if pipeline_tracker.present?
logger.info(
- worker: self.class.name,
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name
+ structured_payload(
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name
+ )
)
run(pipeline_tracker)
else
logger.error(
- worker: self.class.name,
- entity_id: entity_id,
- pipeline_tracker_id: pipeline_tracker_id,
- message: 'Unstarted pipeline not found'
+ structured_payload(
+ entity_id: entity_id,
+ pipeline_tracker_id: pipeline_tracker_id,
+ message: 'Unstarted pipeline not found'
+ )
)
end
@@ -66,10 +65,11 @@ module BulkImports
rescue BulkImports::NetworkError => e
if e.retriable?(pipeline_tracker)
logger.error(
- worker: self.class.name,
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name,
- message: "Retrying error: #{e.message}"
+ structured_payload(
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ message: "Retrying error: #{e.message}"
+ )
)
pipeline_tracker.update!(status_event: 'retry', jid: jid)
@@ -86,10 +86,11 @@ module BulkImports
pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error(
- worker: self.class.name,
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name,
- message: exception.message
+ structured_payload(
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ message: exception.message
+ )
)
Gitlab::ErrorTracking.track_exception(
diff --git a/app/workers/bulk_imports/relation_export_worker.rb b/app/workers/bulk_imports/relation_export_worker.rb
index 9324b79cc75..dcac841b3b2 100644
--- a/app/workers/bulk_imports/relation_export_worker.rb
+++ b/app/workers/bulk_imports/relation_export_worker.rb
@@ -3,12 +3,12 @@
module BulkImports
class RelationExportWorker
include ApplicationWorker
-
- data_consistency :always
include ExceptionBacktrace
idempotent!
+ deduplicate :until_executed
loggable_arguments 2, 3
+ data_consistency :always
feature_category :importers
sidekiq_options status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION
diff --git a/app/workers/bulk_imports/stuck_import_worker.rb b/app/workers/bulk_imports/stuck_import_worker.rb
new file mode 100644
index 00000000000..3fa4221728b
--- /dev/null
+++ b/app/workers/bulk_imports/stuck_import_worker.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class StuckImportWorker
+ include ApplicationWorker
+
+ # This worker does not schedule other workers that require context.
+ include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+ idempotent!
+ data_consistency :always
+
+ feature_category :importers
+
+ def perform
+ BulkImport.stale.find_each do |import|
+ import.cleanup_stale
+ end
+
+ BulkImports::Entity.includes(:trackers).stale.find_each do |import| # rubocop: disable CodeReuse/ActiveRecord
+ ApplicationRecord.transaction do
+ import.cleanup_stale
+
+ import.trackers.find_each do |tracker|
+ tracker.cleanup_stale
+ end
+ end
+ end
+ end
+ end
+end