Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-05-21 23:21:13 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-05-21 23:21:13 +0300
commitd88e4d495ffc2ae950f77e440aa7c5d06c864309 (patch)
tree6e032ad05ee9f78e164bb73219cc9d2d8b628f96
parent66a4e405e05b87a7e90bb6ed1adf66ec44d87c23 (diff)
Don't store messages via a single process
-rw-r--r--src/mod_offline.erl195
-rw-r--r--src/mod_offline_mnesia.erl27
-rw-r--r--src/mod_offline_riak.erl32
-rw-r--r--src/mod_offline_sql.erl41
-rw-r--r--src/prosody2ejabberd.erl19
-rw-r--r--src/sql_queries.erl11
6 files changed, 106 insertions, 219 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index c06bb8976..2c2c6185a 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -33,14 +33,13 @@
-protocol({xep, 160, '1.0'}).
-protocol({xep, 334, '0.2'}).
--behaviour(gen_server).
-behaviour(gen_mod).
-export([start/2,
stop/1,
reload/3,
store_packet/1,
- store_offline_msg/5,
+ store_offline_msg/1,
c2s_self_presence/1,
get_sm_features/5,
get_sm_identity/5,
@@ -64,9 +63,7 @@
webadmin_user/4,
webadmin_user_parse_query/5]).
--export([init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3,
- mod_opt_type/1, depends/2]).
+-export([mod_opt_type/1, depends/2]).
-deprecated({get_queue_length,2}).
@@ -86,14 +83,11 @@
%% default value for the maximum number of user messages
-define(MAX_USER_MESSAGES, infinity).
--type us() :: {binary(), binary()}.
-type c2s_state() :: ejabberd_c2s:state().
-callback init(binary(), gen_mod:opts()) -> any().
-callback import(#offline_msg{}) -> ok.
--callback store_messages(binary(), us(), [#offline_msg{}],
- non_neg_integer(), non_neg_integer()) ->
- {atomic, any()}.
+-callback store_message(#offline_msg{}) -> ok | {error, any()}.
-callback pop_messages(binary(), binary()) ->
{ok, [#offline_msg{}]} | {error, any()}.
-callback remove_expired_messages(binary()) -> {atomic, any()}.
@@ -108,25 +102,10 @@
-callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
-callback count_messages(binary(), binary()) -> non_neg_integer().
-start(Host, Opts) ->
- gen_mod:start_child(?MODULE, Host, Opts).
-
-stop(Host) ->
- gen_mod:stop_child(?MODULE, Host).
-
-reload(Host, NewOpts, OldOpts) ->
- Proc = gen_mod:get_module_proc(Host, ?MODULE),
- gen_server:cast(Proc, {reload, NewOpts, OldOpts}).
-
depends(_Host, _Opts) ->
[].
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-init([Host, Opts]) ->
- process_flag(trap_exit, true),
+start(Host, Opts) ->
Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
Mod:init(Host, Opts),
IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)),
@@ -153,64 +132,9 @@ init([Host, Opts]) ->
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
- ?MODULE, handle_offline_query, IQDisc),
- AccessMaxOfflineMsgs =
- gen_mod:get_opt(access_max_user_messages, Opts,
- max_user_offline_messages),
- {ok,
- #state{host = Host,
- access_max_offline_messages = AccessMaxOfflineMsgs}}.
-
-
-handle_call(stop, _From, State) ->
- {stop, normal, ok, State}.
-
-handle_cast({reload, NewOpts, OldOpts}, #state{host = Host} = State) ->
- NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
- OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
- if NewMod /= OldMod ->
- NewMod:init(Host, NewOpts);
- true ->
- ok
- end,
- case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of
- {false, IQDisc, _} ->
- gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
- ?MODULE, handle_offline_query, IQDisc);
- true ->
- ok
- end,
- case gen_mod:is_equal_opt(access_max_user_messages, NewOpts, OldOpts,
- max_user_offline_messages) of
- {false, AccessMaxOfflineMsgs, _} ->
- {noreply,
- State#state{access_max_offline_messages = AccessMaxOfflineMsgs}};
- true ->
- {noreply, State}
- end;
-handle_cast(Msg, State) ->
- ?WARNING_MSG("unexpected cast: ~p", [Msg]),
- {noreply, State}.
-
+ ?MODULE, handle_offline_query, IQDisc).
-handle_info(#offline_msg{us = UserServer} = Msg, State) ->
- #state{host = Host,
- access_max_offline_messages = AccessMaxOfflineMsgs} = State,
- DBType = gen_mod:db_type(Host, ?MODULE),
- Msgs = receive_all(UserServer, [Msg], DBType),
- Len = length(Msgs),
- MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs,
- UserServer, Host),
- store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs),
- {noreply, State};
-
-handle_info(_Info, State) ->
- ?ERROR_MSG("got unexpected info: ~p", [_Info]),
- {noreply, State}.
-
-
-terminate(_Reason, State) ->
- Host = State#state.host,
+stop(Host) ->
ejabberd_hooks:delete(offline_message_hook, Host,
?MODULE, store_packet, 50),
ejabberd_hooks:delete(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
@@ -229,41 +153,48 @@ terminate(_Reason, State) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
- gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE),
- ok.
-
-
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
-store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) ->
- Mod = gen_mod:db_mod(Host, ?MODULE),
- case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of
- {atomic, discard} ->
- discard_warn_sender(Msgs);
- _ ->
+reload(Host, NewOpts, OldOpts) ->
+ NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
+ OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
+ if NewMod /= OldMod ->
+ NewMod:init(Host, NewOpts);
+ true ->
+ ok
+ end,
+ case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of
+ {false, IQDisc, _} ->
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
+ ?MODULE, handle_offline_query, IQDisc);
+ true ->
ok
end.
-get_max_user_messages(AccessRule, {User, Server}, Host) ->
- case acl:match_rule(
- Host, AccessRule, jid:make(User, Server)) of
+-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
+store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case get_max_user_messages(User, Server) of
+ infinity ->
+ Mod:store_message(Msg);
+ Limit ->
+ Num = count_offline_messages(User, Server),
+ if Num < Limit ->
+ Mod:store_message(Msg);
+ true ->
+ {error, full}
+ end
+ end.
+
+get_max_user_messages(User, Server) ->
+ Access = gen_mod:get_module_opt(Server, ?MODULE, access_max_user_messages,
+ max_user_offline_messages),
+ case acl:match_rule(Server, Access, jid:make(User, Server)) of
Max when is_integer(Max) -> Max;
infinity -> infinity;
_ -> ?MAX_USER_MESSAGES
end.
-receive_all(US, Msgs, DBType) ->
- receive
- #offline_msg{us = US} = Msg ->
- receive_all(US, [Msg | Msgs], DBType)
- after 0 ->
- case DBType of
- mnesia -> Msgs;
- sql -> lists:reverse(Msgs);
- riak -> Msgs
- end
- end.
-
get_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
Feats = case Acc of
{result, I} -> I;
@@ -484,14 +415,19 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
NewPacket ->
TimeStamp = p1_time_compat:timestamp(),
Expire = find_x_expire(TimeStamp, NewPacket),
- gen_mod:get_module_proc(To#jid.lserver, ?MODULE) !
- #offline_msg{us = {LUser, LServer},
- timestamp = TimeStamp,
- expire = Expire,
- from = From,
- to = To,
- packet = NewPacket},
- {offlined, NewPacket}
+ OffMsg = #offline_msg{us = {LUser, LServer},
+ timestamp = TimeStamp,
+ expire = Expire,
+ from = From,
+ to = To,
+ packet = NewPacket},
+ case store_offline_msg(OffMsg) of
+ ok ->
+ {offlined, NewPacket};
+ {error, Reason} ->
+ discard_warn_sender(Packet, Reason),
+ stop
+ end
end;
_ -> Acc
end;
@@ -635,15 +571,18 @@ remove_user(User, Server) ->
%% Helper functions:
%% Warn senders that their messages have been discarded:
-discard_warn_sender(Msgs) ->
- lists:foreach(
- fun(#offline_msg{packet = Packet}) ->
- ErrText = <<"Your contact offline message queue is "
- "full. The message has been discarded.">>,
- Lang = xmpp:get_lang(Packet),
- Err = xmpp:err_resource_constraint(ErrText, Lang),
- ejabberd_router:route_error(Packet, Err)
- end, Msgs).
+-spec discard_warn_sender(message(), full | any()) -> ok.
+discard_warn_sender(Packet, full) ->
+ ErrText = <<"Your contact offline message queue is "
+ "full. The message has been discarded.">>,
+ Lang = xmpp:get_lang(Packet),
+ Err = xmpp:err_resource_constraint(ErrText, Lang),
+ ejabberd_router:route_error(Packet, Err);
+discard_warn_sender(Packet, _) ->
+ ErrText = <<"Database failure">>,
+ Lang = xmpp:get_lang(Packet),
+ Err = xmpp:err_internal_server_error(ErrText, Lang),
+ ejabberd_router:route_error(Packet, Err).
webadmin_page(_, Host,
#request{us = _US, path = [<<"user">>, U, <<"queue">>],
@@ -790,11 +729,7 @@ get_queue_length(LUser, LServer) ->
count_offline_messages(LUser, LServer).
get_messages_subset(User, Host, MsgsAll) ->
- Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages,
- max_user_offline_messages),
- MaxOfflineMsgs = case get_max_user_messages(Access,
- User, Host)
- of
+ MaxOfflineMsgs = case get_max_user_messages(User, Host) of
Number when is_integer(Number) -> Number;
_ -> 100
end,
diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl
index d0d0de418..a725ab003 100644
--- a/src/mod_offline_mnesia.erl
+++ b/src/mod_offline_mnesia.erl
@@ -26,7 +26,7 @@
-behaviour(mod_offline).
--export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1]).
@@ -36,8 +36,6 @@
-include("mod_offline.hrl").
-include("logger.hrl").
--define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
-
%%%===================================================================
%%% API
%%%===================================================================
@@ -46,26 +44,9 @@ init(_Host, _Opts) ->
[{disc_only_copies, [node()]}, {type, bag},
{attributes, record_info(fields, offline_msg)}]).
-store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) ->
- F = fun () ->
- Count = if MaxOfflineMsgs =/= infinity ->
- Len + count_mnesia_records(US);
- true -> 0
- end,
- if Count > MaxOfflineMsgs -> discard;
- true ->
- if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) ->
- mnesia:write_lock_table(offline_msg);
- true -> ok
- end,
- lists:foreach(
- fun(#offline_msg{packet = Pkt} = M) ->
- El = xmpp:encode(Pkt),
- mnesia:write(M#offline_msg{packet = El})
- end, Msgs)
- end
- end,
- mnesia:transaction(F).
+store_message(#offline_msg{packet = Pkt} = OffMsg) ->
+ El = xmpp:encode(Pkt),
+ mnesia:dirty_write(OffMsg#offline_msg{packet = El}).
pop_messages(LUser, LServer) ->
US = {LUser, LServer},
diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl
index ffc1450aa..5d0fd1af8 100644
--- a/src/mod_offline_riak.erl
+++ b/src/mod_offline_riak.erl
@@ -26,7 +26,7 @@
-behaviour(mod_offline).
--export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1]).
@@ -40,31 +40,11 @@
init(_Host, _Opts) ->
ok.
-store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) ->
- Count = if MaxOfflineMsgs =/= infinity ->
- Len + count_messages(User, Host);
- true -> 0
- end,
- if
- Count > MaxOfflineMsgs ->
- {atomic, discard};
- true ->
- try
- lists:foreach(
- fun(#offline_msg{us = US,
- packet = Pkt,
- timestamp = TS} = M) ->
- El = xmpp:encode(Pkt),
- ok = ejabberd_riak:put(
- M#offline_msg{packet = El},
- offline_msg_schema(),
- [{i, TS}, {'2i', [{<<"us">>, US}]}])
- end, Msgs),
- {atomic, ok}
- catch _:{badmatch, Err} ->
- {atomic, Err}
- end
- end.
+store_message(#offline_msg{us = US, packet = Pkt, timestamp = TS} = M) ->
+ El = xmpp:encode(Pkt),
+ ejabberd_riak:put(M#offline_msg{packet = El},
+ offline_msg_schema(),
+ [{i, TS}, {'2i', [{<<"us">>, US}]}]).
pop_messages(LUser, LServer) ->
case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
diff --git a/src/mod_offline_sql.erl b/src/mod_offline_sql.erl
index a8c587679..48b32be81 100644
--- a/src/mod_offline_sql.erl
+++ b/src/mod_offline_sql.erl
@@ -28,7 +28,7 @@
-behaviour(mod_offline).
--export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1, export/1]).
@@ -44,30 +44,21 @@
init(_Host, _Opts) ->
ok.
-store_messages(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs) ->
- Count = if MaxOfflineMsgs =/= infinity ->
- Len + count_messages(User, Host);
- true -> 0
- end,
- if Count > MaxOfflineMsgs -> {atomic, discard};
- true ->
- Query = lists:map(
- fun(M) ->
- LUser = (M#offline_msg.to)#jid.luser,
- From = M#offline_msg.from,
- To = M#offline_msg.to,
- Packet = xmpp:set_from_to(
- M#offline_msg.packet, From, To),
- NewPacket = xmpp_util:add_delay_info(
- Packet, jid:make(Host),
- M#offline_msg.timestamp,
- <<"Offline Storage">>),
- XML = fxml:element_to_binary(
- xmpp:encode(NewPacket)),
- sql_queries:add_spool_sql(LUser, XML)
- end,
- Msgs),
- sql_queries:add_spool(Host, Query)
+store_message(#offline_msg{us = {LUser, LServer}} = M) ->
+ From = M#offline_msg.from,
+ To = M#offline_msg.to,
+ Packet = xmpp:set_from_to(M#offline_msg.packet, From, To),
+ NewPacket = xmpp_util:add_delay_info(
+ Packet, jid:make(LServer),
+ M#offline_msg.timestamp,
+ <<"Offline Storage">>),
+ XML = fxml:element_to_binary(
+ xmpp:encode(NewPacket)),
+ case sql_queries:add_spool(LUser, LServer, XML) of
+ {updated, _} ->
+ ok;
+ _ ->
+ {error, db_failure}
end.
pop_messages(LUser, LServer) ->
diff --git a/src/prosody2ejabberd.erl b/src/prosody2ejabberd.erl
index 072da0908..312a177be 100644
--- a/src/prosody2ejabberd.erl
+++ b/src/prosody2ejabberd.erl
@@ -185,15 +185,16 @@ convert_data(_Host, "config", _User, [Data]) ->
convert_data(Host, "offline", User, [Data]) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Host),
- Msgs = lists:flatmap(
- fun({_, RawXML}) ->
- case deserialize(RawXML) of
- [El] -> el_to_offline_msg(LUser, LServer, El);
- _ -> []
- end
- end, Data),
- mod_offline:store_offline_msg(
- LServer, {LUser, LServer}, Msgs, length(Msgs), infinity);
+ lists:foreach(
+ fun({_, RawXML}) ->
+ case deserialize(RawXML) of
+ [El] ->
+ Msg = el_to_offline_msg(LUser, LServer, El),
+ ok = mod_offline:store_offline_msg(Msg);
+ _ ->
+ ok
+ end
+ end, Data);
convert_data(Host, "privacy", User, [Data]) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Host),
diff --git a/src/sql_queries.erl b/src/sql_queries.erl
index 2f2e55863..0cf595bdf 100644
--- a/src/sql_queries.erl
+++ b/src/sql_queries.erl
@@ -37,7 +37,7 @@
set_password_scram_t/6, add_user/3, add_user_scram/6,
del_user/2, del_user_return_password/3, list_users/1,
list_users/2, users_number/1, users_number/2,
- add_spool_sql/2, add_spool/2, get_and_del_spool_msg_t/2,
+ add_spool/3, get_and_del_spool_msg_t/2,
del_spool_msg/2, get_roster/2, get_roster_jid_groups/2,
get_roster_groups/3, del_user_roster_t/2,
get_roster_by_jid/3, get_rostergroup_by_jid/3,
@@ -273,11 +273,10 @@ users_number(LServer, [{prefix, Prefix}])
users_number(LServer, []) ->
users_number(LServer).
-add_spool_sql(LUser, XML) ->
- ?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)").
-
-add_spool(LServer, Queries) ->
- ejabberd_sql:sql_transaction(LServer, Queries).
+add_spool(LUser, LServer, XML) ->
+ ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)")).
get_and_del_spool_msg_t(LServer, LUser) ->
F = fun () ->