From 2187bccc38f1b8a5a94b07bc115a89face211bf1 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Thu, 3 Jun 2010 00:01:36 +1000 Subject: consistent hashing support. WARNING: update exmpp before running this --- src/ejabberd_c2s.erl | 187 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 135 insertions(+), 52 deletions(-) (limited to 'src/ejabberd_c2s.erl') diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 2991677e1..11a0a39ab 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -35,7 +35,7 @@ %% External exports -export([start/2, stop/1, - start_link/2, + start_link/3, send_text/2, send_element/2, socket_type/0, @@ -56,8 +56,9 @@ code_change/4, handle_info/3, terminate/3, - print_state/1 - ]). + print_state/1, + migrate/3 + ]). -export([get_state/1]). @@ -100,6 +101,7 @@ conn = unknown, auth_module = unknown, ip, + fsm_limit_opts, lang}). %-define(DBGFSM, true). @@ -112,11 +114,12 @@ %% Module start with or without supervisor: -ifdef(NO_TRANSIENT_SUPERVISORS). --define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s, [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS)). +-define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s, + [SockData, Opts, FSMLimitOpts], + FSMLimitOpts ++ ?FSMOPTS)). -else. -define(SUPERVISOR_START, supervisor:start_child(ejabberd_c2s_sup, - [SockData, Opts])). + [SockData, Opts, FSMLimitOpts])). -endif. %% This is the timeout to apply between event when starting a new @@ -155,12 +158,17 @@ %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- +start(StateName, #state{fsm_limit_opts = Opts} = State) -> + start(StateName, State, Opts); start(SockData, Opts) -> + start(SockData, Opts, fsm_limit_opts(Opts)). + +start(SockData, Opts, FSMLimitOpts) -> ?SUPERVISOR_START. -start_link(SockData, Opts) -> - ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS). +start_link(SockData, Opts, FSMLimitOpts) -> + ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts, FSMLimitOpts], + FSMLimitOpts ++ ?FSMOPTS). socket_type() -> xml_stream. @@ -177,6 +185,9 @@ get_state(FsmRef) -> stop(FsmRef) -> ?GEN_FSM:send_event(FsmRef, closed). +migrate(FsmRef, Node, After) -> + ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}). + %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- @@ -188,7 +199,7 @@ stop(FsmRef) -> %% ignore | %% {stop, StopReason} %%---------------------------------------------------------------------- -init([{SockMod, Socket}, Opts]) -> +init([{SockMod, Socket}, Opts, FSMLimitOpts]) -> Access = case lists:keysearch(access, 1, Opts) of {value, {_, A}} -> A; _ -> all @@ -212,7 +223,12 @@ init([{SockMod, Socket}, Opts]) -> (_) -> false end, Opts), TLSOpts = [verify_none | TLSOpts1], - IP = peerip(SockMod, Socket), + IP = case lists:keysearch(frontend_ip, 1, Opts) of + {value, {_, IP1}} -> + IP1; + _ -> + peerip(SockMod, Socket) + end, %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of true -> @@ -240,8 +256,29 @@ init([{SockMod, Socket}, Opts]) -> streamid = new_id(), access = Access, shaper = Shaper, - ip = IP}, + ip = IP, + fsm_limit_opts = FSMLimitOpts}, ?C2S_OPEN_TIMEOUT} + end; +init([StateName, StateData, _FSMLimitOpts]) -> + MRef = (StateData#state.sockmod):monitor(StateData#state.socket), + if StateName == session_established -> + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + {Time, _} = StateData#state.sid, + SID = {Time, self()}, + Priority = case StateData#state.pres_last of + undefined -> + undefined; + El -> + exmpp_presence:get_priority(El) + end, + ejabberd_sm:open_session(SID, StateData#state.jid, Priority, Info), + NewStateData = StateData#state{sid = SID, socket_monitor = MRef}, + {ok, StateName, NewStateData}; + true -> + {ok, StateName, StateData#state{socket_monitor = MRef}} end. %% Return list of all available resources of contacts, @@ -460,12 +497,12 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> exmpp_jid:to_binary(JID), AuthModule]), SID = {now(), self()}, Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, AuthModule}], + %% Info = [{ip, StateData#state.ip}, {conn, Conn}, + %% {auth_module, AuthModule}], Res = exmpp_server_legacy_auth:success(El), send_element(StateData, Res), - ejabberd_sm:open_session( - SID, exmpp_jid:make(U, StateData#state.server, R), Info), + %% ejabberd_sm:open_session( + %% SID, exmpp_jid:make(U, StateData#state.server, R), Info), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, @@ -479,19 +516,19 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> privacy_get_user_list, StateData#state.server, #userlist{}, [UBinary, StateData#state.server]), - fsm_next_state(session_established, - StateData#state{ - sasl_state = 'undefined', - %not used anymore, let the GC work. - user = list_to_binary(U), - resource = list_to_binary(R), - jid = JID, - sid = SID, - conn = Conn, - auth_module = AuthModule, - pres_f = ?SETS:from_list(Fs1), - pres_t = ?SETS:from_list(Ts1), - privacy_list = PrivList}); + maybe_migrate(session_established, + StateData#state{ + sasl_state = 'undefined', + %not used anymore, let the GC work. + user = list_to_binary(U), + resource = list_to_binary(R), + jid = JID, + sid = SID, + conn = Conn, + auth_module = AuthModule, + pres_f = ?SETS:from_list(Fs1), + pres_t = ?SETS:from_list(Ts1), + privacy_list = PrivList}); _ -> ?INFO_MSG( "(~w) Failed legacy authentication for ~s", @@ -805,19 +842,19 @@ wait_for_session({xmlstreamelement, El}, StateData) -> [StateData#state.user, StateData#state.server]), SID = {now(), self()}, Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:open_session( - SID, JID, Info), - fsm_next_state(session_established, - StateData#state{ - sasl_state = 'undefined', - %not used anymore, let the GC work. - sid = SID, - conn = Conn, - pres_f = ?SETS:from_list(Fs1), - pres_t = ?SETS:from_list(Ts1), - privacy_list = PrivList}); + %% Info = [{ip, StateData#state.ip}, {conn, Conn}, + %% {auth_module, StateData#state.auth_module}], + %% ejabberd_sm:open_session( + %% SID, JID, Info), + maybe_migrate(session_established, + StateData#state{ + sasl_state = 'undefined', + %%not used anymore, let the GC work. + sid = SID, + conn = Conn, + pres_f = ?SETS:from_list(Fs1), + pres_t = ?SETS:from_list(Ts1), + privacy_list = PrivList}); _ -> ejabberd_hooks:run(forbidden_session_hook, StateData#state.server, [JID]), @@ -997,6 +1034,8 @@ session_established2(El, StateData) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() -> + fsm_migrate(StateName, StateData, Node, After * 2); handle_event(_Event, StateName, StateData) -> fsm_next_state(StateName, StateData). @@ -1337,6 +1376,20 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) -> %% Purpose: Shutdown the fsm %% Returns: any %%---------------------------------------------------------------------- +terminate({migrated, ClonePid}, StateName, StateData) -> + if StateName == session_established -> + ?INFO_MSG("(~w) Migrating ~s to ~p on node ~p", + [StateData#state.socket, + exmpp_jid:to_binary(StateData#state.jid), + ClonePid, node(ClonePid)]), + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.jid); + true -> + ok + end, + (StateData#state.sockmod):change_controller( + StateData#state.socket, ClonePid), + ok; terminate(_Reason, StateName, StateData) -> %%TODO: resource could be 'undefined' if terminate before bind? case StateName of @@ -1504,16 +1557,21 @@ get_auth_tags([], U, P, D, R) -> get_conn_type(StateData) -> case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of - gen_tcp -> c2s; - tls -> c2s_tls; - ejabberd_zlib -> - case ejabberd_zlib:get_sockmod((StateData#state.socket)#socket_state.socket) of - gen_tcp -> c2s_compressed; - tls -> c2s_compressed_tls - end; - ejabberd_http_poll -> http_poll; - ejabberd_http_bind -> http_bind; - _ -> unknown + gen_tcp -> c2s; + tls -> c2s_tls; + ejabberd_zlib -> + if is_pid(StateData#state.socket) -> + unknown; + true -> + case ejabberd_zlib:get_sockmod( + (StateData#state.socket)#socket_state.socket) of + gen_tcp -> c2s_compressed; + tls -> c2s_compressed_tls + end + end; + ejabberd_http_poll -> http_poll; + ejabberd_http_bind -> http_bind; + _ -> unknown end. process_presence_probe(From, To, StateData) -> @@ -2072,6 +2130,27 @@ peerip(SockMod, Socket) -> _ -> undefined end. +maybe_migrate(StateName, StateData) -> + case ejabberd_cluster:get_node({StateData#state.user, + StateData#state.server}) of + Node when Node == node() -> + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + #state{user = U, server = S, jid = JID, sid = SID} = StateData, + ejabberd_sm:open_session(SID, JID, Info), + case ejabberd_cluster:get_node_new({U, S}) of + Node -> + ok; + NewNode -> + After = ejabberd_cluster:rehash_timeout(), + migrate(self(), NewNode, After) + end, + fsm_next_state(StateName, StateData); + Node -> + fsm_migrate(StateName, StateData, Node, 0) + end. + %% fsm_next_state: Generate the next_state FSM tuple with different %% timeout, depending on the future state fsm_next_state(session_established, StateData) -> @@ -2079,6 +2158,10 @@ fsm_next_state(session_established, StateData) -> fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. +fsm_migrate(StateName, StateData, Node, Timeout) -> + {migrate, StateData, + {Node, ?MODULE, start, [StateName, StateData]}, Timeout}. + %% fsm_reply: Generate the reply FSM tuple with different timeout, %% depending on the future state fsm_reply(Reply, session_established, StateData) -> -- cgit v1.2.3