diff options
author | Pavlo Yatsukhnenko <yatsukhnenko@gmail.com> | 2022-10-31 16:39:56 +0300 |
---|---|---|
committer | Michael Grunder <michael.grunder@gmail.com> | 2022-11-01 03:01:35 +0300 |
commit | 2a0d1c1e6d10d53c3de510cef4310be912bec4c0 (patch) | |
tree | 0d68aa7bab611a4e0a652d16648fca5bcaee6eb9 | |
parent | cc2383f07666e6afefd7b58995fb607d9967d650 (diff) |
Refactor PubSub command
-rw-r--r-- | library.c | 22 | ||||
-rw-r--r-- | library.h | 2 | ||||
-rw-r--r-- | redis.c | 111 | ||||
-rw-r--r-- | redis_commands.c | 76 | ||||
-rw-r--r-- | redis_commands.h | 3 | ||||
-rw-r--r-- | tests/RedisTest.php | 4 |
6 files changed, 105 insertions, 113 deletions
@@ -455,6 +455,22 @@ redis_sock_read_scan_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, } } +PHP_REDIS_API int +redis_pubsub_response(INTERNAL_FUNCTION_PARAMETERS, + RedisSock *redis_sock, zval *z_tab, void *ctx) +{ + if (ctx == NULL) { + return redis_long_response(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); + } else if (ctx == PHPREDIS_CTX_PTR) { + return redis_read_variant_reply(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); + } else if (ctx == PHPREDIS_CTX_PTR + 1) { + return redis_mbulk_reply_zipped_keys_int(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); + } else { + ZEND_ASSERT(!"memory corruption?"); + return FAILURE; + } +} + static void ht_free_subs(zval *data) { @@ -1354,6 +1370,7 @@ redis_zrandmember_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return redis_mbulk_reply_zipped_keys_dbl(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); } else { ZEND_ASSERT(!"memory corruption?"); + return FAILURE; } } @@ -1366,6 +1383,7 @@ redis_zdiff_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval * return redis_mbulk_reply_zipped_keys_dbl(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); } else { ZEND_ASSERT(!"memory corruption?"); + return FAILURE; } } @@ -1378,6 +1396,7 @@ redis_set_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_ return redis_string_response(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); } else { ZEND_ASSERT(!"memory corruption?"); + return FAILURE; } } @@ -1392,6 +1411,7 @@ redis_hrandfield_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, z return redis_mbulk_reply_zipped_raw(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); } else { ZEND_ASSERT(!"memory corruption?"); + return FAILURE; } } @@ -1404,6 +1424,7 @@ redis_pop_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_ return redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, NULL); } else { ZEND_ASSERT(!"memory corruption?"); + return FAILURE; } } @@ -1447,6 +1468,7 @@ redis_lpos_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z } } else { ZEND_ASSERT(!"memory corruption?"); + return FAILURE; } if (IS_ATOMIC(redis_sock)) { @@ -109,6 +109,8 @@ PHP_REDIS_API int redis_xclaim_reply(INTERNAL_FUNCTION_PARAMETERS, PHP_REDIS_API int redis_xinfo_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); +PHP_REDIS_API int redis_pubsub_response(INTERNAL_FUNCTION_PARAMETERS, + RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS, @@ -2707,122 +2707,13 @@ PHP_METHOD(Redis, wait) { REDIS_PROCESS_RESPONSE(redis_long_response); } -/* Construct a PUBSUB command */ -PHP_REDIS_API int -redis_build_pubsub_cmd(RedisSock *redis_sock, char **ret, PUBSUB_TYPE type, - zval *arg) -{ - HashTable *ht_chan; - zval *z_ele; - smart_string cmd = {0}; - - if (type == PUBSUB_CHANNELS) { - if (arg) { - /* With a pattern */ - return REDIS_SPPRINTF(ret, "PUBSUB", "sk", "CHANNELS", sizeof("CHANNELS") - 1, - Z_STRVAL_P(arg), Z_STRLEN_P(arg)); - } else { - /* No pattern */ - return REDIS_SPPRINTF(ret, "PUBSUB", "s", "CHANNELS", sizeof("CHANNELS") - 1); - } - } else if (type == PUBSUB_NUMSUB) { - ht_chan = Z_ARRVAL_P(arg); - - // Add PUBSUB and NUMSUB bits - redis_cmd_init_sstr(&cmd, zend_hash_num_elements(ht_chan)+1, "PUBSUB", sizeof("PUBSUB")-1); - redis_cmd_append_sstr(&cmd, "NUMSUB", sizeof("NUMSUB")-1); - - /* Iterate our elements */ - ZEND_HASH_FOREACH_VAL(ht_chan, z_ele) { - zend_string *zstr = zval_get_string(z_ele); - redis_cmd_append_sstr_key(&cmd, ZSTR_VAL(zstr), ZSTR_LEN(zstr), redis_sock, NULL); - zend_string_release(zstr); - } ZEND_HASH_FOREACH_END(); - - /* Set return */ - *ret = cmd.c; - return cmd.len; - } else if (type == PUBSUB_NUMPAT) { - return REDIS_SPPRINTF(ret, "PUBSUB", "s", "NUMPAT", sizeof("NUMPAT") - 1); - } - - /* Shouldn't ever happen */ - return -1; -} - /* * {{{ proto Redis::pubsub("channels", pattern); * proto Redis::pubsub("numsub", Array channels); * proto Redis::pubsub("numpat"); }}} */ PHP_METHOD(Redis, pubsub) { - zval *object; - RedisSock *redis_sock; - char *keyword, *cmd; - int cmd_len; - size_t kw_len; - PUBSUB_TYPE type; - zval *arg = NULL; - - // Parse arguments - if(zend_parse_method_parameters(ZEND_NUM_ARGS(), getThis(), - "Os|z", &object, redis_ce, &keyword, - &kw_len, &arg)==FAILURE) - { - RETURN_FALSE; - } - - /* Validate our sub command keyword, and that we've got proper arguments */ - if(!strncasecmp(keyword, "channels", sizeof("channels"))) { - /* One (optional) string argument */ - if(arg && Z_TYPE_P(arg) != IS_STRING) { - RETURN_FALSE; - } - type = PUBSUB_CHANNELS; - } else if(!strncasecmp(keyword, "numsub", sizeof("numsub"))) { - /* One array argument */ - if(ZEND_NUM_ARGS() < 2 || Z_TYPE_P(arg) != IS_ARRAY || - zend_hash_num_elements(Z_ARRVAL_P(arg)) == 0) - { - RETURN_FALSE; - } - type = PUBSUB_NUMSUB; - } else if(!strncasecmp(keyword, "numpat", sizeof("numpat"))) { - type = PUBSUB_NUMPAT; - } else { - /* Invalid keyword */ - RETURN_FALSE; - } - - /* Grab our socket context object */ - if ((redis_sock = redis_sock_get(object, 0)) == NULL) { - RETURN_FALSE; - } - - /* Construct our "PUBSUB" command */ - cmd_len = redis_build_pubsub_cmd(redis_sock, &cmd, type, arg); - - REDIS_PROCESS_REQUEST(redis_sock, cmd, cmd_len); - - if(type == PUBSUB_NUMSUB) { - if (IS_ATOMIC(redis_sock)) { - if(redis_mbulk_reply_zipped_keys_int(INTERNAL_FUNCTION_PARAM_PASSTHRU, - redis_sock, NULL, NULL) < 0) - { - RETURN_FALSE; - } - } - REDIS_PROCESS_RESPONSE(redis_mbulk_reply_zipped_keys_int); - } else { - if (IS_ATOMIC(redis_sock)) { - if(redis_read_variant_reply(INTERNAL_FUNCTION_PARAM_PASSTHRU, - redis_sock, NULL, NULL) < 0) - { - RETURN_FALSE; - } - } - REDIS_PROCESS_RESPONSE(redis_read_variant_reply); - } + REDIS_PROCESS_CMD(pubsub, redis_pubsub_response); } /* {{{ proto variant Redis::eval(string script, [array keys, long num_keys]) */ diff --git a/redis_commands.c b/redis_commands.c index b17c7d59..dabeae3f 100644 --- a/redis_commands.c +++ b/redis_commands.c @@ -1373,7 +1373,81 @@ redis_zinterunionstore_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return SUCCESS; } -/* SUBSCRIBE/PSUBSCRIBE */ +int redis_pubsub_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + HashTable *channels = NULL; + smart_string cmdstr = {0}; + zend_string *op, *pattern = NULL; + zval *arg = NULL, *z_chan; + + ZEND_PARSE_PARAMETERS_START(1, 2) + Z_PARAM_STR(op) + Z_PARAM_OPTIONAL + Z_PARAM_ZVAL(arg) + ZEND_PARSE_PARAMETERS_END_EX(return FAILURE); + + if (zend_string_equals_literal_ci(op, "NUMPAT")) { + *ctx = NULL; + } else if (zend_string_equals_literal_ci(op, "CHANNELS") || + zend_string_equals_literal_ci(op, "SHARDCHANNELS") + ) { + if (arg != NULL) { + if (Z_TYPE_P(arg) != IS_STRING) { + php_error_docref(NULL, E_WARNING, "Invalid patern value"); + return FAILURE; + } + pattern = zval_get_string(arg); + } + *ctx = PHPREDIS_CTX_PTR; + } else if (zend_string_equals_literal_ci(op, "NUMSUB") || + zend_string_equals_literal_ci(op, "SHARDNUMSUB") + ) { + if (arg != NULL) { + if (Z_TYPE_P(arg) != IS_ARRAY) { + php_error_docref(NULL, E_WARNING, "Invalid channels value"); + return FAILURE; + } + channels = Z_ARRVAL_P(arg); + } + *ctx = PHPREDIS_CTX_PTR + 1; + } else { + php_error_docref(NULL, E_WARNING, "Unknown PUBSUB operation '%s'", ZSTR_VAL(op)); + return FAILURE; + } + + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, 1 + !!pattern + (channels ? zend_hash_num_elements(channels) : 0), "PUBSUB"); + redis_cmd_append_sstr_zstr(&cmdstr, op); + + if (pattern != NULL) { + redis_cmd_append_sstr_zstr(&cmdstr, pattern); + zend_string_release(pattern); + } else if (channels != NULL) { + ZEND_HASH_FOREACH_VAL(channels, z_chan) { + // We want to deal with strings here + zend_string *zstr = zval_get_string(z_chan); + + // Grab channel name, prefix if required + char *key = ZSTR_VAL(zstr); + size_t key_len = ZSTR_LEN(zstr); + int key_free = redis_key_prefix(redis_sock, &key, &key_len); + + // Add this channel + redis_cmd_append_sstr(&cmdstr, key, key_len); + + zend_string_release(zstr); + // Free our key if it was prefixed + if (key_free) efree(key); + } ZEND_HASH_FOREACH_END(); + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + + return SUCCESS; +} + +/* SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE */ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw, char **cmd, int *cmd_len, short *slot, void **ctx) diff --git a/redis_commands.h b/redis_commands.h index 6a4ec01a..47176c48 100644 --- a/redis_commands.h +++ b/redis_commands.h @@ -139,6 +139,9 @@ int redis_mpop_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw int redis_restore_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char **cmd, int *cmd_len, short *slot, void **ctx); +int redis_pubsub_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); diff --git a/tests/RedisTest.php b/tests/RedisTest.php index 336cf23e..a88d8009 100644 --- a/tests/RedisTest.php +++ b/tests/RedisTest.php @@ -202,8 +202,8 @@ class Redis_Test extends TestSuite $this->assertTrue(is_int($result)); // Invalid calls - $this->assertFalse($this->redis->pubsub("notacommand")); - $this->assertFalse($this->redis->pubsub("numsub", "not-an-array")); + $this->assertFalse(@$this->redis->pubsub("notacommand")); + $this->assertFalse(@$this->redis->pubsub("numsub", "not-an-array")); } /* These test cases were generated randomly. We're just trying to test |