diff options
Diffstat (limited to 'src/ejd2sql.erl')
-rw-r--r-- | src/ejd2sql.erl | 294 |
1 files changed, 163 insertions, 131 deletions
diff --git a/src/ejd2sql.erl b/src/ejd2sql.erl index 7bace05dd..9b7fdbbef 100644 --- a/src/ejd2sql.erl +++ b/src/ejd2sql.erl @@ -30,12 +30,12 @@ -include("logger.hrl"). -include("ejabberd_sql_pt.hrl"). --export([export/2, export/3, import_file/2, import/2, - import/3, delete/1]). +-export([export/2, export/3, import/3, import/4, delete/1, import_info/1]). + -define(MAX_RECORDS_PER_TRANSACTION, 100). --record(dump, {fd, cont = start}). +-record(sql_dump, {fd, type}). %%%---------------------------------------------------------------------- %%% API @@ -50,13 +50,14 @@ modules() -> [ejabberd_auth, mod_announce, + mod_caps, mod_irc, mod_last, mod_muc, mod_offline, mod_privacy, mod_private, - %% mod_pubsub, + mod_pubsub, mod_roster, mod_shared_roster, mod_vcard, @@ -100,49 +101,44 @@ delete(Server, Module) -> delete(LServer, Table, ConvertFun) end, Module:export(Server)). -import_file(Server, FileName) when is_binary(FileName) -> - import(Server, binary_to_list(FileName)); -import_file(Server, FileName) -> - case disk_log:open([{name, make_ref()}, - {file, FileName}, - {mode, read_only}]) of - {ok, Fd} -> - LServer = jid:nameprep(Server), - Mods = [{Mod, gen_mod:db_type(LServer, Mod)} - || Mod <- modules(), gen_mod:is_loaded(LServer, Mod)], - AuthMods = case lists:member(ejabberd_auth_mnesia, - ejabberd_auth:auth_modules(LServer)) of - true -> - [{ejabberd_auth, mnesia}]; - false -> - [] - end, - import_dump(LServer, AuthMods ++ Mods, #dump{fd = Fd}); - Err -> - exit(Err) - end. - -import(Server, Output) -> - import(Server, Output, [{fast, true}]). - -import(Server, Output, Opts) -> - LServer = jid:nameprep(iolist_to_binary(Server)), - Modules = modules(), - IO = prepare_output(Output, disk_log), +import(Server, Dir, ToType) -> lists:foreach( - fun(Module) -> - import(LServer, IO, Opts, Module) - end, Modules), - close_output(Output, IO). + fun(Mod) -> + ?INFO_MSG("importing ~p...", [Mod]), + import(Mod, Server, Dir, ToType) + end, modules()). -import(Server, Output, Opts, Module) -> +import(Mod, Server, Dir, ToType) -> LServer = jid:nameprep(iolist_to_binary(Server)), - IO = prepare_output(Output, disk_log), + try Mod:import_start(LServer, ToType) + catch error:undef -> ok end, lists:foreach( - fun({SelectQuery, ConvertFun}) -> - import(LServer, SelectQuery, IO, ConvertFun, Opts) - end, Module:import(Server)), - close_output(Output, IO). + fun({File, Tab, _Mod, FieldsNumber}) -> + FileName = filename:join([Dir, File]), + case open_sql_dump(FileName) of + {ok, #sql_dump{type = FromType} = Dump} -> + import_rows(LServer, {sql, FromType}, ToType, + Tab, Mod, Dump, FieldsNumber), + close_sql_dump(Dump); + {error, enoent} -> + ok; + eof -> + ?INFO_MSG("It seems like SQL dump ~s is empty", [FileName]); + Err -> + ?ERROR_MSG("Failed to open SQL dump ~s: ~s", + [FileName, format_error(Err)]) + end + end, import_info(Mod)), + try Mod:import_stop(LServer, ToType) + catch error:undef -> ok end. + +import_info(Mod) -> + Info = Mod:import_info(), + lists:map( + fun({Tab, FieldsNum}) -> + FileName = <<Tab/binary, ".txt">>, + {FileName, Tab, Mod, FieldsNum} + end, Info). %%%---------------------------------------------------------------------- %%% Internal functions @@ -200,79 +196,6 @@ delete(LServer, Table, ConvertFun) -> end, mnesia:transaction(F). -import(LServer, SelectQuery, IO, ConvertFun, Opts) -> - F = case proplists:get_bool(fast, Opts) of - true -> - fun() -> - case ejabberd_sql:sql_query_t(SelectQuery) of - {selected, _, Rows} -> - lists:foldl(fun process_sql_row/2, - {IO, ConvertFun, undefined}, Rows); - Err -> - erlang:error(Err) - end - end; - false -> - fun() -> - ejabberd_sql:sql_query_t( - [iolist_to_binary( - [<<"declare c cursor for ">>, SelectQuery])]), - fetch(IO, ConvertFun, undefined) - end - end, - ejabberd_sql:sql_transaction(LServer, F). - -fetch(IO, ConvertFun, PrevRow) -> - case ejabberd_sql:sql_query_t([<<"fetch c;">>]) of - {selected, _, [Row]} -> - process_sql_row(Row, {IO, ConvertFun, PrevRow}), - fetch(IO, ConvertFun, Row); - {selected, _, []} -> - ok; - Err -> - erlang:error(Err) - end. - -process_sql_row(Row, {IO, ConvertFun, PrevRow}) when Row == PrevRow -> - %% Avoid calling ConvertFun with the same input - {IO, ConvertFun, Row}; -process_sql_row(Row, {IO, ConvertFun, _PrevRow}) -> - case catch ConvertFun(Row) of - {'EXIT', _} = Err -> - ?ERROR_MSG("failed to convert ~p: ~p", [Row, Err]); - Term -> - ok = disk_log:log(IO#dump.fd, Term) - end, - {IO, ConvertFun, Row}. - -import_dump(LServer, Mods, #dump{fd = Fd, cont = Cont}) -> - case disk_log:chunk(Fd, Cont) of - {NewCont, Terms} -> - import_terms(LServer, Mods, Terms), - import_dump(LServer, Mods, #dump{fd = Fd, cont = NewCont}); - eof -> - ok; - Err -> - exit(Err) - end. - -import_terms(LServer, Mods, [Term|Terms]) -> - import_term(LServer, Mods, Term), - import_terms(LServer, Mods, Terms); -import_terms(_LServer, _Mods, []) -> - ok. - -import_term(LServer, [{Mod, DBType}|Mods], Term) -> - case catch Mod:import(LServer, DBType, Term) of - pass -> import_term(LServer, Mods, Term); - ok -> ok; - Err -> - ?ERROR_MSG("failed to import ~p for module ~p: ~p", - [Term, Mod, Err]) - end; -import_term(_LServer, [], _Term) -> - ok. - prepare_output(FileName) -> prepare_output(FileName, normal). @@ -285,25 +208,11 @@ prepare_output(FileName, normal) when is_list(FileName) -> Err -> exit(Err) end; -prepare_output(FileName, disk_log) when is_list(FileName) -> - case disk_log:open([{name, make_ref()}, - {repair, truncate}, - {file, FileName}]) of - {ok, Fd} -> - #dump{fd = Fd}; - Err -> - exit(Err) - end; prepare_output(Output, _Type) -> Output. close_output(FileName, Fd) when FileName /= Fd -> - case Fd of - #dump{} -> - disk_log:close(Fd#dump.fd); - _ -> - file:close(Fd) - end, + file:close(Fd), ok; close_output(_, _) -> ok. @@ -321,6 +230,129 @@ flatten1([H|T], Acc) -> flatten1([], Acc) -> Acc. +import_rows(LServer, FromType, ToType, Tab, Mod, Dump, FieldsNumber) -> + case read_row_from_sql_dump(Dump, FieldsNumber) of + {ok, Fields} -> + case catch Mod:import(LServer, FromType, ToType, Tab, Fields) of + ok -> + ok; + Err -> + ?ERROR_MSG("Failed to import fields ~p for tab ~p: ~p", + [Fields, Tab, Err]) + end, + import_rows(LServer, FromType, ToType, + Tab, Mod, Dump, FieldsNumber); + eof -> + ok; + Err -> + ?ERROR_MSG("Failed to read row from SQL dump: ~s", + [format_error(Err)]) + end. + +open_sql_dump(FileName) -> + case file:open(FileName, [raw, read, binary, read_ahead]) of + {ok, Fd} -> + case file:read(Fd, 11) of + {ok, <<"PGCOPY\n", 16#ff, "\r\n", 0>>} -> + case skip_pgcopy_header(Fd) of + ok -> + {ok, #sql_dump{fd = Fd, type = pgsql}}; + Err -> + Err + end; + {ok, _} -> + file:position(Fd, 0), + {ok, #sql_dump{fd = Fd, type = mysql}}; + Err -> + Err + end; + Err -> + Err + end. + +close_sql_dump(#sql_dump{fd = Fd}) -> + file:close(Fd). + +read_row_from_sql_dump(#sql_dump{fd = Fd, type = pgsql}, _) -> + case file:read(Fd, 2) of + {ok, <<(-1):16/signed>>} -> + eof; + {ok, <<FieldsNum:16>>} -> + read_fields(Fd, FieldsNum, []); + {ok, _} -> + {error, eof}; + eof -> + {error, eof}; + {error, _} = Err -> + Err + end; +read_row_from_sql_dump(#sql_dump{fd = Fd, type = mysql}, FieldsNum) -> + read_lines(Fd, FieldsNum, <<"">>, []). + +skip_pgcopy_header(Fd) -> + try + {ok, <<_:4/binary, ExtSize:32>>} = file:read(Fd, 8), + {ok, <<_:ExtSize/binary>>} = file:read(Fd, ExtSize), + ok + catch error:{badmatch, {error, _} = Err} -> + Err; + error:{badmatch, _} -> + {error, eof} + end. + +read_fields(_Fd, 0, Acc) -> + {ok, lists:reverse(Acc)}; +read_fields(Fd, N, Acc) -> + case file:read(Fd, 4) of + {ok, <<(-1):32/signed>>} -> + read_fields(Fd, N-1, [null|Acc]); + {ok, <<ValSize:32>>} -> + case file:read(Fd, ValSize) of + {ok, <<Val:ValSize/binary>>} -> + read_fields(Fd, N-1, [Val|Acc]); + {ok, _} -> + {error, eof}; + Err -> + Err + end; + {ok, _} -> + {error, eof}; + eof -> + {error, eof}; + {error, _} = Err -> + Err + end. + +read_lines(_Fd, 0, <<"">>, Acc) -> + {ok, lists:reverse(Acc)}; +read_lines(Fd, N, Buf, Acc) -> + case file:read_line(Fd) of + {ok, Data} when size(Data) >= 2 -> + Size = size(Data) - 2, + case Data of + <<Val:Size/binary, 0, $\n>> -> + NewBuf = <<Buf/binary, Val/binary>>, + read_lines(Fd, N-1, <<"">>, [NewBuf|Acc]); + _ -> + NewBuf = <<Buf/binary, Data/binary>>, + read_lines(Fd, N, NewBuf, Acc) + end; + {ok, Data} -> + NewBuf = <<Buf/binary, Data/binary>>, + read_lines(Fd, N, NewBuf, Acc); + eof when Buf == <<"">>, Acc == [] -> + eof; + eof -> + {error, eof}; + {error, _} = Err -> + Err + end. + +format_error({error, eof}) -> + "unexpected end of file"; +format_error({error, Posix}) -> + file:format_error(Posix). + format_queries(SQLs) -> lists:map( fun(#sql_query{} = SQL) -> |