diff options
Diffstat (limited to 'cluster_library.c')
-rw-r--r-- | cluster_library.c | 65 |
1 files changed, 60 insertions, 5 deletions
diff --git a/cluster_library.c b/cluster_library.c index e4d7ae9a..86cec935 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -1371,7 +1371,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, // Consume each MULTI BULK response (one per channel/pattern) while(sctx->argc--) { - z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, pull, mbulk_resp_loop_raw); if(!z_tab) { @@ -1397,6 +1397,9 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, sctx->cb.params = z_args; sctx->cb.no_separation = 0; + /* We're in a subscribe loop */ + c->subscribed_slot = c->reply_slot; + /* Multibulk response, {[pattern], type, channel, payload} */ while(1) { /* Arguments */ @@ -1404,7 +1407,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, int tab_idx=1, is_pmsg; // Get the next subscribe response - z_tab = cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, mbulk_resp_loop); if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 0, (void**)&z_type) @@ -1465,7 +1468,10 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, zval_dtor(z_tab); efree(z_tab); } - + + // We're no longer subscribing, due to an error + c->subscribed_slot = -1; + // Cleanup efree(sctx); if(z_tab) { @@ -1477,6 +1483,55 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, RETURN_FALSE; } +/* UNSUBSCRIBE/PUNSUBSCRIBE */ +PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, void *ctx) +{ + subscribeContext *sctx = (subscribeContext*)ctx; + zval *z_tab, **z_chan, **z_flag; + char *flag; + int pull = 0; + + array_init(return_value); + + // Consume each response + while(sctx->argc--) { + z_tab = cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, + c, pull, mbulk_resp_loop_raw); + + // Fail if we didn't get an array or can't find index 1 + if(!z_tab || zend_hash_index_find(Z_ARRVAL_P(z_tab), 1, + (void**)&z_chan)==FAILURE) + { + zval_dtor(return_value); + RETURN_FALSE; + } + + // Find the flag for this channel/pattern + if(zend_hash_index_find(Z_ARRVAL_P(z_tab), 2, (void**)&z_flag) + ==FAILURE) + { + zval_dtor(return_value); + RETURN_FALSE; + } + + // Sanity check + if(Z_STRLEN_PP(z_flag) != 2) { + zval_dtor(return_value); + RETURN_FALSE; + } + + // Redis will give us either :1 or :0 here + char *flag = Z_STRVAL_PP(z_flag); + + // Add result + add_assoc_bool(return_value, Z_STRVAL_PP(z_chan), flag[1]=='1'); + + efree(z_tab); + pull = 1; + } +} + /* Generic MULTI BULK response processor */ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, mbulk_cb cb, void *ctx) @@ -1512,8 +1567,8 @@ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, } /* MULTI BULK response loop where we might pull the next one */ -PHPAPI zval *cluster_sub_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, - redisCluster *c, int pull, mbulk_cb cb) +PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, int pull, mbulk_cb cb) { zval *z_result; |