Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2020-02-27 21:09:21 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2020-02-27 21:09:21 +0300
commite0fa0638a422c3e20d4423c9bb69d79afc9c7d3d (patch)
tree9abb3c0706576bbda895fe9539a55556930606e2 /lib/gitlab/sidekiq_middleware
parentf8d15ca65390475e356b06dedc51e10ccd179f86 (diff)
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb13
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb104
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb13
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies.rb21
-rw-r--r--lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb36
5 files changed, 187 insertions, 0 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb
new file mode 100644
index 00000000000..bb0c18735bb
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ module DuplicateJobs
+ class Client
+ def call(worker_class, job, queue, _redis_pool, &block)
+ DuplicateJob.new(job, queue).schedule(&block)
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
new file mode 100644
index 00000000000..b84673c4cee
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
@@ -0,0 +1,104 @@
+# frozen_string_literal: true
+
+require 'digest'
+
+module Gitlab
+ module SidekiqMiddleware
+ module DuplicateJobs
+ # This class defines an identifier of a job in a queue
+ # The identifier based on a job's class and arguments.
+ #
+ # As strategy decides when to keep track of the job in redis and when to
+ # remove it.
+ #
+ # Storing the deduplication key in redis can be done by calling `check!`
+ # check returns the `jid` of the job if it was scheduled, or the `jid` of
+ # the duplicate job if it was already scheduled
+ #
+ # When new jobs can be scheduled again, the strategy calls `#delete`.
+ class DuplicateJob
+ DUPLICATE_KEY_TTL = 6.hours
+
+ attr_reader :existing_jid
+
+ def initialize(job, queue_name, strategy: :until_executing)
+ @job = job
+ @queue_name = queue_name
+ @strategy = strategy
+ end
+
+ # This will continue the middleware chain if the job should be scheduled
+ # It will return false if the job needs to be cancelled
+ def schedule(&block)
+ Strategies.for(strategy).new(self).schedule(job, &block)
+ end
+
+ # This will continue the server middleware chain if the job should be
+ # executed.
+ # It will return false if the job should not be executed.
+ def perform(&block)
+ Strategies.for(strategy).new(self).perform(job, &block)
+ end
+
+ # This method will return the jid that was set in redis
+ def check!
+ read_jid = nil
+
+ Sidekiq.redis do |redis|
+ redis.multi do |multi|
+ redis.set(idempotency_key, jid, ex: DUPLICATE_KEY_TTL, nx: true)
+ read_jid = redis.get(idempotency_key)
+ end
+ end
+
+ self.existing_jid = read_jid.value
+ end
+
+ def delete!
+ Sidekiq.redis do |redis|
+ redis.del(idempotency_key)
+ end
+ end
+
+ def duplicate?
+ raise "Call `#check!` first to check for existing duplicates" unless existing_jid
+
+ jid != existing_jid
+ end
+
+ private
+
+ attr_reader :queue_name, :strategy, :job
+ attr_writer :existing_jid
+
+ def worker_class_name
+ job['class']
+ end
+
+ def arguments
+ job['args']
+ end
+
+ def jid
+ job['jid']
+ end
+
+ def idempotency_key
+ @idempotency_key ||= "#{namespace}:#{idempotency_hash}"
+ end
+
+ def idempotency_hash
+ Digest::SHA256.hexdigest(idempotency_string)
+ end
+
+ def namespace
+ "#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue_name}"
+ end
+
+ def idempotency_string
+ "#{worker_class_name}:#{arguments.join('-')}"
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb
new file mode 100644
index 00000000000..a35edc5774e
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/server.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ module DuplicateJobs
+ class Server
+ def call(worker, job, queue, &block)
+ DuplicateJob.new(job, queue).perform(&block)
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies.rb
new file mode 100644
index 00000000000..a08310a58ff
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ module DuplicateJobs
+ module Strategies
+ UnknownStrategyError = Class.new(StandardError)
+
+ STRATEGIES = {
+ until_executing: UntilExecuting
+ }.freeze
+
+ def self.for(name)
+ STRATEGIES.fetch(name)
+ rescue KeyError
+ raise UnknownStrategyError, "Unknown deduplication strategy #{name}"
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
new file mode 100644
index 00000000000..b8f49b67a59
--- /dev/null
+++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb
@@ -0,0 +1,36 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqMiddleware
+ module DuplicateJobs
+ module Strategies
+ # This strategy takes a lock before scheduling the job in a queue and
+ # removes the lock before the job starts allowing a new job to be queued
+ # while a job is still executing.
+ class UntilExecuting
+ def initialize(duplicate_job)
+ @duplicate_job = duplicate_job
+ end
+
+ def schedule(job)
+ if duplicate_job.check! && duplicate_job.duplicate?
+ job['duplicate-of'] = duplicate_job.existing_jid
+ end
+
+ yield
+ end
+
+ def perform(_job)
+ duplicate_job.delete!
+
+ yield
+ end
+
+ private
+
+ attr_reader :duplicate_job
+ end
+ end
+ end
+ end
+end