diff options
-rw-r--r-- | README.markdown | 21 | ||||
-rw-r--r-- | common.h | 7 | ||||
-rw-r--r-- | php_redis.h | 1 | ||||
-rw-r--r-- | redis.c | 164 | ||||
-rw-r--r-- | tests/TestRedis.php | 43 |
5 files changed, 235 insertions, 1 deletions
diff --git a/README.markdown b/README.markdown index f54516e9..634193ac 100644 --- a/README.markdown +++ b/README.markdown @@ -2864,6 +2864,7 @@ while($arr_matches = $redis->zscan('zset', $it, '*pattern*')) { * [psubscribe](#psubscribe) - Subscribe to channels by pattern * [publish](#publish) - Post a message to a channel * [subscribe](#subscribe) - Subscribe to channels +* [pubsub](#pubsub) - Introspection into the pub/sub subsystem ### psubscribe ----- @@ -2924,6 +2925,26 @@ function f($redis, $chan, $msg) { $redis->subscribe(array('chan-1', 'chan-2', 'chan-3'), 'f'); // subscribe to 3 chans ~~~ +### pubsub +----- +_**Description**_: A command allowing you to get information on the Redis pub/sub system. + +##### *Parameters* +*keyword*: String, which can be: "channels", "numsub", or "numpat" +*argument*: Optional, variant. For the "channels" subcommand, you can pass a string pattern. For "numsub" an array of channel names. + +##### *Return value* +*CHANNELS*: Returns an array where the members are the matching channels. +*NUMSUB*: Returns a key/value array where the keys are channel names and values are their counts. +*NUMPAT*: Integer return containing the number active pattern subscriptions + +##### *Example* +~~~ +$redis->pubsub("channels"); /*All channels */ +$redis->pubsub("channels", "*pattern*"); /* Just channels matching your pattern */ +$redis->pubsub("numsub", Array("chan1", "chan2")); /*Get subscriber counts for 'chan1' and 'chan2'*/ +$redsi->pubsub("numpat"); /* Get the number of pattern subscribers */ +``` ## Transactions @@ -45,6 +45,13 @@ typedef enum _REDIS_SCAN_TYPE { TYPE_ZSCAN } REDIS_SCAN_TYPE; +/* PUBSUB subcommands */ +typedef enum _PUBSUB_TYPE { + PUBSUB_CHANNELS, + PUBSUB_NUMSUB, + PUBSUB_NUMPAT +} PUBSUB_TYPE; + /* options */ #define REDIS_OPT_SERIALIZER 1 #define REDIS_OPT_PREFIX 2 diff --git a/php_redis.h b/php_redis.h index 12eb7462..4bb79f74 100644 --- a/php_redis.h +++ b/php_redis.h @@ -185,6 +185,7 @@ PHP_METHOD(Redis, setOption); PHP_METHOD(Redis, config); PHP_METHOD(Redis, slowlog); PHP_METHOD(Redis, wait); +PHP_METHOD(Redis, pubsub); PHP_METHOD(Redis, client); @@ -278,6 +278,7 @@ static zend_function_entry redis_functions[] = { PHP_ME(Redis, isConnected, NULL, ZEND_ACC_PUBLIC) PHP_ME(Redis, wait, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Redis, pubsub, NULL, ZEND_ACC_PUBLIC) /* aliases */ PHP_MALIAS(Redis, open, connect, NULL, ZEND_ACC_PUBLIC) @@ -6132,6 +6133,169 @@ PHP_METHOD(Redis, wait) { REDIS_PROCESS_RESPONSE(redis_long_response); } +/* + * Construct a PUBSUB command + */ +PHPAPI int +redis_build_pubsub_cmd(RedisSock *redis_sock, char **ret, PUBSUB_TYPE type, + zval *arg TSRMLS_CC) +{ + HashTable *ht_chan; + HashPosition ptr; + zval **z_ele; + char *key; + int cmd_len, key_len, key_free; + smart_str cmd = {0}; + + if(type == PUBSUB_CHANNELS) { + if(arg) { + // Get string argument and length. + key = Z_STRVAL_P(arg); + key_len = Z_STRLEN_P(arg); + + // Prefix if necissary + key_free = redis_key_prefix(redis_sock, &key, &key_len TSRMLS_CC); + + // With a pattern + cmd_len = redis_cmd_format_static(ret, "PUBSUB", "ss", "CHANNELS", sizeof("CHANNELS")-1, + key, key_len); + + // Free the channel name if we prefixed it + if(key_free) efree(key); + + // Return command length + return cmd_len; + } else { + // No pattern + return redis_cmd_format_static(ret, "PUBSUB", "s", "CHANNELS", sizeof("CHANNELS")-1); + } + } else if(type == PUBSUB_NUMSUB) { + ht_chan = Z_ARRVAL_P(arg); + + // Add PUBSUB and NUMSUB bits + redis_cmd_init_sstr(&cmd, zend_hash_num_elements(ht_chan)+1, "PUBSUB", sizeof("PUBSUB")-1); + redis_cmd_append_sstr(&cmd, "NUMSUB", sizeof("NUMSUB")-1); + + // Iterate our elements + for(zend_hash_internal_pointer_reset_ex(ht_chan, &ptr); + zend_hash_get_current_data_ex(ht_chan, (void**)&z_ele, &ptr)==SUCCESS; + zend_hash_move_forward_ex(ht_chan, &ptr)) + { + char *key; + int key_len, key_free; + zval *z_tmp = NULL; + + if(Z_TYPE_PP(z_ele) == IS_STRING) { + key = Z_STRVAL_PP(z_ele); + key_len = Z_STRLEN_PP(z_ele); + } else { + MAKE_STD_ZVAL(z_tmp); + *z_tmp = **z_ele; + zval_copy_ctor(z_tmp); + convert_to_string(z_tmp); + + key = Z_STRVAL_P(z_tmp); + key_len = Z_STRLEN_P(z_tmp); + } + + // Apply prefix if required + key_free = redis_key_prefix(redis_sock, &key, &key_len TSRMLS_CC); + + // Append this channel + redis_cmd_append_sstr(&cmd, key, key_len); + + // Free key if prefixed + if(key_free) efree(key); + + // Free our temp var if we converted from something other than a string + if(z_tmp) { + zval_dtor(z_tmp); + efree(z_tmp); + z_tmp = NULL; + } + } + + // Set return + *ret = cmd.c; + return cmd.len; + } else if(type == PUBSUB_NUMPAT) { + return redis_cmd_format_static(ret, "PUBSUB", "s", "NUMPAT", sizeof("NUMPAT")-1); + } + + // Shouldn't ever happen + return -1; +} + +/* + * {{{ proto Redis::pubsub("channels", pattern); + * proto Redis::pubsub("numsub", Array channels); + * proto Redis::pubsub("numpat"); }}} + */ +PHP_METHOD(Redis, pubsub) { + zval *object; + RedisSock *redis_sock; + char *keyword, *cmd; + int kw_len, cmd_len; + PUBSUB_TYPE type; + zval *arg=NULL; + + // Parse arguments + if(zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os|z", + &object, redis_ce, &keyword, &kw_len, &arg) + ==FAILURE) + { + RETURN_FALSE; + } + + // Validate our sub command keyword, and that we've got proper arguments + if(!strncasecmp(keyword, "channels", sizeof("channels"))) { + // One (optional) string argument + if(arg && Z_TYPE_P(arg) != IS_STRING) { + RETURN_FALSE; + } + type = PUBSUB_CHANNELS; + } else if(!strncasecmp(keyword, "numsub", sizeof("numsub"))) { + // One array argument + if(ZEND_NUM_ARGS() < 2 || Z_TYPE_P(arg) != IS_ARRAY || + zend_hash_num_elements(Z_ARRVAL_P(arg))==0) + { + RETURN_FALSE; + } + type = PUBSUB_NUMSUB; + } else if(!strncasecmp(keyword, "numpat", sizeof("numpat"))) { + type = PUBSUB_NUMPAT; + } else { + // Invalid keyword + RETURN_FALSE; + } + + // Grab our socket context object + if(redis_sock_get(object, &redis_sock TSRMLS_CC, 0)<0) { + RETURN_FALSE; + } + + // Construct our "PUBSUB" command + cmd_len = redis_build_pubsub_cmd(redis_sock, &cmd, type, arg TSRMLS_CC); + + REDIS_PROCESS_REQUEST(redis_sock, cmd, cmd_len); + + if(type == PUBSUB_NUMSUB) { + IF_ATOMIC() { + if(redis_sock_read_multibulk_reply_zipped(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, NULL, NULL)<0) { + RETURN_FALSE; + } + } + REDIS_PROCESS_RESPONSE(redis_sock_read_multibulk_reply_zipped); + } else { + IF_ATOMIC() { + if(redis_read_variant_reply(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, NULL)<0) { + RETURN_FALSE; + } + } + REDIS_PROCESS_RESPONSE(redis_read_variant_reply); + } +} + // Construct an EVAL or EVALSHA command, with option argument array and number of arguments that are keys parameter PHPAPI int redis_build_eval_cmd(RedisSock *redis_sock, char **ret, char *keyword, char *value, int val_len, zval *args, int keys_count TSRMLS_DC) { diff --git a/tests/TestRedis.php b/tests/TestRedis.php index 9fa220ae..83fdd5cc 100644 --- a/tests/TestRedis.php +++ b/tests/TestRedis.php @@ -70,7 +70,48 @@ class Redis_Test extends TestSuite ->exec(); $this->assertTrue(is_array($ret) && count($ret) === 1 && $ret[0] >= 0); - } + } + + // Run some simple tests against the PUBSUB command. This is problematic, as we + // can't be sure what's going on in the instance, but we can do some things. + public function testPubSub() { + // Only available since 2.8.0 + if(version_compare($this->version, "2.8.0", "lt")) { + $this->markTestSkipped(); + return; + } + + // PUBSUB CHANNELS ... + $result = $this->redis->pubsub("channels", "*"); + $this->assertTrue(is_array($result)); + $result = $this->redis->pubsub("channels"); + $this->assertTrue(is_array($result)); + + // PUBSUB NUMSUB + + $c1 = uniqid() . '-' . rand(1,100); + $c2 = uniqid() . '-' . rand(1,100); + + $result = $this->redis->pubsub("numsub", Array($c1, $c2)); + + // Should get an array back, with two elements + $this->assertTrue(is_array($result)); + $this->assertEquals(count($result), 2); + + // Make sure the elements are correct, and have zero counts + foreach(Array($c1,$c2) as $channel) { + $this->assertTrue(isset($result[$channel])); + $this->assertEquals($result[$channel], "0"); + } + + // PUBSUB NUMPAT + $result = $this->redis->pubsub("numpat"); + $this->assertTrue(is_int($result)); + + // Invalid calls + $this->assertFalse($this->redis->pubsub("notacommand")); + $this->assertFalse($this->redis->pubsub("numsub", "not-an-array")); + } public function testBitsets() { |