# frozen_string_literal: true

module BulkImports
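  # Sidekiq worker that runs a single batch of a batched BulkImports
  # (direct transfer) pipeline. Each job loads its BulkImports::BatchTracker
  # record, executes the tracker's pipeline class against that batch number,
  # and then notifies FinishBatchedPipelineWorker so the parent pipeline can
  # complete once all batches have settled.
  #
  # Enqueued with the BatchTracker id, e.g.:
  #   BulkImports::PipelineBatchWorker.perform_async(batch.id)
  #
  # Source: app/workers/bulk_imports/pipeline_batch_worker.rb
  # (gitlab-org/gitlab-foss).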
  class PipelineBatchWorker
    include ApplicationWorker
    include ExclusiveLeaseGuard

    DEFER_ON_HEALTH_DELAY = 5.minutes

    data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency
    feature_category :importers
    sidekiq_options dead: false, retry: 3
    worker_has_external_dependencies!
    worker_resource_boundary :memory
    idempotent!

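    # When the Sidekiq retries configured above are exhausted, mark the
    # batch as failed (dead: false means the job is never parked in the
    # dead set, so this hook is the last chance to record the failure).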
    sidekiq_retries_exhausted do |msg, exception|
      new.perform_failure(msg['args'].first, exception)
    end

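    # Defer the job while the database is under pressure. If the pipeline
    # exposes its own schema/table through PipelineSchemaInfo, health is
    # checked against those; otherwise the defaults given here apply.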
    defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables|
      batch = ::BulkImports::BatchTracker.find(job_args.first)
      pipeline_tracker = batch.tracker
      pipeline_schema = ::BulkImports::PipelineSchemaInfo.new(
        pipeline_tracker.pipeline_class,
        pipeline_tracker.entity.portable_class
      )

      if pipeline_schema.db_schema && pipeline_schema.db_table
        schema = pipeline_schema.db_schema
        tables = [pipeline_schema.db_table]
      end

      [schema, tables]
    end

    def self.defer_on_database_health_signal?
      Feature.enabled?(:bulk_import_deferred_workers)
    end

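    # Entry point. Batches that have already moved past the created/started
    # states are skipped. The pipeline runs under an exclusive lease, and the
    # ensure block always pings FinishBatchedPipelineWorker unless the batch
    # is waiting on a scheduled retry.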
    def perform(batch_id)
      @batch = ::BulkImports::BatchTracker.find(batch_id)

      @tracker = @batch.tracker
      @entity = @tracker.entity
      @pending_retry = false

      return unless process_batch?

      log_extra_metadata_on_done(:pipeline_class, @tracker.pipeline_name)

      try_obtain_lease { run }
    ensure
      unless pending_retry
        with_context(bulk_import_entity_id: entity.id) do
          ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
        end
      end
    end

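    # Called by the sidekiq_retries_exhausted hook with the original job
    # argument; reloads the batch so fail_batch can record the failure.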
    def perform_failure(batch_id, exception)
      @batch = ::BulkImports::BatchTracker.find(batch_id)
      @tracker = @batch.tracker

      fail_batch(exception)
    end

    private

    attr_reader :batch, :tracker, :pending_retry, :entity

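    # Drives the batch tracker through its state machine (start! ... finish!)
    # while the pipeline runs. A RetryPipelineError reschedules the batch
    # after the pipeline-requested delay instead of failing it.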
    def run
      return batch.skip! if tracker.failed? || tracker.finished?

      logger.info(log_attributes(message: 'Batch tracker started'))
      batch.start!
      tracker.pipeline_class.new(context).run
      batch.finish!
      logger.info(log_attributes(message: 'Batch tracker finished'))
    rescue BulkImports::RetryPipelineError => e
      @pending_retry = true
      retry_batch(e)
    end

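    # Marks the batch as failed, reports the exception to error tracking,
    # persists a BulkImports::Failure record for user-facing reporting, and
    # still notifies FinishBatchedPipelineWorker so the pipeline can settle.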
    def fail_batch(exception)
      batch.fail_op!

      Gitlab::ErrorTracking.track_exception(exception, log_attributes(message: 'Batch tracker failed'))

      BulkImports::Failure.create(
        bulk_import_entity_id: tracker.entity.id,
        pipeline_class: tracker.pipeline_name,
        pipeline_step: 'pipeline_batch_worker_run',
        exception_class: exception.class.to_s,
        exception_message: exception.message,
        correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
      )

      with_context(bulk_import_entity_id: tracker.entity.id) do
        ::BulkImports::FinishBatchedPipelineWorker.perform_async(tracker.id)
      end
    end

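    # Pipeline context scoped to this batch; batch_number tells the pipeline
    # which slice of the source data to process.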
    def context
      @context ||= ::BulkImports::Pipeline::Context.new(tracker, batch_number: batch.batch_number)
    end

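    # Returns the batch to the retry state and schedules another run after
    # the delay carried by the RetryPipelineError.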
    def retry_batch(exception)
      batch.retry!

      re_enqueue(exception.retry_delay)
    end

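    # ExclusiveLeaseGuard configuration: a short per-batch lease (seconds)
    # prevents two jobs from processing the same batch concurrently.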
    def lease_timeout
      30
    end

    def lease_key
      "gitlab:bulk_imports:pipeline_batch_worker:#{batch.id}"
    end

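    # Reschedules this batch. Note that FILE_EXTRACTION_PIPELINE_PERFORM_DELAY
    # is not defined in this file; in practice the only caller (retry_batch)
    # always passes an explicit delay, so the default assumes the constant is
    # resolvable from elsewhere in the BulkImports worker code.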
    def re_enqueue(delay = FILE_EXTRACTION_PIPELINE_PERFORM_DELAY)
      log_extra_metadata_on_done(:re_enqueue, true)

      with_context(bulk_import_entity_id: entity.id) do
        self.class.perform_in(delay, batch.id)
      end
    end

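    # Only batches still in the created or started states are processed;
    # anything further along the state machine is a no-op.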
    def process_batch?
      batch.created? || batch.started?
    end

    def logger
      @logger ||= Logger.build
    end

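    # Shared structured-logging payload so every log line and tracked error
    # carries the batch, tracker, and entity identifiers.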
    def log_attributes(extra = {})
      structured_payload(
        {
          batch_id: batch.id,
          batch_number: batch.batch_number,
          tracker_id: tracker.id,
          bulk_import_id: tracker.entity.bulk_import_id,
          bulk_import_entity_id: tracker.entity.id,
          pipeline_class: tracker.pipeline_name,
          pipeline_step: 'pipeline_batch_worker_run',
          importer: Logger::IMPORTER_NAME
        }.merge(extra)
      )
    end
  end
end