Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Yatsukhnenko <yatsukhnenko@gmail.com>2022-06-26 13:10:56 +0300
committerPavlo Yatsukhnenko <yatsukhnenko@gmail.com>2022-06-28 18:18:38 +0300
commit3c9e159c7e700a48aff29b83374039c3bdf1e909 (patch)
tree2c3271ab868ca18b35e02afdd904d512474a8c62
parentaaa4c91a7640d2fc0f3fea203dcc64d42a17b2cf (diff)
Refactor subscribe/unsubscribepubsub
-rw-r--r--cluster_library.c10
-rw-r--r--common.h2
-rw-r--r--library.c134
-rw-r--r--redis.stub.php8
-rw-r--r--redis_arginfo.h17
-rw-r--r--redis_commands.c2
-rw-r--r--redis_commands.h8
-rw-r--r--redis_legacy_arginfo.h19
8 files changed, 135 insertions, 65 deletions
diff --git a/cluster_library.c b/cluster_library.c
index 976cb889..2512f0fd 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -1857,8 +1857,8 @@ PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *
// Set up our callback pointers
zval z_ret, z_args[4];
- sctx->cb.retval = &z_ret;
- sctx->cb.params = z_args;
+ sctx->cb.fci.retval = &z_ret;
+ sctx->cb.fci.params = z_args;
/* We're in a subscribe loop */
c->subscribed_slot = c->cmd_slot;
@@ -1911,12 +1911,10 @@ PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *
}
// Set arg count
- sctx->cb.param_count = tab_idx;
+ sctx->cb.fci.param_count = tab_idx;
// Execute our callback
- if (zend_call_function(&(sctx->cb), &(sctx->cb_cache)) !=
- SUCCESS)
- {
+ if (zend_call_function(&sctx->cb.fci, &sctx->cb.fci_cache) != SUCCESS) {
break;
}
diff --git a/common.h b/common.h
index 8b1a0127..0ae19a2d 100644
--- a/common.h
+++ b/common.h
@@ -289,7 +289,7 @@ typedef struct {
int persistent;
int watching;
zend_string *persistent_id;
-
+ HashTable *subs;
redis_serializer serializer;
int compression;
int compression_level;
diff --git a/library.c b/library.c
index 9c51680c..1038ae8b 100644
--- a/library.c
+++ b/library.c
@@ -438,58 +438,87 @@ redis_sock_read_scan_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
}
}
+static void
+ht_free_subs(zval *data)
+{
+ efree(Z_PTR_P(data));
+}
+
PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock, zval *z_tab,
void *ctx)
{
+ HashTable *subs;
+ subscribeCallback *cb;
subscribeContext *sctx = (subscribeContext*)ctx;
zval *z_tmp, z_resp;
+ ALLOC_HASHTABLE(subs);
+ zend_hash_init(subs, 0, NULL, ht_free_subs, 0);
// Consume response(s) from subscribe, which will vary on argc
while(sctx->argc--) {
+ ZVAL_NULL(&z_resp);
if (!redis_sock_read_multibulk_reply_zval(
INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp)
) {
- efree(sctx);
- return -1;
+ goto error;
}
// We'll need to find the command response
if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_resp), 0)) == NULL) {
- zval_dtor(&z_resp);
- efree(sctx);
- return -1;
+ goto error;
}
// Make sure the command response matches the command we called
if(strcasecmp(Z_STRVAL_P(z_tmp), sctx->kw) !=0) {
- zval_dtor(&z_resp);
- efree(sctx);
- return -1;
+ goto error;
}
+ if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_resp), 1)) == NULL) {
+ goto error;
+ }
+
+ zend_hash_str_update_mem(subs, Z_STRVAL_P(z_tmp), Z_STRLEN_P(z_tmp),
+ &sctx->cb, sizeof(sctx->cb));
+
zval_dtor(&z_resp);
}
- zval z_ret, z_args[4];
- sctx->cb.retval = &z_ret;
- sctx->cb.params = z_args;
+ efree(sctx);
+
+ if (redis_sock->subs) {
+ zend_string *zkey;
+
+ ZEND_HASH_FOREACH_STR_KEY_PTR(subs, zkey, cb) {
+ zend_hash_update_mem(redis_sock->subs, zkey, cb, sizeof(*cb));
+ } ZEND_HASH_FOREACH_END();
+ zend_hash_destroy(subs);
+ efree(subs);
+
+ RETVAL_TRUE;
+ return SUCCESS;
+ }
+ redis_sock->subs = subs;
/* Multibulk response, {[pattern], type, channel, payload } */
- while(1) {
- zval *z_type, *z_chan, *z_pat = NULL, *z_data;
+ while (redis_sock->subs) {
+ zval z_ret, z_args[4], *z_type, *z_chan, *z_pat = NULL, *z_data;
HashTable *ht_tab;
- int tab_idx=1, is_pmsg;
+ int tab_idx = 1, is_pmsg = 0;
+ ZVAL_NULL(&z_resp);
if (!redis_sock_read_multibulk_reply_zval(
- INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp)) break;
+ INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp)
+ ) {
+ goto failure;
+ }
ht_tab = Z_ARRVAL(z_resp);
if ((z_type = zend_hash_index_find(ht_tab, 0)) == NULL ||
Z_TYPE_P(z_type) != IS_STRING
) {
- break;
+ goto failure;
}
// Check for message or pmessage
@@ -498,13 +527,14 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
{
is_pmsg = *Z_STRVAL_P(z_type)=='p';
} else {
- break;
+ zval_dtor(&z_resp);
+ continue;
}
// Extract pattern if it's a pmessage
if(is_pmsg) {
if ((z_pat = zend_hash_index_find(ht_tab, tab_idx++)) == NULL) {
- break;
+ goto failure;
}
}
@@ -512,7 +542,11 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
if ((z_chan = zend_hash_index_find(ht_tab, tab_idx++)) == NULL ||
(z_data = zend_hash_index_find(ht_tab, tab_idx++)) == NULL
) {
- break;
+ goto failure;
+ }
+
+ if ((cb = zend_hash_str_find_ptr(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan))) == NULL) {
+ goto failure;
}
// Different args for SUBSCRIBE and PSUBSCRIBE
@@ -527,13 +561,13 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
}
// Set arg count
- sctx->cb.param_count = tab_idx;
+ cb->fci.param_count = tab_idx;
+ cb->fci.retval = &z_ret;
+ cb->fci.params = z_args;
// Execute callback
- if(zend_call_function(&(sctx->cb), &(sctx->cb_cache))
- ==FAILURE)
- {
- break;
+ if (zend_call_function(&cb->fci, &cb->fci_cache) != SUCCESS) {
+ goto failure;
}
// If we have a return value free it
@@ -541,11 +575,18 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
zval_dtor(&z_resp);
}
+ RETVAL_TRUE;
+ return SUCCESS;
+
// This is an error state, clean up
- zval_dtor(&z_resp);
+error:
efree(sctx);
-
- return -1;
+ zend_hash_destroy(subs);
+ efree(subs);
+failure:
+ zval_dtor(&z_resp);
+ RETVAL_FALSE;
+ return FAILURE;
}
PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS,
@@ -553,31 +594,45 @@ PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS,
void *ctx)
{
subscribeContext *sctx = (subscribeContext*)ctx;
- zval *z_chan, zv, *z_ret = &zv, z_resp;
- int i;
+ zval *z_chan, z_ret, z_resp;
- array_init(z_ret);
+ array_init(&z_ret);
- for (i = 0; i < sctx->argc; i++) {
+ while (sctx->argc--) {
+ ZVAL_NULL(&z_resp);
if (!redis_sock_read_multibulk_reply_zval(
INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp) ||
(z_chan = zend_hash_index_find(Z_ARRVAL(z_resp), 1)) == NULL
) {
- zval_dtor(z_ret);
- return -1;
+ efree(sctx);
+ zval_dtor(&z_resp);
+ zval_dtor(&z_ret);
+ RETVAL_FALSE;
+ return FAILURE;
}
- add_assoc_bool(z_ret, Z_STRVAL_P(z_chan), 1);
+ if (!redis_sock->subs ||
+ !zend_hash_str_exists(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan))
+ ) {
+ add_assoc_bool_ex(&z_ret, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan), 0);
+ } else {
+ zend_hash_str_del(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan));
+ add_assoc_bool_ex(&z_ret, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan), 1);
+ }
zval_dtor(&z_resp);
}
efree(sctx);
- RETVAL_ZVAL(z_ret, 0, 1);
+ if (redis_sock->subs && !zend_hash_num_elements(redis_sock->subs)) {
+ zend_hash_destroy(redis_sock->subs);
+ efree(redis_sock->subs);
+ redis_sock->subs = NULL;
+ }
- // Success
- return 0;
+ RETVAL_ZVAL(&z_ret, 0, 1);
+ return SUCCESS;
}
PHP_REDIS_API zval *
@@ -2870,6 +2925,11 @@ PHP_REDIS_API void redis_free_socket(RedisSock *redis_sock)
if (redis_sock->host) {
zend_string_release(redis_sock->host);
}
+ if (redis_sock->subs) {
+ zend_hash_destroy(redis_sock->subs);
+ efree(redis_sock->subs);
+ redis_sock->subs = NULL;
+ }
redis_sock_free_auth(redis_sock);
free_reply_callbacks(redis_sock);
efree(redis_sock);
diff --git a/redis.stub.php b/redis.stub.php
index f0f8a37a..93dd2730 100644
--- a/redis.stub.php
+++ b/redis.stub.php
@@ -319,7 +319,7 @@ public function persist(string $key): bool;
/** @return bool|Redis */
public function psetex(string $key, int $expire, mixed $value);
- public function psubscribe(array $patterns): void;
+ public function psubscribe(array $patterns, callable $cb): bool;
public function pttl(string $key): int;
@@ -327,7 +327,7 @@ public function persist(string $key): bool;
public function pubsub(string $command, mixed $arg = null): mixed;
- public function punsubscribe(array $patterns): array;
+ public function punsubscribe(array $patterns): bool|array;
public function rPop(string $key, int $count = 0): bool|string|array;
@@ -439,7 +439,7 @@ public function persist(string $key): bool;
/** @return int|Redis */
public function strlen(string $key);
- public function subscribe(string $channel, string ...$other_channels): array;
+ public function subscribe(array $channels, callable $cb): bool;
public function swapdb(string $src, string $dst): bool;
@@ -455,7 +455,7 @@ public function persist(string $key): bool;
*/
public function unlink(array|string $key, string ...$other_keys);
- public function unsubscribe(string $channel, string ...$other_channels): array;
+ public function unsubscribe(array $channels): bool|array;
/** @return bool|Redis */
public function unwatch();
diff --git a/redis_arginfo.h b/redis_arginfo.h
index ee479548..ea660631 100644
--- a/redis_arginfo.h
+++ b/redis_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 9671c30926e8d581a126833360b123c8ae2dd913 */
+ * Stub hash: efcda1ed028d65d0b4848d32133dc0e32f17871f */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis___construct, 0, 0, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, options, IS_ARRAY, 0, "null")
@@ -541,8 +541,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psetex, 0, 0, 3)
ZEND_ARG_TYPE_INFO(0, value, IS_MIXED, 0)
ZEND_END_ARG_INFO()
-ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_psubscribe, 0, 1, IS_VOID, 0)
+ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_psubscribe, 0, 2, _IS_BOOL, 0)
ZEND_ARG_TYPE_INFO(0, patterns, IS_ARRAY, 0)
+ ZEND_ARG_TYPE_INFO(0, cb, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
#define arginfo_class_Redis_pttl arginfo_class_Redis_hLen
@@ -557,7 +558,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_pubsub, 0, 1, IS_MIX
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, arg, IS_MIXED, 0, "null")
ZEND_END_ARG_INFO()
-ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_punsubscribe, 0, 1, IS_ARRAY, 0)
+ZEND_BEGIN_ARG_WITH_RETURN_TYPE_MASK_EX(arginfo_class_Redis_punsubscribe, 0, 1, MAY_BE_BOOL|MAY_BE_ARRAY)
ZEND_ARG_TYPE_INFO(0, patterns, IS_ARRAY, 0)
ZEND_END_ARG_INFO()
@@ -732,9 +733,9 @@ ZEND_END_ARG_INFO()
#define arginfo_class_Redis_strlen arginfo_class_Redis_decr
-ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_subscribe, 0, 1, IS_ARRAY, 0)
- ZEND_ARG_TYPE_INFO(0, channel, IS_STRING, 0)
- ZEND_ARG_VARIADIC_TYPE_INFO(0, other_channels, IS_STRING, 0)
+ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_subscribe, 0, 2, _IS_BOOL, 0)
+ ZEND_ARG_TYPE_INFO(0, channels, IS_ARRAY, 0)
+ ZEND_ARG_TYPE_INFO(0, cb, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_swapdb, 0, 2, _IS_BOOL, 0)
@@ -750,7 +751,9 @@ ZEND_END_ARG_INFO()
#define arginfo_class_Redis_unlink arginfo_class_Redis_del
-#define arginfo_class_Redis_unsubscribe arginfo_class_Redis_subscribe
+ZEND_BEGIN_ARG_WITH_RETURN_TYPE_MASK_EX(arginfo_class_Redis_unsubscribe, 0, 1, MAY_BE_BOOL|MAY_BE_ARRAY)
+ ZEND_ARG_TYPE_INFO(0, channels, IS_ARRAY, 0)
+ZEND_END_ARG_INFO()
#define arginfo_class_Redis_unwatch arginfo_class_Redis___destruct
diff --git a/redis_commands.c b/redis_commands.c
index 3b3fa917..aa7675af 100644
--- a/redis_commands.c
+++ b/redis_commands.c
@@ -1101,7 +1101,7 @@ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *key;
if (zend_parse_parameters(ZEND_NUM_ARGS(), "af", &z_arr,
- &(sctx->cb), &(sctx->cb_cache)) == FAILURE)
+ &sctx->cb.fci, &sctx->cb.fci_cache) == FAILURE)
{
efree(sctx);
return FAILURE;
diff --git a/redis_commands.h b/redis_commands.h
index dc88848e..e766e20d 100644
--- a/redis_commands.h
+++ b/redis_commands.h
@@ -14,11 +14,15 @@
if (slot) *slot = cluster_hash_key(key,key_len);
/* Simple container so we can push subscribe context out */
+typedef struct {
+ zend_fcall_info fci;
+ zend_fcall_info_cache fci_cache;
+} subscribeCallback;
+
typedef struct subscribeContext {
char *kw;
int argc;
- zend_fcall_info cb;
- zend_fcall_info_cache cb_cache;
+ subscribeCallback cb;
} subscribeContext;
/* Construct a raw command */
diff --git a/redis_legacy_arginfo.h b/redis_legacy_arginfo.h
index 85cc8be1..622d61cb 100644
--- a/redis_legacy_arginfo.h
+++ b/redis_legacy_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 9671c30926e8d581a126833360b123c8ae2dd913 */
+ * Stub hash: efcda1ed028d65d0b4848d32133dc0e32f17871f */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis___construct, 0, 0, 0)
ZEND_ARG_INFO(0, options)
@@ -472,8 +472,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psetex, 0, 0, 3)
ZEND_ARG_INFO(0, value)
ZEND_END_ARG_INFO()
-ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psubscribe, 0, 0, 1)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psubscribe, 0, 0, 2)
ZEND_ARG_INFO(0, patterns)
+ ZEND_ARG_INFO(0, cb)
ZEND_END_ARG_INFO()
#define arginfo_class_Redis_pttl arginfo_class_Redis__prefix
@@ -488,7 +489,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_pubsub, 0, 0, 1)
ZEND_ARG_INFO(0, arg)
ZEND_END_ARG_INFO()
-#define arginfo_class_Redis_punsubscribe arginfo_class_Redis_psubscribe
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_punsubscribe, 0, 0, 1)
+ ZEND_ARG_INFO(0, patterns)
+ZEND_END_ARG_INFO()
#define arginfo_class_Redis_rPop arginfo_class_Redis_lPop
@@ -640,9 +643,9 @@ ZEND_END_ARG_INFO()
#define arginfo_class_Redis_strlen arginfo_class_Redis__prefix
-ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_subscribe, 0, 0, 1)
- ZEND_ARG_INFO(0, channel)
- ZEND_ARG_VARIADIC_INFO(0, other_channels)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_subscribe, 0, 0, 2)
+ ZEND_ARG_INFO(0, channels)
+ ZEND_ARG_INFO(0, cb)
ZEND_END_ARG_INFO()
#define arginfo_class_Redis_swapdb arginfo_class_Redis_rpoplpush
@@ -655,7 +658,9 @@ ZEND_END_ARG_INFO()
#define arginfo_class_Redis_unlink arginfo_class_Redis_del
-#define arginfo_class_Redis_unsubscribe arginfo_class_Redis_subscribe
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_unsubscribe, 0, 0, 1)
+ ZEND_ARG_INFO(0, channels)
+ZEND_END_ARG_INFO()
#define arginfo_class_Redis_unwatch arginfo_class_Redis___destruct