Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDouglas Barbosa Alexandre <dbalexandre@gmail.com>2017-07-07 05:20:14 +0300
committerDouglas Barbosa Alexandre <dbalexandre@gmail.com>2017-07-07 22:07:57 +0300
commit8c37d0afef6f75341d19220f9d6b13f4b3409b09 (patch)
tree4ea45daf0c6a0cc17b0aaafeb649a9afda4f84f7
parent5c0b6314683c190062557b5401e768aa41586c1c (diff)
Limit the number of concurrent projects that can be synchronized
-rw-r--r--app/workers/geo_repository_sync_worker.rb107
1 files changed, 92 insertions, 15 deletions
diff --git a/app/workers/geo_repository_sync_worker.rb b/app/workers/geo_repository_sync_worker.rb
index eab8b867fe2..9e945b5b3b4 100644
--- a/app/workers/geo_repository_sync_worker.rb
+++ b/app/workers/geo_repository_sync_worker.rb
@@ -2,33 +2,73 @@ class GeoRepositorySyncWorker
include Sidekiq::Worker
include CronjobQueue
- RUN_TIME = 5.minutes.to_i
- BATCH_SIZE = 100
+ LEASE_KEY = 'geo_repository_sync_worker'.freeze
+ LEASE_TIMEOUT = 8.hours.freeze
+ BATCH_SIZE = 1000
BACKOFF_DELAY = 5.minutes
+ MAX_CAPACITY = 25
+ RUN_TIME = 60.minutes.to_i
+
+ def initialize
+ @pending_projects = []
+ @scheduled_jobs = []
+ end
def perform
return unless Gitlab::Geo.secondary_role_enabled?
return unless Gitlab::Geo.primary_node.present?
- start_time = Time.now
- project_ids_not_synced = find_project_ids_not_synced
- project_ids_updated_recently = find_project_ids_updated_recently
- project_ids = interleave(project_ids_not_synced, project_ids_updated_recently)
+ logger.info "Started Geo repository sync scheduler"
- logger.info "Started Geo repository syncing for #{project_ids.length} project(s)"
+ @start_time = Time.now
- project_ids.each do |project_id|
- break if over_time?(start_time)
- break unless node_enabled?
+ # Prevent multiple Sidekiq workers from attempting to schedule projects synchronization
+ try_obtain_lease do
+ loop do
+ break unless node_enabled?
- Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
- end
+ update_jobs_in_progress
+ load_pending_projects if reload_queue?
+
+ # If we are still under the limit after refreshing our DB, we can end
+ # after scheduling the remaining transfers.
+ last_batch = reload_queue?
+
+ break if over_time?
+ break unless projects_remain?
+
+ schedule_jobs
+
+ break if last_batch
+
+ sleep(1)
+ end
- logger.info "Finished Geo repository syncing for #{project_ids.length} project(s)"
+ logger.info "Finished Geo repository sync scheduler"
+ end
end
private
+ def reload_queue?
+ @pending_projects.size < MAX_CAPACITY
+ end
+
+ def projects_remain?
+ @pending_projects.size
+ end
+
+ def over_time?
+ Time.now - @start_time >= RUN_TIME
+ end
+
+ def load_pending_projects
+ project_ids_not_synced = find_project_ids_not_synced
+ project_ids_updated_recently = find_project_ids_updated_recently
+
+ @pending_projects = interleave(project_ids_not_synced, project_ids_updated_recently)
+ end
+
def find_project_ids_not_synced
Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id))
.order(last_repository_updated_at: :desc)
@@ -51,8 +91,45 @@ class GeoRepositorySyncWorker
end.flatten(1).uniq.compact.take(BATCH_SIZE)
end
- def over_time?(start_time)
- Time.now - start_time >= RUN_TIME
+ def schedule_jobs
+ num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_projects.size].min
+ return unless projects_remain?
+
+ num_to_schedule.times do
+ project_id = @pending_projects.shift
+ job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
+
+ if job_id
+ @scheduled_jobs << { id: project_id, job_id: job_id }
+ end
+ end
+ end
+
+ def scheduled_job_ids
+ @scheduled_jobs.map { |data| data[:job_id] }
+ end
+
+ def update_jobs_in_progress
+ status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
+
+ # SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
+ # For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
+ # Next, filter out the jobs that have completed.
+ @scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
+ end
+
+ def try_obtain_lease
+ uuid = Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT).try_obtain
+
+ return unless uuid
+
+ yield
+
+ release_lease(uuid)
+ end
+
+ def release_lease(uuid)
+ Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?