Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2022-10-20 12:40:42 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2022-10-20 12:40:42 +0300
commitee664acb356f8123f4f6b00b73c1e1cf0866c7fb (patch)
treef8479f94a28f66654c6a4f6fb99bad6b4e86a40e /app/workers/bulk_imports
parent62f7d5c5b69180e82ae8196b7b429eeffc8e7b4f (diff)
Add latest changes from gitlab-org/gitlab@15-5-stable-eev15.5.0-rc42
Diffstat (limited to 'app/workers/bulk_imports')
-rw-r--r--app/workers/bulk_imports/entity_worker.rb26
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb89
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb54
3 files changed, 139 insertions, 30 deletions
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index f6b1c693fe4..ada3210624c 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -15,9 +15,11 @@ module BulkImports
if stage_running?(entity_id, current_stage)
logger.info(
structured_payload(
- entity_id: entity_id,
+ bulk_import_entity_id: entity_id,
+ bulk_import_id: bulk_import_id(entity_id),
current_stage: current_stage,
- message: 'Stage running'
+ message: 'Stage running',
+ importer: 'gitlab_migration'
)
)
@@ -26,9 +28,11 @@ module BulkImports
logger.info(
structured_payload(
- entity_id: entity_id,
+ bulk_import_entity_id: entity_id,
+ bulk_import_id: bulk_import_id(entity_id),
current_stage: current_stage,
- message: 'Stage starting'
+ message: 'Stage starting',
+ importer: 'gitlab_migration'
)
)
@@ -42,13 +46,17 @@ module BulkImports
rescue StandardError => e
logger.error(
structured_payload(
- entity_id: entity_id,
+ bulk_import_entity_id: entity_id,
+ bulk_import_id: bulk_import_id(entity_id),
current_stage: current_stage,
- message: e.message
+ message: e.message,
+ importer: 'gitlab_migration'
)
)
- Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id)
+ Gitlab::ErrorTracking.track_exception(
+ e, bulk_import_entity_id: entity_id, bulk_import_id: bulk_import_id(entity_id), importer: 'gitlab_migration'
+ )
end
private
@@ -63,6 +71,10 @@ module BulkImports
BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
end
+ def bulk_import_id(entity_id)
+ @bulk_import_id ||= Entity.find(entity_id).bulk_import_id
+ end
+
def logger
@logger ||= Gitlab::Import::Logger.build
end
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index 0d3e4f013dd..a57071ddcf1 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -13,45 +13,112 @@ module BulkImports
def perform(entity_id)
entity = BulkImports::Entity.find(entity_id)
+ entity.update!(source_xid: entity_source_xid(entity)) if entity.source_xid.nil?
+
request_export(entity)
+
+ BulkImports::EntityWorker.perform_async(entity_id)
rescue BulkImports::NetworkError => e
- log_export_failure(e, entity)
+ if e.retriable?(entity)
+ retry_request(e, entity)
+ else
+ log_export_failure(e, entity)
- entity.fail_op!
+ entity.fail_op!
+ end
end
private
def request_export(entity)
- http_client(entity.bulk_import.configuration).post(entity.export_relations_url_path)
+ http_client(entity).post(entity.export_relations_url_path)
end
- def http_client(configuration)
+ def http_client(entity)
@client ||= Clients::HTTP.new(
- url: configuration.url,
- token: configuration.access_token
+ url: entity.bulk_import.configuration.url,
+ token: entity.bulk_import.configuration.access_token
)
end
def log_export_failure(exception, entity)
- attributes = {
+ Gitlab::Import::Logger.error(
+ structured_payload(
+ log_attributes(exception, entity).merge(
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ message: "Request to export #{entity.source_type} failed",
+ importer: 'gitlab_migration'
+ )
+ )
+ )
+
+ BulkImports::Failure.create(log_attributes(exception, entity))
+ end
+
+ def log_attributes(exception, entity)
+ {
bulk_import_entity_id: entity.id,
pipeline_class: 'ExportRequestWorker',
exception_class: exception.class.to_s,
exception_message: exception.message.truncate(255),
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
}
+ end
+
+ def graphql_client(entity)
+ @graphql_client ||= BulkImports::Clients::Graphql.new(
+ url: entity.bulk_import.configuration.url,
+ token: entity.bulk_import.configuration.access_token
+ )
+ end
+
+ def entity_source_xid(entity)
+ query = entity_query(entity)
+ client = graphql_client(entity)
+
+ response = client.execute(
+ client.parse(query.to_s),
+ { full_path: entity.source_full_path }
+ ).original_hash
+
+ ::GlobalID.parse(response.dig(*query.data_path, 'id')).model_id
+ rescue StandardError => e
+ Gitlab::Import::Logger.error(
+ structured_payload(
+ log_attributes(e, entity).merge(
+ message: 'Failed to fetch source entity id',
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ importer: 'gitlab_migration'
+ )
+ )
+ )
+
+ nil
+ end
+
+ def entity_query(entity)
+ if entity.group?
+ BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil)
+ else
+ BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil)
+ end
+ end
+ def retry_request(exception, entity)
Gitlab::Import::Logger.error(
structured_payload(
- attributes.merge(
- bulk_import_id: entity.bulk_import.id,
- bulk_import_entity_type: entity.source_type
+ log_attributes(exception, entity).merge(
+ message: 'Retrying export request',
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ importer: 'gitlab_migration'
)
)
)
- BulkImports::Failure.create(attributes)
+ self.class.perform_in(2.seconds, entity.id)
end
end
end
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index e171ec1e194..6d314774cff 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -19,8 +19,11 @@ module BulkImports
if pipeline_tracker.present?
logger.info(
structured_payload(
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name
+ bulk_import_entity_id: pipeline_tracker.entity.id,
+ bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ message: 'Pipeline starting',
+ importer: 'gitlab_migration'
)
)
@@ -28,9 +31,11 @@ module BulkImports
else
logger.error(
structured_payload(
- entity_id: entity_id,
+ bulk_import_entity_id: entity_id,
+ bulk_import_id: bulk_import_id(entity_id),
pipeline_tracker_id: pipeline_tracker_id,
- message: 'Unstarted pipeline not found'
+ message: 'Unstarted pipeline not found',
+ importer: 'gitlab_migration'
)
)
end
@@ -44,9 +49,10 @@ module BulkImports
attr_reader :pipeline_tracker
def run
- raise(Entity::FailedError, 'Failed entity status') if pipeline_tracker.entity.failed?
+ return skip_tracker if pipeline_tracker.entity.failed?
+
raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?
- raise(Pipeline::FailedError, export_status.error) if export_failed?
+ raise(Pipeline::FailedError, "Export from source instance failed: #{export_status.error}") if export_failed?
return re_enqueue if export_empty? || export_started?
@@ -59,21 +65,29 @@ module BulkImports
fail_tracker(e)
end
+ def bulk_import_id(entity_id)
+ @bulk_import_id ||= Entity.find(entity_id).bulk_import_id
+ end
+
def fail_tracker(exception)
pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error(
structured_payload(
- entity_id: pipeline_tracker.entity.id,
+ bulk_import_entity_id: pipeline_tracker.entity.id,
+ bulk_import_id: pipeline_tracker.entity.bulk_import_id,
pipeline_name: pipeline_tracker.pipeline_name,
- message: exception.message
+ message: exception.message,
+ importer: 'gitlab_migration'
)
)
Gitlab::ErrorTracking.track_exception(
exception,
- entity_id: pipeline_tracker.entity.id,
- pipeline_name: pipeline_tracker.pipeline_name
+ bulk_import_entity_id: pipeline_tracker.entity.id,
+ bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ importer: 'gitlab_migration'
)
BulkImports::Failure.create(
@@ -138,9 +152,11 @@ module BulkImports
def retry_tracker(exception)
logger.error(
structured_payload(
- entity_id: pipeline_tracker.entity.id,
+ bulk_import_entity_id: pipeline_tracker.entity.id,
+ bulk_import_id: pipeline_tracker.entity.bulk_import_id,
pipeline_name: pipeline_tracker.pipeline_name,
- message: "Retrying error: #{exception.message}"
+ message: "Retrying error: #{exception.message}",
+ importer: 'gitlab_migration'
)
)
@@ -148,5 +164,19 @@ module BulkImports
re_enqueue(exception.retry_delay)
end
+
+ def skip_tracker
+ logger.info(
+ structured_payload(
+ bulk_import_entity_id: pipeline_tracker.entity.id,
+ bulk_import_id: pipeline_tracker.entity.bulk_import_id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ message: 'Skipping pipeline due to failed entity',
+ importer: 'gitlab_migration'
+ )
+ )
+
+ pipeline_tracker.update!(status_event: 'skip', jid: jid)
+ end
end
end