Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb')
-rw-r--r--vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb269
1 file changed, 269 insertions(+), 0 deletions(-)
diff --git a/vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb b/vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb
new file mode 100644
index 00000000000..e9c9f050982
--- /dev/null
+++ b/vendor/gems/sidekiq-reliable-fetch/lib/sidekiq/base_reliable_fetch.rb
@@ -0,0 +1,269 @@
+# frozen_string_literal: true
+
+require_relative 'interrupted_set'
+
module Sidekiq
  # Common behaviour for the GitLab "reliable fetch" strategies
  # (Sidekiq::ReliableFetch and Sidekiq::SemiReliableFetch): process
  # heartbeats, per-process working queues, and recovery of jobs that
  # belonged to dead processes.
  class BaseReliableFetch
    DEFAULT_CLEANUP_INTERVAL = 60 * 60 # 1 hour
    HEARTBEAT_INTERVAL = 20 # seconds
    HEARTBEAT_LIFESPAN = 60 # seconds
    HEARTBEAT_RETRY_DELAY = 1 # seconds
    # Prefix for the per-process Redis lists that hold in-flight jobs.
    WORKING_QUEUE_PREFIX = 'working'

    # Defines how often we try to take a lease to not flood our
    # Redis server with SET requests
    DEFAULT_LEASE_INTERVAL = 2 * 60 # seconds
    LEASE_KEY = 'reliable-fetcher-cleanup-lock'

    # Defines the COUNT parameter that will be passed to Redis SCAN command
    SCAN_COUNT = 1000

    # How many times a job may be interrupted before it is quarantined
    # instead of being requeued (see interruption_exhausted?).
    DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION = 3

    # Regexes for matching working queue keys:
    # new format "working:{queue}:{host}:{pid}:{nonce}",
    # legacy format "working:{queue}:{host}:{pid}".
    WORKING_QUEUE_REGEX = /#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*:[0-9a-f]*)\z/.freeze
    LEGACY_WORKING_QUEUE_REGEX = /#{WORKING_QUEUE_PREFIX}:(queue:.*):([^:]*:[0-9]*)\z/.freeze
+
+ UnitOfWork = Struct.new(:queue, :job) do
+ def acknowledge
+ Sidekiq.redis { |conn| conn.lrem(Sidekiq::BaseReliableFetch.working_queue_name(queue), 1, job) }
+ end
+
+ def queue_name
+ queue.sub(/.*queue:/, '')
+ end
+
+ def requeue
+ Sidekiq.redis do |conn|
+ conn.multi do |multi|
+ multi.lpush(queue, job)
+ multi.lrem(Sidekiq::BaseReliableFetch.working_queue_name(queue), 1, job)
+ end
+ end
+ end
+ end
+
+ def self.setup_reliable_fetch!(config)
+ config = config.options unless config.respond_to?(:[])
+
+ fetch_strategy = if config[:semi_reliable_fetch]
+ Sidekiq::SemiReliableFetch
+ else
+ Sidekiq::ReliableFetch
+ end
+
+ config[:fetch] = fetch_strategy.new(config)
+
+ Sidekiq.logger.info('GitLab reliable fetch activated!')
+
+ start_heartbeat_thread
+ end
+
+ def self.start_heartbeat_thread
+ Thread.new do
+ loop do
+ begin
+ heartbeat
+
+ sleep HEARTBEAT_INTERVAL
+ rescue => e
+ Sidekiq.logger.error("Heartbeat thread error: #{e.message}")
+
+ sleep HEARTBEAT_RETRY_DELAY
+ end
+ end
+ end
+ end
+
+ def self.hostname
+ Socket.gethostname
+ end
+
    # Random hex token generated once per process and memoized.
    # NOTE: deliberately a class variable (@@) so the value is shared across
    # the subclass hierarchy — ReliableFetch and SemiReliableFetch must all
    # report the same nonce as the base class.
    def self.process_nonce
      @@process_nonce ||= SecureRandom.hex(6)
    end
+
    # Unique identity of this process: "{hostname}:{pid}:{nonce}".
    # Memoized in a class variable (@@) on purpose: subclasses share one
    # identity, so heartbeat keys and working queue names always match.
    def self.identity
      @@identity ||= "#{hostname}:#{$$}:#{process_nonce}"
    end
