Welcome to mirror list, hosted at ThFree Co, Russian Federation.

semi_reliable_fetch.rb « sidekiq « lib « sidekiq-reliable-fetch « gems « vendor - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: e65d9b6324aea84a17f6efae700297e29392ffad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# frozen_string_literal: true

module Sidekiq
  class SemiReliableFetch < BaseReliableFetch
    # We want the fetch operation to timeout every few seconds so the thread
    # can check if the process is shutting down. This constant is only used
    # for semi-reliable fetch.
    DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT = 5 # seconds

    def initialize(options)
      super

      @queues = @queues.uniq
    end

    private

    def retrieve_unit_of_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd, timeout: semi_reliable_fetch_timeout) }
      return unless work

      queue, job = work
      unit_of_work = UnitOfWork.new(queue, job)

      Sidekiq.redis do |conn|
        conn.lpush(self.class.working_queue_name(unit_of_work.queue), unit_of_work.job)
      end

      unit_of_work
    end

    def queues_cmd
      if strictly_ordered_queues
        @queues
      else
        @queues.shuffle
      end
    end

    def semi_reliable_fetch_timeout
      @semi_reliable_fetch_timeout ||= ENV['SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT']&.to_i || DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT
    end
  end
end