diff options
Diffstat (limited to 'spec/workers/concerns/application_worker_spec.rb')
-rw-r--r-- | spec/workers/concerns/application_worker_spec.rb | 381 |
1 files changed, 330 insertions, 51 deletions
diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb index af038c81b9e..fbf39b3c7cd 100644 --- a/spec/workers/concerns/application_worker_spec.rb +++ b/spec/workers/concerns/application_worker_spec.rb @@ -285,48 +285,38 @@ RSpec.describe ApplicationWorker do end end - describe '.bulk_perform_async' do - before do - stub_const(worker.name, worker) + context 'different kinds of push_bulk' do + shared_context 'disable the `sidekiq_push_bulk_in_batches` feature flag' do + before do + stub_feature_flags(sidekiq_push_bulk_in_batches: false) + end end - it 'enqueues jobs in bulk' do - Sidekiq::Testing.fake! do - worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]]) - - expect(worker.jobs.count).to eq 2 - expect(worker.jobs).to all(include('enqueued_at')) + shared_context 'set safe limit beyond the number of jobs to be enqueued' do + before do + stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", args.count + 1) end end - end - describe '.bulk_perform_in' do - before do - stub_const(worker.name, worker) + shared_context 'set safe limit below the number of jobs to be enqueued' do + before do + stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", 2) + end end - context 'when delay is valid' do - it 'correctly schedules jobs' do - Sidekiq::Testing.fake! do - worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]]) + shared_examples_for 'returns job_id of all enqueued jobs' do + let(:job_id_regex) { /[0-9a-f]{12}/ } - expect(worker.jobs.count).to eq 2 - expect(worker.jobs).to all(include('at')) - end - end - end + it 'returns job_id of all enqueued jobs' do + job_ids = perform_action - context 'when delay is invalid' do - it 'raises an ArgumentError exception' do - expect { worker.bulk_perform_in(-60, [['Foo']]) } - .to raise_error(ArgumentError) + expect(job_ids.count).to eq(args.count) + expect(job_ids).to all(match(job_id_regex)) end end - context 'with batches' do - let(:batch_delay) { 1.minute } - - it 'correctly schedules jobs' do + shared_examples_for 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do + it 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do expect(Sidekiq::Client).to( receive(:push_bulk).with(hash_including('args' => [['Foo', [1]], ['Foo', [2]]])) .ordered @@ -337,29 +327,318 @@ RSpec.describe ApplicationWorker do .and_call_original) expect(Sidekiq::Client).to( receive(:push_bulk).with(hash_including('args' => [['Foo', [5]]])) - .ordered - .and_call_original) + .ordered + .and_call_original) - worker.bulk_perform_in( - 1.minute, - [['Foo', [1]], ['Foo', [2]], ['Foo', [3]], ['Foo', [4]], ['Foo', [5]]], - batch_size: 2, batch_delay: batch_delay) - - expect(worker.jobs.count).to eq 5 - expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at']) - expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at']) - expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay) - expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay) - end - - context 'when batch_size is invalid' do - it 'raises an ArgumentError exception' do - expect do - worker.bulk_perform_in(1.minute, - [['Foo']], - batch_size: -1, batch_delay: batch_delay) - end.to raise_error(ArgumentError) + perform_action + + expect(worker.jobs.count).to eq args.count + expect(worker.jobs).to all(include('enqueued_at')) + end + end + + shared_examples_for 'enqueues jobs in one go' do + it 'enqueues jobs in one go' do + expect(Sidekiq::Client).to( + receive(:push_bulk).with(hash_including('args' => args)).once.and_call_original) + expect(Sidekiq.logger).not_to receive(:info) + + perform_action + + expect(worker.jobs.count).to eq args.count + expect(worker.jobs).to all(include('enqueued_at')) + end + end + + shared_examples_for 'logs bulk insertions' do + it 'logs arguments and job IDs' do + worker.log_bulk_perform_async! + + expect(Sidekiq.logger).to( + receive(:info).with(hash_including('class' => worker.name, 'args_list' => args)).once.and_call_original) + expect(Sidekiq.logger).to( + receive(:info).with(hash_including('class' => worker.name, 'jid_list' => anything)).once.and_call_original) + + perform_action + end + end + + before do + stub_const(worker.name, worker) + end + + let(:args) do + [ + ['Foo', [1]], + ['Foo', [2]], + ['Foo', [3]], + ['Foo', [4]], + ['Foo', [5]] + ] + end + + describe '.bulk_perform_async' do + shared_examples_for 'does not schedule the jobs for any specific time' do + it 'does not schedule the jobs for any specific time' do + perform_action + + expect(worker.jobs).to all(exclude('at')) + end + end + + subject(:perform_action) do + worker.bulk_perform_async(args) + end + + context 'push_bulk in safe limit batches' do + context 'when the number of jobs to be enqueued does not exceed the safe limit' do + include_context 'set safe limit beyond the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'logs bulk insertions' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'does not schedule the jobs for any specific time' end + + context 'when the number of jobs to be enqueued exceeds safe limit' do + include_context 'set safe limit below the number of jobs to be enqueued' + + it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'does not schedule the jobs for any specific time' + end + + context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do + include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag' + + context 'when the number of jobs to be enqueued does not exceed the safe limit' do + include_context 'set safe limit beyond the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'logs bulk insertions' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'does not schedule the jobs for any specific time' + end + + context 'when the number of jobs to be enqueued exceeds safe limit' do + include_context 'set safe limit below the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'does not schedule the jobs for any specific time' + end + end + end + end + + describe '.bulk_perform_in' do + context 'without batches' do + shared_examples_for 'schedules all the jobs at a specific time' do + it 'schedules all the jobs at a specific time' do + perform_action + + worker.jobs.each do |job_detail| + expect(job_detail['at']).to be_within(3.seconds).of(expected_scheduled_at_time) + end + end + end + + let(:delay) { 3.minutes } + let(:expected_scheduled_at_time) { Time.current.to_i + delay.to_i } + + subject(:perform_action) do + worker.bulk_perform_in(delay, args) + end + + context 'when the scheduled time falls in the past' do + let(:delay) { -60 } + + it 'raises an ArgumentError exception' do + expect { perform_action } + .to raise_error(ArgumentError) + end + end + + context 'push_bulk in safe limit batches' do + context 'when the number of jobs to be enqueued does not exceed the safe limit' do + include_context 'set safe limit beyond the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time' + end + + context 'when the number of jobs to be enqueued exceeds safe limit' do + include_context 'set safe limit below the number of jobs to be enqueued' + + it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time' + end + + context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do + include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag' + + context 'when the number of jobs to be enqueued does not exceed the safe limit' do + include_context 'set safe limit beyond the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time' + end + + context 'when the number of jobs to be enqueued exceeds safe limit' do + include_context 'set safe limit below the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time' + end + end + end + end + + context 'with batches' do + shared_examples_for 'schedules all the jobs at a specific time, per batch' do + it 'schedules all the jobs at a specific time, per batch' do + perform_action + + expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at']) + expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at']) + expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay) + expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay) + end + end + + let(:delay) { 1.minute } + let(:batch_size) { 2 } + let(:batch_delay) { 10.minutes } + + subject(:perform_action) do + worker.bulk_perform_in(delay, args, batch_size: batch_size, batch_delay: batch_delay) + end + + context 'when the `batch_size` is invalid' do + context 'when `batch_size` is 0' do + let(:batch_size) { 0 } + + it 'raises an ArgumentError exception' do + expect { perform_action } + .to raise_error(ArgumentError) + end + end + + context 'when `batch_size` is negative' do + let(:batch_size) { -3 } + + it 'raises an ArgumentError exception' do + expect { perform_action } + .to raise_error(ArgumentError) + end + end + end + + context 'when the `batch_delay` is invalid' do + context 'when `batch_delay` is 0' do + let(:batch_delay) { 0.minutes } + + it 'raises an ArgumentError exception' do + expect { perform_action } + .to raise_error(ArgumentError) + end + end + + context 'when `batch_delay` is negative' do + let(:batch_delay) { -3.minutes } + + it 'raises an ArgumentError exception' do + expect { perform_action } + .to raise_error(ArgumentError) + end + end + end + + context 'push_bulk in safe limit batches' do + context 'when the number of jobs to be enqueued does not exceed the safe limit' do + include_context 'set safe limit beyond the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time, per batch' + end + + context 'when the number of jobs to be enqueued exceeds safe limit' do + include_context 'set safe limit below the number of jobs to be enqueued' + + it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time, per batch' + end + + context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do + include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag' + + context 'when the number of jobs to be enqueued does not exceed the safe limit' do + include_context 'set safe limit beyond the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time, per batch' + end + + context 'when the number of jobs to be enqueued exceeds safe limit' do + include_context 'set safe limit below the number of jobs to be enqueued' + + it_behaves_like 'enqueues jobs in one go' + it_behaves_like 'returns job_id of all enqueued jobs' + it_behaves_like 'schedules all the jobs at a specific time, per batch' + end + end + end + end + end + end + + describe '.with_status' do + around do |example| + Sidekiq::Testing.fake!(&example) + end + + context 'when the worker does have status_expiration set' do + let(:status_expiration_worker) do + Class.new(worker) do + sidekiq_options status_expiration: 3 + end + end + + it 'uses status_expiration from the worker' do + status_expiration_worker.with_status.perform_async + + expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3) + expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1) + end + + it 'uses status_expiration from the worker without with_status' do + status_expiration_worker.perform_async + + expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3) + expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1) + end + end + + context 'when the worker does not have status_expiration set' do + it 'uses the default status_expiration' do + worker.with_status.perform_async + + expect(Sidekiq::Queues[worker.queue].first).to include('status_expiration' => Gitlab::SidekiqStatus::DEFAULT_EXPIRATION) + expect(Sidekiq::Queues[worker.queue].length).to eq(1) + end + + it 'does not set status_expiration without with_status' do + worker.perform_async + + expect(Sidekiq::Queues[worker.queue].first).not_to include('status_expiration') + expect(Sidekiq::Queues[worker.queue].length).to eq(1) end end end |