+
+ def self.heartbeat
+ Sidekiq.redis do |conn|
+ conn.set(heartbeat_key(identity), 1, ex: HEARTBEAT_LIFESPAN)
+ end
+
+ Sidekiq.logger.debug("Heartbeat for #{identity}")
+ end
+
+ def self.worker_dead?(identity, conn)
+ !conn.get(heartbeat_key(identity))
+ end
+
+ def self.heartbeat_key(identity)
+ "reliable-fetcher-heartbeat-#{identity.gsub(':', '-')}"
+ end
+
+ def self.working_queue_name(queue)
+ "#{WORKING_QUEUE_PREFIX}:#{queue}:#{identity}"
+ end
+
+ attr_reader :cleanup_interval, :last_try_to_take_lease_at, :lease_interval,
+ :queues, :use_semi_reliable_fetch,
+ :strictly_ordered_queues
+
+ def initialize(options)
+ raise ArgumentError, 'missing queue list' unless options[:queues]
+
+ @config = options
+ @interrupted_set = Sidekiq::InterruptedSet.new
+ @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL)
+ @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL)
+ @last_try_to_take_lease_at = 0
+ @strictly_ordered_queues = !!options[:strict]
+ @queues = options[:queues].map { |q| "queue:#{q}" }
+ end
+
+ def retrieve_work
+ clean_working_queues! if take_lease
+
+ retrieve_unit_of_work
+ end
+
+ def retrieve_unit_of_work
+ raise NotImplementedError,
+ "#{self.class} does not implement #{__method__}"
+ end
+
+ def bulk_requeue(inprogress, _options)
+ return if inprogress.empty?
+
+ Sidekiq.redis do |conn|
+ inprogress.each do |unit_of_work|
+ conn.multi do |multi|
+ preprocess_interrupted_job(unit_of_work.job, unit_of_work.queue, multi)
+
+ multi.lrem(self.class.working_queue_name(unit_of_work.queue), 1, unit_of_work.job)
+ end
+ end
+ end
+ rescue => e
+ Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.message}")
+ end
+
+ private
+
+ def preprocess_interrupted_job(job, queue, conn = nil)
+ msg = Sidekiq.load_json(job)
+ msg['interrupted_count'] = msg['interrupted_count'].to_i + 1
+
+ if interruption_exhausted?(msg)
+ send_to_quarantine(msg, conn)
+ else
+ requeue_job(queue, msg, conn)
+ end
+ end
+
+ # If you want this method to be run in a scope of multi connection
+ # you need to pass it
+ def requeue_job(queue, msg, conn)
+ with_connection(conn) do |conn|
+ conn.lpush(queue, Sidekiq.dump_json(msg))
+ end
+
+ Sidekiq.logger.info(
+ message: "Pushed job #{msg['jid']} back to queue #{queue}",
+ jid: msg['jid'],
+ queue: queue
+ )
+ end
+
+ def extract_queue_and_identity(key)
+ # New identity format is "{hostname}:{pid}:{randomhex}
+ # Old identity format is "{hostname}:{pid}"
+ # Queue names may also have colons (namespaced).
+ # Expressing this in a single regex is unreadable
+
+ # Test the newer expected format first, only checking the older if necessary
+ original_queue, identity = key.scan(WORKING_QUEUE_REGEX).flatten
+ return original_queue, identity unless original_queue.nil? || identity.nil?
+
+ key.scan(LEGACY_WORKING_QUEUE_REGEX).flatten
+ end
+
+ # Detect "old" jobs and requeue them because the worker they were assigned
+ # to probably failed miserably.
+ def clean_working_queues!
+ Sidekiq.logger.info('Cleaning working queues')
+
+ Sidekiq.redis do |conn|
+ conn.scan_each(match: "#{WORKING_QUEUE_PREFIX}:queue:*", count: SCAN_COUNT) do |key|
+ original_queue, identity = extract_queue_and_identity(key)
+
+ next if original_queue.nil? || identity.nil?
+
+ clean_working_queue!(original_queue, key) if self.class.worker_dead?(identity, conn)
+ end
+ end
+ end
+
+ def clean_working_queue!(original_queue, working_queue)
+ Sidekiq.redis do |conn|
+ while job = conn.rpop(working_queue)
+ preprocess_interrupted_job(job, original_queue)
+ end
+ end
+ end
+
+ def interruption_exhausted?(msg)
+ return false if max_retries_after_interruption(msg['class']) < 0
+
+ msg['interrupted_count'].to_i >= max_retries_after_interruption(msg['class'])
+ end
+
+ def max_retries_after_interruption(worker_class)
+ max_retries_after_interruption = nil
+
+ max_retries_after_interruption ||= begin
+ Object.const_get(worker_class).sidekiq_options[:max_retries_after_interruption]
+ rescue NameError
+ end
+
+ max_retries_after_interruption ||= @config[:max_retries_after_interruption]
+ max_retries_after_interruption ||= DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
+ max_retries_after_interruption
+ end
+
+ def send_to_quarantine(msg, multi_connection = nil)
+ Sidekiq.logger.warn(
+ class: msg['class'],
+ jid: msg['jid'],
+ message: %(Reliable Fetcher: adding dead #{msg['class']} job #{msg['jid']} to interrupted queue)
+ )
+
+ job = Sidekiq.dump_json(msg)
+ @interrupted_set.put(job, connection: multi_connection)
+ end
+
+ # Yield block with an existing connection or creates another one
+ def with_connection(conn)
+ return yield(conn) if conn
+
+ Sidekiq.redis { |redis_conn| yield(redis_conn) }
+ end
+
+ def take_lease
+ return unless allowed_to_take_a_lease?
+
+ @last_try_to_take_lease_at = Time.now.to_f
+
+ Sidekiq.redis do |conn|
+ conn.set(LEASE_KEY, 1, nx: true, ex: cleanup_interval)
+ end
+ end
+
+ def allowed_to_take_a_lease?
+ Time.now.to_f - last_try_to_take_lease_at > lease_interval
+ end
+ end
+end