diff options
author | Alexey Shchepin <alexey@process-one.net> | 2012-11-06 19:58:08 +0400 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2014-07-10 13:15:15 +0400 |
commit | a4b02c38db131285aa06da9ecad743f219781d6f (patch) | |
tree | 7e78dba5be6939ad5b8b92cb4c6c15a57f9d26a3 /src/mod_roster.erl | |
parent | 47763c10e3072208d762b4808ab8d66d02628f16 (diff) |
Updated riak support
Diffstat (limited to 'src/mod_roster.erl')
-rw-r--r-- | src/mod_roster.erl | 363 |
1 files changed, 355 insertions, 8 deletions
diff --git a/src/mod_roster.erl b/src/mod_roster.erl index 01646229f..8ef70eb59 100644 --- a/src/mod_roster.erl +++ b/src/mod_roster.erl @@ -204,6 +204,13 @@ read_roster_version(LUser, LServer, odbc) -> of {selected, [<<"version">>], [[Version]]} -> Version; {selected, [<<"version">>], []} -> error + end; +read_roster_version(LServer, LUser, riak) -> + Username = LUser, + case ejabberd_riak:get(LServer, <<"roster_version">>, + Username) of + {ok, Version} -> Version; + {error, notfound} -> error end. write_roster_version(LUser, LServer) -> @@ -239,7 +246,11 @@ write_roster_version(LUser, LServer, InTransaction, Ver, odbc_queries:set_roster_version(Username, EVer) end) - end. + end; +write_roster_version(LUser, LServer, _InTransaction, Ver, + riak) -> + Username = LUser, + riak_set_roster_version(LServer, Username, Ver). %% Load roster from DB only if neccesary. %% It is neccesary if @@ -388,6 +399,37 @@ get_roster(LUser, LServer, odbc) -> Items), RItems; _ -> [] + end; +get_roster(LUser, LServer, riak) -> + Username = LUser, + case catch riak_get_roster(LServer, Username) of + {ok, Items} when is_list(Items) -> + JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of + {ok, JGrps} when is_list(JGrps) -> + JGrps; + _ -> + [] + end, + GroupsDict = dict:from_list(JIDGroups), + RItems = lists:flatmap( + fun(I) -> + case riak_raw_to_record(LServer, I) of + %% Bad JID in database: + error -> + []; + R -> + SJID = jlib:jid_to_string(R#roster.jid), + Groups = + case dict:find(SJID, GroupsDict) of + {ok, Gs} -> Gs; + error -> [] + end, + [R#roster{groups = Groups}] + end + end, Items), + RItems; + _ -> + [] end. item_to_xml(Item) -> @@ -455,6 +497,31 @@ get_roster_by_jid_t(LUser, LServer, LJID, odbc) -> R#roster{usj = {LUser, LServer, LJID}, us = {LUser, LServer}, jid = LJID, name = <<"">>} end + end; +get_roster_by_jid_t(LUser, LServer, LJID, riak) -> + Username = LUser, + SJID = jlib:jid_to_string(LJID), + Res = riak_get_roster_by_jid(LServer, Username, SJID), + case Res of + {error, _} -> + #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID}; + {ok, I} -> + R = riak_raw_to_record(LServer, I), + case R of + %% Bad JID in database: + error -> + #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID}; + _ -> + R#roster{ + usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID, + name = ""} + end end. try_process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) -> @@ -631,8 +698,16 @@ get_subscription_lists(_, LUser, LServer, odbc) -> <<"server">>, <<"subscribe">>, <<"type">>], Items} when is_list(Items) -> - Items; + lists:map(fun(I) -> raw_to_record(LServer, I) end, Items); _ -> [] + end; +get_subscription_lists(_, LUser, LServer, riak) -> + Username = LUser, + case catch riak_get_roster(LServer, Username) of + {ok, Items} when is_list(Items) -> + lists:map(fun(I) -> riak_raw_to_record(LServer, I) end, Items); + _ -> + [] end. fill_subscription_lists(LServer, [#roster{} = I | Is], @@ -671,12 +746,18 @@ roster_subscribe_t(LUser, LServer, LJID, Item, odbc) -> Username = ejabberd_odbc:escape(LUser), SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), odbc_queries:roster_subscribe(LServer, Username, SJID, - ItemVals). + ItemVals); +roster_subscribe_t(LUser, LServer, LJID, Item, riak) -> + ItemVals = riak_record_to_string(Item), + Username = LUser, + SJID = jlib:jid_to_string(LJID), + riak_roster_subscribe(LServer, Username, SJID, ItemVals). transaction(LServer, F) -> case gen_mod:db_type(LServer, ?MODULE) of mnesia -> mnesia:transaction(F); - odbc -> ejabberd_odbc:sql_transaction(LServer, F) + odbc -> ejabberd_odbc:sql_transaction(LServer, F); + riak -> {atomic, F()} end. in_subscription(_, User, Server, JID, Type, Reason) -> @@ -727,6 +808,25 @@ get_roster_by_jid_with_groups_t(LUser, LServer, LJID, []} -> #roster{usj = {LUser, LServer, LJID}, us = {LUser, LServer}, jid = LJID} + end; +get_roster_by_jid_with_groups_t(LUser, LServer, LJID, riak) -> + Username = LUser, + SJID = jlib:jid_to_string(LJID), + case riak_get_roster_by_jid(LServer, Username, SJID) of + {ok, I} -> + R = riak_raw_to_record(LServer, I), + Groups = + case riak_get_roster_groups(LServer, Username, SJID) of + {ok, JGrps} when is_list(JGrps) -> + JGrps; + _ -> + [] + end, + R#roster{groups = Groups}; + {error, _} -> + #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID} end. process_subscription(Direction, User, Server, JID1, @@ -924,12 +1024,12 @@ in_auto_reply(_, _, _) -> none. remove_user(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), + send_unsubscription_to_rosteritems(LUser, LServer), remove_user(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). remove_user(LUser, LServer, mnesia) -> US = {LUser, LServer}, - send_unsubscription_to_rosteritems(LUser, LServer), F = fun () -> lists:foreach(fun (R) -> mnesia:delete_object(R) end, mnesia:index_read(roster, US, #roster.us)) @@ -937,8 +1037,11 @@ remove_user(LUser, LServer, mnesia) -> mnesia:transaction(F); remove_user(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), - send_unsubscription_to_rosteritems(LUser, LServer), odbc_queries:del_user_roster_t(LServer, Username), + ok; +remove_user(LUser, LServer, riak) -> + Username = LUser, + riak_del_user_roster(LServer, Username), ok. %% For each contact with Subscription: @@ -1009,7 +1112,15 @@ update_roster_t(LUser, LServer, LJID, Item, odbc) -> SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), ItemVals = record_to_string(Item), ItemGroups = groups_to_string(Item), - odbc_queries:update_roster(LServer, Username, SJID, ItemVals, ItemGroups). + odbc_queries:update_roster(LServer, Username, SJID, ItemVals, + ItemGroups); +update_roster_t(LUser, LServer, LJID, Item, riak) -> + Username = LUser, + SJID = jlib:jid_to_string(LJID), + ItemVals = riak_record_to_string(Item), + ItemGroups = riak_groups_to_binary(Item), + riak_update_roster( + LServer, Username, SJID, ItemVals, ItemGroups). del_roster_t(LUser, LServer, LJID) -> DBType = gen_mod:db_type(LServer, ?MODULE), @@ -1020,7 +1131,11 @@ del_roster_t(LUser, LServer, LJID, mnesia) -> del_roster_t(LUser, LServer, LJID, odbc) -> Username = ejabberd_odbc:escape(LUser), SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), - odbc_queries:del_roster(LServer, Username, SJID). + odbc_queries:del_roster(LServer, Username, SJID); +del_roster_t(LUser, LServer, LJID, riak) -> + Username = LUser, + SJID = jlib:jid_to_string(LJID), + riak_del_roster(LServer, Username, SJID). process_item_set_t(LUser, LServer, #xmlel{attrs = Attrs, children = Els}) -> @@ -1161,6 +1276,44 @@ get_in_pending_subscriptions(Ls, User, Server, odbc) -> end, Items)); _ -> Ls + end; +get_in_pending_subscriptions(Ls, User, Server, riak) -> + JID = jlib:make_jid(User, Server, <<"">>), + LUser = JID#jid.luser, + LServer = JID#jid.lserver, + Username = LUser, + case catch riak_get_roster(LServer, Username) of + {ok, Items} when is_list(Items) -> + Ls ++ lists:map( + fun(R) -> + Message = R#roster.askmessage, + #xmlel{name = <<"presence">>, + attrs = [{<<"from">>, + jlib:jid_to_string(R#roster.jid)}, + {<<"to">>, jlib:jid_to_string(JID)}, + {<<"type">>, <<"subscribe">>}], + children = [#xmlel{name = <<"status">>, + attrs = [], + children = + [{xmlcdata, Message}]}]} + end, + lists:flatmap( + fun(I) -> + case riak_raw_to_record(LServer, I) of + %% Bad JID in database: + error -> + []; + R -> + case R#roster.ask of + in -> [R]; + both -> [R]; + _ -> [] + end + end + end, + Items)); + _ -> + Ls end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -1205,6 +1358,21 @@ read_subscription_and_groups(LUser, LServer, LJID, end, {Subscription, Groups}; _ -> error + end; +read_subscription_and_groups(LUser, LServer, LJID, + riak) -> + Username = LUser, + SJID = jlib:jid_to_string(LJID), + case catch riak_get_subscription(LServer, Username, SJID) of + {ok, Subscription} -> + Groups = case riak_get_roster_jid_groups(LServer, Username) of + {ok, JGrps} when is_list(JGrps) -> + JGrps; + _ -> + [] + end, + {Subscription, Groups}; + _ -> error end. get_jid_info(_, User, Server, JID) -> @@ -1695,3 +1863,182 @@ import(_LServer, mnesia, #roster_version{} = RV) -> mnesia:dirty_write(RV); import(_, _, _) -> pass. + +riak_get_roster(LServer, Username) -> + ejabberd_riak:get_by_index( + LServer, <<"roster">>, <<"user_bin">>, Username). + +riak_get_roster_jid_groups(LServer, Username) -> + case ejabberd_riak:get_by_index( + LServer, <<"roster_groups">>, <<"user_bin">>, Username) of + {ok, JGs} -> + Res = lists:map(fun riak_binary_to_groups/1, JGs), + {ok, Res}; + Error -> Error + end. + +riak_get_roster_groups(LServer, Username, SJID) -> + Key = <<Username/binary, $/, SJID/binary>>, + case ejabberd_riak:get(LServer, <<"roster_groups">>, Key) of + {ok, Gs} -> + {_, Res} = riak_binary_to_groups(Gs), + {ok, Res}; + {error, notfound} -> + {ok, []}; + Error -> Error + end. + +riak_get_roster_by_jid(LServer, Username, SJID) -> + Key = <<Username/binary, $/, SJID/binary>>, + ejabberd_riak:get(LServer, <<"roster">>, Key). + +riak_del_roster(LServer, Username, SJID) -> + Key = <<Username/binary, $/, SJID/binary>>, + ejabberd_riak:delete(LServer, <<"roster">>, Key). + +riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups) -> + Key = <<Username/binary, $/, SJID/binary>>, + ejabberd_riak:put( + LServer, <<"roster">>, Key, ItemVals, + [{<<"user_bin">>, Username}]), + ejabberd_riak:put( + LServer, <<"roster_groups">>, Key, ItemGroups, + [{<<"user_bin">>, Username}]). + +riak_roster_subscribe(LServer, Username, SJID, ItemVals) -> + Key = <<Username/binary, $/, SJID/binary>>, + ejabberd_riak:put( + LServer, <<"roster">>, Key, ItemVals, + [{<<"user_bin">>, Username}]). + +riak_get_subscription(LServer, Username, SJID) -> + case riak_get_roster_by_jid(LServer, Username, SJID) of + {ok, SR} -> + case riak_raw_to_record(LServer, SR) of + error -> + {error, bad_record}; + R -> + {ok, R#roster.subscription} + end; + Error -> + Error + end. + +riak_set_roster_version(LServer, Username, RosterVersion) -> + ejabberd_riak:put(LServer, <<"roster_version">>, + Username, RosterVersion). + + +riak_del_user_roster(LServer, Username) -> + case ejabberd_riak:get_keys_by_index( + LServer, <<"roster">>, <<"user_bin">>, Username) of + {ok, Keys} -> + lists:foreach( + fun(Key) -> + ejabberd_riak:delete(LServer, <<"roster">>, Key) + end, Keys); + _ -> + ok + end, + case ejabberd_riak:get_keys_by_index( + LServer, <<"roster_groups">>, <<"user_bin">>, Username) of + {ok, GKeys} -> + lists:foreach( + fun(Key) -> + ejabberd_riak:delete(LServer, <<"roster_groups">>, Key) + end, GKeys); + _ -> + ok + end, + ejabberd_riak:delete(LServer, <<"roster_version">>, Username). + +riak_raw_to_record(LServer, + <<UsernameLen:16, Username:UsernameLen/binary, + SJIDLen:16, SJID:SJIDLen/binary, + NickLen:16, Nick:NickLen/binary, + SSubscription, SAsk, + SAskMessageLen:16, SAskMessage:SAskMessageLen/binary>>) -> + User = Username, + case jlib:string_to_jid(SJID) of + error -> + error; + JID -> + LJID = jlib:jid_tolower(JID), + Subscription = case SSubscription of + $B -> both; + $T -> to; + $F -> from; + _ -> none + end, + Ask = case SAsk of + $S -> subscribe; + $U -> unsubscribe; + $B -> both; + $O -> out; + $I -> in; + _ -> none + end, + #roster{usj = {User, LServer, LJID}, + us = {User, LServer}, + jid = LJID, + name = Nick, + subscription = Subscription, + ask = Ask, + askmessage = SAskMessage} + end. + +riak_record_to_string(#roster{us = {User, _Server}, + jid = JID, + name = Name, + subscription = Subscription, + ask = Ask, + askmessage = AskMessage}) -> + Username = User, + UsernameLen = size(Username), + SJID = jlib:jid_to_string(jlib:jid_tolower(JID)), + SJIDLen = size(SJID), + Nick = Name, + NickLen = size(Nick), + SSubscription = case Subscription of + both -> $B; + to -> $T; + from -> $F; + none -> $N + end, + SAsk = case Ask of + subscribe -> $S; + unsubscribe -> $U; + both -> $B; + out -> $O; + in -> $I; + none -> $N + end, + SAskMessage = iolist_to_binary(AskMessage), + SAskMessageLen = size(SAskMessage), + <<UsernameLen:16, Username/binary, + SJIDLen:16, SJID/binary, + NickLen:16, Nick/binary, + SSubscription, SAsk, + SAskMessageLen:16, SAskMessage/binary>>. + +riak_groups_to_binary(#roster{jid = JID, groups = Groups}) -> + SJID = jlib:jid_to_string(jlib:jid_tolower(JID)), + SJIDLen = size(SJID), + %% Empty groups do not need to be converted to string to be inserted in + %% the database + lists:foldl( + fun([], Acc) -> + Acc; + (Group, Acc) -> + G = Group, + Len = size(G), + <<Acc/binary, Len:16, G/binary>> + end, <<SJIDLen:16, SJID/binary>>, Groups). + +riak_binary_to_groups(<<Len:16, SJID:Len/binary, Rest/binary>>) -> + {SJID, riak_binary_to_groups(Rest, [])}. + +riak_binary_to_groups(<<Len:16, G:Len/binary, Rest/binary>>, Res) -> + riak_binary_to_groups(Rest, [G | Res]); +riak_binary_to_groups(_, Res) -> + Res. |