author    GitLab Bot <gitlab-bot@gitlab.com>  2021-11-18 16:16:36 +0300
committer GitLab Bot <gitlab-bot@gitlab.com>  2021-11-18 16:16:36 +0300
commit    311b0269b4eb9839fa63f80c8d7a58f32b8138a0 (patch)
tree      07e7870bca8aed6d61fdcc810731c50d2c40af47 /spec/workers/concerns/application_worker_spec.rb
parent    27909cef6c4170ed9205afa7426b8d3de47cbb0c (diff)

Add latest changes from gitlab-org/gitlab@14-5-stable-ee (v14.5.0-rc42)

Diffstat (limited to 'spec/workers/concerns/application_worker_spec.rb')
 -rw-r--r--  spec/workers/concerns/application_worker_spec.rb | 381
 1 file changed, 330 insertions(+), 51 deletions(-)
diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb
index af038c81b9e..fbf39b3c7cd 100644
--- a/spec/workers/concerns/application_worker_spec.rb
+++ b/spec/workers/concerns/application_worker_spec.rb
@@ -285,48 +285,38 @@ RSpec.describe ApplicationWorker do
     end
   end
-  describe '.bulk_perform_async' do
-    before do
-      stub_const(worker.name, worker)
+  context 'different kinds of push_bulk' do
+    shared_context 'disable the `sidekiq_push_bulk_in_batches` feature flag' do
+      before do
+        stub_feature_flags(sidekiq_push_bulk_in_batches: false)
+      end
     end
-    it 'enqueues jobs in bulk' do
-      Sidekiq::Testing.fake! do
-        worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
-
-        expect(worker.jobs.count).to eq 2
-        expect(worker.jobs).to all(include('enqueued_at'))
+    shared_context 'set safe limit beyond the number of jobs to be enqueued' do
+      before do
+        stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", args.count + 1)
       end
     end
-  end
-  describe '.bulk_perform_in' do
-    before do
-      stub_const(worker.name, worker)
+    shared_context 'set safe limit below the number of jobs to be enqueued' do
+      before do
+        stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", 2)
+      end
     end
-    context 'when delay is valid' do
-      it 'correctly schedules jobs' do
-        Sidekiq::Testing.fake! do
-          worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
+    shared_examples_for 'returns job_id of all enqueued jobs' do
+      let(:job_id_regex) { /[0-9a-f]{12}/ }
-          expect(worker.jobs.count).to eq 2
-          expect(worker.jobs).to all(include('at'))
-        end
-      end
-    end
+      it 'returns job_id of all enqueued jobs' do
+        job_ids = perform_action
-    context 'when delay is invalid' do
-      it 'raises an ArgumentError exception' do
-        expect { worker.bulk_perform_in(-60, [['Foo']]) }
-          .to raise_error(ArgumentError)
+        expect(job_ids.count).to eq(args.count)
+        expect(job_ids).to all(match(job_id_regex))
       end
     end
-    context 'with batches' do
-      let(:batch_delay) { 1.minute }
-
-      it 'correctly schedules jobs' do
+    shared_examples_for 'enqueues the jobs in a batched fashion, with each batch enqueuing jobs as per the set safe limit' do
+      it 'enqueues the jobs in a batched fashion, with each batch enqueuing jobs as per the set safe limit' do
         expect(Sidekiq::Client).to(
           receive(:push_bulk).with(hash_including('args' => [['Foo', [1]], ['Foo', [2]]]))
             .ordered
@@ -337,29 +327,318 @@ RSpec.describe ApplicationWorker do
             .and_call_original)
         expect(Sidekiq::Client).to(
           receive(:push_bulk).with(hash_including('args' => [['Foo', [5]]]))
-          .ordered
-          .and_call_original)
+            .ordered
+            .and_call_original)
-      worker.bulk_perform_in(
-        1.minute,
-        [['Foo', [1]], ['Foo', [2]], ['Foo', [3]], ['Foo', [4]], ['Foo', [5]]],
-        batch_size: 2, batch_delay: batch_delay)
-
-      expect(worker.jobs.count).to eq 5
-      expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at'])
-      expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at'])
-      expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay)
-      expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay)
-    end
-
-    context 'when batch_size is invalid' do
-      it 'raises an ArgumentError exception' do
-        expect do
-          worker.bulk_perform_in(1.minute,
-                                 [['Foo']],
-                                 batch_size: -1, batch_delay: batch_delay)
-        end.to raise_error(ArgumentError)
+        perform_action
+
+        expect(worker.jobs.count).to eq args.count
+        expect(worker.jobs).to all(include('enqueued_at'))
+      end
+    end
+
+    shared_examples_for 'enqueues jobs in one go' do
+      it 'enqueues jobs in one go' do
+        expect(Sidekiq::Client).to(
+          receive(:push_bulk).with(hash_including('args' => args)).once.and_call_original)
+        expect(Sidekiq.logger).not_to receive(:info)
+
+        perform_action
+
+        expect(worker.jobs.count).to eq args.count
+        expect(worker.jobs).to all(include('enqueued_at'))
+      end
+    end
+
+    shared_examples_for 'logs bulk insertions' do
+      it 'logs arguments and job IDs' do
+        worker.log_bulk_perform_async!
+
+        expect(Sidekiq.logger).to(
+          receive(:info).with(hash_including('class' => worker.name, 'args_list' => args)).once.and_call_original)
+        expect(Sidekiq.logger).to(
+          receive(:info).with(hash_including('class' => worker.name, 'jid_list' => anything)).once.and_call_original)
+
+        perform_action
+      end
+    end
+
+    before do
+      stub_const(worker.name, worker)
+    end
+
+    let(:args) do
+      [
+        ['Foo', [1]],
+        ['Foo', [2]],
+        ['Foo', [3]],
+        ['Foo', [4]],
+        ['Foo', [5]]
+      ]
+    end
+
+    describe '.bulk_perform_async' do
+      shared_examples_for 'does not schedule the jobs for any specific time' do
+        it 'does not schedule the jobs for any specific time' do
+          perform_action
+
+          expect(worker.jobs).to all(exclude('at'))
+        end
+      end
+
+      subject(:perform_action) do
+        worker.bulk_perform_async(args)
+      end
+
+      context 'push_bulk in safe limit batches' do
+        context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+          include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+          it_behaves_like 'enqueues jobs in one go'
+          it_behaves_like 'logs bulk insertions'
+          it_behaves_like 'returns job_id of all enqueued jobs'
+          it_behaves_like 'does not schedule the jobs for any specific time'
         end
+
+        context 'when the number of jobs to be enqueued exceeds safe limit' do
+          include_context 'set safe limit below the number of jobs to be enqueued'
+
+          it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueuing jobs as per the set safe limit'
+          it_behaves_like 'returns job_id of all enqueued jobs'
+          it_behaves_like 'does not schedule the jobs for any specific time'
+        end
+
+        context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
+          include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
+
+          context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+            include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'logs bulk insertions'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'does not schedule the jobs for any specific time'
+          end
+
+          context 'when the number of jobs to be enqueued exceeds safe limit' do
+            include_context 'set safe limit below the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'does not schedule the jobs for any specific time'
+          end
+        end
+      end
+    end
+
+    describe '.bulk_perform_in' do
+      context 'without batches' do
+        shared_examples_for 'schedules all the jobs at a specific time' do
+          it 'schedules all the jobs at a specific time' do
+            perform_action
+
+            worker.jobs.each do |job_detail|
+              expect(job_detail['at']).to be_within(3.seconds).of(expected_scheduled_at_time)
+            end
+          end
+        end
+
+        let(:delay) { 3.minutes }
+        let(:expected_scheduled_at_time) { Time.current.to_i + delay.to_i }
+
+        subject(:perform_action) do
+          worker.bulk_perform_in(delay, args)
+        end
+
+        context 'when the scheduled time falls in the past' do
+          let(:delay) { -60 }
+
+          it 'raises an ArgumentError exception' do
+            expect { perform_action }
+              .to raise_error(ArgumentError)
+          end
+        end
+
+        context 'push_bulk in safe limit batches' do
+          context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+            include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time'
+          end
+
+          context 'when the number of jobs to be enqueued exceeds safe limit' do
+            include_context 'set safe limit below the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueuing jobs as per the set safe limit'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time'
+          end
+
+          context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
+            include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
+
+            context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+              include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time'
+            end
+
+            context 'when the number of jobs to be enqueued exceeds safe limit' do
+              include_context 'set safe limit below the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time'
+            end
+          end
+        end
+      end
+
+      context 'with batches' do
+        shared_examples_for 'schedules all the jobs at a specific time, per batch' do
+          it 'schedules all the jobs at a specific time, per batch' do
+            perform_action
+
+            expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at'])
+            expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at'])
+            expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay)
+            expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay)
+          end
+        end
+
+        let(:delay) { 1.minute }
+        let(:batch_size) { 2 }
+        let(:batch_delay) { 10.minutes }
+
+        subject(:perform_action) do
+          worker.bulk_perform_in(delay, args, batch_size: batch_size, batch_delay: batch_delay)
+        end
+
+        context 'when the `batch_size` is invalid' do
+          context 'when `batch_size` is 0' do
+            let(:batch_size) { 0 }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+
+          context 'when `batch_size` is negative' do
+            let(:batch_size) { -3 }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+        end
+
+        context 'when the `batch_delay` is invalid' do
+          context 'when `batch_delay` is 0' do
+            let(:batch_delay) { 0.minutes }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+
+          context 'when `batch_delay` is negative' do
+            let(:batch_delay) { -3.minutes }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+        end
+
+        context 'push_bulk in safe limit batches' do
+          context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+            include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time, per batch'
+          end
+
+          context 'when the number of jobs to be enqueued exceeds safe limit' do
+            include_context 'set safe limit below the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueuing jobs as per the set safe limit'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time, per batch'
+          end
+
+          context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
+            include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
+
+            context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+              include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time, per batch'
+            end
+
+            context 'when the number of jobs to be enqueued exceeds safe limit' do
+              include_context 'set safe limit below the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time, per batch'
+            end
+          end
+        end
+      end
+    end
+  end
+
+  describe '.with_status' do
+    around do |example|
+      Sidekiq::Testing.fake!(&example)
+    end
+
+    context 'when the worker does have status_expiration set' do
+      let(:status_expiration_worker) do
+        Class.new(worker) do
+          sidekiq_options status_expiration: 3
+        end
+      end
+
+      it 'uses status_expiration from the worker' do
+        status_expiration_worker.with_status.perform_async
+
+        expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3)
+        expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1)
+      end
+
+      it 'uses status_expiration from the worker without with_status' do
+        status_expiration_worker.perform_async
+
+        expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3)
+        expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1)
+      end
+    end
+
+    context 'when the worker does not have status_expiration set' do
+      it 'uses the default status_expiration' do
+        worker.with_status.perform_async
+
+        expect(Sidekiq::Queues[worker.queue].first).to include('status_expiration' => Gitlab::SidekiqStatus::DEFAULT_EXPIRATION)
+        expect(Sidekiq::Queues[worker.queue].length).to eq(1)
+      end
+
+      it 'does not set status_expiration without with_status' do
+        worker.perform_async
+
+        expect(Sidekiq::Queues[worker.queue].first).not_to include('status_expiration')
+        expect(Sidekiq::Queues[worker.queue].length).to eq(1)
+      end
     end
   end
 end
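
Note: the new examples above exercise batching logic that lives in app/workers/concerns/application_worker.rb, which this diff does not include. For orientation, here is a minimal sketch of the behavior under test, assuming the GitLab runtime (Feature) and Sidekiq are available. Only Sidekiq::Client.push_bulk, the sidekiq_push_bulk_in_batches feature flag, the SAFE_PUSH_BULK_LIMIT constant, and the bulk_perform_async/bulk_perform_in entry points come from the spec itself; the helper names, the limit value, and the error message are hypothetical, and the batch_size/batch_delay scheduling path is omitted for brevity.

    # Minimal sketch, not the actual GitLab implementation. Assumed to be
    # extended onto a worker class, so `self` is the worker class.
    module SafePushBulkSketch
      SAFE_PUSH_BULK_LIMIT = 1000 # illustrative value; the specs stub this constant

      # Enqueue all jobs now; returns the job IDs of every enqueued job.
      def bulk_perform_async(args_list)
        do_push_bulk(args_list)
      end

      # Enqueue all jobs to run `delay` from now; a schedule in the past raises.
      def bulk_perform_in(delay, args_list)
        now = Time.now.to_i
        schedule = now + delay.to_i
        raise ArgumentError, 'schedule time must be in the future' if schedule <= now

        do_push_bulk(args_list, at: schedule)
      end

      private

      # Hypothetical helper: honors the feature flag and the safe limit.
      def do_push_bulk(args_list, at: nil)
        # Flag off: everything goes out in a single push_bulk call.
        return push_to_sidekiq(args_list, at: at) unless Feature.enabled?(:sidekiq_push_bulk_in_batches)

        # Flag on: push in slices no larger than the safe limit, collecting
        # the job IDs from every slice.
        args_list.each_slice(SAFE_PUSH_BULK_LIMIT).flat_map do |slice|
          push_to_sidekiq(slice, at: at)
        end
      end

      def push_to_sidekiq(args_list, at: nil)
        payload = { 'class' => self, 'args' => args_list }
        payload['at'] = at if at
        Sidekiq::Client.push_bulk(payload) # returns one "jid" per enqueued job
      end
    end

Under this reading, the spec's two safe-limit shared contexts simply stub SAFE_PUSH_BULK_LIMIT above or below args.count, forcing the single push_bulk call and the sliced path respectively, while the feature-flag contexts assert that the sliced path is skipped when sidekiq_push_bulk_in_batches is disabled.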