diff options
author | Nicolas Favre-Felix <n.favrefelix@gmail.com> | 2011-09-04 18:31:34 +0400 |
---|---|---|
committer | Nicolas Favre-Felix <n.favrefelix@gmail.com> | 2011-09-04 18:31:34 +0400 |
commit | 732366a739112303d356c217f80dd20348a2a560 (patch) | |
tree | d9acae8c55ccdcb799fc5693a97827fad0d474f4 | |
parent | 68abdf4282bae996b5796d321adeaf042c878dd3 (diff) |
Multi/exec, per node.
-rw-r--r-- | redis_array.c | 93 | ||||
-rw-r--r-- | redis_array.h | 16 | ||||
-rw-r--r-- | redis_array_impl.c | 32 | ||||
-rw-r--r-- | redis_array_impl.h | 3 | ||||
-rw-r--r-- | tests/array-rehash.php | 153 |
5 files changed, 261 insertions, 36 deletions
diff --git a/redis_array.c b/redis_array.c index 7d32aabd..444e2597 100644 --- a/redis_array.c +++ b/redis_array.c @@ -56,6 +56,10 @@ zend_function_entry redis_array_functions[] = { PHP_ME(RedisArray, getOption, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisArray, setOption, NULL, ZEND_ACC_PUBLIC) + /* Multi/Exec */ + PHP_ME(RedisArray, multi, NULL, ZEND_ACC_PUBLIC) + PHP_ME(RedisArray, exec, NULL, ZEND_ACC_PUBLIC) + /* Aliases */ PHP_MALIAS(RedisArray, delete, del, NULL, ZEND_ACC_PUBLIC) PHP_MALIAS(RedisArray, getMultiple, mget, NULL, ZEND_ACC_PUBLIC) @@ -247,23 +251,27 @@ ra_forward_call(INTERNAL_FUNCTION_PARAMETERS, RedisArray *ra, const char *cmd, i h_args = Z_ARRVAL_P(z_args); argc = zend_hash_num_elements(h_args); - /* extract key and hash it. */ - if(!(key = ra_find_key(ra, z_args, cmd, &key_len))) { - php_error_docref(NULL TSRMLS_CC, E_ERROR, "Could not find key"); - RETURN_FALSE; - } + if(ra->z_multi_exec) { + redis_inst = ra->z_multi_exec; /* we already have the instance */ + } else { + /* extract key and hash it. */ + if(!(key = ra_find_key(ra, z_args, cmd, &key_len))) { + php_error_docref(NULL TSRMLS_CC, E_ERROR, "Could not find key"); + RETURN_FALSE; + } - /* find node */ - redis_inst = ra_find_node(ra, key, key_len, NULL TSRMLS_CC); - if(!redis_inst) { - php_error_docref(NULL TSRMLS_CC, E_ERROR, "Could not find any redis servers for this key."); - RETURN_FALSE; + /* find node */ + redis_inst = ra_find_node(ra, key, key_len, NULL TSRMLS_CC); + if(!redis_inst) { + php_error_docref(NULL TSRMLS_CC, E_ERROR, "Could not find any redis servers for this key."); + RETURN_FALSE; + } } /* check if write cmd */ b_write_cmd = ra_is_write_cmd(ra, cmd, cmd_len); - if(ra->index && b_write_cmd) { // add MULTI + SADD + if(ra->index && b_write_cmd && !ra->z_multi_exec) { // add MULTI + SADD ra_index_multi(redis_inst TSRMLS_CC); } @@ -280,6 +288,13 @@ ra_forward_call(INTERNAL_FUNCTION_PARAMETERS, RedisArray *ra, const char *cmd, i z_callargs[i] = *zp_tmp; } + /* multi/exec */ + if(ra->z_multi_exec) { + call_user_function(&redis_ce->function_table, &ra->z_multi_exec, &z_fun, return_value, argc, z_callargs TSRMLS_CC); + efree(z_callargs); + RETURN_ZVAL(getThis(), 1, 0); + } + /* CALL! */ if(ra->index && b_write_cmd) { // call using discarded temp value and extract exec results after. @@ -290,7 +305,7 @@ ra_forward_call(INTERNAL_FUNCTION_PARAMETERS, RedisArray *ra, const char *cmd, i ra_index_key(key, key_len, redis_inst TSRMLS_CC); // call EXEC - ra_index_exec(redis_inst, return_value TSRMLS_CC); + ra_index_exec(redis_inst, return_value, 0 TSRMLS_CC); } else { // call directly through. call_user_function(&redis_ce->function_table, &redis_inst, &z_fun, return_value, argc, z_callargs TSRMLS_CC); @@ -883,3 +898,57 @@ PHP_METHOD(RedisArray, del) efree(z_args); RETURN_LONG(total); } + +PHP_METHOD(RedisArray, multi) +{ + zval *object; + RedisArray *ra; + zval *z_redis; + char *host; + int host_len; + + if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os", + &object, redis_array_ce, &host, &host_len) == FAILURE) { + RETURN_FALSE; + } + + if (redis_array_get(object, &ra TSRMLS_CC) < 0) { + RETURN_FALSE; + } + + /* find node */ + z_redis = ra_find_node_by_name(ra, host, host_len TSRMLS_CC); + if(!z_redis) { + RETURN_FALSE; + } + + /* save multi object */ + ra->z_multi_exec = z_redis; + + /* switch redis instance to multi/exec mode. */ + ra_index_multi(z_redis TSRMLS_CC); + + /* return this. */ + RETURN_ZVAL(object, 1, 0); +} + +PHP_METHOD(RedisArray, exec) +{ + zval *object; + RedisArray *ra; + + if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "O", + &object, redis_array_ce) == FAILURE) { + RETURN_FALSE; + } + + if (redis_array_get(object, &ra TSRMLS_CC) < 0 || !ra->z_multi_exec) { + RETURN_FALSE; + } + + /* switch redis instance out of multi/exec mode. */ + ra_index_exec(ra->z_multi_exec, return_value, 1 TSRMLS_CC); + + /* remove multi object */ + ra->z_multi_exec = NULL; +} diff --git a/redis_array.h b/redis_array.h index 84f5db30..613a6ae7 100644 --- a/redis_array.h +++ b/redis_array.h @@ -22,17 +22,21 @@ PHP_METHOD(RedisArray, del); PHP_METHOD(RedisArray, getOption); PHP_METHOD(RedisArray, setOption); +PHP_METHOD(RedisArray, multi); +PHP_METHOD(RedisArray, exec); + typedef struct RedisArray_ { int count; - char **hosts; - zval **redis; - zend_bool index; - zval *z_fun; /* key extractor */ - zval *z_pure_cmds; /* hash table */ + char **hosts; /* array of host:port strings */ + zval **redis; /* array of Redis instances */ + zval *z_multi_exec; /* Redis instance to be used in multi-exec */ + zend_bool index; /* use per-node index */ + zend_bool auto_rehash; /* migrate keys on read operations */ + zval *z_fun; /* key extractor, callable */ + zval *z_pure_cmds; /* hash table */ - int auto_rehash; /* migrate keys on read operations */ struct RedisArray_ *prev; } RedisArray; diff --git a/redis_array_impl.c b/redis_array_impl.c index 35af3630..8a789031 100644 --- a/redis_array_impl.c +++ b/redis_array_impl.c @@ -238,7 +238,9 @@ ra_make_array(HashTable *hosts, zval *z_fun, HashTable *hosts_prev, zend_bool b_ ra->redis = emalloc(count * sizeof(zval*)); ra->count = count; ra->z_fun = NULL; + ra->z_multi_exec = NULL; ra->index = b_index; + ra->auto_rehash = 0; /* init array data structures */ ra_init_function_table(ra); @@ -337,6 +339,18 @@ ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos TSRMLS_D return ra->redis[pos]; } +zval * +ra_find_node_by_name(RedisArray *ra, const char *host, int host_len TSRMLS_DC) { + + int i; + for(i = 0; i < ra->count; ++i) { + if(strncmp(ra->hosts[i], host, host_len) == 0) { + return ra->redis[i]; + } + } + return NULL; +} + char * ra_find_key(RedisArray *ra, zval *z_args, const char *cmd, int *key_len) { @@ -391,7 +405,7 @@ ra_index_key(const char *key, int key_len, zval *z_redis TSRMLS_DC) { } void -ra_index_exec(zval *z_redis, zval *return_value TSRMLS_DC) { +ra_index_exec(zval *z_redis, zval *return_value, int keep_all TSRMLS_DC) { zval z_fun_exec, z_ret, **zp_tmp; @@ -399,11 +413,17 @@ ra_index_exec(zval *z_redis, zval *return_value TSRMLS_DC) { ZVAL_STRING(&z_fun_exec, "EXEC", 0); call_user_function(&redis_ce->function_table, &z_redis, &z_fun_exec, &z_ret, 0, NULL TSRMLS_CC); + /* extract first element of exec array and put into return_value. */ if(Z_TYPE(z_ret) == IS_ARRAY) { - if(return_value && zend_hash_quick_find(Z_ARRVAL(z_ret), NULL, 0, 0, (void**)&zp_tmp) != FAILURE) { - *return_value = **zp_tmp; - zval_copy_ctor(return_value); + if(return_value) { + if(keep_all) { + *return_value = z_ret; + zval_copy_ctor(return_value); + } else if(zend_hash_quick_find(Z_ARRVAL(z_ret), NULL, 0, 0, (void**)&zp_tmp) != FAILURE) { + *return_value = **zp_tmp; + zval_copy_ctor(return_value); + } } zval_dtor(&z_ret); } @@ -549,7 +569,7 @@ ra_del_key(const char *key, int key_len, zval *z_from TSRMLS_DC) { ra_remove_from_index(z_from, key, key_len TSRMLS_CC); /* close transaction */ - ra_index_exec(z_from, NULL TSRMLS_CC); + ra_index_exec(z_from, NULL, 0 TSRMLS_CC); } static zend_bool @@ -815,7 +835,7 @@ ra_move_key(const char *key, int key_len, zval *z_from, zval *z_to TSRMLS_DC) { } /* close transaction */ - ra_index_exec(z_to, NULL TSRMLS_CC); + ra_index_exec(z_to, NULL, 0 TSRMLS_CC); } /* callback with the current progress, with hostname and count */ diff --git a/redis_array_impl.h b/redis_array_impl.h index 99472999..a9be177c 100644 --- a/redis_array_impl.h +++ b/redis_array_impl.h @@ -8,6 +8,7 @@ RedisArray* ra_load_hosts(RedisArray *ra, HashTable *hosts TSRMLS_DC); RedisArray *ra_load_array(const char *name TSRMLS_DC); RedisArray *ra_make_array(HashTable *hosts, zval *z_fun, HashTable *hosts_prev, zend_bool b_index TSRMLS_DC); +zval *ra_find_node_by_name(RedisArray *ra, const char *host, int host_len TSRMLS_DC); zval *ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos TSRMLS_DC); void ra_init_function_table(RedisArray *ra); @@ -16,7 +17,7 @@ char * ra_find_key(RedisArray *ra, zval *z_args, const char *cmd, int *key_len); void ra_index_multi(zval *z_redis TSRMLS_DC); void ra_index_key(const char *key, int key_len, zval *z_redis TSRMLS_DC); -void ra_index_exec(zval *z_redis, zval *return_value TSRMLS_DC); +void ra_index_exec(zval *z_redis, zval *return_value, int keep_all TSRMLS_DC); zend_bool ra_is_write_cmd(RedisArray *ra, const char *cmd, int cmd_len); void ra_rehash(RedisArray *ra, zval *z_cb TSRMLS_DC); diff --git a/tests/array-rehash.php b/tests/array-rehash.php index 394ce628..5e50ede7 100644 --- a/tests/array-rehash.php +++ b/tests/array-rehash.php @@ -3,11 +3,6 @@ require_once 'PHPUnit.php'; echo "Redis Array rehashing tests.\n\n"; -$newRing = array('localhost:6379', 'localhost:6380', 'localhost:6381'); -$oldRing = array(); -$serverList = array('localhost:6379', 'localhost:6380', 'localhost:6381', 'localhost:6382'); - - class Redis_Rehashing_Test extends PHPUnit_TestCase { @@ -24,7 +19,7 @@ class Redis_Rehashing_Test extends PHPUnit_TestCase public function setUp() { // initialize strings. - $n = 1; + $n = 1000; $this->strings = array(); for($i = 0; $i < $n; $i++) { $this->strings['key-'.$i] = 'val-'.$i; @@ -160,9 +155,9 @@ class Redis_Rehashing_Test extends PHPUnit_TestCase // add a new node. public function testCreateSecondRing() { - global $newRing, $oldRing; + global $newRing, $oldRing, $serverList; $oldRing = $newRing; // back up the original. - $newRing []= 'localhost:6382'; // add a new node to the main ring. + $newRing = $serverList; // add a new node to the main ring. } public function testReadUsingFallbackMechanism() { @@ -178,9 +173,145 @@ class Redis_Rehashing_Test extends PHPUnit_TestCase } } -$suite = new PHPUnit_TestSuite("Redis_Rehashing_Test"); -$result = PHPUnit::run($suite); +// Test auto-migration of keys +class Redis_Auto_Rehashing_Test extends PHPUnit_TestCase { + + public $ra = NULL; + + // data + private $strings; + + public function setUp() + { + // initialize strings. + $n = 1000; + $this->strings = array(); + for($i = 0; $i < $n; $i++) { + $this->strings['key-'.$i] = 'val-'.$i; + } + + global $newRing, $oldRing; + + // create array + $this->ra = new RedisArray($newRing, array('previous' => $oldRing, 'index' => TRUE, 'autorehash' => TRUE)); + } + + public function testDistribute() { + // strings + foreach($this->strings as $k => $v) { + $this->ra->set($k, $v); + } + } + + private function readAllvalues() { + foreach($this->strings as $k => $v) { + $this->assertTrue($this->ra->get($k) === $v); + } + } + + + public function testReadAll() { + $this->readAllvalues(); + } + + // add a new node. + public function testCreateSecondRing() { + + global $newRing, $oldRing, $serverList; + $oldRing = $newRing; // back up the original. + $newRing = $serverList; // add a new node to the main ring. + } + + // Read and migrate keys on fallback, causing the whole ring to be rehashed. + public function testReadAndMigrateAll() { + $this->readAllvalues(); + } + + // Read and migrate keys on fallback, causing the whole ring to be rehashed. + public function testAllKeysHaveBeenMigrated() { + foreach($this->strings as $k => $v) { + // get the target for each key + $target = $this->ra->_target($k); + + // connect to the target host + list($host,$port) = split(':', $target); + $r = new Redis; + $r->connect($host, $port); + + $this->assertTrue($v === $r->get($k)); // check that the key has actually been migrated to the new node. + } + } +} + +// Test node-specific multi/exec +class Redis_Multi_Exec_Test extends PHPUnit_TestCase { + + public $ra = NULL; + + public function setUp() + { + global $newRing, $oldRing; + // create array + $this->ra = new RedisArray($newRing, array('previous' => $oldRing, 'index' => FALSE)); + } + + public function testInit() { + $this->ra->set('group:managers', 2); + $this->ra->set('group:executives', 3); + + $this->ra->set('1_{employee:joe}_name', 'joe'); + $this->ra->set('1_{employee:joe}_group', 2); + $this->ra->set('1_{employee:joe}_salary', 2000); + } + + public function testKeyDistribution() { + // check that all of joe's keys are on the same instance + $lastNode = NULL; + foreach(array('name', 'group', 'salary') as $field) { + $node = $this->ra->_target('1_{employee:joe}_'.$field); + if($lastNode) { + $this->assertTrue($node === $lastNode); + } + $lastNode = $node; + } + } + + public function testMultiExec() { + + $newGroup = $this->ra->get('group:executives'); + $newSalary = 4000; + + // change both in a transaction. + $host = $this->ra->_target('{employee:joe}'); // transactions are per-node, so we need a reference to it. + $tr = $this->ra->multi($host) + ->set('1_{employee:joe}_group', $newGroup) + ->set('1_{employee:joe}_salary', $newSalary) + ->exec(); + + // check that the group and salary have been changed + $this->assertTrue($this->ra->get('1_{employee:joe}_group') === $newGroup); + $this->assertTrue($this->ra->get('1_{employee:joe}_salary') == $newSalary); + } + +} + + +function run_tests($className) { + // reset rings + global $newRing, $oldRing, $serverList; + $newRing = array('localhost:6379', 'localhost:6380', 'localhost:6381'); + $oldRing = array(); + $serverList = array('localhost:6379', 'localhost:6380', 'localhost:6381', 'localhost:6382'); + + // run + $suite = new PHPUnit_TestSuite($className); + $result = PHPUnit::run($suite); + echo $result->toString(); + echo "\n"; +} -echo $result->toString(); +run_tests('Redis_Rehashing_Test'); +run_tests('Redis_Auto_Rehashing_Test'); +run_tests('Redis_Multi_Exec_Test'); ?> |