diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
6 files changed, 151 insertions, 52 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index 79ac853ea0c..4cf540ce3b8 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -19,6 +19,7 @@ module Gitlab class DuplicateJob DUPLICATE_KEY_TTL = 6.hours DEFAULT_STRATEGY = :until_executing + STRATEGY_NONE = :none attr_reader :existing_jid @@ -51,6 +52,8 @@ module Gitlab end end + job['idempotency_key'] = idempotency_key + self.existing_jid = read_jid.value end @@ -100,6 +103,7 @@ module Gitlab def strategy return DEFAULT_STRATEGY unless worker_klass return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?) + return STRATEGY_NONE unless worker_klass.deduplication_enabled? worker_klass.get_deduplicate_strategy end @@ -117,7 +121,7 @@ module Gitlab end def idempotency_key - @idempotency_key ||= "#{namespace}:#{idempotency_hash}" + @idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}" end def idempotency_hash @@ -129,6 +133,10 @@ module Gitlab end def idempotency_string + # TODO: dump the argument's JSON using `Sidekiq.dump_json` instead + # this should be done in the next release so all jobs are written + # with their idempotency key. + # see https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1090 "#{worker_class_name}:#{arguments.join('-')}" end end diff --git a/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb b/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb index b542aa4fe4c..1f0c63c5fff 100644 --- a/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb +++ b/lib/gitlab/sidekiq_middleware/instrumentation_logger.rb @@ -3,24 +3,6 @@ module Gitlab module SidekiqMiddleware class InstrumentationLogger - def self.keys - @keys ||= [ - :cpu_s, - :gitaly_calls, - :gitaly_duration_s, - :rugged_calls, - :rugged_duration_s, - :elasticsearch_calls, - :elasticsearch_duration_s, - :elasticsearch_timed_out_count, - *::Gitlab::Memory::Instrumentation::KEY_MAPPING.values, - *::Gitlab::Instrumentation::Redis.known_payload_keys, - *::Gitlab::Metrics::Subscribers::ActiveRecord.known_payload_keys, - *::Gitlab::Metrics::Subscribers::ExternalHttp::KNOWN_PAYLOAD_KEYS, - *::Gitlab::Metrics::Subscribers::RackAttack::PAYLOAD_KEYS - ] - end - def call(worker, job, queue) ::Gitlab::InstrumentationHelper.init_instrumentation_data @@ -37,7 +19,6 @@ module Gitlab # https://github.com/mperham/sidekiq/blob/53bd529a0c3f901879925b8390353129c465b1f2/lib/sidekiq/processor.rb#L115-L118 job[:instrumentation] = {}.tap do |instrumentation_values| ::Gitlab::InstrumentationHelper.add_instrumentation_data(instrumentation_values) - instrumentation_values.slice!(*self.class.keys) end end end diff --git a/lib/gitlab/sidekiq_middleware/server_metrics.rb b/lib/gitlab/sidekiq_middleware/server_metrics.rb index 474afffcf93..6d130957f36 100644 --- a/lib/gitlab/sidekiq_middleware/server_metrics.rb +++ b/lib/gitlab/sidekiq_middleware/server_metrics.rb @@ -13,6 +13,10 @@ module Gitlab @metrics = init_metrics @metrics[:sidekiq_concurrency].set({}, Sidekiq.options[:concurrency].to_i) + + if ::Gitlab::Database::LoadBalancing.enable? + @metrics[:sidekiq_load_balancing_count] = ::Gitlab::Metrics.counter(:sidekiq_load_balancing_count, 'Sidekiq jobs with load balancing') + end end def call(worker, job, queue) @@ -69,6 +73,15 @@ module Gitlab @metrics[:sidekiq_redis_requests_duration_seconds].observe(labels, get_redis_time(instrumentation)) @metrics[:sidekiq_elasticsearch_requests_total].increment(labels, get_elasticsearch_calls(instrumentation)) @metrics[:sidekiq_elasticsearch_requests_duration_seconds].observe(labels, get_elasticsearch_time(instrumentation)) + + if ::Gitlab::Database::LoadBalancing.enable? && job[:database_chosen] + load_balancing_labels = { + database_chosen: job[:database_chosen], + data_consistency: job[:data_consistency] + } + + @metrics[:sidekiq_load_balancing_count].increment(labels.merge(load_balancing_labels), 1) + end end end diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb b/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb new file mode 100644 index 00000000000..bce295d8ba5 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module SizeLimiter + class Compressor + PayloadDecompressionConflictError = Class.new(StandardError) + PayloadDecompressionError = Class.new(StandardError) + + # Level 5 is a good trade-off between space and time + # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1054#note_568129605 + COMPRESS_LEVEL = 5 + ORIGINAL_SIZE_KEY = 'original_job_size_bytes' + COMPRESSED_KEY = 'compressed' + + def self.compressed?(job) + job&.has_key?(COMPRESSED_KEY) + end + + def self.compress(job, job_args) + compressed_args = Base64.strict_encode64(Zlib::Deflate.deflate(job_args, COMPRESS_LEVEL)) + + job[COMPRESSED_KEY] = true + job[ORIGINAL_SIZE_KEY] = job_args.bytesize + job['args'] = [compressed_args] + + compressed_args + end + + def self.decompress(job) + return unless compressed?(job) + + validate_args!(job) + + job.except!(ORIGINAL_SIZE_KEY, COMPRESSED_KEY) + job['args'] = Sidekiq.load_json(Zlib::Inflate.inflate(Base64.strict_decode64(job['args'].first))) + rescue Zlib::Error + raise PayloadDecompressionError, 'Fail to decompress Sidekiq job payload' + end + + def self.validate_args!(job) + if job['args'] && job['args'].length != 1 + exception = PayloadDecompressionConflictError.new('Sidekiq argument list should include 1 argument.\ + This means that there is another a middleware interfering with the job payload.\ + That conflicts with the payload compressor') + ::Gitlab::ErrorTracking.track_and_raise_exception(exception) + end + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/server.rb b/lib/gitlab/sidekiq_middleware/size_limiter/server.rb new file mode 100644 index 00000000000..70b384c8f28 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/size_limiter/server.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module SizeLimiter + class Server + def call(worker, job, queue) + # This middleware should always decompress jobs regardless of the + # limiter mode or size limit. Otherwise, this could leave compressed + # payloads in queues that are then not able to be processed. + ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.decompress(job) + + yield + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb index 2c50c4a2157..d86f1609f14 100644 --- a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb +++ b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb @@ -3,76 +3,103 @@ module Gitlab module SidekiqMiddleware module SizeLimiter - # Validate a Sidekiq job payload limit based on current configuration. + # Handle a Sidekiq job payload limit based on current configuration. # This validator pulls the configuration from the environment variables: - # # - GITLAB_SIDEKIQ_SIZE_LIMITER_MODE: the current mode of the size - # limiter. This must be either `track` or `raise`. - # + # limiter. This must be either `track` or `compress`. + # - GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES: the + # threshold before the input job payload is compressed. # - GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES: the size limit in bytes. # - # If the size of job payload after serialization exceeds the limit, an - # error is tracked raised adhering to the mode. + # In track mode, if a job payload limit exceeds the size limit, an + # event is sent to Sentry and the job is scheduled like normal. + # + # In compress mode, if a job payload limit exceeds the threshold, it is + # then compressed. If the compressed payload still exceeds the limit, the + # job is discarded, and a ExceedLimitError exception is raised. class Validator def self.validate!(worker_class, job) new(worker_class, job).validate! end DEFAULT_SIZE_LIMIT = 0 + DEFAULT_COMPRESION_THRESHOLD_BYTES = 100_000 # 100kb MODES = [ TRACK_MODE = 'track', - RAISE_MODE = 'raise' + COMPRESS_MODE = 'compress' ].freeze - attr_reader :mode, :size_limit + attr_reader :mode, :size_limit, :compression_threshold def initialize( worker_class, job, mode: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_MODE'], + compression_threshold: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES'], size_limit: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES'] ) @worker_class = worker_class @job = job + set_mode(mode) + set_compression_threshold(compression_threshold) + set_size_limit(size_limit) + end + + def validate! + return unless @size_limit > 0 + return if allow_big_payload? + + job_args = compress_if_necessary(::Sidekiq.dump_json(@job['args'])) + return if job_args.bytesize <= @size_limit + + exception = exceed_limit_error(job_args) + if compress_mode? + raise exception + else + track(exception) + end + end + + private + + def set_mode(mode) @mode = (mode || TRACK_MODE).to_s.strip unless MODES.include?(@mode) ::Sidekiq.logger.warn "Invalid Sidekiq size limiter mode: #{@mode}. Fallback to #{TRACK_MODE} mode." @mode = TRACK_MODE end + end + + def set_compression_threshold(compression_threshold) + @compression_threshold = (compression_threshold || DEFAULT_COMPRESION_THRESHOLD_BYTES).to_i + if @compression_threshold <= 0 + ::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}" + @compression_threshold = DEFAULT_COMPRESION_THRESHOLD_BYTES + end + end + def set_size_limit(size_limit) @size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i if @size_limit < 0 ::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}" end end - def validate! - return unless @size_limit > 0 - - return if allow_big_payload? - return if job_size <= @size_limit - - exception = ExceedLimitError.new(@worker_class, job_size, @size_limit) - # This should belong to Gitlab::ErrorTracking. We'll remove this - # after this epic is done: - # https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396 - exception.set_backtrace(backtrace) - - if raise_mode? - raise exception - else - track(exception) + def exceed_limit_error(job_args) + ExceedLimitError.new(@worker_class, job_args.bytesize, @size_limit).tap do |exception| + # This should belong to Gitlab::ErrorTracking. We'll remove this + # after this epic is done: + # https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396 + exception.set_backtrace(backtrace) end end - private + def compress_if_necessary(job_args) + return job_args unless compress_mode? + return job_args if job_args.bytesize < @compression_threshold - def job_size - # This maynot be the optimal solution, but can be acceptable solution - # for now. Internally, Sidekiq calls Sidekiq.dump_json everywhere. - # There is no clean way to intefere to prevent double serialization. - @job_size ||= ::Sidekiq.dump_json(@job).bytesize + ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.compress(@job, job_args) end def allow_big_payload? @@ -80,8 +107,8 @@ module Gitlab worker_class.respond_to?(:big_payload?) && worker_class.big_payload? end - def raise_mode? - @mode == RAISE_MODE + def compress_mode? + @mode == COMPRESS_MODE end def track(exception) |