From 5cc8e807df6994fa6b0e860bbcfe0af8fa7fe19f Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Sun, 11 Dec 2016 15:03:37 +0300 Subject: Initial version of new XMPP stream behaviour (for review) --- src/ejabberd_c2s.erl | 3257 +++++++++----------------------------------------- 1 file changed, 551 insertions(+), 2706 deletions(-) (limited to 'src/ejabberd_c2s.erl') diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 6d84d8d93..1568d5db6 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -1,8 +1,5 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_c2s.erl -%%% Author : Alexey Shchepin -%%% Purpose : Serve C2S connection -%%% Created : 16 Nov 2002 by Alexey Shchepin +%%%------------------------------------------------------------------- +%%% Created : 8 Dec 2016 by Evgeny Khramtsov %%% %%% %%% ejabberd, Copyright (C) 2002-2016 ProcessOne @@ -21,1998 +18,534 @@ %%% with this program; if not, write to the Free Software Foundation, Inc., %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% -%%%---------------------------------------------------------------------- - +%%%------------------------------------------------------------------- -module(ejabberd_c2s). - --behaviour(ejabberd_config). - --author('alexey@process-one.net'). - --protocol({xep, 78, '2.5'}). --protocol({xep, 138, '2.0'}). --protocol({xep, 198, '1.3'}). --protocol({xep, 356, '7.1'}). - --update_info({update, 0}). - --define(GEN_FSM, p1_fsm). - --behaviour(?GEN_FSM). - -%% External exports --export([start/2, - stop/1, - start_link/2, - close/1, - send_text/2, - send_element/2, - socket_type/0, - get_presence/1, - get_last_presence/1, - get_aux_field/2, - set_aux_field/3, - del_aux_field/2, - get_subscription/2, - get_queued_stanzas/1, - get_csi_state/1, - set_csi_state/2, - get_resume_timeout/1, - set_resume_timeout/2, - send_filtered/5, - broadcast/4, - get_subscribed/1, - transform_listen_option/2]). - --export([init/1, wait_for_stream/2, wait_for_auth/2, - wait_for_feature_request/2, wait_for_bind/2, - wait_for_sasl_response/2, - wait_for_resume/2, session_established/2, - handle_event/3, handle_sync_event/4, code_change/4, - handle_info/3, terminate/3, print_state/1, opt_type/1]). +-behaviour(xmpp_stream_in). + +-protocol({rfc, 6121}). + +%% ejabberd_socket callbacks +-export([start/2, socket_type/0]). +%% xmpp_stream_in callbacks +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). +-export([tls_options/1, tls_required/1, sasl_mechanisms/1, init_sasl/1, bind/2, + unauthenticated_stream_features/1, authenticated_stream_features/1, + handle_stream_start/1, handle_stream_end/1, handle_stream_close/1, + handle_unauthenticated_packet/2, handle_authenticated_packet/2, + handle_auth_success/4, handle_auth_failure/4, + handle_unbinded_packet/2]). +%% API +-export([get_presence/1, get_subscription/2, get_subscribed/1, + send/2, close/1]). -include("ejabberd.hrl"). --include("logger.hrl"). - -include("xmpp.hrl"). -%%-include("legacy.hrl"). - --include("mod_privacy.hrl"). +-include("logger.hrl"). -define(SETS, gb_sets). --define(DICT, dict). - -%% pres_a contains all the presence available send (either through roster mechanism or directed). -%% Directed presence unavailable remove user from pres_a. --record(state, {socket, - sockmod, - socket_monitor, - xml_socket, - streamid, - sasl_state, - access, - shaper, - zlib = false, - tls = false, - tls_required = false, - tls_enabled = false, - tls_options = [], - authenticated = false, - jid, - user = <<"">>, server = <<"">>, resource = <<"">>, - sid, - pres_t = ?SETS:new(), - pres_f = ?SETS:new(), - pres_a = ?SETS:new(), - pres_last, - pres_timestamp, - privacy_list = #userlist{}, - conn = unknown, - auth_module = unknown, - ip, - aux_fields = [], - csi_state = active, - mgmt_state, - mgmt_xmlns, - mgmt_queue, - mgmt_max_queue, - mgmt_pending_since, - mgmt_timeout, - mgmt_max_timeout, - mgmt_ack_timeout, - mgmt_ack_timer, - mgmt_resend, - mgmt_stanzas_in = 0, - mgmt_stanzas_out = 0, - mgmt_stanzas_req = 0, - ask_offline = true, - lang = <<"">>}). - --type state_name() :: wait_for_stream | wait_for_auth | - wait_for_feature_request | wait_for_bind | - wait_for_sasl_response | wait_for_resume | - session_established. --type state() :: #state{}. --type fsm_stop() :: {stop, normal, state()}. --type fsm_next() :: {next_state, state_name(), state(), non_neg_integer()}. --type fsm_reply() :: {reply, any(), state_name(), state(), non_neg_integer()}. --type fsm_transition() :: fsm_stop() | fsm_next(). --export_type([state/0]). - -%-define(DBGFSM, true). +%%-define(DBGFSM, true). -ifdef(DBGFSM). - -define(FSMOPTS, [{debug, [trace]}]). - -else. - -define(FSMOPTS, []). - -endif. -%% This is the timeout to apply between event when starting a new -%% session: --define(C2S_OPEN_TIMEOUT, 60000). - --define(C2S_HIBERNATE_TIMEOUT, ejabberd_config:get_option(c2s_hibernate, fun(X) when is_integer(X); X == hibernate-> X end, 90000)). - --define(STREAM_HEADER, - <<"">>). - --define(STREAM_TRAILER, <<"">>). +-type state() :: map(). +-type next_state() :: {noreply, state()} | {stop, term(), state()}. +-export_type([state/0, next_state/0]). -%% XEP-0198: - --define(IS_STREAM_MGMT_PACKET(Pkt), - is_record(Pkt, sm_enable) or - is_record(Pkt, sm_resume) or - is_record(Pkt, sm_a) or - is_record(Pkt, sm_r)). - -%%%---------------------------------------------------------------------- -%%% API -%%%---------------------------------------------------------------------- +%%%=================================================================== +%%% ejabberd_socket API +%%%=================================================================== start(SockData, Opts) -> - ?GEN_FSM:start(ejabberd_c2s, - [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS). - -start_link(SockData, Opts) -> - (?GEN_FSM):start_link(ejabberd_c2s, - [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS). - -socket_type() -> xml_stream. + xmpp_stream_in:start(?MODULE, [SockData, Opts], + fsm_limit_opts(Opts) ++ ?FSMOPTS). -%% Return Username, Resource and presence information -get_presence(FsmRef) -> - (?GEN_FSM):sync_send_all_state_event(FsmRef, - {get_presence}, 1000). -get_last_presence(FsmRef) -> - (?GEN_FSM):sync_send_all_state_event(FsmRef, - {get_last_presence}, 1000). +socket_type() -> + xml_stream. --spec get_aux_field(any(), state()) -> {ok, any()} | error. -get_aux_field(Key, #state{aux_fields = Opts}) -> - case lists:keyfind(Key, 1, Opts) of - {_, Val} -> {ok, Val}; - false -> error - end. - --spec set_aux_field(any(), any(), state()) -> state(). -set_aux_field(Key, Val, - #state{aux_fields = Opts} = State) -> - Opts1 = lists:keydelete(Key, 1, Opts), - State#state{aux_fields = [{Key, Val} | Opts1]}. - --spec del_aux_field(any(), state()) -> state(). -del_aux_field(Key, #state{aux_fields = Opts} = State) -> - Opts1 = lists:keydelete(Key, 1, Opts), - State#state{aux_fields = Opts1}. +-spec get_presence(pid()) -> presence(). +get_presence(Ref) -> + xmpp_stream_in:call(Ref, get_presence, 1000). -spec get_subscription(jid() | ljid(), state()) -> both | from | to | none. -get_subscription(From = #jid{}, StateData) -> - get_subscription(jid:tolower(From), StateData); -get_subscription(LFrom, StateData) -> - LBFrom = setelement(3, LFrom, <<"">>), - F = (?SETS):is_element(LFrom, StateData#state.pres_f) - orelse - (?SETS):is_element(LBFrom, StateData#state.pres_f), - T = (?SETS):is_element(LFrom, StateData#state.pres_t) - orelse - (?SETS):is_element(LBFrom, StateData#state.pres_t), +get_subscription(#jid{} = From, State) -> + get_subscription(jid:tolower(From), State); +get_subscription(LFrom, #{pres_f := PresF, pres_t := PresT}) -> + LBFrom = jid:remove_resource(LFrom), + F = ?SETS:is_element(LFrom, PresF) orelse ?SETS:is_element(LBFrom, PresF), + T = ?SETS:is_element(LFrom, PresT) orelse ?SETS:is_element(LBFrom, PresT), if F and T -> both; F -> from; T -> to; true -> none end. -get_queued_stanzas(#state{mgmt_queue = Queue} = StateData) -> - lists:map(fun({_N, Time, El}) -> - add_resent_delay_info(StateData, El, Time) - end, queue:to_list(Queue)). - -get_csi_state(#state{csi_state = CsiState}) -> - CsiState. - -set_csi_state(#state{} = StateData, CsiState) -> - StateData#state{csi_state = CsiState}; -set_csi_state(FsmRef, CsiState) -> - FsmRef ! {set_csi_state, CsiState}. - -get_resume_timeout(#state{mgmt_timeout = Timeout}) -> - Timeout. - -set_resume_timeout(#state{} = StateData, Timeout) -> - StateData#state{mgmt_timeout = Timeout}; -set_resume_timeout(FsmRef, Timeout) -> - FsmRef ! {set_resume_timeout, Timeout}. - --spec send_filtered(pid(), binary(), jid(), jid(), stanza()) -> any(). -send_filtered(FsmRef, Feature, From, To, Packet) -> - FsmRef ! {send_filtered, Feature, From, To, Packet}. - --spec broadcast(pid(), any(), jid(), stanza()) -> any(). -broadcast(FsmRef, Type, From, Packet) -> - FsmRef ! {broadcast, Type, From, Packet}. - --spec stop(pid()) -> any(). -stop(FsmRef) -> (?GEN_FSM):send_event(FsmRef, stop). - --spec close(pid()) -> any(). -%% What is the difference between stop and close??? -close(FsmRef) -> (?GEN_FSM):send_event(FsmRef, closed). - -%%%---------------------------------------------------------------------- -%%% Callback functions from gen_fsm -%%%---------------------------------------------------------------------- - -init([{SockMod, Socket}, Opts]) -> - Access = gen_mod:get_opt(access, Opts, - fun acl:access_rules_validator/1, all), - Shaper = gen_mod:get_opt(shaper, Opts, - fun acl:shaper_rules_validator/1, none), - XMLSocket = case lists:keysearch(xml_socket, 1, Opts) of - {value, {_, XS}} -> XS; - _ -> false - end, - Zlib = proplists:get_bool(zlib, Opts), - StartTLS = proplists:get_bool(starttls, Opts), - StartTLSRequired = proplists:get_bool(starttls_required, Opts), - TLSEnabled = proplists:get_bool(tls, Opts), - TLS = StartTLS orelse - StartTLSRequired orelse TLSEnabled, - TLSOpts1 = lists:filter(fun ({certfile, _}) -> true; - ({ciphers, _}) -> true; - ({dhfile, _}) -> true; - (_) -> false - end, - Opts), - TLSOpts2 = case lists:keysearch(protocol_options, 1, Opts) of - {value, {_, O}} -> - [_|ProtocolOptions] = lists:foldl( - fun(X, Acc) -> X ++ Acc end, [], - [["|" | binary_to_list(Opt)] || Opt <- O, is_binary(Opt)] - ), - [{protocol_options, iolist_to_binary(ProtocolOptions)} | TLSOpts1]; - _ -> TLSOpts1 - end, - TLSOpts3 = case proplists:get_bool(tls_compression, Opts) of - false -> [compression_none | TLSOpts2]; - true -> TLSOpts2 - end, - TLSOpts = [verify_none | TLSOpts3], - StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), - StreamMgmtState = if StreamMgmtEnabled -> inactive; - true -> disabled - end, - MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of - Limit when is_integer(Limit), Limit > 0 -> Limit; - infinity -> infinity; - _ -> 1000 - end, - ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of - RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo; - _ -> 300 - end, - MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of - Max when is_integer(Max), Max >= ResumeTimeout -> Max; - _ -> ResumeTimeout - end, - AckTimeout = case proplists:get_value(ack_timeout, Opts) of - ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000; - infinity -> undefined; - _ -> 60000 - end, - ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of - Resend when is_boolean(Resend) -> Resend; - if_offline -> if_offline; - _ -> false - end, - IP = peerip(SockMod, Socket), - Socket1 = if TLSEnabled andalso - SockMod /= ejabberd_frontend_socket -> - SockMod:starttls(Socket, TLSOpts); - true -> Socket - end, - SocketMonitor = SockMod:monitor(Socket1), - StateData = #state{socket = Socket1, sockmod = SockMod, - socket_monitor = SocketMonitor, - xml_socket = XMLSocket, zlib = Zlib, tls = TLS, - tls_required = StartTLSRequired, - tls_enabled = TLSEnabled, tls_options = TLSOpts, - sid = ejabberd_sm:make_sid(), streamid = new_id(), - access = Access, shaper = Shaper, ip = IP, - mgmt_state = StreamMgmtState, - mgmt_max_queue = MaxAckQueue, - mgmt_timeout = ResumeTimeout, - mgmt_max_timeout = MaxResumeTimeout, - mgmt_ack_timeout = AckTimeout, - mgmt_resend = ResendOnTimeout}, - {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}. - -spec get_subscribed(pid()) -> [ljid()]. -%% Return list of all available resources of contacts, -get_subscribed(FsmRef) -> - (?GEN_FSM):sync_send_all_state_event(FsmRef, - get_subscribed, 1000). +%% Return list of all available resources of contacts +get_subscribed(Ref) -> + xmpp_stream_in:call(Ref, get_subscribed, 1000). + +-spec close(pid()) -> ok. +close(Ref) -> + xmpp_stream_in:cast(Ref, closed). + +-spec send(state(), xmpp_element()) -> next_state(). +send(State, Pkt) -> + xmpp_stream_in:send(State, Pkt). + +%%%=================================================================== +%%% xmpp_stream_in callbacks +%%%=================================================================== +tls_options(#{server := Server, tls_options := TLSOpts}) -> + LServer = jid:nameprep(Server), + case ejabberd_config:get_option({domain_certfile, LServer}, + fun iolist_to_binary/1) of + undefined -> + TLSOpts; + CertFile -> + lists:keystore(certfile, 1, TLSOpts, {certfile, CertFile}) + end. -wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> - try xmpp:decode(#xmlel{name = Name, attrs = Attrs}) of - #stream_start{xmlns = NS_CLIENT, stream_xmlns = NS_STREAM, - version = Version, lang = Lang} - when NS_CLIENT /= ?NS_CLIENT; NS_STREAM /= ?NS_STREAM -> - send_header(StateData, ?MYNAME, Version, Lang), - send_element(StateData, xmpp:serr_invalid_namespace()), - {stop, normal, StateData}; - #stream_start{lang = Lang, version = Version} when byte_size(Lang) > 35 -> - %% As stated in BCP47, 4.4.1: - %% Protocols or specifications that specify limited buffer sizes for - %% language tags MUST allow for language tags of at least 35 characters. - %% Do not store long language tag to avoid possible DoS/flood attacks - send_header(StateData, ?MYNAME, Version, ?MYLANG), - Txt = <<"Too long value of 'xml:lang' attribute">>, - send_element(StateData, - xmpp:serr_policy_violation(Txt, ?MYLANG)), - {stop, normal, StateData}; - #stream_start{to = undefined, lang = Lang, version = Version} -> - Txt = <<"Missing 'to' attribute">>, - send_header(StateData, ?MYNAME, Version, Lang), - send_element(StateData, - xmpp:serr_improper_addressing(Txt, Lang)), - {stop, normal, StateData}; - #stream_start{to = #jid{lserver = To}, lang = Lang, - version = Version} -> - Server = case StateData#state.server of - <<"">> -> To; - S -> S - end, - StreamVersion = case Version of - {1,0} -> {1,0}; - _ -> undefined - end, - IsBlacklistedIP = is_ip_blacklisted(StateData#state.ip, Lang), - case lists:member(Server, ?MYHOSTS) of - true when IsBlacklistedIP == false -> - change_shaper(StateData, jid:make(<<"">>, Server, <<"">>)), - case StreamVersion of - {1,0} -> - send_header(StateData, Server, {1,0}, ?MYLANG), - case StateData#state.authenticated of - false -> - TLS = StateData#state.tls, - TLSEnabled = StateData#state.tls_enabled, - TLSRequired = StateData#state.tls_required, - SASLState = cyrsasl:server_new( - <<"jabber">>, Server, <<"">>, [], - fun (U) -> - ejabberd_auth:get_password_with_authmodule( - U, Server) - end, - fun(U, AuthzId, P) -> - ejabberd_auth:check_password_with_authmodule( - U, AuthzId, Server, P) - end, - fun(U, AuthzId, P, D, DG) -> - ejabberd_auth:check_password_with_authmodule( - U, AuthzId, Server, P, D, DG) - end), - Mechs = - case TLSEnabled or not TLSRequired of - true -> - [#sasl_mechanisms{list = cyrsasl:listmech(Server)}]; - false -> - [] - end, - SockMod = - (StateData#state.sockmod):get_sockmod(StateData#state.socket), - Zlib = StateData#state.zlib, - CompressFeature = case Zlib andalso - ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of - true -> - [#compression{methods = [<<"zlib">>]}]; - _ -> - [] - end, - TLSFeature = - case (TLS == true) andalso - (TLSEnabled == false) andalso - (SockMod == gen_tcp) of - true -> - [#starttls{required = TLSRequired}]; - false -> - [] - end, - StreamFeatures1 = TLSFeature ++ CompressFeature ++ Mechs, - StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features, - Server, StreamFeatures1, [Server]), - send_element(StateData, - #stream_features{sub_els = StreamFeatures}), - fsm_next_state(wait_for_feature_request, - StateData#state{server = Server, - sasl_state = SASLState, - lang = Lang}); - _ -> - case StateData#state.resource of - <<"">> -> - RosterVersioningFeature = - ejabberd_hooks:run_fold(roster_get_versioning_feature, - Server, [], - [Server]), - StreamManagementFeature = - case stream_mgmt_enabled(StateData) of - true -> - [#feature_sm{xmlns = ?NS_STREAM_MGMT_2}, - #feature_sm{xmlns = ?NS_STREAM_MGMT_3}]; - false -> - [] - end, - SockMod = - (StateData#state.sockmod):get_sockmod( - StateData#state.socket), - Zlib = StateData#state.zlib, - CompressFeature = - case Zlib andalso - ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of - true -> - [#compression{methods = [<<"zlib">>]}]; - _ -> - [] - end, - StreamFeatures1 = - [#bind{}, #xmpp_session{optional = true}] - ++ - RosterVersioningFeature ++ - StreamManagementFeature ++ - CompressFeature ++ - ejabberd_hooks:run_fold(c2s_post_auth_features, - Server, [], [Server]), - StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features, - Server, StreamFeatures1, [Server]), - send_element(StateData, - #stream_features{sub_els = StreamFeatures}), - fsm_next_state(wait_for_bind, - StateData#state{server = Server, lang = Lang}); - _ -> - send_element(StateData, #stream_features{}), - fsm_next_state(session_established, - StateData#state{server = Server, lang = Lang}) - end - end; - _ -> - send_header(StateData, Server, StreamVersion, ?MYLANG), - if not StateData#state.tls_enabled and - StateData#state.tls_required -> - send_element( - StateData, - xmpp:serr_policy_violation( - <<"Use of STARTTLS required">>, Lang)), - {stop, normal, StateData}; - true -> - fsm_next_state(wait_for_auth, - StateData#state{server = Server, - lang = Lang}) - end - end; - true -> - IP = StateData#state.ip, - {true, LogReason, ReasonT} = IsBlacklistedIP, - ?INFO_MSG("Connection attempt from blacklisted IP ~s: ~s", - [jlib:ip_to_list(IP), LogReason]), - send_header(StateData, Server, StreamVersion, ?MYLANG), - send_element(StateData, xmpp:serr_policy_violation(ReasonT, Lang)), - {stop, normal, StateData}; - _ -> - send_header(StateData, ?MYNAME, StreamVersion, ?MYLANG), - send_element(StateData, xmpp:serr_host_unknown()), - {stop, normal, StateData} - end; - _ -> - send_header(StateData, ?MYNAME, {1,0}, ?MYLANG), - send_element(StateData, xmpp:serr_invalid_xml()), - {stop, normal, StateData} - catch _:{xmpp_codec, Why} -> - Txt = xmpp:format_error(Why), - send_header(StateData, ?MYNAME, {1,0}, ?MYLANG), - send_element(StateData, xmpp:serr_invalid_xml(Txt, ?MYLANG)), - {stop, normal, StateData} - end; -wait_for_stream(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_stream({xmlstreamelement, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_stream({xmlstreamend, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_stream({xmlstreamerror, _}, StateData) -> - send_header(StateData, ?MYNAME, {1,0}, <<"">>), - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_stream(closed, StateData) -> - {stop, normal, StateData}; -wait_for_stream(stop, StateData) -> - {stop, normal, StateData}. +tls_required(#{tls_required := TLSRequired}) -> + TLSRequired. -wait_for_auth({xmlstreamelement, #xmlel{} = El}, StateData) -> - decode_element(El, wait_for_auth, StateData); -wait_for_auth(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_auth, dispatch_stream_mgmt(Pkt, StateData)); -wait_for_auth(#iq{type = get, sub_els = [#legacy_auth{}]} = IQ, StateData) -> - Auth = #legacy_auth{username = <<>>, password = <<>>, resource = <<>>}, - Res = case ejabberd_auth:plain_password_required(StateData#state.server) of - false -> - xmpp:make_iq_result(IQ, Auth#legacy_auth{digest = <<>>}); - true -> - xmpp:make_iq_result(IQ, Auth) - end, - send_element(StateData, Res), - fsm_next_state(wait_for_auth, StateData); -wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{resource = <<"">>}]} = IQ, - StateData) -> - Lang = StateData#state.lang, - Txt = <<"No resource provided">>, - Err = xmpp:make_error(IQ, xmpp:err_not_acceptable(Txt, Lang)), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData); -wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{username = U, - password = P0, - digest = D0, - resource = R}]} = IQ, - StateData) when is_binary(U), is_binary(R) -> - JID = jid:make(U, StateData#state.server, R), - case (JID /= error) andalso - acl:access_matches(StateData#state.access, - #{usr => jid:split(JID), ip => StateData#state.ip}, - StateData#state.server) == allow of - true -> - DGen = fun (PW) -> - p1_sha:sha(<<(StateData#state.streamid)/binary, PW/binary>>) - end, - P = if is_binary(P0) -> P0; true -> <<>> end, - D = if is_binary(D0) -> D0; true -> <<>> end, - case ejabberd_auth:check_password_with_authmodule( - U, U, StateData#state.server, P, D, DGen) of - {true, AuthModule} -> - ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p from ~s", - [StateData#state.socket, - jid:to_string(JID), AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, AuthModule}], - Res = xmpp:make_iq_result(IQ), - send_element(StateData, Res), - ejabberd_sm:open_session(StateData#state.sid, U, - StateData#state.server, R, - Info), - change_shaper(StateData, JID), - {Fs, Ts} = ejabberd_hooks:run_fold( - roster_get_subscription_lists, - StateData#state.server, - {[], []}, - [U, StateData#state.server]), - LJID = jid:tolower(jid:remove_resource(JID)), - Fs1 = [LJID | Fs], - Ts1 = [LJID | Ts], - PrivList = ejabberd_hooks:run_fold(privacy_get_user_list, - StateData#state.server, - #userlist{}, - [U, StateData#state.server]), - NewStateData = StateData#state{ - user = U, - resource = R, - jid = JID, - conn = Conn, - auth_module = AuthModule, - pres_f = (?SETS):from_list(Fs1), - pres_t = (?SETS):from_list(Ts1), - privacy_list = PrivList}, - fsm_next_state(session_established, NewStateData); - _ -> - ?INFO_MSG("(~w) Failed legacy authentication for ~s from ~s", - [StateData#state.socket, - jid:to_string(JID), - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, U, StateData#state.server, - StateData#state.ip]), - Lang = StateData#state.lang, - Txt = <<"Legacy authentication failed">>, - Err = xmpp:make_error(IQ, xmpp:err_not_authorized(Txt, Lang)), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData) - end; - false when JID == error -> - ?INFO_MSG("(~w) Forbidden legacy authentication " - "for username '~s' with resource '~s'", - [StateData#state.socket, U, R]), - Err = xmpp:make_error(IQ, xmpp:err_jid_malformed()), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData); - false -> - ?INFO_MSG("(~w) Forbidden legacy authentication for ~s from ~s", - [StateData#state.socket, - jid:to_string(JID), - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, U, StateData#state.server, - StateData#state.ip]), - Lang = StateData#state.lang, - Txt = <<"Legacy authentication forbidden">>, - Err = xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)), - send_element(StateData, Err), - fsm_next_state(wait_for_auth, StateData) - end; -wait_for_auth(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_auth({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -wait_for_auth({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_auth(closed, StateData) -> - {stop, normal, StateData}; -wait_for_auth(stop, StateData) -> - {stop, normal, StateData}; -wait_for_auth(Pkt, StateData) -> - process_unauthenticated_stanza(StateData, Pkt), - fsm_next_state(wait_for_auth, StateData). +unauthenticated_stream_features(#{server := Server}) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_pre_auth_features, LServer, [], [LServer]). -wait_for_feature_request({xmlstreamelement, El}, StateData) -> - decode_element(El, wait_for_feature_request, StateData); -wait_for_feature_request(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_feature_request, - dispatch_stream_mgmt(Pkt, StateData)); -wait_for_feature_request(#sasl_auth{mechanism = Mech, - text = ClientIn}, - #state{tls_enabled = TLSEnabled, - tls_required = TLSRequired} = StateData) - when TLSEnabled or not TLSRequired -> - case cyrsasl:server_start(StateData#state.sasl_state, Mech, ClientIn) of - {ok, Props} -> - (StateData#state.sockmod):reset_stream(StateData#state.socket), - U = identity(Props), - AuthModule = proplists:get_value(auth_module, Props, undefined), - ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", - [StateData#state.socket, U, AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_success{}), - fsm_next_state(wait_for_stream, - StateData#state{streamid = new_id(), - authenticated = true, - auth_module = AuthModule, - sasl_state = undefined, - user = U}); - {continue, ServerOut, NewSASLState} -> - send_element(StateData, #sasl_challenge{text = ServerOut}), - fsm_next_state(wait_for_sasl_response, - StateData#state{sasl_state = NewSASLState}); - {error, Error, Username} -> - ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s", - [StateData#state.socket, - Username, StateData#state.server, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, Username, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData); - {error, Error} -> - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData) - end; -wait_for_feature_request(#starttls{}, - #state{tls = true, tls_enabled = false} = StateData) -> - case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of - gen_tcp -> - TLSOpts = case ejabberd_config:get_option( - {domain_certfile, StateData#state.server}, - fun iolist_to_binary/1) of - undefined -> - StateData#state.tls_options; - CertFile -> - lists:keystore(certfile, 1, - StateData#state.tls_options, - {certfile, CertFile}) - end, - Socket = StateData#state.socket, - BProceed = fxml:element_to_binary(xmpp:encode(#starttls_proceed{})), - TLSSocket = (StateData#state.sockmod):starttls(Socket, TLSOpts, BProceed), - fsm_next_state(wait_for_stream, - StateData#state{socket = TLSSocket, - streamid = new_id(), - tls_enabled = true}); - _ -> - Lang = StateData#state.lang, - Txt = <<"Unsupported TLS transport">>, - send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)), - {stop, normal, StateData} - end; -wait_for_feature_request(#compress{} = Comp, StateData) -> - Zlib = StateData#state.zlib, - SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket), - if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) -> - process_compression_request(Comp, wait_for_feature_request, StateData); - true -> - send_element(StateData, #compress_failure{reason = 'setup-failed'}), - fsm_next_state(wait_for_feature_request, StateData) - end; -wait_for_feature_request(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_feature_request({xmlstreamend, _Name}, - StateData) -> - {stop, normal, StateData}; -wait_for_feature_request({xmlstreamerror, _}, - StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_feature_request(closed, StateData) -> - {stop, normal, StateData}; -wait_for_feature_request(stop, StateData) -> - {stop, normal, StateData}; -wait_for_feature_request(_Pkt, - #state{tls_required = TLSRequired, - tls_enabled = TLSEnabled} = StateData) - when TLSRequired and not TLSEnabled -> - Lang = StateData#state.lang, - Txt = <<"Use of STARTTLS required">>, - send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)), - {stop, normal, StateData}; -wait_for_feature_request(Pkt, StateData) -> - process_unauthenticated_stanza(StateData, Pkt), - fsm_next_state(wait_for_feature_request, StateData). +authenticated_stream_features(#{server := Server}) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_post_auth_features, LServer, [], [LServer]). -wait_for_sasl_response({xmlstreamelement, El}, StateData) -> - decode_element(El, wait_for_sasl_response, StateData); -wait_for_sasl_response(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_sasl_response, - dispatch_stream_mgmt(Pkt, StateData)); -wait_for_sasl_response(#sasl_response{text = ClientIn}, StateData) -> - case cyrsasl:server_step(StateData#state.sasl_state, ClientIn) of - {ok, Props} -> - catch (StateData#state.sockmod):reset_stream(StateData#state.socket), - U = identity(Props), - AuthModule = proplists:get_value(auth_module, Props, <<>>), - ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", - [StateData#state.socket, U, AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_success{}), - fsm_next_state(wait_for_stream, - StateData#state{streamid = new_id(), - authenticated = true, - auth_module = AuthModule, - sasl_state = undefined, - user = U}); - {ok, Props, ServerOut} -> - (StateData#state.sockmod):reset_stream(StateData#state.socket), - U = identity(Props), - AuthModule = proplists:get_value(auth_module, Props, undefined), - ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", - [StateData#state.socket, U, AuthModule, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [true, U, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_success{text = ServerOut}), - fsm_next_state(wait_for_stream, - StateData#state{streamid = new_id(), - authenticated = true, - auth_module = AuthModule, - sasl_state = undefined, - user = U}); - {continue, ServerOut, NewSASLState} -> - send_element(StateData, #sasl_challenge{text = ServerOut}), - fsm_next_state(wait_for_sasl_response, - StateData#state{sasl_state = NewSASLState}); - {error, Error, Username} -> - ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s", - [StateData#state.socket, - Username, StateData#state.server, - ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), - ejabberd_hooks:run(c2s_auth_result, StateData#state.server, - [false, Username, StateData#state.server, - StateData#state.ip]), - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData); - {error, Error} -> - send_element(StateData, #sasl_failure{reason = Error}), - fsm_next_state(wait_for_feature_request, StateData) - end; -wait_for_sasl_response(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response({xmlstreamend, _Name}, - StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response({xmlstreamerror, _}, - StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_sasl_response(closed, StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response(stop, StateData) -> - {stop, normal, StateData}; -wait_for_sasl_response(Pkt, StateData) -> - process_unauthenticated_stanza(StateData, Pkt), - fsm_next_state(wait_for_feature_request, StateData). +sasl_mechanisms(#{server := Server}) -> + cyrsasl:listmech(jid:nameprep(Server)). --spec resource_conflict_action(binary(), binary(), binary()) -> - {accept_resource, binary()} | closenew. -resource_conflict_action(U, S, R) -> - OptionRaw = case ejabberd_sm:is_existing_resource(U, S, R) of - true -> - ejabberd_config:get_option( - {resource_conflict, S}, - fun(setresource) -> setresource; - (closeold) -> closeold; - (closenew) -> closenew; - (acceptnew) -> acceptnew - end); - false -> - acceptnew - end, - Option = case OptionRaw of - setresource -> setresource; - closeold -> - acceptnew; %% ejabberd_sm will close old session - closenew -> closenew; - acceptnew -> acceptnew; - _ -> acceptnew %% default ejabberd behavior - end, - case Option of - acceptnew -> {accept_resource, R}; - closenew -> closenew; - setresource -> - Rnew = new_uniq_id(), - {accept_resource, Rnew} - end. +init_sasl(#{server := Server}) -> + LServer = jid:nameprep(Server), + cyrsasl:server_new( + <<"jabber">>, LServer, <<"">>, [], + fun(U) -> + ejabberd_auth:get_password_with_authmodule(U, LServer) + end, + fun(U, AuthzId, P) -> + ejabberd_auth:check_password_with_authmodule(U, AuthzId, LServer, P) + end, + fun(U, AuthzId, P, D, DG) -> + ejabberd_auth:check_password_with_authmodule(U, AuthzId, LServer, P, D, DG) + end). --spec decode_element(xmlel(), state_name(), state()) -> fsm_transition(). -decode_element(#xmlel{} = El, StateName, StateData) -> - try case xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of - #iq{sub_els = [_], type = T} = Pkt when T == set; T == get -> - NewPkt = xmpp:decode_els( - Pkt, ?NS_CLIENT, - fun(SubEl) when StateName == session_established -> - case xmpp:get_ns(SubEl) of - ?NS_PRIVACY -> true; - ?NS_BLOCKING -> true; - _ -> false - end; - (SubEl) -> - xmpp:is_known_tag(SubEl) - end), - ?MODULE:StateName(NewPkt, StateData); - Pkt -> - ?MODULE:StateName(Pkt, StateData) - end - catch error:{xmpp_codec, Why} -> - NS = xmpp:get_ns(El), - fsm_next_state( - StateName, - case xmpp:is_stanza(El) of - true -> - Lang = xmpp:get_lang(El), - Txt = xmpp:format_error(Why), - send_error(StateData, El, xmpp:err_bad_request(Txt, Lang)); - false when NS == ?NS_STREAM_MGMT_2; NS == ?NS_STREAM_MGMT_3 -> - Err = #sm_failed{reason = 'bad-request', xmlns = NS}, - send_element(StateData, Err), - StateData; - false -> - StateData - end) +bind(<<"">>, State) -> + bind(new_uniq_id(), State); +bind(R, #{user := U, server := S} = State) -> + case resource_conflict_action(U, S, R) of + closenew -> + {error, xmpp:err_conflict(), State}; + {accept_resource, Resource} -> + open_session(State, Resource) end. -wait_for_bind({xmlstreamelement, El}, StateData) -> - decode_element(El, wait_for_bind, StateData); -wait_for_bind(#sm_resume{} = Pkt, StateData) -> - case handle_resume(StateData, Pkt) of - {ok, ResumedState} -> - fsm_next_state(session_established, ResumedState); - error -> - fsm_next_state(wait_for_bind, StateData) - end; -wait_for_bind(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(wait_for_bind, dispatch_stream_mgmt(Pkt, StateData)); -wait_for_bind(#iq{type = set, - sub_els = [#bind{resource = R0}]} = IQ, StateData) -> - U = StateData#state.user, - R = case R0 of - <<>> -> new_uniq_id(); - _ -> R0 - end, - case resource_conflict_action(U, StateData#state.server, R) of - closenew -> - Err = xmpp:make_error(IQ, xmpp:err_conflict()), - send_element(StateData, Err), - fsm_next_state(wait_for_bind, StateData); - {accept_resource, R2} -> - JID = jid:make(U, StateData#state.server, R2), - StateData2 = StateData#state{resource = R2, jid = JID}, - case open_session(StateData2) of - {ok, StateData3} -> - Res = xmpp:make_iq_result(IQ, #bind{jid = JID}), - try - send_element(StateData3, Res) - catch - exit:normal -> close(self()) - end, - fsm_next_state_pack(session_established,StateData3); - {error, Error} -> - Err = xmpp:make_error(IQ, Error), - send_element(StateData, Err), - fsm_next_state(wait_for_bind, StateData) +handle_stream_start(#{server := Server, ip := IP, lang := Lang} = State) -> + LServer = jid:nameprep(Server), + case lists:member(LServer, ?MYHOSTS) of + false -> + xmpp_stream_in:send(State, xmpp:serr_host_unknown()); + true -> + case check_bl_c2s(IP, Lang) of + false -> + change_shaper(State), + {noreply, State}; + {true, LogReason, ReasonT} -> + ?INFO_MSG("Connection attempt from blacklisted IP ~s: ~s", + [jlib:ip_to_list(IP), LogReason]), + Err = xmpp:serr_policy_violation(ReasonT, Lang), + xmpp_stream_in:send(State, Err) end - end; -wait_for_bind(#compress{} = Comp, StateData) -> - Zlib = StateData#state.zlib, - SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket), - if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) -> - process_compression_request(Comp, wait_for_bind, StateData); - true -> - send_element(StateData, #compress_failure{reason = 'setup-failed'}), - fsm_next_state(wait_for_bind, StateData) - end; -wait_for_bind(timeout, StateData) -> - {stop, normal, StateData}; -wait_for_bind({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -wait_for_bind({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_bind(closed, StateData) -> - {stop, normal, StateData}; -wait_for_bind(stop, StateData) -> - {stop, normal, StateData}; -wait_for_bind(Pkt, StateData) -> - fsm_next_state( - wait_for_bind, - case xmpp:is_stanza(Pkt) of - true -> - send_error(StateData, Pkt, xmpp:err_not_acceptable()); - false -> - StateData - end). - --spec open_session(state()) -> {ok, state()} | {error, stanza_error()}. -open_session(StateData) -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - Lang = StateData#state.lang, - IP = StateData#state.ip, - case acl:access_matches(StateData#state.access, - #{usr => jid:split(JID), ip => IP}, - StateData#state.server) of - allow -> - ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, jid:to_string(JID)]), - change_shaper(StateData, JID), - {Fs, Ts} = ejabberd_hooks:run_fold( - roster_get_subscription_lists, - StateData#state.server, - {[], []}, - [U, StateData#state.server]), - LJID = jid:tolower(jid:remove_resource(JID)), - Fs1 = [LJID | Fs], - Ts1 = [LJID | Ts], - PrivList = - ejabberd_hooks:run_fold( - privacy_get_user_list, - StateData#state.server, - #userlist{}, - [U, StateData#state.server]), - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:open_session( - StateData#state.sid, U, StateData#state.server, R, Info), - UpdatedStateData = - StateData#state{ - conn = Conn, - pres_f = ?SETS:from_list(Fs1), - pres_t = ?SETS:from_list(Ts1), - privacy_list = PrivList}, - {ok, UpdatedStateData}; - _ -> - ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), - ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, jid:to_string(JID)]), - Txt = <<"Denied by ACL">>, - {error, xmpp:err_not_allowed(Txt, Lang)} end. -session_established({xmlstreamelement, El}, StateData) -> - decode_element(El, session_established, StateData); -session_established(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> - fsm_next_state(session_established, dispatch_stream_mgmt(Pkt, StateData)); -session_established(#csi{type = active}, StateData) -> - NewStateData = csi_flush_queue(StateData), - fsm_next_state(session_established, NewStateData#state{csi_state = active}); -session_established(#csi{type = inactive}, StateData) -> - fsm_next_state(session_established, StateData#state{csi_state = inactive}); -%% We hibernate the process to reduce memory consumption after a -%% configurable activity timeout -session_established(timeout, StateData) -> - Options = [], - proc_lib:hibernate(?GEN_FSM, enter_loop, - [?MODULE, Options, session_established, StateData]), - fsm_next_state(session_established, StateData); -session_established({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -session_established({xmlstreamerror, - <<"XML stanza is too big">> = E}, - StateData) -> - send_element(StateData, - xmpp:serr_policy_violation(E, StateData#state.lang)), - {stop, normal, StateData}; -session_established({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -session_established(closed, #state{mgmt_state = active} = StateData) -> - catch (StateData#state.sockmod):close(StateData#state.socket), - fsm_next_state(wait_for_resume, StateData); -session_established(closed, StateData) -> - {stop, normal, StateData}; -session_established(stop, StateData) -> - {stop, normal, StateData}; -session_established(Pkt, StateData) when ?is_stanza(Pkt) -> - FromJID = StateData#state.jid, - case check_from(Pkt, FromJID) of - 'invalid-from' -> - send_element(StateData, xmpp:serr_invalid_from()), - {stop, normal, StateData}; - _ -> - NewStateData = update_num_stanzas_in(StateData, Pkt), - session_established2(Pkt, NewStateData) - end; -session_established(_Pkt, StateData) -> - fsm_next_state(session_established, StateData). +handle_stream_end(State) -> + {stop, normal, State}. + +handle_stream_close(State) -> + {stop, normal, State}. + +handle_auth_success(User, Mech, AuthModule, + #{socket := Socket, ip := IP, server := Server} = State) -> + LServer = jid:nameprep(Server), + ?INFO_MSG("(~w) Accepted ~s authentication for ~s@~s by ~p from ~s", + [Socket, Mech, User, LServer, AuthModule, + ejabberd_config:may_hide_data(jlib:ip_to_list(IP))]), + State1 = State#{auth_module => AuthModule}, + ejabberd_hooks:run_fold(c2s_auth_result, LServer, + {noreply, State1}, [true, User]). + +handle_auth_failure(User, Mech, Reason, + #{socket := Socket, ip := IP, server := Server} = State) -> + LServer = jid:nameprep(Server), + ?INFO_MSG("(~w) Failed ~s authentication ~sfrom ~s: ~s", + [Socket, Mech, + if User /= <<"">> -> ["for ", User, "@", LServer, " "]; + true -> "" + end, + ejabberd_config:may_hide_data(jlib:ip_to_list(IP)), Reason]), + ejabberd_hooks:run_fold(c2s_auth_result, LServer, + {noreply, State}, [false, User]). + +handle_unbinded_packet(Pkt, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_unbinded_packet, LServer, + {noreply, State}, [Pkt]). + +handle_unauthenticated_packet(Pkt, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_unauthenticated_packet, + LServer, {noreply, State}, [Pkt]). + +handle_authenticated_packet(Pkt, #{server := Server} = State) when not ?is_stanza(Pkt) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_authenticated_packet, + LServer, {noreply, State}, [Pkt]); +handle_authenticated_packet(Pkt, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + case ejabberd_hooks:run_fold(c2s_authenticated_packet, + LServer, {noreply, State}, [Pkt]) of + {noreply, State1} -> + Pkt1 = ejabberd_hooks:run_fold(user_send_packet, LServer, Pkt, [State1]), + Res = case Pkt1 of + #presence{to = #jid{lresource = <<"">>}} -> + process_self_presence(State1, Pkt1); + #presence{} -> + process_presence_out(State1, Pkt1); + _ -> + check_privacy_then_route(State1, Pkt1) + end, + ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, Pkt}]), + Res; + Err -> + ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, Pkt}]), + Err + end. --spec session_established2(xmpp_element(), state()) -> fsm_next(). -%% Process packets sent by user (coming from user on c2s XMPP connection) -session_established2(Pkt, StateData) -> - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, - ToJID = case xmpp:get_to(Pkt) of - undefined -> jid:make(User, Server, <<"">>); - J -> J - end, - Lang = case xmpp:get_lang(Pkt) of - <<"">> -> StateData#state.lang; - L -> L +init([State, Opts]) -> + Access = gen_mod:get_opt(access, Opts, fun acl:access_rules_validator/1, all), + Shaper = gen_mod:get_opt(shaper, Opts, fun acl:shaper_rules_validator/1, none), + TLSOpts = lists:filter( + fun({certfile, _}) -> true; + ({ciphers, _}) -> true; + ({dhfile, _}) -> true; + (_) -> false + end, Opts), + TLSRequired = proplists:get_bool(starttls_required, Opts), + TLSVerify = proplists:get_bool(tls_verify, Opts), + State1 = State#{tls_options => TLSOpts, + tls_required => TLSRequired, + tls_verify => TLSVerify, + pres_a => ?SETS:new(), + pres_f => ?SETS:new(), + pres_t => ?SETS:new(), + sid => ejabberd_sm:make_sid(), + lang => ?MYLANG, + server => ?MYNAME, + access => Access, + shaper => Shaper}, + ejabberd_hooks:run_fold(c2s_init, {ok, State1}, []). + +handle_call(get_presence, _From, + #{user := U, server := S, resource := R} = State) -> + Pres = case maps:get(pres_last, State, undefined) of + undefined -> + From = jid:make(U, S, R), + To = jid:remove_resource(From), + #presence{from = From, to = To, type = unavailable}; + P -> + P end, - NewPkt = xmpp:set_lang(Pkt, Lang), - NewState = - case NewPkt of - #presence{} -> - Presence0 = ejabberd_hooks:run_fold( - c2s_update_presence, Server, NewPkt, - [User, Server]), - Presence = ejabberd_hooks:run_fold( - user_send_packet, Server, Presence0, - [StateData, FromJID, ToJID]), - case ToJID of - #jid{user = User, server = Server, resource = <<"">>} -> - ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, Presence, StateData]), - presence_update(FromJID, Presence, - StateData); - _ -> - presence_track(FromJID, ToJID, Presence, - StateData) - end; - #iq{type = T, sub_els = [El]} when T == set; T == get -> - NS = xmpp:get_ns(El), - if NS == ?NS_BLOCKING; NS == ?NS_PRIVACY -> - IQ = xmpp:set_from_to(Pkt, FromJID, ToJID), - process_privacy_iq(IQ, StateData); - NS == ?NS_SESSION -> - Res = xmpp:make_iq_result(Pkt), - send_stanza(StateData, Res); - true -> - NewPkt0 = ejabberd_hooks:run_fold( - user_send_packet, Server, NewPkt, - [StateData, FromJID, ToJID]), - check_privacy_route(FromJID, StateData, FromJID, - ToJID, NewPkt0) - end; - _ -> - NewPkt0 = ejabberd_hooks:run_fold( - user_send_packet, Server, NewPkt, - [StateData, FromJID, ToJID]), - check_privacy_route(FromJID, StateData, FromJID, - ToJID, NewPkt0) - end, - ejabberd_hooks:run(c2s_loop_debug, - [{xmlstreamelement, Pkt}]), - fsm_next_state(session_established, NewState). - -wait_for_resume({xmlstreamelement, _El} = Event, StateData) -> - Result = session_established(Event, StateData), - fsm_next_state(wait_for_resume, element(3, Result)); -wait_for_resume(timeout, StateData) -> - ?DEBUG("Timed out waiting for resumption of stream for ~s", - [jid:to_string(StateData#state.jid)]), - {stop, normal, StateData#state{mgmt_state = timeout}}; -wait_for_resume(Event, StateData) -> - ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]), - fsm_next_state(wait_for_resume, StateData). - -handle_event(_Event, StateName, StateData) -> - fsm_next_state(StateName, StateData). - -handle_sync_event({get_presence}, _From, StateName, - StateData) -> - User = StateData#state.user, - PresLast = StateData#state.pres_last, - Show = get_showtag(PresLast), - Status = get_statustag(PresLast), - Resource = StateData#state.resource, - Reply = {User, Resource, Show, Status}, - fsm_reply(Reply, StateName, StateData); -handle_sync_event({get_last_presence}, _From, StateName, - StateData) -> - User = StateData#state.user, - Server = StateData#state.server, - PresLast = StateData#state.pres_last, - Resource = StateData#state.resource, - Reply = {User, Server, Resource, PresLast}, - fsm_reply(Reply, StateName, StateData); - -handle_sync_event(get_subscribed, _From, StateName, - StateData) -> - Subscribed = (?SETS):to_list(StateData#state.pres_f), - {reply, Subscribed, StateName, StateData}; -handle_sync_event({resume_session, Time}, _From, _StateName, - StateData) when element(1, StateData#state.sid) == Time -> - %% The old session should be closed before the new one is opened, so we do - %% this here instead of leaving it to the terminate callback - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource), - {stop, normal, {resume, StateData}, StateData#state{mgmt_state = resumed}}; -handle_sync_event({resume_session, _Time}, _From, StateName, - StateData) -> - {reply, {error, <<"Previous session not found">>}, StateName, StateData}; -handle_sync_event(_Event, _From, StateName, - StateData) -> - Reply = ok, fsm_reply(Reply, StateName, StateData). - -code_change(_OldVsn, StateName, StateData, _Extra) -> - {ok, StateName, StateData}. - -handle_info({send_text, Text}, StateName, StateData) -> - send_text(StateData, Text), - ejabberd_hooks:run(c2s_loop_debug, [Text]), - fsm_next_state(StateName, StateData); -handle_info(replaced, StateName, StateData) -> - Lang = StateData#state.lang, - Pkt = xmpp:serr_conflict(<<"Replaced by new connection">>, Lang), - handle_info({kick, replaced, Pkt}, StateName, StateData); -handle_info(kick, StateName, StateData) -> - Lang = StateData#state.lang, - Pkt = xmpp:serr_policy_violation(<<"has been kicked">>, Lang), - handle_info({kick, kicked_by_admin, Pkt}, StateName, StateData); -handle_info({kick, Reason, Pkt}, _StateName, StateData) -> - send_element(StateData, Pkt), - {stop, normal, - StateData#state{authenticated = Reason}}; -handle_info({route, _From, _To, {broadcast, Data}}, - StateName, StateData) -> - ?DEBUG("broadcast~n~p~n", [Data]), - case Data of - {item, IJID, ISubscription} -> - fsm_next_state(StateName, - roster_change(IJID, ISubscription, StateData)); - {exit, Reason} -> - Lang = StateData#state.lang, - send_element(StateData, xmpp:serr_conflict(Reason, Lang)), - {stop, normal, StateData}; - {privacy_list, PrivList, PrivListName} -> - case ejabberd_hooks:run_fold(privacy_updated_list, - StateData#state.server, - false, - [StateData#state.privacy_list, - PrivList]) of - false -> - fsm_next_state(StateName, StateData); - NewPL -> - PrivPushIQ = - #iq{type = set, - from = jid:remove_resource(StateData#state.jid), - to = StateData#state.jid, - id = <<"push", (randoms:get_string())/binary>>, - sub_els = [#privacy_query{ - lists = [#privacy_list{ - name = PrivListName}]}]}, - NewState = send_stanza(StateData, PrivPushIQ), - fsm_next_state(StateName, - NewState#state{privacy_list = NewPL}) - end; - {blocking, What} -> - NewState = route_blocking(What, StateData), - fsm_next_state(StateName, NewState); - _ -> - fsm_next_state(StateName, StateData) - end; -%% Process Packets that are to be send to the user -handle_info({route, From, To, Packet}, StateName, StateData) when ?is_stanza(Packet) -> - {Pass, NewState} = - case Packet of - #presence{type = T} -> - State = ejabberd_hooks:run_fold(c2s_presence_in, - StateData#state.server, - StateData, - [{From, To, Packet}]), - case T of - probe -> - LFrom = jid:tolower(From), - LBFrom = jid:remove_resource(LFrom), - NewStateData = - case (?SETS):is_element(LFrom, State#state.pres_a) - orelse (?SETS):is_element(LBFrom, State#state.pres_a) of - true -> State; - false -> - case (?SETS):is_element(LFrom, State#state.pres_f) of - true -> - A = (?SETS):add_element(LFrom, State#state.pres_a), - State#state{pres_a = A}; - false -> - case (?SETS):is_element(LBFrom, State#state.pres_f) of - true -> - A = (?SETS):add_element(LBFrom, State#state.pres_a), - State#state{pres_a = A}; - false -> - State - end - end - end, - process_presence_probe(From, To, NewStateData), - {false, NewStateData}; - error -> - NewA = ?SETS:del_element(jid:tolower(From), State#state.pres_a), - {true, State#state{pres_a = NewA}}; - subscribe -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - subscribed -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - unsubscribe -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - unsubscribed -> - SRes = is_privacy_allow(State, From, To, Packet, in), - {SRes, State}; - _ -> - case privacy_check_packet(State, From, To, Packet, in) of - allow -> - LFrom = jid:tolower(From), - LBFrom = jid:remove_resource(LFrom), - case (?SETS):is_element(LFrom, State#state.pres_a) - orelse (?SETS):is_element(LBFrom, State#state.pres_a) of - true -> - {true, State}; - false -> - case (?SETS):is_element(LFrom, State#state.pres_f) of - true -> - A = (?SETS):add_element(LFrom, State#state.pres_a), - {true, State#state{pres_a = A}}; - false -> - case (?SETS):is_element(LBFrom, - State#state.pres_f) of - true -> - A = (?SETS):add_element( - LBFrom, - State#state.pres_a), - {true, State#state{pres_a = A}}; - false -> - {true, State} - end - end - end; - deny -> {false, State} - end - end; - #iq{type = T} -> - case xmpp:has_subtag(Packet, #last{}) of - true when T == get; T == set -> - LFrom = jid:tolower(From), - LBFrom = jid:remove_resource(LFrom), - HasFromSub = ((?SETS):is_element(LFrom, StateData#state.pres_f) - orelse (?SETS):is_element(LBFrom, StateData#state.pres_f)) - andalso is_privacy_allow(StateData, To, From, #presence{}, out), - case HasFromSub of - true -> - case privacy_check_packet( - StateData, From, To, Packet, in) of - allow -> - {true, StateData}; - deny -> - ejabberd_router:route_error( - To, From, Packet, - xmpp:err_service_unavailable()), - {false, StateData} - end; - _ -> - ejabberd_router:route_error( - To, From, Packet, xmpp:err_forbidden()), - {false, StateData} - end; - _ -> - case privacy_check_packet(StateData, From, To, Packet, in) of - allow -> - {true, StateData}; - deny -> - ejabberd_router:route_error( - To, From, Packet, xmpp:err_service_unavailable()), - {false, StateData} - end - end; - #message{type = T} -> - case privacy_check_packet(StateData, From, To, Packet, in) of - allow -> - {true, StateData}; - deny -> - case T of - groupchat -> ok; - headline -> ok; - _ -> - case xmpp:has_subtag(Packet, #muc_user{}) of - true -> - ok; - false -> - ejabberd_router:route_error( - To, From, Packet, xmpp:err_service_unavailable()) - end - end, - {false, StateData} - end - end, + {reply, Pres, State}; +handle_call(get_subscribed, _From, #{pres_f := PresF} = State) -> + Subscribed = ?SETS:to_list(PresF), + {reply, Subscribed, State}; +handle_call(Request, From, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold( + c2s_handle_call, LServer, {noreply, State}, [Request, From]). + +handle_cast(closed, State) -> + handle_stream_close(State); +handle_cast(Msg, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_handle_cast, LServer, {noreply, State}, [Msg]). + +handle_info({route, From, To, Packet0}, #{server := Server} = State) -> + Packet = xmpp:set_from_to(Packet0, From, To), + LServer = jid:nameprep(Server), + {Pass, NewState} = case Packet of + #presence{} -> + process_presence_in(State, Packet); + #message{} -> + process_message_in(State, Packet); + #iq{} -> + process_iq_in(State, Packet) + end, if Pass -> - FixedPacket0 = xmpp:set_from_to(Packet, From, To), - FixedPacket = ejabberd_hooks:run_fold( - user_receive_packet, - NewState#state.server, - FixedPacket0, - [NewState, NewState#state.jid, From, To]), - SentStateData = send_packet(NewState, FixedPacket), + LServer = jid:nameprep(Server), + Packet1 = ejabberd_hooks:run_fold( + user_receive_packet, LServer, Packet, [NewState]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, SentStateData); - true -> - ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState) - end; -handle_info({'DOWN', Monitor, _Type, _Object, _Info}, - _StateName, StateData) - when Monitor == StateData#state.socket_monitor -> - if StateData#state.mgmt_state == active; - StateData#state.mgmt_state == pending -> - fsm_next_state(wait_for_resume, StateData); + xmpp_stream_in:send(NewState, Packet1); true -> - {stop, normal, StateData} + ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), + {noreply, NewState} end; -handle_info(system_shutdown, StateName, StateData) -> - case StateName of - wait_for_stream -> - send_header(StateData, ?MYNAME, {1,0}, <<"en">>), - send_element(StateData, xmpp:serr_system_shutdown()), - ok; - _ -> - send_element(StateData, xmpp:serr_system_shutdown()), - ok - end, - {stop, normal, StateData}; -handle_info({route_xmlstreamelement, El}, _StateName, StateData) -> - {next_state, NStateName, NStateData, _Timeout} = - session_established({xmlstreamelement, El}, StateData), - fsm_next_state(NStateName, NStateData); -handle_info({force_update_presence, LUser, LServer}, StateName, - #state{jid = #jid{luser = LUser, lserver = LServer}} = StateData) -> - NewStateData = case StateData#state.pres_last of - #presence{} -> - Presence = - ejabberd_hooks:run_fold(c2s_update_presence, - LServer, - StateData#state.pres_last, - [LUser, LServer]), - StateData2 = StateData#state{pres_last = Presence}, - presence_update(StateData2#state.jid, Presence, - StateData2), - StateData2; - undefined -> StateData - end, - fsm_next_state(StateName, NewStateData); -handle_info({send_filtered, Feature, From, To, Packet}, StateName, StateData) -> - Drop = ejabberd_hooks:run_fold(c2s_filter_packet, StateData#state.server, - true, [StateData#state.server, StateData, - Feature, To, Packet]), - NewStateData = if Drop -> - ?DEBUG("Dropping packet from ~p to ~p", - [jid:to_string(From), - jid:to_string(To)]), - StateData; - true -> - FinalPacket = xmpp:set_from_to(Packet, From, To), - case StateData#state.jid of - To -> - case privacy_check_packet(StateData, From, To, - FinalPacket, in) of - deny -> - StateData; - allow -> - send_stanza(StateData, FinalPacket) - end; - _ -> - ejabberd_router:route(From, To, FinalPacket), - StateData - end - end, - fsm_next_state(StateName, NewStateData); -handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> - Recipients = ejabberd_hooks:run_fold( - c2s_broadcast_recipients, StateData#state.server, - [], - [StateData#state.server, StateData, Type, From, Packet]), - lists:foreach( - fun(USR) -> - ejabberd_router:route( - From, jid:make(USR), Packet) - end, lists:usort(Recipients)), - fsm_next_state(StateName, StateData); -handle_info({set_csi_state, CsiState}, StateName, StateData) -> - fsm_next_state(StateName, StateData#state{csi_state = CsiState}); -handle_info({set_resume_timeout, Timeout}, StateName, StateData) -> - fsm_next_state(StateName, StateData#state{mgmt_timeout = Timeout}); -handle_info(dont_ask_offline, StateName, StateData) -> - fsm_next_state(StateName, StateData#state{ask_offline = false}); -handle_info(close, StateName, StateData) -> - ?DEBUG("Timeout waiting for stream management acknowledgement of ~s", - [jid:to_string(StateData#state.jid)]), - close(self()), - fsm_next_state(StateName, StateData#state{mgmt_ack_timer = undefined}); -handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) -> - %% This happens if the resume_session/1 request timed out; the new session - %% now receives the late response. - ?DEBUG("Received old session state for ~s after failed resumption", - [jid:to_string(OldStateData#state.jid)]), - handle_unacked_stanzas(OldStateData#state{mgmt_resend = false}), - fsm_next_state(StateName, StateData); -handle_info(Info, StateName, StateData) -> - ?ERROR_MSG("Unexpected info: ~p", [Info]), - fsm_next_state(StateName, StateData). +handle_info(system_shutdown, State) -> + xmpp_stream_in:send(State, xmpp:serr_system_shutdown()); +handle_info(Info, #{server := Server} = State) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(c2s_handle_info, LServer, {noreply, State}, [Info]). --spec print_state(state()) -> state(). -print_state(State = #state{pres_t = T, pres_f = F, pres_a = A}) -> - State#state{pres_t = {pres_t, (?SETS):size(T)}, - pres_f = {pres_f, (?SETS):size(F)}, - pres_a = {pres_a, (?SETS):size(A)}}. - -terminate(_Reason, StateName, StateData) -> - case StateData#state.mgmt_state of - resumed -> - ?INFO_MSG("Closing former stream of resumed session for ~s", - [jid:to_string(StateData#state.jid)]); - _ -> - if StateName == session_established; - StateName == wait_for_resume -> - case StateData#state.authenticated of - replaced -> - ?INFO_MSG("(~w) Replaced session for ~s", - [StateData#state.socket, - jid:to_string(StateData#state.jid)]), - From = StateData#state.jid, - Lang = StateData#state.lang, - Status = <<"Replaced by new connection">>, - Packet = #presence{ - type = unavailable, - status = xmpp:mk_text(Status, Lang)}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - Status), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet); - _ -> - ?INFO_MSG("(~w) Close session for ~s", - [StateData#state.socket, - jid:to_string(StateData#state.jid)]), - EmptySet = (?SETS):new(), - case StateData of - #state{pres_last = undefined, pres_a = EmptySet} -> - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource); - _ -> - From = StateData#state.jid, - Packet = #presence{type = unavailable}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet) - end, - case StateData#state.mgmt_state of - timeout -> - Info = [{num_stanzas_in, - StateData#state.mgmt_stanzas_in}], - ejabberd_sm:set_offline_info(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - Info); - _ -> - ok - end - end, - handle_unacked_stanzas(StateData), - bounce_messages(); - true -> - ok - end - end, - catch send_trailer(StateData), - (StateData#state.sockmod):close(StateData#state.socket), +terminate(_Reason, _State) -> ok. -%%%---------------------------------------------------------------------- -%%% Internal functions -%%%---------------------------------------------------------------------- --spec change_shaper(state(), jid()) -> ok. -change_shaper(StateData, JID) -> - Shaper = acl:access_matches(StateData#state.shaper, - #{usr => jid:split(JID), ip => StateData#state.ip}, - StateData#state.server), - (StateData#state.sockmod):change_shaper(StateData#state.socket, - Shaper). - --spec send_text(state(), iodata()) -> ok | {error, any()}. -send_text(StateData, Text) when StateData#state.mgmt_state == pending -> - ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); -send_text(StateData, Text) when StateData#state.xml_socket -> - ?DEBUG("Send Text on stream = ~p", [Text]), - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamraw, Text}); -send_text(StateData, Text) when StateData#state.mgmt_state == active -> - ?DEBUG("Send XML on stream = ~p", [Text]), - case catch (StateData#state.sockmod):send(StateData#state.socket, Text) of - {'EXIT', _} -> - (StateData#state.sockmod):close(StateData#state.socket), - {error, closed}; - _ -> - ok - end; -send_text(StateData, Text) -> - ?DEBUG("Send XML on stream = ~p", [Text]), - (StateData#state.sockmod):send(StateData#state.socket, Text). - --spec send_element(state(), xmlel() | xmpp_element()) -> ok | {error, any()}. -send_element(StateData, El) when StateData#state.mgmt_state == pending -> - ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); -send_element(StateData, #xmlel{} = El) when StateData#state.xml_socket -> - ?DEBUG("Send XML on stream = ~p", [fxml:element_to_binary(El)]), - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamelement, El}); -send_element(StateData, #xmlel{} = El) -> - send_text(StateData, fxml:element_to_binary(El)); -send_element(StateData, Pkt) -> - send_element(StateData, xmpp:encode(Pkt, ?NS_CLIENT)). - --spec send_error(state(), xmlel() | stanza(), stanza_error()) -> state(). -send_error(StateData, Stanza, Error) -> - Type = xmpp:get_type(Stanza), - if Type == error; Type == result; - Type == <<"error">>; Type == <<"result">> -> - StateData; - true -> - send_stanza(StateData, xmpp:make_error(Stanza, Error)) - end. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. --spec send_stanza(state(), xmpp_element()) -> state(). -send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive -> - csi_filter_stanza(StateData, Stanza); -send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> - mgmt_queue_add(StateData, Stanza); -send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> - NewStateData = mgmt_queue_add(StateData, Stanza), - mgmt_send_stanza(NewStateData, Stanza); -send_stanza(StateData, Stanza) -> - send_element(StateData, Stanza), - StateData. +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec check_bl_c2s({inet:ip_address(), non_neg_integer()}, binary()) + -> false | {true, binary(), binary()}. +check_bl_c2s({IP, _Port}, Lang) -> + ejabberd_hooks:run_fold(check_bl_c2s, false, [IP, Lang]). --spec send_packet(state(), xmpp_element()) -> state(). -send_packet(StateData, Packet) -> - case xmpp:is_stanza(Packet) of - true -> - send_stanza(StateData, Packet); - false -> - send_element(StateData, Packet), - StateData +-spec open_session(state(), binary()) -> {ok, state()} | {error, stanza_error(), state()}. +open_session(#{user := U, server := S, sid := SID, + socket := Socket, ip := IP, auth_module := AuthMod, + access := Access, lang := Lang} = State, R) -> + JID = jid:make(U, S, R), + LServer = JID#jid.lserver, + case acl:access_matches(Access, + #{usr => jid:split(JID), ip => IP}, + LServer) of + allow -> + ?INFO_MSG("(~w) Opened session for ~s", + [Socket, jid:to_string(JID)]), + change_shaper(State), + Conn = get_conn_type(State), + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}], + ejabberd_sm:open_session(SID, U, LServer, R, Info), + State1 = State#{conn => Conn, resource => R, jid => JID}, + State2 = ejabberd_hooks:run_fold( + c2s_session_opened, LServer, State1, []), + {ok, State2}; + deny -> + ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]), + ?INFO_MSG("(~w) Forbidden session for ~s", + [Socket, jid:to_string(JID)]), + Txt = <<"Denied by ACL">>, + {error, xmpp:err_not_allowed(Txt, Lang), State} end. --spec send_header(state(), binary(), binary(), binary()) -> ok | {error, any()}. -send_header(StateData, Server, Version, Lang) -> - Header = #xmlel{name = Name, attrs = Attrs} = - xmpp:encode(#stream_start{version = Version, - lang = Lang, - xmlns = ?NS_CLIENT, - stream_xmlns = ?NS_STREAM, - id = StateData#state.streamid, - from = jid:make(Server)}), - if StateData#state.xml_socket -> - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamstart, Name, Attrs}); - true -> - send_text(StateData, fxml:element_to_header(Header)) +-spec process_iq_in(state(), iq()) -> {boolean(), state()}. +process_iq_in(State, #iq{} = IQ) -> + case privacy_check_packet(State, IQ, in) of + allow -> + {true, State}; + deny -> + route_error(IQ, xmpp:err_service_unavailable()), + {false, State} end. --spec send_trailer(state()) -> ok | {error, any()}. -send_trailer(StateData) - when StateData#state.mgmt_state == pending -> - ?DEBUG("Cannot send stream trailer while waiting for resumption", []); -send_trailer(StateData) - when StateData#state.xml_socket -> - (StateData#state.sockmod):send_xml(StateData#state.socket, - {xmlstreamend, <<"stream:stream">>}); -send_trailer(StateData) -> - send_text(StateData, ?STREAM_TRAILER). - --spec new_id() -> binary(). -new_id() -> randoms:get_string(). - --spec new_uniq_id() -> binary(). -new_uniq_id() -> - iolist_to_binary([randoms:get_string(), - integer_to_binary(p1_time_compat:unique_integer([positive]))]). - --spec get_conn_type(state()) -> c2s | c2s_tls | c2s_compressed | websocket | - c2s_compressed_tls | http_bind. -get_conn_type(StateData) -> - case (StateData#state.sockmod):get_transport(StateData#state.socket) of - tcp -> c2s; - tls -> c2s_tls; - tcp_zlib -> c2s_compressed; - tls_zlib -> c2s_compressed_tls; - http_bind -> http_bind; - websocket -> websocket +-spec process_message_in(state(), message()) -> {boolean(), state()}. +process_message_in(State, #message{type = T} = Msg) -> + case privacy_check_packet(State, Msg, in) of + allow -> + {true, State}; + deny when T == groupchat; T == headline -> + ok; + deny -> + case xmpp:has_subtag(Msg, #muc_user{}) of + true -> + ok; + false -> + route_error(Msg, xmpp:err_service_unavailable()) + end, + {false, State} end. --spec process_presence_probe(jid(), jid(), state()) -> ok. -process_presence_probe(From, To, StateData) -> - LFrom = jid:tolower(From), - LBFrom = setelement(3, LFrom, <<"">>), - case StateData#state.pres_last of - undefined -> - ok; +-spec process_presence_in(state(), presence()) -> {boolean(), state()}. +process_presence_in(#{server := Server, pres_a := PresA} = State0, + #presence{from = From, to = To, type = T} = Pres) -> + LServer = jid:nameprep(Server), + State = ejabberd_hooks:run_fold(c2s_presence_in, LServer, State0, [Pres]), + case T of + probe -> + NewState = do_some_magic(State, From), + route_probe_reply(From, To, NewState), + {false, NewState}; + error -> + A = ?SETS:del_element(jid:tolower(From), PresA), + {true, State#{pres_a => A}}; _ -> - Cond = ((?SETS):is_element(LFrom, StateData#state.pres_f) - orelse - ((LFrom /= LBFrom) andalso - (?SETS):is_element(LBFrom, StateData#state.pres_f))), - if Cond -> - %% To is the one sending the presence (the probe target) - Packet = xmpp_util:add_delay_info( - StateData#state.pres_last, To, - StateData#state.pres_timestamp), - case privacy_check_packet(StateData, To, From, Packet, out) of - deny -> - ok; - allow -> - Pid=element(2, StateData#state.sid), - ejabberd_hooks:run(presence_probe_hook, StateData#state.server, [From, To, Pid]), - %% Don't route a presence probe to oneself - case From == To of - false -> - ejabberd_router:route(To, From, Packet); - true -> - ok - end - end; - true -> - ok + case privacy_check_packet(State, Pres, in) of + allow when T == error -> + {true, State}; + allow -> + NewState = do_some_magic(State, From), + {true, NewState}; + deny -> + {false, State} end end. -%% User updates his presence (non-directed presence packet) --spec presence_update(jid(), presence(), state()) -> state(). -presence_update(From, Packet, StateData) -> - #presence{type = Type} = Packet, - case Type of - unavailable -> - Status = xmpp:get_text(Packet#presence.status), - Info = [{ip, StateData#state.ip}, - {conn, StateData#state.conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, Status, Info), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - StateData#state{pres_last = undefined, - pres_timestamp = undefined, pres_a = (?SETS):new()}; - error -> StateData; - probe -> StateData; - subscribe -> StateData; - subscribed -> StateData; - unsubscribe -> StateData; - unsubscribed -> StateData; - _ -> - OldPriority = case StateData#state.pres_last of - undefined -> 0; - OldPresence -> get_priority_from_presence(OldPresence) - end, - NewPriority = get_priority_from_presence(Packet), - update_priority(NewPriority, Packet, StateData), - FromUnavail = (StateData#state.pres_last == undefined), - ?DEBUG("from unavail = ~p~n", [FromUnavail]), - NewStateData = StateData#state{pres_last = Packet, - pres_timestamp = p1_time_compat:timestamp()}, - NewState = if FromUnavail -> - ejabberd_hooks:run(user_available_hook, - NewStateData#state.server, - [NewStateData#state.jid]), - ResentStateData = if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> NewStateData - end, - presence_broadcast_first(From, ResentStateData, - Packet); +-spec route_probe_reply(jid(), jid(), state()) -> ok. +route_probe_reply(From, To, #{server := Server, pres_f := PresF, + pres_last := LastPres, + pres_timestamp := TS} = State) -> + LFrom = jid:tolower(From), + LBFrom = jid:remove_resource(LFrom), + case ?SETS:is_element(LFrom, PresF) + orelse ?SETS:is_element(LBFrom, PresF) of + true -> + %% To is my JID + Packet = xmpp_util:add_delay_info(LastPres, To, TS), + case privacy_check_packet(State, Packet, out) of + deny -> + ok; + allow -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run(presence_probe_hook, + LServer, + [From, To, self()]), + %% Don't route a presence probe to oneself + case From == To of + false -> + route(xmpp:set_from_to(Packet, To, From)); true -> - presence_broadcast_to_trusted(NewStateData, From, - NewStateData#state.pres_f, - NewStateData#state.pres_a, - Packet), - if OldPriority < 0, NewPriority >= 0 -> - resend_offline_messages(NewStateData); - true -> ok - end, - NewStateData - end, - NewState - end. + ok + end + end; + false -> + ok + end; +route_probe_reply(_, _, _) -> + ok. -%% User sends a directed presence packet --spec presence_track(jid(), jid(), presence(), state()) -> state(). -presence_track(From, To, Packet, StateData) -> - #presence{type = Type} = Packet, +-spec process_presence_out(state(), presence()) -> next_state(). +process_presence_out(#{user := User, server := Server, + lang := Lang, pres_a := PresA} = State, + #presence{from = From, to = To, type = Type} = Pres) -> + LServer = jid:nameprep(Server), LTo = jid:tolower(To), - User = StateData#state.user, - Server = StateData#state.server, - Lang = StateData#state.lang, - case privacy_check_packet(StateData, From, To, Packet, out) of + case privacy_check_packet(State, Pres, out) of deny -> ErrText = <<"Your active privacy list has denied " "the routing of this stanza.">>, Err = xmpp:err_not_acceptable(ErrText, Lang), - send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); + xmpp_stream_in:send_error(State, Pres, Err); allow when Type == subscribe; Type == subscribed; Type == unsubscribe; Type == unsubscribed -> - Access = gen_mod:get_module_opt(Server, mod_roster, access, + Access = gen_mod:get_module_opt(LServer, mod_roster, access, fun(A) when is_atom(A) -> A end, all), MyBareJID = jid:make(User, Server, <<"">>), - case acl:match_rule(Server, Access, MyBareJID) of + case acl:match_rule(LServer, Access, MyBareJID) of deny -> ErrText = <<"Denied by ACL">>, Err = xmpp:err_forbidden(ErrText, Lang), - send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); + xmpp_stream_in:send_error(State, Pres, Err); allow -> ejabberd_hooks:run(roster_out_subscription, - Server, + LServer, [User, Server, To, Type]), - ejabberd_router:route(jid:remove_resource(From), To, Packet), - StateData + BareFrom = jid:remove_resource(From), + route(xmpp:set_from_to(Pres, BareFrom, To)), + {noreply, State} end; allow when Type == error; Type == probe -> - ejabberd_router:route(From, To, Packet), - StateData; + route(Pres), + {noreply, State}; allow -> - ejabberd_router:route(From, To, Packet), + route(Pres), A = case Type of - available -> - ?SETS:add_element(LTo, StateData#state.pres_a); - unavailable -> - ?SETS:del_element(LTo, StateData#state.pres_a) + available -> ?SETS:add_element(LTo, PresA); + unavailable -> ?SETS:del_element(LTo, PresA) end, - StateData#state{pres_a = A} + {noreply, State#{pres_a => A}} end. --spec check_privacy_route(jid(), state(), jid(), jid(), stanza()) -> state(). -check_privacy_route(From, StateData, FromRoute, To, - Packet) -> - case privacy_check_packet(StateData, From, To, Packet, - out) of +-spec process_self_presence(state(), presence()) -> {noreply, state()}. +process_self_presence(#{ip := IP, conn := Conn, + auth_module := AuthMod, sid := SID, + user := U, server := S, resource := R} = State, + #presence{type = unavailable} = Pres) -> + Status = xmpp:get_text(Pres#presence.status), + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}], + ejabberd_sm:unset_presence(SID, U, S, R, Status, Info), + State1 = broadcast_presence_unavailable(State, Pres), + State2 = maps:remove(pres_last, maps:remove(pres_timestamp, State1)), + {noreply, State2}; +process_self_presence(#{server := Server} = State, + #presence{type = available} = Pres) -> + LServer = jid:nameprep(Server), + PreviousPres = maps:get(pres_last, State, undefined), + update_priority(State, Pres), + State1 = ejabberd_hooks:run_fold(user_available_hook, LServer, State, [Pres]), + State2 = State1#{pres_last => Pres, + pres_timestamp => p1_time_compat:timestamp()}, + FromUnavailable = PreviousPres == undefined, + State3 = broadcast_presence_available(State2, Pres, FromUnavailable), + {noreply, State3}; +process_self_presence(State, _Pres) -> + {noreply, State}. + +-spec update_priority(state(), presence()) -> ok. +update_priority(#{ip := IP, conn := Conn, auth_module := AuthMod, + sid := SID, user := U, server := S, resource := R}, + Pres) -> + Priority = get_priority_from_presence(Pres), + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthMod}], + ejabberd_sm:set_presence(SID, U, S, R, Priority, Pres, Info). + +-spec broadcast_presence_unavailable(state(), presence()) -> state(). +broadcast_presence_unavailable(#{pres_a := PresA} = State, Pres) -> + JIDs = filter_blocked(State, Pres, PresA), + route_multiple(State, JIDs, Pres), + State#{pres_a => ?SETS:new()}. + +-spec broadcast_presence_available(state(), presence(), boolean()) -> state(). +broadcast_presence_available(#{pres_a := PresA, pres_f := PresF, + pres_t := PresT} = State, + Pres, _FromUnavailable = true) -> + Probe = #presence{type = probe}, + TJIDs = filter_blocked(State, Probe, PresT), + FJIDs = filter_blocked(State, Pres, PresF), + route_multiple(State, TJIDs, Probe), + route_multiple(State, FJIDs, Pres), + State#{pres_a => ?SETS:union(PresA, PresF)}; +broadcast_presence_available(#{pres_a := PresA, pres_f := PresF} = State, + Pres, _FromUnavailable = false) -> + JIDs = filter_blocked(State, Pres, ?SETS:intersection(PresA, PresF)), + route_multiple(State, JIDs, Pres), + State. + +-spec check_privacy_then_route(state(), stanza()) -> next_state(). +check_privacy_then_route(#{lang := Lang} = State, Pkt) -> + case privacy_check_packet(State, Pkt, out) of deny -> - Lang = StateData#state.lang, ErrText = <<"Your active privacy list has denied " "the routing of this stanza.">>, Err = xmpp:err_not_acceptable(ErrText, Lang), - send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); + xmpp_stream_in:send_error(State, Pkt, Err); allow -> - ejabberd_router:route(FromRoute, To, Packet), - StateData - end. - -%% Check if privacy rules allow this delivery --spec privacy_check_packet(state(), jid(), jid(), stanza(), in | out) -> allow | deny. -privacy_check_packet(StateData, From, To, Packet, - Dir) -> - ejabberd_hooks:run_fold(privacy_check_packet, - StateData#state.server, allow, - [StateData#state.user, StateData#state.server, - StateData#state.privacy_list, {From, To, Packet}, - Dir]). - --spec is_privacy_allow(state(), jid(), jid(), stanza(), in | out) -> boolean(). -is_privacy_allow(StateData, From, To, Packet, Dir) -> - allow == - privacy_check_packet(StateData, From, To, Packet, Dir). - -%% Send presence when disconnecting --spec presence_broadcast(state(), jid(), ?SETS:set(), presence()) -> ok. -presence_broadcast(StateData, From, JIDSet, Packet) -> - JIDs = ?SETS:to_list(JIDSet), - JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), - Server = StateData#state.server, - send_multiple(From, Server, JIDs2, Packet). - --spec presence_broadcast_to_trusted( - state(), jid(), ?SETS:set(), ?SETS:set(), presence()) -> ok. -%% Send presence when updating presence -presence_broadcast_to_trusted(StateData, From, Trusted, JIDSet, Packet) -> - JIDs = ?SETS:to_list(?SETS:intersection(Trusted, JIDSet)), - JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), - Server = StateData#state.server, - send_multiple(From, Server, JIDs2, Packet). - -%% Send presence when connecting --spec presence_broadcast_first(jid(), state(), presence()) -> state(). -presence_broadcast_first(From, StateData, Packet) -> - JIDsProbe = - ?SETS:fold( - fun(JID, L) -> [JID | L] end, - [], - StateData#state.pres_t), - PacketProbe = #presence{type = probe}, - JIDs2Probe = format_and_check_privacy(From, StateData, PacketProbe, JIDsProbe, out), - Server = StateData#state.server, - send_multiple(From, Server, JIDs2Probe, PacketProbe), - {As, JIDs} = - ?SETS:fold( - fun(JID, {A, JID_list}) -> - {?SETS:add_element(JID, A), JID_list++[JID]} - end, - {StateData#state.pres_a, []}, - StateData#state.pres_f), - JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), - send_multiple(From, Server, JIDs2, Packet), - StateData#state{pres_a = As}. - --spec format_and_check_privacy( - jid(), state(), stanza(), [ljid()], in | out) -> [jid()]. -format_and_check_privacy(From, StateData, Packet, JIDs, Dir) -> - FJIDs = [jid:make(JID) || JID <- JIDs], - lists:filter( - fun(FJID) -> - case ejabberd_hooks:run_fold( - privacy_check_packet, StateData#state.server, - allow, - [StateData#state.user, - StateData#state.server, - StateData#state.privacy_list, - {From, FJID, Packet}, - Dir]) of - deny -> false; - allow -> true - end - end, - FJIDs). - --spec send_multiple(jid(), binary(), [jid()], stanza()) -> ok. -send_multiple(From, Server, JIDs, Packet) -> - ejabberd_router_multicast:route_multicast(From, Server, JIDs, Packet). - --spec roster_change(jid(), both | from | none | remove | to, state()) -> state(). -roster_change(IJID, ISubscription, StateData) -> - LIJID = jid:tolower(IJID), - IsFrom = (ISubscription == both) or (ISubscription == from), - IsTo = (ISubscription == both) or (ISubscription == to), - OldIsFrom = (?SETS):is_element(LIJID, StateData#state.pres_f), - FSet = if - IsFrom -> (?SETS):add_element(LIJID, StateData#state.pres_f); - true -> ?SETS:del_element(LIJID, StateData#state.pres_f) - end, - TSet = if - IsTo -> (?SETS):add_element(LIJID, StateData#state.pres_t); - true -> ?SETS:del_element(LIJID, StateData#state.pres_t) - end, - case StateData#state.pres_last of - undefined -> - StateData#state{pres_f = FSet, pres_t = TSet}; - P -> - ?DEBUG("roster changed for ~p~n", - [StateData#state.user]), - From = StateData#state.jid, - To = jid:make(IJID), - Cond1 = IsFrom andalso not OldIsFrom, - Cond2 = not IsFrom andalso OldIsFrom andalso - ((?SETS):is_element(LIJID, StateData#state.pres_a)), - if Cond1 -> - ?DEBUG("C1: ~p~n", [LIJID]), - case privacy_check_packet(StateData, From, To, P, out) - of - deny -> ok; - allow -> ejabberd_router:route(From, To, P) - end, - A = (?SETS):add_element(LIJID, StateData#state.pres_a), - StateData#state{pres_a = A, pres_f = FSet, - pres_t = TSet}; - Cond2 -> - ?DEBUG("C2: ~p~n", [LIJID]), - PU = #presence{type = unavailable}, - case privacy_check_packet(StateData, From, To, PU, out) - of - deny -> ok; - allow -> ejabberd_router:route(From, To, PU) - end, - A = ?SETS:del_element(LIJID, StateData#state.pres_a), - StateData#state{pres_a = A, pres_f = FSet, - pres_t = TSet}; - true -> StateData#state{pres_f = FSet, pres_t = TSet} - end + route(Pkt), + {noreply, State} end. --spec update_priority(integer(), presence(), state()) -> ok. -update_priority(Priority, Packet, StateData) -> - Info = [{ip, StateData#state.ip}, {conn, StateData#state.conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:set_presence(StateData#state.sid, - StateData#state.user, StateData#state.server, - StateData#state.resource, Priority, Packet, Info). +-spec privacy_check_packet(state(), stanza(), in | out) -> allow | deny. +privacy_check_packet(#{server := Server} = State, Pkt, Dir) -> + LServer = jid:nameprep(Server), + ejabberd_hooks:run_fold(privacy_check_packet, LServer, allow, [State, Pkt, Dir]). -spec get_priority_from_presence(presence()) -> integer(). get_priority_from_presence(#presence{priority = Prio}) -> @@ -2021,817 +554,129 @@ get_priority_from_presence(#presence{priority = Prio}) -> _ -> Prio end. --spec process_privacy_iq(iq(), state()) -> state(). -process_privacy_iq(#iq{from = From, to = To, - type = Type, lang = Lang} = IQ, StateData) -> - Txt = <<"No module is handling this query">>, - {Res, NewStateData} = - case Type of - get -> - R = ejabberd_hooks:run_fold( - privacy_iq_get, - StateData#state.server, - {error, xmpp:err_feature_not_implemented(Txt, Lang)}, - [IQ, StateData#state.privacy_list]), - {R, StateData}; - set -> - case ejabberd_hooks:run_fold( - privacy_iq_set, - StateData#state.server, - {error, xmpp:err_feature_not_implemented(Txt, Lang)}, - [IQ, StateData#state.privacy_list]) - of - {result, R, NewPrivList} -> - {{result, R}, - StateData#state{privacy_list = - NewPrivList}}; - R -> {R, StateData} - end - end, - IQRes = case Res of - {result, Result} -> - xmpp:make_iq_result(IQ, Result); - {error, Error} -> - xmpp:make_error(IQ, Error) - end, - ejabberd_router:route(To, From, IQRes), - NewStateData. +-spec filter_blocked(state(), presence(), ?SETS:set()) -> [jid()]. +filter_blocked(#{user := U, server := S, resource := R} = State, + Pres, LJIDSet) -> + From = jid:make(U, S, R), + ?SETS:fold( + fun(LJID, Acc) -> + To = jid:make(LJID), + Pkt = xmpp:set_from_to(Pres, From, To), + case privacy_check_packet(State, Pkt, out) of + allow -> [To|Acc]; + deny -> Acc + end + end, [], LJIDSet). + +-spec route(stanza()) -> ok. +route(Pkt) -> + From = xmpp:get_from(Pkt), + To = xmpp:get_to(Pkt), + ejabberd_router:route(From, To, Pkt). + +-spec route_error(stanza(), stanza_error()) -> ok. +route_error(Pkt, Err) -> + From = xmpp:get_from(Pkt), + To = xmpp:get_to(Pkt), + ejabberd_router:route_error(To, From, Pkt, Err). + +-spec route_multiple(state(), [jid()], stanza()) -> ok. +route_multiple(#{server := Server}, JIDs, Pkt) -> + LServer = jid:nameprep(Server), + From = xmpp:get_from(Pkt), + ejabberd_router_multicast:route_multicast(From, LServer, JIDs, Pkt). --spec resend_offline_messages(state()) -> ok. -resend_offline_messages(#state{ask_offline = true} = StateData) -> - case ejabberd_hooks:run_fold(resend_offline_messages_hook, - StateData#state.server, [], - [StateData#state.user, StateData#state.server]) - of - Rs -> %%when is_list(Rs) -> - lists:foreach(fun ({route, From, To, Packet}) -> - Pass = case privacy_check_packet(StateData, - From, To, - Packet, in) - of - allow -> true; - deny -> false - end, - if Pass -> - ejabberd_router:route(From, To, Packet); - true -> ok - end - end, - Rs) - end; -resend_offline_messages(_StateData) -> - ok. - --spec resend_subscription_requests(state()) -> state(). -resend_subscription_requests(#state{user = User, - server = Server} = StateData) -> - PendingSubscriptions = - ejabberd_hooks:run_fold(resend_subscription_requests_hook, - Server, [], [User, Server]), - lists:foldl(fun (XMLPacket, AccStateData) -> - send_packet(AccStateData, XMLPacket) +-spec resource_conflict_action(binary(), binary(), binary()) -> + {accept_resource, binary()} | closenew. +resource_conflict_action(U, S, R) -> + OptionRaw = case ejabberd_sm:is_existing_resource(U, S, R) of + true -> + ejabberd_config:get_option( + {resource_conflict, S}, + fun(setresource) -> setresource; + (closeold) -> closeold; + (closenew) -> closenew; + (acceptnew) -> acceptnew + end); + false -> + acceptnew end, - StateData, - PendingSubscriptions). - --spec get_showtag(undefined | presence()) -> binary(). -get_showtag(undefined) -> <<"unavailable">>; -get_showtag(#presence{show = undefined}) -> <<"available">>; -get_showtag(#presence{show = Show}) -> atom_to_binary(Show, utf8). - --spec get_statustag(undefined | presence()) -> binary(). -get_statustag(#presence{status = Status}) -> xmpp:get_text(Status); -get_statustag(undefined) -> <<"">>. - --spec process_unauthenticated_stanza(state(), iq()) -> ok | {error, any()}. -process_unauthenticated_stanza(StateData, #iq{type = T, lang = L} = IQ) - when T == set; T == get -> - Lang = if L == undefined; L == <<"">> -> StateData#state.lang; - true -> L - end, - NewIQ = IQ#iq{lang = Lang}, - Res = ejabberd_hooks:run_fold(c2s_unauthenticated_iq, - StateData#state.server, empty, - [StateData#state.server, NewIQ, - StateData#state.ip]), - case Res of - empty -> - Txt = <<"Authentication required">>, - Err0 = xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)), - Err1 = Err0#iq{from = jid:make(<<>>, StateData#state.server, <<>>), - to = undefined}, - send_element(StateData, Err1); - _ -> - send_element(StateData, Res) - end; -process_unauthenticated_stanza(_StateData, _) -> - %% Drop any stanza, which isn't IQ stanza - ok. - --spec peerip(ejabberd_socket:sockmod(), - ejabberd_socket:socket()) -> - {inet:ip_address(), non_neg_integer()} | undefined. -peerip(SockMod, Socket) -> - IP = case SockMod of - gen_tcp -> inet:peername(Socket); - _ -> SockMod:peername(Socket) - end, - case IP of - {ok, IPOK} -> IPOK; - _ -> undefined - end. - -%% fsm_next_state_pack: Pack the StateData structure to improve -%% sharing. --spec fsm_next_state_pack(state_name(), state()) -> fsm_transition(). -fsm_next_state_pack(StateName, StateData) -> - fsm_next_state_gc(StateName, pack(StateData)). - --spec fsm_next_state_gc(state_name(), state()) -> fsm_transition(). -%% fsm_next_state_gc: Garbage collect the process heap to make use of -%% the newly packed StateData structure. -fsm_next_state_gc(StateName, PackedStateData) -> - erlang:garbage_collect(), - fsm_next_state(StateName, PackedStateData). - -%% fsm_next_state: Generate the next_state FSM tuple with different -%% timeout, depending on the future state --spec fsm_next_state(state_name(), state()) -> fsm_transition(). -fsm_next_state(session_established, #state{mgmt_max_queue = exceeded} = - StateData) -> - ?WARNING_MSG("ACK queue too long, terminating session for ~s", - [jid:to_string(StateData#state.jid)]), - Err = xmpp:serr_policy_violation(<<"Too many unacked stanzas">>, - StateData#state.lang), - send_element(StateData, Err), - {stop, normal, StateData#state{mgmt_resend = false}}; -fsm_next_state(session_established, #state{mgmt_state = pending} = StateData) -> - fsm_next_state(wait_for_resume, StateData); -fsm_next_state(session_established, StateData) -> - {next_state, session_established, StateData, - ?C2S_HIBERNATE_TIMEOUT}; -fsm_next_state(wait_for_resume, #state{mgmt_timeout = 0} = StateData) -> - {stop, normal, StateData}; -fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined, - sid = SID, jid = JID, ip = IP, - conn = Conn, auth_module = AuthModule, - server = Host} = StateData) -> - case StateData of - #state{mgmt_ack_timer = undefined} -> - ok; - #state{mgmt_ack_timer = Timer} -> - erlang:cancel_timer(Timer) - end, - ?INFO_MSG("Waiting for resumption of stream for ~s", - [jid:to_string(JID)]), - Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], - NewStateData = ejabberd_hooks:run_fold(c2s_session_pending, Host, StateData, - [SID, JID, Info]), - {next_state, wait_for_resume, - NewStateData#state{mgmt_state = pending, - mgmt_pending_since = os:timestamp()}, - NewStateData#state.mgmt_timeout}; -fsm_next_state(wait_for_resume, StateData) -> - Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), - Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), - {next_state, wait_for_resume, StateData, Timeout}; -fsm_next_state(StateName, StateData) -> - {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. - -%% fsm_reply: Generate the reply FSM tuple with different timeout, -%% depending on the future state --spec fsm_reply(_, state_name(), state()) -> fsm_reply(). -fsm_reply(Reply, session_established, StateData) -> - {reply, Reply, session_established, StateData, - ?C2S_HIBERNATE_TIMEOUT}; -fsm_reply(Reply, wait_for_resume, StateData) -> - Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), - Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), - {reply, Reply, wait_for_resume, StateData, Timeout}; -fsm_reply(Reply, StateName, StateData) -> - {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. - -%% Used by c2s blacklist plugins --spec is_ip_blacklisted(undefined | {inet:ip_address(), non_neg_integer()}, - binary()) -> false | {true, binary(), binary()}. -is_ip_blacklisted(undefined, _Lang) -> false; -is_ip_blacklisted({IP, _Port}, Lang) -> - ejabberd_hooks:run_fold(check_bl_c2s, false, [IP, Lang]). - -%% Check from attributes -%% returns invalid-from|NewElement --spec check_from(stanza(), jid()) -> 'invalid-from' | stanza(). -check_from(Pkt, FromJID) -> - JID = xmpp:get_from(Pkt), - case JID of - undefined -> - Pkt; - #jid{} -> - if - (JID#jid.luser == FromJID#jid.luser) and - (JID#jid.lserver == FromJID#jid.lserver) and - (JID#jid.lresource == FromJID#jid.lresource) -> - Pkt; - (JID#jid.luser == FromJID#jid.luser) and - (JID#jid.lserver == FromJID#jid.lserver) and - (JID#jid.lresource == <<"">>) -> - Pkt; - true -> - 'invalid-from' - end - end. - -fsm_limit_opts(Opts) -> - case lists:keysearch(max_fsm_queue, 1, Opts) of - {value, {_, N}} when is_integer(N) -> [{max_queue, N}]; - _ -> - case ejabberd_config:get_option( - max_fsm_queue, - fun(I) when is_integer(I), I > 0 -> I end) of - undefined -> []; - N -> [{max_queue, N}] - end + Option = case OptionRaw of + setresource -> setresource; + closeold -> + acceptnew; %% ejabberd_sm will close old session + closenew -> closenew; + acceptnew -> acceptnew; + _ -> acceptnew %% default ejabberd behavior + end, + case Option of + acceptnew -> {accept_resource, R}; + closenew -> closenew; + setresource -> + Rnew = new_uniq_id(), + {accept_resource, Rnew} end. --spec bounce_messages() -> ok. -bounce_messages() -> - receive - {route, From, To, El} -> - ejabberd_router:route(From, To, El), bounce_messages() - after 0 -> ok - end. +-spec new_uniq_id() -> binary(). +new_uniq_id() -> + iolist_to_binary( + [randoms:get_string(), + integer_to_binary(p1_time_compat:unique_integer([positive]))]). --spec process_compression_request(compress(), state_name(), state()) -> fsm_next(). -process_compression_request(#compress{methods = []}, StateName, StateData) -> - send_element(StateData, #compress_failure{reason = 'setup-failed'}), - fsm_next_state(StateName, StateData); -process_compression_request(#compress{methods = Ms}, StateName, StateData) -> - case lists:member(<<"zlib">>, Ms) of - true -> - Socket = StateData#state.socket, - BCompressed = fxml:element_to_binary(xmpp:encode(#compressed{})), - ZlibSocket = (StateData#state.sockmod):compress(Socket, BCompressed), - fsm_next_state(wait_for_stream, - StateData#state{socket = ZlibSocket, - streamid = new_id()}); - false -> - send_element(StateData, - #compress_failure{reason = 'unsupported-method'}), - fsm_next_state(StateName, StateData) +-spec get_conn_type(state()) -> c2s | c2s_tls | c2s_compressed | websocket | + c2s_compressed_tls | http_bind. +get_conn_type(State) -> + case xmpp_stream_in:get_transport(State) of + tcp -> c2s; + tls -> c2s_tls; + tcp_zlib -> c2s_compressed; + tls_zlib -> c2s_compressed_tls; + http_bind -> http_bind; + websocket -> websocket end. -%%%---------------------------------------------------------------------- -%%% XEP-0191 -%%%---------------------------------------------------------------------- - --spec route_blocking( - {block, [jid()]} | {unblock, [jid()]} | unblock_all, state()) -> state(). -route_blocking(What, StateData) -> - SubEl = case What of - {block, JIDs} -> - #block{items = JIDs}; - {unblock, JIDs} -> - #unblock{items = JIDs}; - unblock_all -> - #unblock{} - end, - PrivPushIQ = #iq{type = set, id = <<"push">>, sub_els = [SubEl], - from = jid:remove_resource(StateData#state.jid), - to = StateData#state.jid}, - %% No need to replace active privacy list here, - %% blocking pushes are always accompanied by - %% Privacy List pushes - send_stanza(StateData, PrivPushIQ). - -%%%---------------------------------------------------------------------- -%%% XEP-0198 -%%%---------------------------------------------------------------------- --spec stream_mgmt_enabled(state()) -> boolean(). -stream_mgmt_enabled(#state{mgmt_state = disabled}) -> - false; -stream_mgmt_enabled(_StateData) -> - true. - --spec dispatch_stream_mgmt(xmpp_element(), state()) -> state(). -dispatch_stream_mgmt(El, #state{mgmt_state = MgmtState} = StateData) - when MgmtState == active; - MgmtState == pending -> - perform_stream_mgmt(El, StateData); -dispatch_stream_mgmt(El, StateData) -> - negotiate_stream_mgmt(El, StateData). - --spec negotiate_stream_mgmt(xmpp_element(), state()) -> state(). -negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> - %% XEP-0198 says: "For client-to-server connections, the client MUST NOT - %% attempt to enable stream management until after it has completed Resource - %% Binding unless it is resuming a previous session". However, it also - %% says: "Stream management errors SHOULD be considered recoverable", so we - %% won't bail out. - send_element(StateData, #sm_failed{reason = 'unexpected-request', - xmlns = ?NS_STREAM_MGMT_3}), - StateData; -negotiate_stream_mgmt(Pkt, StateData) -> - Xmlns = xmpp:get_ns(Pkt), - case stream_mgmt_enabled(StateData) of +-spec change_shaper(state()) -> ok. +change_shaper(#{shaper := ShaperName, ip := IP, + user := U, server := S, resource := R} = State) -> + #jid{lserver = LServer} = JID = jid:make(U, S, R), + Shaper = acl:access_matches(ShaperName, + #{usr => jid:split(JID), ip => IP}, + LServer), + xmpp_stream_in:change_shaper(State, Shaper). + +-spec do_some_magic(state(), jid()) -> state(). +do_some_magic(#{pres_a := PresA, pres_f := PresF} = State, From) -> + LFrom = jid:tolower(From), + LBFrom = jid:remove_resource(LFrom), + case (?SETS):is_element(LFrom, PresA) orelse + (?SETS):is_element(LBFrom, PresA) of true -> - case Pkt of - #sm_enable{} -> - handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Pkt); - _ -> - Res = if is_record(Pkt, sm_a); - is_record(Pkt, sm_r); - is_record(Pkt, sm_resume) -> - #sm_failed{reason = 'unexpected-request', - xmlns = Xmlns}; - true -> - #sm_failed{reason = 'bad-request', - xmlns = Xmlns} - end, - send_element(StateData, Res), - StateData - end; + State; false -> - send_element(StateData, - #sm_failed{reason = 'service-unavailable', - xmlns = Xmlns}), - StateData - end. - --spec perform_stream_mgmt(xmpp_element(), state()) -> state(). -perform_stream_mgmt(Pkt, StateData) -> - case xmpp:get_ns(Pkt) of - Xmlns when Xmlns == StateData#state.mgmt_xmlns -> - case Pkt of - #sm_r{} -> - handle_r(StateData); - #sm_a{} -> - handle_a(StateData, Pkt); - _ -> - Res = if is_record(Pkt, sm_enable); - is_record(Pkt, sm_resume) -> - #sm_failed{reason = 'unexpected-request', - xmlns = Xmlns}; - true -> - #sm_failed{reason = 'bad-request', - xmlns = Xmlns} - end, - send_element(StateData, Res), - StateData - end; - _ -> - send_element(StateData, - #sm_failed{reason = 'unsupported-version', - xmlns = StateData#state.mgmt_xmlns}) - end. - --spec handle_enable(state(), sm_enable()) -> state(). -handle_enable(#state{mgmt_timeout = DefaultTimeout, - mgmt_max_timeout = MaxTimeout} = StateData, - #sm_enable{resume = Resume, max = Max}) -> - Timeout = if Resume == false -> - 0; - Max /= undefined, Max > 0, Max =< MaxTimeout -> - Max; - true -> - DefaultTimeout - end, - Res = if Timeout > 0 -> - ?INFO_MSG("Stream management with resumption enabled for ~s", - [jid:to_string(StateData#state.jid)]), - #sm_enabled{xmlns = StateData#state.mgmt_xmlns, - id = make_resume_id(StateData), - resume = true, - max = Timeout}; - true -> - ?INFO_MSG("Stream management without resumption enabled for ~s", - [jid:to_string(StateData#state.jid)]), - #sm_enabled{xmlns = StateData#state.mgmt_xmlns} - end, - send_element(StateData, Res), - StateData#state{mgmt_state = active, - mgmt_queue = queue:new(), - mgmt_timeout = Timeout * 1000}. - --spec handle_r(state()) -> state(). -handle_r(StateData) -> - Res = #sm_a{xmlns = StateData#state.mgmt_xmlns, - h = StateData#state.mgmt_stanzas_in}, - send_element(StateData, Res), - StateData. - --spec handle_a(state(), sm_a()) -> state(). -handle_a(StateData, #sm_a{h = H}) -> - NewStateData = check_h_attribute(StateData, H), - maybe_renew_ack_request(NewStateData). - --spec handle_resume(state(), sm_resume()) -> {ok, state()} | error. -handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> - R = case stream_mgmt_enabled(StateData) of - true -> - case inherit_session_state(StateData, PrevID) of - {ok, InheritedState, Info} -> - {ok, InheritedState, Info, H}; - {error, Err, InH} -> - {error, #sm_failed{reason = 'item-not-found', - h = InH, xmlns = Xmlns}, Err}; - {error, Err} -> - {error, #sm_failed{reason = 'item-not-found', - xmlns = Xmlns}, Err} - end; - false -> - {error, #sm_failed{reason = 'service-unavailable', - xmlns = Xmlns}, - <<"XEP-0198 disabled">>} - end, - case R of - {ok, ResumedState, ResumedInfo, NumHandled} -> - NewState = check_h_attribute(ResumedState, NumHandled), - AttrXmlns = NewState#state.mgmt_xmlns, - AttrId = make_resume_id(NewState), - AttrH = NewState#state.mgmt_stanzas_in, - send_element(NewState, #sm_resumed{xmlns = AttrXmlns, - h = AttrH, - previd = AttrId}), - SendFun = fun(_F, _T, El, Time) -> - NewEl = add_resent_delay_info(NewState, El, Time), - send_element(NewState, NewEl) - end, - handle_unacked_stanzas(NewState, SendFun), - send_element(NewState, #sm_r{xmlns = AttrXmlns}), - NewState1 = csi_flush_queue(NewState), - NewState2 = ejabberd_hooks:run_fold(c2s_session_resumed, - StateData#state.server, - NewState1, - [NewState1#state.sid, - NewState1#state.jid, - ResumedInfo]), - ?INFO_MSG("Resumed session for ~s", - [jid:to_string(NewState2#state.jid)]), - {ok, NewState2}; - {error, El, Msg} -> - send_element(StateData, El), - ?INFO_MSG("Cannot resume session for ~s@~s: ~s", - [StateData#state.user, StateData#state.server, Msg]), - error - end. - --spec check_h_attribute(state(), non_neg_integer()) -> state(). -check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H) - when H > NumStanzasOut -> - ?DEBUG("~s acknowledged ~B stanzas, but only ~B were sent", - [jid:to_string(StateData#state.jid), H, NumStanzasOut]), - mgmt_queue_drop(StateData#state{mgmt_stanzas_out = H}, NumStanzasOut); -check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H) -> - ?DEBUG("~s acknowledged ~B of ~B stanzas", - [jid:to_string(StateData#state.jid), H, NumStanzasOut]), - mgmt_queue_drop(StateData, H). - --spec update_num_stanzas_in(state(), xmpp_element()) -> state(). -update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El) - when MgmtState == active; - MgmtState == pending -> - NewNum = case {xmpp:is_stanza(El), StateData#state.mgmt_stanzas_in} of - {true, 4294967295} -> - 0; - {true, Num} -> - Num + 1; - {false, Num} -> - Num - end, - StateData#state{mgmt_stanzas_in = NewNum}; -update_num_stanzas_in(StateData, _El) -> - StateData. - -mgmt_send_stanza(StateData, Stanza) -> - case send_element(StateData, Stanza) of - ok -> - maybe_request_ack(StateData); - _ -> - StateData#state{mgmt_state = pending} - end. - -maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) -> - request_ack(StateData); -maybe_request_ack(StateData) -> - StateData. - -request_ack(#state{mgmt_xmlns = Xmlns, - mgmt_ack_timeout = AckTimeout} = StateData) -> - AckReq = #sm_r{xmlns = Xmlns}, - case {send_element(StateData, AckReq), AckTimeout} of - {ok, undefined} -> - ok; - {ok, Timeout} -> - Timer = erlang:send_after(Timeout, self(), close), - StateData#state{mgmt_ack_timer = Timer, - mgmt_stanzas_req = StateData#state.mgmt_stanzas_out}; - _ -> - StateData#state{mgmt_state = pending} - end. - -maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) -> - StateData; -maybe_renew_ack_request(#state{mgmt_ack_timer = Timer, - mgmt_queue = Queue, - mgmt_stanzas_out = NumStanzasOut, - mgmt_stanzas_req = NumStanzasReq} = StateData) -> - erlang:cancel_timer(Timer), - case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of - true -> - request_ack(StateData#state{mgmt_ack_timer = undefined}); - false -> - StateData#state{mgmt_ack_timer = undefined} - end. - --spec mgmt_queue_add(state(), xmpp_element()) -> state(). -mgmt_queue_add(StateData, El) -> - NewNum = case StateData#state.mgmt_stanzas_out of - 4294967295 -> - 0; - Num -> - Num + 1 - end, - NewQueue = queue:in({NewNum, p1_time_compat:timestamp(), El}, StateData#state.mgmt_queue), - NewState = StateData#state{mgmt_queue = NewQueue, - mgmt_stanzas_out = NewNum}, - check_queue_length(NewState). - --spec mgmt_queue_drop(state(), non_neg_integer()) -> state(). -mgmt_queue_drop(StateData, NumHandled) -> - NewQueue = jlib:queue_drop_while(fun({N, _T, _E}) -> N =< NumHandled end, - StateData#state.mgmt_queue), - StateData#state{mgmt_queue = NewQueue}. - --spec check_queue_length(state()) -> state(). -check_queue_length(#state{mgmt_max_queue = Limit} = StateData) - when Limit == infinity; - Limit == exceeded -> - StateData; -check_queue_length(#state{mgmt_queue = Queue, - mgmt_max_queue = Limit} = StateData) -> - case queue:len(Queue) > Limit of - true -> - StateData#state{mgmt_max_queue = exceeded}; - false -> - StateData - end. - --spec handle_unacked_stanzas(state(), fun((_, _, _, _) -> _)) -> ok. -handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData, F) - when MgmtState == active; - MgmtState == pending; - MgmtState == timeout -> - Queue = StateData#state.mgmt_queue, - case queue:len(Queue) of - 0 -> - ok; - N -> - ?DEBUG("~B stanza(s) were not acknowledged by ~s", - [N, jid:to_string(StateData#state.jid)]), - lists:foreach( - fun({_, Time, Pkt}) -> - From = xmpp:get_from(Pkt), - To = xmpp:get_to(Pkt), - case {From, To} of - {#jid{}, #jid{}} -> - F(From, To, Pkt, Time); - {_, _} -> - ?DEBUG("Dropping stanza due to invalid JID(s)", []) - end - end, queue:to_list(Queue)) - end; -handle_unacked_stanzas(_StateData, _F) -> - ok. - --spec handle_unacked_stanzas(state()) -> ok. -handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData) - when MgmtState == active; - MgmtState == pending; - MgmtState == timeout -> - ResendOnTimeout = - case StateData#state.mgmt_resend of - Resend when is_boolean(Resend) -> - Resend; - if_offline -> - Resource = StateData#state.resource, - case ejabberd_sm:get_user_resources(StateData#state.user, - StateData#state.server) of - [Resource] -> % Same resource opened new session - true; - [] -> - true; - _ -> - false - end - end, - Lang = StateData#state.lang, - ReRoute = case ResendOnTimeout of + case (?SETS):is_element(LFrom, PresF) of true -> - fun(From, To, El, Time) -> - NewEl = add_resent_delay_info(StateData, El, Time), - ejabberd_router:route(From, To, NewEl) - end; + A = (?SETS):add_element(LFrom, PresA), + State#{pres_a => A}; false -> - fun(From, To, El, _Time) -> - Txt = <<"User session terminated">>, - ejabberd_router:route_error( - To, From, El, xmpp:err_service_unavailable(Txt, Lang)) + case (?SETS):is_element(LBFrom, PresF) of + true -> + A = (?SETS):add_element(LBFrom, PresA), + State#{pres_a => A}; + false -> + State end - end, - F = fun(From, _To, #presence{}, _Time) -> - ?DEBUG("Dropping presence stanza from ~s", - [jid:to_string(From)]); - (From, To, #iq{} = El, _Time) -> - Txt = <<"User session terminated">>, - ejabberd_router:route_error( - To, From, El, xmpp:err_service_unavailable(Txt, Lang)); - (From, _To, #message{meta = #{carbon_copy := true}}, _Time) -> - %% XEP-0280 says: "When a receiving server attempts to deliver a - %% forked message, and that message bounces with an error for - %% any reason, the receiving server MUST NOT forward that error - %% back to the original sender." Resending such a stanza could - %% easily lead to unexpected results as well. - ?DEBUG("Dropping forwarded message stanza from ~s", - [jid:to_string(From)]); - (From, To, El, Time) -> - case ejabberd_hooks:run_fold(message_is_archived, - StateData#state.server, false, - [StateData, From, - StateData#state.jid, El]) of - true -> - ?DEBUG("Dropping archived message stanza from ~p", - [jid:to_string(xmpp:get_from(El))]); - false -> - ReRoute(From, To, El, Time) - end - end, - handle_unacked_stanzas(StateData, F); -handle_unacked_stanzas(_StateData) -> - ok. - --spec inherit_session_state(state(), binary()) -> {ok, state()} | - {error, binary()} | - {error, binary(), non_neg_integer()}. -inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> - case jlib:base64_to_term(ResumeID) of - {term, {R, Time}} -> - case ejabberd_sm:get_session_pid(U, S, R) of - none -> - case ejabberd_sm:get_offline_info(Time, U, S, R) of - none -> - {error, <<"Previous session PID not found">>}; - Info -> - case proplists:get_value(num_stanzas_in, Info) of - undefined -> - {error, <<"Previous session timed out">>}; - H -> - {error, <<"Previous session timed out">>, H} - end - end; - OldPID -> - OldSID = {Time, OldPID}, - case catch resume_session(OldSID) of - {resume, OldStateData} -> - NewSID = {Time, self()}, % Old time, new PID - Priority = case OldStateData#state.pres_last of - undefined -> - 0; - Presence -> - get_priority_from_presence(Presence) - end, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:open_session(NewSID, U, S, R, - Priority, Info), - {ok, StateData#state{conn = Conn, - sid = NewSID, - jid = OldStateData#state.jid, - resource = OldStateData#state.resource, - pres_t = OldStateData#state.pres_t, - pres_f = OldStateData#state.pres_f, - pres_a = OldStateData#state.pres_a, - pres_last = OldStateData#state.pres_last, - pres_timestamp = OldStateData#state.pres_timestamp, - privacy_list = OldStateData#state.privacy_list, - aux_fields = OldStateData#state.aux_fields, - mgmt_xmlns = OldStateData#state.mgmt_xmlns, - mgmt_queue = OldStateData#state.mgmt_queue, - mgmt_timeout = OldStateData#state.mgmt_timeout, - mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in, - mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out, - mgmt_state = active, - csi_state = active}, Info}; - {error, Msg} -> - {error, Msg}; - _ -> - {error, <<"Cannot grab session state">>} - end - end; - _ -> - {error, <<"Invalid 'previd' value">>} - end. - --spec resume_session({integer(), pid()}) -> any(). -resume_session({Time, PID}) -> - (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 15000). - --spec make_resume_id(state()) -> binary(). -make_resume_id(StateData) -> - {Time, _} = StateData#state.sid, - jlib:term_to_base64({StateData#state.resource, Time}). - --spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza(). -add_resent_delay_info(_State, #iq{} = El, _Time) -> - El; -add_resent_delay_info(#state{server = From}, El, Time) -> - xmpp_util:add_delay_info(El, jid:make(From), Time, <<"Resent">>). - -%%%---------------------------------------------------------------------- -%%% XEP-0352 -%%%---------------------------------------------------------------------- --spec csi_filter_stanza(state(), stanza()) -> state(). -csi_filter_stanza(#state{csi_state = CsiState, jid = JID, server = Server} = - StateData, Stanza) -> - {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server, - {StateData, [Stanza]}, - [Server, JID, Stanza]), - StateData2 = lists:foldl(fun(CurStanza, AccState) -> - send_stanza(AccState, CurStanza) - end, StateData1#state{csi_state = active}, - Stanzas), - StateData2#state{csi_state = CsiState}. - --spec csi_flush_queue(state()) -> state(). -csi_flush_queue(#state{csi_state = CsiState, jid = JID, server = Server} = - StateData) -> - {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server, - {StateData, []}, - [Server, JID]), - StateData2 = lists:foldl(fun(CurStanza, AccState) -> - send_stanza(AccState, CurStanza) - end, StateData1#state{csi_state = active}, - Stanzas), - StateData2#state{csi_state = CsiState}. - -%%%---------------------------------------------------------------------- -%%% JID Set memory footprint reduction code -%%%---------------------------------------------------------------------- - -%% Try to reduce the heap footprint of the four presence sets -%% by ensuring that we re-use strings and Jids wherever possible. --spec pack(state()) -> state(). -pack(S = #state{pres_a = A, pres_f = F, - pres_t = T}) -> - {NewA, Pack2} = pack_jid_set(A, gb_trees:empty()), - {NewF, Pack3} = pack_jid_set(F, Pack2), - {NewT, _Pack4} = pack_jid_set(T, Pack3), - S#state{pres_a = NewA, pres_f = NewF, - pres_t = NewT}. - -pack_jid_set(Set, Pack) -> - Jids = (?SETS):to_list(Set), - {PackedJids, NewPack} = pack_jids(Jids, Pack, []), - {(?SETS):from_list(PackedJids), NewPack}. - -pack_jids([], Pack, Acc) -> {Acc, Pack}; -pack_jids([{U, S, R} = Jid | Jids], Pack, Acc) -> - case gb_trees:lookup(Jid, Pack) of - {value, PackedJid} -> - pack_jids(Jids, Pack, [PackedJid | Acc]); - none -> - {NewU, Pack1} = pack_string(U, Pack), - {NewS, Pack2} = pack_string(S, Pack1), - {NewR, Pack3} = pack_string(R, Pack2), - NewJid = {NewU, NewS, NewR}, - NewPack = gb_trees:insert(NewJid, NewJid, Pack3), - pack_jids(Jids, NewPack, [NewJid | Acc]) - end. - -pack_string(String, Pack) -> - case gb_trees:lookup(String, Pack) of - {value, PackedString} -> {PackedString, Pack}; - none -> {String, gb_trees:insert(String, String, Pack)} + end end. -transform_listen_option(Opt, Opts) -> - [Opt|Opts]. - --spec identity([{atom(), binary()}]) -> binary(). -identity(Props) -> - case proplists:get_value(authzid, Props, <<>>) of - <<>> -> proplists:get_value(username, Props, <<>>); - AuthzId -> AuthzId +-spec fsm_limit_opts([proplists:property()]) -> [proplists:property()]. +fsm_limit_opts(Opts) -> + case lists:keysearch(max_fsm_queue, 1, Opts) of + {value, {_, N}} when is_integer(N) -> [{max_queue, N}]; + _ -> + case ejabberd_config:get_option( + max_fsm_queue, + fun(I) when is_integer(I), I > 0 -> I end) of + undefined -> []; + N -> [{max_queue, N}] + end end. - -opt_type(domain_certfile) -> fun iolist_to_binary/1; -opt_type(max_fsm_queue) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(resource_conflict) -> - fun (setresource) -> setresource; - (closeold) -> closeold; - (closenew) -> closenew; - (acceptnew) -> acceptnew - end; -opt_type(_) -> - [domain_certfile, max_fsm_queue, resource_conflict]. -- cgit v1.2.3