Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorMichael Grunder <michael.grunder@gmail.com>2018-09-29 21:59:01 +0300
committerGitHub <noreply@github.com>2018-09-29 21:59:01 +0300
commit2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch)
tree6982b1e1f17b7cf2fc7e024652fad8212edadacd /tests
parentbfd274712eeb372926d1106b3da3c4fc19c0a48a (diff)
Streams (#1413)
Streams API
Diffstat (limited to 'tests')
-rw-r--r--tests/RedisTest.php444
-rw-r--r--tests/TestSuite.php18
2 files changed, 461 insertions, 1 deletions
diff --git a/tests/RedisTest.php b/tests/RedisTest.php
index b5da01e8..df650ee5 100644
--- a/tests/RedisTest.php
+++ b/tests/RedisTest.php
@@ -16,6 +16,11 @@ class Redis_Test extends TestSuite
'Cupertino' => Array(-122.032182, 37.322998)
);
+ protected $serializers = Array(
+ Redis::SERIALIZER_NONE,
+ Redis::SERIALIZER_PHP,
+ );
+
/**
* @var Redis
*/
@@ -35,12 +40,20 @@ class Redis_Test extends TestSuite
$this->redis = $this->newInstance();
$info = $this->redis->info();
$this->version = (isset($info['redis_version'])?$info['redis_version']:'0.0.0');
+
+ if (defined('Redis::SERIALIZER_IGBINARY')) {
+ $this->serializers[] = Redis::SERIALIZER_IGBINARY;
+ }
}
protected function minVersionCheck($version) {
return version_compare($this->version, $version, "ge");
}
+ protected function mstime() {
+ return round(microtime(true)*1000);
+ }
+
protected function newInstance() {
$r = new Redis();
@@ -5223,6 +5236,437 @@ class Redis_Test extends TestSuite
$this->assertEquals($this->redis->lrange('mylist', 0, -1), Array('A','B','C','D'));
}
+ /* STREAMS */
+
+ protected function addStreamEntries($key, $count) {
+ $ids = Array();
+
+ $this->redis->del($key);
+
+ for ($i = 0; $i < $count; $i++) {
+ $ids[] = $this->redis->xAdd($key, '*', Array('field' => "value:$i"));
+ }
+
+ return $ids;
+ }
+
+ protected function addStreamsAndGroups($arr_streams, $count, $arr_groups) {
+ $ids = Array();
+
+ foreach ($arr_streams as $str_stream) {
+ $ids[$str_stream] = $this->addStreamEntries($str_stream, $count);
+ foreach ($arr_groups as $str_group => $str_id) {
+ $this->redis->xGroup('CREATE', $str_stream, $str_group, $str_id);
+ }
+ }
+
+ return $ids;
+ }
+
+ public function testXAdd() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ $this->redis->del('stream');
+ for ($i = 0; $i < 5; $i++) {
+ $id = $this->redis->xAdd("stream", '*', Array('k1' => 'v1', 'k2' => 'v2'));
+ $this->assertEquals($this->redis->xLen('stream'), $i+1);
+
+ /* Redis should return <timestamp>-<sequence> */
+ $bits = explode('-', $id);
+ $this->assertEquals(count($bits), 2);
+ $this->assertTrue(is_numeric($bits[0]));
+ $this->assertTrue(is_numeric($bits[1]));
+ }
+
+ /* Test an absolute maximum length */
+ for ($i = 0; $i < 100; $i++) {
+ $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10);
+ }
+ $this->assertEquals($this->redis->xLen('stream'), 10);
+
+ /* Not the greatest test but I'm unsure if approximate trimming is
+ * totally deterministic, so just make sure we are able to add with
+ * an approximate maxlen argument structure */
+ $id = $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10, true);
+ $this->assertEquals(count(explode('-', $id)), 2);
+
+ /* Empty message should fail */
+ $this->redis->xAdd('stream', '*', Array());
+ }
+
+ protected function doXRangeTest($reverse) {
+ $key = '{stream}';
+
+ if ($reverse) {
+ list($cmd,$a1,$a2) = Array('xRevRange', '+', 0);
+ } else {
+ list($cmd,$a1,$a2) = Array('xRange', 0, '+');
+ }
+
+ $this->redis->del($key);
+ for ($i = 0; $i < 3; $i++) {
+ $msg = Array('field' => "value:$i");
+ $id = $this->redis->xAdd($key, '*', $msg);
+ $rows[$id] = $msg;
+ }
+
+ $messages = $this->redis->$cmd($key, $a1, $a2);
+ $this->assertEquals(count($messages), 3);
+
+ $i = $reverse ? 2 : 0;
+ foreach ($messages as $seq => $v) {
+ $this->assertEquals(count(explode('-', $seq)), 2);
+ $this->assertEquals($v, Array('field' => "value:$i"));
+ $i += $reverse ? -1 : 1;
+ }
+
+ /* Test COUNT option */
+ for ($count = 1; $count <= 3; $count++) {
+ $messages = $this->redis->$cmd($key, $a1, $a2, $count);
+ $this->assertEquals(count($messages), $count);
+ }
+ }
+
+ public function testXRange() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ foreach (Array(false, true) as $reverse) {
+ foreach ($this->serializers as $serializer) {
+ foreach (Array(NULL, 'prefix:') as $prefix) {
+ $this->redis->setOption(Redis::OPT_PREFIX, $prefix);
+ $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer);
+ $this->doXRangeTest($reverse);
+ }
+ }
+ }
+ }
+
+ protected function testXLen() {
+ if (!$this->minVersionCheck("5.0"))
+ $this->markTestSkipped();
+
+ $this->redis->del('{stream}');
+ for ($i = 0; $i < 5; $i++) {
+ $this->redis->xadd('{stream}', '*', Array('foo' => 'bar'));
+ $this->assertEquals($this->redis->xLen('{stream}'), $i+1);
+ }
+ }
+
+ public function testXGroup() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ $this->addStreamEntries('s', 2);
+
+ /* CREATE */
+ $this->assertTrue($this->redis->xGroup('CREATE', 's', 'mygroup', '$'));
+ $this->assertFalse($this->redis->xGroup('CREATE', 's', 'mygroup', 'BAD_ID'));
+
+ /* BUSYGROUP */
+ $this->redis->xGroup('CREATE', 's', 'mygroup', '$');
+ $this->assertTrue(strpos($this->redis->getLastError(), 'BUSYGROUP') === 0);
+
+ /* SETID */
+ $this->assertTrue($this->redis->xGroup('SETID', 's', 'mygroup', '$'));
+ $this->assertFalse($this->redis->xGroup('SETID', 's', 'mygroup', 'BAD_ID'));
+
+ $this->assertEquals($this->redis->xGroup('DELCONSUMER', 's', 'mygroup', 'myconsumer'),0);
+
+ /* DELGROUP not yet implemented in Redis */
+ }
+
+ public function testXAck() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ for ($n = 1; $n <= 3; $n++) {
+ $this->addStreamsAndGroups(Array('{s}'), 3, Array('g1' => 0));
+ $msg = $this->redis->xReadGroup('g1', 'c1', Array('{s}' => 0));
+
+ /* Extract IDs */
+ $smsg = array_shift($msg);
+ $ids = array_keys($smsg);
+
+ /* Now ACK $n messages */
+ $ids = array_slice($ids, 0, $n);
+ $this->assertEquals($this->redis->xAck('{s}', 'g1', $ids), $n);
+ }
+
+ /* Verify sending no IDs is a failure */
+ $this->assertFalse($this->redis->xAck('{s}', 'g1', Array()));
+ }
+
+ protected function doXReadTest() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ $row = Array('f1' => 'v1', 'f2' => 'v2');
+ $msgdata = Array(
+ '{stream}-1' => $row,
+ '{stream}-2' => $row,
+ );
+
+ /* Append a bit of data and populate STREAM queries */
+ $this->redis->del(array_keys($msgdata));
+ foreach ($msgdata as $key => $message) {
+ for ($r = 0; $r < 2; $r++) {
+ $id = $this->redis->xAdd($key, '*', $message);
+ $qresult[$key][$id] = $message;
+ }
+ $qzero[$key] = 0;
+ $qnew[$key] = '$';
+ $keys[] = $key;
+ }
+
+ /* Everything from both streams */
+ $rmsg = $this->redis->xRead($qzero);
+ $this->assertEquals($rmsg, $qresult);
+
+ /* Test COUNT option */
+ for ($count = 1; $count <= 2; $count++) {
+ $rmsg = $this->redis->xRead($qzero, $count);
+ foreach ($keys as $key) {
+ $this->assertEquals(count($rmsg[$key]), $count);
+ }
+ }
+
+ /* Should be empty (no new entries) */
+ $this->assertEquals(count($this->redis->xRead($qnew)),0);
+
+ /* Test against a specific ID */
+ $id = $this->redis->xAdd('{stream}-1', '*', $row);
+ $new_id = $this->redis->xAdd('{stream}-1', '*', Array('final' => 'row'));
+ $rmsg = $this->redis->xRead(Array('{stream}-1' => $id));
+ $this->assertEquals(
+ $this->redis->xRead(Array('{stream}-1' => $id)),
+ Array('{stream}-1' => Array($new_id => Array('final' => 'row')))
+ );
+
+ /* Emtpy query should fail */
+ $this->assertFalse($this->redis->xRead(Array()));
+ }
+
+ public function testXRead() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ foreach ($this->serializers as $serializer) {
+ $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer);
+ $this->doXReadTest();
+ }
+
+ /* Don't need to test BLOCK multiple times */
+ $m1 = round(microtime(true)*1000);
+ $this->redis->xRead(Array('somestream' => '$'), -1, 100);
+ $m2 = round(microtime(true)*1000);
+ $this->assertTrue($m2 - $m1 >= 100);
+ }
+
+ protected function compareStreamIds($redis, $control) {
+ foreach ($control as $stream => $ids) {
+ $rcount = count($redis[$stream]);
+ $lcount = count($control[$stream]);
+
+ /* We should have the same number of messages */
+ $this->assertEquals($rcount, $lcount);
+
+ /* We should have the exact same IDs */
+ foreach ($ids as $k => $id) {
+ $this->assertTrue(isset($redis[$stream][$id]));
+ }
+ }
+ }
+
+ public function testXReadGroup() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ /* Create some streams and groups */
+ $streams = Array('{s}-1', '{s}-2');
+ $qstreams = Array('{s}-1' => 0, '{s}-2' => 0);
+ $groups = Array('g1' => 0, 'g2' => 0);
+
+ $ids = $this->addStreamsAndGroups($streams, 3, $groups);
+
+ /* Test that we get get the IDs we should */
+ foreach (Array('g1', 'g2') as $group) {
+ foreach ($ids as $stream => $messages) {
+ while ($ids[$stream]) {
+ /* Read more messages */
+ $resp = $this->redis->xReadGroup($group, 'consumer', $qstreams);
+
+ /* They should match with our local control array */
+ $this->compareStreamIds($resp, $ids);
+
+ /* Remove a message from our control *and* XACK it in Redis */
+ $id = array_shift($ids[$stream]);
+ $this->redis->xAck($stream, $group, Array($id));
+ }
+ }
+ }
+
+ /* Test COUNT option */
+ for ($c = 1; $c <= 3; $c++) {
+ $this->addStreamsAndGroups($streams, 3, $groups);
+ $resp = $this->redis->xReadGroup('g1', 'consumer', $qstreams, $c);
+
+ foreach ($resp as $stream => $smsg) {
+ $this->assertEquals(count($smsg), $c);
+ }
+ }
+
+ /* Finally test BLOCK with a sloppy timing test */
+ $t1 = $this->mstime();
+ $qnew = Array('{s}-1' => '>', '{s}-2' => '>');
+ $this->redis->xReadGroup('g1', 'c1', $qnew, -1, 100);
+ $t2 = $this->mstime();
+ $this->assertTrue($t2 - $t1 >= 100);
+ }
+
+ public function testXPending() {
+ if (!$this->minVersionCheck("5.0")) {
+ return $this->markTestSkipped();
+ }
+
+ $rows = 5;
+ $this->addStreamsAndGroups(Array('s'), $rows, Array('group' => 0));
+
+ $msg = $this->redis->xReadGroup('group', 'consumer', Array('s' => 0));
+ $ids = array_keys($msg['s']);
+
+ for ($n = count($ids); $n >= 0; $n--) {
+ $xp = $this->redis->xPending('s', 'group');
+ $this->assertEquals($xp[0], count($ids));
+
+ /* Verify we're seeing the IDs themselves */
+ for ($idx = 1; $idx <= 2; $idx++) {
+ if ($xp[$idx]) {
+ $this->assertPatternMatch($xp[$idx], "/^[0-9].*-[0-9].*/");
+ }
+ }
+
+ if ($ids) {
+ $id = array_shift($ids);
+ $this->redis->xAck('s', 'group', Array($id));
+ }
+ }
+ }
+
+ public function testXDel() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ for ($n = 5; $n > 0; $n--) {
+ $ids = $this->addStreamEntries('s', 5);
+ $todel = array_slice($ids, 0, $n);
+ $this->assertEquals($this->redis->xDel('s', $todel), count($todel));
+ }
+
+ /* Empty array should fail */
+ $this->assertFalse($this->redis->xDel('s', Array()));
+ }
+
+ public function testXTrim() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ for ($maxlen = 0; $maxlen <= 50; $maxlen += 10) {
+ $this->addStreamEntries('stream', 100);
+ $trimmed = $this->redis->xTrim('stream', $maxlen);
+ $this->assertEquals($trimmed, 100 - $maxlen);
+ }
+
+ /* APPROX trimming isn't easily deterministic, so just make sure we
+ can call it with the flag */
+ $this->addStreamEntries('stream', 100);
+ $this->assertFalse($this->redis->xTrim('stream', 1, true) === false);
+ }
+
+ /* XCLAIM is one of the most complicated commands, with a great deal of different options
+ * The following test attempts to verify every combination of every possible option. */
+ public function testXClaim() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ foreach (Array(0, 100) as $min_idle_time) {
+ foreach (Array(false, true) as $justid) {
+ foreach (Array(0, 10) as $retrycount) {
+ /* We need to test not passing TIME/IDLE as well as passing either */
+ if ($min_idle_time == 0) {
+ $topts = Array(Array(), Array('IDLE', 1000000), Array('TIME', time() * 1000));
+ } else {
+ $topts = Array(NULL);
+ }
+
+ foreach ($topts as $tinfo) {
+ if ($tinfo) {
+ list($ttype, $tvalue) = $tinfo;
+ } else {
+ $ttype = NULL; $tvalue = NULL;
+ }
+
+ /* Add some messages and create a group */
+ $this->addStreamsAndGroups(Array('s'), 10, Array('group1' => 0));
+
+ /* Create a second stream we can FORCE ownership on */
+ $fids = $this->addStreamsAndGroups(Array('f'), 10, Array('group1' => 0));
+ $fids = $fids['f'];
+
+ /* Have consumer 'Mike' read the messages */
+ $oids = $this->redis->xReadGroup('group1', 'Mike', Array('s' => 0));
+ $oids = array_keys($oids['s']); /* We're only dealing with stream 's' */
+
+ /* Construct our options array */
+ $opts = Array();
+ if ($justid) $opts[] = 'JUSTID';
+ if ($retrycount) $opts['RETRYCOUNT'] = $retrycount;
+ if ($tvalue !== NULL) $opts[$ttype] = $tvalue;
+
+ /* Now have pavlo XCLAIM them */
+ $cids = $this->redis->xClaim('s', 'group1', 'Pavlo', $min_idle_time, $oids, $opts);
+ if (!$justid) $cids = array_keys($cids);
+
+ if ($min_idle_time == 0) {
+ $this->assertEquals($cids, $oids);
+
+ /* Append the FORCE option to our second stream where we have not already
+ * assigned to a PEL group */
+ $opts[] = 'FORCE';
+ $freturn = $this->redis->xClaim('f', 'group1', 'Test', 0, $fids, $opts);
+ if (!$justid) $freturn = array_keys($freturn);
+ $this->assertEquals($freturn, $fids);
+
+ if ($retrycount || $tvalue !== NULL) {
+ $pending = $this->redis->xPending('s', 'group1', 0, '+', 1, 'Pavlo');
+
+ if ($retrycount) {
+ $this->assertEquals($pending[0][3], $retrycount);
+ }
+ if ($tvalue !== NULL) {
+ if ($ttype == 'IDLE') {
+ /* If testing IDLE the value must be >= what we set */
+ $this->assertTrue($pending[0][2] >= $tvalue);
+ } else {
+ /* Timing tests are notoriously irritating but I don't see
+ * how we'll get >= 20,000 ms between XCLAIM and XPENDING no
+ * matter how slow the machine/VM running the tests is */
+ $this->assertTrue($pending[0][2] <= 20000);
+ }
+ }
+ }
+ } else {
+ /* We're verifying that we get no messages when we've set 100 seconds
+ * as our idle time, which should match nothing */
+ $this->assertEquals($cids, Array());
+ }
+ }
+ }
+ }
+ }
+ }
+
public function testSession_savedToRedis()
{
$this->setSessionHandler();
diff --git a/tests/TestSuite.php b/tests/TestSuite.php
index 461ac7dc..c2d68259 100644
--- a/tests/TestSuite.php
+++ b/tests/TestSuite.php
@@ -65,7 +65,14 @@ class TestSuite {
}
protected function assertFalse($bool) {
- return $this->assertTrue(!$bool);
+ if(!$bool)
+ return true;
+
+ $bt = debug_backtrace(false);
+ self::$errors []= sprintf("Assertion failed: %s:%d (%s)\n",
+ $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]);
+
+ return false;
}
protected function assertTrue($bool) {
@@ -99,6 +106,15 @@ class TestSuite {
$bt[0]["file"], $bt[0]["line"], $bt[1]["function"]);
}
+ protected function assertPatternMatch($str_test, $str_regex) {
+ if (preg_match($str_regex, $str_test))
+ return;
+
+ $bt = debug_backtrace(false);
+ self::$errors []= sprintf("Assertion failed ('%s' doesnt match '%s'): %s:%d (%s)\n",
+ $str_test, $str_regex, $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]);
+ }
+
protected function markTestSkipped($msg='') {
$bt = debug_backtrace(false);
self::$warnings []= sprintf("Skipped test: %s:%d (%s) %s\n",