Welcome to mirror list, hosted at ThFree Co, Russian Federation.

sidekiq_cluster.rb « sidekiq_cluster - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: c68cbe7c16304ffff1ed91c695fe44a49bad4a24 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# frozen_string_literal: true

require_relative '../lib/gitlab/process_management'

module Gitlab
  module SidekiqCluster
    # How long to wait when asking for a clean termination.
    # It maps the Sidekiq default timeout:
    # https://github.com/mperham/sidekiq/wiki/Signals#term
    #
    # This value is passed to Sidekiq's `-t` if none
    # is given through arguments.
    DEFAULT_SOFT_TIMEOUT_SECONDS = 25

    # Additional time granted after surpassing the soft timeout
    # before we kill the process.
    TIMEOUT_GRACE_PERIOD_SECONDS = 5

    # The singleton instance used to supervise cluster processes.
    SidekiqProcessSupervisor = Class.new(Gitlab::ProcessSupervisor)

    # Starts Sidekiq workers for the pairs of processes.
    #
    # Example:
    #
    #     start([ ['foo'], ['bar', 'baz'] ], :production)
    #
    # This would start two Sidekiq processes: one processing "foo", and one
    # processing "bar" and "baz". Each one is placed in its own process group.
    #
    # queues - An Array containing Arrays. Each sub Array should specify the
    #          queues to use for a single process.
    #
    # directory - The directory of the Rails application.
    #
    # Returns an Array containing the PIDs of the started processes.
    def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 50, min_concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
      queues.map.with_index do |pair, index|
        start_sidekiq(pair, env: env,
                            directory: directory,
                            max_concurrency: max_concurrency,
                            min_concurrency: min_concurrency,
                            worker_id: index,
                            timeout: timeout,
                            dryrun: dryrun)
      end
    end

    # Starts a Sidekiq process that processes _only_ the given queues.
    #
    # Returns the PID of the started process.
    def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurrency:, worker_id:, timeout:, dryrun:)
      counts = count_by_queue(queues)

      cmd = %w[bundle exec sidekiq]
      cmd << "-c#{self.concurrency(queues, min_concurrency, max_concurrency)}"
      cmd << "-e#{env}"
      cmd << "-t#{timeout}"
      cmd << "-gqueues:#{proc_details(counts)}"
      cmd << "-r#{directory}"

      counts.each do |queue, count|
        cmd << "-q#{queue},#{count}"
      end

      if dryrun
        puts Shellwords.join(cmd) # rubocop:disable Rails/Output
        return
      end

      # We need to remove Bundler specific env vars, since otherwise the
      # child process will think we are passing an alternative Gemfile
      # and will clear and reset LOAD_PATH.
      pid = Bundler.with_original_env do
        Process.spawn(
          { 'ENABLE_SIDEKIQ_CLUSTER' => '1',
            'SIDEKIQ_WORKER_ID' => worker_id.to_s },
          *cmd,
          pgroup: true,
          err: $stderr,
          out: $stdout
        )
      end

      ProcessManagement.wait_async(pid)

      pid
    end

    def self.count_by_queue(queues)
      queues.tally
    end

    def self.proc_details(counts)
      counts.map do |queue, count|
        if count == 1
          queue
        else
          "#{queue} (#{count})"
        end
      end.join(',')
    end

    def self.concurrency(queues, min_concurrency, max_concurrency)
      concurrency_from_queues = queues.length + 1
      max = max_concurrency > 0 ? max_concurrency : concurrency_from_queues
      min = [min_concurrency, max].min

      concurrency_from_queues.clamp(min, max)
    end
  end
end