diff options
author | Michael Grunder <michael.grunder@gmail.com> | 2018-09-29 21:59:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-29 21:59:01 +0300 |
commit | 2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch) | |
tree | 6982b1e1f17b7cf2fc7e024652fad8212edadacd /tests | |
parent | bfd274712eeb372926d1106b3da3c4fc19c0a48a (diff) |
Streams (#1413)
Streams API
Diffstat (limited to 'tests')
-rw-r--r-- | tests/RedisTest.php | 444 | ||||
-rw-r--r-- | tests/TestSuite.php | 18 |
2 files changed, 461 insertions, 1 deletions
diff --git a/tests/RedisTest.php b/tests/RedisTest.php index b5da01e8..df650ee5 100644 --- a/tests/RedisTest.php +++ b/tests/RedisTest.php @@ -16,6 +16,11 @@ class Redis_Test extends TestSuite 'Cupertino' => Array(-122.032182, 37.322998) ); + protected $serializers = Array( + Redis::SERIALIZER_NONE, + Redis::SERIALIZER_PHP, + ); + /** * @var Redis */ @@ -35,12 +40,20 @@ class Redis_Test extends TestSuite $this->redis = $this->newInstance(); $info = $this->redis->info(); $this->version = (isset($info['redis_version'])?$info['redis_version']:'0.0.0'); + + if (defined('Redis::SERIALIZER_IGBINARY')) { + $this->serializers[] = Redis::SERIALIZER_IGBINARY; + } } protected function minVersionCheck($version) { return version_compare($this->version, $version, "ge"); } + protected function mstime() { + return round(microtime(true)*1000); + } + protected function newInstance() { $r = new Redis(); @@ -5223,6 +5236,437 @@ class Redis_Test extends TestSuite $this->assertEquals($this->redis->lrange('mylist', 0, -1), Array('A','B','C','D')); } + /* STREAMS */ + + protected function addStreamEntries($key, $count) { + $ids = Array(); + + $this->redis->del($key); + + for ($i = 0; $i < $count; $i++) { + $ids[] = $this->redis->xAdd($key, '*', Array('field' => "value:$i")); + } + + return $ids; + } + + protected function addStreamsAndGroups($arr_streams, $count, $arr_groups) { + $ids = Array(); + + foreach ($arr_streams as $str_stream) { + $ids[$str_stream] = $this->addStreamEntries($str_stream, $count); + foreach ($arr_groups as $str_group => $str_id) { + $this->redis->xGroup('CREATE', $str_stream, $str_group, $str_id); + } + } + + return $ids; + } + + public function testXAdd() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + $this->redis->del('stream'); + for ($i = 0; $i < 5; $i++) { + $id = $this->redis->xAdd("stream", '*', Array('k1' => 'v1', 'k2' => 'v2')); + $this->assertEquals($this->redis->xLen('stream'), $i+1); + + /* Redis should return <timestamp>-<sequence> */ + $bits = explode('-', $id); + $this->assertEquals(count($bits), 2); + $this->assertTrue(is_numeric($bits[0])); + $this->assertTrue(is_numeric($bits[1])); + } + + /* Test an absolute maximum length */ + for ($i = 0; $i < 100; $i++) { + $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10); + } + $this->assertEquals($this->redis->xLen('stream'), 10); + + /* Not the greatest test but I'm unsure if approximate trimming is + * totally deterministic, so just make sure we are able to add with + * an approximate maxlen argument structure */ + $id = $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10, true); + $this->assertEquals(count(explode('-', $id)), 2); + + /* Empty message should fail */ + $this->redis->xAdd('stream', '*', Array()); + } + + protected function doXRangeTest($reverse) { + $key = '{stream}'; + + if ($reverse) { + list($cmd,$a1,$a2) = Array('xRevRange', '+', 0); + } else { + list($cmd,$a1,$a2) = Array('xRange', 0, '+'); + } + + $this->redis->del($key); + for ($i = 0; $i < 3; $i++) { + $msg = Array('field' => "value:$i"); + $id = $this->redis->xAdd($key, '*', $msg); + $rows[$id] = $msg; + } + + $messages = $this->redis->$cmd($key, $a1, $a2); + $this->assertEquals(count($messages), 3); + + $i = $reverse ? 2 : 0; + foreach ($messages as $seq => $v) { + $this->assertEquals(count(explode('-', $seq)), 2); + $this->assertEquals($v, Array('field' => "value:$i")); + $i += $reverse ? -1 : 1; + } + + /* Test COUNT option */ + for ($count = 1; $count <= 3; $count++) { + $messages = $this->redis->$cmd($key, $a1, $a2, $count); + $this->assertEquals(count($messages), $count); + } + } + + public function testXRange() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + foreach (Array(false, true) as $reverse) { + foreach ($this->serializers as $serializer) { + foreach (Array(NULL, 'prefix:') as $prefix) { + $this->redis->setOption(Redis::OPT_PREFIX, $prefix); + $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer); + $this->doXRangeTest($reverse); + } + } + } + } + + protected function testXLen() { + if (!$this->minVersionCheck("5.0")) + $this->markTestSkipped(); + + $this->redis->del('{stream}'); + for ($i = 0; $i < 5; $i++) { + $this->redis->xadd('{stream}', '*', Array('foo' => 'bar')); + $this->assertEquals($this->redis->xLen('{stream}'), $i+1); + } + } + + public function testXGroup() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + $this->addStreamEntries('s', 2); + + /* CREATE */ + $this->assertTrue($this->redis->xGroup('CREATE', 's', 'mygroup', '$')); + $this->assertFalse($this->redis->xGroup('CREATE', 's', 'mygroup', 'BAD_ID')); + + /* BUSYGROUP */ + $this->redis->xGroup('CREATE', 's', 'mygroup', '$'); + $this->assertTrue(strpos($this->redis->getLastError(), 'BUSYGROUP') === 0); + + /* SETID */ + $this->assertTrue($this->redis->xGroup('SETID', 's', 'mygroup', '$')); + $this->assertFalse($this->redis->xGroup('SETID', 's', 'mygroup', 'BAD_ID')); + + $this->assertEquals($this->redis->xGroup('DELCONSUMER', 's', 'mygroup', 'myconsumer'),0); + + /* DELGROUP not yet implemented in Redis */ + } + + public function testXAck() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + for ($n = 1; $n <= 3; $n++) { + $this->addStreamsAndGroups(Array('{s}'), 3, Array('g1' => 0)); + $msg = $this->redis->xReadGroup('g1', 'c1', Array('{s}' => 0)); + + /* Extract IDs */ + $smsg = array_shift($msg); + $ids = array_keys($smsg); + + /* Now ACK $n messages */ + $ids = array_slice($ids, 0, $n); + $this->assertEquals($this->redis->xAck('{s}', 'g1', $ids), $n); + } + + /* Verify sending no IDs is a failure */ + $this->assertFalse($this->redis->xAck('{s}', 'g1', Array())); + } + + protected function doXReadTest() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + $row = Array('f1' => 'v1', 'f2' => 'v2'); + $msgdata = Array( + '{stream}-1' => $row, + '{stream}-2' => $row, + ); + + /* Append a bit of data and populate STREAM queries */ + $this->redis->del(array_keys($msgdata)); + foreach ($msgdata as $key => $message) { + for ($r = 0; $r < 2; $r++) { + $id = $this->redis->xAdd($key, '*', $message); + $qresult[$key][$id] = $message; + } + $qzero[$key] = 0; + $qnew[$key] = '$'; + $keys[] = $key; + } + + /* Everything from both streams */ + $rmsg = $this->redis->xRead($qzero); + $this->assertEquals($rmsg, $qresult); + + /* Test COUNT option */ + for ($count = 1; $count <= 2; $count++) { + $rmsg = $this->redis->xRead($qzero, $count); + foreach ($keys as $key) { + $this->assertEquals(count($rmsg[$key]), $count); + } + } + + /* Should be empty (no new entries) */ + $this->assertEquals(count($this->redis->xRead($qnew)),0); + + /* Test against a specific ID */ + $id = $this->redis->xAdd('{stream}-1', '*', $row); + $new_id = $this->redis->xAdd('{stream}-1', '*', Array('final' => 'row')); + $rmsg = $this->redis->xRead(Array('{stream}-1' => $id)); + $this->assertEquals( + $this->redis->xRead(Array('{stream}-1' => $id)), + Array('{stream}-1' => Array($new_id => Array('final' => 'row'))) + ); + + /* Emtpy query should fail */ + $this->assertFalse($this->redis->xRead(Array())); + } + + public function testXRead() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + foreach ($this->serializers as $serializer) { + $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer); + $this->doXReadTest(); + } + + /* Don't need to test BLOCK multiple times */ + $m1 = round(microtime(true)*1000); + $this->redis->xRead(Array('somestream' => '$'), -1, 100); + $m2 = round(microtime(true)*1000); + $this->assertTrue($m2 - $m1 >= 100); + } + + protected function compareStreamIds($redis, $control) { + foreach ($control as $stream => $ids) { + $rcount = count($redis[$stream]); + $lcount = count($control[$stream]); + + /* We should have the same number of messages */ + $this->assertEquals($rcount, $lcount); + + /* We should have the exact same IDs */ + foreach ($ids as $k => $id) { + $this->assertTrue(isset($redis[$stream][$id])); + } + } + } + + public function testXReadGroup() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + /* Create some streams and groups */ + $streams = Array('{s}-1', '{s}-2'); + $qstreams = Array('{s}-1' => 0, '{s}-2' => 0); + $groups = Array('g1' => 0, 'g2' => 0); + + $ids = $this->addStreamsAndGroups($streams, 3, $groups); + + /* Test that we get get the IDs we should */ + foreach (Array('g1', 'g2') as $group) { + foreach ($ids as $stream => $messages) { + while ($ids[$stream]) { + /* Read more messages */ + $resp = $this->redis->xReadGroup($group, 'consumer', $qstreams); + + /* They should match with our local control array */ + $this->compareStreamIds($resp, $ids); + + /* Remove a message from our control *and* XACK it in Redis */ + $id = array_shift($ids[$stream]); + $this->redis->xAck($stream, $group, Array($id)); + } + } + } + + /* Test COUNT option */ + for ($c = 1; $c <= 3; $c++) { + $this->addStreamsAndGroups($streams, 3, $groups); + $resp = $this->redis->xReadGroup('g1', 'consumer', $qstreams, $c); + + foreach ($resp as $stream => $smsg) { + $this->assertEquals(count($smsg), $c); + } + } + + /* Finally test BLOCK with a sloppy timing test */ + $t1 = $this->mstime(); + $qnew = Array('{s}-1' => '>', '{s}-2' => '>'); + $this->redis->xReadGroup('g1', 'c1', $qnew, -1, 100); + $t2 = $this->mstime(); + $this->assertTrue($t2 - $t1 >= 100); + } + + public function testXPending() { + if (!$this->minVersionCheck("5.0")) { + return $this->markTestSkipped(); + } + + $rows = 5; + $this->addStreamsAndGroups(Array('s'), $rows, Array('group' => 0)); + + $msg = $this->redis->xReadGroup('group', 'consumer', Array('s' => 0)); + $ids = array_keys($msg['s']); + + for ($n = count($ids); $n >= 0; $n--) { + $xp = $this->redis->xPending('s', 'group'); + $this->assertEquals($xp[0], count($ids)); + + /* Verify we're seeing the IDs themselves */ + for ($idx = 1; $idx <= 2; $idx++) { + if ($xp[$idx]) { + $this->assertPatternMatch($xp[$idx], "/^[0-9].*-[0-9].*/"); + } + } + + if ($ids) { + $id = array_shift($ids); + $this->redis->xAck('s', 'group', Array($id)); + } + } + } + + public function testXDel() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + for ($n = 5; $n > 0; $n--) { + $ids = $this->addStreamEntries('s', 5); + $todel = array_slice($ids, 0, $n); + $this->assertEquals($this->redis->xDel('s', $todel), count($todel)); + } + + /* Empty array should fail */ + $this->assertFalse($this->redis->xDel('s', Array())); + } + + public function testXTrim() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + for ($maxlen = 0; $maxlen <= 50; $maxlen += 10) { + $this->addStreamEntries('stream', 100); + $trimmed = $this->redis->xTrim('stream', $maxlen); + $this->assertEquals($trimmed, 100 - $maxlen); + } + + /* APPROX trimming isn't easily deterministic, so just make sure we + can call it with the flag */ + $this->addStreamEntries('stream', 100); + $this->assertFalse($this->redis->xTrim('stream', 1, true) === false); + } + + /* XCLAIM is one of the most complicated commands, with a great deal of different options + * The following test attempts to verify every combination of every possible option. */ + public function testXClaim() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + foreach (Array(0, 100) as $min_idle_time) { + foreach (Array(false, true) as $justid) { + foreach (Array(0, 10) as $retrycount) { + /* We need to test not passing TIME/IDLE as well as passing either */ + if ($min_idle_time == 0) { + $topts = Array(Array(), Array('IDLE', 1000000), Array('TIME', time() * 1000)); + } else { + $topts = Array(NULL); + } + + foreach ($topts as $tinfo) { + if ($tinfo) { + list($ttype, $tvalue) = $tinfo; + } else { + $ttype = NULL; $tvalue = NULL; + } + + /* Add some messages and create a group */ + $this->addStreamsAndGroups(Array('s'), 10, Array('group1' => 0)); + + /* Create a second stream we can FORCE ownership on */ + $fids = $this->addStreamsAndGroups(Array('f'), 10, Array('group1' => 0)); + $fids = $fids['f']; + + /* Have consumer 'Mike' read the messages */ + $oids = $this->redis->xReadGroup('group1', 'Mike', Array('s' => 0)); + $oids = array_keys($oids['s']); /* We're only dealing with stream 's' */ + + /* Construct our options array */ + $opts = Array(); + if ($justid) $opts[] = 'JUSTID'; + if ($retrycount) $opts['RETRYCOUNT'] = $retrycount; + if ($tvalue !== NULL) $opts[$ttype] = $tvalue; + + /* Now have pavlo XCLAIM them */ + $cids = $this->redis->xClaim('s', 'group1', 'Pavlo', $min_idle_time, $oids, $opts); + if (!$justid) $cids = array_keys($cids); + + if ($min_idle_time == 0) { + $this->assertEquals($cids, $oids); + + /* Append the FORCE option to our second stream where we have not already + * assigned to a PEL group */ + $opts[] = 'FORCE'; + $freturn = $this->redis->xClaim('f', 'group1', 'Test', 0, $fids, $opts); + if (!$justid) $freturn = array_keys($freturn); + $this->assertEquals($freturn, $fids); + + if ($retrycount || $tvalue !== NULL) { + $pending = $this->redis->xPending('s', 'group1', 0, '+', 1, 'Pavlo'); + + if ($retrycount) { + $this->assertEquals($pending[0][3], $retrycount); + } + if ($tvalue !== NULL) { + if ($ttype == 'IDLE') { + /* If testing IDLE the value must be >= what we set */ + $this->assertTrue($pending[0][2] >= $tvalue); + } else { + /* Timing tests are notoriously irritating but I don't see + * how we'll get >= 20,000 ms between XCLAIM and XPENDING no + * matter how slow the machine/VM running the tests is */ + $this->assertTrue($pending[0][2] <= 20000); + } + } + } + } else { + /* We're verifying that we get no messages when we've set 100 seconds + * as our idle time, which should match nothing */ + $this->assertEquals($cids, Array()); + } + } + } + } + } + } + public function testSession_savedToRedis() { $this->setSessionHandler(); diff --git a/tests/TestSuite.php b/tests/TestSuite.php index 461ac7dc..c2d68259 100644 --- a/tests/TestSuite.php +++ b/tests/TestSuite.php @@ -65,7 +65,14 @@ class TestSuite { } protected function assertFalse($bool) { - return $this->assertTrue(!$bool); + if(!$bool) + return true; + + $bt = debug_backtrace(false); + self::$errors []= sprintf("Assertion failed: %s:%d (%s)\n", + $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]); + + return false; } protected function assertTrue($bool) { @@ -99,6 +106,15 @@ class TestSuite { $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]); } + protected function assertPatternMatch($str_test, $str_regex) { + if (preg_match($str_regex, $str_test)) + return; + + $bt = debug_backtrace(false); + self::$errors []= sprintf("Assertion failed ('%s' doesnt match '%s'): %s:%d (%s)\n", + $str_test, $str_regex, $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]); + } + protected function markTestSkipped($msg='') { $bt = debug_backtrace(false); self::$warnings []= sprintf("Skipped test: %s:%d (%s) %s\n", |