1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
|
# frozen_string_literal: true
module Gitlab
module Database
module Partitioning
module List
class ConvertTable
# Raised by this class when a table cannot be prepared for, or converted into, a partition.
UnableToPartition = Class.new(StandardError)
# Separator placed between individual SQL statements when several are joined into one script.
SQL_STATEMENT_SEPARATOR = ";\n\n"
# Name used for the check constraint created by add_partitioning_check_constraint.
PARTITIONING_CONSTRAINT_NAME = 'partitioning_constraint'
# Public readers for the conversion parameters supplied to the constructor.
attr_reader :partitioning_column, :table_name, :parent_table_name, :zero_partition_value
# Stores the conversion parameters and keeps a handle on the migration's connection.
#
# migration_context    - object exposing the migration helpers used by this class; it must
#                        respond to `connection`.
# table_name           - name of the existing table that will become the partition.
# parent_table_name    - name of the partitioned parent table.
# partitioning_column  - column the parent table is list-partitioned by.
# zero_partition_value - partitioning value the existing table is attached for.
def initialize(
  migration_context:,
  table_name:,
  parent_table_name:,
  partitioning_column:,
  zero_partition_value:
)
  @table_name = table_name
  @parent_table_name = parent_table_name
  @partitioning_column = partitioning_column
  @migration_context = migration_context
  @connection = migration_context.connection
  @zero_partition_value = zero_partition_value
end
# First step of the conversion: checks that the existing primary key / unique constraints
# are compatible with partitioning, then adds (or validates) the partitioning check constraint.
#
# async - forwarded unchanged to add_partitioning_check_constraint.
def prepare_for_partitioning(async: false)
  assert_existing_constraints_partitionable

  constraint_options = { async: async }
  add_partitioning_check_constraint(**constraint_options)
end
# Undoes prepare_for_partitioning by removing the partitioning check constraint
# (looked up by its definition) from the table.
def revert_preparation_for_partitioning
  constraint_name = partitioning_constraint.name
  migration_context.remove_check_constraint(table_name, constraint_name)
end
# Converts the table into a partition of the parent table and then mirrors its foreign keys
# onto the parent. Safe to re-run: the conversion steps are skipped when the table is
# already a partition.
def partition
  needs_conversion = !already_partitioned?

  # If already partitioned, the table is no longer partitionable. Thus we skip checks leading up
  # to partitioning if the partitioning transaction has already succeeded.
  if needs_conversion
    assert_existing_constraints_partitionable
    assert_partitioning_constraint_present
    create_parent_table

    migration_context.with_lock_retries do
      # The SQL is built inside the block, so it is regenerated every time the block runs.
      redefine_loose_foreign_key_triggers { migration_context.execute(sql_to_convert_table) }
    end
  end

  # Attaching foreign keys handles cases where one or more foreign keys already exists, so it doesn't
  # need a check similar to the rest of this method
  attach_foreign_keys_to_parent
end
# Reverses #partition: detaches the table from the parent, moves sequence ownership back to
# the table, drops the parent table, and finally re-adds the partitioning check constraint.
def revert_partitioning
  migration_context.with_lock_retries(raise_on_exhaustion: true) do
    detach_sql = <<~SQL
      ALTER TABLE #{connection.quote_table_name(parent_table_name)}
      DETACH PARTITION #{connection.quote_table_name(table_name)};
    SQL
    migration_context.execute(detach_sql)

    sequence_statements = alter_sequence_statements(old_table: parent_table_name, new_table: table_name)
    migration_context.execute(sequence_statements.join(SQL_STATEMENT_SEPARATOR))

    # This takes locks for all the foreign keys that the parent table had.
    # However, those same locks were taken while detaching the partition, and we can't avoid that.
    # If we dropped the foreign key before detaching the partition to avoid this locking,
    # the drop would cascade to the child partitions and drop their foreign keys as well
    migration_context.drop_table(parent_table_name)
  end

  add_partitioning_check_constraint
