diff options
author | sunnavy <sunnavy@bestpractical.com> | 2022-10-13 16:34:01 +0300 |
---|---|---|
committer | sunnavy <sunnavy@bestpractical.com> | 2022-10-28 00:09:01 +0300 |
commit | 32f8de14d50cb998860b1ba5ebb0ce01f0a62fbe (patch) | |
tree | 16aa2c80c99257d0f71b2d6f3268929ded3bd8e8 | |
parent | 88076841a7a95089b09b44bf19404a58576664c6 (diff) |
Add batch mode to importer for data serialized with --clone or --all
By reducing SQL calls a lot, the importer can run much faster now. Cloned
data doesn't have dependencies to resolve, so we can also insert rows in
parallel, which makes the speedup more noticeable.
-rw-r--r-- | lib/RT/Migrate/Importer.pm | 281 | ||||
-rw-r--r-- | sbin/rt-importer.in | 16 |
2 files changed, 267 insertions, 30 deletions
diff --git a/lib/RT/Migrate/Importer.pm b/lib/RT/Migrate/Importer.pm index 91036377b1..5c355e444d 100644 --- a/lib/RT/Migrate/Importer.pm +++ b/lib/RT/Migrate/Importer.pm @@ -74,6 +74,8 @@ sub Init { AutoCommit => 1, BatchUserPrincipals => 0, BatchGroupPrincipals => 0, + BatchSize => 0, + MaxProcesses => 10, @_, ); @@ -87,6 +89,8 @@ sub Init { $self->{AutoCommit} = $args{AutoCommit}; $self->{$_} = $args{$_} for qw/BatchUserPrincipals BatchGroupPrincipals/; + $self->{BatchSize} = $args{BatchSize}; + $self->{MaxProcesses} = $args{MaxProcesses} || 10; $self->{HandleError} = sub { 0 }; $self->{HandleError} = $args{HandleError} @@ -166,6 +170,7 @@ sub InitStream { Type => 'FreeformSingle', ); } + $self->{OriginalId} = $cf->Id; } if ( !$self->{Clone} ) { @@ -221,6 +226,29 @@ sub NextPrincipalId { return $id; } +sub NextId { + my $self = shift; + my $class = shift; + my $id = shift; + + if ( $id ) { + $self->{_next_id}{$class} = $id; + return $id; + } + + if ( defined $self->{_next_id}{$class} ) { + return $self->{_next_id}{$class}++; + } + return; +} + +sub HasNextId { + my $self = shift; + my $class = shift; + + return defined $self->{_next_id}{$class} ? 
1 : 0; +} + sub Resolve { my $self = shift; my ($uid, $class, $id) = @_; @@ -235,27 +263,39 @@ sub Resolve { return unless $self->{Pending}{$uid}; + my @left; for my $ref (@{$self->{Pending}{$uid}}) { - my ($pclass, $pid) = @{ $self->Lookup( $ref->{uid} ) }; - my $obj = $pclass->new( RT->SystemUser ); - $obj->LoadByCols( Id => $pid ); - $obj->__Set( - Field => $ref->{column}, - Value => $id, - ) if defined $ref->{column}; - $obj->__Set( - Field => $ref->{classcolumn}, - Value => $class, - ) if defined $ref->{classcolumn}; - $obj->__Set( - Field => $ref->{uri}, - Value => $self->LookupObj($uid)->URI, - ) if defined $ref->{uri}; - if (my $method = $ref->{method}) { - $obj->$method($self, $ref, $class, $id); + if ( my $lookup = $self->Lookup( $ref->{uid} ) ) { + my ( $pclass, $pid ) = @{$lookup}; + my $obj = $pclass->new( RT->SystemUser ); + $obj->LoadByCols( Id => $pid ); + $obj->__Set( + Field => $ref->{column}, + Value => $id, + ) if defined $ref->{column}; + $obj->__Set( + Field => $ref->{classcolumn}, + Value => $class, + ) if defined $ref->{classcolumn}; + $obj->__Set( + Field => $ref->{uri}, + Value => $self->LookupObj($uid)->URI, + ) if defined $ref->{uri}; + if ( my $method = $ref->{method} ) { + $obj->$method( $self, $ref, $class, $id ); + } + } + else { + push @left, $ref; } } - delete $self->{Pending}{$uid}; + + if ( @left ) { + $self->{Pending}{$uid} = \@left; + } + else { + delete $self->{Pending}{$uid}; + } } sub Lookup { @@ -419,6 +459,23 @@ sub Create { } } + if ( $self->{BatchSize} && ( $self->{Clone} || $self->{All} ) ) { + + # Finish up the previous class if there are any records left + my ($previous_class) = grep { $_ ne $class && @{ $self->{_batch}{$_} } } keys %{ $self->{_batch} || {} }; + if ($previous_class) { + $self->BatchCreate( $previous_class, $self->{_batch}{$previous_class} ); + } + + if ( $data->{id} || $self->HasNextId($class) ) { + push @{ $self->{_batch}{$class} }, [ $uid, $data ]; + if ( @{ $self->{_batch}{$class} } == 
$self->{BatchSize} ) { + $self->BatchCreate( $class, $self->{_batch}{$class} ); + } + return; + } + } + my ($id, $msg) = eval { # catch and rethrow on the outside so we can provide more info local $SIG{__DIE__}; @@ -436,6 +493,16 @@ sub Create { } } + return $obj if $self->{Clone}; + + # Users/Groups have id set in PreInflate, no need to set here + if ( $self->{BatchSize} + && $self->{All} + && $class =~ /^RT::(?:Ticket|Transaction|Attachment|GroupMember|ObjectCustomFieldValue|Attribute|Link)$/ ) + { + $self->NextId( $class, $id + 1 ); + } + $self->{ObjectCount}{$class}++; $self->Resolve( $uid => $class, $id ); @@ -513,18 +580,7 @@ sub ReadStream { # If it's a ticket, we might need to create a # TicketCustomField for the previous ID if ($class eq "RT::Ticket" and $self->{OriginalId}) { - my $value = $self->{ExcludeOrganization} - ? $origid - : $self->Organization . ":$origid"; - - $obj->Load( $self->Lookup($uid)->[1] ); - my ($id, $msg) = $obj->AddCustomFieldValue( - Field => $self->{OriginalId}, - Value => $value, - RecordTransaction => 0, - ); - warn "Failed to add custom field to $uid: $msg" - unless $id; + $self->CreateOriginalIdOCFVs( { $self->Lookup($uid)->[1] => $origid } ); } # If it's a CF, we don't know yet if it's global (the OCF @@ -542,6 +598,33 @@ sub CloseStream { $self->{Progress}->(undef, 'force') if $self->{Progress}; + my %order = ( + 'RT::Ticket' => 1, + 'RT::Group' => 2, + 'RT::GroupMember' => 3, + 'RT::ObjectCustomFieldValue' => 4, + 'RT::Transaction' => 5, + 'RT::Attachment' => 6, + ); + + for my $class ( + sort { ( $order{$a} // 0 ) <=> ( $order{$b} // 0 ) } + grep { @{ $self->{_batch}{$_} } } keys %{ $self->{_batch} || {} } + ) + { + $self->BatchCreate( $class, $self->{_batch}{$class} ); + } + + $self->{_pm}->wait_all_children if $self->{_pm}; + + # Now have all data imported, try to resolve again. 
+ my @uids = grep { $self->{UIDs}{$_} } keys %{ $self->{Pending} }; + + for my $uid (@uids) { + my ( $class, $id ) = split /-/, $self->{UIDs}{$uid}, 2; + $self->Resolve( $uid, $class, $id ); + } + # Fill CGM # Groups @@ -644,6 +727,144 @@ sub Progress { return $self->{Progress} = $_[0]; } +sub BatchCreate { + my $self = shift; + my $class = shift; + my $items = shift or return; + + return unless @$items; + + if ( $self->{Clone} ) { + + if ( !$self->{_pm} ) { + require Parallel::ForkManager; + $self->{_pm} = Parallel::ForkManager->new( $self->{MaxProcesses} ); + } + + $self->{ObjectCount}{$class} += @$items; + my @copy = @$items; + @$items = (); + + $RT::Handle->Commit unless $self->{AutoCommit}; + $RT::Handle->Disconnect; + + if ( $self->{_pm}->start ) { + $RT::Handle->Connect; + $RT::Handle->BeginTransaction unless $self->{AutoCommit}; + return 1; + } + + $RT::Handle->Connect; + $self->_BatchCreate( $class, \@copy ); + $self->{_pm}->finish; + } + else { + # In case there are duplicates, which could happen for merged members. 
+ if ( $class eq 'RT::GroupMember' ) { + my %added; + @$items = grep { !$added{ $_->[1]{GroupId} }{ $_->[1]{MemberId} }++ } @$items; + } + + my $map = $self->_BatchCreate( $class, $items ); + $self->{ObjectCount}{$class} += @$items; + @$items = (); + + if ($map) { + $self->{UIDs}{$_} = $map->{$_} for keys %$map; + + my %ticket_map; + for my $uid ( keys %$map ) { + my ( $class, $id ) = split /-/, $map->{$uid}, 2; + + $self->Resolve( $uid => $class, $id ); + if ( $class eq 'RT::User' && $uid =~ /-RT_System$/ ) { + RT->InitSystemObjects; + } + elsif ( $class eq 'RT::Attribute' ) { + my $attr = RT::Attribute->new( RT->SystemUser ); + $attr->Load($id); + $attr->PostInflate( $self, $uid ); + } + elsif ( $self->{OriginalId} && $class eq 'RT::Ticket' ) { + my ($orig_id) = ( $uid =~ /-(\d+)$/ ); + $ticket_map{$id} = $orig_id; + } + } + + $self->CreateOriginalIdOCFVs( \%ticket_map ) if %ticket_map; + } + return 1; + } +} + +sub _BatchCreate { + my $self = shift; + my $class = shift; + my $items = shift or return; + return unless @$items; + + my %map; + + # Do not actually insert, just get the SQL, with sorted field/value pairs + local *RT::Handle::Insert = sub { + my $self = shift; + my $table = shift; + my %attr = @_; + return $self->InsertQueryString( $table, map { $_ => $attr{$_} } sort keys %attr ); + }; + + my %query; + for (@$items) { + my ( $uid, $data ) = @$_; + my $obj = $class->new( RT->SystemUser ); + + my $id = $data->{id} || $self->NextId($class); + my ( $sql, @bind ) = $obj->DBIx::SearchBuilder::Record::Create( %$data, id => $id ); + $map{$uid} = join '-', $class, $id unless $self->{Clone}; + push @{ $query{$sql} }, \@bind; + } + + my $dbh = $RT::Handle->dbh; + + for my $sql ( keys %query ) { + + my $count = @{ $query{$sql} }; + my $values_paren; + if ( $sql =~ /(\(\?.+?\))/i ) { + $values_paren = $1; + } + + # DBs have placeholder limitations(64k for Pg), here we replace + # placeholders to support bigger batch sizes. The performance is similar. 
+ my $batch_sql + = $RT::Handle->FillIn( $sql . ( ", $values_paren" x ( $count - 1 ) ), [ map @$_, @{ $query{$sql} } ] ); + $self->RunSQL($batch_sql); + } + + # Clone doesn't need to return anything + return $self->{Clone} ? () : \%map; +} + +sub CreateOriginalIdOCFVs { + my $self = shift; + my $ticket_map = shift; + if ( %$ticket_map ) { + my $sql + = 'INSERT INTO ObjectCustomFieldValues (CustomField, ObjectType, ObjectId, Content, Creator) VALUES '; + my @values; + if ( !$self->{ExcludeOrganization} ) { + $ticket_map->{$_} = "$self->{Organization}:$ticket_map->{$_}" for keys %$ticket_map; + } + + my $creator = RT->SystemUser->Id; + for my $id ( sort { $a <=> $b } keys %$ticket_map ) { + push @values, qq{($self->{OriginalId}, 'RT::Ticket', $id, '$ticket_map->{$id}', $creator)}; + } + $sql .= join ',', @values; + $self->RunSQL($sql); + } +} + sub RunSQL { my $self = shift; my $rv; diff --git a/sbin/rt-importer.in b/sbin/rt-importer.in index bd9b9ae587..2c8e56c039 100644 --- a/sbin/rt-importer.in +++ b/sbin/rt-importer.in @@ -104,6 +104,8 @@ GetOptions( "batch-user-principals=i", "batch-group-principals=i", + "batch-size=i", + "max-processes=i", "dump=s@", ) or Pod::Usage::pod2usage(); @@ -168,6 +170,8 @@ my $import = RT::Migrate::Importer::File->new( AutoCommit => $OPT{'auto-commit'}, BatchUserPrincipals => $OPT{'batch-user-principals'}, BatchGroupPrincipals => $OPT{'batch-group-principals'}, + BatchSize => $OPT{'batch-size'}, + MaxProcesses => $OPT{'max-processes'} || 10, HandleError => $error_handler, ); @@ -317,6 +321,18 @@ The number of user/group principals to create in batch beforehand. Default is 0. This is to improve performance for not-cloned serialized data of big instances, usually you don't need to specify this. +=item B<--batch-size> I<BATCH_SIZE> + +Create objects in batch. Default is 0, meaning batch processing is not +enabled. This is for data serialized with C<--clone> or C<--all>. 
For cloned +serialized data, each batch will also be processed in a separate +child process. + +=item B<--max-processes> I<MAX_PROCESSES> + +The maximum number of child processes allowed for batch processing. Default is +10. This is for cloned serialized data only. + +=back |