Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2010-06-02 18:01:36 +0400
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2010-06-02 18:01:36 +0400
commit2187bccc38f1b8a5a94b07bc115a89face211bf1 (patch)
treef6c7b4446ebd71218daaef9aa7a32f4d74f8ca39
parent6c0c30c0320468b3979459a85374531fdba952e3 (diff)
consistent hashing support. WARNING: update exmpp before running this
-rw-r--r--src/ejabberd_app.erl2
-rw-r--r--src/ejabberd_auth_anonymous.erl17
-rw-r--r--src/ejabberd_c2s.erl187
-rw-r--r--src/ejabberd_captcha.erl150
-rw-r--r--src/ejabberd_cluster.erl177
-rw-r--r--src/ejabberd_frontend_socket.erl85
-rw-r--r--src/ejabberd_local.erl50
-rw-r--r--src/ejabberd_node_groups.erl40
-rw-r--r--src/ejabberd_receiver.erl128
-rw-r--r--src/ejabberd_router.erl36
-rw-r--r--src/ejabberd_s2s.erl138
-rw-r--r--src/ejabberd_s2s_out.erl21
-rw-r--r--src/ejabberd_sm.erl215
-rw-r--r--src/ejabberd_socket.erl60
-rw-r--r--src/ejabberd_sup.erl16
-rw-r--r--src/mod_muc/mod_muc.erl173
-rw-r--r--src/mod_muc/mod_muc_log.erl11
-rw-r--r--src/mod_muc/mod_muc_room.erl75
-rw-r--r--src/mod_proxy65/mod_proxy65_sm.erl10
-rw-r--r--src/p1_fsm.erl70
-rw-r--r--src/web/ejabberd_http_bind.erl94
-rw-r--r--src/web/ejabberd_http_poll.erl57
-rw-r--r--src/web/mod_http_bind.erl10
23 files changed, 1294 insertions, 528 deletions
diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl
index b8c9ae644..1b0289752 100644
--- a/src/ejabberd_app.erl
+++ b/src/ejabberd_app.erl
@@ -65,6 +65,8 @@ start(normal, _Args) ->
%ejabberd_debug:fprof_start(),
maybe_add_nameservers(),
start_modules(),
+ ejabberd_cluster:announce(),
+ ejabberd_node_groups:start(),
ejabberd_listener:start_listeners(),
?INFO_MSG("ejabberd ~s is started in the node ~p", [?VERSION, node()]),
Sup;
diff --git a/src/ejabberd_auth_anonymous.erl b/src/ejabberd_auth_anonymous.erl
index cc4cf91fb..53b251f06 100644
--- a/src/ejabberd_auth_anonymous.erl
+++ b/src/ejabberd_auth_anonymous.erl
@@ -67,9 +67,12 @@
start(Host) when is_list(Host) ->
HostB = list_to_binary(Host),
%% TODO: Check cluster mode
+ update_tables(),
mnesia:create_table(anonymous, [{ram_copies, [node()]},
{type, bag},
- {attributes, record_info(fields, anonymous)}]),
+ {type, bag}, {local_content, true},
+ {attributes, record_info(fields, anonymous)}]),
+ mnesia:add_table_copy(anonymous, node(), ram_copies),
%% The hooks are needed to add / remove users from the anonymous tables
ejabberd_hooks:add(sm_register_connection_hook, HostB,
?MODULE, register_connection, 100),
@@ -168,7 +171,7 @@ remove_connection(SID, LUser, LServer) when is_list(LUser), is_list(LServer) ->
F = fun() ->
mnesia:delete_object({anonymous, US, SID})
end,
- mnesia:transaction(F).
+ mnesia:async_dirty(F).
%% @spec (SID, JID, Info) -> term()
%% SID = term()
@@ -184,7 +187,7 @@ register_connection(SID, JID, Info) when ?IS_JID(JID) ->
ok;
?MODULE ->
US = {LUser, LServer},
- mnesia:sync_dirty(
+ mnesia:async_dirty(
fun() -> mnesia:write(#anonymous{us = US, sid=SID})
end);
_ ->
@@ -345,3 +348,11 @@ remove_user(_User, _Server, _Password) ->
plain_password_required() ->
false.
+
+update_tables() ->
+ case catch mnesia:table_info(anonymous, local_content) of
+ false ->
+ mnesia:delete_table(anonymous);
+ _ ->
+ ok
+ end.
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 2991677e1..11a0a39ab 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -35,7 +35,7 @@
%% External exports
-export([start/2,
stop/1,
- start_link/2,
+ start_link/3,
send_text/2,
send_element/2,
socket_type/0,
@@ -56,8 +56,9 @@
code_change/4,
handle_info/3,
terminate/3,
- print_state/1
- ]).
+ print_state/1,
+ migrate/3
+ ]).
-export([get_state/1]).
@@ -100,6 +101,7 @@
conn = unknown,
auth_module = unknown,
ip,
+ fsm_limit_opts,
lang}).
%-define(DBGFSM, true).
@@ -112,11 +114,12 @@
%% Module start with or without supervisor:
-ifdef(NO_TRANSIENT_SUPERVISORS).
--define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s, [SockData, Opts],
- fsm_limit_opts(Opts) ++ ?FSMOPTS)).
+-define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s,
+ [SockData, Opts, FSMLimitOpts],
+ FSMLimitOpts ++ ?FSMOPTS)).
-else.
-define(SUPERVISOR_START, supervisor:start_child(ejabberd_c2s_sup,
- [SockData, Opts])).
+ [SockData, Opts, FSMLimitOpts])).
-endif.
%% This is the timeout to apply between event when starting a new
@@ -155,12 +158,17 @@
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
+start(StateName, #state{fsm_limit_opts = Opts} = State) ->
+ start(StateName, State, Opts);
start(SockData, Opts) ->
+ start(SockData, Opts, fsm_limit_opts(Opts)).
+
+start(SockData, Opts, FSMLimitOpts) ->
?SUPERVISOR_START.
-start_link(SockData, Opts) ->
- ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts],
- fsm_limit_opts(Opts) ++ ?FSMOPTS).
+start_link(SockData, Opts, FSMLimitOpts) ->
+ ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts, FSMLimitOpts],
+ FSMLimitOpts ++ ?FSMOPTS).
socket_type() ->
xml_stream.
@@ -177,6 +185,9 @@ get_state(FsmRef) ->
stop(FsmRef) ->
?GEN_FSM:send_event(FsmRef, closed).
+migrate(FsmRef, Node, After) ->
+ ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}).
+
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
@@ -188,7 +199,7 @@ stop(FsmRef) ->
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
-init([{SockMod, Socket}, Opts]) ->
+init([{SockMod, Socket}, Opts, FSMLimitOpts]) ->
Access = case lists:keysearch(access, 1, Opts) of
{value, {_, A}} -> A;
_ -> all
@@ -212,7 +223,12 @@ init([{SockMod, Socket}, Opts]) ->
(_) -> false
end, Opts),
TLSOpts = [verify_none | TLSOpts1],
- IP = peerip(SockMod, Socket),
+ IP = case lists:keysearch(frontend_ip, 1, Opts) of
+ {value, {_, IP1}} ->
+ IP1;
+ _ ->
+ peerip(SockMod, Socket)
+ end,
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
true ->
@@ -240,8 +256,29 @@ init([{SockMod, Socket}, Opts]) ->
streamid = new_id(),
access = Access,
shaper = Shaper,
- ip = IP},
+ ip = IP,
+ fsm_limit_opts = FSMLimitOpts},
?C2S_OPEN_TIMEOUT}
+ end;
+init([StateName, StateData, _FSMLimitOpts]) ->
+ MRef = (StateData#state.sockmod):monitor(StateData#state.socket),
+ if StateName == session_established ->
+ Conn = get_conn_type(StateData),
+ Info = [{ip, StateData#state.ip}, {conn, Conn},
+ {auth_module, StateData#state.auth_module}],
+ {Time, _} = StateData#state.sid,
+ SID = {Time, self()},
+ Priority = case StateData#state.pres_last of
+ undefined ->
+ undefined;
+ El ->
+ exmpp_presence:get_priority(El)
+ end,
+ ejabberd_sm:open_session(SID, StateData#state.jid, Priority, Info),
+ NewStateData = StateData#state{sid = SID, socket_monitor = MRef},
+ {ok, StateName, NewStateData};
+ true ->
+ {ok, StateName, StateData#state{socket_monitor = MRef}}
end.
%% Return list of all available resources of contacts,
@@ -460,12 +497,12 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
exmpp_jid:to_binary(JID), AuthModule]),
SID = {now(), self()},
Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, AuthModule}],
+ %% Info = [{ip, StateData#state.ip}, {conn, Conn},
+ %% {auth_module, AuthModule}],
Res = exmpp_server_legacy_auth:success(El),
send_element(StateData, Res),
- ejabberd_sm:open_session(
- SID, exmpp_jid:make(U, StateData#state.server, R), Info),
+ %% ejabberd_sm:open_session(
+ %% SID, exmpp_jid:make(U, StateData#state.server, R), Info),
change_shaper(StateData, JID),
{Fs, Ts} = ejabberd_hooks:run_fold(
roster_get_subscription_lists,
@@ -479,19 +516,19 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
privacy_get_user_list, StateData#state.server,
#userlist{},
[UBinary, StateData#state.server]),
- fsm_next_state(session_established,
- StateData#state{
- sasl_state = 'undefined',
- %not used anymore, let the GC work.
- user = list_to_binary(U),
- resource = list_to_binary(R),
- jid = JID,
- sid = SID,
- conn = Conn,
- auth_module = AuthModule,
- pres_f = ?SETS:from_list(Fs1),
- pres_t = ?SETS:from_list(Ts1),
- privacy_list = PrivList});
+ maybe_migrate(session_established,
+ StateData#state{
+ sasl_state = 'undefined',
+ %not used anymore, let the GC work.
+ user = list_to_binary(U),
+ resource = list_to_binary(R),
+ jid = JID,
+ sid = SID,
+ conn = Conn,
+ auth_module = AuthModule,
+ pres_f = ?SETS:from_list(Fs1),
+ pres_t = ?SETS:from_list(Ts1),
+ privacy_list = PrivList});
_ ->
?INFO_MSG(
"(~w) Failed legacy authentication for ~s",
@@ -805,19 +842,19 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
[StateData#state.user, StateData#state.server]),
SID = {now(), self()},
Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
- ejabberd_sm:open_session(
- SID, JID, Info),
- fsm_next_state(session_established,
- StateData#state{
- sasl_state = 'undefined',
- %not used anymore, let the GC work.
- sid = SID,
- conn = Conn,
- pres_f = ?SETS:from_list(Fs1),
- pres_t = ?SETS:from_list(Ts1),
- privacy_list = PrivList});
+ %% Info = [{ip, StateData#state.ip}, {conn, Conn},
+ %% {auth_module, StateData#state.auth_module}],
+ %% ejabberd_sm:open_session(
+ %% SID, JID, Info),
+ maybe_migrate(session_established,
+ StateData#state{
+ sasl_state = 'undefined',
+ %%not used anymore, let the GC work.
+ sid = SID,
+ conn = Conn,
+ pres_f = ?SETS:from_list(Fs1),
+ pres_t = ?SETS:from_list(Ts1),
+ privacy_list = PrivList});
_ ->
ejabberd_hooks:run(forbidden_session_hook,
StateData#state.server, [JID]),
@@ -997,6 +1034,8 @@ session_established2(El, StateData) ->
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
+handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() ->
+ fsm_migrate(StateName, StateData, Node, After * 2);
handle_event(_Event, StateName, StateData) ->
fsm_next_state(StateName, StateData).
@@ -1337,6 +1376,20 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) ->
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
+terminate({migrated, ClonePid}, StateName, StateData) ->
+ if StateName == session_established ->
+ ?INFO_MSG("(~w) Migrating ~s to ~p on node ~p",
+ [StateData#state.socket,
+ exmpp_jid:to_binary(StateData#state.jid),
+ ClonePid, node(ClonePid)]),
+ ejabberd_sm:close_session(StateData#state.sid,
+ StateData#state.jid);
+ true ->
+ ok
+ end,
+ (StateData#state.sockmod):change_controller(
+ StateData#state.socket, ClonePid),
+ ok;
terminate(_Reason, StateName, StateData) ->
%%TODO: resource could be 'undefined' if terminate before bind?
case StateName of
@@ -1504,16 +1557,21 @@ get_auth_tags([], U, P, D, R) ->
get_conn_type(StateData) ->
case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of
- gen_tcp -> c2s;
- tls -> c2s_tls;
- ejabberd_zlib ->
- case ejabberd_zlib:get_sockmod((StateData#state.socket)#socket_state.socket) of
- gen_tcp -> c2s_compressed;
- tls -> c2s_compressed_tls
- end;
- ejabberd_http_poll -> http_poll;
- ejabberd_http_bind -> http_bind;
- _ -> unknown
+ gen_tcp -> c2s;
+ tls -> c2s_tls;
+ ejabberd_zlib ->
+ if is_pid(StateData#state.socket) ->
+ unknown;
+ true ->
+ case ejabberd_zlib:get_sockmod(
+ (StateData#state.socket)#socket_state.socket) of
+ gen_tcp -> c2s_compressed;
+ tls -> c2s_compressed_tls
+ end
+ end;
+ ejabberd_http_poll -> http_poll;
+ ejabberd_http_bind -> http_bind;
+ _ -> unknown
end.
process_presence_probe(From, To, StateData) ->
@@ -2072,6 +2130,27 @@ peerip(SockMod, Socket) ->
_ -> undefined
end.
+maybe_migrate(StateName, StateData) ->
+ case ejabberd_cluster:get_node({StateData#state.user,
+ StateData#state.server}) of
+ Node when Node == node() ->
+ Conn = get_conn_type(StateData),
+ Info = [{ip, StateData#state.ip}, {conn, Conn},
+ {auth_module, StateData#state.auth_module}],
+ #state{user = U, server = S, jid = JID, sid = SID} = StateData,
+ ejabberd_sm:open_session(SID, JID, Info),
+ case ejabberd_cluster:get_node_new({U, S}) of
+ Node ->
+ ok;
+ NewNode ->
+ After = ejabberd_cluster:rehash_timeout(),
+ migrate(self(), NewNode, After)
+ end,
+ fsm_next_state(StateName, StateData);
+ Node ->
+ fsm_migrate(StateName, StateData, Node, 0)
+ end.
+
%% fsm_next_state: Generate the next_state FSM tuple with different
%% timeout, depending on the future state
fsm_next_state(session_established, StateData) ->
@@ -2079,6 +2158,10 @@ fsm_next_state(session_established, StateData) ->
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
+fsm_migrate(StateName, StateData, Node, Timeout) ->
+ {migrate, StateData,
+ {Node, ?MODULE, start, [StateName, StateData]}, Timeout}.
+
%% fsm_reply: Generate the reply FSM tuple with different timeout,
%% depending on the future state
fsm_reply(Reply, session_established, StateData) ->
diff --git a/src/ejabberd_captcha.erl b/src/ejabberd_captcha.erl
index d933a19f3..4bf1eaacb 100644
--- a/src/ejabberd_captcha.erl
+++ b/src/ejabberd_captcha.erl
@@ -35,7 +35,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([create_captcha/6, build_captcha_html/2, check_captcha/2,
+-export([create_captcha/5, build_captcha_html/2, check_captcha/2,
process_reply/1, process/2, is_feature_available/0]).
-include_lib("exmpp/include/exmpp.hrl").
@@ -65,19 +65,11 @@
-define(CAPTCHA_TEXT(Lang), translate:translate(Lang, "Enter the text you see")).
-define(CAPTCHA_LIFETIME, 120000). % two minutes
+-define(RPC_TIMEOUT, 5000).
-record(state, {}).
-record(captcha, {id, pid, key, tref, args}).
--define(T(S),
- case catch mnesia:transaction(fun() -> S end) of
- {atomic, Res} ->
- Res;
- {_, Reason} ->
- ?ERROR_MSG("mnesia transaction failed: ~p", [Reason]),
- {error, Reason}
- end).
-
%%====================================================================
%% API
%%====================================================================
@@ -88,10 +80,11 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-create_captcha(Id, SID, From, To, Lang, Args)
- when is_list(Id), is_list(SID) ->
+create_captcha(SID, From, To, Lang, Args)
+ when is_binary(Lang), is_list(SID) ->
case create_image() of
{ok, Type, Key, Image} ->
+ Id = randoms:get_string() ++ "-" ++ ejabberd_cluster:node_id(),
B64Image = jlib:encode_base64(binary_to_list(Image)),
JID = exmpp_jid:to_list(From),
CID = "sha1+" ++ sha:sha(Image) ++ "@bob.xmpp.org",
@@ -216,13 +209,9 @@ create_captcha(Id, SID, From, To, Lang, Args)
%OOB = {xmlelement, "x", [{"xmlns", ?NS_OOBD_X_s}],
% [{xmlelement, "url", [], [{xmlcdata, get_url(Id)}]}]},
Tref = erlang:send_after(?CAPTCHA_LIFETIME, ?MODULE, {remove_id, Id}),
- case ?T(mnesia:write(#captcha{id=Id, pid=self(), key=Key,
- tref=Tref, args=Args})) of
- ok ->
- {ok, [Body, OOB, Captcha, Data]};
- _Err ->
- error
- end;
+ ets:insert(captcha, #captcha{id=Id, pid=self(), key=Key,
+ tref=Tref, args=Args}),
+ {ok, Id, [Body, OOB, Captcha, Data]};
_Err ->
error
end.
@@ -234,8 +223,8 @@ create_captcha(Id, SID, From, To, Lang, Args)
%% IdEl = xmlelement()
%% KeyEl = xmlelement()
build_captcha_html(Id, Lang) ->
- case mnesia:dirty_read(captcha, Id) of
- [#captcha{}] ->
+ case lookup_captcha(Id) of
+ {ok, _} ->
%ImgEl = {xmlelement, "img", [{"src", get_url(Id ++ "/image")}], []},
ImgEl =
#xmlel{
@@ -362,20 +351,25 @@ build_captcha_html(Id, Lang) ->
%% @spec (Id::string(), ProvidedKey::string()) -> captcha_valid | captcha_non_valid | captcha_not_found
check_captcha(Id, ProvidedKey) ->
- ?T(case mnesia:read(captcha, Id, write) of
- [#captcha{pid=Pid, args=Args, key=StoredKey, tref=Tref}] ->
- mnesia:delete({captcha, Id}),
- erlang:cancel_timer(Tref),
- if StoredKey == ProvidedKey ->
- Pid ! {captcha_succeed, Args},
- captcha_valid;
- true ->
- Pid ! {captcha_failed, Args},
- captcha_non_valid
- end;
- _ ->
- captcha_not_found
- end).
+ case string:tokens(Id, "-") of
+ [_, NodeID] ->
+ case ejabberd_cluster:get_node_by_id(NodeID) of
+ Node when Node == node() ->
+ do_check_captcha(Id, ProvidedKey);
+ Node ->
+ case catch rpc:call(Node, ?MODULE, check_captcha,
+ [Id, ProvidedKey], ?RPC_TIMEOUT) of
+ {'EXIT', _} ->
+ captcha_not_found;
+ {badrpc, _} ->
+ captcha_not_found;
+ Res ->
+ Res
+ end
+ end;
+ _ ->
+ captcha_not_found
+ end.
process_reply(El) ->
case {exmpp_xml:element_matches(El, captcha),
@@ -389,20 +383,14 @@ process_reply(El) ->
case {proplists:get_value("challenge", Fields),
proplists:get_value("ocr", Fields)} of
{[Id|_], [OCR|_]} ->
- ?T(case mnesia:read(captcha, Id, write) of
- [#captcha{pid=Pid, args=Args, key=Key, tref=Tref}] ->
- mnesia:delete({captcha, Id}),
- erlang:cancel_timer(Tref),
- if OCR == Key ->
- Pid ! {captcha_succeed, Args},
- ok;
- true ->
- Pid ! {captcha_failed, Args},
- {error, bad_match}
- end;
- _ ->
- {error, not_found}
- end);
+ case check_captcha(Id, OCR) of
+ captcha_valid ->
+ ok;
+ captcha_non_valid ->
+ {error, bad_match};
+ captcha_not_found ->
+ {error, not_found}
+ end;
_ ->
{error, malformed}
end
@@ -431,8 +419,8 @@ process(_Handlers, #request{method='GET', lang=Lang, path=[_, Id]}) ->
end;
process(_Handlers, #request{method='GET', path=[_, Id, "image"]}) ->
- case mnesia:dirty_read(captcha, Id) of
- [#captcha{key=Key}] ->
+ case lookup_captcha(Id) of
+ {ok, #captcha{key=Key}} ->
case create_image(Key) of
{ok, Type, _, Img} ->
{200,
@@ -477,10 +465,8 @@ process(_Handlers, _Request) ->
%% gen_server callbacks
%%====================================================================
init([]) ->
- mnesia:create_table(captcha,
- [{ram_copies, [node()]},
- {attributes, record_info(fields, captcha)}]),
- mnesia:add_table_copy(captcha, node(), ram_copies),
+ mnesia:delete_table(captcha),
+ ets:new(captcha, [named_table, public, {keypos, #captcha.id}]),
check_captcha_setup(),
{ok, #state{}}.
@@ -492,13 +478,13 @@ handle_cast(_Msg, State) ->
handle_info({remove_id, Id}, State) ->
?DEBUG("captcha ~p timed out", [Id]),
- _ = ?T(case mnesia:read(captcha, Id, write) of
- [#captcha{args=Args, pid=Pid}] ->
- Pid ! {captcha_failed, Args},
- mnesia:delete({captcha, Id});
- _ ->
- ok
- end),
+ case ets:lookup(captcha, Id) of
+ [#captcha{args=Args, pid=Pid}] ->
+ Pid ! {captcha_failed, Args},
+ ets:delete(captcha, Id);
+ _ ->
+ ok
+ end,
{noreply, State};
handle_info(_Info, State) ->
@@ -642,3 +628,43 @@ check_captcha_setup() ->
false ->
ok
end.
+
+lookup_captcha(Id) ->
+ case string:tokens(Id, "-") of
+ [_, NodeID] ->
+ case ejabberd_cluster:get_node_by_id(NodeID) of
+ Node when Node == node() ->
+ case ets:lookup(captcha, Id) of
+ [C] ->
+ {ok, C};
+ _ ->
+ {error, enoent}
+ end;
+ Node ->
+ case catch rpc:call(Node, ets, lookup,
+ [captcha, Id], ?RPC_TIMEOUT) of
+ [C] ->
+ {ok, C};
+ _ ->
+ {error, enoent}
+ end
+ end;
+ _ ->
+ {error, enoent}
+ end.
+
+do_check_captcha(Id, ProvidedKey) ->
+ case ets:lookup(captcha, Id) of
+ [#captcha{pid = Pid, args = Args, key = ValidKey, tref = Tref}] ->
+ ets:delete(captcha, Id),
+ erlang:cancel_timer(Tref),
+ if ValidKey == ProvidedKey ->
+ Pid ! {captcha_succeed, Args},
+ captcha_valid;
+ true ->
+ Pid ! {captcha_failed, Args},
+ captcha_non_valid
+ end;
+ _ ->
+ captcha_not_found
+ end.
diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl
new file mode 100644
index 000000000..3db8dd29d
--- /dev/null
+++ b/src/ejabberd_cluster.erl
@@ -0,0 +1,177 @@
+%%%-------------------------------------------------------------------
+%%% File : ejabberd_cluster.erl
+%%% Author : Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%% Description :
+%%%
+%%% Created : 2 Apr 2010 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(ejabberd_cluster).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, get_node/1, get_node_new/1, announce/0,
+ node_id/0, get_node_by_id/1, get_nodes/0, rehash_timeout/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("ejabberd.hrl").
+
+-define(HASHTBL, nodes_hash).
+-define(HASHTBL_NEW, nodes_hash_new).
+-define(POINTS, 16).
+-define(REHASH_TIMEOUT, 5000).
+
+-record(state, {}).
+
+%%====================================================================
+%% API
+%%====================================================================
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_node(Key) ->
+ Hash = erlang:phash2(Key),
+ get_node_by_hash(?HASHTBL, Hash).
+
+get_node_new(Key) ->
+ Hash = erlang:phash2(Key),
+ get_node_by_hash(?HASHTBL_NEW, Hash).
+
+get_nodes() ->
+ %% TODO
+ mnesia:system_info(running_db_nodes).
+
+announce() ->
+ gen_server:call(?MODULE, announce, infinity).
+
+node_id() ->
+ integer_to_list(erlang:phash2(node())).
+
+rehash_timeout() ->
+ ?REHASH_TIMEOUT.
+
+get_node_by_id(NodeID) when is_list(NodeID) ->
+ case catch list_to_existing_atom(NodeID) of
+ {'EXIT', _} ->
+ node();
+ Res ->
+ get_node_by_id(Res)
+ end;
+get_node_by_id(NodeID) ->
+ case global:whereis_name(NodeID) of
+ Pid when is_pid(Pid) ->
+ node(Pid);
+ _ ->
+ node()
+ end.
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+init([]) ->
+ net_kernel:monitor_nodes(true, [{node_type, visible}]),
+ ets:new(?HASHTBL, [named_table, public, ordered_set]),
+ ets:new(?HASHTBL_NEW, [named_table, public, ordered_set]),
+ register_node(),
+ AllNodes = mnesia:system_info(running_db_nodes),
+ OtherNodes = case AllNodes of
+ [_] ->
+ AllNodes;
+ _ ->
+ AllNodes -- [node()]
+ end,
+ append_nodes(?HASHTBL, OtherNodes),
+ append_nodes(?HASHTBL_NEW, AllNodes),
+ {ok, #state{}}.
+
+handle_call(announce, _From, State) ->
+ case mnesia:system_info(running_db_nodes) of
+ [_MyNode] ->
+ ok;
+ Nodes ->
+ OtherNodes = Nodes -- [node()],
+ lists:foreach(
+ fun(Node) ->
+ {?MODULE, Node} ! {node_ready, node()}
+ end, OtherNodes),
+ ?INFO_MSG("waiting for migration from nodes: ~w",
+ [OtherNodes]),
+ timer:sleep(?REHASH_TIMEOUT),
+ append_node(?HASHTBL, node())
+ end,
+ {reply, ok, State};
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({node_ready, Node}, State) ->
+ ?INFO_MSG("node ~p is ready, starting migration", [Node]),
+ append_node(?HASHTBL_NEW, Node),
+ ejabberd_hooks:run(node_hash_update, [?REHASH_TIMEOUT]),
+ timer:sleep(?REHASH_TIMEOUT),
+ ?INFO_MSG("adding node ~p to hash", [Node]),
+ append_node(?HASHTBL, Node),
+ {noreply, State};
+handle_info({nodedown, Node, _}, State) ->
+ ?INFO_MSG("node ~p goes down", [Node]),
+ delete_node(?HASHTBL, Node),
+ delete_node(?HASHTBL_NEW, Node),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+append_nodes(Tab, Nodes) ->
+ lists:foreach(
+ fun(Node) ->
+ append_node(Tab, Node)
+ end, Nodes).
+
+append_node(Tab, Node) ->
+ lists:foreach(
+ fun(I) ->
+ Hash = erlang:phash2({I, Node}),
+ ets:insert(Tab, {Hash, Node})
+ end, lists:seq(1, ?POINTS)).
+
+delete_node(Tab, Node) ->
+ lists:foreach(
+ fun(I) ->
+ Hash = erlang:phash2({I, Node}),
+ ets:delete(Tab, Hash)
+ end, lists:seq(1, ?POINTS)).
+
+get_node_by_hash(Tab, Hash) ->
+ NodeHash = case ets:next(Tab, Hash) of
+ '$end_of_table' ->
+ ets:first(Tab);
+ NH ->
+ NH
+ end,
+ if NodeHash == '$end_of_table' ->
+ erlang:error(no_running_nodes);
+ true ->
+ case ets:lookup(Tab, NodeHash) of
+ [] ->
+ get_node_by_hash(Tab, Hash);
+ [{_, Node}] ->
+ Node
+ end
+ end.
+
+register_node() ->
+ global:register_name(list_to_atom(node_id()), self()).
diff --git a/src/ejabberd_frontend_socket.erl b/src/ejabberd_frontend_socket.erl
index 82106da55..88d77aed9 100644
--- a/src/ejabberd_frontend_socket.erl
+++ b/src/ejabberd_frontend_socket.erl
@@ -45,6 +45,8 @@
get_peer_certificate/1,
get_verify_result/1,
close/1,
+ setopts/2,
+ change_controller/2,
sockname/1, peername/1]).
%% gen_server callbacks
@@ -94,17 +96,15 @@ start(Module, SockMod, Socket, Opts) ->
todo
end.
-starttls(FsmRef, _TLSOpts) ->
- %gen_server:call(FsmRef, {starttls, TLSOpts}),
- FsmRef.
+starttls(FsmRef, TLSOpts) ->
+ starttls(FsmRef, TLSOpts, undefined).
starttls(FsmRef, TLSOpts, Data) ->
gen_server:call(FsmRef, {starttls, TLSOpts, Data}),
FsmRef.
compress(FsmRef) ->
- gen_server:call(FsmRef, compress),
- FsmRef.
+ compress(FsmRef, undefined).
compress(FsmRef, Data) ->
gen_server:call(FsmRef, {compress, Data}),
@@ -137,10 +137,14 @@ close(FsmRef) ->
sockname(FsmRef) ->
gen_server:call(FsmRef, sockname).
-peername(_FsmRef) ->
- %gen_server:call(FsmRef, peername).
- {ok, {{0, 0, 0, 0}, 0}}.
+setopts(FsmRef, Opts) ->
+ gen_server:call(FsmRef, {setopts, Opts}).
+
+change_controller(FsmRef, C2SPid) ->
+ gen_server:call(FsmRef, {change_controller, C2SPid}).
+peername(FsmRef) ->
+ gen_server:call(FsmRef, peername).
%%====================================================================
%% gen_server callbacks
@@ -156,9 +160,16 @@ peername(_FsmRef) ->
init([Module, SockMod, Socket, Opts, Receiver]) ->
%% TODO: monitor the receiver
Node = ejabberd_node_groups:get_closest_node(backend),
+ IP = case peername(SockMod, Socket) of
+ {ok, IP1} ->
+ IP1;
+ _ ->
+ undefined
+ end,
{SockMod2, Socket2} = check_starttls(SockMod, Socket, Receiver, Opts),
{ok, Pid} =
- rpc:call(Node, Module, start, [{?MODULE, self()}, Opts]),
+ rpc:call(Node, Module, start,
+ [{?MODULE, self()}, [{frontend_ip, IP} | Opts]]),
ejabberd_receiver:become_controller(Receiver, Pid),
{ok, #state{sockmod = SockMod2,
socket = Socket2,
@@ -173,38 +184,16 @@ init([Module, SockMod, Socket, Opts, Receiver]) ->
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call({starttls, TLSOpts}, _From, State) ->
- {ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts),
- ejabberd_receiver:starttls(State#state.receiver, TLSSocket),
- Reply = ok,
- {reply, Reply, State#state{socket = TLSSocket, sockmod = tls},
- ?HIBERNATE_TIMEOUT};
-
handle_call({starttls, TLSOpts, Data}, _From, State) ->
- {ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts),
- ejabberd_receiver:starttls(State#state.receiver, TLSSocket),
- catch (State#state.sockmod):send(
- State#state.socket, Data),
+ {ok, TLSSocket} = ejabberd_receiver:starttls(
+ State#state.receiver, TLSOpts, Data),
Reply = ok,
{reply, Reply, State#state{socket = TLSSocket, sockmod = tls},
?HIBERNATE_TIMEOUT};
-handle_call(compress, _From, State) ->
- {ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
- State#state.sockmod,
- State#state.socket),
- ejabberd_receiver:compress(State#state.receiver, ZlibSocket),
- Reply = ok,
- {reply, Reply, State#state{socket = ZlibSocket, sockmod = ejabberd_zlib},
- ?HIBERNATE_TIMEOUT};
-
handle_call({compress, Data}, _From, State) ->
- {ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
- State#state.sockmod,
- State#state.socket),
- ejabberd_receiver:compress(State#state.receiver, ZlibSocket),
- catch (State#state.sockmod):send(
- State#state.socket, Data),
+ {ok, ZlibSocket} = ejabberd_receiver:compress(
+ State#state.receiver, Data),
Reply = ok,
{reply, Reply, State#state{socket = ZlibSocket, sockmod = ejabberd_zlib},
?HIBERNATE_TIMEOUT};
@@ -244,13 +233,7 @@ handle_call(close, _From, State) ->
handle_call(sockname, _From, State) ->
#state{sockmod = SockMod, socket = Socket} = State,
- Reply =
- case SockMod of
- gen_tcp ->
- inet:sockname(Socket);
- _ ->
- SockMod:sockname(Socket)
- end,
+ Reply = peername(SockMod, Socket),
{reply, Reply, State, ?HIBERNATE_TIMEOUT};
handle_call(peername, _From, State) ->
@@ -264,6 +247,14 @@ handle_call(peername, _From, State) ->
end,
{reply, Reply, State, ?HIBERNATE_TIMEOUT};
+handle_call({setopts, Opts}, _From, State) ->
+ ejabberd_receiver:setopts(State#state.receiver, Opts),
+ {reply, ok, State, ?HIBERNATE_TIMEOUT};
+
+handle_call({change_controller, Pid}, _From, State) ->
+ ejabberd_receiver:change_controller(State#state.receiver, Pid),
+ {reply, ok, State, ?HIBERNATE_TIMEOUT};
+
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State, ?HIBERNATE_TIMEOUT}.
@@ -316,10 +307,16 @@ check_starttls(SockMod, Socket, Receiver, Opts) ->
end, Opts),
if
TLSEnabled ->
- {ok, TLSSocket} = tls:tcp_to_tls(Socket, TLSOpts),
- ejabberd_receiver:starttls(Receiver, TLSSocket),
+ {ok, TLSSocket} = ejabberd_receiver:starttls(Receiver, TLSOpts),
{tls, TLSSocket};
true ->
{SockMod, Socket}
end.
+peername(SockMod, Socket) ->
+ case SockMod of
+ gen_tcp ->
+ inet:peername(Socket);
+ _ ->
+ SockMod:peername(Socket)
+ end.
diff --git a/src/ejabberd_local.erl b/src/ejabberd_local.erl
index 107aadc0b..65086d972 100644
--- a/src/ejabberd_local.erl
+++ b/src/ejabberd_local.erl
@@ -142,7 +142,7 @@ route(From, To, Packet) ->
route_iq(From, To, #iq{type = Type} = IQ, F) when is_function(F) ->
Packet = if Type == set; Type == get ->
- ID = list_to_binary(randoms:get_string()),
+ ID = list_to_binary(ejabberd_router:make_id()),
Host = exmpp_jid:prep_domain(From),
register_iq_response_handler(Host, ID, undefined, F),
exmpp_iq:iq_to_xmlel(IQ#iq{id = ID});
@@ -153,10 +153,10 @@ route_iq(From, To, #iq{type = Type} = IQ, F) when is_function(F) ->
register_iq_response_handler(_Host, ID, Module, Function) ->
TRef = erlang:start_timer(?IQ_TIMEOUT, ejabberd_local, ID),
- mnesia:dirty_write(#iq_response{id = ID,
- module = Module,
- function = Function,
- timer = TRef}).
+ ets:insert(iq_response, #iq_response{id = ID,
+ module = Module,
+ function = Function,
+ timer = TRef}).
register_iq_handler(Host, XMLNS, Module, Fun) ->
ejabberd_local ! {register_iq_handler, Host, XMLNS, Module, Fun}.
@@ -198,11 +198,9 @@ init([]) ->
?MODULE, bounce_resource_packet, 100)
end, ?MYHOSTS),
catch ets:new(?IQTABLE, [named_table, public]),
- update_table(),
- mnesia:create_table(iq_response,
- [{ram_copies, [node()]},
- {attributes, record_info(fields, iq_response)}]),
- mnesia:add_table_copy(iq_response, node(), ram_copies),
+ mnesia:delete_table(iq_response),
+ catch ets:new(iq_response, [named_table, public,
+ {keypos, #iq_response.id}]),
{ok, #state{}}.
%%--------------------------------------------------------------------
@@ -286,7 +284,7 @@ handle_info(refresh_iq_handlers, State) ->
end, ets:tab2list(?IQTABLE)),
{noreply, State};
handle_info({timeout, _TRef, ID}, State) ->
- process_iq_timeout(ID),
+ spawn(fun() -> process_iq_timeout(ID) end),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -342,40 +340,22 @@ do_route(From, To, Packet) ->
end
end.
-update_table() ->
- case catch mnesia:table_info(iq_response, attributes) of
- [id, module, function] ->
- mnesia:delete_table(iq_response);
- [id, module, function, timer] ->
- ok;
- {'EXIT', _} ->
- ok
- end.
-
get_iq_callback(ID) ->
- case mnesia:dirty_read(iq_response, ID) of
+ case ets:lookup(iq_response, ID) of
[#iq_response{module = Module, timer = TRef,
function = Function}] ->
cancel_timer(TRef),
- mnesia:dirty_delete(iq_response, ID),
+ ets:delete(iq_response, ID),
{ok, Module, Function};
_ ->
error
end.
process_iq_timeout(ID) ->
- spawn(fun process_iq_timeout/0) ! ID.
-
-process_iq_timeout() ->
- receive
- ID ->
- case get_iq_callback(ID) of
- {ok, undefined, Function} ->
- Function(timeout);
- _ ->
- ok
- end
- after 5000 ->
+ case get_iq_callback(ID) of
+ {ok, undefined, Function} ->
+ Function(timeout);
+ _ ->
ok
end.
diff --git a/src/ejabberd_node_groups.erl b/src/ejabberd_node_groups.erl
index fc1b4ded5..055b96aef 100644
--- a/src/ejabberd_node_groups.erl
+++ b/src/ejabberd_node_groups.erl
@@ -31,6 +31,7 @@
%% API
-export([start_link/0,
+ start/0,
join/1,
leave/1,
get_members/1,
@@ -40,7 +41,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {}).
+-record(state, {groups = []}).
%%====================================================================
%% API
@@ -49,6 +50,15 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
+start() ->
+ ChildSpec = {?MODULE,
+ {?MODULE, start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [?MODULE]},
+ supervisor:start_child(ejabberd_sup, ChildSpec).
+
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -81,30 +91,19 @@ get_closest_node(Name) ->
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
- {FE, BE} =
+ Groups =
case ejabberd_config:get_local_option(node_type) of
frontend ->
- {true, false};
+ [frontend];
backend ->
- {false, true};
+ [backend];
generic ->
- {true, true};
+ [frontend, backend];
undefined ->
- {true, true}
+ [frontend, backend]
end,
- if
- FE ->
- join(frontend);
- true ->
- ok
- end,
- if
- BE ->
- join(backend);
- true ->
- ok
- end,
- {ok, #state{}}.
+ lists:foreach(fun join/1, Groups),
+ {ok, #state{groups = Groups}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -144,7 +143,8 @@ handle_info(_Info, State) ->
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{groups = Groups}) ->
+ lists:foreach(fun leave/1, Groups),
ok.
%%--------------------------------------------------------------------
diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl
index d0c464d30..f0cef9fe8 100644
--- a/src/ejabberd_receiver.erl
+++ b/src/ejabberd_receiver.erl
@@ -36,8 +36,12 @@
change_shaper/2,
reset_stream/1,
starttls/2,
+ starttls/3,
compress/2,
+ send/2,
become_controller/2,
+ change_controller/2,
+ setopts/2,
close/1]).
%% gen_server callbacks
@@ -52,6 +56,7 @@
c2s_pid,
max_stanza_size,
xml_stream_state,
+ tref,
timeout}).
-define(HIBERNATE_TIMEOUT, 90000).
@@ -86,15 +91,32 @@ change_shaper(Pid, Shaper) ->
reset_stream(Pid) ->
gen_server:call(Pid, reset_stream).
-starttls(Pid, TLSSocket) ->
- gen_server:call(Pid, {starttls, TLSSocket}).
+starttls(Pid, TLSOpts) ->
+ starttls(Pid, TLSOpts, undefined).
-compress(Pid, ZlibSocket) ->
- gen_server:call(Pid, {compress, ZlibSocket}).
+starttls(Pid, TLSOpts, Data) ->
+ gen_server:call(Pid, {starttls, TLSOpts, Data}).
+
+compress(Pid, Data) ->
+ gen_server:call(Pid, {compress, Data}).
become_controller(Pid, C2SPid) ->
gen_server:call(Pid, {become_controller, C2SPid}).
+change_controller(Pid, C2SPid) ->
+ gen_server:call(Pid, {change_controller, C2SPid}).
+
+setopts(Pid, Opts) ->
+ case lists:member({active, false}, Opts) of
+ true ->
+ gen_server:call(Pid, deactivate_socket);
+ false ->
+ ok
+ end.
+
+send(Pid, Data) ->
+ gen_server:call(Pid, {send, Data}).
+
close(Pid) ->
gen_server:cast(Pid, close).
@@ -132,28 +154,42 @@ init([Socket, SockMod, Shaper, MaxStanzaSize]) ->
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call({starttls, TLSSocket}, _From, State) ->
+handle_call({starttls, TLSOpts, Data}, _From, State) ->
+ {ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts),
+ if Data /= undefined ->
+ do_send(State, Data);
+ true ->
+ ok
+ end,
NewXMLStreamState = do_reset_stream(State),
NewState = State#state{socket = TLSSocket,
sock_mod = tls,
xml_stream_state = NewXMLStreamState},
case tls:recv_data(TLSSocket, "") of
{ok, TLSData} ->
- {NextState, Hib} = process_data(TLSData, NewState),
- {reply, ok, NextState, Hib};
+ {NextState, Hib} = process_data(TLSData, NewState),
+ {reply, {ok, TLSSocket}, NextState, Hib};
{error, _Reason} ->
{stop, normal, ok, NewState}
end;
-handle_call({compress, ZlibSocket}, _From,
- #state{xml_stream_state = XMLStreamState} = State) ->
+handle_call({compress, Data}, _From,
+ #state{xml_stream_state = XMLStreamState,
+ sock_mod = SockMod,
+ socket = Socket} = State) ->
+ {ok, ZlibSocket} = ejabberd_zlib:enable_zlib(SockMod, Socket),
+ if Data /= undefined ->
+ do_send(State, Data);
+ true ->
+ ok
+ end,
NewXMLStreamState = exmpp_xmlstream:reset(XMLStreamState),
NewState = State#state{socket = ZlibSocket,
sock_mod = ejabberd_zlib,
xml_stream_state = NewXMLStreamState},
case ejabberd_zlib:recv_data(ZlibSocket, "") of
{ok, ZlibData} ->
- {NextState, Hib} = process_data(ZlibData, NewState),
- {reply, ok, NextState, Hib};
+ {NextState, Hib} = process_data(ZlibData, NewState),
+ {reply, {ok, ZlibSocket}, NextState, Hib};
{error, _Reason} ->
{stop, normal, ok, NewState}
end;
@@ -164,6 +200,7 @@ handle_call(reset_stream, _From,
{reply, Reply, State#state{xml_stream_state = NewXMLStreamState},
?HIBERNATE_TIMEOUT};
handle_call({become_controller, C2SPid}, _From, State) ->
+ erlang:monitor(process, C2SPid),
close_stream(State#state.xml_stream_state),
XMLStreamState = new_xmlstream(C2SPid, State#state.max_stanza_size),
NewState = State#state{c2s_pid = C2SPid,
@@ -171,6 +208,24 @@ handle_call({become_controller, C2SPid}, _From, State) ->
activate_socket(NewState),
Reply = ok,
{reply, Reply, NewState, ?HIBERNATE_TIMEOUT};
+handle_call({change_controller, C2SPid}, _From, State) ->
+ erlang:monitor(process, C2SPid),
+ NewXMLStreamState = exmpp_xmlstream:change_callback(
+ State#state.xml_stream_state, {gen_fsm, C2SPid}),
+ NewState = State#state{c2s_pid = C2SPid,
+ xml_stream_state = NewXMLStreamState},
+ activate_socket(NewState),
+ {reply, ok, NewState, ?HIBERNATE_TIMEOUT};
+handle_call({send, Data}, _From, State) ->
+ case do_send(State, Data) of
+ ok ->
+ {reply, ok, State, ?HIBERNATE_TIMEOUT};
+ {error, _Reason} = Err ->
+ {stop, normal, Err, State}
+ end;
+handle_call(deactivate_socket, _From, State) ->
+ deactivate_socket(State),
+ {reply, ok, State, ?HIBERNATE_TIMEOUT};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State, ?HIBERNATE_TIMEOUT}.
@@ -231,6 +286,9 @@ handle_info({Tag, _TCPSocket, Reason}, State)
_ ->
{stop, normal, State}
end;
+handle_info({'DOWN', _MRef, process, C2SPid, _},
+ #state{c2s_pid = C2SPid} = State) ->
+ {stop, normal, State};
handle_info({timeout, _Ref, activate}, State) ->
activate_socket(State),
{noreply, State, ?HIBERNATE_TIMEOUT};
@@ -288,6 +346,17 @@ activate_socket(#state{socket = Socket,
ok
end.
+deactivate_socket(#state{socket = Socket,
+ tref = TRef,
+ sock_mod = SockMod}) ->
+ cancel_timer(TRef),
+ case SockMod of
+ gen_tcp ->
+ inet:setopts(Socket, [{active, false}]);
+ _ ->
+ SockMod:setopts(Socket, [{active, false}])
+ end.
+
%% Data processing for connectors directly generating xmlel in
%% Erlang data structure.
%% WARNING: Shaper does not work with Erlang data structure.
@@ -309,25 +378,25 @@ process_data([Element|Els], #state{c2s_pid = C2SPid} = State)
%% Data processing for connectors receivind data as string.
process_data(Data,
#state{xml_stream_state = XMLStreamState,
+ tref = TRef,
shaper_state = ShaperState,
c2s_pid = C2SPid} = State) ->
?DEBUG("Received XML on stream = ~p", [Data]),
{ok, XMLStreamState1} = exmpp_xmlstream:parse(XMLStreamState, Data),
{NewShaperState, Pause} = shaper:update(ShaperState, size(Data)),
- HibTimeout =
+ {NewTRef, HibTimeout} =
if
- C2SPid == undefined ->
- infinity;
+ C2SPid == undefined ->
+ {TRef, infinity};
Pause > 0 ->
- erlang:start_timer(Pause, self(), activate),
- hibernate;
-
- true ->
- activate_socket(State),
- ?HIBERNATE_TIMEOUT
+ {erlang:start_timer(Pause, self(), activate), hibernate};
+ true ->
+ activate_socket(State),
+ {TRef, ?HIBERNATE_TIMEOUT}
end,
{State#state{xml_stream_state = XMLStreamState1,
- shaper_state = NewShaperState}, HibTimeout}.
+ tref = NewTRef,
+ shaper_state = NewShaperState}, HibTimeout}.
%% Element coming from XML parser are wrapped inside xmlstreamelement
%% When we receive directly xmlel tuple (from a socket module
@@ -345,6 +414,23 @@ close_stream(XMLStreamState) ->
exmpp_xml:stop_parser(exmpp_xmlstream:get_parser(XMLStreamState)),
exmpp_xmlstream:stop(XMLStreamState).
+do_send(State, Data) ->
+ (State#state.sock_mod):send(State#state.socket, Data).
+
+cancel_timer(TRef) when is_reference(TRef) ->
+ case erlang:cancel_timer(TRef) of
+ false ->
+ receive
+ {timeout, TRef, _} ->
+ ok
+ after 0 ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
+cancel_timer(_) ->
+ ok.
do_reset_stream(#state{xml_stream_state = undefined, c2s_pid = C2SPid, max_stanza_size = MaxStanzaSize}) ->
new_xmlstream(C2SPid, MaxStanzaSize);
diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl
index 122afa4df..d6927f398 100644
--- a/src/ejabberd_router.erl
+++ b/src/ejabberd_router.erl
@@ -38,7 +38,8 @@
unregister_route/1,
unregister_routes/1,
dirty_get_all_routes/0,
- dirty_get_all_domains/0
+ dirty_get_all_domains/0,
+ make_id/0
]).
-export([start_link/0]).
@@ -54,6 +55,9 @@
-record(route, {domain, pid, local_hint}).
-record(state, {}).
+%% "rr" stands for Record-Route.
+-define(ROUTE_PREFIX, "rr-").
+
%%====================================================================
%% API
%%====================================================================
@@ -76,7 +80,7 @@ route(FromOld, ToOld, #xmlelement{} = PacketOld) ->
[{?NS_XMPP, ?NS_XMPP_pfx}]),
route(From, To, Packet);
route(From, To, Packet) ->
- case catch do_route(From, To, Packet) of
+ case catch route_check_id(From, To, Packet) of
{'EXIT', Reason} ->
?ERROR_MSG("~p~nwhen processing: ~p",
[Reason, {From, To, Packet}]);
@@ -210,6 +214,8 @@ dirty_get_all_domains() ->
lists:map(fun erlang:binary_to_list/1,
mnesia:dirty_all_keys(route))).
+make_id() ->
+ ?ROUTE_PREFIX ++ randoms:get_string() ++ "-" ++ ejabberd_cluster:node_id().
%%====================================================================
%% gen_server callbacks
@@ -338,6 +344,32 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
+route_check_id(From, To, #xmlel{name = iq} = Packet) ->
+ case exmpp_xml:get_attribute_as_list(Packet, 'id', "") of
+ ?ROUTE_PREFIX ++ Rest ->
+ Type = exmpp_xml:get_attribute_as_list(Packet, 'type', ""),
+ if Type == "error"; Type == "result" ->
+ case string:tokens(Rest, "-") of
+ [_, NodeID] ->
+ case ejabberd_cluster:get_node_by_id(NodeID) of
+ Node when Node == node() ->
+ do_route(From, To, Packet);
+ Node ->
+ {ejabberd_router, Node} !
+ {route, From, To, Packet}
+ end;
+ _ ->
+ do_route(From, To, Packet)
+ end;
+ true ->
+ do_route(From, To, Packet)
+ end;
+ _ ->
+ do_route(From, To, Packet)
+ end;
+route_check_id(From, To, Packet) ->
+ do_route(From, To, Packet).
+
do_route(OrigFrom, OrigTo, OrigPacket) ->
?DEBUG("route~n\tfrom ~p~n\tto ~p~n\tpacket ~p~n",
[OrigFrom, OrigTo, OrigPacket]),
diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl
index f5946a416..59af8d9aa 100644
--- a/src/ejabberd_s2s.erl
+++ b/src/ejabberd_s2s.erl
@@ -40,7 +40,8 @@
dirty_get_connections/0,
allow_host/2,
incoming_s2s_number/0,
- outgoing_s2s_number/0
+ outgoing_s2s_number/0,
+ migrate/1
]).
%% gen_server callbacks
@@ -113,30 +114,63 @@ remove_connection(FromTo, Pid, Key) ->
end.
have_connection(FromTo) ->
- case catch mnesia:dirty_read(s2s, FromTo) of
- [_] ->
- true;
- _ ->
- false
+ case ejabberd_cluster:get_node(FromTo) of
+ Node when Node == node() ->
+ case mnesia:dirty_read(s2s, FromTo) of
+ [_] ->
+ true;
+ _ ->
+ false
+ end;
+ Node ->
+ case catch rpc:call(Node, mnesia, dirty_read,
+ [s2s, FromTo], 5000) of
+ [_] ->
+ true;
+ _ ->
+ false
+ end
end.
has_key(FromTo, Key) ->
- case mnesia:dirty_select(s2s,
- [{#s2s{fromto = FromTo, key = Key, _ = '_'},
- [],
- ['$_']}]) of
- [] ->
- false;
- _ ->
- true
+ Query = [{#s2s{fromto = FromTo, key = Key, _ = '_'},
+ [],
+ ['$_']}],
+ case ejabberd_cluster:get_node(FromTo) of
+ Node when Node == node() ->
+ case mnesia:dirty_select(s2s, Query) of
+ [] ->
+ false;
+ _ ->
+ true
+ end;
+ Node ->
+ case catch rpc:call(Node, mnesia, dirty_select,
+ [s2s, Query], 5000) of
+ [_|_] ->
+ true;
+ _ ->
+ false
+ end
end.
get_connections_pids(FromTo) ->
- case catch mnesia:dirty_read(s2s, FromTo) of
- L when is_list(L) ->
- [Connection#s2s.pid || Connection <- L];
- _ ->
- []
+ case ejabberd_cluster:get_node(FromTo) of
+ Node when Node == node() ->
+ case catch mnesia:dirty_read(s2s, FromTo) of
+ L when is_list(L) ->
+ [Connection#s2s.pid || Connection <- L];
+ _ ->
+ []
+ end;
+ Node ->
+ case catch rpc:call(Node, mnesia, dirty_read,
+ [s2s, FromTo], 5000) of
+ L when is_list(L) ->
+ [Connection#s2s.pid || Connection <- L];
+ _ ->
+ []
+ end
end.
try_register(FromTo) ->
@@ -167,7 +201,33 @@ try_register(FromTo) ->
end.
dirty_get_connections() ->
- mnesia:dirty_all_keys(s2s).
+ lists:flatmap(
+ fun(Node) when Node == node() ->
+ mnesia:dirty_all_keys(s2s);
+ (Node) ->
+ case catch rpc:call(Node, mnesia, dirty_all_keys, [s2s], 5000) of
+ L when is_list(L) ->
+ L;
+ _ ->
+ []
+ end
+ end, ejabberd_cluster:get_nodes()).
+
+migrate(After) ->
+ Ss = mnesia:dirty_select(
+ s2s,
+ [{#s2s{fromto = '$1', pid = '$2', _ = '_'},
+ [],
+ ['$$']}]),
+ lists:foreach(
+ fun([FromTo, Pid]) ->
+ case ejabberd_cluster:get_node_new(FromTo) of
+ Node when Node /= node() ->
+ ejabberd_s2s_out:stop_connection(Pid, After * 2);
+ _ ->
+ ok
+ end
+ end, Ss).
%%====================================================================
%% gen_server callbacks
@@ -182,10 +242,11 @@ dirty_get_connections() ->
%%--------------------------------------------------------------------
init([]) ->
update_tables(),
- mnesia:create_table(s2s, [{ram_copies, [node()]}, {type, bag},
+ mnesia:create_table(s2s, [{ram_copies, [node()]},
+ {type, bag}, {local_content, true},
{attributes, record_info(fields, s2s)}]),
mnesia:add_table_copy(s2s, node(), ram_copies),
- mnesia:subscribe(system),
+ ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100),
ejabberd_commands:register_commands(commands()),
{ok, #state{}}.
@@ -217,9 +278,6 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
-handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
- clean_table_from_bad_node(Node),
- {noreply, State};
%% #xmlelement{} used for retro-compatibility
handle_info({route, FromOld, ToOld, #xmlelement{} = PacketOld}, State) ->
catch throw(for_stacktrace), % To have a stacktrace.
@@ -251,6 +309,7 @@ handle_info(_Info, State) ->
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
+ ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100),
ejabberd_commands:unregister_commands(commands()),
ok.
@@ -264,22 +323,19 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-clean_table_from_bad_node(Node) ->
- F = fun() ->
- Es = mnesia:select(
- s2s,
- [{#s2s{pid = '$1', _ = '_'},
- [{'==', {node, '$1'}, Node}],
- ['$_']}]),
- lists:foreach(fun(E) ->
- mnesia:delete_object(E)
- end, Es)
- end,
- mnesia:async_dirty(F).
-
do_route(From, To, Packet) ->
?DEBUG("s2s manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
[From, To, Packet, 8]),
+ FromTo = {exmpp_jid:prep_domain_as_list(From),
+ exmpp_jid:prep_domain_as_list(To)},
+ case ejabberd_cluster:get_node(FromTo) of
+ Node when Node == node() ->
+ do_route1(From, To, Packet);
+ Node ->
+ {?MODULE, Node} ! {route, From, To, Packet}
+ end.
+
+do_route1(From, To, Packet) ->
case find_connection(From, To) of
{atomic, Pid} when is_pid(Pid) ->
?DEBUG("sending to process ~p~n", [Pid]),
@@ -510,6 +566,12 @@ update_tables() ->
mnesia:delete_table(local_s2s);
false ->
ok
+ end,
+ case catch mnesia:table_info(s2s, local_content) of
+ false ->
+ mnesia:delete_table(s2s);
+ _ ->
+ ok
end.
%% Check if host is in blacklist or white list
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl
index b2e8764e1..37b45d390 100644
--- a/src/ejabberd_s2s_out.erl
+++ b/src/ejabberd_s2s_out.erl
@@ -34,7 +34,8 @@
start_link/3,
start_connection/1,
terminate_if_waiting_delay/2,
- stop_connection/1]).
+ stop_connection/1,
+ stop_connection/2]).
%% p1_fsm callbacks (same as gen_fsm)
-export([init/1,
@@ -52,7 +53,7 @@
handle_info/3,
terminate/3,
code_change/4,
- print_state/1,
+ print_state/1,
test_get_addr_port/1,
get_addr_port/1]).
@@ -85,10 +86,11 @@
%% Module start with or without supervisor:
-ifdef(NO_TRANSIENT_SUPERVISORS).
--define(SUPERVISOR_START, p1_fsm:start(ejabberd_s2s_out, [From, Host, Type],
- fsm_limit_opts() ++ ?FSMOPTS)).
+-define(SUPERVISOR_START, rpc:call(Node, p1_fsm, start,
+ [ejabberd_s2s_out, [From, Host, Type],
+ fsm_limit_opts() ++ ?FSMOPTS])).
-else.
--define(SUPERVISOR_START, supervisor:start_child(ejabberd_s2s_out_sup,
+-define(SUPERVISOR_START, supervisor:start_child({ejabberd_s2s_out_sup, Node},
[From, Host, Type])).
-endif.
@@ -115,6 +117,7 @@
%%% API
%%%----------------------------------------------------------------------
start(From, Host, Type) ->
+ Node = ejabberd_cluster:get_node({From, Host}),
?SUPERVISOR_START.
start_link(From, Host, Type) ->
@@ -125,7 +128,10 @@ start_connection(Pid) ->
p1_fsm:send_event(Pid, init).
stop_connection(Pid) ->
- p1_fsm:send_event(Pid, stop).
+ p1_fsm:send_event(Pid, closed).
+
+stop_connection(Pid, Timeout) ->
+ p1_fsm:send_all_state_event(Pid, {closed, Timeout}).
%%%----------------------------------------------------------------------
%%% Callback functions from p1_fsm
@@ -736,6 +742,9 @@ stream_established(closed, StateData) ->
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
+handle_event({closed, Timeout}, StateName, StateData) ->
+ p1_fsm:send_event_after(Timeout, closed),
+ {next_state, StateName, StateData};
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData, get_timeout_interval(StateName)}.
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index 08059082f..3d1984ad0 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -32,7 +32,10 @@
%% API
-export([start_link/0,
route/3,
- open_session/3, close_session/2,
+ set_session/4,
+ open_session/3,
+ open_session/4,
+ close_session/2,
check_in_subscription/6,
bounce_offline_message/3,
disconnect_removed_user/2,
@@ -43,6 +46,7 @@
dirty_get_sessions_list/0,
dirty_get_my_sessions_list/0,
get_vh_session_list/1,
+ get_vh_my_session_list/1,
get_vh_session_number/1,
register_iq_handler/4,
register_iq_handler/5,
@@ -53,7 +57,8 @@
user_resources/2,
get_session_pid/1,
get_user_info/3,
- get_user_ip/1
+ get_user_ip/1,
+ migrate/1
]).
%% gen_server callbacks
@@ -67,7 +72,6 @@
-include("mod_privacy.hrl").
-record(session, {sid, usr, us, priority, info}).
--record(session_counter, {vhost, count}).
-record(state, {}).
%% default value for the maximum number of user connections
@@ -112,10 +116,11 @@ route(From, To, Packet) ->
ok
end.
-open_session(SID, JID, Info) when ?IS_JID(JID) ->
- set_session(SID, JID, undefined, Info),
- mnesia:dirty_update_counter(session_counter,
- exmpp_jid:domain(JID), 1),
+open_session(SID, JID, Info) ->
+ open_session(SID, JID, undefined, Info).
+
+open_session(SID, JID, Priority, Info) when ?IS_JID(JID) ->
+ set_session(SID, JID, Priority, Info),
check_for_sessions_to_replace(JID),
ejabberd_hooks:run(sm_register_connection_hook, exmpp_jid:prep_domain(JID),
[SID, JID, Info]).
@@ -126,9 +131,7 @@ close_session(SID, JID ) when ?IS_JID(JID)->
[#session{info=I}] -> I
end,
F = fun() ->
- mnesia:delete({session, SID}),
- mnesia:dirty_update_counter(session_counter,
- exmpp_jid:domain(JID), -1)
+ mnesia:delete({session, SID})
end,
mnesia:sync_dirty(F),
ejabberd_hooks:run(sm_remove_connection_hook, exmpp_jid:prep_domain(JID),
@@ -156,25 +159,37 @@ disconnect_removed_user(User, Server) ->
children = [{exit, "User removed"}]}).
get_user_resources(User, Server)
- when is_binary(User), is_binary(Server) ->
+ when is_binary(User), is_binary(Server) ->
US = {User, Server},
- case catch mnesia:dirty_index_read(session, US, #session.us) of
- {'EXIT', _Reason} ->
- [];
- Ss ->
- [element(3, S#session.usr) || S <- clean_session_list(Ss)]
+ Ss = case ejabberd_cluster:get_node({User, Server}) of
+ Node when Node == node() ->
+ catch mnesia:dirty_index_read(session, US, #session.us);
+ Node ->
+ catch rpc:call(Node, mnesia, dirty_index_read,
+ [session, US, #session.us], 5000)
+ end,
+ if is_list(Ss) ->
+ [element(3, S#session.usr) || S <- clean_session_list(Ss)];
+ true ->
+ []
end.
get_user_ip(JID) when ?IS_JID(JID) ->
- USR = {exmpp_jid:prep_node(JID),
- exmpp_jid:prep_domain(JID),
+ USR = {LUser = exmpp_jid:prep_node(JID),
+ LServer = exmpp_jid:prep_domain(JID),
exmpp_jid:prep_resource(JID)},
- case mnesia:dirty_index_read(session, USR, #session.usr) of
- [] ->
- undefined;
- Ss ->
+ Ss = case ejabberd_cluster:get_node({LUser, LServer}) of
+ Node when Node == node() ->
+ mnesia:dirty_index_read(session, USR, #session.usr);
+ Node ->
+ catch rpc:call(Node, mnesia, dirty_index_read,
+ [session, USR, #session.usr], 5000)
+ end,
+ if is_list(Ss), Ss /= [] ->
Session = lists:max(Ss),
- proplists:get_value(ip, Session#session.info)
+ proplists:get_value(ip, Session#session.info);
+ true ->
+ undefined
end.
get_user_info(User, Server, Resource)
@@ -187,15 +202,21 @@ get_user_info(User, Server, Resource)
USR = {LUser,
LServer,
LResource},
- case mnesia:dirty_index_read(session, USR, #session.usr) of
- [] ->
- offline;
- Ss ->
+ Ss = case ejabberd_cluster:get_node({LUser, LServer}) of
+ Node when Node == node() ->
+ mnesia:dirty_index_read(session, USR, #session.usr);
+ Node ->
+ catch rpc:call(Node, mnesia, dirty_index_read,
+ [session, USR, #session.usr], 5000)
+ end,
+ if is_list(Ss), Ss /= [] ->
Session = lists:max(Ss),
- Node = node(element(2, Session#session.sid)),
+ N = node(element(2, Session#session.sid)),
Conn = proplists:get_value(conn, Session#session.info),
IP = proplists:get_value(ip, Session#session.info),
- [{node, Node}, {conn, Conn}, {ip, IP}]
+ [{node, N}, {conn, Conn}, {ip, IP}];
+ true ->
+ offline
end.
set_presence(SID, JID, Priority, Presence, Info) when ?IS_JID(JID) ->
@@ -229,27 +250,38 @@ get_session_pid(JID) when ?IS_JID(JID) ->
get_session_pid({exmpp_jid:prep_node(JID),
exmpp_jid:prep_domain(JID),
exmpp_jid:prep_resource(JID)});
-get_session_pid(USR) ->
- case catch mnesia:dirty_index_read(session, USR, #session.usr) of
+get_session_pid({LUser, LServer, _} = USR) ->
+ Res = case ejabberd_cluster:get_node({LUser, LServer}) of
+ Node when Node == node() ->
+ mnesia:dirty_index_read(session, USR, #session.usr);
+ Node ->
+ catch rpc:call(Node, mnesia, dirty_index_read,
+ [session, USR, #session.usr], 5000)
+ end,
+ case Res of
[#session{sid = {_, Pid}}] -> Pid;
_ -> none
end.
dirty_get_sessions_list() ->
- mnesia:dirty_select(
- session,
- [{#session{usr = '$1', _ = '_'},
- [],
- ['$1']}]).
+ Match = [{#session{usr = '$1', _ = '_'}, [], ['$1']}],
+ lists:flatmap(
+ fun(Node) when Node == node() ->
+ mnesia:dirty_select(session, Match);
+ (Node) ->
+ case catch rpc:call(Node, mnesia, dirty_select,
+ [session, Match], 5000) of
+ Ss when is_list(Ss) ->
+ Ss;
+ _ ->
+ []
+ end
+ end, ejabberd_cluster:get_nodes()).
dirty_get_my_sessions_list() ->
- mnesia:dirty_select(
- session,
- [{#session{sid = {'_', '$1'}, _ = '_'},
- [{'==', {node, '$1'}, node()}],
- ['$_']}]).
+ mnesia:dirty_match_object(#session{_ = '_'}).
-get_vh_session_list(Server) when is_binary(Server) ->
+get_vh_my_session_list(Server) when is_binary(Server) ->
LServer = exmpp_stringprep:nameprep(Server),
mnesia:dirty_select(
session,
@@ -257,19 +289,24 @@ get_vh_session_list(Server) when is_binary(Server) ->
[{'==', {element, 2, '$1'}, LServer}],
['$1']}]).
+get_vh_session_list(Server) when is_binary(Server) ->
+ lists:flatmap(
+ fun(Node) when Node == node() ->
+ get_vh_my_session_list(Server);
+ (Node) ->
+ case catch rpc:call(Node, ?MODULE, get_vh_my_session_list,
+ [Server], 5000) of
+ Ss when is_list(Ss) ->
+ Ss;
+ _ ->
+ []
+ end
+ end, ejabberd_cluster:get_nodes()).
+
get_vh_session_number(Server) ->
- LServer = exmpp_jid:prep_domain(exmpp_jid:parse(Server)),
- Query = mnesia:dirty_select(
- session_counter,
- [{#session_counter{vhost = LServer, count = '$1'},
- [],
- ['$1']}]),
- case Query of
- [Count] ->
- Count;
- _ -> 0
- end.
-
+ %% TODO
+ length(get_vh_session_list(Server)).
+
register_iq_handler(Host, XMLNS, Module, Fun) ->
ejabberd_sm ! {register_iq_handler, Host, XMLNS, Module, Fun}.
@@ -279,6 +316,21 @@ register_iq_handler(Host, XMLNS, Module, Fun, Opts) ->
unregister_iq_handler(Host, XMLNS) ->
ejabberd_sm ! {unregister_iq_handler, Host, XMLNS}.
+migrate(After) ->
+ Ss = mnesia:dirty_select(
+ session,
+ [{#session{us = '$1', sid = {'_', '$2'}, _ = '_'},
+ [],
+ ['$$']}]),
+ lists:foreach(
+ fun([US, Pid]) ->
+ case ejabberd_cluster:get_node_new(US) of
+ Node when Node /= node() ->
+ ejabberd_c2s:migrate(Pid, Node, After);
+ _ ->
+ ok
+ end
+ end, Ss).
%%====================================================================
%% gen_server callbacks
@@ -295,16 +347,13 @@ init([]) ->
update_tables(),
mnesia:create_table(session,
[{ram_copies, [node()]},
+ {local_content, true},
{attributes, record_info(fields, session)}]),
- mnesia:create_table(session_counter,
- [{ram_copies, [node()]},
- {attributes, record_info(fields, session_counter)}]),
mnesia:add_table_index(session, usr),
mnesia:add_table_index(session, us),
mnesia:add_table_copy(session, node(), ram_copies),
- mnesia:add_table_copy(session_counter, node(), ram_copies),
- mnesia:subscribe(system),
ets:new(sm_iqtable, [named_table]),
+ ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100),
lists:foreach(
fun(Host) ->
HostB = list_to_binary(Host),
@@ -367,9 +416,6 @@ handle_info({route, From, To, Packet}, State) ->
ok
end,
{noreply, State};
-handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
- recount_session_table(Node),
- {noreply, State};
handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) ->
ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function}),
{noreply, State};
@@ -396,6 +442,7 @@ handle_info(_Info, State) ->
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
+ ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100),
ejabberd_commands:unregister_commands(commands()),
ok.
@@ -424,38 +471,19 @@ set_session(SID, JID, Priority, Info) ->
end,
mnesia:sync_dirty(F).
-%% Recalculates alive sessions when Node goes down
-%% and updates session and session_counter tables
-recount_session_table(Node) ->
- F = fun() ->
- Es = mnesia:select(
- session,
- [{#session{sid = {'_', '$1'}, _ = '_'},
- [{'==', {node, '$1'}, Node}],
- ['$_']}]),
- lists:foreach(fun(E) ->
- mnesia:delete({session, E#session.sid})
- end, Es),
- %% reset session_counter table with active sessions
- mnesia:clear_table(session_counter),
- lists:foreach(fun(Server) ->
- LServer = exmpp_jid:prep_domain(exmpp_jid:parse(Server)),
- Hs = mnesia:select(session,
- [{#session{usr = '$1', _ = '_'},
- [{'==', {element, 2, '$1'}, LServer}],
- ['$1']}]),
- mnesia:write(
- #session_counter{vhost = LServer,
- count = length(Hs)})
- end, ?MYHOSTS)
- end,
- mnesia:async_dirty(F).
-
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
do_route(From, To, Packet) ->
?DEBUG("session manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
[From, To, Packet, 8]),
+ {U, S} = {exmpp_jid:prep_node(To), exmpp_jid:prep_domain(To)},
+ case ejabberd_cluster:get_node({U, S}) of
+ Node when Node /= node() ->
+ {?MODULE, Node} ! {route, From, To, Packet};
+ _ ->
+ do_route1(From, To, Packet)
+ end.
+
+do_route1(From, To, Packet) ->
case exmpp_jid:prep_resource(To) of
undefined ->
case Packet of
@@ -835,4 +863,11 @@ update_tables() ->
mnesia:delete_table(local_session);
false ->
ok
+ end,
+ mnesia:delete_table(session_counter),
+ case catch mnesia:table_info(session, local_content) of
+ false ->
+ mnesia:delete_table(session);
+ _ ->
+ ok
end.
diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl
index 25ff64c27..8eeda452d 100644
--- a/src/ejabberd_socket.erl
+++ b/src/ejabberd_socket.erl
@@ -44,6 +44,7 @@
get_peer_certificate/1,
get_verify_result/1,
close/1,
+ change_controller/2,
sockname/1, peername/1]).
-include("ejabberd.hrl").
@@ -129,29 +130,19 @@ connect(Addr, Port, Opts, Timeout) ->
end.
starttls(SocketData, TLSOpts) ->
- {ok, TLSSocket} = tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts),
- ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket),
- SocketData#socket_state{socket = TLSSocket, sockmod = tls}.
+ starttls(SocketData, TLSOpts, undefined).
starttls(SocketData, TLSOpts, Data) ->
- {ok, TLSSocket} = tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts),
- ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket),
- send(SocketData, Data),
+ {ok, TLSSocket} = ejabberd_receiver:starttls(
+ SocketData#socket_state.receiver, TLSOpts, Data),
SocketData#socket_state{socket = TLSSocket, sockmod = tls}.
compress(SocketData) ->
- {ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
- SocketData#socket_state.sockmod,
- SocketData#socket_state.socket),
- ejabberd_receiver:compress(SocketData#socket_state.receiver, ZlibSocket),
- SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}.
+ compress(SocketData, undefined).
compress(SocketData, Data) ->
- {ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
- SocketData#socket_state.sockmod,
- SocketData#socket_state.socket),
- ejabberd_receiver:compress(SocketData#socket_state.receiver, ZlibSocket),
- send(SocketData, Data),
+ {ok, ZlibSocket} = ejabberd_receiver:compress(
+ SocketData#socket_state.receiver, Data),
SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}.
reset_stream(SocketData) when is_pid(SocketData#socket_state.receiver) ->
@@ -160,10 +151,25 @@ reset_stream(SocketData) when is_atom(SocketData#socket_state.receiver) ->
(SocketData#socket_state.receiver):reset_stream(
SocketData#socket_state.socket).
+change_controller(#socket_state{receiver = Recv}, Pid) when is_pid(Recv) ->
+ ejabberd_receiver:setopts(Recv, [{active, false}]),
+ sync_events(Pid),
+ ejabberd_receiver:change_controller(Recv, Pid);
+change_controller(#socket_state{socket = Socket, receiver = Mod}, Pid) ->
+ Mod:setopts(Socket, [{active, false}]),
+ sync_events(Pid),
+ Mod:change_controller(Socket, Pid).
+
%% sockmod=gen_tcp|tls|ejabberd_zlib
send(SocketData, Data) ->
- case catch (SocketData#socket_state.sockmod):send(
- SocketData#socket_state.socket, Data) of
+ Res = if node(SocketData#socket_state.receiver) == node() ->
+ catch (SocketData#socket_state.sockmod):send(
+ SocketData#socket_state.socket, Data);
+ true ->
+ catch ejabberd_receiver:send(
+ SocketData#socket_state.receiver, Data)
+ end,
+ case Res of
ok -> ok;
{error, timeout} ->
?INFO_MSG("Timeout on ~p:send",[SocketData#socket_state.sockmod]),
@@ -225,3 +231,21 @@ peername(#socket_state{sockmod = SockMod, socket = Socket}) ->
%%====================================================================
%% Internal functions
%%====================================================================
+%% dirty hack to relay queued messages from
+%% old owner to new owner. The idea is based
+%% on code of gen_tcp:controlling_process/2.
+sync_events(C2SPid) ->
+ receive
+ {'$gen_event', El} = Event when element(1, El) == xmlel;
+ element(1, El) == xmlstreamstart;
+ element(1, El) == xmlstreamelement;
+ element(1, El) == xmlstreamend;
+ element(1, El) == xmlstreamerror ->
+ C2SPid ! Event,
+ sync_events(C2SPid);
+ closed ->
+ C2SPid ! closed,
+ sync_events(C2SPid)
+ after 0 ->
+ ok
+ end.
diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl
index b3a1a5ea1..1fc193e5d 100644
--- a/src/ejabberd_sup.erl
+++ b/src/ejabberd_sup.erl
@@ -42,13 +42,6 @@ init([]) ->
brutal_kill,
worker,
[ejabberd_hooks]},
- NodeGroups =
- {ejabberd_node_groups,
- {ejabberd_node_groups, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [ejabberd_node_groups]},
SystemMonitor =
{ejabberd_system_monitor,
{ejabberd_system_monitor, start_link, []},
@@ -184,9 +177,16 @@ init([]) ->
infinity,
supervisor,
[ejabberd_tmp_sup]},
+ Cluster =
+ {ejabberd_cluster,
+ {ejabberd_cluster, start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [ejabberd_cluster]},
{ok, {{one_for_one, 10, 1},
[Hooks,
- NodeGroups,
+ Cluster,
SystemMonitor,
Router,
Router_multicast,
diff --git a/src/mod_muc/mod_muc.erl b/src/mod_muc/mod_muc.erl
index c75c3e8cb..b02d8b8a4 100644
--- a/src/mod_muc/mod_muc.erl
+++ b/src/mod_muc/mod_muc.erl
@@ -41,6 +41,9 @@
create_room/5,
process_iq_disco_items/4,
broadcast_service_message/2,
+ register_room/3,
+ migrate/1,
+ get_vh_rooms/1,
can_use_nick/3]).
%% gen_server callbacks
@@ -112,7 +115,9 @@ room_destroyed(Host, Room, Pid, ServerHost) when is_binary(Host),
%% Else use the passed options as defined in mod_muc_room.
create_room(Host, Name, From, Nick, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
- gen_server:call(Proc, {create, Name, From, Nick, Opts}).
+ RoomHost = gen_mod:get_module_opt_host(Host, ?MODULE, "conference.@HOST@"),
+ Node = ejabberd_cluster:get_node({Name, RoomHost}),
+ gen_server:call({Proc, Node}, {create, Name, From, Nick, Opts}).
store_room(Host, Name, Opts) when is_binary(Host), is_binary(Name) ->
F = fun() ->
@@ -163,6 +168,22 @@ can_use_nick(Host, JID, Nick) when is_binary(Host), is_binary(Nick) ->
U == LUS
end.
+migrate(After) ->
+ Rs = mnesia:dirty_select(
+ muc_online_room,
+ [{#muc_online_room{name_host = '$1', pid = '$2', _ = '_'},
+ [],
+ ['$$']}]),
+ lists:foreach(
+ fun([NameHost, Pid]) ->
+ case ejabberd_cluster:get_node_new(NameHost) of
+ Node when Node /= node() ->
+ mod_muc_room:migrate(Pid, Node, After);
+ _ ->
+ ok
+ end
+ end, Rs).
+
%%====================================================================
%% gen_server callbacks
%%====================================================================
@@ -175,6 +196,7 @@ can_use_nick(Host, JID, Nick) when is_binary(Host), is_binary(Nick) ->
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([Host, Opts]) ->
+ update_muc_online_table(),
mnesia:create_table(muc_room,
[{disc_copies, [node()]},
{attributes, record_info(fields, muc_room)}]),
@@ -183,15 +205,14 @@ init([Host, Opts]) ->
{attributes, record_info(fields, muc_registered)}]),
mnesia:create_table(muc_online_room,
[{ram_copies, [node()]},
+ {local_content, true},
{attributes, record_info(fields, muc_online_room)}]),
mnesia:add_table_copy(muc_online_room, node(), ram_copies),
catch ets:new(muc_online_users, [bag, named_table, public, {keypos, 2}]),
MyHost_L = gen_mod:get_opt_host(Host, Opts, "conference.@HOST@"),
MyHost = list_to_binary(MyHost_L),
update_tables(MyHost),
- clean_table_from_bad_node(node(), MyHost),
mnesia:add_table_index(muc_registered, nick),
- mnesia:subscribe(system),
Access = gen_mod:get_opt(access, Opts, all),
AccessCreate = gen_mod:get_opt(access_create, Opts, all),
AccessAdmin = gen_mod:get_opt(access_admin, Opts, none),
@@ -200,6 +221,7 @@ init([Host, Opts]) ->
DefRoomOpts = gen_mod:get_opt(default_room_options, Opts, []),
RoomShaper = gen_mod:get_opt(room_shaper, Opts, none),
ejabberd_router:register_route(MyHost_L),
+ ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100),
load_permanent_rooms(MyHost, Host,
{Access, AccessCreate, AccessAdmin, AccessPersistent},
HistorySize,
@@ -266,12 +288,19 @@ handle_info({route, From, To, Packet},
default_room_opts = DefRoomOpts,
history_size = HistorySize,
room_shaper = RoomShaper} = State) ->
- case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
- From, To, Packet, DefRoomOpts) of
- {'EXIT', Reason} ->
- ?ERROR_MSG("~p", [Reason]);
- _ ->
- ok
+ US = {exmpp_jid:prep_node(To), exmpp_jid:prep_domain(To)},
+ case ejabberd_cluster:get_node(US) of
+ Node when Node == node() ->
+ case catch do_route(Host, ServerHost, Access, HistorySize,
+ RoomShaper, From, To, Packet, DefRoomOpts) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("~p", [Reason]);
+ _ ->
+ ok
+ end;
+ Node ->
+ Proc = gen_mod:get_module_proc(ServerHost, ?PROCNAME),
+ {Proc, Node} ! {route, From, To, Packet}
end,
{noreply, State};
handle_info({room_destroyed, RoomHost, Pid}, State) ->
@@ -279,10 +308,7 @@ handle_info({room_destroyed, RoomHost, Pid}, State) ->
mnesia:delete_object(#muc_online_room{name_host = RoomHost,
pid = Pid})
end,
- mnesia:transaction(F),
- {noreply, State};
-handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
- clean_table_from_bad_node(Node),
+ mnesia:async_dirty(F),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -295,6 +321,7 @@ handle_info(_Info, State) ->
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
+ ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100),
ejabberd_router:unregister_route(binary_to_list(State#state.host)),
ok.
@@ -516,17 +543,22 @@ load_permanent_rooms(Host, ServerHost, Access, HistorySize, RoomShaper) ->
lists:foreach(
fun(R) ->
{Room, Host} = R#muc_room.name_host,
- case mnesia:dirty_read(muc_online_room, {Room, Host}) of
- [] ->
- {ok, Pid} = mod_muc_room:start(
- Host,
- ServerHost,
- Access,
- Room,
- HistorySize,
- RoomShaper,
- R#muc_room.opts),
- register_room(Host, Room, Pid);
+ case ejabberd_cluster:get_node({Room, Host}) of
+ Node when Node == node() ->
+ case mnesia:dirty_read(muc_online_room, {Room, Host}) of
+ [] ->
+ {ok, Pid} = mod_muc_room:start(
+ Host,
+ ServerHost,
+ Access,
+ Room,
+ HistorySize,
+ RoomShaper,
+ R#muc_room.opts),
+ register_room(Host, Room, Pid);
+ _ ->
+ ok
+ end;
_ ->
ok
end
@@ -555,7 +587,7 @@ register_room(Host, Room, Pid) when is_binary(Host), is_binary(Room) ->
mnesia:write(#muc_online_room{name_host = {Room, Host},
pid = Pid})
end,
- mnesia:transaction(F).
+ mnesia:async_dirty(F).
iq_disco_info(Lang) ->
@@ -605,7 +637,7 @@ iq_disco_items(Host, From, Lang, none) when is_binary(Host) ->
_ ->
false
end
- end, get_vh_rooms(Host));
+ end, get_vh_rooms_all_nodes(Host));
iq_disco_items(Host, From, Lang, Rsm) ->
{Rooms, RsmO} = get_vh_rooms(Host, Rsm),
@@ -639,19 +671,9 @@ iq_disco_items(Host, From, Lang, Rsm) ->
end, Rooms) ++ RsmOut.
get_vh_rooms(Host, #rsm_in{max=M, direction=Direction, id=I, index=Index})->
- AllRooms = lists:sort(get_vh_rooms(Host)),
+ AllRooms = get_vh_rooms_all_nodes(Host),
Count = erlang:length(AllRooms),
- Guard = case Direction of
- _ when Index =/= undefined -> [{'==', {element, 2, '$1'}, Host}];
- aft -> [{'==', {element, 2, '$1'}, Host}, {'>=',{element, 1, '$1'} ,I}];
- before when I =/= []-> [{'==', {element, 2, '$1'}, Host}, {'=<',{element, 1, '$1'} ,I}];
- _ -> [{'==', {element, 2, '$1'}, Host}]
- end,
- L = lists:sort(
- mnesia:dirty_select(muc_online_room,
- [{#muc_online_room{name_host = '$1', _ = '_'},
- Guard,
- ['$_']}])),
+ L = get_vh_rooms_direction(Direction, I, Index, AllRooms),
L2 = if
Index == undefined andalso Direction == before ->
lists:reverse(lists:sublist(lists:reverse(L), 1, M));
@@ -674,6 +696,27 @@ get_vh_rooms(Host, #rsm_in{max=M, direction=Direction, id=I, index=Index})->
{L2, #rsm_out{first=F, last=Last, count=Count, index=NewIndex}}
end.
+get_vh_rooms_direction(_Direction, _I, Index, AllRooms) when Index =/= undefined ->
+ AllRooms;
+get_vh_rooms_direction(aft, I, _Index, AllRooms) ->
+ {_Before, After} =
+ lists:splitwith(
+ fun(#muc_online_room{name_host = {Na, _}}) ->
+ Na < I end, AllRooms),
+ case After of
+ [] -> [];
+ [#muc_online_room{name_host = {I, _Host}} | AfterTail] -> AfterTail;
+ _ -> After
+ end;
+get_vh_rooms_direction(before, I, _Index, AllRooms) when I =/= []->
+ {Before, _} =
+ lists:splitwith(
+ fun(#muc_online_room{name_host = {Na, _}}) ->
+ Na < I end, AllRooms),
+ Before;
+get_vh_rooms_direction(_Direction, _I, _Index, AllRooms) ->
+ AllRooms.
+
%% @doc Return the position of desired room in the list of rooms.
%% The room must exist in the list. The count starts in 0.
%% @spec (Desired::muc_online_room(), Rooms::[muc_online_room()]) -> integer()
@@ -847,7 +890,22 @@ broadcast_service_message(Host, Msg) ->
fun(#muc_online_room{pid = Pid}) ->
gen_fsm:send_all_state_event(
Pid, {service_message, Msg})
- end, get_vh_rooms(Host)).
+ end, get_vh_rooms_all_nodes(Host)).
+
+get_vh_rooms_all_nodes(Host) ->
+ Rooms = lists:foldl(
+ fun(Node, Acc) when Node == node() ->
+ get_vh_rooms(Host) ++ Acc;
+ (Node, Acc) ->
+ case catch rpc:call(Node, ?MODULE, get_vh_rooms,
+ [Host], 5000) of
+ Res when is_list(Res) ->
+ Res ++ Acc;
+ _ ->
+ Acc
+ end
+ end, [], ejabberd_cluster:get_nodes()),
+ lists:ukeysort(#muc_online_room.name_host, Rooms).
get_vh_rooms(Host) when is_binary(Host) ->
mnesia:dirty_select(muc_online_room,
@@ -855,39 +913,18 @@ get_vh_rooms(Host) when is_binary(Host) ->
[{'==', {element, 2, '$1'}, Host}],
['$_']}]).
-
-clean_table_from_bad_node(Node) ->
- F = fun() ->
- Es = mnesia:select(
- muc_online_room,
- [{#muc_online_room{pid = '$1', _ = '_'},
- [{'==', {node, '$1'}, Node}],
- ['$_']}]),
- lists:foreach(fun(E) ->
- mnesia:delete_object(E)
- end, Es)
- end,
- mnesia:async_dirty(F).
-
-clean_table_from_bad_node(Node, Host) ->
- F = fun() ->
- Es = mnesia:select(
- muc_online_room,
- [{#muc_online_room{pid = '$1',
- name_host = {'_', Host},
- _ = '_'},
- [{'==', {node, '$1'}, Node}],
- ['$_']}]),
- lists:foreach(fun(E) ->
- mnesia:delete_object(E)
- end, Es)
- end,
- mnesia:async_dirty(F).
-
update_tables(Host) ->
update_muc_room_table(Host),
update_muc_registered_table(Host).
+update_muc_online_table() ->
+ case catch mnesia:table_info(muc_online_room, local_content) of
+ false ->
+ mnesia:delete_table(muc_online_room);
+ _ ->
+ ok
+ end.
+
update_muc_room_table(Host) ->
Fields = record_info(fields, muc_room),
case mnesia:table_info(muc_room, attributes) of
diff --git a/src/mod_muc/mod_muc_log.erl b/src/mod_muc/mod_muc_log.erl
index 71d07ee27..1b5175ec0 100644
--- a/src/mod_muc/mod_muc_log.erl
+++ b/src/mod_muc/mod_muc_log.erl
@@ -74,11 +74,11 @@
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Host, Opts) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
- gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).
+ Proc = get_proc_name(Host),
+ gen_server:start_link(Proc, ?MODULE, [Host, Opts], []).
start(Host, Opts) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ Proc = get_proc_name(Host),
ChildSpec =
{Proc,
{?MODULE, start_link, [Host, Opts]},
@@ -89,7 +89,7 @@ start(Host, Opts) ->
supervisor:start_child(ejabberd_sup, ChildSpec).
stop(Host) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ Proc = get_proc_name(Host),
gen_server:call(Proc, stop),
supervisor:delete_child(ejabberd_sup, Proc).
@@ -955,7 +955,8 @@ get_room_state(RoomPid) ->
{ok, R} = gen_fsm:sync_send_all_state_event(RoomPid, get_state),
R.
-get_proc_name(Host) -> gen_mod:get_module_proc(Host, ?PROCNAME).
+get_proc_name(Host) ->
+ {global, gen_mod:get_module_proc(Host, ?PROCNAME)}.
calc_hour_offset(TimeHere) ->
TimeZero = calendar:now_to_universal_time(now()),
diff --git a/src/mod_muc/mod_muc_room.erl b/src/mod_muc/mod_muc_room.erl
index 8d2229e99..dccbd1a97 100644
--- a/src/mod_muc/mod_muc_room.erl
+++ b/src/mod_muc/mod_muc_room.erl
@@ -27,14 +27,17 @@
-module(mod_muc_room).
-author('alexey@process-one.net').
--behaviour(gen_fsm).
+-define(GEN_FSM, p1_fsm).
%% External exports
-export([start_link/9,
start_link/7,
+ start_link/2,
start/9,
start/7,
+ start/2,
+ migrate/3,
route/4]).
%% gen_fsm callbacks
@@ -44,6 +47,7 @@
handle_sync_event/4,
handle_info/3,
terminate/3,
+ print_state/1,
code_change/4]).
-include_lib("exmpp/include/exmpp.hrl").
@@ -65,19 +69,14 @@
%% Module start with or without supervisor:
-ifdef(NO_TRANSIENT_SUPERVISORS).
--define(SUPERVISOR_START,
- gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Creator, Nick, DefRoomOpts],
- ?FSMOPTS)).
+-define(SUPERVISOR_START(Args),
+ ?GEN_FSM:start(?MODULE, Args, ?FSMOPTS)).
-else.
--define(SUPERVISOR_START,
+-define(SUPERVISOR_START(Args),
Supervisor = gen_mod:get_module_proc(ServerHost, ejabberd_mod_muc_sup),
- supervisor:start_child(
- Supervisor, [Host, ServerHost, Access, Room, HistorySize, RoomShaper,
- Creator, Nick, DefRoomOpts])).
+ supervisor:start_child(Supervisor, Args)).
-endif.
-
-define(ERR(Packet,Type, Lang, ErrText),
exmpp_stanza:error(Packet#xmlel.ns,
Type,
@@ -88,7 +87,8 @@
%%%----------------------------------------------------------------------
start(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
Creator, Nick, DefRoomOpts) ->
- ?SUPERVISOR_START.
+ ?SUPERVISOR_START([Host, ServerHost, Access, Room, HistorySize,
+ RoomShaper, Creator, Nick, DefRoomOpts]).
start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
Supervisor = gen_mod:get_module_proc(ServerHost, ejabberd_mod_muc_sup),
@@ -96,16 +96,26 @@ start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
Supervisor, [Host, ServerHost, Access, Room, HistorySize, RoomShaper,
Opts]).
+start(StateName, StateData) ->
+ ServerHost = StateData#state.server_host,
+ ?SUPERVISOR_START([StateName, StateData]).
+
start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
Creator, Nick, DefRoomOpts) ->
- gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Creator, Nick, DefRoomOpts],
- ?FSMOPTS).
+ ?GEN_FSM:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
+ RoomShaper, Creator, Nick, DefRoomOpts],
+ ?FSMOPTS).
start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
- gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Opts],
- ?FSMOPTS).
+ ?GEN_FSM:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
+ RoomShaper, Opts],
+ ?FSMOPTS).
+
+start_link(StateName, StateData) ->
+ ?GEN_FSM:start_link(?MODULE, [StateName, StateData], ?FSMOPTS).
+
+migrate(FsmRef, Node, After) ->
+ ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
@@ -147,7 +157,11 @@ init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]) ->
jid = exmpp_jid:make(Room, Host),
room_shaper = Shaper}),
add_to_log(room_existence, started, State),
- {ok, normal_state, State}.
+ {ok, normal_state, State};
+init([StateName, #state{room = Room, host = Host} = StateData]) ->
+ process_flag(trap_exit, true),
+ mod_muc:register_room(Host, Room, self()),
+ {ok, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: StateName/2
@@ -600,6 +614,9 @@ handle_event(destroy, StateName, StateData) ->
handle_event({set_affiliations, Affiliations}, StateName, StateData) ->
{next_state, StateName, StateData#state{affiliations = Affiliations}};
+handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() ->
+ {migrate, StateData,
+ {Node, ?MODULE, start, [StateName, StateData]}, After * 2};
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData}.
@@ -644,6 +661,9 @@ handle_sync_event(_Event, _From, StateName, StateData) ->
code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
+print_state(StateData) ->
+ StateData.
+
%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
@@ -733,6 +753,13 @@ handle_info(_Info, StateName, StateData) ->
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
+terminate({migrated, Clone}, _StateName, StateData) ->
+ ?INFO_MSG("Migrating room ~s@~s to ~p on node ~p",
+ [StateData#state.room, StateData#state.host,
+ Clone, node(Clone)]),
+ mod_muc:room_destroyed(StateData#state.host, StateData#state.room,
+ self(), StateData#state.server_host),
+ ok;
terminate(Reason, _StateName, StateData) ->
?INFO_MSG("Stopping MUC room ~s@~s",
[StateData#state.room, StateData#state.host]),
@@ -778,7 +805,7 @@ terminate(Reason, _StateName, StateData) ->
%%%----------------------------------------------------------------------
route(Pid, From, ToNick, Packet) ->
- gen_fsm:send_event(Pid, {route, From, ToNick, Packet}).
+ ?GEN_FSM:send_event(Pid, {route, From, ToNick, Packet}).
process_groupchat_message(From, #xmlel{name = 'message'} = Packet,
StateData) ->
@@ -1673,13 +1700,15 @@ add_new_user(From, Nick, Packet, StateData) ->
From, Err),
StateData;
captcha_required ->
- ID = randoms:get_string(),
- SID = case exmpp_stanza:get_id(Packet) of undefined -> ""; SID1 -> SID1 end,
+ SID = case exmpp_stanza:get_id(Packet) of
+ undefined -> "";
+ SID1 -> SID1
+ end,
RoomJID = StateData#state.jid,
To = jid_replace_resource(RoomJID, Nick),
case ejabberd_captcha:create_captcha(
- ID, SID, RoomJID, To, Lang, From) of
- {ok, CaptchaEls} ->
+ SID, RoomJID, To, Lang, From) of
+ {ok, ID, CaptchaEls} ->
MsgPkt = #xmlel{name = 'message',
attrs = [#xmlattr{name = 'id', value = ID}],
children = CaptchaEls},
diff --git a/src/mod_proxy65/mod_proxy65_sm.erl b/src/mod_proxy65/mod_proxy65_sm.erl
index 569458f6a..bdd1b5bb8 100644
--- a/src/mod_proxy65/mod_proxy65_sm.erl
+++ b/src/mod_proxy65/mod_proxy65_sm.erl
@@ -71,7 +71,9 @@ start_link(Host, Opts) ->
gen_server:start_link({local, Proc}, ?MODULE, [Opts], []).
init([Opts]) ->
+ update_tables(),
mnesia:create_table(bytestream, [{ram_copies, [node()]},
+ {local_content, true},
{attributes, record_info(fields, bytestream)}]),
mnesia:add_table_copy(bytestream, node(), ram_copies),
MaxConnections = gen_mod:get_opt(max_connections, Opts, infinity),
@@ -179,3 +181,11 @@ activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) ->
_ ->
error
end.
+
+update_tables() ->
+ case catch mnesia:table_info(bytestream, local_content) of
+ false ->
+ mnesia:delete_table(bytestream);
+ _ ->
+ ok
+ end.
diff --git a/src/p1_fsm.erl b/src/p1_fsm.erl
index 03ff7f8ce..9ca924112 100644
--- a/src/p1_fsm.erl
+++ b/src/p1_fsm.erl
@@ -517,6 +517,25 @@ print_event(Dev, return, {Name, StateName}) ->
io:format(Dev, "*DBG* ~p switched to state ~w~n",
[Name, StateName]).
+relay_messages(MRef, TRef, Clone, Queue) ->
+ lists:foreach(
+ fun(Msg) -> Clone ! Msg end,
+ queue:to_list(Queue)),
+ relay_messages(MRef, TRef, Clone).
+
+relay_messages(MRef, TRef, Clone) ->
+ receive
+ {'DOWN', MRef, process, Clone, Reason} ->
+ Reason;
+ {'EXIT', _Parent, _Reason} ->
+ {migrated, Clone};
+ {timeout, TRef, timeout} ->
+ {migrated, Clone};
+ Msg ->
+ Clone ! Msg,
+ relay_messages(MRef, TRef, Clone)
+ end.
+
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
Limits, Queue, QueueLen) -> %No debug here
From = from(Msg),
@@ -535,6 +554,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
reply(From, Reply),
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
Limits, Queue, QueueLen);
+ {migrate, NStateData, {Node, M, F, A}, Time1} ->
+ Reason = case catch rpc:call(Node, M, F, A, 5000) of
+ {badrpc, _} = Err ->
+ {migration_error, Err};
+ {'EXIT', _} = Err ->
+ {migration_error, Err};
+ {error, _} = Err ->
+ {migration_error, Err};
+ {ok, Clone} ->
+ process_flag(trap_exit, true),
+ MRef = erlang:monitor(process, Clone),
+ TRef = erlang:start_timer(Time1, self(), timeout),
+ relay_messages(MRef, TRef, Clone, Queue);
+ Reply ->
+ {migration_error, {bad_reply, Reply}}
+ end,
+ terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
{stop, Reason, Reply, NStateData} when From =/= undefined ->
@@ -571,6 +607,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData,
Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData,
Mod, Time1, Debug1, Limits, Queue, QueueLen);
+ {migrate, NStateData, {Node, M, F, A}, Time1} ->
+ Reason = case catch rpc:call(Node, M, F, A, Time1) of
+ {badrpc, R} ->
+ {migration_error, R};
+ {'EXIT', R} ->
+ {migration_error, R};
+ {error, R} ->
+ {migration_error, R};
+ {ok, Clone} ->
+ process_flag(trap_exit, true),
+ MRef = erlang:monitor(process, Clone),
+ TRef = erlang:start_timer(Time1, self(), timeout),
+ relay_messages(MRef, TRef, Clone, Queue);
+ Reply ->
+ {migration_error, {bad_reply, Reply}}
+ end,
+ terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
{stop, Reason, Reply, NStateData} when From =/= undefined ->
@@ -633,12 +686,10 @@ terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug) ->
%% Priority shutdown should be considered as
%% shutdown by SASL
exit(shutdown);
- {process_limit, Limit} ->
- %% Priority shutdown should be considered as
- %% shutdown by SASL
- error_logger:error_msg("FSM limit reached (~p): ~p~n",
- [self(), Limit]),
- exit(shutdown);
+ {process_limit, _Limit} ->
+ exit(Reason);
+ {migrated, _Clone} ->
+ exit(normal);
_ ->
error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug),
exit(Reason)
@@ -705,7 +756,12 @@ get_msg(Msg) -> Msg.
format_status(Opt, StatusData) ->
[PDict, SysState, Parent, Debug, [Name, StateName, StateData, Mod, _Time]] =
StatusData,
- Header = lists:concat(["Status for state machine ", Name]),
+ NameTag = if is_pid(Name) ->
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
+ Header = lists:concat(["Status for state machine ", NameTag]),
Log = sys:get_debug(log, Debug, []),
Specfic =
case erlang:function_exported(Mod, format_status, 2) of
diff --git a/src/web/ejabberd_http_bind.erl b/src/web/ejabberd_http_bind.erl
index bef9c268d..d25c7e060 100644
--- a/src/web/ejabberd_http_bind.erl
+++ b/src/web/ejabberd_http_bind.erl
@@ -27,6 +27,7 @@
setopts/2,
controlling_process/2,
become_controller/2,
+ change_controller/2,
custom_receiver/1,
reset_stream/1,
change_shaper/2,
@@ -114,9 +115,19 @@
start(XMPPDomain, Sid, Key, IP) ->
?DEBUG("Starting session", []),
case catch supervisor:start_child(ejabberd_http_bind_sup, [Sid, Key, IP]) of
- {ok, Pid} -> {ok, Pid};
- _ -> check_bind_module(XMPPDomain),
- {error, "Cannot start HTTP bind session"}
+ {ok, Pid} ->
+ {ok, Pid};
+ {error, _} = Err ->
+ case check_bind_module(XMPPDomain) of
+ false ->
+ {error, "Cannot start HTTP bind session"};
+ true ->
+ ?ERROR_MSG("Cannot start HTTP bind session: ~p", [Err]),
+ Err
+ end;
+ Exit ->
+ ?ERROR_MSG("Cannot start HTTP bind session: ~p", [Exit]),
+ {error, Exit}
end.
start_link(Sid, Key, IP) ->
@@ -133,7 +144,13 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
true ->
gen_fsm:send_all_state_event(FsmRef, {activate, self()});
_ ->
- ok
+ case lists:member({active, false}, Opts) of
+ true ->
+ gen_fsm:sync_send_all_state_event(
+ FsmRef, deactivate_socket);
+ _ ->
+ ok
+ end
end.
controlling_process(_Socket, _Pid) ->
@@ -145,6 +162,9 @@ custom_receiver({http_bind, FsmRef, _IP}) ->
become_controller(FsmRef, C2SPid) ->
gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}).
+change_controller({http_bind, FsmRef, _IP}, C2SPid) ->
+ become_controller(FsmRef, C2SPid).
+
reset_stream({http_bind, _FsmRef, _IP}) ->
ok.
@@ -185,12 +205,13 @@ process_request(Data, IP) ->
"xmlns='" ++ ?NS_HTTP_BIND_s ++ "'/>"};
XmppDomain ->
%% create new session
- Sid = sha:sha(term_to_binary({now(), make_ref()})),
+ Sid = make_sid(),
case start(XmppDomain, Sid, "", IP) of
{error, _} ->
- {200, ?HEADER, "<body type='terminate' "
+ {500, ?HEADER, "<body type='terminate' "
"condition='internal-server-error' "
- "xmlns='" ++ ?NS_HTTP_BIND_s ++ "'>BOSH module not started</body>"};
+ "xmlns='" ++ ?NS_HTTP_BIND_s ++
+ "'>Internal Server Error</body>"};
{ok, Pid} ->
handle_session_start(
Pid, XmppDomain, Sid, Rid, Attrs,
@@ -216,10 +237,10 @@ process_request(Data, IP) ->
handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
StreamStart, IP);
{size_limit, Sid} ->
- case mnesia:dirty_read({http_bind, Sid}) of
- [] ->
+ case get_session(Sid) of
+ {error, _} ->
{404, ?HEADER, ""};
- [#http_bind{pid = FsmRef}] ->
+ {ok, #http_bind{pid = FsmRef}} ->
gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}),
{200, ?HEADER, "<body type='terminate' "
"condition='undefined-condition' "
@@ -263,7 +284,7 @@ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
end,
XmppVersion = exmpp_xml:get_attribute_from_list_as_list(Attrs, ?NS_BOSH, version, ""),
?DEBUG("Create session: ~p", [Sid]),
- mnesia:transaction(
+ mnesia:async_dirty(
fun() ->
mnesia:write(
#http_bind{id = Sid,
@@ -320,6 +341,7 @@ init([Sid, Key, IP]) ->
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event({become_controller, C2SPid}, StateName, StateData) ->
+ erlang:monitor(process, C2SPid),
case StateData#state.input of
cancel ->
{next_state, StateName, StateData#state{
@@ -384,6 +406,14 @@ handle_sync_event({stop,close}, _From, _StateName, StateData) ->
handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) ->
Reply = ok,
{stop, normal, Reply, StateData};
+handle_sync_event(deactivate_socket, _From, StateName, StateData) ->
+ %% Input = case StateData#state.input of
+ %% cancel ->
+ %% queue:new();
+ %% Q ->
+ %% Q
+ %% end,
+ {reply, ok, StateName, StateData#state{waiting_input = false}};
handle_sync_event({stop,Reason}, _From, _StateName, StateData) ->
?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]),
Reply = ok,
@@ -517,6 +547,9 @@ handle_info({timeout, ShaperTimer, _}, StateName,
#state{shaper_timer = ShaperTimer} = StateData) ->
{next_state, StateName, StateData#state{shaper_timer = undefined}};
+handle_info({'DOWN', _MRef, process, C2SPid, _}, _StateName,
+ #state{waiting_input = C2SPid} = StateData) ->
+ {stop, normal, StateData};
handle_info(_, StateName, StateData) ->
{next_state, StateName, StateData}.
@@ -788,10 +821,10 @@ handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
?DEBUG("Looking for session: ~p", [Sid]),
- case mnesia:dirty_read({http_bind, Sid}) of
- [] ->
+ case get_session(Sid) of
+ {error, _} ->
{error, not_exists};
- [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] ->
+ {ok, #http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess}->
NewStream =
case StreamStart of
true ->
@@ -1305,7 +1338,36 @@ check_default_xmlns(#xmlel{name = Name, ns = Xmlns, attrs = Attrs, children = El
%% Print a warning in log file if this is not the case.
check_bind_module(XmppDomain) ->
case gen_mod:is_loaded(XmppDomain, mod_http_bind) of
- true -> ok;
+ true -> true;
false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), but the module mod_http_bind is not started.~n"
- "Check your 'modules' section in your ejabberd configuration file.",[])
+ "Check your 'modules' section in your ejabberd configuration file.",[]),
+ false
+ end.
+
+make_sid() ->
+ sha:sha(term_to_binary({now(), make_ref()}))
+ ++ "-" ++ ejabberd_cluster:node_id().
+
+get_session(SID) ->
+ case string:tokens(SID, "-") of
+ [_, NodeID] ->
+ case ejabberd_cluster:get_node_by_id(NodeID) of
+ Node when Node == node() ->
+ case mnesia:dirty_read({http_bind, SID}) of
+ [] ->
+ {error, enoent};
+ [Session] ->
+ {ok, Session}
+ end;
+ Node ->
+ case catch rpc:call(Node, mnesia, dirty_read,
+ [{http_bind, SID}], 5000) of
+ [Session] ->
+ {ok, Session};
+ _ ->
+ {error, enoent}
+ end
+ end;
+ _ ->
+ {error, enoent}
end.
diff --git a/src/web/ejabberd_http_poll.erl b/src/web/ejabberd_http_poll.erl
index e1af0ad82..66685b413 100644
--- a/src/web/ejabberd_http_poll.erl
+++ b/src/web/ejabberd_http_poll.erl
@@ -83,9 +83,12 @@
%%% API
%%%----------------------------------------------------------------------
start(ID, Key, IP) ->
+ update_tables(),
mnesia:create_table(http_poll,
[{ram_copies, [node()]},
+ {local_content, true},
{attributes, record_info(fields, http_poll)}]),
+ mnesia:add_table_copy(http_poll, node(), ram_copies),
supervisor:start_child(ejabberd_http_poll_sup, [ID, Key, IP]).
start_link(ID, Key, IP) ->
@@ -121,9 +124,9 @@ process([], #request{data = Data,
{ok, ID1, Key, NewKey, Packet} ->
ID = if
(ID1 == "0") or (ID1 == "mobile") ->
- NewID = sha:sha(term_to_binary({now(), make_ref()})),
+ NewID = make_sid(),
{ok, Pid} = start(NewID, "", IP),
- mnesia:transaction(
+ mnesia:async_dirty(
fun() ->
mnesia:write(#http_poll{id = NewID,
pid = Pid})
@@ -358,7 +361,7 @@ handle_info(_, StateName, StateData) ->
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, _StateName, StateData) ->
- mnesia:transaction(
+ mnesia:async_dirty(
fun() ->
mnesia:delete({http_poll, StateData#state.id})
end),
@@ -383,19 +386,19 @@ terminate(_Reason, _StateName, StateData) ->
%%%----------------------------------------------------------------------
http_put(ID, Key, NewKey, Packet) ->
- case mnesia:dirty_read({http_poll, ID}) of
- [] ->
+ case get_session(ID) of
+ {error, _} ->
{error, not_exists};
- [#http_poll{pid = FsmRef}] ->
+ {ok, #http_poll{pid = FsmRef}} ->
gen_fsm:sync_send_all_state_event(
FsmRef, {http_put, Key, NewKey, Packet})
end.
http_get(ID) ->
- case mnesia:dirty_read({http_poll, ID}) of
- [] ->
+ case get_session(ID) of
+ {error, _} ->
{error, not_exists};
- [#http_poll{pid = FsmRef}] ->
+ {ok, #http_poll{pid = FsmRef}} ->
gen_fsm:sync_send_all_state_event(FsmRef, http_get)
end.
@@ -461,3 +464,39 @@ get_jid("to", ParsedPacket) ->
From ->
exmpp_jid:parse(From)
end.
+
+update_tables() ->
+ case catch mnesia:table_info(http_poll, local_content) of
+ false ->
+ mnesia:delete_table(http_poll);
+ _ ->
+ ok
+ end.
+
+make_sid() ->
+ sha:sha(term_to_binary({now(), make_ref()}))
+ ++ "-" ++ ejabberd_cluster:node_id().
+
+get_session(SID) ->
+ case string:tokens(SID, "-") of
+ [_, NodeID] ->
+ case ejabberd_cluster:get_node_by_id(NodeID) of
+ Node when Node == node() ->
+ case mnesia:dirty_read({http_poll, SID}) of
+ [] ->
+ {error, enoent};
+ [Session] ->
+ {ok, Session}
+ end;
+ Node ->
+ case catch rpc:call(Node, mnesia, dirty_read,
+ [{http_poll, SID}], 5000) of
+ [Session] ->
+ {ok, Session};
+ _ ->
+ {error, enoent}
+ end
+ end;
+ _ ->
+ {error, enoent}
+ end.
diff --git a/src/web/mod_http_bind.erl b/src/web/mod_http_bind.erl
index a5ff62fa6..0c422b812 100644
--- a/src/web/mod_http_bind.erl
+++ b/src/web/mod_http_bind.erl
@@ -137,7 +137,9 @@ setup_database() ->
migrate_database(),
mnesia:create_table(http_bind,
[{ram_copies, [node()]},
- {attributes, record_info(fields, http_bind)}]).
+ {local_content, true},
+ {attributes, record_info(fields, http_bind)}]),
+ mnesia:add_table_copy(http_bind, node(), ram_copies).
migrate_database() ->
case catch mnesia:table_info(http_bind, attributes) of
@@ -147,4 +149,10 @@ migrate_database() ->
%% Since the stored information is not important, instead
%% of actually migrating data, let's just destroy the table
mnesia:delete_table(http_bind)
+ end,
+ case catch mnesia:table_info(http_bind, local_content) of
+ false ->
+ mnesia:delete_table(http_bind);
+ _ ->
+ ok
end.