end
private
# Internal collaborators assigned in the constructor: the database connection and the
# migration context whose helper methods this class calls.
attr_reader :connection, :migration_context
# Quoting helpers and the current schema name are forwarded to the connection.
delegate :quote_table_name, :quote_column_name, :current_schema, to: :connection
# Builds the single SQL script that performs the conversion: attach the table to the parent,
# hand its sequences over to the parent, and drop the partitioning check constraint.
# Returns one String with the statements joined by SQL_STATEMENT_SEPARATOR.
def sql_to_convert_table
  # The critical statement here is the attach_table_to_parent statement.
  # The following statements could be run in a later transaction,
  # but they acquire the same locks so it's much faster to include them
  # here.
  statements = [
    attach_table_to_parent_statement,
    *alter_sequence_statements(old_table: table_name, new_table: parent_table_name),
    remove_constraint_statement
  ]

  statements.join(SQL_STATEMENT_SEPARATOR)
end
# Schema-qualified identifier of the table, in the form "<current schema>.<table name>".
def table_identifier
  format('%s.%s', current_schema, table_name)
end
# Ensures every primary key / unique constraint on the table includes the partitioning column.
#
# Returns nil when there are no violations.
# Raises UnableToPartition listing each offending constraint and its columns otherwise.
def assert_existing_constraints_partitionable
  offenders = Gitlab::Database::PostgresConstraint
    .by_table_identifier(table_identifier)
    .primary_or_unique_constraints
    .not_including_column(partitioning_column)
    .to_a
  return if offenders.empty?

  violation_list = offenders
    .map { |constraint| "#{constraint.name} on (#{constraint.column_names.join(', ')})" }
    .join("\n")

  raise UnableToPartition, <<~MSG
    Constraints on #{table_name} are incompatible with partitioning on #{partitioning_column}
    All primary key and unique constraints must include the partitioning column.
    Violations:
    #{violation_list}
  MSG
end
# Looks up the check constraint on the partitioning column whose definition starts with
# "CHECK ((<partitioning_column> = <zero_partition_value>))".
# Returns the first matching constraint, or nil when none matches. The lookup is performed
# on every call (no memoization).
def partitioning_constraint
  expected_definition = "CHECK ((#{partitioning_column} = #{zero_partition_value}))"

  Gitlab::Database::PostgresConstraint
    .by_table_identifier(table_identifier)
    .check_constraints
    .including_column(partitioning_column)
    .find { |candidate| candidate.definition.start_with?(expected_definition) }
