Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_riak.erl')
-rw-r--r--src/ejabberd_riak.erl76
1 files changed, 55 insertions, 21 deletions
diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl
index ec9fc4716..d80a77d3e 100644
--- a/src/ejabberd_riak.erl
+++ b/src/ejabberd_riak.erl
@@ -27,9 +27,9 @@
-behaviour(gen_server).
%% API
--export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2,
- get/1, get/2, get_by_index/3, delete/1, delete/2,
- count_by_index/3, get_by_index_range/4,
+-export([start_link/4, get_proc/1, make_bucket/1, put/2, put/3,
+ get/2, get/3, get_by_index/4, delete/1, delete/2,
+ count_by_index/3, get_by_index_range/5,
get_keys/1, get_keys_by_index/3, is_connected/0,
count/1, delete_by_index/3]).
%% For debugging
@@ -50,6 +50,11 @@
-type index_info() :: [{i, any()} | {'2i', [index()]}].
+%% The `record_schema()' is just a tuple:
+%% {record_info(fields, some_record), #some_record{}}
+
+-type record_schema() :: {[atom()], tuple()}.
+
%% The `index_info()' is used in put/delete functions:
%% `i' defines a primary index, `` '2i' '' defines secondary indexes.
%% There must be only one primary index. If `i' is not specified,
@@ -81,19 +86,19 @@ get_proc(I) ->
make_bucket(Table) ->
erlang:atom_to_binary(Table, utf8).
--spec put(tuple()) -> ok | {error, any()}.
+-spec put(tuple(), record_schema()) -> ok | {error, any()}.
%% @equiv put(Record, [])
-put(Record) ->
- ?MODULE:put(Record, []).
+put(Record, RecFields) ->
+ ?MODULE:put(Record, RecFields, []).
--spec put(tuple(), index_info()) -> ok | {error, any()}.
+-spec put(tuple(), record_schema(), index_info()) -> ok | {error, any()}.
%% @doc Stores a record `Rec' with indexes described in ``IndexInfo''
-put(Rec, IndexInfo) ->
+put(Rec, RecSchema, IndexInfo) ->
Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))),
SecIdxs = [encode_index_key(K, V) ||
{K, V} <- proplists:get_value('2i', IndexInfo, [])],
Table = element(1, Rec),
- Value = term_to_binary(Rec),
+ Value = encode_record(Rec, RecSchema),
case put_raw(Table, Key, Value, SecIdxs) of
ok ->
ok;
@@ -118,9 +123,9 @@ get_object_raw(Table, Key) ->
Bucket = make_bucket(Table),
catch riakc_pb_socket:get(get_random_pid(), Bucket, Key).
--spec get(atom()) -> {ok, [any()]} | {error, any()}.
+-spec get(atom(), record_schema()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns all objects from table `Table'
-get(Table) ->
+get(Table, RecSchema) ->
Bucket = make_bucket(Table),
case catch riakc_pb_socket:mapred(
get_random_pid(),
@@ -130,7 +135,7 @@ get(Table) ->
{ok, [{_, Objs}]} ->
{ok, lists:flatmap(
fun(Obj) ->
- case catch binary_to_term(Obj) of
+ case catch decode_record(Obj, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Obj)},
log_error(Error, get,
@@ -148,12 +153,12 @@ get(Table) ->
Error
end.
--spec get(atom(), any()) -> {ok, any()} | {error, any()}.
+-spec get(atom(), record_schema(), any()) -> {ok, any()} | {error, any()}.
%% @doc Reads record by `Key' from table `Table'
-get(Table, Key) ->
+get(Table, RecSchema, Key) ->
case get_raw(Table, encode_key(Key)) of
{ok, Val} ->
- case catch binary_to_term(Val) of
+ case catch decode_record(Val, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get, [{table, Table}, {key, Key}]),
@@ -167,15 +172,16 @@ get(Table, Key) ->
Error
end.
--spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}.
+-spec get_by_index(atom(), record_schema(), binary(), any()) ->
+ {ok, [any()]} | {error, any()}.
%% @doc Reads records by `Index' and value `Key' from `Table'
-get_by_index(Table, Index, Key) ->
+get_by_index(Table, RecSchema, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
case get_by_index_raw(Table, NewIndex, NewKey) of
{ok, Vals} ->
{ok, lists:flatmap(
fun(Val) ->
- case catch binary_to_term(Val) of
+ case catch decode_record(Val, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get_by_index,
@@ -197,17 +203,17 @@ get_by_index(Table, Index, Key) ->
Error
end.
--spec get_by_index_range(atom(), binary(), any(), any()) ->
+-spec get_by_index_range(atom(), record_schema(), binary(), any(), any()) ->
{ok, [any()]} | {error, any()}.
%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table'
-get_by_index_range(Table, Index, FromKey, ToKey) ->
+get_by_index_range(Table, RecSchema, Index, FromKey, ToKey) ->
{NewIndex, NewFromKey} = encode_index_key(Index, FromKey),
{NewIndex, NewToKey} = encode_index_key(Index, ToKey),
case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of
{ok, Vals} ->
{ok, lists:flatmap(
fun(Val) ->
- case catch binary_to_term(Val) of
+ case catch decode_record(Val, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get_by_index_range,
@@ -518,3 +524,31 @@ get_random_pid() ->
{'EXIT', Err} ->
throw({error, Err})
end.
+
+encode_record(Rec, {Fields, DefRec}) ->
+ term_to_binary(encode_record(Rec, Fields, DefRec, 2)).
+
+encode_record(Rec, [FieldName|Fields], DefRec, Pos) ->
+ Value = element(Pos, Rec),
+ DefValue = element(Pos, DefRec),
+ if Value == DefValue ->
+ encode_record(Rec, Fields, DefRec, Pos+1);
+ true ->
+ [{FieldName, Value}|encode_record(Rec, Fields, DefRec, Pos+1)]
+ end;
+encode_record(_, [], _, _) ->
+ [].
+
+decode_record(Bin, {Fields, DefRec}) ->
+ decode_record(binary_to_term(Bin), Fields, DefRec, 2).
+
+decode_record(KeyVals, [FieldName|Fields], Rec, Pos) ->
+ case lists:keyfind(FieldName, 1, KeyVals) of
+ {_, Value} ->
+ NewRec = setelement(Pos, Rec, Value),
+ decode_record(KeyVals, Fields, NewRec, Pos+1);
+ false ->
+ decode_record(KeyVals, Fields, Rec, Pos+1)
+ end;
+decode_record(_, [], Rec, _) ->
+ Rec.