Welcome to mirror list, hosted at ThFree Co, Russian Federation.

semi_reliable_fetch.rb « sidekiq « lib « sidekiq-reliable-fetch « gems « vendor - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: b9855100fb659f1200139fba2abb23d8fd12ce3e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# frozen_string_literal: true

module Sidekiq
  # Fetch strategy that pops a job from the regular queues with a blocking
  # BRPOP and then pushes it onto the matching working queue.
  #
  # The pop and the push are issued as two separate Redis commands, so they
  # are not atomic; presumably this gap is why the strategy is called
  # "semi" reliable (confirm against BaseReliableFetch documentation).
  class SemiReliableFetch < BaseReliableFetch
    # We want the fetch operation to timeout every few seconds so the thread
    # can check if the process is shutting down. This constant is only used
    # for semi-reliable fetch.
    DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT = 5 # seconds

    # @param capsule forwarded unchanged to the parent initializer.
    def initialize(capsule)
      super

      # Every entry of @queues is sent as a BRPOP key (see #brpop_with_sidekiq),
      # so keep each queue name only once. Expects the parent initializer to
      # have populated @queues.
      @queues = @queues.uniq
    end

    private

    # Pops one job and records it on the working queue of its source queue.
    #
    # @return [UnitOfWork, nil] the fetched unit of work, or nil when the
    #   blocking pop returned nothing (e.g. it timed out).
    def retrieve_unit_of_work
      work = brpop_with_sidekiq
      return unless work

      queue, job = work
      unit_of_work = UnitOfWork.new(queue, job)

      Sidekiq.redis do |conn|
        conn.lpush(self.class.working_queue_name(unit_of_work.queue), unit_of_work.job)
      end

      unit_of_work
    end

    # Issues a BRPOP over the configured queues.
    #
    # The first argument of blocking_call is the client-side timeout. It is
    # the connection read timeout plus the server-side BRPOP timeout, so the
    # client keeps waiting at least as long as Redis may block.
    #
    # @return [Array, nil] whatever the connection returns for BRPOP; the
    #   caller destructures it as [queue, job] and treats nil as "no job".
    def brpop_with_sidekiq
      Sidekiq.redis do |conn|
        conn.blocking_call(
          conn.read_timeout + semi_reliable_fetch_timeout,
          "brpop",
          *queues_cmd,
          semi_reliable_fetch_timeout
        )
      end
    end

    # Queue names in the order they are handed to BRPOP: the configured order
    # when strictly_ordered_queues is truthy, otherwise a shuffled copy.
    #
    # @return [Array] list of queue keys.
    def queues_cmd
      if strictly_ordered_queues
        @queues
      else
        @queues.shuffle
      end
    end

    # Server-side BRPOP timeout in seconds, memoized after the first call.
    #
    # Read from SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT. An unset, empty,
    # non-numeric, zero or negative value falls back to
    # DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT: String#to_i turns unparsable input
    # into 0, a BRPOP timeout of 0 blocks indefinitely (so the thread would
    # never wake up to check for shutdown), and Redis rejects negative
    # timeouts.
    #
    # @return [Integer] a strictly positive number of seconds.
    def semi_reliable_fetch_timeout
      @semi_reliable_fetch_timeout ||= begin
        # nil.to_i is 0, so the unset case takes the same fallback path.
        configured = ENV['SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT'].to_i
        configured.positive? ? configured : DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT
      end
    end
  end
end