diff options
author | michael-grunder <michael.grunder@gmail.com> | 2012-09-09 19:13:44 +0400 |
---|---|---|
committer | michael-grunder <michael.grunder@gmail.com> | 2012-09-09 19:13:44 +0400 |
commit | d1b12168527dc0bb06d7fab51924af28c2028a1b (patch) | |
tree | 1d30f5228eebc072f78d82235d1966479e3a982b | |
parent | 9eb217c061dbc60609f426b6fc365e0c3ef0580c (diff) |
PSUBSCRIBE
Implemented PSUBSCRIBE/PUNSUBSCRIBE and changed the present
subscribe and unsubscribe functions into generic versions
that can handle both cases.
-rw-r--r-- | README.markdown | 14 | ||||
-rw-r--r-- | php_redis.h | 5 | ||||
-rw-r--r-- | redis.c | 96 |
3 files changed, 89 insertions, 26 deletions
diff --git a/README.markdown b/README.markdown index 90b55a59..5b8bdd2e 100644 --- a/README.markdown +++ b/README.markdown @@ -377,6 +377,20 @@ function f($redis, $chan, $msg) { $redis->subscribe(array('chan-1', 'chan-2', 'chan-3'), 'f'); // subscribe to 3 chans </pre> +## psubscribe +##### Description +Subscribe to channels by pattern +##### Parameters +*patterns*: An array of patterns to match +*callback*: Either a string or an array with an object and method. The callback will get four arguments ($redis, $pattern, $channel, $message) +##### Example +<pre> +function psubscribe($redis, $pattern, $chan, $msg) { + echo "Pattern: $pattern\n"; + echo "Channel: $chan\n"; + echo "Payload: $msg\n"; +} +</pre> ## publish ##### Description diff --git a/php_redis.h b/php_redis.h index 71e29ffd..5ffe5c31 100644 --- a/php_redis.h +++ b/php_redis.h @@ -171,7 +171,9 @@ PHP_METHOD(Redis, pipeline); PHP_METHOD(Redis, publish); PHP_METHOD(Redis, subscribe); +PHP_METHOD(Redis, psubscribe); PHP_METHOD(Redis, unsubscribe); +PHP_METHOD(Redis, punsubscribe); PHP_METHOD(Redis, getOption); PHP_METHOD(Redis, setOption); @@ -204,6 +206,9 @@ PHPAPI void generic_empty_cmd_impl(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int PHPAPI void generic_empty_cmd(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len, ...); PHPAPI void generic_empty_long_cmd(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len, ...); +PHPAPI void generic_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sub_cmd); +PHPAPI void generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *unsub_cmd); + PHPAPI void array_zip_values_and_scores(RedisSock *redis_sock, zval *z_tab, int use_atof TSRMLS_DC); PHPAPI int redis_response_enqueued(RedisSock *redis_sock TSRMLS_DC); @@ -206,7 +206,9 @@ static zend_function_entry redis_functions[] = { PHP_ME(Redis, publish, NULL, ZEND_ACC_PUBLIC) PHP_ME(Redis, subscribe, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Redis, psubscribe, NULL, ZEND_ACC_PUBLIC) PHP_ME(Redis, unsubscribe, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Redis, punsubscribe, NULL, ZEND_ACC_PUBLIC) PHP_ME(Redis, time, NULL, ZEND_ACC_PUBLIC) @@ -5274,11 +5276,7 @@ PHP_METHOD(Redis, publish) REDIS_PROCESS_RESPONSE(redis_long_response); } -/* - subscribe channel_1 channel_2 ... channel_n - subscribe(array(channel_1, channel_2, ..., channel_n), callback) -*/ -PHP_METHOD(Redis, subscribe) +PHPAPI void generic_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sub_cmd) { zval *z_callback,*object, *array, **data; HashTable *arr_hash; @@ -5286,12 +5284,11 @@ PHP_METHOD(Redis, subscribe) RedisSock *redis_sock; char *cmd = "", *old_cmd = NULL, *callback_ft_name; int cmd_len, array_count, callback_ft_name_len; - zval *z_tab, **tmp; char *type_response; int callback_type = 0; - zval *z_o, *z_fun = NULL,*z_ret, *z_args[3]; + zval *z_o, *z_fun = NULL,*z_ret, *z_args[4]; char *method_name; if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oaz|z", @@ -5327,7 +5324,7 @@ PHP_METHOD(Redis, subscribe) } old_cmd = cmd; - cmd_len = spprintf(&cmd, 0, "SUBSCRIBE %s\r\n", cmd); + cmd_len = spprintf(&cmd, 0, "%s %s\r\n", sub_cmd, cmd); efree(old_cmd); if (redis_sock_write(redis_sock, cmd, cmd_len TSRMLS_CC) < 0) { efree(cmd); @@ -5344,7 +5341,7 @@ PHP_METHOD(Redis, subscribe) if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&tmp) == SUCCESS) { type_response = Z_STRVAL_PP(tmp); - if(strcmp(type_response, "subscribe") != 0) { + if(strcmp(type_response, sub_cmd) != 0) { efree(tmp); efree(z_tab); RETURN_FALSE; @@ -5385,42 +5382,66 @@ PHP_METHOD(Redis, subscribe) /* Multibulk Response, format : {message type, originating channel, message payload} */ while(1) { /* call the callback with this z_tab in argument */ - zval **type, **channel, **data; + zval **type, **channel, **pattern, **data; z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock); + int is_pmsg, tab_idx = 1; if(z_tab == NULL || Z_TYPE_P(z_tab) != IS_ARRAY) { //ERROR break; } - if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE) { + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE || Z_TYPE_PP(type) != IS_STRING) { break; } - if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, (void**)&channel) == FAILURE) { + + // Make sure we have a message or pmessage + if(!strncmp(Z_STRVAL_PP(type), "message", 7) || !strncmp(Z_STRVAL_PP(type), "pmessage", 8)) { + // Is this a pmessage + is_pmsg = *Z_STRVAL_PP(type) == 'p'; + } else { + continue; // It's not a message or pmessage + } + + // If this is a pmessage, we'll want to extract the pattern first + if(is_pmsg) { + // Extract pattern + if(zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&pattern) == FAILURE) { + break; + } + } + + // Extract channel and data + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&channel) == FAILURE) { break; - } - if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&data) == FAILURE) { + } + if (zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&data) == FAILURE) { break; } - if(Z_TYPE_PP(type) == IS_STRING && strncmp(Z_STRVAL_PP(type), "message", 7) != 0) { - continue; /* only forwarding published messages */ - } - + // Always pass the Redis object through z_args[0] = getThis(); - z_args[1] = *channel; - z_args[2] = *data; + + // Set up our callback args depending on the message type + if(is_pmsg) { + z_args[1] = *pattern; + z_args[2] = *channel; + z_args[3] = *data; + } else { + z_args[1] = *channel; + z_args[2] = *data; + } switch(callback_type) { case R_SUB_CALLBACK_CLASS_TYPE: MAKE_STD_ZVAL(z_ret); - call_user_function(&redis_ce->function_table, &z_o, z_fun, z_ret, 3, z_args TSRMLS_CC); + call_user_function(&redis_ce->function_table, &z_o, z_fun, z_ret, tab_idx, z_args TSRMLS_CC); efree(z_ret); break; case R_SUB_CALLBACK_FT_TYPE: MAKE_STD_ZVAL(z_ret); - call_user_function(EG(function_table), NULL, z_fun, z_ret, 3, z_args TSRMLS_CC); + call_user_function(EG(function_table), NULL, z_fun, z_ret, tab_idx, z_args TSRMLS_CC); efree(z_ret); break; } @@ -5433,9 +5454,22 @@ PHP_METHOD(Redis, subscribe) efree(z_fun); } +/* {{{ proto void Redis::psubscribe(Array(channel1, channel2, ... channelN)) + */ +PHP_METHOD(Redis, psubscribe) +{ + generic_subscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "psubscribe"); +} + +/* {{{ proto void Redis::psubscribe(Array(channel1, channel2, ... channelN)) + */ +PHP_METHOD(Redis, subscribe) { + generic_subscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "subscribe"); +} + /** - * unsubscribe channel_0 channel_1 ... channel_n - * unsubscribe(array(channel_0, channel_1, ..., channel_n)) + * [p]unsubscribe channel_0 channel_1 ... channel_n + * [p]unsubscribe(array(channel_0, channel_1, ..., channel_n)) * response format : * array( * channel_0 => TRUE|FALSE, @@ -5445,7 +5479,7 @@ PHP_METHOD(Redis, subscribe) * ); **/ -PHP_METHOD(Redis, unsubscribe) +PHPAPI void generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *unsub_cmd) { zval *object, *array, **data; HashTable *arr_hash; @@ -5490,7 +5524,7 @@ PHP_METHOD(Redis, unsubscribe) } old_cmd = cmd; - cmd_len = spprintf(&cmd, 0, "UNSUBSCRIBE %s\r\n", cmd); + cmd_len = spprintf(&cmd, 0, "%s %s\r\n", unsub_cmd, cmd); efree(old_cmd); if (redis_sock_write(redis_sock, cmd, cmd_len TSRMLS_CC) < 0) { @@ -5520,6 +5554,16 @@ PHP_METHOD(Redis, unsubscribe) } } +PHP_METHOD(Redis, unsubscribe) +{ + generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "UNSUBSCRIBE"); +} + +PHP_METHOD(Redis, punsubscribe) +{ + generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "PUNSUBSCRIBE"); +} + /* {{{ proto string Redis::bgrewriteaof() */ PHP_METHOD(Redis, bgrewriteaof) |