diff options
Diffstat (limited to 'spec/workers/click_house')
-rw-r--r-- | spec/workers/click_house/event_authors_consistency_cron_worker_spec.rb | 104 | ||||
-rw-r--r-- | spec/workers/click_house/events_sync_worker_spec.rb | 174 |
2 files changed, 113 insertions, 165 deletions
diff --git a/spec/workers/click_house/event_authors_consistency_cron_worker_spec.rb b/spec/workers/click_house/event_authors_consistency_cron_worker_spec.rb new file mode 100644 index 00000000000..d4fa35b9b82 --- /dev/null +++ b/spec/workers/click_house/event_authors_consistency_cron_worker_spec.rb @@ -0,0 +1,104 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ClickHouse::EventAuthorsConsistencyCronWorker, feature_category: :value_stream_management do + let(:worker) { described_class.new } + + context 'when ClickHouse is disabled' do + it 'does nothing' do + allow(ClickHouse::Client).to receive(:database_configured?).and_return(false) + + expect(worker).not_to receive(:log_extra_metadata_on_done) + + worker.perform + end + end + + context 'when the event_sync_worker_for_click_house feature flag is off' do + it 'does nothing' do + allow(ClickHouse::Client).to receive(:database_configured?).and_return(true) + stub_feature_flags(event_sync_worker_for_click_house: false) + + expect(worker).not_to receive(:log_extra_metadata_on_done) + + worker.perform + end + end + + context 'when ClickHouse is available', :click_house do + let_it_be(:connection) { ClickHouse::Connection.new(:main) } + let_it_be_with_reload(:user1) { create(:user) } + let_it_be_with_reload(:user2) { create(:user) } + + let(:leftover_author_ids) { connection.select('SELECT DISTINCT author_id FROM events FINAL').pluck('author_id') } + let(:deleted_user_id1) { user2.id + 1 } + let(:deleted_user_id2) { user2.id + 2 } + + before do + insert_query = <<~SQL + INSERT INTO events (id, author_id) VALUES + (1, #{user1.id}), + (2, #{user2.id}), + (3, #{deleted_user_id1}), + (4, #{deleted_user_id1}), + (5, #{deleted_user_id2}) + SQL + + connection.execute(insert_query) + end + + it 'cleans up all inconsistent records in ClickHouse' do + worker.perform + + expect(leftover_author_ids).to contain_exactly(user1.id, user2.id) + + # the next job starts from the beginning of the table + expect(ClickHouse::SyncCursor.cursor_for(:event_authors_consistency_check)).to eq(0) + end + + context 'when the previous job was not finished' do + it 'continues the processing from the cursor' do + ClickHouse::SyncCursor.update_cursor_for(:event_authors_consistency_check, deleted_user_id1) + + worker.perform + + # the previous records should remain + expect(leftover_author_ids).to contain_exactly(user1.id, user2.id) + end + end + + context 'when processing stops due to the record clean up limit' do + it 'stores the last processed id value' do + User.where(id: [user1.id, user2.id]).delete_all + + stub_const("#{described_class}::MAX_AUTHOR_DELETIONS", 2) + stub_const("#{described_class}::POSTGRESQL_BATCH_SIZE", 1) + + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, + { status: :deletion_limit_reached, deletions: 2 }) + + worker.perform + + expect(leftover_author_ids).to contain_exactly(deleted_user_id1, deleted_user_id2) + expect(ClickHouse::SyncCursor.cursor_for(:event_authors_consistency_check)).to eq(user2.id) + end + end + + context 'when time limit is reached' do + it 'stops the processing earlier' do + stub_const("#{described_class}::POSTGRESQL_BATCH_SIZE", 1) + + # stop at the third author_id + allow_next_instance_of(Analytics::CycleAnalytics::RuntimeLimiter) do |runtime_limiter| + allow(runtime_limiter).to receive(:over_time?).and_return(false, false, true) + end + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :over_time, deletions: 1 }) + + worker.perform + + expect(leftover_author_ids).to contain_exactly(user1.id, user2.id, deleted_user_id2) + end + end + end +end diff --git a/spec/workers/click_house/events_sync_worker_spec.rb b/spec/workers/click_house/events_sync_worker_spec.rb index 9662f26115a..dc3dea24e37 100644 --- a/spec/workers/click_house/events_sync_worker_spec.rb +++ b/spec/workers/click_house/events_sync_worker_spec.rb @@ -11,176 +11,20 @@ RSpec.describe ClickHouse::EventsSyncWorker, feature_category: :value_stream_man ) end - it_behaves_like 'an idempotent worker' do - context 'when the event_sync_worker_for_click_house feature flag is on', :click_house do - before do - stub_feature_flags(event_sync_worker_for_click_house: true) + context 'when worker is enqueued' do + it 'calls ::ClickHouse::SyncStrategies::EventSyncStrategy with correct args' do + expect_next_instance_of(::ClickHouse::SyncStrategies::EventSyncStrategy) do |instance| + expect(instance).to receive(:execute) end - context 'when there is nothing to sync' do - it 'adds metadata for the worker' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, - { status: :processed, records_inserted: 0, reached_end_of_table: true }) - - worker.perform - - events = ClickHouse::Client.select('SELECT * FROM events', :main) - expect(events).to be_empty - end - end - - context 'when syncing records' do - let_it_be(:group) { create(:group) } - let_it_be(:project) { create(:project, group: group) } - let_it_be(:issue) { create(:issue, project: project) } - let_it_be(:project_event2) { create(:event, :closed, project: project, target: issue) } - let_it_be(:event_without_parent) { create(:event, :joined, project: nil, group: nil) } - let_it_be(:group_event) { create(:event, :created, group: group, project: nil) } - let_it_be(:project_event1) { create(:event, :created, project: project, target: issue) } - # looks invalid but we have some records like this on PRD - - it 'inserts all records' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, - { status: :processed, records_inserted: 4, reached_end_of_table: true }) - - worker.perform - - expected_records = [ - hash_including('id' => project_event2.id, 'path' => "#{group.id}/#{project.project_namespace.id}/", - 'target_type' => 'Issue'), - hash_including('id' => event_without_parent.id, 'path' => '', 'target_type' => ''), - hash_including('id' => group_event.id, 'path' => "#{group.id}/", 'target_type' => ''), - hash_including('id' => project_event1.id, 'path' => "#{group.id}/#{project.project_namespace.id}/", - 'target_type' => 'Issue') - ] - - events = ClickHouse::Client.select('SELECT * FROM events ORDER BY id', :main) - - expect(events).to match(expected_records) - - last_processed_id = ClickHouse::SyncCursor.cursor_for(:events) - expect(last_processed_id).to eq(project_event1.id) - end - - context 'when multiple batches are needed' do - before do - stub_const("#{described_class}::BATCH_SIZE", 1) - stub_const("#{described_class}::INSERT_BATCH_SIZE", 1) - end - - it 'inserts all records' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, - { status: :processed, records_inserted: 4, reached_end_of_table: true }) - - worker.perform - - events = ClickHouse::Client.select('SELECT * FROM events', :main) - expect(events.size).to eq(4) - end - - context 'when new records are inserted while processing' do - it 'does not process new records created during the iteration' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, - { status: :processed, records_inserted: 4, - reached_end_of_table: true }) - - # Simulating the case when there is an insert during the iteration - call_count = 0 - allow(worker).to receive(:next_batch).and_wrap_original do |method| - call_count += 1 - create(:event) if call_count == 3 - method.call - end - - worker.perform - end - end - end - - context 'when time limit is reached' do - before do - stub_const("#{described_class}::BATCH_SIZE", 1) - end - - it 'stops the processing' do - allow_next_instance_of(Analytics::CycleAnalytics::RuntimeLimiter) do |runtime_limiter| - allow(runtime_limiter).to receive(:over_time?).and_return(false, true) - end - - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, - { status: :processed, records_inserted: 2, reached_end_of_table: false }) - - worker.perform - - last_processed_id = ClickHouse::SyncCursor.cursor_for(:events) - expect(last_processed_id).to eq(event_without_parent.id) - end - end - - context 'when syncing from a certain point' do - before do - ClickHouse::SyncCursor.update_cursor_for(:events, project_event2.id) - end - - it 'syncs records after the cursor' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, - { status: :processed, records_inserted: 3, reached_end_of_table: true }) - - worker.perform - - events = ClickHouse::Client.select('SELECT id FROM events ORDER BY id', :main) - expect(events).to eq([{ 'id' => event_without_parent.id }, { 'id' => group_event.id }, - { 'id' => project_event1.id }]) - end - - context 'when there is nothing to sync' do - it 'does nothing' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, - { status: :processed, records_inserted: 0, reached_end_of_table: true }) - - ClickHouse::SyncCursor.update_cursor_for(:events, project_event1.id) - worker.perform - - events = ClickHouse::Client.select('SELECT id FROM events ORDER BY id', :main) - expect(events).to be_empty - end - end - end - end - end - - context 'when clickhouse is not configured' do - before do - allow(ClickHouse::Client).to receive(:database_configured?).and_return(false) - end - - it 'skips execution' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :disabled }) - - worker.perform - end - end - end - - context 'when exclusive lease error happens' do - it 'skips execution' do - stub_feature_flags(event_sync_worker_for_click_house: true) - allow(ClickHouse::Client).to receive(:database_configured?).with(:main).and_return(true) - - expect(worker).to receive(:in_lock).and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :skipped }) - worker.perform end - end - context 'when the event_sync_worker_for_click_house feature flag is off' do - before do - stub_feature_flags(event_sync_worker_for_click_house: false) - end - - it 'skips execution' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :disabled }) + it 'correctly logs the metadata on done' do + expect_next_instance_of(::ClickHouse::SyncStrategies::EventSyncStrategy) do |instance| + expect(instance).to receive(:execute).and_return({ status: :ok }) + end + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :ok }) worker.perform end |