Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cluster_library.c65
-rw-r--r--cluster_library.h7
-rw-r--r--redis_cluster.c59
-rw-r--r--redis_cluster.h2
-rw-r--r--redis_commands.c46
-rw-r--r--redis_commands.h3
6 files changed, 175 insertions, 7 deletions
diff --git a/cluster_library.c b/cluster_library.c
index e4d7ae9a..86cec935 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -1371,7 +1371,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
// Consume each MULTI BULK response (one per channel/pattern)
while(sctx->argc--) {
- z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
+ z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
pull, mbulk_resp_loop_raw);
if(!z_tab) {
@@ -1397,6 +1397,9 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
sctx->cb.params = z_args;
sctx->cb.no_separation = 0;
+ /* We're in a subscribe loop */
+ c->subscribed_slot = c->reply_slot;
+
/* Multibulk response, {[pattern], type, channel, payload} */
while(1) {
/* Arguments */
@@ -1404,7 +1407,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
int tab_idx=1, is_pmsg;
// Get the next subscribe response
- z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
+ z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
1, mbulk_resp_loop);
if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&z_type)
@@ -1465,7 +1468,10 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
zval_dtor(z_tab);
efree(z_tab);
}
-
+
+ // We're no longer subscribing, due to an error
+ c->subscribed_slot = -1;
+
// Cleanup
efree(sctx);
if(z_tab) {
@@ -1477,6 +1483,55 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
RETURN_FALSE;
}
+/* UNSUBSCRIBE/PUNSUBSCRIBE */
+PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, void *ctx)
+{
+ subscribeContext *sctx = (subscribeContext*)ctx;
+ zval *z_tab, **z_chan, **z_flag;
+ char *flag;
+ int pull = 0;
+
+ array_init(return_value);
+
+ // Consume each response
+ while(sctx->argc--) {
+ z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,
+ c, pull, mbulk_resp_loop_raw);
+
+ // Fail if we didn't get an array or can't find index 1
+ if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 1,
+ (void**)&z_chan)==FAILURE)
+ {
+ zval_dtor(return_value);
+ RETURN_FALSE;
+ }
+
+ // Find the flag for this channel/pattern
+ if(zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&z_flag)
+ ==FAILURE)
+ {
+ zval_dtor(return_value);
+ RETURN_FALSE;
+ }
+
+ // Sanity check
+ if(Z_STRLEN_PP(z_flag) != 2) {
+ zval_dtor(return_value);
+ RETURN_FALSE;
+ }
+
+ // Redis will give us either :1 or :0 here
+ char *flag = Z_STRVAL_PP(z_flag);
+
+ // Add result
+ add_assoc_bool(return_value, Z_STRVAL_PP(z_chan), flag[1]=='1');
+
+ efree(z_tab);
+ pull = 1;
+ }
+}
+
/* Generic MULTI BULK response processor */
PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, mbulk_cb cb, void *ctx)
@@ -1512,8 +1567,8 @@ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
}
/* MULTI BULK response loop where we might pull the next one */
-PHPAPI zval *cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
- redisCluster *c, int pull, mbulk_cb cb)
+PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, int pull, mbulk_cb cb)
{
zval *z_result;
diff --git a/cluster_library.h b/cluster_library.h
index e311521a..05491799 100644
--- a/cluster_library.h
+++ b/cluster_library.h
@@ -224,6 +224,9 @@ typedef struct redisCluster {
/* The slot where we should read replies */
short reply_slot;
+ /* The slot where we're subscribed */
+ short subscribed_slot;
+
/* One RedisSock* struct for serialization and prefix information */
RedisSock *flags;
@@ -337,6 +340,8 @@ PHPAPI void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
+PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ void *ctx);
/* MULTI BULK response functions */
PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
@@ -353,7 +358,7 @@ PHPAPI void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
-PHPAPI zval *cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, int pull, mbulk_cb cb);
/* MULTI BULK processing callbacks */
diff --git a/redis_cluster.c b/redis_cluster.c
index bd0c3b6d..6f055804 100644
--- a/redis_cluster.c
+++ b/redis_cluster.c
@@ -141,7 +141,8 @@ zend_function_entry redis_cluster_functions[] = {
PHP_ME(RedisCluster, object, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, subscribe, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, psubscribe, NULL, ZEND_ACC_PUBLIC)
-
+ PHP_ME(RedisCluster, unsubscribe, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, punsubscribe, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, getoption, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, setoption, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, _prefix, NULL, ZEND_ACC_PUBLIC)
@@ -204,6 +205,9 @@ create_cluster_context(zend_class_entry *class_type TSRMLS_DC) {
cluster = emalloc(sizeof(redisCluster));
memset(cluster, 0, sizeof(redisCluster));
+ // We're not currently subscribed anywhere
+ cluster->subscribed_slot = -1;
+
// Allocate our RedisSock we'll use to store prefix/serialization flags
cluster->flags = ecalloc(1, sizeof(RedisSock));
@@ -1068,6 +1072,59 @@ PHP_METHOD(RedisCluster, psubscribe) {
}
/* }}} */
+static void generic_unsub_cmd(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ char *kw)
+{
+ char *cmd;
+ int cmd_len;
+ void *ctx;
+ short slot;
+
+ // There is not reason to unsubscribe outside of a subscribe loop
+ if(c->subscribed_slot == -1) {
+ php_error_docref(0 TSRMLS_CC, E_WARNING,
+ "You can't unsubscribe outside of a subscribe loop");
+ RETURN_FALSE;
+ }
+
+ // Call directly because we're going to set the slot manually
+ if(redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, c->flags, kw,
+ &cmd, &cmd_len, &slot, &ctx)
+ ==FAILURE)
+ {
+ RETURN_FALSE;
+ }
+
+ // This has to operate on our subscribe slot
+ if(cluster_send_slot(c, c->subscribed_slot, cmd, cmd_len, TYPE_MULTIBULK)
+ ==FAILURE)
+ {
+ zend_throw_exception(redis_cluster_exception_ce,
+ "Failed to UNSUBSCRIBE within our subscribe loop!", 0 TSRMLS_CC);
+ RETURN_FALSE;
+ }
+
+ // Now process response from the slot we're subscribed on
+ cluster_unsub_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, ctx);
+
+ // Cleanup our command
+ efree(cmd);
+}
+
+/* {{{ proto array RedisCluster::unsubscribe(array chans) */
+PHP_METHOD(RedisCluster, unsubscribe) {
+ generic_unsub_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, GET_CONTEXT(),
+ "UNSUBSCRIBE");
+}
+/* }}} */
+
+/* {{{ proto array RedisCluster::punsubscribe(array pats) */
+PHP_METHOD(RedisCluster, punsubscribe) {
+ generic_unsub_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, GET_CONTEXT(),
+ "PUNSUBSCRIBE");
+}
+/* }}} */
+
/* Commands that do not interact with Redis, but just report stuff about
* various options, etc */
diff --git a/redis_cluster.h b/redis_cluster.h
index 515b243a..ead57982 100644
--- a/redis_cluster.h
+++ b/redis_cluster.h
@@ -224,6 +224,8 @@ PHP_METHOD(RedisCluster, sort);
PHP_METHOD(RedisCluster, object);
PHP_METHOD(RedisCluster, subscribe);
PHP_METHOD(RedisCluster, psubscribe);
+PHP_METHOD(RedisCluster, unsubscribe);
+PHP_METHOD(RedisCluster, punsubscribe);
/* Transactions */
PHP_METHOD(RedisCluster, multi);
diff --git a/redis_commands.c b/redis_commands.c
index b8966dd7..6e5e1311 100644
--- a/redis_commands.c
+++ b/redis_commands.c
@@ -718,6 +718,52 @@ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return SUCCESS;
}
+/* UNSUBSCRIBE/PUNSUBSCRIBE */
+int redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ zval *z_arr, **z_chan;
+ HashTable *ht_arr;
+ HashPosition ptr;
+ smart_str cmdstr = {0};
+ subscribeContext *sctx = emalloc(sizeof(subscribeContext));
+
+ if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &z_arr)==FAILURE) {
+ efree(sctx);
+ return FAILURE;
+ }
+
+ ht_arr = Z_ARRVAL_P(z_arr);
+
+ sctx->argc = zend_hash_num_elements(ht_arr);
+ if(sctx->argc == 0) {
+ efree(sctx);
+ return FAILURE;
+ }
+
+ redis_cmd_init_sstr(&cmdstr, sctx->argc, kw, strlen(kw));
+
+ for(zend_hash_internal_pointer_reset_ex(ht_arr, &ptr);
+ zend_hash_get_current_data_ex(ht_arr, (void**)&z_chan, &ptr)==SUCCESS;
+ zend_hash_move_forward_ex(ht_arr, &ptr))
+ {
+ char *key = Z_STRVAL_PP(z_chan);
+ int key_len = Z_STRLEN_PP(z_chan), key_free;
+
+ key_free = redis_key_prefix(redis_sock, &key, &key_len);
+ redis_cmd_append_sstr(&cmdstr, key, key_len);
+ if(key_free) efree(key);
+ }
+
+ // Push out vals
+ *cmd_len = cmdstr.len;
+ *cmd = cmdstr.c;
+ *ctx = (void*)sctx;
+
+ return SUCCESS;
+}
+
/* Commands that take a key followed by a variable list of serializable
* values (RPUSH, LPUSH, SADD, SREM, etc...) */
int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
diff --git a/redis_commands.h b/redis_commands.h
index ef1a80b8..bb04d76b 100644
--- a/redis_commands.h
+++ b/redis_commands.h
@@ -82,6 +82,9 @@ int redis_zinter_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+int redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+
/* Commands which need a unique construction mechanism. This is either because
* they don't share a signature with any other command, or because there is
* specific processing we do (e.g. verifying subarguments) that make them