Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristophe Romain <christophe.romain@process-one.net>2017-08-14 16:25:45 +0300
committerChristophe Romain <christophe.romain@process-one.net>2017-08-14 16:25:45 +0300
commit68fb12153eb2e24ab18412ff9aa4f4a149f54018 (patch)
tree8dd8ebfba146d80e068153812ddeaae71b027226 /src
parente19d1e9571cef87dfca0805481dd7bfa6f2f9161 (diff)
Revert "Temporary remove recent last_item refactor"
This reverts commit 1820b4f63b5c08a7a5fc603a7161d5c5e7e92ab4.
Diffstat (limited to 'src')
-rw-r--r--src/mod_pubsub.erl412
1 files changed, 178 insertions, 234 deletions
diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl
index 194d12444..a67ae5bfc 100644
--- a/src/mod_pubsub.erl
+++ b/src/mod_pubsub.erl
@@ -52,7 +52,7 @@
%% exports for hooks
-export([presence_probe/3, caps_add/3, caps_update/3,
in_subscription/6, out_subscription/4,
- on_user_offline/3, remove_user/2,
+ on_user_online/1, on_user_offline/2, remove_user/2,
disco_local_identity/5, disco_local_features/5,
disco_local_items/5, disco_sm_identity/5,
disco_sm_features/5, disco_sm_items/5,
@@ -89,11 +89,7 @@
%% API and gen_server callbacks
-export([start/2, stop/1, init/1,
handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, depends/2, export/1]).
-
--export([send_loop/1, mod_opt_type/1]).
-
--define(LOOPNAME, ejabberd_mod_pubsub_loop).
+ terminate/2, code_change/3, depends/2, export/1, mod_opt_type/1]).
%%====================================================================
%% API
@@ -300,7 +296,9 @@ init([ServerHost, Opts]) ->
?MODULE, process_commands, IQDisc),
Plugins
end, Hosts),
- ejabberd_hooks:add(sm_remove_connection_hook, ServerHost,
+ ejabberd_hooks:add(c2s_session_opened, ServerHost,
+ ?MODULE, on_user_online, 75),
+ ejabberd_hooks:add(c2s_terminated, ServerHost,
?MODULE, on_user_offline, 75),
ejabberd_hooks:add(disco_local_identity, ServerHost,
?MODULE, disco_local_identity, 75),
@@ -337,34 +335,16 @@ init([ServerHost, Opts]) ->
false ->
ok
end,
- {_, State} = init_send_loop(ServerHost, Hosts),
- {ok, State}.
-
-init_send_loop(ServerHost, Hosts) ->
NodeTree = config(ServerHost, nodetree),
Plugins = config(ServerHost, plugins),
- LastItemCache = config(ServerHost, last_item_cache),
- MaxItemsNode = config(ServerHost, max_items_node),
PepMapping = config(ServerHost, pep_mapping),
- PepOffline = config(ServerHost, ignore_pep_from_offline),
- Access = config(ServerHost, access),
DBType = gen_mod:db_type(ServerHost, ?MODULE),
- State = #state{hosts = Hosts, server_host = ServerHost,
- access = Access, pep_mapping = PepMapping,
- ignore_pep_from_offline = PepOffline,
- last_item_cache = LastItemCache,
- max_items_node = MaxItemsNode, nodetree = NodeTree,
- plugins = Plugins, db_type = DBType},
- Proc = gen_mod:get_module_proc(ServerHost, ?LOOPNAME),
- Pid = case whereis(Proc) of
- undefined ->
- SendLoop = spawn(?MODULE, send_loop, [State]),
- register(Proc, SendLoop),
- SendLoop;
- Loop ->
- Loop
- end,
- {Pid, State}.
+ {ok, #state{hosts = Hosts, server_host = ServerHost,
+ access = Access, pep_mapping = PepMapping,
+ ignore_pep_from_offline = PepOffline,
+ last_item_cache = LastItemCache,
+ max_items_node = MaxItemsNode, nodetree = NodeTree,
+ plugins = Plugins, db_type = DBType}}.
depends(ServerHost, Opts) ->
Host = gen_mod:get_opt_host(ServerHost, Opts, <<"pubsub.@HOST@">>),
@@ -414,94 +394,6 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) ->
TreePlugin:terminate(Host, ServerHost),
ok.
-get_subscribed(User, Server) ->
- Items = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]),
- lists:filtermap(
- fun(#roster{jid = LJID, subscription = Sub})
- when Sub == both orelse Sub == from ->
- {true, LJID};
- (_) ->
- false
- end, Items).
-
-send_loop(State) ->
- receive
- {presence, JID, _Pid} ->
- ServerHost = State#state.server_host,
- Host = host(State#state.server_host),
- DBType = State#state.db_type,
- LJID = jid:tolower(JID),
- BJID = jid:remove_resource(LJID),
- lists:foreach(
- fun(PType) ->
- Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID),
- lists:foreach(
- fun({NodeRec, _, _, SubJID}) ->
- {_, Node} = NodeRec#pubsub_node.nodeid,
- Nidx = NodeRec#pubsub_node.id,
- Options = NodeRec#pubsub_node.options,
- [send_items(Host, Node, Nidx, PType, Options, SubJID, last)
- || NodeRec#pubsub_node.type == PType]
- end,
- lists:usort(Subs))
- end,
- State#state.plugins),
- if not State#state.ignore_pep_from_offline ->
- {User, Server, Resource} = LJID,
- Contacts = get_subscribed(User, Server),
- lists:foreach(
- fun({U, S, R}) when S == ServerHost ->
- case user_resources(U, S) of
- [] -> %% offline
- PeerJID = jid:make(U, S, R),
- self() ! {presence, User, Server, [Resource], PeerJID};
- _ -> %% online
- %% this is already handled by presence probe
- ok
- end;
- (_) ->
- %% we can not do anything in any cases
- ok
- end, Contacts);
- true ->
- ok
- end,
- send_loop(State);
- {presence, User, Server, Resources, JID} ->
- spawn(fun() ->
- Host = host(State#state.server_host),
- Owner = jid:remove_resource(jid:tolower(JID)),
- lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) ->
- case match_option(Options, send_last_published_item, on_sub_and_presence) of
- true ->
- lists:foreach(fun(Resource) ->
- LJID = {User, Server, Resource},
- Subscribed = case get_option(Options, access_model) of
- open -> true;
- presence -> true;
- whitelist -> false; % subscribers are added manually
- authorize -> false; % likewise
- roster ->
- Grps = get_option(Options, roster_groups_allowed, []),
- {OU, OS, _} = Owner,
- element(2, get_roster_info(OU, OS, LJID, Grps))
- end,
- if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, LJID, last);
- true -> ok
- end
- end,
- Resources);
- _ ->
- ok
- end
- end,
- tree_action(Host, get_nodes, [Owner, JID]))
- end),
- send_loop(State);
- stop ->
- ok
- end.
-
%% -------
%% disco hooks handling functions
%%
@@ -660,12 +552,12 @@ disco_items(Host, Node, From) ->
end.
%% -------
-%% presence hooks handling functions
+%% presence and session hooks handling functions
%%
-spec caps_add(jid(), jid(), [binary()]) -> ok.
-caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features)
- when Host =/= S ->
+caps_add(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features)
+ when S1 =/= S2 ->
%% When a remote contact goes online while the local user is offline, the
%% remote contact won't receive last items from the local user even if
%% ignore_pep_from_offline is set to false. To work around this issue a bit,
@@ -675,30 +567,36 @@ caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID
%% contact becomes available; the former is also executed when the local
%% user goes online (because that triggers the contact to send a presence
%% packet with CAPS).
- presence(Host, {presence, U, S, [R], JID});
+ send_last_pep(To, From);
caps_add(_From, _To, _Feature) ->
ok.
-spec caps_update(jid(), jid(), [binary()]) -> ok.
-caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features) ->
- presence(Host, {presence, U, S, [R], JID}).
+caps_update(From, To, _Features) ->
+ send_last_pep(To, From).
-spec presence_probe(jid(), jid(), pid()) -> ok.
presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) ->
%% ignore presence_probe from my other ressources
- %% to not get duplicated last items
ok;
-presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, Pid) ->
- presence(S, {presence, From, Pid}),
- presence(S, {presence, From#jid.luser, S, [From#jid.lresource], To});
+presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, _Pid) ->
+ send_last_pep(To, From);
presence_probe(_From, _To, _Pid) ->
- %% ignore presence_probe from remote contacts,
- %% those are handled via caps_add
+ %% ignore presence_probe from remote contacts, those are handled via caps_add
ok.
-presence(ServerHost, Presence) ->
- gen_mod:get_module_proc(ServerHost, ?LOOPNAME) ! Presence,
- ok.
+-spec on_user_online(ejabberd_c2s:state()) -> ejabberd_c2s:state().
+on_user_online(C2SState) ->
+ JID = maps:get(jid, C2SState),
+ send_last_items(JID),
+ C2SState.
+
+-spec on_user_offline(ejabberd_c2s:state(), atom()) -> ejabberd_c2s:state().
+on_user_offline(#{jid := JID} = C2SState, _Reason) ->
+ purge_offline(jid:tolower(JID)),
+ C2SState;
+on_user_offline(C2SState, _Reason) ->
+ C2SState.
%% -------
%% subscription hooks handling functions
@@ -707,14 +605,8 @@ presence(ServerHost, Presence) ->
-spec out_subscription(
binary(), binary(), jid(),
subscribed | unsubscribed | subscribe | unsubscribe) -> boolean().
-out_subscription(User, Server, JID, subscribed) ->
- Owner = jid:make(User, Server),
- {PUser, PServer, PResource} = jid:tolower(JID),
- PResources = case PResource of
- <<>> -> user_resources(PUser, PServer);
- _ -> [PResource]
- end,
- presence(Server, {presence, PUser, PServer, PResources, Owner}),
+out_subscription(User, Server, To, subscribed) ->
+ send_last_pep(jid:make(User, Server), To),
true;
out_subscription(_, _, _, _) ->
true.
@@ -883,7 +775,9 @@ terminate(_Reason,
false ->
ok
end,
- ejabberd_hooks:delete(sm_remove_connection_hook, ServerHost,
+ ejabberd_hooks:delete(c2s_session_opened, ServerHost,
+ ?MODULE, on_user_online, 75),
+ ejabberd_hooks:delete(c2s_terminated, ServerHost,
?MODULE, on_user_offline, 75),
ejabberd_hooks:delete(disco_local_identity, ServerHost,
?MODULE, disco_local_identity, 75),
@@ -901,12 +795,6 @@ terminate(_Reason,
?MODULE, remove_user, 50),
ejabberd_hooks:delete(c2s_handle_info, ServerHost,
?MODULE, c2s_handle_info, 50),
- case whereis(gen_mod:get_module_proc(ServerHost, ?LOOPNAME)) of
- undefined ->
- ?ERROR_MSG("~s process is dead, pubsub was broken", [?LOOPNAME]);
- Pid ->
- Pid ! stop
- end,
lists:foreach(
fun(Host) ->
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
@@ -1797,7 +1685,7 @@ subscribe_node(Host, Node, From, JID, Configuration) ->
Nidx = TNode#pubsub_node.id,
Type = TNode#pubsub_node.type,
Options = TNode#pubsub_node.options,
- send_items(Host, Node, Nidx, Type, Options, Subscriber, last),
+ send_items(Host, Node, Nidx, Type, Options, Subscriber, 1),
ServerHost = serverhost(Host),
ejabberd_hooks:run(pubsub_subscribe_node, ServerHost,
[ServerHost, Host, Node, Subscriber, SubId]),
@@ -2069,8 +1957,6 @@ purge_node(Host, Node, Owner) ->
%% @doc <p>Return the items of a given node.</p>
%% <p>The number of items to return is limited by MaxItems.</p>
%% <p>The permission are not checked in this function.</p>
-%% @todo We probably need to check that the user doing the query has the right
-%% to read the items.
-spec get_items(host(), binary(), jid(), binary(),
binary(), [binary()], undefined | rsm_set()) ->
{result, pubsub()} | {error, stanza_error()}.
@@ -2153,43 +2039,23 @@ get_allowed_items_call(Host, Nidx, From, Type, Options, Owners, RSM) ->
{PS, RG} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups),
node_call(Host, Type, get_items, [Nidx, From, AccessModel, PS, RG, undefined, RSM]).
-get_last_items(Host, Type, Nidx, LJID, Count) ->
+get_last_items(Host, Type, Nidx, LJID, 1) ->
case get_cached_item(Host, Nidx) of
undefined ->
- case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of
+ case node_action(Host, Type, get_last_items, [Nidx, LJID, 1]) of
{result, Items} -> Items;
- _ -> []
+ _ -> []
end;
LastItem ->
[LastItem]
- end.
-
-%% @doc <p>Resend the items of a node to the user.</p>
-%% @todo use cache-last-item feature
-send_items(Host, Node, Nidx, Type, Options, LJID, last) ->
- case get_last_items(Host, Type, Nidx, LJID, 1) of
- [LastItem] ->
- Stanza = items_event_stanza(Node, Options, [LastItem]),
- dispatch_items(Host, LJID, Node, Stanza);
- _ ->
- ok
end;
-send_items(Host, Node, Nidx, Type, Options, LJID, Number) when Number > 0 ->
- Stanza = items_event_stanza(Node, Options, get_last_items(Host, Type, Nidx, Number, LJID)),
- dispatch_items(Host, LJID, Node, Stanza);
-send_items(Host, Node, _Nidx, _Type, Options, LJID, _) ->
- Stanza = items_event_stanza(Node, Options, []),
- dispatch_items(Host, LJID, Node, Stanza).
-
-dispatch_items({FromU, FromS, FromR}, To, Node, Stanza) ->
- SenderResource = user_resource(FromU, FromS, FromR),
- ejabberd_sm:route(jid:make(FromU, FromS, SenderResource),
- {send_filtered, {pep_message, <<((Node))/binary, "+notify">>},
- jid:make(FromU, FromS), jid:make(To),
- Stanza});
-dispatch_items(From, To, _Node, Stanza) ->
- ejabberd_router:route(
- xmpp:set_from_to(Stanza, service_jid(From), jid:make(To))).
+get_last_items(Host, Type, Nidx, LJID, Count) when Count > 1 ->
+ case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of
+ {result, Items} -> Items;
+ _ -> []
+ end;
+get_last_items(_Host, _Type, _Nidx, _LJID, _Count) ->
+ [].
%% @doc <p>Return the list of affiliations as an XMPP response.</p>
-spec get_affiliations(host(), binary(), jid(), [binary()]) ->
@@ -2536,14 +2402,15 @@ get_subscriptions_for_send_last(Host, PType, sql, JID, LJID, BJID) ->
{result, Subs} = node_action(Host, PType,
get_entity_subscriptions_for_send_last,
[Host, JID]),
- [{Node, Sub, SubId, SubJID}
+ [{Node, SubId, SubJID}
|| {Node, Sub, SubId, SubJID} <- Subs,
Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID)];
+ % sql version already filter result by on_sub_and_presence
get_subscriptions_for_send_last(Host, PType, _, JID, LJID, BJID) ->
{result, Subs} = node_action(Host, PType,
get_entity_subscriptions,
[Host, JID]),
- [{Node, Sub, SubId, SubJID}
+ [{Node, SubId, SubJID}
|| {Node, Sub, SubId, SubJID} <- Subs,
Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID),
match_option(Node, send_last_published_item, on_sub_and_presence)].
@@ -2932,8 +2799,9 @@ get_options_for_subs(Host, Nidx, Subs, true) ->
broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
NotificationType = get_option(NodeOptions, notification_type, headline),
BroadcastAll = get_option(NodeOptions, broadcast_all_resources), %% XXX this is not standard, but usefull
- From = service_jid(Host),
- Stanza = add_message_type(BaseStanza, NotificationType),
+ Stanza = add_message_type(
+ xmpp:set_from(BaseStanza, service_jid(Host)),
+ NotificationType),
%% Handles explicit subscriptions
SubIDsByJID = subscribed_nodes_by_jid(NotifyType, SubsByDepth),
lists:foreach(fun ({LJID, _NodeName, SubIDs}) ->
@@ -2956,7 +2824,7 @@ broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType
end,
lists:foreach(fun(To) ->
ejabberd_router:route(
- xmpp:set_from_to(StanzaToSend, From, jid:make(To)))
+ xmpp:set_to(StanzaToSend, jid:make(To)))
end, LJIDs)
end, SubIDsByJID).
@@ -2965,55 +2833,142 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, Nidx, Type, NodeO
%% Handles implicit presence subscriptions
SenderResource = user_resource(LUser, LServer, LResource),
NotificationType = get_option(NodeOptions, notification_type, headline),
- Stanza = add_message_type(BaseStanza, NotificationType),
+ Stanza = add_message_type(
+ xmpp:set_from(BaseStanza, jid:make(LUser, LServer)),
+ NotificationType),
%% set the from address on the notification to the bare JID of the account owner
%% Also, add "replyto" if entity has presence subscription to the account owner
%% See XEP-0163 1.1 section 4.3.1
ejabberd_sm:route(jid:make(LUser, LServer, SenderResource),
{pep_message, <<((Node))/binary, "+notify">>,
- jid:make(LUser, LServer),
add_extended_headers(
Stanza, extended_headers([Publisher]))});
broadcast_stanza(Host, _Publisher, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
broadcast_stanza(Host, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM).
-spec c2s_handle_info(ejabberd_c2s:state(), term()) -> ejabberd_c2s:state().
-c2s_handle_info(#{server := Server} = C2SState,
- {pep_message, Feature, From, Packet}) ->
- LServer = jid:nameprep(Server),
- lists:foreach(
- fun({USR, Caps}) ->
- Features = mod_caps:get_features(LServer, Caps),
- case lists:member(Feature, Features) of
- true ->
- To = jid:make(USR),
- NewPacket = xmpp:set_from_to(Packet, From, To),
- ejabberd_router:route(NewPacket);
- false ->
- ok
- end
- end, mod_caps:list_features(C2SState)),
+c2s_handle_info(#{lserver := LServer} = C2SState,
+ {pep_message, Feature, Packet}) ->
+ [maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet)
+ || {USR, Caps} <- mod_caps:list_features(C2SState)],
{stop, C2SState};
-c2s_handle_info(#{server := Server} = C2SState,
- {send_filtered, {pep_message, Feature}, From, To, Packet}) ->
- LServer = jid:nameprep(Server),
- case mod_caps:get_user_caps(To, C2SState) of
- {ok, Caps} ->
- Features = mod_caps:get_features(LServer, Caps),
- case lists:member(Feature, Features) of
- true ->
- NewPacket = xmpp:set_from_to(Packet, From, To),
- ejabberd_router:route(NewPacket);
- false ->
- ok
- end;
- error ->
- ok
+c2s_handle_info(#{lserver := LServer} = C2SState,
+ {pep_message, Feature, Packet, USR}) ->
+ case mod_caps:get_user_caps(USR, C2SState) of
+ {ok, Caps} -> maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet);
+ error -> ok
end,
{stop, C2SState};
c2s_handle_info(C2SState, _) ->
C2SState.
+send_items(Host, Node, Nidx, Type, Options, LJID, Number) ->
+ send_items(Host, Node, Nidx, Type, Options, Host, LJID, LJID, Number).
+send_items(Host, Node, Nidx, Type, Options, Publisher, SubLJID, ToLJID, Number) ->
+ case get_last_items(Host, Type, Nidx, SubLJID, Number) of
+ [] ->
+ ok;
+ Items ->
+ Stanza = items_event_stanza(Node, Options, Items),
+ send_stanza(Publisher, ToLJID, Node, Stanza)
+ end.
+
+send_stanza({LUser, LServer, _} = Publisher, USR, Node, BaseStanza) ->
+ Stanza = xmpp:set_from(BaseStanza, jid:make(LUser, LServer)),
+ USRs = case USR of
+ {PUser, PServer, <<>>} ->
+ [{PUser, PServer, PRessource}
+ || PRessource <- user_resources(PUser, PServer)];
+ _ ->
+ [USR]
+ end,
+ [ejabberd_sm:route(jid:make(Publisher),
+ {pep_message, <<((Node))/binary, "+notify">>,
+ add_extended_headers(
+ Stanza, extended_headers([Publisher])),
+ To}) || To <- USRs];
+send_stanza(Host, USR, _Node, Stanza) ->
+ ejabberd_router:route(
+ xmpp:set_from_to(Stanza, service_jid(Host), jid:make(USR))).
+
+maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet) ->
+ Features = mod_caps:get_features(LServer, Caps),
+ case lists:member(Feature, Features) of
+ true ->
+ ejabberd_router:route(xmpp:set_to(Packet, jid:make(USR)));
+ false ->
+ ok
+ end.
+
+send_last_items(JID) ->
+ ServerHost = JID#jid.lserver,
+ Host = host(ServerHost),
+ DBType = config(ServerHost, db_type),
+ LJID = jid:tolower(JID),
+ BJID = jid:remove_resource(LJID),
+ lists:foreach(
+ fun(PType) ->
+ Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID),
+ lists:foreach(
+ fun({#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx,
+ options = Options}, _, SubJID})
+ when Type == PType->
+ send_items(Host, Node, Nidx, PType, Options, Host, SubJID, LJID, 1);
+ (_) ->
+ ok
+ end,
+ lists:usort(Subs))
+ end, config(ServerHost, plugins)).
+% pep_from_offline hack can not work anymore, as sender c2s does not
+% exists when sender is offline, so we can't get match receiver caps
+% does it make sens to send PEP from an offline contact anyway ?
+% case config(ServerHost, ignore_pep_from_offline) of
+% false ->
+% Roster = ejabberd_hooks:run_fold(roster_get, ServerHost, [],
+% [{JID#jid.luser, ServerHost}]),
+% lists:foreach(
+% fun(#roster{jid = {U, S, R}, subscription = Sub})
+% when Sub == both orelse Sub == from,
+% S == ServerHost ->
+% case user_resources(U, S) of
+% [] -> send_last_pep(jid:make(U, S, R), JID);
+% _ -> ok %% this is already handled by presence probe
+% end;
+% (_) ->
+% ok %% we can not do anything in any cases
+% end, Roster);
+% true ->
+% ok
+% end.
+send_last_pep(From, To) ->
+ ServerHost = From#jid.lserver,
+ Host = host(ServerHost),
+ Publisher = jid:tolower(From),
+ Owner = jid:remove_resource(Publisher),
+ lists:foreach(
+ fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) ->
+ case match_option(Options, send_last_published_item, on_sub_and_presence) of
+ true ->
+ LJID = jid:tolower(To),
+ Subscribed = case get_option(Options, access_model) of
+ open -> true;
+ presence -> true;
+ whitelist -> false; % subscribers are added manually
+ authorize -> false; % likewise
+ roster ->
+ Grps = get_option(Options, roster_groups_allowed, []),
+ {OU, OS, _} = Owner,
+ element(2, get_roster_info(OU, OS, LJID, Grps))
+ end,
+ if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, Publisher, LJID, LJID, 1);
+ true -> ok
+ end;
+ _ ->
+ ok
+ end
+ end,
+ tree_action(Host, get_nodes, [Owner, From])).
+
subscribed_nodes_by_jid(NotifyType, SubsByDepth) ->
NodesToDeliver = fun (Depth, Node, Subs, Acc) ->
NodeName = case Node#pubsub_node.nodeid of
@@ -3186,9 +3141,6 @@ node_owners_call(_Host, _Type, _Nidx, Owners) ->
%% @doc <p>Return the maximum number of items for a given node.</p>
%% <p>Unlimited means that there is no limit in the number of items that can
%% be stored.</p>
-%% @todo In practice, the current data structure means that we cannot manage
-%% millions of items on a given node. This should be addressed in a new
-%% version.
-spec max_items(host(), [{atom(), any()}]) -> non_neg_integer().
max_items(Host, Options) ->
case get_option(Options, persist_items) of
@@ -3792,14 +3744,6 @@ subid_shim(SubIds) ->
extended_headers(Jids) ->
[#address{type = replyto, jid = Jid} || Jid <- Jids].
--spec on_user_offline(ejabberd_sm:sid(), jid(), ejabberd_sm:info()) -> ok.
-on_user_offline(_, JID, _) ->
- {User, Server, Resource} = jid:tolower(JID),
- case user_resources(User, Server) of
- [] -> purge_offline({User, Server, Resource});
- _ -> ok
- end.
-
-spec purge_offline(ljid()) -> ok.
purge_offline(LJID) ->
Host = host(element(2, LJID)),
@@ -3840,7 +3784,7 @@ purge_offline(LJID) ->
end
end, lists:usort(lists:flatten(Affs)));
{Error, _} ->
- ?DEBUG("on_user_offline ~p", [Error])
+ ?ERROR_MSG("can not purge offline: ~p", [Error])
end.
-spec purge_offline(host(), ljid(), binary()) -> ok | {error, stanza_error()}.
@@ -3852,13 +3796,13 @@ purge_offline(Host, LJID, Node) ->
{result, {[], _}} ->
ok;
{result, {Items, _}} ->
- {User, Server, _} = LJID,
+ {User, Server, Resource} = LJID,
PublishModel = get_option(Options, publish_model),
ForceNotify = get_option(Options, notify_retract),
{_, NodeId} = Node#pubsub_node.nodeid,
lists:foreach(fun
- (#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, _}}})
- when (U == User) and (S == Server) ->
+ (#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, R}}})
+ when (U == User) and (S == Server) and (R == Resource) ->
case node_action(Host, Type, delete_item, [Nidx, {U, S, <<>>}, PublishModel, ItemId]) of
{result, {_, broadcast}} ->
broadcast_retract_items(Host, NodeId, Nidx, Type, Options, [ItemId], ForceNotify),