diff options
Diffstat (limited to 'spec/workers/click_house/events_sync_worker_spec.rb')
-rw-r--r-- | spec/workers/click_house/events_sync_worker_spec.rb | 145 |
1 files changed, 123 insertions, 22 deletions
diff --git a/spec/workers/click_house/events_sync_worker_spec.rb b/spec/workers/click_house/events_sync_worker_spec.rb index 8f328839cfd..01267db36a7 100644 --- a/spec/workers/click_house/events_sync_worker_spec.rb +++ b/spec/workers/click_house/events_sync_worker_spec.rb @@ -3,48 +3,125 @@ require 'spec_helper' RSpec.describe ClickHouse::EventsSyncWorker, feature_category: :value_stream_management do - let(:databases) { { main: :some_db } } let(:worker) { described_class.new } - before do - allow(ClickHouse::Client.configuration).to receive(:databases).and_return(databases) - end - - include_examples 'an idempotent worker' do - context 'when the event_sync_worker_for_click_house feature flag is on' do + it_behaves_like 'an idempotent worker' do + context 'when the event_sync_worker_for_click_house feature flag is on', :click_house do before do stub_feature_flags(event_sync_worker_for_click_house: true) end - it 'returns true' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :processed }) + context 'when there is nothing to sync' do + it 'adds metadata for the worker' do + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, + { status: :processed, records_inserted: 0, reached_end_of_table: true }) - worker.perform + worker.perform + + events = ClickHouse::Client.select('SELECT * FROM events', :main) + expect(events).to be_empty + end end - context 'when no ClickHouse databases are configured' do - let(:databases) { {} } + context 'when syncing records' do + let_it_be(:group) { create(:group) } + let_it_be(:project) { create(:project, group: group) } + let_it_be(:issue) { create(:issue, project: project) } + let_it_be(:project_event2) { create(:event, :closed, project: project, target: issue) } + let_it_be(:event_without_parent) { create(:event, :joined, project: nil, group: nil) } + let_it_be(:group_event) { create(:event, :created, group: group, project: nil) } + let_it_be(:project_event1) { create(:event, :created, project: project, target: issue) } + # looks invalid but we have some records like this on PRD - it 'skips execution' do - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :disabled }) + it 'inserts all records' do + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, + { status: :processed, records_inserted: 4, reached_end_of_table: true }) worker.perform + + expected_records = [ + hash_including('id' => project_event2.id, 'path' => "#{group.id}/#{project.project_namespace.id}/", + 'target_type' => 'Issue'), + hash_including('id' => event_without_parent.id, 'path' => '', 'target_type' => ''), + hash_including('id' => group_event.id, 'path' => "#{group.id}/", 'target_type' => ''), + hash_including('id' => project_event1.id, 'path' => "#{group.id}/#{project.project_namespace.id}/", + 'target_type' => 'Issue') + ] + + events = ClickHouse::Client.select('SELECT * FROM events ORDER BY id', :main) + + expect(events).to match(expected_records) + + last_processed_id = ClickHouse::SyncCursor.cursor_for(:events) + expect(last_processed_id).to eq(project_event1.id) end - end - context 'when exclusive lease error happens' do - it 'skips execution' do - expect(worker).to receive(:in_lock).and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) - expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :skipped }) + context 'when multiple batches are needed' do + before do + stub_const("#{described_class}::BATCH_SIZE", 1) + stub_const("#{described_class}::INSERT_BATCH_SIZE", 1) + end - worker.perform + it 'inserts all records' do + worker.perform + + events = ClickHouse::Client.select('SELECT * FROM events', :main) + expect(events.size).to eq(4) + end + end + + context 'when time limit is reached' do + before do + stub_const("#{described_class}::BATCH_SIZE", 1) + end + + it 'stops the processing' do + allow_next_instance_of(Analytics::CycleAnalytics::RuntimeLimiter) do |runtime_limiter| + allow(runtime_limiter).to receive(:over_time?).and_return(false, true) + end + + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, + { status: :processed, records_inserted: 2, reached_end_of_table: false }) + + worker.perform + + last_processed_id = ClickHouse::SyncCursor.cursor_for(:events) + expect(last_processed_id).to eq(event_without_parent.id) + end + end + + context 'when syncing from a certain point' do + before do + ClickHouse::SyncCursor.update_cursor_for(:events, project_event2.id) + end + + it 'syncs records after the cursor' do + worker.perform + + events = ClickHouse::Client.select('SELECT id FROM events ORDER BY id', :main) + expect(events).to eq([{ 'id' => event_without_parent.id }, { 'id' => group_event.id }, + { 'id' => project_event1.id }]) + end + + context 'when there is nothing to sync' do + it 'does nothing' do + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, + { status: :processed, records_inserted: 0, reached_end_of_table: true }) + + ClickHouse::SyncCursor.update_cursor_for(:events, project_event1.id) + worker.perform + + events = ClickHouse::Client.select('SELECT id FROM events ORDER BY id', :main) + expect(events).to be_empty + end + end end end end - context 'when the event_sync_worker_for_click_house feature flag is off' do + context 'when clickhouse is not configured' do before do - stub_feature_flags(event_sync_worker_for_click_house: false) + allow(ClickHouse::Client.configuration).to receive(:databases).and_return({}) end it 'skips execution' do @@ -54,4 +131,28 @@ RSpec.describe ClickHouse::EventsSyncWorker, feature_category: :value_stream_man end end end + + context 'when exclusive lease error happens' do + it 'skips execution' do + stub_feature_flags(event_sync_worker_for_click_house: true) + allow(ClickHouse::Client.configuration).to receive(:databases).and_return({ main: :some_db }) + + expect(worker).to receive(:in_lock).and_raise(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError) + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :skipped }) + + worker.perform + end + end + + context 'when the event_sync_worker_for_click_house feature flag is off' do + before do + stub_feature_flags(event_sync_worker_for_click_house: false) + end + + it 'skips execution' do + expect(worker).to receive(:log_extra_metadata_on_done).with(:result, { status: :disabled }) + + worker.perform + end + end end |