diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2022-10-20 12:40:42 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2022-10-20 12:40:42 +0300 |
commit | ee664acb356f8123f4f6b00b73c1e1cf0866c7fb (patch) | |
tree | f8479f94a28f66654c6a4f6fb99bad6b4e86a40e /app/workers/bulk_imports | |
parent | 62f7d5c5b69180e82ae8196b7b429eeffc8e7b4f (diff) |
Add latest changes from gitlab-org/gitlab@15-5-stable-eev15.5.0-rc42
Diffstat (limited to 'app/workers/bulk_imports')
-rw-r--r-- | app/workers/bulk_imports/entity_worker.rb | 26 | ||||
-rw-r--r-- | app/workers/bulk_imports/export_request_worker.rb | 89 | ||||
-rw-r--r-- | app/workers/bulk_imports/pipeline_worker.rb | 54 |
3 files changed, 139 insertions, 30 deletions
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index f6b1c693fe4..ada3210624c 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -15,9 +15,11 @@ module BulkImports if stage_running?(entity_id, current_stage) logger.info( structured_payload( - entity_id: entity_id, + bulk_import_entity_id: entity_id, + bulk_import_id: bulk_import_id(entity_id), current_stage: current_stage, - message: 'Stage running' + message: 'Stage running', + importer: 'gitlab_migration' ) ) @@ -26,9 +28,11 @@ module BulkImports logger.info( structured_payload( - entity_id: entity_id, + bulk_import_entity_id: entity_id, + bulk_import_id: bulk_import_id(entity_id), current_stage: current_stage, - message: 'Stage starting' + message: 'Stage starting', + importer: 'gitlab_migration' ) ) @@ -42,13 +46,17 @@ module BulkImports rescue StandardError => e logger.error( structured_payload( - entity_id: entity_id, + bulk_import_entity_id: entity_id, + bulk_import_id: bulk_import_id(entity_id), current_stage: current_stage, - message: e.message + message: e.message, + importer: 'gitlab_migration' ) ) - Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id) + Gitlab::ErrorTracking.track_exception( + e, bulk_import_entity_id: entity_id, bulk_import_id: bulk_import_id(entity_id), importer: 'gitlab_migration' + ) end private @@ -63,6 +71,10 @@ module BulkImports BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue') end + def bulk_import_id(entity_id) + @bulk_import_id ||= Entity.find(entity_id).bulk_import_id + end + def logger @logger ||= Gitlab::Import::Logger.build end diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb index 0d3e4f013dd..a57071ddcf1 100644 --- a/app/workers/bulk_imports/export_request_worker.rb +++ b/app/workers/bulk_imports/export_request_worker.rb @@ -13,45 +13,112 @@ module BulkImports def perform(entity_id) entity = BulkImports::Entity.find(entity_id) + entity.update!(source_xid: entity_source_xid(entity)) if entity.source_xid.nil? + request_export(entity) + + BulkImports::EntityWorker.perform_async(entity_id) rescue BulkImports::NetworkError => e - log_export_failure(e, entity) + if e.retriable?(entity) + retry_request(e, entity) + else + log_export_failure(e, entity) - entity.fail_op! + entity.fail_op! + end end private def request_export(entity) - http_client(entity.bulk_import.configuration).post(entity.export_relations_url_path) + http_client(entity).post(entity.export_relations_url_path) end - def http_client(configuration) + def http_client(entity) @client ||= Clients::HTTP.new( - url: configuration.url, - token: configuration.access_token + url: entity.bulk_import.configuration.url, + token: entity.bulk_import.configuration.access_token ) end def log_export_failure(exception, entity) - attributes = { + Gitlab::Import::Logger.error( + structured_payload( + log_attributes(exception, entity).merge( + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + message: "Request to export #{entity.source_type} failed", + importer: 'gitlab_migration' + ) + ) + ) + + BulkImports::Failure.create(log_attributes(exception, entity)) + end + + def log_attributes(exception, entity) + { bulk_import_entity_id: entity.id, pipeline_class: 'ExportRequestWorker', exception_class: exception.class.to_s, exception_message: exception.message.truncate(255), correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id } + end + + def graphql_client(entity) + @graphql_client ||= BulkImports::Clients::Graphql.new( + url: entity.bulk_import.configuration.url, + token: entity.bulk_import.configuration.access_token + ) + end + + def entity_source_xid(entity) + query = entity_query(entity) + client = graphql_client(entity) + + response = client.execute( + client.parse(query.to_s), + { full_path: entity.source_full_path } + ).original_hash + + ::GlobalID.parse(response.dig(*query.data_path, 'id')).model_id + rescue StandardError => e + Gitlab::Import::Logger.error( + structured_payload( + log_attributes(e, entity).merge( + message: 'Failed to fetch source entity id', + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + importer: 'gitlab_migration' + ) + ) + ) + + nil + end + + def entity_query(entity) + if entity.group? + BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil) + else + BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil) + end + end + def retry_request(exception, entity) Gitlab::Import::Logger.error( structured_payload( - attributes.merge( - bulk_import_id: entity.bulk_import.id, - bulk_import_entity_type: entity.source_type + log_attributes(exception, entity).merge( + message: 'Retrying export request', + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + importer: 'gitlab_migration' ) ) ) - BulkImports::Failure.create(attributes) + self.class.perform_in(2.seconds, entity.id) end end end diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index e171ec1e194..6d314774cff 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -19,8 +19,11 @@ module BulkImports if pipeline_tracker.present? logger.info( structured_payload( - entity_id: pipeline_tracker.entity.id, - pipeline_name: pipeline_tracker.pipeline_name + bulk_import_entity_id: pipeline_tracker.entity.id, + bulk_import_id: pipeline_tracker.entity.bulk_import_id, + pipeline_name: pipeline_tracker.pipeline_name, + message: 'Pipeline starting', + importer: 'gitlab_migration' ) ) @@ -28,9 +31,11 @@ module BulkImports else logger.error( structured_payload( - entity_id: entity_id, + bulk_import_entity_id: entity_id, + bulk_import_id: bulk_import_id(entity_id), pipeline_tracker_id: pipeline_tracker_id, - message: 'Unstarted pipeline not found' + message: 'Unstarted pipeline not found', + importer: 'gitlab_migration' ) ) end @@ -44,9 +49,10 @@ module BulkImports attr_reader :pipeline_tracker def run - raise(Entity::FailedError, 'Failed entity status') if pipeline_tracker.entity.failed? + return skip_tracker if pipeline_tracker.entity.failed? + raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout? - raise(Pipeline::FailedError, export_status.error) if export_failed? + raise(Pipeline::FailedError, "Export from source instance failed: #{export_status.error}") if export_failed? return re_enqueue if export_empty? || export_started? @@ -59,21 +65,29 @@ module BulkImports fail_tracker(e) end + def bulk_import_id(entity_id) + @bulk_import_id ||= Entity.find(entity_id).bulk_import_id + end + def fail_tracker(exception) pipeline_tracker.update!(status_event: 'fail_op', jid: jid) logger.error( structured_payload( - entity_id: pipeline_tracker.entity.id, + bulk_import_entity_id: pipeline_tracker.entity.id, + bulk_import_id: pipeline_tracker.entity.bulk_import_id, pipeline_name: pipeline_tracker.pipeline_name, - message: exception.message + message: exception.message, + importer: 'gitlab_migration' ) ) Gitlab::ErrorTracking.track_exception( exception, - entity_id: pipeline_tracker.entity.id, - pipeline_name: pipeline_tracker.pipeline_name + bulk_import_entity_id: pipeline_tracker.entity.id, + bulk_import_id: pipeline_tracker.entity.bulk_import_id, + pipeline_name: pipeline_tracker.pipeline_name, + importer: 'gitlab_migration' ) BulkImports::Failure.create( @@ -138,9 +152,11 @@ module BulkImports def retry_tracker(exception) logger.error( structured_payload( - entity_id: pipeline_tracker.entity.id, + bulk_import_entity_id: pipeline_tracker.entity.id, + bulk_import_id: pipeline_tracker.entity.bulk_import_id, pipeline_name: pipeline_tracker.pipeline_name, - message: "Retrying error: #{exception.message}" + message: "Retrying error: #{exception.message}", + importer: 'gitlab_migration' ) ) @@ -148,5 +164,19 @@ module BulkImports re_enqueue(exception.retry_delay) end + + def skip_tracker + logger.info( + structured_payload( + bulk_import_entity_id: pipeline_tracker.entity.id, + bulk_import_id: pipeline_tracker.entity.bulk_import_id, + pipeline_name: pipeline_tracker.pipeline_name, + message: 'Skipping pipeline due to failed entity', + importer: 'gitlab_migration' + ) + ) + + pipeline_tracker.update!(status_event: 'skip', jid: jid) + end end end |