diff options
Diffstat (limited to 'app')
20 files changed, 409 insertions, 60 deletions
diff --git a/app/assets/javascripts/environments/components/kubernetes_tabs.vue b/app/assets/javascripts/environments/components/kubernetes_tabs.vue index 0d80b1fd797..da37df3fae7 100644 --- a/app/assets/javascripts/environments/components/kubernetes_tabs.vue +++ b/app/assets/javascripts/environments/components/kubernetes_tabs.vue @@ -1,9 +1,12 @@ <script> import { GlTabs, GlTab, GlLoadingIcon, GlBadge, GlTable, GlPagination } from '@gitlab/ui'; import { __, s__ } from '~/locale'; -import { getAge } from '~/kubernetes_dashboard/helpers/k8s_integration_helper'; +import { + getAge, + generateServicePortsString, +} from '~/kubernetes_dashboard/helpers/k8s_integration_helper'; +import { SERVICES_TABLE_FIELDS } from '~/kubernetes_dashboard/constants'; import k8sServicesQuery from '../graphql/queries/k8s_services.query.graphql'; -import { generateServicePortsString } from '../helpers/k8s_integration_helper'; import { SERVICES_LIMIT_PER_PAGE } from '../constants'; import KubernetesSummary from './kubernetes_summary.vue'; @@ -82,6 +85,14 @@ export default { ? null : nextPage; }, + servicesFields() { + return SERVICES_TABLE_FIELDS.map((field) => { + return { + ...field, + thClass: tableHeadingClasses, + }; + }); + }, }, i18n: { servicesTitle: s__('Environment|Services'), @@ -94,43 +105,6 @@ export default { ports: s__('Environment|Ports'), age: s__('Environment|Age'), }, - servicesFields: [ - { - key: 'name', - label: __('Name'), - thClass: tableHeadingClasses, - }, - { - key: 'namespace', - label: __('Namespace'), - thClass: tableHeadingClasses, - }, - { - key: 'type', - label: __('Type'), - thClass: tableHeadingClasses, - }, - { - key: 'clusterIP', - label: s__('Environment|Cluster IP'), - thClass: tableHeadingClasses, - }, - { - key: 'externalIP', - label: s__('Environment|External IP'), - thClass: tableHeadingClasses, - }, - { - key: 'ports', - label: s__('Environment|Ports'), - thClass: tableHeadingClasses, - }, - { - key: 'age', - label: s__('Environment|Age'), - thClass: tableHeadingClasses, - }, - ], SERVICES_LIMIT_PER_PAGE, }; </script> @@ -154,7 +128,7 @@ export default { <gl-table v-else - :fields="$options.servicesFields" + :fields="servicesFields" :items="servicesItems" :per-page="$options.SERVICES_LIMIT_PER_PAGE" :current-page="currentPage" diff --git a/app/assets/javascripts/environments/helpers/k8s_integration_helper.js b/app/assets/javascripts/environments/helpers/k8s_integration_helper.js index 8b907f0b174..99d5ee44b6c 100644 --- a/app/assets/javascripts/environments/helpers/k8s_integration_helper.js +++ b/app/assets/javascripts/environments/helpers/k8s_integration_helper.js @@ -13,17 +13,6 @@ import { } from '~/kubernetes_dashboard/constants'; import { CLUSTER_AGENT_ERROR_MESSAGES } from '../constants'; -export function generateServicePortsString(ports) { - if (!ports?.length) return ''; - - return ports - .map((port) => { - const nodePort = port.nodePort ? `:${port.nodePort}` : ''; - return `${port.port}${nodePort}/${port.protocol}`; - }) - .join(', '); -} - export function getDeploymentsStatuses(items) { const failed = []; const ready = []; diff --git a/app/assets/javascripts/kubernetes_dashboard/components/workload_details.vue b/app/assets/javascripts/kubernetes_dashboard/components/workload_details.vue index 0d219f915c9..41fb2527036 100644 --- a/app/assets/javascripts/kubernetes_dashboard/components/workload_details.vue +++ b/app/assets/javascripts/kubernetes_dashboard/components/workload_details.vue @@ -14,8 +14,7 @@ export default { item: { type: Object, required: true, - validator: (item) => - ['name', 'kind', 'labels', 'annotations', 'status'].every((key) => item[key]), + validator: (item) => ['name', 'kind', 'labels', 'annotations'].every((key) => item[key]), }, }, computed: { @@ -63,7 +62,7 @@ export default { </gl-badge> </div> </workload-details-item> - <workload-details-item :label="$options.i18n.status"> + <workload-details-item v-if="item.status" :label="$options.i18n.status"> <gl-badge :variant="$options.WORKLOAD_STATUS_BADGE_VARIANTS[item.status]">{{ item.status }}</gl-badge></workload-details-item diff --git a/app/assets/javascripts/kubernetes_dashboard/components/workload_layout.vue b/app/assets/javascripts/kubernetes_dashboard/components/workload_layout.vue index 8c6a08ad504..8b1436b5486 100644 --- a/app/assets/javascripts/kubernetes_dashboard/components/workload_layout.vue +++ b/app/assets/javascripts/kubernetes_dashboard/components/workload_layout.vue @@ -33,6 +33,11 @@ export default { type: Array, required: true, }, + fields: { + type: Array, + required: false, + default: null, + }, }, data() { return { @@ -59,7 +64,7 @@ export default { </gl-alert> <div v-else> <workload-stats :stats="stats" /> - <workload-table :items="items" @select-item="onItemSelect" /> + <workload-table :items="items" :fields="fields" @select-item="onItemSelect" /> <gl-drawer :open="showDetailsDrawer" diff --git a/app/assets/javascripts/kubernetes_dashboard/constants.js b/app/assets/javascripts/kubernetes_dashboard/constants.js index cc554722bba..0696fcab875 100644 --- a/app/assets/javascripts/kubernetes_dashboard/constants.js +++ b/app/assets/javascripts/kubernetes_dashboard/constants.js @@ -1,4 +1,4 @@ -import { s__ } from '~/locale'; +import { __, s__ } from '~/locale'; export const STATUS_RUNNING = 'Running'; export const STATUS_PENDING = 'Pending'; @@ -53,3 +53,34 @@ export const DEFAULT_WORKLOAD_TABLE_FIELDS = [ export const STATUS_TRUE = 'True'; export const STATUS_FALSE = 'False'; + +export const SERVICES_TABLE_FIELDS = [ + { + key: 'name', + label: __('Name'), + }, + { + key: 'namespace', + label: __('Namespace'), + }, + { + key: 'type', + label: __('Type'), + }, + { + key: 'clusterIP', + label: s__('Environment|Cluster IP'), + }, + { + key: 'externalIP', + label: s__('Environment|External IP'), + }, + { + key: 'ports', + label: s__('Environment|Ports'), + }, + { + key: 'age', + label: s__('Environment|Age'), + }, +]; diff --git a/app/assets/javascripts/kubernetes_dashboard/graphql/client.js b/app/assets/javascripts/kubernetes_dashboard/graphql/client.js index 4a1ab56a8e9..9454465df9d 100644 --- a/app/assets/javascripts/kubernetes_dashboard/graphql/client.js +++ b/app/assets/javascripts/kubernetes_dashboard/graphql/client.js @@ -8,6 +8,7 @@ import k8sReplicaSetsQuery from './queries/k8s_dashboard_replica_sets.query.grap import k8sDaemonSetsQuery from './queries/k8s_dashboard_daemon_sets.query.graphql'; import k8sJobsQuery from './queries/k8s_dashboard_jobs.query.graphql'; import k8sCronJobsQuery from './queries/k8s_dashboard_cron_jobs.query.graphql'; +import k8sServicesQuery from './queries/k8s_dashboard_services.query.graphql'; import { resolvers } from './resolvers'; export const apolloProvider = () => { @@ -110,6 +111,19 @@ export const apolloProvider = () => { }, }); + cache.writeQuery({ + query: k8sServicesQuery, + data: { + metadata, + spec: { + type: null, + clusterIP: null, + externalIP: null, + ports: null, + }, + }, + }); + return new VueApollo({ defaultClient, }); diff --git a/app/assets/javascripts/kubernetes_dashboard/graphql/helpers/resolver_helpers.js b/app/assets/javascripts/kubernetes_dashboard/graphql/helpers/resolver_helpers.js index a06883a0b24..b9c195d83d0 100644 --- a/app/assets/javascripts/kubernetes_dashboard/graphql/helpers/resolver_helpers.js +++ b/app/assets/javascripts/kubernetes_dashboard/graphql/helpers/resolver_helpers.js @@ -62,6 +62,24 @@ export const mapJobItem = (item) => { }; }; +export const mapServicesItems = (item) => { + const { type, clusterIP, externalIP, ports } = item.spec; + + return { + metadata: { + ...item.metadata, + annotations: item.metadata?.annotations || {}, + labels: item.metadata?.labels || {}, + }, + spec: { + type, + clusterIP: clusterIP || '-', + externalIP: externalIP || '-', + ports, + }, + }; +}; + export const mapCronJobItem = (item) => { const metadata = { ...item.metadata, diff --git a/app/assets/javascripts/kubernetes_dashboard/graphql/queries/k8s_dashboard_services.query.graphql b/app/assets/javascripts/kubernetes_dashboard/graphql/queries/k8s_dashboard_services.query.graphql new file mode 100644 index 00000000000..7d42d66183e --- /dev/null +++ b/app/assets/javascripts/kubernetes_dashboard/graphql/queries/k8s_dashboard_services.query.graphql @@ -0,0 +1,17 @@ +query getK8sDashboardServices($configuration: LocalConfiguration) { + k8sServices(configuration: $configuration) @client { + metadata { + name + namespace + creationTimestamp + labels + annotations + } + spec { + type + clusterIP + externalIP + ports + } + } +} diff --git a/app/assets/javascripts/kubernetes_dashboard/graphql/resolvers/kubernetes.js b/app/assets/javascripts/kubernetes_dashboard/graphql/resolvers/kubernetes.js index 3450e2780cb..75285ad2cca 100644 --- a/app/assets/javascripts/kubernetes_dashboard/graphql/resolvers/kubernetes.js +++ b/app/assets/javascripts/kubernetes_dashboard/graphql/resolvers/kubernetes.js @@ -1,4 +1,4 @@ -import { Configuration, AppsV1Api, BatchV1Api } from '@gitlab/cluster-client'; +import { Configuration, CoreV1Api, AppsV1Api, BatchV1Api } from '@gitlab/cluster-client'; import { getK8sPods, @@ -9,6 +9,7 @@ import { watchWorkloadItems, mapJobItem, mapCronJobItem, + mapServicesItems, } from '../helpers/resolver_helpers'; import k8sDashboardPodsQuery from '../queries/k8s_dashboard_pods.query.graphql'; import k8sDashboardDeploymentsQuery from '../queries/k8s_dashboard_deployments.query.graphql'; @@ -17,6 +18,7 @@ import k8sDashboardReplicaSetsQuery from '../queries/k8s_dashboard_replica_sets. import k8sDaemonSetsQuery from '../queries/k8s_dashboard_daemon_sets.query.graphql'; import k8sJobsQuery from '../queries/k8s_dashboard_jobs.query.graphql'; import k8sCronJobsQuery from '../queries/k8s_dashboard_cron_jobs.query.graphql'; +import k8sServicesQuery from '../queries/k8s_dashboard_services.query.graphql'; export default { k8sPods(_, { configuration }, { client }) { @@ -244,4 +246,40 @@ export default { } }); }, + + k8sServices(_, { configuration, namespace = '' }, { client }) { + const config = new Configuration(configuration); + + const coreV1Api = new CoreV1Api(config); + const servicesApi = namespace + ? coreV1Api.listCoreV1NamespacedService({ namespace }) + : coreV1Api.listCoreV1ServiceForAllNamespaces(); + return servicesApi + .then((res) => { + const watchPath = buildWatchPath({ + resource: 'services', + namespace, + }); + watchWorkloadItems({ + client, + query: k8sServicesQuery, + configuration, + namespace, + watchPath, + queryField: 'k8sServices', + mapFn: mapServicesItems, + }); + + const data = res?.items || []; + + return data.map(mapServicesItems); + }) + .catch(async (err) => { + try { + await handleClusterError(err); + } catch (error) { + throw new Error(error.message); + } + }); + }, }; diff --git a/app/assets/javascripts/kubernetes_dashboard/helpers/k8s_integration_helper.js b/app/assets/javascripts/kubernetes_dashboard/helpers/k8s_integration_helper.js index 25135e23dc8..d3116fd611a 100644 --- a/app/assets/javascripts/kubernetes_dashboard/helpers/k8s_integration_helper.js +++ b/app/assets/javascripts/kubernetes_dashboard/helpers/k8s_integration_helper.js @@ -77,3 +77,14 @@ export function calculateCronJobStatus(item) { } return STATUS_READY; } + +export function generateServicePortsString(ports) { + if (!ports?.length) return ''; + + return ports + .map((port) => { + const nodePort = port.nodePort ? `:${port.nodePort}` : ''; + return `${port.port}${nodePort}/${port.protocol}`; + }) + .join(', '); +} diff --git a/app/assets/javascripts/kubernetes_dashboard/pages/services_page.vue b/app/assets/javascripts/kubernetes_dashboard/pages/services_page.vue new file mode 100644 index 00000000000..4dc8fb6b6c0 --- /dev/null +++ b/app/assets/javascripts/kubernetes_dashboard/pages/services_page.vue @@ -0,0 +1,69 @@ +<script> +import { s__ } from '~/locale'; +import { getAge, generateServicePortsString } from '../helpers/k8s_integration_helper'; +import { SERVICES_TABLE_FIELDS } from '../constants'; +import WorkloadLayout from '../components/workload_layout.vue'; +import k8sServicesQuery from '../graphql/queries/k8s_dashboard_services.query.graphql'; + +export default { + components: { + WorkloadLayout, + }, + inject: ['configuration'], + apollo: { + k8sServices: { + query: k8sServicesQuery, + variables() { + return { + configuration: this.configuration, + }; + }, + update(data) { + return ( + data?.k8sServices?.map((service) => { + return { + name: service.metadata?.name, + namespace: service.metadata?.namespace, + type: service.spec?.type, + clusterIP: service.spec?.clusterIP, + externalIP: service.spec?.externalIP, + ports: generateServicePortsString(service?.spec?.ports), + age: getAge(service.metadata?.creationTimestamp), + labels: service.metadata?.labels, + annotations: service.metadata?.annotations, + kind: s__('KubernetesDashboard|Service'), + }; + }) || [] + ); + }, + error(err) { + this.errorMessage = err?.message; + }, + }, + }, + data() { + return { + k8sServices: [], + errorMessage: '', + }; + }, + computed: { + loading() { + return this.$apollo.queries.k8sServices.loading; + }, + servicesStats() { + return []; + }, + }, + SERVICES_TABLE_FIELDS, +}; +</script> +<template> + <workload-layout + :loading="loading" + :error-message="errorMessage" + :stats="servicesStats" + :items="k8sServices" + :fields="$options.SERVICES_TABLE_FIELDS" + /> +</template> diff --git a/app/assets/javascripts/kubernetes_dashboard/router/constants.js b/app/assets/javascripts/kubernetes_dashboard/router/constants.js index a383ccd03e1..f02c01d7973 100644 --- a/app/assets/javascripts/kubernetes_dashboard/router/constants.js +++ b/app/assets/javascripts/kubernetes_dashboard/router/constants.js @@ -5,6 +5,7 @@ export const REPLICA_SETS_ROUTE_NAME = 'replicaSets'; export const DAEMON_SETS_ROUTE_NAME = 'daemonSets'; export const JOBS_ROUTE_NAME = 'jobs'; export const CRON_JOBS_ROUTE_NAME = 'cronJobs'; +export const SERVICES_ROUTE_NAME = 'services'; export const PODS_ROUTE_PATH = '/pods'; export const DEPLOYMENTS_ROUTE_PATH = '/deployments'; @@ -13,3 +14,4 @@ export const REPLICA_SETS_ROUTE_PATH = '/replicasets'; export const DAEMON_SETS_ROUTE_PATH = '/daemonsets'; export const JOBS_ROUTE_PATH = '/jobs'; export const CRON_JOBS_ROUTE_PATH = '/cronjobs'; +export const SERVICES_ROUTE_PATH = '/services'; diff --git a/app/assets/javascripts/kubernetes_dashboard/router/routes.js b/app/assets/javascripts/kubernetes_dashboard/router/routes.js index 01bb48e8dce..7448508de8a 100644 --- a/app/assets/javascripts/kubernetes_dashboard/router/routes.js +++ b/app/assets/javascripts/kubernetes_dashboard/router/routes.js @@ -6,6 +6,7 @@ import ReplicaSetsPage from '../pages/replica_sets_page.vue'; import DaemonSetsPage from '../pages/daemon_sets_page.vue'; import JobsPage from '../pages/jobs_page.vue'; import CronJobsPage from '../pages/cron_jobs_page.vue'; +import ServicesPage from '../pages/services_page.vue'; import { PODS_ROUTE_NAME, @@ -22,6 +23,8 @@ import { JOBS_ROUTE_PATH, CRON_JOBS_ROUTE_NAME, CRON_JOBS_ROUTE_PATH, + SERVICES_ROUTE_NAME, + SERVICES_ROUTE_PATH, } from './constants'; export default [ @@ -81,4 +84,12 @@ export default [ title: s__('KubernetesDashboard|CronJobs'), }, }, + { + name: SERVICES_ROUTE_NAME, + path: SERVICES_ROUTE_PATH, + component: ServicesPage, + meta: { + title: s__('KubernetesDashboard|Services'), + }, + }, ]; diff --git a/app/assets/javascripts/vue_shared/components/gl_countdown.vue b/app/assets/javascripts/vue_shared/components/gl_countdown.vue index 1769a283d8c..0e8ecc36f37 100644 --- a/app/assets/javascripts/vue_shared/components/gl_countdown.vue +++ b/app/assets/javascripts/vue_shared/components/gl_countdown.vue @@ -30,6 +30,11 @@ export default { mounted() { const updateRemainingTime = () => { const remainingMilliseconds = calculateRemainingMilliseconds(this.endDateString); + + if (remainingMilliseconds < 1) { + this.$emit('timer-expired'); + } + this.remainingTime = formatTime(remainingMilliseconds); }; diff --git a/app/graphql/types/ci/ci_cd_setting_type.rb b/app/graphql/types/ci/ci_cd_setting_type.rb index f01c63d717b..0c2d1b788af 100644 --- a/app/graphql/types/ci/ci_cd_setting_type.rb +++ b/app/graphql/types/ci/ci_cd_setting_type.rb @@ -27,7 +27,7 @@ module Types field :merge_pipelines_enabled, GraphQL::Types::Boolean, null: true, - description: 'Whether merge pipelines are enabled.', + description: 'Whether merged results pipelines are enabled.', method: :merge_pipelines_enabled? field :project, Types::ProjectType, diff --git a/app/models/member.rb b/app/models/member.rb index 845a711e986..d3101656739 100644 --- a/app/models/member.rb +++ b/app/models/member.rb @@ -288,6 +288,14 @@ class Member < ApplicationRecord refresh_member_authorized_projects end + after_create if: :update_organization_user? do + Organizations::OrganizationUser.upsert( + { organization_id: source.organization_id, user_id: user_id, access_level: :default }, + unique_by: [:organization_id, :user_id], + on_duplicate: :skip # Do not change access_level, could make :owner :default + ) + end + attribute :notification_level, default: -> { NotificationSetting.levels[:global] } class << self @@ -657,6 +665,12 @@ class Member < ApplicationRecord user&.project_bot? end + def update_organization_user? + return false unless Feature.enabled?(:update_organization_users, source.root_ancestor, type: :gitlab_com_derisk) + + !invite? && source.organization.present? + end + def log_invitation_token_cleanup return true unless Gitlab.com? && invite? && invite_accepted_at? diff --git a/app/models/users/phone_number_validation.rb b/app/models/users/phone_number_validation.rb index f6521eada40..ffb8d3a95a2 100644 --- a/app/models/users/phone_number_validation.rb +++ b/app/models/users/phone_number_validation.rb @@ -4,6 +4,11 @@ module Users class PhoneNumberValidation < ApplicationRecord include IgnorableColumns + # SMS send attempts subsequent to the first one will have wait times of 1 + # min, 3 min, 5 min after each one respectively. Wait time between the fifth + # attempt and so on will be 10 minutes. + SMS_SEND_WAIT_TIMES = [1.minute, 3.minutes, 5.minutes, 10.minutes].freeze + self.primary_key = :user_id self.table_name = 'user_phone_number_validations' @@ -62,5 +67,18 @@ module Users def validated? validated_at.present? end + + def sms_send_allowed_after + return unless Feature.enabled?(:sms_send_wait_time, user) + + # first send is allowed anytime + return if sms_send_count < 1 + return unless sms_sent_at + + max_wait_time = SMS_SEND_WAIT_TIMES.last + wait_time = SMS_SEND_WAIT_TIMES.fetch(sms_send_count - 1, max_wait_time) + + sms_sent_at + wait_time + end end end diff --git a/app/policies/organizations/organization_policy.rb b/app/policies/organizations/organization_policy.rb index afd8c6e144f..a203a58b164 100644 --- a/app/policies/organizations/organization_policy.rb +++ b/app/policies/organizations/organization_policy.rb @@ -3,6 +3,7 @@ module Organizations class OrganizationPolicy < BasePolicy condition(:organization_user) { @subject.user?(@user) } + condition(:organization_owner) { @subject.owner?(@user) } desc 'Organization is public' condition(:public_organization, scope: :subject, score: 0) { true } @@ -18,11 +19,14 @@ module Organizations enable :read_organization_user end - rule { organization_user }.policy do + rule { organization_owner }.policy do enable :admin_organization - enable :create_group + end + + rule { organization_user }.policy do enable :read_organization enable :read_organization_user + enable :create_group end end end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index fc0695c7f62..dfad9f7f673 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -345,6 +345,15 @@ :weight: 1 :idempotent: false :tags: [] +- :name: cronjob:click_house_event_authors_consistency_cron + :worker_name: ClickHouse::EventAuthorsConsistencyCronWorker + :feature_category: :value_stream_management + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:click_house_events_sync :worker_name: ClickHouse::EventsSyncWorker :feature_category: :value_stream_management diff --git a/app/workers/click_house/event_authors_consistency_cron_worker.rb b/app/workers/click_house/event_authors_consistency_cron_worker.rb new file mode 100644 index 00000000000..5c52cda0204 --- /dev/null +++ b/app/workers/click_house/event_authors_consistency_cron_worker.rb @@ -0,0 +1,121 @@ +# frozen_string_literal: true + +module ClickHouse + # rubocop: disable CodeReuse/ActiveRecord -- Building worker-specific ActiveRecord and ClickHouse queries + class EventAuthorsConsistencyCronWorker + include ApplicationWorker + include ClickHouseWorker + include Gitlab::ExclusiveLeaseHelpers + include Gitlab::Utils::StrongMemoize + + idempotent! + queue_namespace :cronjob + data_consistency :delayed + worker_has_external_dependencies! # the worker interacts with a ClickHouse database + feature_category :value_stream_management + + MAX_TTL = 5.minutes.to_i + MAX_RUNTIME = 150.seconds + MAX_AUTHOR_DELETIONS = 2000 + CLICK_HOUSE_BATCH_SIZE = 100_000 + POSTGRESQL_BATCH_SIZE = 2500 + + def perform + return unless enabled? + + runtime_limiter = Analytics::CycleAnalytics::RuntimeLimiter.new(MAX_RUNTIME) + + in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do + author_records_to_delete = [] + last_processed_id = 0 + iterator.each_batch(column: :author_id, of: CLICK_HOUSE_BATCH_SIZE) do |scope| + query = scope.select(Arel.sql('DISTINCT author_id')).to_sql + ids_from_click_house = connection.select(query).pluck('author_id').sort + + ids_from_click_house.each_slice(POSTGRESQL_BATCH_SIZE) do |ids| + author_records_to_delete.concat(missing_user_ids(ids)) + last_processed_id = ids.last + + to_be_deleted_size = author_records_to_delete.size + if to_be_deleted_size >= MAX_AUTHOR_DELETIONS + metadata.merge!(status: :deletion_limit_reached, deletions: to_be_deleted_size) + break + end + + if runtime_limiter.over_time? + metadata.merge!(status: :over_time, deletions: to_be_deleted_size) + break + end + end + + break if limit_was_reached? + end + + delete_records_from_click_house(author_records_to_delete) + + last_processed_id = 0 if table_fully_processed? + ClickHouse::SyncCursor.update_cursor_for(:event_authors_consistency_check, last_processed_id) + + log_extra_metadata_on_done(:result, metadata) + end + end + + private + + def metadata + @metadata ||= { status: :processed, deletions: 0 } + end + + def limit_was_reached? + metadata[:status] == :deletion_limit_reached || metadata[:status] == :over_time + end + + def table_fully_processed? + metadata[:status] == :processed + end + + def enabled? + ClickHouse::Client.database_configured?(:main) && Feature.enabled?(:event_sync_worker_for_click_house) + end + + def previous_author_id + value = ClickHouse::SyncCursor.cursor_for(:event_authors_consistency_check) + value == 0 ? nil : value + end + strong_memoize_attr :previous_author_id + + def iterator + builder = ClickHouse::QueryBuilder.new('event_authors') + ClickHouse::Iterator.new(query_builder: builder, connection: connection, min_value: previous_author_id) + end + + def connection + @connection ||= ClickHouse::Connection.new(:main) + end + + def missing_user_ids(ids) + value_list = Arel::Nodes::ValuesList.new(ids.map { |id| [id] }) + User + .from("(#{value_list.to_sql}) AS user_ids(id)") + .where('NOT EXISTS (SELECT 1 FROM users WHERE id = user_ids.id)') + .pluck(:id) + end + + def delete_records_from_click_house(ids) + query = ClickHouse::Client::Query.new( + raw_query: "DELETE FROM events WHERE author_id IN ({author_ids:Array(UInt64)})", + placeholders: { author_ids: ids.to_json } + ) + + connection.execute(query) + + query = ClickHouse::Client::Query.new( + raw_query: "DELETE FROM event_authors WHERE author_id IN ({author_ids:Array(UInt64)})", + placeholders: { author_ids: ids.to_json } + ) + + connection.execute(query) + end + end + # rubocop: enable CodeReuse/ActiveRecord +end |