Welcome to mirror list, hosted at ThFree Co, Russian Federation.

lock_writes_manager.rb « database « gitlab « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 8ddd871f93cf8303e89085ef278ae6ddb8d418af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# frozen_string_literal: true

module Gitlab
  module Database
    class LockWritesManager
      TRIGGER_FUNCTION_NAME = 'gitlab_schema_prevent_write'

      # Triggers to block INSERT / UPDATE / DELETE
      # Triggers on TRUNCATE are not added to the information_schema.triggers
      # See https://www.postgresql.org/message-id/16934.1568989957%40sss.pgh.pa.us
      EXPECTED_TRIGGER_RECORD_COUNT = 3

      # table_name can include schema name as a prefix. For example: 'gitlab_partitions_static.events_03',
      # otherwise, it will default to current used schema, for example 'public'.
      def initialize(table_name:, connection:, database_name:, with_retries: true, logger: nil, dry_run: false)
        @table_name = table_name
        @connection = connection
        @database_name = database_name
        @logger = logger
        @dry_run = dry_run
        @with_retries = with_retries

        @table_name_without_schema = ActiveRecord::ConnectionAdapters::PostgreSQL::Utils
          .extract_schema_qualified_name(table_name.to_s)
          .identifier
      end

      def table_locked_for_writes?
        query = <<~SQL
            SELECT COUNT(*) from information_schema.triggers
            WHERE event_object_table = '#{table_name_without_schema}'
            AND trigger_name = '#{write_trigger_name}'
        SQL

        connection.select_value(query) == EXPECTED_TRIGGER_RECORD_COUNT
      end

      def lock_writes
        if table_locked_for_writes?
          logger&.info "Skipping lock_writes, because #{table_name} is already locked for writes"
          return result_hash(action: 'skipped')
        end

        logger&.info "Database: '#{database_name}', Table: '#{table_name}': Lock Writes".color(:yellow)
        sql_statement = <<~SQL
          CREATE TRIGGER #{write_trigger_name}
            BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE
            ON #{table_name}
            FOR EACH STATEMENT EXECUTE FUNCTION #{TRIGGER_FUNCTION_NAME}();
        SQL

        execute_sql_statement(sql_statement)

        result_hash(action: dry_run ? 'needs_lock' : 'locked')
      end

      def unlock_writes
        unless table_locked_for_writes?
          logger&.info "Skipping unlock_writes, because #{table_name} is already unlocked for writes"
          return result_hash(action: 'skipped')
        end

        logger&.info "Database: '#{database_name}', Table: '#{table_name}': Allow Writes".color(:green)
        sql_statement = <<~SQL
          DROP TRIGGER IF EXISTS #{write_trigger_name} ON #{table_name};
        SQL

        execute_sql_statement(sql_statement)

        result_hash(action: dry_run ? 'needs_unlock' : 'unlocked')
      end

      private

      attr_reader :table_name, :connection, :database_name, :logger, :dry_run, :table_name_without_schema, :with_retries

      def execute_sql_statement(sql)
        if dry_run
          logger&.info sql
        elsif with_retries
          raise "Cannot call lock_retries_helper if a transaction is already open" if connection.transaction_open?

          run_with_retries(connection) do
            connection.execute(sql)
          end
        else
          connection.execute(sql)
        end
      end

      def run_with_retries(connection, &block)
        with_statement_timeout_retries do
          with_lock_retries(connection) do
            yield
          end
        end
      end

      def with_statement_timeout_retries(times = 5)
        current_iteration = 1
        begin
          yield
        rescue ActiveRecord::QueryCanceled => err # rubocop:disable Database/RescueQueryCanceled
          if current_iteration <= times
            current_iteration += 1
            retry
          else
            raise err
          end
        end
      end

      def with_lock_retries(connection, &block)
        Gitlab::Database::WithLockRetries.new(
          klass: "gitlab:db:lock_writes",
          logger: logger || Gitlab::AppLogger,
          connection: connection,
          allow_savepoints: false # this causes the WithLockRetries to fail if sub-transaction has been detected.
        ).run(&block)
      end

      def write_trigger_name
        "gitlab_schema_write_trigger_for_#{table_name_without_schema}"
      end

      def result_hash(action:)
        { action: action, database: database_name, table: table_name, dry_run: dry_run }
      end
    end
  end
end