Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ejabberd_c2s.erl82
-rw-r--r--src/mod_client_state.erl107
2 files changed, 104 insertions, 85 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 080880bec..5e30d5ffc 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -104,7 +104,6 @@
ip,
aux_fields = [],
csi_state = active,
- csi_queue = [],
mgmt_state,
mgmt_xmlns,
mgmt_queue,
@@ -1147,7 +1146,7 @@ session_established({xmlstreamelement,
#xmlel{name = <<"active">>,
attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}]}},
StateData) ->
- NewStateData = csi_queue_flush(StateData),
+ NewStateData = csi_flush_queue(StateData),
fsm_next_state(session_established, NewStateData#state{csi_state = active});
session_established({xmlstreamelement,
#xmlel{name = <<"inactive">>,
@@ -2763,7 +2762,7 @@ handle_resume(StateData, Attrs) ->
#xmlel{name = <<"r">>,
attrs = [{<<"xmlns">>, AttrXmlns}],
children = []}),
- FlushedState = csi_queue_flush(NewState),
+ FlushedState = csi_flush_queue(NewState),
NewStateData = FlushedState#state{csi_state = active},
?INFO_MSG("Resumed session for ~s",
[jid:to_string(NewStateData#state.jid)]),
@@ -2995,7 +2994,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
privacy_list = OldStateData#state.privacy_list,
aux_fields = OldStateData#state.aux_fields,
csi_state = OldStateData#state.csi_state,
- csi_queue = OldStateData#state.csi_queue,
mgmt_xmlns = OldStateData#state.mgmt_xmlns,
mgmt_queue = OldStateData#state.mgmt_queue,
mgmt_timeout = OldStateData#state.mgmt_timeout,
@@ -3028,65 +3026,25 @@ add_resent_delay_info(#state{server = From}, El, Time) ->
%%% XEP-0352
%%%----------------------------------------------------------------------
-csi_filter_stanza(#state{csi_state = CsiState, jid = JID} = StateData,
+csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData,
Stanza) ->
- Action = ejabberd_hooks:run_fold(csi_filter_stanza,
- StateData#state.server,
- send, [Stanza]),
- ?DEBUG("Going to ~p stanza for inactive client ~p",
- [Action, jid:to_string(JID)]),
- case Action of
- queue -> csi_queue_add(StateData, Stanza);
- drop -> StateData;
- send ->
- From = fxml:get_tag_attr_s(<<"from">>, Stanza),
- StateData1 = csi_queue_send(StateData, From),
- StateData2 = send_stanza(StateData1#state{csi_state = active},
- Stanza),
- StateData2#state{csi_state = CsiState}
- end.
-
-csi_queue_add(#state{csi_queue = Queue} = StateData, Stanza) ->
- case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of
- true -> csi_queue_add(csi_queue_flush(StateData), Stanza);
- false ->
- From = fxml:get_tag_attr_s(<<"from">>, Stanza),
- NewQueue = lists:keystore(From, 1, Queue, {From, p1_time_compat:timestamp(), Stanza}),
- StateData#state{csi_queue = NewQueue}
- end.
-
-csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState, server = Host} =
- StateData, From) ->
- case lists:keytake(From, 1, Queue) of
- {value, {From, Time, Stanza}, NewQueue} ->
- NewStanza = jlib:add_delay_info(Stanza, Host, Time,
- <<"Client Inactive">>),
- NewStateData = send_stanza(StateData#state{csi_state = active},
- NewStanza),
- NewStateData#state{csi_queue = NewQueue, csi_state = CsiState};
- false -> StateData
- end.
-
-csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID,
- server = Host} = StateData) ->
- ?DEBUG("Flushing CSI queue for ~s", [jid:to_string(JID)]),
- NewStateData =
- lists:foldl(fun({_From, Time, Stanza}, AccState) ->
- NewStanza =
- jlib:add_delay_info(Stanza, Host, Time,
- <<"Client Inactive">>),
- send_stanza(AccState, NewStanza)
- end, StateData#state{csi_state = active}, Queue),
- NewStateData#state{csi_queue = [], csi_state = CsiState}.
-
-%% Make sure we won't push too many messages to the XEP-0198 queue when the
-%% client becomes 'active' again. Otherwise, the client might not manage to
-%% acknowledge the message flood in time. Also, don't let the queue grow to
-%% more than 100 stanzas.
-csi_max_queue(#state{mgmt_max_queue = infinity}) -> 100;
-csi_max_queue(#state{mgmt_max_queue = Max}) when Max > 200 -> 100;
-csi_max_queue(#state{mgmt_max_queue = Max}) when Max < 2 -> 1;
-csi_max_queue(#state{mgmt_max_queue = Max}) -> Max div 2.
+ {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server,
+ {StateData, [Stanza]},
+ [Server, Stanza]),
+ StateData2 = lists:foldl(fun(CurStanza, AccState) ->
+ send_stanza(AccState, CurStanza)
+ end, StateData1#state{csi_state = active},
+ Stanzas),
+ StateData2#state{csi_state = CsiState}.
+
+csi_flush_queue(#state{csi_state = CsiState, server = Server} = StateData) ->
+ {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server,
+ {StateData, []}, [Server]),
+ StateData2 = lists:foldl(fun(CurStanza, AccState) ->
+ send_stanza(AccState, CurStanza)
+ end, StateData1#state{csi_state = active},
+ Stanzas),
+ StateData2#state{csi_state = CsiState}.
%%%----------------------------------------------------------------------
%%% JID Set memory footprint reduction code
diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl
index 790e808f1..7d23beb0d 100644
--- a/src/mod_client_state.erl
+++ b/src/mod_client_state.erl
@@ -31,20 +31,22 @@
-behavior(gen_mod).
-export([start/2, stop/1, add_stream_feature/2,
- filter_presence/2, filter_chat_states/2,
+ filter_presence/3, filter_chat_states/3, filter_other/3, flush_queue/2,
mod_opt_type/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
-include("jlib.hrl").
+-define(CSI_QUEUE_MAX, 100).
+
start(Host, Opts) ->
QueuePresence = gen_mod:get_opt(queue_presence, Opts,
fun(B) when is_boolean(B) -> B end,
true),
DropChatStates = gen_mod:get_opt(drop_chat_states, Opts,
- fun(B) when is_boolean(B) -> B end,
- true),
+ fun(B) when is_boolean(B) -> B end,
+ true),
if QueuePresence; DropChatStates ->
ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE,
add_stream_feature, 50),
@@ -57,10 +59,13 @@ start(Host, Opts) ->
ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
filter_chat_states, 50);
true -> ok
- end;
+ end,
+ ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
+ filter_other, 100),
+ ejabberd_hooks:add(csi_flush_queue, Host, ?MODULE,
+ flush_queue, 50);
true -> ok
- end,
- ok.
+ end.
stop(Host) ->
QueuePresence = gen_mod:get_module_opt(Host, ?MODULE, queue_presence,
@@ -81,10 +86,13 @@ stop(Host) ->
ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
filter_chat_states, 50);
true -> ok
- end;
+ end,
+ ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
+ filter_other, 100),
+ ejabberd_hooks:delete(csi_flush_queue, Host, ?MODULE,
+ flush_queue, 50);
true -> ok
- end,
- ok.
+ end.
add_stream_feature(Features, _Host) ->
Feature = #xmlel{name = <<"csi">>,
@@ -92,30 +100,83 @@ add_stream_feature(Features, _Host) ->
children = []},
[Feature | Features].
-filter_presence(_Action, #xmlel{name = <<"presence">>, attrs = Attrs}) ->
+filter_presence({C2SState, _OutStanzas} = Acc, Host,
+ #xmlel{name = <<"presence">>, attrs = Attrs} = Stanza) ->
case fxml:get_attr(<<"type">>, Attrs) of
{value, Type} when Type /= <<"unavailable">> ->
- ?DEBUG("Got important presence stanza", []),
- {stop, send};
+ Acc;
_ ->
?DEBUG("Got availability presence stanza", []),
- {stop, queue}
+ queue_add(presence, Stanza, Host, C2SState)
end;
-filter_presence(Action, _Stanza) -> Action.
+filter_presence(Acc, _Host, _Stanza) -> Acc.
-filter_chat_states(_Action, #xmlel{name = <<"message">>} = Stanza) ->
+filter_chat_states({C2SState, _OutStanzas} = Acc, _Host,
+ #xmlel{name = <<"message">>} = Stanza) ->
case jlib:is_standalone_chat_state(Stanza) of
- true ->
+ true -> % Drop the stanza.
?DEBUG("Got standalone chat state notification", []),
- {stop, drop};
+ {stop, {C2SState, []}};
false ->
- ?DEBUG("Got message stanza", []),
- {stop, send}
+ Acc
end;
-filter_chat_states(Action, _Stanza) -> Action.
+filter_chat_states(Acc, _Host, _Stanza) -> Acc.
+
+filter_other({C2SState, _OutStanzas}, Host, Stanza) ->
+ ?DEBUG("Won't add stanza to CSI queue", []),
+ queue_take(Stanza, Host, C2SState).
+
+flush_queue({C2SState, _OutStanzas}, Host) ->
+ ?DEBUG("Going to flush CSI queue", []),
+ Queue = get_queue(C2SState),
+ NewState = set_queue([], C2SState),
+ {stop, {NewState, get_stanzas(Queue, Host)}}.
+
+queue_add(Type, Stanza, Host, C2SState) ->
+ case get_queue(C2SState) of
+ Queue when length(Queue) >= ?CSI_QUEUE_MAX ->
+ ?DEBUG("CSI queue too large, going to flush it", []),
+ NewState = set_queue([], C2SState),
+ {stop, {NewState, get_stanzas(Queue, Host) ++ [Stanza]}};
+ Queue ->
+ ?DEBUG("Adding stanza to CSI queue", []),
+ From = fxml:get_tag_attr_s(<<"from">>, Stanza),
+ Key = {jid:tolower(jid:from_string(From)), Type},
+ Entry = {Key, p1_time_compat:timestamp(), Stanza},
+ NewQueue = lists:keystore(Key, 1, Queue, Entry),
+ NewState = set_queue(NewQueue, C2SState),
+ {stop, {NewState, []}}
+ end.
+
+queue_take(Stanza, Host, C2SState) ->
+ From = fxml:get_tag_attr_s(<<"from">>, Stanza),
+ {LUser, LServer, _LResource} = jid:tolower(jid:from_string(From)),
+ {Selected, Rest} = lists:partition(
+ fun({{{U, S, _R}, _Type}, _Time, _Stanza}) ->
+ U == LUser andalso S == LServer
+ end, get_queue(C2SState)),
+ NewState = set_queue(Rest, C2SState),
+ {stop, {NewState, get_stanzas(Selected, Host) ++ [Stanza]}}.
+
+set_queue(Queue, C2SState) ->
+ ejabberd_c2s:set_aux_field(csi_queue, Queue, C2SState).
+
+get_queue(C2SState) ->
+ case ejabberd_c2s:get_aux_field(csi_queue, C2SState) of
+ {ok, Queue} ->
+ Queue;
+ error ->
+ []
+ end.
+
+get_stanzas(Queue, Host) ->
+ lists:map(fun({_Key, Time, Stanza}) ->
+ jlib:add_delay_info(Stanza, Host, Time,
+ <<"Client Inactive">>)
+ end, Queue).
-mod_opt_type(drop_chat_states) ->
- fun(B) when is_boolean(B) -> B end;
mod_opt_type(queue_presence) ->
fun(B) when is_boolean(B) -> B end;
-mod_opt_type(_) -> [drop_chat_states, queue_presence].
+mod_opt_type(drop_chat_states) ->
+ fun(B) when is_boolean(B) -> B end;
+mod_opt_type(_) -> [queue_presence, drop_chat_states].