diff options
Diffstat (limited to 'src/ejabberd_riak.erl')
-rw-r--r-- | src/ejabberd_riak.erl | 76 |
1 files changed, 55 insertions, 21 deletions
diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl index ec9fc4716..d80a77d3e 100644 --- a/src/ejabberd_riak.erl +++ b/src/ejabberd_riak.erl @@ -27,9 +27,9 @@ -behaviour(gen_server). %% API --export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2, - get/1, get/2, get_by_index/3, delete/1, delete/2, - count_by_index/3, get_by_index_range/4, +-export([start_link/4, get_proc/1, make_bucket/1, put/2, put/3, + get/2, get/3, get_by_index/4, delete/1, delete/2, + count_by_index/3, get_by_index_range/5, get_keys/1, get_keys_by_index/3, is_connected/0, count/1, delete_by_index/3]). %% For debugging @@ -50,6 +50,11 @@ -type index_info() :: [{i, any()} | {'2i', [index()]}]. +%% The `record_schema()' is just a tuple: +%% {record_info(fields, some_record), #some_record{}} + +-type record_schema() :: {[atom()], tuple()}. + %% The `index_info()' is used in put/delete functions: %% `i' defines a primary index, `` '2i' '' defines secondary indexes. %% There must be only one primary index. If `i' is not specified, @@ -81,19 +86,19 @@ get_proc(I) -> make_bucket(Table) -> erlang:atom_to_binary(Table, utf8). --spec put(tuple()) -> ok | {error, any()}. +-spec put(tuple(), record_schema()) -> ok | {error, any()}. %% @equiv put(Record, []) -put(Record) -> - ?MODULE:put(Record, []). +put(Record, RecFields) -> + ?MODULE:put(Record, RecFields, []). --spec put(tuple(), index_info()) -> ok | {error, any()}. +-spec put(tuple(), record_schema(), index_info()) -> ok | {error, any()}. %% @doc Stores a record `Rec' with indexes described in ``IndexInfo'' -put(Rec, IndexInfo) -> +put(Rec, RecSchema, IndexInfo) -> Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))), SecIdxs = [encode_index_key(K, V) || {K, V} <- proplists:get_value('2i', IndexInfo, [])], Table = element(1, Rec), - Value = term_to_binary(Rec), + Value = encode_record(Rec, RecSchema), case put_raw(Table, Key, Value, SecIdxs) of ok -> ok; @@ -118,9 +123,9 @@ get_object_raw(Table, Key) -> Bucket = make_bucket(Table), catch riakc_pb_socket:get(get_random_pid(), Bucket, Key). --spec get(atom()) -> {ok, [any()]} | {error, any()}. +-spec get(atom(), record_schema()) -> {ok, [any()]} | {error, any()}. %% @doc Returns all objects from table `Table' -get(Table) -> +get(Table, RecSchema) -> Bucket = make_bucket(Table), case catch riakc_pb_socket:mapred( get_random_pid(), @@ -130,7 +135,7 @@ get(Table) -> {ok, [{_, Objs}]} -> {ok, lists:flatmap( fun(Obj) -> - case catch binary_to_term(Obj) of + case catch decode_record(Obj, RecSchema) of {'EXIT', _} -> Error = {error, make_invalid_object(Obj)}, log_error(Error, get, @@ -148,12 +153,12 @@ get(Table) -> Error end. --spec get(atom(), any()) -> {ok, any()} | {error, any()}. +-spec get(atom(), record_schema(), any()) -> {ok, any()} | {error, any()}. %% @doc Reads record by `Key' from table `Table' -get(Table, Key) -> +get(Table, RecSchema, Key) -> case get_raw(Table, encode_key(Key)) of {ok, Val} -> - case catch binary_to_term(Val) of + case catch decode_record(Val, RecSchema) of {'EXIT', _} -> Error = {error, make_invalid_object(Val)}, log_error(Error, get, [{table, Table}, {key, Key}]), @@ -167,15 +172,16 @@ get(Table, Key) -> Error end. --spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}. +-spec get_by_index(atom(), record_schema(), binary(), any()) -> + {ok, [any()]} | {error, any()}. %% @doc Reads records by `Index' and value `Key' from `Table' -get_by_index(Table, Index, Key) -> +get_by_index(Table, RecSchema, Index, Key) -> {NewIndex, NewKey} = encode_index_key(Index, Key), case get_by_index_raw(Table, NewIndex, NewKey) of {ok, Vals} -> {ok, lists:flatmap( fun(Val) -> - case catch binary_to_term(Val) of + case catch decode_record(Val, RecSchema) of {'EXIT', _} -> Error = {error, make_invalid_object(Val)}, log_error(Error, get_by_index, @@ -197,17 +203,17 @@ get_by_index(Table, Index, Key) -> Error end. --spec get_by_index_range(atom(), binary(), any(), any()) -> +-spec get_by_index_range(atom(), record_schema(), binary(), any(), any()) -> {ok, [any()]} | {error, any()}. %% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table' -get_by_index_range(Table, Index, FromKey, ToKey) -> +get_by_index_range(Table, RecSchema, Index, FromKey, ToKey) -> {NewIndex, NewFromKey} = encode_index_key(Index, FromKey), {NewIndex, NewToKey} = encode_index_key(Index, ToKey), case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of {ok, Vals} -> {ok, lists:flatmap( fun(Val) -> - case catch binary_to_term(Val) of + case catch decode_record(Val, RecSchema) of {'EXIT', _} -> Error = {error, make_invalid_object(Val)}, log_error(Error, get_by_index_range, @@ -518,3 +524,31 @@ get_random_pid() -> {'EXIT', Err} -> throw({error, Err}) end. + +encode_record(Rec, {Fields, DefRec}) -> + term_to_binary(encode_record(Rec, Fields, DefRec, 2)). + +encode_record(Rec, [FieldName|Fields], DefRec, Pos) -> + Value = element(Pos, Rec), + DefValue = element(Pos, DefRec), + if Value == DefValue -> + encode_record(Rec, Fields, DefRec, Pos+1); + true -> + [{FieldName, Value}|encode_record(Rec, Fields, DefRec, Pos+1)] + end; +encode_record(_, [], _, _) -> + []. + +decode_record(Bin, {Fields, DefRec}) -> + decode_record(binary_to_term(Bin), Fields, DefRec, 2). + +decode_record(KeyVals, [FieldName|Fields], Rec, Pos) -> + case lists:keyfind(FieldName, 1, KeyVals) of + {_, Value} -> + NewRec = setelement(Pos, Rec, Value), + decode_record(KeyVals, Fields, NewRec, Pos+1); + false -> + decode_record(KeyVals, Fields, Rec, Pos+1) + end; +decode_record(_, [], Rec, _) -> + Rec. |