Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.markdown14
-rw-r--r--php_redis.h5
-rw-r--r--redis.c96
3 files changed, 89 insertions, 26 deletions
diff --git a/README.markdown b/README.markdown
index 90b55a59..5b8bdd2e 100644
--- a/README.markdown
+++ b/README.markdown
@@ -377,6 +377,20 @@ function f($redis, $chan, $msg) {
$redis->subscribe(array('chan-1', 'chan-2', 'chan-3'), 'f'); // subscribe to 3 chans
</pre>
+## psubscribe
+##### Description
+Subscribe to channels by pattern
+##### Parameters
+*patterns*: An array of patterns to match
+*callback*: Either a string or an array with an object and method. The callback will get four arguments ($redis, $pattern, $channel, $message)
+##### Example
+<pre>
+function psubscribe($redis, $pattern, $chan, $msg) {
+ echo "Pattern: $pattern\n";
+ echo "Channel: $chan\n";
+ echo "Payload: $msg\n";
+}
+</pre>
## publish
##### Description
diff --git a/php_redis.h b/php_redis.h
index 71e29ffd..5ffe5c31 100644
--- a/php_redis.h
+++ b/php_redis.h
@@ -171,7 +171,9 @@ PHP_METHOD(Redis, pipeline);
PHP_METHOD(Redis, publish);
PHP_METHOD(Redis, subscribe);
+PHP_METHOD(Redis, psubscribe);
PHP_METHOD(Redis, unsubscribe);
+PHP_METHOD(Redis, punsubscribe);
PHP_METHOD(Redis, getOption);
PHP_METHOD(Redis, setOption);
@@ -204,6 +206,9 @@ PHPAPI void generic_empty_cmd_impl(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int
PHPAPI void generic_empty_cmd(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len, ...);
PHPAPI void generic_empty_long_cmd(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len, ...);
+PHPAPI void generic_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sub_cmd);
+PHPAPI void generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *unsub_cmd);
+
PHPAPI void array_zip_values_and_scores(RedisSock *redis_sock, zval *z_tab, int use_atof TSRMLS_DC);
PHPAPI int redis_response_enqueued(RedisSock *redis_sock TSRMLS_DC);
diff --git a/redis.c b/redis.c
index c3667fc3..99f8d03f 100644
--- a/redis.c
+++ b/redis.c
@@ -206,7 +206,9 @@ static zend_function_entry redis_functions[] = {
PHP_ME(Redis, publish, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Redis, subscribe, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, psubscribe, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Redis, unsubscribe, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, punsubscribe, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Redis, time, NULL, ZEND_ACC_PUBLIC)
@@ -5274,11 +5276,7 @@ PHP_METHOD(Redis, publish)
REDIS_PROCESS_RESPONSE(redis_long_response);
}
-/*
- subscribe channel_1 channel_2 ... channel_n
- subscribe(array(channel_1, channel_2, ..., channel_n), callback)
-*/
-PHP_METHOD(Redis, subscribe)
+PHPAPI void generic_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *sub_cmd)
{
zval *z_callback,*object, *array, **data;
HashTable *arr_hash;
@@ -5286,12 +5284,11 @@ PHP_METHOD(Redis, subscribe)
RedisSock *redis_sock;
char *cmd = "", *old_cmd = NULL, *callback_ft_name;
int cmd_len, array_count, callback_ft_name_len;
-
zval *z_tab, **tmp;
char *type_response;
int callback_type = 0;
- zval *z_o, *z_fun = NULL,*z_ret, *z_args[3];
+ zval *z_o, *z_fun = NULL,*z_ret, *z_args[4];
char *method_name;
if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Oaz|z",
@@ -5327,7 +5324,7 @@ PHP_METHOD(Redis, subscribe)
}
old_cmd = cmd;
- cmd_len = spprintf(&cmd, 0, "SUBSCRIBE %s\r\n", cmd);
+ cmd_len = spprintf(&cmd, 0, "%s %s\r\n", sub_cmd, cmd);
efree(old_cmd);
if (redis_sock_write(redis_sock, cmd, cmd_len TSRMLS_CC) < 0) {
efree(cmd);
@@ -5344,7 +5341,7 @@ PHP_METHOD(Redis, subscribe)
if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&tmp) == SUCCESS) {
type_response = Z_STRVAL_PP(tmp);
- if(strcmp(type_response, "subscribe") != 0) {
+ if(strcmp(type_response, sub_cmd) != 0) {
efree(tmp);
efree(z_tab);
RETURN_FALSE;
@@ -5385,42 +5382,66 @@ PHP_METHOD(Redis, subscribe)
/* Multibulk Response, format : {message type, originating channel, message payload} */
while(1) {
/* call the callback with this z_tab in argument */
- zval **type, **channel, **data;
+ zval **type, **channel, **pattern, **data;
z_tab = redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock);
+ int is_pmsg, tab_idx = 1;
if(z_tab == NULL || Z_TYPE_P(z_tab) != IS_ARRAY) {
//ERROR
break;
}
- if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE) {
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&type) == FAILURE || Z_TYPE_PP(type) != IS_STRING) {
break;
}
- if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, (void**)&channel) == FAILURE) {
+
+ // Make sure we have a message or pmessage
+ if(!strncmp(Z_STRVAL_PP(type), "message", 7) || !strncmp(Z_STRVAL_PP(type), "pmessage", 8)) {
+ // Is this a pmessage
+ is_pmsg = *Z_STRVAL_PP(type) == 'p';
+ } else {
+ continue; // It's not a message or pmessage
+ }
+
+ // If this is a pmessage, we'll want to extract the pattern first
+ if(is_pmsg) {
+ // Extract pattern
+ if(zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&pattern) == FAILURE) {
+ break;
+ }
+ }
+
+ // Extract channel and data
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&channel) == FAILURE) {
break;
- }
- if (zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&data) == FAILURE) {
+ }
+ if (zend_hash_index_find(Z_ARRVAL_P(z_tab), tab_idx++, (void**)&data) == FAILURE) {
break;
}
- if(Z_TYPE_PP(type) == IS_STRING && strncmp(Z_STRVAL_PP(type), "message", 7) != 0) {
- continue; /* only forwarding published messages */
- }
-
+ // Always pass the Redis object through
z_args[0] = getThis();
- z_args[1] = *channel;
- z_args[2] = *data;
+
+ // Set up our callback args depending on the message type
+ if(is_pmsg) {
+ z_args[1] = *pattern;
+ z_args[2] = *channel;
+ z_args[3] = *data;
+ } else {
+ z_args[1] = *channel;
+ z_args[2] = *data;
+ }
switch(callback_type) {
case R_SUB_CALLBACK_CLASS_TYPE:
MAKE_STD_ZVAL(z_ret);
- call_user_function(&redis_ce->function_table, &z_o, z_fun, z_ret, 3, z_args TSRMLS_CC);
+ call_user_function(&redis_ce->function_table, &z_o, z_fun, z_ret, tab_idx, z_args TSRMLS_CC);
efree(z_ret);
break;
case R_SUB_CALLBACK_FT_TYPE:
MAKE_STD_ZVAL(z_ret);
- call_user_function(EG(function_table), NULL, z_fun, z_ret, 3, z_args TSRMLS_CC);
+ call_user_function(EG(function_table), NULL, z_fun, z_ret, tab_idx, z_args TSRMLS_CC);
efree(z_ret);
break;
}
@@ -5433,9 +5454,22 @@ PHP_METHOD(Redis, subscribe)
efree(z_fun);
}
+/* {{{ proto void Redis::psubscribe(Array(channel1, channel2, ... channelN))
+ */
+PHP_METHOD(Redis, psubscribe)
+{
+ generic_subscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "psubscribe");
+}
+
+/* {{{ proto void Redis::psubscribe(Array(channel1, channel2, ... channelN))
+ */
+PHP_METHOD(Redis, subscribe) {
+ generic_subscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "subscribe");
+}
+
/**
- * unsubscribe channel_0 channel_1 ... channel_n
- * unsubscribe(array(channel_0, channel_1, ..., channel_n))
+ * [p]unsubscribe channel_0 channel_1 ... channel_n
+ * [p]unsubscribe(array(channel_0, channel_1, ..., channel_n))
* response format :
* array(
* channel_0 => TRUE|FALSE,
@@ -5445,7 +5479,7 @@ PHP_METHOD(Redis, subscribe)
* );
**/
-PHP_METHOD(Redis, unsubscribe)
+PHPAPI void generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, char *unsub_cmd)
{
zval *object, *array, **data;
HashTable *arr_hash;
@@ -5490,7 +5524,7 @@ PHP_METHOD(Redis, unsubscribe)
}
old_cmd = cmd;
- cmd_len = spprintf(&cmd, 0, "UNSUBSCRIBE %s\r\n", cmd);
+ cmd_len = spprintf(&cmd, 0, "%s %s\r\n", unsub_cmd, cmd);
efree(old_cmd);
if (redis_sock_write(redis_sock, cmd, cmd_len TSRMLS_CC) < 0) {
@@ -5520,6 +5554,16 @@ PHP_METHOD(Redis, unsubscribe)
}
}
+PHP_METHOD(Redis, unsubscribe)
+{
+ generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "UNSUBSCRIBE");
+}
+
+PHP_METHOD(Redis, punsubscribe)
+{
+ generic_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, "PUNSUBSCRIBE");
+}
+
/* {{{ proto string Redis::bgrewriteaof()
*/
PHP_METHOD(Redis, bgrewriteaof)