diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2015-06-22 16:56:08 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2015-06-22 16:56:08 +0300 |
commit | 83cce468a573d3ca58a0c6653a2ccc17c7b4dbc3 (patch) | |
tree | 0e8bffc0327e389ab9eda4b944d85b35fd1e90eb /src/mod_mam.erl | |
parent | 66310788848ef185f3831648b2abf67ab6ded7fa (diff) |
Add MAM (XEP-0313) support
Diffstat (limited to 'src/mod_mam.erl')
-rw-r--r-- | src/mod_mam.erl | 823 |
1 files changed, 823 insertions, 0 deletions
diff --git a/src/mod_mam.erl b/src/mod_mam.erl new file mode 100644 index 000000000..1de1d9a95 --- /dev/null +++ b/src/mod_mam.erl @@ -0,0 +1,823 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeniy Khramtsov <ekhramtsov@process-one.net> +%%% @doc +%%% Message Archive Management (XEP-0313) +%%% @end +%%% Created : 4 Jul 2013 by Evgeniy Khramtsov <ekhramtsov@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2013-2015 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%------------------------------------------------------------------- +-module(mod_mam). + +-behaviour(gen_mod). + +%% API +-export([start/2, stop/1]). + +-export([user_send_packet/4, user_receive_packet/5, + process_iq/3, remove_user/2, mod_opt_type/1]). + +-include_lib("stdlib/include/ms_transform.hrl"). +-include("jlib.hrl"). +-include("logger.hrl"). + +-record(archive_msg, + {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$2', + id = <<>> :: binary() | '_', + timestamp = now() :: erlang:timestamp() | '_' | '$1', + peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3', + bare_peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3', + packet = #xmlel{} :: xmlel() | '_'}). + +-record(archive_prefs, + {us = {<<"">>, <<"">>} :: {binary(), binary()}, + default = never :: never | always | roster, + always = [] :: [ljid()], + never = [] :: [ljid()]}). + +%%%=================================================================== +%%% API +%%%=================================================================== +start(Host, Opts) -> + IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1, + one_queue), + DBType = gen_mod:db_type(Host, Opts), + init_db(DBType, Host), + init_cache(DBType, Opts), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, + ?NS_MAM_TMP, ?MODULE, process_iq, IQDisc), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, + ?NS_MAM_TMP, ?MODULE, process_iq, IQDisc), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, + ?NS_MAM_0, ?MODULE, process_iq, IQDisc), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, + ?NS_MAM_0, ?MODULE, process_iq, IQDisc), + ejabberd_hooks:add(user_receive_packet, Host, ?MODULE, + user_receive_packet, 500), + ejabberd_hooks:add(user_send_packet, Host, ?MODULE, + user_send_packet, 500), + ejabberd_hooks:add(remove_user, Host, ?MODULE, + remove_user, 50), + ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, + remove_user, 50), + ok. + +init_db(odbc, Host) -> + Muchost = gen_mod:get_module_opt_host(Host, mod_muc, + <<"conference.@HOST@">>), + ets:insert(ejabberd_modules, {ejabberd_module, {mod_mam, Muchost}, + [{db_type, odbc}]}), + mnesia:dirty_write({local_config, {modules,Muchost}, + [{mod_mam, [{db_type, odbc}]}]}); +init_db(mnesia, _Host) -> + mnesia:create_table(archive_msg, + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, archive_msg)}]), + mnesia:create_table(archive_prefs, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, archive_prefs)}]); +init_db(_, _) -> + ok. + +init_cache(_DBType, Opts) -> + MaxSize = gen_mod:get_opt(cache_size, Opts, + fun(I) when is_integer(I), I>0 -> I end, + 1000), + LifeTime = gen_mod:get_opt(cache_life_time, Opts, + fun(I) when is_integer(I), I>0 -> I end, + timer:hours(1) div 1000), + cache_tab:new(archive_prefs, [{max_size, MaxSize}, + {life_time, LifeTime}]). + +stop(Host) -> + ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, + user_send_packet, 500), + ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE, + user_receive_packet, 500), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_TMP), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_TMP), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_0), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_0), + ejabberd_hooks:delete(remove_user, Host, ?MODULE, + remove_user, 50), + ejabberd_hooks:delete(anonymous_purge_hook, Host, + ?MODULE, remove_user, 50), + ok. + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + remove_user(LUser, LServer, + gen_mod:db_type(LServer, ?MODULE)). + +remove_user(LUser, LServer, mnesia) -> + US = {LUser, LServer}, + F = fun () -> + mnesia:delete({archive_msg, US}), + mnesia:delete({archive_prefs, US}) + end, + mnesia:transaction(F); +remove_user(LUser, LServer, odbc) -> + SUser = ejabberd_odbc:escape(LUser), + ejabberd_odbc:sql_query( + LServer, + [<<"delete from archive where username='">>, SUser, <<"';">>]), + ejabberd_odbc:sql_query( + LServer, + [<<"delete from archive_prefs where username='">>, SUser, <<"';">>]). + +user_receive_packet(Pkt, C2SState, JID, Peer, _To) -> + LUser = JID#jid.luser, + LServer = JID#jid.lserver, + case should_archive(Pkt) of + true -> + NewPkt = strip_my_archived_tag(Pkt, LServer), + case store(C2SState, NewPkt, LUser, LServer, + Peer, true, recv) of + {ok, ID} -> + Archived = #xmlel{name = <<"archived">>, + attrs = [{<<"by">>, LServer}, + {<<"xmlns">>, ?NS_MAM_TMP}, + {<<"id">>, ID}]}, + NewEls = [Archived|NewPkt#xmlel.children], + NewPkt#xmlel{children = NewEls}; + _ -> + NewPkt + end; + muc -> + Pkt; + false -> + Pkt + end. + +user_send_packet(Pkt, C2SState, JID, Peer) -> + LUser = JID#jid.luser, + LServer = JID#jid.lserver, + case should_archive(Pkt) of + S when (S==true) -> + NewPkt = strip_my_archived_tag(Pkt, LServer), + store0(C2SState, jlib:replace_from_to(JID, Peer, NewPkt), + LUser, LServer, Peer, S, send), + NewPkt; + S when (S==muc) -> + NewPkt = strip_my_archived_tag(Pkt, LServer), + case store0(C2SState, jlib:replace_from_to(JID, Peer, NewPkt), + LUser, LServer, Peer, S, send) of + {ok, ID} -> + By = jlib:jid_to_string(Peer), + Archived = #xmlel{name = <<"archived">>, + attrs = [{<<"by">>, By}, {<<"xmlns">>, ?NS_MAM_TMP}, + {<<"id">>, ID}]}, + NewEls = [Archived|NewPkt#xmlel.children], + NewPkt#xmlel{children = NewEls}; + _ -> + NewPkt + end; + false -> + Pkt + end. + +process_iq(#jid{lserver = LServer} = From, + #jid{lserver = LServer} = To, + #iq{type = get, sub_el = #xmlel{name = <<"query">>} = SubEl} = IQ) -> + NS = xml:get_tag_attr_s(<<"xmlns">>, SubEl), + Fs = case NS of + ?NS_MAM_TMP -> + lists:flatmap( + fun(#xmlel{name = <<"start">>} = El) -> + [{<<"start">>, [xml:get_tag_cdata(El)]}]; + (#xmlel{name = <<"end">>} = El) -> + [{<<"end">>, [xml:get_tag_cdata(El)]}]; + (#xmlel{name = <<"with">>} = El) -> + [{<<"with">>, [xml:get_tag_cdata(El)]}]; + (#xmlel{name = <<"withroom">>} = El) -> + [{<<"withroom">>, [xml:get_tag_cdata(El)]}]; + (#xmlel{name = <<"withtext">>} = El) -> + [{<<"withtext">>, [xml:get_tag_cdata(El)]}]; + (#xmlel{name = <<"set">>}) -> + [{<<"set">>, SubEl}]; + (_) -> + [] + end, SubEl#xmlel.children); + ?NS_MAM_0 -> + case {xml:get_subtag_with_xmlns(SubEl, <<"x">>, ?NS_XDATA), + xml:get_subtag_with_xmlns(SubEl, <<"set">>, ?NS_RSM)} of + {#xmlel{} = XData, false} -> + jlib:parse_xdata_submit(XData); + {#xmlel{} = XData, #xmlel{}} -> + [{<<"set">>, SubEl} | jlib:parse_xdata_submit(XData)]; + {false, #xmlel{}} -> + [{<<"set">>, SubEl}]; + {false, false} -> + [] + end + end, + case catch lists:foldl( + fun({<<"start">>, [Data|_]}, {_, End, With, RSM}) -> + {{_, _, _} = jlib:datetime_string_to_timestamp(Data), + End, With, RSM}; + ({<<"end">>, [Data|_]}, {Start, _, With, RSM}) -> + {Start, + {_, _, _} = jlib:datetime_string_to_timestamp(Data), + With, RSM}; + ({<<"with">>, [Data|_]}, {Start, End, _, RSM}) -> + {Start, End, jlib:jid_tolower(jlib:string_to_jid(Data)), RSM}; + ({<<"withroom">>, [Data|_]}, {Start, End, _, RSM}) -> + {Start, End, + {room, jlib:jid_tolower(jlib:string_to_jid(Data))}, + RSM}; + ({<<"withtext">>, [Data|_]}, {Start, End, _, RSM}) -> + {Start, End, {text, Data}, RSM}; + ({<<"set">>, El}, {Start, End, With, _}) -> + {Start, End, With, jlib:rsm_decode(El)}; + (_, Acc) -> + Acc + end, {none, [], none, none}, Fs) of + {'EXIT', _} -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]}; + {Start, End, With, RSM} -> + select_and_send(From, To, Start, End, With, RSM, IQ) + end; +process_iq(#jid{luser = LUser, lserver = LServer}, + #jid{lserver = LServer}, + #iq{type = set, sub_el = #xmlel{name = <<"prefs">>} = SubEl} = IQ) -> + try {case xml:get_tag_attr_s(<<"default">>, SubEl) of + <<"always">> -> always; + <<"never">> -> never; + <<"roster">> -> roster + end, + lists:foldl( + fun(#xmlel{name = <<"always">>, children = Els}, {A, N}) -> + {get_jids(Els) ++ A, N}; + (#xmlel{name = <<"never">>, children = Els}, {A, N}) -> + {A, get_jids(Els) ++ N}; + (_, {A, N}) -> + {A, N} + end, {[], []}, SubEl#xmlel.children)} of + {Default, {Always, Never}} -> + case write_prefs(LUser, LServer, LServer, Default, + lists:usort(Always), lists:usort(Never)) of + ok -> + IQ#iq{type = result, sub_el = []}; + _Err -> + IQ#iq{type = error, + sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]} + end + catch _:_ -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]} + end; +process_iq(_, _, #iq{sub_el = SubEl} = IQ) -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +should_archive(#xmlel{name = <<"message">>} = Pkt) -> + case {xml:get_attr_s(<<"type">>, Pkt#xmlel.attrs), + xml:get_subtag_cdata(Pkt, <<"body">>)} of + {<<"error">>, _} -> + false; + {<<"groupchat">>, _} -> + To = xml:get_attr_s(<<"to">>, Pkt#xmlel.attrs), + case (jlib:string_to_jid(To))#jid.resource of + <<"">> -> muc; + _ -> false + end; + {_, <<>>} -> + %% Empty body + false; + _ -> + true + end; +should_archive(#xmlel{}) -> + false. + +strip_my_archived_tag(Pkt, LServer) -> + NewEls = lists:filter( + fun(#xmlel{name = <<"archived">>, + attrs = Attrs}) -> + case catch jlib:nameprep( + xml:get_attr_s( + <<"by">>, Attrs)) of + LServer -> + false; + _ -> + true + end; + (_) -> + true + end, Pkt#xmlel.children), + Pkt#xmlel{children = NewEls}. + +should_archive_peer(C2SState, + #archive_prefs{default = Default, + always = Always, + never = Never}, + Peer) -> + LPeer = jlib:jid_tolower(Peer), + case lists:member(LPeer, Always) of + true -> + true; + false -> + case lists:member(LPeer, Never) of + true -> + false; + false -> + case Default of + always -> true; + never -> false; + roster -> + case ejabberd_c2s:get_subscription( + LPeer, C2SState) of + both -> true; + from -> true; + to -> true; + _ -> false + end + end + end + end. + +store0(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) -> + case Type of + muc -> store(C2SState, Pkt, Peer#jid.luser, LServer, + jlib:jid_replace_resource(Peer, LUser), Type, Dir); + true -> store(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) + end. + +store(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) -> + Prefs = get_prefs(LUser, LServer), + case should_archive_peer(C2SState, Prefs, Peer) of + true -> + do_store(Pkt, LUser, LServer, Peer, Type, Dir, + gen_mod:db_type(LServer, ?MODULE)); + false -> + pass + end. + +do_store(Pkt, LUser, LServer, Peer, Type, _Dir, mnesia) -> + LPeer = {PUser, PServer, _} = jlib:jid_tolower(Peer), + LServer2 = case Type of muc -> Peer#jid.lserver; _ -> LServer end, + TS = now(), + ID = jlib:integer_to_binary(now_to_usec(TS)), + case mnesia:dirty_write( + #archive_msg{us = {LUser, LServer2}, + id = ID, + timestamp = TS, + peer = LPeer, + bare_peer = {PUser, PServer, <<>>}, + packet = Pkt}) of + ok -> + {ok, ID}; + Err -> + Err + end; +do_store(Pkt, LUser, LServer, Peer, _Type, _Dir, odbc) -> + TSinteger = now_to_usec(now()), + ID = TS = jlib:integer_to_binary(TSinteger), + BarePeer = jlib:jid_to_string( + jlib:jid_tolower( + jlib:jid_remove_resource(Peer))), + LPeer = jlib:jid_to_string( + jlib:jid_tolower(Peer)), + XML = xml:element_to_binary(Pkt), + Body = xml:get_subtag_cdata(Pkt, <<"body">>), + case ejabberd_odbc:sql_query( + LServer, + [<<"insert into archive (username, timestamp, " + "peer, bare_peer, xml, txt) values (">>, + <<"'">>, ejabberd_odbc:escape(LUser), <<"', ">>, + <<"'">>, TS, <<"', ">>, + <<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(XML), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(Body), <<"');">>]) of + {updated, _} -> + {ok, ID}; + Err -> + Err + end. + +write_prefs(LUser, LServer, Host, Default, Always, Never) -> + DBType = case gen_mod:db_type(Host, ?MODULE) of + odbc -> {odbc, Host}; + DB -> DB + end, + Prefs = #archive_prefs{us = {LUser, LServer}, + default = Default, + always = Always, + never = Never}, + cache_tab:dirty_insert( + archive_prefs, {LUser, LServer}, Prefs, + fun() -> write_prefs(LUser, LServer, Prefs, DBType) end). + +write_prefs(_LUser, _LServer, Prefs, mnesia) -> + mnesia:dirty_write(Prefs); +write_prefs(LUser, _LServer, #archive_prefs{default = Default, + never = Never, + always = Always}, + {odbc, Host}) -> + SUser = ejabberd_odbc:escape(LUser), + SDefault = erlang:atom_to_binary(Default, utf8), + SAlways = ejabberd_odbc:encode_term(Always), + SNever = ejabberd_odbc:encode_term(Never), + case update(Host, <<"archive_prefs">>, + [<<"username">>, <<"def">>, <<"always">>, <<"never">>], + [SUser, SDefault, SAlways, SNever], + [<<"username='">>, SUser, <<"'">>]) of + {updated, _} -> + ok; + Err -> + Err + end. + +get_prefs(LUser, LServer) -> + DBType = gen_mod:db_type(LServer, ?MODULE), + Res = cache_tab:lookup(archive_prefs, {LUser, LServer}, + fun() -> get_prefs(LUser, LServer, + DBType) + end), + case Res of + {ok, Prefs} -> + Prefs; + error -> + Default = gen_mod:get_module_opt( + LServer, ?MODULE, default, + fun(always) -> always; + (never) -> never; + (roster) -> roster + end, never), + #archive_prefs{us = {LUser, LServer}, default = Default} + end. + +get_prefs(LUser, LServer, mnesia) -> + case mnesia:dirty_read(archive_prefs, {LUser, LServer}) of + [Prefs] -> + {ok, Prefs}; + _ -> + error + end; +get_prefs(LUser, LServer, odbc) -> + case ejabberd_odbc:sql_query( + LServer, + [<<"select def, always, never from archive_prefs ">>, + <<"where username='">>, + ejabberd_odbc:escape(LUser), <<"';">>]) of + {selected, _, [[SDefault, SAlways, SNever]]} -> + Default = erlang:binary_to_existing_atom(SDefault, utf8), + Always = ejabberd_odbc:decode_term(SAlways), + Never = ejabberd_odbc:decode_term(SNever), + {ok, #archive_prefs{us = {LUser, LServer}, + default = Default, + always = Always, + never = Never}}; + _ -> + error + end. + +select_and_send(#jid{lserver = LServer} = From, + To, Start, End, With, RSM, IQ) -> + DBType = case gen_mod:db_type(LServer, ?MODULE) of + odbc -> {odbc, LServer}; + DB -> DB + end, + select_and_send(From, To, Start, End, With, RSM, IQ, + DBType). + +select_and_send(From, To, Start, End, With, RSM, IQ, DBType) -> + {Msgs, Count} = select_and_start(From, To, Start, End, With, + RSM, DBType), + SortedMsgs = lists:keysort(2, Msgs), + send(From, To, SortedMsgs, RSM, Count, IQ). + +select_and_start(From, _To, StartUser, End, With, RSM, DB) -> + {JidRequestor, Start, With2} = case With of + {room, {LUserRoom, LServerRoom, <<>>} = WithJid} -> + JR = jlib:make_jid(LUserRoom,LServerRoom,<<>>), + St = StartUser, + {JR, St, WithJid}; + _ -> + {From, StartUser, With} + end, + select(JidRequestor, Start, End, With2, RSM, DB). + +select(#jid{luser = LUser, lserver = LServer} = JidRequestor, + Start, End, With, RSM, mnesia) -> + MS = make_matchspec(LUser, LServer, Start, End, With), + Msgs = mnesia:dirty_select(archive_msg, MS), + FilteredMsgs = filter_by_rsm(Msgs, RSM), + Count = length(Msgs), + {lists:map( + fun(Msg) -> + {Msg#archive_msg.id, + jlib:binary_to_integer(Msg#archive_msg.id), + msg_to_el(Msg, JidRequestor)} + end, FilteredMsgs), Count}; +select(#jid{luser = LUser, lserver = LServer} = JidRequestor, + Start, End, With, RSM, {odbc, Host}) -> + {Query, CountQuery} = make_sql_query(LUser, LServer, + Start, End, With, RSM), + case {ejabberd_odbc:sql_query(Host, Query), + ejabberd_odbc:sql_query(Host, CountQuery)} of + {{selected, _, Res}, {selected, _, [[Count]]}} -> + {lists:map( + fun([TS, XML, PeerBin]) -> + #xmlel{} = El = xml_stream:parse_element(XML), + Now = usec_to_now(jlib:binary_to_integer(TS)), + PeerJid = jlib:jid_tolower(jlib:string_to_jid(PeerBin)), + {TS, jlib:binary_to_integer(TS), + msg_to_el(#archive_msg{timestamp = Now, + packet = El, + peer = PeerJid}, + JidRequestor)} + end, Res), jlib:binary_to_integer(Count)}; + _ -> + {[], 0} + end. + +msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, peer = Peer}, + JidRequestor) -> + Delay = jlib:now_to_utc_string(TS), + Pkt = maybe_update_from_to(Pkt1, JidRequestor, Peer), + #xmlel{name = <<"forwarded">>, + attrs = [{<<"xmlns">>, ?NS_FORWARD}], + children = [#xmlel{name = <<"delay">>, + attrs = [{<<"xmlns">>, ?NS_DELAY}, + {<<"stamp">>, Delay}]}, + xml:replace_tag_attr( + <<"xmlns">>, <<"jabber:client">>, Pkt)]}. + +maybe_update_from_to(Pkt, _JIDRequestor, undefined) -> + Pkt; +maybe_update_from_to(Pkt, JidRequestor, Peer) -> + case xml:get_attr_s(<<"type">>, Pkt#xmlel.attrs) of + <<"groupchat">> -> + Pkt2 = xml:replace_tag_attr(<<"to">>, + jlib:jid_to_string(JidRequestor), + Pkt), + xml:replace_tag_attr(<<"from">>, jlib:jid_to_string(Peer), + Pkt2); + _ -> Pkt + end. + +send(From, To, Msgs, RSM, Count, #iq{sub_el = SubEl} = IQ) -> + QID = xml:get_tag_attr_s(<<"queryid">>, SubEl), + NS = xml:get_tag_attr_s(<<"xmlns">>, SubEl), + QIDAttr = if QID /= <<>> -> + [{<<"queryid">>, QID}]; + true -> + [] + end, + Els = lists:map( + fun({ID, _IDInt, El}) -> + #xmlel{name = <<"message">>, + children = [#xmlel{name = <<"result">>, + attrs = [{<<"xmlns">>, NS}, + {<<"id">>, ID}|QIDAttr], + children = [El]}]} + end, Msgs), + RSMOut = make_rsm_out(Msgs, RSM, Count, QIDAttr, NS), + case NS of + ?NS_MAM_TMP -> + lists:foreach( + fun(El) -> + ejabberd_router:route(To, From, El) + end, Els), + IQ#iq{type = result, sub_el = RSMOut}; + ?NS_MAM_0 -> + ejabberd_router:route( + To, From, jlib:iq_to_xml(IQ#iq{type = result, sub_el = []})), + lists:foreach( + fun(El) -> + ejabberd_router:route(To, From, El) + end, Els), + ejabberd_router:route( + To, From, #xmlel{name = <<"message">>, + children = RSMOut}), + ignore + end. + + +make_rsm_out(_Msgs, none, _Count, _QIDAttr, ?NS_MAM_TMP) -> + []; +make_rsm_out(_Msgs, none, _Count, QIDAttr, ?NS_MAM_0) -> + [#xmlel{name = <<"fin">>, attrs = [{<<"xmlns">>, ?NS_MAM_0}|QIDAttr]}]; +make_rsm_out([], #rsm_in{}, Count, QIDAttr, NS) -> + Tag = if NS == ?NS_MAM_TMP -> <<"query">>; + true -> <<"fin">> + end, + [#xmlel{name = Tag, attrs = [{<<"xmlns">>, NS}|QIDAttr], + children = jlib:rsm_encode(#rsm_out{count = Count})}]; +make_rsm_out([{FirstID, _, _}|_] = Msgs, #rsm_in{}, Count, QIDAttr, NS) -> + {LastID, _, _} = lists:last(Msgs), + Tag = if NS == ?NS_MAM_TMP -> <<"query">>; + true -> <<"fin">> + end, + [#xmlel{name = Tag, attrs = [{<<"xmlns">>, NS}|QIDAttr], + children = jlib:rsm_encode( + #rsm_out{first = FirstID, count = Count, + last = LastID})}]. + +filter_by_rsm(Msgs, none) -> + Msgs; +filter_by_rsm(_Msgs, #rsm_in{max = Max}) when Max =< 0 -> + []; +filter_by_rsm(Msgs, #rsm_in{max = Max, direction = Direction, id = ID}) -> + NewMsgs = case Direction of + aft -> + lists:filter( + fun(#archive_msg{id = I}) -> + I > ID + end, Msgs); + before -> + lists:foldl( + fun(#archive_msg{id = I} = Msg, Acc) when I < ID -> + [Msg|Acc]; + (_, Acc) -> + Acc + end, [], Msgs); + _ -> + Msgs + end, + filter_by_max(NewMsgs, Max). + +filter_by_max(Msgs, undefined) -> + Msgs; +filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 -> + lists:sublist(Msgs, Len); +filter_by_max(_Msgs, _Junk) -> + []. + +make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) -> + ets:fun2ms( + fun(#archive_msg{timestamp = TS, + us = US, + bare_peer = BPeer} = Msg) + when Start =< TS, End >= TS, + US == {LUser, LServer}, + BPeer == With -> + Msg + end); +make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) -> + ets:fun2ms( + fun(#archive_msg{timestamp = TS, + us = US, + peer = Peer} = Msg) + when Start =< TS, End >= TS, + US == {LUser, LServer}, + Peer == With -> + Msg + end); +make_matchspec(LUser, LServer, Start, End, none) -> + ets:fun2ms( + fun(#archive_msg{timestamp = TS, + us = US, + peer = Peer} = Msg) + when Start =< TS, End >= TS, + US == {LUser, LServer} -> + Msg + end). + +make_sql_query(LUser, _LServer, Start, End, With, RSM) -> + {Max, Direction, ID} = case RSM of + #rsm_in{} -> + {RSM#rsm_in.max, + RSM#rsm_in.direction, + RSM#rsm_in.id}; + none -> + {none, none, none} + end, + LimitClause = if is_integer(Max), Max >= 0 -> + [<<" limit ">>, jlib:integer_to_binary(Max)]; + true -> + [] + end, + WithClause = case With of + {text, <<>>} -> + []; + {text, Txt} -> + [<<" and match (txt) against ('">>, + ejabberd_odbc:escape(Txt), <<"')">>]; + {_, _, <<>>} -> + [<<" and bare_peer='">>, + ejabberd_odbc:escape(jlib:jid_to_string(With)), + <<"'">>]; + {_, _, _} -> + [<<" and peer='">>, + ejabberd_odbc:escape(jlib:jid_to_string(With)), + <<"'">>]; + none -> + [] + end, + DirectionClause = case catch jlib:binary_to_integer(ID) of + I when is_integer(I), I >= 0 -> + case Direction of + before -> + [<<" and timestamp < ">>, ID, + <<" order by timestamp desc">>]; + aft -> + [<<" and timestamp > ">>, ID, + <<" order by timestamp asc">>]; + _ -> + [] + end; + _ -> + [] + end, + StartClause = case Start of + {_, _, _} -> + [<<" and timestamp >= ">>, + jlib:integer_to_binary(now_to_usec(Start))]; + _ -> + [] + end, + EndClause = case End of + {_, _, _} -> + [<<" and timestamp <= ">>, + jlib:integer_to_binary(now_to_usec(End))]; + _ -> + [] + end, + SUser = ejabberd_odbc:escape(LUser), + {[<<"select timestamp, xml, peer from archive where username='">>, + SUser, <<"'">>] ++ WithClause ++ StartClause ++ EndClause ++ + DirectionClause ++ LimitClause ++ [<<";">>], + [<<"select count(*) from archive where username='">>, + SUser, <<"'">>] ++ WithClause ++ StartClause ++ EndClause ++ [<<";">>]}. + +now_to_usec({MSec, Sec, USec}) -> + (MSec*1000000 + Sec)*1000000 + USec. + +usec_to_now(Int) -> + Secs = Int div 1000000, + USec = Int rem 1000000, + MSec = Secs div 1000000, + Sec = Secs rem 1000000, + {MSec, Sec, USec}. + +get_jids(Els) -> + lists:flatmap( + fun(#xmlel{name = <<"jid">>} = El) -> + J = jlib:string_to_jid(xml:get_tag_cdata(El)), + [jlib:jid_tolower(jlib:jid_remove_resource(J)), + jlib:jid_tolower(J)]; + (_) -> + [] + end, Els). + +update(LServer, Table, Fields, Vals, Where) -> + UPairs = lists:zipwith(fun (A, B) -> + <<A/binary, "='", B/binary, "'">> + end, + Fields, Vals), + case ejabberd_odbc:sql_query(LServer, + [<<"update ">>, Table, <<" set ">>, + join(UPairs, <<", ">>), <<" where ">>, Where, + <<";">>]) + of + {updated, 1} -> {updated, 1}; + _ -> + ejabberd_odbc:sql_query(LServer, + [<<"insert into ">>, Table, <<"(">>, + join(Fields, <<", ">>), <<") values ('">>, + join(Vals, <<"', '">>), <<"');">>]) + end. + +%% Almost a copy of string:join/2. +join([], _Sep) -> []; +join([H | T], Sep) -> [H, [[Sep, X] || X <- T]]. + +mod_opt_type(cache_life_time) -> + fun (I) when is_integer(I), I > 0 -> I end; +mod_opt_type(cache_size) -> + fun (I) when is_integer(I), I > 0 -> I end; +mod_opt_type(db_type) -> fun gen_mod:v_db/1; +mod_opt_type(default) -> + fun (always) -> always; + (never) -> never; + (roster) -> roster + end; +mod_opt_type(iqdisc) -> fun gen_iq_handler:check_type/1; +mod_opt_type(store_body_only) -> + fun (B) when is_boolean(B) -> B end; +mod_opt_type(_) -> + [cache_life_time, cache_size, db_type, default, iqdisc, + store_body_only]. |