Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r--src/ejabberd_c2s.erl88
1 files changed, 76 insertions, 12 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 73cc57247..226c5e0da 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -32,6 +32,7 @@
-protocol({xep, 78, '2.5'}).
-protocol({xep, 138, '2.0'}).
-protocol({xep, 198, '1.3'}).
+-protocol({xep, 356, '7.1'}).
-update_info({update, 0}).
@@ -48,6 +49,7 @@
send_element/2,
socket_type/0,
get_presence/1,
+ get_last_presence/1,
get_aux_field/2,
set_aux_field/3,
del_aux_field/2,
@@ -116,9 +118,12 @@
mgmt_pending_since,
mgmt_timeout,
mgmt_max_timeout,
+ mgmt_ack_timeout,
+ mgmt_ack_timer,
mgmt_resend,
mgmt_stanzas_in = 0,
mgmt_stanzas_out = 0,
+ mgmt_stanzas_req = 0,
ask_offline = true,
lang = <<"">>}).
@@ -217,6 +222,9 @@ socket_type() -> xml_stream.
get_presence(FsmRef) ->
(?GEN_FSM):sync_send_all_state_event(FsmRef,
{get_presence}, 1000).
+get_last_presence(FsmRef) ->
+ (?GEN_FSM):sync_send_all_state_event(FsmRef,
+ {get_last_presence}, 1000).
get_aux_field(Key, #state{aux_fields = Opts}) ->
case lists:keysearch(Key, 1, Opts) of
@@ -329,13 +337,18 @@ init([{SockMod, Socket}, Opts]) ->
_ -> 1000
end,
ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
- Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
+ RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo;
_ -> 300
end,
MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of
Max when is_integer(Max), Max >= ResumeTimeout -> Max;
_ -> ResumeTimeout
end,
+ AckTimeout = case proplists:get_value(ack_timeout, Opts) of
+ ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000;
+ infinity -> undefined;
+ _ -> 60000
+ end,
ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of
Resend when is_boolean(Resend) -> Resend;
if_offline -> if_offline;
@@ -359,6 +372,7 @@ init([{SockMod, Socket}, Opts]) ->
mgmt_max_queue = MaxAckQueue,
mgmt_timeout = ResumeTimeout,
mgmt_max_timeout = MaxResumeTimeout,
+ mgmt_ack_timeout = AckTimeout,
mgmt_resend = ResendOnTimeout},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -1332,6 +1346,15 @@ handle_sync_event({get_presence}, _From, StateName,
Resource = StateData#state.resource,
Reply = {User, Resource, Show, Status},
fsm_reply(Reply, StateName, StateData);
+handle_sync_event({get_last_presence}, _From, StateName,
+ StateData) ->
+ User = StateData#state.user,
+ Server = StateData#state.server,
+ PresLast = StateData#state.pres_last,
+ Resource = StateData#state.resource,
+ Reply = {User, Server, Resource, PresLast},
+ fsm_reply(Reply, StateName, StateData);
+
handle_sync_event(get_subscribed, _From, StateName,
StateData) ->
Subscribed = (?SETS):to_list(StateData#state.pres_f),
@@ -1775,6 +1798,11 @@ handle_info({set_resume_timeout, Timeout}, StateName, StateData) ->
fsm_next_state(StateName, StateData#state{mgmt_timeout = Timeout});
handle_info(dont_ask_offline, StateName, StateData) ->
fsm_next_state(StateName, StateData#state{ask_offline = false});
+handle_info(close, StateName, StateData) ->
+ ?DEBUG("Timeout waiting for stream management acknowledgement of ~s",
+ [jid:to_string(StateData#state.jid)]),
+ close(self()),
+ fsm_next_state(StateName, StateData#state{mgmt_ack_timer = undefined});
handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) ->
%% This happens if the resume_session/1 request timed out; the new session
%% now receives the late response.
@@ -1910,8 +1938,8 @@ send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive ->
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
mgmt_queue_add(StateData, Stanza);
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
- NewStateData = send_stanza_and_ack_req(StateData, Stanza),
- mgmt_queue_add(NewStateData, Stanza);
+ NewStateData = mgmt_queue_add(StateData, Stanza),
+ mgmt_send_stanza(NewStateData, Stanza);
send_stanza(StateData, Stanza) ->
send_element(StateData, Stanza),
StateData.
@@ -2499,6 +2527,12 @@ fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined,
sid = SID, jid = JID, ip = IP,
conn = Conn, auth_module = AuthModule,
server = Host} = StateData) ->
+ case StateData of
+ #state{mgmt_ack_timer = undefined} ->
+ ok;
+ #state{mgmt_ack_timer = Timer} ->
+ erlang:cancel_timer(Timer)
+ end,
?INFO_MSG("Waiting for resumption of stream for ~s",
[jid:to_string(JID)]),
Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}],
@@ -2779,7 +2813,8 @@ handle_r(StateData) ->
handle_a(StateData, Attrs) ->
case catch jlib:binary_to_integer(fxml:get_attr_s(<<"h">>, Attrs)) of
H when is_integer(H), H >= 0 ->
- check_h_attribute(StateData, H);
+ NewStateData = check_h_attribute(StateData, H),
+ maybe_renew_ack_request(NewStateData);
_ ->
?DEBUG("Ignoring invalid ACK element from ~s",
[jid:to_string(StateData#state.jid)]),
@@ -2878,16 +2913,45 @@ update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El)
update_num_stanzas_in(StateData, _El) ->
StateData.
-send_stanza_and_ack_req(StateData, Stanza) ->
- AckReq = #xmlel{name = <<"r">>,
- attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}],
- children = []},
- case send_element(StateData, Stanza) == ok andalso
- send_element(StateData, AckReq) == ok of
+mgmt_send_stanza(StateData, Stanza) ->
+ case send_element(StateData, Stanza) of
+ ok ->
+ maybe_request_ack(StateData);
+ _ ->
+ StateData#state{mgmt_state = pending}
+ end.
+
+maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) ->
+ request_ack(StateData);
+maybe_request_ack(StateData) ->
+ StateData.
+
+request_ack(#state{mgmt_xmlns = Xmlns,
+ mgmt_ack_timeout = AckTimeout} = StateData) ->
+ AckReq = #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, Xmlns}]},
+ case {send_element(StateData, AckReq), AckTimeout} of
+ {ok, undefined} ->
+ ok;
+ {ok, Timeout} ->
+ Timer = erlang:send_after(Timeout, self(), close),
+ StateData#state{mgmt_ack_timer = Timer,
+ mgmt_stanzas_req = StateData#state.mgmt_stanzas_out};
+ _ ->
+ StateData#state{mgmt_state = pending}
+ end.
+
+maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) ->
+ StateData;
+maybe_renew_ack_request(#state{mgmt_ack_timer = Timer,
+ mgmt_queue = Queue,
+ mgmt_stanzas_out = NumStanzasOut,
+ mgmt_stanzas_req = NumStanzasReq} = StateData) ->
+ erlang:cancel_timer(Timer),
+ case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of
true ->
- StateData;
+ request_ack(StateData#state{mgmt_ack_timer = undefined});
false ->
- StateData#state{mgmt_state = pending}
+ StateData#state{mgmt_ack_timer = undefined}
end.
mgmt_queue_add(StateData, El) ->