From 40feed723d3b651ff19fdcd7fb31b1c3bad24943 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Thu, 6 Aug 2015 13:33:39 +0300 Subject: Add support for MUC MAM --- src/mod_mam.erl | 242 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 150 insertions(+), 92 deletions(-) (limited to 'src/mod_mam.erl') diff --git a/src/mod_mam.erl b/src/mod_mam.erl index d0ac38d54..1d4dd1a50 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -34,11 +34,12 @@ -export([user_send_packet/4, user_receive_packet/5, process_iq_v0_2/3, process_iq_v0_3/3, remove_user/2, - mod_opt_type/1]). + mod_opt_type/1, muc_process_iq/4, muc_filter_message/5]). -include_lib("stdlib/include/ms_transform.hrl"). -include("jlib.hrl"). -include("logger.hrl"). +-include("mod_muc_room.hrl"). -record(archive_msg, {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$2', @@ -46,7 +47,9 @@ timestamp = now() :: erlang:timestamp() | '_' | '$1', peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3', bare_peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3', - packet = #xmlel{} :: xmlel() | '_'}). + packet = #xmlel{} :: xmlel() | '_', + nick = <<"">> :: binary(), + type = chat :: chat | groupchat}). -record(archive_prefs, {us = {<<"">>, <<"">>} :: {binary(), binary()}, @@ -75,19 +78,16 @@ start(Host, Opts) -> user_receive_packet, 500), ejabberd_hooks:add(user_send_packet, Host, ?MODULE, user_send_packet, 500), + ejabberd_hooks:add(muc_filter_message, Host, ?MODULE, + muc_filter_message, 50), + ejabberd_hooks:add(muc_process_iq, Host, ?MODULE, + muc_process_iq, 50), ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50), ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE, remove_user, 50), ok. -init_db(odbc, Host) -> - Muchost = gen_mod:get_module_opt_host(Host, mod_muc, - <<"conference.@HOST@">>), - ets:insert(ejabberd_modules, {ejabberd_module, {mod_mam, Muchost}, - [{db_type, odbc}]}), - mnesia:dirty_write({local_config, {modules,Muchost}, - [{mod_mam, [{db_type, odbc}]}]}); init_db(mnesia, _Host) -> mnesia:create_table(archive_msg, [{disc_only_copies, [node()]}, @@ -114,6 +114,10 @@ stop(Host) -> user_send_packet, 500), ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE, user_receive_packet, 500), + ejabberd_hooks:delete(muc_filter_message, Host, ?MODULE, + muc_filter_message, 50), + ejabberd_hooks:delete(muc_process_iq, Host, ?MODULE, + muc_process_iq, 50), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_TMP), gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_TMP), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_0), @@ -152,8 +156,7 @@ user_receive_packet(Pkt, C2SState, JID, Peer, _To) -> case should_archive(Pkt) of true -> NewPkt = strip_my_archived_tag(Pkt, LServer), - case store(C2SState, NewPkt, LUser, LServer, - Peer, true, recv) of + case store_msg(C2SState, NewPkt, LUser, LServer, Peer, recv) of {ok, ID} -> Archived = #xmlel{name = <<"archived">>, attrs = [{<<"by">>, LServer}, @@ -164,8 +167,6 @@ user_receive_packet(Pkt, C2SState, JID, Peer, _To) -> _ -> NewPkt end; - muc -> - Pkt; false -> Pkt end. @@ -174,29 +175,25 @@ user_send_packet(Pkt, C2SState, JID, Peer) -> LUser = JID#jid.luser, LServer = JID#jid.lserver, case should_archive(Pkt) of - S when (S==true) -> + true -> NewPkt = strip_my_archived_tag(Pkt, LServer), - store0(C2SState, jlib:replace_from_to(JID, Peer, NewPkt), - LUser, LServer, Peer, S, send), + store_msg(C2SState, jlib:replace_from_to(JID, Peer, NewPkt), + LUser, LServer, Peer, send), NewPkt; - S when (S==muc) -> - NewPkt = strip_my_archived_tag(Pkt, LServer), - case store0(C2SState, jlib:replace_from_to(JID, Peer, NewPkt), - LUser, LServer, Peer, S, send) of - {ok, ID} -> - By = jlib:jid_to_string(Peer), - Archived = #xmlel{name = <<"archived">>, - attrs = [{<<"by">>, By}, {<<"xmlns">>, ?NS_MAM_TMP}, - {<<"id">>, ID}]}, - NewEls = [Archived|NewPkt#xmlel.children], - NewPkt#xmlel{children = NewEls}; - _ -> - NewPkt - end; false -> Pkt end. +muc_filter_message(Pkt, #state{config = Config} = MUCState, + RoomJID, From, FromNick) -> + if Config#config.mam -> + NewPkt = strip_my_archived_tag(Pkt, MUCState#state.server_host), + store_muc(MUCState, NewPkt, RoomJID, From, FromNick), + NewPkt; + true -> + Pkt + end. + % Query archive v0.2 process_iq_v0_2(#jid{lserver = LServer} = From, #jid{lserver = LServer} = To, @@ -217,7 +214,7 @@ process_iq_v0_2(#jid{lserver = LServer} = From, (_) -> [] end, SubEl#xmlel.children), - process_iq(From, To, IQ, SubEl, Fs); + process_iq(LServer, From, To, IQ, SubEl, Fs, chat); process_iq_v0_2(From, To, IQ) -> process_iq(From, To, IQ). @@ -225,7 +222,28 @@ process_iq_v0_2(From, To, IQ) -> process_iq_v0_3(#jid{lserver = LServer} = From, #jid{lserver = LServer} = To, #iq{type = set, sub_el = #xmlel{name = <<"query">>} = SubEl} = IQ) -> - Fs = case {xml:get_subtag_with_xmlns(SubEl, <<"x">>, ?NS_XDATA), + process_iq(LServer, From, To, IQ, SubEl, get_xdata_fields(SubEl), chat); +process_iq_v0_3(From, To, IQ) -> + process_iq(From, To, IQ). + +muc_process_iq(#iq{type = set, + sub_el = #xmlel{name = <<"query">>, + attrs = Attrs} = SubEl} = IQ, + MUCState, From, To) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + ?NS_MAM_0 -> + LServer = MUCState#state.server_host, + Role = mod_muc_room:get_role(From, MUCState), + process_iq(LServer, From, To, IQ, SubEl, + get_xdata_fields(SubEl), {groupchat, Role}); + _ -> + IQ + end; +muc_process_iq(IQ, _MUCState, _From, _To) -> + IQ. + +get_xdata_fields(SubEl) -> + case {xml:get_subtag_with_xmlns(SubEl, <<"x">>, ?NS_XDATA), xml:get_subtag_with_xmlns(SubEl, <<"set">>, ?NS_RSM)} of {#xmlel{} = XData, false} -> jlib:parse_xdata_submit(XData); @@ -235,10 +253,7 @@ process_iq_v0_3(#jid{lserver = LServer} = From, [{<<"set">>, SubEl}]; {false, false} -> [] - end, - process_iq(From, To, IQ, SubEl, Fs); -process_iq_v0_3(From, To, IQ) -> - process_iq(From, To, IQ). + end. %%%=================================================================== %%% Internal functions @@ -276,7 +291,7 @@ process_iq(#jid{luser = LUser, lserver = LServer}, process_iq(_, _, #iq{sub_el = SubEl} = IQ) -> IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]}. -process_iq(From, To, IQ, SubEl, Fs) -> +process_iq(LServer, From, To, IQ, SubEl, Fs, MsgType) -> case catch lists:foldl( fun({<<"start">>, [Data|_]}, {_, End, With, RSM}) -> {{_, _, _} = jlib:datetime_string_to_timestamp(Data), @@ -301,7 +316,8 @@ process_iq(From, To, IQ, SubEl, Fs) -> {'EXIT', _} -> IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]}; {Start, End, With, RSM} -> - select_and_send(From, To, Start, End, With, RSM, IQ) + select_and_send(LServer, From, To, Start, End, + With, RSM, IQ, MsgType) end. should_archive(#xmlel{name = <<"message">>} = Pkt) -> @@ -310,11 +326,7 @@ should_archive(#xmlel{name = <<"message">>} = Pkt) -> {<<"error">>, _} -> false; {<<"groupchat">>, _} -> - To = xml:get_attr_s(<<"to">>, Pkt#xmlel.attrs), - case (jlib:string_to_jid(To))#jid.resource of - <<"">> -> muc; - _ -> false - end; + false; {_, <<>>} -> %% Empty body false; @@ -370,43 +382,57 @@ should_archive_peer(C2SState, end end. -store0(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) -> - case Type of - muc -> store(C2SState, Pkt, Peer#jid.luser, LServer, - jlib:jid_replace_resource(Peer, LUser), Type, Dir); - true -> store(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) - end. +should_archive_muc(_MUCState, _Peer) -> + %% TODO + true. -store(C2SState, Pkt, LUser, LServer, Peer, Type, Dir) -> +store_msg(C2SState, Pkt, LUser, LServer, Peer, Dir) -> Prefs = get_prefs(LUser, LServer), case should_archive_peer(C2SState, Prefs, Peer) of true -> - do_store(Pkt, LUser, LServer, Peer, Type, Dir, + US = {LUser, LServer}, + store(Pkt, LServer, US, chat, Peer, <<"">>, Dir, gen_mod:db_type(LServer, ?MODULE)); false -> pass end. -do_store(Pkt, LUser, LServer, Peer, Type, _Dir, mnesia) -> +store_muc(MUCState, Pkt, RoomJID, Peer, Nick) -> + case should_archive_muc(MUCState, Peer) of + true -> + LServer = MUCState#state.server_host, + {U, S, _} = jlib:jid_tolower(RoomJID), + store(Pkt, LServer, {U, S}, groupchat, Peer, Nick, recv, + gen_mod:db_type(LServer, ?MODULE)); + false -> + pass + end. + +store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir, mnesia) -> LPeer = {PUser, PServer, _} = jlib:jid_tolower(Peer), - LServer2 = case Type of muc -> Peer#jid.lserver; _ -> LServer end, TS = now(), ID = jlib:integer_to_binary(now_to_usec(TS)), case mnesia:dirty_write( - #archive_msg{us = {LUser, LServer2}, + #archive_msg{us = {LUser, LServer}, id = ID, timestamp = TS, peer = LPeer, bare_peer = {PUser, PServer, <<>>}, + type = Type, + nick = Nick, packet = Pkt}) of ok -> {ok, ID}; Err -> Err end; -do_store(Pkt, LUser, LServer, Peer, _Type, _Dir, odbc) -> +store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir, odbc) -> TSinteger = now_to_usec(now()), ID = TS = jlib:integer_to_binary(TSinteger), + SUser = case Type of + chat -> LUser; + groupchat -> jlib:jid_to_string({LUser, LHost, <<>>}) + end, BarePeer = jlib:jid_to_string( jlib:jid_tolower( jlib:jid_remove_resource(Peer))), @@ -417,13 +443,15 @@ do_store(Pkt, LUser, LServer, Peer, _Type, _Dir, odbc) -> case ejabberd_odbc:sql_query( LServer, [<<"insert into archive (username, timestamp, " - "peer, bare_peer, xml, txt) values (">>, - <<"'">>, ejabberd_odbc:escape(LUser), <<"', ">>, + "peer, bare_peer, xml, txt, kind, nick) values (">>, + <<"'">>, ejabberd_odbc:escape(SUser), <<"', ">>, <<"'">>, TS, <<"', ">>, <<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>, <<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>, <<"'">>, ejabberd_odbc:escape(XML), <<"', ">>, - <<"'">>, ejabberd_odbc:escape(Body), <<"');">>]) of + <<"'">>, ejabberd_odbc:escape(Body), <<"', ">>, + <<"'">>, jlib:atom_to_binary(Type), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(Nick), <<"');">>]) of {updated, _} -> {ok, ID}; Err -> @@ -507,34 +535,37 @@ get_prefs(LUser, LServer, odbc) -> error end. -select_and_send(#jid{lserver = LServer} = From, - To, Start, End, With, RSM, IQ) -> +select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType) -> DBType = case gen_mod:db_type(LServer, ?MODULE) of odbc -> {odbc, LServer}; DB -> DB end, - select_and_send(From, To, Start, End, With, RSM, IQ, - DBType). + select_and_send(LServer, From, To, Start, End, With, RSM, IQ, + MsgType, DBType). -select_and_send(From, To, Start, End, With, RSM, IQ, DBType) -> - {Msgs, IsComplete, Count} = select_and_start(From, To, Start, End, With, - RSM, DBType), +select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType, DBType) -> + {Msgs, IsComplete, Count} = select_and_start(LServer, From, To, Start, End, + With, RSM, MsgType, DBType), SortedMsgs = lists:keysort(2, Msgs), send(From, To, SortedMsgs, RSM, Count, IsComplete, IQ). -select_and_start(From, _To, StartUser, End, With, RSM, DB) -> - {JidRequestor, Start, With2} = case With of - {room, {LUserRoom, LServerRoom, <<>>} = WithJid} -> - JR = jlib:make_jid(LUserRoom,LServerRoom,<<>>), - St = StartUser, - {JR, St, WithJid}; +select_and_start(LServer, From, To, Start, End, With, RSM, MsgType, DBType) -> + case MsgType of + chat -> + case With of + {room, {_, _, <<"">>} = WithJID} -> + select(LServer, jlib:make_jid(WithJID), Start, End, + WithJID, RSM, MsgType, DBType); _ -> - {From, StartUser, With} - end, - select(JidRequestor, Start, End, With2, RSM, DB). + select(LServer, From, Start, End, + With, RSM, MsgType, DBType) + end; + {groupchat, _Role} -> + select(LServer, To, Start, End, With, RSM, MsgType, DBType) + end. -select(#jid{luser = LUser, lserver = LServer} = JidRequestor, - Start, End, With, RSM, mnesia) -> +select(_LServer, #jid{luser = LUser, lserver = LServer} = JidRequestor, + Start, End, With, RSM, MsgType, mnesia) -> MS = make_matchspec(LUser, LServer, Start, End, With), Msgs = mnesia:dirty_select(archive_msg, MS), {FilteredMsgs, IsComplete} = filter_by_rsm(Msgs, RSM), @@ -543,12 +574,16 @@ select(#jid{luser = LUser, lserver = LServer} = JidRequestor, fun(Msg) -> {Msg#archive_msg.id, jlib:binary_to_integer(Msg#archive_msg.id), - msg_to_el(Msg, JidRequestor)} + msg_to_el(Msg, MsgType, JidRequestor)} end, FilteredMsgs), IsComplete, Count}; -select(#jid{luser = LUser, lserver = LServer} = JidRequestor, - Start, End, With, RSM, {odbc, Host}) -> - {Query, CountQuery} = make_sql_query(LUser, LServer, - Start, End, With, RSM), +select(LServer, #jid{luser = LUser} = JidRequestor, + Start, End, With, RSM, MsgType, {odbc, Host}) -> + User = case MsgType of + chat -> LUser; + {groupchat, _Role} -> jlib:jid_to_string(JidRequestor) + end, + {Query, CountQuery} = make_sql_query(User, LServer, + Start, End, With, RSM), % XXX TODO from XEP-0313: % To conserve resources, a server MAY place a reasonable limit on % how many stanzas may be pushed to a client in one request. If a @@ -573,24 +608,31 @@ select(#jid{luser = LUser, lserver = LServer} = JidRequestor, {Res, true} end, {lists:map( - fun([TS, XML, PeerBin]) -> + fun([TS, XML, PeerBin, Kind, Nick]) -> #xmlel{} = El = xml_stream:parse_element(XML), Now = usec_to_now(jlib:binary_to_integer(TS)), PeerJid = jlib:jid_tolower(jlib:string_to_jid(PeerBin)), + T = if Kind /= <<"">> -> + jlib:binary_to_atom(Kind); + true -> chat + end, {TS, jlib:binary_to_integer(TS), msg_to_el(#archive_msg{timestamp = Now, packet = El, + type = T, + nick = Nick, peer = PeerJid}, + MsgType, JidRequestor)} end, Res1), IsComplete, jlib:binary_to_integer(Count)}; _ -> {[], false, 0} end. -msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, peer = Peer}, - JidRequestor) -> +msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, nick = Nick, peer = Peer}, + MsgType, JidRequestor) -> Delay = jlib:now_to_utc_string(TS), - Pkt = maybe_update_from_to(Pkt1, JidRequestor, Peer), + Pkt = maybe_update_from_to(Pkt1, JidRequestor, Peer, MsgType, Nick), #xmlel{name = <<"forwarded">>, attrs = [{<<"xmlns">>, ?NS_FORWARD}], children = [#xmlel{name = <<"delay">>, @@ -599,9 +641,9 @@ msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, peer = Peer}, xml:replace_tag_attr( <<"xmlns">>, <<"jabber:client">>, Pkt)]}. -maybe_update_from_to(Pkt, _JIDRequestor, undefined) -> +maybe_update_from_to(Pkt, _JIDRequestor, undefined, _Type, _Nick) -> Pkt; -maybe_update_from_to(Pkt, JidRequestor, Peer) -> +maybe_update_from_to(Pkt, JidRequestor, Peer, chat, _Nick) -> case xml:get_attr_s(<<"type">>, Pkt#xmlel.attrs) of <<"groupchat">> -> Pkt2 = xml:replace_tag_attr(<<"to">>, @@ -610,7 +652,23 @@ maybe_update_from_to(Pkt, JidRequestor, Peer) -> xml:replace_tag_attr(<<"from">>, jlib:jid_to_string(Peer), Pkt2); _ -> Pkt - end. + end; +maybe_update_from_to(#xmlel{children = Els} = Pkt, JidRequestor, + Peer, {groupchat, Role}, Nick) -> + Items = case Role of + moderator -> + [#xmlel{name = <<"x">>, + attrs = [{<<"xmlns">>, ?NS_MUC_ADMIN}], + children = + [#xmlel{name = <<"item">>, + attrs = [{<<"jid">>, + jlib:jid_to_string(Peer)}]}]}]; + _ -> + [] + end, + Pkt1 = Pkt#xmlel{children = Items ++ Els}, + Pkt2 = jlib:replace_from(jlib:jid_replace_resource(JidRequestor, Nick), Pkt1), + jlib:remove_attr(<<"to">>, Pkt2). send(From, To, Msgs, RSM, Count, IsComplete, #iq{sub_el = SubEl} = IQ) -> QID = xml:get_tag_attr_s(<<"queryid">>, SubEl), @@ -737,7 +795,7 @@ make_matchspec(LUser, LServer, Start, End, none) -> Msg end). -make_sql_query(LUser, _LServer, Start, End, With, RSM) -> +make_sql_query(User, _LServer, Start, End, With, RSM) -> {Max, Direction, ID} = case RSM of #rsm_in{} -> {RSM#rsm_in.max, @@ -795,9 +853,9 @@ make_sql_query(LUser, _LServer, Start, End, With, RSM) -> _ -> [] end, - SUser = ejabberd_odbc:escape(LUser), + SUser = ejabberd_odbc:escape(User), - Query = [<<"SELECT timestamp, xml, peer" + Query = [<<"SELECT timestamp, xml, peer, kind, nick" " FROM archive WHERE username='">>, SUser, <<"'">>, WithClause, StartClause, EndClause, PageClause], @@ -808,7 +866,7 @@ make_sql_query(LUser, _LServer, Start, End, With, RSM) -> % ID can be empty because of % XEP-0059: Result Set Management % 2.5 Requesting the Last Page in a Result Set - [<<"SELECT timestamp, xml, peer FROM (">>, Query, + [<<"SELECT timestamp, xml, peer, kind, nick FROM (">>, Query, <<" ORDER BY timestamp DESC ">>, LimitClause, <<") AS t ORDER BY timestamp ASC;">>]; _ -> -- cgit v1.2.3