Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'spec/support/helpers/concurrent_helpers.rb')
-rw-r--r--spec/support/helpers/concurrent_helpers.rb40
1 files changed, 40 insertions, 0 deletions
diff --git a/spec/support/helpers/concurrent_helpers.rb b/spec/support/helpers/concurrent_helpers.rb
new file mode 100644
index 00000000000..4eecc2133e7
--- /dev/null
+++ b/spec/support/helpers/concurrent_helpers.rb
@@ -0,0 +1,40 @@
+# frozen_string_literal: true
+
+module ConcurrentHelpers
+ Cancelled = Class.new(StandardError)
+
+ # To test for contention, we may need to run some actions in parallel. This
+ # helper takes an array of blocks and schedules them all on different threads
+ # in a fixed-size thread pool.
+ #
+ # @param [Array[Proc]] blocks
+ # @param [Integer] task_wait_time: time to wait for each task (upper bound on
+ # reasonable task execution time)
+ # @param [Integer] max_concurrency: maximum number of tasks to run at once
+ #
+ def run_parallel(blocks, task_wait_time: 20.seconds, max_concurrency: Concurrent.processor_count - 1)
+ thread_pool = Concurrent::FixedThreadPool.new(
+ [2, max_concurrency].max, { max_queue: blocks.size }
+ )
+ opts = { executor: thread_pool }
+
+ error = Concurrent::MVar.new
+
+ blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
+ future.wait(task_wait_time)
+
+ if future.complete?
+ error.put(future.reason) if future.reason && error.empty?
+ else
+ future.cancel
+ error.put(Cancelled.new) if error.empty?
+ end
+ end
+
+ raise error.take if error.full?
+ ensure
+ thread_pool.shutdown
+ thread_pool.wait_for_termination(10)
+ thread_pool.kill if thread_pool.running?
+ end
+end