Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/sidekiq_cluster.rb')
-rw-r--r--lib/gitlab/sidekiq_cluster.rb171
1 files changed, 0 insertions, 171 deletions
diff --git a/lib/gitlab/sidekiq_cluster.rb b/lib/gitlab/sidekiq_cluster.rb
deleted file mode 100644
index cc1bd282da8..00000000000
--- a/lib/gitlab/sidekiq_cluster.rb
+++ /dev/null
@@ -1,171 +0,0 @@
-# frozen_string_literal: true
-
-require 'shellwords'
-
-module Gitlab
- module SidekiqCluster
- # The signals that should terminate both the master and workers.
- TERMINATE_SIGNALS = %i(INT TERM).freeze
-
- # The signals that should simply be forwarded to the workers.
- FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP).freeze
-
- # Traps the given signals and yields the block whenever these signals are
- # received.
- #
- # The block is passed the name of the signal.
- #
- # Example:
- #
- # trap_signals(%i(HUP TERM)) do |signal|
- # ...
- # end
- def self.trap_signals(signals)
- signals.each do |signal|
- trap(signal) do
- yield signal
- end
- end
- end
-
- def self.trap_terminate(&block)
- trap_signals(TERMINATE_SIGNALS, &block)
- end
-
- def self.trap_forward(&block)
- trap_signals(FORWARD_SIGNALS, &block)
- end
-
- def self.signal(pid, signal)
- Process.kill(signal, pid)
- true
- rescue Errno::ESRCH
- false
- end
-
- def self.signal_processes(pids, signal)
- pids.each { |pid| signal(pid, signal) }
- end
-
- # Starts Sidekiq workers for the pairs of processes.
- #
- # Example:
- #
- # start([ ['foo'], ['bar', 'baz'] ], :production)
- #
- # This would start two Sidekiq processes: one processing "foo", and one
- # processing "bar" and "baz". Each one is placed in its own process group.
- #
- # queues - An Array containing Arrays. Each sub Array should specify the
- # queues to use for a single process.
- #
- # directory - The directory of the Rails application.
- #
- # Returns an Array containing the PIDs of the started processes.
- def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, min_concurrency: 0, timeout: CLI::DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
- queues.map.with_index do |pair, index|
- start_sidekiq(pair, env: env,
- directory: directory,
- max_concurrency: max_concurrency,
- min_concurrency: min_concurrency,
- worker_id: index,
- timeout: timeout,
- dryrun: dryrun)
- end
- end
-
- # Starts a Sidekiq process that processes _only_ the given queues.
- #
- # Returns the PID of the started process.
- def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, worker_id:, timeout:, dryrun:)
- counts = count_by_queue(queues)
-
- cmd = %w[bundle exec sidekiq]
- cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency)}"
- cmd << "-e#{env}"
- cmd << "-t#{timeout}"
- cmd << "-gqueues:#{proc_details(counts)}"
- cmd << "-r#{directory}"
-
- counts.each do |queue, count|
- cmd << "-q#{queue},#{count}"
- end
-
- if dryrun
- puts Shellwords.join(cmd) # rubocop:disable Rails/Output
- return
- end
-
- pid = Process.spawn(
- { 'ENABLE_SIDEKIQ_CLUSTER' => '1',
- 'SIDEKIQ_WORKER_ID' => worker_id.to_s },
- *cmd,
- pgroup: true,
- err: $stderr,
- out: $stdout
- )
-
- wait_async(pid)
-
- pid
- end
-
- def self.count_by_queue(queues)
- queues.tally
- end
-
- def self.proc_details(counts)
- counts.map do |queue, count|
- if count == 1
- queue
- else
- "#{queue} (#{count})"
- end
- end.join(',')
- end
-
- def self.concurrency(queues, min_concurrency, max_concurrency)
- concurrency_from_queues = queues.length + 1
- max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues
- min = [min_concurrency, max].min
-
- concurrency_from_queues.clamp(min, max)
- end
-
- # Waits for the given process to complete using a separate thread.
- def self.wait_async(pid)
- Thread.new do
- Process.wait(pid) rescue Errno::ECHILD
- end
- end
-
- # Returns true if all the processes are alive.
- def self.all_alive?(pids)
- pids.each do |pid|
- return false unless process_alive?(pid)
- end
-
- true
- end
-
- def self.any_alive?(pids)
- pids_alive(pids).any?
- end
-
- def self.pids_alive(pids)
- pids.select { |pid| process_alive?(pid) }
- end
-
- def self.process_alive?(pid)
- # Signal 0 tests whether the process exists and we have access to send signals
- # but is otherwise a noop (doesn't actually send a signal to the process)
- signal(pid, 0)
- end
-
- def self.write_pid(path)
- File.open(path, 'w') do |handle|
- handle.write(Process.pid.to_s)
- end
- end
- end
-end