Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormichael-grunder <michael.grunder@gmail.com>2014-06-14 20:30:18 +0400
committermichael-grunder <michael.grunder@gmail.com>2015-05-06 01:00:51 +0300
commit5daa5a2f045c43de88dcd5790be96b37d89b42eb (patch)
tree2475bd6af4bdbc0189049047e71712ae1744b677
parent32d1f407d863958d137dfe94647f2c59e9b189c7 (diff)
UNSUBSCRIBE/PUNSUBSCRIBE
Implemented the two unsubscribe commands in Redis Cluster. Presently, like with the standard Redis class once you subscribe you're there for good, but it would be nice to be able to use the callback return value to break out of the subscribe loop.
-rw-r--r--cluster_library.c65
-rw-r--r--cluster_library.h7
-rw-r--r--redis_cluster.c59
-rw-r--r--redis_cluster.h2
-rw-r--r--redis_commands.c46
-rw-r--r--redis_commands.h3
6 files changed, 175 insertions, 7 deletions
diff --git a/cluster_library.c b/cluster_library.c
index e4d7ae9a..86cec935 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -1371,7 +1371,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
// Consume each MULTI BULK response (one per channel/pattern)
while(sctx->argc--) {
- z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
+ z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
pull, mbulk_resp_loop_raw);
if(!z_tab) {
@@ -1397,6 +1397,9 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
sctx->cb.params = z_args;
sctx->cb.no_separation = 0;
+ /* We're in a subscribe loop */
+ c->subscribed_slot = c->reply_slot;
+
/* Multibulk response, {[pattern], type, channel, payload} */
while(1) {
/* Arguments */
@@ -1404,7 +1407,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
int tab_idx=1, is_pmsg;
// Get the next subscribe response
- z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
+ z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
1, mbulk_resp_loop);
if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&z_type)
@@ -1465,7 +1468,10 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
zval_dtor(z_tab);
efree(z_tab);
}
-
+
+ // We're no longer subscribing, due to an error
+ c->subscribed_slot = -1;
+
// Cleanup
efree(sctx);
if(z_tab) {
@@ -1477,6 +1483,55 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
RETURN_FALSE;
}
+/* UNSUBSCRIBE/PUNSUBSCRIBE */
+PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, void *ctx)
+{
+ subscribeContext *sctx = (subscribeContext*)ctx;
+ zval *z_tab, **z_chan, **z_flag;
+ char *flag;
+ int pull = 0;
+
+ array_init(return_value);
+
+ // Consume each response
+ while(sctx->argc--) {
+ z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU,
+ c, pull, mbulk_resp_loop_raw);
+
+ // Fail if we didn't get an array or can't find index 1
+ if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 1,
+ (void**)&z_chan)==FAILURE)
+ {
+ zval_dtor(return_value);
+ RETURN_FALSE;
+ }
+
+ // Find the flag for this channel/pattern
+ if(zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&z_flag)
+ ==FAILURE)
+ {
+ zval_dtor(return_value);
+ RETURN_FALSE;
+ }
+
+ // Sanity check
+ if(Z_STRLEN_PP(z_flag) != 2) {
+ zval_dtor(return_value);
+ RETURN_FALSE;
+ }
+
+ // Redis will give us either :1 or :0 here
+ char *flag = Z_STRVAL_PP(z_flag);
+
+ // Add result
+ add_assoc_bool(return_value, Z_STRVAL_PP(z_chan), flag[1]=='1');
+
+ efree(z_tab);
+ pull = 1;
+ }
+}
+
/* Generic MULTI BULK response processor */
PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, mbulk_cb cb, void *ctx)
@@ -1512,8 +1567,8 @@ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
}
/* MULTI BULK response loop where we might pull the next one */
-PHPAPI zval *cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
- redisCluster *c, int pull, mbulk_cb cb)
+PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, int pull, mbulk_cb cb)
{
zval *z_result;
diff --git a/cluster_library.h b/cluster_library.h
index e311521a..05491799 100644
--- a/cluster_library.h
+++ b/cluster_library.h
@@ -224,6 +224,9 @@ typedef struct redisCluster {
/* The slot where we should read replies */
short reply_slot;
+ /* The slot where we're subscribed */
+ short subscribed_slot;
+
/* One RedisSock* struct for serialization and prefix information */
RedisSock *flags;
@@ -337,6 +340,8 @@ PHPAPI void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
+PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ void *ctx);
/* MULTI BULK response functions */
PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
@@ -353,7 +358,7 @@ PHPAPI void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
-PHPAPI zval *cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, int pull, mbulk_cb cb);
/* MULTI BULK processing callbacks */
diff --git a/redis_cluster.c b/redis_cluster.c
index bd0c3b6d..6f055804 100644
--- a/redis_cluster.c
+++ b/redis_cluster.c
@@ -141,7 +141,8 @@ zend_function_entry redis_cluster_functions[] = {
PHP_ME(RedisCluster, object, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, subscribe, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, psubscribe, NULL, ZEND_ACC_PUBLIC)
-
+ PHP_ME(RedisCluster, unsubscribe, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, punsubscribe, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, getoption, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, setoption, NULL, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, _prefix, NULL, ZEND_ACC_PUBLIC)
@@ -204,6 +205,9 @@ create_cluster_context(zend_class_entry *class_type TSRMLS_DC) {
cluster = emalloc(sizeof(redisCluster));
memset(cluster, 0, sizeof(redisCluster));
+ // We're not currently subscribed anywhere
+ cluster->subscribed_slot = -1;
+
// Allocate our RedisSock we'll use to store prefix/serialization flags
cluster->flags = ecalloc(1, sizeof(RedisSock));
@@ -1068,6 +1072,59 @@ PHP_METHOD(RedisCluster, psubscribe) {
}
/* }}} */
+static void generic_unsub_cmd(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ char *kw)
+{
+ char *cmd;
+ int cmd_len;
+ void *ctx;
+ short slot;
+
+ // There is not reason to unsubscribe outside of a subscribe loop
+ if(c->subscribed_slot == -1) {
+ php_error_docref(0 TSRMLS_CC, E_WARNING,
+ "You can't unsubscribe outside of a subscribe loop");
+ RETURN_FALSE;
+ }
+
+ // Call directly because we're going to set the slot manually
+ if(redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, c->flags, kw,
+ &cmd, &cmd_len, &slot, &ctx)
+ ==FAILURE)
+ {
+ RETURN_FALSE;
+ }
+
+ // This has to operate on our subscribe slot
+ if(cluster_send_slot(c, c->subscribed_slot, cmd, cmd_len, TYPE_MULTIBULK)
+ ==FAILURE)
+ {
+ zend_throw_exception(redis_cluster_exception_ce,
+ "Failed to UNSUBSCRIBE within our subscribe loop!", 0 TSRMLS_CC);
+ RETURN_FALSE;
+ }
+
+ // Now process response from the slot we're subscribed on
+ cluster_unsub_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, ctx);
+
+ // Cleanup our command
+ efree(cmd);
+}
+
+/* {{{ proto array RedisCluster::unsubscribe(array chans) */
+PHP_METHOD(RedisCluster, unsubscribe) {
+ generic_unsub_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, GET_CONTEXT(),
+ "UNSUBSCRIBE");
+}
+/* }}} */
+
+/* {{{ proto array RedisCluster::punsubscribe(array pats) */
+PHP_METHOD(RedisCluster, punsubscribe) {
+ generic_unsub_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, GET_CONTEXT(),
+ "PUNSUBSCRIBE");
+}
+/* }}} */
+
/* Commands that do not interact with Redis, but just report stuff about
* various options, etc */
diff --git a/redis_cluster.h b/redis_cluster.h
index 515b243a..ead57982 100644
--- a/redis_cluster.h
+++ b/redis_cluster.h
@@ -224,6 +224,8 @@ PHP_METHOD(RedisCluster, sort);
PHP_METHOD(RedisCluster, object);
PHP_METHOD(RedisCluster, subscribe);
PHP_METHOD(RedisCluster, psubscribe);
+PHP_METHOD(RedisCluster, unsubscribe);
+PHP_METHOD(RedisCluster, punsubscribe);
/* Transactions */
PHP_METHOD(RedisCluster, multi);
diff --git a/redis_commands.c b/redis_commands.c
index b8966dd7..6e5e1311 100644
--- a/redis_commands.c
+++ b/redis_commands.c
@@ -718,6 +718,52 @@ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return SUCCESS;
}
+/* UNSUBSCRIBE/PUNSUBSCRIBE */
+int redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ zval *z_arr, **z_chan;
+ HashTable *ht_arr;
+ HashPosition ptr;
+ smart_str cmdstr = {0};
+ subscribeContext *sctx = emalloc(sizeof(subscribeContext));
+
+ if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &z_arr)==FAILURE) {
+ efree(sctx);
+ return FAILURE;
+ }
+
+ ht_arr = Z_ARRVAL_P(z_arr);
+
+ sctx->argc = zend_hash_num_elements(ht_arr);
+ if(sctx->argc == 0) {
+ efree(sctx);
+ return FAILURE;
+ }
+
+ redis_cmd_init_sstr(&cmdstr, sctx->argc, kw, strlen(kw));
+
+ for(zend_hash_internal_pointer_reset_ex(ht_arr, &ptr);
+ zend_hash_get_current_data_ex(ht_arr, (void**)&z_chan, &ptr)==SUCCESS;
+ zend_hash_move_forward_ex(ht_arr, &ptr))
+ {
+ char *key = Z_STRVAL_PP(z_chan);
+ int key_len = Z_STRLEN_PP(z_chan), key_free;
+
+ key_free = redis_key_prefix(redis_sock, &key, &key_len);
+ redis_cmd_append_sstr(&cmdstr, key, key_len);
+ if(key_free) efree(key);
+ }
+
+ // Push out vals
+ *cmd_len = cmdstr.len;
+ *cmd = cmdstr.c;
+ *ctx = (void*)sctx;
+
+ return SUCCESS;
+}
+
/* Commands that take a key followed by a variable list of serializable
* values (RPUSH, LPUSH, SADD, SREM, etc...) */
int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
diff --git a/redis_commands.h b/redis_commands.h
index ef1a80b8..bb04d76b 100644
--- a/redis_commands.h
+++ b/redis_commands.h
@@ -82,6 +82,9 @@ int redis_zinter_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+int redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+
/* Commands which need a unique construction mechanism. This is either because
* they don't share a signature with any other command, or because there is
* specific processing we do (e.g. verifying subarguments) that make them