Welcome to mirror list, hosted at ThFree Co, Russian Federation.

export_request_worker.rb « bulk_imports « workers « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 1a5f625042936f65113ec1c2e9c345f0a0083d0a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# frozen_string_literal: true

module BulkImports
  # Worker that asks the source instance to export the relations of a bulk
  # import entity and then enqueues BulkImports::EntityWorker for that entity.
  #
  # When the entity has no `source_xid` yet, it is looked up through the
  # GraphQL client first. A BulkImports::NetworkError raised during the job is
  # either retried (the job is re-enqueued after a short delay) or logged and
  # stored as a BulkImports::Failure, after which `fail_op!` is called on the
  # entity.
  class ExportRequestWorker
    include ApplicationWorker

    data_consistency :always

    idempotent!
    worker_has_external_dependencies!
    feature_category :importers

    # Job entry point.
    #
    # @param entity_id id of the BulkImports::Entity whose export is requested
    def perform(entity_id)
      entity = BulkImports::Entity.find(entity_id)

      # Only look the source id up when it has not been stored yet.
      if entity.source_xid.nil?
        source_xid = entity_source_xid(entity)
        entity.update!(source_xid: source_xid)
      end

      request_export(entity)

      BulkImports::EntityWorker.perform_async(entity_id)
    rescue BulkImports::NetworkError => error
      if error.retriable?(entity)
        retry_request(error, entity)
      else
        record_failure(error, entity)
      end
    end

    private

    # Logs a non-retriable error, stores a BulkImports::Failure row for it and
    # calls `fail_op!` on the entity.
    def record_failure(exception, entity)
      payload = {
        bulk_import_entity_id: entity.id,
        bulk_import_id: entity.bulk_import_id,
        bulk_import_entity_type: entity.source_type,
        source_full_path: entity.source_full_path,
        message: "Request to export #{entity.source_type} failed",
        source_version: entity.bulk_import.source_version_info.to_s,
        importer: 'gitlab_migration'
      }
      log_exception(exception, payload)

      BulkImports::Failure.create(failure_attributes(exception, entity))

      entity.fail_op!
    end

    # POSTs to the entity's export relations path on the source instance.
    def request_export(entity)
      export_path = entity.export_relations_url_path

      http_client(entity).post(export_path)
    end

    # HTTP client built from the bulk import configuration; memoised on the
    # worker instance.
    def http_client(entity)
      return @client if @client

      configuration = entity.bulk_import.configuration

      @client = Clients::HTTP.new(
        url: configuration.url,
        token: configuration.access_token
      )
    end

    # Attributes used to create the BulkImports::Failure record for an error.
    def failure_attributes(exception, entity)
      correlation_id = Labkit::Correlation::CorrelationId.current_or_new_id

      {
        bulk_import_entity_id: entity.id,
        pipeline_class: 'ExportRequestWorker',
        exception_class: exception.class.to_s,
        exception_message: exception.message.truncate(255),
        correlation_id_value: correlation_id
      }
    end

    # GraphQL client built from the bulk import configuration; memoised on the
    # worker instance.
    def graphql_client(entity)
      return @graphql_client if @graphql_client

      configuration = entity.bulk_import.configuration

      @graphql_client = BulkImports::Clients::Graphql.new(
        url: configuration.url,
        token: configuration.access_token
      )
    end

    # Fetches the entity's global id through GraphQL and returns its model id.
    #
    # Any StandardError raised during the lookup is logged and swallowed, in
    # which case nil is returned.
    def entity_source_xid(entity)
      query = entity_query(entity)
      client = graphql_client(entity)
      parsed_query = client.parse(query.to_s)
      variables = { full_path: entity.source_full_path }

      response = client.execute(parsed_query, variables).original_hash
      global_id = response.dig(*query.data_path, 'id')

      ::GlobalID.parse(global_id).model_id
    rescue StandardError => error
      log_exception(error, entity_log_payload(entity, 'Failed to fetch source entity id'))

      nil
    end

    # Picks the group or project query object depending on the entity kind.
    def entity_query(entity)
      query_class =
        if entity.group?
          BulkImports::Groups::Graphql::GetGroupQuery
        else
          BulkImports::Projects::Graphql::GetProjectQuery
        end

      query_class.new(context: nil)
    end

    # Logs the error and re-enqueues this worker for the same entity after a
    # two second delay.
    def retry_request(exception, entity)
      log_exception(exception, entity_log_payload(entity, 'Retrying export request'))

      self.class.perform_in(2.seconds, entity.id)
    end

    # Log payload describing the entity, with the given message as first key.
    def entity_log_payload(entity, message)
      {
        message: message,
        bulk_import_entity_id: entity.id,
        bulk_import_id: entity.bulk_import_id,
        bulk_import_entity_type: entity.source_type,
        source_full_path: entity.source_full_path,
        source_version: entity.bulk_import.source_version_info.to_s,
        importer: 'gitlab_migration'
      }
    end

    def logger
      @logger ||= Gitlab::Import::Logger.build
    end

    # Passes the exception and payload to Gitlab::ExceptionLogFormatter.format!,
    # then writes the payload to the import logger at error level.
    def log_exception(exception, payload)
      Gitlab::ExceptionLogFormatter.format!(exception, payload)

      structured = structured_payload(payload)
      logger.error(structured)
    end
  end
end