Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejd2sql.erl')
-rw-r--r--src/ejd2sql.erl294
1 files changed, 163 insertions, 131 deletions
diff --git a/src/ejd2sql.erl b/src/ejd2sql.erl
index 7bace05dd..9b7fdbbef 100644
--- a/src/ejd2sql.erl
+++ b/src/ejd2sql.erl
@@ -30,12 +30,12 @@
-include("logger.hrl").
-include("ejabberd_sql_pt.hrl").
--export([export/2, export/3, import_file/2, import/2,
- import/3, delete/1]).
+-export([export/2, export/3, import/3, import/4, delete/1, import_info/1]).
+
-define(MAX_RECORDS_PER_TRANSACTION, 100).
--record(dump, {fd, cont = start}).
+-record(sql_dump, {fd, type}).
%%%----------------------------------------------------------------------
%%% API
@@ -50,13 +50,14 @@
modules() ->
[ejabberd_auth,
mod_announce,
+ mod_caps,
mod_irc,
mod_last,
mod_muc,
mod_offline,
mod_privacy,
mod_private,
- %% mod_pubsub,
+ mod_pubsub,
mod_roster,
mod_shared_roster,
mod_vcard,
@@ -100,49 +101,44 @@ delete(Server, Module) ->
delete(LServer, Table, ConvertFun)
end, Module:export(Server)).
-import_file(Server, FileName) when is_binary(FileName) ->
- import(Server, binary_to_list(FileName));
-import_file(Server, FileName) ->
- case disk_log:open([{name, make_ref()},
- {file, FileName},
- {mode, read_only}]) of
- {ok, Fd} ->
- LServer = jid:nameprep(Server),
- Mods = [{Mod, gen_mod:db_type(LServer, Mod)}
- || Mod <- modules(), gen_mod:is_loaded(LServer, Mod)],
- AuthMods = case lists:member(ejabberd_auth_mnesia,
- ejabberd_auth:auth_modules(LServer)) of
- true ->
- [{ejabberd_auth, mnesia}];
- false ->
- []
- end,
- import_dump(LServer, AuthMods ++ Mods, #dump{fd = Fd});
- Err ->
- exit(Err)
- end.
-
-import(Server, Output) ->
- import(Server, Output, [{fast, true}]).
-
-import(Server, Output, Opts) ->
- LServer = jid:nameprep(iolist_to_binary(Server)),
- Modules = modules(),
- IO = prepare_output(Output, disk_log),
+import(Server, Dir, ToType) ->
lists:foreach(
- fun(Module) ->
- import(LServer, IO, Opts, Module)
- end, Modules),
- close_output(Output, IO).
+ fun(Mod) ->
+ ?INFO_MSG("importing ~p...", [Mod]),
+ import(Mod, Server, Dir, ToType)
+ end, modules()).
-import(Server, Output, Opts, Module) ->
+import(Mod, Server, Dir, ToType) ->
LServer = jid:nameprep(iolist_to_binary(Server)),
- IO = prepare_output(Output, disk_log),
+ try Mod:import_start(LServer, ToType)
+ catch error:undef -> ok end,
lists:foreach(
- fun({SelectQuery, ConvertFun}) ->
- import(LServer, SelectQuery, IO, ConvertFun, Opts)
- end, Module:import(Server)),
- close_output(Output, IO).
+ fun({File, Tab, _Mod, FieldsNumber}) ->
+ FileName = filename:join([Dir, File]),
+ case open_sql_dump(FileName) of
+ {ok, #sql_dump{type = FromType} = Dump} ->
+ import_rows(LServer, {sql, FromType}, ToType,
+ Tab, Mod, Dump, FieldsNumber),
+ close_sql_dump(Dump);
+ {error, enoent} ->
+ ok;
+ eof ->
+ ?INFO_MSG("It seems like SQL dump ~s is empty", [FileName]);
+ Err ->
+ ?ERROR_MSG("Failed to open SQL dump ~s: ~s",
+ [FileName, format_error(Err)])
+ end
+ end, import_info(Mod)),
+ try Mod:import_stop(LServer, ToType)
+ catch error:undef -> ok end.
+
+import_info(Mod) ->
+ Info = Mod:import_info(),
+ lists:map(
+ fun({Tab, FieldsNum}) ->
+ FileName = <<Tab/binary, ".txt">>,
+ {FileName, Tab, Mod, FieldsNum}
+ end, Info).
%%%----------------------------------------------------------------------
%%% Internal functions
@@ -200,79 +196,6 @@ delete(LServer, Table, ConvertFun) ->
end,
mnesia:transaction(F).
-import(LServer, SelectQuery, IO, ConvertFun, Opts) ->
- F = case proplists:get_bool(fast, Opts) of
- true ->
- fun() ->
- case ejabberd_sql:sql_query_t(SelectQuery) of
- {selected, _, Rows} ->
- lists:foldl(fun process_sql_row/2,
- {IO, ConvertFun, undefined}, Rows);
- Err ->
- erlang:error(Err)
- end
- end;
- false ->
- fun() ->
- ejabberd_sql:sql_query_t(
- [iolist_to_binary(
- [<<"declare c cursor for ">>, SelectQuery])]),
- fetch(IO, ConvertFun, undefined)
- end
- end,
- ejabberd_sql:sql_transaction(LServer, F).
-
-fetch(IO, ConvertFun, PrevRow) ->
- case ejabberd_sql:sql_query_t([<<"fetch c;">>]) of
- {selected, _, [Row]} ->
- process_sql_row(Row, {IO, ConvertFun, PrevRow}),
- fetch(IO, ConvertFun, Row);
- {selected, _, []} ->
- ok;
- Err ->
- erlang:error(Err)
- end.
-
-process_sql_row(Row, {IO, ConvertFun, PrevRow}) when Row == PrevRow ->
- %% Avoid calling ConvertFun with the same input
- {IO, ConvertFun, Row};
-process_sql_row(Row, {IO, ConvertFun, _PrevRow}) ->
- case catch ConvertFun(Row) of
- {'EXIT', _} = Err ->
- ?ERROR_MSG("failed to convert ~p: ~p", [Row, Err]);
- Term ->
- ok = disk_log:log(IO#dump.fd, Term)
- end,
- {IO, ConvertFun, Row}.
-
-import_dump(LServer, Mods, #dump{fd = Fd, cont = Cont}) ->
- case disk_log:chunk(Fd, Cont) of
- {NewCont, Terms} ->
- import_terms(LServer, Mods, Terms),
- import_dump(LServer, Mods, #dump{fd = Fd, cont = NewCont});
- eof ->
- ok;
- Err ->
- exit(Err)
- end.
-
-import_terms(LServer, Mods, [Term|Terms]) ->
- import_term(LServer, Mods, Term),
- import_terms(LServer, Mods, Terms);
-import_terms(_LServer, _Mods, []) ->
- ok.
-
-import_term(LServer, [{Mod, DBType}|Mods], Term) ->
- case catch Mod:import(LServer, DBType, Term) of
- pass -> import_term(LServer, Mods, Term);
- ok -> ok;
- Err ->
- ?ERROR_MSG("failed to import ~p for module ~p: ~p",
- [Term, Mod, Err])
- end;
-import_term(_LServer, [], _Term) ->
- ok.
-
prepare_output(FileName) ->
prepare_output(FileName, normal).
@@ -285,25 +208,11 @@ prepare_output(FileName, normal) when is_list(FileName) ->
Err ->
exit(Err)
end;
-prepare_output(FileName, disk_log) when is_list(FileName) ->
- case disk_log:open([{name, make_ref()},
- {repair, truncate},
- {file, FileName}]) of
- {ok, Fd} ->
- #dump{fd = Fd};
- Err ->
- exit(Err)
- end;
prepare_output(Output, _Type) ->
Output.
close_output(FileName, Fd) when FileName /= Fd ->
- case Fd of
- #dump{} ->
- disk_log:close(Fd#dump.fd);
- _ ->
- file:close(Fd)
- end,
+ file:close(Fd),
ok;
close_output(_, _) ->
ok.
@@ -321,6 +230,129 @@ flatten1([H|T], Acc) ->
flatten1([], Acc) ->
Acc.
+import_rows(LServer, FromType, ToType, Tab, Mod, Dump, FieldsNumber) ->
+ case read_row_from_sql_dump(Dump, FieldsNumber) of
+ {ok, Fields} ->
+ case catch Mod:import(LServer, FromType, ToType, Tab, Fields) of
+ ok ->
+ ok;
+ Err ->
+ ?ERROR_MSG("Failed to import fields ~p for tab ~p: ~p",
+ [Fields, Tab, Err])
+ end,
+ import_rows(LServer, FromType, ToType,
+ Tab, Mod, Dump, FieldsNumber);
+ eof ->
+ ok;
+ Err ->
+ ?ERROR_MSG("Failed to read row from SQL dump: ~s",
+ [format_error(Err)])
+ end.
+
+open_sql_dump(FileName) ->
+ case file:open(FileName, [raw, read, binary, read_ahead]) of
+ {ok, Fd} ->
+ case file:read(Fd, 11) of
+ {ok, <<"PGCOPY\n", 16#ff, "\r\n", 0>>} ->
+ case skip_pgcopy_header(Fd) of
+ ok ->
+ {ok, #sql_dump{fd = Fd, type = pgsql}};
+ Err ->
+ Err
+ end;
+ {ok, _} ->
+ file:position(Fd, 0),
+ {ok, #sql_dump{fd = Fd, type = mysql}};
+ Err ->
+ Err
+ end;
+ Err ->
+ Err
+ end.
+
+close_sql_dump(#sql_dump{fd = Fd}) ->
+ file:close(Fd).
+
+read_row_from_sql_dump(#sql_dump{fd = Fd, type = pgsql}, _) ->
+ case file:read(Fd, 2) of
+ {ok, <<(-1):16/signed>>} ->
+ eof;
+ {ok, <<FieldsNum:16>>} ->
+ read_fields(Fd, FieldsNum, []);
+ {ok, _} ->
+ {error, eof};
+ eof ->
+ {error, eof};
+ {error, _} = Err ->
+ Err
+ end;
+read_row_from_sql_dump(#sql_dump{fd = Fd, type = mysql}, FieldsNum) ->
+ read_lines(Fd, FieldsNum, <<"">>, []).
+
+skip_pgcopy_header(Fd) ->
+ try
+ {ok, <<_:4/binary, ExtSize:32>>} = file:read(Fd, 8),
+ {ok, <<_:ExtSize/binary>>} = file:read(Fd, ExtSize),
+ ok
+ catch error:{badmatch, {error, _} = Err} ->
+ Err;
+ error:{badmatch, _} ->
+ {error, eof}
+ end.
+
+read_fields(_Fd, 0, Acc) ->
+ {ok, lists:reverse(Acc)};
+read_fields(Fd, N, Acc) ->
+ case file:read(Fd, 4) of
+ {ok, <<(-1):32/signed>>} ->
+ read_fields(Fd, N-1, [null|Acc]);
+ {ok, <<ValSize:32>>} ->
+ case file:read(Fd, ValSize) of
+ {ok, <<Val:ValSize/binary>>} ->
+ read_fields(Fd, N-1, [Val|Acc]);
+ {ok, _} ->
+ {error, eof};
+ Err ->
+ Err
+ end;
+ {ok, _} ->
+ {error, eof};
+ eof ->
+ {error, eof};
+ {error, _} = Err ->
+ Err
+ end.
+
+read_lines(_Fd, 0, <<"">>, Acc) ->
+ {ok, lists:reverse(Acc)};
+read_lines(Fd, N, Buf, Acc) ->
+ case file:read_line(Fd) of
+ {ok, Data} when size(Data) >= 2 ->
+ Size = size(Data) - 2,
+ case Data of
+ <<Val:Size/binary, 0, $\n>> ->
+ NewBuf = <<Buf/binary, Val/binary>>,
+ read_lines(Fd, N-1, <<"">>, [NewBuf|Acc]);
+ _ ->
+ NewBuf = <<Buf/binary, Data/binary>>,
+ read_lines(Fd, N, NewBuf, Acc)
+ end;
+ {ok, Data} ->
+ NewBuf = <<Buf/binary, Data/binary>>,
+ read_lines(Fd, N, NewBuf, Acc);
+ eof when Buf == <<"">>, Acc == [] ->
+ eof;
+ eof ->
+ {error, eof};
+ {error, _} = Err ->
+ Err
+ end.
+
+format_error({error, eof}) ->
+ "unexpected end of file";
+format_error({error, Posix}) ->
+ file:format_error(Posix).
+
format_queries(SQLs) ->
lists:map(
fun(#sql_query{} = SQL) ->