Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexey Shchepin <alexey@process-one.net>2012-11-06 19:58:08 +0400
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2014-07-10 13:15:15 +0400
commita4b02c38db131285aa06da9ecad743f219781d6f (patch)
tree7e78dba5be6939ad5b8b92cb4c6c15a57f9d26a3 /src/mod_roster.erl
parent47763c10e3072208d762b4808ab8d66d02628f16 (diff)
Updated riak support
Diffstat (limited to 'src/mod_roster.erl')
-rw-r--r--src/mod_roster.erl363
1 files changed, 355 insertions, 8 deletions
diff --git a/src/mod_roster.erl b/src/mod_roster.erl
index 01646229f..8ef70eb59 100644
--- a/src/mod_roster.erl
+++ b/src/mod_roster.erl
@@ -204,6 +204,13 @@ read_roster_version(LUser, LServer, odbc) ->
of
{selected, [<<"version">>], [[Version]]} -> Version;
{selected, [<<"version">>], []} -> error
+ end;
+read_roster_version(LServer, LUser, riak) ->
+ Username = LUser,
+ case ejabberd_riak:get(LServer, <<"roster_version">>,
+ Username) of
+ {ok, Version} -> Version;
+ {error, notfound} -> error
end.
write_roster_version(LUser, LServer) ->
@@ -239,7 +246,11 @@ write_roster_version(LUser, LServer, InTransaction, Ver,
odbc_queries:set_roster_version(Username,
EVer)
end)
- end.
+ end;
+write_roster_version(LUser, LServer, _InTransaction, Ver,
+ riak) ->
+ Username = LUser,
+ riak_set_roster_version(LServer, Username, Ver).
%% Load roster from DB only if neccesary.
%% It is neccesary if
@@ -388,6 +399,37 @@ get_roster(LUser, LServer, odbc) ->
Items),
RItems;
_ -> []
+ end;
+get_roster(LUser, LServer, riak) ->
+ Username = LUser,
+ case catch riak_get_roster(LServer, Username) of
+ {ok, Items} when is_list(Items) ->
+ JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of
+ {ok, JGrps} when is_list(JGrps) ->
+ JGrps;
+ _ ->
+ []
+ end,
+ GroupsDict = dict:from_list(JIDGroups),
+ RItems = lists:flatmap(
+ fun(I) ->
+ case riak_raw_to_record(LServer, I) of
+ %% Bad JID in database:
+ error ->
+ [];
+ R ->
+ SJID = jlib:jid_to_string(R#roster.jid),
+ Groups =
+ case dict:find(SJID, GroupsDict) of
+ {ok, Gs} -> Gs;
+ error -> []
+ end,
+ [R#roster{groups = Groups}]
+ end
+ end, Items),
+ RItems;
+ _ ->
+ []
end.
item_to_xml(Item) ->
@@ -455,6 +497,31 @@ get_roster_by_jid_t(LUser, LServer, LJID, odbc) ->
R#roster{usj = {LUser, LServer, LJID},
us = {LUser, LServer}, jid = LJID, name = <<"">>}
end
+ end;
+get_roster_by_jid_t(LUser, LServer, LJID, riak) ->
+ Username = LUser,
+ SJID = jlib:jid_to_string(LJID),
+ Res = riak_get_roster_by_jid(LServer, Username, SJID),
+ case Res of
+ {error, _} ->
+ #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID};
+ {ok, I} ->
+ R = riak_raw_to_record(LServer, I),
+ case R of
+ %% Bad JID in database:
+ error ->
+ #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID};
+ _ ->
+ R#roster{
+ usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID,
+ name = ""}
+ end
end.
try_process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) ->
@@ -631,8 +698,16 @@ get_subscription_lists(_, LUser, LServer, odbc) ->
<<"server">>, <<"subscribe">>, <<"type">>],
Items}
when is_list(Items) ->
- Items;
+ lists:map(fun(I) -> raw_to_record(LServer, I) end, Items);
_ -> []
+ end;
+get_subscription_lists(_, LUser, LServer, riak) ->
+ Username = LUser,
+ case catch riak_get_roster(LServer, Username) of
+ {ok, Items} when is_list(Items) ->
+ lists:map(fun(I) -> riak_raw_to_record(LServer, I) end, Items);
+ _ ->
+ []
end.
fill_subscription_lists(LServer, [#roster{} = I | Is],
@@ -671,12 +746,18 @@ roster_subscribe_t(LUser, LServer, LJID, Item, odbc) ->
Username = ejabberd_odbc:escape(LUser),
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
odbc_queries:roster_subscribe(LServer, Username, SJID,
- ItemVals).
+ ItemVals);
+roster_subscribe_t(LUser, LServer, LJID, Item, riak) ->
+ ItemVals = riak_record_to_string(Item),
+ Username = LUser,
+ SJID = jlib:jid_to_string(LJID),
+ riak_roster_subscribe(LServer, Username, SJID, ItemVals).
transaction(LServer, F) ->
case gen_mod:db_type(LServer, ?MODULE) of
mnesia -> mnesia:transaction(F);
- odbc -> ejabberd_odbc:sql_transaction(LServer, F)
+ odbc -> ejabberd_odbc:sql_transaction(LServer, F);
+ riak -> {atomic, F()}
end.
in_subscription(_, User, Server, JID, Type, Reason) ->
@@ -727,6 +808,25 @@ get_roster_by_jid_with_groups_t(LUser, LServer, LJID,
[]} ->
#roster{usj = {LUser, LServer, LJID},
us = {LUser, LServer}, jid = LJID}
+ end;
+get_roster_by_jid_with_groups_t(LUser, LServer, LJID, riak) ->
+ Username = LUser,
+ SJID = jlib:jid_to_string(LJID),
+ case riak_get_roster_by_jid(LServer, Username, SJID) of
+ {ok, I} ->
+ R = riak_raw_to_record(LServer, I),
+ Groups =
+ case riak_get_roster_groups(LServer, Username, SJID) of
+ {ok, JGrps} when is_list(JGrps) ->
+ JGrps;
+ _ ->
+ []
+ end,
+ R#roster{groups = Groups};
+ {error, _} ->
+ #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID}
end.
process_subscription(Direction, User, Server, JID1,
@@ -924,12 +1024,12 @@ in_auto_reply(_, _, _) -> none.
remove_user(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
+ send_unsubscription_to_rosteritems(LUser, LServer),
remove_user(LUser, LServer,
gen_mod:db_type(LServer, ?MODULE)).
remove_user(LUser, LServer, mnesia) ->
US = {LUser, LServer},
- send_unsubscription_to_rosteritems(LUser, LServer),
F = fun () ->
lists:foreach(fun (R) -> mnesia:delete_object(R) end,
mnesia:index_read(roster, US, #roster.us))
@@ -937,8 +1037,11 @@ remove_user(LUser, LServer, mnesia) ->
mnesia:transaction(F);
remove_user(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
- send_unsubscription_to_rosteritems(LUser, LServer),
odbc_queries:del_user_roster_t(LServer, Username),
+ ok;
+remove_user(LUser, LServer, riak) ->
+ Username = LUser,
+ riak_del_user_roster(LServer, Username),
ok.
%% For each contact with Subscription:
@@ -1009,7 +1112,15 @@ update_roster_t(LUser, LServer, LJID, Item, odbc) ->
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
ItemVals = record_to_string(Item),
ItemGroups = groups_to_string(Item),
- odbc_queries:update_roster(LServer, Username, SJID, ItemVals, ItemGroups).
+ odbc_queries:update_roster(LServer, Username, SJID, ItemVals,
+ ItemGroups);
+update_roster_t(LUser, LServer, LJID, Item, riak) ->
+ Username = LUser,
+ SJID = jlib:jid_to_string(LJID),
+ ItemVals = riak_record_to_string(Item),
+ ItemGroups = riak_groups_to_binary(Item),
+ riak_update_roster(
+ LServer, Username, SJID, ItemVals, ItemGroups).
del_roster_t(LUser, LServer, LJID) ->
DBType = gen_mod:db_type(LServer, ?MODULE),
@@ -1020,7 +1131,11 @@ del_roster_t(LUser, LServer, LJID, mnesia) ->
del_roster_t(LUser, LServer, LJID, odbc) ->
Username = ejabberd_odbc:escape(LUser),
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
- odbc_queries:del_roster(LServer, Username, SJID).
+ odbc_queries:del_roster(LServer, Username, SJID);
+del_roster_t(LUser, LServer, LJID, riak) ->
+ Username = LUser,
+ SJID = jlib:jid_to_string(LJID),
+ riak_del_roster(LServer, Username, SJID).
process_item_set_t(LUser, LServer,
#xmlel{attrs = Attrs, children = Els}) ->
@@ -1161,6 +1276,44 @@ get_in_pending_subscriptions(Ls, User, Server, odbc) ->
end,
Items));
_ -> Ls
+ end;
+get_in_pending_subscriptions(Ls, User, Server, riak) ->
+ JID = jlib:make_jid(User, Server, <<"">>),
+ LUser = JID#jid.luser,
+ LServer = JID#jid.lserver,
+ Username = LUser,
+ case catch riak_get_roster(LServer, Username) of
+ {ok, Items} when is_list(Items) ->
+ Ls ++ lists:map(
+ fun(R) ->
+ Message = R#roster.askmessage,
+ #xmlel{name = <<"presence">>,
+ attrs = [{<<"from">>,
+ jlib:jid_to_string(R#roster.jid)},
+ {<<"to">>, jlib:jid_to_string(JID)},
+ {<<"type">>, <<"subscribe">>}],
+ children = [#xmlel{name = <<"status">>,
+ attrs = [],
+ children =
+ [{xmlcdata, Message}]}]}
+ end,
+ lists:flatmap(
+ fun(I) ->
+ case riak_raw_to_record(LServer, I) of
+ %% Bad JID in database:
+ error ->
+ [];
+ R ->
+ case R#roster.ask of
+ in -> [R];
+ both -> [R];
+ _ -> []
+ end
+ end
+ end,
+ Items));
+ _ ->
+ Ls
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1205,6 +1358,21 @@ read_subscription_and_groups(LUser, LServer, LJID,
end,
{Subscription, Groups};
_ -> error
+ end;
+read_subscription_and_groups(LUser, LServer, LJID,
+ riak) ->
+ Username = LUser,
+ SJID = jlib:jid_to_string(LJID),
+ case catch riak_get_subscription(LServer, Username, SJID) of
+ {ok, Subscription} ->
+ Groups = case riak_get_roster_jid_groups(LServer, Username) of
+ {ok, JGrps} when is_list(JGrps) ->
+ JGrps;
+ _ ->
+ []
+ end,
+ {Subscription, Groups};
+ _ -> error
end.
get_jid_info(_, User, Server, JID) ->
@@ -1695,3 +1863,182 @@ import(_LServer, mnesia, #roster_version{} = RV) ->
mnesia:dirty_write(RV);
import(_, _, _) ->
pass.
+
+riak_get_roster(LServer, Username) ->
+ ejabberd_riak:get_by_index(
+ LServer, <<"roster">>, <<"user_bin">>, Username).
+
+riak_get_roster_jid_groups(LServer, Username) ->
+ case ejabberd_riak:get_by_index(
+ LServer, <<"roster_groups">>, <<"user_bin">>, Username) of
+ {ok, JGs} ->
+ Res = lists:map(fun riak_binary_to_groups/1, JGs),
+ {ok, Res};
+ Error -> Error
+ end.
+
+riak_get_roster_groups(LServer, Username, SJID) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ case ejabberd_riak:get(LServer, <<"roster_groups">>, Key) of
+ {ok, Gs} ->
+ {_, Res} = riak_binary_to_groups(Gs),
+ {ok, Res};
+ {error, notfound} ->
+ {ok, []};
+ Error -> Error
+ end.
+
+riak_get_roster_by_jid(LServer, Username, SJID) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:get(LServer, <<"roster">>, Key).
+
+riak_del_roster(LServer, Username, SJID) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:delete(LServer, <<"roster">>, Key).
+
+riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:put(
+ LServer, <<"roster">>, Key, ItemVals,
+ [{<<"user_bin">>, Username}]),
+ ejabberd_riak:put(
+ LServer, <<"roster_groups">>, Key, ItemGroups,
+ [{<<"user_bin">>, Username}]).
+
+riak_roster_subscribe(LServer, Username, SJID, ItemVals) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:put(
+ LServer, <<"roster">>, Key, ItemVals,
+ [{<<"user_bin">>, Username}]).
+
+riak_get_subscription(LServer, Username, SJID) ->
+ case riak_get_roster_by_jid(LServer, Username, SJID) of
+ {ok, SR} ->
+ case riak_raw_to_record(LServer, SR) of
+ error ->
+ {error, bad_record};
+ R ->
+ {ok, R#roster.subscription}
+ end;
+ Error ->
+ Error
+ end.
+
+riak_set_roster_version(LServer, Username, RosterVersion) ->
+ ejabberd_riak:put(LServer, <<"roster_version">>,
+ Username, RosterVersion).
+
+
+riak_del_user_roster(LServer, Username) ->
+ case ejabberd_riak:get_keys_by_index(
+ LServer, <<"roster">>, <<"user_bin">>, Username) of
+ {ok, Keys} ->
+ lists:foreach(
+ fun(Key) ->
+ ejabberd_riak:delete(LServer, <<"roster">>, Key)
+ end, Keys);
+ _ ->
+ ok
+ end,
+ case ejabberd_riak:get_keys_by_index(
+ LServer, <<"roster_groups">>, <<"user_bin">>, Username) of
+ {ok, GKeys} ->
+ lists:foreach(
+ fun(Key) ->
+ ejabberd_riak:delete(LServer, <<"roster_groups">>, Key)
+ end, GKeys);
+ _ ->
+ ok
+ end,
+ ejabberd_riak:delete(LServer, <<"roster_version">>, Username).
+
+riak_raw_to_record(LServer,
+ <<UsernameLen:16, Username:UsernameLen/binary,
+ SJIDLen:16, SJID:SJIDLen/binary,
+ NickLen:16, Nick:NickLen/binary,
+ SSubscription, SAsk,
+ SAskMessageLen:16, SAskMessage:SAskMessageLen/binary>>) ->
+ User = Username,
+ case jlib:string_to_jid(SJID) of
+ error ->
+ error;
+ JID ->
+ LJID = jlib:jid_tolower(JID),
+ Subscription = case SSubscription of
+ $B -> both;
+ $T -> to;
+ $F -> from;
+ _ -> none
+ end,
+ Ask = case SAsk of
+ $S -> subscribe;
+ $U -> unsubscribe;
+ $B -> both;
+ $O -> out;
+ $I -> in;
+ _ -> none
+ end,
+ #roster{usj = {User, LServer, LJID},
+ us = {User, LServer},
+ jid = LJID,
+ name = Nick,
+ subscription = Subscription,
+ ask = Ask,
+ askmessage = SAskMessage}
+ end.
+
+riak_record_to_string(#roster{us = {User, _Server},
+ jid = JID,
+ name = Name,
+ subscription = Subscription,
+ ask = Ask,
+ askmessage = AskMessage}) ->
+ Username = User,
+ UsernameLen = size(Username),
+ SJID = jlib:jid_to_string(jlib:jid_tolower(JID)),
+ SJIDLen = size(SJID),
+ Nick = Name,
+ NickLen = size(Nick),
+ SSubscription = case Subscription of
+ both -> $B;
+ to -> $T;
+ from -> $F;
+ none -> $N
+ end,
+ SAsk = case Ask of
+ subscribe -> $S;
+ unsubscribe -> $U;
+ both -> $B;
+ out -> $O;
+ in -> $I;
+ none -> $N
+ end,
+ SAskMessage = iolist_to_binary(AskMessage),
+ SAskMessageLen = size(SAskMessage),
+ <<UsernameLen:16, Username/binary,
+ SJIDLen:16, SJID/binary,
+ NickLen:16, Nick/binary,
+ SSubscription, SAsk,
+ SAskMessageLen:16, SAskMessage/binary>>.
+
+riak_groups_to_binary(#roster{jid = JID, groups = Groups}) ->
+ SJID = jlib:jid_to_string(jlib:jid_tolower(JID)),
+ SJIDLen = size(SJID),
+ %% Empty groups do not need to be converted to string to be inserted in
+ %% the database
+ lists:foldl(
+ fun([], Acc) ->
+ Acc;
+ (Group, Acc) ->
+ G = Group,
+ Len = size(G),
+ <<Acc/binary, Len:16, G/binary>>
+ end, <<SJIDLen:16, SJID/binary>>, Groups).
+
+riak_binary_to_groups(<<Len:16, SJID:Len/binary, Rest/binary>>) ->
+ {SJID, riak_binary_to_groups(Rest, [])}.
+
+riak_binary_to_groups(<<Len:16, G:Len/binary, Rest/binary>>, Res) ->
+ riak_binary_to_groups(Rest, [G | Res]);
+riak_binary_to_groups(_, Res) ->
+ Res.