Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-12-11 15:03:37 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-12-11 15:03:37 +0300
commit5cc8e807df6994fa6b0e860bbcfe0af8fa7fe19f (patch)
treef10816cf358fce8744f87e722667683a623e22ec /src/ejabberd_c2s.erl
parent23f70753131b42df0203703c29de92fa10513656 (diff)
Initial version of new XMPP stream behaviour (for review)
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r--src/ejabberd_c2s.erl3257
1 files changed, 551 insertions, 2706 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 6d84d8d93..1568d5db6 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -1,8 +1,5 @@
-%%%----------------------------------------------------------------------
-%%% File : ejabberd_c2s.erl
-%%% Author : Alexey Shchepin <alexey@process-one.net>
-%%% Purpose : Serve C2S connection
-%%% Created : 16 Nov 2002 by Alexey Shchepin <alexey@process-one.net>
+%%%-------------------------------------------------------------------
+%%% Created : 8 Dec 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
@@ -21,1998 +18,534 @@
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
-%%%----------------------------------------------------------------------
-
+%%%-------------------------------------------------------------------
-module(ejabberd_c2s).
-
--behaviour(ejabberd_config).
-
--author('alexey@process-one.net').
-
--protocol({xep, 78, '2.5'}).
--protocol({xep, 138, '2.0'}).
--protocol({xep, 198, '1.3'}).
--protocol({xep, 356, '7.1'}).
-
--update_info({update, 0}).
-
--define(GEN_FSM, p1_fsm).
-
--behaviour(?GEN_FSM).
-
-%% External exports
--export([start/2,
- stop/1,
- start_link/2,
- close/1,
- send_text/2,
- send_element/2,
- socket_type/0,
- get_presence/1,
- get_last_presence/1,
- get_aux_field/2,
- set_aux_field/3,
- del_aux_field/2,
- get_subscription/2,
- get_queued_stanzas/1,
- get_csi_state/1,
- set_csi_state/2,
- get_resume_timeout/1,
- set_resume_timeout/2,
- send_filtered/5,
- broadcast/4,
- get_subscribed/1,
- transform_listen_option/2]).
-
--export([init/1, wait_for_stream/2, wait_for_auth/2,
- wait_for_feature_request/2, wait_for_bind/2,
- wait_for_sasl_response/2,
- wait_for_resume/2, session_established/2,
- handle_event/3, handle_sync_event/4, code_change/4,
- handle_info/3, terminate/3, print_state/1, opt_type/1]).
+-behaviour(xmpp_stream_in).
+
+-protocol({rfc, 6121}).
+
+%% ejabberd_socket callbacks
+-export([start/2, socket_type/0]).
+%% xmpp_stream_in callbacks
+-export([init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2, code_change/3]).
+-export([tls_options/1, tls_required/1, sasl_mechanisms/1, init_sasl/1, bind/2,
+ unauthenticated_stream_features/1, authenticated_stream_features/1,
+ handle_stream_start/1, handle_stream_end/1, handle_stream_close/1,
+ handle_unauthenticated_packet/2, handle_authenticated_packet/2,
+ handle_auth_success/4, handle_auth_failure/4,
+ handle_unbinded_packet/2]).
+%% API
+-export([get_presence/1, get_subscription/2, get_subscribed/1,
+ send/2, close/1]).
-include("ejabberd.hrl").
--include("logger.hrl").
-
-include("xmpp.hrl").
-%%-include("legacy.hrl").
-
--include("mod_privacy.hrl").
+-include("logger.hrl").
-define(SETS, gb_sets).
--define(DICT, dict).
-
-%% pres_a contains all the presence available send (either through roster mechanism or directed).
-%% Directed presence unavailable remove user from pres_a.
--record(state, {socket,
- sockmod,
- socket_monitor,
- xml_socket,
- streamid,
- sasl_state,
- access,
- shaper,
- zlib = false,
- tls = false,
- tls_required = false,
- tls_enabled = false,
- tls_options = [],
- authenticated = false,
- jid,
- user = <<"">>, server = <<"">>, resource = <<"">>,
- sid,
- pres_t = ?SETS:new(),
- pres_f = ?SETS:new(),
- pres_a = ?SETS:new(),
- pres_last,
- pres_timestamp,
- privacy_list = #userlist{},
- conn = unknown,
- auth_module = unknown,
- ip,
- aux_fields = [],
- csi_state = active,
- mgmt_state,
- mgmt_xmlns,
- mgmt_queue,
- mgmt_max_queue,
- mgmt_pending_since,
- mgmt_timeout,
- mgmt_max_timeout,
- mgmt_ack_timeout,
- mgmt_ack_timer,
- mgmt_resend,
- mgmt_stanzas_in = 0,
- mgmt_stanzas_out = 0,
- mgmt_stanzas_req = 0,
- ask_offline = true,
- lang = <<"">>}).
-
--type state_name() :: wait_for_stream | wait_for_auth |
- wait_for_feature_request | wait_for_bind |
- wait_for_sasl_response | wait_for_resume |
- session_established.
--type state() :: #state{}.
--type fsm_stop() :: {stop, normal, state()}.
--type fsm_next() :: {next_state, state_name(), state(), non_neg_integer()}.
--type fsm_reply() :: {reply, any(), state_name(), state(), non_neg_integer()}.
--type fsm_transition() :: fsm_stop() | fsm_next().
--export_type([state/0]).
-
-%-define(DBGFSM, true).
+%%-define(DBGFSM, true).
-ifdef(DBGFSM).
-
-define(FSMOPTS, [{debug, [trace]}]).
-
-else.
-
-define(FSMOPTS, []).
-
-endif.
-%% This is the timeout to apply between event when starting a new
-%% session:
--define(C2S_OPEN_TIMEOUT, 60000).
-
--define(C2S_HIBERNATE_TIMEOUT, ejabberd_config:get_option(c2s_hibernate, fun(X) when is_integer(X); X == hibernate-> X end, 90000)).
-
--define(STREAM_HEADER,
- <<"<?xml version='1.0'?><stream:stream "
- "xmlns='jabber:client' xmlns:stream='http://et"
- "herx.jabber.org/streams' id='~s' from='~s'~s"
- "~s>">>).
-
--define(STREAM_TRAILER, <<"</stream:stream>">>).
+-type state() :: map().
+-type next_state() :: {noreply, state()} | {stop, term(), state()}.
+-export_type([state/0, next_state/0]).
-%% XEP-0198:
-
--define(IS_STREAM_MGMT_PACKET(Pkt),
- is_record(Pkt, sm_enable) or
- is_record(Pkt, sm_resume) or
- is_record(Pkt, sm_a) or
- is_record(Pkt, sm_r)).
-
-%%%----------------------------------------------------------------------
-%%% API
-%%%----------------------------------------------------------------------
+%%%===================================================================
+%%% ejabberd_socket API
+%%%===================================================================
start(SockData, Opts) ->
- ?GEN_FSM:start(ejabberd_c2s,
- [SockData, Opts],
- fsm_limit_opts(Opts) ++ ?FSMOPTS).
-
-start_link(SockData, Opts) ->
- (?GEN_FSM):start_link(ejabberd_c2s,
- [SockData, Opts],
- fsm_limit_opts(Opts) ++ ?FSMOPTS).
-
-socket_type() -> xml_stream.
+ xmpp_stream_in:start(?MODULE, [SockData, Opts],
+ fsm_limit_opts(Opts) ++ ?FSMOPTS).
-%% Return Username, Resource and presence information
-get_presence(FsmRef) ->
- (?GEN_FSM):sync_send_all_state_event(FsmRef,
- {get_presence}, 1000).
-get_last_presence(FsmRef) ->
- (?GEN_FSM):sync_send_all_state_event(FsmRef,
- {get_last_presence}, 1000).
+socket_type() ->
+ xml_stream.
--spec get_aux_field(any(), state()) -> {ok, any()} | error.
-get_aux_field(Key, #state{aux_fields = Opts}) ->
- case lists:keyfind(Key, 1, Opts) of
- {_, Val} -> {ok, Val};
- false -> error
- end.
-
--spec set_aux_field(any(), any(), state()) -> state().
-set_aux_field(Key, Val,
- #state{aux_fields = Opts} = State) ->
- Opts1 = lists:keydelete(Key, 1, Opts),
- State#state{aux_fields = [{Key, Val} | Opts1]}.
-
--spec del_aux_field(any(), state()) -> state().
-del_aux_field(Key, #state{aux_fields = Opts} = State) ->
- Opts1 = lists:keydelete(Key, 1, Opts),
- State#state{aux_fields = Opts1}.
+-spec get_presence(pid()) -> presence().
+get_presence(Ref) ->
+ xmpp_stream_in:call(Ref, get_presence, 1000).
-spec get_subscription(jid() | ljid(), state()) -> both | from | to | none.
-get_subscription(From = #jid{}, StateData) ->
- get_subscription(jid:tolower(From), StateData);
-get_subscription(LFrom, StateData) ->
- LBFrom = setelement(3, LFrom, <<"">>),
- F = (?SETS):is_element(LFrom, StateData#state.pres_f)
- orelse
- (?SETS):is_element(LBFrom, StateData#state.pres_f),
- T = (?SETS):is_element(LFrom, StateData#state.pres_t)
- orelse
- (?SETS):is_element(LBFrom, StateData#state.pres_t),
+get_subscription(#jid{} = From, State) ->
+ get_subscription(jid:tolower(From), State);
+get_subscription(LFrom, #{pres_f := PresF, pres_t := PresT}) ->
+ LBFrom = jid:remove_resource(LFrom),
+ F = ?SETS:is_element(LFrom, PresF) orelse ?SETS:is_element(LBFrom, PresF),
+ T = ?SETS:is_element(LFrom, PresT) orelse ?SETS:is_element(LBFrom, PresT),
if F and T -> both;
F -> from;
T -> to;
true -> none
end.
-get_queued_stanzas(#state{mgmt_queue = Queue} = StateData) ->
- lists:map(fun({_N, Time, El}) ->
- add_resent_delay_info(StateData, El, Time)
- end, queue:to_list(Queue)).
-
-get_csi_state(#state{csi_state = CsiState}) ->
- CsiState.
-
-set_csi_state(#state{} = StateData, CsiState) ->
- StateData#state{csi_state = CsiState};
-set_csi_state(FsmRef, CsiState) ->
- FsmRef ! {set_csi_state, CsiState}.
-
-get_resume_timeout(#state{mgmt_timeout = Timeout}) ->
- Timeout.
-
-set_resume_timeout(#state{} = StateData, Timeout) ->
- StateData#state{mgmt_timeout = Timeout};
-set_resume_timeout(FsmRef, Timeout) ->
- FsmRef ! {set_resume_timeout, Timeout}.
-
--spec send_filtered(pid(), binary(), jid(), jid(), stanza()) -> any().
-send_filtered(FsmRef, Feature, From, To, Packet) ->
- FsmRef ! {send_filtered, Feature, From, To, Packet}.
-
--spec broadcast(pid(), any(), jid(), stanza()) -> any().
-broadcast(FsmRef, Type, From, Packet) ->
- FsmRef ! {broadcast, Type, From, Packet}.
-
--spec stop(pid()) -> any().
-stop(FsmRef) -> (?GEN_FSM):send_event(FsmRef, stop).
-
--spec close(pid()) -> any().
-%% What is the difference between stop and close???
-close(FsmRef) -> (?GEN_FSM):send_event(FsmRef, closed).
-
-%%%----------------------------------------------------------------------
-%%% Callback functions from gen_fsm
-%%%----------------------------------------------------------------------
-
-init([{SockMod, Socket}, Opts]) ->
- Access = gen_mod:get_opt(access, Opts,
- fun acl:access_rules_validator/1, all),
- Shaper = gen_mod:get_opt(shaper, Opts,
- fun acl:shaper_rules_validator/1, none),
- XMLSocket = case lists:keysearch(xml_socket, 1, Opts) of
- {value, {_, XS}} -> XS;
- _ -> false
- end,
- Zlib = proplists:get_bool(zlib, Opts),
- StartTLS = proplists:get_bool(starttls, Opts),
- StartTLSRequired = proplists:get_bool(starttls_required, Opts),
- TLSEnabled = proplists:get_bool(tls, Opts),
- TLS = StartTLS orelse
- StartTLSRequired orelse TLSEnabled,
- TLSOpts1 = lists:filter(fun ({certfile, _}) -> true;
- ({ciphers, _}) -> true;
- ({dhfile, _}) -> true;
- (_) -> false
- end,
- Opts),
- TLSOpts2 = case lists:keysearch(protocol_options, 1, Opts) of
- {value, {_, O}} ->
- [_|ProtocolOptions] = lists:foldl(
- fun(X, Acc) -> X ++ Acc end, [],
- [["|" | binary_to_list(Opt)] || Opt <- O, is_binary(Opt)]
- ),
- [{protocol_options, iolist_to_binary(ProtocolOptions)} | TLSOpts1];
- _ -> TLSOpts1
- end,
- TLSOpts3 = case proplists:get_bool(tls_compression, Opts) of
- false -> [compression_none | TLSOpts2];
- true -> TLSOpts2
- end,
- TLSOpts = [verify_none | TLSOpts3],
- StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
- StreamMgmtState = if StreamMgmtEnabled -> inactive;
- true -> disabled
- end,
- MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
- Limit when is_integer(Limit), Limit > 0 -> Limit;
- infinity -> infinity;
- _ -> 1000
- end,
- ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
- RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo;
- _ -> 300
- end,
- MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of
- Max when is_integer(Max), Max >= ResumeTimeout -> Max;
- _ -> ResumeTimeout
- end,
- AckTimeout = case proplists:get_value(ack_timeout, Opts) of
- ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000;
- infinity -> undefined;
- _ -> 60000
- end,
- ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of
- Resend when is_boolean(Resend) -> Resend;
- if_offline -> if_offline;
- _ -> false
- end,
- IP = peerip(SockMod, Socket),
- Socket1 = if TLSEnabled andalso
- SockMod /= ejabberd_frontend_socket ->
- SockMod:starttls(Socket, TLSOpts);
- true -> Socket
- end,
- SocketMonitor = SockMod:monitor(Socket1),
- StateData = #state{socket = Socket1, sockmod = SockMod,
- socket_monitor = SocketMonitor,
- xml_socket = XMLSocket, zlib = Zlib, tls = TLS,
- tls_required = StartTLSRequired,
- tls_enabled = TLSEnabled, tls_options = TLSOpts,
- sid = ejabberd_sm:make_sid(), streamid = new_id(),
- access = Access, shaper = Shaper, ip = IP,
- mgmt_state = StreamMgmtState,
- mgmt_max_queue = MaxAckQueue,
- mgmt_timeout = ResumeTimeout,
- mgmt_max_timeout = MaxResumeTimeout,
- mgmt_ack_timeout = AckTimeout,
- mgmt_resend = ResendOnTimeout},
- {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}.
-
-spec get_subscribed(pid()) -> [ljid()].
-%% Return list of all available resources of contacts,
-get_subscribed(FsmRef) ->
- (?GEN_FSM):sync_send_all_state_event(FsmRef,
- get_subscribed, 1000).
+%% Return list of all available resources of contacts
+get_subscribed(Ref) ->
+ xmpp_stream_in:call(Ref, get_subscribed, 1000).
+
+-spec close(pid()) -> ok.
+close(Ref) ->
+ xmpp_stream_in:cast(Ref, closed).
+
+-spec send(state(), xmpp_element()) -> next_state().
+send(State, Pkt) ->
+ xmpp_stream_in:send(State, Pkt).
+
+%%%===================================================================
+%%% xmpp_stream_in callbacks
+%%%===================================================================
+tls_options(#{server := Server, tls_options := TLSOpts}) ->
+ LServer = jid:nameprep(Server),
+ case ejabberd_config:get_option({domain_certfile, LServer},
+ fun iolist_to_binary/1) of
+ undefined ->
+ TLSOpts;
+ CertFile ->
+ lists:keystore(certfile, 1, TLSOpts, {certfile, CertFile})
+ end.
-wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) ->
- try xmpp:decode(#xmlel{name = Name, attrs = Attrs}) of
- #stream_start{xmlns = NS_CLIENT, stream_xmlns = NS_STREAM,
- version = Version, lang = Lang}
- when NS_CLIENT /= ?NS_CLIENT; NS_STREAM /= ?NS_STREAM ->
- send_header(StateData, ?MYNAME, Version, Lang),
- send_element(StateData, xmpp:serr_invalid_namespace()),
- {stop, normal, StateData};
- #stream_start{lang = Lang, version = Version} when byte_size(Lang) > 35 ->
- %% As stated in BCP47, 4.4.1:
- %% Protocols or specifications that specify limited buffer sizes for
- %% language tags MUST allow for language tags of at least 35 characters.
- %% Do not store long language tag to avoid possible DoS/flood attacks
- send_header(StateData, ?MYNAME, Version, ?MYLANG),
- Txt = <<"Too long value of 'xml:lang' attribute">>,
- send_element(StateData,
- xmpp:serr_policy_violation(Txt, ?MYLANG)),
- {stop, normal, StateData};
- #stream_start{to = undefined, lang = Lang, version = Version} ->
- Txt = <<"Missing 'to' attribute">>,
- send_header(StateData, ?MYNAME, Version, Lang),
- send_element(StateData,
- xmpp:serr_improper_addressing(Txt, Lang)),
- {stop, normal, StateData};
- #stream_start{to = #jid{lserver = To}, lang = Lang,
- version = Version} ->
- Server = case StateData#state.server of
- <<"">> -> To;
- S -> S
- end,
- StreamVersion = case Version of
- {1,0} -> {1,0};
- _ -> undefined
- end,
- IsBlacklistedIP = is_ip_blacklisted(StateData#state.ip, Lang),
- case lists:member(Server, ?MYHOSTS) of
- true when IsBlacklistedIP == false ->
- change_shaper(StateData, jid:make(<<"">>, Server, <<"">>)),
- case StreamVersion of
- {1,0} ->
- send_header(StateData, Server, {1,0}, ?MYLANG),
- case StateData#state.authenticated of
- false ->
- TLS = StateData#state.tls,
- TLSEnabled = StateData#state.tls_enabled,
- TLSRequired = StateData#state.tls_required,
- SASLState = cyrsasl:server_new(
- <<"jabber">>, Server, <<"">>, [],
- fun (U) ->
- ejabberd_auth:get_password_with_authmodule(
- U, Server)
- end,
- fun(U, AuthzId, P) ->
- ejabberd_auth:check_password_with_authmodule(
- U, AuthzId, Server, P)
- end,
- fun(U, AuthzId, P, D, DG) ->
- ejabberd_auth:check_password_with_authmodule(
- U, AuthzId, Server, P, D, DG)
- end),
- Mechs =
- case TLSEnabled or not TLSRequired of
- true ->
- [#sasl_mechanisms{list = cyrsasl:listmech(Server)}];
- false ->
- []
- end,
- SockMod =
- (StateData#state.sockmod):get_sockmod(StateData#state.socket),
- Zlib = StateData#state.zlib,
- CompressFeature = case Zlib andalso
- ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of
- true ->
- [#compression{methods = [<<"zlib">>]}];
- _ ->
- []
- end,
- TLSFeature =
- case (TLS == true) andalso
- (TLSEnabled == false) andalso
- (SockMod == gen_tcp) of
- true ->
- [#starttls{required = TLSRequired}];
- false ->
- []
- end,
- StreamFeatures1 = TLSFeature ++ CompressFeature ++ Mechs,
- StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features,
- Server, StreamFeatures1, [Server]),
- send_element(StateData,
- #stream_features{sub_els = StreamFeatures}),
- fsm_next_state(wait_for_feature_request,
- StateData#state{server = Server,
- sasl_state = SASLState,
- lang = Lang});
- _ ->
- case StateData#state.resource of
- <<"">> ->
- RosterVersioningFeature =
- ejabberd_hooks:run_fold(roster_get_versioning_feature,
- Server, [],
- [Server]),
- StreamManagementFeature =
- case stream_mgmt_enabled(StateData) of
- true ->
- [#feature_sm{xmlns = ?NS_STREAM_MGMT_2},
- #feature_sm{xmlns = ?NS_STREAM_MGMT_3}];
- false ->
- []
- end,
- SockMod =
- (StateData#state.sockmod):get_sockmod(
- StateData#state.socket),
- Zlib = StateData#state.zlib,
- CompressFeature =
- case Zlib andalso
- ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of
- true ->
- [#compression{methods = [<<"zlib">>]}];
- _ ->
- []
- end,
- StreamFeatures1 =
- [#bind{}, #xmpp_session{optional = true}]
- ++
- RosterVersioningFeature ++
- StreamManagementFeature ++
- CompressFeature ++
- ejabberd_hooks:run_fold(c2s_post_auth_features,
- Server, [], [Server]),
- StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features,
- Server, StreamFeatures1, [Server]),
- send_element(StateData,
- #stream_features{sub_els = StreamFeatures}),
- fsm_next_state(wait_for_bind,
- StateData#state{server = Server, lang = Lang});
- _ ->
- send_element(StateData, #stream_features{}),
- fsm_next_state(session_established,
- StateData#state{server = Server, lang = Lang})
- end
- end;
- _ ->
- send_header(StateData, Server, StreamVersion, ?MYLANG),
- if not StateData#state.tls_enabled and
- StateData#state.tls_required ->
- send_element(
- StateData,
- xmpp:serr_policy_violation(
- <<"Use of STARTTLS required">>, Lang)),
- {stop, normal, StateData};
- true ->
- fsm_next_state(wait_for_auth,
- StateData#state{server = Server,
- lang = Lang})
- end
- end;
- true ->
- IP = StateData#state.ip,
- {true, LogReason, ReasonT} = IsBlacklistedIP,
- ?INFO_MSG("Connection attempt from blacklisted IP ~s: ~s",
- [jlib:ip_to_list(IP), LogReason]),
- send_header(StateData, Server, StreamVersion, ?MYLANG),
- send_element(StateData, xmpp:serr_policy_violation(ReasonT, Lang)),
- {stop, normal, StateData};
- _ ->
- send_header(StateData, ?MYNAME, StreamVersion, ?MYLANG),
- send_element(StateData, xmpp:serr_host_unknown()),
- {stop, normal, StateData}
- end;
- _ ->
- send_header(StateData, ?MYNAME, {1,0}, ?MYLANG),
- send_element(StateData, xmpp:serr_invalid_xml()),
- {stop, normal, StateData}
- catch _:{xmpp_codec, Why} ->
- Txt = xmpp:format_error(Why),
- send_header(StateData, ?MYNAME, {1,0}, ?MYLANG),
- send_element(StateData, xmpp:serr_invalid_xml(Txt, ?MYLANG)),
- {stop, normal, StateData}
- end;
-wait_for_stream(timeout, StateData) ->
- {stop, normal, StateData};
-wait_for_stream({xmlstreamelement, _}, StateData) ->
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-wait_for_stream({xmlstreamend, _}, StateData) ->
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-wait_for_stream({xmlstreamerror, _}, StateData) ->
- send_header(StateData, ?MYNAME, {1,0}, <<"">>),
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-wait_for_stream(closed, StateData) ->
- {stop, normal, StateData};
-wait_for_stream(stop, StateData) ->
- {stop, normal, StateData}.
+tls_required(#{tls_required := TLSRequired}) ->
+ TLSRequired.
-wait_for_auth({xmlstreamelement, #xmlel{} = El}, StateData) ->
- decode_element(El, wait_for_auth, StateData);
-wait_for_auth(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) ->
- fsm_next_state(wait_for_auth, dispatch_stream_mgmt(Pkt, StateData));
-wait_for_auth(#iq{type = get, sub_els = [#legacy_auth{}]} = IQ, StateData) ->
- Auth = #legacy_auth{username = <<>>, password = <<>>, resource = <<>>},
- Res = case ejabberd_auth:plain_password_required(StateData#state.server) of
- false ->
- xmpp:make_iq_result(IQ, Auth#legacy_auth{digest = <<>>});
- true ->
- xmpp:make_iq_result(IQ, Auth)
- end,
- send_element(StateData, Res),
- fsm_next_state(wait_for_auth, StateData);
-wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{resource = <<"">>}]} = IQ,
- StateData) ->
- Lang = StateData#state.lang,
- Txt = <<"No resource provided">>,
- Err = xmpp:make_error(IQ, xmpp:err_not_acceptable(Txt, Lang)),
- send_element(StateData, Err),
- fsm_next_state(wait_for_auth, StateData);
-wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{username = U,
- password = P0,
- digest = D0,
- resource = R}]} = IQ,
- StateData) when is_binary(U), is_binary(R) ->
- JID = jid:make(U, StateData#state.server, R),
- case (JID /= error) andalso
- acl:access_matches(StateData#state.access,
- #{usr => jid:split(JID), ip => StateData#state.ip},
- StateData#state.server) == allow of
- true ->
- DGen = fun (PW) ->
- p1_sha:sha(<<(StateData#state.streamid)/binary, PW/binary>>)
- end,
- P = if is_binary(P0) -> P0; true -> <<>> end,
- D = if is_binary(D0) -> D0; true -> <<>> end,
- case ejabberd_auth:check_password_with_authmodule(
- U, U, StateData#state.server, P, D, DGen) of
- {true, AuthModule} ->
- ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p from ~s",
- [StateData#state.socket,
- jid:to_string(JID), AuthModule,
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [true, U, StateData#state.server,
- StateData#state.ip]),
- Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, AuthModule}],
- Res = xmpp:make_iq_result(IQ),
- send_element(StateData, Res),
- ejabberd_sm:open_session(StateData#state.sid, U,
- StateData#state.server, R,
- Info),
- change_shaper(StateData, JID),
- {Fs, Ts} = ejabberd_hooks:run_fold(
- roster_get_subscription_lists,
- StateData#state.server,
- {[], []},
- [U, StateData#state.server]),
- LJID = jid:tolower(jid:remove_resource(JID)),
- Fs1 = [LJID | Fs],
- Ts1 = [LJID | Ts],
- PrivList = ejabberd_hooks:run_fold(privacy_get_user_list,
- StateData#state.server,
- #userlist{},
- [U, StateData#state.server]),
- NewStateData = StateData#state{
- user = U,
- resource = R,
- jid = JID,
- conn = Conn,
- auth_module = AuthModule,
- pres_f = (?SETS):from_list(Fs1),
- pres_t = (?SETS):from_list(Ts1),
- privacy_list = PrivList},
- fsm_next_state(session_established, NewStateData);
- _ ->
- ?INFO_MSG("(~w) Failed legacy authentication for ~s from ~s",
- [StateData#state.socket,
- jid:to_string(JID),
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [false, U, StateData#state.server,
- StateData#state.ip]),
- Lang = StateData#state.lang,
- Txt = <<"Legacy authentication failed">>,
- Err = xmpp:make_error(IQ, xmpp:err_not_authorized(Txt, Lang)),
- send_element(StateData, Err),
- fsm_next_state(wait_for_auth, StateData)
- end;
- false when JID == error ->
- ?INFO_MSG("(~w) Forbidden legacy authentication "
- "for username '~s' with resource '~s'",
- [StateData#state.socket, U, R]),
- Err = xmpp:make_error(IQ, xmpp:err_jid_malformed()),
- send_element(StateData, Err),
- fsm_next_state(wait_for_auth, StateData);
- false ->
- ?INFO_MSG("(~w) Forbidden legacy authentication for ~s from ~s",
- [StateData#state.socket,
- jid:to_string(JID),
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [false, U, StateData#state.server,
- StateData#state.ip]),
- Lang = StateData#state.lang,
- Txt = <<"Legacy authentication forbidden">>,
- Err = xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)),
- send_element(StateData, Err),
- fsm_next_state(wait_for_auth, StateData)
- end;
-wait_for_auth(timeout, StateData) ->
- {stop, normal, StateData};
-wait_for_auth({xmlstreamend, _Name}, StateData) ->
- {stop, normal, StateData};
-wait_for_auth({xmlstreamerror, _}, StateData) ->
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-wait_for_auth(closed, StateData) ->
- {stop, normal, StateData};
-wait_for_auth(stop, StateData) ->
- {stop, normal, StateData};
-wait_for_auth(Pkt, StateData) ->
- process_unauthenticated_stanza(StateData, Pkt),
- fsm_next_state(wait_for_auth, StateData).
+unauthenticated_stream_features(#{server := Server}) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(c2s_pre_auth_features, LServer, [], [LServer]).
-wait_for_feature_request({xmlstreamelement, El}, StateData) ->
- decode_element(El, wait_for_feature_request, StateData);
-wait_for_feature_request(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) ->
- fsm_next_state(wait_for_feature_request,
- dispatch_stream_mgmt(Pkt, StateData));
-wait_for_feature_request(#sasl_auth{mechanism = Mech,
- text = ClientIn},
- #state{tls_enabled = TLSEnabled,
- tls_required = TLSRequired} = StateData)
- when TLSEnabled or not TLSRequired ->
- case cyrsasl:server_start(StateData#state.sasl_state, Mech, ClientIn) of
- {ok, Props} ->
- (StateData#state.sockmod):reset_stream(StateData#state.socket),
- U = identity(Props),
- AuthModule = proplists:get_value(auth_module, Props, undefined),
- ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s",
- [StateData#state.socket, U, AuthModule,
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [true, U, StateData#state.server,
- StateData#state.ip]),
- send_element(StateData, #sasl_success{}),
- fsm_next_state(wait_for_stream,
- StateData#state{streamid = new_id(),
- authenticated = true,
- auth_module = AuthModule,
- sasl_state = undefined,
- user = U});
- {continue, ServerOut, NewSASLState} ->
- send_element(StateData, #sasl_challenge{text = ServerOut}),
- fsm_next_state(wait_for_sasl_response,
- StateData#state{sasl_state = NewSASLState});
- {error, Error, Username} ->
- ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s",
- [StateData#state.socket,
- Username, StateData#state.server,
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [false, Username, StateData#state.server,
- StateData#state.ip]),
- send_element(StateData, #sasl_failure{reason = Error}),
- fsm_next_state(wait_for_feature_request, StateData);
- {error, Error} ->
- send_element(StateData, #sasl_failure{reason = Error}),
- fsm_next_state(wait_for_feature_request, StateData)
- end;
-wait_for_feature_request(#starttls{},
- #state{tls = true, tls_enabled = false} = StateData) ->
- case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of
- gen_tcp ->
- TLSOpts = case ejabberd_config:get_option(
- {domain_certfile, StateData#state.server},
- fun iolist_to_binary/1) of
- undefined ->
- StateData#state.tls_options;
- CertFile ->
- lists:keystore(certfile, 1,
- StateData#state.tls_options,
- {certfile, CertFile})
- end,
- Socket = StateData#state.socket,
- BProceed = fxml:element_to_binary(xmpp:encode(#starttls_proceed{})),
- TLSSocket = (StateData#state.sockmod):starttls(Socket, TLSOpts, BProceed),
- fsm_next_state(wait_for_stream,
- StateData#state{socket = TLSSocket,
- streamid = new_id(),
- tls_enabled = true});
- _ ->
- Lang = StateData#state.lang,
- Txt = <<"Unsupported TLS transport">>,
- send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)),
- {stop, normal, StateData}
- end;
-wait_for_feature_request(#compress{} = Comp, StateData) ->
- Zlib = StateData#state.zlib,
- SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket),
- if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) ->
- process_compression_request(Comp, wait_for_feature_request, StateData);
- true ->
- send_element(StateData, #compress_failure{reason = 'setup-failed'}),
- fsm_next_state(wait_for_feature_request, StateData)
- end;
-wait_for_feature_request(timeout, StateData) ->
- {stop, normal, StateData};
-wait_for_feature_request({xmlstreamend, _Name},
- StateData) ->
- {stop, normal, StateData};
-wait_for_feature_request({xmlstreamerror, _},
- StateData) ->
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-wait_for_feature_request(closed, StateData) ->
- {stop, normal, StateData};
-wait_for_feature_request(stop, StateData) ->
- {stop, normal, StateData};
-wait_for_feature_request(_Pkt,
- #state{tls_required = TLSRequired,
- tls_enabled = TLSEnabled} = StateData)
- when TLSRequired and not TLSEnabled ->
- Lang = StateData#state.lang,
- Txt = <<"Use of STARTTLS required">>,
- send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)),
- {stop, normal, StateData};
-wait_for_feature_request(Pkt, StateData) ->
- process_unauthenticated_stanza(StateData, Pkt),
- fsm_next_state(wait_for_feature_request, StateData).
+authenticated_stream_features(#{server := Server}) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(c2s_post_auth_features, LServer, [], [LServer]).
-wait_for_sasl_response({xmlstreamelement, El}, StateData) ->
- decode_element(El, wait_for_sasl_response, StateData);
-wait_for_sasl_response(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) ->
- fsm_next_state(wait_for_sasl_response,
- dispatch_stream_mgmt(Pkt, StateData));
-wait_for_sasl_response(#sasl_response{text = ClientIn}, StateData) ->
- case cyrsasl:server_step(StateData#state.sasl_state, ClientIn) of
- {ok, Props} ->
- catch (StateData#state.sockmod):reset_stream(StateData#state.socket),
- U = identity(Props),
- AuthModule = proplists:get_value(auth_module, Props, <<>>),
- ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s",
- [StateData#state.socket, U, AuthModule,
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [true, U, StateData#state.server,
- StateData#state.ip]),
- send_element(StateData, #sasl_success{}),
- fsm_next_state(wait_for_stream,
- StateData#state{streamid = new_id(),
- authenticated = true,
- auth_module = AuthModule,
- sasl_state = undefined,
- user = U});
- {ok, Props, ServerOut} ->
- (StateData#state.sockmod):reset_stream(StateData#state.socket),
- U = identity(Props),
- AuthModule = proplists:get_value(auth_module, Props, undefined),
- ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s",
- [StateData#state.socket, U, AuthModule,
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [true, U, StateData#state.server,
- StateData#state.ip]),
- send_element(StateData, #sasl_success{text = ServerOut}),
- fsm_next_state(wait_for_stream,
- StateData#state{streamid = new_id(),
- authenticated = true,
- auth_module = AuthModule,
- sasl_state = undefined,
- user = U});
- {continue, ServerOut, NewSASLState} ->
- send_element(StateData, #sasl_challenge{text = ServerOut}),
- fsm_next_state(wait_for_sasl_response,
- StateData#state{sasl_state = NewSASLState});
- {error, Error, Username} ->
- ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s",
- [StateData#state.socket,
- Username, StateData#state.server,
- ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]),
- ejabberd_hooks:run(c2s_auth_result, StateData#state.server,
- [false, Username, StateData#state.server,
- StateData#state.ip]),
- send_element(StateData, #sasl_failure{reason = Error}),
- fsm_next_state(wait_for_feature_request, StateData);
- {error, Error} ->
- send_element(StateData, #sasl_failure{reason = Error}),
- fsm_next_state(wait_for_feature_request, StateData)
- end;
-wait_for_sasl_response(timeout, StateData) ->
- {stop, normal, StateData};
-wait_for_sasl_response({xmlstreamend, _Name},
- StateData) ->
- {stop, normal, StateData};
-wait_for_sasl_response({xmlstreamerror, _},
- StateData) ->
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-wait_for_sasl_response(closed, StateData) ->
- {stop, normal, StateData};
-wait_for_sasl_response(stop, StateData) ->
- {stop, normal, StateData};
-wait_for_sasl_response(Pkt, StateData) ->
- process_unauthenticated_stanza(StateData, Pkt),
- fsm_next_state(wait_for_feature_request, StateData).
+sasl_mechanisms(#{server := Server}) ->
+ cyrsasl:listmech(jid:nameprep(Server)).
--spec resource_conflict_action(binary(), binary(), binary()) ->
- {accept_resource, binary()} | closenew.
-resource_conflict_action(U, S, R) ->
- OptionRaw = case ejabberd_sm:is_existing_resource(U, S, R) of
- true ->
- ejabberd_config:get_option(
- {resource_conflict, S},
- fun(setresource) -> setresource;
- (closeold) -> closeold;
- (closenew) -> closenew;
- (acceptnew) -> acceptnew
- end);
- false ->
- acceptnew
- end,
- Option = case OptionRaw of
- setresource -> setresource;
- closeold ->
- acceptnew; %% ejabberd_sm will close old session
- closenew -> closenew;
- acceptnew -> acceptnew;
- _ -> acceptnew %% default ejabberd behavior
- end,
- case Option of
- acceptnew -> {accept_resource, R};
- closenew -> closenew;
- setresource ->
- Rnew = new_uniq_id(),
- {accept_resource, Rnew}
- end.
+init_sasl(#{server := Server}) ->
+ LServer = jid:nameprep(Server),
+ cyrsasl:server_new(
+ <<"jabber">>, LServer, <<"">>, [],
+ fun(U) ->
+ ejabberd_auth:get_password_with_authmodule(U, LServer)
+ end,
+ fun(U, AuthzId, P) ->
+ ejabberd_auth:check_password_with_authmodule(U, AuthzId, LServer, P)
+ end,
+ fun(U, AuthzId, P, D, DG) ->
+ ejabberd_auth:check_password_with_authmodule(U, AuthzId, LServer, P, D, DG)
+ end).
--spec decode_element(xmlel(), state_name(), state()) -> fsm_transition().
-decode_element(#xmlel{} = El, StateName, StateData) ->
- try case xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of
- #iq{sub_els = [_], type = T} = Pkt when T == set; T == get ->
- NewPkt = xmpp:decode_els(
- Pkt, ?NS_CLIENT,
- fun(SubEl) when StateName == session_established ->
- case xmpp:get_ns(SubEl) of
- ?NS_PRIVACY -> true;
- ?NS_BLOCKING -> true;
- _ -> false
- end;
- (SubEl) ->
- xmpp:is_known_tag(SubEl)
- end),
- ?MODULE:StateName(NewPkt, StateData);
- Pkt ->
- ?MODULE:StateName(Pkt, StateData)
- end
- catch error:{xmpp_codec, Why} ->
- NS = xmpp:get_ns(El),
- fsm_next_state(
- StateName,
- case xmpp:is_stanza(El) of
- true ->
- Lang = xmpp:get_lang(El),
- Txt = xmpp:format_error(Why),
- send_error(StateData, El, xmpp:err_bad_request(Txt, Lang));
- false when NS == ?NS_STREAM_MGMT_2; NS == ?NS_STREAM_MGMT_3 ->
- Err = #sm_failed{reason = 'bad-request', xmlns = NS},
- send_element(StateData, Err),
- StateData;
- false ->
- StateData
- end)
+bind(<<"">>, State) ->
+ bind(new_uniq_id(), State);
+bind(R, #{user := U, server := S} = State) ->
+ case resource_conflict_action(U, S, R) of
+ closenew ->
+ {error, xmpp:err_conflict(), State};
+ {accept_resource, Resource} ->
+ open_session(State, Resource)
end.
-wait_for_bind({xmlstreamelement, El}, StateData) ->
- decode_element(El, wait_for_bind, StateData);
-wait_for_bind(#sm_resume{} = Pkt, StateData) ->
- case handle_resume(StateData, Pkt) of
- {ok, ResumedState} ->
- fsm_next_state(session_established, ResumedState);
- error ->
- fsm_next_state(wait_for_bind, StateData)
- end;
-wait_for_bind(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) ->
- fsm_next_state(wait_for_bind, dispatch_stream_mgmt(Pkt, StateData));
-wait_for_bind(#iq{type = set,
- sub_els = [#bind{resource = R0}]} = IQ, StateData) ->
- U = StateData#state.user,
- R = case R0 of
- <<>> -> new_uniq_id();
- _ -> R0
- end,
- case resource_conflict_action(U, StateData#state.server, R) of
- closenew ->
- Err = xmpp:make_error(IQ, xmpp:err_conflict()),
- send_element(StateData, Err),
- fsm_next_state(wait_for_bind, StateData);
- {accept_resource, R2} ->
- JID = jid:make(U, StateData#state.server, R2),
- StateData2 = StateData#state{resource = R2, jid = JID},
- case open_session(StateData2) of
- {ok, StateData3} ->
- Res = xmpp:make_iq_result(IQ, #bind{jid = JID}),
- try
- send_element(StateData3, Res)
- catch
- exit:normal -> close(self())
- end,
- fsm_next_state_pack(session_established,StateData3);
- {error, Error} ->
- Err = xmpp:make_error(IQ, Error),
- send_element(StateData, Err),
- fsm_next_state(wait_for_bind, StateData)
+handle_stream_start(#{server := Server, ip := IP, lang := Lang} = State) ->
+ LServer = jid:nameprep(Server),
+ case lists:member(LServer, ?MYHOSTS) of
+ false ->
+ xmpp_stream_in:send(State, xmpp:serr_host_unknown());
+ true ->
+ case check_bl_c2s(IP, Lang) of
+ false ->
+ change_shaper(State),
+ {noreply, State};
+ {true, LogReason, ReasonT} ->
+ ?INFO_MSG("Connection attempt from blacklisted IP ~s: ~s",
+ [jlib:ip_to_list(IP), LogReason]),
+ Err = xmpp:serr_policy_violation(ReasonT, Lang),
+ xmpp_stream_in:send(State, Err)
end
- end;
-wait_for_bind(#compress{} = Comp, StateData) ->
- Zlib = StateData#state.zlib,
- SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket),
- if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) ->
- process_compression_request(Comp, wait_for_bind, StateData);
- true ->
- send_element(StateData, #compress_failure{reason = 'setup-failed'}),
- fsm_next_state(wait_for_bind, StateData)
- end;
-wait_for_bind(timeout, StateData) ->
- {stop, normal, StateData};
-wait_for_bind({xmlstreamend, _Name}, StateData) ->
- {stop, normal, StateData};
-wait_for_bind({xmlstreamerror, _}, StateData) ->
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-wait_for_bind(closed, StateData) ->
- {stop, normal, StateData};
-wait_for_bind(stop, StateData) ->
- {stop, normal, StateData};
-wait_for_bind(Pkt, StateData) ->
- fsm_next_state(
- wait_for_bind,
- case xmpp:is_stanza(Pkt) of
- true ->
- send_error(StateData, Pkt, xmpp:err_not_acceptable());
- false ->
- StateData
- end).
-
--spec open_session(state()) -> {ok, state()} | {error, stanza_error()}.
-open_session(StateData) ->
- U = StateData#state.user,
- R = StateData#state.resource,
- JID = StateData#state.jid,
- Lang = StateData#state.lang,
- IP = StateData#state.ip,
- case acl:access_matches(StateData#state.access,
- #{usr => jid:split(JID), ip => IP},
- StateData#state.server) of
- allow ->
- ?INFO_MSG("(~w) Opened session for ~s",
- [StateData#state.socket, jid:to_string(JID)]),
- change_shaper(StateData, JID),
- {Fs, Ts} = ejabberd_hooks:run_fold(
- roster_get_subscription_lists,
- StateData#state.server,
- {[], []},
- [U, StateData#state.server]),
- LJID = jid:tolower(jid:remove_resource(JID)),
- Fs1 = [LJID | Fs],
- Ts1 = [LJID | Ts],
- PrivList =
- ejabberd_hooks:run_fold(
- privacy_get_user_list,
- StateData#state.server,
- #userlist{},
- [U, StateData#state.server]),
- Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
- ejabberd_sm:open_session(
- StateData#state.sid, U, StateData#state.server, R, Info),
- UpdatedStateData =
- StateData#state{
- conn = Conn,
- pres_f = ?SETS:from_list(Fs1),
- pres_t = ?SETS:from_list(Ts1),
- privacy_list = PrivList},
- {ok, UpdatedStateData};
- _ ->
- ejabberd_hooks:run(forbidden_session_hook,
- StateData#state.server, [JID]),
- ?INFO_MSG("(~w) Forbidden session for ~s",
- [StateData#state.socket, jid:to_string(JID)]),
- Txt = <<"Denied by ACL">>,
- {error, xmpp:err_not_allowed(Txt, Lang)}
end.
-session_established({xmlstreamelement, El}, StateData) ->
- decode_element(El, session_established, StateData);
-session_established(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) ->
- fsm_next_state(session_established, dispatch_stream_mgmt(Pkt, StateData));
-session_established(#csi{type = active}, StateData) ->
- NewStateData = csi_flush_queue(StateData),
- fsm_next_state(session_established, NewStateData#state{csi_state = active});
-session_established(#csi{type = inactive}, StateData) ->
- fsm_next_state(session_established, StateData#state{csi_state = inactive});
-%% We hibernate the process to reduce memory consumption after a
-%% configurable activity timeout
-session_established(timeout, StateData) ->
- Options = [],
- proc_lib:hibernate(?GEN_FSM, enter_loop,
- [?MODULE, Options, session_established, StateData]),
- fsm_next_state(session_established, StateData);
-session_established({xmlstreamend, _Name}, StateData) ->
- {stop, normal, StateData};
-session_established({xmlstreamerror,
- <<"XML stanza is too big">> = E},
- StateData) ->
- send_element(StateData,
- xmpp:serr_policy_violation(E, StateData#state.lang)),
- {stop, normal, StateData};
-session_established({xmlstreamerror, _}, StateData) ->
- send_element(StateData, xmpp:serr_not_well_formed()),
- {stop, normal, StateData};
-session_established(closed, #state{mgmt_state = active} = StateData) ->
- catch (StateData#state.sockmod):close(StateData#state.socket),
- fsm_next_state(wait_for_resume, StateData);
-session_established(closed, StateData) ->
- {stop, normal, StateData};
-session_established(stop, StateData) ->
- {stop, normal, StateData};
-session_established(Pkt, StateData) when ?is_stanza(Pkt) ->
- FromJID = StateData#state.jid,
- case check_from(Pkt, FromJID) of
- 'invalid-from' ->
- send_element(StateData, xmpp:serr_invalid_from()),
- {stop, normal, StateData};
- _ ->
- NewStateData = update_num_stanzas_in(StateData, Pkt),
- session_established2(Pkt, NewStateData)
- end;
-session_established(_Pkt, StateData) ->
- fsm_next_state(session_established, StateData).
+handle_stream_end(State) ->
+ {stop, normal, State}.
+
+handle_stream_close(State) ->
+ {stop, normal, State}.
+
+handle_auth_success(User, Mech, AuthModule,
+ #{socket := Socket, ip := IP, server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ ?INFO_MSG("(~w) Accepted ~s authentication for ~s@~s by ~p from ~s",
+ [Socket, Mech, User, LServer, AuthModule,
+ ejabberd_config:may_hide_data(jlib:ip_to_list(IP))]),
+ State1 = State#{auth_module => AuthModule},
+ ejabberd_hooks:run_fold(c2s_auth_result, LServer,
+ {noreply, State1}, [true, User]).
+
+handle_auth_failure(User, Mech, Reason,
+ #{socket := Socket, ip := IP, server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ ?INFO_MSG("(~w) Failed ~s authentication ~sfrom ~s: ~s",
+ [Socket, Mech,
+ if User /= <<"">> -> ["for ", User, "@", LServer, " "];
+ true -> ""
+ end,
+ ejabberd_config:may_hide_data(jlib:ip_to_list(IP)), Reason]),
+ ejabberd_hooks:run_fold(c2s_auth_result, LServer,
+ {noreply, State}, [false, User]).
+
+handle_unbinded_packet(Pkt, #{server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(c2s_unbinded_packet, LServer,
+ {noreply, State}, [Pkt]).
+
+handle_unauthenticated_packet(Pkt, #{server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(c2s_unauthenticated_packet,
+ LServer, {noreply, State}, [Pkt]).
+
+handle_authenticated_packet(Pkt, #{server := Server} = State) when not ?is_stanza(Pkt) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(c2s_authenticated_packet,
+ LServer, {noreply, State}, [Pkt]);
+handle_authenticated_packet(Pkt, #{server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ case ejabberd_hooks:run_fold(c2s_authenticated_packet,
+ LServer, {noreply, State}, [Pkt]) of
+ {noreply, State1} ->
+ Pkt1 = ejabberd_hooks:run_fold(user_send_packet, LServer, Pkt, [State1]),
+ Res = case Pkt1 of
+ #presence{to = #jid{lresource = <<"">>}} ->
+ process_self_presence(State1, Pkt1);
+ #presence{} ->
+ process_presence_out(State1, Pkt1);
+ _ ->
+ check_privacy_then_route(State1, Pkt1)
+ end,
+ ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, Pkt}]),
+ Res;
+ Err ->
+ ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, Pkt}]),
+ Err
+ end.
--spec session_established2(xmpp_element(), state()) -> fsm_next().
-%% Process packets sent by user (coming from user on c2s XMPP connection)
-session_established2(Pkt, StateData) ->
- User = StateData#state.user,
- Server = StateData#state.server,
- FromJID = StateData#state.jid,
- ToJID = case xmpp:get_to(Pkt) of
- undefined -> jid:make(User, Server, <<"">>);
- J -> J
- end,
- Lang = case xmpp:get_lang(Pkt) of
- <<"">> -> StateData#state.lang;
- L -> L
+init([State, Opts]) ->
+ Access = gen_mod:get_opt(access, Opts, fun acl:access_rules_validator/1, all),
+ Shaper = gen_mod:get_opt(shaper, Opts, fun acl:shaper_rules_validator/1, none),
+ TLSOpts = lists:filter(
+ fun({certfile, _}) -> true;
+ ({ciphers, _}) -> true;
+ ({dhfile, _}) -> true;
+ (_) -> false
+ end, Opts),
+ TLSRequired = proplists:get_bool(starttls_required, Opts),
+ TLSVerify = proplists:get_bool(tls_verify, Opts),
+ State1 = State#{tls_options => TLSOpts,
+ tls_required => TLSRequired,
+ tls_verify => TLSVerify,
+ pres_a => ?SETS:new(),
+ pres_f => ?SETS:new(),
+ pres_t => ?SETS:new(),
+ sid => ejabberd_sm:make_sid(),
+ lang => ?MYLANG,
+ server => ?MYNAME,
+ access => Access,
+ shaper => Shaper},
+ ejabberd_hooks:run_fold(c2s_init, {ok, State1}, []).
+
+handle_call(get_presence, _From,
+ #{user := U, server := S, resource := R} = State) ->
+ Pres = case maps:get(pres_last, State, undefined) of
+ undefined ->
+ From = jid:make(U, S, R),
+ To = jid:remove_resource(From),
+ #presence{from = From, to = To, type = unavailable};
+ P ->
+ P
end,
- NewPkt = xmpp:set_lang(Pkt, Lang),
- NewState =
- case NewPkt of
- #presence{} ->
- Presence0 = ejabberd_hooks:run_fold(
- c2s_update_presence, Server, NewPkt,
- [User, Server]),
- Presence = ejabberd_hooks:run_fold(
- user_send_packet, Server, Presence0,
- [StateData, FromJID, ToJID]),
- case ToJID of
- #jid{user = User, server = Server, resource = <<"">>} ->
- ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)",
- [FromJID, Presence, StateData]),
- presence_update(FromJID, Presence,
- StateData);
- _ ->
- presence_track(FromJID, ToJID, Presence,
- StateData)
- end;
- #iq{type = T, sub_els = [El]} when T == set; T == get ->
- NS = xmpp:get_ns(El),
- if NS == ?NS_BLOCKING; NS == ?NS_PRIVACY ->
- IQ = xmpp:set_from_to(Pkt, FromJID, ToJID),
- process_privacy_iq(IQ, StateData);
- NS == ?NS_SESSION ->
- Res = xmpp:make_iq_result(Pkt),
- send_stanza(StateData, Res);
- true ->
- NewPkt0 = ejabberd_hooks:run_fold(
- user_send_packet, Server, NewPkt,
- [StateData, FromJID, ToJID]),
- check_privacy_route(FromJID, StateData, FromJID,
- ToJID, NewPkt0)
- end;
- _ ->
- NewPkt0 = ejabberd_hooks:run_fold(
- user_send_packet, Server, NewPkt,
- [StateData, FromJID, ToJID]),
- check_privacy_route(FromJID, StateData, FromJID,
- ToJID, NewPkt0)
- end,
- ejabberd_hooks:run(c2s_loop_debug,
- [{xmlstreamelement, Pkt}]),
- fsm_next_state(session_established, NewState).
-
-wait_for_resume({xmlstreamelement, _El} = Event, StateData) ->
- Result = session_established(Event, StateData),
- fsm_next_state(wait_for_resume, element(3, Result));
-wait_for_resume(timeout, StateData) ->
- ?DEBUG("Timed out waiting for resumption of stream for ~s",
- [jid:to_string(StateData#state.jid)]),
- {stop, normal, StateData#state{mgmt_state = timeout}};
-wait_for_resume(Event, StateData) ->
- ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]),
- fsm_next_state(wait_for_resume, StateData).
-
-handle_event(_Event, StateName, StateData) ->
- fsm_next_state(StateName, StateData).
-
-handle_sync_event({get_presence}, _From, StateName,
- StateData) ->
- User = StateData#state.user,
- PresLast = StateData#state.pres_last,
- Show = get_showtag(PresLast),
- Status = get_statustag(PresLast),
- Resource = StateData#state.resource,
- Reply = {User, Resource, Show, Status},
- fsm_reply(Reply, StateName, StateData);
-handle_sync_event({get_last_presence}, _From, StateName,
- StateData) ->
- User = StateData#state.user,
- Server = StateData#state.server,
- PresLast = StateData#state.pres_last,
- Resource = StateData#state.resource,
- Reply = {User, Server, Resource, PresLast},
- fsm_reply(Reply, StateName, StateData);
-
-handle_sync_event(get_subscribed, _From, StateName,
- StateData) ->
- Subscribed = (?SETS):to_list(StateData#state.pres_f),
- {reply, Subscribed, StateName, StateData};
-handle_sync_event({resume_session, Time}, _From, _StateName,
- StateData) when element(1, StateData#state.sid) == Time ->
- %% The old session should be closed before the new one is opened, so we do
- %% this here instead of leaving it to the terminate callback
- ejabberd_sm:close_session(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource),
- {stop, normal, {resume, StateData}, StateData#state{mgmt_state = resumed}};
-handle_sync_event({resume_session, _Time}, _From, StateName,
- StateData) ->
- {reply, {error, <<"Previous session not found">>}, StateName, StateData};
-handle_sync_event(_Event, _From, StateName,
- StateData) ->
- Reply = ok, fsm_reply(Reply, StateName, StateData).
-
-code_change(_OldVsn, StateName, StateData, _Extra) ->
- {ok, StateName, StateData}.
-
-handle_info({send_text, Text}, StateName, StateData) ->
- send_text(StateData, Text),
- ejabberd_hooks:run(c2s_loop_debug, [Text]),
- fsm_next_state(StateName, StateData);
-handle_info(replaced, StateName, StateData) ->
- Lang = StateData#state.lang,
- Pkt = xmpp:serr_conflict(<<"Replaced by new connection">>, Lang),
- handle_info({kick, replaced, Pkt}, StateName, StateData);
-handle_info(kick, StateName, StateData) ->
- Lang = StateData#state.lang,
- Pkt = xmpp:serr_policy_violation(<<"has been kicked">>, Lang),
- handle_info({kick, kicked_by_admin, Pkt}, StateName, StateData);
-handle_info({kick, Reason, Pkt}, _StateName, StateData) ->
- send_element(StateData, Pkt),
- {stop, normal,
- StateData#state{authenticated = Reason}};
-handle_info({route, _From, _To, {broadcast, Data}},
- StateName, StateData) ->
- ?DEBUG("broadcast~n~p~n", [Data]),
- case Data of
- {item, IJID, ISubscription} ->
- fsm_next_state(StateName,
- roster_change(IJID, ISubscription, StateData));
- {exit, Reason} ->
- Lang = StateData#state.lang,
- send_element(StateData, xmpp:serr_conflict(Reason, Lang)),
- {stop, normal, StateData};
- {privacy_list, PrivList, PrivListName} ->
- case ejabberd_hooks:run_fold(privacy_updated_list,
- StateData#state.server,
- false,
- [StateData#state.privacy_list,
- PrivList]) of
- false ->
- fsm_next_state(StateName, StateData);
- NewPL ->
- PrivPushIQ =
- #iq{type = set,
- from = jid:remove_resource(StateData#state.jid),
- to = StateData#state.jid,
- id = <<"push", (randoms:get_string())/binary>>,
- sub_els = [#privacy_query{
- lists = [#privacy_list{
- name = PrivListName}]}]},
- NewState = send_stanza(StateData, PrivPushIQ),
- fsm_next_state(StateName,
- NewState#state{privacy_list = NewPL})
- end;
- {blocking, What} ->
- NewState = route_blocking(What, StateData),
- fsm_next_state(StateName, NewState);
- _ ->
- fsm_next_state(StateName, StateData)
- end;
-%% Process Packets that are to be send to the user
-handle_info({route, From, To, Packet}, StateName, StateData) when ?is_stanza(Packet) ->
- {Pass, NewState} =
- case Packet of
- #presence{type = T} ->
- State = ejabberd_hooks:run_fold(c2s_presence_in,
- StateData#state.server,
- StateData,
- [{From, To, Packet}]),
- case T of
- probe ->
- LFrom = jid:tolower(From),
- LBFrom = jid:remove_resource(LFrom),
- NewStateData =
- case (?SETS):is_element(LFrom, State#state.pres_a)
- orelse (?SETS):is_element(LBFrom, State#state.pres_a) of
- true -> State;
- false ->
- case (?SETS):is_element(LFrom, State#state.pres_f) of
- true ->
- A = (?SETS):add_element(LFrom, State#state.pres_a),
- State#state{pres_a = A};
- false ->
- case (?SETS):is_element(LBFrom, State#state.pres_f) of
- true ->
- A = (?SETS):add_element(LBFrom, State#state.pres_a),
- State#state{pres_a = A};
- false ->
- State
- end
- end
- end,
- process_presence_probe(From, To, NewStateData),
- {false, NewStateData};
- error ->
- NewA = ?SETS:del_element(jid:tolower(From), State#state.pres_a),
- {true, State#state{pres_a = NewA}};
- subscribe ->
- SRes = is_privacy_allow(State, From, To, Packet, in),
- {SRes, State};
- subscribed ->
- SRes = is_privacy_allow(State, From, To, Packet, in),
- {SRes, State};
- unsubscribe ->
- SRes = is_privacy_allow(State, From, To, Packet, in),
- {SRes, State};
- unsubscribed ->
- SRes = is_privacy_allow(State, From, To, Packet, in),
- {SRes, State};
- _ ->
- case privacy_check_packet(State, From, To, Packet, in) of
- allow ->
- LFrom = jid:tolower(From),
- LBFrom = jid:remove_resource(LFrom),
- case (?SETS):is_element(LFrom, State#state.pres_a)
- orelse (?SETS):is_element(LBFrom, State#state.pres_a) of
- true ->
- {true, State};
- false ->
- case (?SETS):is_element(LFrom, State#state.pres_f) of
- true ->
- A = (?SETS):add_element(LFrom, State#state.pres_a),
- {true, State#state{pres_a = A}};
- false ->
- case (?SETS):is_element(LBFrom,
- State#state.pres_f) of
- true ->
- A = (?SETS):add_element(
- LBFrom,
- State#state.pres_a),
- {true, State#state{pres_a = A}};
- false ->
- {true, State}
- end
- end
- end;
- deny -> {false, State}
- end
- end;
- #iq{type = T} ->
- case xmpp:has_subtag(Packet, #last{}) of
- true when T == get; T == set ->
- LFrom = jid:tolower(From),
- LBFrom = jid:remove_resource(LFrom),
- HasFromSub = ((?SETS):is_element(LFrom, StateData#state.pres_f)
- orelse (?SETS):is_element(LBFrom, StateData#state.pres_f))
- andalso is_privacy_allow(StateData, To, From, #presence{}, out),
- case HasFromSub of
- true ->
- case privacy_check_packet(
- StateData, From, To, Packet, in) of
- allow ->
- {true, StateData};
- deny ->
- ejabberd_router:route_error(
- To, From, Packet,
- xmpp:err_service_unavailable()),
- {false, StateData}
- end;
- _ ->
- ejabberd_router:route_error(
- To, From, Packet, xmpp:err_forbidden()),
- {false, StateData}
- end;
- _ ->
- case privacy_check_packet(StateData, From, To, Packet, in) of
- allow ->
- {true, StateData};
- deny ->
- ejabberd_router:route_error(
- To, From, Packet, xmpp:err_service_unavailable()),
- {false, StateData}
- end
- end;
- #message{type = T} ->
- case privacy_check_packet(StateData, From, To, Packet, in) of
- allow ->
- {true, StateData};
- deny ->
- case T of
- groupchat -> ok;
- headline -> ok;
- _ ->
- case xmpp:has_subtag(Packet, #muc_user{}) of
- true ->
- ok;
- false ->
- ejabberd_router:route_error(
- To, From, Packet, xmpp:err_service_unavailable())
- end
- end,
- {false, StateData}
- end
- end,
+ {reply, Pres, State};
+handle_call(get_subscribed, _From, #{pres_f := PresF} = State) ->
+ Subscribed = ?SETS:to_list(PresF),
+ {reply, Subscribed, State};
+handle_call(Request, From, #{server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(
+ c2s_handle_call, LServer, {noreply, State}, [Request, From]).
+
+handle_cast(closed, State) ->
+ handle_stream_close(State);
+handle_cast(Msg, #{server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(c2s_handle_cast, LServer, {noreply, State}, [Msg]).
+
+handle_info({route, From, To, Packet0}, #{server := Server} = State) ->
+ Packet = xmpp:set_from_to(Packet0, From, To),
+ LServer = jid:nameprep(Server),
+ {Pass, NewState} = case Packet of
+ #presence{} ->
+ process_presence_in(State, Packet);
+ #message{} ->
+ process_message_in(State, Packet);
+ #iq{} ->
+ process_iq_in(State, Packet)
+ end,
if Pass ->
- FixedPacket0 = xmpp:set_from_to(Packet, From, To),
- FixedPacket = ejabberd_hooks:run_fold(
- user_receive_packet,
- NewState#state.server,
- FixedPacket0,
- [NewState, NewState#state.jid, From, To]),
- SentStateData = send_packet(NewState, FixedPacket),
+ LServer = jid:nameprep(Server),
+ Packet1 = ejabberd_hooks:run_fold(
+ user_receive_packet, LServer, Packet, [NewState]),
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- fsm_next_state(StateName, SentStateData);
- true ->
- ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- fsm_next_state(StateName, NewState)
- end;
-handle_info({'DOWN', Monitor, _Type, _Object, _Info},
- _StateName, StateData)
- when Monitor == StateData#state.socket_monitor ->
- if StateData#state.mgmt_state == active;
- StateData#state.mgmt_state == pending ->
- fsm_next_state(wait_for_resume, StateData);
+ xmpp_stream_in:send(NewState, Packet1);
true ->
- {stop, normal, StateData}
+ ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
+ {noreply, NewState}
end;
-handle_info(system_shutdown, StateName, StateData) ->
- case StateName of
- wait_for_stream ->
- send_header(StateData, ?MYNAME, {1,0}, <<"en">>),
- send_element(StateData, xmpp:serr_system_shutdown()),
- ok;
- _ ->
- send_element(StateData, xmpp:serr_system_shutdown()),
- ok
- end,
- {stop, normal, StateData};
-handle_info({route_xmlstreamelement, El}, _StateName, StateData) ->
- {next_state, NStateName, NStateData, _Timeout} =
- session_established({xmlstreamelement, El}, StateData),
- fsm_next_state(NStateName, NStateData);
-handle_info({force_update_presence, LUser, LServer}, StateName,
- #state{jid = #jid{luser = LUser, lserver = LServer}} = StateData) ->
- NewStateData = case StateData#state.pres_last of
- #presence{} ->
- Presence =
- ejabberd_hooks:run_fold(c2s_update_presence,
- LServer,
- StateData#state.pres_last,
- [LUser, LServer]),
- StateData2 = StateData#state{pres_last = Presence},
- presence_update(StateData2#state.jid, Presence,
- StateData2),
- StateData2;
- undefined -> StateData
- end,
- fsm_next_state(StateName, NewStateData);
-handle_info({send_filtered, Feature, From, To, Packet}, StateName, StateData) ->
- Drop = ejabberd_hooks:run_fold(c2s_filter_packet, StateData#state.server,
- true, [StateData#state.server, StateData,
- Feature, To, Packet]),
- NewStateData = if Drop ->
- ?DEBUG("Dropping packet from ~p to ~p",
- [jid:to_string(From),
- jid:to_string(To)]),
- StateData;
- true ->
- FinalPacket = xmpp:set_from_to(Packet, From, To),
- case StateData#state.jid of
- To ->
- case privacy_check_packet(StateData, From, To,
- FinalPacket, in) of
- deny ->
- StateData;
- allow ->
- send_stanza(StateData, FinalPacket)
- end;
- _ ->
- ejabberd_router:route(From, To, FinalPacket),
- StateData
- end
- end,
- fsm_next_state(StateName, NewStateData);
-handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
- Recipients = ejabberd_hooks:run_fold(
- c2s_broadcast_recipients, StateData#state.server,
- [],
- [StateData#state.server, StateData, Type, From, Packet]),
- lists:foreach(
- fun(USR) ->
- ejabberd_router:route(
- From, jid:make(USR), Packet)
- end, lists:usort(Recipients)),
- fsm_next_state(StateName, StateData);
-handle_info({set_csi_state, CsiState}, StateName, StateData) ->
- fsm_next_state(StateName, StateData#state{csi_state = CsiState});
-handle_info({set_resume_timeout, Timeout}, StateName, StateData) ->
- fsm_next_state(StateName, StateData#state{mgmt_timeout = Timeout});
-handle_info(dont_ask_offline, StateName, StateData) ->
- fsm_next_state(StateName, StateData#state{ask_offline = false});
-handle_info(close, StateName, StateData) ->
- ?DEBUG("Timeout waiting for stream management acknowledgement of ~s",
- [jid:to_string(StateData#state.jid)]),
- close(self()),
- fsm_next_state(StateName, StateData#state{mgmt_ack_timer = undefined});
-handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) ->
- %% This happens if the resume_session/1 request timed out; the new session
- %% now receives the late response.
- ?DEBUG("Received old session state for ~s after failed resumption",
- [jid:to_string(OldStateData#state.jid)]),
- handle_unacked_stanzas(OldStateData#state{mgmt_resend = false}),
- fsm_next_state(StateName, StateData);
-handle_info(Info, StateName, StateData) ->
- ?ERROR_MSG("Unexpected info: ~p", [Info]),
- fsm_next_state(StateName, StateData).
+handle_info(system_shutdown, State) ->
+ xmpp_stream_in:send(State, xmpp:serr_system_shutdown());
+handle_info(Info, #{server := Server} = State) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(c2s_handle_info, LServer, {noreply, State}, [Info]).
--spec print_state(state()) -> state().
-print_state(State = #state{pres_t = T, pres_f = F, pres_a = A}) ->
- State#state{pres_t = {pres_t, (?SETS):size(T)},
- pres_f = {pres_f, (?SETS):size(F)},
- pres_a = {pres_a, (?SETS):size(A)}}.
-
-terminate(_Reason, StateName, StateData) ->
- case StateData#state.mgmt_state of
- resumed ->
- ?INFO_MSG("Closing former stream of resumed session for ~s",
- [jid:to_string(StateData#state.jid)]);
- _ ->
- if StateName == session_established;
- StateName == wait_for_resume ->
- case StateData#state.authenticated of
- replaced ->
- ?INFO_MSG("(~w) Replaced session for ~s",
- [StateData#state.socket,
- jid:to_string(StateData#state.jid)]),
- From = StateData#state.jid,
- Lang = StateData#state.lang,
- Status = <<"Replaced by new connection">>,
- Packet = #presence{
- type = unavailable,
- status = xmpp:mk_text(Status, Lang)},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- Status),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet);
- _ ->
- ?INFO_MSG("(~w) Close session for ~s",
- [StateData#state.socket,
- jid:to_string(StateData#state.jid)]),
- EmptySet = (?SETS):new(),
- case StateData of
- #state{pres_last = undefined, pres_a = EmptySet} ->
- ejabberd_sm:close_session(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource);
- _ ->
- From = StateData#state.jid,
- Packet = #presence{type = unavailable},
- ejabberd_sm:close_session_unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- <<"">>),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet)
- end,
- case StateData#state.mgmt_state of
- timeout ->
- Info = [{num_stanzas_in,
- StateData#state.mgmt_stanzas_in}],
- ejabberd_sm:set_offline_info(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource,
- Info);
- _ ->
- ok
- end
- end,
- handle_unacked_stanzas(StateData),
- bounce_messages();
- true ->
- ok
- end
- end,
- catch send_trailer(StateData),
- (StateData#state.sockmod):close(StateData#state.socket),
+terminate(_Reason, _State) ->
ok.
-%%%----------------------------------------------------------------------
-%%% Internal functions
-%%%----------------------------------------------------------------------
--spec change_shaper(state(), jid()) -> ok.
-change_shaper(StateData, JID) ->
- Shaper = acl:access_matches(StateData#state.shaper,
- #{usr => jid:split(JID), ip => StateData#state.ip},
- StateData#state.server),
- (StateData#state.sockmod):change_shaper(StateData#state.socket,
- Shaper).
-
--spec send_text(state(), iodata()) -> ok | {error, any()}.
-send_text(StateData, Text) when StateData#state.mgmt_state == pending ->
- ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]);
-send_text(StateData, Text) when StateData#state.xml_socket ->
- ?DEBUG("Send Text on stream = ~p", [Text]),
- (StateData#state.sockmod):send_xml(StateData#state.socket,
- {xmlstreamraw, Text});
-send_text(StateData, Text) when StateData#state.mgmt_state == active ->
- ?DEBUG("Send XML on stream = ~p", [Text]),
- case catch (StateData#state.sockmod):send(StateData#state.socket, Text) of
- {'EXIT', _} ->
- (StateData#state.sockmod):close(StateData#state.socket),
- {error, closed};
- _ ->
- ok
- end;
-send_text(StateData, Text) ->
- ?DEBUG("Send XML on stream = ~p", [Text]),
- (StateData#state.sockmod):send(StateData#state.socket, Text).
-
--spec send_element(state(), xmlel() | xmpp_element()) -> ok | {error, any()}.
-send_element(StateData, El) when StateData#state.mgmt_state == pending ->
- ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
-send_element(StateData, #xmlel{} = El) when StateData#state.xml_socket ->
- ?DEBUG("Send XML on stream = ~p", [fxml:element_to_binary(El)]),
- (StateData#state.sockmod):send_xml(StateData#state.socket,
- {xmlstreamelement, El});
-send_element(StateData, #xmlel{} = El) ->
- send_text(StateData, fxml:element_to_binary(El));
-send_element(StateData, Pkt) ->
- send_element(StateData, xmpp:encode(Pkt, ?NS_CLIENT)).
-
--spec send_error(state(), xmlel() | stanza(), stanza_error()) -> state().
-send_error(StateData, Stanza, Error) ->
- Type = xmpp:get_type(Stanza),
- if Type == error; Type == result;
- Type == <<"error">>; Type == <<"result">> ->
- StateData;
- true ->
- send_stanza(StateData, xmpp:make_error(Stanza, Error))
- end.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
--spec send_stanza(state(), xmpp_element()) -> state().
-send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive ->
- csi_filter_stanza(StateData, Stanza);
-send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
- mgmt_queue_add(StateData, Stanza);
-send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
- NewStateData = mgmt_queue_add(StateData, Stanza),
- mgmt_send_stanza(NewStateData, Stanza);
-send_stanza(StateData, Stanza) ->
- send_element(StateData, Stanza),
- StateData.
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec check_bl_c2s({inet:ip_address(), non_neg_integer()}, binary())
+ -> false | {true, binary(), binary()}.
+check_bl_c2s({IP, _Port}, Lang) ->
+ ejabberd_hooks:run_fold(check_bl_c2s, false, [IP, Lang]).
--spec send_packet(state(), xmpp_element()) -> state().
-send_packet(StateData, Packet) ->
- case xmpp:is_stanza(Packet) of
- true ->
- send_stanza(StateData, Packet);
- false ->
- send_element(StateData, Packet),
- StateData
+-spec open_session(state(), binary()) -> {ok, state()} | {error, stanza_error(), state()}.
+open_session(#{user := U, server := S, sid := SID,
+ socket := Socket, ip := IP, auth_module := AuthMod,
+ access := Access, lang := Lang} = State, R) ->
+ JID = jid:make(U, S, R),
+ LServer = JID#jid.lserver,
+ case acl:access_matches(Access,
+ #{usr => jid:split(JID), ip => IP},
+ LServer) of
+ allow ->
+ ?INFO_MSG("(~w) Opened session for ~s",
+ [Socket, jid:to_string(JID)]),
+ change_shaper(State),
+ Conn = get_conn_type(State),
+ Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}],
+ ejabberd_sm:open_session(SID, U, LServer, R, Info),
+ State1 = State#{conn => Conn, resource => R, jid => JID},
+ State2 = ejabberd_hooks:run_fold(
+ c2s_session_opened, LServer, State1, []),
+ {ok, State2};
+ deny ->
+ ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]),
+ ?INFO_MSG("(~w) Forbidden session for ~s",
+ [Socket, jid:to_string(JID)]),
+ Txt = <<"Denied by ACL">>,
+ {error, xmpp:err_not_allowed(Txt, Lang), State}
end.
--spec send_header(state(), binary(), binary(), binary()) -> ok | {error, any()}.
-send_header(StateData, Server, Version, Lang) ->
- Header = #xmlel{name = Name, attrs = Attrs} =
- xmpp:encode(#stream_start{version = Version,
- lang = Lang,
- xmlns = ?NS_CLIENT,
- stream_xmlns = ?NS_STREAM,
- id = StateData#state.streamid,
- from = jid:make(Server)}),
- if StateData#state.xml_socket ->
- (StateData#state.sockmod):send_xml(StateData#state.socket,
- {xmlstreamstart, Name, Attrs});
- true ->
- send_text(StateData, fxml:element_to_header(Header))
+-spec process_iq_in(state(), iq()) -> {boolean(), state()}.
+process_iq_in(State, #iq{} = IQ) ->
+ case privacy_check_packet(State, IQ, in) of
+ allow ->
+ {true, State};
+ deny ->
+ route_error(IQ, xmpp:err_service_unavailable()),
+ {false, State}
end.
--spec send_trailer(state()) -> ok | {error, any()}.
-send_trailer(StateData)
- when StateData#state.mgmt_state == pending ->
- ?DEBUG("Cannot send stream trailer while waiting for resumption", []);
-send_trailer(StateData)
- when StateData#state.xml_socket ->
- (StateData#state.sockmod):send_xml(StateData#state.socket,
- {xmlstreamend, <<"stream:stream">>});
-send_trailer(StateData) ->
- send_text(StateData, ?STREAM_TRAILER).
-
--spec new_id() -> binary().
-new_id() -> randoms:get_string().
-
--spec new_uniq_id() -> binary().
-new_uniq_id() ->
- iolist_to_binary([randoms:get_string(),
- integer_to_binary(p1_time_compat:unique_integer([positive]))]).
-
--spec get_conn_type(state()) -> c2s | c2s_tls | c2s_compressed | websocket |
- c2s_compressed_tls | http_bind.
-get_conn_type(StateData) ->
- case (StateData#state.sockmod):get_transport(StateData#state.socket) of
- tcp -> c2s;
- tls -> c2s_tls;
- tcp_zlib -> c2s_compressed;
- tls_zlib -> c2s_compressed_tls;
- http_bind -> http_bind;
- websocket -> websocket
+-spec process_message_in(state(), message()) -> {boolean(), state()}.
+process_message_in(State, #message{type = T} = Msg) ->
+ case privacy_check_packet(State, Msg, in) of
+ allow ->
+ {true, State};
+ deny when T == groupchat; T == headline ->
+ ok;
+ deny ->
+ case xmpp:has_subtag(Msg, #muc_user{}) of
+ true ->
+ ok;
+ false ->
+ route_error(Msg, xmpp:err_service_unavailable())
+ end,
+ {false, State}
end.
--spec process_presence_probe(jid(), jid(), state()) -> ok.
-process_presence_probe(From, To, StateData) ->
- LFrom = jid:tolower(From),
- LBFrom = setelement(3, LFrom, <<"">>),
- case StateData#state.pres_last of
- undefined ->
- ok;
+-spec process_presence_in(state(), presence()) -> {boolean(), state()}.
+process_presence_in(#{server := Server, pres_a := PresA} = State0,
+ #presence{from = From, to = To, type = T} = Pres) ->
+ LServer = jid:nameprep(Server),
+ State = ejabberd_hooks:run_fold(c2s_presence_in, LServer, State0, [Pres]),
+ case T of
+ probe ->
+ NewState = do_some_magic(State, From),
+ route_probe_reply(From, To, NewState),
+ {false, NewState};
+ error ->
+ A = ?SETS:del_element(jid:tolower(From), PresA),
+ {true, State#{pres_a => A}};
_ ->
- Cond = ((?SETS):is_element(LFrom, StateData#state.pres_f)
- orelse
- ((LFrom /= LBFrom) andalso
- (?SETS):is_element(LBFrom, StateData#state.pres_f))),
- if Cond ->
- %% To is the one sending the presence (the probe target)
- Packet = xmpp_util:add_delay_info(
- StateData#state.pres_last, To,
- StateData#state.pres_timestamp),
- case privacy_check_packet(StateData, To, From, Packet, out) of
- deny ->
- ok;
- allow ->
- Pid=element(2, StateData#state.sid),
- ejabberd_hooks:run(presence_probe_hook, StateData#state.server, [From, To, Pid]),
- %% Don't route a presence probe to oneself
- case From == To of
- false ->
- ejabberd_router:route(To, From, Packet);
- true ->
- ok
- end
- end;
- true ->
- ok
+ case privacy_check_packet(State, Pres, in) of
+ allow when T == error ->
+ {true, State};
+ allow ->
+ NewState = do_some_magic(State, From),
+ {true, NewState};
+ deny ->
+ {false, State}
end
end.
-%% User updates his presence (non-directed presence packet)
--spec presence_update(jid(), presence(), state()) -> state().
-presence_update(From, Packet, StateData) ->
- #presence{type = Type} = Packet,
- case Type of
- unavailable ->
- Status = xmpp:get_text(Packet#presence.status),
- Info = [{ip, StateData#state.ip},
- {conn, StateData#state.conn},
- {auth_module, StateData#state.auth_module}],
- ejabberd_sm:unset_presence(StateData#state.sid,
- StateData#state.user,
- StateData#state.server,
- StateData#state.resource, Status, Info),
- presence_broadcast(StateData, From,
- StateData#state.pres_a, Packet),
- StateData#state{pres_last = undefined,
- pres_timestamp = undefined, pres_a = (?SETS):new()};
- error -> StateData;
- probe -> StateData;
- subscribe -> StateData;
- subscribed -> StateData;
- unsubscribe -> StateData;
- unsubscribed -> StateData;
- _ ->
- OldPriority = case StateData#state.pres_last of
- undefined -> 0;
- OldPresence -> get_priority_from_presence(OldPresence)
- end,
- NewPriority = get_priority_from_presence(Packet),
- update_priority(NewPriority, Packet, StateData),
- FromUnavail = (StateData#state.pres_last == undefined),
- ?DEBUG("from unavail = ~p~n", [FromUnavail]),
- NewStateData = StateData#state{pres_last = Packet,
- pres_timestamp = p1_time_compat:timestamp()},
- NewState = if FromUnavail ->
- ejabberd_hooks:run(user_available_hook,
- NewStateData#state.server,
- [NewStateData#state.jid]),
- ResentStateData = if NewPriority >= 0 ->
- resend_offline_messages(NewStateData),
- resend_subscription_requests(NewStateData);
- true -> NewStateData
- end,
- presence_broadcast_first(From, ResentStateData,
- Packet);
+-spec route_probe_reply(jid(), jid(), state()) -> ok.
+route_probe_reply(From, To, #{server := Server, pres_f := PresF,
+ pres_last := LastPres,
+ pres_timestamp := TS} = State) ->
+ LFrom = jid:tolower(From),
+ LBFrom = jid:remove_resource(LFrom),
+ case ?SETS:is_element(LFrom, PresF)
+ orelse ?SETS:is_element(LBFrom, PresF) of
+ true ->
+ %% To is my JID
+ Packet = xmpp_util:add_delay_info(LastPres, To, TS),
+ case privacy_check_packet(State, Packet, out) of
+ deny ->
+ ok;
+ allow ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run(presence_probe_hook,
+ LServer,
+ [From, To, self()]),
+ %% Don't route a presence probe to oneself
+ case From == To of
+ false ->
+ route(xmpp:set_from_to(Packet, To, From));
true ->
- presence_broadcast_to_trusted(NewStateData, From,
- NewStateData#state.pres_f,
- NewStateData#state.pres_a,
- Packet),
- if OldPriority < 0, NewPriority >= 0 ->
- resend_offline_messages(NewStateData);
- true -> ok
- end,
- NewStateData
- end,
- NewState
- end.
+ ok
+ end
+ end;
+ false ->
+ ok
+ end;
+route_probe_reply(_, _, _) ->
+ ok.
-%% User sends a directed presence packet
--spec presence_track(jid(), jid(), presence(), state()) -> state().
-presence_track(From, To, Packet, StateData) ->
- #presence{type = Type} = Packet,
+-spec process_presence_out(state(), presence()) -> next_state().
+process_presence_out(#{user := User, server := Server,
+ lang := Lang, pres_a := PresA} = State,
+ #presence{from = From, to = To, type = Type} = Pres) ->
+ LServer = jid:nameprep(Server),
LTo = jid:tolower(To),
- User = StateData#state.user,
- Server = StateData#state.server,
- Lang = StateData#state.lang,
- case privacy_check_packet(StateData, From, To, Packet, out) of
+ case privacy_check_packet(State, Pres, out) of
deny ->
ErrText = <<"Your active privacy list has denied "
"the routing of this stanza.">>,
Err = xmpp:err_not_acceptable(ErrText, Lang),
- send_error(StateData, xmpp:set_from_to(Packet, From, To), Err);
+ xmpp_stream_in:send_error(State, Pres, Err);
allow when Type == subscribe; Type == subscribed;
Type == unsubscribe; Type == unsubscribed ->
- Access = gen_mod:get_module_opt(Server, mod_roster, access,
+ Access = gen_mod:get_module_opt(LServer, mod_roster, access,
fun(A) when is_atom(A) -> A end,
all),
MyBareJID = jid:make(User, Server, <<"">>),
- case acl:match_rule(Server, Access, MyBareJID) of
+ case acl:match_rule(LServer, Access, MyBareJID) of
deny ->
ErrText = <<"Denied by ACL">>,
Err = xmpp:err_forbidden(ErrText, Lang),
- send_error(StateData, xmpp:set_from_to(Packet, From, To), Err);
+ xmpp_stream_in:send_error(State, Pres, Err);
allow ->
ejabberd_hooks:run(roster_out_subscription,
- Server,
+ LServer,
[User, Server, To, Type]),
- ejabberd_router:route(jid:remove_resource(From), To, Packet),
- StateData
+ BareFrom = jid:remove_resource(From),
+ route(xmpp:set_from_to(Pres, BareFrom, To)),
+ {noreply, State}
end;
allow when Type == error; Type == probe ->
- ejabberd_router:route(From, To, Packet),
- StateData;
+ route(Pres),
+ {noreply, State};
allow ->
- ejabberd_router:route(From, To, Packet),
+ route(Pres),
A = case Type of
- available ->
- ?SETS:add_element(LTo, StateData#state.pres_a);
- unavailable ->
- ?SETS:del_element(LTo, StateData#state.pres_a)
+ available -> ?SETS:add_element(LTo, PresA);
+ unavailable -> ?SETS:del_element(LTo, PresA)
end,
- StateData#state{pres_a = A}
+ {noreply, State#{pres_a => A}}
end.
--spec check_privacy_route(jid(), state(), jid(), jid(), stanza()) -> state().
-check_privacy_route(From, StateData, FromRoute, To,
- Packet) ->
- case privacy_check_packet(StateData, From, To, Packet,
- out) of
+-spec process_self_presence(state(), presence()) -> {noreply, state()}.
+process_self_presence(#{ip := IP, conn := Conn,
+ auth_module := AuthMod, sid := SID,
+ user := U, server := S, resource := R} = State,
+ #presence{type = unavailable} = Pres) ->
+ Status = xmpp:get_text(Pres#presence.status),
+ Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}],
+ ejabberd_sm:unset_presence(SID, U, S, R, Status, Info),
+ State1 = broadcast_presence_unavailable(State, Pres),
+ State2 = maps:remove(pres_last, maps:remove(pres_timestamp, State1)),
+ {noreply, State2};
+process_self_presence(#{server := Server} = State,
+ #presence{type = available} = Pres) ->
+ LServer = jid:nameprep(Server),
+ PreviousPres = maps:get(pres_last, State, undefined),
+ update_priority(State, Pres),
+ State1 = ejabberd_hooks:run_fold(user_available_hook, LServer, State, [Pres]),
+ State2 = State1#{pres_last => Pres,
+ pres_timestamp => p1_time_compat:timestamp()},
+ FromUnavailable = PreviousPres == undefined,
+ State3 = broadcast_presence_available(State2, Pres, FromUnavailable),
+ {noreply, State3};
+process_self_presence(State, _Pres) ->
+ {noreply, State}.
+
+-spec update_priority(state(), presence()) -> ok.
+update_priority(#{ip := IP, conn := Conn, auth_module := AuthMod,
+ sid := SID, user := U, server := S, resource := R},
+ Pres) ->
+ Priority = get_priority_from_presence(Pres),
+ Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}],
+ ejabberd_sm:set_presence(SID, U, S, R, Priority, Pres, Info).
+
+-spec broadcast_presence_unavailable(state(), presence()) -> state().
+broadcast_presence_unavailable(#{pres_a := PresA} = State, Pres) ->
+ JIDs = filter_blocked(State, Pres, PresA),
+ route_multiple(State, JIDs, Pres),
+ State#{pres_a => ?SETS:new()}.
+
+-spec broadcast_presence_available(state(), presence(), boolean()) -> state().
+broadcast_presence_available(#{pres_a := PresA, pres_f := PresF,
+ pres_t := PresT} = State,
+ Pres, _FromUnavailable = true) ->
+ Probe = #presence{type = probe},
+ TJIDs = filter_blocked(State, Probe, PresT),
+ FJIDs = filter_blocked(State, Pres, PresF),
+ route_multiple(State, TJIDs, Probe),
+ route_multiple(State, FJIDs, Pres),
+ State#{pres_a => ?SETS:union(PresA, PresF)};
+broadcast_presence_available(#{pres_a := PresA, pres_f := PresF} = State,
+ Pres, _FromUnavailable = false) ->
+ JIDs = filter_blocked(State, Pres, ?SETS:intersection(PresA, PresF)),
+ route_multiple(State, JIDs, Pres),
+ State.
+
+-spec check_privacy_then_route(state(), stanza()) -> next_state().
+check_privacy_then_route(#{lang := Lang} = State, Pkt) ->
+ case privacy_check_packet(State, Pkt, out) of
deny ->
- Lang = StateData#state.lang,
ErrText = <<"Your active privacy list has denied "
"the routing of this stanza.">>,
Err = xmpp:err_not_acceptable(ErrText, Lang),
- send_error(StateData, xmpp:set_from_to(Packet, From, To), Err);
+ xmpp_stream_in:send_error(State, Pkt, Err);
allow ->
- ejabberd_router:route(FromRoute, To, Packet),
- StateData
- end.
-
-%% Check if privacy rules allow this delivery
--spec privacy_check_packet(state(), jid(), jid(), stanza(), in | out) -> allow | deny.
-privacy_check_packet(StateData, From, To, Packet,
- Dir) ->
- ejabberd_hooks:run_fold(privacy_check_packet,
- StateData#state.server, allow,
- [StateData#state.user, StateData#state.server,
- StateData#state.privacy_list, {From, To, Packet},
- Dir]).
-
--spec is_privacy_allow(state(), jid(), jid(), stanza(), in | out) -> boolean().
-is_privacy_allow(StateData, From, To, Packet, Dir) ->
- allow ==
- privacy_check_packet(StateData, From, To, Packet, Dir).
-
-%% Send presence when disconnecting
--spec presence_broadcast(state(), jid(), ?SETS:set(), presence()) -> ok.
-presence_broadcast(StateData, From, JIDSet, Packet) ->
- JIDs = ?SETS:to_list(JIDSet),
- JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out),
- Server = StateData#state.server,
- send_multiple(From, Server, JIDs2, Packet).
-
--spec presence_broadcast_to_trusted(
- state(), jid(), ?SETS:set(), ?SETS:set(), presence()) -> ok.
-%% Send presence when updating presence
-presence_broadcast_to_trusted(StateData, From, Trusted, JIDSet, Packet) ->
- JIDs = ?SETS:to_list(?SETS:intersection(Trusted, JIDSet)),
- JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out),
- Server = StateData#state.server,
- send_multiple(From, Server, JIDs2, Packet).
-
-%% Send presence when connecting
--spec presence_broadcast_first(jid(), state(), presence()) -> state().
-presence_broadcast_first(From, StateData, Packet) ->
- JIDsProbe =
- ?SETS:fold(
- fun(JID, L) -> [JID | L] end,
- [],
- StateData#state.pres_t),
- PacketProbe = #presence{type = probe},
- JIDs2Probe = format_and_check_privacy(From, StateData, PacketProbe, JIDsProbe, out),
- Server = StateData#state.server,
- send_multiple(From, Server, JIDs2Probe, PacketProbe),
- {As, JIDs} =
- ?SETS:fold(
- fun(JID, {A, JID_list}) ->
- {?SETS:add_element(JID, A), JID_list++[JID]}
- end,
- {StateData#state.pres_a, []},
- StateData#state.pres_f),
- JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out),
- send_multiple(From, Server, JIDs2, Packet),
- StateData#state{pres_a = As}.
-
--spec format_and_check_privacy(
- jid(), state(), stanza(), [ljid()], in | out) -> [jid()].
-format_and_check_privacy(From, StateData, Packet, JIDs, Dir) ->
- FJIDs = [jid:make(JID) || JID <- JIDs],
- lists:filter(
- fun(FJID) ->
- case ejabberd_hooks:run_fold(
- privacy_check_packet, StateData#state.server,
- allow,
- [StateData#state.user,
- StateData#state.server,
- StateData#state.privacy_list,
- {From, FJID, Packet},
- Dir]) of
- deny -> false;
- allow -> true
- end
- end,
- FJIDs).
-
--spec send_multiple(jid(), binary(), [jid()], stanza()) -> ok.
-send_multiple(From, Server, JIDs, Packet) ->
- ejabberd_router_multicast:route_multicast(From, Server, JIDs, Packet).
-
--spec roster_change(jid(), both | from | none | remove | to, state()) -> state().
-roster_change(IJID, ISubscription, StateData) ->
- LIJID = jid:tolower(IJID),
- IsFrom = (ISubscription == both) or (ISubscription == from),
- IsTo = (ISubscription == both) or (ISubscription == to),
- OldIsFrom = (?SETS):is_element(LIJID, StateData#state.pres_f),
- FSet = if
- IsFrom -> (?SETS):add_element(LIJID, StateData#state.pres_f);
- true -> ?SETS:del_element(LIJID, StateData#state.pres_f)
- end,
- TSet = if
- IsTo -> (?SETS):add_element(LIJID, StateData#state.pres_t);
- true -> ?SETS:del_element(LIJID, StateData#state.pres_t)
- end,
- case StateData#state.pres_last of
- undefined ->
- StateData#state{pres_f = FSet, pres_t = TSet};
- P ->
- ?DEBUG("roster changed for ~p~n",
- [StateData#state.user]),
- From = StateData#state.jid,
- To = jid:make(IJID),
- Cond1 = IsFrom andalso not OldIsFrom,
- Cond2 = not IsFrom andalso OldIsFrom andalso
- ((?SETS):is_element(LIJID, StateData#state.pres_a)),
- if Cond1 ->
- ?DEBUG("C1: ~p~n", [LIJID]),
- case privacy_check_packet(StateData, From, To, P, out)
- of
- deny -> ok;
- allow -> ejabberd_router:route(From, To, P)
- end,
- A = (?SETS):add_element(LIJID, StateData#state.pres_a),
- StateData#state{pres_a = A, pres_f = FSet,
- pres_t = TSet};
- Cond2 ->
- ?DEBUG("C2: ~p~n", [LIJID]),
- PU = #presence{type = unavailable},
- case privacy_check_packet(StateData, From, To, PU, out)
- of
- deny -> ok;
- allow -> ejabberd_router:route(From, To, PU)
- end,
- A = ?SETS:del_element(LIJID, StateData#state.pres_a),
- StateData#state{pres_a = A, pres_f = FSet,
- pres_t = TSet};
- true -> StateData#state{pres_f = FSet, pres_t = TSet}
- end
+ route(Pkt),
+ {noreply, State}
end.
--spec update_priority(integer(), presence(), state()) -> ok.
-update_priority(Priority, Packet, StateData) ->
- Info = [{ip, StateData#state.ip}, {conn, StateData#state.conn},
- {auth_module, StateData#state.auth_module}],
- ejabberd_sm:set_presence(StateData#state.sid,
- StateData#state.user, StateData#state.server,
- StateData#state.resource, Priority, Packet, Info).
+-spec privacy_check_packet(state(), stanza(), in | out) -> allow | deny.
+privacy_check_packet(#{server := Server} = State, Pkt, Dir) ->
+ LServer = jid:nameprep(Server),
+ ejabberd_hooks:run_fold(privacy_check_packet, LServer, allow, [State, Pkt, Dir]).
-spec get_priority_from_presence(presence()) -> integer().
get_priority_from_presence(#presence{priority = Prio}) ->
@@ -2021,817 +554,129 @@ get_priority_from_presence(#presence{priority = Prio}) ->
_ -> Prio
end.
--spec process_privacy_iq(iq(), state()) -> state().
-process_privacy_iq(#iq{from = From, to = To,
- type = Type, lang = Lang} = IQ, StateData) ->
- Txt = <<"No module is handling this query">>,
- {Res, NewStateData} =
- case Type of
- get ->
- R = ejabberd_hooks:run_fold(
- privacy_iq_get,
- StateData#state.server,
- {error, xmpp:err_feature_not_implemented(Txt, Lang)},
- [IQ, StateData#state.privacy_list]),
- {R, StateData};
- set ->
- case ejabberd_hooks:run_fold(
- privacy_iq_set,
- StateData#state.server,
- {error, xmpp:err_feature_not_implemented(Txt, Lang)},
- [IQ, StateData#state.privacy_list])
- of
- {result, R, NewPrivList} ->
- {{result, R},
- StateData#state{privacy_list =
- NewPrivList}};
- R -> {R, StateData}
- end
- end,
- IQRes = case Res of
- {result, Result} ->
- xmpp:make_iq_result(IQ, Result);
- {error, Error} ->
- xmpp:make_error(IQ, Error)
- end,
- ejabberd_router:route(To, From, IQRes),
- NewStateData.
+-spec filter_blocked(state(), presence(), ?SETS:set()) -> [jid()].
+filter_blocked(#{user := U, server := S, resource := R} = State,
+ Pres, LJIDSet) ->
+ From = jid:make(U, S, R),
+ ?SETS:fold(
+ fun(LJID, Acc) ->
+ To = jid:make(LJID),
+ Pkt = xmpp:set_from_to(Pres, From, To),
+ case privacy_check_packet(State, Pkt, out) of
+ allow -> [To|Acc];
+ deny -> Acc
+ end
+ end, [], LJIDSet).
+
+-spec route(stanza()) -> ok.
+route(Pkt) ->
+ From = xmpp:get_from(Pkt),
+ To = xmpp:get_to(Pkt),
+ ejabberd_router:route(From, To, Pkt).
+
+-spec route_error(stanza(), stanza_error()) -> ok.
+route_error(Pkt, Err) ->
+ From = xmpp:get_from(Pkt),
+ To = xmpp:get_to(Pkt),
+ ejabberd_router:route_error(To, From, Pkt, Err).
+
+-spec route_multiple(state(), [jid()], stanza()) -> ok.
+route_multiple(#{server := Server}, JIDs, Pkt) ->
+ LServer = jid:nameprep(Server),
+ From = xmpp:get_from(Pkt),
+ ejabberd_router_multicast:route_multicast(From, LServer, JIDs, Pkt).
--spec resend_offline_messages(state()) -> ok.
-resend_offline_messages(#state{ask_offline = true} = StateData) ->
- case ejabberd_hooks:run_fold(resend_offline_messages_hook,
- StateData#state.server, [],
- [StateData#state.user, StateData#state.server])
- of
- Rs -> %%when is_list(Rs) ->
- lists:foreach(fun ({route, From, To, Packet}) ->
- Pass = case privacy_check_packet(StateData,
- From, To,
- Packet, in)
- of
- allow -> true;
- deny -> false
- end,
- if Pass ->
- ejabberd_router:route(From, To, Packet);
- true -> ok
- end
- end,
- Rs)
- end;
-resend_offline_messages(_StateData) ->
- ok.
-
--spec resend_subscription_requests(state()) -> state().
-resend_subscription_requests(#state{user = User,
- server = Server} = StateData) ->
- PendingSubscriptions =
- ejabberd_hooks:run_fold(resend_subscription_requests_hook,
- Server, [], [User, Server]),
- lists:foldl(fun (XMLPacket, AccStateData) ->
- send_packet(AccStateData, XMLPacket)
+-spec resource_conflict_action(binary(), binary(), binary()) ->
+ {accept_resource, binary()} | closenew.
+resource_conflict_action(U, S, R) ->
+ OptionRaw = case ejabberd_sm:is_existing_resource(U, S, R) of
+ true ->
+ ejabberd_config:get_option(
+ {resource_conflict, S},
+ fun(setresource) -> setresource;
+ (closeold) -> closeold;
+ (closenew) -> closenew;
+ (acceptnew) -> acceptnew
+ end);
+ false ->
+ acceptnew
end,
- StateData,
- PendingSubscriptions).
-
--spec get_showtag(undefined | presence()) -> binary().
-get_showtag(undefined) -> <<"unavailable">>;
-get_showtag(#presence{show = undefined}) -> <<"available">>;
-get_showtag(#presence{show = Show}) -> atom_to_binary(Show, utf8).
-
--spec get_statustag(undefined | presence()) -> binary().
-get_statustag(#presence{status = Status}) -> xmpp:get_text(Status);
-get_statustag(undefined) -> <<"">>.
-
--spec process_unauthenticated_stanza(state(), iq()) -> ok | {error, any()}.
-process_unauthenticated_stanza(StateData, #iq{type = T, lang = L} = IQ)
- when T == set; T == get ->
- Lang = if L == undefined; L == <<"">> -> StateData#state.lang;
- true -> L
- end,
- NewIQ = IQ#iq{lang = Lang},
- Res = ejabberd_hooks:run_fold(c2s_unauthenticated_iq,
- StateData#state.server, empty,
- [StateData#state.server, NewIQ,
- StateData#state.ip]),
- case Res of
- empty ->
- Txt = <<"Authentication required">>,
- Err0 = xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)),
- Err1 = Err0#iq{from = jid:make(<<>>, StateData#state.server, <<>>),
- to = undefined},
- send_element(StateData, Err1);
- _ ->
- send_element(StateData, Res)
- end;
-process_unauthenticated_stanza(_StateData, _) ->
- %% Drop any stanza, which isn't IQ stanza
- ok.
-
--spec peerip(ejabberd_socket:sockmod(),
- ejabberd_socket:socket()) ->
- {inet:ip_address(), non_neg_integer()} | undefined.
-peerip(SockMod, Socket) ->
- IP = case SockMod of
- gen_tcp -> inet:peername(Socket);
- _ -> SockMod:peername(Socket)
- end,
- case IP of
- {ok, IPOK} -> IPOK;
- _ -> undefined
- end.
-
-%% fsm_next_state_pack: Pack the StateData structure to improve
-%% sharing.
--spec fsm_next_state_pack(state_name(), state()) -> fsm_transition().
-fsm_next_state_pack(StateName, StateData) ->
- fsm_next_state_gc(StateName, pack(StateData)).
-
--spec fsm_next_state_gc(state_name(), state()) -> fsm_transition().
-%% fsm_next_state_gc: Garbage collect the process heap to make use of
-%% the newly packed StateData structure.
-fsm_next_state_gc(StateName, PackedStateData) ->
- erlang:garbage_collect(),
- fsm_next_state(StateName, PackedStateData).
-
-%% fsm_next_state: Generate the next_state FSM tuple with different
-%% timeout, depending on the future state
--spec fsm_next_state(state_name(), state()) -> fsm_transition().
-fsm_next_state(session_established, #state{mgmt_max_queue = exceeded} =
- StateData) ->
- ?WARNING_MSG("ACK queue too long, terminating session for ~s",
- [jid:to_string(StateData#state.jid)]),
- Err = xmpp:serr_policy_violation(<<"Too many unacked stanzas">>,
- StateData#state.lang),
- send_element(StateData, Err),
- {stop, normal, StateData#state{mgmt_resend = false}};
-fsm_next_state(session_established, #state{mgmt_state = pending} = StateData) ->
- fsm_next_state(wait_for_resume, StateData);
-fsm_next_state(session_established, StateData) ->
- {next_state, session_established, StateData,
- ?C2S_HIBERNATE_TIMEOUT};
-fsm_next_state(wait_for_resume, #state{mgmt_timeout = 0} = StateData) ->
- {stop, normal, StateData};
-fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined,
- sid = SID, jid = JID, ip = IP,
- conn = Conn, auth_module = AuthModule,
- server = Host} = StateData) ->
- case StateData of
- #state{mgmt_ack_timer = undefined} ->
- ok;
- #state{mgmt_ack_timer = Timer} ->
- erlang:cancel_timer(Timer)
- end,
- ?INFO_MSG("Waiting for resumption of stream for ~s",
- [jid:to_string(JID)]),
- Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}],
- NewStateData = ejabberd_hooks:run_fold(c2s_session_pending, Host, StateData,
- [SID, JID, Info]),
- {next_state, wait_for_resume,
- NewStateData#state{mgmt_state = pending,
- mgmt_pending_since = os:timestamp()},
- NewStateData#state.mgmt_timeout};
-fsm_next_state(wait_for_resume, StateData) ->
- Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
- Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
- {next_state, wait_for_resume, StateData, Timeout};
-fsm_next_state(StateName, StateData) ->
- {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
-
-%% fsm_reply: Generate the reply FSM tuple with different timeout,
-%% depending on the future state
--spec fsm_reply(_, state_name(), state()) -> fsm_reply().
-fsm_reply(Reply, session_established, StateData) ->
- {reply, Reply, session_established, StateData,
- ?C2S_HIBERNATE_TIMEOUT};
-fsm_reply(Reply, wait_for_resume, StateData) ->
- Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
- Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
- {reply, Reply, wait_for_resume, StateData, Timeout};
-fsm_reply(Reply, StateName, StateData) ->
- {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
-
-%% Used by c2s blacklist plugins
--spec is_ip_blacklisted(undefined | {inet:ip_address(), non_neg_integer()},
- binary()) -> false | {true, binary(), binary()}.
-is_ip_blacklisted(undefined, _Lang) -> false;
-is_ip_blacklisted({IP, _Port}, Lang) ->
- ejabberd_hooks:run_fold(check_bl_c2s, false, [IP, Lang]).
-
-%% Check from attributes
-%% returns invalid-from|NewElement
--spec check_from(stanza(), jid()) -> 'invalid-from' | stanza().
-check_from(Pkt, FromJID) ->
- JID = xmpp:get_from(Pkt),
- case JID of
- undefined ->
- Pkt;
- #jid{} ->
- if
- (JID#jid.luser == FromJID#jid.luser) and
- (JID#jid.lserver == FromJID#jid.lserver) and
- (JID#jid.lresource == FromJID#jid.lresource) ->
- Pkt;
- (JID#jid.luser == FromJID#jid.luser) and
- (JID#jid.lserver == FromJID#jid.lserver) and
- (JID#jid.lresource == <<"">>) ->
- Pkt;
- true ->
- 'invalid-from'
- end
- end.
-
-fsm_limit_opts(Opts) ->
- case lists:keysearch(max_fsm_queue, 1, Opts) of
- {value, {_, N}} when is_integer(N) -> [{max_queue, N}];
- _ ->
- case ejabberd_config:get_option(
- max_fsm_queue,
- fun(I) when is_integer(I), I > 0 -> I end) of
- undefined -> [];
- N -> [{max_queue, N}]
- end
+ Option = case OptionRaw of
+ setresource -> setresource;
+ closeold ->
+ acceptnew; %% ejabberd_sm will close old session
+ closenew -> closenew;
+ acceptnew -> acceptnew;
+ _ -> acceptnew %% default ejabberd behavior
+ end,
+ case Option of
+ acceptnew -> {accept_resource, R};
+ closenew -> closenew;
+ setresource ->
+ Rnew = new_uniq_id(),
+ {accept_resource, Rnew}
end.
--spec bounce_messages() -> ok.
-bounce_messages() ->
- receive
- {route, From, To, El} ->
- ejabberd_router:route(From, To, El), bounce_messages()
- after 0 -> ok
- end.
+-spec new_uniq_id() -> binary().
+new_uniq_id() ->
+ iolist_to_binary(
+ [randoms:get_string(),
+ integer_to_binary(p1_time_compat:unique_integer([positive]))]).
--spec process_compression_request(compress(), state_name(), state()) -> fsm_next().
-process_compression_request(#compress{methods = []}, StateName, StateData) ->
- send_element(StateData, #compress_failure{reason = 'setup-failed'}),
- fsm_next_state(StateName, StateData);
-process_compression_request(#compress{methods = Ms}, StateName, StateData) ->
- case lists:member(<<"zlib">>, Ms) of
- true ->
- Socket = StateData#state.socket,
- BCompressed = fxml:element_to_binary(xmpp:encode(#compressed{})),
- ZlibSocket = (StateData#state.sockmod):compress(Socket, BCompressed),
- fsm_next_state(wait_for_stream,
- StateData#state{socket = ZlibSocket,
- streamid = new_id()});
- false ->
- send_element(StateData,
- #compress_failure{reason = 'unsupported-method'}),
- fsm_next_state(StateName, StateData)
+-spec get_conn_type(state()) -> c2s | c2s_tls | c2s_compressed | websocket |
+ c2s_compressed_tls | http_bind.
+get_conn_type(State) ->
+ case xmpp_stream_in:get_transport(State) of
+ tcp -> c2s;
+ tls -> c2s_tls;
+ tcp_zlib -> c2s_compressed;
+ tls_zlib -> c2s_compressed_tls;
+ http_bind -> http_bind;
+ websocket -> websocket
end.
-%%%----------------------------------------------------------------------
-%%% XEP-0191
-%%%----------------------------------------------------------------------
-
--spec route_blocking(
- {block, [jid()]} | {unblock, [jid()]} | unblock_all, state()) -> state().
-route_blocking(What, StateData) ->
- SubEl = case What of
- {block, JIDs} ->
- #block{items = JIDs};
- {unblock, JIDs} ->
- #unblock{items = JIDs};
- unblock_all ->
- #unblock{}
- end,
- PrivPushIQ = #iq{type = set, id = <<"push">>, sub_els = [SubEl],
- from = jid:remove_resource(StateData#state.jid),
- to = StateData#state.jid},
- %% No need to replace active privacy list here,
- %% blocking pushes are always accompanied by
- %% Privacy List pushes
- send_stanza(StateData, PrivPushIQ).
-
-%%%----------------------------------------------------------------------
-%%% XEP-0198
-%%%----------------------------------------------------------------------
--spec stream_mgmt_enabled(state()) -> boolean().
-stream_mgmt_enabled(#state{mgmt_state = disabled}) ->
- false;
-stream_mgmt_enabled(_StateData) ->
- true.
-
--spec dispatch_stream_mgmt(xmpp_element(), state()) -> state().
-dispatch_stream_mgmt(El, #state{mgmt_state = MgmtState} = StateData)
- when MgmtState == active;
- MgmtState == pending ->
- perform_stream_mgmt(El, StateData);
-dispatch_stream_mgmt(El, StateData) ->
- negotiate_stream_mgmt(El, StateData).
-
--spec negotiate_stream_mgmt(xmpp_element(), state()) -> state().
-negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
- %% XEP-0198 says: "For client-to-server connections, the client MUST NOT
- %% attempt to enable stream management until after it has completed Resource
- %% Binding unless it is resuming a previous session". However, it also
- %% says: "Stream management errors SHOULD be considered recoverable", so we
- %% won't bail out.
- send_element(StateData, #sm_failed{reason = 'unexpected-request',
- xmlns = ?NS_STREAM_MGMT_3}),
- StateData;
-negotiate_stream_mgmt(Pkt, StateData) ->
- Xmlns = xmpp:get_ns(Pkt),
- case stream_mgmt_enabled(StateData) of
+-spec change_shaper(state()) -> ok.
+change_shaper(#{shaper := ShaperName, ip := IP,
+ user := U, server := S, resource := R} = State) ->
+ #jid{lserver = LServer} = JID = jid:make(U, S, R),
+ Shaper = acl:access_matches(ShaperName,
+ #{usr => jid:split(JID), ip => IP},
+ LServer),
+ xmpp_stream_in:change_shaper(State, Shaper).
+
+-spec do_some_magic(state(), jid()) -> state().
+do_some_magic(#{pres_a := PresA, pres_f := PresF} = State, From) ->
+ LFrom = jid:tolower(From),
+ LBFrom = jid:remove_resource(LFrom),
+ case (?SETS):is_element(LFrom, PresA) orelse
+ (?SETS):is_element(LBFrom, PresA) of
true ->
- case Pkt of
- #sm_enable{} ->
- handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Pkt);
- _ ->
- Res = if is_record(Pkt, sm_a);
- is_record(Pkt, sm_r);
- is_record(Pkt, sm_resume) ->
- #sm_failed{reason = 'unexpected-request',
- xmlns = Xmlns};
- true ->
- #sm_failed{reason = 'bad-request',
- xmlns = Xmlns}
- end,
- send_element(StateData, Res),
- StateData
- end;
+ State;
false ->
- send_element(StateData,
- #sm_failed{reason = 'service-unavailable',
- xmlns = Xmlns}),
- StateData
- end.
-
--spec perform_stream_mgmt(xmpp_element(), state()) -> state().
-perform_stream_mgmt(Pkt, StateData) ->
- case xmpp:get_ns(Pkt) of
- Xmlns when Xmlns == StateData#state.mgmt_xmlns ->
- case Pkt of
- #sm_r{} ->
- handle_r(StateData);
- #sm_a{} ->
- handle_a(StateData, Pkt);
- _ ->
- Res = if is_record(Pkt, sm_enable);
- is_record(Pkt, sm_resume) ->
- #sm_failed{reason = 'unexpected-request',
- xmlns = Xmlns};
- true ->
- #sm_failed{reason = 'bad-request',
- xmlns = Xmlns}
- end,
- send_element(StateData, Res),
- StateData
- end;
- _ ->
- send_element(StateData,
- #sm_failed{reason = 'unsupported-version',
- xmlns = StateData#state.mgmt_xmlns})
- end.
-
--spec handle_enable(state(), sm_enable()) -> state().
-handle_enable(#state{mgmt_timeout = DefaultTimeout,
- mgmt_max_timeout = MaxTimeout} = StateData,
- #sm_enable{resume = Resume, max = Max}) ->
- Timeout = if Resume == false ->
- 0;
- Max /= undefined, Max > 0, Max =< MaxTimeout ->
- Max;
- true ->
- DefaultTimeout
- end,
- Res = if Timeout > 0 ->
- ?INFO_MSG("Stream management with resumption enabled for ~s",
- [jid:to_string(StateData#state.jid)]),
- #sm_enabled{xmlns = StateData#state.mgmt_xmlns,
- id = make_resume_id(StateData),
- resume = true,
- max = Timeout};
- true ->
- ?INFO_MSG("Stream management without resumption enabled for ~s",
- [jid:to_string(StateData#state.jid)]),
- #sm_enabled{xmlns = StateData#state.mgmt_xmlns}
- end,
- send_element(StateData, Res),
- StateData#state{mgmt_state = active,
- mgmt_queue = queue:new(),
- mgmt_timeout = Timeout * 1000}.
-
--spec handle_r(state()) -> state().
-handle_r(StateData) ->
- Res = #sm_a{xmlns = StateData#state.mgmt_xmlns,
- h = StateData#state.mgmt_stanzas_in},
- send_element(StateData, Res),
- StateData.
-
--spec handle_a(state(), sm_a()) -> state().
-handle_a(StateData, #sm_a{h = H}) ->
- NewStateData = check_h_attribute(StateData, H),
- maybe_renew_ack_request(NewStateData).
-
--spec handle_resume(state(), sm_resume()) -> {ok, state()} | error.
-handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) ->
- R = case stream_mgmt_enabled(StateData) of
- true ->
- case inherit_session_state(StateData, PrevID) of
- {ok, InheritedState, Info} ->
- {ok, InheritedState, Info, H};
- {error, Err, InH} ->
- {error, #sm_failed{reason = 'item-not-found',
- h = InH, xmlns = Xmlns}, Err};
- {error, Err} ->
- {error, #sm_failed{reason = 'item-not-found',
- xmlns = Xmlns}, Err}
- end;
- false ->
- {error, #sm_failed{reason = 'service-unavailable',
- xmlns = Xmlns},
- <<"XEP-0198 disabled">>}
- end,
- case R of
- {ok, ResumedState, ResumedInfo, NumHandled} ->
- NewState = check_h_attribute(ResumedState, NumHandled),
- AttrXmlns = NewState#state.mgmt_xmlns,
- AttrId = make_resume_id(NewState),
- AttrH = NewState#state.mgmt_stanzas_in,
- send_element(NewState, #sm_resumed{xmlns = AttrXmlns,
- h = AttrH,
- previd = AttrId}),
- SendFun = fun(_F, _T, El, Time) ->
- NewEl = add_resent_delay_info(NewState, El, Time),
- send_element(NewState, NewEl)
- end,
- handle_unacked_stanzas(NewState, SendFun),
- send_element(NewState, #sm_r{xmlns = AttrXmlns}),
- NewState1 = csi_flush_queue(NewState),
- NewState2 = ejabberd_hooks:run_fold(c2s_session_resumed,
- StateData#state.server,
- NewState1,
- [NewState1#state.sid,
- NewState1#state.jid,
- ResumedInfo]),
- ?INFO_MSG("Resumed session for ~s",
- [jid:to_string(NewState2#state.jid)]),
- {ok, NewState2};
- {error, El, Msg} ->
- send_element(StateData, El),
- ?INFO_MSG("Cannot resume session for ~s@~s: ~s",
- [StateData#state.user, StateData#state.server, Msg]),
- error
- end.
-
--spec check_h_attribute(state(), non_neg_integer()) -> state().
-check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H)
- when H > NumStanzasOut ->
- ?DEBUG("~s acknowledged ~B stanzas, but only ~B were sent",
- [jid:to_string(StateData#state.jid), H, NumStanzasOut]),
- mgmt_queue_drop(StateData#state{mgmt_stanzas_out = H}, NumStanzasOut);
-check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H) ->
- ?DEBUG("~s acknowledged ~B of ~B stanzas",
- [jid:to_string(StateData#state.jid), H, NumStanzasOut]),
- mgmt_queue_drop(StateData, H).
-
--spec update_num_stanzas_in(state(), xmpp_element()) -> state().
-update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El)
- when MgmtState == active;
- MgmtState == pending ->
- NewNum = case {xmpp:is_stanza(El), StateData#state.mgmt_stanzas_in} of
- {true, 4294967295} ->
- 0;
- {true, Num} ->
- Num + 1;
- {false, Num} ->
- Num
- end,
- StateData#state{mgmt_stanzas_in = NewNum};
-update_num_stanzas_in(StateData, _El) ->
- StateData.
-
-mgmt_send_stanza(StateData, Stanza) ->
- case send_element(StateData, Stanza) of
- ok ->
- maybe_request_ack(StateData);
- _ ->
- StateData#state{mgmt_state = pending}
- end.
-
-maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) ->
- request_ack(StateData);
-maybe_request_ack(StateData) ->
- StateData.
-
-request_ack(#state{mgmt_xmlns = Xmlns,
- mgmt_ack_timeout = AckTimeout} = StateData) ->
- AckReq = #sm_r{xmlns = Xmlns},
- case {send_element(StateData, AckReq), AckTimeout} of
- {ok, undefined} ->
- ok;
- {ok, Timeout} ->
- Timer = erlang:send_after(Timeout, self(), close),
- StateData#state{mgmt_ack_timer = Timer,
- mgmt_stanzas_req = StateData#state.mgmt_stanzas_out};
- _ ->
- StateData#state{mgmt_state = pending}
- end.
-
-maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) ->
- StateData;
-maybe_renew_ack_request(#state{mgmt_ack_timer = Timer,
- mgmt_queue = Queue,
- mgmt_stanzas_out = NumStanzasOut,
- mgmt_stanzas_req = NumStanzasReq} = StateData) ->
- erlang:cancel_timer(Timer),
- case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of
- true ->
- request_ack(StateData#state{mgmt_ack_timer = undefined});
- false ->
- StateData#state{mgmt_ack_timer = undefined}
- end.
-
--spec mgmt_queue_add(state(), xmpp_element()) -> state().
-mgmt_queue_add(StateData, El) ->
- NewNum = case StateData#state.mgmt_stanzas_out of
- 4294967295 ->
- 0;
- Num ->
- Num + 1
- end,
- NewQueue = queue:in({NewNum, p1_time_compat:timestamp(), El}, StateData#state.mgmt_queue),
- NewState = StateData#state{mgmt_queue = NewQueue,
- mgmt_stanzas_out = NewNum},
- check_queue_length(NewState).
-
--spec mgmt_queue_drop(state(), non_neg_integer()) -> state().
-mgmt_queue_drop(StateData, NumHandled) ->
- NewQueue = jlib:queue_drop_while(fun({N, _T, _E}) -> N =< NumHandled end,
- StateData#state.mgmt_queue),
- StateData#state{mgmt_queue = NewQueue}.
-
--spec check_queue_length(state()) -> state().
-check_queue_length(#state{mgmt_max_queue = Limit} = StateData)
- when Limit == infinity;
- Limit == exceeded ->
- StateData;
-check_queue_length(#state{mgmt_queue = Queue,
- mgmt_max_queue = Limit} = StateData) ->
- case queue:len(Queue) > Limit of
- true ->
- StateData#state{mgmt_max_queue = exceeded};
- false ->
- StateData
- end.
-
--spec handle_unacked_stanzas(state(), fun((_, _, _, _) -> _)) -> ok.
-handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData, F)
- when MgmtState == active;
- MgmtState == pending;
- MgmtState == timeout ->
- Queue = StateData#state.mgmt_queue,
- case queue:len(Queue) of
- 0 ->
- ok;
- N ->
- ?DEBUG("~B stanza(s) were not acknowledged by ~s",
- [N, jid:to_string(StateData#state.jid)]),
- lists:foreach(
- fun({_, Time, Pkt}) ->
- From = xmpp:get_from(Pkt),
- To = xmpp:get_to(Pkt),
- case {From, To} of
- {#jid{}, #jid{}} ->
- F(From, To, Pkt, Time);
- {_, _} ->
- ?DEBUG("Dropping stanza due to invalid JID(s)", [])
- end
- end, queue:to_list(Queue))
- end;
-handle_unacked_stanzas(_StateData, _F) ->
- ok.
-
--spec handle_unacked_stanzas(state()) -> ok.
-handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData)
- when MgmtState == active;
- MgmtState == pending;
- MgmtState == timeout ->
- ResendOnTimeout =
- case StateData#state.mgmt_resend of
- Resend when is_boolean(Resend) ->
- Resend;
- if_offline ->
- Resource = StateData#state.resource,
- case ejabberd_sm:get_user_resources(StateData#state.user,
- StateData#state.server) of
- [Resource] -> % Same resource opened new session
- true;
- [] ->
- true;
- _ ->
- false
- end
- end,
- Lang = StateData#state.lang,
- ReRoute = case ResendOnTimeout of
+ case (?SETS):is_element(LFrom, PresF) of
true ->
- fun(From, To, El, Time) ->
- NewEl = add_resent_delay_info(StateData, El, Time),
- ejabberd_router:route(From, To, NewEl)
- end;
+ A = (?SETS):add_element(LFrom, PresA),
+ State#{pres_a => A};
false ->
- fun(From, To, El, _Time) ->
- Txt = <<"User session terminated">>,
- ejabberd_router:route_error(
- To, From, El, xmpp:err_service_unavailable(Txt, Lang))
+ case (?SETS):is_element(LBFrom, PresF) of
+ true ->
+ A = (?SETS):add_element(LBFrom, PresA),
+ State#{pres_a => A};
+ false ->
+ State
end
- end,
- F = fun(From, _To, #presence{}, _Time) ->
- ?DEBUG("Dropping presence stanza from ~s",
- [jid:to_string(From)]);
- (From, To, #iq{} = El, _Time) ->
- Txt = <<"User session terminated">>,
- ejabberd_router:route_error(
- To, From, El, xmpp:err_service_unavailable(Txt, Lang));
- (From, _To, #message{meta = #{carbon_copy := true}}, _Time) ->
- %% XEP-0280 says: "When a receiving server attempts to deliver a
- %% forked message, and that message bounces with an error for
- %% any reason, the receiving server MUST NOT forward that error
- %% back to the original sender." Resending such a stanza could
- %% easily lead to unexpected results as well.
- ?DEBUG("Dropping forwarded message stanza from ~s",
- [jid:to_string(From)]);
- (From, To, El, Time) ->
- case ejabberd_hooks:run_fold(message_is_archived,
- StateData#state.server, false,
- [StateData, From,
- StateData#state.jid, El]) of
- true ->
- ?DEBUG("Dropping archived message stanza from ~p",
- [jid:to_string(xmpp:get_from(El))]);
- false ->
- ReRoute(From, To, El, Time)
- end
- end,
- handle_unacked_stanzas(StateData, F);
-handle_unacked_stanzas(_StateData) ->
- ok.
-
--spec inherit_session_state(state(), binary()) -> {ok, state()} |
- {error, binary()} |
- {error, binary(), non_neg_integer()}.
-inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
- case jlib:base64_to_term(ResumeID) of
- {term, {R, Time}} ->
- case ejabberd_sm:get_session_pid(U, S, R) of
- none ->
- case ejabberd_sm:get_offline_info(Time, U, S, R) of
- none ->
- {error, <<"Previous session PID not found">>};
- Info ->
- case proplists:get_value(num_stanzas_in, Info) of
- undefined ->
- {error, <<"Previous session timed out">>};
- H ->
- {error, <<"Previous session timed out">>, H}
- end
- end;
- OldPID ->
- OldSID = {Time, OldPID},
- case catch resume_session(OldSID) of
- {resume, OldStateData} ->
- NewSID = {Time, self()}, % Old time, new PID
- Priority = case OldStateData#state.pres_last of
- undefined ->
- 0;
- Presence ->
- get_priority_from_presence(Presence)
- end,
- Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
- ejabberd_sm:open_session(NewSID, U, S, R,
- Priority, Info),
- {ok, StateData#state{conn = Conn,
- sid = NewSID,
- jid = OldStateData#state.jid,
- resource = OldStateData#state.resource,
- pres_t = OldStateData#state.pres_t,
- pres_f = OldStateData#state.pres_f,
- pres_a = OldStateData#state.pres_a,
- pres_last = OldStateData#state.pres_last,
- pres_timestamp = OldStateData#state.pres_timestamp,
- privacy_list = OldStateData#state.privacy_list,
- aux_fields = OldStateData#state.aux_fields,
- mgmt_xmlns = OldStateData#state.mgmt_xmlns,
- mgmt_queue = OldStateData#state.mgmt_queue,
- mgmt_timeout = OldStateData#state.mgmt_timeout,
- mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in,
- mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out,
- mgmt_state = active,
- csi_state = active}, Info};
- {error, Msg} ->
- {error, Msg};
- _ ->
- {error, <<"Cannot grab session state">>}
- end
- end;
- _ ->
- {error, <<"Invalid 'previd' value">>}
- end.
-
--spec resume_session({integer(), pid()}) -> any().
-resume_session({Time, PID}) ->
- (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 15000).
-
--spec make_resume_id(state()) -> binary().
-make_resume_id(StateData) ->
- {Time, _} = StateData#state.sid,
- jlib:term_to_base64({StateData#state.resource, Time}).
-
--spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza().
-add_resent_delay_info(_State, #iq{} = El, _Time) ->
- El;
-add_resent_delay_info(#state{server = From}, El, Time) ->
- xmpp_util:add_delay_info(El, jid:make(From), Time, <<"Resent">>).
-
-%%%----------------------------------------------------------------------
-%%% XEP-0352
-%%%----------------------------------------------------------------------
--spec csi_filter_stanza(state(), stanza()) -> state().
-csi_filter_stanza(#state{csi_state = CsiState, jid = JID, server = Server} =
- StateData, Stanza) ->
- {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server,
- {StateData, [Stanza]},
- [Server, JID, Stanza]),
- StateData2 = lists:foldl(fun(CurStanza, AccState) ->
- send_stanza(AccState, CurStanza)
- end, StateData1#state{csi_state = active},
- Stanzas),
- StateData2#state{csi_state = CsiState}.
-
--spec csi_flush_queue(state()) -> state().
-csi_flush_queue(#state{csi_state = CsiState, jid = JID, server = Server} =
- StateData) ->
- {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server,
- {StateData, []},
- [Server, JID]),
- StateData2 = lists:foldl(fun(CurStanza, AccState) ->
- send_stanza(AccState, CurStanza)
- end, StateData1#state{csi_state = active},
- Stanzas),
- StateData2#state{csi_state = CsiState}.
-
-%%%----------------------------------------------------------------------
-%%% JID Set memory footprint reduction code
-%%%----------------------------------------------------------------------
-
-%% Try to reduce the heap footprint of the four presence sets
-%% by ensuring that we re-use strings and Jids wherever possible.
--spec pack(state()) -> state().
-pack(S = #state{pres_a = A, pres_f = F,
- pres_t = T}) ->
- {NewA, Pack2} = pack_jid_set(A, gb_trees:empty()),
- {NewF, Pack3} = pack_jid_set(F, Pack2),
- {NewT, _Pack4} = pack_jid_set(T, Pack3),
- S#state{pres_a = NewA, pres_f = NewF,
- pres_t = NewT}.
-
-pack_jid_set(Set, Pack) ->
- Jids = (?SETS):to_list(Set),
- {PackedJids, NewPack} = pack_jids(Jids, Pack, []),
- {(?SETS):from_list(PackedJids), NewPack}.
-
-pack_jids([], Pack, Acc) -> {Acc, Pack};
-pack_jids([{U, S, R} = Jid | Jids], Pack, Acc) ->
- case gb_trees:lookup(Jid, Pack) of
- {value, PackedJid} ->
- pack_jids(Jids, Pack, [PackedJid | Acc]);
- none ->
- {NewU, Pack1} = pack_string(U, Pack),
- {NewS, Pack2} = pack_string(S, Pack1),
- {NewR, Pack3} = pack_string(R, Pack2),
- NewJid = {NewU, NewS, NewR},
- NewPack = gb_trees:insert(NewJid, NewJid, Pack3),
- pack_jids(Jids, NewPack, [NewJid | Acc])
- end.
-
-pack_string(String, Pack) ->
- case gb_trees:lookup(String, Pack) of
- {value, PackedString} -> {PackedString, Pack};
- none -> {String, gb_trees:insert(String, String, Pack)}
+ end
end.
-transform_listen_option(Opt, Opts) ->
- [Opt|Opts].
-
--spec identity([{atom(), binary()}]) -> binary().
-identity(Props) ->
- case proplists:get_value(authzid, Props, <<>>) of
- <<>> -> proplists:get_value(username, Props, <<>>);
- AuthzId -> AuthzId
+-spec fsm_limit_opts([proplists:property()]) -> [proplists:property()].
+fsm_limit_opts(Opts) ->
+ case lists:keysearch(max_fsm_queue, 1, Opts) of
+ {value, {_, N}} when is_integer(N) -> [{max_queue, N}];
+ _ ->
+ case ejabberd_config:get_option(
+ max_fsm_queue,
+ fun(I) when is_integer(I), I > 0 -> I end) of
+ undefined -> [];
+ N -> [{max_queue, N}]
+ end
end.
-
-opt_type(domain_certfile) -> fun iolist_to_binary/1;
-opt_type(max_fsm_queue) ->
- fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(resource_conflict) ->
- fun (setresource) -> setresource;
- (closeold) -> closeold;
- (closenew) -> closenew;
- (acceptnew) -> acceptnew
- end;
-opt_type(_) ->
- [domain_certfile, max_fsm_queue, resource_conflict].