Welcome to mirror list, hosted at ThFree Co, Russian Federation.

application_worker.rb « concerns « workers « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 3399a4f9b578c3fb3852fb356b84046fed0872ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# frozen_string_literal: true

require 'sidekiq/api'

Sidekiq::Worker.extend ActiveSupport::Concern

module ApplicationWorker
  extend ActiveSupport::Concern

  include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
  include WorkerAttributes
  include WorkerContext
  include Gitlab::SidekiqVersioning::Worker

  LOGGING_EXTRA_KEY = 'extra'
  DEFAULT_DELAY_INTERVAL = 1

  included do
    set_queue
    after_set_class_attribute { set_queue }

    def structured_payload(payload = {})
      context = Gitlab::ApplicationContext.current.merge(
        'class' => self.class.name,
        'job_status' => 'running',
        'queue' => self.class.queue,
        'jid' => jid
      )

      payload.stringify_keys.merge(context)
    end

    def log_extra_metadata_on_done(key, value)
      @done_log_extra_metadata ||= {}
      @done_log_extra_metadata[key] = value
    end

    def logging_extras
      return {} unless @done_log_extra_metadata

      # Prefix keys with class name to avoid conflicts in Elasticsearch types.
      # Also prefix with "extra." so that we know to log these new fields.
      @done_log_extra_metadata.transform_keys do |k|
        "#{LOGGING_EXTRA_KEY}.#{self.class.name.gsub("::", "_").underscore}.#{k}"
      end
    end
  end

  class_methods do
    extend ::Gitlab::Utils::Override

    def inherited(subclass)
      subclass.set_queue
      subclass.after_set_class_attribute { subclass.set_queue }
    end

    def generated_queue_name
      Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self)
    end

    def validate_worker_attributes!
      # Since the delayed data_consistency will use sidekiq built in retry mechanism, it is required that this mechanism
      # is not disabled.
      if retry_disabled? && get_data_consistency == :delayed
        raise ArgumentError, "Retry support cannot be disabled if data_consistency is set to :delayed"
      end
    end

    # Checks if sidekiq retry support is disabled
    def retry_disabled?
      get_sidekiq_options['retry'] == 0 || get_sidekiq_options['retry'] == false
    end

    override :sidekiq_options
    def sidekiq_options(opts = {})
      super.tap do
        validate_worker_attributes!
      end
    end

    override :data_consistency
    def data_consistency(data_consistency, feature_flag: nil)
      super

      validate_worker_attributes!
    end

    def perform_async(*args)
      # Worker execution for workers with data_consistency set to :delayed or :sticky
      # will be delayed to give replication enough time to complete
      if utilizes_load_balancing_capabilities?
        perform_in(delay_interval, *args)
      else
        super
      end
    end

    def set_queue
      queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)
      sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
    end

    def queue_namespace(new_namespace = nil)
      if new_namespace
        sidekiq_options queue_namespace: new_namespace

        set_queue
      else
        get_sidekiq_options['queue_namespace']&.to_s
      end
    end

    def queue
      get_sidekiq_options['queue'].to_s
    end

    # Set/get which arguments can be logged and sent to Sentry.
    #
    # Numeric arguments are logged by default, so there is no need to
    # list those.
    #
    # Non-numeric arguments must be listed by position, as Sidekiq
    # cannot see argument names.
    #
    def loggable_arguments(*args)
      if args.any?
        @loggable_arguments = args
      else
        @loggable_arguments || []
      end
    end

    def queue_size
      Sidekiq::Queue.new(queue).size
    end

    def bulk_perform_async(args_list)
      Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
    end

    def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
      now = Time.now.to_i
      schedule = now + delay.to_i

      if schedule <= now
        raise ArgumentError, _('The schedule time must be in the future!')
      end

      if batch_size && batch_delay
        args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx|
          batch_schedule = schedule + idx * batch_delay.to_i
          Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule)
        end
      else
        Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
      end
    end

    protected

    def delay_interval
      DEFAULT_DELAY_INTERVAL.seconds
    end
  end
end