Welcome to mirror list, hosted at ThFree Co, Russian Federation.

bulk_insert_safe.rb « concerns « models « app - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: a61db2dc148829180b04335abd0fe32411ae0ee6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# frozen_string_literal: true

##
# A mixin for ActiveRecord models that enables callers to insert (or upsert) instances of
# the target class into the database en bloc via the [bulk_insert!] and [bulk_upsert!] methods.
#
# Upon inclusion in the target class, the mixin will perform a number of checks to
# ensure that the target is eligible for bulk insertions. For instance, it must not
# use ActiveRecord callbacks that fire between [save]s, since these would not run
# properly when instances are inserted in bulk.
#
# The mixin uses ActiveRecord 6's [InsertAll] type internally for bulk insertions.
# Unlike [InsertAll], however, it requires you to pass instances of the target type
# rather than row hashes, since it will run validations prior to insertion.
#
# @example
#
#   class MyRecord < ApplicationRecord
#     include BulkInsertSafe # must be included _last_ i.e. after any other concerns
#   end
#
#   # simple
#   MyRecord.bulk_insert!(items)
#
#   # with custom batch size
#   MyRecord.bulk_insert!(items, batch_size: 100)
#
#   # without validations
#   MyRecord.bulk_insert!(items, validate: false)
#
#   # with attribute hash modification
#   MyRecord.bulk_insert!(items) { |item_attrs| item_attrs['col'] = 42 }
#
#
module BulkInsertSafe
  extend ActiveSupport::Concern

  # Names of the callback chains that may still be registered on a model that is
  # written to the database in bulk; `set_callback` rejects every other chain
  # (apart from the autosave hooks installed by belongs_to associations).
  CALLBACK_NAME_WHITELIST = Set.new(%i[initialize validate validation find destroy]).freeze

  # Batch size used by the bulk operations when the caller does not pass one.
  DEFAULT_BATCH_SIZE = 500

  # Raised when a callback outside of the whitelist is registered on the model.
  MethodNotAllowedError = Class.new(StandardError)
  # Raised when an item handed to a bulk operation already has its primary key set.
  PrimaryKeySetError = Class.new(StandardError)

  class_methods do
    # Guards callback registration on models that include this concern.
    #
    # Registration is forwarded to the original implementation (via `super`) only
    # when [_bulk_insert_callback_allowed?] accepts the callback; anything else is
    # rejected, because such callbacks do not work with bulk-inserts.
    #
    # @param [Symbol] name  Name of the callback chain, e.g. :save or :validate
    # @param [Array]  args  Remaining arguments, passed through to `super` untouched
    #
    # @raise [MethodNotAllowedError] when the callback is not considered bulk-insert safe
    #
    def set_callback(name, *args)
      unless _bulk_insert_callback_allowed?(name, args)
        # The trailing space after the first sentence keeps the two literals from
        # running together once they are concatenated.
        raise MethodNotAllowedError,
          "Not allowed to call `set_callback(#{name}, #{args})` when model extends `BulkInsertSafe`. " \
            "Callbacks that fire per each record being inserted do not work with bulk-inserts."
      end

      super
    end

    # Bulk-inserts the given ActiveRecord [items] into the table mapped to this class.
    # Items are written in batches of at most [batch_size] rows, and all batches share
    # one transaction, i.e. insertion semantics are "atomic across all batches".
    #
    # @param [Array]   items             Model instances to insert; none may have its primary key set
    # @param [Boolean] validate          Whether validations should run on [items]
    # @param [Boolean] skip_duplicates   Marks duplicates as allowed, and skips inserting them
    # @param [Integer] batch_size        Upper bound for the number of items inserted at once
    # @param [Proc]    handle_attributes Block that receives each item's attribute hash prior
    #                                    to insertion and may modify it in place
    #
    # This method throws on the following occasions:
    # - [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # - [ActiveRecord::RecordInvalid]   on entity validation failures
    # - [ActiveRecord::RecordNotUnique] on duplicate key errors
    #
    # @return true if operation succeeded, throws otherwise.
    #
    def bulk_insert!(items, validate: true, skip_duplicates: false, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      duplicate_strategy =
        if skip_duplicates
          :skip
        else
          :raise
        end

      _bulk_insert_all!(
        items,
        validate: validate,
        on_duplicate: duplicate_strategy,
        unique_by: nil,
        batch_size: batch_size,
        &handle_attributes
      )
    end

    # Bulk-upserts the given ActiveRecord [items] into the table mapped to this class.
    # Items are inserted or updated in batches of at most [batch_size] rows, and all
    # batches share one transaction, i.e. semantics are "atomic across all batches".
    #
    # @param [Array]        items             Model instances to insert or update
    # @param [Symbol/Array] unique_by         Index or columns that decide whether an item is a duplicate
    # @param [Boolean]      validate          Whether validations should run on [items]
    # @param [Integer]      batch_size        Upper bound for the number of items written at once
    # @param [Proc]         handle_attributes Block that receives each item's attribute hash prior
    #                                         to insertion and may modify it in place
    #
    # Unique indexes can be identified by columns or name:
    #  - unique_by: :isbn
    #  - unique_by: %i[ author_id name ]
    #  - unique_by: :index_books_on_isbn
    #
    # This method throws on the following occasions:
    # - [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # - [ActiveRecord::RecordInvalid]   on entity validation failures
    # - [ActiveRecord::RecordNotUnique] on duplicate key errors
    #
    # @return true if operation succeeded, throws otherwise.
    #
    def bulk_upsert!(items, unique_by:, validate: true, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      upsert_options = {
        validate: validate,
        on_duplicate: :update,
        unique_by: unique_by,
        batch_size: batch_size
      }

      _bulk_insert_all!(items, **upsert_options, &handle_attributes)
    end

    private

    # Shared implementation behind [bulk_insert!] and [bulk_upsert!].
    #
    # Slices [items] into batches of [batch_size], converts every batch into row
    # hashes via [_bulk_insert_item_attributes] and executes one
    # ActiveRecord::InsertAll per batch. All batches run inside a single
    # `transaction` block.
    #
    # @param [Array]        items             Model instances to write
    # @param [Symbol]       on_duplicate      Conflict strategy handed to InsertAll
    # @param [Symbol/Array] unique_by         Unique index or columns handed to InsertAll (may be nil)
    # @param [Boolean]      validate          Whether each item is validated before conversion
    # @param [Integer]      batch_size        Maximum number of items per InsertAll
    # @param [Proc]         handle_attributes Optional block forwarded to the attribute conversion
    #
    # @return true; for empty [items] it returns true without opening a transaction
    #
    def _bulk_insert_all!(items, on_duplicate:, unique_by:, validate:, batch_size:, &handle_attributes)
      unless items.empty?
        transaction do
          items.each_slice(batch_size) do |batch|
            rows = _bulk_insert_item_attributes(batch, validate, &handle_attributes)

            inserter = ActiveRecord::InsertAll.new(self, rows, on_duplicate: on_duplicate, unique_by: unique_by)
            inserter.execute
          end
        end
      end

      true
    end

    # Converts model instances into the row hashes that are handed to InsertAll.
    #
    # For every item this optionally runs `validate!`, serializes the value of each
    # column in `column_names` through the attribute's type, removes the primary key
    # entry (raising if it holds a value) and finally yields the hash to the block,
    # if one was given, so callers can adjust it.
    #
    # @param [Array]   items          Model instances to convert
    # @param [Boolean] validate_items Whether `validate!` is called on each item first
    #
    # @yield [Hash] the attribute hash of each item, after the primary key was removed
    #
    # @return [Array<Hash>] one attribute hash per item, in the order of [items]
    #
    def _bulk_insert_item_attributes(items, validate_items)
      items.map do |item|
        item.validate! if validate_items

        row = column_names.each_with_object({}) do |column, serialized|
          raw_value = item.read_attribute(column)
          serialized[column] = item.type_for_attribute(column).serialize(raw_value) # rubocop:disable Cop/ActiveRecordSerialize
        end

        _bulk_insert_reject_primary_key!(row, item.class.primary_key)

        yield row if block_given?

        row
      end
    end

    # Removes the primary key entry from [attributes] (mutating the hash) and
    # raises if that entry held a truthy value.
    #
    # @param [Hash]   attributes  Attribute hash of a single item
    # @param [String] primary_key Key under which the primary key is stored in [attributes]
    #
    # @raise [PrimaryKeySetError] when the removed primary key value was set
    #
    # @return nil when no primary key value was present
    #
    def _bulk_insert_reject_primary_key!(attributes, primary_key)
      existing_pk = attributes.delete(primary_key)
      return unless existing_pk

      raise PrimaryKeySetError, "Primary key set: #{primary_key}:#{existing_pk}\n" \
        "Bulk-inserts are only supported for rows that don't already have PK set"
    end

    # Decides whether a callback may be registered on a bulk-insertable model:
    # either its chain name is whitelisted, or it is the autosave hook that
    # belongs_to installs. The second check only runs when the first one fails.
    #
    # @param [Symbol] name Name of the callback chain
    # @param [Array]  args Remaining arguments that were given to `set_callback`
    #
    def _bulk_insert_callback_allowed?(name, args)
      return true if _bulk_insert_whitelisted?(name)

      _bulk_insert_saved_from_belongs_to?(name, args)
    end

    # belongs_to associations install a before_save hook while the class is being
    # loaded. That hook is recognised here by its shape: the first argument is
    # :before and the second one starts with 'autosave_associated_records_for_'.
    #
    # @param [Symbol] name Name of the callback chain (not inspected by this check)
    # @param [Array]  args Remaining arguments that were given to `set_callback`
    #
    def _bulk_insert_saved_from_belongs_to?(name, args)
      timing, callback = args

      timing == :before && callback.to_s.start_with?('autosave_associated_records_for_')
    end

    # Tells whether [name] is one of the callback chains listed in
    # CALLBACK_NAME_WHITELIST.
    #
    # @param [Symbol] name Name of the callback chain
    #
    def _bulk_insert_whitelisted?(name)
      CALLBACK_NAME_WHITELIST.member?(name)
    end
  end
end