diff options
author | Nasreddine Bouafif <nasreddine.bouafif@gmail.com> | 2010-06-11 15:31:50 +0400 |
---|---|---|
committer | Nasreddine Bouafif <nasreddine.bouafif@gmail.com> | 2010-06-11 15:31:50 +0400 |
commit | 43b43fce7bf38c28e221d1b72fdd554f9a199142 (patch) | |
tree | ce4fa7a4ca3a81582633f5458bf8a49dd8e1f3ce /redis.c | |
parent | c6a510af7c49fe820c8a4b30e3ad5e85703a7b5d (diff) |
publish/subscribe/unsubscribe implementation, fiw warnings
Diffstat (limited to 'redis.c')
-rwxr-xr-x | redis.c | 391 |
1 files changed, 345 insertions, 46 deletions
@@ -31,7 +31,8 @@ #include "library.h" #define _NL "\r\n" - +#define R_SUB_CALLBACK_CLASS_TYPE 1 +#define R_SUB_CALLBACK_FT_TYPE 2 static int le_redis_sock; static int le_redis_multi_access_type; @@ -134,6 +135,10 @@ static zend_function_entry redis_functions[] = { PHP_ME(Redis, exec, NULL, ZEND_ACC_PUBLIC) PHP_ME(Redis, pipeline, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Redis, publish, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Redis, subscribe, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Redis, unsubscribe, NULL, ZEND_ACC_PUBLIC) + /* aliases */ PHP_MALIAS(Redis, open, connect, NULL, ZEND_ACC_PUBLIC) PHP_MALIAS(Redis, lLen, lSize, NULL, ZEND_ACC_PUBLIC) @@ -342,7 +347,6 @@ PHPAPI int get_flag(zval *object) PHPAPI void set_flag(zval *object, int new_flag) { zval **multi_flag = NULL; - int flag_result; zend_hash_find(Z_OBJPROP_P(object), "multi_flag", sizeof("multi_flag"), (void **) &multi_flag); zend_list_delete(Z_LVAL_PP(multi_flag)); @@ -512,8 +516,8 @@ PHP_METHOD(Redis, getSet) zval *object; RedisSock *redis_sock; - char *key = NULL, *val = NULL, *cmd, *response; - int key_len, val_len, cmd_len, response_len; + char *key = NULL, *val = NULL, *cmd; + int key_len, val_len, cmd_len; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss", &object, redis_ce, &key, &key_len, @@ -555,8 +559,8 @@ PHP_METHOD(Redis, randomKey) zval *object; RedisSock *redis_sock; - char *cmd, *response, *ret; - int cmd_len, response_len; + char *cmd; + int cmd_len; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", &object, redis_ce) == FAILURE) { @@ -673,8 +677,8 @@ PHP_METHOD(Redis, get) { zval *object; RedisSock *redis_sock; - char *key = NULL, *cmd, *response; - int key_len, cmd_len, response_len; + char *key = NULL, *cmd; + int key_len, cmd_len; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os", &object, redis_ce, @@ -712,8 +716,7 @@ PHP_METHOD(Redis, ping) { zval *object; RedisSock *redis_sock; - char *response; - int cmd_len, response_len; + int cmd_len; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", &object, redis_ce) == FAILURE) { @@ -814,7 +817,7 @@ PHP_METHOD(Redis, getMultiple) HashPosition pointer; RedisSock *redis_sock; char *cmd = "", *old_cmd = NULL; - int cmd_len, response_len, array_count; + int cmd_len, array_count; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oa", &object, redis_ce, &array) == FAILURE) { @@ -1125,9 +1128,8 @@ generic_pop_function(INTERNAL_FUNCTION_PARAMETERS, char *keyword, int keyword_le zval *object; RedisSock *redis_sock; - char *key = NULL, *cmd, *response; - int key_len, cmd_len, response_len; - long type = 0; + char *key = NULL, *cmd; + int key_len, cmd_len; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os", &object, redis_ce, @@ -1681,8 +1683,8 @@ PHPAPI int generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAMETERS, char *keyword zend_hash_has_more_elements(keytable) == SUCCESS; zend_hash_move_forward(keytable), i++) { - char *key, *val; - int key_len, val_len; + char *key; + int key_len; unsigned long idx; int type; zval **z_value_pp; @@ -1741,7 +1743,6 @@ PHPAPI int generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAMETERS, char *keyword */ PHP_METHOD(Redis, sInter) { - int response_len; RedisSock *redis_sock; generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, @@ -1785,7 +1786,6 @@ PHP_METHOD(Redis, sInterStore) { */ PHP_METHOD(Redis, sUnion) { - int response_len; RedisSock *redis_sock; generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, @@ -1825,7 +1825,6 @@ PHP_METHOD(Redis, sUnionStore) { */ PHP_METHOD(Redis, sDiff) { - int response_len; RedisSock *redis_sock; generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, @@ -1871,23 +1870,13 @@ PHPAPI void generic_sort_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sort, int use_a zval *object; RedisSock *redis_sock; char *key = NULL, *pattern = NULL, *get = NULL, *store = NULL, *cmd; - int key_len, pattern_len = -1, get_len = -1, store_len = -1, cmd_len, response_len; + int key_len, pattern_len = -1, get_len = -1, store_len = -1, cmd_len; long sort_start = -1, sort_count = -1; int cmd_elements; long use_pound = 0; - char *by_cmd = ""; - char *by_arg = ""; - - char *get_cmd = ""; - char *get_arg = ""; - char *get_pound = ""; - - char *limit = ""; - - char *alpha = ""; char *cmd_lines[30]; int cmd_sizes[30]; @@ -2677,7 +2666,7 @@ PHP_METHOD(Redis, zRange) zval *object; RedisSock *redis_sock; char *key = NULL, *cmd; - int key_len, cmd_len, response_len; + int key_len, cmd_len; long start, end; long withscores = 0; @@ -2744,7 +2733,7 @@ PHP_METHOD(Redis, zDelete) zval *object; RedisSock *redis_sock; char *key = NULL, *member = NULL, *cmd; - int key_len, member_len, cmd_len, count; + int key_len, member_len, cmd_len, count; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss", &object, redis_ce, &key, &key_len, @@ -3077,7 +3066,7 @@ PHP_METHOD(Redis, zScore) zval *object; RedisSock *redis_sock; char *key = NULL, *member = NULL, *cmd; - int key_len, member_len, cmd_len, count; + int key_len, member_len, cmd_len; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss", &object, redis_ce, &key, &key_len, @@ -3168,7 +3157,7 @@ PHPAPI void generic_z_command(INTERNAL_FUNCTION_PARAMETERS, char *command, int c HashPosition pointer; char *cmd = ""; - int cmd_len, response_len, array_count, cmd_elements; + int cmd_len, cmd_elements; if(zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Osa|as", &object, redis_ce, @@ -3366,8 +3355,8 @@ PHP_METHOD(Redis, hGet) { zval *object; RedisSock *redis_sock; - char *key = NULL, *cmd, *member, *response; - int key_len, member_len, cmd_len, response_len; + char *key = NULL, *cmd, *member; + int key_len, member_len, cmd_len; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss", &object, redis_ce, @@ -3598,7 +3587,7 @@ PHPAPI void array_zip_values_and_scores(zval *z_tab, int use_atof TSRMLS_DC) { array_init(z_ret); HashTable *keytable = Z_ARRVAL_P(z_tab); - int i = 0; + for(zend_hash_internal_pointer_reset(keytable); zend_hash_has_more_elements(keytable) == SUCCESS; zend_hash_move_forward(keytable)) { @@ -3649,8 +3638,8 @@ PHP_METHOD(Redis, hIncrBy) { zval *object; RedisSock *redis_sock; - char *key = NULL, *cmd, *member, *response; - int key_len, member_len, cmd_len, response_len; + char *key = NULL, *cmd, *member; + int key_len, member_len, cmd_len; long val; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Ossl", @@ -3830,8 +3819,7 @@ PHPAPI int redis_sock_read_multibulk_multi_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock TSRMLS_DC) { - char inbuf[1024], *response; - int response_len; + char inbuf[1024]; redis_check_eof(redis_sock TSRMLS_CC); @@ -3887,8 +3875,7 @@ PHP_METHOD(Redis, exec) RedisSock *redis_sock; char *cmd; - int response_len, cmd_len; - char * response; + int cmd_len; zval *object; struct request_item *ri; @@ -3981,9 +3968,6 @@ PHPAPI int redis_sock_read_multibulk_multi_reply_loop(INTERNAL_FUNCTION_PARAMETE PHP_METHOD(Redis, pipeline) { RedisSock *redis_sock; - char *cmd; - int response_len, cmd_len; - char * response; zval *object; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", @@ -4006,5 +3990,320 @@ PHP_METHOD(Redis, pipeline) RETURN_ZVAL(getThis(), 1, 0); } + +/* + publish channel message + @return the number of subscribers +*/ +PHP_METHOD(Redis, publish) +{ + zval *object; + RedisSock *redis_sock; + char *cmd, *key, *val; + int cmd_len, key_len, val_len; + + if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss", + &object, redis_ce, + &key, &key_len, &val, &val_len) == FAILURE) { + RETURN_NULL(); + } + + if (redis_sock_get(object, &redis_sock TSRMLS_CC) < 0) { + RETURN_FALSE; + } + + cmd_len = redis_cmd_format(&cmd, "PUBLISH %s %d\r\n%s\r\n", + key, key_len, + val_len, + val, val_len); + + if (redis_sock_write(redis_sock, cmd, cmd_len) < 0) { + efree(cmd); + RETURN_FALSE; + } + efree(cmd); + redis_long_response(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, NULL TSRMLS_CC); +} + +/* + subscribe channel_1 channel_2 ... channel_n + subscribe(array(channel_1, channel_2, ..., channel_n), callback) +*/ +PHP_METHOD(Redis, subscribe) +{ + zval *z_callback,*object, *array, **data; + HashTable *arr_hash; + HashPosition pointer; + RedisSock *redis_sock; + char *cmd = "", *old_cmd = NULL, *callback_ft_name; + int cmd_len, array_count, callback_ft_name_len; + + + if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oasz", + &object, redis_ce, &array, &callback_ft_name, &callback_ft_name_len, &z_callback) == FAILURE) { + RETURN_FALSE; + } + + if (redis_sock_get(object, &redis_sock TSRMLS_CC) < 0) { + RETURN_FALSE; + } + + arr_hash = Z_ARRVAL_P(array); + array_count = zend_hash_num_elements(arr_hash); + + if (array_count == 0) { + RETURN_FALSE; + } + for (zend_hash_internal_pointer_reset_ex(arr_hash, &pointer); + zend_hash_get_current_data_ex(arr_hash, (void**) &data, + &pointer) == SUCCESS; + zend_hash_move_forward_ex(arr_hash, &pointer)) { + + if (Z_TYPE_PP(data) == IS_STRING) { + char *old_cmd = NULL; + if(*cmd) { + old_cmd = cmd; + } + cmd_len = spprintf(&cmd, 0, "%s %s", cmd, Z_STRVAL_PP(data)); + if(old_cmd) { + efree(old_cmd); + } + } + } + + old_cmd = cmd; + cmd_len = spprintf(&cmd, 0, "SUBSCRIBE %s\r\n", cmd); + efree(old_cmd); + if (redis_sock_write(redis_sock, cmd, cmd_len) < 0) { + efree(cmd); + RETURN_FALSE; + } + efree(cmd); + + /* read the status of the execution of the command `subscribe` */ + zval *z_tab, **tmp; + char *type_response; + + z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU, + redis_sock TSRMLS_CC); + + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&tmp) == SUCCESS) { + type_response = Z_STRVAL_PP(tmp); + if(strcmp(type_response, "subscribe") != 0) { + efree(tmp); + efree(z_tab); + RETURN_FALSE; + } + } else { + efree(z_tab); + RETURN_FALSE; + } + efree(z_tab); + + int callback_type; + zval *z_o, *z_fun,*z_ret, *z_args[3]; + char *class_name, *method_name; + zend_class_entry **class_entry_pp, *ce; + + MAKE_STD_ZVAL(z_ret); + + /* verify the callback */ + if(Z_TYPE_P(z_callback) == IS_ARRAY) { + + if (zend_hash_index_find(Z_ARRVAL_P(z_callback), 0, (void**)&tmp) == FAILURE) { + RETURN_FALSE; + } + + class_name = Z_STRVAL_PP(tmp); + + if (zend_hash_index_find(Z_ARRVAL_P(z_callback), 1, (void**)&tmp) == FAILURE) { + RETURN_FALSE; + } + + method_name = Z_STRVAL_PP(tmp); + if(zend_lookup_class(class_name, strlen(class_name), &class_entry_pp TSRMLS_CC) == FAILURE) { + /* The class didn't exist */ + /* generate error */ + RETURN_FALSE; + } + + + ce = *class_entry_pp; + // create an empty object. + MAKE_STD_ZVAL(z_o); + object_init_ex(z_o, ce); + + ALLOC_INIT_ZVAL(z_fun); + ZVAL_STRING(z_fun, method_name, 1); + callback_type = R_SUB_CALLBACK_CLASS_TYPE; + + } else if(Z_TYPE_P(z_callback) == IS_STRING) { + callback_ft_name = Z_STRVAL_P(z_callback); + callback_ft_name_len = strlen(callback_ft_name); + callback_type = R_SUB_CALLBACK_FT_TYPE; + } + + /* Multibulk Response, format : {message type, originating channel, message payload} */ + while(1) { + /* call the callback with this z_tab in argument */ + z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU, + redis_sock TSRMLS_CC); + zval **type, **channel, **data; + + if(Z_TYPE_P(z_tab) == IS_NULL) { + //ERROR + break; + } + + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE) { + break; + } + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, (void**)&channel) == FAILURE) { + break; + } + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&data) == FAILURE) { + break; + } + + z_args[0] = getThis(); + z_args[1] = *channel; + z_args[2] = *data; + + switch(callback_type) { + case R_SUB_CALLBACK_CLASS_TYPE: + call_user_function(&ce->function_table, &z_o, z_fun, z_ret, 3, z_args TSRMLS_CC); + //efree(z_o); + //efree(z_fun); + //zval_dtor(z_ret); efree(z_ret); + //free(z_args[0]); free(z_args[1]); free(z_args[2]); + //free(z_args); + + break; + case R_SUB_CALLBACK_FT_TYPE: + MAKE_STD_ZVAL(z_ret); + MAKE_STD_ZVAL(z_fun); + ZVAL_STRINGL(z_fun, callback_ft_name, callback_ft_name_len, 0); + call_user_function(EG(function_table), NULL, z_fun, z_ret, 3, z_args TSRMLS_CC); + efree(z_fun); + //free(z_args[0]); free(z_args[1]); free(z_args[2]); + //free(z_args); + break; + } + + if(Z_TYPE_P(z_ret) == IS_BOOL) { + // the callback function return TRUE if we want to continue listening on the channel + // or FALSE if we need to stop listeneing + if(!Z_BVAL_P(z_ret)) { + efree(z_o); + efree(z_fun); + zval_dtor(z_tab); + efree(z_tab); + zval_dtor(z_ret); + efree(z_ret); + break; + } + } else { + //error : the callback must return BOOL reponse + efree(z_o); + efree(z_fun); + zval_dtor(z_tab); + efree(z_tab); + zval_dtor(z_ret); + efree(z_ret); + RETURN_FALSE; + } + zval_dtor(z_tab); + efree(z_tab); + } + /*@TODO : collect all the returned data and return it */ +} + +/** + * unsubscribe channel_0 channel_1 ... channel_n + * unsubscribe(array(channel_0, channel_1, ..., channel_n)) + * response format : + * array( + * channel_0 => TRUE|FALSE, + * channel_1 => TRUE_FALSE, + * ... + * channel_n => TRUE|FALSE + * ); + **/ + +PHP_METHOD(Redis, unsubscribe) +{ + zval *object, *array, **data; + HashTable *arr_hash; + HashPosition pointer; + RedisSock *redis_sock; + char *cmd = "", *old_cmd = NULL; + int cmd_len, array_count; + + if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oa", + &object, redis_ce, &array) == FAILURE) { + RETURN_FALSE; + } + if (redis_sock_get(object, &redis_sock TSRMLS_CC) < 0) { + RETURN_FALSE; + } + + arr_hash = Z_ARRVAL_P(array); + array_count = zend_hash_num_elements(arr_hash); + + if (array_count == 0) { + RETURN_FALSE; + } + + for (zend_hash_internal_pointer_reset_ex(arr_hash, &pointer); + zend_hash_get_current_data_ex(arr_hash, (void**) &data, + &pointer) == SUCCESS; + zend_hash_move_forward_ex(arr_hash, &pointer)) { + + if (Z_TYPE_PP(data) == IS_STRING) { + char *old_cmd = NULL; + if(*cmd) { + old_cmd = cmd; + } + cmd_len = spprintf(&cmd, 0, "%s %s", cmd, Z_STRVAL_PP(data)); + if(old_cmd) { + efree(old_cmd); + } + } + } + + old_cmd = cmd; + cmd_len = spprintf(&cmd, 0, "UNSUBSCRIBE %s\r\n", cmd); + efree(old_cmd); + + if (redis_sock_write(redis_sock, cmd, cmd_len) < 0) { + efree(cmd); + RETURN_FALSE; + } + efree(cmd); + + int i = 1; + zval *z_tab, **z_channel; + + array_init(return_value); + + while( i <= array_count) { + z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU, + redis_sock TSRMLS_CC); + + if(Z_TYPE_P(z_tab) == IS_ARRAY) { + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, (void**)&z_channel) == FAILURE) { + RETURN_FALSE; + } + add_assoc_bool(return_value, Z_STRVAL_PP(z_channel), 1); + } else { + //error + efree(z_tab); + RETURN_FALSE; + } + efree(z_tab); + i ++; + } +} + /* vim: set tabstop=4 expandtab: */ |