Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'app/services/ci/pipeline_processing/atomic_processing_service.rb')
-rw-r--r--app/services/ci/pipeline_processing/atomic_processing_service.rb46
1 files changed, 43 insertions, 3 deletions
diff --git a/app/services/ci/pipeline_processing/atomic_processing_service.rb b/app/services/ci/pipeline_processing/atomic_processing_service.rb
index 1094a131e68..c0ffbb401f6 100644
--- a/app/services/ci/pipeline_processing/atomic_processing_service.rb
+++ b/app/services/ci/pipeline_processing/atomic_processing_service.rb
@@ -22,9 +22,19 @@ module Ci
# Run the process only if we can obtain an exclusive lease; returns nil if lease is unavailable
success = try_obtain_lease { process! }
- # Re-schedule if we need further processing
- if success && pipeline.needs_processing?
- PipelineProcessWorker.perform_async(pipeline.id)
+ if success
+ if ::Feature.enabled?(:ci_reset_skipped_jobs_in_atomic_processing, project)
+ # If any jobs changed from stopped to alive status during pipeline processing, we must
+ # re-reset their dependent jobs; see https://gitlab.com/gitlab-org/gitlab/-/issues/388539.
+ new_alive_jobs.group_by(&:user).each do |user, jobs|
+ log_running_reset_skipped_jobs_service(jobs)
+
+ ResetSkippedJobsService.new(project, user).execute(jobs)
+ end
+ end
+
+ # Re-schedule if we need further processing
+ PipelineProcessWorker.perform_async(pipeline.id) if pipeline.needs_processing?
end
success
@@ -105,6 +115,25 @@ module Ci
end
end
+ # Gets the jobs that changed from stopped to alive status since the initial status collection
+ # was evaluated. We determine this by checking if their current status is no longer stopped.
+ def new_alive_jobs
+ initial_stopped_job_names = @collection.stopped_job_names
+
+ return [] if initial_stopped_job_names.empty?
+
+ new_collection = AtomicProcessingService::StatusCollection.new(pipeline)
+ new_alive_job_names = initial_stopped_job_names - new_collection.stopped_job_names
+
+ return [] if new_alive_job_names.empty?
+
+ pipeline
+ .current_jobs
+ .by_name(new_alive_job_names)
+ .preload(:user) # rubocop: disable CodeReuse/ActiveRecord
+ .to_a
+ end
+
def project
pipeline.project
end
@@ -116,6 +145,17 @@ module Ci
def lease_timeout
DEFAULT_LEASE_TIMEOUT
end
+
+ def log_running_reset_skipped_jobs_service(jobs)
+ Gitlab::AppJsonLogger.info(
+ class: self.class.name.to_s,
+ message: 'Running ResetSkippedJobsService on new alive jobs',
+ project_id: project.id,
+ pipeline_id: pipeline.id,
+ user_id: jobs.first.user.id,
+ jobs_count: jobs.count
+ )
+ end
end
end
end