Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2010-06-02 18:01:36 +0400
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2010-06-02 18:01:36 +0400
commit2187bccc38f1b8a5a94b07bc115a89face211bf1 (patch)
treef6c7b4446ebd71218daaef9aa7a32f4d74f8ca39 /src/ejabberd_c2s.erl
parent6c0c30c0320468b3979459a85374531fdba952e3 (diff)
consistent hashing support. WARNING: update exmpp before running this
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r--src/ejabberd_c2s.erl187
1 files changed, 135 insertions, 52 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 2991677e1..11a0a39ab 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -35,7 +35,7 @@
%% External exports
-export([start/2,
stop/1,
- start_link/2,
+ start_link/3,
send_text/2,
send_element/2,
socket_type/0,
@@ -56,8 +56,9 @@
code_change/4,
handle_info/3,
terminate/3,
- print_state/1
- ]).
+ print_state/1,
+ migrate/3
+ ]).
-export([get_state/1]).
@@ -100,6 +101,7 @@
conn = unknown,
auth_module = unknown,
ip,
+ fsm_limit_opts,
lang}).
%-define(DBGFSM, true).
@@ -112,11 +114,12 @@
%% Module start with or without supervisor:
-ifdef(NO_TRANSIENT_SUPERVISORS).
--define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s, [SockData, Opts],
- fsm_limit_opts(Opts) ++ ?FSMOPTS)).
+-define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s,
+ [SockData, Opts, FSMLimitOpts],
+ FSMLimitOpts ++ ?FSMOPTS)).
-else.
-define(SUPERVISOR_START, supervisor:start_child(ejabberd_c2s_sup,
- [SockData, Opts])).
+ [SockData, Opts, FSMLimitOpts])).
-endif.
%% This is the timeout to apply between event when starting a new
@@ -155,12 +158,17 @@
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
+start(StateName, #state{fsm_limit_opts = Opts} = State) ->
+ start(StateName, State, Opts);
start(SockData, Opts) ->
+ start(SockData, Opts, fsm_limit_opts(Opts)).
+
+start(SockData, Opts, FSMLimitOpts) ->
?SUPERVISOR_START.
-start_link(SockData, Opts) ->
- ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts],
- fsm_limit_opts(Opts) ++ ?FSMOPTS).
+start_link(SockData, Opts, FSMLimitOpts) ->
+ ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts, FSMLimitOpts],
+ FSMLimitOpts ++ ?FSMOPTS).
socket_type() ->
xml_stream.
@@ -177,6 +185,9 @@ get_state(FsmRef) ->
stop(FsmRef) ->
?GEN_FSM:send_event(FsmRef, closed).
+migrate(FsmRef, Node, After) ->
+ ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}).
+
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
@@ -188,7 +199,7 @@ stop(FsmRef) ->
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
-init([{SockMod, Socket}, Opts]) ->
+init([{SockMod, Socket}, Opts, FSMLimitOpts]) ->
Access = case lists:keysearch(access, 1, Opts) of
{value, {_, A}} -> A;
_ -> all
@@ -212,7 +223,12 @@ init([{SockMod, Socket}, Opts]) ->
(_) -> false
end, Opts),
TLSOpts = [verify_none | TLSOpts1],
- IP = peerip(SockMod, Socket),
+ IP = case lists:keysearch(frontend_ip, 1, Opts) of
+ {value, {_, IP1}} ->
+ IP1;
+ _ ->
+ peerip(SockMod, Socket)
+ end,
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
true ->
@@ -240,8 +256,29 @@ init([{SockMod, Socket}, Opts]) ->
streamid = new_id(),
access = Access,
shaper = Shaper,
- ip = IP},
+ ip = IP,
+ fsm_limit_opts = FSMLimitOpts},
?C2S_OPEN_TIMEOUT}
+ end;
+init([StateName, StateData, _FSMLimitOpts]) ->
+ MRef = (StateData#state.sockmod):monitor(StateData#state.socket),
+ if StateName == session_established ->
+ Conn = get_conn_type(StateData),
+ Info = [{ip, StateData#state.ip}, {conn, Conn},
+ {auth_module, StateData#state.auth_module}],
+ {Time, _} = StateData#state.sid,
+ SID = {Time, self()},
+ Priority = case StateData#state.pres_last of
+ undefined ->
+ undefined;
+ El ->
+ exmpp_presence:get_priority(El)
+ end,
+ ejabberd_sm:open_session(SID, StateData#state.jid, Priority, Info),
+ NewStateData = StateData#state{sid = SID, socket_monitor = MRef},
+ {ok, StateName, NewStateData};
+ true ->
+ {ok, StateName, StateData#state{socket_monitor = MRef}}
end.
%% Return list of all available resources of contacts,
@@ -460,12 +497,12 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
exmpp_jid:to_binary(JID), AuthModule]),
SID = {now(), self()},
Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, AuthModule}],
+ %% Info = [{ip, StateData#state.ip}, {conn, Conn},
+ %% {auth_module, AuthModule}],
Res = exmpp_server_legacy_auth:success(El),
send_element(StateData, Res),
- ejabberd_sm:open_session(
- SID, exmpp_jid:make(U, StateData#state.server, R), Info),
+ %% ejabberd_sm:open_session(
+ %% SID, exmpp_jid:make(U, StateData#state.server, R), Info),
change_shaper(StateData, JID),
{Fs, Ts} = ejabberd_hooks:run_fold(
roster_get_subscription_lists,
@@ -479,19 +516,19 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
privacy_get_user_list, StateData#state.server,
#userlist{},
[UBinary, StateData#state.server]),
- fsm_next_state(session_established,
- StateData#state{
- sasl_state = 'undefined',
- %not used anymore, let the GC work.
- user = list_to_binary(U),
- resource = list_to_binary(R),
- jid = JID,
- sid = SID,
- conn = Conn,
- auth_module = AuthModule,
- pres_f = ?SETS:from_list(Fs1),
- pres_t = ?SETS:from_list(Ts1),
- privacy_list = PrivList});
+ maybe_migrate(session_established,
+ StateData#state{
+ sasl_state = 'undefined',
+ %not used anymore, let the GC work.
+ user = list_to_binary(U),
+ resource = list_to_binary(R),
+ jid = JID,
+ sid = SID,
+ conn = Conn,
+ auth_module = AuthModule,
+ pres_f = ?SETS:from_list(Fs1),
+ pres_t = ?SETS:from_list(Ts1),
+ privacy_list = PrivList});
_ ->
?INFO_MSG(
"(~w) Failed legacy authentication for ~s",
@@ -805,19 +842,19 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
[StateData#state.user, StateData#state.server]),
SID = {now(), self()},
Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
- ejabberd_sm:open_session(
- SID, JID, Info),
- fsm_next_state(session_established,
- StateData#state{
- sasl_state = 'undefined',
- %not used anymore, let the GC work.
- sid = SID,
- conn = Conn,
- pres_f = ?SETS:from_list(Fs1),
- pres_t = ?SETS:from_list(Ts1),
- privacy_list = PrivList});
+ %% Info = [{ip, StateData#state.ip}, {conn, Conn},
+ %% {auth_module, StateData#state.auth_module}],
+ %% ejabberd_sm:open_session(
+ %% SID, JID, Info),
+ maybe_migrate(session_established,
+ StateData#state{
+ sasl_state = 'undefined',
+ %%not used anymore, let the GC work.
+ sid = SID,
+ conn = Conn,
+ pres_f = ?SETS:from_list(Fs1),
+ pres_t = ?SETS:from_list(Ts1),
+ privacy_list = PrivList});
_ ->
ejabberd_hooks:run(forbidden_session_hook,
StateData#state.server, [JID]),
@@ -997,6 +1034,8 @@ session_established2(El, StateData) ->
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
+handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() ->
+ fsm_migrate(StateName, StateData, Node, After * 2);
handle_event(_Event, StateName, StateData) ->
fsm_next_state(StateName, StateData).
@@ -1337,6 +1376,20 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) ->
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
+terminate({migrated, ClonePid}, StateName, StateData) ->
+ if StateName == session_established ->
+ ?INFO_MSG("(~w) Migrating ~s to ~p on node ~p",
+ [StateData#state.socket,
+ exmpp_jid:to_binary(StateData#state.jid),
+ ClonePid, node(ClonePid)]),
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.jid);
+ true ->
+ ok
+ end,
+ (StateData#state.sockmod):change_controller(
+ StateData#state.socket, ClonePid),
+ ok;
terminate(_Reason, StateName, StateData) ->
%%TODO: resource could be 'undefined' if terminate before bind?
case StateName of
@@ -1504,16 +1557,21 @@ get_auth_tags([], U, P, D, R) ->
get_conn_type(StateData) ->
case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of
- gen_tcp -> c2s;
- tls -> c2s_tls;
- ejabberd_zlib ->
- case ejabberd_zlib:get_sockmod((StateData#state.socket)#socket_state.socket) of
- gen_tcp -> c2s_compressed;
- tls -> c2s_compressed_tls
- end;
- ejabberd_http_poll -> http_poll;
- ejabberd_http_bind -> http_bind;
- _ -> unknown
+ gen_tcp -> c2s;
+ tls -> c2s_tls;
+ ejabberd_zlib ->
+ if is_pid(StateData#state.socket) ->
+ unknown;
+ true ->
+ case ejabberd_zlib:get_sockmod(
+ (StateData#state.socket)#socket_state.socket) of
+ gen_tcp -> c2s_compressed;
+ tls -> c2s_compressed_tls
+ end
+ end;
+ ejabberd_http_poll -> http_poll;
+ ejabberd_http_bind -> http_bind;
+ _ -> unknown
end.
process_presence_probe(From, To, StateData) ->
@@ -2072,6 +2130,27 @@ peerip(SockMod, Socket) ->
_ -> undefined
end.
+maybe_migrate(StateName, StateData) ->
+ case ejabberd_cluster:get_node({StateData#state.user,
+ StateData#state.server}) of
+ Node when Node == node() ->
+ Conn = get_conn_type(StateData),
+ Info = [{ip, StateData#state.ip}, {conn, Conn},
+ {auth_module, StateData#state.auth_module}],
+ #state{user = U, server = S, jid = JID, sid = SID} = StateData,
+ ejabberd_sm:open_session(SID, JID, Info),
+ case ejabberd_cluster:get_node_new({U, S}) of
+ Node ->
+ ok;
+ NewNode ->
+ After = ejabberd_cluster:rehash_timeout(),
+ migrate(self(), NewNode, After)
+ end,
+ fsm_next_state(StateName, StateData);
+ Node ->
+ fsm_migrate(StateName, StateData, Node, 0)
+ end.
+
%% fsm_next_state: Generate the next_state FSM tuple with different
%% timeout, depending on the future state
fsm_next_state(session_established, StateData) ->
@@ -2079,6 +2158,10 @@ fsm_next_state(session_established, StateData) ->
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
+fsm_migrate(StateName, StateData, Node, Timeout) ->
+ {migrate, StateData,
+ {Node, ?MODULE, start, [StateName, StateData]}, Timeout}.
+
%% fsm_reply: Generate the reply FSM tuple with different timeout,
%% depending on the future state
fsm_reply(Reply, session_established, StateData) ->