1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# frozen_string_literal: true
module Gitlab
module Database
# Constructs queries of the form:
#
# with cte(a, b, c) as (
# select * from (values (:x, :y, :z), (:q, :r, :s)) as t
# )
# update table set b = cte.b, c = cte.c from cte where a = cte.a
#
# Which is useful if you want to update a set of records in a single query
# but cannot express the update as a calculation (i.e. you have arbitrary
# updates to perform).
#
# The requirements are that the table must have an ID column used to
# identify the rows to be updated.
#
# Usage:
#
# mapping = {
# issue_a => { title: 'This title', relative_position: 100 },
# issue_b => { title: 'That title', relative_position: 173 }
# }
#
# ::Gitlab::Database::BulkUpdate.execute(%i[title relative_position], mapping)
#
# Note that this is a very low level tool, and operates on the raw column
# values. Enums/state fields must be translated into their underlying
# representations, for example, and no hooks will be called.
#
module BulkUpdate
# Separator used by Setter#list_of when joining SQL fragments
# (CTE column names, bind placeholders, value tuples and SET assignments).
LIST_SEPARATOR = ', '
# Builds and executes a single bulk UPDATE statement for one model's table.
#
# The generated statement has the shape:
#
#   WITH cte(cte_id, cte_a, ...) AS MATERIALIZED (VALUES ($1::type, $2::type, ...), ($3, $4, ...))
#   UPDATE table SET a = cte.cte_a, ... FROM cte WHERE cte_id = id
#
# Every value is sent as a bind parameter rather than interpolated into the SQL.
class Setter
  include Gitlab::Utils::StrongMemoize

  # model   - object providing table_name, connection and column_for_attribute.
  # columns - non-empty collection of Symbols naming the columns to write;
  #           must not contain :id.
  # mapping - non-empty collection of [record_or_id, Hash] pairs (a Hash works).
  #
  # Raises ArgumentError when columns or mapping fail validation.
  def initialize(model, columns, mapping)
    @table_name = model.table_name
    @connection = model.connection
    @columns = self.class.column_definitions(model, columns)
    @mapping = self.class.value_mapping(mapping)
  end

  # Runs the UPDATE through the connection and returns whatever
  # connection.exec_update returns.
  def update!
    connection.exec_update(sql, log_name, params)
  end

  # Validates the requested column names and resolves them to column
  # definitions. The :id column is always placed first, since it is the
  # column rows are matched on.
  #
  # Raises ArgumentError ('invalid columns') when columns is blank or holds a
  # non-Symbol, and ArgumentError ('cannot set ID') when :id is requested.
  def self.column_definitions(model, columns)
    raise ArgumentError, 'invalid columns' if columns.blank? || !columns.all? { |c| c.is_a?(Symbol) }
    raise ArgumentError, 'cannot set ID' if columns.include?(:id)

    ([:id] | columns).map { |name| column_definition(model, name) }
  end

  # Looks up a single column definition on the model.
  #
  # Raises ArgumentError when the returned definition has no type, which is
  # how an unknown column is detected here.
  def self.column_definition(model, name)
    definition = model.column_for_attribute(name)
    raise ArgumentError, "Unknown column: #{name}" unless definition.type

    definition
  end

  # Validates the mapping and returns it unchanged.
  #
  # Raises ArgumentError ('invalid mapping') when it is blank and
  # ArgumentError ('invalid mapping value') when any value is not a Hash.
  def self.value_mapping(mapping)
    raise ArgumentError, 'invalid mapping' if mapping.blank?
    raise ArgumentError, 'invalid mapping value' unless mapping.all? { |_key, value| value.is_a?(Hash) }

    mapping
  end

  private

  attr_reader :table_name, :connection, :columns, :mapping

  # Label passed to exec_update as the query name: table, updated columns
  # (without the id column) and number of rows.
  def log_name
    strong_memoize(:log_name) do
      "BulkUpdate #{table_name} #{column_names.drop(1)}:#{mapping.size}"
    end
  end

  # Flat list of bind attributes, row by row and column by column, in the
  # same order as the placeholders produced by #values.
  def params
    mapping.flat_map do |key, attrs|
      # The key is either a record (use its id) or already the raw ID.
      # Built once per row: the indifferent-access copy is the same for every column.
      row = attrs.merge(id: key.try(:id) || key).with_indifferent_access

      columns.map { |column| query_attribute(column, key, row) }
    end
  end

  # Builds the bind attribute for one column of one row. A column that is
  # absent from the row hash is looked up as-is, so it binds whatever the
  # hash returns for a missing key.
  def query_attribute(column, key, values)
    value = values[column.name]
    # Optimistic update: when the key is a record (it exposes an id), write
    # the new value onto it so the in-memory object matches the statement.
    key[column.name] = value if key.try(:id)

    ActiveRecord::Relation::QueryAttribute.from_user(nil, value, ActiveModel::Type.lookup(column.type))
  end

  # One "(…)" tuple of numbered placeholders per mapping entry. Placeholders
  # are numbered consecutively across rows ($1, $2, ...).
  def values
    width = columns.size

    mapping.each_with_index.map do |_entry, row|
      binds = columns.each_with_index.map do |column, col|
        placeholder = "$#{(row * width) + col + 1}"
        # PG is not great at inferring types - help it for the first row.
        row.zero? ? "#{placeholder}::#{column.sql_type}" : placeholder
      end

      "(#{list_of(binds)})"
    end
  end

  # Joins SQL fragments with the shared list separator.
  def list_of(list)
    list.join(LIST_SEPARATOR)
  end

  # The complete statement: a CTE holding the new values, joined to the
  # target table on id.
  def sql
    <<~SQL
      WITH cte(#{list_of(cte_columns)}) AS MATERIALIZED (VALUES #{list_of(values)})
      UPDATE #{table_name} SET #{list_of(updates)} FROM cte WHERE cte_id = id
    SQL
  end

  # Names of all columns in the statement, id first.
  def column_names
    strong_memoize(:column_names) { columns.map(&:name) }
  end

  # Quoted CTE column names; each is the table column name prefixed with
  # "cte_" so the two sides of the join cannot be confused.
  def cte_columns
    strong_memoize(:cte_columns) do
      column_names.map { |name| connection.quote_column_name("cte_#{name}") }
    end
  end

  # SET assignments for every column except the leading id column.
  def updates
    column_names.zip(cte_columns).drop(1).map do |dest, src|
      "#{connection.quote_column_name(dest)} = cte.#{src}"
    end
  end
end
# Bulk-updates the given entries, running one Setter (one UPDATE statement)
# per model.
#
# columns  - column names to write; passed through to Setter unchanged.
# mapping  - collection of key => Hash-of-new-values pairs.
# to_class - optional block called with each mapping key; its result is used
#            as the model for that entry. Without a block, key.class is used.
#
# Raises ArgumentError ('invalid mapping') when mapping is blank.
# Returns the mapping entries grouped by model.
def self.execute(columns, mapping, &to_class)
  raise ArgumentError, 'invalid mapping' if mapping.blank?

  entries_by_class = mapping.group_by do |key, _values|
    to_class ? to_class.call(key) : key.class
  end

  entries_by_class.each do |model, entries|
    Setter.new(model, columns, entries).update!
  end
end
end
end
end
|