diff options
author | Douglas Barbosa Alexandre <dbalexandre@gmail.com> | 2017-07-07 05:20:14 +0300 |
---|---|---|
committer | Douglas Barbosa Alexandre <dbalexandre@gmail.com> | 2017-07-07 22:07:57 +0300 |
commit | 8c37d0afef6f75341d19220f9d6b13f4b3409b09 (patch) | |
tree | 4ea45daf0c6a0cc17b0aaafeb649a9afda4f84f7 | |
parent | 5c0b6314683c190062557b5401e768aa41586c1c (diff) |
Limit the number of concurrent projects that can be synchronized
-rw-r--r-- | app/workers/geo_repository_sync_worker.rb | 107 |
1 file changed, 92 insertions, 15 deletions
diff --git a/app/workers/geo_repository_sync_worker.rb b/app/workers/geo_repository_sync_worker.rb index eab8b867fe2..9e945b5b3b4 100644 --- a/app/workers/geo_repository_sync_worker.rb +++ b/app/workers/geo_repository_sync_worker.rb @@ -2,33 +2,73 @@ class GeoRepositorySyncWorker include Sidekiq::Worker include CronjobQueue - RUN_TIME = 5.minutes.to_i - BATCH_SIZE = 100 + LEASE_KEY = 'geo_repository_sync_worker'.freeze + LEASE_TIMEOUT = 8.hours.freeze + BATCH_SIZE = 1000 BACKOFF_DELAY = 5.minutes + MAX_CAPACITY = 25 + RUN_TIME = 60.minutes.to_i + + def initialize + @pending_projects = [] + @scheduled_jobs = [] + end def perform return unless Gitlab::Geo.secondary_role_enabled? return unless Gitlab::Geo.primary_node.present? - start_time = Time.now - project_ids_not_synced = find_project_ids_not_synced - project_ids_updated_recently = find_project_ids_updated_recently - project_ids = interleave(project_ids_not_synced, project_ids_updated_recently) + logger.info "Started Geo repository sync scheduler" - logger.info "Started Geo repository syncing for #{project_ids.length} project(s)" + @start_time = Time.now - project_ids.each do |project_id| - break if over_time?(start_time) - break unless node_enabled? + # Prevent multiple Sidekiq workers from attempting to schedule projects synchronization + try_obtain_lease do + loop do + break unless node_enabled? - Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now) - end + update_jobs_in_progress + load_pending_projects if reload_queue? + + # If we are still under the limit after refreshing our DB, we can end + # after scheduling the remaining transfers. + last_batch = reload_queue? + + break if over_time? + break unless projects_remain? + + schedule_jobs + + break if last_batch + + sleep(1) + end - logger.info "Finished Geo repository syncing for #{project_ids.length} project(s)" + logger.info "Finished Geo repository sync scheduler" + end end private + def reload_queue? 
+ @pending_projects.size < MAX_CAPACITY + end + + def projects_remain? + @pending_projects.size + end + + def over_time? + Time.now - @start_time >= RUN_TIME + end + + def load_pending_projects + project_ids_not_synced = find_project_ids_not_synced + project_ids_updated_recently = find_project_ids_updated_recently + + @pending_projects = interleave(project_ids_not_synced, project_ids_updated_recently) + end + def find_project_ids_not_synced Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id)) .order(last_repository_updated_at: :desc) @@ -51,8 +91,45 @@ class GeoRepositorySyncWorker end.flatten(1).uniq.compact.take(BATCH_SIZE) end - def over_time?(start_time) - Time.now - start_time >= RUN_TIME + def schedule_jobs + num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_projects.size].min + return unless projects_remain? + + num_to_schedule.times do + project_id = @pending_projects.shift + job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now) + + if job_id + @scheduled_jobs << { id: project_id, job_id: job_id } + end + end + end + + def scheduled_job_ids + @scheduled_jobs.map { |data| data[:job_id] } + end + + def update_jobs_in_progress + status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids) + + # SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise. + # For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ] + # Next, filter out the jobs that have completed. + @scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact + end + + def try_obtain_lease + uuid = Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT).try_obtain + + return unless uuid + + yield + + release_lease(uuid) + end + + def release_lease(uuid) + Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid) end def node_enabled? |