Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2020-06-18 14:18:50 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2020-06-18 14:18:50 +0300
commit8c7f4e9d5f36cff46365a7f8c4b9c21578c1e781 (patch)
treea77e7fe7a93de11213032ed4ab1f33a3db51b738 /lib/gitlab/sidekiq_middleware
parent00b35af3db1abfe813a778f643dad221aad51fca (diff)
Add latest changes from gitlab-org/gitlab@13-1-stable-ee
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb3
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb45
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb26
-rw-r--r--lib/gitlab/sidekiq_middleware/server_metrics.rb42
4 files changed, 89 insertions, 27 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb
index ddd1b91410b..bb0c18735bb 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb
@@ -5,9 +5,6 @@ module Gitlab
module DuplicateJobs
class Client
def call(worker_class, job, queue, _redis_pool, &block)
- # We don't try to deduplicate jobs that are scheduled in the future
- return yield if job['at']
-
DuplicateJob.new(job, queue).schedule(&block)
end
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
index fa742d07af2..0dc53c61e84 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -18,13 +18,13 @@ module Gitlab
# When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob
DUPLICATE_KEY_TTL = 6.hours
+ DEFAULT_STRATEGY = :until_executing
attr_reader :existing_jid
- def initialize(job, queue_name, strategy: :until_executing)
+ def initialize(job, queue_name)
@job = job
@queue_name = queue_name
- @strategy = strategy
end
# This will continue the middleware chain if the job should be scheduled
@@ -41,12 +41,12 @@ module Gitlab
end
# This method will return the jid that was set in redis
- def check!
+ def check!(expiry = DUPLICATE_KEY_TTL)
read_jid = nil
Sidekiq.redis do |redis|
redis.multi do |multi|
- redis.set(idempotency_key, jid, ex: DUPLICATE_KEY_TTL, nx: true)
+ redis.set(idempotency_key, jid, ex: expiry, nx: true)
read_jid = redis.get(idempotency_key)
end
end
@@ -60,6 +60,10 @@ module Gitlab
end
end
+ def scheduled?
+ scheduled_at.present?
+ end
+
def duplicate?
raise "Call `#check!` first to check for existing duplicates" unless existing_jid
@@ -67,14 +71,36 @@ module Gitlab
end
def droppable?
- idempotent? && duplicate? && ::Feature.disabled?("disable_#{queue_name}_deduplication")
+ idempotent? && ::Feature.disabled?("disable_#{queue_name}_deduplication")
+ end
+
+ def scheduled_at
+ job['at']
+ end
+
+ def options
+ return {} unless worker_klass
+ return {} unless worker_klass.respond_to?(:get_deduplication_options)
+
+ worker_klass.get_deduplication_options
end
private
- attr_reader :queue_name, :strategy, :job
+ attr_reader :queue_name, :job
attr_writer :existing_jid
+ def worker_klass
+ @worker_klass ||= worker_class_name.to_s.safe_constantize
+ end
+
+ def strategy
+ return DEFAULT_STRATEGY unless worker_klass
+ return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
+
+ worker_klass.get_deduplicate_strategy
+ end
+
def worker_class_name
job['class']
end
@@ -104,11 +130,10 @@ module Gitlab
end
def idempotent?
- worker_class = worker_class_name.to_s.safe_constantize
- return false unless worker_class
- return false unless worker_class.respond_to?(:idempotent?)
+ return false unless worker_klass
+ return false unless worker_klass.respond_to?(:idempotent?)
- worker_class.idempotent?
+ worker_klass.idempotent?
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
index 674e436b714..0ed4912c4cc 100644
--- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
@@ -13,13 +13,13 @@ module Gitlab
end
def schedule(job)
- if duplicate_job.check! && duplicate_job.duplicate?
+ if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
- end
- if duplicate_job.droppable?
- Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing")
- return false
+ if duplicate_job.droppable?
+ Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing")
+ return false
+ end
end
yield
@@ -34,6 +34,22 @@ module Gitlab
private
attr_reader :duplicate_job
+
+ def deduplicatable_job?
+ !duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
+ end
+
+ def check!
+ duplicate_job.check!(expiry)
+ end
+
+ def expiry
+ return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
+
+ time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
+
+ time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
+ end
end
end
end
diff --git a/lib/gitlab/sidekiq_middleware/server_metrics.rb b/lib/gitlab/sidekiq_middleware/server_metrics.rb
index 61ed2fe1a06..6a942a6ce06 100644
--- a/lib/gitlab/sidekiq_middleware/server_metrics.rb
+++ b/lib/gitlab/sidekiq_middleware/server_metrics.rb
@@ -47,6 +47,10 @@ module Gitlab
@metrics[:sidekiq_jobs_completion_seconds].observe(labels, monotonic_time)
@metrics[:sidekiq_jobs_db_seconds].observe(labels, ActiveRecord::LogSubscriber.runtime / 1000)
@metrics[:sidekiq_jobs_gitaly_seconds].observe(labels, get_gitaly_time(job))
+ @metrics[:sidekiq_redis_requests_total].increment(labels, get_redis_calls(job))
+ @metrics[:sidekiq_redis_requests_duration_seconds].observe(labels, get_redis_time(job))
+ @metrics[:sidekiq_elasticsearch_requests_total].increment(labels, get_elasticsearch_calls(job))
+ @metrics[:sidekiq_elasticsearch_requests_duration_seconds].observe(labels, get_elasticsearch_time(job))
end
end
@@ -54,15 +58,19 @@ module Gitlab
def init_metrics
{
- sidekiq_jobs_cpu_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_cpu_seconds, 'Seconds of cpu time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
- sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
- sidekiq_jobs_db_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_db_seconds, 'Seconds of database time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
- sidekiq_jobs_gitaly_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_gitaly_seconds, 'Seconds of Gitaly time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
- sidekiq_jobs_queue_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_queue_duration_seconds, 'Duration in seconds that a Sidekiq job was queued before being executed', {}, SIDEKIQ_LATENCY_BUCKETS),
- sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'),
- sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'),
- sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :all),
- sidekiq_concurrency: ::Gitlab::Metrics.gauge(:sidekiq_concurrency, 'Maximum number of Sidekiq jobs', {}, :all)
+ sidekiq_jobs_cpu_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_cpu_seconds, 'Seconds of cpu time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
+ sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
+ sidekiq_jobs_db_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_db_seconds, 'Seconds of database time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
+ sidekiq_jobs_gitaly_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_gitaly_seconds, 'Seconds of Gitaly time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS),
+ sidekiq_jobs_queue_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_queue_duration_seconds, 'Duration in seconds that a Sidekiq job was queued before being executed', {}, SIDEKIQ_LATENCY_BUCKETS),
+ sidekiq_redis_requests_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_redis_requests_duration_seconds, 'Duration in seconds that a Sidekiq job spent requests a Redis server', {}, Gitlab::Instrumentation::Redis::QUERY_TIME_BUCKETS),
+ sidekiq_elasticsearch_requests_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_elasticsearch_requests_duration_seconds, 'Duration in seconds that a Sidekiq job spent in requests to an Elasticsearch server', {}, SIDEKIQ_LATENCY_BUCKETS),
+ sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'),
+ sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'),
+ sidekiq_redis_requests_total: ::Gitlab::Metrics.counter(:sidekiq_redis_requests_total, 'Redis requests during a Sidekiq job execution'),
+ sidekiq_elasticsearch_requests_total: ::Gitlab::Metrics.counter(:sidekiq_elasticsearch_requests_total, 'Elasticsearch requests during a Sidekiq job execution'),
+ sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :all),
+ sidekiq_concurrency: ::Gitlab::Metrics.gauge(:sidekiq_concurrency, 'Maximum number of Sidekiq jobs', {}, :all)
}
end
@@ -70,6 +78,22 @@ module Gitlab
defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0
end
+ def get_redis_time(job)
+ job.fetch(:redis_duration_s, 0)
+ end
+
+ def get_redis_calls(job)
+ job.fetch(:redis_calls, 0)
+ end
+
+ def get_elasticsearch_time(job)
+ job.fetch(:elasticsearch_duration_s, 0)
+ end
+
+ def get_elasticsearch_calls(job)
+ job.fetch(:elasticsearch_calls, 0)
+ end
+
def get_gitaly_time(job)
job.fetch(:gitaly_duration_s, 0)
end