diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware/size_limiter')
3 files changed, 129 insertions, 32 deletions
diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb b/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb new file mode 100644 index 00000000000..bce295d8ba5 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/size_limiter/compressor.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module SizeLimiter + class Compressor + PayloadDecompressionConflictError = Class.new(StandardError) + PayloadDecompressionError = Class.new(StandardError) + + # Level 5 is a good trade-off between space and time + # https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1054#note_568129605 + COMPRESS_LEVEL = 5 + ORIGINAL_SIZE_KEY = 'original_job_size_bytes' + COMPRESSED_KEY = 'compressed' + + def self.compressed?(job) + job&.has_key?(COMPRESSED_KEY) + end + + def self.compress(job, job_args) + compressed_args = Base64.strict_encode64(Zlib::Deflate.deflate(job_args, COMPRESS_LEVEL)) + + job[COMPRESSED_KEY] = true + job[ORIGINAL_SIZE_KEY] = job_args.bytesize + job['args'] = [compressed_args] + + compressed_args + end + + def self.decompress(job) + return unless compressed?(job) + + validate_args!(job) + + job.except!(ORIGINAL_SIZE_KEY, COMPRESSED_KEY) + job['args'] = Sidekiq.load_json(Zlib::Inflate.inflate(Base64.strict_decode64(job['args'].first))) + rescue Zlib::Error + raise PayloadDecompressionError, 'Fail to decompress Sidekiq job payload' + end + + def self.validate_args!(job) + if job['args'] && job['args'].length != 1 + exception = PayloadDecompressionConflictError.new('Sidekiq argument list should include 1 argument.\ + This means that there is another a middleware interfering with the job payload.\ + That conflicts with the payload compressor') + ::Gitlab::ErrorTracking.track_and_raise_exception(exception) + end + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/server.rb b/lib/gitlab/sidekiq_middleware/size_limiter/server.rb new file mode 100644 index 00000000000..70b384c8f28 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/size_limiter/server.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module SizeLimiter + class Server + def call(worker, job, queue) + # This middleware should always decompress jobs regardless of the + # limiter mode or size limit. Otherwise, this could leave compressed + # payloads in queues that are then not able to be processed. + ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.decompress(job) + + yield + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb index 2c50c4a2157..d86f1609f14 100644 --- a/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb +++ b/lib/gitlab/sidekiq_middleware/size_limiter/validator.rb @@ -3,76 +3,103 @@ module Gitlab module SidekiqMiddleware module SizeLimiter - # Validate a Sidekiq job payload limit based on current configuration. + # Handle a Sidekiq job payload limit based on current configuration. # This validator pulls the configuration from the environment variables: - # # - GITLAB_SIDEKIQ_SIZE_LIMITER_MODE: the current mode of the size - # limiter. This must be either `track` or `raise`. - # + # limiter. This must be either `track` or `compress`. + # - GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES: the + # threshold before the input job payload is compressed. # - GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES: the size limit in bytes. # - # If the size of job payload after serialization exceeds the limit, an - # error is tracked raised adhering to the mode. + # In track mode, if a job payload limit exceeds the size limit, an + # event is sent to Sentry and the job is scheduled like normal. + # + # In compress mode, if a job payload limit exceeds the threshold, it is + # then compressed. If the compressed payload still exceeds the limit, the + # job is discarded, and a ExceedLimitError exception is raised. class Validator def self.validate!(worker_class, job) new(worker_class, job).validate! end DEFAULT_SIZE_LIMIT = 0 + DEFAULT_COMPRESION_THRESHOLD_BYTES = 100_000 # 100kb MODES = [ TRACK_MODE = 'track', - RAISE_MODE = 'raise' + COMPRESS_MODE = 'compress' ].freeze - attr_reader :mode, :size_limit + attr_reader :mode, :size_limit, :compression_threshold def initialize( worker_class, job, mode: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_MODE'], + compression_threshold: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES'], size_limit: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES'] ) @worker_class = worker_class @job = job + set_mode(mode) + set_compression_threshold(compression_threshold) + set_size_limit(size_limit) + end + + def validate! + return unless @size_limit > 0 + return if allow_big_payload? + + job_args = compress_if_necessary(::Sidekiq.dump_json(@job['args'])) + return if job_args.bytesize <= @size_limit + + exception = exceed_limit_error(job_args) + if compress_mode? + raise exception + else + track(exception) + end + end + + private + + def set_mode(mode) @mode = (mode || TRACK_MODE).to_s.strip unless MODES.include?(@mode) ::Sidekiq.logger.warn "Invalid Sidekiq size limiter mode: #{@mode}. Fallback to #{TRACK_MODE} mode." @mode = TRACK_MODE end + end + + def set_compression_threshold(compression_threshold) + @compression_threshold = (compression_threshold || DEFAULT_COMPRESION_THRESHOLD_BYTES).to_i + if @compression_threshold <= 0 + ::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}" + @compression_threshold = DEFAULT_COMPRESION_THRESHOLD_BYTES + end + end + def set_size_limit(size_limit) @size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i if @size_limit < 0 ::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}" end end - def validate! - return unless @size_limit > 0 - - return if allow_big_payload? - return if job_size <= @size_limit - - exception = ExceedLimitError.new(@worker_class, job_size, @size_limit) - # This should belong to Gitlab::ErrorTracking. We'll remove this - # after this epic is done: - # https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396 - exception.set_backtrace(backtrace) - - if raise_mode? - raise exception - else - track(exception) + def exceed_limit_error(job_args) + ExceedLimitError.new(@worker_class, job_args.bytesize, @size_limit).tap do |exception| + # This should belong to Gitlab::ErrorTracking. We'll remove this + # after this epic is done: + # https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396 + exception.set_backtrace(backtrace) end end - private + def compress_if_necessary(job_args) + return job_args unless compress_mode? + return job_args if job_args.bytesize < @compression_threshold - def job_size - # This maynot be the optimal solution, but can be acceptable solution - # for now. Internally, Sidekiq calls Sidekiq.dump_json everywhere. - # There is no clean way to intefere to prevent double serialization. - @job_size ||= ::Sidekiq.dump_json(@job).bytesize + ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.compress(@job, job_args) end def allow_big_payload? @@ -80,8 +107,8 @@ module Gitlab worker_class.respond_to?(:big_payload?) && worker_class.big_payload? end - def raise_mode? - @mode == RAISE_MODE + def compress_mode? + @mode == COMPRESS_MODE end def track(exception) |