end
# Ensures the partitioning check constraint exists and is validated.
#
# Returns nil when the constraint is present and valid.
# Raises UnableToPartition when it is missing or not valid.
def assert_partitioning_constraint_present
  constraint = partitioning_constraint
  return if constraint && constraint.constraint_valid?

  raise UnableToPartition, <<~MSG
    Table #{table_name} is not ready for partitioning.
    Before partitioning, a check constraint must enforce that (#{partitioning_column} = #{zero_partition_value})
  MSG
end
# Adds the check constraint pinning the partitioning column to the zero-partition value.
#
# When the constraint already exists, this only validates it synchronously and returns that
# result. Otherwise the constraint is created (validated immediately unless `async` is true,
# in which case an async validation is prepared) and its presence is verified afterwards.
#
# Raises UnableToPartition when the constraint cannot be found after being added.
def add_partitioning_check_constraint(async: false)
  if partitioning_constraint.present?
    return validate_partitioning_constraint_synchronously
  end

  constraint_definition = "#{partitioning_column} = #{connection.quote(zero_partition_value)}"

  # Any constraint name would work. The constraint is found based on its definition before partitioning
  migration_context.add_check_constraint(
    table_name,
    constraint_definition,
    PARTITIONING_CONSTRAINT_NAME,
    validate: !async
  )

  if async
    migration_context.prepare_async_check_constraint_validation(table_name, name: PARTITIONING_CONSTRAINT_NAME)
  end

  unless partitioning_constraint.present?
    raise UnableToPartition, <<~MSG
      Error adding partitioning constraint `#{PARTITIONING_CONSTRAINT_NAME}` for `#{table_name}`
    MSG
  end
end
# Validates the already-existing partitioning constraint in the current migration.
#
# Logs and returns early when the constraint is already valid; otherwise validates it and
# re-reads the constraint to confirm. Raises UnableToPartition if it is still not valid.
def validate_partitioning_constraint_synchronously
  if partitioning_constraint.constraint_valid?
    already_valid_message = <<~MSG
      Nothing to do, the partitioning constraint exists and is valid for `#{table_name}`
    MSG
    return Gitlab::AppLogger.info(already_valid_message)
  end

  # Async validations are executed only on .com, we need to validate synchronously for self-managed
  migration_context.validate_check_constraint(table_name, partitioning_constraint.name)

  # The constraint is looked up again so the check reflects the state after validation.
  unless partitioning_constraint.constraint_valid?
    raise UnableToPartition, <<~MSG
      Error validating partitioning constraint `#{partitioning_constraint.name}` for `#{table_name}`
    MSG
  end
end
# Creates the list-partitioned parent table as a structural copy (LIKE ... INCLUDING ALL) of
# the existing table. Uses IF NOT EXISTS, so an existing parent table is left in place.
def create_parent_table
  create_sql = <<~SQL
    CREATE TABLE IF NOT EXISTS #{quote_table_name(parent_table_name)} (
    LIKE #{quote_table_name(table_name)} INCLUDING ALL
    ) PARTITION BY LIST(#{quote_column_name(partitioning_column)})
  SQL

  migration_context.execute(create_sql)
end
# Recreates each foreign key of the table on the parent table, skipping any whose name
# already exists on the parent. Each foreign key is added in its own lock-retry block.
def attach_foreign_keys_to_parent
  migration_context.foreign_keys(table_name).each do |child_fk|
    fk_name = child_fk.options[:name]

    # If we're rerunning this migration after a failure to acquire a lock, the foreign key might already exist
    # Don't try to recreate it in that case
    already_on_parent = migration_context.foreign_keys(parent_table_name).any? do |parent_fk|
      parent_fk.options[:name] == fk_name
    end
    next if already_on_parent

    # At this point no other connection knows about the parent table.
    # Thus the only contended lock in the following transaction is on fk.to_table.
    # So a deadlock is impossible.
    # (We also take a share update exclusive lock against the recently attached child table,
    # but that only blocks vacuum and other schema modifications, not reads or writes)
    migration_context.with_lock_retries(raise_on_exhaustion: true) do
      migration_context.add_foreign_key(parent_table_name, child_fk.to_table, **child_fk.options)
    end
  end
end
# Builds the ALTER TABLE ... ATTACH PARTITION statement that makes the existing table a
# partition of the parent table for the zero-partition value.
#
# Both table names are quoted as identifiers and the partition value is quoted as a literal,
# matching the other statement builders in this class and the way the check constraint
# body quotes the same value. Returns the statement as a String.
def attach_table_to_parent_statement
  <<~SQL
    ALTER TABLE #{quote_table_name(parent_table_name)}
    ATTACH PARTITION #{quote_table_name(table_name)}
    FOR VALUES IN (#{connection.quote(zero_partition_value)})
  SQL
end
# Builds one SQL snippet per sequence owned by `old_table`, reassigning the sequence to the
# same-named column of `new_table`.
#
# old_table - table currently owning the sequences.
# new_table - table that should own them afterwards.
#
# Returns an Array of Strings (empty when `old_table` owns no sequences). When the current
# database user does not own `old_table`, each snippet is prefixed with a statement that
# transfers ownership of `old_table` to the current user.
def alter_sequence_statements(old_table:, new_table:)
  sequences = sequences_owned_by(old_table)
  return [] if sequences.empty?

  # If a different user owns the old table, the conversion process will fail to reassign the sequence
  # ownership to the new parent table (as it will be owned by the current user).
  # Force the old table to be owned by the current user in that case.
  #
  # Ownership does not change while the statements are merely being built, so it is looked up
  # once here rather than once per sequence.
  ownership_statement = nil
  unless current_user_owns_table?(old_table)
    ownership_statement = set_current_user_owns_table_statement(old_table)
  end

  sequences.map do |seq_info|
    seq_name, column_name = seq_info.values_at(:name, :column_name)
    reassign_statement =
      "ALTER SEQUENCE #{quote_table_name(seq_name)} " \
      "OWNED BY #{quote_table_name(new_table)}.#{quote_column_name(column_name)}"

    [ownership_statement, reassign_statement].compact.join(SQL_STATEMENT_SEPARATOR)
  end
end
# Builds the statement that drops the partitioning check constraint from the parent table.
#
# The constraint name is a plain (non schema-qualified) identifier, so it is quoted with
# quote_column_name; quote_table_name would treat a dot in the name as a schema separator.
# Returns the statement as a String.
def remove_constraint_statement
  <<~SQL
    ALTER TABLE #{quote_table_name(parent_table_name)}
    DROP CONSTRAINT #{quote_column_name(partitioning_constraint.name)}
  SQL
end
# TODO: https://gitlab.com/gitlab-org/gitlab/-/issues/373887
# Finds the sequences that depend on a column of the given table.
#
# table_name - relation name to match; the query filters on the relation name only.
#
# Returns an Array of Hashes with :name (sequence name) and :column_name (owning column).
def sequences_owned_by(table_name)
  owned_sequences_sql = <<~SQL
    SELECT seq_pg_class.relname AS seq_name,
    dep_pg_class.relname AS table_name,
    pg_attribute.attname AS col_name
    FROM pg_class seq_pg_class
    INNER JOIN pg_depend ON seq_pg_class.oid = pg_depend.objid
    INNER JOIN pg_class dep_pg_class ON pg_depend.refobjid = dep_pg_class.oid
    INNER JOIN pg_attribute ON dep_pg_class.oid = pg_attribute.attrelid
    AND pg_depend.refobjsubid = pg_attribute.attnum
    WHERE seq_pg_class.relkind = 'S'
    AND dep_pg_class.relname = $1
  SQL

  rows = connection.exec_query(owned_sequences_sql, nil, [table_name])
  rows.map do |row|
    sequence_name, owning_column = row.values_at('seq_name', 'col_name')
    { name: sequence_name, column_name: owning_column }
  end
end
# Returns the `tableowner` value that pg_tables reports for the given table name.
def table_owner(table_name)
  owner_sql = <<~SQL
    SELECT tableowner FROM pg_tables WHERE tablename = $1
  SQL

  connection.select_value(owner_sql, nil, [table_name])
end
# Whether the database user of the current connection is the owner of the given table.
def current_user_owns_table?(table_name)
  session_user = connection.select_value('select current_user')
  owner = table_owner(table_name)

  owner == session_user
end
# Builds the statement that makes the current database user the owner of the given table.
# Returns the statement without a trailing newline or semicolon.
def set_current_user_owns_table_statement(table_name)
  quoted_table = connection.quote_table_name(table_name)

  "ALTER TABLE #{quoted_table} OWNER TO CURRENT_USER"
end
# Extracts the table name from a "<schema>.<table>" identifier.
#
# table_identifier - String such as "public.issues".
#
# Returns the table part as a String, or nil when the input is not a single
# schema-qualified identifier made of word characters (including nil input).
def table_name_for_identifier(table_identifier)
  # \A and \z anchor the whole string (not just one line), and the single capture group
  # avoids the nested quantifier that could backtrack heavily on non-matching input.
  match = /\A\w+\.(\w+)\z/.match(table_identifier)
  match && match[1]
end
# Runs the given block (if any). When the table has a loose foreign key, record-deletion
# tracking is removed from the table before the block and re-added afterwards on both the
# parent table and the table.
def redefine_loose_foreign_key_triggers
  unless migration_context.has_loose_foreign_key?(table_name)
    # Nothing to redefine: just run the block and hand back its result.
    return unless block_given?

    return yield
  end

  migration_context.untrack_record_deletions(table_name)
  yield if block_given?
  migration_context.track_record_deletions(parent_table_name)
  migration_context.track_record_deletions(table_name)
end
# Whether a PostgresPartition record exists for this table's schema-qualified identifier.
def already_partitioned?
  partition_scope = Gitlab::Database::PostgresPartition.for_identifier(table_identifier)
  partition_scope.exists?
end
end
end
end
end
end
|