diff options
Diffstat (limited to 'vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb')
-rw-r--r-- | vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb b/vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb new file mode 100644 index 00000000000..e9c9f050982 --- /dev/null +++ b/vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb @@ -0,0 +1,269 @@ +# frozen_string_literal: true + +require_relative 'interrupted_set' + +module Sidekiq + class BaseReliableFetch + DEFAULT_CLEANUP_INTERVAL = 60 * 60 # 1 hour + HEARTBEAT_INTERVAL = 20 # seconds + HEARTBEAT_LIFESPAN = 60 # seconds + HEARTBEAT_RETRY_DELAY = 1 # seconds + WORKING_QUEUE_PREFIX = 'working' + + # Defines how often we try to take a lease to not flood our + # Redis server with SET requests + DEFAULT_LEASE_INTERVAL = 2 * 60 # seconds + LEASE_KEY = 'reliable-fetcher-cleanup-lock' + + # Defines the COUNT parameter that will be passed to Redis SCAN command + SCAN_COUNT = 1000 + + # How much time a job can be interrupted + DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION = 3 + + # Regexes for matching working queue keys + WORKING_QUEUE_REGEX = /#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*:[0-9a-f]*)\z/.freeze + LEGACY_WORKING_QUEUE_REGEX = /#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*)\z/.freeze + + UnitOfWork = Struct.new(:queue, :job) do + def acknowledge + Sidekiq.redis { |conn| conn.lrem(Sidekiq::BaseReliableFetch.working_queue_name(queue), 1, job) } + end + + def queue_name + queue.sub(/.*queue:/, '') + end + + def requeue + Sidekiq.redis do |conn| + conn.multi do |multi| + multi.lpush(queue, job) + multi.lrem(Sidekiq::BaseReliableFetch.working_queue_name(queue), 1, job) + end + end + end + end + + def self.setup_reliable_fetch!(config) + config = config.options unless config.respond_to?(:[]) + + fetch_strategy = if config[:semi_reliable_fetch] + Sidekiq::SemiReliableFetch + else + Sidekiq::ReliableFetch + end + + config[:fetch] = fetch_strategy.new(config) + + Sidekiq.logger.info('GitLab reliable fetch activated!') + + start_heartbeat_thread + end + + def self.start_heartbeat_thread + Thread.new do + loop do + begin + heartbeat + + sleep HEARTBEAT_INTERVAL + rescue => e + Sidekiq.logger.error("Heartbeat thread error: #{e.message}") + + sleep HEARTBEAT_RETRY_DELAY + end + end + end + end + + def self.hostname + Socket.gethostname + end + + def self.process_nonce + @@process_nonce ||= SecureRandom.hex(6) + end + + def self.identity + @@identity ||= "#{hostname}:#{$$}:#{process_nonce}" + end + + def self.heartbeat + Sidekiq.redis do |conn| + conn.set(heartbeat_key(identity), 1, ex: HEARTBEAT_LIFESPAN) + end + + Sidekiq.logger.debug("Heartbeat for #{identity}") + end + + def self.worker_dead?(identity, conn) + !conn.get(heartbeat_key(identity)) + end + + def self.heartbeat_key(identity) + "reliable-fetcher-heartbeat-#{identity.gsub(':', '-')}" + end + + def self.working_queue_name(queue) + "#{WORKING_QUEUE_PREFIX}:#{queue}:#{identity}" + end + + attr_reader :cleanup_interval, :last_try_to_take_lease_at, :lease_interval, + :queues, :use_semi_reliable_fetch, + :strictly_ordered_queues + + def initialize(options) + raise ArgumentError, 'missing queue list' unless options[:queues] + + @config = options + @interrupted_set = Sidekiq::InterruptedSet.new + @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL) + @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL) + @last_try_to_take_lease_at = 0 + @strictly_ordered_queues = !!options[:strict] + @queues = options[:queues].map { |q| "queue:#{q}" } + end + + def retrieve_work + clean_working_queues! if take_lease + + retrieve_unit_of_work + end + + def retrieve_unit_of_work + raise NotImplementedError, + "#{self.class} does not implement #{__method__}" + end + + def bulk_requeue(inprogress, _options) + return if inprogress.empty? + + Sidekiq.redis do |conn| + inprogress.each do |unit_of_work| + conn.multi do |multi| + preprocess_interrupted_job(unit_of_work.job, unit_of_work.queue, multi) + + multi.lrem(self.class.working_queue_name(unit_of_work.queue), 1, unit_of_work.job) + end + end + end + rescue => e + Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.message}") + end + + private + + def preprocess_interrupted_job(job, queue, conn = nil) + msg = Sidekiq.load_json(job) + msg['interrupted_count'] = msg['interrupted_count'].to_i + 1 + + if interruption_exhausted?(msg) + send_to_quarantine(msg, conn) + else + requeue_job(queue, msg, conn) + end + end + + # If you want this method to be run in a scope of multi connection + # you need to pass it + def requeue_job(queue, msg, conn) + with_connection(conn) do |conn| + conn.lpush(queue, Sidekiq.dump_json(msg)) + end + + Sidekiq.logger.info( + message: "Pushed job #{msg['jid']} back to queue #{queue}", + jid: msg['jid'], + queue: queue + ) + end + + def extract_queue_and_identity(key) + # New identity format is "{hostname}:{pid}:{randomhex} + # Old identity format is "{hostname}:{pid}" + # Queue names may also have colons (namespaced). + # Expressing this in a single regex is unreadable + + # Test the newer expected format first, only checking the older if necessary + original_queue, identity = key.scan(WORKING_QUEUE_REGEX).flatten + return original_queue, identity unless original_queue.nil? || identity.nil? + + key.scan(LEGACY_WORKING_QUEUE_REGEX).flatten + end + + # Detect "old" jobs and requeue them because the worker they were assigned + # to probably failed miserably. + def clean_working_queues! + Sidekiq.logger.info('Cleaning working queues') + + Sidekiq.redis do |conn| + conn.scan_each(match: "#{WORKING_QUEUE_PREFIX}:queue:*", count: SCAN_COUNT) do |key| + original_queue, identity = extract_queue_and_identity(key) + + next if original_queue.nil? || identity.nil? + + clean_working_queue!(original_queue, key) if self.class.worker_dead?(identity, conn) + end + end + end + + def clean_working_queue!(original_queue, working_queue) + Sidekiq.redis do |conn| + while job = conn.rpop(working_queue) + preprocess_interrupted_job(job, original_queue) + end + end + end + + def interruption_exhausted?(msg) + return false if max_retries_after_interruption(msg['class']) < 0 + + msg['interrupted_count'].to_i >= max_retries_after_interruption(msg['class']) + end + + def max_retries_after_interruption(worker_class) + max_retries_after_interruption = nil + + max_retries_after_interruption ||= begin + Object.const_get(worker_class).sidekiq_options[:max_retries_after_interruption] + rescue NameError + end + + max_retries_after_interruption ||= @config[:max_retries_after_interruption] + max_retries_after_interruption ||= DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION + max_retries_after_interruption + end + + def send_to_quarantine(msg, multi_connection = nil) + Sidekiq.logger.warn( + class: msg['class'], + jid: msg['jid'], + message: %(Reliable Fetcher: adding dead #{msg['class']} job #{msg['jid']} to interrupted queue) + ) + + job = Sidekiq.dump_json(msg) + @interrupted_set.put(job, connection: multi_connection) + end + + # Yield block with an existing connection or creates another one + def with_connection(conn) + return yield(conn) if conn + + Sidekiq.redis { |redis_conn| yield(redis_conn) } + end + + def take_lease + return unless allowed_to_take_a_lease? + + @last_try_to_take_lease_at = Time.now.to_f + + Sidekiq.redis do |conn| + conn.set(LEASE_KEY, 1, nx: true, ex: cleanup_interval) + end + end + + def allowed_to_take_a_lease? + Time.now.to_f - last_try_to_take_lease_at > lease_interval + end + end +end |