Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2022-11-17 14:33:21 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2022-11-17 14:33:21 +0300
commit7021455bd1ed7b125c55eb1b33c5a01f2bc55ee0 (patch)
tree5bdc2229f5198d516781f8d24eace62fc7e589e9 /app/workers/bulk_imports
parent185b095e93520f96e9cfc31d9c3e69b498cdab7c (diff)
Add latest changes from gitlab-org/gitlab@15-6-stable-eev15.6.0-rc42
Diffstat (limited to 'app/workers/bulk_imports')
-rw-r--r--app/workers/bulk_imports/entity_worker.rb45
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb79
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb79
3 files changed, 132 insertions, 71 deletions
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index ada3210624c..d23d57c33ab 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -12,13 +12,18 @@ module BulkImports
worker_has_external_dependencies!
def perform(entity_id, current_stage = nil)
+ @entity = ::BulkImports::Entity.find(entity_id)
+
if stage_running?(entity_id, current_stage)
logger.info(
structured_payload(
bulk_import_entity_id: entity_id,
- bulk_import_id: bulk_import_id(entity_id),
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
current_stage: current_stage,
message: 'Stage running',
+ source_version: source_version,
importer: 'gitlab_migration'
)
)
@@ -29,9 +34,12 @@ module BulkImports
logger.info(
structured_payload(
bulk_import_entity_id: entity_id,
- bulk_import_id: bulk_import_id(entity_id),
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
current_stage: current_stage,
message: 'Stage starting',
+ source_version: source_version,
importer: 'gitlab_migration'
)
)
@@ -44,23 +52,34 @@ module BulkImports
)
end
rescue StandardError => e
- logger.error(
- structured_payload(
+ log_exception(e,
+ {
bulk_import_entity_id: entity_id,
- bulk_import_id: bulk_import_id(entity_id),
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
current_stage: current_stage,
- message: e.message,
+ message: 'Entity failed',
+ source_version: source_version,
importer: 'gitlab_migration'
- )
+ }
)
Gitlab::ErrorTracking.track_exception(
- e, bulk_import_entity_id: entity_id, bulk_import_id: bulk_import_id(entity_id), importer: 'gitlab_migration'
+ e,
+ bulk_import_entity_id: entity_id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ source_version: source_version,
+ importer: 'gitlab_migration'
)
end
private
+ attr_reader :entity
+
def stage_running?(entity_id, stage)
return unless stage
@@ -71,12 +90,18 @@ module BulkImports
BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
end
- def bulk_import_id(entity_id)
- @bulk_import_id ||= Entity.find(entity_id).bulk_import_id
+ def source_version
+ entity.bulk_import.source_version_info.to_s
end
def logger
@logger ||= Gitlab::Import::Logger.build
end
+
+ def log_exception(exception, payload)
+ Gitlab::ExceptionLogFormatter.format!(exception, payload)
+
+ logger.error(structured_payload(payload))
+ end
end
end
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index a57071ddcf1..1a5f6250429 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -22,7 +22,19 @@ module BulkImports
if e.retriable?(entity)
retry_request(e, entity)
else
- log_export_failure(e, entity)
+ log_exception(e,
+ {
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ message: "Request to export #{entity.source_type} failed",
+ source_version: entity.bulk_import.source_version_info.to_s,
+ importer: 'gitlab_migration'
+ }
+ )
+
+ BulkImports::Failure.create(failure_attributes(e, entity))
entity.fail_op!
end
@@ -41,22 +53,7 @@ module BulkImports
)
end
- def log_export_failure(exception, entity)
- Gitlab::Import::Logger.error(
- structured_payload(
- log_attributes(exception, entity).merge(
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- message: "Request to export #{entity.source_type} failed",
- importer: 'gitlab_migration'
- )
- )
- )
-
- BulkImports::Failure.create(log_attributes(exception, entity))
- end
-
- def log_attributes(exception, entity)
+ def failure_attributes(exception, entity)
{
bulk_import_entity_id: entity.id,
pipeline_class: 'ExportRequestWorker',
@@ -84,15 +81,16 @@ module BulkImports
::GlobalID.parse(response.dig(*query.data_path, 'id')).model_id
rescue StandardError => e
- Gitlab::Import::Logger.error(
- structured_payload(
- log_attributes(e, entity).merge(
- message: 'Failed to fetch source entity id',
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- importer: 'gitlab_migration'
- )
- )
+ log_exception(e,
+ {
+ message: 'Failed to fetch source entity id',
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ source_version: entity.bulk_import.source_version_info.to_s,
+ importer: 'gitlab_migration'
+ }
)
nil
@@ -107,18 +105,29 @@ module BulkImports
end
def retry_request(exception, entity)
- Gitlab::Import::Logger.error(
- structured_payload(
- log_attributes(exception, entity).merge(
- message: 'Retrying export request',
- bulk_import_id: entity.bulk_import_id,
- bulk_import_entity_type: entity.source_type,
- importer: 'gitlab_migration'
- )
- )
+ log_exception(exception,
+ {
+ message: 'Retrying export request',
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
+ source_version: entity.bulk_import.source_version_info.to_s,
+ importer: 'gitlab_migration'
+ }
)
self.class.perform_in(2.seconds, entity.id)
end
+
+ def logger
+ @logger ||= Gitlab::Import::Logger.build
+ end
+
+ def log_exception(exception, payload)
+ Gitlab::ExceptionLogFormatter.format!(exception, payload)
+
+ logger.error(structured_payload(payload))
+ end
end
end
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 6d314774cff..5716f6e3f31 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -17,24 +17,34 @@ module BulkImports
.find_by_id(pipeline_tracker_id)
if pipeline_tracker.present?
+ @entity = @pipeline_tracker.entity
+
logger.info(
structured_payload(
- bulk_import_entity_id: pipeline_tracker.entity.id,
- bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
pipeline_name: pipeline_tracker.pipeline_name,
message: 'Pipeline starting',
+ source_version: source_version,
importer: 'gitlab_migration'
)
)
run
else
+ @entity = ::BulkImports::Entity.find(entity_id)
+
logger.error(
structured_payload(
bulk_import_entity_id: entity_id,
- bulk_import_id: bulk_import_id(entity_id),
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
pipeline_tracker_id: pipeline_tracker_id,
message: 'Unstarted pipeline not found',
+ source_version: source_version,
importer: 'gitlab_migration'
)
)
@@ -46,10 +56,10 @@ module BulkImports
private
- attr_reader :pipeline_tracker
+ attr_reader :pipeline_tracker, :entity
def run
- return skip_tracker if pipeline_tracker.entity.failed?
+ return skip_tracker if entity.failed?
raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?
raise(Pipeline::FailedError, "Export from source instance failed: #{export_status.error}") if export_failed?
@@ -65,33 +75,39 @@ module BulkImports
fail_tracker(e)
end
- def bulk_import_id(entity_id)
- @bulk_import_id ||= Entity.find(entity_id).bulk_import_id
+ def source_version
+ entity.bulk_import.source_version_info.to_s
end
def fail_tracker(exception)
pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
- logger.error(
- structured_payload(
- bulk_import_entity_id: pipeline_tracker.entity.id,
- bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ log_exception(exception,
+ {
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
pipeline_name: pipeline_tracker.pipeline_name,
- message: exception.message,
+ message: 'Pipeline failed',
+ source_version: source_version,
importer: 'gitlab_migration'
- )
+ }
)
Gitlab::ErrorTracking.track_exception(
exception,
- bulk_import_entity_id: pipeline_tracker.entity.id,
- bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
pipeline_name: pipeline_tracker.pipeline_name,
+ source_version: source_version,
importer: 'gitlab_migration'
)
BulkImports::Failure.create(
- bulk_import_entity_id: context.entity.id,
+ bulk_import_entity_id: entity.id,
pipeline_class: pipeline_tracker.pipeline_name,
pipeline_step: 'pipeline_worker_run',
exception_class: exception.class.to_s,
@@ -109,7 +125,7 @@ module BulkImports
delay,
pipeline_tracker.id,
pipeline_tracker.stage,
- pipeline_tracker.entity.id
+ entity.id
)
end
@@ -128,7 +144,7 @@ module BulkImports
def job_timeout?
return false unless file_extraction_pipeline?
- (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
+ (Time.zone.now - entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
end
def export_failed?
@@ -150,14 +166,17 @@ module BulkImports
end
def retry_tracker(exception)
- logger.error(
- structured_payload(
- bulk_import_entity_id: pipeline_tracker.entity.id,
- bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ log_exception(exception,
+ {
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
pipeline_name: pipeline_tracker.pipeline_name,
- message: "Retrying error: #{exception.message}",
+ message: "Retrying pipeline",
+ source_version: source_version,
importer: 'gitlab_migration'
- )
+ }
)
pipeline_tracker.update!(status_event: 'retry', jid: jid)
@@ -168,15 +187,23 @@ module BulkImports
def skip_tracker
logger.info(
structured_payload(
- bulk_import_entity_id: pipeline_tracker.entity.id,
- bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ bulk_import_entity_id: entity.id,
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ source_full_path: entity.source_full_path,
pipeline_name: pipeline_tracker.pipeline_name,
message: 'Skipping pipeline due to failed entity',
+ source_version: source_version,
importer: 'gitlab_migration'
)
)
pipeline_tracker.update!(status_event: 'skip', jid: jid)
end
+
+ def log_exception(exception, payload)
+ Gitlab::ExceptionLogFormatter.format!(exception, payload)
+ logger.error(structured_payload(payload))
+ end
end
end