diff options
author | Pavlo Yatsukhnenko <yatsukhnenko@gmail.com> | 2022-06-26 13:10:56 +0300 |
---|---|---|
committer | Pavlo Yatsukhnenko <yatsukhnenko@gmail.com> | 2022-06-28 18:18:38 +0300 |
commit | 3c9e159c7e700a48aff29b83374039c3bdf1e909 (patch) | |
tree | 2c3271ab868ca18b35e02afdd904d512474a8c62 | |
parent | aaa4c91a7640d2fc0f3fea203dcc64d42a17b2cf (diff) |
Refactor subscribe/unsubscribepubsub
-rw-r--r-- | cluster_library.c | 10 | ||||
-rw-r--r-- | common.h | 2 | ||||
-rw-r--r-- | library.c | 134 | ||||
-rw-r--r-- | redis.stub.php | 8 | ||||
-rw-r--r-- | redis_arginfo.h | 17 | ||||
-rw-r--r-- | redis_commands.c | 2 | ||||
-rw-r--r-- | redis_commands.h | 8 | ||||
-rw-r--r-- | redis_legacy_arginfo.h | 19 |
8 files changed, 135 insertions, 65 deletions
diff --git a/cluster_library.c b/cluster_library.c index 976cb889..2512f0fd 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -1857,8 +1857,8 @@ PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster * // Set up our callback pointers zval z_ret, z_args[4]; - sctx->cb.retval = &z_ret; - sctx->cb.params = z_args; + sctx->cb.fci.retval = &z_ret; + sctx->cb.fci.params = z_args; /* We're in a subscribe loop */ c->subscribed_slot = c->cmd_slot; @@ -1911,12 +1911,10 @@ PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster * } // Set arg count - sctx->cb.param_count = tab_idx; + sctx->cb.fci.param_count = tab_idx; // Execute our callback - if (zend_call_function(&(sctx->cb), &(sctx->cb_cache)) != - SUCCESS) - { + if (zend_call_function(&sctx->cb.fci, &sctx->cb.fci_cache) != SUCCESS) { break; } @@ -289,7 +289,7 @@ typedef struct { int persistent; int watching; zend_string *persistent_id; - + HashTable *subs; redis_serializer serializer; int compression; int compression_level; @@ -438,58 +438,87 @@ redis_sock_read_scan_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, } } +static void +ht_free_subs(zval *data) +{ + efree(Z_PTR_P(data)); +} + PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx) { + HashTable *subs; + subscribeCallback *cb; subscribeContext *sctx = (subscribeContext*)ctx; zval *z_tmp, z_resp; + ALLOC_HASHTABLE(subs); + zend_hash_init(subs, 0, NULL, ht_free_subs, 0); // Consume response(s) from subscribe, which will vary on argc while(sctx->argc--) { + ZVAL_NULL(&z_resp); if (!redis_sock_read_multibulk_reply_zval( INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp) ) { - efree(sctx); - return -1; + goto error; } // We'll need to find the command response if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_resp), 0)) == NULL) { - zval_dtor(&z_resp); - efree(sctx); - return -1; + goto error; } // Make sure the command response matches the command we called if(strcasecmp(Z_STRVAL_P(z_tmp), sctx->kw) !=0) { - zval_dtor(&z_resp); - efree(sctx); - return -1; + goto error; } + if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_resp), 1)) == NULL) { + goto error; + } + + zend_hash_str_update_mem(subs, Z_STRVAL_P(z_tmp), Z_STRLEN_P(z_tmp), + &sctx->cb, sizeof(sctx->cb)); + zval_dtor(&z_resp); } - zval z_ret, z_args[4]; - sctx->cb.retval = &z_ret; - sctx->cb.params = z_args; + efree(sctx); + + if (redis_sock->subs) { + zend_string *zkey; + + ZEND_HASH_FOREACH_STR_KEY_PTR(subs, zkey, cb) { + zend_hash_update_mem(redis_sock->subs, zkey, cb, sizeof(*cb)); + } ZEND_HASH_FOREACH_END(); + zend_hash_destroy(subs); + efree(subs); + + RETVAL_TRUE; + return SUCCESS; + } + redis_sock->subs = subs; /* Multibulk response, {[pattern], type, channel, payload } */ - while(1) { - zval *z_type, *z_chan, *z_pat = NULL, *z_data; + while (redis_sock->subs) { + zval z_ret, z_args[4], *z_type, *z_chan, *z_pat = NULL, *z_data; HashTable *ht_tab; - int tab_idx=1, is_pmsg; + int tab_idx = 1, is_pmsg = 0; + ZVAL_NULL(&z_resp); if (!redis_sock_read_multibulk_reply_zval( - INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp)) break; + INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp) + ) { + goto failure; + } ht_tab = Z_ARRVAL(z_resp); if ((z_type = zend_hash_index_find(ht_tab, 0)) == NULL || Z_TYPE_P(z_type) != IS_STRING ) { - break; + goto failure; } // Check for message or pmessage @@ -498,13 +527,14 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, { is_pmsg = *Z_STRVAL_P(z_type)=='p'; } else { - break; + zval_dtor(&z_resp); + continue; } // Extract pattern if it's a pmessage if(is_pmsg) { if ((z_pat = zend_hash_index_find(ht_tab, tab_idx++)) == NULL) { - break; + goto failure; } } @@ -512,7 +542,11 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, if ((z_chan = zend_hash_index_find(ht_tab, tab_idx++)) == NULL || (z_data = zend_hash_index_find(ht_tab, tab_idx++)) == NULL ) { - break; + goto failure; + } + + if ((cb = zend_hash_str_find_ptr(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan))) == NULL) { + goto failure; } // Different args for SUBSCRIBE and PSUBSCRIBE @@ -527,13 +561,13 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, } // Set arg count - sctx->cb.param_count = tab_idx; + cb->fci.param_count = tab_idx; + cb->fci.retval = &z_ret; + cb->fci.params = z_args; // Execute callback - if(zend_call_function(&(sctx->cb), &(sctx->cb_cache)) - ==FAILURE) - { - break; + if (zend_call_function(&cb->fci, &cb->fci_cache) != SUCCESS) { + goto failure; } // If we have a return value free it @@ -541,11 +575,18 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, zval_dtor(&z_resp); } + RETVAL_TRUE; + return SUCCESS; + // This is an error state, clean up - zval_dtor(&z_resp); +error: efree(sctx); - - return -1; + zend_hash_destroy(subs); + efree(subs); +failure: + zval_dtor(&z_resp); + RETVAL_FALSE; + return FAILURE; } PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS, @@ -553,31 +594,45 @@ PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS, void *ctx) { subscribeContext *sctx = (subscribeContext*)ctx; - zval *z_chan, zv, *z_ret = &zv, z_resp; - int i; + zval *z_chan, z_ret, z_resp; - array_init(z_ret); + array_init(&z_ret); - for (i = 0; i < sctx->argc; i++) { + while (sctx->argc--) { + ZVAL_NULL(&z_resp); if (!redis_sock_read_multibulk_reply_zval( INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp) || (z_chan = zend_hash_index_find(Z_ARRVAL(z_resp), 1)) == NULL ) { - zval_dtor(z_ret); - return -1; + efree(sctx); + zval_dtor(&z_resp); + zval_dtor(&z_ret); + RETVAL_FALSE; + return FAILURE; } - add_assoc_bool(z_ret, Z_STRVAL_P(z_chan), 1); + if (!redis_sock->subs || + !zend_hash_str_exists(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan)) + ) { + add_assoc_bool_ex(&z_ret, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan), 0); + } else { + zend_hash_str_del(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan)); + add_assoc_bool_ex(&z_ret, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan), 1); + } zval_dtor(&z_resp); } efree(sctx); - RETVAL_ZVAL(z_ret, 0, 1); + if (redis_sock->subs && !zend_hash_num_elements(redis_sock->subs)) { + zend_hash_destroy(redis_sock->subs); + efree(redis_sock->subs); + redis_sock->subs = NULL; + } - // Success - return 0; + RETVAL_ZVAL(&z_ret, 0, 1); + return SUCCESS; } PHP_REDIS_API zval * @@ -2870,6 +2925,11 @@ PHP_REDIS_API void redis_free_socket(RedisSock *redis_sock) if (redis_sock->host) { zend_string_release(redis_sock->host); } + if (redis_sock->subs) { + zend_hash_destroy(redis_sock->subs); + efree(redis_sock->subs); + redis_sock->subs = NULL; + } redis_sock_free_auth(redis_sock); free_reply_callbacks(redis_sock); efree(redis_sock); diff --git a/redis.stub.php b/redis.stub.php index f0f8a37a..93dd2730 100644 --- a/redis.stub.php +++ b/redis.stub.php @@ -319,7 +319,7 @@ public function persist(string $key): bool; /** @return bool|Redis */ public function psetex(string $key, int $expire, mixed $value); - public function psubscribe(array $patterns): void; + public function psubscribe(array $patterns, callable $cb): bool; public function pttl(string $key): int; @@ -327,7 +327,7 @@ public function persist(string $key): bool; public function pubsub(string $command, mixed $arg = null): mixed; - public function punsubscribe(array $patterns): array; + public function punsubscribe(array $patterns): bool|array; public function rPop(string $key, int $count = 0): bool|string|array; @@ -439,7 +439,7 @@ public function persist(string $key): bool; /** @return int|Redis */ public function strlen(string $key); - public function subscribe(string $channel, string ...$other_channels): array; + public function subscribe(array $channels, callable $cb): bool; public function swapdb(string $src, string $dst): bool; @@ -455,7 +455,7 @@ public function persist(string $key): bool; */ public function unlink(array|string $key, string ...$other_keys); - public function unsubscribe(string $channel, string ...$other_channels): array; + public function unsubscribe(array $channels): bool|array; /** @return bool|Redis */ public function unwatch(); diff --git a/redis_arginfo.h b/redis_arginfo.h index ee479548..ea660631 100644 --- a/redis_arginfo.h +++ b/redis_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 9671c30926e8d581a126833360b123c8ae2dd913 */ + * Stub hash: efcda1ed028d65d0b4848d32133dc0e32f17871f */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis___construct, 0, 0, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, options, IS_ARRAY, 0, "null") @@ -541,8 +541,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psetex, 0, 0, 3) ZEND_ARG_TYPE_INFO(0, value, IS_MIXED, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_psubscribe, 0, 1, IS_VOID, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_psubscribe, 0, 2, _IS_BOOL, 0) ZEND_ARG_TYPE_INFO(0, patterns, IS_ARRAY, 0) + ZEND_ARG_TYPE_INFO(0, cb, IS_CALLABLE, 0) ZEND_END_ARG_INFO() #define arginfo_class_Redis_pttl arginfo_class_Redis_hLen @@ -557,7 +558,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_pubsub, 0, 1, IS_MIX ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, arg, IS_MIXED, 0, "null") ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_punsubscribe, 0, 1, IS_ARRAY, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_MASK_EX(arginfo_class_Redis_punsubscribe, 0, 1, MAY_BE_BOOL|MAY_BE_ARRAY) ZEND_ARG_TYPE_INFO(0, patterns, IS_ARRAY, 0) ZEND_END_ARG_INFO() @@ -732,9 +733,9 @@ ZEND_END_ARG_INFO() #define arginfo_class_Redis_strlen arginfo_class_Redis_decr -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_subscribe, 0, 1, IS_ARRAY, 0) - ZEND_ARG_TYPE_INFO(0, channel, IS_STRING, 0) - ZEND_ARG_VARIADIC_TYPE_INFO(0, other_channels, IS_STRING, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_subscribe, 0, 2, _IS_BOOL, 0) + ZEND_ARG_TYPE_INFO(0, channels, IS_ARRAY, 0) + ZEND_ARG_TYPE_INFO(0, cb, IS_CALLABLE, 0) ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_swapdb, 0, 2, _IS_BOOL, 0) @@ -750,7 +751,9 @@ ZEND_END_ARG_INFO() #define arginfo_class_Redis_unlink arginfo_class_Redis_del -#define arginfo_class_Redis_unsubscribe arginfo_class_Redis_subscribe +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_MASK_EX(arginfo_class_Redis_unsubscribe, 0, 1, MAY_BE_BOOL|MAY_BE_ARRAY) + ZEND_ARG_TYPE_INFO(0, channels, IS_ARRAY, 0) +ZEND_END_ARG_INFO() #define arginfo_class_Redis_unwatch arginfo_class_Redis___destruct diff --git a/redis_commands.c b/redis_commands.c index 3b3fa917..aa7675af 100644 --- a/redis_commands.c +++ b/redis_commands.c @@ -1101,7 +1101,7 @@ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *key; if (zend_parse_parameters(ZEND_NUM_ARGS(), "af", &z_arr, - &(sctx->cb), &(sctx->cb_cache)) == FAILURE) + &sctx->cb.fci, &sctx->cb.fci_cache) == FAILURE) { efree(sctx); return FAILURE; diff --git a/redis_commands.h b/redis_commands.h index dc88848e..e766e20d 100644 --- a/redis_commands.h +++ b/redis_commands.h @@ -14,11 +14,15 @@ if (slot) *slot = cluster_hash_key(key,key_len); /* Simple container so we can push subscribe context out */ +typedef struct { + zend_fcall_info fci; + zend_fcall_info_cache fci_cache; +} subscribeCallback; + typedef struct subscribeContext { char *kw; int argc; - zend_fcall_info cb; - zend_fcall_info_cache cb_cache; + subscribeCallback cb; } subscribeContext; /* Construct a raw command */ diff --git a/redis_legacy_arginfo.h b/redis_legacy_arginfo.h index 85cc8be1..622d61cb 100644 --- a/redis_legacy_arginfo.h +++ b/redis_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 9671c30926e8d581a126833360b123c8ae2dd913 */ + * Stub hash: efcda1ed028d65d0b4848d32133dc0e32f17871f */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis___construct, 0, 0, 0) ZEND_ARG_INFO(0, options) @@ -472,8 +472,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psetex, 0, 0, 3) ZEND_ARG_INFO(0, value) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psubscribe, 0, 0, 1) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psubscribe, 0, 0, 2) ZEND_ARG_INFO(0, patterns) + ZEND_ARG_INFO(0, cb) ZEND_END_ARG_INFO() #define arginfo_class_Redis_pttl arginfo_class_Redis__prefix @@ -488,7 +489,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_pubsub, 0, 0, 1) ZEND_ARG_INFO(0, arg) ZEND_END_ARG_INFO() -#define arginfo_class_Redis_punsubscribe arginfo_class_Redis_psubscribe +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_punsubscribe, 0, 0, 1) + ZEND_ARG_INFO(0, patterns) +ZEND_END_ARG_INFO() #define arginfo_class_Redis_rPop arginfo_class_Redis_lPop @@ -640,9 +643,9 @@ ZEND_END_ARG_INFO() #define arginfo_class_Redis_strlen arginfo_class_Redis__prefix -ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_subscribe, 0, 0, 1) - ZEND_ARG_INFO(0, channel) - ZEND_ARG_VARIADIC_INFO(0, other_channels) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_subscribe, 0, 0, 2) + ZEND_ARG_INFO(0, channels) + ZEND_ARG_INFO(0, cb) ZEND_END_ARG_INFO() #define arginfo_class_Redis_swapdb arginfo_class_Redis_rpoplpush @@ -655,7 +658,9 @@ ZEND_END_ARG_INFO() #define arginfo_class_Redis_unlink arginfo_class_Redis_del -#define arginfo_class_Redis_unsubscribe arginfo_class_Redis_subscribe +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_unsubscribe, 0, 0, 1) + ZEND_ARG_INFO(0, channels) +ZEND_END_ARG_INFO() #define arginfo_class_Redis_unwatch arginfo_class_Redis___destruct |