Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb')
-rw-r--r--spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb252
1 files changed, 245 insertions, 7 deletions
diff --git a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb
index d67cb95f483..cc69a11f7f8 100644
--- a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb
@@ -9,7 +9,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
described_class.new(job, queue)
end
- let(:job) { { 'class' => 'AuthorizedProjectsWorker', 'args' => [1], 'jid' => '123' } }
+ let(:wal_locations) do
+ {
+ main: '0/D525E3A8',
+ ci: 'AB/12345'
+ }
+ end
+
+ let(:job) { { 'class' => 'AuthorizedProjectsWorker', 'args' => [1], 'jid' => '123', 'wal_locations' => wal_locations } }
let(:queue) { 'authorized_projects' }
let(:idempotency_key) do
@@ -74,13 +81,39 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when there was no job in the queue yet' do
it { expect(duplicate_job.check!).to eq('123') }
- it "adds a key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
+ it "adds a idempotency key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
expect { duplicate_job.check! }
.to change { read_idempotency_key_with_ttl(idempotency_key) }
.from([nil, -2])
.to(['123', be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
end
+ context 'when wal locations is not empty' do
+ it "adds a existing wal locations key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
+ expect { duplicate_job.check! }
+ .to change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
+ .from([nil, -2])
+ .to([wal_locations[:main], be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
+ .and change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
+ .from([nil, -2])
+ .to([wal_locations[:ci], be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
+ end
+ end
+
+ context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
+ before do
+ stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
+ end
+
+ it "does not change the existing wal locations key's TTL" do
+ expect { duplicate_job.check! }
+ .to not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
+ .from([nil, -2])
+ .and not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
+ .from([nil, -2])
+ end
+ end
+
it "adds the idempotency key to the jobs payload" do
expect { duplicate_job.check! }.to change { job['idempotency_key'] }.from(nil).to(idempotency_key)
end
@@ -89,6 +122,9 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when there was already a job with same arguments in the same queue' do
before do
set_idempotency_key(idempotency_key, 'existing-key')
+ wal_locations.each do |config_name, location|
+ set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location)
+ end
end
it { expect(duplicate_job.check!).to eq('existing-key') }
@@ -99,6 +135,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
.from(['existing-key', -1])
end
+ it "does not change the existing wal locations key's TTL" do
+ expect { duplicate_job.check! }
+ .to not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
+ .from([wal_locations[:main], -1])
+ .and not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
+ .from([wal_locations[:ci], -1])
+ end
+
it 'sets the existing jid' do
duplicate_job.check!
@@ -107,6 +151,117 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
end
+ describe '#update_latest_wal_location!' do
+ let(:offset) { '1024' }
+
+ before do
+ allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:main).and_return(offset)
+ allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:ci).and_return(offset)
+ end
+
+ shared_examples 'updates wal location' do
+ it 'updates a wal location to redis with an offset' do
+ expect { duplicate_job.update_latest_wal_location! }
+ .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
+ .from(existing_wal_with_offset[:main])
+ .to(new_wal_with_offset[:main])
+ .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
+ .from(existing_wal_with_offset[:ci])
+ .to(new_wal_with_offset[:ci])
+ end
+ end
+
+ context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
+ before do
+ stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
+ end
+
+ it "doesn't call Sidekiq.redis" do
+ expect(Sidekiq).not_to receive(:redis)
+
+ duplicate_job.update_latest_wal_location!
+ end
+
+ it "doesn't update a wal location to redis with an offset" do
+ expect { duplicate_job.update_latest_wal_location! }
+ .to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
+ .from([])
+ .and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
+ .from([])
+ end
+ end
+
+ context "when the key doesn't exists in redis" do
+ include_examples 'updates wal location' do
+ let(:existing_wal_with_offset) { { main: [], ci: [] } }
+ let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } }
+ end
+ end
+
+ context "when the key exists in redis" do
+ let(:existing_offset) { '1023'}
+ let(:existing_wal_locations) do
+ {
+ main: '0/D525E3NM',
+ ci: 'AB/111112'
+ }
+ end
+
+ before do
+ rpush_to_redis_key(wal_location_key(idempotency_key, :main), existing_wal_locations[:main], existing_offset)
+ rpush_to_redis_key(wal_location_key(idempotency_key, :ci), existing_wal_locations[:ci], existing_offset)
+ end
+
+ context "when the new offset is bigger then the existing one" do
+ include_examples 'updates wal location' do
+ let(:existing_wal_with_offset) { existing_wal_locations.transform_values { |v| [v, existing_offset] } }
+ let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } }
+ end
+ end
+
+ context "when the old offset is not bigger then the existing one" do
+ let(:existing_offset) { offset }
+
+ it "does not update a wal location to redis with an offset" do
+ expect { duplicate_job.update_latest_wal_location! }
+ .to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
+ .from([existing_wal_locations[:main], existing_offset])
+ .and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
+ .from([existing_wal_locations[:ci], existing_offset])
+ end
+ end
+ end
+ end
+
+ describe '#latest_wal_locations' do
+ context 'when job was deduplicated and wal locations were already persisted' do
+ before do
+ rpush_to_redis_key(wal_location_key(idempotency_key, :main), wal_locations[:main], 1024)
+ rpush_to_redis_key(wal_location_key(idempotency_key, :ci), wal_locations[:ci], 1024)
+ end
+
+ it { expect(duplicate_job.latest_wal_locations).to eq(wal_locations) }
+ end
+
+ context 'when job is not deduplication and wal locations were not persisted' do
+ it { expect(duplicate_job.latest_wal_locations).to be_empty }
+ end
+
+ context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
+ before do
+ stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
+ end
+
+ it "doesn't call Sidekiq.redis" do
+ expect(Sidekiq).not_to receive(:redis)
+
+ duplicate_job.latest_wal_locations
+ end
+
+ it { expect(duplicate_job.latest_wal_locations).to eq({}) }
+ end
+ end
+
describe '#delete!' do
context "when we didn't track the definition" do
it { expect { duplicate_job.delete! }.not_to raise_error }
@@ -115,14 +270,79 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when the key exists in redis' do
before do
set_idempotency_key(idempotency_key, 'existing-jid')
+ wal_locations.each do |config_name, location|
+ set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location)
+ set_idempotency_key(wal_location_key(idempotency_key, config_name), location)
+ end
end
shared_examples 'deleting the duplicate job' do
- it 'removes the key from redis' do
- expect { duplicate_job.delete! }
- .to change { read_idempotency_key_with_ttl(idempotency_key) }
- .from(['existing-jid', -1])
- .to([nil, -2])
+ shared_examples 'deleting keys from redis' do |key_name|
+ it "removes the #{key_name} from redis" do
+ expect { duplicate_job.delete! }
+ .to change { read_idempotency_key_with_ttl(key) }
+ .from([from_value, -1])
+ .to([nil, -2])
+ end
+ end
+
+ shared_examples 'does not delete key from redis' do |key_name|
+ it "does not remove the #{key_name} from redis" do
+ expect { duplicate_job.delete! }
+ .to not_change { read_idempotency_key_with_ttl(key) }
+ .from([from_value, -1])
+ end
+ end
+
+ it_behaves_like 'deleting keys from redis', 'idempotent key' do
+ let(:key) { idempotency_key }
+ let(:from_value) { 'existing-jid' }
+ end
+
+ it_behaves_like 'deleting keys from redis', 'existing wal location keys for main database' do
+ let(:key) { existing_wal_location_key(idempotency_key, :main) }
+ let(:from_value) { wal_locations[:main] }
+ end
+
+ it_behaves_like 'deleting keys from redis', 'existing wal location keys for ci database' do
+ let(:key) { existing_wal_location_key(idempotency_key, :ci) }
+ let(:from_value) { wal_locations[:ci] }
+ end
+
+ it_behaves_like 'deleting keys from redis', 'latest wal location keys for main database' do
+ let(:key) { wal_location_key(idempotency_key, :main) }
+ let(:from_value) { wal_locations[:main] }
+ end
+
+ it_behaves_like 'deleting keys from redis', 'latest wal location keys for ci database' do
+ let(:key) { wal_location_key(idempotency_key, :ci) }
+ let(:from_value) { wal_locations[:ci] }
+ end
+
+ context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
+ before do
+ stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
+ end
+
+ it_behaves_like 'does not delete key from redis', 'latest wal location keys for main database' do
+ let(:key) { existing_wal_location_key(idempotency_key, :main) }
+ let(:from_value) { wal_locations[:main] }
+ end
+
+ it_behaves_like 'does not delete key from redis', 'latest wal location keys for ci database' do
+ let(:key) { existing_wal_location_key(idempotency_key, :ci) }
+ let(:from_value) { wal_locations[:ci] }
+ end
+
+ it_behaves_like 'does not delete key from redis', 'latest wal location keys for main database' do
+ let(:key) { wal_location_key(idempotency_key, :main) }
+ let(:from_value) { wal_locations[:main] }
+ end
+
+ it_behaves_like 'does not delete key from redis', 'latest wal location keys for ci database' do
+ let(:key) { wal_location_key(idempotency_key, :ci) }
+ let(:from_value) { wal_locations[:ci] }
+ end
end
end
@@ -254,10 +474,22 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
end
+ def existing_wal_location_key(idempotency_key, config_name)
+ "#{idempotency_key}:#{config_name}:existing_wal_location"
+ end
+
+ def wal_location_key(idempotency_key, config_name)
+ "#{idempotency_key}:#{config_name}:wal_location"
+ end
+
def set_idempotency_key(key, value = '1')
Sidekiq.redis { |r| r.set(key, value) }
end
+ def rpush_to_redis_key(key, wal, offset)
+ Sidekiq.redis { |r| r.rpush(key, [wal, offset]) }
+ end
+
def read_idempotency_key_with_ttl(key)
Sidekiq.redis do |redis|
redis.pipelined do |p|
@@ -266,4 +498,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
end
end
+
+ def read_range_from_redis(key)
+ Sidekiq.redis do |redis|
+ redis.lrange(key, 0, -1)
+ end
+ end
end