Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNasreddine Bouafif <nasreddine.bouafif@gmail.com>2010-06-11 15:31:50 +0400
committerNasreddine Bouafif <nasreddine.bouafif@gmail.com>2010-06-11 15:31:50 +0400
commit43b43fce7bf38c28e221d1b72fdd554f9a199142 (patch)
treece4fa7a4ca3a81582633f5458bf8a49dd8e1f3ce /redis.c
parentc6a510af7c49fe820c8a4b30e3ad5e85703a7b5d (diff)
publish/subscribe/unsubscribe implementation, fiw warnings
Diffstat (limited to 'redis.c')
-rwxr-xr-xredis.c391
1 files changed, 345 insertions, 46 deletions
diff --git a/redis.c b/redis.c
index 07b9e3ff..9d05265c 100755
--- a/redis.c
+++ b/redis.c
@@ -31,7 +31,8 @@
#include "library.h"
#define _NL "\r\n"
-
+#define R_SUB_CALLBACK_CLASS_TYPE 1
+#define R_SUB_CALLBACK_FT_TYPE 2
static int le_redis_sock;
static int le_redis_multi_access_type;
@@ -134,6 +135,10 @@ static zend_function_entry redis_functions[] = {
PHP_ME(Redis, exec, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Redis, pipeline, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, publish, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, subscribe, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, unsubscribe, NULL, ZEND_ACC_PUBLIC)
+
/* aliases */
PHP_MALIAS(Redis, open, connect, NULL, ZEND_ACC_PUBLIC)
PHP_MALIAS(Redis, lLen, lSize, NULL, ZEND_ACC_PUBLIC)
@@ -342,7 +347,6 @@ PHPAPI int get_flag(zval *object)
PHPAPI void set_flag(zval *object, int new_flag)
{
zval **multi_flag = NULL;
- int flag_result;
zend_hash_find(Z_OBJPROP_P(object), "multi_flag", sizeof("multi_flag"), (void **) &multi_flag);
zend_list_delete(Z_LVAL_PP(multi_flag));
@@ -512,8 +516,8 @@ PHP_METHOD(Redis, getSet)
zval *object;
RedisSock *redis_sock;
- char *key = NULL, *val = NULL, *cmd, *response;
- int key_len, val_len, cmd_len, response_len;
+ char *key = NULL, *val = NULL, *cmd;
+ int key_len, val_len, cmd_len;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss",
&object, redis_ce, &key, &key_len,
@@ -555,8 +559,8 @@ PHP_METHOD(Redis, randomKey)
zval *object;
RedisSock *redis_sock;
- char *cmd, *response, *ret;
- int cmd_len, response_len;
+ char *cmd;
+ int cmd_len;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O",
&object, redis_ce) == FAILURE) {
@@ -673,8 +677,8 @@ PHP_METHOD(Redis, get)
{
zval *object;
RedisSock *redis_sock;
- char *key = NULL, *cmd, *response;
- int key_len, cmd_len, response_len;
+ char *key = NULL, *cmd;
+ int key_len, cmd_len;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os",
&object, redis_ce,
@@ -712,8 +716,7 @@ PHP_METHOD(Redis, ping)
{
zval *object;
RedisSock *redis_sock;
- char *response;
- int cmd_len, response_len;
+ int cmd_len;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O",
&object, redis_ce) == FAILURE) {
@@ -814,7 +817,7 @@ PHP_METHOD(Redis, getMultiple)
HashPosition pointer;
RedisSock *redis_sock;
char *cmd = "", *old_cmd = NULL;
- int cmd_len, response_len, array_count;
+ int cmd_len, array_count;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oa",
&object, redis_ce, &array) == FAILURE) {
@@ -1125,9 +1128,8 @@ generic_pop_function(INTERNAL_FUNCTION_PARAMETERS, char *keyword, int keyword_le
zval *object;
RedisSock *redis_sock;
- char *key = NULL, *cmd, *response;
- int key_len, cmd_len, response_len;
- long type = 0;
+ char *key = NULL, *cmd;
+ int key_len, cmd_len;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os",
&object, redis_ce,
@@ -1681,8 +1683,8 @@ PHPAPI int generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAMETERS, char *keyword
zend_hash_has_more_elements(keytable) == SUCCESS;
zend_hash_move_forward(keytable), i++) {
- char *key, *val;
- int key_len, val_len;
+ char *key;
+ int key_len;
unsigned long idx;
int type;
zval **z_value_pp;
@@ -1741,7 +1743,6 @@ PHPAPI int generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAMETERS, char *keyword
*/
PHP_METHOD(Redis, sInter) {
- int response_len;
RedisSock *redis_sock;
generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU,
@@ -1785,7 +1786,6 @@ PHP_METHOD(Redis, sInterStore) {
*/
PHP_METHOD(Redis, sUnion) {
- int response_len;
RedisSock *redis_sock;
generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU,
@@ -1825,7 +1825,6 @@ PHP_METHOD(Redis, sUnionStore) {
*/
PHP_METHOD(Redis, sDiff) {
- int response_len;
RedisSock *redis_sock;
generic_multiple_args_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU,
@@ -1871,23 +1870,13 @@ PHPAPI void generic_sort_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sort, int use_a
zval *object;
RedisSock *redis_sock;
char *key = NULL, *pattern = NULL, *get = NULL, *store = NULL, *cmd;
- int key_len, pattern_len = -1, get_len = -1, store_len = -1, cmd_len, response_len;
+ int key_len, pattern_len = -1, get_len = -1, store_len = -1, cmd_len;
long sort_start = -1, sort_count = -1;
int cmd_elements;
long use_pound = 0;
- char *by_cmd = "";
- char *by_arg = "";
-
- char *get_cmd = "";
- char *get_arg = "";
- char *get_pound = "";
-
- char *limit = "";
-
- char *alpha = "";
char *cmd_lines[30];
int cmd_sizes[30];
@@ -2677,7 +2666,7 @@ PHP_METHOD(Redis, zRange)
zval *object;
RedisSock *redis_sock;
char *key = NULL, *cmd;
- int key_len, cmd_len, response_len;
+ int key_len, cmd_len;
long start, end;
long withscores = 0;
@@ -2744,7 +2733,7 @@ PHP_METHOD(Redis, zDelete)
zval *object;
RedisSock *redis_sock;
char *key = NULL, *member = NULL, *cmd;
- int key_len, member_len, cmd_len, count;
+ int key_len, member_len, cmd_len, count;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss",
&object, redis_ce, &key, &key_len,
@@ -3077,7 +3066,7 @@ PHP_METHOD(Redis, zScore)
zval *object;
RedisSock *redis_sock;
char *key = NULL, *member = NULL, *cmd;
- int key_len, member_len, cmd_len, count;
+ int key_len, member_len, cmd_len;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss",
&object, redis_ce, &key, &key_len,
@@ -3168,7 +3157,7 @@ PHPAPI void generic_z_command(INTERNAL_FUNCTION_PARAMETERS, char *command, int c
HashPosition pointer;
char *cmd = "";
- int cmd_len, response_len, array_count, cmd_elements;
+ int cmd_len, cmd_elements;
if(zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Osa|as",
&object, redis_ce,
@@ -3366,8 +3355,8 @@ PHP_METHOD(Redis, hGet)
{
zval *object;
RedisSock *redis_sock;
- char *key = NULL, *cmd, *member, *response;
- int key_len, member_len, cmd_len, response_len;
+ char *key = NULL, *cmd, *member;
+ int key_len, member_len, cmd_len;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss",
&object, redis_ce,
@@ -3598,7 +3587,7 @@ PHPAPI void array_zip_values_and_scores(zval *z_tab, int use_atof TSRMLS_DC) {
array_init(z_ret);
HashTable *keytable = Z_ARRVAL_P(z_tab);
- int i = 0;
+
for(zend_hash_internal_pointer_reset(keytable);
zend_hash_has_more_elements(keytable) == SUCCESS;
zend_hash_move_forward(keytable)) {
@@ -3649,8 +3638,8 @@ PHP_METHOD(Redis, hIncrBy)
{
zval *object;
RedisSock *redis_sock;
- char *key = NULL, *cmd, *member, *response;
- int key_len, member_len, cmd_len, response_len;
+ char *key = NULL, *cmd, *member;
+ int key_len, member_len, cmd_len;
long val;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Ossl",
@@ -3830,8 +3819,7 @@ PHPAPI int redis_sock_read_multibulk_multi_reply(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock TSRMLS_DC)
{
- char inbuf[1024], *response;
- int response_len;
+ char inbuf[1024];
redis_check_eof(redis_sock TSRMLS_CC);
@@ -3887,8 +3875,7 @@ PHP_METHOD(Redis, exec)
RedisSock *redis_sock;
char *cmd;
- int response_len, cmd_len;
- char * response;
+ int cmd_len;
zval *object;
struct request_item *ri;
@@ -3981,9 +3968,6 @@ PHPAPI int redis_sock_read_multibulk_multi_reply_loop(INTERNAL_FUNCTION_PARAMETE
PHP_METHOD(Redis, pipeline)
{
RedisSock *redis_sock;
- char *cmd;
- int response_len, cmd_len;
- char * response;
zval *object;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O",
@@ -4006,5 +3990,320 @@ PHP_METHOD(Redis, pipeline)
RETURN_ZVAL(getThis(), 1, 0);
}
+
+/*
+ publish channel message
+ @return the number of subscribers
+*/
+PHP_METHOD(Redis, publish)
+{
+ zval *object;
+ RedisSock *redis_sock;
+ char *cmd, *key, *val;
+ int cmd_len, key_len, val_len;
+
+ if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oss",
+ &object, redis_ce,
+ &key, &key_len, &val, &val_len) == FAILURE) {
+ RETURN_NULL();
+ }
+
+ if (redis_sock_get(object, &redis_sock TSRMLS_CC) < 0) {
+ RETURN_FALSE;
+ }
+
+ cmd_len = redis_cmd_format(&cmd, "PUBLISH %s %d\r\n%s\r\n",
+ key, key_len,
+ val_len,
+ val, val_len);
+
+ if (redis_sock_write(redis_sock, cmd, cmd_len) < 0) {
+ efree(cmd);
+ RETURN_FALSE;
+ }
+ efree(cmd);
+ redis_long_response(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, NULL TSRMLS_CC);
+}
+
+/*
+ subscribe channel_1 channel_2 ... channel_n
+ subscribe(array(channel_1, channel_2, ..., channel_n), callback)
+*/
+PHP_METHOD(Redis, subscribe)
+{
+ zval *z_callback,*object, *array, **data;
+ HashTable *arr_hash;
+ HashPosition pointer;
+ RedisSock *redis_sock;
+ char *cmd = "", *old_cmd = NULL, *callback_ft_name;
+ int cmd_len, array_count, callback_ft_name_len;
+
+
+ if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oasz",
+ &object, redis_ce, &array, &callback_ft_name, &callback_ft_name_len, &z_callback) == FAILURE) {
+ RETURN_FALSE;
+ }
+
+ if (redis_sock_get(object, &redis_sock TSRMLS_CC) < 0) {
+ RETURN_FALSE;
+ }
+
+ arr_hash = Z_ARRVAL_P(array);
+ array_count = zend_hash_num_elements(arr_hash);
+
+ if (array_count == 0) {
+ RETURN_FALSE;
+ }
+ for (zend_hash_internal_pointer_reset_ex(arr_hash, &pointer);
+ zend_hash_get_current_data_ex(arr_hash, (void**) &data,
+ &pointer) == SUCCESS;
+ zend_hash_move_forward_ex(arr_hash, &pointer)) {
+
+ if (Z_TYPE_PP(data) == IS_STRING) {
+ char *old_cmd = NULL;
+ if(*cmd) {
+ old_cmd = cmd;
+ }
+ cmd_len = spprintf(&cmd, 0, "%s %s", cmd, Z_STRVAL_PP(data));
+ if(old_cmd) {
+ efree(old_cmd);
+ }
+ }
+ }
+
+ old_cmd = cmd;
+ cmd_len = spprintf(&cmd, 0, "SUBSCRIBE %s\r\n", cmd);
+ efree(old_cmd);
+ if (redis_sock_write(redis_sock, cmd, cmd_len) < 0) {
+ efree(cmd);
+ RETURN_FALSE;
+ }
+ efree(cmd);
+
+ /* read the status of the execution of the command `subscribe` */
+ zval *z_tab, **tmp;
+ char *type_response;
+
+ z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU,
+ redis_sock TSRMLS_CC);
+
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&tmp) == SUCCESS) {
+ type_response = Z_STRVAL_PP(tmp);
+ if(strcmp(type_response, "subscribe") != 0) {
+ efree(tmp);
+ efree(z_tab);
+ RETURN_FALSE;
+ }
+ } else {
+ efree(z_tab);
+ RETURN_FALSE;
+ }
+ efree(z_tab);
+
+ int callback_type;
+ zval *z_o, *z_fun,*z_ret, *z_args[3];
+ char *class_name, *method_name;
+ zend_class_entry **class_entry_pp, *ce;
+
+ MAKE_STD_ZVAL(z_ret);
+
+ /* verify the callback */
+ if(Z_TYPE_P(z_callback) == IS_ARRAY) {
+
+ if (zend_hash_index_find(Z_ARRVAL_P(z_callback), 0, (void**)&tmp) == FAILURE) {
+ RETURN_FALSE;
+ }
+
+ class_name = Z_STRVAL_PP(tmp);
+
+ if (zend_hash_index_find(Z_ARRVAL_P(z_callback), 1, (void**)&tmp) == FAILURE) {
+ RETURN_FALSE;
+ }
+
+ method_name = Z_STRVAL_PP(tmp);
+ if(zend_lookup_class(class_name, strlen(class_name), &class_entry_pp TSRMLS_CC) == FAILURE) {
+ /* The class didn't exist */
+ /* generate error */
+ RETURN_FALSE;
+ }
+
+
+ ce = *class_entry_pp;
+ // create an empty object.
+ MAKE_STD_ZVAL(z_o);
+ object_init_ex(z_o, ce);
+
+ ALLOC_INIT_ZVAL(z_fun);
+ ZVAL_STRING(z_fun, method_name, 1);
+ callback_type = R_SUB_CALLBACK_CLASS_TYPE;
+
+ } else if(Z_TYPE_P(z_callback) == IS_STRING) {
+ callback_ft_name = Z_STRVAL_P(z_callback);
+ callback_ft_name_len = strlen(callback_ft_name);
+ callback_type = R_SUB_CALLBACK_FT_TYPE;
+ }
+
+ /* Multibulk Response, format : {message type, originating channel, message payload} */
+ while(1) {
+ /* call the callback with this z_tab in argument */
+ z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU,
+ redis_sock TSRMLS_CC);
+ zval **type, **channel, **data;
+
+ if(Z_TYPE_P(z_tab) == IS_NULL) {
+ //ERROR
+ break;
+ }
+
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE) {
+ break;
+ }
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, (void**)&channel) == FAILURE) {
+ break;
+ }
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&data) == FAILURE) {
+ break;
+ }
+
+ z_args[0] = getThis();
+ z_args[1] = *channel;
+ z_args[2] = *data;
+
+ switch(callback_type) {
+ case R_SUB_CALLBACK_CLASS_TYPE:
+ call_user_function(&ce->function_table, &z_o, z_fun, z_ret, 3, z_args TSRMLS_CC);
+ //efree(z_o);
+ //efree(z_fun);
+ //zval_dtor(z_ret); efree(z_ret);
+ //free(z_args[0]); free(z_args[1]); free(z_args[2]);
+ //free(z_args);
+
+ break;
+ case R_SUB_CALLBACK_FT_TYPE:
+ MAKE_STD_ZVAL(z_ret);
+ MAKE_STD_ZVAL(z_fun);
+ ZVAL_STRINGL(z_fun, callback_ft_name, callback_ft_name_len, 0);
+ call_user_function(EG(function_table), NULL, z_fun, z_ret, 3, z_args TSRMLS_CC);
+ efree(z_fun);
+ //free(z_args[0]); free(z_args[1]); free(z_args[2]);
+ //free(z_args);
+ break;
+ }
+
+ if(Z_TYPE_P(z_ret) == IS_BOOL) {
+ // the callback function return TRUE if we want to continue listening on the channel
+ // or FALSE if we need to stop listeneing
+ if(!Z_BVAL_P(z_ret)) {
+ efree(z_o);
+ efree(z_fun);
+ zval_dtor(z_tab);
+ efree(z_tab);
+ zval_dtor(z_ret);
+ efree(z_ret);
+ break;
+ }
+ } else {
+ //error : the callback must return BOOL reponse
+ efree(z_o);
+ efree(z_fun);
+ zval_dtor(z_tab);
+ efree(z_tab);
+ zval_dtor(z_ret);
+ efree(z_ret);
+ RETURN_FALSE;
+ }
+ zval_dtor(z_tab);
+ efree(z_tab);
+ }
+ /*@TODO : collect all the returned data and return it */
+}
+
+/**
+ * unsubscribe channel_0 channel_1 ... channel_n
+ * unsubscribe(array(channel_0, channel_1, ..., channel_n))
+ * response format :
+ * array(
+ * channel_0 => TRUE|FALSE,
+ * channel_1 => TRUE_FALSE,
+ * ...
+ * channel_n => TRUE|FALSE
+ * );
+ **/
+
+PHP_METHOD(Redis, unsubscribe)
+{
+ zval *object, *array, **data;
+ HashTable *arr_hash;
+ HashPosition pointer;
+ RedisSock *redis_sock;
+ char *cmd = "", *old_cmd = NULL;
+ int cmd_len, array_count;
+
+ if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oa",
+ &object, redis_ce, &array) == FAILURE) {
+ RETURN_FALSE;
+ }
+ if (redis_sock_get(object, &redis_sock TSRMLS_CC) < 0) {
+ RETURN_FALSE;
+ }
+
+ arr_hash = Z_ARRVAL_P(array);
+ array_count = zend_hash_num_elements(arr_hash);
+
+ if (array_count == 0) {
+ RETURN_FALSE;
+ }
+
+ for (zend_hash_internal_pointer_reset_ex(arr_hash, &pointer);
+ zend_hash_get_current_data_ex(arr_hash, (void**) &data,
+ &pointer) == SUCCESS;
+ zend_hash_move_forward_ex(arr_hash, &pointer)) {
+
+ if (Z_TYPE_PP(data) == IS_STRING) {
+ char *old_cmd = NULL;
+ if(*cmd) {
+ old_cmd = cmd;
+ }
+ cmd_len = spprintf(&cmd, 0, "%s %s", cmd, Z_STRVAL_PP(data));
+ if(old_cmd) {
+ efree(old_cmd);
+ }
+ }
+ }
+
+ old_cmd = cmd;
+ cmd_len = spprintf(&cmd, 0, "UNSUBSCRIBE %s\r\n", cmd);
+ efree(old_cmd);
+
+ if (redis_sock_write(redis_sock, cmd, cmd_len) < 0) {
+ efree(cmd);
+ RETURN_FALSE;
+ }
+ efree(cmd);
+
+ int i = 1;
+ zval *z_tab, **z_channel;
+
+ array_init(return_value);
+
+ while( i <= array_count) {
+ z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU,
+ redis_sock TSRMLS_CC);
+
+ if(Z_TYPE_P(z_tab) == IS_ARRAY) {
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, (void**)&z_channel) == FAILURE) {
+ RETURN_FALSE;
+ }
+ add_assoc_bool(return_value, Z_STRVAL_PP(z_channel), 1);
+ } else {
+ //error
+ efree(z_tab);
+ RETURN_FALSE;
+ }
+ efree(z_tab);
+ i ++;
+ }
+}
+
/* vim: set tabstop=4 expandtab: */