diff options
-rw-r--r-- | cluster_library.c | 65 | ||||
-rw-r--r-- | cluster_library.h | 7 | ||||
-rw-r--r-- | redis_cluster.c | 59 | ||||
-rw-r--r-- | redis_cluster.h | 2 | ||||
-rw-r--r-- | redis_commands.c | 46 | ||||
-rw-r--r-- | redis_commands.h | 3 |
6 files changed, 175 insertions, 7 deletions
diff --git a/cluster_library.c b/cluster_library.c index e4d7ae9a..86cec935 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -1371,7 +1371,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, // Consume each MULTI BULK response (one per channel/pattern) while(sctx->argc--) { - z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, pull, mbulk_resp_loop_raw); if(!z_tab) { @@ -1397,6 +1397,9 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, sctx->cb.params = z_args; sctx->cb.no_separation = 0; + /* We're in a subscribe loop */ + c->subscribed_slot = c->reply_slot; + /* Multibulk response, {[pattern], type, channel, payload} */ while(1) { /* Arguments */ @@ -1404,7 +1407,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, int tab_idx=1, is_pmsg; // Get the next subscribe response - z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, mbulk_resp_loop); if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&z_type) @@ -1465,7 +1468,10 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, zval_dtor(z_tab); efree(z_tab); } - + + // We're no longer subscribing, due to an error + c->subscribed_slot = -1; + // Cleanup efree(sctx); if(z_tab) { @@ -1477,6 +1483,55 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, RETURN_FALSE; } +/* UNSUBSCRIBE/PUNSUBSCRIBE */ +PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, void *ctx) +{ + subscribeContext *sctx = (subscribeContext*)ctx; + zval *z_tab, **z_chan, **z_flag; + char *flag; + int pull = 0; + + array_init(return_value); + + // Consume each response + while(sctx->argc--) { + z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, + c, pull, mbulk_resp_loop_raw); + + // Fail if we didn't get an array or can't find index 1 + if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, + (void**)&z_chan)==FAILURE) + { + zval_dtor(return_value); + RETURN_FALSE; + } + + // Find the flag for this channel/pattern + if(zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&z_flag) + ==FAILURE) + { + zval_dtor(return_value); + RETURN_FALSE; + } + + // Sanity check + if(Z_STRLEN_PP(z_flag) != 2) { + zval_dtor(return_value); + RETURN_FALSE; + } + + // Redis will give us either :1 or :0 here + char *flag = Z_STRVAL_PP(z_flag); + + // Add result + add_assoc_bool(return_value, Z_STRVAL_PP(z_chan), flag[1]=='1'); + + efree(z_tab); + pull = 1; + } +} + /* Generic MULTI BULK response processor */ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, mbulk_cb cb, void *ctx) @@ -1512,8 +1567,8 @@ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, } /* MULTI BULK response loop where we might pull the next one */ -PHPAPI zval *cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, - redisCluster *c, int pull, mbulk_cb cb) +PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, int pull, mbulk_cb cb) { zval *z_result; diff --git a/cluster_library.h b/cluster_library.h index e311521a..05491799 100644 --- a/cluster_library.h +++ b/cluster_library.h @@ -224,6 +224,9 @@ typedef struct redisCluster { /* The slot where we should read replies */ short reply_slot; + /* The slot where we're subscribed */ + short subscribed_slot; + /* One RedisSock* struct for serialization and prefix information */ RedisSock *flags; @@ -337,6 +340,8 @@ PHPAPI void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); +PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + void *ctx); /* MULTI BULK response functions */ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, @@ -353,7 +358,7 @@ PHPAPI void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHPAPI zval *cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, +PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, int pull, mbulk_cb cb); /* MULTI BULK processing callbacks */ diff --git a/redis_cluster.c b/redis_cluster.c index bd0c3b6d..6f055804 100644 --- a/redis_cluster.c +++ b/redis_cluster.c @@ -141,7 +141,8 @@ zend_function_entry redis_cluster_functions[] = { PHP_ME(RedisCluster, object, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, subscribe, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, psubscribe, NULL, ZEND_ACC_PUBLIC) - + PHP_ME(RedisCluster, unsubscribe, NULL, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, punsubscribe, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, getoption, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, setoption, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, _prefix, NULL, ZEND_ACC_PUBLIC) @@ -204,6 +205,9 @@ create_cluster_context(zend_class_entry *class_type TSRMLS_DC) { cluster = emalloc(sizeof(redisCluster)); memset(cluster, 0, sizeof(redisCluster)); + // We're not currently subscribed anywhere + cluster->subscribed_slot = -1; + // Allocate our RedisSock we'll use to store prefix/serialization flags cluster->flags = ecalloc(1, sizeof(RedisSock)); @@ -1068,6 +1072,59 @@ PHP_METHOD(RedisCluster, psubscribe) { } /* }}} */ +static void generic_unsub_cmd(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + char *kw) +{ + char *cmd; + int cmd_len; + void *ctx; + short slot; + + // There is not reason to unsubscribe outside of a subscribe loop + if(c->subscribed_slot == -1) { + php_error_docref(0 TSRMLS_CC, E_WARNING, + "You can't unsubscribe outside of a subscribe loop"); + RETURN_FALSE; + } + + // Call directly because we're going to set the slot manually + if(redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, c->flags, kw, + &cmd, &cmd_len, &slot, &ctx) + ==FAILURE) + { + RETURN_FALSE; + } + + // This has to operate on our subscribe slot + if(cluster_send_slot(c, c->subscribed_slot, cmd, cmd_len, TYPE_MULTIBULK) + ==FAILURE) + { + zend_throw_exception(redis_cluster_exception_ce, + "Failed to UNSUBSCRIBE within our subscribe loop!", 0 TSRMLS_CC); + RETURN_FALSE; + } + + // Now process response from the slot we're subscribed on + cluster_unsub_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, ctx); + + // Cleanup our command + efree(cmd); +} + +/* {{{ proto array RedisCluster::unsubscribe(array chans) */ +PHP_METHOD(RedisCluster, unsubscribe) { + generic_unsub_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, GET_CONTEXT(), + "UNSUBSCRIBE"); +} +/* }}} */ + +/* {{{ proto array RedisCluster::punsubscribe(array pats) */ +PHP_METHOD(RedisCluster, punsubscribe) { + generic_unsub_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, GET_CONTEXT(), + "PUNSUBSCRIBE"); +} +/* }}} */ + /* Commands that do not interact with Redis, but just report stuff about * various options, etc */ diff --git a/redis_cluster.h b/redis_cluster.h index 515b243a..ead57982 100644 --- a/redis_cluster.h +++ b/redis_cluster.h @@ -224,6 +224,8 @@ PHP_METHOD(RedisCluster, sort); PHP_METHOD(RedisCluster, object); PHP_METHOD(RedisCluster, subscribe); PHP_METHOD(RedisCluster, psubscribe); +PHP_METHOD(RedisCluster, unsubscribe); +PHP_METHOD(RedisCluster, punsubscribe); /* Transactions */ PHP_METHOD(RedisCluster, multi); diff --git a/redis_commands.c b/redis_commands.c index b8966dd7..6e5e1311 100644 --- a/redis_commands.c +++ b/redis_commands.c @@ -718,6 +718,52 @@ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return SUCCESS; } +/* UNSUBSCRIBE/PUNSUBSCRIBE */ +int redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, + void **ctx) +{ + zval *z_arr, **z_chan; + HashTable *ht_arr; + HashPosition ptr; + smart_str cmdstr = {0}; + subscribeContext *sctx = emalloc(sizeof(subscribeContext)); + + if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &z_arr)==FAILURE) { + efree(sctx); + return FAILURE; + } + + ht_arr = Z_ARRVAL_P(z_arr); + + sctx->argc = zend_hash_num_elements(ht_arr); + if(sctx->argc == 0) { + efree(sctx); + return FAILURE; + } + + redis_cmd_init_sstr(&cmdstr, sctx->argc, kw, strlen(kw)); + + for(zend_hash_internal_pointer_reset_ex(ht_arr, &ptr); + zend_hash_get_current_data_ex(ht_arr, (void**)&z_chan, &ptr)==SUCCESS; + zend_hash_move_forward_ex(ht_arr, &ptr)) + { + char *key = Z_STRVAL_PP(z_chan); + int key_len = Z_STRLEN_PP(z_chan), key_free; + + key_free = redis_key_prefix(redis_sock, &key, &key_len); + redis_cmd_append_sstr(&cmdstr, key, key_len); + if(key_free) efree(key); + } + + // Push out vals + *cmd_len = cmdstr.len; + *cmd = cmdstr.c; + *ctx = (void*)sctx; + + return SUCCESS; +} + /* Commands that take a key followed by a variable list of serializable * values (RPUSH, LPUSH, SADD, SREM, etc...) */ int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, diff --git a/redis_commands.h b/redis_commands.h index ef1a80b8..bb04d76b 100644 --- a/redis_commands.h +++ b/redis_commands.h @@ -82,6 +82,9 @@ int redis_zinter_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); +int redis_unsubscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); + /* Commands which need a unique construction mechanism. This is either because * they don't share a signature with any other command, or because there is * specific processing we do (e.g. verifying subarguments) that make them |