blob: e65d9b6324aea84a17f6efae700297e29392ffad (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
# frozen_string_literal: true
module Sidekiq
  # Fetch strategy that takes a job off one of the configured queues with a
  # blocking BRPOP and then records it in that queue's working list with an
  # LPUSH.
  #
  # The pop and the push are issued as two separate Redis commands (each in
  # its own `Sidekiq.redis` block), so the hand-over is not atomic.
  class SemiReliableFetch < BaseReliableFetch
    # We want the fetch operation to time out every few seconds so the thread
    # can check if the process is shutting down. This constant is only used
    # for semi-reliable fetch.
    DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT = 5 # seconds

    # @param options forwarded untouched to the parent initializer
    def initialize(options)
      super

      # Drop repeated queue names from the list the parent set up.
      # NOTE(review): assumes the parent initializer populates @queues.
      @queues = @queues.uniq
    end

    private

    # Blocks on BRPOP across the configured queues for up to
    # `semi_reliable_fetch_timeout` seconds.
    #
    # @return [UnitOfWork, nil] the fetched job, already pushed onto its
    #   working queue, or nil when nothing arrived before the timeout
    def retrieve_unit_of_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd, timeout: semi_reliable_fetch_timeout) }
      return unless work

      queue, job = work
      unit_of_work = UnitOfWork.new(queue, job)

      # Second, separate command: remember the job in the working queue.
      Sidekiq.redis do |conn|
        conn.lpush(self.class.working_queue_name(unit_of_work.queue), unit_of_work.job)
      end

      unit_of_work
    end

    # Queue list handed to BRPOP: kept in configured order when strict
    # ordering is requested, otherwise reshuffled on every call.
    def queues_cmd
      strictly_ordered_queues ? @queues : @queues.shuffle
    end

    # BRPOP timeout in seconds, read once from
    # SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT and memoized.
    #
    # An unset, blank, non-numeric, zero or negative value falls back to
    # DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT. `String#to_i` turns garbage into 0,
    # and a zero BRPOP timeout means "block indefinitely", which would stop
    # the fetch loop from waking up periodically.
    #
    # @return [Integer] a strictly positive number of seconds
    def semi_reliable_fetch_timeout
      @semi_reliable_fetch_timeout ||= begin
        configured = ENV['SIDEKIQ_SEMI_RELIABLE_FETCH_TIMEOUT'].to_i # nil.to_i => 0
        configured.positive? ? configured : DEFAULT_SEMI_RELIABLE_FETCH_TIMEOUT
      end
    end
  end
end
|