Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/bestpractical/rt.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsunnavy <sunnavy@bestpractical.com>2022-10-13 16:34:01 +0300
committersunnavy <sunnavy@bestpractical.com>2022-10-28 00:09:01 +0300
commit32f8de14d50cb998860b1ba5ebb0ce01f0a62fbe (patch)
tree16aa2c80c99257d0f71b2d6f3268929ded3bd8e8
parent88076841a7a95089b09b44bf19404a58576664c6 (diff)
Add batch mode to importer for data serialized with --clone or --all
By greatly reducing the number of SQL calls, the importer now runs much faster. Cloned data has no dependencies to resolve, so rows can also be inserted in parallel, which makes the speedup even more noticeable.
-rw-r--r--lib/RT/Migrate/Importer.pm281
-rw-r--r--sbin/rt-importer.in16
2 files changed, 267 insertions, 30 deletions
diff --git a/lib/RT/Migrate/Importer.pm b/lib/RT/Migrate/Importer.pm
index 91036377b1..5c355e444d 100644
--- a/lib/RT/Migrate/Importer.pm
+++ b/lib/RT/Migrate/Importer.pm
@@ -74,6 +74,8 @@ sub Init {
AutoCommit => 1,
BatchUserPrincipals => 0,
BatchGroupPrincipals => 0,
+ BatchSize => 0,
+ MaxProcesses => 10,
@_,
);
@@ -87,6 +89,8 @@ sub Init {
$self->{AutoCommit} = $args{AutoCommit};
$self->{$_} = $args{$_} for qw/BatchUserPrincipals BatchGroupPrincipals/;
+ $self->{BatchSize} = $args{BatchSize};
+ $self->{MaxProcesses} = $args{MaxProcesses} || 10;
$self->{HandleError} = sub { 0 };
$self->{HandleError} = $args{HandleError}
@@ -166,6 +170,7 @@ sub InitStream {
Type => 'FreeformSingle',
);
}
+ $self->{OriginalId} = $cf->Id;
}
if ( !$self->{Clone} ) {
@@ -221,6 +226,29 @@ sub NextPrincipalId {
return $id;
}
+# Get or set the per-class id counter used when ids are assigned by the
+# importer itself.
+#
+#   $self->NextId( $class, $id )  stores $id as the next id for $class and
+#                                 returns $id.
+#   $self->NextId( $class )       returns the stored id and then increments
+#                                 the counter; returns nothing when no id
+#                                 has been stored for $class yet.
+sub NextId {
+    my ( $self, $class, $id ) = @_;
+    my $counters = $self->{_next_id} ||= {};
+
+    # Setter form: a true id (re)seeds the counter for this class.
+    if ($id) {
+        $counters->{$class} = $id;
+        return $id;
+    }
+
+    # Getter form: only hand out ids once the counter has been seeded.
+    return unless defined $counters->{$class};
+    return $counters->{$class}++;
+}
+
+# Report whether an id counter has been stored for $class (see NextId).
+# Returns 1 when a counter is defined for the class, 0 otherwise.
+sub HasNextId {
+    my ( $self, $class ) = @_;
+
+    return 1 if defined $self->{_next_id}{$class};
+    return 0;
+}
+
sub Resolve {
my $self = shift;
my ($uid, $class, $id) = @_;
@@ -235,27 +263,39 @@ sub Resolve {
return unless $self->{Pending}{$uid};
+ my @left;
for my $ref (@{$self->{Pending}{$uid}}) {
- my ($pclass, $pid) = @{ $self->Lookup( $ref->{uid} ) };
- my $obj = $pclass->new( RT->SystemUser );
- $obj->LoadByCols( Id => $pid );
- $obj->__Set(
- Field => $ref->{column},
- Value => $id,
- ) if defined $ref->{column};
- $obj->__Set(
- Field => $ref->{classcolumn},
- Value => $class,
- ) if defined $ref->{classcolumn};
- $obj->__Set(
- Field => $ref->{uri},
- Value => $self->LookupObj($uid)->URI,
- ) if defined $ref->{uri};
- if (my $method = $ref->{method}) {
- $obj->$method($self, $ref, $class, $id);
+ if ( my $lookup = $self->Lookup( $ref->{uid} ) ) {
+ my ( $pclass, $pid ) = @{$lookup};
+ my $obj = $pclass->new( RT->SystemUser );
+ $obj->LoadByCols( Id => $pid );
+ $obj->__Set(
+ Field => $ref->{column},
+ Value => $id,
+ ) if defined $ref->{column};
+ $obj->__Set(
+ Field => $ref->{classcolumn},
+ Value => $class,
+ ) if defined $ref->{classcolumn};
+ $obj->__Set(
+ Field => $ref->{uri},
+ Value => $self->LookupObj($uid)->URI,
+ ) if defined $ref->{uri};
+ if ( my $method = $ref->{method} ) {
+ $obj->$method( $self, $ref, $class, $id );
+ }
+ }
+ else {
+ push @left, $ref;
}
}
- delete $self->{Pending}{$uid};
+
+ if ( @left ) {
+ $self->{Pending}{$uid} = \@left;
+ }
+ else {
+ delete $self->{Pending}{$uid};
+ }
}
sub Lookup {
@@ -419,6 +459,23 @@ sub Create {
}
}
+ if ( $self->{BatchSize} && ( $self->{Clone} || $self->{All} ) ) {
+
+ # Finish up the previous class if there are any records left
+ my ($previous_class) = grep { $_ ne $class && @{ $self->{_batch}{$_} } } keys %{ $self->{_batch} || {} };
+ if ($previous_class) {
+ $self->BatchCreate( $previous_class, $self->{_batch}{$previous_class} );
+ }
+
+ if ( $data->{id} || $self->HasNextId($class) ) {
+ push @{ $self->{_batch}{$class} }, [ $uid, $data ];
+ if ( @{ $self->{_batch}{$class} } == $self->{BatchSize} ) {
+ $self->BatchCreate( $class, $self->{_batch}{$class} );
+ }
+ return;
+ }
+ }
+
my ($id, $msg) = eval {
# catch and rethrow on the outside so we can provide more info
local $SIG{__DIE__};
@@ -436,6 +493,16 @@ sub Create {
}
}
+ return $obj if $self->{Clone};
+
+ # Users/Groups have id set in PreInflate, no need to set here
+ if ( $self->{BatchSize}
+ && $self->{All}
+ && $class =~ /^RT::(?:Ticket|Transaction|Attachment|GroupMember|ObjectCustomFieldValue|Attribute|Link)$/ )
+ {
+ $self->NextId( $class, $id + 1 );
+ }
+
$self->{ObjectCount}{$class}++;
$self->Resolve( $uid => $class, $id );
@@ -513,18 +580,7 @@ sub ReadStream {
# If it's a ticket, we might need to create a
# TicketCustomField for the previous ID
if ($class eq "RT::Ticket" and $self->{OriginalId}) {
- my $value = $self->{ExcludeOrganization}
- ? $origid
- : $self->Organization . ":$origid";
-
- $obj->Load( $self->Lookup($uid)->[1] );
- my ($id, $msg) = $obj->AddCustomFieldValue(
- Field => $self->{OriginalId},
- Value => $value,
- RecordTransaction => 0,
- );
- warn "Failed to add custom field to $uid: $msg"
- unless $id;
+ $self->CreateOriginalIdOCFVs( { $self->Lookup($uid)->[1] => $origid } );
}
# If it's a CF, we don't know yet if it's global (the OCF
@@ -542,6 +598,33 @@ sub CloseStream {
$self->{Progress}->(undef, 'force') if $self->{Progress};
+ my %order = (
+ 'RT::Ticket' => 1,
+ 'RT::Group' => 2,
+ 'RT::GroupMember' => 3,
+ 'RT::ObjectCustomFieldValue' => 4,
+ 'RT::Transaction' => 5,
+ 'RT::Attachment' => 6,
+ );
+
+ for my $class (
+ sort { ( $order{$a} // 0 ) <=> ( $order{$b} // 0 ) }
+ grep { @{ $self->{_batch}{$_} } } keys %{ $self->{_batch} || {} }
+ )
+ {
+ $self->BatchCreate( $class, $self->{_batch}{$class} );
+ }
+
+ $self->{_pm}->wait_all_children if $self->{_pm};
+
+ # Now have all data imported, try to resolve again.
+ my @uids = grep { $self->{UIDs}{$_} } keys %{ $self->{Pending} };
+
+ for my $uid (@uids) {
+ my ( $class, $id ) = split /-/, $self->{UIDs}{$uid}, 2;
+ $self->Resolve( $uid, $class, $id );
+ }
+
# Fill CGM
# Groups
@@ -644,6 +727,144 @@ sub Progress {
return $self->{Progress} = $_[0];
}
+# Flush a batch of queued records of one class to the database.
+#
+# Arguments:
+#   $class - record class of every item in the batch
+#   $items - array ref of [ $uid, $data ] pairs; it is emptied in place
+#            once the batch has been taken over
+#
+# Returns nothing when $items is missing or empty, and 1 otherwise (in
+# clone mode the 1 is returned by the parent process only).
+#
+# In clone mode the batch is inserted by a forked child process and no
+# bookkeeping beyond ObjectCount is done. Otherwise the batch is inserted
+# in this process and the new records are registered in $self->{UIDs},
+# resolved, and post-processed per class.
+sub BatchCreate {
+ my $self = shift;
+ my $class = shift;
+ my $items = shift or return;
+
+ return unless @$items;
+
+ if ( $self->{Clone} ) {
+
+ # Create the fork manager lazily, capped at MaxProcesses children.
+ if ( !$self->{_pm} ) {
+ require Parallel::ForkManager;
+ $self->{_pm} = Parallel::ForkManager->new( $self->{MaxProcesses} );
+ }
+
+ # Take a private copy for the child and empty the caller's queue.
+ $self->{ObjectCount}{$class} += @$items;
+ my @copy = @$items;
+ @$items = ();
+
+ # Commit and disconnect before forking; presumably so that parent and
+ # child do not share one database connection -- TODO confirm.
+ $RT::Handle->Commit unless $self->{AutoCommit};
+ $RT::Handle->Disconnect;
+
+ # Parallel::ForkManager's start() is true in the parent (child pid) and
+ # false in the child, so this branch is the parent: reconnect and go on.
+ if ( $self->{_pm}->start ) {
+ $RT::Handle->Connect;
+ $RT::Handle->BeginTransaction unless $self->{AutoCommit};
+ return 1;
+ }
+
+ # Child: open its own connection, insert the batch, then exit via finish.
+ $RT::Handle->Connect;
+ $self->_BatchCreate( $class, \@copy );
+ $self->{_pm}->finish;
+ }
+ else {
+ # In case there are duplicates, which could happen for merged members.
+ if ( $class eq 'RT::GroupMember' ) {
+ my %added;
+ @$items = grep { !$added{ $_->[1]{GroupId} }{ $_->[1]{MemberId} }++ } @$items;
+ }
+
+ # $map is a hash ref of uid => "Class-id" for the inserted rows.
+ my $map = $self->_BatchCreate( $class, $items );
+ $self->{ObjectCount}{$class} += @$items;
+ @$items = ();
+
+ if ($map) {
+ $self->{UIDs}{$_} = $map->{$_} for keys %$map;
+
+ # New ticket id => original ticket id, filled in below.
+ my %ticket_map;
+ for my $uid ( keys %$map ) {
+ # Note: this $class shadows the method argument inside the loop.
+ my ( $class, $id ) = split /-/, $map->{$uid}, 2;
+
+ $self->Resolve( $uid => $class, $id );
+ if ( $class eq 'RT::User' && $uid =~ /-RT_System$/ ) {
+ RT->InitSystemObjects;
+ }
+ elsif ( $class eq 'RT::Attribute' ) {
+ my $attr = RT::Attribute->new( RT->SystemUser );
+ $attr->Load($id);
+ $attr->PostInflate( $self, $uid );
+ }
+ elsif ( $self->{OriginalId} && $class eq 'RT::Ticket' ) {
+ # The original id is taken from the trailing digits of the uid.
+ my ($orig_id) = ( $uid =~ /-(\d+)$/ );
+ $ticket_map{$id} = $orig_id;
+ }
+ }
+
+ # Record the original ids of all tickets in this batch in one go.
+ $self->CreateOriginalIdOCFVs( \%ticket_map ) if %ticket_map;
+ }
+ return 1;
+ }
+}
+
+# Insert a batch of records of one class using multi-row INSERT statements.
+#
+# Arguments:
+#   $class - record class to instantiate for every item
+#   $items - array ref of [ $uid, $data ] pairs; $data is the column hash
+#            for the row. When $data has no id, the next id is taken from
+#            $self->NextId($class).
+#
+# Returns nothing when $items is missing or empty. Otherwise returns a hash
+# ref of uid => "Class-id" for the inserted rows, or an empty list (undef
+# in scalar context) in clone mode, where callers do not need the mapping.
+sub _BatchCreate {
+    my $self  = shift;
+    my $class = shift;
+    my $items = shift or return;
+    return unless @$items;
+
+    my %map;
+
+    # Do not actually insert, just get the SQL, with sorted field/value pairs
+    # so that rows with the same set of columns produce the same statement.
+    local *RT::Handle::Insert = sub {
+        my $self  = shift;
+        my $table = shift;
+        my %attr  = @_;
+        return $self->InsertQueryString( $table, map { $_ => $attr{$_} } sort keys %attr );
+    };
+
+    # Group the bind values of all rows by their (identical) INSERT statement.
+    my %query;
+    for my $item (@$items) {
+        my ( $uid, $data ) = @$item;
+        my $obj = $class->new( RT->SystemUser );
+
+        my $id = $data->{id} || $self->NextId($class);
+        my ( $sql, @bind ) = $obj->DBIx::SearchBuilder::Record::Create( %$data, id => $id );
+        $map{$uid} = join '-', $class, $id unless $self->{Clone};
+        push @{ $query{$sql} }, \@bind;
+    }
+
+    # Sorted so that statements are issued in a reproducible order.
+    for my $sql ( sort keys %query ) {
+        my $rows = $query{$sql};
+
+        # The "(?, ?, ...)" placeholder group, repeated once per extra row.
+        my ($values_paren) = $sql =~ /(\(\?[^)]*\))/;
+
+        if ( !defined $values_paren ) {
+
+            # No placeholder group to repeat: building a multi-row statement
+            # would yield invalid SQL, so insert the rows one at a time.
+            for my $bind (@$rows) {
+                $self->RunSQL( $RT::Handle->FillIn( $sql, $bind ) );
+            }
+            next;
+        }
+
+        # DBs have placeholder limitations(64k for Pg), here we replace
+        # placeholders to support bigger batch sizes. The performance is similar.
+        my $batch_sql
+            = $RT::Handle->FillIn( $sql . ( ", $values_paren" x ( @$rows - 1 ) ), [ map {@$_} @$rows ] );
+        $self->RunSQL($batch_sql);
+    }
+
+    # Clone doesn't need to return anything
+    return $self->{Clone} ? () : \%map;
+}
+
+# Record the original (pre-import) ticket ids as values of the "original
+# id" custom field, using a single multi-row INSERT.
+#
+# Arguments:
+#   $ticket_map - hash ref of new ticket id => original ticket id. It is
+#                 not modified.
+#
+# Unless ExcludeOrganization is set, each stored value is prefixed with
+# "$self->{Organization}:". Does nothing when the map is missing or empty;
+# otherwise returns whatever RunSQL returns.
+sub CreateOriginalIdOCFVs {
+    my $self       = shift;
+    my $ticket_map = shift;
+    return unless $ticket_map && %$ticket_map;
+
+    my $dbh     = $RT::Handle->dbh;
+    my $creator = RT->SystemUser->Id;
+    my $prefix  = $self->{ExcludeOrganization} ? '' : "$self->{Organization}:";
+
+    my @values;
+    for my $id ( sort { $a <=> $b } keys %$ticket_map ) {
+
+        # Content embeds the organization name, which comes from imported
+        # data, so let the driver quote it instead of hand-writing quotes.
+        my $content = $dbh->quote( $prefix . $ticket_map->{$id} );
+        push @values, qq{($self->{OriginalId}, 'RT::Ticket', $id, $content, $creator)};
+    }
+
+    my $sql
+        = 'INSERT INTO ObjectCustomFieldValues (CustomField, ObjectType, ObjectId, Content, Creator) VALUES '
+        . join( ',', @values );
+    return $self->RunSQL($sql);
+}
+
sub RunSQL {
my $self = shift;
my $rv;
diff --git a/sbin/rt-importer.in b/sbin/rt-importer.in
index bd9b9ae587..2c8e56c039 100644
--- a/sbin/rt-importer.in
+++ b/sbin/rt-importer.in
@@ -104,6 +104,8 @@ GetOptions(
"batch-user-principals=i",
"batch-group-principals=i",
+ "batch-size=i",
+ "max-processes=i",
"dump=s@",
) or Pod::Usage::pod2usage();
@@ -168,6 +170,8 @@ my $import = RT::Migrate::Importer::File->new(
AutoCommit => $OPT{'auto-commit'},
BatchUserPrincipals => $OPT{'batch-user-principals'},
BatchGroupPrincipals => $OPT{'batch-group-principals'},
+ BatchSize => $OPT{'batch-size'},
+ MaxProcesses => $OPT{'max-processes'} || 10,
HandleError => $error_handler,
);
@@ -317,6 +321,18 @@ The number of user/group principals to create in batch beforehand. Default is 0.
This is to improve performance for not-cloned serialized data of big instances,
usually you don't need to specify this.
+=item B<--batch-size> I<BATCH_SIZE>
+
+Create objects in batches of this size. Default is 0, meaning batch
+processing is disabled. This only applies to data serialized with
+C<--clone> or C<--all>. For cloned serialized data, each batch is also
+processed in a separate child process.
+
+=item B<--max-processes> I<MAX_PROCESSES>
+
+The maximum number of child processes allowed for batch processing. Default
+is 10. This is for cloned serialized data only.
+
=back