Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaweł Chmielowski <pchmielowski@process-one.net>2020-04-07 15:51:49 +0300
committerPaweł Chmielowski <pchmielowski@process-one.net>2020-04-07 15:51:49 +0300
commit9bb3aee0e2f1649c45404725675c09d9548cb1b9 (patch)
treef3632e34dffe9dcda90f205b6c92706a5941bc9d
parent16585713f84937a26d14da6ad226148b594e96db (diff)
Make resumed sessions try to deliver possibly queued messages to new session
Between receiving resume request and being closed by new session, it's possible (even if not very likely) that new messages would arrive to process that is resumed. In that case try to reroute messages that were received after we sent resume reply to new process.
-rw-r--r--src/mod_stream_mgmt.erl19
1 files changed, 17 insertions, 2 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index fb1a609b1..0c73eb7f7 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -192,7 +192,7 @@ c2s_handle_recv(State, _, _) ->
c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
lang := Lang} = State, Pkt, SendResult)
- when MgmtState == pending; MgmtState == active ->
+ when MgmtState == pending; MgmtState == active; MgmtState == resumed ->
IsStanza = xmpp:is_stanza(Pkt),
case Pkt of
_ when IsStanza ->
@@ -214,6 +214,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
end;
#stream_error{} ->
case MgmtState of
+ resumed ->
+ State;
active ->
State;
pending ->
@@ -230,7 +232,7 @@ c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State,
{resume_session, Time}, From) ->
State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)},
Mod:reply(From, {resume, State1}),
- {stop, State#{mgmt_state => resumed}};
+ {stop, State#{mgmt_state => resumed, mgmt_queue => p1_queue:clear(Queue)}};
c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) ->
Mod:reply(From, {error, session_not_found}),
{stop, State};
@@ -282,6 +284,7 @@ c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason
[jid:encode(JID)]),
{U, S, R} = jid:tolower(JID),
ejabberd_sm:close_session(SID, U, S, R),
+ route_late_queue_after_resume(State),
ejabberd_c2s:bounce_message_queue(SID, JID),
{stop, State};
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
@@ -544,6 +547,18 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) ->
State
end.
+-spec route_late_queue_after_resume(state()) -> ok.
+route_late_queue_after_resume(#{mgmt_queue := Queue, jid := JID})
+ when ?qlen(Queue) > 0 ->
+ ?DEBUG("Re-routing ~B late queued packets to ~ts",
+ [p1_queue:len(Queue), jid:encode(JID)]),
+ p1_queue:foreach(
+ fun({_, _Time, Pkt}) ->
+ ejabberd_router:route(Pkt)
+ end, Queue);
+route_late_queue_after_resume(_State) ->
+ ok.
+
-spec resend_unacked_stanzas(state()) -> state().
resend_unacked_stanzas(#{mgmt_state := MgmtState,
mgmt_queue := Queue,